# Source code for gammapy.detect.test_statistics

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Functions to compute TS images."""
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
from time import time
import warnings
from collections import OrderedDict
from itertools import product
from functools import partial
from multiprocessing import Pool, cpu_count
import numpy as np
from astropy.convolution import Model2DKernel, Gaussian2DKernel, CustomKernel, Kernel2D
from astropy.convolution.kernels import _round_up_to_odd_integer
from astropy.io import fits
from ..utils.array import shape_2N, symmetric_crop_pad_width
from ..irf import multi_gauss_psf_kernel
from ..image import measure_containment_radius, SkyImageList, SkyImage
from ..image.models import Shell2D
from ._test_statistics_cython import (_cash_cython, _amplitude_bounds_cython,
                                      _cash_sum_cython, _f_cash_root_cython,
                                      _x_best_leastsq)

__all__ = [
    'compute_ts_image',
    'compute_ts_image_multiscale',
    'compute_maximum_ts_image',
]

log = logging.getLogger(__name__)

# Internal flux scaling: amplitudes are fitted in units of FLUX_FACTOR
# (see ``f_cash`` and ``_ts_value``) to keep optimizer values near unity.
FLUX_FACTOR = 1e-12
# Maximum number of iterations / function evaluations for the amplitude fits.
MAX_NITER = 20
# Default PSF containment fraction used by ``_flux_correlation_radius``.
CONTAINMENT = 0.8


def _extract_array(array, shape, position):
    """Helper function to extract parts of a larger array.

    Simple implementation of an array extract function , because
    `~astropy.ndata.utils.extract_array` introduces too much overhead.`

    Parameters
    ----------
    array : `~numpy.ndarray`
        The array from which to extract.
    shape : tuple or int
        The shape of the extracted array.
    position : tuple of numbers or number
        The position of the small array's center with respect to the
        large array.
    """
    x_width = shape[0] // 2
    y_width = shape[0] // 2
    y_lo = position[0] - y_width
    y_hi = position[0] + y_width + 1
    x_lo = position[1] - x_width
    x_hi = position[1] + x_width + 1
    return array[y_lo:y_hi, x_lo:x_hi]


def f_cash(x, counts, background, model):
    """Cash fit statistic as a function of the model amplitude.

    Evaluates the summed Cash statistic for the predicted counts
    ``background + x * FLUX_FACTOR * model`` via `_cash_sum_cython`.

    Parameters
    ----------
    x : float
        Model amplitude.
    counts : `~numpy.ndarray`
        Count image slice, where model is defined.
    background : `~numpy.ndarray`
        Background image slice, where model is defined.
    model : `~numpy.ndarray`
        Source template (multiplied with exposure).
    """
    npred = background + x * FLUX_FACTOR * model
    return _cash_sum_cython(counts, npred)


def compute_ts_image_multiscale(images, psf_parameters, scales=[0], downsample='auto',
                                residual=False, morphology='Gaussian2D', width=None,
                                *args, **kwargs):
    """Compute multi-scale TS images using ``compute_ts_image``.

    High level TS image computation using a multi-Gauss PSF kernel and assuming
    a given source morphology. To optimize the performance the input data
    can be sampled down when computing TS images on larger scales.

    Parameters
    ----------
    images : `~gammapy.image.SkyImageList`
        Image collection containing the data. Must contain the following:
            * 'counts', Counts image
            * 'background', Background image
            * 'exposure', Exposure image
    psf_parameters : dict
        Dict defining the multi gauss PSF parameters.
        See `~gammapy.irf.multi_gauss_psf` for details.
    scales : list ([0])
        List of scales to use for TS image computation.
    downsample : int ('auto')
        Down sampling factor. Can be set to 'auto' if the down sampling
        factor should be chosen automatically.
    residual : bool (False)
        Compute a TS residual image.
    morphology : str ('Gaussian2D')
        Source morphology assumption. Either 'Gaussian2D' or 'Shell2D'.
    width : float
        Width of the shell, as a fraction of the inner radius
        (only used for 'Shell2D' morphology; required in that case).

    Returns
    -------
    multiscale_result : list
        List of `~gammapy.image.SkyImageList` objects.
    """
    BINSZ = abs(images['counts'].wcs.wcs.cdelt[0])
    shape = images['counts'].data.shape
    multiscale_result = []

    for scale in scales:
        log.info('Computing {0}TS image for scale {1:.3f} deg and {2}'
                 ' morphology.'.format('residual ' if residual else '',
                                       scale, morphology))

        # Sample down and require that scale parameters is at least 5 pix
        if downsample == 'auto':
            factor = int(np.select([scale < 5 * BINSZ, scale < 10 * BINSZ,
                                    scale < 20 * BINSZ, scale < 40 * BINSZ],
                                   [1, 2, 4, 4], 8))
        else:
            factor = int(downsample)

        if factor == 1:
            log.info('No down sampling used.')
            downsampled = False
        else:
            # NOTE(review): true division — for Shell2D this produces a float
            # factor, which is then passed to ``downsample``; confirm the
            # downsample implementation accepts non-integer factors.
            if morphology == 'Shell2D':
                factor /= 2
            log.info('Using down sampling factor of {0}'.format(factor))
            downsampled = True

        # Aggregation function per image, in the order of ``images.names``
        # (counts summed, background averaged via mean, etc. — presumably
        # matched to [counts, background, exposure, ...]; verify against caller).
        funcs = [np.nansum, np.mean, np.nansum, np.nansum, np.nansum]

        images2 = SkyImageList()
        for name, func in zip(images.names, funcs):
            if downsampled:
                # Pad to a 2^N-friendly shape so downsampling divides evenly.
                pad_width = symmetric_crop_pad_width(shape, shape_2N(shape))
                images2[name] = images[name].pad(pad_width)
                images2[name] = images2[name].downsample(factor, func)
            else:
                images2[name] = images[name]

        # Set up PSF and source kernel
        kernel = multi_gauss_psf_kernel(psf_parameters, BINSZ=BINSZ,
                                        NEW_BINSZ=BINSZ * factor,
                                        mode='oversample')

        if scale > 0:
            from astropy.convolution import convolve
            sigma = scale / (BINSZ * factor)
            if morphology == 'Gaussian2D':
                source_kernel = Gaussian2DKernel(sigma, mode='oversample')
            elif morphology == 'Shell2D':
                model = Shell2D(1, 0, 0, sigma, sigma * width)
                x_size = _round_up_to_odd_integer(2 * sigma * (1 + width) +
                                                  kernel.shape[0] / 2)
                source_kernel = Model2DKernel(model, x_size=x_size,
                                              mode='oversample')
            else:
                raise ValueError('Unknown morphology: {}'.format(morphology))
            # Combined kernel = source morphology convolved with the PSF.
            kernel = convolve(source_kernel, kernel)
            kernel.normalize()

        if residual:
            # For residual TS images, the model image is added to the
            # background so the fit measures the excess over model+background.
            images2['background'].data += images2['model'].data

        # Compute TS image
        ts_results = compute_ts_image(
            images2['counts'], images2['background'], images2['exposure'],
            kernel, *args, **kwargs
        )
        log.info('TS image computation took {0:.1f} s \n'
                 ''.format(ts_results.meta['runtime']))
        ts_results.meta['MORPH'] = (morphology, 'Source morphology assumption')
        ts_results.meta['SCALE'] = (scale, 'Source morphology size scale in deg')

        if downsampled:
            # Upsample back to the input resolution (nearest-neighbour for the
            # integer 'niter' image) and crop the earlier padding away.
            for name, order in zip(['ts', 'sqrt_ts', 'amplitude', 'niter'],
                                   [1, 1, 1, 0]):
                ts_results[name] = ts_results[name].upsample(factor, order=order)
                ts_results[name] = ts_results[name].crop(crop_width=pad_width)

        multiscale_result.append(ts_results)

    return multiscale_result
def compute_maximum_ts_image(ts_image_results):
    """Compute maximum TS image across a list of given TS images.

    Parameters
    ----------
    ts_image_results : list
        List of `~gammapy.image.SkyImageList` objects.

    Returns
    -------
    images : `~gammapy.image.SkyImageList`
        Images (ts, niter, amplitude)
    """
    # Stack the per-scale 2D images into (ny, nx, n_scales) cubes
    ts_cube = np.dstack([result.ts for result in ts_image_results])
    niter_cube = np.dstack([result.niter for result in ts_image_results])
    amplitude_cube = np.dstack([result.amplitude for result in ts_image_results])
    scales = [result.scale for result in ts_image_results]

    # Pixel-wise maximum TS across all scales
    ts_max = np.max(ts_cube, axis=2)
    shape_2d = ts_cube.shape[:-1]
    scale_max = np.zeros(shape_2d)
    niter_max = np.zeros(shape_2d)
    amplitude_max = np.zeros(shape_2d)

    # For each scale, copy its niter/amplitude into the pixels where that
    # scale attains the maximum. At exact ties the later scale in the list
    # wins, since later iterations overwrite earlier ones.
    for idx_scale, scale in enumerate(scales):
        where_max = np.where(ts_cube[:, :, idx_scale] == ts_max)
        scale_max[where_max] = scale
        niter_max[where_max] = niter_cube[:, :, idx_scale][where_max]
        amplitude_max[where_max] = amplitude_cube[:, :, idx_scale][where_max]

    # NOTE(review): scale_max is computed but not included in the returned
    # image list — matches the original behavior.
    meta = OrderedDict()
    meta['MORPH'] = (ts_image_results[0].morphology,
                     'Source morphology assumption')

    return SkyImageList([
        SkyImage(name='ts', data=ts_max.astype('float32')),
        SkyImage(name='niter', data=niter_max.astype('int16')),
        SkyImage(name='amplitude', data=amplitude_max.astype('float32')),
    ], meta=meta)
def compute_ts_image(counts, background, exposure, kernel, mask=None, flux=None,
                     method='root brentq', parallel=True, threshold=None):
    """Compute TS image using different optimization methods.

    Parameters
    ----------
    counts : `~gammapy.image.SkyImage`
        Counts image.
    background : `~gammapy.image.SkyImage`
        Background image
    exposure : `~gammapy.image.SkyImage`
        Exposure image
    kernel : `astropy.convolution.Kernel2D` or 2D `~numpy.ndarray`
        Source model kernel.
    mask : `~numpy.ndarray` (None)
        Boolean mask selecting pixels to process. By default all pixels
        with non-zero exposure are processed.
    flux : float (None)
        Flux image used as a starting value for the amplitude fit.
    method : str ('root brentq')
        The following options are available:

        * ``'root brentq'`` (default)
            Fit amplitude finding roots of the the derivative of
            the fit statistics. Described in Appendix A in Stewart (2009).
        * ``'root newton'``
            TODO: document
        * ``'leastsq iter'``
            TODO: document
    parallel : bool (True)
        Whether to use multiple cores for parallel processing.
    threshold : float (None)
        If the TS value corresponding to the initial flux estimate is not above
        this threshold, the optimizing step is omitted to save computing time.

    Returns
    -------
    images : `~gammapy.image.SkyImageList`
        Images (ts, niter, amplitude)

    Notes
    -----
    Negative :math:`TS` values are defined as following:

    .. math::

        TS = \\left \\{
                 \\begin{array}{ll}
                   -TS & : \\textnormal{if} \\ F < 0 \\\\
                   \\ \\ TS & : \\textnormal{else}
                 \\end{array}
               \\right.

    Where :math:`F` is the fitted flux amplitude.

    References
    ----------
    [Stewart2009]_
    """
    t_0 = time()

    log.info("Using method '{}'".format(method))

    if not isinstance(kernel, Kernel2D):
        kernel = CustomKernel(kernel)

    wcs = counts.wcs.deepcopy()

    # Parse data type
    counts = counts.data.astype(float)
    background = background.data.astype(float)
    exposure = exposure.data.astype(float)

    assert counts.shape == background.shape
    assert counts.shape == exposure.shape

    # in some image there are pixels, which have exposure, but zero
    # background, which doesn't make sense and causes the TS computation
    # to fail, this is a temporary fix
    mask_ = np.logical_and(background == 0, exposure > 0)
    if mask_.any():
        log.warning('There are pixels in the data, that have exposure, but '
                    'zero background, which can cause the ts computation to '
                    'fail. Setting exposure of this pixels to zero.')
        exposure[mask_] = 0

    # Initial flux estimate via correlation with the kernel, needed as a
    # starting value for 'root newton' and for the threshold short-cut.
    if (flux is None and method != 'root brentq') or threshold is not None:
        from scipy.signal import fftconvolve
        with np.errstate(invalid='ignore', divide='ignore'):
            flux = (counts - background) / exposure / FLUX_FACTOR
        flux[~np.isfinite(flux)] = 0
        flux = fftconvolve(flux, kernel.array, mode='same') / np.sum(kernel.array ** 2)

    # Compute null statistics for the whole image
    c_0_image = _cash_cython(counts, background)

    # Only positions where the kernel fits entirely inside the image.
    x_min, x_max = kernel.shape[1] // 2, counts.shape[1] - kernel.shape[1] // 2
    y_min, y_max = kernel.shape[0] // 2, counts.shape[0] - kernel.shape[0] // 2
    positions = product(range(y_min, y_max), range(x_min, x_max))

    # Positions where exposure == 0 are not processed
    if mask is None:
        mask = exposure > 0
    positions = [(j, i) for j, i in positions if mask[j][i]]

    # Fail early (before any work is mapped) if there is nothing to process.
    assert positions, ("Positions are empty: possibly kernel " +
                       "{} is larger than counts {}".format(kernel.shape, counts.shape))

    wrap = partial(_ts_value, counts=counts, exposure=exposure,
                   background=background, c_0_image=c_0_image, kernel=kernel,
                   flux=flux, method=method, threshold=threshold)

    if parallel:
        log.info('Using {0} cores to compute TS image.'.format(cpu_count()))
        pool = Pool()
        results = pool.map(wrap, positions)
        pool.close()
        pool.join()
    else:
        # list() is required: on Python 3 a bare map() returns a single-pass
        # iterator, which would be exhausted by the first comprehension below,
        # leaving the amplitude and niter images all-NaN.
        results = list(map(wrap, positions))

    # Set TS values at given positions
    j, i = zip(*positions)
    ts = np.ones(counts.shape) * np.nan
    amplitudes = np.ones(counts.shape) * np.nan
    niter = np.ones(counts.shape) * np.nan
    ts[j, i] = [_[0] for _ in results]
    amplitudes[j, i] = [_[1] for _ in results]
    niter[j, i] = [_[2] for _ in results]

    # Handle negative TS values
    with np.errstate(invalid='ignore', divide='ignore'):
        sqrt_ts = np.where(ts > 0, np.sqrt(ts), -np.sqrt(-ts))

    runtime = np.round(time() - t_0, 2)
    meta = OrderedDict(runtime=runtime)
    return SkyImageList([
        SkyImage(name='ts', data=ts.astype('float32'), wcs=wcs),
        SkyImage(name='sqrt_ts', data=sqrt_ts.astype('float32'), wcs=wcs),
        SkyImage(name='amplitude', data=amplitudes.astype('float32'), wcs=wcs),
        SkyImage(name='niter', data=niter.astype('int16'), wcs=wcs),
    ], meta=meta)
def _ts_value(position, counts, exposure, background, c_0_image, kernel,
              flux, method, threshold):
    """Compute TS value at a given pixel position.

    Uses approach described in Stewart (2009).

    Parameters
    ----------
    position : tuple (i, j)
        Pixel position.
    counts : `~numpy.ndarray`
        Counts image
    background : `~numpy.ndarray`
        Background image
    exposure : `~numpy.ndarray`
        Exposure image
    kernel : `astropy.convolution.Kernel2D`
        Source model kernel
    flux : `~numpy.ndarray`
        Flux image. The flux value at the given pixel position is used as
        starting value for the minimization.
    method : str
        Amplitude fit method: 'root brentq', 'root newton' or 'leastsq iter'.
    threshold : float or None
        Skip the fit when the TS of the initial flux estimate is below this.

    Returns
    -------
    TS : float
        TS value at the given pixel position.
    """
    # Get data slices
    counts_ = _extract_array(counts, kernel.shape, position)
    background_ = _extract_array(background, kernel.shape, position)
    exposure_ = _extract_array(exposure, kernel.shape, position)
    c_0_ = _extract_array(c_0_image, kernel.shape, position)
    # Model template = exposure times (private) kernel array
    model = (exposure_ * kernel._array)

    # Null-hypothesis statistic over the slice
    c_0 = c_0_.sum()

    if threshold is not None:
        with np.errstate(invalid='ignore', divide='ignore'):
            c_1 = f_cash(flux[position], counts_, background_, model)
        # Don't fit if pixel significance is low
        if c_0 - c_1 < threshold:
            return c_0 - c_1, flux[position] * FLUX_FACTOR, 0

    if method == 'root brentq':
        amplitude, niter = _root_amplitude_brentq(counts_, background_, model)
    elif method == 'root newton':
        amplitude, niter = _root_amplitude(counts_, background_, model,
                                           flux[position])
    elif method == 'leastsq iter':
        amplitude, niter = _leastsq_iter_amplitude(counts_, background_, model)
    else:
        raise ValueError('Invalid method: {}'.format(method))

    with np.errstate(invalid='ignore', divide='ignore'):
        c_1 = f_cash(amplitude, counts_, background_, model)

    # Compute and return TS value; a negative amplitude flips the TS sign
    # (see the Notes section of ``compute_ts_image``).
    return (c_0 - c_1) * np.sign(amplitude), amplitude * FLUX_FACTOR, niter


def _leastsq_iter_amplitude(counts, background, model, maxiter=MAX_NITER, rtol=0.001):
    """Fit amplitude using an iterative least squares algorithm.

    Parameters
    ----------
    counts : `~numpy.ndarray`
        Slice of counts image
    background : `~numpy.ndarray`
        Slice of background image
    model : `~numpy.ndarray`
        Model template to fit.
    maxiter : int
        Maximum number of iterations.
    rtol : float
        Relative flux error.

    Returns
    -------
    amplitude : float
        Fitted flux amplitude.
    niter : int
        Number of function evaluations needed for the fit.
    """
    bounds = _amplitude_bounds_cython(counts, background, model)
    amplitude_min, amplitude_max, amplitude_min_total = bounds

    # No counts in the slice: nothing to fit, return the lower bound.
    if not counts.sum() > 0:
        return amplitude_min_total, 0

    weights = np.ones(model.shape)

    x_old = 0
    for i in range(maxiter):
        # Weighted least-squares amplitude estimate (Cython helper);
        # weights are updated each pass to approximate Poisson variances.
        x = _x_best_leastsq(counts, background, model, weights)
        # NOTE(review): relative-change test divides by x — assumes the
        # estimate never becomes exactly zero here.
        if abs((x - x_old) / x) < rtol:
            return max(x / FLUX_FACTOR, amplitude_min_total), i + 1
        else:
            weights = x * model + background
            x_old = x
    # NOTE(review): reports the module constant MAX_NITER rather than the
    # ``maxiter`` parameter when not converged.
    return max(x / FLUX_FACTOR, amplitude_min_total), MAX_NITER


def _root_amplitude(counts, background, model, flux):
    """Fit amplitude by finding roots using newton algorithm.

    See Appendix A Stewart (2009).

    Parameters
    ----------
    counts : `~numpy.ndarray`
        Slice of count image
    background : `~numpy.ndarray`
        Slice of background image
    model : `~numpy.ndarray`
        Model template to fit.
    flux : float
        Starting value for the fit.

    Returns
    -------
    amplitude : float
        Fitted flux amplitude.
    niter : int
        Number of function evaluations needed for the fit.
    """
    from scipy.optimize import newton
    args = (counts, background, model)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        try:
            return newton(_f_cash_root_cython, flux, args=args,
                          maxiter=MAX_NITER, tol=1e-2), 0
        except RuntimeError:
            # Where the root finding fails NaN is set as amplitude
            return np.nan, MAX_NITER


def _root_amplitude_brentq(counts, background, model):
    """Fit amplitude by finding roots using Brent algorithm.

    See Appendix A Stewart (2009).

    Parameters
    ----------
    counts : `~numpy.ndarray`
        Slice of count image
    background : `~numpy.ndarray`
        Slice of background image
    model : `~numpy.ndarray`
        Model template to fit.

    Returns
    -------
    amplitude : float
        Fitted flux amplitude.
    niter : int
        Number of function evaluations needed for the fit.
    """
    from scipy.optimize import brentq

    # Compute amplitude bounds and assert counts > 0
    bounds = _amplitude_bounds_cython(counts, background, model)
    amplitude_min, amplitude_max, amplitude_min_total = bounds

    # No counts in the slice: nothing to fit, return the lower bound.
    if not counts.sum() > 0:
        return amplitude_min_total, 0

    args = (counts, background, model)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        try:
            result = brentq(_f_cash_root_cython, amplitude_min, amplitude_max,
                            args=args, maxiter=MAX_NITER, full_output=True,
                            rtol=1e-3)
            return max(result[0], amplitude_min_total), result[1].iterations
        except (RuntimeError, ValueError):
            # Where the root finding fails NaN is set as amplitude
            return np.nan, MAX_NITER


def _flux_correlation_radius(kernel, containment=CONTAINMENT):
    """Compute equivalent top-hat kernel radius for a given kernel instance
    and containment fraction.

    Parameters
    ----------
    kernel : `astropy.convolution.Kernel2D`
        Astropy kernel instance.
    containment : float (default = 0.8)
        Containment fraction.

    Returns
    -------
    kernel : float
        Equivalent Tophat kernel radius.
    """
    kernel_image = fits.ImageHDU(kernel.array)
    y, x = kernel.center
    r_c = measure_containment_radius(kernel_image, x, y, containment)
    # Containment radius of Tophat kernel is given by r_c_tophat = r_0 * sqrt(C)
    # by setting r_c = r_c_tophat we can estimate the equivalent containment radius r_0
    return r_c / np.sqrt(containment)