Source code for gammapy.time.lightcurve

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
import astropy.units as u
from astropy.table import Table
from astropy.time import Time
from gammapy.utils.scripts import make_path

__all__ = ["LightCurve"]


[docs]class LightCurve: """Lightcurve container. The lightcurve data is stored in ``table``. For now we only support times stored in MJD format! TODO: specification of format is work in progress See https://github.com/open-gamma-ray-astro/gamma-astro-data-formats/pull/61 Usage: :ref:`time-lc` Parameters ---------- table : `~astropy.table.Table` Table with lightcurve data """ def __init__(self, table): self.table = table def __repr__(self): return f"{self.__class__.__name__}(len={len(self.table)})" @property def time_scale(self): """Time scale (str). Taken from table "TIMESYS" header. Common values: "TT" or "UTC". Assumed default is "UTC". """ return self.table.meta.get("TIMESYS", "utc") @property def time_format(self): """Time format (str).""" return "mjd" # @property # def time_ref(self): # """Time reference (`~astropy.time.Time`).""" # return time_ref_from_dict(self.table.meta) def _make_time(self, colname): val = self.table[colname].data scale = self.time_scale format = self.time_format return Time(val, scale=scale, format=format) @property def time(self): """Time (`~astropy.time.Time`).""" return self.time_mid @property def time_min(self): """Time bin start (`~astropy.time.Time`).""" return self._make_time("time_min") @property def time_max(self): """Time bin end (`~astropy.time.Time`).""" return self._make_time("time_max") @property def time_mid(self): """Time bin center (`~astropy.time.Time`).""" return self.time_min + 0.5 * self.time_delta @property def time_delta(self): """Time bin width (`~astropy.time.TimeDelta`).""" return self.time_max - self.time_min
[docs] @classmethod def read(cls, filename, **kwargs): """Read from file. Parameters ---------- filename : str Filename kwargs : dict Keyword arguments passed to `astropy.table.Table.read`. """ filename = make_path(filename) table = Table.read(str(filename), **kwargs) return cls(table=table)
[docs] def write(self, filename, **kwargs): """Write to file. Parameters ---------- filename : str Filename kwargs : dict Keyword arguments passed to `astropy.table.Table.write`. """ filename = make_path(filename) self.table.write(str(filename), **kwargs)
[docs] def compute_fvar(self): r"""Calculate the fractional excess variance. This method accesses the the ``FLUX`` and ``FLUX_ERR`` columns from the lightcurve data. The fractional excess variance :math:`F_{var}`, an intrinsic variability estimator, is given by .. math:: F_{var} = \sqrt{\frac{S^{2} - \bar{\sigma^{2}}}{\bar{x}^{2}}}. It is the excess variance after accounting for the measurement errors on the light curve :math:`\sigma`. :math:`S` is the variance. Returns ------- fvar, fvar_err : `~numpy.ndarray` Fractional excess variance. References ---------- .. [Vaughan2003] "On characterizing the variability properties of X-ray light curves from active galaxies", Vaughan et al. (2003) https://ui.adsabs.harvard.edu/abs/2003MNRAS.345.1271V """ flux = self.table["flux"].data.astype("float64") flux_err = self.table["flux_err"].data.astype("float64") flux_mean = np.mean(flux) n_points = len(flux) s_square = np.sum((flux - flux_mean) ** 2) / (n_points - 1) sig_square = np.nansum(flux_err ** 2) / n_points fvar = np.sqrt(np.abs(s_square - sig_square)) / flux_mean sigxserr_a = np.sqrt(2 / n_points) * (sig_square / flux_mean) ** 2 sigxserr_b = np.sqrt(sig_square / n_points) * (2 * fvar / flux_mean) sigxserr = np.sqrt(sigxserr_a ** 2 + sigxserr_b ** 2) fvar_err = sigxserr / (2 * fvar) return fvar, fvar_err
[docs] def compute_chisq(self): """Calculate the chi-square test for `LightCurve`. Chisquare test is a variability estimator. It computes deviations from the expected value here mean value Returns ------- ChiSq, P-value : tuple of float or `~numpy.ndarray` Tuple of Chi-square and P-value """ import scipy.stats as stats flux = self.table["flux"] yexp = np.mean(flux) yobs = flux.data chi2, pval = stats.chisquare(yobs, yexp) return chi2, pval
[docs] def plot(self, ax=None, time_format="mjd", flux_unit="cm-2 s-1", **kwargs): """Plot flux points. Parameters ---------- ax : `~matplotlib.axes.Axes`, optional. The `~matplotlib.axes.Axes` object to be drawn on. If None, uses the current `~matplotlib.axes.Axes`. time_format : {'mjd', 'iso'}, optional If 'iso', the x axis will contain Matplotlib dates. For formatting these dates see: https://matplotlib.org/gallery/ticks_and_spines/date_demo_rrule.html flux_unit : str, `~astropy.units.Unit`, optional Unit of the flux axis kwargs : dict Keyword arguments passed to :func:`matplotlib.pyplot.errorbar` Returns ------- ax : `~matplotlib.axes.Axes` Axis object """ import matplotlib.pyplot as plt from matplotlib.dates import DateFormatter if ax is None: ax = plt.gca() y, yerr = self._get_fluxes_and_errors(flux_unit) is_ul, yul = self._get_flux_uls(flux_unit) x, xerr = self._get_times_and_errors(time_format) # length of the ul arrow ul_arr = ( np.nanmax(np.concatenate((y[~is_ul], yul[is_ul]))) - np.nanmin(np.concatenate((y[~is_ul], yul[is_ul]))) ) * 0.1 # join fluxes and upper limits for the plot y[is_ul] = yul[is_ul] yerr[0][is_ul] = ul_arr # set plotting defaults and plot kwargs.setdefault("marker", "+") kwargs.setdefault("ls", "None") ax.errorbar(x=x, y=y, xerr=xerr, yerr=yerr, uplims=is_ul, **kwargs) ax.set_xlabel("Time ({})".format(time_format.upper())) ax.set_ylabel("Flux ({:FITS})".format(u.Unit(flux_unit))) if time_format == "iso": ax.xaxis.set_major_formatter(DateFormatter("%Y-%m-%d %H:%M:%S")) plt.setp( ax.xaxis.get_majorticklabels(), rotation=30, ha="right", rotation_mode="anchor", ) return ax
def _get_fluxes_and_errors(self, unit="cm-2 s-1"): """Extract fluxes and corresponding errors Helper function for the plot method. Parameters ---------- unit : str, `~astropy.units.Unit`, optional Unit of the returned flux and errors values Returns ------- y : `numpy.ndarray` Flux values (yn, yp) : tuple of `numpy.ndarray` Flux error values """ y = self.table["flux"].quantity.to(unit) if all(k in self.table.colnames for k in ["flux_errp", "flux_errn"]): yp = self.table["flux_errp"].quantity.to(unit) yn = self.table["flux_errn"].quantity.to(unit) elif "flux_err" in self.table.colnames: yp = self.table["flux_err"].quantity.to(unit) yn = self.table["flux_err"].quantity.to(unit) else: yp, yn = np.zeros_like(y), np.zeros_like(y) return y.value, (yn.value, yp.value) def _get_flux_uls(self, unit="cm-2 s-1"): """Extract flux upper limits Helper function for the plot method. Parameters ---------- unit : str, `~astropy.units.Unit`, optional Unit of the returned flux upper limit values Returns ------- is_ul : `numpy.ndarray` Is flux point is an upper limit? (boolean array) yul : `numpy.ndarray` Flux upper limit values """ try: is_ul = self.table["is_ul"].data.astype("bool") except KeyError: is_ul = np.zeros_like(self.table["flux"]).data.astype("bool") if is_ul.any(): yul = self.table["flux_ul"].quantity.to(unit) else: yul = np.zeros_like(self.table["flux"]).quantity yul[:] = np.nan return is_ul, yul.value def _get_times_and_errors(self, time_format="mjd"): """Extract times and corresponding errors Helper function for the plot method. Parameters ---------- time_format : {'mjd', 'iso'}, optional Time format of the times. If 'iso', times and errors will be returned as `~datetime.datetime` and `~datetime.timedelta` objects Returns ------- x : `~numpy.ndarray` or of `~datetime.datetime` Time values or `~datetime.datetime` instances if 'iso' is chosen as time format (xn, xp) : tuple of `numpy.ndarray` of `~datetime.timedelta` Tuple of time error values or `~datetime.timedelta` instances if 'iso' is chosen as time format """ from matplotlib.dates import num2timedelta x = self.time try: xn, xp = x - self.time_min, self.time_max - x except KeyError: xn, xp = x - x, x - x # convert to given time format if time_format == "iso": x = x.datetime # TODO: In astropy version >3.1 the TimeDelta.to_datetime() method # should start working, for now i will use num2timedelta. xn = np.asarray(num2timedelta(xn.to("d").value)) xp = np.asarray(num2timedelta(xp.to("d").value)) elif time_format == "mjd": x = x.mjd xn, xp = xn.to("d").value, xp.to("d").value else: raise ValueError(f"Invalid time_format: {time_format}") return x, (xn, xp)