Source code for gammapy.utils.nddata

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Utility functions and classes for n-dimensional data and axes."""
from __future__ import absolute_import, division, print_function, unicode_literals
from collections import OrderedDict
import numpy as np
from astropy.units import Quantity
from .array import array_stats_str
from .interpolation import ScaledRegularGridInterpolator

__all__ = ["NDDataArray", "DataAxis", "BinnedDataAxis", "sqrt_space"]

[docs]class NDDataArray(object): """ND Data Array Base class for usage examples see :gp-notebook:`nddata_demo` Parameters ---------- axes : list List of `~gammapy.utils.nddata.DataAxis` data : `~astropy.units.Quantity` Data meta : dict Meta info interp_kwargs : dict TODO """ default_interp_kwargs = dict(bounds_error=False, values_scale="lin") """Default interpolation kwargs used to initialize the `scipy.interpolate.RegularGridInterpolator`. The interpolation behaviour of an individual axis ('log', 'linear') can be passed to the axis on initialization.""" def __init__(self, axes, data=None, meta=None, interp_kwargs=None): self._axes = axes if data is not None: = data if meta is not None: self.meta = OrderedDict(meta) self.interp_kwargs = interp_kwargs or self.default_interp_kwargs self._regular_grid_interp = None def __str__(self): ss = "NDDataArray summary info\n" for axis in self.axes: ss += array_stats_str(axis.nodes, ss += array_stats_str(, "Data") return ss @property def axes(self): """Array holding the axes in correct order""" return self._axes
[docs] def axis(self, name): """Return axis by name""" try: idx = [ for _ in self.axes].index(name) except ValueError: raise ValueError("Axis {} not found".format(name)) return self.axes[idx]
@property def data(self): """Array holding the n-dimensional data.""" return self._data @data.setter def data(self, data): """Set data. Some sanity checks are performed to avoid an invalid array. Also, the interpolator is set to None to avoid unwanted behaviour. Parameters ---------- data : `~astropy.units.Quantity`, array-like Data array """ data = Quantity(data) dimension = len(data.shape) if dimension != self.dim: raise ValueError( "Overall dimensions to not match. " "Data: {}, Hist: {}".format(dimension, self.dim) ) for dim in np.arange(self.dim): axis = self.axes[dim] if axis.nbins != data.shape[dim]: msg = "Data shape does not match in dimension {d}\n" msg += "Axis {n} : {sa}, Data {sd}" raise ValueError( msg.format(d=dim,, sa=axis.nbins, sd=data.shape[dim]) ) self._regular_grid_interp = None self._data = data @property def dim(self): """Dimension (number of axes)""" return len(self.axes)
[docs] def find_node(self, **kwargs): """Find next node Parameters ---------- kwargs : dict Keys are the axis names, Values the evaluation points """ node = [] for axis in self.axes: lookup_val = Quantity(kwargs.pop( temp = axis.find_node(lookup_val) node.append(temp) return node
[docs] def evaluate(self, method=None, **kwargs): """Evaluate NDData Array This function provides a uniform interface to several interpolators. The evaluation nodes are given as ``kwargs``. Currently available: `~scipy.interpolate.RegularGridInterpolator`, methods: linear, nearest Parameters ---------- method : str {'linear', 'nearest'}, optional Interpolation method kwargs : dict Keys are the axis names, Values the evaluation points Returns ------- array : `~astropy.units.Quantity` Interpolated values, axis order is the same as for the NDData array """ values = [] for idx, axis in enumerate(self.axes): # Extract values for each axis, default: nodes shape = np.ones(len(self.axes), dtype=int) shape[idx] = -1 default = axis.nodes.reshape(shape) temp = Quantity(kwargs.pop(, default)) # Transform to correct unit temp = temp.to_value(axis.unit) # Transform to match interpolation behaviour of axis values.append(np.atleast_1d(axis._interp_values(temp))) # This is to catch e.g. typos in axis names if kwargs != {}: raise ValueError("Input given for unknown axis: {}".format(kwargs)) if self._regular_grid_interp is None: self._add_regular_grid_interp() return self._regular_grid_interp(values, method=method, **kwargs)
def _add_regular_grid_interp(self, interp_kwargs=None): """Add `~scipy.interpolate.RegularGridInterpolator` Parameters ---------- interp_kwargs : dict, optional Interpolation kwargs """ if interp_kwargs is None: interp_kwargs = self.interp_kwargs points = [a._interp_nodes() for a in self.axes] values = # If values contains nan, only setup interpolator in valid range if np.isnan(values).any(): if self.dim > 1: raise NotImplementedError( "Data grid contains nan. This is not" "supported for arrays dimension > 1" ) else: mask = np.isfinite(values) points = [points[0][mask]] values = values[mask] self._regular_grid_interp = ScaledRegularGridInterpolator( points, values, **interp_kwargs )
[docs]class DataAxis(object): """Data axis to be used with NDDataArray Axis values are interpreted as nodes. For binned data see `~gammapy.utils.nddata.BinnedDataAxis`. Parameters ---------- nodes : `~astropy.units.Quantity` Interpolation nodes name : str, optional Axis name, default: 'Default' interpolation_mode : str {'linear', 'log'} Interpolation behaviour, default: 'linear' """ def __init__(self, nodes, name="Default", interpolation_mode="linear"): # Need this for subclassing (see BinnedDataAxis) if nodes is not None: self._data = Quantity(nodes) = name self._interpolation_mode = interpolation_mode def __str__(self): ss = self.__class__.__name__ ss += "\nName: {}".format( ss += "\nUnit: {}".format(self.unit) ss += "\nNodes: {}".format(self.nbins) ss += "\nInterpolation mode: {}".format(self.interpolation_mode) return ss @property def unit(self): """Axis unit""" return self.nodes.unit
[docs] @classmethod def logspace(cls, vmin, vmax, nbins, unit=None, **kwargs): """Create axis with equally log-spaced nodes if no unit is given, it will be taken from vmax, log interpolation is enable by default. Parameters ---------- vmin : `~astropy.units.Quantity`, float Lowest value vmax : `~astropy.units.Quantity`, float Highest value bins : int Number of bins unit : `~astropy.units.UnitBase`, str Unit """ kwargs.setdefault("interpolation_mode", "log") if unit is not None: vmin = Quantity(vmin, unit) vmax = Quantity(vmax, unit) else: vmin = Quantity(vmin) vmax = Quantity(vmax) unit = vmax.unit vmin = x_min, x_max = np.log10([vmin.value, vmax.value]) vals = np.logspace(x_min, x_max, nbins) return cls(vals * unit, **kwargs)
[docs] def find_node(self, val): """Find next node Parameters ---------- val : `~astropy.units.Quantity` Lookup value """ val = Quantity(val) if not val.unit.is_equivalent(self.unit): raise ValueError( "Units mismatch: val.unit = {!r}, self.unit = {!r}".format( val.unit, self.unit ) ) val = val = np.atleast_1d(val) x1 = np.array([val] * self.nbins).transpose() x2 = np.array([self.nodes] * len(val)) temp = np.abs(x1 - x2) idx = np.argmin(temp, axis=1) return idx
@property def nbins(self): """Number of bins""" return len(self.nodes) @property def nodes(self): """Evaluation nodes""" return self._data @property def interpolation_mode(self): """Interpolation mode """ return self._interpolation_mode def _interp_nodes(self): """Nodes to be used for interpolation""" if self.interpolation_mode == "log": return np.log10(self.nodes.value) else: return self.nodes.value def _interp_values(self, values): """Transform values correctly for interpolation""" if self.interpolation_mode == "log": return np.log10(values) else: return values
[docs]class BinnedDataAxis(DataAxis): """Data axis for binned data Parameters ---------- lo : `~astropy.units.Quantity` Lower bin edges hi : `~astropy.units.Quantity` Upper bin edges name : str, optional Axis name, default: 'Default' interpolation_mode : str {'linear', 'log'} Interpolation behaviour, default: 'linear' """ def __init__(self, lo, hi, **kwargs): self.lo = Quantity(lo) self.hi = Quantity(hi) super(BinnedDataAxis, self).__init__(None, **kwargs)
[docs] @classmethod def logspace(cls, emin, emax, nbins, unit=None, **kwargs): # TODO: splitout log space into a helper function vals = DataAxis.logspace(emin, emax, nbins + 1, unit)._data return cls(vals[:-1], vals[1:], **kwargs)
def __str__(self): ss = super(BinnedDataAxis, self).__str__() ss += "\nLower bounds {}".format(self.lo) ss += "\nUpper bounds {}".format(self.hi) return ss @property def bins(self): """Bin edges""" unit = self.lo.unit val = np.append(self.lo.value, self.hi.to_value(unit)[-1]) return val * unit @property def bin_width(self): """Bin width""" return self.hi - self.lo @property def nodes(self): """Evaluation nodes. Depending on the interpolation mode, either log or lin center are returned """ if self.interpolation_mode == "log": return self.log_center() else: return self.lin_center()
[docs] def lin_center(self): """Linear bin centers""" return (self.lo + self.hi) / 2
[docs] def log_center(self): """Logarithmic bin centers""" return np.sqrt(self.lo * self.hi)
[docs]def sqrt_space(start, stop, num): """Return numbers spaced evenly on a square root scale. This function is similar to `numpy.linspace` and `numpy.logspace`. Parameters ---------- start : float start is the starting value of the sequence stop : float stop is the final value of the sequence num : int Number of samples to generate. Returns ------- samples : `~numpy.ndarray` 1D array with a square root scale Examples -------- >>> from gammapy.utils.nddata import sqrt_space >>> samples = sqrt_space(0, 2, 5) array([ 0. , 1. , 1.41421356, 1.73205081, 2. ]) """ samples2 = np.linspace(start ** 2, stop ** 2, num) samples = np.sqrt(samples2) return samples