# Licensed under a 3-clause BSD style license - see LICENSE.rst
import copy
import logging
import numpy as np
import scipy.ndimage
import scipy.signal
from astropy.convolution import Gaussian2DKernel, MexicanHat2DKernel
from astropy.io import fits
from astropy.table import Table
from gammapy.maps import MapAxis, WcsGeom, WcsNDMap
__all__ = ["CWT", "CWTData", "CWTKernels"]
log = logging.getLogger(__name__)
def difference_of_gauss_kernel(radius, scale_step, n_sigmas=8):
"""Difference of 2 Gaussians (i.e. Mexican hat) kernel array.
TODO: replace by http://astropy.readthedocs.io/en/latest/api/astropy.convolution.MexicanHat2DKernel.html
once there are tests in place that establish the algorithm
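
    Examples
    --------
    A small sketch (parameter values are illustrative, and the import path
    assumes this module lives at ``gammapy.detect.cwt``); the grid size
    follows from ``n_sigmas * scale_step * radius``:

    >>> from gammapy.detect.cwt import difference_of_gauss_kernel
    >>> kern = difference_of_gauss_kernel(radius=2.0, scale_step=2.0)
    >>> kern.shape
    (31, 31)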
"""
sizex = int(n_sigmas * scale_step * radius)
sizey = int(n_sigmas * scale_step * radius)
radius = float(radius)
xc = 0.5 * sizex
yc = 0.5 * sizey
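    # Build a centred coordinate grid; note the open upper bound means the
    # grid has (size - 1) points per axis.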
y, x = np.mgrid[0 : sizey - 1, 0 : sizex - 1]
x = x - xc
y = y - yc
x1 = x / radius
y1 = y / radius
g1 = np.exp(-0.5 * (x1 ** 2 + y1 ** 2))
g1 = g1 / (2 * np.pi * radius ** 2)
x1 = x1 / scale_step
y1 = y1 / scale_step
g2 = np.exp(-0.5 * (x1 ** 2 + y1 ** 2))
g2 = g2 / (2 * np.pi * radius ** 2 * scale_step ** 2)
return g1 - g2


class CWT:
"""Continuous wavelet transform.
TODO: describe algorithm (modify the words below)
Depending on their spectral index, sources won't have the same characteristic scale.
Therefore to detect sources, we need to compute the wavelet transform at several
scales in order to search for various PSF sizes. Then for each scale, the wavelet
transform values under the given significance threshold are rejected. This gives us
a multiscale support. Then, using the reconstruction by continuous wavelet packets,
we obtain a filtered image yielding the detected sources.
    To compute the threshold image for a given scale ``a``, the standard
    EGRET diffuse background model (with the extragalactic background flux
    added) and the exposure for the considered energy range were used.

Parameters
----------
kernels : `~gammapy.detect.CWTKernels`
Kernels for the algorithm.
max_iter : int, optional (default 10)
The maximum number of iterations of the CWT algorithm.
tol : float, optional (default 1e-5)
Tolerance for stopping criterion.
significance_threshold : float, optional (default 3.0)
        Significance threshold: for each scale, only pixels whose
        significance exceeds this value enter the multiresolution support.
significance_island_threshold : float, optional (default None)
        Threshold used to remove islands of connected significant pixels
        whose maximum significance is only slightly above
        ``significance_threshold``.
remove_isolated : boolean, optional (default True)
If ``True``, isolated pixels will be removed.
keep_history : boolean, optional (default False)
        Save the CWT data from all iterations.

References
----------
R. Terrier et al (2001) "Wavelet analysis of EGRET data"
See https://ui.adsabs.harvard.edu/abs/2001ICRC....7.2923T
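
    Examples
    --------
    A minimal usage sketch; the kernel parameter values are illustrative,
    and the data files are the ``$GAMMAPY_DATA`` examples from the
    `CWTData` docstring:

    >>> from gammapy.maps import Map
    >>> from gammapy.detect import CWT, CWTData, CWTKernels
    >>> counts = Map.read("$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc-counts.fits.gz")
    >>> background = Map.read("$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc-background.fits.gz")
    >>> kernels = CWTKernels(n_scale=2, min_scale=6.0, step_scale=1.3)
    >>> data = CWTData(counts=counts, background=background, n_scale=kernels.n_scale)
    >>> cwt = CWT(kernels=kernels, max_iter=10)
    >>> cwt.analyze(data)  # doctest: +SKIP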
"""
def __init__(
self,
kernels,
max_iter=10,
tol=1e-5,
significance_threshold=3.0,
significance_island_threshold=None,
remove_isolated=True,
keep_history=False,
):
self.kernels = kernels
self.max_iter = max_iter
self.tol = tol
self.significance_threshold = significance_threshold
self.significance_island_threshold = significance_island_threshold
self.remove_isolated = remove_isolated
self.history = [] if keep_history else None
# previous_variance is initialized on the first iteration
self.previous_variance = None
def _execute_iteration(self, data):
"""Do one iteration of the algorithm.
Parameters
----------
data : `~gammapy.detect.CWTData`
Images.
"""
self._transform(data=data)
self._compute_support(data=data)
self._inverse_transform(data=data)
def _transform(self, data):
"""Do the transform itself.
The transform is made by using `scipy.signal.fftconvolve`.
TODO: document.
Parameters
----------
data : `~gammapy.detect.CWTData`
Images for transform.
"""
total_background = data._model + data._background + data._approx
excess = data._counts - total_background
log.debug(f"Excess sum: {excess.sum():.4f}")
log.debug(f"Excess max: {excess.max():.4f}")
log.debug("Computing transform and error")
for idx_scale, kern in self.kernels.kern_base.items():
data._transform_3d[idx_scale] = scipy.signal.fftconvolve(
excess, kern, mode="same"
)
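            # Variance propagation for Poisson counts: Var[sum(w_i * n_i)] =
            # sum(w_i ** 2 * mu_i), so convolve the expected background with
            # the squared kernel and take the square root.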
data._error[idx_scale] = np.sqrt(
scipy.signal.fftconvolve(total_background, kern ** 2, mode="same")
)
log.debug(f"Error sum: {data._error.sum():.4f}")
log.debug(f"Error max: {data._error.max():.4f}")
log.debug("Computing approx and approx_bkg")
data._approx = scipy.signal.fftconvolve(
data._counts - data._model - data._background,
self.kernels.kern_approx,
mode="same",
)
data._approx_bkg = scipy.signal.fftconvolve(
data._background, self.kernels.kern_approx, mode="same"
)
log.debug(f"Approximate sum: {data._approx.sum():.4f}")
log.debug(f"Approximate background sum: {data._approx_bkg.sum():.4f}")
def _compute_support(self, data):
"""Compute the multiresolution support with hard sigma clipping.
Imposing a minimum significance on a connected region of significant pixels
(i.e. source detection).
        What happens here:

        - compute the significance for all scales
        - for each scale, compute the support:

          - create a mask of pixels whose significance exceeds
            ``significance_threshold``
          - find all connected structures on the mask and label them
            (with `scipy.ndimage.label`; see the sketch below)
          - for each structure:

            - find the pixel coordinates of the structure
            - if the structure is a single pixel, optionally remove it
              from the mask
            - if the maximum significance within the structure is below
              ``significance_island_threshold``, remove the structure
              from the mask

          - update the support with the mask
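
        A tiny standalone sketch of the labelling step (example values of
        my own, not algorithm output):

        >>> import numpy as np
        >>> import scipy.ndimage
        >>> mask = np.array([[1, 1, 0], [0, 0, 0], [0, 0, 1]], dtype=bool)
        >>> labeled_mask, n_structures = scipy.ndimage.label(mask)
        >>> n_structures
        2
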
Parameters
----------
data : `~gammapy.detect.CWTData`
Images after transform.
"""
log.debug("Computing significance")
significance = data._transform_3d / data._error
log.debug("For each scale start to compute support")
for idx_scale in range(self.kernels.n_scale):
log.debug(
"Start to compute support for scale "
"{:.2f}".format(self.kernels.scales[idx_scale])
)
log.debug(
"Create mask based on significance "
"threshold {:.2f}".format(self.significance_threshold)
)
mask = significance[idx_scale] > self.significance_threshold
            # Produce a list of connected structures in the support
labeled_mask, n_structures = scipy.ndimage.label(mask)
for struct_label in range(n_structures):
coords = np.where(labeled_mask == struct_label + 1)
# Remove isolated pixels from support
if self.remove_isolated and coords[0].size == 1:
log.debug("Remove isolated pixels from support")
mask[coords] = False
if self.significance_island_threshold is not None:
# If maximal significance of the structure does not reach significance
# island threshold, remove significant pixels island from support
struct_signif = significance[idx_scale][coords]
if struct_signif.max() < self.significance_island_threshold:
log.debug(
"Remove significant pixels island {} from support".format(
struct_label + 1
)
)
mask[coords] = False
log.debug(
"Update support for scale {:.2f}".format(self.kernels.scales[idx_scale])
)
data._support[idx_scale] |= mask
log.debug("Support sum: {}".format(data._support.sum()))
def _inverse_transform(self, data):
"""Do the inverse transform (reconstruct the image).
TODO: describe better what this does.
Parameters
----------
data : `~gammapy.detect.CWTData`
Images for inverse transform.
"""
data._transform_2d = np.sum(data._support * data._transform_3d, axis=0)
log.debug("Update model")
data._model += data._transform_2d * (data._transform_2d > 0)
log.debug("Model sum: {:.4f}".format(data._model.sum()))
log.debug("Model max: {:.4f}".format(data._model.max()))
def _is_converged(self, data):
"""Check if the algorithm has converged on current iteration.
TODO: document metric used, but not super important.
Parameters
----------
data : `~gammapy.detect.CWTData`
Images after iteration.
Returns
-------
answer : boolean
Answer if CWT has converged.
"""
log.debug("Check the convergence")
residual = data._counts - (data._model + data._approx)
variance = residual.var()
log.info("Residual sum: {:.4f}".format(residual.sum()))
log.info("Residual max: {:.4f}".format(residual.max()))
log.info("Residual variance: {:.4f}".format(residual.var()))
if self.previous_variance is None:
self.previous_variance = variance
return False
variance_ratio = abs(
(self.previous_variance - variance) / self.previous_variance
)
log.info(f"Variance ratio: {variance_ratio:.7f}")
self.previous_variance = variance
return variance_ratio < self.tol

    def analyze(self, data):
"""Run iterative filter peak algorithm.
The algorithm modifies the original data.
Parameters
----------
data : `~gammapy.detect.CWTData`
Input images.
"""
if self.history is not None:
self.history = [copy.deepcopy(data)]
for n_iter in range(self.max_iter):
log.info(f"************ Start iteration {n_iter + 1} ************")
self._execute_iteration(data=data)
if self.history is not None:
log.debug("Save current data")
self.history.append(copy.deepcopy(data))
converge_answer = self._is_converged(data=data)
if converge_answer:
break
if converge_answer:
log.info(f"Convergence reached at iteration {n_iter + 1}")
else:
log.info(f"Convergence not formally reached at iteration {n_iter + 1}")


class CWTKernels:
"""Conduct arrays of kernels and scales for CWT algorithm.
Parameters
----------
n_scale : int
Number of scales.
min_scale : float
First scale used.
step_scale : float
Base scaling factor.
    old : boolean, optional (default False)
        DEBUG attribute. If ``False``, use astropy `MexicanHat2DKernel`
        kernels for ``kern_base``; if ``True``, use the legacy
        ``difference_of_gauss_kernel``.

Attributes
----------
n_scale : int
Number of scales considered.
min_scale : float
First scale used.
step_scale : float
Base scaling factor.
scales : `~numpy.ndarray`
Grid of scales.
    kern_base : dict
        Dictionary with scale indices as keys and 2D Mexican hat kernel
        arrays as values.
kern_approx : `~numpy.ndarray`
        2D Gaussian kernel array evaluated at the maximum scale,
        ``min_scale * step_scale ** n_scale``.
Examples
--------
>>> from gammapy.detect import CWTKernels
>>> kernels = CWTKernels(n_scale=3, min_scale=2.0, step_scale=2.6)
    >>> print(kernels.info_table)
Name Source
---------------------------------- ----------------------
Number of scales 3
Minimal scale 2.0
Step scale 2.6
Scales [ 2. 5.2 13.52]
Kernels approx width 280
Kernels approx sum 0.99986288557
Kernels approx max 0.00012877518599
Kernels base width for 2.0 scale 40
Kernels base sum for 2.0 scale 0.000305108917065
Kernels base max for 2.0 scale 0.0315463182128
Kernels base width for 5.2 scale 107
Kernels base sum for 5.2 scale 0.000158044776015
Kernels base max for 5.2 scale 0.0050152112595
Kernels base width for 13.52 scale 280
Kernels base sum for 13.52 scale 0.000137114430344
Kernels base max for 13.52 scale 0.000740731187317
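
    The base kernels are stored in a dict indexed by scale number:

    >>> len(kernels.kern_base) == kernels.n_scale
    True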
"""
def __init__(self, n_scale, min_scale, step_scale, old=False):
self.n_scale = n_scale
self.min_scale = min_scale
self.step_scale = step_scale
self.scales = np.array(
[min_scale * step_scale ** _ for _ in range(n_scale)], dtype=float
)
self.kern_base = {}
for idx_scale, scale in enumerate(self.scales):
if old:
self.kern_base[idx_scale] = difference_of_gauss_kernel(
scale, step_scale
)
else:
self.kern_base[idx_scale] = MexicanHat2DKernel(scale * step_scale).array
max_scale = min_scale * step_scale ** n_scale
self.kern_approx = Gaussian2DKernel(max_scale).array
def _info(self):
"""Return information about the object as a dict.
Returns
-------
info : dict
            Information about the object, with characteristic names as
            keys and the corresponding values as values.
"""
info_dict = {}
info_dict["Number of scales"] = self.n_scale
info_dict["Minimal scale"] = self.min_scale
info_dict["Step scale"] = self.step_scale
info_dict["Scales"] = str(self.scales)
info_dict["Kernels approx width"] = len(self.kern_approx)
info_dict["Kernels approx sum"] = self.kern_approx.sum()
info_dict["Kernels approx max"] = self.kern_approx.max()
for idx_scale, scale in enumerate(self.scales):
info_dict[f"Kernels base width for {scale} scale"] = len(
self.kern_base[idx_scale]
)
info_dict[f"Kernels base sum for {scale} scale"] = self.kern_base[
idx_scale
].sum()
info_dict[f"Kernels base max for {scale} scale"] = self.kern_base[
idx_scale
].max()
return info_dict
@property
def info_table(self):
"""Summary info table about the object.
Returns
-------
table : `~astropy.table.Table`
Information about the object.
"""
info_dict = self._info()
rows = []
for name in info_dict:
rows.append({"Name": name, "Source": info_dict[name]})
return Table(rows=rows, names=["Name", "Source"])


class CWTData:
"""Images for CWT algorithm.
Contains also input counts and background.
Parameters
----------
counts : `~gammapy.maps.WcsNDMap`
2D counts image.
background : `~gammapy.maps.WcsNDMap`
2D background image.
n_scale : int
Number of scales.
Examples
--------
>>> from gammapy.maps import Map
>>> from gammapy.detect import CWTData
>>> counts = Map.read("$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc-counts.fits.gz")
>>> background = Map.read("$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc-background.fits.gz")
>>> data = CWTData(counts=counts, background=background, n_scale=2)
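
    The result images are available as a dict of `~gammapy.maps.WcsNDMap`
    objects, and everything can be written to FITS (the output file name
    below is an arbitrary choice):

    >>> images = data.images()
    >>> sorted(images)[:3]
    ['approx', 'approx_bkg', 'background']
    >>> data.write("cwt.fits", overwrite=True)  # doctest: +SKIP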
"""
def __init__(self, counts, background, n_scale):
self._counts = np.array(counts.data, dtype=float)
self._background = np.array(background.data, dtype=float)
self._geom2d = counts.geom.copy()
scale_axis = MapAxis(np.arange(n_scale + 1))
self._geom3d = WcsGeom(
wcs=counts.geom.wcs, npix=counts.geom.npix, axes=[scale_axis]
)
shape_2d = self._counts.shape
self._model = np.zeros(shape_2d)
self._approx = np.zeros(shape_2d)
self._approx_bkg = np.zeros(shape_2d)
self._transform_2d = np.zeros(shape_2d)
shape_3d = n_scale, shape_2d[0], shape_2d[1]
self._transform_3d = np.zeros(shape_3d)
self._error = np.zeros(shape_3d)
self._support = np.zeros(shape_3d, dtype=bool)
@property
def counts(self):
"""2D counts input image (`~gammapy.maps.WcsNDMap`)."""
return WcsNDMap(self._geom2d, self._counts)
@property
def background(self):
"""2D background input image (`~gammapy.maps.WcsNDMap`)."""
return WcsNDMap(self._geom2d, self._background)
@property
def model(self):
"""2D model image (`~gammapy.maps.WcsNDMap`).

        Accumulated positive part of the ``transform_2d`` image over
        iterations. Initialized to a zero array.
"""
return WcsNDMap(self._geom2d, self._model)
@property
def approx(self):
"""2D approx ??? image (`~gammapy.maps.WcsNDMap`).
In the course of iterations updated by convolution of
``counts - model - background`` with ``kern_approx``
Primordial initialized by zero array.
"""
return WcsNDMap(self._geom2d, self._approx)
@property
def approx_bkg(self):
"""2D approx bkg image (`~gammapy.maps.WcsNDMap`).
In the course of iterations updated by convolution of ``background`` with ``kern_approx``.
Primordial initialized by zero array.
"""
return WcsNDMap(self._geom2d, self._approx_bkg)
@property
def transform_2d(self):
"""2D transform ??? image (`~gammapy.maps.WcsNDMap`).
Created from transform_3d by summarize values per 0 axes.
Primordial initialized by zero array.
"""
return WcsNDMap(self._geom2d, self._transform_2d)
@property
def support_2d(self):
"""2D cube exclusion mask (`~gammapy.maps.WcsNDMap`).
Created from support_3d by OR-operation per 0 axis.
"""
support_2d = self._support.sum(0) > 0
return WcsNDMap(self._geom2d, support_2d)
@property
def residual(self):
"""2D residual image (`~gammapy.maps.WcsNDMap`).
        Calculated as ``counts - (model + approx)``.
"""
residual = self._counts - (self._model + self._approx)
return WcsNDMap(self._geom2d, residual)
@property
def model_plus_approx(self):
"""TODO: document what this is."""
return WcsNDMap(self._geom2d, self._model + self._approx)
@property
def transform_3d(self):
"""3D transform ??? cube (`~gammapy.maps.WcsNDMap`).
Primordial initialized by zero array. In the course of
iterations updated by convolution of ``counts - total_background`` with kernel
for each scale (``total_background = model + background + approx``).
"""
return WcsNDMap(self._geom3d, self._transform_3d)
@property
def error(self):
"""3D error cube (`~gammapy.maps.WcsNDMap`).

        Initialized to a zero array. Updated on each iteration as the
        square root of ``total_background`` convolved with the squared
        kernel for each scale.
"""
return WcsNDMap(self._geom3d, self._error)
@property
def support_3d(self):
"""3D support (exclusion) cube (`~gammapy.maps.WcsNDMap`).

        Initialized to an all-``False`` boolean array.
"""
return WcsNDMap(self._geom3d, self._support)
@property
def max_scale_image(self):
"""Maximum scale image (`~gammapy.maps.WcsNDMap`)."""
# Previous version:
# idx_scale_max = np.argmax(self._transform_3d, axis=0)
# return kernels.scales[idx_scale_max] * (self._support.sum(0) > 0)
transform_2d_max = np.max(self._transform_3d, axis=0)
maximal_image = transform_2d_max * self.support_2d.data
return WcsNDMap(self._geom2d, maximal_image)
def __sub__(self, other):
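        """Pixel-wise difference of two `CWTData` objects.

        Image attributes are subtracted; the boolean support cubes are
        combined with XOR (symmetric difference).
        """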
data = CWTData(
counts=self.counts,
background=self.background,
n_scale=len(self._transform_3d),
)
data._model = self._model - other._model
data._approx = self._approx - other._approx
data._approx_bkg = self._approx_bkg - other._approx_bkg
data._transform_2d = self._transform_2d - other._transform_2d
data._transform_3d = self._transform_3d - other._transform_3d
data._error = self._error - other._error
data._support = self._support ^ other._support
return data

    def images(self):
"""All the images in a dict.
Returns
-------
images : dict
            Dictionary with keys {'counts', 'background', 'model', 'approx',
            'approx_bkg', 'transform_2d', 'model_plus_approx', 'residual',
            'maximal', 'support_2d'} and `~gammapy.maps.WcsNDMap` images
            as values.
"""
return dict(
counts=self.counts,
background=self.background,
model=self.model,
approx=self.approx,
approx_bkg=self.approx_bkg,
transform_2d=self.transform_2d,
model_plus_approx=self.model_plus_approx,
residual=self.residual,
maximal=self.max_scale_image,
support_2d=self.support_2d,
)

    def cubes(self):
"""All the cubes in a dict.
Returns
-------
cubes : dict
            Dictionary with keys {'transform_3d', 'error', 'support'} and
            `~gammapy.maps.WcsNDMap` cubes as values.
"""
return dict(
transform_3d=self.transform_3d, error=self.error, support=self.support_3d
)
@staticmethod
def _metrics_info(data, name):
"""Compute variance, mean, find max and min values and compute sum for given data.
Parameters
----------
data : `~numpy.ndarray`
2D image or 3D cube.
name : string
Name of the data.
Returns
-------
info : dict
The information about the data.
"""
return {
"Name": name,
"Shape": "2D image" if len(data.shape) == 2 else "3D cube",
"Variance": data.var(),
"Mean": data.mean(),
"Max value": data.max(),
"Min value": data.min(),
"Sum values": data.sum(),
}

    def image_info(self, name):
"""Compute image info.
Compute variance, mean, find max and min values and compute sum for image with given name.
Return that information about the image.
Parameters
----------
name : string
Name of the image. Name can be as one of the follow: {'counts', 'background',
'model', 'approx', 'approx_bkg', 'transform_2d', 'model_plus_approx', 'residual',
'maximal', 'support_2d'}
Returns
-------
table : `~astropy.table.Table`
Information about the object.
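
        Examples
        --------
        A sketch, assuming ``data`` is a filled `CWTData` instance:

        >>> table = data.image_info(name="counts")  # doctest: +SKIP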
"""
if name not in self.images():
raise ValueError(
"Incorrect name of image. It should be one of the following:"
"{'counts', 'background', 'model', 'approx', 'approx_bkg', "
"'transform_2d', 'model_plus_approx', 'residual', 'support_2d', "
"'maximal'}"
)
image = self.images()[name]
info_dict = self._metrics_info(data=image.data, name=name)
rows = []
for metric in info_dict:
rows.append({"Metrics": metric, "Source": info_dict[metric]})
return Table(rows=rows, names=["Metrics", "Source"])

    def cube_info(self, name, per_scale=False):
"""Compute cube info.
Compute variance, mean, find max and min values and compute sum for image with given name.
Return that information about the image.
Parameters
----------
name : string
Name of the image. Name can be as one of the follow: {'transform_3d', 'error', 'support'}
per_scale : boolean, optional (default False)
If True, return information about the cube per all the scales.
Returns
-------
table : `~astropy.Table`
Information about the object.
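
        Examples
        --------
        A sketch, assuming ``data`` is a filled `CWTData` instance:

        >>> table = data.cube_info(name="error", per_scale=True)  # doctest: +SKIP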
"""
if name not in self.cubes():
raise ValueError(
"Incorrect name of cube. It should be one of the following:"
"{'transform_3d', 'error', 'support'}"
)
cube = self.cubes()[name]
rows = []
if per_scale:
mask = []
for index in range(len(cube.data)):
info_dict = self._metrics_info(data=cube.data[index], name=name)
for metric in info_dict:
rows.append(
{
"Scale power": index + 1,
"Metrics": metric,
"Source": info_dict[metric],
}
)
            # For missing values in the `Scale power` column
scale_mask = np.ones(len(info_dict), dtype=bool)
scale_mask[0] = False
mask.extend(scale_mask)
columns = ["Scale power", "Metrics", "Source"]
table = Table(rows=rows, names=columns, masked=True)
table["Scale power"].mask = mask
elif per_scale is False:
info_dict = self._metrics_info(data=cube.data, name=name)
for metric in info_dict:
rows.append({"Metrics": metric, "Source": info_dict[metric]})
columns = ["Metrics", "Source"]
table = Table(rows=rows, names=columns)
else:
raise ValueError("Incorrect value for per_scale attribute.")
return table
@property
def info_table(self):
"""Information about all the images and cubes.
Returns
-------
        table : `~astropy.table.Table`
Information about the object.
"""
rows = []
for name, image in self.images().items():
info_dict = self._metrics_info(data=image.data, name=name)
rows.append(info_dict)
for name, cube in self.cubes().items():
info_dict = self._metrics_info(data=cube.data, name=name)
rows.append(info_dict)
columns = rows[0].keys()
return Table(rows=rows, names=columns)

    def write(self, filename, overwrite=False):
"""Save results to FITS file.
Parameters
----------
        filename : str
            FITS file name.
        overwrite : bool, optional (default False)
            If ``True``, overwrite an existing file.
"""
header = self._geom2d.make_header()
hdu_list = fits.HDUList()
hdu_list.append(fits.PrimaryHDU())
hdu_list.append(fits.ImageHDU(data=self._counts, header=header, name="counts"))
hdu_list.append(
fits.ImageHDU(data=self._background, header=header, name="background")
)
hdu_list.append(fits.ImageHDU(data=self._model, header=header, name="model"))
hdu_list.append(fits.ImageHDU(data=self._approx, header=header, name="approx"))
hdu_list.append(
fits.ImageHDU(data=self._transform_2d, header=header, name="transform_2d")
)
hdu_list.append(
fits.ImageHDU(data=self._approx_bkg, header=header, name="approx_bkg")
)
hdu_list.writeto(filename, overwrite=overwrite)