# Licensed under a 3-clause BSD style license - see LICENSE.rst
import abc
import collections.abc
import copy
import html
import logging
import numpy as np
from astropy import units as u
from astropy.table import Table, vstack
from gammapy.data import GTI
from gammapy.modeling.models import DatasetModels, Models
from gammapy.utils.scripts import make_name, make_path, read_yaml, write_yaml

log = logging.getLogger(__name__)

__all__ = ["Dataset", "Datasets"]
class Dataset(abc.ABC):
    """Dataset abstract base class.

    For now, see existing examples of type of datasets:

    - `gammapy.datasets.MapDataset`
    - `gammapy.datasets.SpectrumDataset`
    - `gammapy.datasets.FluxPointsDataset`

    For more information see :ref:`datasets`.

    TODO: add tutorial how to create your own dataset types.
    """

    _residuals_labels = {
        "diff": "data - model",
        "diff/model": "(data - model) / model",
        "diff/sqrt(model)": "(data - model) / sqrt(model)",
    }

    def _repr_html_(self):
        try:
            return self.to_html()
        except AttributeError:
            return f"<pre>{html.escape(str(self))}</pre>"

    @property
    @abc.abstractmethod
    def tag(self):
        pass

    @property
    def name(self):
        return self._name
    def to_dict(self):
        """Convert to dict for YAML serialization."""
        name = self.name.replace(" ", "_")
        filename = f"{name}.fits"
        return {"name": self.name, "type": self.tag, "filename": filename}
    @property
    def mask(self):
        """Combined fit and safe mask."""
        if self.mask_safe is not None and self.mask_fit is not None:
            return self.mask_safe & self.mask_fit
        elif self.mask_fit is not None:
            return self.mask_fit
        elif self.mask_safe is not None:
            return self.mask_safe
    def stat_sum(self):
        """Total statistic given the current model parameters and priors."""
        stat = self.stat_array()

        if self.mask is not None:
            stat = stat[self.mask.data]

        prior_stat_sum = 0.0
        if self.models is not None:
            prior_stat_sum = self.models.parameters.prior_stat_sum()

        return np.sum(stat, dtype=np.float64) + prior_stat_sum
    @abc.abstractmethod
    def stat_array(self):
        """Statistic array, one value per data point."""
    def copy(self, name=None):
        """A deep copy.

        Parameters
        ----------
        name : str, optional
            Name of the copied dataset. Default is None.

        Returns
        -------
        dataset : `Dataset`
            Copied dataset.
        """
        new = copy.deepcopy(self)
        name = make_name(name)
        new._name = name
        # TODO: check the model behaviour?
        new.models = None
        return new
    @staticmethod
    def _compute_residuals(data, model, method="diff"):
        with np.errstate(invalid="ignore"):
            if method == "diff":
                residuals = data - model
            elif method == "diff/model":
                residuals = (data - model) / model
            elif method == "diff/sqrt(model)":
                residuals = (data - model) / np.sqrt(model)
            else:
                raise AttributeError(
                    f"Invalid method: {method!r} for computing residuals"
                )
        return residuals
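# A minimal sketch of a custom dataset type, as hinted at by the TODO in the
# ``Dataset`` docstring above. The names ``MyDataset`` and ``data`` are hypothetical
# and only illustrate which members a concrete subclass is expected to provide
# (``tag``, ``stat_array`` and a ``_name`` attribute):
#
#     class MyDataset(Dataset):
#         tag = "MyDataset"
#
#         def __init__(self, data, models=None, name=None):
#             self.data = np.asarray(data, dtype=float)
#             self.models = models
#             self.mask_fit = None
#             self.mask_safe = None
#             self._name = make_name(name)
#
#         def stat_array(self):
#             # e.g. a simple chi2-like term per data point against a constant model
#             return (self.data - 1.0) ** 2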
class Datasets(collections.abc.MutableSequence):
    """Container class that holds a list of datasets.

    Parameters
    ----------
    datasets : `Dataset` or list of `Dataset`
        Datasets.
    """

    def __init__(self, datasets=None):
        if datasets is None:
            datasets = []

        if isinstance(datasets, Datasets):
            datasets = datasets._datasets
        elif isinstance(datasets, Dataset):
            datasets = [datasets]
        elif not isinstance(datasets, list):
            raise TypeError(f"Invalid type: {datasets!r}")

        unique_names = []
        for dataset in datasets:
            if dataset.name in unique_names:
                raise ValueError("Dataset names must be unique")
            unique_names.append(dataset.name)

        self._datasets = datasets

    @property
    def parameters(self):
        """Unique parameters (`~gammapy.modeling.Parameters`).

        Duplicate parameter objects have been removed.
        The order of the unique parameters remains.
        """
        return self.models.parameters.unique_parameters

    @property
    def models(self):
        """Unique models (`~gammapy.modeling.Models`).

        Duplicate model objects have been removed.
        The order of the unique models remains.
        """
        models = {}

        for dataset in self:
            if dataset.models is not None:
                for model in dataset.models:
                    models[model] = model

        return DatasetModels(list(models.keys()))

    @models.setter
    def models(self, models):
        """Unique models (`~gammapy.modeling.Models`).

        Duplicate model objects have been removed.
        The order of the unique models remains.
        """
        for dataset in self:
            dataset.models = models

    @property
    def names(self):
        """List of dataset names."""
        return [d.name for d in self._datasets]

    @property
    def is_all_same_type(self):
        """Whether all contained datasets are of the same type."""
        return len(set(_.__class__ for _ in self)) == 1

    @property
    def is_all_same_shape(self):
        """Whether all contained datasets have the same data shape."""
        return len(set(_.data_shape for _ in self)) == 1

    @property
    def is_all_same_energy_shape(self):
        """Whether all contained datasets have the same energy data shape."""
        return len(set(_.data_shape[0] for _ in self)) == 1

    @property
    def energy_axes_are_aligned(self):
        """Whether all contained datasets have aligned energy axes."""
        axes = [d.counts.geom.axes["energy"] for d in self]
        return np.all([axes[0].is_aligned(ax) for ax in axes])

    @property
    def contributes_to_stat(self):
        """Stat contributions.

        Returns
        -------
        contributions : `~numpy.array`
            Array indicating which dataset contributes to the likelihood.
        """
        contributions = []

        for dataset in self:
            if dataset.mask is not None:
                value = np.any(dataset.mask)
            else:
                value = True
            contributions.append(value)
        return np.array(contributions)
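    # A short usage sketch of the container behaviour described above. The objects
    # ``dataset_a`` and ``dataset_b`` are hypothetical placeholders for any concrete
    # ``Dataset`` instances with distinct names:
    #
    #     datasets = Datasets([dataset_a, dataset_b])
    #     datasets.names         # ["dataset-a", "dataset-b"]
    #     datasets.models        # unique models across both datasets
    #     datasets.parameters    # unique parameters of those models
    #     Datasets([dataset_a, dataset_a])   # raises ValueError: names must be unique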
    def stat_sum(self):
        """Compute joint statistic function value."""
        stat_sum = 0
        # TODO: add parallel evaluation of likelihoods
        for dataset in self:
            stat_sum += dataset.stat_sum()
        return stat_sum
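    # The joint statistic is simply the sum of the per-dataset statistics, e.g. for a
    # hypothetical ``datasets`` container:
    #
    #     total = datasets.stat_sum()
    #     assert total == sum(d.stat_sum() for d in datasets)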
    def select_time(self, time_min, time_max, atol="1e-6 s"):
        """Select datasets in a given time interval.

        Parameters
        ----------
        time_min, time_max : `~astropy.time.Time`
            Time interval.
        atol : `~astropy.units.Quantity`
            Tolerance value for time comparison with different scales. Default is 1e-6 s.

        Returns
        -------
        datasets : `Datasets`
            Datasets in the given time interval.
        """
        atol = u.Quantity(atol)

        datasets = []

        for dataset in self:
            t_start = dataset.gti.time_start[0]
            t_stop = dataset.gti.time_stop[-1]

            if t_start >= (time_min - atol) and t_stop <= (time_max + atol):
                datasets.append(dataset)

        return self.__class__(datasets)
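    # Example of selecting datasets within a time interval (a sketch; ``datasets`` is
    # a hypothetical container whose datasets carry GTI information):
    #
    #     from astropy.time import Time
    #
    #     selected = datasets.select_time(
    #         time_min=Time("2021-01-01T00:00:00"),
    #         time_max=Time("2021-01-02T00:00:00"),
    #     )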
    def slice_by_energy(self, energy_min, energy_max):
        """Select and slice datasets in energy range.

        The method keeps the current dataset names. Datasets that do not
        contribute to the selected energy range are dismissed.

        Parameters
        ----------
        energy_min, energy_max : `~astropy.units.Quantity`
            Energy bounds to slice the datasets to.

        Returns
        -------
        datasets : Datasets
            Datasets.
        """
        datasets = []

        for dataset in self:
            try:
                dataset_sliced = dataset.slice_by_energy(
                    energy_min=energy_min,
                    energy_max=energy_max,
                    name=dataset.name,
                )
            except ValueError:
                log.info(
                    f"Dataset {dataset.name} does not contribute in the energy range"
                )
                continue

            datasets.append(dataset_sliced)

        return self.__class__(datasets=datasets)
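    # Example of slicing all datasets to a common energy range (a sketch with a
    # hypothetical ``datasets`` container); datasets that do not overlap the range
    # are dropped with a log message:
    #
    #     sliced = datasets.slice_by_energy(energy_min=1 * u.TeV, energy_max=10 * u.TeV)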
    def to_spectrum_datasets(self, region):
        """Extract spectrum datasets for the given region.

        To get more detailed information, see the corresponding function associated
        to each dataset type: `~gammapy.datasets.MapDataset.to_spectrum_dataset` or
        `~gammapy.datasets.MapDatasetOnOff.to_spectrum_dataset`.

        Parameters
        ----------
        region : `~regions.SkyRegion`
            Region definition.

        Returns
        -------
        datasets : `Datasets`
            List of `~gammapy.datasets.SpectrumDataset`.
        """
        datasets = Datasets()

        for dataset in self:
            spectrum_dataset = dataset.to_spectrum_dataset(
                on_region=region, name=dataset.name
            )
            datasets.append(spectrum_dataset)

        return datasets
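    # Example of extracting spectra for a circular on-region (a sketch; the region
    # centre and radius are hypothetical):
    #
    #     from astropy.coordinates import SkyCoord
    #     from regions import CircleSkyRegion
    #
    #     region = CircleSkyRegion(
    #         center=SkyCoord("0 deg", "0 deg", frame="galactic"), radius=0.1 * u.deg
    #     )
    #     spectrum_datasets = datasets.to_spectrum_datasets(region=region)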
    @property
    # TODO: make this a method to support different methods?
    def energy_ranges(self):
        """Get global energy range of datasets.

        The energy range is derived as the minimum / maximum of the energy
        ranges of all datasets.

        Returns
        -------
        energy_min, energy_max : `~astropy.units.Quantity`
            Energy range.
        """
        energy_mins, energy_maxs = [], []

        for dataset in self:
            energy_axis = dataset.counts.geom.axes["energy"]
            energy_mins.append(energy_axis.edges[0])
            energy_maxs.append(energy_axis.edges[-1])

        return u.Quantity(energy_mins), u.Quantity(energy_maxs)

    def __str__(self):
        str_ = self.__class__.__name__ + "\n"
        str_ += "--------\n\n"

        for idx, dataset in enumerate(self):
            str_ += f"Dataset {idx}: \n\n"
            str_ += f"\tType : {dataset.tag}\n"
            str_ += f"\tName : {dataset.name}\n"

            try:
                instrument = set(dataset.meta_table["TELESCOP"]).pop()
            except (KeyError, TypeError):
                instrument = ""
            str_ += f"\tInstrument : {instrument}\n"

            if dataset.models:
                names = dataset.models.names
            else:
                names = ""
            str_ += f"\tModels : {names}\n\n"

        return str_.expandtabs(tabsize=2)

    def _repr_html_(self):
        try:
            return self.to_html()
        except AttributeError:
            return f"<pre>{html.escape(str(self))}</pre>"
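    # Quick inspection of a hypothetical ``datasets`` container:
    #
    #     energy_min, energy_max = datasets.energy_ranges   # per-dataset energy bounds
    #     print(datasets)                                    # per-dataset summary text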
    def copy(self):
        """A deep copy."""
        return copy.deepcopy(self)
    @classmethod
    def read(
        cls, filename, filename_models=None, lazy=True, cache=True, checksum=True
    ):
        """De-serialize datasets from YAML and FITS files.

        Parameters
        ----------
        filename : str or `~pathlib.Path`
            File path or name of datasets yaml file.
        filename_models : str or `~pathlib.Path`, optional
            File path or name of models yaml file. Default is None.
        lazy : bool
            Whether to lazy load data into memory. Default is True.
        cache : bool
            Whether to cache the data after loading. Default is True.
        checksum : bool
            Whether to perform checksum verification. Default is True.

        Returns
        -------
        datasets : `gammapy.datasets.Datasets`
            Datasets.
        """
        from . import DATASET_REGISTRY

        filename = make_path(filename)
        data_list = read_yaml(filename, checksum=checksum)

        datasets = []

        for data in data_list["datasets"]:
            path = filename.parent

            if (path / data["filename"]).exists():
                data["filename"] = str(make_path(path / data["filename"]))

            dataset_cls = DATASET_REGISTRY.get_cls(data["type"])
            dataset = dataset_cls.from_dict(data, lazy=lazy, cache=cache)
            datasets.append(dataset)

        datasets = cls(datasets)

        if filename_models:
            datasets.models = Models.read(filename_models, checksum=checksum)

        return datasets
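    # Reading back a serialized set of datasets and models (a sketch; the file names
    # are placeholders):
    #
    #     datasets = Datasets.read("datasets.yaml", filename_models="models.yaml")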
    def write(
        self,
        filename,
        filename_models=None,
        overwrite=False,
        write_covariance=True,
        checksum=False,
    ):
        """Serialize datasets to YAML and FITS files.

        Parameters
        ----------
        filename : str or `~pathlib.Path`
            File path or name of datasets yaml file.
        filename_models : str or `~pathlib.Path`, optional
            File path or name of models yaml file. Default is None.
        overwrite : bool, optional
            Overwrite existing file. Default is False.
        write_covariance : bool
            Whether to save the covariance. Default is True.
        checksum : bool
            When True adds both DATASUM and CHECKSUM cards to the headers written
            to the FITS files. Default is False.
        """
        path = make_path(filename)

        data = {"datasets": []}

        for dataset in self._datasets:
            d = dataset.to_dict()
            filename = d["filename"]
            dataset.write(
                path.parent / filename, overwrite=overwrite, checksum=checksum
            )
            data["datasets"].append(d)

        if path.exists() and not overwrite:
            raise IOError(f"File exists already: {path}")

        write_yaml(data, path, sort_keys=False, checksum=checksum)

        if filename_models:
            self.models.write(
                filename_models,
                overwrite=overwrite,
                write_covariance=write_covariance,
                checksum=checksum,
            )
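    # Writing the container to disk; the YAML index file references one FITS file per
    # dataset, written next to it (file names are placeholders):
    #
    #     datasets.write(
    #         "datasets.yaml", filename_models="models.yaml", overwrite=True
    #     )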
    def stack_reduce(self, name=None, nan_to_num=True):
        """Reduce the Datasets to a unique Dataset by stacking them together.

        This works only if all datasets are of the same type and with aligned
        geometries, and if a proper in-place stack method exists for the Dataset type.

        Parameters
        ----------
        name : str, optional
            Name of the stacked dataset. Default is None.
        nan_to_num : bool
            Non-finite values are replaced by zero if True. Default is True.

        Returns
        -------
        dataset : `~gammapy.datasets.Dataset`
            The stacked dataset.
        """
        if not self.is_all_same_type:
            raise ValueError(
                "Stacking impossible: all Datasets contained are not of a unique type."
            )

        stacked = self[0].to_masked(name=name, nan_to_num=nan_to_num)

        for dataset in self[1:]:
            stacked.stack(dataset, nan_to_num=nan_to_num)

        return stacked
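    # Stacking all datasets into a single one (a sketch; requires datasets of the same
    # type with aligned geometries):
    #
    #     stacked = datasets.stack_reduce(name="stacked")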
    def info_table(self, cumulative=False):
        """Get info table for datasets.

        Parameters
        ----------
        cumulative : bool
            Cumulate information across all datasets. If True, all model-dependent
            information will be lost. Default is False.

        Returns
        -------
        info_table : `~astropy.table.Table`
            Info table.
        """
        if not self.is_all_same_type:
            raise ValueError("Info table not supported for mixed dataset type.")

        rows = []

        if cumulative:
            name = "stacked"
            stacked = self[0].to_masked(name=name)
            rows.append(stacked.info_dict())
            for dataset in self[1:]:
                stacked.stack(dataset)
                rows.append(stacked.info_dict())
        else:
            for dataset in self:
                rows.append(dataset.info_dict())

        return Table(rows)
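    # Example of summarising the datasets, optionally cumulatively (a sketch with a
    # hypothetical ``datasets`` container):
    #
    #     table = datasets.info_table(cumulative=True)
    #     table.pprint()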
    # TODO: merge with meta table?
    @property
    def gti(self):
        """GTI table."""
        time_intervals = []

        for dataset in self:
            if dataset.gti is not None and len(dataset.gti.table) > 0:
                interval = (dataset.gti.time_start[0], dataset.gti.time_stop[-1])
                time_intervals.append(interval)

        if len(time_intervals) == 0:
            return None

        return GTI.from_time_intervals(time_intervals)

    @property
    def meta_table(self):
        """Meta table."""
        tables = [d.meta_table for d in self]

        if np.all([table is None for table in tables]):
            meta_table = Table()
        else:
            meta_table = vstack(tables).copy()

        meta_table.add_column([d.tag for d in self], index=0, name="TYPE")
        meta_table.add_column(self.names, index=0, name="NAME")
        return meta_table

    def __getitem__(self, key):
        return self._datasets[self.index(key)]

    def __delitem__(self, key):
        del self._datasets[self.index(key)]

    def __setitem__(self, key, dataset):
        if isinstance(dataset, Dataset):
            if dataset.name in self.names:
                raise ValueError("Dataset names must be unique")
            self._datasets[self.index(key)] = dataset
        else:
            raise TypeError(f"Invalid type: {type(dataset)!r}")
    def insert(self, idx, dataset):
        if isinstance(dataset, Dataset):
            if dataset.name in self.names:
                raise ValueError("Dataset names must be unique")
            self._datasets.insert(idx, dataset)
        else:
            raise TypeError(f"Invalid type: {type(dataset)!r}")
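    # ``Datasets`` behaves as a mutable sequence, so standard list operations apply,
    # subject to the unique-name constraint (``dataset_c`` is a hypothetical dataset):
    #
    #     datasets.append(dataset_c)       # uses ``insert`` under the hood
    #     datasets[0]                      # access by position
    #     len(datasets), list(datasets)    # standard sequence behaviour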