# Licensed under a 3-clause BSD style license - see LICENSE.rstimportloggingimportnumpyasnpimportastropy.unitsasufromastropy.coordinatesimportSkyCoordfromastropy.ioimportfitsfromregionsimportPointSkyRegionimportmatplotlib.pyplotaspltfromgammapy.utils.unitsimportunit_from_fits_image_hdufrom..coordimportMapCoordfrom..geomimportpix_tuple_to_idxfrom..utilsimportINVALID_INDEXfrom.coreimportHpxMapfrom.geomimportHpxGeomfrom.ioimportHPX_FITS_CONVENTIONS,HpxConvfrom.utilsimportHpxToWcsMapping,get_pix_size_from_nside,get_superpixels__all__=["HpxNDMap"]log=logging.getLogger(__name__)
[docs]classHpxNDMap(HpxMap):"""HEALPix map with any number of non-spatial dimensions. This class uses an N+1D numpy array to represent the sequence of HEALPix image planes. Following the convention of WCS-based maps this class uses a column-wise ordering for the data array with the spatial dimension being tied to the last index of the array. Parameters ---------- geom : `~gammapy.maps.HpxGeom` HEALPix geometry object. data : `~numpy.ndarray` HEALPix data array. If None, then an empty array will be allocated. meta : `dict` Dictionary to store metadata. unit : str or `~astropy.units.Unit` The map unit. """def__init__(self,geom,data=None,dtype="float32",meta=None,unit=""):data_shape=geom.data_shapeifdataisNone:data=self._make_default_data(geom,data_shape,dtype)super().__init__(geom,data,meta,unit)@staticmethoddef_make_default_data(geom,shape_np,dtype):ifgeom.npix.size>1:data=np.full(shape_np,np.nan,dtype=dtype)idx=geom.get_idx(local=True)data[idx[::-1]]=0else:data=np.zeros(shape_np,dtype=dtype)returndata
[docs]@classmethoddeffrom_wcs_tiles(cls,wcs_tiles,nest=True):"""Create HEALPix map from WCS tiles. Parameters ---------- wcs_tiles : list of `WcsNDMap` WCS map tiles. nest : bool, optional Indexing scheme. If True, "NESTED" scheme. If False, "RING" scheme. Default is True. Returns ------- hpx_map : `HpxNDMap` HEALPix map. """importhealpyashpgeom_wcs=wcs_tiles[0].geomgeom_hpx=HpxGeom.create(binsz=geom_wcs.pixel_scales[0],frame=geom_wcs.frame,nest=nest,axes=geom_wcs.axes,)map_hpx=cls.from_geom(geom=geom_hpx,unit=wcs_tiles[0].unit)coords=map_hpx.geom.get_coord().skycoordnside_superpix=hp.npix2nside(len(wcs_tiles))hpx_ref=HpxGeom(nside=nside_superpix,nest=nest,frame=geom_wcs.frame)idx=np.arange(map_hpx.geom.to_image().npix.item())indices=get_superpixels(idx,map_hpx.geom.nside,nside_superpix,nest=nest)forwcs_tileinwcs_tiles:hpx_idx=hpx_ref.coord_to_idx(wcs_tile.geom.center_skydir)[0]hpx_idx=int(hpx_idx.item())mask=indices==hpx_idxmap_hpx.data[mask]=wcs_tile.interp_by_coord(coords[mask])returnmap_hpx
[docs]defto_wcs_tiles(self,nside_tiles=4,margin="0 deg",method="nearest",oversampling_factor=1):"""Convert HpxNDMap to a list of WCS tiles. Parameters ---------- nside_tiles : int, optional HEALPix NSIDE parameter for super pixel tiles. Default is 4. margin : Angle, optional Width margin of the WCS tile. Default is "0 deg". method : {'nearest', 'linear'} Interpolation method. Default is "nearest". oversampling_factor : int, optional Oversampling factor. Default is 1. Returns ------- wcs_tiles : list of `WcsNDMap` WCS tiles. """wcs_tiles=[]wcs_geoms=self.geom.to_wcs_tiles(nside_tiles=nside_tiles,margin=margin)forgeominwcs_geoms:ifoversampling_factor>1:geom=geom.upsample(oversampling_factor)wcs_map=self.interp_to_geom(geom=geom,method=method)wcs_tiles.append(wcs_map)returnwcs_tiles
[docs]@classmethoddeffrom_hdu(cls,hdu,hdu_bands=None,format=None,colname=None):"""Make a HpxNDMap object from a FITS HDU. Parameters ---------- hdu : `~astropy.io.fits.BinTableHDU` The FITS HDU. hdu_bands : `~astropy.io.fits.BinTableHDU`, optional The BANDS table HDU. Default is None. format : str, optional FITS convention. Default is None. If None the format is guessed. The following formats are supported: - "gadf" - "fgst-ccube" - "fgst-ltcube" - "fgst-bexpcube" - "fgst-srcmap" - "fgst-template" - "fgst-srcmap-sparse" - "galprop" - "galprop2" colname : str, optional Data column name to be used for the HEALPix map. Default is None. Returns ------- map : `HpxMap` HEALPix map. """ifformatisNone:format=HpxConv.identify_hpx_format(hdu.header)geom=HpxGeom.from_header(hdu.header,hdu_bands,format=format)hpx_conv=HPX_FITS_CONVENTIONS[format]shape=geom.axes.shape[::-1]# TODO: Should we support extracting slices?meta=cls._get_meta_from_header(hdu.header)unit=unit_from_fits_image_hdu(hdu.header)map_out=cls(geom,None,meta=meta,unit=unit)colnames=hdu.columns.namescnames=[]ifhdu.header.get("INDXSCHM",None)=="SPARSE":pix=hdu.data.field("PIX")vals=hdu.data.field("VALUE")if"CHANNEL"inhdu.data.columns.names:chan=hdu.data.field("CHANNEL")chan=np.unravel_index(chan,shape)idx=chan+(pix,)else:idx=(pix,)map_out.set_by_idx(idx[::-1],vals)else:ifcolnameisnotNone:cnames.append(colname)else:forcincolnames:ifc.find(hpx_conv.colstring)==0:cnames.append(c)nbin=len(cnames)ifnbin==1:map_out.data=hdu.data.field(cnames[0])else:foridx,cnameinenumerate(cnames):idx=np.unravel_index(idx,shape)map_out.data[idx+(slice(None),)]=hdu.data.field(cname)returnmap_out
[docs]defto_wcs(self,sum_bands=False,normalize=True,proj="AIT",oversample=2,width_pix=None,hpx2wcs=None,fill_nan=True,):fromgammapy.mapsimportWcsNDMapifsum_bandsandself.geom.nside.size>1:map_sum=self.sum_over_axes()returnmap_sum.to_wcs(sum_bands=False,normalize=normalize,proj=proj,oversample=oversample,width_pix=width_pix,)# FIXME: Check whether the old mapping is still valid and reuse itifhpx2wcsisNone:geom_wcs_image=self.geom.to_wcs_geom(proj=proj,oversample=oversample,width_pix=width_pix).to_image()hpx2wcs=HpxToWcsMapping.create(self.geom,geom_wcs_image)# FIXME: Need a function to extract a valid shape from npix propertyifsum_bands:axes=np.arange(self.data.ndim-1)hpx_data=np.apply_over_axes(np.sum,self.data,axes=axes)hpx_data=np.squeeze(hpx_data)wcs_shape=tuple([t.flat[0]fortinhpx2wcs.npix])wcs_data=np.zeros(wcs_shape).Twcs=hpx2wcs.wcs.to_image()else:hpx_data=self.datawcs_shape=tuple([t.flat[0]fortinhpx2wcs.npix])+self.geom.shape_axeswcs_data=np.zeros(wcs_shape).Twcs=hpx2wcs.wcs.to_cube(self.geom.axes)# FIXME: Should reimplement instantiating map first and fill data arrayhpx2wcs.fill_wcs_map_from_hpx_data(hpx_data,wcs_data,normalize,fill_nan)returnWcsNDMap(wcs,wcs_data,unit=self.unit)
def_pad_spatial(self,pad_width,mode="constant",cval=0):geom=self.geom._pad_spatial(pad_width=pad_width)map_out=self._init_copy(geom=geom,data=None)map_out.coadd(self)coords=geom.get_coord(flat=True)m=self.geom.contains(coords)coords=tuple([c[~m]forcincoords])ifmode=="constant":map_out.set_by_coord(coords,cval)elifmode=="interp":raiseValueError("Method 'interp' not supported for HpxMap")else:raiseValueError(f"Unrecognized pad mode: {mode!r}")returnmap_out
[docs]defupsample(self,factor,order=0,preserve_counts=True,axis_name=None):ifaxis_name:raiseNotImplementedError("HpxNDMap.upsample does currently not support upsampling of non-spatial axes.")iforder!=0:raiseValueError("HpxNDMap.upsample currently only supports nearest upsampling")geom=self.geom.upsample(factor)coords=geom.get_coord()data=self.get_by_coord(coords)ifpreserve_counts:data/=factor**2returnself._init_copy(geom=geom,data=data)
[docs]defdownsample(self,factor,preserve_counts=True,axis_name=None):ifaxis_name:raiseNotImplementedError("HpxNDMap does currently not support upsampling of non-spatial axes.")geom=self.geom.downsample(factor)coords=self.geom.get_coord()vals=self.get_by_coord(coords)map_out=self._init_copy(geom=geom,data=None)map_out.fill_by_coord(coords,vals)ifnotpreserve_counts:map_out.data/=factor**2returnmap_out
[docs]defto_nside(self,nside,preserve_counts=True):"""Upsample or downsample the map to a given nside. Parameters ---------- nside : int HEALPix NSIDE parameter. preserve_counts : bool, optional Preserve the integral over each bin. This should be true if the map is an integral quantity (e.g. counts) and false if the map is a differential quantity (e.g. intensity). Default is True. Returns ------- geom : `~HpxNDMap` HEALPix map with new NSIDE. """iflen(self.geom.nside)>1:raiseNotImplementedError("to_nside() is not supported for an irregular map.")factor=nside/self.geom.nside.item()iffactor>1:returnself.upsample(factor=int(factor),preserve_counts=preserve_counts)eliffactor<1:returnself.downsample(factor=int(1/factor),preserve_counts=preserve_counts)else:returnself.copy()
[docs]definterp_by_pix(self,pix,method=None,fill_value=None):"""Interpolate map values at the given pixel coordinates."""raiseNotImplementedError
[docs]defcutout(self,position,width,*args,**kwargs):"""Create a cutout around a given position. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Center position of the cutout region. width : `~astropy.coordinates.Angle` or `~astropy.units.Quantity` Diameter of the circular cutout region. Returns ------- cutout : `~gammapy.maps.HpxNDMap` Cutout map. """geom=self.geom.cutout(position=position,width=width)ifself.geom.is_allsky:idx=geom._ipixelse:idx=self.geom.to_image().global_to_local((geom._ipix,))data=self.data[...,idx]returnself.__class__(geom=geom,data=data,unit=self.unit,meta=self.meta)
[docs]defstack(self,other,weights=None,nan_to_num=True):"""Stack cutout into map. Parameters ---------- other : `HpxNDMap` Other map to stack. weights : `HpxNDMap`, optional Array to be used as weights. The spatial geometry must be equivalent to `other` and additional axes must be broadcastable. Default is None. nan_to_num: bool, optional Non-finite values are replaced by zero if True. Default is True. """ifself.geom==other.geom:idx=Noneelifself.geom.is_aligned(other.geom):ifself.geom.is_allsky:idx=other.geom._ipixelse:idx=self.geom.to_image().global_to_local((other.geom._ipix,))[0]else:raiseValueError("Can only stack equivalent maps or cutout of the same map.")data=other.quantity.to_value(self.unit)ifnan_to_num:not_finite=~np.isfinite(data)ifnp.any(not_finite):data=data.copy()data[not_finite]=0ifweightsisnotNone:ifnotother.geom.to_image()==weights.geom.to_image():raiseValueError("Incompatible spatial geoms between map and weights")data=data*weights.dataifidxisNone:self.data+=dataelse:self.data[...,idx]+=data
[docs]defsmooth(self,width,kernel="gauss"):"""Smooth the map. Iterates over 2D image planes, processing one at a time. Parameters ---------- width : `~astropy.units.Quantity`, str or float Smoothing width given as quantity or float. If a float is given it is interpreted as smoothing width in pixels. If an (angular) quantity is given it is converted to pixels using `~healpy.nside2resol`. It corresponds to the standard deviation in case of a Gaussian kernel, and the radius in case of a disk kernel. kernel : {'gauss', 'disk'}, optional Kernel shape. Default is "gauss". Returns ------- image : `HpxNDMap` Smoothed image (a copy, the original object is unchanged). """importhealpyashpiflen(self.geom.nside)>1:raiseNotImplementedError("smooth is not supported for an irregular map.")nside=self.geom.nside.item()lmax=int(3*nside-1)# maximum l of the power spectrumipix=self.geom._ipixifnotself.geom.is_allsky:# stack into an all sky mapfull_sky_geom=HpxGeom.create(nside=self.geom.nside,nest=self.geom.nest,frame=self.geom.frame,axes=self.geom.axes,)full_sky_map=HpxNDMap.from_geom(full_sky_geom)forimg,idxinself.iter_by_image_data():full_sky_map.data[idx][ipix]=imgelse:full_sky_map=self# The smoothing width is expected by healpy in radiansifisinstance(width,(u.Quantity,str)):width=u.Quantity(width)width=width.to_value("rad")else:binsz=np.degrees(hp.nside2resol(nside))width=width*binszwidth=np.deg2rad(width)smoothed_data=np.empty(self.data.shape,dtype=float)forimg,idxinfull_sky_map.iter_by_image_data():img=img.astype(float)ifself.geom.nest:# reorder to ring to do the smoothingimg=hp.pixelfunc.reorder(img,n2r=True)ifkernel=="gauss":data=hp.sphtfunc.smoothing(img,sigma=width,pol=False,lmax=lmax)elifkernel=="disk":# create the step function in angular spacetheta=np.linspace(0,width)beam=np.ones(len(theta))beam[theta>width]=0# convert to the spherical harmonics spacewindow_beam=hp.sphtfunc.beam2bl(beam,theta,lmax)# normalize the window beamwindow_beam=window_beam/window_beam.max()data=hp.sphtfunc.smoothing(img,beam_window=window_beam,pol=False,lmax=lmax)else:raiseValueError(f"Invalid kernel: {kernel!r}")ifself.geom.nest:# reorder back to nest after the smoothingdata=hp.pixelfunc.reorder(data,r2n=True)smoothed_data[idx]=data[ipix]returnself._init_copy(data=smoothed_data)
[docs]defconvolve(self,kernel,convolution_method="wcs-tan",**kwargs):"""Convolve map with a WCS kernel. Project the map into a WCS geometry, convolve with a WCS kernel and project back into the initial HEALPix geometry. If the kernel is two-dimensional, it is applied to all image planes likewise. If the kernel is higher dimensional it must match the map in the number of dimensions and the corresponding kernel is selected for every image plane. Parameters ---------- kernel : `~gammapy.irf.PSFKernel` Convolution kernel. The pixel size must be upsampled by a factor 2 or bigger with respect to the input map to prevent artifacts in the projection. convolution_method : {"wcs-tan", ""} Convolution method. If "wcs-tan", project on WCS geometry and convolve with WCS kernel. See `~gammapy.maps.HpxNDMap.convolve_wcs`. If "", convolve map with a symmetrical WCS kernel. See `~gammapy.maps.HpxNDMap.convolve_full`. Default is "wcs-tan". **kwargs : dict Keyword arguments passed to `~gammapy.maps.WcsNDMap.convolve`. Returns ------- map : `HpxNDMap` Convolved map. """ifconvolution_method=="wcs-tan":returnself.convolve_wcs(kernel,**kwargs)elifconvolution_method=="":returnself.convolve_full(kernel)else:raiseValueError(f"Not a valid method for HPX convolution: {convolution_method}")
[docs]defconvolve_wcs(self,kernel,**kwargs):"""Convolve map with a WCS kernel. Project the map into a WCS geometry, convolve with a WCS kernel and project back into the initial HEALPix geometry. If the kernel is two-dimensional, it is applied to all image planes likewise. If the kernel is higher dimensional should either match the map in the number of dimensions or the map must be an image (no non-spatial axes). In that case, the corresponding kernel is selected and applied to every image plane or to the single input image respectively. Parameters ---------- kernel : `~gammapy.irf.PSFKernel` Convolution kernel. The pixel size must be upsampled by a factor 2 or bigger with respect to the input map to prevent artifacts in the projection. **kwargs : dict Keyword arguments passed to `~gammapy.maps.WcsNDMap.convolve`. Returns ------- map : `HpxNDMap` Convolved map. """# TODO: maybe go through `.to_wcs_tiles()` to make this work for allsky mapsifself.geom.is_allsky:raiseValueError("Convolution via WCS projection is not supported for allsky maps.")ifself.geom.width>10*u.deg:log.warning("Convolution via WCS projection is not recommended for large ""maps (> 10 deg). Perhaps the method `convolve_full()` is more suited for ""this case.")geom_kernel=kernel.psf_kernel_map.geomwcs_size=np.max(geom_kernel.to_image().pixel_scales.deg)hpx_size=get_pix_size_from_nside(self.geom.nside[0])ifwcs_size>0.5*hpx_size:raiseValueError(f"The kernel pixel size of {wcs_size} has to be smaller by at least"f" a factor 2 than the pixel size of the input map of {hpx_size}")geom_wcs=self.geom.to_wcs_geom(proj="TAN").to_image()hpx2wcs=HpxToWcsMapping.create(hpx=self.geom,wcs=geom_wcs.to_binsz(binsz=wcs_size))# Project to WCS and convolvewcs_map=self.to_wcs(hpx2wcs=hpx2wcs,fill_nan=False)conv_wcs_map=wcs_map.convolve(kernel=kernel,**kwargs)ifself.geom.is_imageandgeom_kernel.ndim>2:target_geom=self.geom.to_cube(geom_kernel.axes)else:target_geom=self.geom# and back to hpxdata=np.zeros(target_geom.data_shape)data=hpx2wcs.fill_hpx_map_from_wcs_data(wcs_data=conv_wcs_map.data,hpx_data=data)returnHpxNDMap.from_geom(target_geom,data=data)
[docs]defconvolve_full(self,kernel):"""Convolve map with a symmetrical WCS kernel. Extract the radial profile of the kernel (assuming radial symmetry) and convolve via `~healpy.sphtfunc.smoothing`. Since no projection is applied, this is suited for full-sky and large maps. If the kernel is two-dimensional, it is applied to all image planes likewise. If the kernel is higher dimensional it must match the map in the number of dimensions and the corresponding kernel is selected for every image plane. Parameters ---------- kernel : `~gammapy.irf.PSFKernel` Convolution kernel. The pixel size must be upsampled by a factor 2 or bigger with respect to the input map to prevent artifacts in the projection. Returns ------- map : `HpxNDMap` Convolved map. """importhealpyashpiflen(self.geom.nside)>1:raiseNotImplementedError("convolve_full() is not supported for an irregular map.")nside=self.geom.nside.item()lmax=int(3*nside-1)# maximum l of the power spectrumnest=self.geom.nestallsky=self.geom.is_allskyipix=self.geom._ipixifnotallsky:# stack into an all sky mapfull_sky_geom=HpxGeom.create(nside=self.geom.nside,nest=self.geom.nest,frame=self.geom.frame,axes=self.geom.axes,)full_sky_map=HpxNDMap.from_geom(full_sky_geom)forimg,idxinself.iter_by_image_data():full_sky_map.data[idx][ipix]=imgelse:full_sky_map=self# Get radial profile from the kernelpsf_kernel=kernel.psf_kernel_mapcenter_pix=psf_kernel.geom.center_pix[:2]center=max(center_pix)dim=np.argmax(center_pix)pixels=[0,0]pixels[dim]=np.linspace(0,center,int(center+1))# assuming radially symmetric kernelpixels[abs(1-dim)]=center_pix[abs(1-dim)]*np.ones(int(center+1))coords=psf_kernel.geom.pix_to_coord(pixels)coordinates=SkyCoord(coords[0],coords[1],frame=psf_kernel.geom.frame)angles=coordinates.separation(psf_kernel.geom.center_skydir).radvalues=psf_kernel.get_by_pix(pixels)# Do the convolution in each image planeconvolved_data=np.empty(self.data.shape,dtype=float)forimg,idxinfull_sky_map.iter_by_image_data():img=img.astype(float)ifnest:# reorder to ring to do the convolutionimg=hp.pixelfunc.reorder(img,n2r=True)radial_profile=np.reshape(values[:,idx],(values.shape[0],))window_beam=hp.sphtfunc.beam2bl(np.flip(radial_profile),np.flip(angles),lmax)window_beam=window_beam/window_beam.max()data=hp.sphtfunc.smoothing(img,beam_window=window_beam,pol=False,lmax=lmax)ifnest:# reorder back to nest after the convolutiondata=hp.pixelfunc.reorder(data,r2n=True)convolved_data[idx]=data[ipix]returnself._init_copy(data=convolved_data)
def_interp_by_coord(self,coords):"""Linearly interpolate map values."""pix,wts=self.geom.interp_weights(coords)ifself.geom.is_image:returnnp.sum(self.data.T[tuple(pix)]*wts,axis=0)val=np.zeros(pix[0].shape[1:])# Loop over function values at cornersforiinrange(2**len(self.geom.axes)):pix_i=[]wt=np.ones(pix[0].shape[1:])[np.newaxis,...]forj,axinenumerate(self.geom.axes):idx=ax.coord_to_idx(coords[ax.name])idx=np.clip(idx,0,len(ax.center)-2)w=ax.center[idx+1]-ax.center[idx]c=u.Quantity(coords[ax.name],ax.center.unit,copy=False).valueifi&(1<<j):wt*=(c-ax.center[idx].value)/w.valuepix_i+=[idx+1]else:wt*=1.0-(c-ax.center[idx].value)/w.valuepix_i+=[idx]ifnotself.geom.is_regular:pix,wts=self.geom.interp_weights(coords,idxs=pix_i)wts[pix[0]==INVALID_INDEX.int]=0wt[~np.isfinite(wt)]=0val+=np.nansum(wts*wt*self.data.T[tuple(pix[:1]+pix_i)],axis=0)returnvaldef_resample_by_idx(self,idx,weights=None,preserve_counts=False):idx=pix_tuple_to_idx(idx)msk=np.all(np.stack([t!=INVALID_INDEX.intfortinidx]),axis=0)ifweightsisnotNone:weights=weights[msk]idx=[t[msk]fortinidx]idx_local=list(self.geom.global_to_local(idx))msk=idx_local[0]>=0idx_local=[t[msk]fortinidx_local]ifweightsisnotNone:ifisinstance(weights,u.Quantity):weights=weights.to_value(self.unit)weights=weights[msk]idx_local=np.ravel_multi_index(idx_local,self.data.T.shape)idx_local,idx_inv=np.unique(idx_local,return_inverse=True)weights=np.bincount(idx_inv,weights=weights)ifnotpreserve_counts:weights/=np.bincount(idx_inv).astype(self.data.dtype)self.data.T.flat[idx_local]+=weights
[docs]defto_region_nd_map(self,region,func=np.nansum,weights=None,method="nearest"):"""Get region ND map in a given region. By default, the whole map region is considered. Parameters ---------- region: `~regions.Region` or `~astropy.coordinates.SkyCoord` Region. func : numpy.func, optional Function to reduce the data. Default is np.nansum. For boolean Map, use np.any or np.all. weights : `WcsNDMap`, optional Array to be used as weights. The geometry must be equivalent. Default is None. method : {"nearest", "linear"} How to interpolate if a position is given. Default is "neraest". Returns ------- spectrum : `~gammapy.maps.RegionNDMap` Spectrum in the given region. """fromgammapy.mapsimportRegionGeom,RegionNDMapifisinstance(region,SkyCoord):region=PointSkyRegion(region)ifweightsisnotNone:ifnotself.geom==weights.geom:raiseValueError("Incompatible spatial geoms between map and weights")geom=RegionGeom(region=region,axes=self.geom.axes)ifisinstance(region,PointSkyRegion):coords=geom.get_coord()data=self.interp_by_coord(coords=coords,method=method)ifweightsisnotNone:data*=weights.interp_by_coord(coords=coords,method=method)else:cutout=self.cutout(position=geom.center_skydir,width=np.max(geom.width))ifweightsisnotNone:weights_cutout=weights.cutout(position=geom.center_skydir,width=geom.width)cutout.data*=weights_cutout.datamask=cutout.geom.to_image().region_mask([region]).datadata=func(cutout.data[...,mask],axis=-1)returnRegionNDMap(geom=geom,data=data,unit=self.unit,meta=self.meta.copy())
[docs]defplot(self,method="raster",ax=None,normalize=False,proj="AIT",oversample=2,width_pix=1000,**kwargs,):"""Quickplot method. This will generate a visualization of the map by converting to a rasterized WCS image (method='raster') or drawing polygons for each pixel (method='poly'). Parameters ---------- method : {'raster','poly'} Method for mapping HEALPix pixels to a two-dimensional image. Can be set to 'raster' (rasterization to cartesian image plane) or 'poly' (explicit polygons for each pixel). WARNING: The 'poly' method is much slower than 'raster' and only suitable for maps with less than ~10k pixels. Default is "raster". proj : string, optional Any valid WCS projection type. Default is "AIT". oversample : float, optional Oversampling factor for WCS map. This will be the approximate ratio of the width of a HPX pixel to a WCS pixel. If this parameter is None then the width will be set from ``width_pix``. Default is 2. width_pix : int, optional Width of the WCS geometry in pixels. The pixel size will be set to the number of pixels satisfying ``oversample`` or ``width_pix`` whichever is smaller. If this parameter is None then the width will be set from ``oversample``. Default is 1000. **kwargs : dict Keyword arguments passed to `~matplotlib.pyplot.imshow`. Returns ------- ax : `~astropy.visualization.wcsaxes.WCSAxes` WCS axes object. """ifmethod=="raster":m=self.to_wcs(sum_bands=True,normalize=normalize,proj=proj,oversample=oversample,width_pix=width_pix,)returnm.plot(ax,**kwargs)elifmethod=="poly":returnself._plot_poly(proj=proj,ax=ax)else:raiseValueError(f"Invalid method: {method!r}")
def_plot_poly(self,proj="AIT",step=1,ax=None):"""Plot the map using a collection of polygons. Parameters ---------- proj : string, optional Any valid WCS projection type. Default is "AIT". step : int, optional Set the number vertices that will be computed for each pixel in multiples of 4. Default is 1. ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. Default is None. """# FIXME: At the moment this only works for all-sky maps if the# projection is centered at (0,0)# FIXME: Figure out how to force a square aspect-ratio like imshowimporthealpyashpfrommatplotlib.collectionsimportPatchCollectionfrommatplotlib.patchesimportPolygonwcs=self.geom.to_wcs_geom(proj=proj,oversample=1)ifaxisNone:fig=plt.gcf()ax=fig.add_subplot(111,projection=wcs.wcs,aspect="equal")wcs_lonlat=wcs.center_coord[:2]idx=self.geom.get_idx()vtx=hp.boundaries(self.geom.nside.item(),idx[0],nest=self.geom.nest,step=step)theta,phi=hp.vec2ang(np.rollaxis(vtx,2))theta=theta.reshape((4*step,-1)).Tphi=phi.reshape((4*step,-1)).Tpatches=[]data=[]defget_angle(x,t):return180.0-(180.0-x+t)%360.0fori,(x,y)inenumerate(zip(phi,theta)):lon,lat=np.degrees(x),np.degrees(np.pi/2.0-y)# Add a small offset to avoid vertices wrapping to the# other size of the projectionifget_angle(np.median(lon),wcs_lonlat[0].to_value("deg"))>0:idx=wcs.coord_to_pix((lon-1e-4,lat))else:idx=wcs.coord_to_pix((lon+1e-4,lat))dist=np.max(np.abs(idx[0][0]-idx[0]))# Split pixels that wrap around the edges of the projectionifdist>wcs.npix[0]/1.5:lon,lat=np.degrees(x),np.degrees(np.pi/2.0-y)lon0=lon-1e-4lon1=lon+1e-4pix0=wcs.coord_to_pix((lon0,lat))pix1=wcs.coord_to_pix((lon1,lat))idx0=np.argsort(pix0[0])idx1=np.argsort(pix1[0])pix0=(pix0[0][idx0][:3],pix0[1][idx0][:3])pix1=(pix1[0][idx1][1:],pix1[1][idx1][1:])patches.append(Polygon(np.vstack((pix0[0],pix0[1])).T,closed=True))patches.append(Polygon(np.vstack((pix1[0],pix1[1])).T,closed=True))data.append(self.data[i])data.append(self.data[i])else:polygon=Polygon(np.vstack((idx[0],idx[1])).T,closed=True)patches.append(polygon)data.append(self.data[i])p=PatchCollection(patches,linewidths=0,edgecolors="None")p.set_array(np.array(data))ax.add_collection(p)ax.autoscale_view()ax.coords.grid(color="w",linestyle=":",linewidth=0.5)returnax
[docs]defplot_mask(self,method="raster",ax=None,proj="AIT",oversample=2,width_pix=1000,**kwargs,):"""Plot the mask as a shaded area. Parameters ---------- method : {'raster','poly'} Method for mapping HEALPix pixels to a two-dimensional image. Can be set to 'raster' (rasterization to cartesian image plane) or 'poly' (explicit polygons for each pixel). WARNING: The 'poly' method is much slower than 'raster' and only suitable for maps with less than ~10k pixels. Default is "raster". proj : string, optional Any valid WCS projection type. Default is "AIT". oversample : float, optional Oversampling factor for WCS map. This will be the approximate ratio of the width of a HPX pixel to a WCS pixel. If this parameter is None then the width will be set from ``width_pix``. Default is 2. width_pix : int, optional Width of the WCS geometry in pixels. The pixel size will be set to the number of pixels satisfying ``oversample`` or ``width_pix`` whichever is smaller. If this parameter is None then the width will be set from ``oversample``. Default is 1000. **kwargs : dict Keyword arguments passed to `~matplotlib.pyplot.imshow`. Returns ------- ax : `~astropy.visualization.wcsaxes.WCSAxes` WCS axis object. """ifnotself.is_mask:raiseValueError("`.plot_mask()` only supports maps containing boolean values.")ifmethod=="raster":m=self.to_wcs(sum_bands=True,normalize=False,proj=proj,oversample=oversample,width_pix=width_pix,)m.data=np.nan_to_num(m.data).astype(bool)returnm.plot_mask(ax=ax,**kwargs)else:raiseValueError(f"Invalid method: {method!r}")
[docs]defsample_coord(self,n_events,random_state=0):raiseNotImplementedError("HpXNDMap.sample_coord is not implemented yet.")