Source code for gammapy.datasets.flux_points

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
import numpy as np
from scipy.special import erfc
from astropy import units as u
from astropy.io import fits
from astropy.table import Table
from astropy.visualization import quantity_support
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
from gammapy.maps.axes import UNIT_STRING_FORMAT, MapAxis
from gammapy.modeling.models import (
    DatasetModels,
    Models,
    SkyModel,
    TemplateSpatialModel,
)
from gammapy.utils.interpolation import interpolate_profile
from gammapy.utils.scripts import make_name, make_path
from .core import Dataset

log = logging.getLogger(__name__)

__all__ = ["FluxPointsDataset"]


def _get_reference_model(model, energy_bounds, margin_percent=70):
    if isinstance(model.spatial_model, TemplateSpatialModel):
        geom = model.spatial_model.map.geom
        emin = energy_bounds[0] * (1 - margin_percent / 100)
        emax = energy_bounds[-1] * (1 + margin_percent / 100)
        energy_axis = MapAxis.from_energy_bounds(
            emin, emax, nbin=20, per_decade=True, name="energy_true"
        )
        geom = geom.to_image().to_cube([energy_axis])
        return Models([model]).to_template_spectral_model(geom)
    else:
        return model.spectral_model


[docs] class FluxPointsDataset(Dataset): """Bundle a set of flux points with a parametric model, to compute fit statistic function using different statistics (see ``stat_type``). For more information see :ref:`datasets`. Parameters ---------- models : `~gammapy.modeling.models.Models` Models (only spectral part needs to be set). data : `~gammapy.estimators.FluxPoints` Flux points. Must be sorted along the energy axis. mask_fit : `numpy.ndarray` Mask to apply for fitting. mask_safe : `numpy.ndarray` Mask defining the safe data range. By default, upper limit values are excluded. meta_table : `~astropy.table.Table` Table listing information on observations used to create the dataset. One line per observation for stacked datasets. stat_type : str Method used to compute the statistics: * chi2 : estimate from chi2 statistics. * profile : estimate from interpolation of the likelihood profile. * distrib : Assuming gaussian errors the likelihood is given by the probability density function of the normal distribution. For the upper limit case it is necessary to marginalize over the unknown measurement, so we integrate the normal distribution up to the upper limit value which gives the complementary error function. See eq. C7 of `Mohanty et al (2013) <https://iopscience.iop.org/article/10.1088/0004-637X/773/2/168/pdf>`__ Default is `chi2`, in that case upper limits are ignored and the mean of asymetrics error is used. However, it is recommended to use `profile` if `stat_scan` is available on flux points. The `distrib` case provides an approximation if the profile is not available. stat_kwargs : dict Extra arguments specifying the interpolation scheme of the likelihood profile. Used only if `stat_type=="profile"`. In that case the default is : `stat_kwargs={"interp_scale":"sqrt", "extrapolate":True}` Examples -------- Load flux points from file and fit with a power-law model:: >>> from gammapy.modeling import Fit >>> from gammapy.modeling.models import PowerLawSpectralModel, SkyModel >>> from gammapy.estimators import FluxPoints >>> from gammapy.datasets import FluxPointsDataset >>> filename = "$GAMMAPY_DATA/tests/spectrum/flux_points/diff_flux_points.fits" >>> dataset = FluxPointsDataset.read(filename) >>> model = SkyModel(spectral_model=PowerLawSpectralModel()) >>> dataset.models = model Make the fit: >>> fit = Fit() >>> result = fit.run([dataset]) >>> print(result) OptimizeResult <BLANKLINE> backend : minuit method : migrad success : True message : Optimization terminated successfully. nfev : 135 total stat : 25.21 <BLANKLINE> CovarianceResult <BLANKLINE> backend : minuit method : hesse success : True message : Hesse terminated successfully. >>> print(result.parameters.to_table()) type name value unit ... frozen link prior ---- --------- ---------- -------------- ... ------ ---- ----- index 2.2159e+00 ... False amplitude 2.1619e-13 TeV-1 s-1 cm-2 ... False reference 1.0000e+00 TeV ... True Note: In order to reproduce the example, you need the tests datasets folder. You may download it with the command: ``gammapy download datasets --tests --out $GAMMAPY_DATA`` """ tag = "FluxPointsDataset" def __init__( self, models=None, data=None, mask_fit=None, mask_safe=None, name=None, meta_table=None, stat_type="chi2", stat_kwargs=None, ): if not data.geom.has_energy_axis: raise ValueError("FluxPointsDataset needs an energy axis") self.data = data self.mask_fit = mask_fit self._name = make_name(name) self.models = models self.meta_table = meta_table self._available_stat_type = dict( chi2=self._stat_array_chi2, profile=self._stat_array_profile, distrib=self._stat_array_distrib, ) if stat_kwargs is None: stat_kwargs = dict() self.stat_kwargs = stat_kwargs self.stat_type = stat_type if mask_safe is None: mask_safe = np.ones(self.data.dnde.data.shape, dtype=bool) self.mask_safe = mask_safe @property def available_stat_type(self): return list(self._available_stat_type.keys()) @property def stat_type(self): return self._stat_type @stat_type.setter def stat_type(self, stat_type): if stat_type not in self.available_stat_type: raise ValueError( f"Invalid stat_type: possible options are {self.available_stat_type}" ) if stat_type == "chi2": self._mask_valid = (~self.data.is_ul).data & np.isfinite(self.data.dnde) elif stat_type == "distrib": self._mask_valid = ( self.data.is_ul.data & np.isfinite(self.data.dnde_ul) ) | np.isfinite(self.data.dnde) elif stat_type == "profile": self.stat_kwargs.setdefault("interp_scale", "sqrt") self.stat_kwargs.setdefault("extrapolate", True) self._profile_interpolators = self._get_valid_profile_interpolators() self._stat_type = stat_type @property def mask_valid(self): return self._mask_valid @property def mask_safe(self): return self._mask_safe & self.mask_valid @mask_safe.setter def mask_safe(self, mask_safe): self._mask_safe = mask_safe @property def name(self): return self._name @property def gti(self): """Good time interval info (`GTI`).""" return self.data.gti @property def models(self): return self._models @models.setter def models(self, models): if models is None: self._models = None else: models = DatasetModels(models) self._models = models.select(datasets_names=self.name)
[docs] def write(self, filename, overwrite=False, checksum=False, **kwargs): """Write flux point dataset to file. Parameters ---------- filename : str Filename to write to. overwrite : bool, optional Overwrite existing file. Default is False. checksum : bool When True adds both DATASUM and CHECKSUM cards to the headers written to the FITS file. Applies only if filename has .fits suffix. Default is False. **kwargs : dict, optional Keyword arguments passed to `~astropy.table.Table.write`. """ table = self.data.to_table() if self.mask_fit is None: mask_fit = self.mask_safe else: mask_fit = self.mask_fit table["mask_fit"] = mask_fit table["mask_safe"] = self.mask_safe filename = make_path(filename) if "fits" in filename.suffixes: primary_hdu = fits.PrimaryHDU() hdu_table = fits.BinTableHDU(table, name="TABLE") fits.HDUList([primary_hdu, hdu_table]).writeto( filename, overwrite=overwrite, checksum=checksum ) else: table.write(make_path(filename), overwrite=overwrite, **kwargs)
[docs] @classmethod def read(cls, filename, name=None): """Read pre-computed flux points and create a dataset. Parameters ---------- filename : str Filename to read from. name : str, optional Name of the new dataset. Default is None. Returns ------- dataset : `FluxPointsDataset` FluxPointsDataset. """ from gammapy.estimators import FluxPoints filename = make_path(filename) table = Table.read(filename) mask_fit = None mask_safe = None if "mask_safe" in table.colnames: mask_safe = table["mask_safe"].data.astype("bool") if "mask_fit" in table.colnames: mask_fit = table["mask_fit"].data.astype("bool") return cls( name=make_name(name), data=FluxPoints.from_table(table), mask_fit=mask_fit, mask_safe=mask_safe, )
[docs] @classmethod def from_dict(cls, data, **kwargs): """Create flux point dataset from dict. Parameters ---------- data : dict Dictionary containing data to create dataset from. Returns ------- dataset : `FluxPointsDataset` Flux point datasets. """ from gammapy.estimators import FluxPoints filename = make_path(data["filename"]) table = Table.read(filename) mask_fit = table["mask_fit"].data.astype("bool") mask_safe = table["mask_safe"].data.astype("bool") table.remove_columns(["mask_fit", "mask_safe"]) return cls( name=data["name"], data=FluxPoints.from_table(table), mask_fit=mask_fit, mask_safe=mask_safe, )
def __str__(self): str_ = f"{self.__class__.__name__}\n" str_ += "-" * len(self.__class__.__name__) + "\n" str_ += "\n" str_ += "\t{:32}: {} \n\n".format("Name", self.name) # data section n_bins = 0 if self.data is not None: n_bins = np.prod(self.data.geom.data_shape) str_ += "\t{:32}: {} \n".format("Number of total flux points", n_bins) n_fit_bins = 0 if self.mask is not None: n_fit_bins = np.sum(self.mask.data) str_ += "\t{:32}: {} \n\n".format("Number of fit bins", n_fit_bins) # likelihood section str_ += "\t{:32}: {}\n".format("Fit statistic type", self.stat_type) stat = np.nan if self.data is not None and self.models is not None: stat = self.stat_sum() str_ += "\t{:32}: {:.2f}\n\n".format("Fit statistic value (-2 log(L))", stat) # model section n_models = 0 if self.models is not None: n_models = len(self.models) str_ += "\t{:32}: {} \n".format("Number of models", n_models) if self.models is not None: str_ += "\t{:32}: {}\n".format( "Number of parameters", len(self.models.parameters) ) str_ += "\t{:32}: {}\n\n".format( "Number of free parameters", len(self.models.parameters.free_parameters) ) str_ += "\t" + "\n\t".join(str(self.models).split("\n")[2:]) return str_.expandtabs(tabsize=2)
[docs] def data_shape(self): """Shape of the flux points data (tuple).""" return self.data.energy_ref.shape
[docs] def flux_pred(self): """Compute predicted flux.""" flux = 0.0 for model in self.models: reference_model = _get_reference_model(model, self._energy_bounds) sky_model = SkyModel( spectral_model=reference_model, temporal_model=model.temporal_model ) flux_model = sky_model.evaluate_geom( self.data.geom.as_energy_true, self.gti ) flux += flux_model return flux
[docs] def stat_array(self): """Fit statistic array.""" return self._available_stat_type[self.stat_type]()
def _stat_array_chi2(self): """Chi2 statistics.""" model = self.flux_pred() data = self.data.dnde.quantity try: sigma = self.data.dnde_err.quantity except AttributeError: sigma = (self.data.dnde_errn + self.data.dnde_errp).quantity / 2 return ((data - model) / sigma).to_value("") ** 2 def _stat_array_profile(self): """Estimate statitistic from interpolation of the likelihood profile.""" model = np.zeros(self.data.dnde.data.shape) + ( self.flux_pred() / self.data.dnde_ref ).to_value("") stat = np.zeros(model.shape) for idx in np.ndindex(self._profile_interpolators.shape): stat[idx] = self._profile_interpolators[idx](model[idx]) return stat def _get_valid_profile_interpolators(self): value_scan = self.data.stat_scan.geom.axes["norm"].center shape_axes = self.data.stat_scan.geom._shape[slice(3, None)][::-1] interpolators = np.empty(shape_axes, dtype=object) self._mask_valid = np.ones(self.data.dnde.data.shape, dtype=bool) for idx in np.ndindex(shape_axes): stat_scan = np.abs( self.data.stat_scan.data[idx].squeeze() - self.data.stat.data[idx].squeeze() ) self._mask_valid[idx] = np.all(np.isfinite(stat_scan)) interpolators[idx] = interpolate_profile( value_scan, stat_scan, interp_scale=self.stat_kwargs["interp_scale"], extrapolate=self.stat_kwargs["extrapolate"], ) return interpolators def _stat_array_distrib(self): """Estimate statistic from probability distributions, assumes that flux points correspond to asymmetric gaussians and upper limits complementary error functions. """ model = np.zeros(self.data.dnde.data.shape) + self.flux_pred().to_value( self.data.dnde.unit ) stat = np.zeros(model.shape) mask_valid = ~np.isnan(self.data.dnde.data) loc = self.data.dnde.data[mask_valid] value = model[mask_valid] try: mask_p = (model >= self.data.dnde.data)[mask_valid] scale = np.zeros(mask_p.shape) scale[mask_p] = self.data.dnde_errp.data[mask_valid][mask_p] scale[~mask_p] = self.data.dnde_errn.data[mask_valid][~mask_p] mask_invalid = np.isnan(scale) scale[mask_invalid] = self.data.dnde_err.data[mask_valid][mask_invalid] except AttributeError: scale = self.data.dnde_err.data[mask_valid] stat[mask_valid] = ((value - loc) / scale) ** 2 mask_ul = self.data.is_ul.data value = model[mask_ul] loc_ul = self.data.dnde_ul.data[mask_ul] scale_ul = self.data.dnde_ul.data[mask_ul] stat[mask_ul] = 2 * np.log( (erfc((loc_ul - value) / scale_ul) / 2) / (erfc((loc_ul - 0) / scale_ul) / 2) ) stat[np.isnan(stat.data)] = 0 return stat
[docs] def residuals(self, method="diff"): """Compute flux point residuals. Parameters ---------- method: {"diff", "diff/model"} Method used to compute the residuals. Available options are: - ``"diff"`` (default): data - model. - ``"diff/model"``: (data - model) / model. Returns ------- residuals : `~numpy.ndarray` Residuals array. """ fp = self.data model = self.flux_pred() residuals = self._compute_residuals(fp.dnde.quantity, model, method) # Remove residuals for upper_limits residuals[fp.is_ul.data] = np.nan return residuals
[docs] def plot_fit( self, ax_spectrum=None, ax_residuals=None, kwargs_spectrum=None, kwargs_residuals=None, ): """Plot flux points, best fit model and residuals in two panels. Calls `~FluxPointsDataset.plot_spectrum` and `~FluxPointsDataset.plot_residuals`. Parameters ---------- ax_spectrum : `~matplotlib.axes.Axes`, optional Axes to plot flux points and best fit model on. Default is None. ax_residuals : `~matplotlib.axes.Axes`, optional Axes to plot residuals on. Default is None. kwargs_spectrum : dict, optional Keyword arguments passed to `~FluxPointsDataset.plot_spectrum`. Default is None. kwargs_residuals : dict, optional Keyword arguments passed to `~FluxPointsDataset.plot_residuals`. Default is None. Returns ------- ax_spectrum, ax_residuals : `~matplotlib.axes.Axes` Flux points, best fit model and residuals plots. Examples -------- >>> from gammapy.modeling.models import PowerLawSpectralModel, SkyModel >>> from gammapy.estimators import FluxPoints >>> from gammapy.datasets import FluxPointsDataset >>> #load precomputed flux points >>> filename = "$GAMMAPY_DATA/tests/spectrum/flux_points/diff_flux_points.fits" >>> flux_points = FluxPoints.read(filename) >>> model = SkyModel(spectral_model=PowerLawSpectralModel()) >>> dataset = FluxPointsDataset(model, flux_points) >>> #configuring optional parameters >>> kwargs_spectrum = {"kwargs_model": {"color":"red", "ls":"--"}, "kwargs_fp":{"color":"green", "marker":"o"}} >>> kwargs_residuals = {"color": "blue", "markersize":4, "marker":'s', } >>> dataset.plot_fit(kwargs_residuals=kwargs_residuals, kwargs_spectrum=kwargs_spectrum) # doctest: +SKIP """ if self.data.geom.ndim > 3: raise ValueError("Plot fit works with only one energy axis") fig = plt.figure(figsize=(9, 7)) gs = GridSpec(7, 1) if ax_spectrum is None: ax_spectrum = fig.add_subplot(gs[:5, :]) if ax_residuals is None: plt.setp(ax_spectrum.get_xticklabels(), visible=False) if ax_residuals is None: ax_residuals = fig.add_subplot(gs[5:, :], sharex=ax_spectrum) kwargs_spectrum = kwargs_spectrum or {} kwargs_residuals = kwargs_residuals or {} kwargs_residuals.setdefault("method", "diff/model") self.plot_spectrum(ax=ax_spectrum, **kwargs_spectrum) self.plot_residuals(ax=ax_residuals, **kwargs_residuals) return ax_spectrum, ax_residuals
@property def _energy_bounds(self): try: return u.Quantity([self.data.energy_min.min(), self.data.energy_max.max()]) except KeyError: return u.Quantity([self.data.energy_ref.min(), self.data.energy_ref.max()]) @property def _energy_unit(self): return self.data.energy_ref.unit
[docs] def plot_residuals(self, ax=None, method="diff", **kwargs): """Plot flux point residuals. Parameters ---------- ax : `~matplotlib.axes.Axes`, optional Axes to plot on. Default is None. method : {"diff", "diff/model"} Normalization used to compute the residuals, see `FluxPointsDataset.residuals`. Default is "diff". **kwargs : dict Keyword arguments passed to `~matplotlib.axes.Axes.errorbar`. Returns ------- ax : `~matplotlib.axes.Axes` Axes object. """ if self.data.geom.ndim > 3: raise ValueError("Plot residuals works with only one energy axis") ax = ax or plt.gca() fp = self.data residuals = self.residuals(method) xerr = self.data.energy_axis.as_plot_xerr yerr = fp._plot_get_flux_err(sed_type="dnde") if method == "diff/model": model = self.flux_pred() yerr = ( (yerr[0].quantity / model).squeeze(), (yerr[1].quantity / model).squeeze(), ) elif method == "diff": yerr = yerr[0].quantity.squeeze(), yerr[1].quantity.squeeze() else: raise ValueError('Invalid method, choose between "diff" and "diff/model"') kwargs.setdefault("color", kwargs.pop("c", "black")) kwargs.setdefault("marker", "+") kwargs.setdefault("linestyle", kwargs.pop("ls", "none")) with quantity_support(): ax.errorbar( fp.energy_ref, residuals.squeeze(), xerr=xerr, yerr=yerr, **kwargs ) ax.axhline(0, color=kwargs["color"], lw=0.5) # format axes ax.set_xlabel(f"Energy [{self._energy_unit.to_string(UNIT_STRING_FORMAT)}]") ax.set_xscale("log") label = self._residuals_labels[method] ax.set_ylabel(f"Residuals\n {label}") ymin = np.nanmin(residuals - yerr[0]) ymax = np.nanmax(residuals + yerr[1]) ymax = max(abs(ymin), ymax) ax.set_ylim(-1.05 * ymax, 1.05 * ymax) return ax
[docs] def plot_spectrum( self, ax=None, kwargs_fp=None, kwargs_model=None, axis_name="energy" ): """Plot flux points and model. Parameters ---------- ax : `~matplotlib.axes.Axes`, optional Axes to plot on. Default is None. kwargs_fp : dict, optional Keyword arguments passed to `gammapy.estimators.FluxPoints.plot` to configure the plot style. Default is None. kwargs_model : dict, optional Keyword arguments passed to `gammapy.modeling.models.SpectralModel.plot` and `gammapy.modeling.models.SpectralModel.plot_error` to configure the plot style. Default is None. axis_name : str Axis along which to plot the flux points for multiple axes. Default is energy. Returns ------- ax : `~matplotlib.axes.Axes` Axes object. Examples -------- >>> from gammapy.modeling.models import PowerLawSpectralModel, SkyModel >>> from gammapy.estimators import FluxPoints >>> from gammapy.datasets import FluxPointsDataset >>> #load precomputed flux points >>> filename = "$GAMMAPY_DATA/tests/spectrum/flux_points/diff_flux_points.fits" >>> flux_points = FluxPoints.read(filename) >>> model = SkyModel(spectral_model=PowerLawSpectralModel()) >>> dataset = FluxPointsDataset(model, flux_points) >>> #configuring optional parameters >>> kwargs_model = {"color":"red", "ls":"--"} >>> kwargs_fp = {"color":"green", "marker":"o"} >>> dataset.plot_spectrum(kwargs_fp=kwargs_fp, kwargs_model=kwargs_model) # doctest: +SKIP """ kwargs_fp = (kwargs_fp or {}).copy() kwargs_model = (kwargs_model or {}).copy() # plot flux points kwargs_fp.setdefault("sed_type", "e2dnde") if axis_name == "time": kwargs_fp["sed_type"] = "norm" ax = self.data.plot(ax=ax, **kwargs_fp, axis_name=axis_name) kwargs_model.setdefault("label", "Best fit model") kwargs_model.setdefault("zorder", 10) for model in self.models: if model.datasets_names is None or self.name in model.datasets_names: if axis_name == "energy": kwargs_model.setdefault("sed_type", "e2dnde") kwargs_model.setdefault("energy_bounds", self._energy_bounds) model.spectral_model.plot(ax=ax, **kwargs_model) if axis_name == "time": kwargs_model.setdefault( "time_range", self.data.geom.axes["time"].time_bounds ) model.temporal_model.plot(ax=ax, **kwargs_model) if axis_name == "energy": kwargs_model["color"] = ax.lines[-1].get_color() kwargs_model.pop("label") for model in self.models: if model.datasets_names is None or self.name in model.datasets_names: model.spectral_model.plot_error(ax=ax, **kwargs_model) return ax