Source code for gammapy.datasets.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import abc
import collections.abc
import copy
import logging
import numpy as np
from astropy import units as u
from astropy.table import Table, vstack
from gammapy.data import GTI
from gammapy.modeling.models import DatasetModels, Models
from gammapy.utils.scripts import make_name, make_path, read_yaml, write_yaml
from gammapy.utils.table import table_from_row_data

log = logging.getLogger(__name__)


__all__ = ["Dataset", "Datasets"]


[docs]class Dataset(abc.ABC): """Dataset abstract base class. TODO: add tutorial how to create your own dataset types. For now, see existing examples in Gammapy how this works: - `gammapy.datasets.MapDataset` - `gammapy.datasets.SpectrumDataset` - `gammapy.datasets.FluxPointsDataset` """ _residuals_labels = { "diff": "data - model", "diff/model": "(data - model) / model", "diff/sqrt(model)": "(data - model) / sqrt(model)", } @property @abc.abstractmethod def tag(self): pass @property def name(self): return self._name
[docs] def to_dict(self): """Convert to dict for YAML serialization.""" name = self.name.replace(" ", "_") filename = f"{name}.fits" return {"name": self.name, "type": self.tag, "filename": filename}
@property def mask(self): """Combined fit and safe mask""" if self.mask_safe is not None and self.mask_fit is not None: return self.mask_safe & self.mask_fit elif self.mask_fit is not None: return self.mask_fit elif self.mask_safe is not None: return self.mask_safe
[docs] def stat_sum(self): """Total statistic given the current model parameters.""" stat = self.stat_array() if self.mask is not None: stat = stat[self.mask.data] return np.sum(stat, dtype=np.float64)
[docs] @abc.abstractmethod def stat_array(self): """Statistic array, one value per data point."""
[docs] def copy(self, name=None): """A deep copy. Parameters ---------- name : str Name of the copied dataset Returns ------- dataset : `Dataset` Copied datasets. """ new = copy.deepcopy(self) name = make_name(name) new._name = name # TODO: check the model behaviour? new.models = None return new
@staticmethod def _compute_residuals(data, model, method="diff"): with np.errstate(invalid="ignore"): if method == "diff": residuals = data - model elif method == "diff/model": residuals = (data - model) / model elif method == "diff/sqrt(model)": residuals = (data - model) / np.sqrt(model) else: raise AttributeError( f"Invalid method: {method!r} for computing residuals" ) return residuals
[docs]class Datasets(collections.abc.MutableSequence): """Dataset collection. Parameters ---------- datasets : `Dataset` or list of `Dataset` Datasets """ def __init__(self, datasets=None): if datasets is None: datasets = [] if isinstance(datasets, Datasets): datasets = datasets._datasets elif isinstance(datasets, Dataset): datasets = [datasets] elif not isinstance(datasets, list): raise TypeError(f"Invalid type: {datasets!r}") unique_names = [] for dataset in datasets: if dataset.name in unique_names: raise (ValueError("Dataset names must be unique")) unique_names.append(dataset.name) self._datasets = datasets @property def parameters(self): """Unique parameters (`~gammapy.modeling.Parameters`). Duplicate parameter objects have been removed. The order of the unique parameters remains. """ return self.models.parameters.unique_parameters @property def models(self): """Unique models (`~gammapy.modeling.Models`). Duplicate model objects have been removed. The order of the unique models remains. """ models = {} for dataset in self: if dataset.models is not None: for model in dataset.models: models[model] = model return DatasetModels(list(models.keys())) @models.setter def models(self, models): """Unique models (`~gammapy.modeling.Models`). Duplicate model objects have been removed. The order of the unique models remains. """ for dataset in self: dataset.models = models @property def names(self): return [d.name for d in self._datasets] @property def is_all_same_type(self): """Whether all contained datasets are of the same type""" return len(set(_.__class__ for _ in self)) == 1 @property def is_all_same_shape(self): """Whether all contained datasets have the same data shape""" return len(set(_.data_shape for _ in self)) == 1 @property def is_all_same_energy_shape(self): """Whether all contained datasets have the same data shape""" return len(set(_.data_shape[0] for _ in self)) == 1 @property def energy_axes_are_aligned(self): """Whether all contained datasets have aligned energy axis""" axes = [d.counts.geom.axes["energy"] for d in self] return np.all([axes[0].is_aligned(ax) for ax in axes]) @property def contributes_to_stat(self): """Stat contributions Returns ------- contributions : `~numpy.array` Array indicating which dataset contributes to the likelihood. """ contributions = [] for dataset in self: if dataset.mask is not None: value = dataset.mask.data.any() else: value = True contributions.append(value) return np.array(contributions)
[docs] def stat_sum(self): """Compute joint likelihood""" stat_sum = 0 # TODO: add parallel evaluation of likelihoods for dataset in self: stat_sum += dataset.stat_sum() return stat_sum
[docs] def select_time(self, time_min, time_max, atol="1e-6 s"): """Select datasets in a given time interval. Parameters ---------- time_min, time_max : `~astropy.time.Time` Time interval atol : `~astropy.units.Quantity` Tolerance value for time comparison with different scale. Default 1e-6 sec. Returns ------- datasets : `Datasets` Datasets in the given time interval. """ atol = u.Quantity(atol) datasets = [] for dataset in self: t_start = dataset.gti.time_start[0] t_stop = dataset.gti.time_stop[-1] if t_start >= (time_min - atol) and t_stop <= (time_max + atol): datasets.append(dataset) return self.__class__(datasets)
[docs] def slice_by_energy(self, energy_min, energy_max): """Select and slice datasets in energy range The method keeps the current dataset names. Datasets, that do not contribute to the selected energy range are dismissed. Parameters ---------- energy_min, energy_max : `~astropy.units.Quantity` Energy bounds to compute the flux point for. Returns ------- datasets : Datasets Datasets """ datasets = [] for dataset in self: try: dataset_sliced = dataset.slice_by_energy( energy_min=energy_min, energy_max=energy_max, name=dataset.name, ) except ValueError: log.info( f"Dataset {dataset.name} does not contribute in the energy range" ) continue datasets.append(dataset_sliced) return self.__class__(datasets=datasets)
[docs] def to_spectrum_datasets(self, region): """Extract spectrum datasets for the given region. Parameters ---------- region : `~regions.SkyRegion` Region definition. Returns -------- datasets : `Datasets` List of `~gammapy.datasets.SpectrumDataset` """ datasets = Datasets() for dataset in self: spectrum_dataset = dataset.to_spectrum_dataset( on_region=region, name=dataset.name ) datasets.append(spectrum_dataset) return datasets
@property # TODO: make this a method to support different methods? def energy_ranges(self): """Get global energy range of datasets. The energy range is derived as the minimum / maximum of the energy ranges of all datasets. Returns ------- energy_min, energy_max : `~astropy.units.Quantity` Energy range. """ energy_mins, energy_maxs = [], [] for dataset in self: energy_axis = dataset.counts.geom.axes["energy"] energy_mins.append(energy_axis.edges[0]) energy_maxs.append(energy_axis.edges[-1]) return u.Quantity(energy_mins), u.Quantity(energy_maxs) def __str__(self): str_ = self.__class__.__name__ + "\n" str_ += "--------\n\n" for idx, dataset in enumerate(self): str_ += f"Dataset {idx}: \n\n" str_ += f"\tType : {dataset.tag}\n" str_ += f"\tName : {dataset.name}\n" try: instrument = set(dataset.meta_table["TELESCOP"]).pop() except (KeyError, TypeError): instrument = "" str_ += f"\tInstrument : {instrument}\n" if dataset.models: names = dataset.models.names else: names = "" str_ += f"\tModels : {names}\n\n" return str_.expandtabs(tabsize=2)
[docs] def copy(self): """A deep copy.""" return copy.deepcopy(self)
[docs] @classmethod def read(cls, filename, filename_models=None, lazy=True, cache=True): """De-serialize datasets from YAML and FITS files. Parameters ---------- filename : str or `Path` File path or name of datasets yaml file filename_models : str or `Path` File path or name of models fyaml ile lazy : bool Whether to lazy load data into memory cache : bool Whether to cache the data after loading. Returns ------- dataset : `gammapy.datasets.Datasets` Datasets """ from . import DATASET_REGISTRY filename = make_path(filename) data_list = read_yaml(filename) datasets = [] for data in data_list["datasets"]: path = filename.parent if (path / data["filename"]).exists(): data["filename"] = str(make_path(path / data["filename"])) dataset_cls = DATASET_REGISTRY.get_cls(data["type"]) dataset = dataset_cls.from_dict(data, lazy=lazy, cache=cache) datasets.append(dataset) datasets = cls(datasets) if filename_models: datasets.models = Models.read(filename_models) return datasets
[docs] def write( self, filename, filename_models=None, overwrite=False, write_covariance=True ): """Serialize datasets to YAML and FITS files. Parameters ---------- filename : str or `Path` File path or name of datasets yaml file filename_models : str or `Path` File path or name of models yaml file overwrite : bool overwrite datasets FITS files write_covariance : bool save covariance or not """ path = make_path(filename) data = {"datasets": []} for dataset in self._datasets: d = dataset.to_dict() filename = d["filename"] dataset.write(path.parent / filename, overwrite=overwrite) data["datasets"].append(d) write_yaml(data, path, sort_keys=False) if filename_models: self.models.write( filename_models, overwrite=overwrite, write_covariance=write_covariance, )
[docs] def stack_reduce(self, name=None, nan_to_num=True): """Reduce the Datasets to a unique Dataset by stacking them together. This works only if all Dataset are of the same type and if a proper in-place stack method exists for the Dataset type. Parameters ---------- name : str Name of the stacked dataset. nan_to_num: bool Non-finite values are replaced by zero if True (default). Returns ------- dataset : `~gammapy.datasets.Dataset` the stacked dataset """ if not self.is_all_same_type: raise ValueError( "Stacking impossible: all Datasets contained are not of a unique type." ) stacked = self[0].to_masked(name=name, nan_to_num=nan_to_num) for dataset in self[1:]: stacked.stack(dataset, nan_to_num=nan_to_num) return stacked
[docs] def info_table(self, cumulative=False): """Get info table for datasets. Parameters ---------- cumulative : bool Cumulate info across all observations Returns ------- info_table : `~astropy.table.Table` Info table. """ if not self.is_all_same_type: raise ValueError("Info table not supported for mixed dataset type.") stacked = self[0].to_masked(name="stacked") rows = [stacked.info_dict()] for dataset in self[1:]: if cumulative: stacked.stack(dataset) row = stacked.info_dict() else: row = dataset.info_dict() rows.append(row) return table_from_row_data(rows=rows)
# TODO: merge with meta table? @property def gti(self): """GTI table""" time_intervals = [] for dataset in self: if dataset.gti is not None and len(dataset.gti.table) > 0: interval = (dataset.gti.time_start[0], dataset.gti.time_stop[-1]) time_intervals.append(interval) if len(time_intervals) == 0: return None return GTI.from_time_intervals(time_intervals) @property def meta_table(self): """Meta table""" tables = [d.meta_table for d in self] if np.all([table is None for table in tables]): meta_table = Table() else: meta_table = vstack(tables) meta_table.add_column([d.tag for d in self], index=0, name="TYPE") meta_table.add_column(self.names, index=0, name="NAME") return meta_table def __getitem__(self, key): return self._datasets[self.index(key)] def __delitem__(self, key): del self._datasets[self.index(key)] def __setitem__(self, key, dataset): if isinstance(dataset, Dataset): if dataset.name in self.names: raise (ValueError("Dataset names must be unique")) self._datasets[self.index(key)] = dataset else: raise TypeError(f"Invalid type: {type(dataset)!r}")
[docs] def insert(self, idx, dataset): if isinstance(dataset, Dataset): if dataset.name in self.names: raise (ValueError("Dataset names must be unique")) self._datasets.insert(idx, dataset) else: raise TypeError(f"Invalid type: {type(dataset)!r}")
[docs] def index(self, key): if isinstance(key, (int, slice)): return key elif isinstance(key, str): return self.names.index(key) elif isinstance(key, Dataset): return self._datasets.index(key) else: raise TypeError(f"Invalid type: {type(key)!r}")
def __len__(self): return len(self._datasets)