Source code for gammapy.stats.counts_statistic

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import abc
import numpy as np
from scipy.stats import chi2
from gammapy.utils.roots import find_roots
from .fit_statistics import cash, wstat

__all__ = ["WStatCountsStatistic", "CashCountsStatistic"]


class CountsStatistic(abc.ABC):
    """Counts statistics base class"""

    @property
    @abc.abstractmethod
    def stat_null(self):
        pass

    @property
    @abc.abstractmethod
    def stat_max(self):
        pass

    @property
    @abc.abstractmethod
    def n_sig(self):
        pass

    @property
    @abc.abstractmethod
    def n_bkg(self):
        pass

    @property
    @abc.abstractmethod
    def error(self):
        pass

    @abc.abstractmethod
    def _stat_fcn(self):
        pass

    @property
    def ts(self):
        """Return stat difference (TS) of measured excess versus no excess."""
        # Remove (small) negative TS due to error in root finding
        ts = np.clip(self.stat_null - self.stat_max, 0, None)
        return ts

    @property
    def sqrt_ts(self):
        """Return statistical significance of measured excess.
        The sign of the excess is applied to distinguish positive and negative fluctuations.
        """
        return np.sign(self.n_sig) * np.sqrt(self.ts)

    @property
    def p_value(self):
        """Return p_value of measured excess.
        Here the value accounts only for the positive excess significance (i.e. one-sided).
        """
        return 0.5 * chi2.sf(self.ts, 1)

    def compute_errn(self, n_sigma=1.0):
        """Compute downward excess uncertainties.

        Searches the signal value for which the test statistics is n_sigma**2 away from the maximum.

        Parameters
        ----------
        n_sigma : float
            Confidence level of the uncertainty expressed in number of sigma. Default is 1.
        """
        errn = np.zeros_like(self.n_sig, dtype="float")
        min_range = self.n_sig - 2 * n_sigma * (self.error + 1)

        it = np.nditer(errn, flags=["multi_index"])
        while not it.finished:
            roots, res = find_roots(
                self._stat_fcn,
                min_range[it.multi_index],
                self.n_sig[it.multi_index],
                nbin=1,
                args=(self.stat_max[it.multi_index] + n_sigma**2, it.multi_index),
            )
            if np.isnan(roots[0]):
                errn[it.multi_index] = self.n_on[it.multi_index]
            else:
                errn[it.multi_index] = self.n_sig[it.multi_index] - roots[0]
            it.iternext()

        return errn

    def compute_errp(self, n_sigma=1):
        """Compute upward excess uncertainties.

        Searches the signal value for which the test statistics is n_sigma**2 away from the maximum.

        Parameters
        ----------
        n_sigma : float
            Confidence level of the uncertainty expressed in number of sigma. Default is 1.
        """

        errp = np.zeros_like(self.n_on, dtype="float")
        max_range = self.n_sig + 2 * n_sigma * (self.error + 1)

        it = np.nditer(errp, flags=["multi_index"])
        while not it.finished:
            roots, res = find_roots(
                self._stat_fcn,
                self.n_sig[it.multi_index],
                max_range[it.multi_index],
                nbin=1,
                args=(self.stat_max[it.multi_index] + n_sigma**2, it.multi_index),
            )
            errp[it.multi_index] = roots[0]
            it.iternext()

        return errp - self.n_sig

    def compute_upper_limit(self, n_sigma=3):
        """Compute upper limit on the signal.

        Searches the signal value for which the test statistics is n_sigma**2 away from the maximum
        or from 0 if the measured excess is negative.

        Parameters
        ----------
        n_sigma : float
            Confidence level of the upper limit expressed in number of sigma. Default is 3.
        """

        ul = np.zeros_like(self.n_on, dtype="float")

        min_range = self.n_sig
        max_range = self.n_sig + 2 * n_sigma * (self.error + 1)
        it = np.nditer(ul, flags=["multi_index"])

        while not it.finished:
            ts_ref = self._stat_fcn(min_range[it.multi_index], 0.0, it.multi_index)

            roots, res = find_roots(
                self._stat_fcn,
                min_range[it.multi_index],
                max_range[it.multi_index],
                nbin=1,
                args=(ts_ref + n_sigma**2, it.multi_index),
            )
            ul[it.multi_index] = roots[0]
            it.iternext()
        return ul

    @abc.abstractmethod
    def _n_sig_matching_significance_fcn(self):
        pass

    def n_sig_matching_significance(self, significance):
        """Compute excess matching a given significance.

        This function is the inverse of `significance`.

        Parameters
        ----------
        significance : float
            Significance

        Returns
        -------
        n_sig : `numpy.ndarray`
            Excess
        """

        n_sig = np.zeros_like(self.n_bkg, dtype="float")
        it = np.nditer(n_sig, flags=["multi_index"])

        while not it.finished:
            lower_bound = np.sqrt(self.n_bkg[it.multi_index]) * significance
            # find upper bounds for secant method as in scipy
            eps = 1e-4
            upper_bound = lower_bound * (1 + eps)
            upper_bound += eps if upper_bound >= 0 else -eps
            roots, res = find_roots(
                self._n_sig_matching_significance_fcn,
                lower_bound=lower_bound,
                upper_bound=upper_bound,
                args=(significance, it.multi_index),
                nbin=1,
                method="secant",
            )
            n_sig[it.multi_index] = roots[0]  # return NaN if fail
            it.iternext()
        return n_sig

    @abc.abstractmethod
    def sum(self, axis=None):
        """Return summed CountsStatistics.

        Parameters
        ----------
        axis : None or int or tuple of ints, optional
             Axis or axes on which to perform the summation.
             Default, axis=None, will perform the sum over the whole array.

        Returns
        -------
        stat : `~gammapy.stats.CountsStatistics`
             the return stat object
        """
        pass


[docs]class CashCountsStatistic(CountsStatistic): """Class to compute statistics for Poisson distributed variable with known background. Parameters ---------- n_on : int Measured counts mu_bkg : float Known level of background """ def __init__(self, n_on, mu_bkg): self.n_on = np.asanyarray(n_on) self.mu_bkg = np.asanyarray(mu_bkg) @property def n_bkg(self): """Expected background counts""" return self.mu_bkg @property def n_sig(self): """Excess""" return self.n_on - self.n_bkg @property def error(self): """Approximate error from the covariance matrix.""" return np.sqrt(self.n_on) @property def stat_null(self): """Stat value for null hypothesis, i.e. 0 expected signal counts""" return cash(self.n_on, self.mu_bkg + 0) @property def stat_max(self): """Stat value for best fit hypothesis, i.e. expected signal mu = n_on - mu_bkg""" return cash(self.n_on, self.n_on) def _stat_fcn(self, mu, delta=0, index=None): return cash(self.n_on[index], self.mu_bkg[index] + mu) - delta def _n_sig_matching_significance_fcn(self, n_sig, significance, index): TS0 = cash(n_sig + self.mu_bkg[index], self.mu_bkg[index]) TS1 = cash(n_sig + self.mu_bkg[index], self.mu_bkg[index] + n_sig) return np.sign(n_sig) * np.sqrt(np.clip(TS0 - TS1, 0, None)) - significance
[docs] def sum(self, axis=None): n_on = self.n_on.sum(axis=axis) bkg = self.n_bkg.sum(axis=axis) return CashCountsStatistic(n_on=n_on, mu_bkg=bkg)
def __getitem__(self, key): return CashCountsStatistic(n_on=self.n_on[key], mu_bkg=self.n_bkg[key])
[docs]class WStatCountsStatistic(CountsStatistic): """Class to compute statistics for Poisson distributed variable with unknown background. Parameters ---------- n_on : int Measured counts in on region n_off : int Measured counts in off region alpha : float Acceptance ratio of on and off measurements mu_sig : float Expected signal counts in on region """ def __init__(self, n_on, n_off, alpha, mu_sig=None): self.n_on = np.asanyarray(n_on) self.n_off = np.asanyarray(n_off) self.alpha = np.asanyarray(alpha) if mu_sig is None: self.mu_sig = np.zeros_like(self.n_on) else: self.mu_sig = np.asanyarray(mu_sig) @property def n_bkg(self): """Known background computed alpha * n_off""" return self.alpha * self.n_off @property def n_sig(self): """Excess""" return self.n_on - self.n_bkg - self.mu_sig @property def error(self): """Approximate error from the covariance matrix.""" return np.sqrt(self.n_on + self.alpha**2 * self.n_off) @property def stat_null(self): """Stat value for null hypothesis, i.e. mu_sig expected signal counts""" return wstat(self.n_on, self.n_off, self.alpha, self.mu_sig) @property def stat_max(self): """Stat value for best fit hypothesis i.e. expected signal mu = n_on - alpha * n_off - mu_sig """ return wstat(self.n_on, self.n_off, self.alpha, self.n_sig + self.mu_sig) def _stat_fcn(self, mu, delta=0, index=None): return ( wstat( self.n_on[index], self.n_off[index], self.alpha[index], (mu + self.mu_sig[index]), ) - delta ) def _n_sig_matching_significance_fcn(self, n_sig, significance, index): stat0 = wstat( n_sig + self.n_bkg[index], self.n_off[index], self.alpha[index], 0 ) stat1 = wstat( n_sig + self.n_bkg[index], self.n_off[index], self.alpha[index], n_sig, ) return np.sign(n_sig) * np.sqrt(np.clip(stat0 - stat1, 0, None)) - significance
[docs] def sum(self, axis=None): n_on = self.n_on.sum(axis=axis) n_off = self.n_off.sum(axis=axis) alpha = self.n_bkg.sum(axis=axis) / n_off return WStatCountsStatistic(n_on=n_on, n_off=n_off, alpha=alpha)
def __getitem__(self, key): return WStatCountsStatistic( n_on=self.n_on[key], n_off=self.n_off[key], alpha=self.alpha[key] )