# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Implementation of adaptive smoothing algorithms.
"""
from __future__ import absolute_import, division, print_function, unicode_literals

from collections import OrderedDict

import numpy as np
from astropy.convolution import Gaussian2DKernel, Tophat2DKernel

from ..stats import significance
from .utils import scale_cube
from . import SkyImage, SkyImageList

__all__ = ['ASmooth', 'asmooth_scales']

# TODO: move to gammapy.stats.significance
def _significance_asmooth(counts, background):
"""
Significance according to formula (5) in asmooth paper.
Parameters
----------
counts : ndarray
Counts data array
background : ndarray
Background data array.
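
    Examples
    --------
    A quick illustrative check of the formula: counts=16, background=9
    gives (16 - 9) / sqrt(16 + 9) = 7 / 5 = 1.4.

    >>> import numpy as np
    >>> sig = _significance_asmooth(np.array([16.]), np.array([9.]))
    >>> float(sig[0])
    1.4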
"""
return (counts - background) / np.sqrt(counts + background)

def asmooth_scales(n_scales, factor=np.sqrt(2), kernel=Gaussian2DKernel):
    """Create a geometrically growing list of smoothing kernel widths."""
    if kernel == Gaussian2DKernel:
        sigma_0 = 1. / np.sqrt(9 * np.pi)
    elif kernel == Tophat2DKernel:
        sigma_0 = 1. / np.sqrt(np.pi)
    else:
        raise ValueError("Invalid kernel: {}".format(kernel))
    return sigma_0 * factor ** np.arange(n_scales)
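
# Example (illustrative): for the default Gaussian kernel the first width is
# sigma_0 = 1 / sqrt(9 * pi) ~= 0.188 and successive scales grow by sqrt(2):
#
#     >>> scales = asmooth_scales(3)
#     >>> [round(float(s), 3) for s in scales]
#     [0.188, 0.266, 0.376]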


class ASmooth(object):
    """Adaptively smooth counts image.

    Achieves a roughly constant significance of features across the whole image.

    Algorithm based on http://adsabs.harvard.edu/abs/2006MNRAS.368...65E
    The algorithm was slightly adapted to also allow using the Li & Ma and TS
    methods to estimate the significance of a feature in the image.

    Parameters
    ----------
    kernel : `astropy.convolution.Kernel`
        Smoothing kernel.
    method : {'asmooth', 'lima', 'simple', 'ts'}
        Significance estimation method ('ts' is not implemented yet).
    threshold : float
        Significance threshold.
    scales : `~astropy.units.Quantity`
        Smoothing scales.
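
    Examples
    --------
    A minimal usage sketch; it assumes ``images`` is a `SkyImageList` with
    'counts', 'background' and optionally 'exposure' entries, and expresses
    the smoothing scales as an `~astropy.units.Quantity` in deg:

    >>> import astropy.units as u
    >>> scales = asmooth_scales(n_scales=9) * 0.1 * u.deg  # doctest: +SKIP
    >>> asmooth = ASmooth(threshold=4, scales=scales)  # doctest: +SKIP
    >>> smoothed_images = asmooth.run(images)  # doctest: +SKIP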
"""

    def __init__(self, kernel=Gaussian2DKernel, method='simple', threshold=5,
                 scales=None):
        self.parameters = OrderedDict(kernel=kernel, method=method,
                                      threshold=threshold, scales=scales)

    def kernels(self, image):
        """
        Smoothing kernels, one per scale.

        Parameters
        ----------
        image : `~gammapy.image.SkyImage`
            Sky image specifying the WCS information.

        Returns
        -------
        kernels : list
            List of `~astropy.convolution.Kernel`
        """
        p = self.parameters

        # wcs_pixel_scale() returns the pixel scale in deg / pix, so dividing
        # the angular scales by it converts them to pixel units
        deg_per_pix = image.wcs_pixel_scale()[0]
        scales = p['scales'].to('deg') / deg_per_pix

        kernels = []
        for scale in scales.value:
            kernel = p['kernel'](scale, mode='oversample')
            # TODO: check if normalizing here makes sense
            kernel.normalize('peak')
            kernels.append(kernel)
        return kernels
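
    # Example (illustrative): a 0.1 deg scale on an image with 0.02 deg pixels
    # corresponds to a kernel width of 0.1 / 0.02 = 5 pixels, i.e.
    # Gaussian2DKernel(5, mode='oversample') for that scale.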

    def _significance_cube(self, cubes):
        p = self.parameters
        if p['method'] == 'lima':
            scube = significance(cubes['counts'], cubes['background'], method='lima')
        elif p['method'] == 'simple':
            scube = significance(cubes['counts'], cubes['background'], method='simple')
        elif p['method'] == 'asmooth':
            scube = _significance_asmooth(cubes['counts'], cubes['background'])
        elif p['method'] == 'ts':
            raise NotImplementedError
        else:
            raise ValueError("Not a valid significance estimation method."
                             " Choose one of the following: 'lima', 'simple',"
                             " 'asmooth' or 'ts'")
        return scube

    def run(self, images):
        """
        Run image smoothing.

        Parameters
        ----------
        images : `SkyImageList`
            List of input sky images.

        Returns
        -------
        smoothed : `SkyImageList`
            List of smoothed sky images containing:

            * 'counts'
            * 'background'
            * 'flux' (optional)
            * 'scale'
            * 'significance'
        """
        images.check_required(['counts'])
        wcs = images['counts'].wcs.copy()
        kernels = self.kernels(images['counts'])

        cubes = {}
        cubes['counts'] = scale_cube(images['counts'].data, kernels)

        if 'background' in images.names:
            cubes['background'] = scale_cube(images['background'].data, kernels)
        else:
            # TODO: Estimate background with asmooth method
            raise ValueError('Background estimation required.')

        if 'exposure' in images.names:
            flux = ((images['counts'].data - images['background'].data) /
                    images['exposure'].data)
            cubes['flux'] = scale_cube(flux, kernels)

        cubes['significance'] = self._significance_cube(cubes)
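
        # the cubes have shape (ny, nx, n_scales): axis 2 indexes the
        # smoothing scale, one plane per kernel in `kernels`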

        smoothed = self._reduce_cubes(cubes, kernels)

        result = SkyImageList()
        for key in ['counts', 'background', 'scale', 'significance']:
            data = smoothed[key]

            # set remaining pixels with significance < threshold to mean value
            if key in ['counts', 'background']:
                mask = np.isnan(data)
                data[mask] = np.mean(images[key].data[mask])
            result[key] = SkyImage(data=data, wcs=wcs)

        if 'exposure' in images.names:
            data = smoothed['flux']
            mask = np.isnan(data)
            data[mask] = np.mean(flux[mask])
            result['flux'] = SkyImage(data=data, wcs=wcs)

        return result

    def _reduce_cubes(self, cubes, kernels):
        """
        Combine the scale cubes into final smoothed images.

        Parameters
        ----------
        cubes : dict
            Data cubes of shape (ny, nx, n_scales).
        kernels : list
            Smoothing kernels, one per scale.
        """
        p = self.parameters
        shape = cubes['counts'].shape[:2]

        smoothed = {}

        # Init smoothed data arrays
        for key in ['counts', 'background', 'scale', 'significance', 'flux']:
            smoothed[key] = np.tile(np.nan, shape)

        for idx, scale in enumerate(p['scales']):
            # slice out 2D image at index idx out of cube
            slice_ = np.s_[:, :, idx]

            # fill only pixels that are not yet significant at a smaller scale
            mask = np.isnan(smoothed['counts'])
            mask = (cubes['significance'][slice_] > p['threshold']) & mask

            smoothed['scale'][mask] = scale
            smoothed['significance'][mask] = cubes['significance'][slice_][mask]

            # renormalize smoothed data arrays
            norm = kernels[idx].array.sum()
            for key in ['counts', 'background']:
                smoothed[key][mask] = cubes[key][slice_][mask] / norm

            if 'flux' in cubes:
                smoothed['flux'][mask] = cubes['flux'][slice_][mask] / norm

        return smoothed
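
# Note on the reduction step (illustrative): with threshold=5 and significance
# values [3.2, 4.8, 6.1] at one pixel over three scales, the pixel is filled
# at the third scale, the first one at which the significance exceeds the
# threshold; pixels that never exceed it stay NaN and are filled with the mean
# in `run`.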