# Source code for gammapy.stats.variability

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
import scipy.stats as stats
import astropy.units as u
from gammapy.utils.random import get_random_state

# Public API of this module: variability estimators and light-curve tools.
__all__ = [
    "compute_fvar",
    "compute_fpp",
    "compute_chisq",
    "compute_flux_doubling",
    "structure_function",
    "TimmerKonig_lightcurve_simulator",
    "discrete_correlation",
]


def compute_fvar(flux, flux_err, axis=0):
    r"""Calculate the fractional excess variance.

    The fractional excess variance :math:`F_{var}`, an intrinsic
    variability estimator, is given by:

    .. math::
        F_{var} = \sqrt{\frac{S^{2} - \bar{\sigma^{2}}}{\bar{x}^{2}}}

    It is the excess variance after accounting for the measurement errors
    on the light curve :math:`\sigma`. :math:`S` is the variance.

    It is important to note that the errors on the flux must be Gaussian.

    Parameters
    ----------
    flux : `~astropy.units.Quantity`
        The measured fluxes.
    flux_err : `~astropy.units.Quantity`
        The error on measured fluxes.
    axis : int, optional
        Axis along which the excess variance is computed.
        Default is 0.

    Returns
    -------
    fvar, fvar_err : `~numpy.ndarray`
        Fractional excess variance.

    References
    ----------
    .. [Vaughan2003] "On characterizing the variability properties of X-ray light
       curves from active galaxies", Vaughan et al. (2003)
       https://ui.adsabs.harvard.edu/abs/2003MNRAS.345.1271V
    """
    flux_mean = np.nanmean(flux, axis=axis)
    # Number of finite (non-NaN) measurements entering each reduction.
    n_points = np.count_nonzero(~np.isnan(flux), axis=axis)

    # Re-insert the reduced axis before subtracting so the mean broadcasts
    # for any `axis`, not only axis=0 (plain `flux - flux_mean` fails for
    # e.g. a 2-D array with axis=1).  For axis=0 this is identical to the
    # previous behavior.
    s_square = np.nansum(
        (flux - np.expand_dims(flux_mean, axis)) ** 2, axis=axis
    ) / (n_points - 1)
    # Mean squared measurement error.
    sig_square = np.nansum(flux_err**2, axis=axis) / n_points
    fvar = np.sqrt(np.abs(s_square - sig_square)) / flux_mean

    # Error propagation following Vaughan et al. (2003).
    sigxserr_a = np.sqrt(2 / n_points) * sig_square / flux_mean**2
    sigxserr_b = np.sqrt(sig_square / n_points) * (2 * fvar / flux_mean)
    sigxserr = np.sqrt(sigxserr_a**2 + sigxserr_b**2)
    fvar_err = sigxserr / (2 * fvar)

    return fvar, fvar_err
def compute_fpp(flux, flux_err, axis=0):
    r"""Calculate the point-to-point excess variance.

    F_pp is closely related to the fractional excess variance F_var
    implemented in `~gammapy.stats.compute_fvar`, but probes variability
    on a shorter timescale. For white noise F_pp and F_var agree; for red
    noise F_var exceeds F_pp, since variations grow with timescale.

    It is important to note that the errors on the flux must be Gaussian.

    Parameters
    ----------
    flux : `~astropy.units.Quantity`
        The measured fluxes.
    flux_err : `~astropy.units.Quantity`
        The error on measured fluxes.
    axis : int, optional
        Axis along which the excess variance is computed.
        Default is 0.

    Returns
    -------
    fpp, fpp_err : `~numpy.ndarray`
        Point-to-point excess variance.

    References
    ----------
    .. [Edelson2002] "X-Ray Spectral Variability and Rapid Variability of the Soft
       X-Ray Spectrum Seyfert 1 Galaxies Arakelian 564 and Ton S180",
       Edelson et al. (2002), equation 3,
       https://iopscience.iop.org/article/10.1086/323779
    """
    mean_flux = np.nanmean(flux, axis=axis)
    # Number of finite (non-NaN) measurements per series.
    n_valid = np.count_nonzero(~np.isnan(flux), axis=axis)

    # Move the chosen axis so that successive measurements are adjacent
    # along the trailing dimension.
    ordered = flux.swapaxes(0, axis).T
    successive = ordered[..., 1:] - ordered[..., :-1]
    pair_var = np.nansum(successive**2, axis=-1) / (n_valid.T - 1)

    # Mean squared measurement error.
    noise_var = np.nansum(flux_err**2, axis=axis) / n_valid

    fpp = np.sqrt(np.abs(pair_var.T - noise_var)) / mean_flux

    # Error propagation, analogous to the F_var case.
    err_term_a = np.sqrt(2 / n_valid) * noise_var / mean_flux**2
    err_term_b = np.sqrt(noise_var / n_valid) * (2 * fpp / mean_flux)
    fpp_err = np.sqrt(err_term_a**2 + err_term_b**2) / (2 * fpp)

    return fpp, fpp_err
def compute_chisq(flux):
    r"""Calculate the chi-square test for `LightCurve`.

    The chi-square test is a variability estimator: it quantifies
    deviations of the measured fluxes from their expected value, here
    the mean flux.

    Parameters
    ----------
    flux : `~astropy.units.Quantity`
        The measured fluxes.

    Returns
    -------
    ChiSq, P-value : tuple of float or `~numpy.ndarray`
        Tuple of Chi-square and P-value.
    """
    expected = np.mean(flux)
    observed = flux.data
    # One-way chi-square test against the mean flux.
    chisq_result = stats.chisquare(observed, expected)
    return chisq_result[0], chisq_result[1]
def compute_flux_doubling(flux, flux_err, coords, axis=0):
    r"""Compute the minimum characteristic flux doubling and halving
    over a certain coordinate axis for a series of measurements.

    Computing the flux doubling can give the doubling time in a lightcurve
    displaying significant temporal variability, e.g. an AGN flare.

    The variable is computed as:

    .. math::
        doubling = min\left(\frac{t_{i+1} - t_i}{log_2(f_{i+1}/f_i)}\right)

    where :math:`f_i` and :math:`f_{i+1}` are the fluxes measured at subsequent
    coordinates :math:`t_i` and :math:`t_{i+1}`.
    The error is obtained by propagating the relative errors on the flux measures.

    Parameters
    ----------
    flux : `~astropy.units.Quantity`
        The measured fluxes.
    flux_err : `~astropy.units.Quantity`
        The error on measured fluxes.
    coords : `~astropy.units.Quantity`
        The coordinates at which the fluxes are measured. Must carry a unit
        (``coords.unit`` is used below).
    axis : int, optional
        Axis along which the value is computed. Default is 0.

    Returns
    -------
    doubling_dict : dict
        Dictionary containing the characteristic flux doubling, halving and
        errors, with coordinates at which they were found.
    """
    # Move the chosen axis so that subsequent measurements are adjacent
    # along the trailing dimension (input promoted to at least 2-D).
    flux = np.atleast_2d(flux).swapaxes(0, axis).T
    flux_err = np.atleast_2d(flux_err).swapaxes(0, axis).T

    # Characteristic time per pair: (t_{i+1}-t_i) / log2(f_{i+1}/f_i).
    # Positive entries are doubling times, negative ones halving times.
    axes = np.diff(coords) / np.log2(flux[..., 1:] / flux[..., :-1])

    # Error-propagation factors for the two fluxes of each pair,
    # with t = dt * ln(2) / ln(r), r = f_{i+1}/f_i.
    # NOTE(review): analytic propagation gives a factor 1/ln(r)^2 (i.e. an
    # exponent of -2), while here the ln(r)**2 term multiplies — confirm the
    # exponent sign against the reference implementation.
    axes_err_1 = (
        np.diff(coords)
        * np.log(2)
        / flux[..., 1:]
        * np.log(flux[..., 1:] / flux[..., :-1]) ** 2
    )
    axes_err_2 = (
        np.diff(coords)
        * np.log(2)
        / flux[..., :-1]
        * np.log(flux[..., 1:] / flux[..., :-1]) ** 2
    )

    # Quadrature sum of the two flux-error contributions.
    axes_err = np.sqrt(
        (flux_err[..., 1:] * axes_err_1) ** 2 + (flux_err[..., :-1] * axes_err_2) ** 2
    )

    # Index of the smallest positive finite characteristic time: doubling.
    # Non-finite / non-positive entries are masked with +inf before argmin.
    imin = np.expand_dims(
        np.argmin(
            np.where(
                np.logical_and(np.isfinite(axes), axes > 0), axes, np.inf * coords.unit
            ),
            axis=-1,
        ),
        axis=-1,
    )
    # Index of the largest (closest to zero) negative finite characteristic
    # time: halving. Other entries are masked with -inf before argmax.
    imax = np.expand_dims(
        np.argmax(
            np.where(
                np.logical_and(np.isfinite(axes), axes < 0), axes, -np.inf * coords.unit
            ),
            axis=-1,
        ),
        axis=-1,
    )

    # Stack the two indices so both values can be gathered in one pass.
    index = np.concatenate([imin, imax], axis=-1)
    # Coordinate at which each selected pair starts (t_i of the pair).
    coord = np.take_along_axis(coords, index.flatten(), axis=0).reshape(index.shape)

    doubling = np.take_along_axis(axes, index, axis=-1)
    doubling_err = np.take_along_axis(axes_err, index, axis=-1)

    doubling_dict = {
        "doubling": doubling.T[0],
        "doubling_err": doubling_err.T[0],
        "doubling_coord": coord.T[0],
        # Halving times are reported as positive values.
        "halving": np.abs(doubling.T[1]),
        "halving_err": doubling_err.T[1],
        "halving_coord": coord.T[1],
    }

    return doubling_dict
def structure_function(flux, flux_err, time, tdelta_precision=5):
    """Compute the discrete structure function for a variable source.

    Parameters
    ----------
    flux : `~astropy.units.Quantity`
        The measured fluxes.
    flux_err : `~astropy.units.Quantity`
        The error on measured fluxes.
    time : `~astropy.units.Quantity`
        The time coordinates at which the fluxes are measured.
    tdelta_precision : int, optional
        The number of decimal places to check to separate the time deltas.
        Default is 5.

    Returns
    -------
    sf, distances : `~numpy.ndarray`, `~astropy.units.Quantity`
        Discrete structure function and array of time distances.

    References
    ----------
    .. [Emmanoulopoulos2010] "On the use of structure functions to study blazar
       variability: caveats and problems", Emmanoulopoulos et al. (2010)
       https://academic.oup.com/mnras/article/404/2/931/968488
    """
    # Pairwise time separations, rounded so nearly-equal lags collapse
    # into one bin.
    lag_matrix = (time[np.newaxis, :] - time[:, np.newaxis]).round(
        decimals=tdelta_precision
    )
    distances = np.unique(lag_matrix)
    # Keep strictly positive lags only.
    distances = distances[distances > 0]

    out_shape = distances.shape + flux.shape[1:]
    weighted_sum = np.zeros(out_shape)
    weight_sum = np.zeros(out_shape)

    for k, lag in enumerate(distances):
        pair_idx = np.array(np.where(lag_matrix == lag))
        for first, second in pair_idx.T:
            # Squared flux difference of the pair, weighted by the product
            # of the two signal-to-noise ratios.
            diff_sq = (flux[second, ...] - flux[first, ...]) ** 2
            weight = (flux[second, ...] / flux_err[second, ...]) * (
                flux[first, ...] / flux_err[first, ...]
            )
            diff_sq = np.nan_to_num(diff_sq)
            weight = np.nan_to_num(weight)
            weighted_sum[k] = weighted_sum[k] + diff_sq * weight
            weight_sum[k] = weight_sum[k] + weight

    sf = weighted_sum / weight_sum
    return sf, distances
def discrete_correlation(flux1, flux_err1, flux2, flux_err2, time1, time2, tau, axis=0):
    """Compute the discrete correlation function for a variable source.

    Parameters
    ----------
    flux1, flux_err1 : `~astropy.units.Quantity`
        The first set of measured fluxes and associated error.
    flux2, flux_err2 : `~astropy.units.Quantity`
        The second set of measured fluxes and associated error.
    time1, time2 : `~astropy.units.Quantity`
        The time coordinates at which the fluxes are measured.
    tau : `~astropy.units.Quantity`
        Size of the bins to compute the discrete correlation.
    axis : int, optional
        Axis along which the correlation is computed. Default is 0.

    Returns
    -------
    bincenters : `~astropy.units.Quantity`
        Array of discrete time bins.
    discrete_correlation : `~numpy.ndarray`
        Array of discrete correlation function values for each bin.
    discrete_correlation_err : `~numpy.ndarray`
        Error associated to the discrete correlation values.

    References
    ----------
    .. [Edelson1988] "THE DISCRETE CORRELATION FUNCTION: A NEW METHOD FOR ANALYZING
       UNEVENLY SAMPLED VARIABILITY DATA", Edelson et al. (1988)
       https://ui.adsabs.harvard.edu/abs/1988ApJ...333..646E/abstract
    """
    # Put the time axis first for both series.
    flux1 = np.rollaxis(flux1, axis, 0)
    flux2 = np.rollaxis(flux2, axis, 0)

    # The non-time dimensions must agree for the pairwise products below.
    if np.squeeze(flux1).shape[1:] != np.squeeze(flux2).shape[1:]:
        raise ValueError(
            "flux1 and flux2 must have the same squeezed shape, apart from the chosen axis."
        )

    # Work in a single time unit throughout.
    tau = tau.to(time1.unit)
    time2 = time2.to(time1.unit)

    mean1, mean2 = np.nanmean(flux1, axis=0), np.nanmean(flux2, axis=0)
    sigma1, sigma2 = np.nanstd(flux1, axis=0), np.nanstd(flux2, axis=0)

    # Normalise each series following Edelson et al. (1988): subtract the
    # mean and divide by the noise-corrected standard deviation
    # sqrt(sigma^2 - <err>^2).
    udcf1 = (flux1 - mean1) / np.sqrt((sigma1**2 - np.nanmean(flux_err1, axis=0) ** 2))
    udcf2 = (flux2 - mean2) / np.sqrt((sigma2**2 - np.nanmean(flux_err2, axis=0) ** 2))

    # Unbinned discrete correlation for every pair (i, j), together with the
    # pair's time separation time1[i] - time2[j].
    udcf = np.empty(((flux1.shape[0],) + flux2.shape))
    dist = u.Quantity(np.empty(((flux1.shape[0], flux2.shape[0]))), unit=time1.unit)

    for i, x1 in enumerate(udcf1):
        for j, x2 in enumerate(udcf2):
            udcf[i, j, ...] = x1 * x2
            dist[i, j] = time1[i] - time2[j]

    # Build time-lag bins of width tau covering the full range of pair
    # separations.
    maxfactor = np.floor(np.amax(dist) / tau).value + 1
    minfactor = np.floor(np.amin(dist) / tau).value

    bins = (
        np.linspace(
            minfactor, maxfactor, int(np.abs(maxfactor) + np.abs(minfactor) + 1)
        )
        * tau
    )

    bin_indices = np.digitize(dist, bins).flatten()
    # Flatten the two pair axes so pairs can be selected per lag bin.
    udcf = np.reshape(udcf, (udcf.shape[0] * udcf.shape[1], -1))

    # Binned DCF: mean of the unbinned values falling in each lag bin.
    discrete_correlation = np.array(
        [np.nanmean(udcf[bin_indices == i], axis=0) for i in range(1, len(bins))]
    )

    # Standard error of the DCF in each bin (Edelson et al. 1988).
    discrete_correlation_err = []
    for i in range(1, len(bins)):
        terms = (discrete_correlation[i - 1] - udcf[bin_indices == i]) ** 2
        num = np.sqrt(np.nansum(terms, axis=0))
        den = len(udcf[bin_indices == i]) - 1
        discrete_correlation_err.append(num / den)

    bincenters = (bins[1:] + bins[:-1]) / 2

    return bincenters, discrete_correlation, np.array(discrete_correlation_err)
def TimmerKonig_lightcurve_simulator(
    power_spectrum,
    npoints,
    spacing,
    nchunks=10,
    random_state="random-seed",
    power_spectrum_params=None,
    mean=0.0,
    std=1.0,
    poisson=False,
):
    """Implementation of the Timmer-Koenig algorithm to simulate a time series from a power spectrum.

    Parameters
    ----------
    power_spectrum : function
        Power spectrum used to generate the time series. It is expected to be
        a function mapping the input frequencies to the periodogram.
    npoints : int
        Number of points in the output time series.
    spacing : `~astropy.units.Quantity`
        Sample spacing, inverse of the sampling rate. The units are inherited
        by the resulting time axis.
    nchunks : int, optional
        Factor by which to multiply the length of the time series to avoid
        red noise leakage. Default is 10.
    random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`}, optional
        Defines random number generator initialisation.
        Passed to `~gammapy.utils.random.get_random_state`.
        Default is "random-seed".
    power_spectrum_params : dict, optional
        Dictionary of parameters to be provided to the power spectrum function.
    mean : float, `~astropy.units.Quantity`, optional
        Desired mean of the final series. Default is 0.
    std : float, `~astropy.units.Quantity`, optional
        Desired standard deviation of the final series. Default is 1.
    poisson : bool, optional
        Whether to apply poissonian noise to the final time series.
        Default is False.

    Returns
    -------
    time_series : `~numpy.ndarray`
        Simulated time series.
    time_axis : `~astropy.units.Quantity`
        Time axis of the series in the same units as 'spacing'. It will be
        defined with length 'npoints', from 0 to 'npoints'*'spacing'.

    Examples
    --------
    To pass the function to be used in the simulation one can use either the
    'lambda' keyword or an extended definition. Parameters of the function can
    be passed using the 'power_spectrum_params' keyword. For example, these
    are three ways to pass a power law (red noise) with index 2:

    >>> from gammapy.stats import TimmerKonig_lightcurve_simulator
    >>> import astropy.units as u
    >>> def powerlaw(x):
    ...     return x**(-2)
    >>> def powerlaw_with_parameters(x, i):
    ...     return x**(-i)
    >>> ts, ta = TimmerKonig_lightcurve_simulator(lambda x: x**(-2), 20, 1*u.h)
    >>> ts2, ta2 = TimmerKonig_lightcurve_simulator(powerlaw, 20, 1*u.h)
    >>> ts3, ta3 = TimmerKonig_lightcurve_simulator(powerlaw_with_parameters,
    ... 20, 1*u.h, power_spectrum_params={"i":2})

    References
    ----------
    .. [Timmer1995] "On generating power law noise", J. Timmer and M, Konig, section 3
       https://ui.adsabs.harvard.edu/abs/1995A%26A...300..707T/abstract
    """
    if not callable(power_spectrum):
        raise ValueError(
            "The power spectrum has to be provided as a callable function."
        )

    # int * int is int; a float in either operand makes the product a float.
    if not isinstance(npoints * nchunks, int):
        raise TypeError("npoints and nchunks must be integers")

    if poisson:
        # Expected counts per bin = rate * bin width.
        if isinstance(mean, u.Quantity):
            wmean = mean.value * spacing.value
        else:
            wmean = mean * spacing.value
        if wmean < 1.0:
            # NOTE(review): raising a Warning instance aborts the call;
            # warnings.warn would emit a non-fatal message — confirm intent.
            raise Warning(
                "Poisson noise was requested but the target mean is too low - resulting counts will likely be 0."
            )

    random_state = get_random_state(random_state)

    # Simulate a series nchunks times longer than requested to mitigate
    # red-noise leakage; a central slice is extracted at the end.
    npoints_ext = npoints * nchunks

    frequencies = np.fft.fftfreq(npoints_ext, spacing.value)

    # To obtain real data only the positive or negative part of the frequency is necessary.
    # The negative half is used: its absolute values include the Nyquist
    # frequency when npoints_ext is even.
    real_frequencies = np.sort(np.abs(frequencies[frequencies < 0]))

    if power_spectrum_params:
        periodogram = power_spectrum(real_frequencies, **power_spectrum_params)
    else:
        periodogram = power_spectrum(real_frequencies)

    # Gaussian amplitudes for all but the last (highest) frequency component.
    real_part = random_state.normal(0, 1, len(periodogram) - 1)
    imaginary_part = random_state.normal(0, 1, len(periodogram) - 1)

    # Nyquist frequency component handling: real-valued draw for even
    # length, complex draw for odd length.
    if npoints_ext % 2 == 0:
        idx0 = -2
        random_factor = random_state.normal(0, 1)
    else:
        idx0 = -1
        random_factor = random_state.normal(0, 1) + 1j * random_state.normal(0, 1)

    # Fourier coefficients sqrt(P/2) * (N(0,1) + i*N(0,1)), per Timmer & Konig.
    fourier_coeffs = np.concatenate(
        [
            np.sqrt(0.5 * periodogram[:-1]) * (real_part + 1j * imaginary_part),
            np.sqrt(0.5 * periodogram[-1:]) * random_factor,
        ]
    )
    # Mirror with complex conjugates so the inverse FFT is real.
    fourier_coeffs = np.concatenate(
        [fourier_coeffs, np.conjugate(fourier_coeffs[idx0::-1])]
    )

    # Zero-frequency (mean) component set to 0; the mean is applied below.
    fourier_coeffs = np.insert(fourier_coeffs, 0, 0)
    time_series = np.fft.ifft(fourier_coeffs).real

    # Extract npoints samples from the middle of the extended series.
    ndiv = npoints_ext // (2 * nchunks)
    setstart = npoints_ext // 2 - ndiv
    setend = npoints_ext // 2 + ndiv
    if npoints % 2 != 0:
        setend = setend + 1
    time_series = time_series[setstart:setend]

    # Rescale to the requested mean and standard deviation.
    time_series = (time_series - time_series.mean()) / time_series.std()
    time_series = time_series * std + mean

    if poisson:
        # Draw counts from rate*spacing (negative rates clipped to 0),
        # then convert back to a rate.
        time_series = (
            random_state.poisson(
                np.where(time_series >= 0, time_series, 0) * spacing.value
            )
            / spacing.value
        )

    time_axis = np.linspace(0, npoints * spacing.value, npoints) * spacing.unit

    return time_series, time_axis