# Licensed under a 3-clause BSD style license - see LICENSE.rst"""Cube models (axes: lon, lat, energy)."""importnumpyasnpimportastropy.unitsasufromastropy.nddataimportNoOverlapErrorfromgammapy.mapsimportMap,MapAxis,WcsGeomfromgammapy.modelingimportCovariance,Parametersfromgammapy.modeling.covarianceimportcopy_covariancefromgammapy.modeling.parameterimport_get_parameters_strfromgammapy.utils.fitsimportLazyFitsDatafromgammapy.utils.scriptsimportmake_name,make_pathfrom.coreimportModel,ModelBase,Modelsfrom.spatialimportConstantSpatialModel,SpatialModelfrom.spectralimportPowerLawNormSpectralModel,SpectralModel,TemplateSpectralModelfrom.temporalimportTemporalModel__all__=["create_fermi_isotropic_diffuse_model","FoVBackgroundModel","SkyModel","TemplateNPredModel",]
[docs]classSkyModel(ModelBase):"""Sky model component. This model represents a factorised sky model. It has `~gammapy.modeling.Parameters` combining the spatial and spectral parameters. Parameters ---------- spectral_model : `~gammapy.modeling.models.SpectralModel` Spectral model spatial_model : `~gammapy.modeling.models.SpatialModel` Spatial model (must be normalised to integrate to 1) temporal_model : `~gammapy.modeling.models.temporalModel` Temporal model name : str Model identifier apply_irf : dict Dictionary declaring which IRFs should be applied to this model. Options are {"exposure": True, "psf": True, "edisp": True} datasets_names : list of str Which datasets this model is applied to. """tag=["SkyModel","sky-model"]_apply_irf_default={"exposure":True,"psf":True,"edisp":True}def__init__(self,spectral_model,spatial_model=None,temporal_model=None,name=None,apply_irf=None,datasets_names=None,):self.spatial_model=spatial_modelself.spectral_model=spectral_modelself.temporal_model=temporal_modelself._name=make_name(name)ifapply_irfisNone:apply_irf=self._apply_irf_default.copy()self.apply_irf=apply_irfself.datasets_names=datasets_namesself._check_unit()is_norm=np.array([par.is_normforparinspectral_model.parameters])ifnotnp.any(is_norm):raiseValueError("Spectral model used with SkyModel requires a norm type parameter.")super().__init__()@propertydef_models(self):models=self.spectral_model,self.spatial_model,self.temporal_modelreturn[modelformodelinmodelsifmodelisnotNone]def_check_covariance(self):ifnotself.parameters==self._covariance.parameters:self._covariance=Covariance.from_stack([model.covarianceformodelinself._models],)def_check_unit(self):fromgammapy.data.gtiimportGTI# evaluate over a test geom to check output unit# TODO simpler way to test this ?axis=MapAxis.from_energy_bounds("0.1 TeV","10 TeV",nbin=1,name="energy_true")geom=WcsGeom.create(skydir=self.position,npix=(2,2),axes=[axis])gti=GTI.create(1*u.day,2*u.day)value=self.evaluate_geom(geom,gti)ifself.apply_irf["exposure"]:ref_unit=u.Unit("cm-2 s-1 MeV-1 sr-1")else:ref_unit=u.Unit("sr-1")ifself.spatial_modelisNone:ref_unit=ref_unit/u.Unit("sr-1")ifnotvalue.unit.is_equivalent(ref_unit):raiseValueError(f"SkyModel unit {value.unit} is not equivalent to {ref_unit}")@propertydefcovariance(self):self._check_covariance()formodelinself._models:self._covariance.set_subcovariance(model.covariance)returnself._covariance@covariance.setterdefcovariance(self,covariance):self._check_covariance()self._covariance.data=covarianceformodelinself._models:subcovar=self._covariance.get_subcovariance(model.covariance.parameters)model.covariance=subcovar@propertydefname(self):returnself._name@propertydefparameters(self):parameters=[]parameters.append(self.spectral_model.parameters)ifself.spatial_modelisnotNone:parameters.append(self.spatial_model.parameters)ifself.temporal_modelisnotNone:parameters.append(self.temporal_model.parameters)returnParameters.from_stack(parameters)@propertydefspatial_model(self):"""`~gammapy.modeling.models.SpatialModel`"""returnself._spatial_model@spatial_model.setterdefspatial_model(self,model):ifnot(modelisNoneorisinstance(model,SpatialModel)):raiseTypeError(f"Invalid type: {model!r}")self._spatial_model=model@propertydefspectral_model(self):"""`~gammapy.modeling.models.SpectralModel`"""returnself._spectral_model@spectral_model.setterdefspectral_model(self,model):ifnot(modelisNoneorisinstance(model,SpectralModel)):raiseTypeError(f"Invalid type: {model!r}")self._spectral_model=model@propertydeftemporal_model(self):"""`~gammapy.modeling.models.TemporalModel`"""returnself._temporal_model@temporal_model.setterdeftemporal_model(self,model):ifnot(modelisNoneorisinstance(model,TemporalModel)):raiseTypeError(f"Invalid type: {model!r}")self._temporal_model=model@propertydefposition(self):"""`~astropy.coordinates.SkyCoord`"""returngetattr(self.spatial_model,"position",None)@propertydefposition_lonlat(self):"""Spatial model center position `(lon, lat)` in rad and frame of the model"""returngetattr(self.spatial_model,"position_lonlat",None)@propertydefevaluation_bin_size_min(self):"""Minimal spatial bin size for spatial model evaluation."""if(self.spatial_modelisnotNoneandself.spatial_model.evaluation_bin_size_minisnotNone):returnself.spatial_model.evaluation_bin_size_minelse:returnNone@propertydefevaluation_radius(self):"""`~astropy.coordinates.Angle`"""returnself.spatial_model.evaluation_radius@propertydefevaluation_region(self):"""`~astropy.coordinates.Angle`"""returnself.spatial_model.evaluation_region@propertydefframe(self):returnself.spatial_model.framedef__add__(self,other):ifisinstance(other,(Models,list)):returnModels([self,*other])elifisinstance(other,(SkyModel,TemplateNPredModel)):returnModels([self,other])else:raiseTypeError(f"Invalid type: {other!r}")def__radd__(self,model):returnself.__add__(model)
[docs]defcontributes(self,mask,margin="0 deg"):"""Check if a skymodel contributes within a mask map. Parameters ---------- mask : `~gammapy.maps.WcsNDMap` of boolean type Map containing a boolean mask margin : `~astropy.units.Quantity` Add a margin in degree to the source evaluation radius. Used to take into account PSF width. Returns ------- models : `DatasetModels` Selected models contributing inside the region where mask==True """fromgammapy.datasets.evaluatorimportCUTOUT_MARGINmargin=u.Quantity(margin)ifnotmask.geom.is_image:mask=mask.reduce_over_axes(func=np.logical_or)ifmask.geom.is_regionandmask.geom.regionisnotNone:ifmask.geom.is_all_point_sky_regions:returnTruegeom=mask.geom.to_wcs_geom()mask=geom.region_mask([mask.geom.region])try:mask_cutout=mask.cutout(position=self.position,width=(2*self.evaluation_radius+CUTOUT_MARGIN+margin),)contributes=np.any(mask_cutout.data)except(NoOverlapError,ValueError):contributes=Falsereturncontributes
[docs]defevaluate(self,lon,lat,energy,time=None):"""Evaluate the model at given points. The model evaluation follows numpy broadcasting rules. Return differential surface brightness cube. At the moment in units: ``cm-2 s-1 TeV-1 deg-2`` Parameters ---------- lon, lat : `~astropy.units.Quantity` Spatial coordinates energy : `~astropy.units.Quantity` Energy coordinate time: `~astropy.time.Time` Time coordinate Returns ------- value : `~astropy.units.Quantity` Model value at the given point. """value=self.spectral_model(energy)# pylint:disable=not-callable# TODO: case if self.temporal_model is not None, introduce time in arguments ?ifself.spatial_modelisnotNone:ifself.spatial_model.is_energy_dependent:spatial=self.spatial_model(lon,lat,energy)else:spatial=self.spatial_model(lon,lat)value=value*spatial# pylint:disable=not-callableif(self.temporal_modelisnotNone)and(timeisnotNone):value=value*self.temporal_model(time)returnvalue
[docs]defevaluate_geom(self,geom,gti=None):"""Evaluate model on `~gammapy.maps.Geom`."""coords=geom.get_coord(sparse=True)value=self.spectral_model(coords["energy_true"])ifself.spatial_model:value=value*self.spatial_model.evaluate_geom(geom)ifself.temporal_model:integral=self.temporal_model.integral(gti.time_start,gti.time_stop)value=value*np.sum(integral)returnvalue
[docs]defintegrate_geom(self,geom,gti=None,oversampling_factor=None):"""Integrate model on `~gammapy.maps.Geom`. See `~gammapy.modeling.models.SpatialModel.integrate_geom` and `~gammapy.modeling.models.SpectralModel.integral`. Parameters ---------- geom : `Geom` or `~gammapy.maps.RegionGeom` Map geometry gti : `GTI` GIT table oversampling_factor : int or None The oversampling factor to use for spatial integration. Default is None: the factor is estimated from the model minimal bin size Returns ------- flux : `Map` Predicted flux map """energy=geom.axes["energy_true"].edgesvalue=self.spectral_model.integral(energy[:-1],energy[1:],).reshape((-1,1,1))ifself.spatial_model:value=(value*self.spatial_model.integrate_geom(geom,oversampling_factor=oversampling_factor).quantity)ifself.temporal_model:integral=self.temporal_model.integral(gti.time_start,gti.time_stop)value=value*np.sum(integral)returnMap.from_geom(geom=geom,data=value.value,unit=value.unit)
@copy_covariancedefcopy(self,name=None,copy_data=False,**kwargs):"""Copy sky model Parameters ---------- name : str Assign a new name to the copied model. copy_data : bool Copy the data arrays attached to models. **kwargs : dict Keyword arguments forwarded to `SkyModel` Returns ------- model : `SkyModel` Copied sky model. """ifself.spatial_modelisnotNone:spatial_model=self.spatial_model.copy(copy_data=copy_data)else:spatial_model=Noneifself.temporal_modelisnotNone:temporal_model=self.temporal_model.copy()else:temporal_model=Nonekwargs.setdefault("name",make_name(name))kwargs.setdefault("spectral_model",self.spectral_model.copy())kwargs.setdefault("spatial_model",spatial_model)kwargs.setdefault("temporal_model",temporal_model)kwargs.setdefault("apply_irf",self.apply_irf.copy())kwargs.setdefault("datasets_names",self.datasets_names)returnself.__class__(**kwargs)
[docs]defto_dict(self,full_output=False):"""Create dict for YAML serilisation"""data={}data["name"]=self.namedata["type"]=self.tag[0]ifself.apply_irf!=self._apply_irf_default:data["apply_irf"]=self.apply_irfifself.datasets_namesisnotNone:data["datasets_names"]=self.datasets_namesdata.update(self.spectral_model.to_dict(full_output))ifself.spatial_modelisnotNone:data.update(self.spatial_model.to_dict(full_output))ifself.temporal_modelisnotNone:data.update(self.temporal_model.to_dict(full_output))returndata
[docs]@classmethoddeffrom_dict(cls,data):"""Create SkyModel from dict"""fromgammapy.modeling.modelsimport(SPATIAL_MODEL_REGISTRY,SPECTRAL_MODEL_REGISTRY,TEMPORAL_MODEL_REGISTRY,)model_class=SPECTRAL_MODEL_REGISTRY.get_cls(data["spectral"]["type"])spectral_model=model_class.from_dict({"spectral":data["spectral"]})spatial_data=data.get("spatial")ifspatial_dataisnotNone:model_class=SPATIAL_MODEL_REGISTRY.get_cls(spatial_data["type"])spatial_model=model_class.from_dict({"spatial":spatial_data})else:spatial_model=Nonetemporal_data=data.get("temporal")iftemporal_dataisnotNone:model_class=TEMPORAL_MODEL_REGISTRY.get_cls(temporal_data["type"])temporal_model=model_class.from_dict({"temporal":temporal_data})else:temporal_model=Nonereturncls(name=data["name"],spatial_model=spatial_model,spectral_model=spectral_model,temporal_model=temporal_model,apply_irf=data.get("apply_irf",cls._apply_irf_default),datasets_names=data.get("datasets_names"),)
def__str__(self):str_=f"{self.__class__.__name__}\n\n"str_+="\t{:26}: {}\n".format("Name",self.name)str_+="\t{:26}: {}\n".format("Datasets names",self.datasets_names)str_+="\t{:26}: {}\n".format("Spectral model type",self.spectral_model.__class__.__name__)ifself.spatial_modelisnotNone:spatial_type=self.spatial_model.__class__.__name__else:spatial_type=""str_+="\t{:26}: {}\n".format("Spatial model type",spatial_type)ifself.temporal_modelisnotNone:temporal_type=self.temporal_model.__class__.__name__else:temporal_type=""str_+="\t{:26}: {}\n".format("Temporal model type",temporal_type)str_+="\tParameters:\n"info=_get_parameters_str(self.parameters)lines=info.split("\n")str_+="\t"+"\n\t".join(lines[:-1])str_+="\n\n"returnstr_.expandtabs(tabsize=2)
[docs]@classmethoddefcreate(cls,spectral_model,spatial_model=None,temporal_model=None,**kwargs):"""Create a model instance. Parameters ---------- spectral_model : str Tag to create spectral model spatial_model : str Tag to create spatial model temporal_model : str Tag to create temporal model **kwargs : dict Keyword arguments passed to `SkyModel` Returns ------- model : SkyModel Sky model """spectral_model=Model.create(spectral_model,model_type="spectral")ifspatial_model:spatial_model=Model.create(spatial_model,model_type="spatial")iftemporal_model:temporal_model=Model.create(temporal_model,model_type="temporal")returncls(spectral_model=spectral_model,spatial_model=spatial_model,temporal_model=temporal_model,**kwargs,)
[docs]deffreeze(self,model_type=None):"""Freeze parameters depending on model type Parameters ---------- model_type : {None, "spatial", "spectral", "temporal"} freeze all parameters or only or only spatial/spectral/temporal. Default is None so all parameters are frozen. """ifmodel_typeisNone:self.parameters.freeze_all()else:model=getattr(self,f"{model_type}_model")model.freeze()
[docs]defunfreeze(self,model_type=None):"""Restore parameters frozen status to default depending on model type Parameters ---------- model_type : {None, "spatial", "spectral", "temporal"} restore frozen status to default for all parameters or only spatial/spectral/temporal Default is None so all parameters are restore to default frozen status. """ifmodel_typeisNone:formodel_typein["spectral","spatial","temporal"]:self.unfreeze(model_type)else:model=getattr(self,f"{model_type}_model")ifmodel:model.unfreeze()
[docs]classFoVBackgroundModel(ModelBase):"""Field of view background model The background model holds the correction parameters applied to the instrumental background attached to a `MapDataset` or `SpectrumDataset`. Parameters ---------- spectral_model : `~gammapy.modeling.models.SpectralModel` Normalized spectral model. dataset_name : str Dataset name """tag=["FoVBackgroundModel","fov-bkg"]def__init__(self,spectral_model=None,dataset_name=None):ifdataset_nameisNone:raiseValueError("Dataset name a is required argument")self.datasets_names=[dataset_name]ifspectral_modelisNone:spectral_model=PowerLawNormSpectralModel()ifnotspectral_model.is_norm_spectral_model:raiseValueError("A norm spectral model is required.")self._spectral_model=spectral_modelsuper().__init__()
[docs]@classmethoddeffrom_dict(cls,data):"""Create model from dict Parameters ---------- data : dict Data dictionary """fromgammapy.modeling.modelsimportSPECTRAL_MODEL_REGISTRYspectral_data=data.get("spectral")ifspectral_dataisnotNone:model_class=SPECTRAL_MODEL_REGISTRY.get_cls(spectral_data["type"])spectral_model=model_class.from_dict({"spectral":spectral_data})else:spectral_model=Nonedatasets_names=data.get("datasets_names")ifdatasets_namesisNone:raiseValueError("FoVBackgroundModel must define a dataset name")iflen(datasets_names)>1:raiseValueError("FoVBackgroundModel can only be assigned to one dataset")returncls(spectral_model=spectral_model,dataset_name=datasets_names[0],)
[docs]defreset_to_default(self):"""Reset parameter values to default"""values=self.spectral_model.default_parameters.valueself.spectral_model.parameters.value=values
[docs]deffreeze(self,model_type="spectral"):"""Freeze model parameters"""ifmodel_typeisNoneormodel_type=="spectral":self._spectral_model.freeze()
[docs]defunfreeze(self,model_type="spectral"):"""Restore parameters frozen status to default"""ifmodel_typeisNoneormodel_type=="spectral":self._spectral_model.unfreeze()
[docs]classTemplateNPredModel(ModelBase):"""Background model. Create a new map by a tilt and normalization on the available map Parameters ---------- map : `~gammapy.maps.Map` Background model map spectral_model : `~gammapy.modeling.models.SpectralModel` Normalized spectral model, default is `~gammapy.modeling.models.PowerLawNormSpectralModel` copy_data : bool Create a deepcopy of the map data or directly use the original. True by default, can be turned to False to save memory in case of large maps. """tag="TemplateNPredModel"map=LazyFitsData(cache=True)def__init__(self,map,spectral_model=None,name=None,filename=None,datasets_names=None,copy_data=True,):ifisinstance(map,Map):axis=map.geom.axes["energy"]ifaxis.node_type!="edges":raiseValueError('Need an integrated map, energy axis node_type="edges"')ifcopy_data:self.map=map.copy()else:self.map=mapself._name=make_name(name)self.filename=filenameifspectral_modelisNone:spectral_model=PowerLawNormSpectralModel()spectral_model.tilt.frozen=Trueself.spectral_model=spectral_modelifisinstance(datasets_names,str):datasets_names=[datasets_names]ifisinstance(datasets_names,list):iflen(datasets_names)!=1:raiseValueError("Currently background models can only be assigned to one dataset.")self.datasets_names=datasets_namessuper().__init__()@copy_covariancedefcopy(self,name=None,copy_data=False,**kwargs):"""Copy template npred model. Parameters ---------- name : str Assign a new name to the copied model. copy_data : bool Copy the data arrays attached to models. **kwargs : dict Keyword arguments forwarded to `TemplateNPredModel` Returns ------- model : `TemplateNPredModel` Copied template npred model. """name=make_name(name)kwargs.setdefault("map",self.map)kwargs.setdefault("spectral_model",self.spectral_model.copy())kwargs.setdefault("filename",self.filename)kwargs.setdefault("datasets_names",self.datasets_names)returnself.__class__(name=name,copy_data=copy_data,**kwargs)@propertydefname(self):returnself._name@propertydefenergy_center(self):"""True energy axis bin centers (`~astropy.units.Quantity`)"""energy_axis=self.map.geom.axes["energy"]energy=energy_axis.centerreturnenergy[:,np.newaxis,np.newaxis]@propertydefspectral_model(self):"""`~gammapy.modeling.models.SpectralModel`"""returnself._spectral_model@spectral_model.setterdefspectral_model(self,model):ifnot(modelisNoneorisinstance(model,SpectralModel)):raiseTypeError(f"Invalid type: {model!r}")self._spectral_model=model@propertydefparameters(self):parameters=[]parameters.append(self.spectral_model.parameters)returnParameters.from_stack(parameters)
[docs]defevaluate(self):"""Evaluate background model. Returns ------- background_map : `~gammapy.maps.Map` Background evaluated on the Map """value=self.spectral_model(self.energy_center).valueback_values=self.map.data*valuereturnself.map.copy(data=back_values)
[docs]@classmethoddeffrom_dict(cls,data):fromgammapy.modeling.modelsimportSPECTRAL_MODEL_REGISTRYspectral_data=data.get("spectral")ifspectral_dataisnotNone:model_class=SPECTRAL_MODEL_REGISTRY.get_cls(spectral_data["type"])spectral_model=model_class.from_dict({"spectral":spectral_data})else:spectral_model=Noneif"filename"indata:bkg_map=Map.read(data["filename"])elif"map"indata:bkg_map=data["map"]else:# TODO: for now create a fake map for serialization,# uptdated in MapDataset.from_dict()axis=MapAxis.from_edges(np.logspace(-1,1,2),unit=u.TeV,name="energy")geom=WcsGeom.create(skydir=(0,0),npix=(1,1),frame="galactic",axes=[axis])bkg_map=Map.from_geom(geom)returncls(map=bkg_map,spectral_model=spectral_model,name=data["name"],datasets_names=data.get("datasets_names"),filename=data.get("filename"),)
[docs]defcutout(self,position,width,mode="trim",name=None):"""Cutout background model. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Center position of the cutout region. width : tuple of `~astropy.coordinates.Angle` Angular sizes of the region in (lon, lat) in that specific order. If only one value is passed, a square region is extracted. mode : {'trim', 'partial', 'strict'} Mode option for Cutout2D, for details see `~astropy.nddata.utils.Cutout2D`. name : str Name of the returned background model. Returns ------- cutout : `TemplateNPredModel` Cutout background model. """cutout_kwargs={"position":position,"width":width,"mode":mode}bkg_map=self.map.cutout(**cutout_kwargs)spectral_model=self.spectral_model.copy()returnself.__class__(bkg_map,spectral_model=spectral_model,name=name)
[docs]defstack(self,other,weights=None,nan_to_num=True):"""Stack background model in place. Stacking the background model resets the current parameters values. Parameters ---------- other : `TemplateNPredModel` Other background model. nan_to_num: bool Non-finite values are replaced by zero if True (default). """bkg=self.evaluate()ifnan_to_num:bkg.data[~np.isfinite(bkg.data)]=0other_bkg=other.evaluate()bkg.stack(other_bkg,weights=weights,nan_to_num=nan_to_num)self.map=bkg# reset parameter valuesself.spectral_model.norm.value=1self.spectral_model.tilt.value=0
[docs]deffreeze(self,model_type="spectral"):"""Freeze model parameters"""ifmodel_typeisNoneormodel_type=="spectral":self._spectral_model.freeze()
[docs]defunfreeze(self,model_type="spectral"):"""Restore parameters frozen status to default"""ifmodel_typeisNoneormodel_type=="spectral":self._spectral_model.unfreeze()