# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Tools to create profiles (i.e. 1D "slices" from 2D images)"""
from __future__ import absolute_import, division, print_function, unicode_literals
from collections import OrderedDict
import numpy as np
from astropy.table import Table
from astropy import units as u
from .core import SkyImage
__all__ = [
'ImageProfile',
'ImageProfileEstimator'
]
def compute_binning(data, n_bins, method='equal width', eps=1e-10):
"""Computes 1D array of bin edges.
The range of the bin_edges is always [min(data), max(data)]
Note that bin_edges has n_bins bins, i.e. length n_bins + 1.
Parameters
----------
data : array_like
Data to be binned (any dimension)
n_bins : int
Number of bins
method : str
One of: 'equal width', 'equal entries'
eps : float
added to range so that the max data point is inside the
last bin. If eps=0 it falls on the right edge of the last
data point and thus would be not cointained.
Returns
-------
bin_edges : 1D ndarray
Array of bin edges.
"""
data = np.asanyarray(data)
if method == 'equal width':
bin_edges = np.linspace(np.nanmin(data), np.nanmax(data), n_bins + 1)
elif method == 'equal entries':
# We use np.percentile to achieve equal number of entries per bin
# It takes a list of quantiles in the range [0, 100] as input
quantiles = list(np.linspace(0, 100, n_bins + 1))
bin_edges = np.percentile(data, quantiles)
else:
raise ValueError('Invalid option: method = {0}'.format(method))
bin_edges[-1] += eps
return bin_edges
class FluxProfile(object):
"""Flux profile.
Note: over- and underflow is ignored and not stored in the profile
Note: this is implemented by creating bin labels and storing the
input 2D data in 1D `pandas.DataFrame` tables.
The 1D profile is also stored as a `pandas.DataFrame` and computed
using the fast and flexible pandas groupby and apply functions.
* TODO: take mask into account everywhere
* TODO: separate FluxProfile.profile into a separate ProfileStack or HistogramStack class?
* TODO: add ``solid_angle`` to input arrays.
Parameters
----------
x_image : array_like
Label image (2-dimensional)
x_edges : array_like
Defines binning in ``x`` (could be GLON, GLAT, DIST, ...)
counts, background, exposure : array_like
Input images (2-dimensional)
mask : array_like
possibility to mask pixels (i.e. ignore in computations)
"""
def __init__(self, x_image, x_edges, counts, background, exposure, mask=None):
import pandas as pd
# Make sure inputs are numpy arrays
x_edges = np.asanyarray(x_edges)
x_image = np.asanyarray(x_image)
counts = np.asanyarray(counts)
background = np.asanyarray(background)
exposure = np.asanyarray(exposure)
if mask:
mask = np.asanyarray(mask)
assert (x_image.shape == counts.shape == background.shape ==
exposure.shape == mask.shape)
# Remember the shape of the 2D input arrays
self.shape = x_image.shape
# Store all input data as 1D vectors in a pandas.DataFrame
d = pd.DataFrame(index=np.arange(x_image.size))
d['x'] = x_image.flat
# By default np.digitize uses 0 as the underflow bin.
# Here we ignore under- and overflow, thus the -1
d['label'] = np.digitize(d['x'], x_edges) - 1
d['counts'] = counts.flat
d['background'] = background.flat
d['exposure'] = exposure.flat
if mask:
d['mask'] = mask.flat
else:
d['mask'] = np.ones_like(d['x'])
self.data = d
self.bins = np.arange(len(x_edges) + 1)
# Store all per-profile bin info in a pandas.DataFrame
p = pd.DataFrame(index=np.arange(x_edges.size - 1))
p['x_lo'] = x_edges[:-1]
p['x_hi'] = x_edges[1:]
p['x_center'] = 0.5 * (p['x_hi'] + p['x_lo'])
p['x_width'] = p['x_hi'] - p['x_lo']
self.profile = p
# The x_edges array is longer by one than the profile arrays,
# so we store it separately
self.x_edges = x_edges
def compute(self):
"""Compute the flux profile.
TODO: call `~gammapy.stats.compute_total_stats` instead.
Note: the current implementation is very inefficienct in speed and memory.
There are various fast implementations, but none is flexible enough to
allow combining many input quantities (counts, background, exposure) in a
flexlible way:
- `numpy.histogram`
- `scipy.ndimage.measurements.labeled_comprehension` and special cases
pandas DataFrame groupby followed by apply is flexible enough, I think:
http://pandas.pydata.org/pandas-docs/dev/groupby.html
Returns
-------
results : dict
Dictionary of profile measurements, also stored in ``self.profile``.
See also
--------
gammapy.stats.compute_total_stats
"""
# Shortcuts to access class info needed in this method
d = self.data
# Here the pandas magic happens: we group pixels by label
g = d.groupby('label')
p = self.profile
# Compute number of entries in each profile bin
p['n_entries'] = g['x'].aggregate(len)
for name in ['counts', 'background', 'exposure']:
p['{0}'.format(name)] = g[name].sum()
# p['{0}_mean'.format(name)] = p['{0}_sum'.format(name)] / p['n_entries']
p['excess'] = p['counts'] - p['background']
p['flux'] = p['excess'] / p['exposure']
return p
def plot(self, which='n_entries', xlabel='Distance (deg)', ylabel=None):
"""Plot flux profile.
Parameters
----------
TODO
"""
import matplotlib.pyplot as plt
if ylabel is None:
ylabel = which
p = self.profile
x = p['x_center']
xerr = 0.5 * p['x_width']
y = p[which]
plt.errorbar(x, y, xerr=xerr, fmt='o')
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.grid()
# plt.ylim(-10, 20)
def save(self, filename):
"""Save all profiles to a FITS file.
Parameters
----------
"""
raise NotImplementedError
# TODO: implement measuring profile along arbitrary directions
# TODO: think better about error handling. e.g. MC based methods
[docs]class ImageProfileEstimator(object):
"""
Estimate profile from image.
Parameters
----------
x_edges : `~astropy.coordinates.Angle`
Coordinate edges to define a custom measument grid (optional).
method : ['sum', 'mean']
Compute sum or mean within profile bins.
axis : ['lon', 'lat']
Along which axis to estimate the profile.
Examples
--------
This example shows how to compute a counts profile for the Fermi galactic
center region:
.. code::
import matplotlib.pyplot as plt
from gammapy.datasets import FermiGalacticCenter
from gammapy.image import ImageProfile, ImageProfileEstimator
from gammapy.image import SkyImage
from astropy import units as u
# load example data
fermi_cts = SkyImage.from_image_hdu(FermiGalacticCenter.counts())
fermi_cts.unit = u.count
# set up profile estimator and run
p = ImageProfileEstimator(axis='lon', method='sum')
profile = p.run(fermi_cts)
# smooth profile and plot
smoothed = profile.smooth(kernel='gauss')
smoothed.peek()
plt.show()
"""
def __init__(self, x_edges=None, method='sum', axis='lon'):
self._x_edges = x_edges
if method not in ['sum', 'mean']:
raise ValueError("Not a valid method, choose either 'sum' or 'mean'")
if axis not in ['lon', 'lat']:
raise ValueError("Not a valid axis, choose either 'lon' or 'lat'")
self.parameters = OrderedDict(method=method, axis=axis)
def _get_x_edges(self, image):
"""
Get x_ref coordinate array.
"""
if self._x_edges is not None:
return self._x_edges
p = self.parameters
coordinates = image.coordinates(mode='edges')
if p['axis'] == 'lat':
x_edges = coordinates[:, 0].data.lat
elif p['axis'] == 'lon':
lon = coordinates[0, :].data.lon
x_edges = lon.wrap_at('180d')
return x_edges
def _label_image(self, image):
"""
Compute label image.
"""
p = self.parameters
label_image = SkyImage.empty_like(image)
coordinates = image.coordinates()
x_edges = self._get_x_edges(image)
if p['axis'] == 'lon':
lon = coordinates.data.lon.wrap_at('180d')
data = np.digitize(lon.degree, x_edges.deg)
elif p['axis'] == 'lat':
lat = coordinates.data.lat
data = np.digitize(lat.degree, x_edges.deg)
label_image.data = data
return label_image
def _estimate_profile(self, image, image_err, mask):
"""
Estimate image profile.
"""
from scipy import ndimage
p = self.parameters
labels = self._label_image(image, mask)
profile_err = None
index = np.arange(1, len(self._get_x_edges(image)))
if p['method'] == 'sum':
profile = ndimage.sum(image.data, labels.data, index)
if image.unit.is_equivalent('counts'):
profile_err = np.sqrt(profile)
elif image_err:
# gaussian error propagation
err_sum = ndimage.sum(image_err.data ** 2, labels.data, index)
profile_err = np.sqrt(err_sum)
elif p['method'] == 'mean':
# gaussian error propagation
profile = ndimage.mean(image.data, labels.data, index)
if image_err:
N = ndimage.sum(~np.isnan(image_err.data), labels.data, index)
err_sum = ndimage.sum(image_err.data ** 2, labels.data, index)
profile_err = np.sqrt(err_sum) / N
return profile, profile_err
def _label_image(self, image, mask=None):
"""
Compute label image.
"""
p = self.parameters
label_image = SkyImage.empty_like(image)
coordinates = image.coordinates()
x_edges = self._get_x_edges(image)
if p['axis'] == 'lon':
lon = coordinates.data.lon.wrap_at('180d')
data = np.digitize(lon.degree, x_edges.deg)
elif p['axis'] == 'lat':
lat = coordinates.data.lat
data = np.digitize(lat.degree, x_edges.deg)
if mask is not None:
# assign masked values to background
data[mask.data] = 0
label_image.data = data
return label_image
[docs] def run(self, image, image_err=None, mask=None):
"""
Run image profile estimator.
Parameters
----------
image : `~gammapy.image.SkyImage`
Input image to run profile estimator on.
image_err : `~gammapy.image.SkyImage`
Input error image to run profile estimator on.
mask : `~gammapy.image.SkyImage`
Optional mask to exclude regions from the measurement.
Returns
-------
profile : `ImageProfile`
Result image profile object.
"""
p = self.parameters
image = image.copy()
if image.unit.is_equivalent('count'):
image_err = SkyImage.empty_like(image)
image_err.data = np.sqrt(image.data)
if image_err:
image_err = image_err.copy()
profile, profile_err = self._estimate_profile(image, image_err, mask)
result = Table()
x_edges = self._get_x_edges(image)
result['x_min'] = x_edges[:-1]
result['x_max'] = x_edges[1:]
result['x_ref'] = (x_edges[:-1] + x_edges[1:]) / 2
result['profile'] = profile * image.unit
if profile_err is not None:
result['profile_err'] = profile_err * image.unit
result.meta['PROFILE_TYPE'] = p['axis']
return ImageProfile(result)
[docs]class ImageProfile(object):
"""
Image profile class.
The image profile data is stored in `~astropy.table.Table` object, with the
following columns:
* `x_ref` Coordinate bin center (required).
* `x_min` Coordinate bin minimum (optional).
* `x_max` Coordinate bin maximum (optional).
* `profile` Image profile data (required).
* `profile_err` Image profile data error (optional).
Parameters
----------
table : `~astropy.table.Table`
Table instance with the columns specified as above.
"""
def __init__(self, table):
self.table = table
[docs] def smooth(self, kernel='box', radius=0.1 * u.deg, **kwargs):
"""
Smooth profile with error propagation.
Smoothing is described by a convolution:
.. math::
x_j = \sum_i x_{(j - i)} h_i
Where :math:`h_i` are the coefficients of the convolution kernel.
The corresponding error on :math:`x_j` is then estimated using Gaussian
error propagation, neglecting correlations between the individual
:math:`x_{(j - i)}`:
.. math::
\Delta x_j = \sqrt{\sum_i \Delta x^{2}_{(j - i)} h^{2}_i}
Parameters
----------
kernel : {'gauss', 'box'}
Kernel shape
radius : `~astropy.units.Quantity` or float
Smoothing width given as quantity or float. If a float is given it
is interpreted as smoothing width in pixels. If an (angular) quantity
is given it is converted to pixels using `xref[1] - x_ref[0]`.
kwargs : dict
Keyword arguments passed to `~scipy.ndimage.uniform_filter`
('box') and `~scipy.ndimage.gaussian_filter` ('gauss').
Returns
-------
profile : `ImageProfile`
Smoothed image profile.
"""
from scipy.ndimage.filters import uniform_filter, gaussian_filter
from scipy.ndimage import convolve
from astropy.convolution import Gaussian1DKernel, Box1DKernel
table = self.table.copy()
profile = table['profile']
radius = np.abs(radius / np.diff(self.x_ref))[0]
width = 2 * radius.value + 1
if kernel == 'box':
smoothed = uniform_filter(profile.astype('float'), width, **kwargs)
# renormalize data
if table['profile'].unit.is_equivalent('count'):
smoothed *= int(width)
smoothed_err = np.sqrt(smoothed)
elif 'profile_err' in table.colnames:
profile_err = table['profile_err']
# use gaussian error propagation
box = Box1DKernel(width)
err_sum = convolve(profile_err ** 2, box.array ** 2)
smoothed_err = np.sqrt(err_sum)
elif kernel == 'gauss':
smoothed = gaussian_filter(profile.astype('float'), width, **kwargs)
# use gaussian error propagation
if 'profile_err' in table.colnames:
profile_err = table['profile_err']
gauss = Gaussian1DKernel(width)
err_sum = convolve(profile_err ** 2, gauss.array ** 2)
smoothed_err = np.sqrt(err_sum)
else:
raise ValueError("Not valid kernel choose either 'box' or 'gauss'")
table['profile'] = smoothed * self.table['profile'].unit
if 'profile_err' in table.colnames:
table['profile_err'] = smoothed_err * self.table['profile'].unit
return self.__class__(table)
[docs] def plot(self, ax=None, **kwargs):
"""
Plot image profile.
Parameters
----------
ax : `~matplotlib.axes.Axes`
Axes object
**kwargs : dict
Keyword arguments passed to `~matplotlib.axes.Axes.plot`
Returns
-------
ax : `~matplotlib.axes.Axes`
Axes object
"""
import matplotlib.pyplot as plt
if ax is None:
ax = plt.gca()
y = self.table['profile'].data
x = self.x_ref.value
ax.plot(x, y, **kwargs)
ax.set_xlabel('lon')
ax.set_ylabel('profile')
ax.set_xlim(x.max(), x.min())
return ax
[docs] def plot_err(self, ax=None, **kwargs):
"""
Plot image profile error as band.
Parameters
----------
ax : `~matplotlib.axes.Axes`
Axes object
**kwargs : dict
Keyword arguments passed to plt.fill_between()
Returns
-------
ax : `~matplotlib.axes.Axes`
Axes object
"""
import matplotlib.pyplot as plt
if ax is None:
ax = plt.gca()
y = self.table['profile'].data
ymin = y - self.table['profile_err'].data
ymax = y + self.table['profile_err'].data
x = self.x_ref.value
# plotting defaults
kwargs.setdefault('alpha', 0.5)
ax.fill_between(x, ymin, ymax, **kwargs)
ax.set_xlabel('x (deg)')
ax.set_ylabel('profile')
return ax
@property
def x_ref(self):
"""
Reference x coordinates.
"""
return self.table['x_ref'].quantity
@property
def x_min(self):
"""
Min. x coordinates.
"""
return self.table['x_min'].quantity
@property
def x_max(self):
"""
Max. x coordinates.
"""
return self.table['x_max'].quantity
@property
def profile(self):
"""
Image profile quantity.
"""
return self.table['profile'].quantity
@property
def profile_err(self):
"""
Image profile error quantity.
"""
try:
return self.table['profile_err'].quantity
except KeyError:
return None
[docs] def peek(self, figsize=(8, 4.5), **kwargs):
"""
Show image profile and error.
Parameters
----------
**kwargs : dict
Keyword arguments passed to `ImageProfile.plot_profile()`
Returns
-------
ax : `~matplotlib.axes.Axes`
Axes object
"""
import matplotlib.pyplot as plt
fig = plt.figure(figsize=figsize)
ax = fig.add_axes([0.1, 0.1, 0.8, 0.8])
ax = self.plot(ax, **kwargs)
if 'profile_err' in self.table.colnames:
opts = {}
opts['color'] = kwargs.get('c')
ax = self.plot_err(ax, **opts)
return ax
[docs] def normalize(self, mode='peak'):
"""
Normalize profile to peak value or integral.
Parameters
----------
mode : ['integral', 'peak']
Normalize image profile so that it integrates to unity ('integral')
or the maximum value corresponds to one ('peak').
Returns
-------
profile : `ImageProfile`
Normalized image profile.
"""
table = self.table.copy()
profile = self.table['profile']
if mode == 'peak':
norm = np.nanmax(profile)
elif mode == 'integral':
norm = np.nansum(profile)
else:
raise ValueError("Not a valid normalization mode. Choose either"
" 'peak' or 'integral'")
table['profile'] /= norm
if 'profile_err' in table.colnames:
table['profile_err'] /= norm
return self.__class__(table)