Source code for gammapy.spectrum.flux_point

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Differential and integral flux point computations."""
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
from collections import OrderedDict
import numpy as np
from astropy.table import Table, vstack
from astropy import units as u
from astropy.io.registry import IORegistryError
from ..utils.scripts import make_path
from ..utils.fits import table_from_row_data

__all__ = [
    'compute_flux_points_dnde',
    'FluxPoints',
    'FluxPointEstimator',
    'FluxPointsFitter',
    'SEDLikelihoodProfile',
]

log = logging.getLogger(__name__)

REQUIRED_COLUMNS = {'dnde': ['e_ref', 'dnde'],
                    'e2dnde': ['e_ref', 'e2dnde'],
                    'flux': ['e_min', 'e_max', 'flux'],
                    'eflux': ['e_min', 'e_max', 'eflux']}

OPTIONAL_COLUMNS = {'dnde': ['dnde_err', 'dnde_errp', 'dnde_errn',
                             'dnde_ul', 'is_ul'],
                    'e2dnde': ['e2dnde_err', 'e2dnde_errp', 'e2dnde_errn',
                               'e2dnde_ul', 'is_ul'],
                    'flux': ['flux_err', 'flux_errp', 'flux_errn',
                             'flux_ul', 'is_ul'],
                    'eflux': ['eflux_err', 'eflux_errp', 'eflux_errn',
                              'eflux_ul', 'is_ul']}

DEFAULT_UNIT = {'dnde': u.Unit('cm-2 s-1 TeV-1'),
                'e2dnde': u.Unit('erg cm-2 s-1'),
                'flux': u.Unit('cm-2 s-1'),
                'eflux': u.Unit('erg cm-2 s-1')}


[docs]class FluxPoints(object): """ Flux point object. For a complete documentation see :ref:`gadf:flux-points`, for an usage example see :ref:`flux-point-computation`. Parameters ---------- table : `~astropy.table.Table` Input data table, with the following minimal required columns: * Format `'dnde'`: `'dnde'` and `'e_ref'` * Format `'flux'`: `'flux'`, `'e_min'`, `'e_max'` * Format `'eflux'`: `'eflux'`, `'e_min'`, `'e_max'` Examples -------- >>> from gammapy.spectrum import FluxPoints >>> filename = '$GAMMAPY_EXTRA/test_datasets/spectrum/flux_points/flux_points.fits' >>> flux_points = FluxPoints.read(filename) >>> flux_points.show() """ def __init__(self, table): # validate that the table is a valid representation of the given # flux point sed type # TODO: this is a temp solution # Make sure we don't have "ph" in units # Should we use a unit equivalency? unit_changes = [ ('ph cm-2 s-1', 'cm-2 s-1'), ('ph cm-2 s-1 TeV-1', 'cm-2 s-1 TeV-1'), ('ph cm-2 s-1 MeV-1', 'cm-2 s-1 MeV-1'), ] for colname in table.colnames: for unit_old, unit_new in unit_changes: if (table[colname].unit is not None) and (u.Unit(table[colname].unit) == u.Unit(unit_old)): table[colname].unit = u.Unit(unit_new) self.table = self._validate_table(table) @property def sed_type(self): """ Flux points sed type. Returns ------- sed_type : str Can be either 'dnde', 'e2dnde', 'flux' or 'eflux'. """ return self.table.meta['SED_TYPE'] @staticmethod def _guess_sed_type(table): """ Guess sed type from table content. """ valid_sed_types = list(REQUIRED_COLUMNS.keys()) for sed_type in valid_sed_types: required = set(REQUIRED_COLUMNS[sed_type]) if required.issubset(table.colnames): return sed_type @staticmethod def _guess_sed_type_from_unit(unit): """ Guess sed type from unit. """ for sed_type, default_unit in DEFAULT_UNIT.items(): if unit.is_equivalent(default_unit): return sed_type def _validate_table(self, table): """ Validate input flux point table. """ table = Table(table) sed_type = table.meta['SED_TYPE'] required = set(REQUIRED_COLUMNS[sed_type]) if not required.issubset(table.colnames): missing = required.difference(table.colnames) raise ValueError("Missing columns for sed type '{0}':" " {1}".format(sed_type, missing)) return table def _get_y_energy_unit(self, y_unit): """ Get energy part of the given y unit. """ try: return [_ for _ in y_unit.bases if _.physical_type == 'energy'][0] except IndexError: return u.Unit('TeV')
[docs] def plot(self, ax=None, sed_type=None, energy_unit='TeV', flux_unit=None, energy_power=0, **kwargs): """ Plot flux points Parameters ---------- ax : `~matplotlib.axes.Axes` Axis object to plot on. sed_type : ['dnde', 'flux', 'eflux'] Which sed type to plot. energy_unit : str, `~astropy.units.Unit`, optional Unit of the energy axis flux_unit : str, `~astropy.units.Unit`, optional Unit of the flux axis energy_power : int Power of energy to multiply y axis with kwargs : dict Keyword arguments passed to :func:`~matplotlib.pyplot.errorbar` Returns ------- ax : `~matplotlib.axes.Axes` Axis object """ import matplotlib.pyplot as plt if ax is None: ax = plt.gca() sed_type = sed_type or self.sed_type y_unit = u.Unit(flux_unit or DEFAULT_UNIT[sed_type]) y = self.table[sed_type].quantity.to(y_unit) x = self.e_ref.to(energy_unit) # get errors and ul is_ul = self._is_ul x_err_all = self.get_energy_err(sed_type) y_err_all = self.get_flux_err(sed_type) # handle energy power e_unit = self._get_y_energy_unit(y_unit) y_unit = y.unit * e_unit ** energy_power y = (y * np.power(x, energy_power)).to(y_unit) y_err, x_err = None, None if y_err_all: y_errn = (y_err_all[0] * np.power(x, energy_power)).to(y_unit) y_errp = (y_err_all[1] * np.power(x, energy_power)).to(y_unit) y_err = (y_errn[~is_ul].to(y_unit).value, y_errp[~is_ul].to(y_unit).value) if x_err_all: x_errn, x_errp = x_err_all x_err = (x_errn[~is_ul].to(energy_unit).value, x_errp[~is_ul].to(energy_unit).value) # set flux points plotting defaults kwargs.setdefault('marker', '+') kwargs.setdefault('ls', 'None') ebar = ax.errorbar(x[~is_ul].value, y[~is_ul].value, yerr=y_err, xerr=x_err, **kwargs) if is_ul.any(): if x_err_all: x_errn, x_errp = x_err_all x_err = (x_errn[is_ul].to(energy_unit).value, x_errp[is_ul].to(energy_unit).value) y_ul = self.table[sed_type + '_ul'].quantity y_ul = (y_ul * np.power(x, energy_power)).to(y_unit) y_err = (0.5 * y_ul[is_ul].value, np.zeros_like(y_ul[is_ul].value)) kwargs.setdefault('c', ebar[0].get_color()) ax.errorbar(x[is_ul].value, y_ul[is_ul].value, xerr=x_err, yerr=y_err, uplims=True, **kwargs) ax.set_xscale('log', nonposx='clip') ax.set_yscale('log', nonposy='clip') ax.set_xlabel('Energy ({})'.format(energy_unit)) ax.set_ylabel('{0} ({1})'.format(self.sed_type, y_unit)) return ax
[docs] def get_energy_err(self, sed_type=None): """Compute energy error for given sed type""" # TODO: sed_type is not used if sed_type is None: sed_type = self.sed_type try: e_min = self.table['e_min'].quantity e_max = self.table['e_max'].quantity e_ref = self.e_ref x_err = ((e_ref - e_min), (e_max - e_ref)) except KeyError: x_err = None return x_err
[docs] def get_flux_err(self, sed_type=None): """Compute flux error for given sed type""" if sed_type is None: sed_type = self.sed_type try: # assymmetric error y_errn = self.table[sed_type + '_errn'].quantity y_errp = self.table[sed_type + '_errp'].quantity y_err = (y_errn, y_errp) except KeyError: try: # symmetric error y_err = self.table[sed_type + '_err'].quantity y_err = (y_err, y_err) except KeyError: # no error at all y_err = None return y_err
@property def _is_ul(self): try: return self.table['is_ul'].data.astype('bool') except KeyError: return np.isnan(self.table[self.sed_type])
[docs] def peek(self, figsize=(8, 5), **kwargs): """ Show flux points. Parameters ---------- figsize : tuple Figure size kwargs : dict Keyword arguments passed to `FluxPoints.plot()`. Returns ------- ax : `~matplotlib.axes.Axes` Plotting axes object. """ import matplotlib.pyplot as plt fig = plt.figure(figsize=figsize) ax = fig.add_subplot(111) self.plot(ax=ax, **kwargs) return ax
def __str__(self): """ String representation of the flux points class. """ info = '' info += "Flux points of type '{}'".format(self.sed_type) return info
[docs] def info(self): """ Print flux points info. """ print(self)
@classmethod
[docs] def read(cls, filename, **kwargs): """ Read flux points. Parameters ---------- filename : str Filename kwargs : dict Keyword arguments passed to `~astropy.table.Table.read`. """ filename = make_path(filename) try: table = Table.read(str(filename), **kwargs) except IORegistryError: kwargs.setdefault('format', 'ascii.ecsv') table = Table.read(str(filename), **kwargs) if 'SED_TYPE' not in table.meta.keys(): sed_type = cls._guess_sed_type(table) table.meta['SED_TYPE'] = sed_type return cls(table=table)
[docs] def write(self, filename, **kwargs): """ Write flux points. Parameters ---------- filename : str Filename kwargs : dict Keyword arguments passed to `~astropy.table.Table.write`. """ filename = make_path(filename) try: self.table.write(str(filename), **kwargs) except IORegistryError: kwargs.setdefault('format', 'ascii.ecsv') self.table.write(str(filename), **kwargs)
# TODO: handle with Energy or EnergyBounds classes? @property def e_ref(self): """ Reference energy. Defined by `e_ref` column in `FluxPoints.table` or computed as log center, if `e_min` and `e_max` columns are present in `FluxPoints.table`. Returns ------- e_ref : `~astropy.units.Quantity` Reference energy. """ try: return self.table['e_ref'].quantity except KeyError: e_ref = np.sqrt(self.e_min * self.e_max) return e_ref # TODO: handle with Energy or EnergyBounds classes? @property def e_min(self): """ Lower bound of energy bin. Defined by `e_min` column in `FluxPoints.table`. Returns ------- e_min : `~astropy.units.Quantity` Lower bound of energy bin. """ return self.table['e_min'].quantity # TODO: handle with Energy or EnergyBounds classes? @property def e_max(self): """ Upper bound of energy bin. Defined by `e_max` column in `FluxPoints.table`. Returns ------- e_max : `~astropy.units.Quantity` Upper bound of energy bin. """ return self.table['e_max'].quantity
[docs] def drop_ul(self): """ Drop upper limit flux points. Examples -------- from gammapy.spectrum import FluxPoints filename = '$GAMMAPY_EXTRA/test_datasets/spectrum/flux_points/flux_points.fits' flux_points = FluxPoints.read(filename) print(flux_points) print(flux_points.drop_ul()) Returns ------- flux_points : `FluxPoints` Flux points with upper limit points removed. """ table = self.table.copy() table_drop_ul = table[~self._is_ul] return self.__class__(table_drop_ul)
@classmethod
[docs] def stack(cls, flux_points): """ Create a new `FluxPoints` object by stacking a list of existing flux points. The first `FluxPoints` object in the list is taken as a reference to infer column names and units for the stacked object. Parameters ---------- flux_points : list of `FluxPoints` objects List of flux points to stack. Returns ------- flux_points : `FluxPoints` Flux points without upper limit points. """ tables = [] reference = flux_points[0].table for _ in flux_points: table = _.table for colname in reference.colnames: column = reference[colname] if column.unit: table[colname] = table[colname].quantity.to(column.unit) tables.append(table[reference.colnames]) table_stacked = vstack(tables) table_stacked.meta['SED_TYPE'] = reference.meta['SED_TYPE'] return cls(table_stacked)
[docs]def compute_flux_points_dnde(flux_points, model, method='lafferty'): """ Compute differential flux points quantities. See: http://adsabs.harvard.edu/abs/1995NIMPA.355..541L for details on the `'lafferty'` method. Parameters ---------- flux_points : `FluxPoints` Input integral flux points. model : `~gammapy.spectrum.SpectralModel` Spectral model assumption. Note that the value of the amplitude parameter does not matter. Still it is recommended to use something with the right scale and units. E.g. `amplitude = 1e-12 * u.Unit('cm-2 s-1 TeV-1')` method : {'lafferty', 'log_center', 'table'} Flux points `e_ref` estimation method: * `'laferty'` Lafferty & Wyatt model-based e_ref * `'log_center'` log bin center e_ref * `'table'` using column 'e_ref' from input flux_points Returns ------- flux_points : `FluxPoints` Flux points including differential quantity columns `dnde` and `dnde_err` (optional), `dnde_ul` (optional). Examples -------- >>> from astropy import units as u >>> from gammapy.spectrum import FluxPoints, compute_flux_points_dnde >>> from gammapy.spectrum.models import PowerLaw >>> filename = '$GAMMAPY_EXTRA/test_datasets/spectrum/flux_points/flux_points.fits' >>> flux_points = FluxPoints.read(filename) >>> model = PowerLaw(2.2 * u.Unit(''), 1e-12 * u.Unit('cm-2 s-1 TeV-1'), 1 * u.TeV) >>> flux_point_dnde = compute_flux_points_dnde(flux_points, model=model) """ input_table = flux_points.table flux = input_table['flux'].quantity e_min = flux_points.e_min e_max = flux_points.e_max # Compute e_ref if method == 'table': e_ref = input_table['e_ref'].quantity elif method == 'log_center': e_ref = np.sqrt(e_min * e_max) elif method == 'lafferty': # set e_ref that it represents the mean dnde in the given energy bin e_ref = _e_ref_lafferty(model, e_min, e_max) else: raise ValueError('Invalid method: {0}'.format(method)) dnde = _dnde_from_flux(flux, model, e_ref, e_min, e_max) # Add to result table table = input_table.copy() table['e_ref'] = e_ref table['dnde'] = dnde if 'flux_err' in table.colnames: # TODO: implement better error handling, e.g. MC based method table['dnde_err'] = dnde * table['flux_err'].quantity / flux if 'flux_errn' in table.colnames: table['dnde_errn'] = dnde * table['flux_errn'].quantity / flux table['dnde_errp'] = dnde * table['flux_errp'].quantity / flux if 'flux_ul' in table.colnames: flux_ul = table['flux_ul'].quantity dnde_ul = _dnde_from_flux(flux_ul, model, e_ref, e_min, e_max) table['dnde_ul'] = dnde_ul table.meta['SED_TYPE'] = 'dnde' return FluxPoints(table)
def _e_ref_lafferty(model, e_min, e_max): # compute e_ref that the value at e_ref corresponds to the mean value # between e_min and e_max flux = model.integral(e_min, e_max) dnde_mean = flux / (e_max - e_min) return model.inverse(dnde_mean) def _dnde_from_flux(flux, model, e_ref, e_min, e_max): # Compute dnde under the assumption that flux equals expected # flux from model flux_model = model.integral(e_min, e_max) dnde_model = model(e_ref) return dnde_model * (flux / flux_model)
[docs]class FluxPointEstimator(object): """ Flux point estimator. Parameters ---------- obs : `~gammapy.spectrum.SpectrumObservation` Spectrum observation groups : `~gammapy.spectrum.SpectrumEnergyGroups` Energy groups (usually output of `~gammapy.spectrum.SpectrumEnergyGroupsMaker`) model : `~gammapy.spectrum.models.SpectralModel` Global model (usually output of `~gammapy.spectrum.SpectrumFit`) """ def __init__(self, obs, groups, model): self.obs = obs self.groups = groups self.model = model self.flux_points = None def __str__(self): s = 'FluxPointEstimator:\n' s += str(self.obs) + '\n' s += str(self.groups) + '\n' s += str(self.model) + '\n' return s
[docs] def compute_points(self): meta = OrderedDict( METHOD='TODO', SED_TYPE='dnde' ) rows = [] for group in self.groups: if group.bin_type != 'normal': log.debug('Skipping energy group:\n{}'.format(group)) continue row = self.compute_flux_point(group) rows.append(row) self.flux_points = FluxPoints(table_from_row_data(rows=rows, meta=meta))
[docs] def compute_flux_point(self, energy_group): log.debug('Computing flux point for energy group:\n{}'.format(energy_group)) model = self.compute_approx_model( global_model=self.model, energy_range=energy_group.energy_range, ) energy_ref = self.compute_energy_ref(energy_group) return self.fit_point( model=model, energy_group=energy_group, energy_ref=energy_ref, )
[docs] def compute_energy_ref(self, energy_group): return energy_group.energy_range.log_center
@staticmethod
[docs] def compute_approx_model(global_model, energy_range): """ Compute approximate model, to be used in the energy bin. TODO: At the moment just the global model with fixed parameters is returned """ # binning = EnergyBounds(binning) # low_bins = binning.lower_bounds # high_bins = binning.upper_bounds # # from sherpa.models import PowLaw1D # # if isinstance(model, models.PowerLaw): # temp = model.to_sherpa() # temp.gamma.freeze() # sherpa_models = [temp] * binning.nbins # else: # sherpa_models = [None] * binning.nbins # # for low, high, sherpa_model in zip(low_bins, high_bins, sherpa_models): # log.info('Computing flux points in bin [{}, {}]'.format(low, high)) # # # Make PowerLaw approximation for higher order models # if sherpa_model is None: # flux_low = model(low) # flux_high = model(high) # index = powerlaw.power_law_g_from_points(e1=low, e2=high, # f1=flux_low, # f2=flux_high) # # log.debug('Approximated power law index: {}'.format(index)) # sherpa_model = PowLaw1D('powlaw1d.default') # sherpa_model.gamma = index # sherpa_model.gamma.freeze() # sherpa_model.ref = model.parameters.reference.to('keV') # sherpa_model.ampl = 1e-20 # return PowerLaw( # index=u.Quantity(2, ''), # amplitude=u.Quantity(1, 'm-2 s-1 TeV-1'), # reference=u.Quantity(1, 'TeV'), # ) approx_model = global_model.copy() for par in approx_model.parameters.parameters: if par.name != 'amplitude': par.frozen = True return approx_model
[docs] def fit_point(self, model, energy_group, energy_ref): from .fit import SpectrumFit fit = SpectrumFit(self.obs, model) erange = energy_group.energy_range # TODO: Notice channels contained in energy_group fit.fit_range = erange.min, erange.max log.debug( 'Calling Sherpa fit for flux point ' ' in energy range:\n{}'.format(fit) ) fit.fit() fit.est_errors() # First result contain correct model res = fit.result[0] e_max = energy_group.energy_range.max e_min = energy_group.energy_range.min diff_flux, diff_flux_err = res.model.evaluate_error(energy_ref) return OrderedDict( e_ref=energy_ref, e_min=e_min, e_max=e_max, dnde=diff_flux.to('m-2 s-1 TeV-1'), dnde_err=diff_flux_err.to('m-2 s-1 TeV-1'), )
[docs]class SEDLikelihoodProfile(object): """SED likelihood profile. See :ref:`gadf:likelihood_sed`. TODO: merge this class with the classes in ``fermipy/castro.py``, which are much more advanced / feature complete. This is just a temp solution because we don't have time for that. """ def __init__(self, table): self.table = table @classmethod
[docs] def read(cls, filename, **kwargs): filename = make_path(filename) table = Table.read(str(filename), **kwargs) return cls(table=table)
def __str__(self): s = self.__class__.__name__ + '\n' s += str(self.table) return s
[docs] def plot(self, ax=None): import matplotlib.pyplot as plt if ax is None: ax = plt.gca()
# TODO def chi2_flux_points(flux_points, gp_model): """ Chi2 statistics for a list of flux points and model. """ model = gp_model(flux_points.e_ref) data = flux_points.table['dnde'].quantity data_err = flux_points.table['dnde_err'].quantity stat_per_bin = ((data - model) / data_err).value ** 2 return np.nansum(stat_per_bin), stat_per_bin def chi2_flux_points_assym(flux_points, gp_model): """ Assymetric chi2 statistics for a list of flux points and model. """ model = gp_model(flux_points.e_ref) data = flux_points.table['dnde'].quantity data_errp = flux_points.table['dnde_errp'].quantity data_errn = flux_points.table['dnde_errn'].quantity data_err = np.where(model > data, data_errp, data_errn) stat_per_bin = ((data - model) / data_err).value ** 2 return np.nansum(stat_per_bin), stat_per_bin
[docs]class FluxPointsFitter(object): """ Fit a set of flux points with a parametric model. Parameters ---------- optimizer : {'simplex', 'moncar', 'gridsearch'} Select optimizer error_estimator : {'covar'} Select error estimator ul_handling : {'ignore'} How to handle flux point upper limits in the fit Examples -------- Load flux points from file and fit with a power-law model:: from astropy import units as u from gammapy.spectrum import FluxPoints, FluxPointsFitter from gammapy.spectrum.models import PowerLaw filename = '$GAMMAPY_EXTRA/test_datasets/spectrum/flux_points/diff_flux_points.fits' flux_points = FluxPoints.read(filename) model = PowerLaw( index=2. * u.Unit(''), amplitude=1e-12 * u.Unit('cm-2 s-1 TeV-1'), reference=1. * u.TeV, ) fitter = FluxPointsFitter() result = fitter.run(flux_points, model) print(result['best_fit_model']) """ def __init__(self, stat='chi2', optimizer='simplex', error_estimator='covar', ul_handling='ignore'): if stat == 'chi2': self.stat = chi2_flux_points elif stat == 'chi2assym': self.stat = chi2_flux_points_assym else: raise ValueError("'{stat}' is not a valid fit statistic, please choose" " either 'chi2' or 'chi2assym'") if not ul_handling == 'ignore': raise NotImplementedError('No handling of upper limits implemented.') self.parameters = OrderedDict(optimizer=optimizer, error_estimator=error_estimator, ul_handling=ul_handling) def _setup_sherpa_fit(self, data, model): """Fit flux point using sherpa""" from sherpa.fit import Fit from sherpa.data import DataSimulFit from ..utils.sherpa import (SherpaDataWrapper, SherpaStatWrapper, SherpaModelWrapper, SHERPA_OPTMETHODS) optimizer = self.parameters['optimizer'] if data.sed_type == 'dnde': data = SherpaDataWrapper(data) else: raise NotImplementedError('Only fitting of differential flux points data ' 'is supported.') stat = SherpaStatWrapper(self.stat) data = DataSimulFit(name='GPFluxPoints', datasets=[data]) method = SHERPA_OPTMETHODS[optimizer] models = SherpaModelWrapper(model) return Fit(data=data, model=models, stat=stat, method=method)
[docs] def fit(self, data, model): """ Fit given model to data. Parameters ---------- model : `~gammapy.spectrum.models.SpectralModel` Spectral model (with fit start parameters) Returns ------- best_fit_model : `~gammapy.spectrum.models.SpectralModel` Best fit model """ p = self.parameters # TODO: make copy of model? if p['optimizer'] in ['simplex', 'moncar', 'gridsearch']: sherpa_fitter = self._setup_sherpa_fit(data, model) sherpa_fitter.fit() else: raise ValueError('Not a valid optimizer') return model
[docs] def statval(self, data, model): """ Compute statval for given model and data. Parameters ---------- model : `~gammapy.spectrum.models.SpectralModel` Spectral model """ return self.stat(data, model)
[docs] def dof(self, data, model): """ Degrees of freedom. Parameters ---------- model : `~gammapy.spectrum.models.SpectralModel` Spectral model """ m = len(model.parameters.free) n = len(data.table) return n - m
[docs] def estimate_errors(self, data, model): """ Estimate errors on best fit parameters. """ sherpa_fitter = self._setup_sherpa_fit(data, model) result = sherpa_fitter.est_errors() covariance = result.extra_output covar_axis = model.parameters.free model.parameters.set_parameter_covariance(covariance, covar_axis) return model
[docs] def run(self, data, model): """ Run all fitting adn extra information steps. Parameters ---------- data : list of `~gammapy.spectrum.FluxPoints` Flux points. model : `~gammapy.spectrum.models.SpectralModel` Spectral model Returns ------- result : `~collections.OrderedDict` Dictionary with fit results and debug output. """ result = OrderedDict() best_fit_model = self.fit(data, model) best_fit_model = self.estimate_errors(data, best_fit_model) result['best_fit_model'] = best_fit_model result['dof'] = self.dof(data, model) result['statval'] = self.statval(data, model)[0] result['statval/dof'] = result['statval'] / result['dof'] return result