# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Functions to compute TS images."""
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import contextlib
import warnings
from functools import partial
from multiprocessing import Pool
import numpy as np
from scipy.optimize import newton, brentq
from astropy.convolution import CustomKernel, Kernel2D
from ..utils.array import shape_2N, symmetric_crop_pad_width
from ._test_statistics_cython import (
_cash_cython,
_amplitude_bounds_cython,
_cash_sum_cython,
_f_cash_root_cython,
_x_best_leastsq,
)
__all__ = ["TSMapEstimator"]
log = logging.getLogger(__name__)
FLUX_FACTOR = 1e-12
MAX_NITER = 20
RTOL = 1e-3
def _extract_array(array, shape, position):
"""Helper function to extract parts of a larger array.
Simple implementation of an array extract function , because
`~astropy.ndata.utils.extract_array` introduces too much overhead.`
Parameters
----------
array : `~numpy.ndarray`
The array from which to extract.
shape : tuple or int
The shape of the extracted array.
position : tuple of numbers or number
The position of the small array's center with respect to the
large array.
"""
x_width = shape[1] // 2
y_width = shape[0] // 2
y_lo = position[0] - y_width
y_hi = position[0] + y_width + 1
x_lo = position[1] - x_width
x_hi = position[1] + x_width + 1
return array[y_lo:y_hi, x_lo:x_hi]
def f_cash(x, counts, background, model):
"""Wrapper for cash statistics, that defines the model function.
Parameters
----------
x : float
Model amplitude.
counts : `~numpy.ndarray`
Count image slice, where model is defined.
background : `~numpy.ndarray`
Background image slice, where model is defined.
model : `~numpy.ndarray`
Source template (multiplied with exposure).
"""
return _cash_sum_cython(counts, background + x * FLUX_FACTOR * model)
[docs]class TSMapEstimator(object):
"""
Compute TS map using different optimization methods.
The map is computed fitting by a single parameter amplitude fit. The fit is
simplified by finding roots of the the derivative of the fit statistics using
various root finding algorithms. The approach is sescribed in Appendix A
in Stewart (2009).
Parameters
----------
method : str ('root')
The following options are available:
* ``'root brentq'`` (default)
Fit amplitude by finding the roots of the the derivative of the fit
statistics using the brentq method.
* ``'root newton'``
Fit amplitude by finding the roots of the the derivative of the fit
statistics using Newton's method.
* ``'leastsq iter'``
Fit the amplitude by an iterative least square fit, that can be solved
analytically.
error_method : ['covar', 'conf']
Error estimation method.
error_sigma : int (1)
Sigma for flux error.
ul_method : ['covar', 'conf']
Upper limit estimation method.
ul_sigma : int (2)
Sigma for flux upper limits.
n_jobs : int
Number of parallel jobs to use for the computation.
threshold : float (None)
If the TS value corresponding to the initial flux estimate is not above
this threshold, the optimizing step is omitted to save computing time.
rtol : float (0.001)
Relative precision of the flux estimate. Used as a stopping criterion for
the amplitude fit.
Notes
-----
Negative :math:`TS` values are defined as following:
.. math::
TS = \\left \\{
\\begin{array}{ll}
-TS & : \\textnormal{if} \\ F < 0 \\\\
\\ \\ TS & : \\textnormal{else}
\\end{array}
\\right.
Where :math:`F` is the fitted flux amplitude.
References
----------
[Stewart2009]_
"""
def __init__(
self,
method="root brentq",
error_method="covar",
error_sigma=1,
ul_method="covar",
ul_sigma=2,
n_jobs=1,
threshold=None,
rtol=0.001,
):
if method not in ["root brentq", "root newton", "leastsq iter"]:
raise ValueError("Not a valid method: '{}'".format(method))
if error_method not in ["covar", "conf"]:
raise ValueError("Not a valid error method '{}'".format(error_method))
self.parameters = {
"method": method,
"error_method": error_method,
"error_sigma": error_sigma,
"ul_method": ul_method,
"ul_sigma": ul_sigma,
"n_jobs": n_jobs,
"threshold": threshold,
"rtol": rtol,
}
[docs] @staticmethod
def flux_default(maps, kernel):
"""Estimate default flux map using a given kernel.
Parameters
----------
maps : dict
Input sky maps. Requires `counts`, `background` and `exposure` maps.
kernel : `astropy.convolution.Kernel2D`
Source model kernel.
Returns
-------
flux_approx : `WcsNDMap`
Approximate flux map.
"""
flux = (maps["counts"].data - maps["background"].data) / maps["exposure"].data
flux = maps["counts"].copy(data=flux / np.sum(kernel.array ** 2))
return flux.convolve(kernel.array)
[docs] @staticmethod
def mask_default(maps, kernel):
"""Compute default mask where to estimate TS values.
Parameters
----------
maps : dict
Input sky maps. Requires `background` and `exposure`.
kernel : `astropy.convolution.Kernel2D`
Source model kernel.
Returns
-------
mask : `WcsNDMap`
Mask map.
"""
mask = np.zeros(maps["exposure"].data.shape, dtype=int)
# mask boundary
slice_x = slice(kernel.shape[1] // 2, -kernel.shape[1] // 2 + 1)
slice_y = slice(kernel.shape[0] // 2, -kernel.shape[0] // 2 + 1)
mask[slice_y, slice_x] = 1
# positions where exposure == 0 are not processed
mask &= maps["exposure"].data > 0
# in some image there are pixels, which have exposure, but zero
# background, which doesn't make sense and causes the TS computation
# to fail, this is a temporary fix
mask[maps["background"].data == 0] = 0
return maps["exposure"].copy(data=mask.astype("int"))
[docs] @staticmethod
def sqrt_ts(map_ts):
r"""Compute sqrt(TS) map.
Compute sqrt(TS) as defined by:
.. math::
\sqrt{TS} = \left \{
\begin{array}{ll}
-\sqrt{-TS} & : \textnormal{if} \ TS < 0 \\
\sqrt{TS} & : \textnormal{else}
\end{array}
\right.
Parameters
----------
map_ts : `WcsNDMap`
Input TS map.
Returns
-------
sqrt_ts : `WcsNDMap`
Sqrt(TS) map.
"""
with np.errstate(invalid="ignore", divide="ignore"):
ts = map_ts.data
sqrt_ts = np.where(ts > 0, np.sqrt(ts), -np.sqrt(-ts))
return map_ts.copy(data=sqrt_ts)
[docs] def run(self, maps, kernel, which="all", downsampling_factor=None):
"""
Run TS map estimation.
Requires `counts`, `exposure` and `background` map to run.
Parameters
----------
maps : dict
Input sky maps.
kernel : `astropy.convolution.Kernel2D` or 2D `~numpy.ndarray`
Source model kernel.
which : list of str or 'all'
Which maps to compute.
downsampling_factor : int
Sample down the input maps to speed up the computation. Only integer
values that are a multiple of 2 are allowed. Note that the kernel is
not sampled down, but must be provided with the downsampled bin size.
Returns
-------
maps : dict
Result maps.
"""
p = self.parameters
if (np.array(kernel.shape) > np.array(maps["counts"].data.shape)).any():
raise ValueError(
"Kernel shape larger than map shape, please adjust"
" size of the kernel"
)
if downsampling_factor:
shape = maps["counts"].data.shape
pad_width = symmetric_crop_pad_width(shape, shape_2N(shape))[0]
for name in maps:
maps[name] = maps[name].pad(pad_width)
preserve_counts = name in ["counts", "background", "exclusion"]
maps[name] = maps[name].downsample(
downsampling_factor, preserve_counts=preserve_counts
)
if not isinstance(kernel, Kernel2D):
kernel = CustomKernel(kernel)
if which == "all":
which = ["ts", "sqrt_ts", "flux", "flux_err", "flux_ul", "niter"]
result = {}
for name in which:
data = np.nan * np.ones_like(maps["counts"].data)
result[name] = maps["counts"].copy(data=data)
mask = self.mask_default(maps, kernel)
if "mask" in maps:
mask.data &= maps["mask"].data
if p["threshold"] or p["method"] == "root newton":
flux = self.flux_default(maps, kernel).data
else:
flux = None
# prepare dtype for cython methods
counts = maps["counts"].data.astype(float)
background = maps["background"].data.astype(float)
exposure = maps["exposure"].data.astype(float)
# Compute null statistics per pixel for the whole image
c_0 = _cash_cython(counts, background)
error_method = p["error_method"] if "flux_err" in which else "none"
ul_method = p["ul_method"] if "flux_ul" in which else "none"
wrap = partial(
_ts_value,
counts=counts,
exposure=exposure,
background=background,
c_0=c_0,
kernel=kernel,
flux=flux,
method=p["method"],
error_method=error_method,
threshold=p["threshold"],
error_sigma=p["error_sigma"],
ul_method=ul_method,
ul_sigma=p["ul_sigma"],
rtol=p["rtol"],
)
x, y = np.where(mask.data)
positions = list(zip(x, y))
with contextlib.closing(Pool(processes=p["n_jobs"])) as pool:
log.info("Using {} jobs to compute TS map.".format(p["n_jobs"]))
results = pool.map(wrap, positions)
# Set TS values at given positions
j, i = zip(*positions)
for name in ["ts", "flux", "niter"]:
result[name].data[j, i] = [_[name] for _ in results]
if "flux_err" in which:
result["flux_err"].data[j, i] = [_["flux_err"] for _ in results]
if "flux_ul" in which:
result["flux_ul"].data[j, i] = [_["flux_ul"] for _ in results]
# Compute sqrt(TS) values
if "sqrt_ts" in which:
result["sqrt_ts"] = self.sqrt_ts(result["ts"])
if downsampling_factor:
for name in which:
order = 0 if name == "niter" else 1
result[name] = result[name].upsample(
factor=downsampling_factor, preserve_counts=False, order=order
)
result[name] = result[name].crop(crop_width=pad_width)
return result
def __repr__(self):
p = self.parameters
info = self.__class__.__name__
info += "\n\nParameters:\n\n"
for key in p:
info += "\t{key:13s}: {value}\n".format(key=key, value=p[key])
return info
def _ts_value(
position,
counts,
exposure,
background,
c_0,
kernel,
flux,
method,
error_method,
error_sigma,
ul_method,
ul_sigma,
threshold,
rtol,
):
"""Compute TS value at a given pixel position.
Uses approach described in Stewart (2009).
Parameters
----------
position : tuple (i, j)
Pixel position.
counts : `~numpy.ndarray`
Counts image
background : `~numpy.ndarray`
Background image
exposure : `~numpy.ndarray`
Exposure image
kernel : `astropy.convolution.Kernel2D`
Source model kernel
flux : `~numpy.ndarray`
Flux image. The flux value at the given pixel position is used as
starting value for the minimization.
Returns
-------
TS : float
TS value at the given pixel position.
"""
# Get data slices
counts_ = _extract_array(counts, kernel.shape, position)
background_ = _extract_array(background, kernel.shape, position)
exposure_ = _extract_array(exposure, kernel.shape, position)
c_0_ = _extract_array(c_0, kernel.shape, position)
model = exposure_ * kernel._array
c_0 = c_0_.sum()
if threshold is not None:
with np.errstate(invalid="ignore", divide="ignore"):
amplitude = flux[position]
c_1 = f_cash(amplitude / FLUX_FACTOR, counts_, background_, model)
# Don't fit if pixel significance is low
if c_0 - c_1 < threshold:
result = {}
result["ts"] = (c_0 - c_1) * np.sign(amplitude)
result["flux"] = amplitude
result["niter"] = 0
result["flux_err"] = np.nan
result["flux_ul"] = np.nan
return result
if method == "root brentq":
amplitude, niter = _root_amplitude_brentq(
counts_, background_, model, rtol=rtol
)
elif method == "root newton":
amplitude, niter = _root_amplitude(
counts_, background_, model, flux[position], rtol=rtol
)
elif method == "leastsq iter":
amplitude, niter = _leastsq_iter_amplitude(
counts_, background_, model, rtol=rtol
)
else:
raise ValueError("Invalid method: {}".format(method))
with np.errstate(invalid="ignore", divide="ignore"):
c_1 = f_cash(amplitude, counts_, background_, model)
result = {}
result["ts"] = (c_0 - c_1) * np.sign(amplitude)
result["flux"] = amplitude * FLUX_FACTOR
result["niter"] = niter
if error_method == "covar":
flux_err = _compute_flux_err_covar(amplitude, counts_, background_, model)
result["flux_err"] = flux_err * error_sigma
elif error_method == "conf":
flux_err = _compute_flux_err_conf(
amplitude, counts_, background_, model, c_1, error_sigma
)
result["flux_err"] = FLUX_FACTOR * flux_err
if ul_method == "covar":
result["flux_ul"] = result["flux"] + ul_sigma * result["flux_err"]
elif ul_method == "conf":
flux_ul = _compute_flux_err_conf(
amplitude, counts_, background_, model, c_1, ul_sigma
)
result["flux_ul"] = FLUX_FACTOR * flux_ul + result["flux"]
return result
def _leastsq_iter_amplitude(counts, background, model, maxiter=MAX_NITER, rtol=RTOL):
"""Fit amplitude using an iterative least squares algorithm.
Parameters
----------
counts : `~numpy.ndarray`
Slice of counts image
background : `~numpy.ndarray`
Slice of background image
model : `~numpy.ndarray`
Model template to fit.
maxiter : int
Maximum number of iterations.
rtol : float
Relative flux error.
Returns
-------
amplitude : float
Fitted flux amplitude.
niter : int
Number of function evaluations needed for the fit.
"""
bounds = _amplitude_bounds_cython(counts, background, model)
amplitude_min, amplitude_max, amplitude_min_total = bounds
if not counts.sum() > 0:
return amplitude_min_total, 0
weights = np.ones(model.shape)
x_old = 0
for i in range(maxiter):
x = _x_best_leastsq(counts, background, model, weights)
if abs((x - x_old) / x) < rtol:
return max(x / FLUX_FACTOR, amplitude_min_total), i + 1
else:
weights = x * model + background
x_old = x
return max(x / FLUX_FACTOR, amplitude_min_total), MAX_NITER
def _root_amplitude(counts, background, model, flux, rtol=RTOL):
"""Fit amplitude by finding roots using newton algorithm.
See Appendix A Stewart (2009).
Parameters
----------
counts : `~numpy.ndarray`
Slice of count image
background : `~numpy.ndarray`
Slice of background image
model : `~numpy.ndarray`
Model template to fit.
flux : float
Starting value for the fit.
Returns
-------
amplitude : float
Fitted flux amplitude.
niter : int
Number of function evaluations needed for the fit.
"""
args = (counts, background, model)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
try:
return (
newton(
_f_cash_root_cython, flux, args=args, maxiter=MAX_NITER, tol=rtol
),
0,
)
except RuntimeError:
# Where the root finding fails NaN is set as amplitude
return np.nan, MAX_NITER
def _root_amplitude_brentq(counts, background, model, rtol=RTOL):
"""Fit amplitude by finding roots using Brent algorithm.
See Appendix A Stewart (2009).
Parameters
----------
counts : `~numpy.ndarray`
Slice of count image
background : `~numpy.ndarray`
Slice of background image
model : `~numpy.ndarray`
Model template to fit.
Returns
-------
amplitude : float
Fitted flux amplitude.
niter : int
Number of function evaluations needed for the fit.
"""
# Compute amplitude bounds and assert counts > 0
bounds = _amplitude_bounds_cython(counts, background, model)
amplitude_min, amplitude_max, amplitude_min_total = bounds
if not counts.sum() > 0:
return amplitude_min_total, 0
args = (counts, background, model)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
try:
result = brentq(
_f_cash_root_cython,
amplitude_min,
amplitude_max,
args=args,
maxiter=MAX_NITER,
full_output=True,
rtol=rtol,
)
return max(result[0], amplitude_min_total), result[1].iterations
except (RuntimeError, ValueError):
# Where the root finding fails NaN is set as amplitude
return np.nan, MAX_NITER
def _compute_flux_err_covar(x, counts, background, model):
"""
Compute amplitude errors using inverse 2nd derivative method.
"""
with np.errstate(invalid="ignore", divide="ignore"):
stat = (model ** 2 * counts) / (background + x * FLUX_FACTOR * model) ** 2
return np.sqrt(1.0 / stat.sum())
def _compute_flux_err_conf(amplitude, counts, background, model, c_1, error_sigma):
"""
Compute amplitude errors using likelihood profile method.
"""
def ts_diff(x, counts, background, model):
return (c_1 + error_sigma ** 2) - f_cash(x, counts, background, model)
args = (counts, background, model)
amplitude_max = amplitude + 1e4
with warnings.catch_warnings():
warnings.simplefilter("ignore")
try:
result = brentq(
ts_diff,
amplitude,
amplitude_max,
args=args,
maxiter=MAX_NITER,
rtol=1e-3,
)
return result - amplitude
except (RuntimeError, ValueError):
# Where the root finding fails NaN is set as amplitude
return np.nan