# Licensed under a 3-clause BSD style license - see LICENSE.rstimportcollections.abcimportcopyimporthtmlimportloggingimportwarningsfromos.pathimportsplitimportnumpyasnpimportastropy.unitsasufromastropy.coordinatesimportSkyCoordfromastropy.tableimportTableimportmatplotlib.pyplotaspltimportyamlfromgammapy.mapsimportMap,RegionGeomfromgammapy.modelingimportCovariance,Parameter,Parametersfromgammapy.utils.checkimportadd_checksum,verify_checksumfromgammapy.utils.metadataimportCreatorMetaDatafromgammapy.utils.scriptsimportmake_name,make_path__all__=["Model","Models","DatasetModels","ModelBase"]log=logging.getLogger(__name__)def_set_link(shared_register,model):forparaminmodel.parameters:name=param.namelink_label=param._link_label_ioiflink_labelisnotNone:iflink_labelinshared_register:new_param=shared_register[link_label]setattr(model,name,new_param)else:shared_register[link_label]=paramreturnshared_registerdef_get_model_class_from_dict(data):"""Get a model class from a dictionary."""from.import(MODEL_REGISTRY,PRIOR_REGISTRY,SPATIAL_MODEL_REGISTRY,SPECTRAL_MODEL_REGISTRY,TEMPORAL_MODEL_REGISTRY,)if"type"indata:cls=MODEL_REGISTRY.get_cls(data["type"])elif"spatial"indata:cls=SPATIAL_MODEL_REGISTRY.get_cls(data["spatial"]["type"])elif"spectral"indata:cls=SPECTRAL_MODEL_REGISTRY.get_cls(data["spectral"]["type"])elif"temporal"indata:cls=TEMPORAL_MODEL_REGISTRY.get_cls(data["temporal"]["type"])elif"prior"indata:cls=PRIOR_REGISTRY.get_cls(data["prior"]["type"])returnclsdef_build_parameters_from_dict(data,default_parameters):"""Build a `~gammapy.modeling.Parameters` object from input dictionary and default parameter values."""par_data=[]input_names=[_["name"]for_indata]forparindefault_parameters:par_dict=par.to_dict()try:index=input_names.index(par_dict["name"])par_dict.update(data[index])exceptValueError:log.warning(f"Parameter '{par_dict['name']}' not defined in YAML file."f" Using default value: {par_dict['value']}{par_dict['unit']}")par_data.append(par_dict)returnParameters.from_dict(par_data)
[docs]classModelBase:"""Model base class."""_type=Nonedef__init__(self,**kwargs):# Copy default parameters from the class to the instancedefault_parameters=self.default_parameters.copy()forparindefault_parameters:value=kwargs.get(par.name,par)ifnotisinstance(value,Parameter):par.quantity=u.Quantity(value)else:par=valuesetattr(self,par.name,par)self._covariance=Covariance(self.parameters)covariance_data=kwargs.get("covariance_data",None)ifcovariance_dataisnotNone:self.covariance=covariance_datadef__getattribute__(self,name):value=object.__getattribute__(self,name)ifisinstance(value,Parameter):returnvalue.__get__(self,None)returnvalue@propertydeftype(self):returnself._typedef__init_subclass__(cls,**kwargs):# Add parameters list on the model sub-class (not instances)cls.default_parameters=Parameters([_for_incls.__dict__.values()ifisinstance(_,Parameter)])
[docs]@classmethoddeffrom_parameters(cls,parameters,**kwargs):"""Create model from parameter list. Parameters ---------- parameters : `Parameters` Parameters for init. Returns ------- model : `Model` Model instance. """forparinparameters:kwargs[par.name]=parreturncls(**kwargs)
def_check_covariance(self):ifnotself.parameters==self._covariance.parameters:self._covariance=Covariance(self.parameters)@propertydefcovariance(self):self._check_covariance()forparinself.parameters:pars=Parameters([par])error=np.nan_to_num(par.error**2,nan=1)covar=Covariance(pars,data=[[error]])self._covariance.set_subcovariance(covar)returnself._covariance@covariance.setterdefcovariance(self,covariance):self._check_covariance()self._covariance.data=covarianceforparinself.parameters:pars=Parameters([par])variance=self._covariance.get_subcovariance(pars).datapar.error=np.sqrt(variance[0][0])@propertydefparameters(self):"""Parameters as a `~gammapy.modeling.Parameters` object."""returnParameters([getattr(self,name)fornameinself.default_parameters.names])
[docs]defto_dict(self,full_output=False):"""Create dictionary for YAML serialisation."""tag=self.tag[0]ifisinstance(self.tag,list)elseself.tagparams=self.parameters.to_dict()ifnotfull_output:forpar,par_defaultinzip(params,self.default_parameters):init=par_default.to_dict()foritemin["min","max","error","interp","scale_method","is_norm",]:default=init[item]ifpar[item]==defaultor(np.isnan(par[item])andnp.isnan(default)):delpar[item]ifpar["frozen"]==init["frozen"]:delpar["frozen"]ifinit["unit"]=="":delpar["unit"]data={"type":tag,"parameters":params}ifself.typeisNone:returndataelse:return{self.type:data}
[docs]@classmethoddeffrom_dict(cls,data,**kwargs):key0=next(iter(data))ifkey0in["spatial","temporal","spectral"]:data=data[key0]ifdata["type"]notincls.tag:raiseValueError(f"Invalid model type {data['type']} for class {cls.__name__}")parameters=_build_parameters_from_dict(data["parameters"],cls.default_parameters)returncls.from_parameters(parameters,**kwargs)
def__str__(self):string=f"{self.__class__.__name__}\n"iflen(self.parameters)>0:string+=f"\n{self.parameters.to_table()}"returnstringdef_repr_html_(self):try:returnself.to_html()exceptAttributeError:returnf"<pre>{html.escape(str(self))}</pre>"@propertydeffrozen(self):"""Frozen status of a model, True if all parameters are frozen."""returnnp.all([p.frozenforpinself.parameters])
[docs]deffreeze(self):"""Freeze all parameters."""self.parameters.freeze_all()
[docs]defunfreeze(self):"""Restore parameters frozen status to default."""forp,defaultinzip(self.parameters,self.default_parameters):p.frozen=default.frozen
[docs]defreassign(self,datasets_names,new_datasets_names):"""Reassign a model from one dataset to another. Parameters ---------- datasets_names : str or list Name of the datasets where the model is currently defined. new_datasets_names : str or list Name of the datasets where the model should be defined instead. If multiple names are given the two list must have the save length, as the reassignment is element-wise. Returns ------- model : `Model` Reassigned model. """model=self.copy(name=self.name)ifnotisinstance(datasets_names,list):datasets_names=[datasets_names]ifnotisinstance(new_datasets_names,list):new_datasets_names=[new_datasets_names]ifisinstance(model.datasets_names,str):model.datasets_names=[model.datasets_names]ifgetattr(model,"datasets_names",None):forname,name_newinzip(datasets_names,new_datasets_names):model.datasets_names=[_.replace(name,name_new)for_inmodel.datasets_names]returnmodel
[docs]classModel:"""Model class that contains only methods to create a model listed in the registries."""
[docs]@staticmethoddefcreate(tag,model_type=None,*args,**kwargs):"""Create a model instance. Examples -------- >>> from gammapy.modeling.models import Model >>> spectral_model = Model.create( ... "pl-2", model_type="spectral", amplitude="1e-10 cm-2 s-1", index=3 ... ) >>> type(spectral_model) <class 'gammapy.modeling.models.spectral.PowerLaw2SpectralModel'> """data={"type":tag}ifmodel_typeisnotNone:data={model_type:data}cls=_get_model_class_from_dict(data)returncls(*args,**kwargs)
[docs]@staticmethoddeffrom_dict(data):"""Create a model instance from a dictionary."""cls=_get_model_class_from_dict(data)returncls.from_dict(data)
[docs]classDatasetModels(collections.abc.Sequence):"""Immutable models container. Parameters ---------- models : `SkyModel`, list of `SkyModel` or `Models` Sky models. covariance_data : `~numpy.ndarray` Covariance data. """def__init__(self,models=None,covariance_data=None):ifmodelsisNone:models=[]ifisinstance(models,(Models,DatasetModels)):models=models._modelselifisinstance(models,ModelBase):models=[models]elifnotisinstance(models,list):raiseTypeError(f"Invalid type: {models!r}")unique_names=[]formodelinmodels:ifmodel.nameinunique_names:raise(ValueError("Model names must be unique"))unique_names.append(model.name)self._models=modelsself._covar_file=Noneself._covariance=Covariance(self.parameters)# Set separately because this triggers the update mechanism on the sub-modelsifcovariance_dataisnotNone:self.covariance=covariance_datadef_check_covariance(self):ifnotself.parameters==self._covariance.parameters:self._covariance=Covariance.from_stack([model.covarianceformodelinself._models])@propertydefcovariance(self):"""Covariance as a `~gammapy.modeling.Covariance` object."""self._check_covariance()formodelinself._models:self._covariance.set_subcovariance(model.covariance)returnself._covariance@covariance.setterdefcovariance(self,covariance):self._check_covariance()self._covariance.data=covarianceformodelinself._models:subcovar=self._covariance.get_subcovariance(model.covariance.parameters)model.covariance=subcovar@propertydefparameters(self):"""Parameters as a `~gammapy.modeling.Parameters` object."""returnParameters.from_stack([_.parametersfor_inself._models])@propertydefparameters_unique_names(self):"""List of unique parameter names. Return formatted as model_name.par_type.par_name."""names=[]formodelinself:forparinmodel.parameters:components=[model.name,par.type,par.name]name=".".join(components)names.append(name)returnnames@propertydefnames(self):"""List of model names."""return[m.nameforminself._models]
[docs]@classmethoddefread(cls,filename,checksum=False):"""Read from YAML file. Parameters ---------- filename : str input filename checksum : bool Whether to perform checksum verification. Default is False. """yaml_str=make_path(filename).read_text()path,filename=split(filename)returncls.from_yaml(yaml_str,path=path,checksum=checksum)
[docs]@classmethoddeffrom_yaml(cls,yaml_str,path="",checksum=False):"""Create from YAML string."""data=yaml.safe_load(yaml_str)checksum_str=data.pop("checksum",None)ifchecksum:yaml_str=yaml.dump(data,sort_keys=False,indent=4,width=80,default_flow_style=False)ifnotverify_checksum(yaml_str,checksum_str):warnings.warn("Checksum verification failed.",UserWarning)# TODO : for now metadata are not kept. Add proper metadata creation.data.pop("metadata",None)returncls.from_dict(data,path=path)
[docs]@classmethoddeffrom_dict(cls,data,path=""):"""Create from dictionary."""from.importMODEL_REGISTRY,SkyModelmodels=[]forcomponentindata["components"]:model_cls=MODEL_REGISTRY.get_cls(component["type"])model=model_cls.from_dict(component)models.append(model)models=cls(models)if"covariance"indata:filename=data["covariance"]path=make_path(path)ifnot(path/filename).exists():path,filename=split(filename)models.read_covariance(path,filename,format="ascii.fixed_width")shared_register={}formodelinmodels:ifisinstance(model,SkyModel):submodels=[model.spectral_model,model.spatial_model,model.temporal_model,]forsubmodelinsubmodels:ifsubmodelisnotNone:shared_register=_set_link(shared_register,submodel)else:shared_register=_set_link(shared_register,model)returnmodels
[docs]defwrite(self,path,overwrite=False,full_output=False,overwrite_templates=False,write_covariance=True,checksum=False,):"""Write to YAML file. Parameters ---------- path : `pathlib.Path` or str Path to write files. overwrite : bool, optional Overwrite existing file. Default is False. full_output : bool, optional Store full parameter output. Default is False. overwrite_templates : bool, optional Overwrite templates FITS files. Default is False. write_covariance : bool, optional Whether to save the covariance. Default is True. checksum : bool When True adds a CHECKSUM entry to the file. Default is False. """base_path,_=split(path)path=make_path(path)base_path=make_path(base_path)ifpath.exists()andnotoverwrite:raiseIOError(f"File exists already: {path}")if(write_covarianceandself.covarianceisnotNoneandlen(self.parameters)!=0):filecovar=path.stem+"_covariance.dat"kwargs=dict(format="ascii.fixed_width",delimiter="|",overwrite=overwrite)self.write_covariance(base_path/filecovar,**kwargs)self._covar_file=filecovaryaml_str=self.to_yaml(full_output,overwrite_templates)ifchecksum:yaml_str=add_checksum(yaml_str)path.write_text(yaml_str)
[docs]defto_yaml(self,full_output=False,overwrite_templates=False):"""Convert to YAML string."""data=self.to_dict(full_output,overwrite_templates)text=yaml.dump(data,sort_keys=False,indent=4,width=80,default_flow_style=False)creation=CreatorMetaData()returntext+creation.to_yaml()
[docs]defupdate_link_label(self):"""Update linked parameters labels used for serialisation and print."""params_list=[]params_shared=[]forparaminself.parameters:ifparamnotinparams_list:params_list.append(param)params_list.append(param)elifparamnotinparams_shared:params_shared.append(param)forparaminparams_shared:param._link_label_io=param.name+"@"+make_name()
[docs]defto_dict(self,full_output=False,overwrite_templates=False):"""Convert to dictionary."""self.update_link_label()models_data=[]formodelinself._models:model_data=model.to_dict(full_output)models_data.append(model_data)if(hasattr(model,"spatial_model")andmodel.spatial_modelisnotNoneand"template"inmodel.spatial_model.tag):model.spatial_model.write(overwrite=overwrite_templates)ifmodel.tag=="TemplateNPredModel":model.write(overwrite=overwrite_templates)ifself._covar_fileisnotNone:return{"components":models_data,"covariance":str(self._covar_file),}else:return{"components":models_data}
[docs]defto_parameters_table(self):"""Convert model parameters to a `~astropy.table.Table`."""table=self.parameters.to_table()# Warning: splitting of parameters will break is source name has a "." in its name.model_name=[name.split(".")[0]fornameinself.parameters_unique_names]table.add_column(model_name,name="model",index=0)returntable
[docs]defupdate_parameters_from_table(self,t):"""Update models from a `~astropy.table.Table`."""parameters_dict=[dict(zip(t.colnames,row))forrowint]fork,datainenumerate(parameters_dict):self.parameters[k].update_from_dict(data)
[docs]defread_covariance(self,path,filename="_covariance.dat",**kwargs):"""Read covariance data from file. Parameters ---------- path : str or `Path` Base path. filename : str Filename. **kwargs : dict Keyword arguments passed to `~astropy.table.Table.read`. """path=make_path(path)filepath=str(path/filename)t=Table.read(filepath,**kwargs)t.remove_column("Parameters")arr=np.array(t)data=arr.view(float).reshape(arr.shape+(-1,))self.covariance=dataself._covar_file=filename
[docs]defwrite_covariance(self,filename,**kwargs):"""Write covariance to file. Parameters ---------- filename : str Filename. **kwargs : dict Keyword arguments passed to `~astropy.table.Table.write`. """names=self.parameters_unique_namestable=Table()table["Parameters"]=namesforidx,nameinenumerate(names):values=self.covariance.data[idx]table[str(idx)]=valuestable.write(make_path(filename),**kwargs)
def__str__(self):self.update_link_label()str_=f"{self.__class__.__name__}\n\n"foridx,modelinenumerate(self):str_+=f"Component {idx}: "str_+=str(model)returnstr_.expandtabs(tabsize=2)def_repr_html_(self):try:returnself.to_html()exceptAttributeError:returnf"<pre>{html.escape(str(self))}</pre>"def__add__(self,other):ifisinstance(other,(Models,list)):returnModels([*self,*other])elifisinstance(other,ModelBase):ifother.nameinself.names:raise(ValueError("Model names must be unique"))returnModels([*self,other])else:raiseTypeError(f"Invalid type: {other!r}")def__getitem__(self,key):ifisinstance(key,np.ndarray)andkey.dtype==bool:returnself.__class__(list(np.array(self._models)[key]))else:returnself._models[self.index(key)]
[docs]defcopy(self,copy_data=False):"""Deep copy. Parameters ---------- copy_data : bool Whether to copy data attached to template models. Returns ------- models: `Models` Copied models. """models=[]formodelinself:model_copy=model.copy(name=model.name,copy_data=copy_data)models.append(model_copy)returnself.__class__(models=models,covariance_data=self.covariance.data.copy())
def_slice_by_energy(self,energy_min,energy_max,sum_over_energy_groups=False):"""Copy models and slice TemplateNPredModel in energy range. Parameters ---------- energy_min, energy_max : `~astropy.units.Quantity` Energy bounds of the slice sum_over_energy_groups : bool Whether to sum over the energy groups or not. Default is False. Returns ------- models : `Models` Sliced models. """models_sliced=Models(self.copy())fork,minenumerate(models_sliced):ifm.tag=="TemplateNPredModel":m_sliced=m.slice_by_energy(energy_min,energy_max)ifsum_over_energy_groups:m_sliced.map=m_sliced.map.sum_over_axes(keepdims=True)models_sliced[k]=m_slicedreturnmodels_sliced
[docs]defselect(self,name_substring=None,datasets_names=None,tag=None,model_type=None,frozen=None,):"""Select models that meet all specified conditions. Parameters ---------- name_substring : str, optional Substring contained in the model name. Default is None. datasets_names : str or list, optional Name of the dataset. Default is None. tag : str or list, optional Model tag. Default is None. model_type : {'None', 'spatial', 'spectral'} Type of model, used together with "tag", if the tag is not unique. Default is None. frozen : bool, optional If True, select models with all parameters frozen; if False, exclude them. Default is None. Returns ------- models : `DatasetModels` Selected models. """mask=self.selection_mask(name_substring,datasets_names,tag,model_type,frozen)returnself[mask]
[docs]defselection_mask(self,name_substring=None,datasets_names=None,tag=None,model_type=None,frozen=None,):"""Create a mask of models, that meet all specified conditions. Parameters ---------- name_substring : str, optional Substring contained in the model name. Default is None. datasets_names : str or list of str, optional Name of the dataset. Default is None. tag : str or list of str, optional Model tag. Default is None. model_type : {'None', 'spatial', 'spectral'} Type of model, used together with "tag", if the tag is not unique. Default is None. frozen : bool, optional Select models with all parameters frozen if True, exclude them if False. Default is None. Returns ------- mask : `numpy.array` Boolean mask, True for selected models. """selection=np.ones(len(self),dtype=bool)iftagandnotisinstance(tag,list):tag=[tag]ifdatasets_namesandnotisinstance(datasets_names,list):datasets_names=[datasets_names]foridx,modelinenumerate(self):ifname_substring:selection[idx]&=name_substringinmodel.nameifdatasets_names:selection[idx]&=model.datasets_namesisNoneornp.any([nameinmodel.datasets_namesfornameindatasets_names])iftag:ifmodel_typeisNone:sub_model=modelelse:sub_model=getattr(model,f"{model_type}_model",None)ifsub_model:selection[idx]&=np.any([tinsub_model.tagfortintag])else:selection[idx]&=FalseiffrozenisnotNone:iffrozen:selection[idx]&=model.frozenelse:selection[idx]&=~model.frozenreturnnp.array(selection,dtype=bool)
[docs]defselect_mask(self,mask,margin="0 deg",use_evaluation_region=True):"""Check if sky models contribute within a mask map. Parameters ---------- mask : `~gammapy.maps.WcsNDMap` of boolean type Map containing a boolean mask. margin : `~astropy.unit.Quantity`, optional Add a margin in degree to the source evaluation radius. Used to take into account PSF width. Default is "0 deg". use_evaluation_region : bool, optional Account for the extension of the model or not. Default is True. Returns ------- models : `DatasetModels` Selected models contributing inside the region where mask==True. """models=[]ifnotmask.geom.is_image:mask=mask.reduce_over_axes(func=np.logical_or)formodelinself.select(tag="sky-model"):ifuse_evaluation_region:contributes=model.contributes(mask=mask,margin=margin)else:contributes=mask.get_by_coord(model.position,fill_value=0)ifnp.any(contributes):models.append(model)returnself.__class__(models=models)
[docs]defselect_from_geom(self,geom,**kwargs):"""Select models that fall inside a given geometry. Parameters ---------- geom : `~gammapy.maps.Geom` Geometry to select models from. **kwargs : dict Keyword arguments passed to `~gammapy.modeling.models.DatasetModels.select_mask`. Returns ------- models : `DatasetModels` Selected models. """mask=Map.from_geom(geom=geom,data=True,dtype=bool)returnself.select_mask(mask=mask,**kwargs)
[docs]defselect_region(self,regions,wcs=None):"""Select sky models with center position contained within a given region. Parameters ---------- regions : str, `~regions.Region` or list of `~regions.Region` Region or list of regions (pixel or sky regions accepted). A region can be defined as a string ind DS9 format as well. See http://ds9.si.edu/doc/ref/region.html for details. wcs : `~astropy.wcs.WCS`, optional World coordinate system transformation. Default is None. Returns ------- models : `DatasetModels` Selected models. """geom=RegionGeom.from_regions(regions,wcs=wcs)models=[]formodelinself.select(tag="sky-model"):ifgeom.contains(model.position):models.append(model)returnself.__class__(models=models)
[docs]defrestore_status(self,restore_values=True):"""Context manager to restore status. A copy of the values is made on enter, and those values are restored on exit. Parameters ---------- restore_values : bool, optional Restore values if True, otherwise restore only frozen status and covariance matrix. Default is True. """returnrestore_models_status(self,restore_values)
[docs]defset_parameters_bounds(self,tag,model_type,parameters_names=None,min=None,max=None,value=None):"""Set bounds for the selected models types and parameters names. Parameters ---------- tag : str or list Tag of the models. model_type : {"spatial", "spectral", "temporal"} Type of model. parameters_names : str or list, optional Parameters names. Default is None. min : float, optional Minimum value. Default is None. max : float, optional Maximum value. Default is None. value : float, optional Initial value. Default is None. """models=self.select(tag=tag,model_type=model_type)parameters=models.parameters.select(name=parameters_names,type=model_type)n=len(parameters)ifminisnotNone:parameters.min=np.ones(n)*minifmaxisnotNone:parameters.max=np.ones(n)*maxifvalueisnotNone:parameters.value=np.ones(n)*value
[docs]deffreeze(self,model_type=None):"""Freeze parameters depending on model type. Parameters ---------- model_type : {None, "spatial", "spectral"} Freeze all parameters or only spatial or only spectral. Default is None. """forminself:m.freeze(model_type)
[docs]defunfreeze(self,model_type=None):"""Restore parameters frozen status to default depending on model type. Parameters ---------- model_type : {None, "spatial", "spectral"} Restore frozen status to default for all parameters or only spatial or only spectral. Default is None. """forminself:m.unfreeze(model_type)
@propertydeffrozen(self):"""Boolean mask, True if all parameters of a given model are frozen."""returnnp.all([m.frozenforminself])
[docs]defreassign(self,dataset_name,new_dataset_name):"""Reassign a model from one dataset to another. Parameters ---------- dataset_name : str or list Name of the datasets where the model is currently defined. new_dataset_name : str or list Name of the datasets where the model should be defined instead. If multiple names are given the two list must have the save length, as the reassignment is element-wise. """models=[m.reassign(dataset_name,new_dataset_name)forminself]returnself.__class__(models)
[docs]defto_template_sky_model(self,geom,spectral_model=None,name=None):"""Merge a list of models into a single `~gammapy.modeling.models.SkyModel`. Parameters ---------- geom : `Geom` Map geometry of the result template model. spectral_model : `~gammapy.modeling.models.SpectralModel`, optional One of the NormSpectralModel. Default is None. name : str, optional Name of the new model. Default is None. Returns ------- model : `SkyModel` Template sky model. """from.importPowerLawNormSpectralModel,SkyModel,TemplateSpatialModelunit=u.Unit("1 / (cm2 s sr TeV)")map_=Map.from_geom(geom,unit=unit)forminself:map_+=m.evaluate_geom(geom).to(unit)spatial_model=TemplateSpatialModel(map_,normalize=False)ifspectral_modelisNone:spectral_model=PowerLawNormSpectralModel()returnSkyModel(spectral_model=spectral_model,spatial_model=spatial_model,name=name)
[docs]defto_template_spectral_model(self,geom,mask=None):"""Merge a list of models into a single `~gammapy.modeling.models.TemplateSpectralModel`. For each model the spatial component is integrated over the given geometry where the mask is true and multiplied by the spectral component value in each energy bin. Parameters ---------- geom : `~gammapy.maps.Geom` Map geometry on which the template model is computed. mask : `~gammapy.maps.Map` with bool dtype. Evaluate the model only where the mask is True. Returns ------- model : `~gammapy.modeling.models.TemplateSpectralModel` Template spectral model. """from.importTemplateSpectralModelenergy=geom.axes[0].centerifmaskisNone:mask=Map.from_geom(geom,data=True)elifmask.geom!=geom:mask=mask.interp_to_geom(geom,method="nearest")values=0forminself:dnde=m.spectral_model(energy)spatial_integ=m.spatial_model.integrate_geom(geom)masked_integ=spatial_integ.data*np.isfinite(spatial_integ.data)*maskspatial_integ.data[~np.isfinite(spatial_integ.data)]=np.nandnde*=masked_integ.sum(axis=(1,2))*spatial_integ.unitvalues+=dndereturnTemplateSpectralModel(energy,values)
@propertydefpositions(self):"""Positions of the models as a `~astropy.coordinates.SkyCoord`."""positions=[]formodelinself.select(tag="sky-model"):ifmodel.position:positions.append(model.position.icrs)else:log.warning(f"Skipping model {model.name} - no spatial component present")returnSkyCoord(positions)
[docs]defto_regions(self):"""Return a list of the regions for the spatial models. Returns ------- regions: list of `~regions.SkyRegion` Regions. """regions=[]formodelinself.select(tag="sky-model"):try:region=model.spatial_model.to_region()regions.append(region)exceptAttributeError:log.warning(f"Skipping model {model.name} - no spatial component present")returnregions
@propertydefwcs_geom(self):"""Minimum WCS geom in which all the models are contained."""regions=self.to_regions()try:returnRegionGeom.from_regions(regions).to_wcs_geom()exceptIndexError:log.error("No spatial component in any model. Geom not defined")
[docs]defplot_regions(self,ax=None,kwargs_point=None,path_effect=None,**kwargs):"""Plot extent of the spatial models on a given WCS axis. Parameters ---------- ax : `~astropy.visualization.WCSAxes`, optional Axes to plot on. If no axes are given, an all-sky WCS is chosen using a CAR projection. Default is None. kwargs_point : dict, optional Keyword arguments passed to `~matplotlib.lines.Line2D` for plotting of point sources. Default is None. path_effect : `~matplotlib.patheffects.PathEffect`, optional Path effect applied to artists and lines. Default is None. **kwargs : dict Keyword arguments passed to `~matplotlib.artists.Artist`. Returns ------- ax : `~astropy.visualization.WcsAxes` WCS axes. """regions=self.to_regions()geom=RegionGeom.from_regions(regions=regions)returngeom.plot_region(ax=ax,kwargs_point=kwargs_point,path_effect=path_effect,**kwargs)
[docs]defplot_positions(self,ax=None,**kwargs):"""Plot the centers of the spatial models on a given WCS axis. Parameters ---------- ax : `~astropy.visualization.WCSAxes`, optional Axes to plot on. If no axes are given, an all-sky WCS is chosen using a CAR projection. Default is None. **kwargs : dict Keyword arguments passed to `~matplotlib.pyplot.scatter`. Returns ------- ax : `~astropy.visualization.WcsAxes` WCS axes. """fromastropy.visualization.wcsaxesimportWCSAxesifaxisNoneornotisinstance(ax,WCSAxes):ax=Map.from_geom(self.wcs_geom).plot()kwargs.setdefault("marker","*")kwargs.setdefault("color","tab:blue")path_effects=kwargs.get("path_effects",None)xp,yp=self.positions.to_pixel(ax.wcs)p=ax.scatter(xp,yp,**kwargs)ifpath_effects:plt.setp(p,path_effects=path_effects)returnax
[docs]classModels(DatasetModels,collections.abc.MutableSequence):"""Sky model collection. Parameters ---------- models : `SkyModel`, list of `SkyModel` or `Models` Sky models. """def__delitem__(self,key):delself._models[self.index(key)]def__setitem__(self,key,model):fromgammapy.modeling.modelsimport(FoVBackgroundModel,SkyModel,TemplateNPredModel,)ifisinstance(model,(SkyModel,FoVBackgroundModel,TemplateNPredModel)):self._models[self.index(key)]=modelelse:raiseTypeError(f"Invalid type: {model!r}")
[docs]definsert(self,idx,model):ifmodel.nameinself.names:raise(ValueError("Model names must be unique"))self._models.insert(idx,model)