Source code for gammapy.image.asmooth

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Implementation of adaptive smoothing algorithms."""
import numpy as np
from astropy.coordinates import Angle
from astropy.convolution import Gaussian2DKernel, Tophat2DKernel
from ..stats import significance
from .utils import scale_cube

__all__ = ["ASmooth"]


def _significance_asmooth(counts, background):
    """Significance according to formula (5) in asmooth paper."""
    return (counts - background) / np.sqrt(counts + background)


[docs]class ASmooth: """Adaptively smooth counts image. Achieves a roughly constant significance of features across the whole image. Algorithm based on https://ui.adsabs.harvard.edu/abs/2006MNRAS.368...65E The algorithm was slightly adapted to also allow Li & Ma and TS to estimate the significance of a feature in the image. Parameters ---------- kernel : `astropy.convolution.Kernel` Smoothing kernel. method : {'simple', 'asmooth', 'lima'} Significance estimation method. threshold : float Significance threshold. scales : `~astropy.units.Quantity` Smoothing scales. """ def __init__( self, kernel=Gaussian2DKernel, method="simple", threshold=5, scales=None ): self.parameters = { "kernel": kernel, "method": method, "threshold": threshold, "scales": scales, }
[docs] def kernels(self, pixel_scale): """ Ring kernels according to the specified method. Parameters ---------- pixel_scale : `~astropy.coordinates.Angle` Sky image pixel scale Returns ------- kernels : list List of `~astropy.convolution.Kernel` """ p = self.parameters scales = p["scales"].to("deg") / Angle(pixel_scale).deg kernels = [] for scale in scales.value: kernel = p["kernel"](scale, mode="oversample") # TODO: check if normalizing here makes sense kernel.normalize("peak") kernels.append(kernel) return kernels
@staticmethod def _significance_cube(cubes, method): if method in {"lima", "simple"}: scube = significance(cubes["counts"], cubes["background"], method="lima") elif method == "asmooth": scube = _significance_asmooth(cubes["counts"], cubes["background"]) elif method == "ts": raise NotImplementedError() else: raise ValueError( "Not a valid significance estimation method." " Choose one of the following: 'lima', 'simple'," " 'asmooth' or 'ts'" ) return scube
[docs] def run(self, counts, background=None, exposure=None): """ Run image smoothing. Parameters ---------- counts : `~gammapy.maps.WcsNDMap` Counts map background : `~gammapy.maps.WcsNDMap` Background map exposure : `~gammapy.maps.WcsNDMap` Exposure map Returns ------- images : dict of `~gammapy.maps.WcsNDMap` Smoothed images; keys are: * 'counts' * 'background' * 'flux' (optional) * 'scales' * 'significance'. """ from ..maps import WcsNDMap pixel_scale = counts.geom.pixel_scales.mean() kernels = self.kernels(pixel_scale) cubes = {} cubes["counts"] = scale_cube(counts.data, kernels) if background is not None: cubes["background"] = scale_cube(background.data, kernels) else: # TODO: Estimate background with asmooth method raise ValueError("Background estimation required.") if exposure is not None: flux = (counts.data - background.data) / exposure.data cubes["flux"] = scale_cube(flux, kernels) cubes["significance"] = self._significance_cube( cubes, method=self.parameters["method"] ) smoothed = self._reduce_cubes(cubes, kernels) result = {} for key in ["counts", "background", "scale", "significance"]: data = smoothed[key] # set remaining pixels with significance < threshold to mean value if key in ["counts", "background"]: mask = np.isnan(data) data[mask] = np.mean(locals()[key].data[mask]) result[key] = WcsNDMap(counts.geom, data) if exposure is not None: data = smoothed["flux"] mask = np.isnan(data) data[mask] = np.mean(flux[mask]) result["flux"] = WcsNDMap(counts.geom, data) return result
def _reduce_cubes(self, cubes, kernels): """ Combine scale cube to image. Parameters ---------- cubes : dict Data cubes """ p = self.parameters shape = cubes["counts"].shape[:2] smoothed = {} # Init smoothed data arrays for key in ["counts", "background", "scale", "significance", "flux"]: smoothed[key] = np.tile(np.nan, shape) for idx, scale in enumerate(p["scales"]): # slice out 2D image at index idx out of cube slice_ = np.s_[:, :, idx] mask = np.isnan(smoothed["counts"]) mask = (cubes["significance"][slice_] > p["threshold"]) & mask smoothed["scale"][mask] = scale smoothed["significance"][mask] = cubes["significance"][slice_][mask] # renormalize smoothed data arrays norm = kernels[idx].array.sum() for key in ["counts", "background"]: smoothed[key][mask] = cubes[key][slice_][mask] / norm if "flux" in cubes: smoothed["flux"][mask] = cubes["flux"][slice_][mask] / norm return smoothed
[docs] @staticmethod def make_scales(n_scales, factor=np.sqrt(2), kernel=Gaussian2DKernel): """Create list of Gaussian widths.""" if kernel == Gaussian2DKernel: sigma_0 = 1.0 / np.sqrt(9 * np.pi) elif kernel == Tophat2DKernel: sigma_0 = 1.0 / np.sqrt(np.pi) return sigma_0 * factor ** np.arange(n_scales)