Source code for gammapy.stats.variability

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
import scipy.stats as stats
from gammapy.utils.random import get_random_state

__all__ = [
    "compute_fvar",
    "compute_fpp",
    "compute_chisq",
    "compute_flux_doubling",
    "structure_function",
    "TimmerKonig_lightcurve_simulator",
]


[docs] def compute_fvar(flux, flux_err, axis=0): r"""Calculate the fractional excess variance. This method accesses the ``FLUX`` and ``FLUX_ERR`` columns from the lightcurve data. The fractional excess variance :math:`F_{var}`, an intrinsic variability estimator, is given by: .. math:: F_{var} = \sqrt{ \frac{S^{2} - \bar{ \sigma^{2}}}{ \bar{x}^{2}}} It is the excess variance after accounting for the measurement errors on the light curve :math:`\sigma`. :math:`S` is the variance. It is important to note that the errors on the flux must be gaussian. Parameters ---------- flux : `~astropy.units.Quantity` The measured fluxes. flux_err : `~astropy.units.Quantity` The error on measured fluxes. axis : int, optional Axis along which the excess variance is computed. Default is 0. Returns ------- fvar, fvar_err : `~numpy.ndarray` Fractional excess variance. References ---------- .. [Vaughan2003] "On characterizing the variability properties of X-ray light curves from active galaxies", Vaughan et al. (2003) https://ui.adsabs.harvard.edu/abs/2003MNRAS.345.1271V """ flux_mean = np.nanmean(flux, axis=axis) n_points = np.count_nonzero(~np.isnan(flux), axis=axis) s_square = np.nansum((flux - flux_mean) ** 2, axis=axis) / (n_points - 1) sig_square = np.nansum(flux_err**2, axis=axis) / n_points fvar = np.sqrt(np.abs(s_square - sig_square)) / flux_mean sigxserr_a = np.sqrt(2 / n_points) * sig_square / flux_mean**2 sigxserr_b = np.sqrt(sig_square / n_points) * (2 * fvar / flux_mean) sigxserr = np.sqrt(sigxserr_a**2 + sigxserr_b**2) fvar_err = sigxserr / (2 * fvar) return fvar, fvar_err
[docs] def compute_fpp(flux, flux_err, axis=0): r"""Calculate the point-to-point excess variance. F_pp is a quantity strongly related to the fractional excess variance F_var implemented in `~gammapy.stats.compute_fvar`; F_pp probes the variability on a shorter timescale. For white noise, F_pp and F_var give the same value. However, for red noise, F_var will be larger than F_pp, as the variations will be larger on longer timescales. It is important to note that the errors on the flux must be Gaussian. Parameters ---------- flux : `~astropy.units.Quantity` The measured fluxes. flux_err : `~astropy.units.Quantity` The error on measured fluxes. axis : int, optional Axis along which the excess variance is computed. Default is 0. Returns ------- fpp, fpp_err : `~numpy.ndarray` Point-to-point excess variance. References ---------- .. [Edelson2002] "X-Ray Spectral Variability and Rapid Variability of the Soft X-Ray Spectrum Seyfert 1 Galaxies Arakelian 564 and Ton S180", Edelson et al. (2002), equation 3, https://iopscience.iop.org/article/10.1086/323779 """ flux_mean = np.nanmean(flux, axis=axis) n_points = np.count_nonzero(~np.isnan(flux), axis=axis) flux = flux.swapaxes(0, axis).T s_square = np.nansum((flux[..., 1:] - flux[..., :-1]) ** 2, axis=-1) / ( n_points.T - 1 ) sig_square = np.nansum(flux_err**2, axis=axis) / n_points fpp = np.sqrt(np.abs(s_square.T - sig_square)) / flux_mean sigxserr_a = np.sqrt(2 / n_points) * sig_square / flux_mean**2 sigxserr_b = np.sqrt(sig_square / n_points) * (2 * fpp / flux_mean) sigxserr = np.sqrt(sigxserr_a**2 + sigxserr_b**2) fpp_err = sigxserr / (2 * fpp) return fpp, fpp_err
[docs] def compute_chisq(flux): r"""Calculate the chi-square test for `LightCurve`. Chisquare test is a variability estimator. It computes deviations from the expected value here mean value. Parameters ---------- flux : `~astropy.units.Quantity` The measured fluxes. Returns ------- ChiSq, P-value : tuple of float or `~numpy.ndarray` Tuple of Chi-square and P-value. """ yexp = np.mean(flux) yobs = flux.data chi2, pval = stats.chisquare(yobs, yexp) return chi2, pval
[docs] def compute_flux_doubling(flux, flux_err, coords, axis=0): r"""Compute the minimum characteristic flux doubling and halving over a certain coordinate axis for a series of measurements. Computing the flux doubling can give the doubling time in a lightcurve displaying significant temporal variability, e.g. an AGN flare. The variable is computed as: .. math:: doubling = min(\frac{t_(i+1)-t_i}{log_2{f_(i+1)/f_i}}) where f_i and f_(i+1) are the fluxes measured at subsequent coordinates t_i and t_(i+1). The error is obtained by propagating the relative errors on the flux measures. Parameters ---------- flux : `~astropy.units.Quantity` The measured fluxes. flux_err : `~astropy.units.Quantity` The error on measured fluxes. coords : `~astropy.units.Quantity` The coordinates at which the fluxes are measured. axis : int, optional Axis along which the value is computed. Returns ------- doubling_dict : dict Dictionary containing the characteristic flux doubling, halving and errors, with coordinates at which they were found. """ flux = np.atleast_2d(flux).swapaxes(0, axis).T flux_err = np.atleast_2d(flux_err).swapaxes(0, axis).T axes = np.diff(coords) / np.log2(flux[..., 1:] / flux[..., :-1]) axes_err_1 = ( np.diff(coords) * np.log(2) / flux[..., 1:] * np.log(flux[..., 1:] / flux[..., :-1]) ** 2 ) axes_err_2 = ( np.diff(coords) * np.log(2) / flux[..., :-1] * np.log(flux[..., 1:] / flux[..., :-1]) ** 2 ) axes_err = np.sqrt( (flux_err[..., 1:] * axes_err_1) ** 2 + (flux_err[..., :-1] * axes_err_2) ** 2 ) imin = np.expand_dims( np.argmin( np.where( np.logical_and(np.isfinite(axes), axes > 0), axes, np.inf * coords.unit ), axis=-1, ), axis=-1, ) imax = np.expand_dims( np.argmax( np.where( np.logical_and(np.isfinite(axes), axes < 0), axes, -np.inf * coords.unit ), axis=-1, ), axis=-1, ) index = np.concatenate([imin, imax], axis=-1) coord = np.take_along_axis(coords, index.flatten(), axis=0).reshape(index.shape) doubling = np.take_along_axis(axes, index, axis=-1) doubling_err = np.take_along_axis(axes_err, index, axis=-1) doubling_dict = { "doubling": doubling.T[0], "doubling_err": doubling_err.T[0], "doubling_coord": coord.T[0], "halving": np.abs(doubling.T[1]), "halving_err": doubling_err.T[1], "halving_coord": coord.T[1], } return doubling_dict
[docs] def structure_function(flux, flux_err, time, tdelta_precision=5): """Compute the discrete structure function for a variable source. Parameters ---------- flux : `~astropy.units.Quantity` The measured fluxes. flux_err : `~astropy.units.Quantity` The error on measured fluxes. time : `~astropy.units.Quantity` The time coordinates at which the fluxes are measured. tdelta_precision : int, optional The number of decimal places to check to separate the time deltas. Default is 5. Returns ------- sf, distances : `~numpy.ndarray`, `~astropy.units.Quantity` Discrete structure function and array of time distances. References ---------- .. [Emmanoulopoulos2010] "On the use of structure functions to study blazar variability: caveats and problems", Emmanoulopoulos et al. (2010) https://academic.oup.com/mnras/article/404/2/931/968488 """ dist_matrix = (time[np.newaxis, :] - time[:, np.newaxis]).round( decimals=tdelta_precision ) distances = np.unique(dist_matrix) distances = distances[distances > 0] shape = distances.shape + flux.shape[1:] factor = np.zeros(shape) norm = np.zeros(shape) for i, distance in enumerate(distances): indexes = np.array(np.where(dist_matrix == distance)) for index in indexes.T: f = (flux[index[1], ...] - flux[index[0], ...]) ** 2 w = (flux[index[1], ...] / flux_err[index[1], ...]) * ( flux[index[0], ...] / flux_err[index[0], ...] ) f = np.nan_to_num(f) w = np.nan_to_num(w) factor[i] = factor[i] + f * w norm[i] = norm[i] + w sf = factor / norm return sf, distances
[docs] def TimmerKonig_lightcurve_simulator( power_spectrum, npoints, spacing, random_state="random-seed", power_spectrum_params=None, ): """Implementation of the Timmer-Koenig algorithm to simulate a time series from a power spectrum. Parameters ---------- power_spectrum : function Power spectrum used to generate the time series. It is expected to be a function mapping the input frequencies to the periodogram. npoints : int Number of points in the output time series. spacing : `~astropy.units.Quantity` Sample spacing, inverse of the sampling rate. The units are inherited by the resulting time axis. random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`} Defines random number generator initialisation. Passed to `~gammapy.utils.random.get_random_state`. Default is "random-seed". power_spectrum_params : dict, optional Dictionary of parameters to be provided to the power spectrum function. Returns ------- time_series : `~numpy.ndarray` Simulated time series. time_axis : `~astropy.units.Quantity` Time axis of the series in the same units as 'spacing'. It will be defined with length 'npoints', from 0 to 'npoints'*'spacing'. Examples -------- To pass the function to be used in the simlation one can use either the 'lambda' keyword or an extended definition. Parameters of the function can be passed using the 'power_spectrum_params' keyword. For example, these are three ways to pass a power law (red noise) with index 2: >>> from gammapy.stats import TimmerKonig_lightcurve_simulator >>> import astropy.units as u >>> def powerlaw(x): ... return x**(-2) >>> def powerlaw_with_parameters(x, i): ... return x**(-i) >>> ts, ta = TimmerKonig_lightcurve_simulator(lambda x: x**(-2), 20, 1*u.h) >>> ts2, ta2 = TimmerKonig_lightcurve_simulator(powerlaw, 20, 1*u.h) >>> ts3, ta3 = TimmerKonig_lightcurve_simulator(powerlaw_with_parameters, ... 20, 1*u.h, power_spectrum_params={"i":2}) References ---------- .. [Timmer1995] "On generating power law noise", J. Timmer and M, Konig, section 3 https://ui.adsabs.harvard.edu/abs/1995A%26A...300..707T/abstract """ if not callable(power_spectrum): raise ValueError( "The power spectrum has to be provided as a callable function." ) random_state = get_random_state(random_state) frequencies = np.fft.fftfreq(npoints, spacing.value) # To obtain real data only the positive or negative part of the frequency is necessary. real_frequencies = np.sort(np.abs(frequencies[frequencies < 0])) if power_spectrum_params: periodogram = power_spectrum(real_frequencies, **power_spectrum_params) else: periodogram = power_spectrum(real_frequencies) real_part = random_state.normal(0, 1, len(periodogram) - 1) imaginary_part = random_state.normal(0, 1, len(periodogram) - 1) # Nyquist frequency component handling if npoints % 2 == 0: idx0 = -2 random_factor = random_state.normal(0, 1) else: idx0 = -1 random_factor = random_state.normal(0, 1) + 1j * random_state.normal(0, 1) fourier_coeffs = np.concatenate( [ np.sqrt(0.5 * periodogram[:-1]) * (real_part + 1j * imaginary_part), np.sqrt(0.5 * periodogram[-1:]) * random_factor, ] ) fourier_coeffs = np.concatenate( [fourier_coeffs, np.conjugate(fourier_coeffs[idx0::-1])] ) fourier_coeffs = np.insert(fourier_coeffs, 0, 0) time_series = np.fft.ifft(fourier_coeffs).real tmax = time_series.max() if tmax < np.abs(time_series.min()): time_series = -time_series time_series = 0.5 + 0.5 * time_series / tmax time_axis = np.linspace(0, npoints * spacing.value, npoints) * spacing.unit return time_series, time_axis