# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
from copy import deepcopy
import numpy as np
from scipy import stats
from astropy.io.registry import IORegistryError
from astropy.table import Table, vstack
from astropy.visualization import quantity_support
import matplotlib.pyplot as plt
from gammapy.maps import MapAxis, Maps, RegionNDMap, TimeMapAxis
from gammapy.maps.axes import flat_if_equal
from gammapy.modeling.models import TemplateSpectralModel
from gammapy.modeling.models.spectral import scale_plot_flux
from gammapy.modeling.scipy import stat_profile_ul_scipy
from gammapy.utils.scripts import make_path
from gammapy.utils.table import table_standardise_units_copy
from ..map.core import DEFAULT_UNIT, FluxMaps

__all__ = ["FluxPoints"]

log = logging.getLogger(__name__)


class FluxPoints(FluxMaps):
    """Flux points container.

    The supported formats are described here: :ref:`gadf:flux-points`

    In summary, the following formats and minimum required columns are:

    * Format ``dnde``: columns ``e_ref`` and ``dnde``
    * Format ``e2dnde``: columns ``e_ref``, ``e2dnde``
    * Format ``flux``: columns ``e_min``, ``e_max``, ``flux``
    * Format ``eflux``: columns ``e_min``, ``e_max``, ``eflux``

    Parameters
    ----------
    table : `~astropy.table.Table`
        Table with flux point data

    Attributes
    ----------
    table : `~astropy.table.Table`
        Table with flux point data

    Examples
    --------
    The `FluxPoints` object is most easily created by reading a file with
    flux points given in one of the formats documented above::

        >>> from gammapy.estimators import FluxPoints
        >>> filename = '$GAMMAPY_DATA/hawc_crab/HAWC19_flux_points.fits'
        >>> flux_points = FluxPoints.read(filename)
        >>> flux_points.plot() #doctest: +SKIP

    An instance of `FluxPoints` can also be created by passing an instance of
    `astropy.table.Table`, which contains the required columns, such as `'e_ref'`
    and `'dnde'`. The corresponding `sed_type` has to be defined in the meta data
    of the table::

        >>> import numpy as np
        >>> from astropy import units as u
        >>> from astropy.table import Table
        >>> from gammapy.estimators import FluxPoints
        >>> from gammapy.modeling.models import PowerLawSpectralModel

        >>> table = Table()
        >>> pwl = PowerLawSpectralModel()
        >>> e_ref = np.geomspace(1, 100, 7) * u.TeV
        >>> table["e_ref"] = e_ref
        >>> table["dnde"] = pwl(e_ref)
        >>> table["dnde_err"] = pwl.evaluate_error(e_ref)[0]
        >>> table.meta["SED_TYPE"] = "dnde"

        >>> flux_points = FluxPoints.from_table(table)
        >>> flux_points.plot(sed_type="flux") #doctest: +SKIP

    If you have flux points in a different data format, the format can be changed
    by renaming the table columns and adding meta data::

        >>> from astropy import units as u
        >>> from astropy.table import Table
        >>> from gammapy.estimators import FluxPoints
        >>> from gammapy.utils.scripts import make_path

        >>> filename = make_path('$GAMMAPY_DATA/tests/spectrum/flux_points/flux_points_ctb_37b.txt')
        >>> table = Table.read(filename, format='ascii.csv', delimiter=' ', comment='#')
        >>> table.rename_column('Differential_Flux', 'dnde')
        >>> table['dnde'].unit = 'cm-2 s-1 TeV-1'

        >>> table.rename_column('lower_error', 'dnde_errn')
        >>> table['dnde_errn'].unit = 'cm-2 s-1 TeV-1'

        >>> table.rename_column('upper_error', 'dnde_errp')
        >>> table['dnde_errp'].unit = 'cm-2 s-1 TeV-1'

        >>> table.rename_column('E', 'e_ref')
        >>> table['e_ref'].unit = 'TeV'

        >>> flux_points = FluxPoints.from_table(table, sed_type="dnde")
        >>> flux_points.plot(sed_type="e2dnde") #doctest: +SKIP

    Note: In order to reproduce the example you need the tests datasets folder.
    You may download it with the command
    ``gammapy download datasets --tests --out $GAMMAPY_DATA``
    """

    @classmethod
    def read(
        cls, filename, sed_type=None, format="gadf-sed", reference_model=None, **kwargs
    ):
        """Read precomputed flux points.

        Parameters
        ----------
        filename : str
            Filename
        sed_type : {"dnde", "flux", "eflux", "e2dnde", "likelihood"}
            Sed type
        format : {"gadf-sed", "lightcurve"}
            Format string.
        reference_model : `SpectralModel`
            Reference spectral model
        **kwargs : dict
            Keyword arguments passed to `astropy.table.Table.read`.

        Returns
        -------
        flux_points : `FluxPoints`
            Flux points
        """
        filename = make_path(filename)

        try:
            table = Table.read(filename, **kwargs)
        except IORegistryError:
            kwargs.setdefault("format", "ascii.ecsv")
            table = Table.read(filename, **kwargs)

        return cls.from_table(
            table=table,
            sed_type=sed_type,
            reference_model=reference_model,
            format=format,
        )

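    # Usage sketch (not from the original module): reading precomputed flux points
    # from a FITS file in "gadf-sed" format, using the same test file as the class
    # docstring above (requires the $GAMMAPY_DATA datasets).
    #
    #     >>> from gammapy.estimators import FluxPoints
    #     >>> fp = FluxPoints.read("$GAMMAPY_DATA/hawc_crab/HAWC19_flux_points.fits")
    #     >>> print(fp.sed_type_init)  # doctest: +SKIP
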
    def write(self, filename, sed_type=None, format="gadf-sed", **kwargs):
        """Write flux points.

        Parameters
        ----------
        filename : str
            Filename
        sed_type : {"dnde", "flux", "eflux", "e2dnde", "likelihood"}
            Sed type
        format : {"gadf-sed", "lightcurve", "binned-time-series", "profile"}
            Format specification. The following formats are supported:

                * "gadf-sed": format for sed flux points see :ref:`gadf:flux-points`
                  for details
                * "lightcurve": Gammapy internal format to store energy dependent
                  lightcurves. Basically a generalisation of the "gadf" format, but
                  currently there is no detailed documentation available.
                * "binned-time-series": table format supported by Astropy's
                  `~astropy.timeseries.BinnedTimeSeries`.
                * "profile": Gammapy internal format to store energy dependent
                  flux profiles. Basically a generalisation of the "gadf" format,
                  but currently there is no detailed documentation available.

        **kwargs : dict
            Keyword arguments passed to `astropy.table.Table.write`.
        """
        if sed_type is None:
            sed_type = self.sed_type_init

        filename = make_path(filename)
        table = self.to_table(sed_type=sed_type, format=format)
        table.write(filename, **kwargs)

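    # Usage sketch (not from the original module): round-tripping flux points
    # through `write` and `read`. The output filename is hypothetical; the ECSV
    # writer is provided by `astropy.table.Table.write`.
    #
    #     >>> fp = FluxPoints.read("$GAMMAPY_DATA/hawc_crab/HAWC19_flux_points.fits")
    #     >>> fp.write("crab_flux_points.ecsv", sed_type="dnde", overwrite=True)  # doctest: +SKIP
    #     >>> fp_roundtrip = FluxPoints.read("crab_flux_points.ecsv")  # doctest: +SKIP
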
    @staticmethod
    def _convert_loglike_columns(table):
        # TODO: check sign and factor 2 here
        # https://github.com/gammapy/gammapy/pull/2546#issuecomment-554274318
        # The idea below is to support the format here:
        # https://gamma-astro-data-formats.readthedocs.io/en/latest/spectra/flux_points/index.html#likelihood-columns
        # but internally to go to the uniform "stat"
        if "loglike" in table.colnames and "stat" not in table.colnames:
            table["stat"] = 2 * table["loglike"]

        if "loglike_null" in table.colnames and "stat_null" not in table.colnames:
            table["stat_null"] = 2 * table["loglike_null"]

        if "dloglike_scan" in table.colnames and "stat_scan" not in table.colnames:
            table["stat_scan"] = 2 * table["dloglike_scan"]

        return table

    @classmethod
    def from_table(
        cls, table, sed_type=None, format="gadf-sed", reference_model=None, gti=None
    ):
        """Create flux points from a table.

        The table column names must be consistent with the sed_type.

        Parameters
        ----------
        table : `~astropy.table.Table`
            Table
        sed_type : {"dnde", "flux", "eflux", "e2dnde", "likelihood"}
            Sed type
        format : {"gadf-sed", "lightcurve", "profile"}
            Table format.
        reference_model : `SpectralModel`
            Reference spectral model
        gti : `GTI`
            Good time intervals

        Returns
        -------
        flux_points : `FluxPoints`
            Flux points
        """
        table = table_standardise_units_copy(table)

        if sed_type is None:
            sed_type = table.meta.get("SED_TYPE", None)

        if sed_type is None:
            sed_type = cls._guess_sed_type(table.colnames)

        if sed_type is None:
            raise ValueError("Specifying the sed type is required")

        if sed_type == "likelihood":
            table = cls._convert_loglike_columns(table)
            if reference_model is None:
                reference_model = TemplateSpectralModel(
                    energy=flat_if_equal(table["e_ref"].quantity),
                    values=flat_if_equal(table["ref_dnde"].quantity),
                )

        maps = Maps()
        table.meta.setdefault("SED_TYPE", sed_type)

        for name in cls.all_quantities(sed_type=sed_type):
            if name in table.colnames:
                maps[name] = RegionNDMap.from_table(
                    table=table, colname=name, format=format
                )

        meta = cls._get_meta_gadf(table)
        return cls.from_maps(
            maps=maps,
            reference_model=reference_model,
            meta=meta,
            sed_type=sed_type,
            gti=gti,
        )

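    # Usage sketch (not from the original module): building flux points in the
    # "flux" SED type, which needs the `e_min`, `e_max` and `flux` columns listed
    # in the class docstring. The numbers are made up for illustration; here no
    # `reference_model` is passed, so the library's default handling applies.
    #
    #     >>> import numpy as np
    #     >>> from astropy import units as u
    #     >>> from astropy.table import Table
    #     >>> table = Table()
    #     >>> table["e_min"] = [1, 2, 4] * u.TeV
    #     >>> table["e_max"] = [2, 4, 8] * u.TeV
    #     >>> table["flux"] = [1e-12, 4e-13, 1e-13] * u.Unit("cm-2 s-1")
    #     >>> fp = FluxPoints.from_table(table, sed_type="flux")  # doctest: +SKIP
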
    @staticmethod
    def _get_meta_gadf(table):
        meta = {}
        meta.update(table.meta)
        conf_ul = table.meta.get("UL_CONF")
        if conf_ul:
            n_sigma_ul = np.round(stats.norm.isf(0.5 * (1 - conf_ul)), 1)
            meta["n_sigma_ul"] = n_sigma_ul
        meta["sed_type_init"] = table.meta.get("SED_TYPE")
        return meta

    @staticmethod
    def _format_table(table):
        """Format table"""
        for column in table.colnames:
            if column.startswith(("dnde", "eflux", "flux", "e2dnde", "ref")):
                table[column].format = ".3e"
            elif column.startswith(
                ("e_min", "e_max", "e_ref", "sqrt_ts", "norm", "ts", "stat")
            ):
                table[column].format = ".3f"

        return table

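    # Worked instance of the UL_CONF conversion in `_get_meta_gadf` above (not from
    # the original module): a two-sided confidence level UL_CONF = 0.95 gives
    # stats.norm.isf(0.5 * (1 - 0.95)) = 1.96, which is rounded to n_sigma_ul = 2.0.
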
    def to_table(self, sed_type=None, format="gadf-sed", formatted=False):
        """Create table for a given SED type.

        Parameters
        ----------
        sed_type : {"likelihood", "dnde", "e2dnde", "flux", "eflux"}
            Sed type to convert to. Default is `likelihood`
        format : {"gadf-sed", "lightcurve", "binned-time-series", "profile"}
            Format specification. The following formats are supported:

                * "gadf-sed": format for sed flux points see :ref:`gadf:flux-points`
                  for details
                * "lightcurve": Gammapy internal format to store energy dependent
                  lightcurves. Basically a generalisation of the "gadf" format, but
                  currently there is no detailed documentation available.
                * "binned-time-series": table format supported by Astropy's
                  `~astropy.timeseries.BinnedTimeSeries`.
                * "profile": Gammapy internal format to store energy dependent
                  flux profiles. Basically a generalisation of the "gadf" format,
                  but currently there is no detailed documentation available.

        formatted : bool
            Formatted version with column formats applied. Numerical columns are
            formatted to .3f and .3e respectively.

        Returns
        -------
        table : `~astropy.table.Table`
            Flux points table

        Examples
        --------
        This is how to read and plot example flux points:

        >>> from gammapy.estimators import FluxPoints
        >>> fp = FluxPoints.read("$GAMMAPY_DATA/hawc_crab/HAWC19_flux_points.fits")
        >>> table = fp.to_table(sed_type="flux", format="gadf-sed", formatted=True)
        >>> print(table[:2])
        e_ref e_min e_max     flux      flux_err    flux_ul      ts    sqrt_ts is_ul
         TeV   TeV   TeV  1 / (cm2 s) 1 / (cm2 s) 1 / (cm2 s)
        ----- ----- ----- ----------- ----------- ----------- -------- ------- -----
        1.334 1.000 1.780   1.423e-11   3.135e-13         nan 2734.000  52.288 False
        2.372 1.780 3.160   5.780e-12   1.082e-13         nan 4112.000  64.125 False
        """
        if sed_type is None:
            sed_type = self.sed_type_init

        if format == "gadf-sed":
            # TODO: what to do with GTI info?
            if not self.geom.axes.names == ["energy"]:
                raise ValueError(
                    "Only flux points with a single energy axis "
                    "can be converted to 'gadf-sed'"
                )

            idx = (Ellipsis, 0, 0)
            table = self.energy_axis.to_table(format="gadf-sed")
            table.meta["SED_TYPE"] = sed_type

            if not self.is_convertible_to_flux_sed_type:
                table.remove_columns(["e_min", "e_max"])

            if self.n_sigma_ul:
                table.meta["UL_CONF"] = np.round(
                    1 - 2 * stats.norm.sf(self.n_sigma_ul), 7
                )

            if sed_type == "likelihood":
                table["ref_dnde"] = self.dnde_ref[idx]
                table["ref_flux"] = self.flux_ref[idx]
                table["ref_eflux"] = self.eflux_ref[idx]

            for quantity in self.all_quantities(sed_type=sed_type):
                data = getattr(self, quantity, None)

                if data:
                    table[quantity] = data.quantity[idx]

            if self.has_stat_profiles:
                norm_axis = self.stat_scan.geom.axes["norm"]
                table["norm_scan"] = norm_axis.center.reshape((1, -1))
                table["stat"] = self.stat.data[idx]
                table["stat_scan"] = self.stat_scan.data[idx]

            table["is_ul"] = self.is_ul.data[idx]

        elif format == "lightcurve":
            time_axis = self.geom.axes["time"]

            tables = []
            for idx, (time_min, time_max) in enumerate(time_axis.iter_by_edges):
                table_flat = Table()
                table_flat["time_min"] = [time_min.mjd]
                table_flat["time_max"] = [time_max.mjd]

                fp = self.slice_by_idx(slices={"time": idx})
                table = fp.to_table(sed_type=sed_type, format="gadf-sed")

                for column in table.columns:
                    table_flat[column] = table[column][np.newaxis]

                tables.append(table_flat)

            table = vstack(tables)
        elif format == "binned-time-series":
            message = (
                "Format 'binned-time-series' supports a single time axis "
                f"only. Got {self.geom.axes.names}"
            )

            if not self.geom.axes.is_unidimensional:
                raise ValueError(message)

            axis = self.geom.axes.primary_axis

            if not isinstance(axis, TimeMapAxis):
                raise ValueError(message)

            table = Table()
            table["time_bin_start"] = axis.time_min
            table["time_bin_size"] = axis.time_delta

            for quantity in self.all_quantities(sed_type=sed_type):
                data = getattr(self, quantity, None)

                if data:
                    table[quantity] = data.quantity.squeeze()
        elif format == "profile":
            x_axis = self.geom.axes["projected-distance"]

            tables = []
            for idx, (x_min, x_max) in enumerate(x_axis.iter_by_edges):
                table_flat = Table()
                table_flat["x_min"] = [x_min]
                table_flat["x_max"] = [x_max]
                table_flat["x_ref"] = [(x_max + x_min) / 2]

                fp = self.slice_by_idx(slices={"projected-distance": idx})
                table = fp.to_table(sed_type=sed_type, format="gadf-sed")

                for column in table.columns:
                    table_flat[column] = table[column][np.newaxis]

                tables.append(table_flat)

            table = vstack(tables)
        else:
            raise ValueError(f"Not a supported format {format}")

        if formatted:
            table = self._format_table(table=table)

        return table

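    # Usage sketch (not from the original module): flattening energy-dependent
    # light-curve flux points into a table. The file path is hypothetical; any
    # `FluxPoints` object with a "time" axis works the same way.
    #
    #     >>> lc = FluxPoints.read("path/to/lightcurve_flux_points.fits", format="lightcurve")  # doctest: +SKIP
    #     >>> table = lc.to_table(sed_type="flux", format="lightcurve")  # doctest: +SKIP
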
    @staticmethod
    def _energy_ref_lafferty(model, energy_min, energy_max):
        """Helper for `to_sed_type`.

        Compute energy_ref such that the model value at energy_ref corresponds
        to the mean value between energy_min and energy_max.
        """
        flux = model.integral(energy_min, energy_max)
        dnde_mean = flux / (energy_max - energy_min)
        return model.inverse(dnde_mean)

    def _plot_get_flux_err(self, sed_type=None):
        """Compute flux error for given sed type"""
        y_errn, y_errp = None, None

        if "norm_err" in self.available_quantities:
            # symmetric error
            y_errn = getattr(self, sed_type + "_err")
            y_errp = y_errn.copy()

        if "norm_errp" in self.available_quantities:
            y_errn = getattr(self, sed_type + "_errn")
            y_errp = getattr(self, sed_type + "_errp")

        return y_errn, y_errp

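    # Worked illustration of `_energy_ref_lafferty` above (not from the original
    # module): for an E**-2 power law between 1 and 10 TeV the band-averaged dN/dE
    # equals the model value at sqrt(10) TeV, roughly 3.16 TeV, independent of the
    # amplitude; that energy is the returned reference energy.
    #
    #     >>> import astropy.units as u
    #     >>> from gammapy.modeling.models import PowerLawSpectralModel
    #     >>> pwl = PowerLawSpectralModel(index=2)
    #     >>> FluxPoints._energy_ref_lafferty(pwl, 1 * u.TeV, 10 * u.TeV)  # doctest: +SKIP
    #     <Quantity 3.16... TeV>
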
    def plot(self, ax=None, sed_type=None, energy_power=0, **kwargs):
        """Plot flux points.

        Parameters
        ----------
        ax : `~matplotlib.axes.Axes`
            Axis object to plot on.
        sed_type : {"dnde", "flux", "eflux", "e2dnde"}
            Sed type
        energy_power : float
            Power of energy to multiply flux axis with
        **kwargs : dict
            Keyword arguments passed to `~RegionNDMap.plot`

        Returns
        -------
        ax : `~matplotlib.axes.Axes`
            Axis object
        """
        if sed_type is None:
            sed_type = self.sed_type_plot_default

        if not self.norm.geom.is_region:
            raise ValueError("Plotting only supported for region based flux points")

        if ax is None:
            ax = plt.gca()

        flux_unit = DEFAULT_UNIT[sed_type]

        flux = getattr(self, sed_type)

        # get errors and ul
        y_errn, y_errp = self._plot_get_flux_err(sed_type=sed_type)
        is_ul = self.is_ul.data

        if self.has_ul and y_errn and is_ul.any():
            flux_ul = getattr(self, sed_type + "_ul").quantity
            y_errn.data[is_ul] = 0.5 * flux_ul[is_ul].to_value(y_errn.unit)
            y_errp.data[is_ul] = 0
            flux.data[is_ul] = flux_ul[is_ul].to_value(flux.unit)
            kwargs.setdefault("uplims", is_ul)

        # set flux points plotting defaults
        if y_errp and y_errn:
            y_errp = scale_plot_flux(y_errp, energy_power=energy_power).quantity
            y_errn = scale_plot_flux(y_errn, energy_power=energy_power).quantity
            kwargs.setdefault("yerr", (y_errn, y_errp))
        else:
            kwargs.setdefault("yerr", None)

        flux = scale_plot_flux(flux=flux.to_unit(flux_unit), energy_power=energy_power)
        ax = flux.plot(ax=ax, **kwargs)
        ax.set_ylabel(f"{sed_type} ({ax.yaxis.units})")
        ax.set_yscale("log")
        return ax

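    # Usage sketch (not from the original module): plotting the flux points from
    # the class docstring example in e2dnde representation; keyword arguments are
    # forwarded to `RegionNDMap.plot`.
    #
    #     >>> import matplotlib.pyplot as plt
    #     >>> fp = FluxPoints.read("$GAMMAPY_DATA/hawc_crab/HAWC19_flux_points.fits")
    #     >>> ax = fp.plot(sed_type="e2dnde", color="tab:blue")  # doctest: +SKIP
    #     >>> plt.show()  # doctest: +SKIP
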
    def plot_ts_profiles(
        self,
        ax=None,
        sed_type=None,
        add_cbar=True,
        **kwargs,
    ):
        """Plot fit statistic SED profiles as a density plot.

        Parameters
        ----------
        ax : `~matplotlib.axes.Axes`
            Axis object to plot on.
        sed_type : {"dnde", "flux", "eflux", "e2dnde"}
            Sed type
        add_cbar : bool
            Whether to add a colorbar to the plot.
        **kwargs : dict
            Keyword arguments passed to `~matplotlib.pyplot.pcolormesh`

        Returns
        -------
        ax : `~matplotlib.axes.Axes`
            Axis object
        """
        if ax is None:
            ax = plt.gca()

        if sed_type is None:
            sed_type = self.sed_type_plot_default

        if not self.norm.geom.is_region:
            raise ValueError("Plotting only supported for region based flux points")

        if not self.geom.axes.is_unidimensional:
            raise ValueError(
                "Profile plotting is only supported for unidimensional maps"
            )

        axis = self.geom.axes.primary_axis

        if isinstance(axis, TimeMapAxis) and not axis.is_contiguous:
            axis = axis.to_contiguous()

        if ax.yaxis.units is None:
            yunits = DEFAULT_UNIT[sed_type]
        else:
            yunits = ax.yaxis.units

        ax.yaxis.set_units(yunits)

        flux_ref = getattr(self, sed_type + "_ref").to(yunits)

        ts = self.ts_scan

        norm_min, norm_max = ts.geom.axes["norm"].bounds.to_value("")

        flux = MapAxis.from_bounds(
            norm_min * flux_ref.value.min(),
            norm_max * flux_ref.value.max(),
            nbin=500,
            interp=axis.interp,
            unit=flux_ref.unit,
        )

        norm = flux.center / flux_ref.reshape((-1, 1))

        coords = ts.geom.get_coord()
        coords["norm"] = norm
        coords[axis.name] = axis.center.reshape((-1, 1))

        z = ts.interp_by_coord(coords, values_scale="stat-profile")

        kwargs.setdefault("vmax", 0)
        kwargs.setdefault("vmin", -4)
        kwargs.setdefault("zorder", 0)
        kwargs.setdefault("cmap", "Blues")
        kwargs.setdefault("linewidths", 0)
        kwargs.setdefault("shading", "auto")

        # clipped values are set to NaN so that they appear white on the plot
        z[-z < kwargs["vmin"]] = np.nan

        with quantity_support():
            caxes = ax.pcolormesh(axis.as_plot_edges, flux.edges, -z.T, **kwargs)

        axis.format_plot_xaxis(ax=ax)

        ax.set_ylabel(f"{sed_type} ({ax.yaxis.units})")
        ax.set_yscale("log")

        if add_cbar:
            label = "Fit statistic difference"
            ax.figure.colorbar(caxes, ax=ax, label=label)

        return ax

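    # Usage sketch (not from the original module): the density plot needs flux
    # points that carry stat profiles (`has_stat_profiles`), such as the test file
    # used in the `recompute_ul` example below.
    #
    #     >>> fp = FluxPoints.read("$GAMMAPY_DATA/tests/spectrum/flux_points/binlike.fits")  # doctest: +SKIP
    #     >>> ax = fp.plot_ts_profiles(sed_type="e2dnde")  # doctest: +SKIP
    #     >>> fp.plot(ax=ax, sed_type="e2dnde")  # doctest: +SKIP
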
    def recompute_ul(self, n_sigma_ul=2, **kwargs):
        """Recompute upper limits corresponding to the given number of sigma.

        The pre-computed stat profiles must exist for the re-computation.

        Parameters
        ----------
        n_sigma_ul : int
            Number of sigma to use for upper limit computation. Default is 2.
        **kwargs : dict
            Keyword arguments passed to `~scipy.optimize.brentq`.

        Returns
        -------
        flux_points : `FluxPoints`
            A new FluxPoints object with modified upper limits

        Examples
        --------
        >>> from gammapy.estimators import FluxPoints
        >>> filename = '$GAMMAPY_DATA/tests/spectrum/flux_points/binlike.fits'
        >>> flux_points = FluxPoints.read(filename)
        >>> flux_points_recomputed = flux_points.recompute_ul(n_sigma_ul=3)
        >>> print(flux_points.meta["n_sigma_ul"], flux_points.flux_ul.data[0])
        2.0 [[3.95451985e-09]]
        >>> print(flux_points_recomputed.meta["n_sigma_ul"], flux_points_recomputed.flux_ul.data[0])
        3 [[6.22245374e-09]]
        """
        if not self.has_stat_profiles:
            raise ValueError(
                "Stat profiles not present. Upper limit computation is not possible"
            )

        delta_ts = n_sigma_ul**2

        flux_points = deepcopy(self)

        value_scan = self.stat_scan.geom.axes["norm"].center
        shape_axes = self.stat_scan.geom._shape[slice(3, None)]
        for idx in np.ndindex(shape_axes):
            stat_scan = np.abs(
                self.stat_scan.data[idx].squeeze() - self.stat.data[idx].squeeze()
            )
            flux_points.norm_ul.data[idx] = stat_profile_ul_scipy(
                value_scan, stat_scan, delta_ts=delta_ts, **kwargs
            )
        flux_points.meta["n_sigma_ul"] = n_sigma_ul
        return flux_points