# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
import astropy.units as u
from astropy.table import Table
from astropy.time import Time
from gammapy.utils.scripts import make_path
__all__ = ["LightCurve"]
[docs]class LightCurve:
"""Lightcurve container.
The lightcurve data is stored in ``table``.
For now we only support times stored in MJD format!
TODO: specification of format is work in progress
See https://github.com/open-gamma-ray-astro/gamma-astro-data-formats/pull/61
Usage: :ref:`time`
Parameters
----------
table : `~astropy.table.Table`
Table with lightcurve data
"""
def __init__(self, table):
self.table = table
def __repr__(self):
return f"{self.__class__.__name__}(len={len(self.table)})"
@property
def time_scale(self):
"""Time scale (str).
Taken from table "TIMESYS" header.
Common values: "TT" or "UTC".
Assumed default is "UTC".
"""
return self.table.meta.get("TIMESYS", "utc")
@property
def time_format(self):
"""Time format (str)."""
return "mjd"
# @property
# def time_ref(self):
# """Time reference (`~astropy.time.Time`)."""
# return time_ref_from_dict(self.table.meta)
def _make_time(self, colname):
val = self.table[colname].data
scale = self.time_scale
format = self.time_format
return Time(val, scale=scale, format=format)
@property
def time(self):
"""Time (`~astropy.time.Time`)."""
return self.time_mid
@property
def time_min(self):
"""Time bin start (`~astropy.time.Time`)."""
return self._make_time("time_min")
@property
def time_max(self):
"""Time bin end (`~astropy.time.Time`)."""
return self._make_time("time_max")
@property
def time_mid(self):
"""Time bin center (`~astropy.time.Time`)."""
return self.time_min + 0.5 * self.time_delta
@property
def time_delta(self):
"""Time bin width (`~astropy.time.TimeDelta`)."""
return self.time_max - self.time_min
[docs] @classmethod
def read(cls, filename, **kwargs):
"""Read from file.
Parameters
----------
filename : str
Filename
kwargs : dict
Keyword arguments passed to `astropy.table.Table.read`.
"""
table = Table.read(make_path(filename), **kwargs)
return cls(table=table)
[docs] def write(self, filename, **kwargs):
"""Write to file.
Parameters
----------
filename : str
Filename
kwargs : dict
Keyword arguments passed to `astropy.table.Table.write`.
"""
self.table.write(make_path(filename), **kwargs)
[docs] def plot(self, ax=None, time_format="mjd", flux_unit="cm-2 s-1", **kwargs):
"""Plot flux points.
Parameters
----------
ax : `~matplotlib.axes.Axes`, optional.
The `~matplotlib.axes.Axes` object to be drawn on.
If None, uses the current `~matplotlib.axes.Axes`.
time_format : {'mjd', 'iso'}, optional
If 'iso', the x axis will contain Matplotlib dates.
For formatting these dates see: https://matplotlib.org/gallery/ticks_and_spines/date_demo_rrule.html
flux_unit : str, `~astropy.units.Unit`, optional
Unit of the flux axis
kwargs : dict
Keyword arguments passed to :func:`matplotlib.pyplot.errorbar`
Returns
-------
ax : `~matplotlib.axes.Axes`
Axis object
"""
import matplotlib.pyplot as plt
from matplotlib.dates import DateFormatter
if ax is None:
ax = plt.gca()
x, xerr = self._get_times_and_errors(time_format)
y, yerr = self._get_fluxes_and_errors(flux_unit)
is_ul, yul = self._get_flux_uls(flux_unit)
# length of the ul arrow
ul_arr = (
np.nanmax(np.concatenate((y[~is_ul], yul[is_ul])))
- np.nanmin(np.concatenate((y[~is_ul], yul[is_ul])))
) * 0.1
# join fluxes and upper limits for the plot
y[is_ul] = yul[is_ul]
yerr[0][is_ul] = ul_arr
# set plotting defaults and plot
kwargs.setdefault("marker", "+")
kwargs.setdefault("ls", "None")
ax.errorbar(x=x, y=y, xerr=xerr, yerr=yerr, uplims=is_ul, **kwargs)
ax.set_xlabel("Time ({})".format(time_format.upper()))
ax.set_ylabel("Flux ({:FITS})".format(u.Unit(flux_unit)))
if time_format == "iso":
ax.xaxis.set_major_formatter(DateFormatter("%Y-%m-%d %H:%M:%S"))
plt.setp(
ax.xaxis.get_majorticklabels(),
rotation=30,
ha="right",
rotation_mode="anchor",
)
return ax
def _get_fluxes_and_errors(self, unit="cm-2 s-1"):
"""Extract fluxes and corresponding errors
Helper function for the plot method.
Parameters
----------
unit : str, `~astropy.units.Unit`, optional
Unit of the returned flux and errors values
Returns
-------
y : `numpy.ndarray`
Flux values
(yn, yp) : tuple of `numpy.ndarray`
Flux error values
"""
y = self.table["flux"].quantity.to(unit)
if all(k in self.table.colnames for k in ["flux_errp", "flux_errn"]):
yp = self.table["flux_errp"].quantity.to(unit)
yn = self.table["flux_errn"].quantity.to(unit)
elif "flux_err" in self.table.colnames:
yp = self.table["flux_err"].quantity.to(unit)
yn = self.table["flux_err"].quantity.to(unit)
else:
yp, yn = np.zeros_like(y), np.zeros_like(y)
return y.value, (yn.value, yp.value)
def _get_flux_uls(self, unit="cm-2 s-1"):
"""Extract flux upper limits
Helper function for the plot method.
Parameters
----------
unit : str, `~astropy.units.Unit`, optional
Unit of the returned flux upper limit values
Returns
-------
is_ul : `numpy.ndarray`
Is flux point is an upper limit? (boolean array)
yul : `numpy.ndarray`
Flux upper limit values
"""
try:
is_ul = self.table["is_ul"].data.astype("bool")
except KeyError:
is_ul = np.zeros_like(self.table["flux"]).data.astype("bool")
if is_ul.any():
yul = self.table["flux_ul"].quantity.to(unit)
else:
yul = np.zeros_like(self.table["flux"]).quantity
yul[:] = np.nan
return is_ul, yul.value
def _get_times_and_errors(self, time_format="mjd"):
"""Extract times and corresponding errors
Helper function for the plot method.
Parameters
----------
time_format : {'mjd', 'iso'}, optional
Time format of the times. If 'iso', times and errors will be
returned as `~datetime.datetime` and `~datetime.timedelta` objects
Returns
-------
x : `~numpy.ndarray` or of `~datetime.datetime`
Time values or `~datetime.datetime` instances if 'iso' is chosen
as time format
(xn, xp) : tuple of `numpy.ndarray` of `~datetime.timedelta`
Tuple of time error values or `~datetime.timedelta` instances if
'iso' is chosen as time format
"""
x = self.time
try:
xn, xp = x - self.time_min, self.time_max - x
except KeyError:
xn, xp = x - x, x - x
if time_format == "iso":
x = x.datetime
xn = xn.to_datetime()
xp = xp.to_datetime()
elif time_format == "mjd":
x = x.mjd
xn = xn.to("day").value
xp = xp.to("day").value
else:
raise ValueError(f"Invalid time_format: {time_format}")
return x, (xn, xp)