# Licensed under a 3-clause BSD style license - see LICENSE.rstimportcollections.abcimportcopyimportloggingfromos.pathimportsplitimportnumpyasnpimportastropy.unitsasufromastropy.coordinatesimportSkyCoordfromastropy.tableimportTableimportmatplotlib.pyplotaspltimportyamlfromgammapy.mapsimportMap,RegionGeomfromgammapy.modelingimportCovariance,Parameter,Parametersfromgammapy.modeling.covarianceimportcopy_covariancefromgammapy.utils.scriptsimportmake_name,make_path__all__=["Model","Models","DatasetModels","ModelBase"]log=logging.getLogger(__name__)def_set_link(shared_register,model):forparaminmodel.parameters:name=param.namelink_label=param._link_label_ioiflink_labelisnotNone:iflink_labelinshared_register:new_param=shared_register[link_label]setattr(model,name,new_param)else:shared_register[link_label]=paramreturnshared_registerdef_get_model_class_from_dict(data):"""get a model class from a dict"""from.import(MODEL_REGISTRY,SPATIAL_MODEL_REGISTRY,SPECTRAL_MODEL_REGISTRY,TEMPORAL_MODEL_REGISTRY,)if"type"indata:cls=MODEL_REGISTRY.get_cls(data["type"])elif"spatial"indata:cls=SPATIAL_MODEL_REGISTRY.get_cls(data["spatial"]["type"])elif"spectral"indata:cls=SPECTRAL_MODEL_REGISTRY.get_cls(data["spectral"]["type"])elif"temporal"indata:cls=TEMPORAL_MODEL_REGISTRY.get_cls(data["temporal"]["type"])returnclsdef_build_parameters_from_dict(data,default_parameters):"""Build Parameters object from input dict and default parameter values."""par_data=[]input_names=[_["name"]for_indata]forparindefault_parameters:par_dict=par.to_dict()try:index=input_names.index(par_dict["name"])par_dict.update(data[index])exceptValueError:log.warning(f"Parameter '{par_dict['name']}' not defined in YAML file."f" Using default value: {par_dict['value']}{par_dict['unit']}")par_data.append(par_dict)returnParameters.from_dict(par_data)
[docs]classModelBase:"""Model base class."""_type=Nonedef__init__(self,**kwargs):# Copy default parameters from the class to the instancedefault_parameters=self.default_parameters.copy()forparindefault_parameters:value=kwargs.get(par.name,par)ifnotisinstance(value,Parameter):par.quantity=u.Quantity(value)else:par=valuesetattr(self,par.name,par)self._covariance=Covariance(self.parameters)def__getattribute__(self,name):value=object.__getattribute__(self,name)ifisinstance(value,Parameter):returnvalue.__get__(self,None)returnvalue@propertydeftype(self):returnself._typedef__init_subclass__(cls,**kwargs):# Add parameters list on the model sub-class (not instances)cls.default_parameters=Parameters([_for_incls.__dict__.values()ifisinstance(_,Parameter)])
[docs]@classmethoddeffrom_parameters(cls,parameters,**kwargs):"""Create model from parameter list Parameters ---------- parameters : `Parameters` Parameters for init Returns ------- model : `Model` Model instance """forparinparameters:kwargs[par.name]=parreturncls(**kwargs)
def_check_covariance(self):ifnotself.parameters==self._covariance.parameters:self._covariance=Covariance(self.parameters)@propertydefcovariance(self):self._check_covariance()forparinself.parameters:pars=Parameters([par])error=np.nan_to_num(par.error**2,nan=1)covar=Covariance(pars,data=[[error]])self._covariance.set_subcovariance(covar)returnself._covariance@covariance.setterdefcovariance(self,covariance):self._check_covariance()self._covariance.data=covarianceforparinself.parameters:pars=Parameters([par])variance=self._covariance.get_subcovariance(pars)par.error=np.sqrt(variance)@propertydefparameters(self):"""Parameters (`~gammapy.modeling.Parameters`)"""returnParameters([getattr(self,name)fornameinself.default_parameters.names])@copy_covariancedefcopy(self,**kwargs):"""A deep copy."""returncopy.deepcopy(self)
[docs]defto_dict(self,full_output=False):"""Create dict for YAML serialisation"""tag=self.tag[0]ifisinstance(self.tag,list)elseself.tagparams=self.parameters.to_dict()ifnotfull_output:forpar,par_defaultinzip(params,self.default_parameters):init=par_default.to_dict()foritemin["min","max","error","interp","scale_method","is_norm",]:default=init[item]ifpar[item]==defaultor(np.isnan(par[item])andnp.isnan(default)):delpar[item]ifpar["frozen"]==init["frozen"]:delpar["frozen"]ifinit["unit"]=="":delpar["unit"]data={"type":tag,"parameters":params}ifself.typeisNone:returndataelse:return{self.type:data}
[docs]@classmethoddeffrom_dict(cls,data):kwargs={}key0=next(iter(data))ifkey0in["spatial","temporal","spectral"]:data=data[key0]ifdata["type"]notincls.tag:raiseValueError(f"Invalid model type {data['type']} for class {cls.__name__}")parameters=_build_parameters_from_dict(data["parameters"],cls.default_parameters)# TODO: this is a special case for spatial models, maybe better move to# `SpatialModel` base classif"frame"indata:kwargs["frame"]=data["frame"]returncls.from_parameters(parameters,**kwargs)
def__str__(self):string=f"{self.__class__.__name__}\n"iflen(self.parameters)>0:string+=f"\n{self.parameters.to_table()}"returnstring@propertydeffrozen(self):"""Frozen status of a model, True if all parameters are frozen"""returnnp.all([p.frozenforpinself.parameters])
[docs]deffreeze(self):"""Freeze all parameters"""self.parameters.freeze_all()
[docs]defunfreeze(self):"""Restore parameters frozen status to default"""forp,defaultinzip(self.parameters,self.default_parameters):p.frozen=default.frozen
[docs]defreassign(self,datasets_names,new_datasets_names):"""Reassign a model from one dataset to another Parameters ---------- datasets_names : str or list Name of the datasets where the model is currently defined new_datasets_names : str or list Name of the datasets where the model should be defined instead. If multiple names are given the two list must have the save length, as the reassignment is element-wise. Returns ------- model : `Model` Reassigned model. """model=self.copy(name=self.name)ifnotisinstance(datasets_names,list):datasets_names=[datasets_names]ifnotisinstance(new_datasets_names,list):new_datasets_names=[new_datasets_names]ifisinstance(model.datasets_names,str):model.datasets_names=[model.datasets_names]ifgetattr(model,"datasets_names",None):forname,name_newinzip(datasets_names,new_datasets_names):model.datasets_names=[_.replace(name,name_new)for_inmodel.datasets_names]returnmodel
[docs]classModel:"""Model class that contains only methods to create a model listed in the registries."""
[docs]@staticmethoddefcreate(tag,model_type=None,*args,**kwargs):"""Create a model instance. Examples -------- >>> from gammapy.modeling.models import Model >>> spectral_model = Model.create( "pl-2", model_type="spectral", amplitude="1e-10 cm-2 s-1", index=3 ) >>> type(spectral_model) <class 'gammapy.modeling.models.spectral.PowerLaw2SpectralModel'> """data={"type":tag}ifmodel_typeisnotNone:data={model_type:data}cls=_get_model_class_from_dict(data)returncls(*args,**kwargs)
[docs]@staticmethoddeffrom_dict(data):"""Create a model instance from a dict"""cls=_get_model_class_from_dict(data)returncls.from_dict(data)
[docs]classDatasetModels(collections.abc.Sequence):"""Immutable models container Parameters ---------- models : `SkyModel`, list of `SkyModel` or `Models` Sky models """def__init__(self,models=None):ifmodelsisNone:models=[]ifisinstance(models,(Models,DatasetModels)):models=models._modelselifisinstance(models,ModelBase):models=[models]elifnotisinstance(models,list):raiseTypeError(f"Invalid type: {models!r}")unique_names=[]formodelinmodels:ifmodel.nameinunique_names:raise(ValueError("Model names must be unique"))unique_names.append(model.name)self._models=modelsself._covar_file=Noneself._covariance=Covariance(self.parameters)def_check_covariance(self):ifnotself.parameters==self._covariance.parameters:self._covariance=Covariance.from_stack([model.covarianceformodelinself._models])@propertydefcovariance(self):self._check_covariance()formodelinself._models:self._covariance.set_subcovariance(model.covariance)returnself._covariance@covariance.setterdefcovariance(self,covariance):self._check_covariance()self._covariance.data=covarianceformodelinself._models:subcovar=self._covariance.get_subcovariance(model.covariance.parameters)model.covariance=subcovar@propertydefparameters(self):returnParameters.from_stack([_.parametersfor_inself._models])@propertydefparameters_unique_names(self):"""List of unique parameter names as model_name.par_type.par_name"""names=[]formodelinself:forparinmodel.parameters:components=[model.name,par.type,par.name]name=".".join(components)names.append(name)returnnames@propertydefnames(self):return[m.nameforminself._models]
[docs]@classmethoddefread(cls,filename):"""Read from YAML file."""yaml_str=make_path(filename).read_text()path,filename=split(filename)returncls.from_yaml(yaml_str,path=path)
[docs]@classmethoddeffrom_yaml(cls,yaml_str,path=""):"""Create from YAML string."""data=yaml.safe_load(yaml_str)returncls.from_dict(data,path=path)
[docs]@classmethoddeffrom_dict(cls,data,path=""):"""Create from dict."""from.importMODEL_REGISTRY,SkyModelmodels=[]forcomponentindata["components"]:model_cls=MODEL_REGISTRY.get_cls(component["type"])model=model_cls.from_dict(component)models.append(model)models=cls(models)if"covariance"indata:filename=data["covariance"]path=make_path(path)ifnot(path/filename).exists():path,filename=split(filename)models.read_covariance(path,filename,format="ascii.fixed_width")shared_register={}formodelinmodels:ifisinstance(model,SkyModel):submodels=[model.spectral_model,model.spatial_model,model.temporal_model,]forsubmodelinsubmodels:ifsubmodelisnotNone:shared_register=_set_link(shared_register,submodel)else:shared_register=_set_link(shared_register,model)returnmodels
[docs]defwrite(self,path,overwrite=False,full_output=False,overwrite_templates=False,write_covariance=True,):"""Write to YAML file. Parameters ---------- path : `pathlib.Path` or str path to write files overwrite : bool overwrite YAML files full_output : bool Store full parameter output. overwrite_templates : bool overwrite templates FITS files write_covariance : bool save covariance or not """base_path,_=split(path)path=make_path(path)base_path=make_path(base_path)ifpath.exists()andnotoverwrite:raiseIOError(f"File exists already: {path}")if(write_covarianceandself.covarianceisnotNoneandlen(self.parameters)!=0):filecovar=path.stem+"_covariance.dat"kwargs=dict(format="ascii.fixed_width",delimiter="|",overwrite=overwrite)self.write_covariance(base_path/filecovar,**kwargs)self._covar_file=filecovarpath.write_text(self.to_yaml(full_output,overwrite_templates))
[docs]defto_yaml(self,full_output=False,overwrite_templates=False):"""Convert to YAML string."""data=self.to_dict(full_output,overwrite_templates)returnyaml.dump(data,sort_keys=False,indent=4,width=80,default_flow_style=False)
[docs]defupdate_link_label(self):"""update linked parameters labels used for serialization and print"""params_list=[]params_shared=[]forparaminself.parameters:ifparamnotinparams_list:params_list.append(param)params_list.append(param)elifparamnotinparams_shared:params_shared.append(param)forparaminparams_shared:param._link_label_io=param.name+"@"+make_name()
[docs]defto_dict(self,full_output=False,overwrite_templates=False):"""Convert to dict."""self.update_link_label()models_data=[]formodelinself._models:model_data=model.to_dict(full_output)models_data.append(model_data)if(hasattr(model,"spatial_model")andmodel.spatial_modelisnotNoneand"template"inmodel.spatial_model.tag):model.spatial_model.write(overwrite=overwrite_templates)ifself._covar_fileisnotNone:return{"components":models_data,"covariance":str(self._covar_file),}else:return{"components":models_data}
[docs]defto_parameters_table(self):"""Convert Models parameters to an astropy Table."""table=self.parameters.to_table()# Warning: splitting of parameters will break is source name has a "." in its name.model_name=[name.split(".")[0]fornameinself.parameters_unique_names]table.add_column(model_name,name="model",index=0)returntable
[docs]defupdate_parameters_from_table(self,t):"""Update Models from an astropy Table."""parameters_dict=[dict(zip(t.colnames,row))forrowint]fork,datainenumerate(parameters_dict):self.parameters[k].update_from_dict(data)
[docs]defread_covariance(self,path,filename="_covariance.dat",**kwargs):"""Read covariance data from file Parameters ---------- path : str or `Path` Base path filename : str Filename **kwargs : dict Keyword arguments passed to `~astropy.table.Table.read` """path=make_path(path)filepath=str(path/filename)t=Table.read(filepath,**kwargs)t.remove_column("Parameters")arr=np.array(t)data=arr.view(float).reshape(arr.shape+(-1,))self.covariance=dataself._covar_file=filename
[docs]defwrite_covariance(self,filename,**kwargs):"""Write covariance to file Parameters ---------- filename : str Filename **kwargs : dict Keyword arguments passed to `~astropy.table.Table.write` """names=self.parameters_unique_namestable=Table()table["Parameters"]=namesforidx,nameinenumerate(names):values=self.covariance.data[idx]table[str(idx)]=valuestable.write(make_path(filename),**kwargs)
def__str__(self):self.update_link_label()str_=f"{self.__class__.__name__}\n\n"foridx,modelinenumerate(self):str_+=f"Component {idx}: "str_+=str(model)returnstr_.expandtabs(tabsize=2)def__add__(self,other):ifisinstance(other,(Models,list)):returnModels([*self,*other])elifisinstance(other,ModelBase):ifother.nameinself.names:raise(ValueError("Model names must be unique"))returnModels([*self,other])else:raiseTypeError(f"Invalid type: {other!r}")def__getitem__(self,key):ifisinstance(key,np.ndarray)andkey.dtype==bool:returnself.__class__(list(np.array(self._models)[key]))else:returnself._models[self.index(key)]
def__len__(self):returnlen(self._models)def_ipython_key_completions_(self):returnself.names@copy_covariancedefcopy(self,copy_data=False):"""A deep copy. Parameters ---------- copy_data : bool Whether to copy data attached to template models Returns ------- models: `Models` Copied models. """models=[]formodelinself:model_copy=model.copy(name=model.name,copy_data=copy_data)models.append(model_copy)returnself.__class__(models=models)
[docs]defselect(self,name_substring=None,datasets_names=None,tag=None,model_type=None,frozen=None,):"""Select models that meet all specified conditions Parameters ---------- name_substring : str Substring contained in the model name datasets_names : str or list Name of the dataset tag : str or list Model tag model_type : {None, spatial, spectral} Type of model, used together with "tag", if the tag is not unique. frozen : bool Select models with all parameters frozen if True, exclude them if False. Returns ------- models : `DatasetModels` Selected models """mask=self.selection_mask(name_substring,datasets_names,tag,model_type,frozen)returnself[mask]
[docs]defselection_mask(self,name_substring=None,datasets_names=None,tag=None,model_type=None,frozen=None,):"""Create a mask of models, that meet all specified conditions Parameters ---------- name_substring : str Substring contained in the model name datasets_names : str or list of str Name of the dataset tag : str or list of str Model tag model_type : {None, spatial, spectral} Type of model, used together with "tag", if the tag is not unique. frozen : bool Select models with all parameters frozen if True, exclude them if False. Returns ------- mask : `numpy.array` Boolean mask, True for selected models """selection=np.ones(len(self),dtype=bool)iftagandnotisinstance(tag,list):tag=[tag]ifdatasets_namesandnotisinstance(datasets_names,list):datasets_names=[datasets_names]foridx,modelinenumerate(self):ifname_substring:selection[idx]&=name_substringinmodel.nameifdatasets_names:selection[idx]&=model.datasets_namesisNoneornp.any([nameinmodel.datasets_namesfornameindatasets_names])iftag:ifmodel_typeisNone:sub_model=modelelse:sub_model=getattr(model,f"{model_type}_model",None)ifsub_model:selection[idx]&=np.any([tinsub_model.tagfortintag])else:selection[idx]&=FalseiffrozenisnotNone:iffrozen:selection[idx]&=model.frozenelse:selection[idx]&=~model.frozenreturnnp.array(selection,dtype=bool)
[docs]defselect_mask(self,mask,margin="0 deg",use_evaluation_region=True):"""Check if sky models contribute within a mask map. Parameters ---------- mask : `~gammapy.maps.WcsNDMap` of boolean type Map containing a boolean mask margin : `~astropy.unit.Quantity` Add a margin in degree to the source evaluation radius. Used to take into account PSF width. use_evaluation_region : bool Account for the extension of the model or not. The default is True. Returns ------- models : `DatasetModels` Selected models contributing inside the region where mask==True """models=[]ifnotmask.geom.is_image:mask=mask.reduce_over_axes(func=np.logical_or)formodelinself.select(tag="sky-model"):ifuse_evaluation_region:contributes=model.contributes(mask=mask,margin=margin)else:contributes=mask.get_by_coord(model.position,fill_value=0)ifnp.any(contributes):models.append(model)returnself.__class__(models=models)
[docs]defselect_region(self,regions,wcs=None):"""Select sky models with center position contained within a given region Parameters ---------- regions : str, `~regions.Region` or list of `~regions.Region` Region or list of regions (pixel or sky regions accepted). A region can be defined as a string ind DS9 format as well. See http://ds9.si.edu/doc/ref/region.html for details. wcs : `~astropy.wcs.WCS` World coordinate system transformation Returns ------- models : `DatasetModels` Selected models """geom=RegionGeom.from_regions(regions,wcs=wcs)models=[]formodelinself.select(tag="sky-model"):ifgeom.contains(model.position):models.append(model)returnself.__class__(models=models)
[docs]defrestore_status(self,restore_values=True):"""Context manager to restore status. A copy of the values is made on enter, and those values are restored on exit. Parameters ---------- restore_values : bool Restore values if True, otherwise restore only frozen status and covariance matrix. """returnrestore_models_status(self,restore_values)
[docs]defset_parameters_bounds(self,tag,model_type,parameters_names=None,min=None,max=None,value=None):"""Set bounds for the selected models types and parameters names Parameters ---------- tag : str or list Tag of the models model_type : {"spatial", "spectral", "temporal"} Type of model parameters_names : str or list parameters names min : float min value max : float max value value : float init value """models=self.select(tag=tag,model_type=model_type)parameters=models.parameters.select(name=parameters_names,type=model_type)n=len(parameters)ifminisnotNone:parameters.min=np.ones(n)*minifmaxisnotNone:parameters.max=np.ones(n)*maxifvalueisnotNone:parameters.value=np.ones(n)*value
[docs]deffreeze(self,model_type=None):"""Freeze parameters depending on model type Parameters ---------- model_type : {None, "spatial", "spectral"} freeze all parameters or only spatial or only spectral """forminself:m.freeze(model_type)
[docs]defunfreeze(self,model_type=None):"""Restore parameters frozen status to default depending on model type Parameters ---------- model_type : {None, "spatial", "spectral"} restore frozen status to default for all parameters or only spatial or only spectral """forminself:m.unfreeze(model_type)
@propertydeffrozen(self):"""Boolean mask, True if all parameters of a given model are frozen"""returnnp.all([m.frozenforminself])
[docs]defreassign(self,dataset_name,new_dataset_name):"""Reassign a model from one dataset to another Parameters ---------- dataset_name : str or list Name of the datasets where the model is currently defined new_dataset_name : str or list Name of the datasets where the model should be defined instead. If multiple names are given the two list must have the save length, as the reassignment is element-wise. """models=[m.reassign(dataset_name,new_dataset_name)forminself]returnself.__class__(models)
[docs]defto_template_sky_model(self,geom,spectral_model=None,name=None):"""Merge a list of models into a single `~gammapy.modeling.models.SkyModel` Parameters ---------- geom : `Geom` Map geometry of the result template model. spectral_model : `~gammapy.modeling.models.SpectralModel` One of the NormSpectralMdel name : str Name of the new model Returns ------- model : `SkyModel` Template sky model. """from.importPowerLawNormSpectralModel,SkyModel,TemplateSpatialModelunit=u.Unit("1 / (cm2 s sr TeV)")map_=Map.from_geom(geom,unit=unit)forminself:map_+=m.evaluate_geom(geom).to(unit)spatial_model=TemplateSpatialModel(map_,normalize=False)ifspectral_modelisNone:spectral_model=PowerLawNormSpectralModel()returnSkyModel(spectral_model=spectral_model,spatial_model=spatial_model,name=name)
@propertydefpositions(self):"""Positions of the models (`~astropy.coordinates.SkyCoord`)"""positions=[]formodelinself.select(tag="sky-model"):ifmodel.position:positions.append(model.position.icrs)else:log.warning(f"Skipping model {model.name} - no spatial component present")returnSkyCoord(positions)
[docs]defto_regions(self):"""Returns a list of the regions for the spatial models Returns ------- regions: list of `~regions.SkyRegion` Regions """regions=[]formodelinself.select(tag="sky-model"):try:region=model.spatial_model.to_region()regions.append(region)exceptAttributeError:log.warning(f"Skipping model {model.name} - no spatial component present")returnregions
@propertydefwcs_geom(self):"""Minimum WCS geom in which all the models are contained"""regions=self.to_regions()try:returnRegionGeom.from_regions(regions).to_wcs_geom()exceptIndexError:log.error("No spatial component in any model. Geom not defined")
[docs]defplot_regions(self,ax=None,kwargs_point=None,path_effect=None,**kwargs):"""Plot extent of the spatial models on a given wcs axis Parameters ---------- ax : `~astropy.visualization.WCSAxes` Axes to plot on. If no axes are given, an all-sky wcs is chosen using a CAR projection kwargs_point : dict Keyword arguments passed to `~matplotlib.lines.Line2D` for plotting of point sources path_effect : `~matplotlib.patheffects.PathEffect` Path effect applied to artists and lines. **kwargs : dict Keyword arguments passed to `~matplotlib.artists.Artist` Returns ------- ax : `~astropy.visualization.WcsAxes` WCS axes """regions=self.to_regions()geom=RegionGeom.from_regions(regions=regions)returngeom.plot_region(ax=ax,kwargs_point=kwargs_point,path_effect=path_effect,**kwargs)
[docs]defplot_positions(self,ax=None,**kwargs):""" "Plot the centers of the spatial models on a given wcs axis Parameters ---------- ax : `~astropy.visualization.WCSAxes` Axes to plot on. If no axes are given, an all-sky wcs is chosen using a CAR projection **kwargs : dict Keyword arguments passed to `~matplotlib.pyplot.scatter` Returns ------- ax : `~astropy.visualization.WcsAxes` Wcs axes """fromastropy.visualization.wcsaxesimportWCSAxesifaxisNoneornotisinstance(ax,WCSAxes):ax=Map.from_geom(self.wcs_geom).plot()kwargs.setdefault("marker","*")kwargs.setdefault("color","tab:blue")path_effects=kwargs.get("path_effects",None)xp,yp=self.positions.to_pixel(ax.wcs)p=ax.scatter(xp,yp,**kwargs)ifpath_effects:plt.setp(p,path_effects=path_effects)returnax
[docs]classModels(DatasetModels,collections.abc.MutableSequence):"""Sky model collection. Parameters ---------- models : `SkyModel`, list of `SkyModel` or `Models` Sky models """def__delitem__(self,key):delself._models[self.index(key)]def__setitem__(self,key,model):fromgammapy.modeling.modelsimportFoVBackgroundModel,SkyModelifisinstance(model,(SkyModel,FoVBackgroundModel)):self._models[self.index(key)]=modelelse:raiseTypeError(f"Invalid type: {model!r}")
[docs]definsert(self,idx,model):ifmodel.nameinself.names:raise(ValueError("Model names must be unique"))self._models.insert(idx,model)