# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Ring background estimation."""
from __future__ import absolute_import, division, print_function, unicode_literals
from collections import OrderedDict
from itertools import product
import numpy as np
from astropy.convolution import Ring2DKernel, Tophat2DKernel
import astropy.units as u
from ..image import SkyImageList, SkyImage
from ..image.utils import scale_cube
__all__ = [
'AdaptiveRingBackgroundEstimator',
'RingBackgroundEstimator',
'ring_r_out',
'ring_area_factor',
'ring_alpha',
]
[docs]class AdaptiveRingBackgroundEstimator(object):
"""Adaptive ring background algorithm.
This algorithm extends the standard `RingBackground` method by adapting the
size of the ring to achieve a minimum on / off exposure ratio (alpha) in regions
where the area to estimate the background from is limited.
Parameters
----------
r_in : `~astropy.units.Quantity`
Inner radius of the ring.
r_out_max : `~astropy.units.Quantity`
Maximal outer radius of the ring.
width : `~astropy.units.Quantity`
Width of the ring.
stepsize : `~astropy.units.Quantity`
Stepsize used for increasing the radius.
threshold_alpha : float
Threshold on alpha above which the adaptive ring takes action.
theta : `~astropy.units.Quantity`
Integration radius used for alpha computation.
method : {'fixed_width', 'fixed_r_in'}
Adaptive ring method.
Examples
--------
Here's an example how to use the `AdaptiveRingBackgroundEstimator`:
.. code:: python
from astropy import units as u
from gammapy.background import AdaptiveRingBackgroundEstimator
from gammapy.image import SkyImageList
filename = '$GAMMAPY_EXTRA/test_datasets/unbundled/poisson_stats_image/input_all.fits.gz'
images = SkyImageList.read(filename)
images['exposure'].name = 'exposure_on'
adaptive_ring_bkg = RingBackgroundEstimator(r_in=0.22 * u.deg,
r_out_max=0.8 * u.deg,
width=0.1 * u.deg)
results = adaptive_ring_bkg.run(images)
results['background'].show()
See Also
--------
RingBackgroundEstimator, gammapy.detect.KernelBackgroundEstimator
"""
def __init__(self, r_in, r_out_max, width, stepsize=0.02 * u.deg,
threshold_alpha=0.1, theta=0.22 * u.deg, method='fixed_width'):
# input validation
if method not in ['fixed_width', 'fixed_r_in']:
raise ValueError("Not a valid adaptive ring method.")
self.parameters = OrderedDict(r_in=r_in, r_out_max=r_out_max, width=width,
stepsize=stepsize, threshold_alpha=threshold_alpha,
method=method, theta=theta)
[docs] def kernels(self, image):
"""Ring kernels according to the specified method.
Parameters
----------
image : `~gammapy.image.SkyImage`
Sky image specifying the WCS information.
Returns
-------
kernels : list
List of `~astropy.convolution.Ring2DKernel`
"""
p = self.parameters
scale = image.wcs_pixel_scale()[0]
r_in = p['r_in'].to('deg') / scale
r_out_max = p['r_out_max'].to('deg') / scale
width = p['width'].to('deg') / scale
stepsize = p['stepsize'].to('deg') / scale
kernels = []
if p['method'] == 'fixed_width':
r_ins = np.arange(r_in.value, (r_out_max - width).value, stepsize.value)
widths = [width.value]
elif p['method'] == 'fixed_r_in':
widths = np.arange(width.value, (r_out_max - r_in).value, stepsize.value)
r_ins = [r_in.value]
for r_in, width in product(r_ins, widths):
kernel = Ring2DKernel(r_in, width)
kernel.normalize('peak')
kernels.append(kernel)
return kernels
def _alpha_approx_cube(self, cubes):
"""Compute alpha as on_exposure / off_exposure.
Where off_exposure < 0, alpha is set to infinity.
"""
exposure_on = cubes['exposure_on']
exposure_off = cubes['exposure_off']
alpha_approx = np.where(exposure_off > 0, exposure_on / exposure_off, np.inf)
return alpha_approx
def _exposure_off_cube(self, images, kernels):
"""Compute off exposure cube.
The on exposure is convolved with different
ring kernels and stacking the data along the third dimension.
"""
exposure = images['exposure_on'].data
exclusion = images['exclusion'].data
return scale_cube(exposure * exclusion, kernels)
def _exposure_on_cube(self, images, kernels):
"""Compute on exposure cube.
Calculated by convolving the on exposure with a tophat
of radius theta, and stacking all images along the third dimension.
"""
from scipy.ndimage import convolve
exposure_on = images['exposure_on']
scale = exposure_on.wcs_pixel_scale()[0]
theta = self.parameters['theta'] * scale
tophat = Tophat2DKernel(theta.value)
tophat.normalize('peak')
exposure_on = convolve(exposure_on, tophat.array)
exposure_on_cube = np.repeat(exposure_on[:, :, np.newaxis], len(kernels), axis=2)
return exposure_on_cube
def _off_cube(self, images, kernels):
"""Compute off cube.
Calculated by convolving the raw counts with different ring kernels
and stacking the data along the third dimension.
"""
counts = images['counts'].data
exclusion = images['exclusion'].data
return scale_cube(counts * exclusion, kernels)
def _reduce_cubes(self, cubes):
"""Compute off and off exposure map.
Calulated by reducing the cubes. The data is
iterated along the third axis (i.e. increasing ring sizes), the value
with the first approximate alpha < threshold is taken.
"""
threshold = self.parameters['threshold_alpha']
alpha_approx_cube = cubes['alpha_approx']
off_cube = cubes['off']
exposure_off_cube = cubes['exposure_off']
shape = alpha_approx_cube.shape[:2]
off = np.tile(np.nan, shape)
exposure_off = np.tile(np.nan, shape)
for idx in np.arange(alpha_approx_cube.shape[-1]):
mask = (alpha_approx_cube[:, :, idx] <= threshold) & np.isnan(off)
off[mask] = off_cube[:, :, idx][mask]
exposure_off[mask] = exposure_off_cube[:, :, idx][mask]
return exposure_off, off
[docs] def run(self, images):
"""Run adaptive ring background algorithm.
Parameters
----------
images : `SkyImageList`
Input sky images.
Returns
-------
result : `SkyImageList`
Result sky images.
"""
required = ['counts', 'exposure_on', 'exclusion']
images.check_required(required)
wcs = images['counts'].wcs.copy()
cubes = OrderedDict()
kernels = self.kernels(images['counts'])
cubes['exposure_on'] = self._exposure_on_cube(images, kernels)
cubes['exposure_off'] = self._exposure_off_cube(images, kernels)
cubes['off'] = self._off_cube(images, kernels)
cubes['alpha_approx'] = self._alpha_approx_cube(cubes)
exposure_off, off = self._reduce_cubes(cubes)
alpha = images['exposure_on'].data / exposure_off
not_has_exposure = ~(images['exposure_on'].data > 0)
# set data outside fov to zero
for data in [alpha, off, exposure_off]:
data[not_has_exposure] = 0
background = alpha * off
result = SkyImageList()
result['exposure_off'] = SkyImage(data=exposure_off, wcs=wcs)
result['off'] = SkyImage(data=off, wcs=wcs)
result['alpha'] = SkyImage(data=alpha, wcs=wcs)
result['background'] = SkyImage(data=background, wcs=wcs)
return result
[docs]class RingBackgroundEstimator(object):
"""Ring background method for cartesian coordinates.
Step 1: apply exclusion mask
Step 2: ring-correlate
Step 3: apply psi cut
TODO: add method to apply the psi cut
Parameters
----------
r_in : `~astropy.units.Quantity`
Inner ring radius
width : `~astropy.units.Quantity`
Ring width.
use_fft_convolution : bool
Use fft convolution.
Examples
--------
Here's an example how to use the `RingBackgroundEstimator`:
.. code:: python
from astropy import units as u
from gammapy.background import RingBackgroundEstimator
from gammapy.image import SkyImageList
filename = '$GAMMAPY_EXTRA/test_datasets/unbundled/poisson_stats_image/input_all.fits.gz'
images = SkyImageList.read(filename)
images['exposure'].name = 'exposure_on'
ring_bkg = RingBackgroundEstimator(r_in=0.35 * u.deg, width=0.3 * u.deg)
results = ring_bkg.run(images)
results['background'].show()
See Also
--------
gammapy.detect.KernelBackgroundEstimator, AdaptiveRingBackgroundEstimator
"""
def __init__(self, r_in, width, use_fft_convolution=False):
self.parameters = dict(r_in=r_in, width=width, use_fft_convolution=use_fft_convolution)
[docs] def kernel(self, image):
"""Ring kernel.
Parameters
----------
image : `gammapy.image.SkyImage`
Image
Returns
-------
ring : `~astropy.convolution.Ring2DKernel`
Ring kernel.
"""
p = self.parameters
scale = image.wcs_pixel_scale()[0]
r_in = p['r_in'].to('deg') / scale
width = p['width'].to('deg') / scale
ring = Ring2DKernel(r_in.value, width.value)
ring.normalize('peak')
return ring
[docs] def run(self, images):
"""Run ring background algorithm.
Required sky images: {required}
Parameters
----------
images : `SkyImageList`
Input sky images.
Returns
-------
result : `SkyImageList`
Result sky images
"""
p = self.parameters
required = ['counts', 'exposure_on', 'exclusion']
images.check_required(required)
counts, exposure_on, exclusion = [images[_] for _ in required]
wcs = counts.wcs.copy()
result = SkyImageList()
ring = self.kernel(counts)
counts_excluded = SkyImage(data=counts.data * exclusion.data, wcs=wcs)
result['off'] = counts_excluded.convolve(ring.array, mode='constant',
use_fft=p['use_fft_convolution'])
result['off'].data = result['off'].data.astype(int)
exposure_on_excluded = SkyImage(data=exposure_on.data * exclusion.data, wcs=wcs)
result['exposure_off'] = exposure_on_excluded.convolve(ring.array, mode='constant',
use_fft=p['use_fft_convolution'])
with np.errstate(divide='ignore'):
# set pixels, where ring is too small to NaN
not_has_off_exposure = ~(result['exposure_off'].data > 0)
result['exposure_off'].data[not_has_off_exposure] = np.nan
result['alpha'] = SkyImage(data=exposure_on.data / result['exposure_off'].data, wcs=wcs)
not_has_exposure = ~(exposure_on.data > 0)
result['alpha'].data[not_has_exposure] = 0
result['background'] = SkyImage(data=result['alpha'].data * result['off'].data, wcs=wcs)
return result
[docs] def info(self):
"""Print summary info about the parameters."""
print(str(self))
def __str__(self):
"""String representation of the class."""
info = "RingBackground parameters: \n"
info += 'r_in : {}\n'.format(self.parameters['r_in'])
info += 'width: {}\n'.format(self.parameters['width'])
return info
[docs]def ring_r_out(theta, r_in, area_factor):
"""Compute ring outer radius.
The determining equation is:
area_factor =
off_area / on_area =
(pi (r_out**2 - r_in**2)) / (pi * theta**2 )
Parameters
----------
theta : float
On region radius
r_in : float
Inner ring radius
area_factor : float
Desired off / on area ratio
Returns
-------
r_out : float
Outer ring radius
"""
return np.sqrt(area_factor * theta ** 2 + r_in ** 2)
[docs]def ring_area_factor(theta, r_in, r_out):
"""Compute ring area factor.
Parameters
----------
theta : float
On region radius
r_in : float
Inner ring radius
r_out : float
Outer ring radius
"""
return (r_out ** 2 - r_in ** 2) / theta ** 2
[docs]def ring_alpha(theta, r_in, r_out):
"""Compute ring alpha, the inverse area factor.
Parameters
----------
theta : float
On region radius
r_in : float
Inner ring radius
r_out : float
Outer ring radius
"""
return 1. / ring_area_factor(theta, r_in, r_out)