Source code for gammapy.estimators.points.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
from copy import deepcopy
import numpy as np
from scipy import stats
from astropy.io.registry import IORegistryError
from astropy.table import Table, vstack
from astropy.time import Time
from astropy.visualization import quantity_support
import matplotlib.pyplot as plt
from gammapy.maps import MapAxis, Maps, RegionNDMap, TimeMapAxis
from gammapy.maps.axes import flat_if_equal
from gammapy.modeling.models import TemplateSpectralModel
from gammapy.modeling.models.spectral import scale_plot_flux
from gammapy.modeling.scipy import stat_profile_ul_scipy
from gammapy.utils.scripts import make_path
from gammapy.utils.table import table_standardise_units_copy
from gammapy.utils.time import time_ref_to_dict
from ..map.core import DEFAULT_UNIT, FluxMaps

__all__ = ["FluxPoints"]

log = logging.getLogger(__name__)


[docs]class FluxPoints(FluxMaps): """Flux points container. The supported formats are described here: :ref:`gadf:flux-points` In summary, the following formats and minimum required columns are: * Format ``dnde``: columns ``e_ref`` and ``dnde`` * Format ``e2dnde``: columns ``e_ref``, ``e2dnde`` * Format ``flux``: columns ``e_min``, ``e_max``, ``flux`` * Format ``eflux``: columns ``e_min``, ``e_max``, ``eflux`` Parameters ---------- table : `~astropy.table.Table` Table with flux point data Attributes ---------- table : `~astropy.table.Table` Table with flux point data Examples -------- The `FluxPoints` object is most easily created by reading a file with flux points given in one of the formats documented above:: >>> from gammapy.estimators import FluxPoints >>> filename = '$GAMMAPY_DATA/hawc_crab/HAWC19_flux_points.fits' >>> flux_points = FluxPoints.read(filename) >>> flux_points.plot() #doctest: +SKIP An instance of `FluxPoints` can also be created by passing an instance of `astropy.table.Table`, which contains the required columns, such as `'e_ref'` and `'dnde'`. The corresponding `sed_type` has to be defined in the meta data of the table:: >>> import numpy as np >>> from astropy import units as u >>> from astropy.table import Table >>> from gammapy.estimators import FluxPoints >>> from gammapy.modeling.models import PowerLawSpectralModel >>> table = Table() >>> pwl = PowerLawSpectralModel() >>> e_ref = np.geomspace(1, 100, 7) * u.TeV >>> table["e_ref"] = e_ref >>> table["dnde"] = pwl(e_ref) >>> table["dnde_err"] = pwl.evaluate_error(e_ref)[0] >>> table.meta["SED_TYPE"] = "dnde" >>> flux_points = FluxPoints.from_table(table) >>> flux_points.plot(sed_type="flux") #doctest: +SKIP If you have flux points in a different data format, the format can be changed by renaming the table columns and adding meta data:: >>> from astropy import units as u >>> from astropy.table import Table >>> from gammapy.estimators import FluxPoints >>> from gammapy.utils.scripts import make_path >>> filename = make_path('$GAMMAPY_DATA/tests/spectrum/flux_points/flux_points_ctb_37b.txt') >>> table = Table.read(filename ,format='ascii.csv', delimiter=' ', comment='#') >>> table.rename_column('Differential_Flux', 'dnde') >>> table['dnde'].unit = 'cm-2 s-1 TeV-1' >>> table.rename_column('lower_error', 'dnde_errn') >>> table['dnde_errn'].unit = 'cm-2 s-1 TeV-1' >>> table.rename_column('upper_error', 'dnde_errp') >>> table['dnde_errp'].unit = 'cm-2 s-1 TeV-1' >>> table.rename_column('E', 'e_ref') >>> table['e_ref'].unit = 'TeV' >>> flux_points = FluxPoints.from_table(table, sed_type="dnde") >>> flux_points.plot(sed_type="e2dnde") #doctest: +SKIP Note: In order to reproduce the example you need the tests datasets folder. You may download it with the command ``gammapy download datasets --tests --out $GAMMAPY_DATA`` """
[docs] @classmethod def read( cls, filename, sed_type=None, format="gadf-sed", reference_model=None, **kwargs ): """Read precomputed flux points. Parameters ---------- filename : str Filename sed_type : {"dnde", "flux", "eflux", "e2dnde", "likelihood"} Sed type format : {"gadf-sed", "lightcurve"} Format string. reference_model : `SpectralModel` Reference spectral model **kwargs : dict Keyword arguments passed to `astropy.table.Table.read`. Returns ------- flux_points : `FluxPoints` Flux points """ filename = make_path(filename) try: table = Table.read(filename, **kwargs) except IORegistryError: kwargs.setdefault("format", "ascii.ecsv") table = Table.read(filename, **kwargs) return cls.from_table( table=table, sed_type=sed_type, reference_model=reference_model, format=format, )
[docs] def write(self, filename, sed_type=None, format="gadf-sed", **kwargs): """Write flux points. Parameters ---------- filename : str Filename sed_type : {"dnde", "flux", "eflux", "e2dnde", "likelihood"} Sed type format : {"gadf-sed", "lightcurve", "binned-time-series", "profile"} Format specification. The following formats are supported: * "gadf-sed": format for sed flux points see :ref:`gadf:flux-points` for details * "lightcurve": Gammapy internal format to store energy dependent lightcurves. Basically a generalisation of the "gadf" format, but currently there is no detailed documentation available. * "binned-time-series": table format support by Astropy's `~astropy.timeseries.BinnedTimeSeries`. * "profile": Gammapy internal format to store energy dependent flux profiles. Basically a generalisation of the "gadf" format, but currently there is no detailed documentation available. **kwargs : dict Keyword arguments passed to `astropy.table.Table.write`. """ if sed_type is None: sed_type = self.sed_type_init filename = make_path(filename) table = self.to_table(sed_type=sed_type, format=format) table.write(filename, **kwargs)
@staticmethod def _convert_loglike_columns(table): # TODO: check sign and factor 2 here # https://github.com/gammapy/gammapy/pull/2546#issuecomment-554274318 # The idea below is to support the format here: # https://gamma-astro-data-formats.readthedocs.io/en/latest/spectra/flux_points/index.html#likelihood-columns # but internally to go to the uniform "stat" if "loglike" in table.colnames and "stat" not in table.colnames: table["stat"] = 2 * table["loglike"] if "loglike_null" in table.colnames and "stat_null" not in table.colnames: table["stat_null"] = 2 * table["loglike_null"] if "dloglike_scan" in table.colnames and "stat_scan" not in table.colnames: table["stat_scan"] = 2 * table["dloglike_scan"] return table
[docs] @classmethod def from_table( cls, table, sed_type=None, format="gadf-sed", reference_model=None, gti=None ): """Create flux points from a table. The table column names must be consistent with the sed_type Parameters ---------- table : `~astropy.table.Table` Table sed_type : {"dnde", "flux", "eflux", "e2dnde", "likelihood"} Sed type format : {"gadf-sed", "lightcurve", "profile"} Table format. reference_model : `SpectralModel` Reference spectral model gti : `GTI` Good time intervals meta : dict Meta data. Returns ------- flux_points : `FluxPoints` Flux points """ table = table_standardise_units_copy(table) if sed_type is None: sed_type = table.meta.get("SED_TYPE", None) if sed_type is None: sed_type = cls._guess_sed_type(table.colnames) if sed_type is None: raise ValueError("Specifying the sed type is required") if sed_type == "likelihood": table = cls._convert_loglike_columns(table) if reference_model is None: reference_model = TemplateSpectralModel( energy=flat_if_equal(table["e_ref"].quantity), values=flat_if_equal(table["ref_dnde"].quantity), ) maps = Maps() table.meta.setdefault("SED_TYPE", sed_type) for name in cls.all_quantities(sed_type=sed_type): if name in table.colnames: maps[name] = RegionNDMap.from_table( table=table, colname=name, format=format ) meta = cls._get_meta_gadf(table) return cls.from_maps( maps=maps, reference_model=reference_model, meta=meta, sed_type=sed_type, gti=gti, )
@staticmethod def _get_meta_gadf(table): meta = {} meta.update(table.meta) conf_ul = table.meta.get("UL_CONF") if conf_ul: n_sigma_ul = np.round(stats.norm.isf(0.5 * (1 - conf_ul)), 1) meta["n_sigma_ul"] = n_sigma_ul meta["sed_type_init"] = table.meta.get("SED_TYPE") return meta @staticmethod def _format_table(table): """Format table""" for column in table.colnames: if column.startswith(("dnde", "eflux", "flux", "e2dnde", "ref")): table[column].format = ".3e" elif column.startswith( ("e_min", "e_max", "e_ref", "sqrt_ts", "norm", "ts", "stat") ): table[column].format = ".3f" return table
[docs] def to_table(self, sed_type=None, format="gadf-sed", formatted=False): """Create table for a given SED type. Parameters ---------- sed_type : {"likelihood", "dnde", "e2dnde", "flux", "eflux"} Sed type to convert to. Default is `likelihood` format : {"gadf-sed", "lightcurve", "binned-time-series", "profile"} Format specification. The following formats are supported: * "gadf-sed": format for sed flux points see :ref:`gadf:flux-points` for details * "lightcurve": Gammapy internal format to store energy dependent lightcurves. Basically a generalisation of the "gadf" format, but currently there is no detailed documentation available. * "binned-time-series": table format support by Astropy's `~astropy.timeseries.BinnedTimeSeries`. * "profile": Gammapy internal format to store energy dependent flux profiles. Basically a generalisation of the "gadf" format, but currently there is no detailed documentation available. formatted : bool Formatted version with column formats applied. Numerical columns are formatted to .3f and .3e respectively. Returns ------- table : `~astropy.table.Table` Flux points table Examples -------- This is how to read and plot example flux points: >>> from gammapy.estimators import FluxPoints >>> fp = FluxPoints.read("$GAMMAPY_DATA/hawc_crab/HAWC19_flux_points.fits") >>> table = fp.to_table(sed_type="flux", format="gadf-sed", formatted=True) >>> print(table[:2]) e_ref e_min e_max flux flux_err flux_ul ts sqrt_ts is_ul TeV TeV TeV 1 / (cm2 s) 1 / (cm2 s) 1 / (cm2 s) ----- ----- ----- ----------- ----------- ----------- -------- ------- ----- 1.334 1.000 1.780 1.423e-11 3.135e-13 nan 2734.000 52.288 False 2.372 1.780 3.160 5.780e-12 1.082e-13 nan 4112.000 64.125 False """ if sed_type is None: sed_type = self.sed_type_init if format == "gadf-sed": # TODO: what to do with GTI info? if not self.geom.axes.names == ["energy"]: raise ValueError( "Only flux points with a single energy axis " "can be converted to 'gadf-sed'" ) idx = (Ellipsis, 0, 0) table = self.energy_axis.to_table(format="gadf-sed") table.meta["SED_TYPE"] = sed_type if not self.is_convertible_to_flux_sed_type: table.remove_columns(["e_min", "e_max"]) if self.n_sigma_ul: table.meta["UL_CONF"] = np.round( 1 - 2 * stats.norm.sf(self.n_sigma_ul), 7 ) if sed_type == "likelihood": table["ref_dnde"] = self.dnde_ref[idx] table["ref_flux"] = self.flux_ref[idx] table["ref_eflux"] = self.eflux_ref[idx] for quantity in self.all_quantities(sed_type=sed_type): data = getattr(self, quantity, None) if data: table[quantity] = data.quantity[idx] if self.has_stat_profiles: norm_axis = self.stat_scan.geom.axes["norm"] table["norm_scan"] = norm_axis.center.reshape((1, -1)) table["stat"] = self.stat.data[idx] table["stat_scan"] = self.stat_scan.data[idx] table["is_ul"] = self.is_ul.data[idx] elif format == "lightcurve": time_axis = self.geom.axes["time"] tables = [] for idx, (time_min, time_max) in enumerate(time_axis.iter_by_edges): table_flat = Table() table_flat["time_min"] = [time_min.mjd] table_flat["time_max"] = [time_max.mjd] fp = self.slice_by_idx(slices={"time": idx}) table = fp.to_table(sed_type=sed_type, format="gadf-sed") for column in table.columns: table_flat[column] = table[column][np.newaxis] tables.append(table_flat) table = vstack(tables) # serialize with reference time set to mjd=0.0 ref_time = Time(0.0, format="mjd", scale=time_axis.reference_time.scale) table.meta.update(time_ref_to_dict(ref_time, scale=ref_time.scale)) table.meta["TIMEUNIT"] = "d" elif format == "binned-time-series": message = ( "Format 'binned-time-series' support a single time axis " f"only. Got {self.geom.axes.names}" ) if not self.geom.axes.is_unidimensional: raise ValueError(message) axis = self.geom.axes.primary_axis if not isinstance(axis, TimeMapAxis): raise ValueError(message) table = Table() table["time_bin_start"] = axis.time_min table["time_bin_size"] = axis.time_delta for quantity in self.all_quantities(sed_type=sed_type): data = getattr(self, quantity, None) if data: table[quantity] = data.quantity.squeeze() elif format == "profile": x_axis = self.geom.axes["projected-distance"] tables = [] for idx, (x_min, x_max) in enumerate(x_axis.iter_by_edges): table_flat = Table() table_flat["x_min"] = [x_min] table_flat["x_max"] = [x_max] table_flat["x_ref"] = [(x_max + x_min) / 2] fp = self.slice_by_idx(slices={"projected-distance": idx}) table = fp.to_table(sed_type=sed_type, format="gadf-sed") for column in table.columns: table_flat[column] = table[column][np.newaxis] tables.append(table_flat) table = vstack(tables) else: raise ValueError(f"Not a supported format {format}") if formatted: table = self._format_table(table=table) return table
@staticmethod def _energy_ref_lafferty(model, energy_min, energy_max): """Helper for `to_sed_type`. Compute energy_ref that the value at energy_ref corresponds to the mean value between energy_min and energy_max. """ flux = model.integral(energy_min, energy_max) dnde_mean = flux / (energy_max - energy_min) return model.inverse(dnde_mean) def _plot_get_flux_err(self, sed_type=None): """Compute flux error for given sed type""" y_errn, y_errp = None, None if "norm_err" in self.available_quantities: # symmetric error y_errn = getattr(self, sed_type + "_err") y_errp = y_errn.copy() if "norm_errp" in self.available_quantities: y_errn = getattr(self, sed_type + "_errn") y_errp = getattr(self, sed_type + "_errp") return y_errn, y_errp
[docs] def plot(self, ax=None, sed_type=None, energy_power=0, **kwargs): """Plot flux points. Parameters ---------- ax : `~matplotlib.axes.Axes` Axis object to plot on. sed_type : {"dnde", "flux", "eflux", "e2dnde"} Sed type energy_power : float Power of energy to multiply flux axis with **kwargs : dict Keyword arguments passed to `~RegionNDMap.plot` Returns ------- ax : `~matplotlib.axes.Axes` Axis object """ if sed_type is None: sed_type = self.sed_type_plot_default if not self.norm.geom.is_region: raise ValueError("Plotting only supported for region based flux points") if ax is None: ax = plt.gca() flux_unit = DEFAULT_UNIT[sed_type] flux = getattr(self, sed_type) # get errors and ul y_errn, y_errp = self._plot_get_flux_err(sed_type=sed_type) is_ul = self.is_ul.data if self.has_ul and y_errn and is_ul.any(): flux_ul = getattr(self, sed_type + "_ul").quantity y_errn.data[is_ul] = np.clip( 0.5 * flux_ul[is_ul].to_value(y_errn.unit), 0, np.inf ) y_errp.data[is_ul] = 0 flux.data[is_ul] = flux_ul[is_ul].to_value(flux.unit) kwargs.setdefault("uplims", is_ul) # set flux points plotting defaults if y_errp and y_errn: y_errp = np.clip( scale_plot_flux(y_errp, energy_power=energy_power).quantity, 0, np.inf ) y_errn = np.clip( scale_plot_flux(y_errn, energy_power=energy_power).quantity, 0, np.inf ) kwargs.setdefault("yerr", (y_errn, y_errp)) else: kwargs.setdefault("yerr", None) flux = scale_plot_flux(flux=flux.to_unit(flux_unit), energy_power=energy_power) ax = flux.plot(ax=ax, **kwargs) ax.set_ylabel(f"{sed_type} [{ax.yaxis.units}]") ax.set_yscale("log") return ax
[docs] def plot_ts_profiles( self, ax=None, sed_type=None, add_cbar=True, **kwargs, ): """Plot fit statistic SED profiles as a density plot. Parameters ---------- ax : `~matplotlib.axes.Axes` Axis object to plot on. sed_type : {"dnde", "flux", "eflux", "e2dnde"} Sed type add_cbar : bool Whether to add a colorbar to the plot. **kwargs : dict Keyword arguments passed to `~matplotlib.pyplot.pcolormesh` Returns ------- ax : `~matplotlib.axes.Axes` Axis object """ if ax is None: ax = plt.gca() if sed_type is None: sed_type = self.sed_type_plot_default if not self.norm.geom.is_region: raise ValueError("Plotting only supported for region based flux points") if not self.geom.axes.is_unidimensional: raise ValueError( "Profile plotting is only supported for unidimensional maps" ) axis = self.geom.axes.primary_axis if isinstance(axis, TimeMapAxis) and not axis.is_contiguous: axis = axis.to_contiguous() if ax.yaxis.units is None: yunits = DEFAULT_UNIT[sed_type] else: yunits = ax.yaxis.units ax.yaxis.set_units(yunits) flux_ref = getattr(self, sed_type + "_ref").to(yunits) ts = self.ts_scan norm_min, norm_max = ts.geom.axes["norm"].bounds.to_value("") flux = MapAxis.from_bounds( norm_min * flux_ref.value.min(), norm_max * flux_ref.value.max(), nbin=500, interp=axis.interp, unit=flux_ref.unit, ) norm = flux.center / flux_ref.reshape((-1, 1)) coords = ts.geom.get_coord() coords["norm"] = norm coords[axis.name] = axis.center.reshape((-1, 1)) z = ts.interp_by_coord(coords, values_scale="stat-profile") kwargs.setdefault("vmax", 0) kwargs.setdefault("vmin", -4) kwargs.setdefault("zorder", 0) kwargs.setdefault("cmap", "Blues") kwargs.setdefault("linewidths", 0) kwargs.setdefault("shading", "auto") # clipped values are set to NaN so that they appear white on the plot z[-z < kwargs["vmin"]] = np.nan with quantity_support(): caxes = ax.pcolormesh(axis.as_plot_edges, flux.edges, -z.T, **kwargs) axis.format_plot_xaxis(ax=ax) ax.set_ylabel(f"{sed_type} [{ax.yaxis.units}]") ax.set_yscale("log") if add_cbar: label = "Fit statistic difference" ax.figure.colorbar(caxes, ax=ax, label=label) return ax
[docs] def recompute_ul(self, n_sigma_ul=2, **kwargs): """Recompute upper limits corresponding to the given value. The pre-computed stat profiles must exist for the re-computation. Parameters ---------- n_sigma_ul : int Number of sigma to use for upper limit computation. Default is 2. **kwargs : dict Keyword arguments passed to `~scipy.optimize.brentq`. Returns ------- flux_points : `FluxPoints` A new FluxPoints object with modified upper limits Examples -------- >>> from gammapy.estimators import FluxPoints >>> filename = '$GAMMAPY_DATA/tests/spectrum/flux_points/binlike.fits' >>> flux_points = FluxPoints.read(filename) >>> flux_points_recomputed = flux_points.recompute_ul(n_sigma_ul=3) >>> print(flux_points.meta["n_sigma_ul"], flux_points.flux_ul.data[0]) 2.0 [[3.95451985e-09]] >>> print(flux_points_recomputed.meta["n_sigma_ul"], flux_points_recomputed.flux_ul.data[0]) 3 [[6.22245374e-09]] """ if not self.has_stat_profiles: raise ValueError( "Stat profiles not present. Upper limit computation is not possible" ) delta_ts = n_sigma_ul**2 flux_points = deepcopy(self) value_scan = self.stat_scan.geom.axes["norm"].center shape_axes = self.stat_scan.geom._shape[slice(3, None)] for idx in np.ndindex(shape_axes): stat_scan = np.abs( self.stat_scan.data[idx].squeeze() - self.stat.data[idx].squeeze() ) flux_points.norm_ul.data[idx] = stat_profile_ul_scipy( value_scan, stat_scan, delta_ts=delta_ts, **kwargs ) flux_points.meta["n_sigma_ul"] = n_sigma_ul return flux_points