# Licensed under a 3-clause BSD style license - see LICENSE.rstimportloggingimportnumpyasnpfromastropyimportunitsasufromastropy.ioimportfitsfromastropy.utilsimportclasspropertyfromgammapy.dataimportGTIfromgammapy.mapsimportMap,Maps,TimeMapAxisfromgammapy.modeling.modelsimport(Models,PowerLawSpectralModel,SkyModel,SpectralModel,)fromgammapy.utils.scriptsimportmake_path__all__=["FluxMaps"]log=logging.getLogger(__name__)DEFAULT_UNIT={"dnde":u.Unit("cm-2 s-1 TeV-1"),"e2dnde":u.Unit("erg cm-2 s-1"),"flux":u.Unit("cm-2 s-1"),"eflux":u.Unit("erg cm-2 s-1"),"norm":u.Unit(""),}REQUIRED_MAPS={"dnde":["dnde"],"e2dnde":["e2dnde"],"flux":["flux"],"eflux":["eflux"],"likelihood":["norm"],}REQUIRED_COLUMNS={"dnde":["e_ref","dnde"],"e2dnde":["e_ref","e2dnde"],"flux":["e_min","e_max","flux"],"eflux":["e_min","e_max","eflux"],# TODO: extend required columns"likelihood":["e_min","e_max","e_ref","ref_dnde","ref_flux","ref_eflux","norm",],}REQUIRED_QUANTITIES_SCAN=["stat_scan","stat"]OPTIONAL_QUANTITIES={"dnde":["dnde_err","dnde_errp","dnde_errn","dnde_ul"],"e2dnde":["e2dnde_err","e2dnde_errp","e2dnde_errn","e2dnde_ul"],"flux":["flux_err","flux_errp","flux_errn","flux_ul","flux_sensitivity"],"eflux":["eflux_err","eflux_errp","eflux_errn","eflux_ul"],"likelihood":["norm_err","norm_errn","norm_errp","norm_ul"],}VALID_QUANTITIES=["norm","norm_err","norm_errn","norm_errp","norm_ul","norm_sensitivity","ts","sqrt_ts","npred","npred_excess","stat","stat_scan","stat_null","niter","is_ul","counts","success","n_dof",]OPTIONAL_QUANTITIES_COMMON=["ts","sqrt_ts","npred","npred_excess","stat","stat_null","niter","is_ul","counts","success","n_dof",]
[docs]classFluxMaps:"""A flux map / points container. It contains a set of `~gammapy.maps.Map` objects that store the estimated flux as a function of energy as well as associated quantities (typically errors, upper limits, delta TS and possibly raw quantities such counts, excesses etc). It also contains a reference model to convert the flux values in different formats. Usually, this should be the model used to produce the flux map. The associated map geometry can use a `RegionGeom` to store the equivalent of flux points, or a `WcsGeom`/`HpxGeom` to store an energy dependent flux map. The container relies internally on the 'Likelihood' SED type defined in :ref:`gadf:flux-points` and offers convenience properties to convert to other flux formats, namely: ``dnde``, ``flux``, ``eflux`` or ``e2dnde``. The conversion is done according to the reference model spectral shape. Parameters ---------- data : dict of `~gammapy.maps.Map` The maps dictionary. Expected entries are the following: * norm : the norm factor. * norm_err : optional, the error on the norm factor. * norm_errn : optional, the negative error on the norm factor. * norm_errp : optional, the positive error on the norm factor. * norm_ul : optional, the upper limit on the norm factor. * norm_scan : optional, the norm values of the test statistic scan. * stat_scan : optional, the test statistic scan values. * ts : optional, the delta test statistic associated with the flux value. * sqrt_ts : optional, the square root of the test statistic, when relevant. * success : optional, a boolean tagging the validity of the estimation. * n_dof : optional, the number of degrees of freedom used in TS computation reference_model : `~gammapy.modeling.models.SkyModel`, optional The reference model to use for conversions. If None, a model consisting of a point source with a power law spectrum of index 2 is assumed. meta : dict, optional Dict of metadata. gti : `~gammapy.data.GTI`, optional Maps GTI information. filter_success_nan : boolean, optional Set fitted norm and error to NaN when the fit has not succeeded. """_expand_slice=(slice(None),np.newaxis,np.newaxis)def__init__(self,data,reference_model,meta=None,gti=None,filter_success_nan=True):self._data=dataifisinstance(reference_model,SpectralModel):reference_model=SkyModel(reference_model)self._reference_model=reference_modelifmetaisNone:meta={}meta.setdefault("sed_type_init","likelihood")self.meta=metaself.gti=gtiself._filter_success_nan=filter_success_nan@propertydeffilter_success_nan(self):returnself._filter_success_nan@filter_success_nan.setterdeffilter_success_nan(self,value):self._filter_success_nan=value@propertydefavailable_quantities(self):"""Available quantities"""returnlist(self._data.keys())
[docs]@staticmethoddefall_quantities(sed_type):"""All quantities allowed for a given SED type. Parameters ---------- sed_type : {"likelihood", "dnde", "e2dnde", "flux", "eflux"} SED type. Returns ------- list : list of str All allowed quantities for a given SED type. """quantities=[]quantities+=REQUIRED_MAPS[sed_type]quantities+=OPTIONAL_QUANTITIES[sed_type]quantities+=OPTIONAL_QUANTITIES_COMMONifsed_type=="likelihood":quantities+=REQUIRED_QUANTITIES_SCANreturnquantities
@staticmethoddef_validate_data(data,sed_type,check_scan=False):"""Check that map input is valid and correspond to one of the SED type."""try:keys=data.keys()required=set(REQUIRED_MAPS[sed_type])exceptKeyError:raiseValueError(f"Unknown SED type: '{sed_type}'")ifcheck_scan:required=required.union(REQUIRED_QUANTITIES_SCAN)ifnotrequired.issubset(keys):missing=required.difference(keys)raiseValueError("Missing data / column for SED type '{}':"" {}".format(sed_type,missing))# TODO: add support for scandef_check_quantity(self,quantity):ifquantitynotinself.available_quantities:raiseAttributeError(f"Quantity '{quantity}' is not defined on current flux estimate.")@staticmethoddef_guess_sed_type(quantities):"""Guess SED type from table content."""valid_sed_types=list(REQUIRED_COLUMNS.keys())forsed_typeinvalid_sed_types:required=set(REQUIRED_COLUMNS[sed_type])ifrequired.issubset(quantities):returnsed_type@propertydefis_convertible_to_flux_sed_type(self):"""Check whether differential SED type is convertible to integral SED type."""ifself.sed_type_initin["dnde","e2dnde"]:returnself.energy_axis.node_type=="edges"returnTrue@propertydefhas_ul(self):"""Whether the flux estimate has norm_ul defined."""return"norm_ul"inself._data@propertydefhas_any_ts(self):"""Whether the flux estimate has either sqrt(TS) or test statistic defined."""returnany(_inself._datafor_in["ts","sqrt_ts"])@propertydefhas_stat_profiles(self):"""Whether the flux estimate has test statistic profiles."""return"stat_scan"inself._data@propertydefhas_success(self):"""Whether the flux estimate has the fit status."""return"success"inself._data@propertydefn_sigma(self):"""n sigma"""returnself.meta.get("n_sigma",1)@propertydefn_sigma_ul(self):"""n sigma UL."""returnself.meta.get("n_sigma_ul")@propertydefsqrt_ts_threshold_ul(self):"""sqrt(TS) threshold for upper limits."""returnself.meta.get("sqrt_ts_threshold_ul",2)@sqrt_ts_threshold_ul.setterdefsqrt_ts_threshold_ul(self,value):"""sqrt(TS) threshold for upper limits. Parameters ---------- value : int Threshold value in sqrt(TS) for upper limits. """self.meta["sqrt_ts_threshold_ul"]=valueifself.has_any_ts:self.is_ul=self.sqrt_ts<self.sqrt_ts_threshold_ulelse:raiseValueError("Either TS or sqrt_ts is required to set the threshold")@propertydefsed_type_init(self):"""Initial SED type."""returnself.meta.get("sed_type_init")@propertydefsed_type_plot_default(self):"""Initial SED type."""ifself.sed_type_init=="likelihood":return"dnde"returnself.sed_type_init@propertydefgeom(self):"""Reference map geometry as a `~gammapy.maps.Geom`."""returnself.norm.geom@propertydefenergy_axis(self):"""Energy axis as a `~gammapy.maps.MapAxis`."""returnself.geom.axes["energy"]@classpropertydefreference_model_default(self):"""Reference model default as a `~gammapy.modeling.models.SkyModel`."""returnSkyModel(PowerLawSpectralModel(index=2))@propertydefreference_model(self):"""Reference model as a `~gammapy.modeling.models.SkyModel`."""returnself._reference_model@propertydefreference_spectral_model(self):"""Reference spectral model as a `SpectralModel`"""returnself.reference_model.spectral_model@propertydefenergy_ref(self):"""Reference energy. Defined by `energy_ref` column in `FluxPoints.table` or computed as log center, if `energy_min` and `energy_max` columns are present in `FluxEstimate.data`. Returns ------- energy_ref : `~astropy.units.Quantity` Reference energy. """returnself.energy_axis.center@propertydefenergy_min(self):"""Energy minimum. Returns ------- energy_min : `~astropy.units.Quantity` Lower bound of energy bin. """returnself.energy_axis.edges[:-1]@propertydefenergy_max(self):"""Energy maximum. Returns ------- energy_max : `~astropy.units.Quantity` Upper bound of energy bin. """returnself.energy_axis.edges[1:]# TODO: keep or remove?@propertydefniter(self):"""Number of iterations of fit."""self._check_quantity("niter")returnself._data["niter"]@propertydefsuccess(self):"""Fit success flag."""self._check_quantity("success")returnself._data["success"]@propertydefis_ul(self):"""Whether data is an upper limit."""# TODO: make this a well defined behaviouris_ul=self.norm.copy(data=False)if"is_ul"inself._data:is_ul=self._data["is_ul"]elifself.has_any_tsandself.has_ul:is_ul.data=self.sqrt_ts.data<self.sqrt_ts_threshold_ulelifself.has_ul:is_ul.data=np.isfinite(self.norm_ul)else:is_ul.data=np.isnan(self.norm)returnis_ul@is_ul.setterdefis_ul(self,value):"""Whether data is an upper limit. Parameters ---------- value : `~Map` Boolean map. """ifnotisinstance(value,Map):value=self.norm.copy(data=value)self._data["is_ul"]=value@propertydefcounts(self):"""Predicted counts null hypothesis."""self._check_quantity("counts")returnself._data["counts"]@propertydefnpred(self):"""Predicted counts from best fit hypothesis."""self._check_quantity("npred")returnself._data["npred"]@propertydefnpred_background(self):"""Predicted background counts from best fit hypothesis."""self._check_quantity("npred")self._check_quantity("npred_excess")returnself.npred-self.npred_excess@propertydefnpred_excess(self):"""Predicted excess count from best fit hypothesis."""self._check_quantity("npred_excess")returnself._data["npred_excess"]def_expand_dims(self,data):# TODO: instead make map support broadcastingaxes=self.npred.geom.axes# here we need to rely on broadcastingif"dataset"inaxes.names:idx=axes.index_data("dataset")data=np.expand_dims(data,axis=idx)returndata@staticmethoddef_use_center_as_labels(input_map):"""Change the node_type of the input map."""energy_axis=input_map.geom.axes["energy"]energy_axis.use_center_as_plot_labels=Truereturninput_map@propertydefnpred_excess_ref(self):"""Predicted excess reference counts."""returnself.npred_excess/self._expand_dims(self.norm.data)@propertydefnpred_excess_err(self):"""Predicted excess counts error."""returnself.npred_excess_ref*self._expand_dims(self.norm_err.data)@propertydefnpred_excess_errp(self):"""Predicted excess counts positive error."""returnself.npred_excess_ref*self._expand_dims(self.norm_errp.data)@propertydefnpred_excess_errn(self):"""Predicted excess counts negative error."""returnself.npred_excess_ref*self._expand_dims(self.norm_errn.data)@propertydefnpred_excess_ul(self):"""Predicted excess counts upper limits."""returnself.npred_excess_ref*self._expand_dims(self.norm_ul.data)@propertydefstat_scan(self):"""Fit statistic scan value."""self._check_quantity("stat_scan")returnself._data["stat_scan"]@propertydefstat(self):"""Fit statistic value."""self._check_quantity("stat")returnself._data["stat"]@propertydefstat_null(self):"""Fit statistic value for the null hypothesis."""self._check_quantity("stat_null")returnself._data["stat_null"]@propertydefts(self):"""Test statistic map as a `~gammapy.maps.Map` object."""self._check_quantity("ts")returnself._data["ts"]@propertydefts_scan(self):"""Test statistic scan as a `~gammapy.maps.Map` object"""returnself.stat_scan-np.expand_dims(self.stat.data,2)# TODO: always derive sqrt(TS) from TS?@propertydefsqrt_ts(self):r"""sqrt(TS) as defined by: .. math:: \sqrt{TS} = \left \{ \begin{array}{ll} -\sqrt{TS} & : \text{if} \ norm < 0 \\ \sqrt{TS} & : \text{else} \end{array} \right. Returns ------- sqrt_ts : `Map` sqrt(TS) map. """if"sqrt_ts"inself._data:returnself._data["sqrt_ts"]else:withnp.errstate(invalid="ignore",divide="ignore"):ts=np.clip(self.ts.data,0,None)data=np.where(self.norm>0,np.sqrt(ts),-np.sqrt(ts))returnMap.from_geom(geom=self.geom,data=data)@propertydefnorm(self):"""Norm values."""returnself._filter_convergence_failure(self._data["norm"])@propertydefnorm_err(self):"""Norm error."""self._check_quantity("norm_err")returnself._filter_convergence_failure(self._data["norm_err"])@propertydefnorm_errn(self):"""Negative norm error."""self._check_quantity("norm_errn")returnself._data["norm_errn"]@propertydefnorm_errp(self):"""Positive norm error."""self._check_quantity("norm_errp")returnself._data["norm_errp"]@propertydefnorm_ul(self):"""Norm upper limit."""self._check_quantity("norm_ul")returnself._data["norm_ul"]@propertydefnorm_sensitivity(self):"""Norm sensitivity."""self._check_quantity("norm_sensitivity")returnself._data["norm_sensitivity"]@propertydefn_dof(self):"""Number of degrees of freedom of the fit per energy bin."""self._check_quantity("n_dof")returnself._data["n_dof"]@propertydefdnde_ref(self):"""Reference differential flux."""result=self.reference_spectral_model(self.energy_axis.center)returnresult[self._expand_slice]@propertydefe2dnde_ref(self):"""Reference differential flux * energy ** 2."""energy=self.energy_axis.centerresult=self.reference_spectral_model(energy)*energy**2returnresult[self._expand_slice]@propertydefflux_ref(self):"""Reference integral flux."""ifnotself.is_convertible_to_flux_sed_type:raiseValueError("Missing energy range definition, cannot convert to SED type 'flux'.")energy_min=self.energy_axis.edges[:-1]energy_max=self.energy_axis.edges[1:]result=self.reference_spectral_model.integral(energy_min,energy_max)returnresult[self._expand_slice]@propertydefeflux_ref(self):"""Reference energy flux."""ifnotself.is_convertible_to_flux_sed_type:raiseValueError("Missing energy range definition, cannot convert to SED type 'eflux'.")energy_min=self.energy_axis.edges[:-1]energy_max=self.energy_axis.edges[1:]result=self.reference_spectral_model.energy_flux(energy_min,energy_max)returnresult[self._expand_slice]@propertydefdnde(self):"""Return differential flux (dnde) SED values."""returnself._use_center_as_labels(self.norm*self.dnde_ref)@propertydefdnde_err(self):"""Return differential flux (dnde) SED errors."""returnself._use_center_as_labels(self.norm_err*self.dnde_ref)@propertydefdnde_errn(self):"""Return differential flux (dnde) SED negative errors."""returnself._use_center_as_labels(self.norm_errn*self.dnde_ref)@propertydefdnde_errp(self):"""Return differential flux (dnde) SED positive errors."""returnself._use_center_as_labels(self.norm_errp*self.dnde_ref)@propertydefdnde_ul(self):"""Return differential flux (dnde) SED upper limit."""returnself._use_center_as_labels(self.norm_ul*self.dnde_ref)@propertydefe2dnde(self):"""Return differential energy flux (e2dnde) SED values."""returnself._use_center_as_labels(self.norm*self.e2dnde_ref)@propertydefe2dnde_err(self):"""Return differential energy flux (e2dnde) SED errors."""returnself._use_center_as_labels(self.norm_err*self.e2dnde_ref)@propertydefe2dnde_errn(self):"""Return differential energy flux (e2dnde) SED negative errors."""returnself._use_center_as_labels(self.norm_errn*self.e2dnde_ref)@propertydefe2dnde_errp(self):"""Return differential energy flux (e2dnde) SED positive errors."""returnself._use_center_as_labels(self.norm_errp*self.e2dnde_ref)@propertydefe2dnde_ul(self):"""Return differential energy flux (e2dnde) SED upper limit."""returnself._use_center_as_labels(self.norm_ul*self.e2dnde_ref)@propertydefflux(self):"""Return integral flux (flux) SED values."""returnself.norm*self.flux_ref@propertydefflux_err(self):"""Return integral flux (flux) SED values."""returnself.norm_err*self.flux_ref@propertydefflux_errn(self):"""Return integral flux (flux) SED negative errors."""returnself.norm_errn*self.flux_ref@propertydefflux_errp(self):"""Return integral flux (flux) SED positive errors."""returnself.norm_errp*self.flux_ref@propertydefflux_ul(self):"""Return integral flux (flux) SED upper limits."""returnself.norm_ul*self.flux_ref@propertydefflux_sensitivity(self):"""Sensitivity given as the flux for which the significance is ``self.meta["n_sigma_sensitivity]``."""returnself.norm_sensitivity*self.flux_ref@propertydefeflux(self):"""Return energy flux (eflux) SED values."""returnself.norm*self.eflux_ref@propertydefeflux_err(self):"""Return energy flux (eflux) SED errors."""returnself.norm_err*self.eflux_ref@propertydefeflux_errn(self):"""Return energy flux (eflux) SED negative errors."""returnself.norm_errn*self.eflux_ref@propertydefeflux_errp(self):"""Return energy flux (eflux) SED positive errors."""returnself.norm_errp*self.eflux_ref@propertydefeflux_ul(self):"""Return energy flux (eflux) SED upper limits."""returnself.norm_ul*self.eflux_refdef_filter_convergence_failure(self,some_map):"""Put NaN where pixels did not converge."""ifnotself._filter_success_nan:returnsome_mapifnotself.has_success:returnsome_mapifself.success.data.shape==some_map.data.shape:new_map=some_map.copy()new_map.data[~self.success.data]=np.nanelse:mask_nan=np.ones(self.success.data.shape)mask_nan[~self.success.data]=np.nannew_map=some_map*np.expand_dims(mask_nan,2)new_map.data=new_map.data.astype(some_map.data.dtype)returnnew_map
[docs]defget_flux_points(self,position=None):"""Extract flux point at a given position. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Position where the flux points are extracted. Returns ------- flux_points : `~gammapy.estimators.FluxPoints` Flux points object. """fromgammapy.estimatorsimportFluxPointsifpositionisNone:position=self.geom.center_skydirdata={}fornameinself._data:m=getattr(self,name)ifm.data.dtypeisnp.dtype(bool):data[name]=m.to_region_nd_map(region=position,method="nearest",func=np.any)else:data[name]=m.to_region_nd_map(region=position,method="nearest")returnFluxPoints(data,reference_model=self.reference_model,meta=self.meta.copy(),gti=self.gti,)
[docs]defto_maps(self,sed_type=None):"""Return maps in a given SED type. Parameters ---------- sed_type : {"likelihood", "dnde", "e2dnde", "flux", "eflux"}, optional SED type to convert to. If None, set to `Likelihood`. Default is None. Returns ------- maps : `Maps` Maps object containing the requested maps. """maps=Maps()ifsed_typeisNone:sed_type=self.sed_type_initforquantityinself.all_quantities(sed_type=sed_type):m=getattr(self,quantity,None)ifmisnotNone:maps[quantity]=mreturnmaps
[docs]@classmethoddeffrom_stack(cls,maps,axis,meta=None):"""Create flux points by stacking list of flux points. The first `FluxPoints` object in the list is taken as a reference to infer column names and units for the stacked object. Parameters ---------- maps : list of `FluxMaps` List of maps to stack. axis : `MapAxis` New axis to create. meta : dict, optional Metadata of the resulting flux points. Default is None. Returns ------- flux_maps : `FluxMaps` Stacked flux maps along axis. """reference=maps[0]data={}forquantityinreference.available_quantities:data[quantity]=Map.from_stack([_._data[quantity]for_inmaps],axis=axis)ifmetaisNone:meta=reference.meta.copy()gtis=[_.gtifor_inmapsif_.gti]ifgtis:gti=GTI.from_stack(gtis)else:gti=Nonereturncls(data=data,reference_model=reference.reference_model,meta=meta,gti=gti)
[docs]defiter_by_axis(self,axis_name):"""Create a set of FluxMaps by splitting along an axis. Parameters ---------- axis_name : str Name of the axis to split on. Returns ------- flux_maps : `FluxMap` FluxMap iteration. """split_maps={}axis=self.geom.axes[axis_name]gti=self.gtiforamapinself.available_quantities:split_maps[amap]=list(getattr(self,amap).iter_by_axis(axis_name))foridxinrange(axis.nbin):maps={}foramapinself.available_quantities:maps[amap]=split_maps[amap][idx]ifisinstance(axis,TimeMapAxis):gti=self.gti.select_time([axis.time_min[idx],axis.time_max[idx]])yieldself.__class__.from_maps(maps=maps,sed_type=self.sed_type_init,reference_model=self.reference_model,gti=gti,meta=self.meta,)
[docs]@classmethoddeffrom_maps(cls,maps,sed_type=None,reference_model=None,gti=None,meta=None):"""Create FluxMaps from a dictionary of maps. Parameters ---------- maps : `Maps` Maps object containing the input maps. sed_type : str, optional SED type of the input maps. If None, set to "likelihood". Default is None. reference_model : `~gammapy.modeling.models.SkyModel`, optional Reference model to use for conversions. If None, a model consisting of a point source with a power law spectrum of index 2 is assumed. Default is None. gti : `~gammapy.data.GTI`, optional Maps GTI information. Default is None. meta : `dict` Meta dictionary. Returns ------- flux_maps : `~gammapy.estimators.FluxMaps` Flux maps object. """ifsed_typeisNone:sed_type=cls._guess_sed_type(maps.keys())ifsed_typeisNone:raiseValueError("Specifying the SED type is required")cls._validate_data(data=maps,sed_type=sed_type)ifsed_type=="likelihood":returncls(data=maps,reference_model=reference_model,gti=gti,meta=meta)ifreference_modelisNone:log.warning("No reference model set for FluxMaps. Assuming point source with E^-2 spectrum.")reference_model=cls.reference_model_defaultelifisinstance(reference_model,SpectralModel):reference_model=SkyModel(reference_model)map_ref=maps[sed_type]energy_axis=map_ref.geom.axes["energy"]withnp.errstate(invalid="ignore",divide="ignore"):fluxes=reference_model.spectral_model.reference_fluxes(energy_axis=energy_axis)# TODO: handle reshaping in MapAxisfactor=fluxes[f"ref_{sed_type}"].to(map_ref.unit)[cls._expand_slice]data={}data["norm"]=map_ref/factorforkeyinOPTIONAL_QUANTITIES[sed_type]:ifkeyinmaps:norm_type=key.replace(sed_type,"norm")data[norm_type]=maps[key]/factor# We add the remaining mapsforkeyinOPTIONAL_QUANTITIES_COMMON:ifkeyinmaps:data[key]=maps[key]returncls(data=data,reference_model=reference_model,gti=gti,meta=meta)
[docs]defto_hdulist(self,sed_type=None,hdu_bands=None):"""Convert flux map to list of HDUs. For now, one cannot export the reference model. Parameters ---------- sed_type : str, optional SED type to convert to. If None, set to "likelihood". Default is None. hdu_bands : str, optional Name of the HDU with the BANDS table. Default is 'BANDS' If set to None, each map will have its own hdu_band. Default is None. Returns ------- hdulist : `~astropy.io.fits.HDUList` Map dataset list of HDUs. """ifsed_typeisNone:sed_type=self.sed_type_initexclude_primary=slice(1,None)hdu_primary=fits.PrimaryHDU()hdu_primary.header["SED_TYPE"]=sed_typehdulist=fits.HDUList([hdu_primary])maps=self.to_maps(sed_type=sed_type)hdulist.extend(maps.to_hdulist(hdu_bands=hdu_bands)[exclude_primary])ifself.gti:hdu=self.gti.to_table_hdu(format="gadf")hdulist.append(hdu)returnhdulist
[docs]@classmethoddeffrom_hdulist(cls,hdulist,hdu_bands=None,sed_type=None,checksum=False):"""Create flux map dataset from list of HDUs. Parameters ---------- hdulist : `~astropy.io.fits.HDUList` List of HDUs. hdu_bands : str, optional Name of the HDU with the BANDS table. Default is 'BANDS' If set to None, each map should have its own hdu_band. Default is None. sed_type : {"dnde", "flux", "e2dnde", "eflux", "likelihood"}, optional Sed type. Default is None. Returns ------- flux_maps : `~gammapy.estimators.FluxMaps` Flux maps object. """maps=Maps.from_hdulist(hdulist=hdulist,hdu_bands=hdu_bands)ifsed_typeisNone:sed_type=hdulist[0].header.get("SED_TYPE",None)filename=hdulist[0].header.get("MODEL",None)iffilename:reference_model=Models.read(filename,checksum=checksum)[0]else:reference_model=Noneif"GTI"inhdulist:gti=GTI.from_table_hdu(hdulist["GTI"])else:gti=Nonereturncls.from_maps(maps=maps,sed_type=sed_type,reference_model=reference_model,gti=gti)
[docs]defwrite(self,filename,filename_model=None,overwrite=False,sed_type=None,checksum=False,):"""Write flux map to file. Parameters ---------- filename : str Filename to write to. filename_model : str Filename of the model (yaml format). If None, keep string before '.' and add '_model.yaml' suffix. overwrite : bool, optional Overwrite existing file. Default is False. sed_type : str, optional Sed type to convert to. If None, set to "likelihood". Default is None. checksum : bool, optional When True adds both DATASUM and CHECKSUM cards to the headers written to the file. Default is False. """ifsed_typeisNone:sed_type=self.sed_type_initfilename=make_path(filename)iffilename_modelisNone:name_string=filename.as_posix()forsuffixinfilename.suffixes:name_string.replace(suffix,"")filename_model=name_string+"_model.yaml"filename_model=make_path(filename_model)hdulist=self.to_hdulist(sed_type)models=Models(self.reference_model)models.write(filename_model,overwrite=overwrite,write_covariance=False)hdulist[0].header["MODEL"]=filename_model.as_posix()hdulist.writeto(filename,overwrite=overwrite)
[docs]@classmethoddefread(cls,filename,checksum=False):"""Read map dataset from file. Parameters ---------- filename : str Filename to read from. checksum : bool If True checks both DATASUM and CHECKSUM cards in the file headers. Default is False. Returns ------- flux_maps : `~gammapy.estimators.FluxMaps` Flux maps object. """withfits.open(str(make_path(filename)),memmap=False,checksum=checksum)ashdulist:returncls.from_hdulist(hdulist,checksum=checksum)
[docs]defslice_by_idx(self,slices):"""Slice flux maps by index. Parameters ---------- slices : dict Dict of axes names and integers or `slice` object pairs. Contains one element for each non-spatial dimension. For integer indexing the corresponding axes is dropped from the map. Axes not specified in the dict are kept unchanged. Returns ------- flux_maps : `FluxMaps` Sliced flux maps object. """data={}forkey,iteminself._data.items():data[key]=item.slice_by_idx(slices)returnself.__class__(data=data,reference_model=self.reference_model,meta=self.meta.copy(),gti=self.gti,)
[docs]defslice_by_coord(self,slices):"""Slice flux maps by coordinate values Parameters ---------- slices : dict Dict of axes names and `astropy.Quantity` or `astropy.Time` or `slice` object pairs. Contains one element for each non-spatial dimension. For integer indexing the corresponding axes is dropped from the map. Axes not specified in the dict are kept unchanged. Returns ------- flux_maps : `FluxMaps` Sliced flux maps object. """idx_intervals=[]forkey,intervalinzip(slices.keys(),slices.values()):axis=self.geom.axes[key]group=axis.group_table([interval.start,interval.stop])is_normal=group["bin_type"]=="normal "group=group[is_normal]idx_intervals.append(slice(int(group["idx_min"][0]),int(group["idx_max"][0]+1)))returnself.slice_by_idx(dict(zip(slices.keys(),idx_intervals)))
[docs]defslice_by_time(self,time_min,time_max):"""Slice flux maps by coordinate values along the time axis. Parameters ---------- time_min, time_max : `~astropy.time.Time` Time bounds used to slice the flux map. Returns ------- flux_maps : `FluxMaps` Sliced flux maps object. """time_slice=slice(time_min,time_max)returnself.slice_by_coord({"time":time_slice})
[docs]defslice_by_energy(self,energy_min,energy_max):"""Slice flux maps by coordinate values along the energy axis. Parameters ---------- energy_min, energy_max : `~astropy.units.Quantity` Energy bounds used to slice the flux map. Returns ------- flux_maps : `FluxMaps` Sliced flux maps object. """energy_slice=slice(energy_min,energy_max)returnself.slice_by_coord({"energy":energy_slice})
# TODO: should we allow this?def__getitem__(self,item):returngetattr(self,item)def__str__(self):str_=f"{self.__class__.__name__}\n"str_+="-"*len(self.__class__.__name__)str_+="\n\n"str_+="\t"+f"geom : {self.geom.__class__.__name__}\n"str_+="\t"+f"axes : {self.geom.axes_names}\n"str_+="\t"+f"shape : {self.geom.data_shape[::-1]}\n"str_+="\t"+f"quantities : {list(self.available_quantities)}\n"str_+=("\t"+f"ref. model : {self.reference_spectral_model.tag[-1]}\n")str_+="\t"+f"n_sigma : {self.n_sigma}\n"str_+="\t"+f"n_sigma_ul : {self.n_sigma_ul}\n"str_+="\t"+f"sqrt_ts_threshold_ul : {self.sqrt_ts_threshold_ul}\n"str_+="\t"+f"sed type init : {self.sed_type_init}\n"returnstr_.expandtabs(tabsize=2)