# Licensed under a 3-clause BSD style license - see LICENSE.rstimportabcimportcopyimportinspectimportjsonimportnumpyasnpfromastropyimportunitsasufromastropy.ioimportfitsimportmatplotlib.pyplotaspltfromgammapy.utils.scriptsimportmake_pathfromgammapy.utils.unitsimportenergy_unit_formatfrom.axesimportMapAxisfrom.coordimportMapCoordfrom.geomimportpix_tuple_to_idxfrom.ioimportJsonQuantityDecoder__all__=["Map"]
[docs]classMap(abc.ABC):"""Abstract map class. This can represent WCS- or HEALPIX-based maps with 2 spatial dimensions and N non-spatial dimensions. Parameters ---------- geom : `~gammapy.maps.Geom` Geometry data : `~numpy.ndarray` or `~astropy.units.Quantity` Data array meta : `dict` Dictionary to store meta data unit : str or `~astropy.units.Unit` Data unit, ignored if data is a Quantity. """tag="map"def__init__(self,geom,data,meta=None,unit=""):self._geom=geomifisinstance(data,u.Quantity):self._unit=u.Unit(unit)self.quantity=dataelse:self.data=dataself._unit=u.Unit(unit)ifmetaisNone:self.meta={}else:self.meta=metadef_init_copy(self,**kwargs):"""Init map instance by copying missing init arguments from self."""argnames=inspect.getfullargspec(self.__init__).argsargnames.remove("self")argnames.remove("dtype")forarginargnames:value=getattr(self,"_"+arg)kwargs.setdefault(arg,copy.deepcopy(value))returnself.from_geom(**kwargs)@propertydefis_mask(self):"""Whether map is mask with bool dtype"""returnself.data.dtype==bool@propertydefgeom(self):"""Map geometry (`~gammapy.maps.Geom`)"""returnself._geom@propertydefdata(self):"""Data array (`~numpy.ndarray`)"""returnself._data@data.setterdefdata(self,value):"""Set data Parameters ---------- value : array-like Data array """ifnp.isscalar(value):value=value*np.ones(self.geom.data_shape,dtype=type(value))ifisinstance(value,u.Quantity):raiseTypeError("Map data must be a Numpy array. Set unit separately")ifnotvalue.shape==self.geom.data_shape:value=value.reshape(self.geom.data_shape)self._data=value@propertydefunit(self):"""Map unit (`~astropy.units.Unit`)"""returnself._unit@propertydefmeta(self):"""Map meta (`dict`)"""returnself._meta@meta.setterdefmeta(self,val):self._meta=val@propertydefquantity(self):"""Map data times unit (`~astropy.units.Quantity`)"""returnu.Quantity(self.data,self.unit,copy=False)@quantity.setterdefquantity(self,val):"""Set data and unit Parameters ---------- value : `~astropy.units.Quantity` Quantity """val=u.Quantity(val,copy=False)self.data=val.valueself._unit=val.unit
[docs]@staticmethoddefcreate(**kwargs):"""Create an empty map object. This method accepts generic options listed below, as well as options for `HpxMap` and `WcsMap` objects. For WCS-specific options, see `WcsMap.create` and for HPX-specific options, see `HpxMap.create`. Parameters ---------- frame : str Coordinate system, either Galactic ("galactic") or Equatorial ("icrs"). map_type : {'wcs', 'wcs-sparse', 'hpx', 'hpx-sparse', 'region'} Map type. Selects the class that will be used to instantiate the map. binsz : float or `~numpy.ndarray` Pixel size in degrees. skydir : `~astropy.coordinates.SkyCoord` Coordinate of map center. axes : list List of `~MapAxis` objects for each non-spatial dimension. If None then the map will be a 2D image. dtype : str Data type, default is 'float32' unit : str or `~astropy.units.Unit` Data unit. meta : `dict` Dictionary to store meta data. region : `~regions.SkyRegion` Sky region used for the region map. Returns ------- map : `Map` Empty map object. """from.hpximportHpxMapfrom.regionimportRegionNDMapfrom.wcsimportWcsMapmap_type=kwargs.setdefault("map_type","wcs")if"wcs"inmap_type.lower():returnWcsMap.create(**kwargs)elif"hpx"inmap_type.lower():returnHpxMap.create(**kwargs)elifmap_type=="region":_=kwargs.pop("map_type")returnRegionNDMap.create(**kwargs)else:raiseValueError(f"Unrecognized map type: {map_type!r}")
[docs]@staticmethoddefread(filename,hdu=None,hdu_bands=None,map_type="auto",format=None,colname=None):"""Read a map from a FITS file. Parameters ---------- filename : str or `~pathlib.Path` Name of the FITS file. hdu : str Name or index of the HDU with the map data. hdu_bands : str Name or index of the HDU with the BANDS table. If not defined this will be inferred from the FITS header of the map HDU. map_type : {'wcs', 'wcs-sparse', 'hpx', 'hpx-sparse', 'auto', 'region'} Map type. Selects the class that will be used to instantiate the map. The map type should be consistent with the format of the input file. If map_type is 'auto' then an appropriate map type will be inferred from the input file. colname : str, optional data column name to be used of healix map. Returns ------- map_out : `Map` Map object """withfits.open(str(make_path(filename)),memmap=False)ashdulist:returnMap.from_hdulist(hdulist,hdu,hdu_bands,map_type,format=format,colname=colname)
[docs]@staticmethoddeffrom_geom(geom,meta=None,data=None,unit="",dtype="float32"):"""Generate an empty map from a `Geom` instance. Parameters ---------- geom : `Geom` Map geometry. data : `numpy.ndarray` data array meta : `dict` Dictionary to store meta data. unit : str or `~astropy.units.Unit` Data unit. Returns ------- map_out : `Map` Map object """from.hpximportHpxGeomfrom.regionimportRegionGeomfrom.wcsimportWcsGeomifisinstance(geom,HpxGeom):map_type="hpx"elifisinstance(geom,WcsGeom):map_type="wcs"elifisinstance(geom,RegionGeom):map_type="region"else:raiseValueError("Unrecognized geom type.")cls_out=Map._get_map_cls(map_type)returncls_out(geom,data=data,meta=meta,unit=unit,dtype=dtype)
[docs]@staticmethoddeffrom_hdulist(hdulist,hdu=None,hdu_bands=None,map_type="auto",format=None,colname=None):"""Create from `astropy.io.fits.HDUList`. Parameters ---------- hdulist : `~astropy.io.fits.HDUList` HDU list containing HDUs for map data and bands. hdu : str Name or index of the HDU with the map data. hdu_bands : str Name or index of the HDU with the BANDS table. map_type : {"auto", "wcs", "hpx", "region"} Map type. format : {'gadf', 'fgst-ccube', 'fgst-template'} FITS format convention. colname : str, optional Data column name to be used for the HEALPix map. Returns ------- map_out : `Map` Map object """ifmap_type=="auto":map_type=Map._get_map_type(hdulist,hdu)cls_out=Map._get_map_cls(map_type)ifmap_type=="hpx":returncls_out.from_hdulist(hdulist,hdu=hdu,hdu_bands=hdu_bands,format=format,colname=colname)else:returncls_out.from_hdulist(hdulist,hdu=hdu,hdu_bands=hdu_bands,format=format)
@staticmethoddef_get_meta_from_header(header):"""Load meta data from a FITS header."""if"META"inheader:returnjson.loads(header["META"],cls=JsonQuantityDecoder)else:return{}@staticmethoddef_get_map_type(hdu_list,hdu_name):"""Infer map type from a FITS HDU. Only read header, never data, to have good performance. """ifhdu_nameisNone:# Find the header of the first non-empty HDUheader=hdu_list[0].headerifheader["NAXIS"]==0:header=hdu_list[1].headerelse:header=hdu_list[hdu_name].headerif("PIXTYPE"inheader)and(header["PIXTYPE"]=="HEALPIX"):return"hpx"elif"CTYPE1"inheader:return"wcs"else:return"region"@staticmethoddef_get_map_cls(map_type):"""Get map class for given `map_type` string. This should probably be a registry dict so that users can add supported map types to the `gammapy.maps` I/O (see e.g. the Astropy table format I/O registry), but that's non-trivial to implement without avoiding circular imports. """ifmap_type=="wcs":from.wcsimportWcsNDMapreturnWcsNDMapelifmap_type=="wcs-sparse":raiseNotImplementedError()elifmap_type=="hpx":from.hpximportHpxNDMapreturnHpxNDMapelifmap_type=="hpx-sparse":raiseNotImplementedError()elifmap_type=="region":from.regionimportRegionNDMapreturnRegionNDMapelse:raiseValueError(f"Unrecognized map type: {map_type!r}")
[docs]defwrite(self,filename,overwrite=False,**kwargs):"""Write to a FITS file. Parameters ---------- filename : str Output file name. overwrite : bool Overwrite existing file? hdu : str Set the name of the image extension. By default this will be set to SKYMAP (for BINTABLE HDU) or PRIMARY (for IMAGE HDU). hdu_bands : str Set the name of the bands table extension. By default this will be set to BANDS. format : str, optional FITS format convention. By default files will be written to the gamma-astro-data-formats (GADF) format. This option can be used to write files that are compliant with format conventions required by specific software (e.g. the Fermi Science Tools). The following formats are supported: - "gadf" (default) - "fgst-ccube" - "fgst-ltcube" - "fgst-bexpcube" - "fgst-srcmap" - "fgst-template" - "fgst-srcmap-sparse" - "galprop" - "galprop2" sparse : bool Sparsify the map by dropping pixels with zero amplitude. This option is only compatible with the 'gadf' format. """hdulist=self.to_hdulist(**kwargs)hdulist.writeto(str(make_path(filename)),overwrite=overwrite)
[docs]defiter_by_axis(self,axis_name,keepdims=False):""" "Iterate over a given axis Yields ------ map : `Map` Map iteration. See also -------- iter_by_image : iterate by image returning a map """axis=self.geom.axes[axis_name]foridxinrange(axis.nbin):idx_axis=slice(idx,idx+1)ifkeepdimselseidxslices={axis_name:idx_axis}yieldself.slice_by_idx(slices=slices)
[docs]defiter_by_image(self,keepdims=False):"""Iterate over image planes of a map. Parameters ---------- keepdims : bool Keep dimensions. Yields ------ map : `Map` Map iteration. See also -------- iter_by_image_data : iterate by image returning data and index """foridxinnp.ndindex(self.geom.shape_axes):ifkeepdims:names=self.geom.axes.namesslices={name:slice(_,_+1)forname,_inzip(names,idx)}yieldself.slice_by_idx(slices=slices)else:yieldself.get_image_by_idx(idx=idx)
[docs]defiter_by_image_data(self):"""Iterate over image planes of the map. The image plane index is in data order, so that the data array can be indexed directly. Yields ------ (data, idx) : tuple Where ``data`` is a `numpy.ndarray` view of the image plane data, and ``idx`` is a tuple of int, the index of the image plane. See also -------- iter_by_image : iterate by image returning a map """foridxinnp.ndindex(self.geom.shape_axes):yieldself.data[idx[::-1]],idx[::-1]
[docs]defcoadd(self,map_in,weights=None):"""Add the contents of ``map_in`` to this map. This method can be used to combine maps containing integral quantities (e.g. counts) or differential quantities if the maps have the same binning. Parameters ---------- map_in : `Map` Input map. weights: `Map` or `~numpy.ndarray` The weight factors while adding """ifnotself.unit.is_equivalent(map_in.unit):raiseValueError("Incompatible units")# TODO: Check whether geometries are aligned and if so sum the# data vectors directlyifweightsisnotNone:map_in=map_in*weightsidx=map_in.geom.get_idx()coords=map_in.geom.get_coord()vals=u.Quantity(map_in.get_by_idx(idx),map_in.unit)self.fill_by_coord(coords,vals)
[docs]defpad(self,pad_width,axis_name=None,mode="constant",cval=0,method="linear"):"""Pad the spatial dimensions of the map. Parameters ---------- pad_width : {sequence, array_like, int} Number of pixels padded to the edges of each axis. axis_name : str Which axis to downsample. By default spatial axes are padded. mode : {'edge', 'constant', 'interp'} Padding mode. 'edge' pads with the closest edge value. 'constant' pads with a constant value. 'interp' pads with an extrapolated value. cval : float Padding value when mode='consant'. Returns ------- map : `Map` Padded map. """ifaxis_name:ifnp.isscalar(pad_width):pad_width=(pad_width,pad_width)geom=self.geom.pad(pad_width=pad_width,axis_name=axis_name)idx=self.geom.axes.index_data(axis_name)pad_width_np=[(0,0)]*self.data.ndimpad_width_np[idx]=pad_widthkwargs={}ifmode=="constant":kwargs["constant_values"]=cvaldata=np.pad(self.data,pad_width=pad_width_np,mode=mode,**kwargs)returnself.__class__(geom=geom,data=data,unit=self.unit,meta=self.meta.copy())returnself._pad_spatial(pad_width,mode="constant",cval=cval)
[docs]@abc.abstractmethoddefcrop(self,crop_width):"""Crop the spatial dimensions of the map. Parameters ---------- crop_width : {sequence, array_like, int} Number of pixels cropped from the edges of each axis. Defined analogously to ``pad_with`` from `numpy.pad`. Returns ------- map : `Map` Cropped map. """pass
[docs]@abc.abstractmethoddefdownsample(self,factor,preserve_counts=True,axis_name=None):"""Downsample the spatial dimension by a given factor. Parameters ---------- factor : int Downsampling factor. preserve_counts : bool Preserve the integral over each bin. This should be true if the map is an integral quantity (e.g. counts) and false if the map is a differential quantity (e.g. intensity). axis_name : str Which axis to downsample. By default spatial axes are downsampled. Returns ------- map : `Map` Downsampled map. """pass
[docs]@abc.abstractmethoddefupsample(self,factor,order=0,preserve_counts=True,axis_name=None):"""Upsample the spatial dimension by a given factor. Parameters ---------- factor : int Upsampling factor. order : int Order of the interpolation used for upsampling. preserve_counts : bool Preserve the integral over each bin. This should be true if the map is an integral quantity (e.g. counts) and false if the map is a differential quantity (e.g. intensity). axis_name : str Which axis to upsample. By default spatial axes are upsampled. Returns ------- map : `Map` Upsampled map. """pass
[docs]defresample_axis(self,axis,weights=None,ufunc=np.add):"""Resample map to a new axis binning by grouping over smaller bins and apply ufunc to the bin contents. By default, the map content are summed over the smaller bins. Other numpy ufunc can be used, e.g. np.logical_and, np.logical_or Parameters ---------- axis : `MapAxis` New map axis. weights : `Map` Array to be used as weights. The spatial geometry must be equivalent to `other` and additional axes must be broadcastable. ufunc : `~numpy.ufunc` ufunc to use to resample the axis. Default is numpy.add. Returns ------- map : `Map` Map with resampled axis. """from.hpximportHpxGeomgeom=self.geom.resample_axis(axis)axis_self=self.geom.axes[axis.name]axis_resampled=geom.axes[axis.name]# We don't use MapAxis.coord_to_idx because is does not behave as needed with boundariescoord=axis_resampled.edges.valueedges=axis_self.edges.valueindices=np.digitize(coord,edges)-1idx=self.geom.axes.index_data(axis.name)weights=1ifweightsisNoneelseweights.dataifnotisinstance(self.geom,HpxGeom):shape=self.geom._shape[:2]else:shape=(self.geom.data_shape[-1],)shape+=tuple([ax.nbinifax!=axiselse1foraxinself.geom.axes])padded_array=np.append(self.data*weights,np.zeros(shape[::-1]),axis=idx)slices=tuple([slice(0,_)for_ingeom.data_shape])data=ufunc.reduceat(padded_array,indices=indices,axis=idx)[slices]returnself._init_copy(data=data,geom=geom)
[docs]defslice_by_idx(self,slices,):"""Slice sub map from map object. Parameters ---------- slices : dict Dict of axes names and integers or `slice` object pairs. Contains one element for each non-spatial dimension. For integer indexing the corresponding axes is dropped from the map. Axes not specified in the dict are kept unchanged. Returns ------- map_out : `Map` Sliced map object. """geom=self.geom.slice_by_idx(slices)slices=tuple([slices.get(ax.name,slice(None))foraxinself.geom.axes])data=self.data[slices[::-1]]returnself.__class__(geom=geom,data=data,unit=self.unit,meta=self.meta)
[docs]defget_image_by_coord(self,coords):"""Return spatial map at the given axis coordinates. Parameters ---------- coords : tuple or dict Tuple should be ordered as (x_0, ..., x_n) where x_i are coordinates for non-spatial dimensions of the map. Dict should specify the axis names of the non-spatial axes such as {'axes0': x_0, ..., 'axesn': x_n}. Returns ------- map_out : `Map` Map with spatial dimensions only. See Also -------- get_image_by_idx, get_image_by_pix Examples -------- :: import numpy as np from gammapy.maps import Map, MapAxis from astropy.coordinates import SkyCoord from astropy import units as u # Define map axes energy_axis = MapAxis.from_edges( np.logspace(-1., 1., 4), unit='TeV', name='energy', ) time_axis = MapAxis.from_edges( np.linspace(0., 10, 20), unit='h', name='time', ) # Define map center skydir = SkyCoord(0, 0, frame='galactic', unit='deg') # Create map m_wcs = Map.create( map_type='wcs', binsz=0.02, skydir=skydir, width=10.0, axes=[energy_axis, time_axis], ) # Get image by coord tuple image = m_wcs.get_image_by_coord(('500 GeV', '1 h')) # Get image by coord dict with strings image = m_wcs.get_image_by_coord({'energy': '500 GeV', 'time': '1 h'}) # Get image by coord dict with quantities image = m_wcs.get_image_by_coord({'energy': 0.5 * u.TeV, 'time': 1 * u.h}) """ifisinstance(coords,tuple):coords=dict(zip(self.geom.axes.names,coords))idx=self.geom.axes.coord_to_idx(coords)returnself.get_image_by_idx(idx)
[docs]defget_image_by_pix(self,pix):"""Return spatial map at the given axis pixel coordinates Parameters ---------- pix : tuple Tuple of scalar pixel coordinates for each non-spatial dimension of the map. Tuple should be ordered as (I_0, ..., I_n). Pixel coordinates can be either float or integer type. See Also -------- get_image_by_coord, get_image_by_idx Returns ------- map_out : `Map` Map with spatial dimensions only. """idx=self.geom.pix_to_idx(pix)returnself.get_image_by_idx(idx)
[docs]defget_image_by_idx(self,idx):"""Return spatial map at the given axis pixel indices. Parameters ---------- idx : tuple Tuple of scalar indices for each non spatial dimension of the map. Tuple should be ordered as (I_0, ..., I_n). See Also -------- get_image_by_coord, get_image_by_pix Returns ------- map_out : `Map` Map with spatial dimensions only. """iflen(idx)!=len(self.geom.axes):raiseValueError("Tuple length must equal number of non-spatial dimensions")# Only support scalar indices per axisidx=tuple([int(_)for_inidx])geom=self.geom.to_image()data=self.data[idx[::-1]]returnself.__class__(geom=geom,data=data,unit=self.unit,meta=self.meta)
[docs]defget_by_coord(self,coords,fill_value=np.nan):"""Return map values at the given map coordinates. Parameters ---------- coords : tuple or `~gammapy.maps.MapCoord` Coordinate arrays for each dimension of the map. Tuple should be ordered as (lon, lat, x_0, ..., x_n) where x_i are coordinates for non-spatial dimensions of the map. fill_value : float Value which is returned if the position is outside of the projection footprint Returns ------- vals : `~numpy.ndarray` Values of pixels in the map. np.nan used to flag coords outside of map. """coords=MapCoord.create(coords,frame=self.geom.frame,axis_names=self.geom.axes.names)pix=self.geom.coord_to_pix(coords)vals=self.get_by_pix(pix,fill_value=fill_value)returnvals
[docs]defget_by_pix(self,pix,fill_value=np.nan):"""Return map values at the given pixel coordinates. Parameters ---------- pix : tuple Tuple of pixel index arrays for each dimension of the map. Tuple should be ordered as (I_lon, I_lat, I_0, ..., I_n) for WCS maps and (I_hpx, I_0, ..., I_n) for HEALPix maps. Pixel indices can be either float or integer type. fill_value : float Value which is returned if the position is outside of the projection footprint Returns ------- vals : `~numpy.ndarray` Array of pixel values. np.nan used to flag coordinates outside of map """# FIXME: Support local indexing here?# FIXME: Support slicing?pix=np.broadcast_arrays(*pix)idx=self.geom.pix_to_idx(pix)vals=self.get_by_idx(idx)mask=self.geom.contains_pix(pix)ifnotmask.all():vals=vals.astype(type(fill_value))vals[~mask]=fill_valuereturnvals
[docs]@abc.abstractmethoddefget_by_idx(self,idx):"""Return map values at the given pixel indices. Parameters ---------- idx : tuple Tuple of pixel index arrays for each dimension of the map. Tuple should be ordered as (I_lon, I_lat, I_0, ..., I_n) for WCS maps and (I_hpx, I_0, ..., I_n) for HEALPix maps. Returns ------- vals : `~numpy.ndarray` Array of pixel values. np.nan used to flag coordinate outside of map """pass
[docs]@abc.abstractmethoddefinterp_by_coord(self,coords,method="linear",fill_value=None):"""Interpolate map values at the given map coordinates. Parameters ---------- coords : tuple or `~gammapy.maps.MapCoord` Coordinate arrays for each dimension of the map. Tuple should be ordered as (lon, lat, x_0, ..., x_n) where x_i are coordinates for non-spatial dimensions of the map. method : {"linear", "nearest"} Method to interpolate data values. By default linear interpolation is performed. fill_value : None or float value The value to use for points outside of the interpolation domain. If None, values outside the domain are extrapolated. Returns ------- vals : `~numpy.ndarray` Interpolated pixel values. """pass
[docs]@abc.abstractmethoddefinterp_by_pix(self,pix,method="linear",fill_value=None):"""Interpolate map values at the given pixel coordinates. Parameters ---------- pix : tuple Tuple of pixel coordinate arrays for each dimension of the map. Tuple should be ordered as (p_lon, p_lat, p_0, ..., p_n) where p_i are pixel coordinates for non-spatial dimensions of the map. method : {"linear", "nearest"} Method to interpolate data values. By default linear interpolation is performed. fill_value : None or float value The value to use for points outside of the interpolation domain. If None, values outside the domain are extrapolated. Returns ------- vals : `~numpy.ndarray` Interpolated pixel values. """pass
[docs]definterp_to_geom(self,geom,preserve_counts=False,fill_value=0,**kwargs):"""Interpolate map to input geometry. Parameters ---------- geom : `~gammapy.maps.Geom` Target Map geometry preserve_counts : bool Preserve the integral over each bin. This should be true if the map is an integral quantity (e.g. counts) and false if the map is a differential quantity (e.g. intensity) **kwargs : dict Keyword arguments passed to `Map.interp_by_coord` Returns ------- interp_map : `Map` Interpolated Map """coords=geom.get_coord()map_copy=self.copy()ifpreserve_counts:ifgeom.ndim>2andgeom.axes[0]!=self.geom.axes[0]:raiseValueError(f"Energy axis do not match: expected {self.geom.axes[0]}, but got {geom.axes[0]}.")map_copy.data/=map_copy.geom.solid_angle().to_value("deg2")ifmap_copy.is_mask:# TODO: check this NaN handling is neededdata=map_copy.get_by_coord(coords)data=np.nan_to_num(data,nan=fill_value).astype(bool)else:data=map_copy.interp_by_coord(coords,fill_value=fill_value,**kwargs)ifpreserve_counts:data*=geom.solid_angle().to_value("deg2")returnMap.from_geom(geom,data=data,unit=self.unit)
[docs]defresample(self,geom,weights=None,preserve_counts=True):"""Resample pixels to ``geom`` with given ``weights``. Parameters ---------- geom : `~gammapy.maps.Geom` Target Map geometry weights : `~numpy.ndarray` Weights vector. Default is weight of one. preserve_counts : bool Preserve the integral over each bin. This should be true if the map is an integral quantity (e.g. counts) and false if the map is a differential quantity (e.g. intensity) """coords=geom.get_coord()idx=self.geom.coord_to_idx(coords)returnself._resample_by_idx(idx,weights=weights,preserve_counts=preserve_counts)
@abc.abstractmethoddef_resample_by_idx(self,idx,weights=None,preserve_counts=False):"""Resample pixels at ``idx`` with given ``weights``. Parameters ---------- idx : tuple Tuple of pixel index arrays for each dimension of the map. Tuple should be ordered as (I_lon, I_lat, I_0, ..., I_n) for WCS maps and (I_hpx, I_0, ..., I_n) for HEALPix maps. weights : `~numpy.ndarray` Weights vector. Default is weight of one. preserve_counts : bool Preserve the integral over each bin. This should be true if the map is an integral quantity (e.g. counts) and false if the map is a differential quantity (e.g. intensity) """pass
[docs]deffill_by_coord(self,coords,weights=None):"""Fill pixels at ``coords`` with given ``weights``. Parameters ---------- coords : tuple or `~gammapy.maps.MapCoord` Coordinate arrays for each dimension of the map. Tuple should be ordered as (lon, lat, x_0, ..., x_n) where x_i are coordinates for non-spatial dimensions of the map. weights : `~numpy.ndarray` Weights vector. Default is weight of one. """idx=self.geom.coord_to_idx(coords)self.fill_by_idx(idx,weights)
[docs]deffill_by_pix(self,pix,weights=None):"""Fill pixels at ``pix`` with given ``weights``. Parameters ---------- pix : tuple Tuple of pixel index arrays for each dimension of the map. Tuple should be ordered as (I_lon, I_lat, I_0, ..., I_n) for WCS maps and (I_hpx, I_0, ..., I_n) for HEALPix maps. Pixel indices can be either float or integer type. Float indices will be rounded to the nearest integer. weights : `~numpy.ndarray` Weights vector. Default is weight of one. """idx=pix_tuple_to_idx(pix)returnself.fill_by_idx(idx,weights=weights)
[docs]@abc.abstractmethoddeffill_by_idx(self,idx,weights=None):"""Fill pixels at ``idx`` with given ``weights``. Parameters ---------- idx : tuple Tuple of pixel index arrays for each dimension of the map. Tuple should be ordered as (I_lon, I_lat, I_0, ..., I_n) for WCS maps and (I_hpx, I_0, ..., I_n) for HEALPix maps. weights : `~numpy.ndarray` Weights vector. Default is weight of one. """pass
[docs]defset_by_coord(self,coords,vals):"""Set pixels at ``coords`` with given ``vals``. Parameters ---------- coords : tuple or `~gammapy.maps.MapCoord` Coordinate arrays for each dimension of the map. Tuple should be ordered as (lon, lat, x_0, ..., x_n) where x_i are coordinates for non-spatial dimensions of the map. vals : `~numpy.ndarray` Values vector. """idx=self.geom.coord_to_pix(coords)self.set_by_pix(idx,vals)
[docs]defset_by_pix(self,pix,vals):"""Set pixels at ``pix`` with given ``vals``. Parameters ---------- pix : tuple Tuple of pixel index arrays for each dimension of the map. Tuple should be ordered as (I_lon, I_lat, I_0, ..., I_n) for WCS maps and (I_hpx, I_0, ..., I_n) for HEALPix maps. Pixel indices can be either float or integer type. Float indices will be rounded to the nearest integer. vals : `~numpy.ndarray` Values vector. """idx=pix_tuple_to_idx(pix)returnself.set_by_idx(idx,vals)
[docs]@abc.abstractmethoddefset_by_idx(self,idx,vals):"""Set pixels at ``idx`` with given ``vals``. Parameters ---------- idx : tuple Tuple of pixel index arrays for each dimension of the map. Tuple should be ordered as (I_lon, I_lat, I_0, ..., I_n) for WCS maps and (I_hpx, I_0, ..., I_n) for HEALPix maps. vals : `~numpy.ndarray` Values vector. """pass
[docs]defplot_grid(self,figsize=None,ncols=3,**kwargs):"""Plot map as a grid of subplots for non-spatial axes Parameters ---------- figsize : tuple of int Figsize to plot on ncols : int Number of columns to plot **kwargs : dict Keyword arguments passed to `Map.plot`. Returns ------- axes : `~numpy.ndarray` of `~matplotlib.pyplot.Axes` Axes grid """iflen(self.geom.axes)>1:raiseValueError("Grid plotting is only supported for one non spatial axis")axis=self.geom.axes[0]cols=min(ncols,axis.nbin)rows=1+(axis.nbin-1)//colsiffigsizeisNone:width=12figsize=(width,width*rows/cols)ifself.geom.is_hpx:wcs=self.geom.to_wcs_geom().wcselse:wcs=self.geom.wcsfig,axes=plt.subplots(ncols=cols,nrows=rows,subplot_kw={"projection":wcs},figsize=figsize,gridspec_kw={"hspace":0.1,"wspace":0.1},)foridxinrange(cols*rows):ax=axes.flat[idx]try:image=self.get_image_by_idx((idx,))exceptIndexError:ax.set_visible(False)continueifimage.geom.is_hpx:image_wcs=image.to_wcs(normalize=False,proj="AIT",oversample=2,)else:image_wcs=imageimage_wcs.plot(ax=ax,**kwargs)ifaxis.node_type=="center":ifaxis.name=="energy"oraxis.name=="energy_true":info=energy_unit_format(axis.center[idx])else:info=f"{axis.center[idx]:.1f}"else:ifaxis.name=="energy"oraxis.name=="energy_true":info=f"{energy_unit_format(axis.edges[idx])} - {energy_unit_format(axis.edges[idx+1])}"else:info=f"{axis.edges[idx]:.1f} - {axis.edges[idx+1]:.1f} "ax.set_title(f"{axis.name.capitalize()} "+info)lon,lat=ax.coords[0],ax.coords[1]lon.set_ticks_position("b")lat.set_ticks_position("l")row,col=np.unravel_index(idx,shape=(rows,cols))ifcol>0:lat.set_ticklabel_visible(False)lat.set_axislabel("")ifrow<(rows-1):lon.set_ticklabel_visible(False)lon.set_axislabel("")returnaxes
[docs]defplot_interactive(self,rc_params=None,**kwargs):""" Plot map with interactive widgets to explore the non spatial axes. Parameters ---------- rc_params : dict Passed to ``matplotlib.rc_context(rc=rc_params)`` to style the plot. **kwargs : dict Keyword arguments passed to `WcsNDMap.plot`. Examples -------- You can try this out e.g. using a Fermi-LAT diffuse model cube with an energy axis:: from gammapy.maps import Map m = Map.read("$GAMMAPY_DATA/fermi_3fhl/gll_iem_v06_cutout.fits") m.plot_interactive(add_cbar=True, stretch="sqrt") If you would like to adjust the figure size you can use the ``rc_params`` argument:: rc_params = {'figure.figsize': (12, 6), 'font.size': 12} m.plot_interactive(rc_params=rc_params) """importmatplotlibasmplfromipywidgetsimportRadioButtons,SelectionSliderfromipywidgets.widgets.interactionimportfixed,interactifself.geom.is_image:raiseTypeError("Use .plot() for 2D Maps")kwargs.setdefault("interpolation","nearest")kwargs.setdefault("origin","lower")kwargs.setdefault("cmap","afmhot")rc_params=rc_paramsor{}stretch=kwargs.pop("stretch","sqrt")interact_kwargs={}foraxisinself.geom.axes:ifaxis.node_type=="center":ifaxis.name=="energy"oraxis.name=="energy_true":options=energy_unit_format(axis.center)else:options=axis.as_plot_labelselifaxis.name=="energy"oraxis.name=="energy_true":E=energy_unit_format(axis.edges)options=[f"{E[i]} - {E[i+1]}"foriinrange(len(E)-1)]else:options=axis.as_plot_labelsinteract_kwargs[axis.name]=SelectionSlider(options=options,description=f"Select {axis.name}:",continuous_update=False,style={"description_width":"initial"},layout={"width":"50%"},)interact_kwargs[axis.name+"_options"]=fixed(options)interact_kwargs["stretch"]=RadioButtons(options=["linear","sqrt","log"],value=stretch,description="Select stretch:",style={"description_width":"initial"},)@interact(**interact_kwargs)def_plot_interactive(**ikwargs):idx=[ikwargs[ax.name+"_options"].index(ikwargs[ax.name])foraxinself.geom.axes]img=self.get_image_by_idx(idx)stretch=ikwargs["stretch"]withmpl.rc_context(rc=rc_params):img.plot(stretch=stretch,**kwargs)plt.show()
[docs]defcopy(self,**kwargs):"""Copy map instance and overwrite given attributes, except for geometry. Parameters ---------- **kwargs : dict Keyword arguments to overwrite in the map constructor. Returns ------- copy : `Map` Copied Map. """if"geom"inkwargs:geom=kwargs["geom"]ifnotgeom.data_shape==self.geom.data_shape:raiseValueError("Can't copy and change data size of the map. "f" Current shape {self.geom.data_shape},"f" requested shape {geom.data_shape}")returnself._init_copy(**kwargs)
[docs]defapply_edisp(self,edisp):"""Apply energy dispersion to map. Requires energy axis. Parameters ---------- edisp : `gammapy.irf.EDispKernel` Energy dispersion matrix Returns ------- map : `WcsNDMap` Map with energy dispersion applied. """# TODO: either use sparse matrix mutiplication or something like edisp.is_diagonalifedispisnotNone:loc=self.geom.axes.index("energy_true")data=np.rollaxis(self.data,loc,len(self.data.shape))data=np.dot(data,edisp.pdf_matrix)data=np.rollaxis(data,-1,loc)energy_axis=edisp.axes["energy"].copy(name="energy")else:data=self.dataenergy_axis=self.geom.axes["energy_true"].copy(name="energy")geom=self.geom.to_image().to_cube(axes=[energy_axis])returnself._init_copy(geom=geom,data=data)
[docs]defmask_nearest_position(self,position):"""Given a sky coordinate return nearest valid position in the mask If the mask contains additional axes, the mask is reduced over those. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Test position Returns ------- position : `~astropy.coordinates.SkyCoord` Nearest position in the mask """ifnotself.geom.is_image:raiseValueError("Method only supported for 2D images")coords=self.geom.to_image().get_coord().skycoordseparation=coords.separation(position)separation[~self.data]=np.infidx=np.argmin(separation)returncoords.flatten()[idx]
[docs]defsum_over_axes(self,axes_names=None,keepdims=True,weights=None):"""To sum map values over all non-spatial axes. Parameters ---------- keepdims : bool, optional If this is set to true, the axes which are summed over are left in the map with a single bin axes_names: list of str Names of MapAxis to reduce over. If None, all will summed over weights : `Map` Weights to be applied. The Map should have the same geometry. Returns ------- map_out : `~Map` Map with non-spatial axes summed over """returnself.reduce_over_axes(func=np.add,axes_names=axes_names,keepdims=keepdims,weights=weights)
[docs]defreduce_over_axes(self,func=np.add,keepdims=False,axes_names=None,weights=None):"""Reduce map over non-spatial axes Parameters ---------- func : `~numpy.ufunc` Function to use for reducing the data. keepdims : bool, optional If this is set to true, the axes which are summed over are left in the map with a single bin axes_names: list Names of MapAxis to reduce over If None, all will reduced weights : `Map` Weights to be applied. Returns ------- map_out : `~Map` Map with non-spatial axes reduced """ifaxes_namesisNone:axes_names=self.geom.axes.namesmap_out=self.copy()foraxis_nameinaxes_names:map_out=map_out.reduce(axis_name,func=func,keepdims=keepdims,weights=weights)returnmap_out
[docs]defreduce(self,axis_name,func=np.add,keepdims=False,weights=None):"""Reduce map over a single non-spatial axis Parameters ---------- axis_name: str The name of the axis to reduce over func : `~numpy.ufunc` Function to use for reducing the data. keepdims : bool, optional If this is set to true, the axes which are summed over are left in the map with a single bin weights : `Map` Weights to be applied. Returns ------- map_out : `~Map` Map with the given non-spatial axes reduced """ifkeepdims:geom=self.geom.squash(axis_name=axis_name)else:geom=self.geom.drop(axis_name=axis_name)idx=self.geom.axes.index_data(axis_name)data=self.dataifweightsisnotNone:data=data*weightsdata=func.reduce(data,axis=idx,keepdims=keepdims,where=~np.isnan(data))returnself._init_copy(geom=geom,data=data)
[docs]defcumsum(self,axis_name):"""Compute cumulative sum along a given axis Parameters ---------- axis_name : str Along which axis to integrate. Returns ------- cumsum : `Map` Map with cumulative sum """axis=self.geom.axes[axis_name]axis_idx=self.geom.axes.index_data(axis_name)# TODO: the broadcasting should be done by axis.center, axis.bin_width etc.shape=[1]*len(self.geom.data_shape)shape[axis_idx]=-1values=self.quantity*axis.bin_width.reshape(shape)ifaxis_name=="rad":# take Jacobian into accountvalues=2*np.pi*axis.center.reshape(shape)*valuesdata=np.insert(values.cumsum(axis=axis_idx),0,0,axis=axis_idx)axis_shifted=MapAxis.from_nodes(axis.edges,name=axis.name,interp=axis.interp)axes=self.geom.axes.replace(axis_shifted)geom=self.geom.to_image().to_cube(axes)returnself.__class__(geom=geom,data=data.value,unit=data.unit)
[docs]defintegral(self,axis_name,coords,**kwargs):"""Compute integral along a given axis This method uses interpolation of the cumulative sum. Parameters ---------- axis_name : str Along which axis to integrate. coords : dict or `MapCoord` Map coordinates **kwargs : dict Coordinates at which to evaluate the IRF Returns ------- array : `~astropy.units.Quantity` Returns 2D array with axes offset """cumsum=self.cumsum(axis_name=axis_name)cumsum=cumsum.pad(pad_width=1,axis_name=axis_name,mode="edge")returnu.Quantity(cumsum.interp_by_coord(coords,**kwargs),cumsum.unit,copy=False)
[docs]defnormalize(self,axis_name=None):"""Normalise data in place along a given axis. Parameters ---------- axis_name : str Along which axis to normalize. """cumsum=self.cumsum(axis_name=axis_name).quantitywithnp.errstate(invalid="ignore",divide="ignore"):axis=self.geom.axes.index_data(axis_name=axis_name)normed=self.quantity/cumsum.max(axis=axis,keepdims=True)self.quantity=np.nan_to_num(normed)
[docs]@classmethoddeffrom_stack(cls,maps,axis=None,axis_name=None):"""Create Map from list of images and a non-spatial axis. The image geometries must be aligned, except for the axis that is stacked. Parameters ---------- maps : list of `Map` objects List of maps axis : `MapAxis` If a `MapAxis` is provided the maps are stacked along the last data axis and the new axis is introduced. axis_name : str If an axis name is as string the given the maps are stacked along the given axis name. Returns ------- map : `Map` Map with additional non-spatial axis. """geom=maps[0].geomifaxis_nameisNoneandaxisisNone:axis_name=geom.axes.names[-1]ifaxis_name:axis=MapAxis.from_stack(axes=[m.geom.axes[axis_name]forminmaps])geom=geom.drop(axis_name=axis_name)data=[]forminmaps:ifaxis_name:m_geom=m.geom.drop(axis_name=axis_name)else:m_geom=m.geomifnotm_geom==geom:raiseValueError(f"Image geometries not aligned: {m.geom} and {geom}")data.append(m.quantity.to_value(maps[0].unit))returncls.from_geom(data=np.stack(data),geom=geom.to_cube(axes=[axis]),unit=maps[0].unit)
[docs]defsplit_by_axis(self,axis_name):"""Split a Map along an axis into multiple maps. Parameters ---------- axis_name : str Name of the axis to split Returns ------- maps : list A list of `~gammapy.maps.Map` """maps=[]axis=self.geom.axes[axis_name]foridxinrange(axis.nbin):m=self.slice_by_idx({axis_name:idx})maps.append(m)returnmaps
[docs]defto_cube(self,axes):"""Append non-spatial axes to create a higher-dimensional Map. This will result in a Map with a new geometry with N+M dimensions where N is the number of current dimensions and M is the number of axes in the list. The data is reshaped onto the new geometry Parameters ---------- axes : list Axes that will be appended to this Map. The axes should have only one bin Returns ------- map : `~gammapy.maps.WcsNDMap` new map """foraxinaxes:ifax.nbin>1:raiseValueError(ax.name,"should have only one bin")geom=self.geom.to_cube(axes)data=self.data.reshape((1,)*len(axes)+self.data.shape)returnself.from_geom(data=data,geom=geom,unit=self.unit)
[docs]defget_spectrum(self,region=None,func=np.nansum,weights=None):"""Extract spectrum in a given region. The spectrum can be computed by summing (or, more generally, applying ``func``) along the spatial axes in each energy bin. This occurs only inside the ``region``, which by default is assumed to be the whole spatial extension of the map. Parameters ---------- region: `~regions.Region` Region (pixel or sky regions accepted). func : numpy.func Function to reduce the data. Default is np.nansum. For a boolean Map, use np.any or np.all. weights : `WcsNDMap` Array to be used as weights. The geometry must be equivalent. Returns ------- spectrum : `~gammapy.maps.RegionNDMap` Spectrum in the given region. """ifnotself.geom.has_energy_axis:raiseValueError("Energy axis required")returnself.to_region_nd_map(region=region,func=func,weights=weights)
[docs]defto_unit(self,unit):"""Convert map to different unit Parameters ---------- unit : `~astropy.unit.Unit` or str New unit Returns ------- map : `Map` Map with new unit and converted data """data=self.quantity.to_value(unit)returnself.from_geom(self.geom,data=data,unit=unit)
[docs]defis_allclose(self,other,rtol_axes=1e-3,atol_axes=1e-6,**kwargs):"""Compare two Maps for close equivalency Parameters ---------- other : `gammapy.maps.Map` The Map to compare against rtol_axes : float Relative tolerance for the axes comparison. atol_axes : float Relative tolerance for the axes comparison. **kwargs : dict keywords passed to `numpy.allclose` Returns ------- is_allclose : bool Whether the Map is all close. """ifnotisinstance(other,self.__class__):returnTypeError(f"Cannot compare {type(self)} and {type(other)}")ifself.data.shape!=other.data.shape:returnFalseaxes_eq=self.axes.is_allclose(other.axes,rtol=rtol_axes,atol=atol_axes)data_eq=np.allclose(self.quantity,other.quantity,**kwargs)returnaxes_eqanddata_eq
def__repr__(self):geom=self.geom.__class__.__name__axes=["skycoord"]ifself.geom.is_hpxelse["lon","lat"]axes=axes+[_.namefor_inself.geom.axes]return(f"{self.__class__.__name__}\n\n"f"\tgeom : {geom}\n "f"\taxes : {axes}\n"f"\tshape : {self.geom.data_shape[::-1]}\n"f"\tndim : {self.geom.ndim}\n"f"\tunit : {self.unit}\n"f"\tdtype : {self.data.dtype}\n")def_arithmetics(self,operator,other,copy):"""Perform arithmetic on maps after checking geometry consistency."""ifisinstance(other,Map):ifself.geom==other.geom:q=other.quantityelse:raiseValueError("Map Arithmetic: Inconsistent geometries.")else:q=u.Quantity(other,copy=False)out=self.copy()ifcopyelseselfout.quantity=operator(out.quantity,q)returnoutdef_boolean_arithmetics(self,operator,other,copy):"""Perform arithmetic on maps after checking geometry consistency."""ifoperator==np.logical_not:out=self.copy()out.data=operator(out.data)returnoutifisinstance(other,Map):ifself.geom==other.geom:other=other.dataelse:raiseValueError("Map Arithmetic: Inconsistent geometries.")out=self.copy()ifcopyelseselfout.data=operator(out.data,other)returnoutdef__add__(self,other):returnself._arithmetics(np.add,other,copy=True)def__iadd__(self,other):returnself._arithmetics(np.add,other,copy=False)def__sub__(self,other):returnself._arithmetics(np.subtract,other,copy=True)def__isub__(self,other):returnself._arithmetics(np.subtract,other,copy=False)def__mul__(self,other):returnself._arithmetics(np.multiply,other,copy=True)def__imul__(self,other):returnself._arithmetics(np.multiply,other,copy=False)def__truediv__(self,other):returnself._arithmetics(np.true_divide,other,copy=True)def__itruediv__(self,other):returnself._arithmetics(np.true_divide,other,copy=False)def__le__(self,other):returnself._arithmetics(np.less_equal,other,copy=True)def__lt__(self,other):returnself._arithmetics(np.less,other,copy=True)def__ge__(self,other):returnself._arithmetics(np.greater_equal,other,copy=True)def__gt__(self,other):returnself._arithmetics(np.greater,other,copy=True)def__eq__(self,other):returnself._arithmetics(np.equal,other,copy=True)def__ne__(self,other):returnself._arithmetics(np.not_equal,other,copy=True)def__and__(self,other):returnself._boolean_arithmetics(np.logical_and,other,copy=True)def__or__(self,other):returnself._boolean_arithmetics(np.logical_or,other,copy=True)def__invert__(self):returnself._boolean_arithmetics(np.logical_not,None,copy=True)def__xor__(self,other):returnself._boolean_arithmetics(np.logical_xor,other,copy=True)def__iand__(self,other):returnself._boolean_arithmetics(np.logical_and,other,copy=False)def__ior__(self,other):returnself._boolean_arithmetics(np.logical_or,other,copy=False)def__ixor__(self,other):returnself._boolean_arithmetics(np.logical_xor,other,copy=False)def__array__(self):returnself.data