# Licensed under a 3-clause BSD style license - see LICENSE.rst"""Cube models (axes: lon, lat, energy)."""importloggingimportosimportnumpyasnpimportastropy.unitsasufromastropy.nddataimportNoOverlapErrorfromastropy.timeimportTimefromgammapy.mapsimportMap,MapAxis,WcsGeomfromgammapy.modelingimportCovariance,Parametersfromgammapy.modeling.parameterimport_get_parameters_strfromgammapy.utils.fitsimportLazyFitsDatafromgammapy.utils.scriptsimportmake_name,make_pathfrom.coreimportModel,ModelBase,Modelsfrom.spatialimportConstantSpatialModel,SpatialModelfrom.spectralimportPowerLawNormSpectralModel,SpectralModel,TemplateSpectralModelfrom.temporalimportTemporalModellog=logging.getLogger(__name__)__all__=["create_fermi_isotropic_diffuse_model","FoVBackgroundModel","SkyModel","TemplateNPredModel",]
[docs]classSkyModel(ModelBase):"""Sky model component. This model represents a factorised sky model. It has `~gammapy.modeling.Parameters` combining the spatial and spectral parameters. Parameters ---------- spectral_model : `~gammapy.modeling.models.SpectralModel` Spectral model spatial_model : `~gammapy.modeling.models.SpatialModel` Spatial model (must be normalised to integrate to 1) temporal_model : `~gammapy.modeling.models.temporalModel` Temporal model name : str Model identifier apply_irf : dict Dictionary declaring which IRFs should be applied to this model. Options are {"exposure": True, "psf": True, "edisp": True} datasets_names : list of str Which datasets this model is applied to. """tag=["SkyModel","sky-model"]_apply_irf_default={"exposure":True,"psf":True,"edisp":True}def__init__(self,spectral_model,spatial_model=None,temporal_model=None,name=None,apply_irf=None,datasets_names=None,covariance_data=None,):self.spatial_model=spatial_modelself.spectral_model=spectral_modelself.temporal_model=temporal_modelself._name=make_name(name)ifapply_irfisNone:apply_irf=self._apply_irf_default.copy()self.apply_irf=apply_irfself.datasets_names=datasets_namesself._check_unit()is_norm=np.array([par.is_normforparinspectral_model.parameters])ifnotnp.any(is_norm):raiseValueError("Spectral model used with SkyModel requires a norm type parameter.")super().__init__(covariance_data=covariance_data)@propertydef_models(self):models=self.spectral_model,self.spatial_model,self.temporal_modelreturn[modelformodelinmodelsifmodelisnotNone]def_check_covariance(self):ifnotself.parameters==self._covariance.parameters:self._covariance=Covariance.from_stack([model.covarianceformodelinself._models],)def_check_unit(self):axis=MapAxis.from_energy_bounds("0.1 TeV","10 TeV",nbin=1,name="energy_true")geom=WcsGeom.create(skydir=self.position,npix=(2,2),axes=[axis])time=Time(55555,format="mjd")ifself.apply_irf["exposure"]:ref_unit=u.Unit("cm-2 s-1 MeV-1")else:ref_unit=u.Unit("")obt_unit=self.spectral_model(axis.center).unitifself.spatial_model:obt_unit=obt_unit*self.spatial_model.evaluate_geom(geom).unitref_unit=ref_unit/u.srifself.temporal_model:ifu.Quantity(self.temporal_model(time)).unit.is_equivalent(self.spectral_model(axis.center).unit):obt_unit=((self.temporal_model(time)*self.spatial_model.evaluate_geom(geom).unit).to(obt_unit.to_string()).unit)else:obt_unit=obt_unit*u.Quantity(self.temporal_model(time)).unitifnotobt_unit.is_equivalent(ref_unit):raiseValueError(f"SkyModel unit {obt_unit} is not equivalent to {ref_unit}")@propertydefcovariance(self):self._check_covariance()formodelinself._models:self._covariance.set_subcovariance(model.covariance)returnself._covariance@covariance.setterdefcovariance(self,covariance):self._check_covariance()self._covariance.data=covarianceformodelinself._models:subcovar=self._covariance.get_subcovariance(model.covariance.parameters)model.covariance=subcovar@propertydefname(self):returnself._name@propertydefparameters(self):parameters=[]parameters.append(self.spectral_model.parameters)ifself.spatial_modelisnotNone:parameters.append(self.spatial_model.parameters)ifself.temporal_modelisnotNone:parameters.append(self.temporal_model.parameters)returnParameters.from_stack(parameters)@propertydefspatial_model(self):"""`~gammapy.modeling.models.SpatialModel`"""returnself._spatial_model@spatial_model.setterdefspatial_model(self,model):ifnot(modelisNoneorisinstance(model,SpatialModel)):raiseTypeError(f"Invalid type: {model!r}")self._spatial_model=model@propertydefspectral_model(self):"""`~gammapy.modeling.models.SpectralModel`"""returnself._spectral_model@spectral_model.setterdefspectral_model(self,model):ifnot(modelisNoneorisinstance(model,SpectralModel)):raiseTypeError(f"Invalid type: {model!r}")self._spectral_model=model@propertydeftemporal_model(self):"""`~gammapy.modeling.models.TemporalModel`"""returnself._temporal_model@temporal_model.setterdeftemporal_model(self,model):ifnot(modelisNoneorisinstance(model,TemporalModel)):raiseTypeError(f"Invalid type: {model!r}")self._temporal_model=model@propertydefposition(self):"""`~astropy.coordinates.SkyCoord`"""returngetattr(self.spatial_model,"position",None)@propertydefposition_lonlat(self):"""Spatial model center position `(lon, lat)` in rad and frame of the model"""returngetattr(self.spatial_model,"position_lonlat",None)@propertydefevaluation_bin_size_min(self):"""Minimal spatial bin size for spatial model evaluation."""if(self.spatial_modelisnotNoneandself.spatial_model.evaluation_bin_size_minisnotNone):returnself.spatial_model.evaluation_bin_size_minelse:returnNone@propertydefevaluation_radius(self):"""`~astropy.coordinates.Angle`"""returnself.spatial_model.evaluation_radius@propertydefevaluation_region(self):"""`~astropy.coordinates.Angle`"""returnself.spatial_model.evaluation_region@propertydefframe(self):returnself.spatial_model.framedef__add__(self,other):ifisinstance(other,(Models,list)):returnModels([self,*other])elifisinstance(other,(SkyModel,TemplateNPredModel)):returnModels([self,other])else:raiseTypeError(f"Invalid type: {other!r}")def__radd__(self,model):returnself.__add__(model)
[docs]defcontributes(self,mask,margin="0 deg"):"""Check if a skymodel contributes within a mask map. Parameters ---------- mask : `~gammapy.maps.WcsNDMap` of boolean type Map containing a boolean mask margin : `~astropy.units.Quantity` Add a margin in degree to the source evaluation radius. Used to take into account PSF width. Returns ------- models : `DatasetModels` Selected models contributing inside the region where mask==True """fromgammapy.datasets.evaluatorimportCUTOUT_MARGINmargin=u.Quantity(margin)ifnotmask.geom.is_image:mask=mask.reduce_over_axes(func=np.logical_or)ifmask.geom.is_regionandmask.geom.regionisnotNone:ifmask.geom.is_all_point_sky_regions:returnTruegeom=mask.geom.to_wcs_geom()mask=geom.region_mask([mask.geom.region])try:mask_cutout=mask.cutout(position=self.position,width=(2*self.evaluation_radius+CUTOUT_MARGIN+margin),)contributes=np.any(mask_cutout.data)except(NoOverlapError,ValueError):contributes=Falsereturncontributes
[docs]defevaluate(self,lon,lat,energy,time=None):"""Evaluate the model at given points. The model evaluation follows numpy broadcasting rules. Return differential surface brightness cube. At the moment in units: ``cm-2 s-1 TeV-1 deg-2`` Parameters ---------- lon, lat : `~astropy.units.Quantity` Spatial coordinates energy : `~astropy.units.Quantity` Energy coordinate time: `~astropy.time.Time` Time coordinate Returns ------- value : `~astropy.units.Quantity` Model value at the given point. """value=self.spectral_model(energy)# pylint:disable=not-callable# TODO: case if self.temporal_model is not None, introduce time in arguments ?ifself.spatial_modelisnotNone:ifself.spatial_model.is_energy_dependent:spatial=self.spatial_model(lon,lat,energy)else:spatial=self.spatial_model(lon,lat)value=value*spatial# pylint:disable=not-callableif(self.temporal_modelisnotNone)and(timeisnotNone):value=value*self.temporal_model(time)returnvalue
[docs]defevaluate_geom(self,geom,gti=None):"""Evaluate model on `~gammapy.maps.Geom`."""coords=geom.get_coord(sparse=True)value=self.spectral_model(coords["energy_true"])ifself.spatial_model:value=value*self.spatial_model.evaluate_geom(geom)ifself.temporal_model:integral=self.temporal_model.integral(gti.time_start,gti.time_stop)value=value*np.sum(integral)returnvalue
[docs]defintegrate_geom(self,geom,gti=None,oversampling_factor=None):"""Integrate model on `~gammapy.maps.Geom`. See `~gammapy.modeling.models.SpatialModel.integrate_geom` and `~gammapy.modeling.models.SpectralModel.integral`. Parameters ---------- geom : `Geom` or `~gammapy.maps.RegionGeom` Map geometry gti : `GTI` GIT table oversampling_factor : int or None The oversampling factor to use for spatial integration. Default is None: the factor is estimated from the model minimal bin size Returns ------- flux : `Map` Predicted flux map """energy=geom.axes["energy_true"].edgesvalue=self.spectral_model.integral(energy[:-1],energy[1:],).reshape((-1,1,1))ifself.spatial_model:value=(value*self.spatial_model.integrate_geom(geom,oversampling_factor=oversampling_factor).quantity)ifself.temporal_model:integral=self.temporal_model.integral(gti.time_start,gti.time_stop)value=value*np.sum(integral)returnMap.from_geom(geom=geom,data=value.value,unit=value.unit)
[docs]defcopy(self,name=None,copy_data=False,**kwargs):"""Copy sky model Parameters ---------- name : str Assign a new name to the copied model. copy_data : bool Copy the data arrays attached to models. **kwargs : dict Keyword arguments forwarded to `SkyModel` Returns ------- model : `SkyModel` Copied sky model. """ifself.spatial_modelisnotNone:spatial_model=self.spatial_model.copy(copy_data=copy_data)else:spatial_model=Noneifself.temporal_modelisnotNone:temporal_model=self.temporal_model.copy()else:temporal_model=Nonekwargs.setdefault("name",make_name(name))kwargs.setdefault("spectral_model",self.spectral_model.copy())kwargs.setdefault("spatial_model",spatial_model)kwargs.setdefault("temporal_model",temporal_model)kwargs.setdefault("apply_irf",self.apply_irf.copy())kwargs.setdefault("datasets_names",self.datasets_names)kwargs.setdefault("covariance_data",self.covariance.data.copy())returnself.__class__(**kwargs)
[docs]defto_dict(self,full_output=False):"""Create dict for YAML serilisation"""data={}data["name"]=self.namedata["type"]=self.tag[0]ifself.apply_irf!=self._apply_irf_default:data["apply_irf"]=self.apply_irfifself.datasets_namesisnotNone:data["datasets_names"]=self.datasets_namesdata.update(self.spectral_model.to_dict(full_output))ifself.spatial_modelisnotNone:data.update(self.spatial_model.to_dict(full_output))ifself.temporal_modelisnotNone:data.update(self.temporal_model.to_dict(full_output))returndata
[docs]@classmethoddeffrom_dict(cls,data):"""Create SkyModel from dict"""fromgammapy.modeling.modelsimport(SPATIAL_MODEL_REGISTRY,SPECTRAL_MODEL_REGISTRY,TEMPORAL_MODEL_REGISTRY,)model_class=SPECTRAL_MODEL_REGISTRY.get_cls(data["spectral"]["type"])spectral_model=model_class.from_dict({"spectral":data["spectral"]})spatial_data=data.get("spatial")ifspatial_dataisnotNone:model_class=SPATIAL_MODEL_REGISTRY.get_cls(spatial_data["type"])spatial_model=model_class.from_dict({"spatial":spatial_data})else:spatial_model=Nonetemporal_data=data.get("temporal")iftemporal_dataisnotNone:model_class=TEMPORAL_MODEL_REGISTRY.get_cls(temporal_data["type"])temporal_model=model_class.from_dict({"temporal":temporal_data})else:temporal_model=Nonereturncls(name=data["name"],spatial_model=spatial_model,spectral_model=spectral_model,temporal_model=temporal_model,apply_irf=data.get("apply_irf",cls._apply_irf_default),datasets_names=data.get("datasets_names"),)
def__str__(self):str_=f"{self.__class__.__name__}\n\n"str_+="\t{:26}: {}\n".format("Name",self.name)str_+="\t{:26}: {}\n".format("Datasets names",self.datasets_names)str_+="\t{:26}: {}\n".format("Spectral model type",self.spectral_model.__class__.__name__)ifself.spatial_modelisnotNone:spatial_type=self.spatial_model.__class__.__name__else:spatial_type=""str_+="\t{:26}: {}\n".format("Spatial model type",spatial_type)ifself.temporal_modelisnotNone:temporal_type=self.temporal_model.__class__.__name__else:temporal_type=""str_+="\t{:26}: {}\n".format("Temporal model type",temporal_type)str_+="\tParameters:\n"info=_get_parameters_str(self.parameters)lines=info.split("\n")str_+="\t"+"\n\t".join(lines[:-1])str_+="\n\n"returnstr_.expandtabs(tabsize=2)
[docs]@classmethoddefcreate(cls,spectral_model,spatial_model=None,temporal_model=None,**kwargs):"""Create a model instance. Parameters ---------- spectral_model : str Tag to create spectral model spatial_model : str Tag to create spatial model temporal_model : str Tag to create temporal model **kwargs : dict Keyword arguments passed to `SkyModel` Returns ------- model : SkyModel Sky model """spectral_model=Model.create(spectral_model,model_type="spectral")ifspatial_model:spatial_model=Model.create(spatial_model,model_type="spatial")iftemporal_model:temporal_model=Model.create(temporal_model,model_type="temporal")returncls(spectral_model=spectral_model,spatial_model=spatial_model,temporal_model=temporal_model,**kwargs,)
[docs]deffreeze(self,model_type=None):"""Freeze parameters depending on model type Parameters ---------- model_type : {None, "spatial", "spectral", "temporal"} freeze all parameters or only or only spatial/spectral/temporal. Default is None so all parameters are frozen. """ifmodel_typeisNone:self.parameters.freeze_all()else:model=getattr(self,f"{model_type}_model")model.freeze()
[docs]defunfreeze(self,model_type=None):"""Restore parameters frozen status to default depending on model type Parameters ---------- model_type : {None, "spatial", "spectral", "temporal"} restore frozen status to default for all parameters or only spatial/spectral/temporal Default is None so all parameters are restore to default frozen status. """ifmodel_typeisNone:formodel_typein["spectral","spatial","temporal"]:self.unfreeze(model_type)else:model=getattr(self,f"{model_type}_model")ifmodel:model.unfreeze()
[docs]classFoVBackgroundModel(ModelBase):"""Field of view background model The background model holds the correction parameters applied to the instrumental background attached to a `MapDataset` or `SpectrumDataset`. Parameters ---------- spectral_model : `~gammapy.modeling.models.SpectralModel` Normalized spectral model. Default is `~gammapy.modeling.models.PowerLawNormSpectralModel` dataset_name : str Dataset name spatial_model : `~gammapy.modeling.models.SpatialModel` Unitless Spatial model (unit is dropped on evaluation if defined). Default is None. """tag=["FoVBackgroundModel","fov-bkg"]def__init__(self,spectral_model=None,dataset_name=None,spatial_model=None,covariance_data=None,):ifdataset_nameisNone:raiseValueError("Dataset name a is required argument")self.datasets_names=[dataset_name]ifspectral_modelisNone:spectral_model=PowerLawNormSpectralModel()ifnotspectral_model.is_norm_spectral_model:raiseValueError("A norm spectral model is required.")self._spatial_model=spatial_modelself._spectral_model=spectral_modelsuper().__init__(covariance_data=covariance_data)
[docs]defevaluate(self,energy,lon=None,lat=None):"""Evaluate model"""value=self.spectral_model(energy)ifself.spatial_modelisnotNone:iflonisnotNoneandlatisnotNone:ifself.spatial_model.is_energy_dependent:returnself.spatial_model(lon,lat,energy).value*valueelse:returnself.spatial_model(lon,lat).value*valueelse:raiseValueError("lon and lat are required if a spatial model is defined")else:returnvalue
[docs]defcopy(self,name=None,copy_data=False,**kwargs):"""Copy FoVBackgroundModel Parameters ---------- name : str Ignored, present for API compatibility. copy_data : bool Ignored, present for API compatibility. **kwargs : dict Keyword arguments forwarded to `FoVBackgroundModel` Returns ------- model : `FoVBackgroundModel` Copied FoV background model. """kwargs.setdefault("spectral_model",self.spectral_model.copy())kwargs.setdefault("dataset_name",self.datasets_names[0])kwargs.setdefault("covariance_data",self.covariance.data.copy())returnself.__class__(**kwargs)
[docs]@classmethoddeffrom_dict(cls,data):"""Create model from dict Parameters ---------- data : dict Data dictionary """fromgammapy.modeling.modelsimport(SPATIAL_MODEL_REGISTRY,SPECTRAL_MODEL_REGISTRY,)spectral_data=data.get("spectral")ifspectral_dataisnotNone:model_class=SPECTRAL_MODEL_REGISTRY.get_cls(spectral_data["type"])spectral_model=model_class.from_dict({"spectral":spectral_data})else:spectral_model=Nonespatial_data=data.get("spatial")ifspatial_dataisnotNone:model_class=SPATIAL_MODEL_REGISTRY.get_cls(spatial_data["type"])spatial_model=model_class.from_dict(spatial_data)else:spatial_model=Nonedatasets_names=data.get("datasets_names")ifdatasets_namesisNone:raiseValueError("FoVBackgroundModel must define a dataset name")iflen(datasets_names)>1:raiseValueError("FoVBackgroundModel can only be assigned to one dataset")returncls(spatial_model=spatial_model,spectral_model=spectral_model,dataset_name=datasets_names[0],)
[docs]defreset_to_default(self):"""Reset parameter values to default"""values=self.spectral_model.default_parameters.valueself.spectral_model.parameters.value=values
[docs]deffreeze(self,model_type="spectral"):"""Freeze model parameters"""ifmodel_typeisNoneormodel_type=="spectral":self._spectral_model.freeze()
[docs]defunfreeze(self,model_type="spectral"):"""Restore parameters frozen status to default"""ifmodel_typeisNoneormodel_type=="spectral":self._spectral_model.unfreeze()
[docs]classTemplateNPredModel(ModelBase):"""Background model. Create a new map by a tilt and normalization on the available map Parameters ---------- map : `~gammapy.maps.Map` Background model map spectral_model : `~gammapy.modeling.models.SpectralModel` Normalized spectral model, default is `~gammapy.modeling.models.PowerLawNormSpectralModel` copy_data : bool Create a deepcopy of the map data or directly use the original. True by default, can be turned to False to save memory in case of large maps. spatial_model : `~gammapy.modeling.models.SpatialModel` Unitless Spatial model (unit is dropped on evaluation if defined). Default is None. """tag="TemplateNPredModel"map=LazyFitsData(cache=True)def__init__(self,map,spectral_model=None,name=None,filename=None,datasets_names=None,copy_data=True,spatial_model=None,covariance_data=None,):ifisinstance(map,Map):axis=map.geom.axes["energy"]ifaxis.node_type!="edges":raiseValueError('Need an integrated map, energy axis node_type="edges"')ifcopy_data:self.map=map.copy()else:self.map=mapself._name=make_name(name)self.filename=filenameifspectral_modelisNone:spectral_model=PowerLawNormSpectralModel()spectral_model.tilt.frozen=Trueself.spatial_model=spatial_modelself.spectral_model=spectral_modelifisinstance(datasets_names,str):datasets_names=[datasets_names]ifisinstance(datasets_names,list):iflen(datasets_names)!=1:raiseValueError("Currently background models can only be assigned to one dataset.")self.datasets_names=datasets_namessuper().__init__(covariance_data=covariance_data)
[docs]defcopy(self,name=None,copy_data=False,**kwargs):"""Copy template npred model. Parameters ---------- name : str Assign a new name to the copied model. copy_data : bool Copy the data arrays attached to models. **kwargs : dict Keyword arguments forwarded to `TemplateNPredModel` Returns ------- model : `TemplateNPredModel` Copied template npred model. """name=make_name(name)kwargs.setdefault("map",self.map)kwargs.setdefault("spectral_model",self.spectral_model.copy())kwargs.setdefault("filename",self.filename)kwargs.setdefault("datasets_names",self.datasets_names)kwargs.setdefault("covariance_data",self.covariance.data.copy())returnself.__class__(name=name,copy_data=copy_data,**kwargs)
@propertydefname(self):returnself._name@propertydefenergy_center(self):"""True energy axis bin centers (`~astropy.units.Quantity`)"""energy_axis=self.map.geom.axes["energy"]energy=energy_axis.centerreturnenergy[:,np.newaxis,np.newaxis]@propertydefspectral_model(self):"""`~gammapy.modeling.models.SpectralModel`"""returnself._spectral_model@spectral_model.setterdefspectral_model(self,model):ifnot(modelisNoneorisinstance(model,SpectralModel)):raiseTypeError(f"Invalid type: {model!r}")self._spectral_model=model@propertydefparameters(self):parameters=[]parameters.append(self.spectral_model.parameters)returnParameters.from_stack(parameters)
[docs]defevaluate(self):"""Evaluate background model. Returns ------- background_map : `~gammapy.maps.Map` Background evaluated on the Map """value=self.spectral_model(self.energy_center).valueback_values=self.map.data*valueifself.spatial_modelisnotNone:value=self.spatial_model.evaluate_geom(self.map.geom).valueback_values*=valuereturnself.map.copy(data=back_values)
[docs]defwrite(self,overwrite=False):ifself.filenameisNone:raiseIOError("Missing filename")elifos.path.isfile(make_path(self.filename))andnotoverwrite:log.warning("Template file already exits, and overwrite is False")else:self.map.write(self.filename)
[docs]defcutout(self,position,width,mode="trim",name=None):"""Cutout background model. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Center position of the cutout region. width : tuple of `~astropy.coordinates.Angle` Angular sizes of the region in (lon, lat) in that specific order. If only one value is passed, a square region is extracted. mode : {'trim', 'partial', 'strict'} Mode option for Cutout2D, for details see `~astropy.nddata.utils.Cutout2D`. name : str Name of the returned background model. Returns ------- cutout : `TemplateNPredModel` Cutout background model. """cutout_kwargs={"position":position,"width":width,"mode":mode}bkg_map=self.map.cutout(**cutout_kwargs)spectral_model=self.spectral_model.copy()returnself.__class__(bkg_map,spectral_model=spectral_model,name=name)
[docs]defstack(self,other,weights=None,nan_to_num=True):"""Stack background model in place. Stacking the background model resets the current parameters values. Parameters ---------- other : `TemplateNPredModel` Other background model. nan_to_num: bool Non-finite values are replaced by zero if True (default). """bkg=self.evaluate()ifnan_to_num:bkg.data[~np.isfinite(bkg.data)]=0other_bkg=other.evaluate()bkg.stack(other_bkg,weights=weights,nan_to_num=nan_to_num)self.map=bkg# reset parameter valuesself.spectral_model.norm.value=1self.spectral_model.tilt.value=0
[docs]deffreeze(self,model_type="spectral"):"""Freeze model parameters"""ifmodel_typeisNoneormodel_type=="spectral":self._spectral_model.freeze()
[docs]defunfreeze(self,model_type="spectral"):"""Restore parameters frozen status to default"""ifmodel_typeisNoneormodel_type=="spectral":self._spectral_model.unfreeze()