[docs]classFoVAlignment(str,Enum):""" Orientation of the Field of View Coordinate System Currently, only two possible alignments are supported: alignment with the horizontal coordinate system (ALTAZ) and alignment with the equatorial coordinate system (RADEC). """ALTAZ="ALTAZ"RADEC="RADEC"
classIRF(metaclass=abc.ABCMeta):"""IRF base class for DL3 instrument response functions Parameters ----------- axes : list of `MapAxis` or `MapAxes` Axes data : `~numpy.ndarray` or `~astropy.units.Quantity` Data unit : str or `~astropy.units.Unit` Unit, ignored if data is a Quantity. is_pointlike: boolean True for point-like IRFs, False for full-enclosure. fov_alignment: `FoVAlignment` The orientation of the field of view coordinate system. meta : dict Meta data """default_interp_kwargs=dict(bounds_error=False,fill_value=0.0,)def__init__(self,axes,data=0,unit="",is_pointlike=False,fov_alignment=FoVAlignment.RADEC,meta=None,interp_kwargs=None,):axes=MapAxes(axes)axes.assert_names(self.required_axes)self._axes=axesself._fov_alignment=FoVAlignment(fov_alignment)self._is_pointlike=is_pointlikeifisinstance(data,u.Quantity):self.data=data.valueifnotself.default_unit.is_equivalent(data.unit):raiseValueError(f"Error: {data.unit} is not an allowed unit. {self.tag} "f"requires {self.default_unit} data quantities.")else:self._unit=data.unitelse:self.data=dataself._unit=unitself.meta=metaor{}ifinterp_kwargsisNone:interp_kwargs=self.default_interp_kwargs.copy()self.interp_kwargs=interp_kwargs@property@abc.abstractmethoddeftag(self):pass@property@abc.abstractmethoddefrequired_axes(self):pass@propertydefis_pointlike(self):"""Whether the IRF is pointlike of full containment."""returnself._is_pointlike@propertydefhas_offset_axis(self):"""Whether the IRF explicitly depends on offset"""return"offset"inself.required_axes@propertydeffov_alignment(self):"""Alignment of the field of view coordinate axes, see `FoVAlignment`"""returnself._fov_alignment@propertydefdata(self):returnself._data@data.setterdefdata(self,value):"""Set data Parameters ---------- value : array-like Data array """required_shape=self.axes.shapeifnp.isscalar(value):value=value*np.ones(required_shape)ifisinstance(value,u.Quantity):raiseTypeError("Map data must be a Numpy array. Set unit separately")ifnp.shape(value)!=required_shape:raiseValueError(f"data shape {value.shape} does not match"f"axes shape {required_shape}")self._data=value# reset cached interpolatorsself.__dict__.pop("_interpolate",None)self.__dict__.pop("_integrate_rad",None)definterp_missing_data(self,axis_name):"""Interpolate missing data along a given axis"""data=self.data.copy()values_scale=self.interp_kwargs.get("values_scale","lin")scale=interpolation_scale(values_scale)axis=self.axes.index(axis_name)mask=~np.isfinite(data)|(data==0.0)coords=np.where(mask)xp=np.arange(data.shape[axis])forcoordinzip(*coords):idx=list(coord)idx[axis]=slice(None)fp=data[tuple(idx)]valid=~mask[tuple(idx)]ifnp.any(valid):value=np.interp(x=coord[axis],xp=xp[valid],fp=scale(fp[valid]),left=np.nan,right=np.nan,)ifnotnp.isnan(value):data[coord]=scale.inverse(value)self.data=data# reset cached values@propertydefunit(self):"""Map unit (`~astropy.units.Unit`)"""returnself._unit@lazypropertydef_interpolate(self):kwargs=self.interp_kwargs.copy()# Allow extrap[olation with in binskwargs["fill_value"]=Nonepoints=[a.centerforainself.axes]points_scale=tuple([a.interpforainself.axes])returnScaledRegularGridInterpolator(points,self.quantity,points_scale=points_scale,**kwargs,)@propertydefquantity(self):"""`~astropy.units.Quantity`"""returnu.Quantity(self.data,unit=self.unit,copy=False)@quantity.setterdefquantity(self,val):"""Set data and unit Parameters ---------- value : `~astropy.units.Quantity` Quantity """val=u.Quantity(val,copy=False)self.data=val.valueself._unit=val.unitdefto_unit(self,unit):"""Convert irf to different unit Parameters ---------- unit : `~astropy.unit.Unit` or str New unit Returns ------- irf : `IRF` IRF with new unit and converted data """data=self.quantity.to_value(unit)returnself.__class__(self.axes,data=data,meta=self.meta,interp_kwargs=self.interp_kwargs)@propertydefaxes(self):"""`MapAxes`"""returnself._axesdef__str__(self):str_=f"{self.__class__.__name__}\n"str_+="-"*len(self.__class__.__name__)+"\n\n"str_+=f"\taxes : {self.axes.names}\n"str_+=f"\tshape : {self.data.shape}\n"str_+=f"\tndim : {len(self.axes)}\n"str_+=f"\tunit : {self.unit}\n"str_+=f"\tdtype : {self.data.dtype}\n"returnstr_.expandtabs(tabsize=2)defevaluate(self,method=None,**kwargs):"""Evaluate IRF Parameters ---------- **kwargs : dict Coordinates at which to evaluate the IRF method : str {'linear', 'nearest'}, optional Interpolation method Returns ------- array : `~astropy.units.Quantity` Interpolated values """# TODO: change to coord dict?non_valid_axis=set(kwargs).difference(self.axes.names)ifnon_valid_axis:raiseValueError(f"Not a valid coordinate axis {non_valid_axis}"f" Choose from: {self.axes.names}")coords_default=self.axes.get_coord()forkey,valueinkwargs.items():coord=kwargs.get(key,value)ifcoordisnotNone:coords_default[key]=u.Quantity(coord,copy=False)data=self._interpolate(coords_default.values(),method=method)ifself.interp_kwargs["fill_value"]isnotNone:idxs=self.axes.coord_to_idx(coords_default,clip=False)invalid=np.broadcast_arrays(*[idx==-1foridxinidxs])mask=self._mask_out_bounds(invalid)ifnotdata.shape:mask=mask.squeeze()data[mask]=self.interp_kwargs["fill_value"]data[~np.isfinite(data)]=self.interp_kwargs["fill_value"]returndata@staticmethoddef_mask_out_bounds(invalid):returnnp.any(invalid,axis=0)defintegrate_log_log(self,axis_name,**kwargs):"""Integrate along a given axis. This method uses log-log trapezoidal integration. Parameters ---------- axis_name : str Along which axis to integrate. **kwargs : dict Coordinates at which to evaluate the IRF Returns ------- array : `~astropy.units.Quantity` Returns 2D array with axes offset """axis=self.axes.index(axis_name)data=self.evaluate(**kwargs,method="linear")values=kwargs[axis_name]returntrapz_loglog(data,values,axis=axis)defcumsum(self,axis_name):"""Compute cumsum along a given axis Parameters ---------- axis_name : str Along which axis to integrate. Returns ------- irf : `~IRF` Cumsum IRF """axis=self.axes[axis_name]axis_idx=self.axes.index(axis_name)# TODO: the broadcasting should be done by axis.center, axis.bin_width etc.shape=[1]*len(self.axes)shape[axis_idx]=-1values=self.quantity*axis.bin_width.reshape(shape)ifaxis_name=="rad":# take Jacobian into accountvalues=2*np.pi*axis.center.reshape(shape)*valuesdata=values.cumsum(axis=axis_idx)axis_shifted=MapAxis.from_nodes(axis.edges[1:],name=axis.name,interp=axis.interp)axes=self.axes.replace(axis_shifted)returnself.__class__(axes=axes,data=data.value,unit=data.unit)defintegral(self,axis_name,**kwargs):"""Compute integral along a given axis This method uses interpolation of the cumulative sum. Parameters ---------- axis_name : str Along which axis to integrate. **kwargs : dict Coordinates at which to evaluate the IRF Returns ------- array : `~astropy.units.Quantity` Returns 2D array with axes offset """cumsum=self.cumsum(axis_name=axis_name)returncumsum.evaluate(**kwargs)defnormalize(self,axis_name):"""Normalise data in place along a given axis. Parameters ---------- axis_name : str Along which axis to normalize. """cumsum=self.cumsum(axis_name=axis_name).quantitywithnp.errstate(invalid="ignore",divide="ignore"):axis=self.axes.index(axis_name=axis_name)normed=self.quantity/cumsum.max(axis=axis,keepdims=True)self.quantity=np.nan_to_num(normed)@classmethoddeffrom_hdulist(cls,hdulist,hdu=None,format="gadf-dl3"):"""Create from `~astropy.io.fits.HDUList`. Parameters ---------- hdulist : `~astropy.io.HDUList` HDU list hdu : str HDU name format : {"gadf-dl3"} Format specification Returns ------- irf : `IRF` IRF class """ifhduisNone:hdu=IRF_DL3_HDU_SPECIFICATION[cls.tag]["extname"]returncls.from_table(Table.read(hdulist[hdu]),format=format)@classmethoddefread(cls,filename,hdu=None,format="gadf-dl3"):"""Read from file. Parameters ---------- filename : str or `Path` Filename hdu : str HDU name format : {"gadf-dl3"} Format specification Returns ------- irf : `IRF` IRF class """withfits.open(str(make_path(filename)),memmap=False)ashdulist:returncls.from_hdulist(hdulist,hdu=hdu)@classmethoddeffrom_table(cls,table,format="gadf-dl3"):"""Read from `~astropy.table.Table`. Parameters ---------- table : `~astropy.table.Table` Table with irf data format : {"gadf-dl3"} Format specification Returns ------- irf : `IRF` IRF class. """axes=MapAxes.from_table(table=table,format=format)axes=axes[cls.required_axes]column_name=IRF_DL3_HDU_SPECIFICATION[cls.tag]["column_name"]data=table[column_name].quantity[0].transpose()returncls(axes=axes,data=data.value,meta=table.meta,unit=data.unit,is_pointlike=gadf_is_pointlike(table.meta),fov_alignment=table.meta.get("FOVALIGN","RADEC"),)defto_table(self,format="gadf-dl3"):"""Convert to table Parameters ---------- format : {"gadf-dl3"} Format specification Returns ------- table : `~astropy.table.Table` IRF data table """table=self.axes.to_table(format=format)ifformat=="gadf-dl3":table.meta=self.meta.copy()spec=IRF_DL3_HDU_SPECIFICATION[self.tag]table.meta.update(spec["mandatory_keywords"])if"FOVALIGN"intable.meta:table.meta["FOVALIGN"]=self.fov_alignment.valueifself.is_pointlike:table.meta["HDUCLAS3"]="POINT-LIKE"else:table.meta["HDUCLAS3"]="FULL-ENCLOSURE"table[spec["column_name"]]=self.quantity.T[np.newaxis]else:raiseValueError(f"Not a valid supported format: '{format}'")returntabledefto_table_hdu(self,format="gadf-dl3"):"""Convert to `~astropy.io.fits.BinTableHDU`. Parameters ---------- format : {"gadf-dl3"} Format specification Returns ------- hdu : `~astropy.io.fits.BinTableHDU` IRF data table hdu """name=IRF_DL3_HDU_SPECIFICATION[self.tag]["extname"]returnfits.BinTableHDU(self.to_table(format=format),name=name)defto_hdulist(self,format="gadf-dl3"):""""""hdu=self.to_table_hdu(format=format)returnfits.HDUList([fits.PrimaryHDU(),hdu])defwrite(self,filename,*args,**kwargs):"""Write IRF to fits. Calls `~astropy.io.fits.HDUList.writeto`, forwarding all arguments. """self.to_hdulist().writeto(str(make_path(filename)),*args,**kwargs)defpad(self,pad_width,axis_name,**kwargs):"""Pad irf along a given axis. Parameters ---------- pad_width : {sequence, array_like, int} Number of pixels padded to the edges of each axis. axis_name : str Which axis to downsample. By default spatial axes are padded. **kwargs : dict Keyword argument forwarded to `~numpy.pad` Returns ------- irf : `IRF` Padded irf """ifnp.isscalar(pad_width):pad_width=(pad_width,pad_width)idx=self.axes.index(axis_name)pad_width_np=[(0,0)]*self.data.ndimpad_width_np[idx]=pad_widthkwargs.setdefault("mode","constant")axes=self.axes.pad(axis_name=axis_name,pad_width=pad_width)data=np.pad(self.data,pad_width=pad_width_np,**kwargs)returnself.__class__(data=data,axes=axes,meta=self.meta.copy(),unit=self.unit)defslice_by_idx(self,slices):"""Slice sub IRF from IRF object. Parameters ---------- slices : dict Dict of axes names and `slice` object pairs. Contains one element for each non-spatial dimension. Axes not specified in the dict are kept unchanged. Returns ------- sliced : `IRF` Sliced IRF object. """axes=self.axes.slice_by_idx(slices)diff=set(self.axes.names).difference(axes.names)ifdiff:diff_slice={key:valueforkey,valueinslices.items()ifkeyindiff}raiseValueError(f"Integer indexing not supported, got {diff_slice}")slices=tuple([slices.get(ax.name,slice(None))foraxinself.axes])data=self.data[slices]returnself.__class__(axes=axes,data=data,unit=self.unit,meta=self.meta)defis_allclose(self,other,rtol_axes=1e-3,atol_axes=1e-6,**kwargs):"""Compare two data IRFs for equivalency Parameters ---------- other : `gammapy.irfs.IRF` The irf to compare against rtol_axes : float Relative tolerance for the axes comparison. atol_axes : float Relative tolerance for the axes comparison. **kwargs : dict keywords passed to `numpy.allclose` Returns ------- is_allclose : bool Whether the IRF is all close. """ifnotisinstance(other,self.__class__):returnTypeError(f"Cannot compare {type(self)} and {type(other)}")ifself.data.shape!=other.data.shape:returnFalseaxes_eq=self.axes.is_allclose(other.axes,rtol=rtol_axes,atol=atol_axes)data_eq=np.allclose(self.quantity,other.quantity,**kwargs)returnaxes_eqanddata_eqdef__eq__(self,other):ifnotisinstance(other,self.__class__):returnFalsereturnself.is_allclose(other=other,rtol=1e-3,rtol_axes=1e-6)
[docs]classIRFMap:"""IRF map base class for DL4 instrument response functions"""def__init__(self,irf_map,exposure_map):self._irf_map=irf_mapself.exposure_map=exposure_mapirf_map.geom.axes.assert_names(self.required_axes)@property@abc.abstractmethoddeftag(self):pass@property@abc.abstractmethoddefrequired_axes(self):pass# TODO: add mask safe to IRFMap as a regular attribute and don't derive it from the data@propertydefmask_safe_image(self):"""Mask safe for the map"""mask=self._irf_map>(0*self._irf_map.unit)returnmask.reduce_over_axes(func=np.logical_or)
[docs]defto_region_nd_map(self,region):"""Extract IRFMap in a given region or position If a region is given a mean IRF is computed, if a position is given the IRF is interpolated. Parameters ---------- region : `~regions.SkyRegion` or `~astropy.coordinates.SkyCoord` Region or position where to get the map. Returns ------- irf : `IRFMap` IRF map with region geometry. """ifregionisNone:region=self._irf_map.geom.center_skydir# TODO: compute an exposure weighted mean PSF herekwargs={"region":region,"func":np.nanmean}if"energy"inself._irf_map.geom.axes.names:kwargs["method"]="nearest"irf_map=self._irf_map.to_region_nd_map(**kwargs)ifself.exposure_map:exposure_map=self.exposure_map.to_region_nd_map(**kwargs)else:exposure_map=Nonereturnself.__class__(irf_map,exposure_map=exposure_map)
def_get_nearest_valid_position(self,position):"""Get nearest valid position"""is_valid=np.nan_to_num(self.mask_safe_image.get_by_coord(position))[0]ifnotis_validandnp.any(self.mask_safe_image>0):log.warning(f"Position {position} is outside ""valid IRF map range, using nearest IRF defined within")position=self.mask_safe_image.mask_nearest_position(position)returnposition
[docs]@classmethoddeffrom_hdulist(cls,hdulist,hdu=None,hdu_bands=None,exposure_hdu=None,exposure_hdu_bands=None,format="gadf",):"""Create from `~astropy.io.fits.HDUList`. Parameters ---------- hdulist : `~astropy.fits.HDUList` HDU list. hdu : str Name or index of the HDU with the IRF map. hdu_bands : str Name or index of the HDU with the IRF map BANDS table. exposure_hdu : str Name or index of the HDU with the exposure map data. exposure_hdu_bands : str Name or index of the HDU with the exposure map BANDS table. format : {"gadf", "gtpsf"} File format Returns ------- irf_map : `IRFMap` IRF map. """output_class=clsifformat=="gadf":ifhduisNone:hdu=IRF_MAP_HDU_SPECIFICATION[cls.tag]irf_map=Map.from_hdulist(hdulist,hdu=hdu,hdu_bands=hdu_bands,format=format)ifexposure_hduisNone:exposure_hdu=IRF_MAP_HDU_SPECIFICATION[cls.tag]+"_exposure"ifexposure_hduinhdulist:exposure_map=Map.from_hdulist(hdulist,hdu=exposure_hdu,hdu_bands=exposure_hdu_bands,format=format,)else:exposure_map=Noneifcls.tag=="psf_map"and"energy"inirf_map.geom.axes.names:from.psfimportRecoPSFMapoutput_class=RecoPSFMapifcls.tag=="edisp_map"andirf_map.geom.axes[0].name=="energy":from.edispimportEDispKernelMapoutput_class=EDispKernelMapelifformat=="gtpsf":rad_axis=MapAxis.from_table_hdu(hdulist["THETA"],format=format)table=Table.read(hdulist["PSF"])energy_axis_true=MapAxis.from_table(table,format=format)geom_psf=RegionGeom.create(region=None,axes=[rad_axis,energy_axis_true])psf_map=Map.from_geom(geom=geom_psf,data=table["Psf"].data,unit="sr-1")geom_exposure=geom_psf.squash("rad")exposure_map=Map.from_geom(geom=geom_exposure,data=table["Exposure"].data,unit="cm2 s")returncls(psf_map=psf_map,exposure_map=exposure_map)else:raiseValueError(f"Format {format} not supported")returnoutput_class(irf_map,exposure_map)
[docs]@classmethoddefread(cls,filename,format="gadf",hdu=None):"""Read an IRF_map from file and create corresponding object" Parameters ---------- filename : str or `Path` File name format : {"gadf", "gtpsf"} File format hdu : str or int HDU location Returns ------- irf_map : `PSFMap`, `EDispMap` or `EDispKernelMap` IRF map """filename=make_path(filename)withfits.open(filename,memmap=False)ashdulist:returncls.from_hdulist(hdulist,format=format,hdu=hdu)
[docs]defto_hdulist(self,format="gadf"):"""Convert to `~astropy.io.fits.HDUList`. Parameters ---------- format : {"gadf", "gtpsf"} File format Returns ------- hdu_list : `~astropy.io.fits.HDUList` HDU list. """ifformat=="gadf":hdu=IRF_MAP_HDU_SPECIFICATION[self.tag]hdulist=self._irf_map.to_hdulist(hdu=hdu,format=format)exposure_hdu=hdu+"_exposure"ifself.exposure_mapisnotNone:new_hdulist=self.exposure_map.to_hdulist(hdu=exposure_hdu,format=format)hdulist.extend(new_hdulist[1:])elifformat=="gtpsf":ifnotself._irf_map.geom.is_region:raiseValueError("Format 'gtpsf' is only supported for region geometries")rad_hdu=self._irf_map.geom.axes["rad"].to_table_hdu(format=format)psf_table=self._irf_map.geom.axes["energy_true"].to_table(format=format)psf_table["Exposure"]=self.exposure_map.quantity[...,0,0].to("cm^2 s")psf_table["Psf"]=self._irf_map.quantity[...,0,0].to("sr^-1")psf_hdu=fits.BinTableHDU(data=psf_table,name="PSF")hdulist=fits.HDUList([fits.PrimaryHDU(),rad_hdu,psf_hdu])else:raiseValueError(f"Format {format} not supported")returnhdulist
[docs]defwrite(self,filename,overwrite=False,format="gadf"):"""Write IRF map to fits Parameters ---------- filename : str or `Path` Filename to write to overwrite : bool Whether to overwrite format : {"gadf", "gtpsf"} File format """hdulist=self.to_hdulist(format=format)hdulist.writeto(str(filename),overwrite=overwrite)
[docs]defstack(self,other,weights=None,nan_to_num=True):"""Stack IRF map with another one in place. Parameters ---------- other : `~gammapy.irf.IRFMap` IRF map to be stacked with this one. weights : `~gammapy.maps.Map` Map with stacking weights. nan_to_num: bool Non-finite values are replaced by zero if True (default). """ifself.exposure_mapisNoneorother.exposure_mapisNone:raiseValueError(f"Missing exposure map for {self.__class__.__name__}.stack")cutout_info=getattr(other._irf_map.geom,"cutout_info",None)ifcutout_infoisnotNone:slices=cutout_info["parent-slices"]parent_slices=Ellipsis,slices[0],slices[1]else:parent_slices=slice(None)self._irf_map.data[parent_slices]*=self.exposure_map.data[parent_slices]self._irf_map.stack(other._irf_map*other.exposure_map.data,weights=weights,nan_to_num=nan_to_num,)# stack exposure mapifweightsand"energy"inweights.geom.axes.names:weights=weights.reduce(axis_name="energy",func=np.logical_or,keepdims=True)self.exposure_map.stack(other.exposure_map,weights=weights,nan_to_num=nan_to_num)withnp.errstate(invalid="ignore"):self._irf_map.data[parent_slices]/=self.exposure_map.data[parent_slices]self._irf_map.data=np.nan_to_num(self._irf_map.data)
[docs]defcutout(self,position,width,mode="trim"):"""Cutout IRF map. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Center position of the cutout region. width : tuple of `~astropy.coordinates.Angle` Angular sizes of the region in (lon, lat) in that specific order. If only one value is passed, a square region is extracted. mode : {'trim', 'partial', 'strict'} Mode option for Cutout2D, for details see `~astropy.nddata.utils.Cutout2D`. Returns ------- cutout : `IRFMap` Cutout IRF map. """irf_map=self._irf_map.cutout(position,width,mode)ifself.exposure_map:exposure_map=self.exposure_map.cutout(position,width,mode)else:exposure_map=Nonereturnself.__class__(irf_map,exposure_map=exposure_map)
[docs]defdownsample(self,factor,axis_name=None,weights=None):"""Downsample the spatial dimension by a given factor. Parameters ---------- factor : int Downsampling factor. axis_name : str Which axis to downsample. By default spatial axes are downsampled. weights : `~gammapy.maps.Map` Map with weights downsampling. Returns ------- map : `IRFMap` Downsampled irf map. """irf_map=self._irf_map.downsample(factor=factor,axis_name=axis_name,preserve_counts=True,weights=weights)ifaxis_nameisNone:exposure_map=self.exposure_map.downsample(factor=factor,preserve_counts=False)else:exposure_map=self.exposure_map.copy()returnself.__class__(irf_map,exposure_map=exposure_map)
[docs]defslice_by_idx(self,slices):"""Slice sub dataset. The slicing only applies to the maps that define the corresponding axes. Parameters ---------- slices : dict Dict of axes names and integers or `slice` object pairs. Contains one element for each non-spatial dimension. For integer indexing the corresponding axes is dropped from the map. Axes not specified in the dict are kept unchanged. Returns ------- map_out : `IRFMap` Sliced irf map object. """irf_map=self._irf_map.slice_by_idx(slices=slices)if"energy_true"inslicesandself.exposure_map:exposure_map=self.exposure_map.slice_by_idx(slices=slices)else:exposure_map=self.exposure_mapreturnself.__class__(irf_map,exposure_map=exposure_map)