# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Implementation of adaptive smoothing algorithms."""
import numpy as np
from astropy.convolution import Gaussian2DKernel, Tophat2DKernel
from astropy.coordinates import Angle
from gammapy.cube import MapDatasetOnOff
from gammapy.maps import WcsNDMap, scale_cube
from gammapy.stats import significance
__all__ = ["ASmoothMapEstimator"]
def _significance_asmooth(counts, background):
"""Significance according to formula (5) in asmooth paper."""
return (counts - background) / np.sqrt(counts + background)
[docs]class ASmoothMapEstimator:
"""Adaptively smooth counts image.
Achieves a roughly constant significance of features across the whole image.
Algorithm based on https://ui.adsabs.harvard.edu/abs/2006MNRAS.368...65E
The algorithm was slightly adapted to also allow Li & Ma and TS to estimate the
significance of a feature in the image.
Parameters
----------
scales : `~astropy.units.Quantity`
Smoothing scales.
kernel : `astropy.convolution.Kernel`
Smoothing kernel.
method : {'simple', 'asmooth', 'lima'}
Significance estimation method.
threshold : float
Significance threshold.
"""
def __init__(self, scales, kernel=Gaussian2DKernel, method="simple", threshold=5):
self.parameters = {
"kernel": kernel,
"method": method,
"threshold": threshold,
"scales": scales,
}
[docs] def kernels(self, pixel_scale):
"""
Ring kernels according to the specified method.
Parameters
----------
pixel_scale : `~astropy.coordinates.Angle`
Sky image pixel scale
Returns
-------
kernels : list
List of `~astropy.convolution.Kernel`
"""
p = self.parameters
scales = p["scales"].to_value("deg") / Angle(pixel_scale).deg
kernels = []
for scale in scales: # .value:
kernel = p["kernel"](scale, mode="oversample")
# TODO: check if normalizing here makes sense
kernel.normalize("peak")
kernels.append(kernel)
return kernels
@staticmethod
def _significance_cube(cubes, method):
if method in {"lima", "simple"}:
scube = significance(cubes["counts"], cubes["background"], method="lima")
elif method == "asmooth":
scube = _significance_asmooth(cubes["counts"], cubes["background"])
elif method == "ts":
raise NotImplementedError()
else:
raise ValueError(
"Not a valid significance estimation method."
" Choose one of the following: 'lima', 'simple',"
" 'asmooth' or 'ts'"
)
return scube
[docs] def run(self, dataset):
"""
Run adaptive smoothing on input MapDataset.
The latter should have
Parameters
----------
dataset : `~gammapy.cube.MapDataset` or `~gammapy.cube.MapDatasetOnOff`
the input dataset (with one bin in energy at most)
Returns
-------
images : dict of `~gammapy.maps.WcsNDMap`
Smoothed images; keys are:
* 'counts'
* 'background'
* 'flux' (optional)
* 'scales'
* 'significance'.
"""
# Check dimensionality
if len(dataset.data_shape) == 3:
if dataset.data_shape[0] != 1:
raise ValueError(
"ASmoothMapEstimator.run() requires a dataset with 1 energy bin at most."
)
counts = dataset.counts.sum_over_axes(keepdims=False)
background = dataset.npred()
if isinstance(dataset, MapDatasetOnOff):
background += dataset.background
background = background.sum_over_axes(keepdims=False)
if dataset.exposure is not None:
exposure = dataset.exposure.sum_over_axes(keepdims=False)
else:
exposure = None
return self.estimate_maps(counts, background, exposure)
[docs] def estimate_maps(self, counts, background, exposure=None):
"""
Run adaptive smoothing on input Maps.
Parameters
----------
counts : `~gammapy.maps.Map`
counts map
background : `~gammapy.maps.Map`
estimated background counts map
exposure : `~gammapy.maps.Map`
exposure map. If set, it will produce a flux smoothed map.
Returns
-------
images : dict of `~gammapy.maps.WcsNDMap`
Smoothed images; keys are:
* 'counts'
* 'background'
* 'flux' (optional)
* 'scales'
* 'significance'.
"""
pixel_scale = counts.geom.pixel_scales.mean()
kernels = self.kernels(pixel_scale)
cubes = {}
cubes["counts"] = scale_cube(counts.data, kernels)
if background is not None:
cubes["background"] = scale_cube(background.data, kernels)
else:
# TODO: Estimate background with asmooth method
raise ValueError("Background estimation required.")
if exposure is not None:
flux = (counts - background) / exposure
cubes["flux"] = scale_cube(flux.data, kernels)
cubes["significance"] = self._significance_cube(
cubes, method=self.parameters["method"]
)
smoothed = self._reduce_cubes(cubes, kernels)
result = {}
for key in ["counts", "background", "scale", "significance"]:
data = smoothed[key]
# set remaining pixels with significance < threshold to mean value
if key in ["counts", "background"]:
mask = np.isnan(data)
data[mask] = np.mean(locals()[key].data[mask])
result[key] = WcsNDMap(counts.geom, data, unit=counts.unit)
else:
result[key] = WcsNDMap(counts.geom, data, unit="deg")
if exposure is not None:
data = smoothed["flux"]
mask = np.isnan(data)
data[mask] = np.mean(flux.data[mask])
result["flux"] = WcsNDMap(counts.geom, data, unit=flux.unit)
return result
def _reduce_cubes(self, cubes, kernels):
"""
Combine scale cube to image.
Parameters
----------
cubes : dict
Data cubes
"""
p = self.parameters
shape = cubes["counts"].shape[:2]
smoothed = {}
# Init smoothed data arrays
for key in ["counts", "background", "scale", "significance", "flux"]:
smoothed[key] = np.tile(np.nan, shape)
for idx, scale in enumerate(p["scales"]):
# slice out 2D image at index idx out of cube
slice_ = np.s_[:, :, idx]
mask = np.isnan(smoothed["counts"])
mask = (cubes["significance"][slice_] > p["threshold"]) & mask
smoothed["scale"][mask] = scale
smoothed["significance"][mask] = cubes["significance"][slice_][mask]
# renormalize smoothed data arrays
norm = kernels[idx].array.sum()
for key in ["counts", "background"]:
smoothed[key][mask] = cubes[key][slice_][mask] / norm
if "flux" in cubes:
smoothed["flux"][mask] = cubes["flux"][slice_][mask] / norm
return smoothed
[docs] @staticmethod
def get_scales(n_scales, factor=np.sqrt(2), kernel=Gaussian2DKernel):
"""Create list of Gaussian widths."""
if kernel == Gaussian2DKernel:
sigma_0 = 1.0 / np.sqrt(9 * np.pi)
elif kernel == Tophat2DKernel:
sigma_0 = 1.0 / np.sqrt(np.pi)
return sigma_0 * factor ** np.arange(n_scales)