# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
from copy import deepcopy
import numpy as np
from scipy import stats
from astropy.io.registry import IORegistryError
from astropy.table import Table, vstack
from astropy.time import Time
from astropy.visualization import quantity_support
import matplotlib.pyplot as plt
from gammapy.maps import MapAxis, Maps, RegionNDMap, TimeMapAxis
from gammapy.maps.axes import flat_if_equal
from gammapy.modeling.models import TemplateSpectralModel
from gammapy.modeling.models.spectral import scale_plot_flux
from gammapy.modeling.scipy import stat_profile_ul_scipy
from gammapy.utils.scripts import make_path
from gammapy.utils.table import table_standardise_units_copy
from gammapy.utils.time import time_ref_to_dict
from ..map.core import DEFAULT_UNIT, FluxMaps
__all__ = ["FluxPoints"]
log = logging.getLogger(__name__)
[docs]class FluxPoints(FluxMaps):
"""Flux points container.
The supported formats are described here: :ref:`gadf:flux-points`
In summary, the following formats and minimum required columns are:
* Format ``dnde``: columns ``e_ref`` and ``dnde``
* Format ``e2dnde``: columns ``e_ref``, ``e2dnde``
* Format ``flux``: columns ``e_min``, ``e_max``, ``flux``
* Format ``eflux``: columns ``e_min``, ``e_max``, ``eflux``
Parameters
----------
table : `~astropy.table.Table`
Table with flux point data
Attributes
----------
table : `~astropy.table.Table`
Table with flux point data
Examples
--------
The `FluxPoints` object is most easily created by reading a file with
flux points given in one of the formats documented above::
>>> from gammapy.estimators import FluxPoints
>>> filename = '$GAMMAPY_DATA/hawc_crab/HAWC19_flux_points.fits'
>>> flux_points = FluxPoints.read(filename)
>>> flux_points.plot() #doctest: +SKIP
An instance of `FluxPoints` can also be created by passing an instance of
`astropy.table.Table`, which contains the required columns, such as `'e_ref'`
and `'dnde'`. The corresponding `sed_type` has to be defined in the meta data
of the table::
>>> import numpy as np
>>> from astropy import units as u
>>> from astropy.table import Table
>>> from gammapy.estimators import FluxPoints
>>> from gammapy.modeling.models import PowerLawSpectralModel
>>> table = Table()
>>> pwl = PowerLawSpectralModel()
>>> e_ref = np.geomspace(1, 100, 7) * u.TeV
>>> table["e_ref"] = e_ref
>>> table["dnde"] = pwl(e_ref)
>>> table["dnde_err"] = pwl.evaluate_error(e_ref)[0]
>>> table.meta["SED_TYPE"] = "dnde"
>>> flux_points = FluxPoints.from_table(table)
>>> flux_points.plot(sed_type="flux") #doctest: +SKIP
If you have flux points in a different data format, the format can be changed
by renaming the table columns and adding meta data::
>>> from astropy import units as u
>>> from astropy.table import Table
>>> from gammapy.estimators import FluxPoints
>>> from gammapy.utils.scripts import make_path
>>> filename = make_path('$GAMMAPY_DATA/tests/spectrum/flux_points/flux_points_ctb_37b.txt')
>>> table = Table.read(filename ,format='ascii.csv', delimiter=' ', comment='#')
>>> table.rename_column('Differential_Flux', 'dnde')
>>> table['dnde'].unit = 'cm-2 s-1 TeV-1'
>>> table.rename_column('lower_error', 'dnde_errn')
>>> table['dnde_errn'].unit = 'cm-2 s-1 TeV-1'
>>> table.rename_column('upper_error', 'dnde_errp')
>>> table['dnde_errp'].unit = 'cm-2 s-1 TeV-1'
>>> table.rename_column('E', 'e_ref')
>>> table['e_ref'].unit = 'TeV'
>>> flux_points = FluxPoints.from_table(table, sed_type="dnde")
>>> flux_points.plot(sed_type="e2dnde") #doctest: +SKIP
Note: In order to reproduce the example you need the tests datasets folder.
You may download it with the command
``gammapy download datasets --tests --out $GAMMAPY_DATA``
"""
[docs] @classmethod
def read(
cls, filename, sed_type=None, format="gadf-sed", reference_model=None, **kwargs
):
"""Read precomputed flux points.
Parameters
----------
filename : str
Filename
sed_type : {"dnde", "flux", "eflux", "e2dnde", "likelihood"}
Sed type
format : {"gadf-sed", "lightcurve"}
Format string.
reference_model : `SpectralModel`
Reference spectral model
**kwargs : dict
Keyword arguments passed to `astropy.table.Table.read`.
Returns
-------
flux_points : `FluxPoints`
Flux points
"""
filename = make_path(filename)
try:
table = Table.read(filename, **kwargs)
except IORegistryError:
kwargs.setdefault("format", "ascii.ecsv")
table = Table.read(filename, **kwargs)
return cls.from_table(
table=table,
sed_type=sed_type,
reference_model=reference_model,
format=format,
)
[docs] def write(self, filename, sed_type=None, format="gadf-sed", **kwargs):
"""Write flux points.
Parameters
----------
filename : str
Filename
sed_type : {"dnde", "flux", "eflux", "e2dnde", "likelihood"}
Sed type
format : {"gadf-sed", "lightcurve", "binned-time-series", "profile"}
Format specification. The following formats are supported:
* "gadf-sed": format for sed flux points see :ref:`gadf:flux-points`
for details
* "lightcurve": Gammapy internal format to store energy dependent
lightcurves. Basically a generalisation of the "gadf" format, but
currently there is no detailed documentation available.
* "binned-time-series": table format support by Astropy's
`~astropy.timeseries.BinnedTimeSeries`.
* "profile": Gammapy internal format to store energy dependent
flux profiles. Basically a generalisation of the "gadf" format, but
currently there is no detailed documentation available.
**kwargs : dict
Keyword arguments passed to `astropy.table.Table.write`.
"""
if sed_type is None:
sed_type = self.sed_type_init
filename = make_path(filename)
table = self.to_table(sed_type=sed_type, format=format)
table.write(filename, **kwargs)
@staticmethod
def _convert_loglike_columns(table):
# TODO: check sign and factor 2 here
# https://github.com/gammapy/gammapy/pull/2546#issuecomment-554274318
# The idea below is to support the format here:
# https://gamma-astro-data-formats.readthedocs.io/en/latest/spectra/flux_points/index.html#likelihood-columns
# but internally to go to the uniform "stat"
if "loglike" in table.colnames and "stat" not in table.colnames:
table["stat"] = 2 * table["loglike"]
if "loglike_null" in table.colnames and "stat_null" not in table.colnames:
table["stat_null"] = 2 * table["loglike_null"]
if "dloglike_scan" in table.colnames and "stat_scan" not in table.colnames:
table["stat_scan"] = 2 * table["dloglike_scan"]
return table
[docs] @classmethod
def from_table(
cls, table, sed_type=None, format="gadf-sed", reference_model=None, gti=None
):
"""Create flux points from a table. The table column names must be consistent with the
sed_type
Parameters
----------
table : `~astropy.table.Table`
Table
sed_type : {"dnde", "flux", "eflux", "e2dnde", "likelihood"}
Sed type
format : {"gadf-sed", "lightcurve", "profile"}
Table format.
reference_model : `SpectralModel`
Reference spectral model
gti : `GTI`
Good time intervals
meta : dict
Meta data.
Returns
-------
flux_points : `FluxPoints`
Flux points
"""
table = table_standardise_units_copy(table)
if sed_type is None:
sed_type = table.meta.get("SED_TYPE", None)
if sed_type is None:
sed_type = cls._guess_sed_type(table.colnames)
if sed_type is None:
raise ValueError("Specifying the sed type is required")
if sed_type == "likelihood":
table = cls._convert_loglike_columns(table)
if reference_model is None:
reference_model = TemplateSpectralModel(
energy=flat_if_equal(table["e_ref"].quantity),
values=flat_if_equal(table["ref_dnde"].quantity),
)
maps = Maps()
table.meta.setdefault("SED_TYPE", sed_type)
for name in cls.all_quantities(sed_type=sed_type):
if name in table.colnames:
maps[name] = RegionNDMap.from_table(
table=table, colname=name, format=format
)
meta = cls._get_meta_gadf(table)
return cls.from_maps(
maps=maps,
reference_model=reference_model,
meta=meta,
sed_type=sed_type,
gti=gti,
)
@staticmethod
def _get_meta_gadf(table):
meta = {}
meta.update(table.meta)
conf_ul = table.meta.get("UL_CONF")
if conf_ul:
n_sigma_ul = np.round(stats.norm.isf(0.5 * (1 - conf_ul)), 1)
meta["n_sigma_ul"] = n_sigma_ul
meta["sed_type_init"] = table.meta.get("SED_TYPE")
return meta
@staticmethod
def _format_table(table):
"""Format table"""
for column in table.colnames:
if column.startswith(("dnde", "eflux", "flux", "e2dnde", "ref")):
table[column].format = ".3e"
elif column.startswith(
("e_min", "e_max", "e_ref", "sqrt_ts", "norm", "ts", "stat")
):
table[column].format = ".3f"
return table
[docs] def to_table(self, sed_type=None, format="gadf-sed", formatted=False):
"""Create table for a given SED type.
Parameters
----------
sed_type : {"likelihood", "dnde", "e2dnde", "flux", "eflux"}
Sed type to convert to. Default is `likelihood`
format : {"gadf-sed", "lightcurve", "binned-time-series", "profile"}
Format specification. The following formats are supported:
* "gadf-sed": format for sed flux points see :ref:`gadf:flux-points`
for details
* "lightcurve": Gammapy internal format to store energy dependent
lightcurves. Basically a generalisation of the "gadf" format, but
currently there is no detailed documentation available.
* "binned-time-series": table format support by Astropy's
`~astropy.timeseries.BinnedTimeSeries`.
* "profile": Gammapy internal format to store energy dependent
flux profiles. Basically a generalisation of the "gadf" format, but
currently there is no detailed documentation available.
formatted : bool
Formatted version with column formats applied. Numerical columns are
formatted to .3f and .3e respectively.
Returns
-------
table : `~astropy.table.Table`
Flux points table
Examples
--------
This is how to read and plot example flux points:
>>> from gammapy.estimators import FluxPoints
>>> fp = FluxPoints.read("$GAMMAPY_DATA/hawc_crab/HAWC19_flux_points.fits")
>>> table = fp.to_table(sed_type="flux", format="gadf-sed", formatted=True)
>>> print(table[:2])
e_ref e_min e_max flux flux_err flux_ul ts sqrt_ts is_ul
TeV TeV TeV 1 / (cm2 s) 1 / (cm2 s) 1 / (cm2 s)
----- ----- ----- ----------- ----------- ----------- -------- ------- -----
1.334 1.000 1.780 1.423e-11 3.135e-13 nan 2734.000 52.288 False
2.372 1.780 3.160 5.780e-12 1.082e-13 nan 4112.000 64.125 False
"""
if sed_type is None:
sed_type = self.sed_type_init
if format == "gadf-sed":
# TODO: what to do with GTI info?
if not self.geom.axes.names == ["energy"]:
raise ValueError(
"Only flux points with a single energy axis "
"can be converted to 'gadf-sed'"
)
idx = (Ellipsis, 0, 0)
table = self.energy_axis.to_table(format="gadf-sed")
table.meta["SED_TYPE"] = sed_type
if not self.is_convertible_to_flux_sed_type:
table.remove_columns(["e_min", "e_max"])
if self.n_sigma_ul:
table.meta["UL_CONF"] = np.round(
1 - 2 * stats.norm.sf(self.n_sigma_ul), 7
)
if sed_type == "likelihood":
table["ref_dnde"] = self.dnde_ref[idx]
table["ref_flux"] = self.flux_ref[idx]
table["ref_eflux"] = self.eflux_ref[idx]
for quantity in self.all_quantities(sed_type=sed_type):
data = getattr(self, quantity, None)
if data:
table[quantity] = data.quantity[idx]
if self.has_stat_profiles:
norm_axis = self.stat_scan.geom.axes["norm"]
table["norm_scan"] = norm_axis.center.reshape((1, -1))
table["stat"] = self.stat.data[idx]
table["stat_scan"] = self.stat_scan.data[idx]
table["is_ul"] = self.is_ul.data[idx]
elif format == "lightcurve":
time_axis = self.geom.axes["time"]
tables = []
for idx, (time_min, time_max) in enumerate(time_axis.iter_by_edges):
table_flat = Table()
table_flat["time_min"] = [time_min.mjd]
table_flat["time_max"] = [time_max.mjd]
fp = self.slice_by_idx(slices={"time": idx})
table = fp.to_table(sed_type=sed_type, format="gadf-sed")
for column in table.columns:
table_flat[column] = table[column][np.newaxis]
tables.append(table_flat)
table = vstack(tables)
# serialize with reference time set to mjd=0.0
ref_time = Time(0.0, format="mjd", scale=time_axis.reference_time.scale)
table.meta.update(time_ref_to_dict(ref_time, scale=ref_time.scale))
table.meta["TIMEUNIT"] = "d"
elif format == "binned-time-series":
message = (
"Format 'binned-time-series' support a single time axis "
f"only. Got {self.geom.axes.names}"
)
if not self.geom.axes.is_unidimensional:
raise ValueError(message)
axis = self.geom.axes.primary_axis
if not isinstance(axis, TimeMapAxis):
raise ValueError(message)
table = Table()
table["time_bin_start"] = axis.time_min
table["time_bin_size"] = axis.time_delta
for quantity in self.all_quantities(sed_type=sed_type):
data = getattr(self, quantity, None)
if data:
table[quantity] = data.quantity.squeeze()
elif format == "profile":
x_axis = self.geom.axes["projected-distance"]
tables = []
for idx, (x_min, x_max) in enumerate(x_axis.iter_by_edges):
table_flat = Table()
table_flat["x_min"] = [x_min]
table_flat["x_max"] = [x_max]
table_flat["x_ref"] = [(x_max + x_min) / 2]
fp = self.slice_by_idx(slices={"projected-distance": idx})
table = fp.to_table(sed_type=sed_type, format="gadf-sed")
for column in table.columns:
table_flat[column] = table[column][np.newaxis]
tables.append(table_flat)
table = vstack(tables)
else:
raise ValueError(f"Not a supported format {format}")
if formatted:
table = self._format_table(table=table)
return table
@staticmethod
def _energy_ref_lafferty(model, energy_min, energy_max):
"""Helper for `to_sed_type`.
Compute energy_ref that the value at energy_ref corresponds
to the mean value between energy_min and energy_max.
"""
flux = model.integral(energy_min, energy_max)
dnde_mean = flux / (energy_max - energy_min)
return model.inverse(dnde_mean)
def _plot_get_flux_err(self, sed_type=None):
"""Compute flux error for given sed type"""
y_errn, y_errp = None, None
if "norm_err" in self.available_quantities:
# symmetric error
y_errn = getattr(self, sed_type + "_err")
y_errp = y_errn.copy()
if "norm_errp" in self.available_quantities:
y_errn = getattr(self, sed_type + "_errn")
y_errp = getattr(self, sed_type + "_errp")
return y_errn, y_errp
[docs] def plot(self, ax=None, sed_type=None, energy_power=0, **kwargs):
"""Plot flux points.
Parameters
----------
ax : `~matplotlib.axes.Axes`
Axis object to plot on.
sed_type : {"dnde", "flux", "eflux", "e2dnde"}
Sed type
energy_power : float
Power of energy to multiply flux axis with
**kwargs : dict
Keyword arguments passed to `~RegionNDMap.plot`
Returns
-------
ax : `~matplotlib.axes.Axes`
Axis object
"""
if sed_type is None:
sed_type = self.sed_type_plot_default
if not self.norm.geom.is_region:
raise ValueError("Plotting only supported for region based flux points")
if ax is None:
ax = plt.gca()
flux_unit = DEFAULT_UNIT[sed_type]
flux = getattr(self, sed_type)
# get errors and ul
y_errn, y_errp = self._plot_get_flux_err(sed_type=sed_type)
is_ul = self.is_ul.data
if self.has_ul and y_errn and is_ul.any():
flux_ul = getattr(self, sed_type + "_ul").quantity
y_errn.data[is_ul] = np.clip(
0.5 * flux_ul[is_ul].to_value(y_errn.unit), 0, np.inf
)
y_errp.data[is_ul] = 0
flux.data[is_ul] = flux_ul[is_ul].to_value(flux.unit)
kwargs.setdefault("uplims", is_ul)
# set flux points plotting defaults
if y_errp and y_errn:
y_errp = np.clip(
scale_plot_flux(y_errp, energy_power=energy_power).quantity, 0, np.inf
)
y_errn = np.clip(
scale_plot_flux(y_errn, energy_power=energy_power).quantity, 0, np.inf
)
kwargs.setdefault("yerr", (y_errn, y_errp))
else:
kwargs.setdefault("yerr", None)
flux = scale_plot_flux(flux=flux.to_unit(flux_unit), energy_power=energy_power)
ax = flux.plot(ax=ax, **kwargs)
ax.set_ylabel(f"{sed_type} [{ax.yaxis.units}]")
ax.set_yscale("log")
return ax
[docs] def plot_ts_profiles(
self,
ax=None,
sed_type=None,
add_cbar=True,
**kwargs,
):
"""Plot fit statistic SED profiles as a density plot.
Parameters
----------
ax : `~matplotlib.axes.Axes`
Axis object to plot on.
sed_type : {"dnde", "flux", "eflux", "e2dnde"}
Sed type
add_cbar : bool
Whether to add a colorbar to the plot.
**kwargs : dict
Keyword arguments passed to `~matplotlib.pyplot.pcolormesh`
Returns
-------
ax : `~matplotlib.axes.Axes`
Axis object
"""
if ax is None:
ax = plt.gca()
if sed_type is None:
sed_type = self.sed_type_plot_default
if not self.norm.geom.is_region:
raise ValueError("Plotting only supported for region based flux points")
if not self.geom.axes.is_unidimensional:
raise ValueError(
"Profile plotting is only supported for unidimensional maps"
)
axis = self.geom.axes.primary_axis
if isinstance(axis, TimeMapAxis) and not axis.is_contiguous:
axis = axis.to_contiguous()
if ax.yaxis.units is None:
yunits = DEFAULT_UNIT[sed_type]
else:
yunits = ax.yaxis.units
ax.yaxis.set_units(yunits)
flux_ref = getattr(self, sed_type + "_ref").to(yunits)
ts = self.ts_scan
norm_min, norm_max = ts.geom.axes["norm"].bounds.to_value("")
flux = MapAxis.from_bounds(
norm_min * flux_ref.value.min(),
norm_max * flux_ref.value.max(),
nbin=500,
interp=axis.interp,
unit=flux_ref.unit,
)
norm = flux.center / flux_ref.reshape((-1, 1))
coords = ts.geom.get_coord()
coords["norm"] = norm
coords[axis.name] = axis.center.reshape((-1, 1))
z = ts.interp_by_coord(coords, values_scale="stat-profile")
kwargs.setdefault("vmax", 0)
kwargs.setdefault("vmin", -4)
kwargs.setdefault("zorder", 0)
kwargs.setdefault("cmap", "Blues")
kwargs.setdefault("linewidths", 0)
kwargs.setdefault("shading", "auto")
# clipped values are set to NaN so that they appear white on the plot
z[-z < kwargs["vmin"]] = np.nan
with quantity_support():
caxes = ax.pcolormesh(axis.as_plot_edges, flux.edges, -z.T, **kwargs)
axis.format_plot_xaxis(ax=ax)
ax.set_ylabel(f"{sed_type} [{ax.yaxis.units}]")
ax.set_yscale("log")
if add_cbar:
label = "Fit statistic difference"
ax.figure.colorbar(caxes, ax=ax, label=label)
return ax
[docs] def recompute_ul(self, n_sigma_ul=2, **kwargs):
"""Recompute upper limits corresponding to the given value.
The pre-computed stat profiles must exist for the re-computation.
Parameters
----------
n_sigma_ul : int
Number of sigma to use for upper limit computation. Default is 2.
**kwargs : dict
Keyword arguments passed to `~scipy.optimize.brentq`.
Returns
-------
flux_points : `FluxPoints`
A new FluxPoints object with modified upper limits
Examples
--------
>>> from gammapy.estimators import FluxPoints
>>> filename = '$GAMMAPY_DATA/tests/spectrum/flux_points/binlike.fits'
>>> flux_points = FluxPoints.read(filename)
>>> flux_points_recomputed = flux_points.recompute_ul(n_sigma_ul=3)
>>> print(flux_points.meta["n_sigma_ul"], flux_points.flux_ul.data[0])
2.0 [[3.95451985e-09]]
>>> print(flux_points_recomputed.meta["n_sigma_ul"], flux_points_recomputed.flux_ul.data[0])
3 [[6.22245374e-09]]
"""
if not self.has_stat_profiles:
raise ValueError(
"Stat profiles not present. Upper limit computation is not possible"
)
delta_ts = n_sigma_ul**2
flux_points = deepcopy(self)
value_scan = self.stat_scan.geom.axes["norm"].center
shape_axes = self.stat_scan.geom._shape[slice(3, None)]
for idx in np.ndindex(shape_axes):
stat_scan = np.abs(
self.stat_scan.data[idx].squeeze() - self.stat.data[idx].squeeze()
)
flux_points.norm_ul.data[idx] = stat_profile_ul_scipy(
value_scan, stat_scan, delta_ts=delta_ts, **kwargs
)
flux_points.meta["n_sigma_ul"] = n_sigma_ul
return flux_points