# Licensed under a 3-clause BSD style license - see LICENSE.rst
import copy
import html
import warnings
from operator import le, lt
import numpy as np
import astropy.units as u
from astropy.io import fits
from astropy.table import Table, vstack
from astropy.time import Time
from gammapy.utils.scripts import make_path
from gammapy.utils.time import TIME_REF_DEFAULT, time_ref_from_dict, time_ref_to_dict
from .metadata import GTIMetaData
__all__ = ["GTI"]
[docs]class GTI:
"""Good time intervals (GTI) `~astropy.table.Table`.
Data format specification: :ref:`gadf:iact-gti`.
Note: at the moment dead-time and live-time is in the
EVENTS header ... the GTI header just deals with
observation times.
Parameters
----------
table : `~astropy.table.Table`
GTI table.
reference_time : `~astropy.time.Time`, optional
The reference time. Default is None.
If None, use TIME_REF_DEFAULT.
Examples
--------
Load GTIs for a H.E.S.S. event list:
>>> from gammapy.data import GTI
>>> gti = GTI.read('$GAMMAPY_DATA/hess-dl3-dr1//data/hess_dl3_dr1_obs_id_023523.fits.gz')
>>> print(gti)
GTI info:
- Number of GTIs: 1
- Duration: 1687.0000000000016 s
- Start: 123890826.00000001 s MET
- Start: 2004-12-04T22:08:10.184 (time standard: TT)
- Stop: 123892513.00000001 s MET
- Stop: 2004-12-04T22:36:17.184 (time standard: TT)
<BLANKLINE>
Load GTIs for a Fermi-LAT event list:
>>> gti = GTI.read("$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc-events.fits.gz")
>>> print(gti)
GTI info:
- Number of GTIs: 39042
- Duration: 183139597.9032163 s
- Start: 239557417.49417615 s MET
- Start: 2008-08-04T15:44:41.678 (time standard: TT)
- Stop: 460249999.99999994 s MET
- Stop: 2015-08-02T23:14:24.184 (time standard: TT)
<BLANKLINE>
"""
def __init__(self, table, reference_time=None):
self.table = self._validate_table(table)
if reference_time is None:
reference_time = TIME_REF_DEFAULT
meta = GTIMetaData()
meta.reference_time = reference_time
self._meta = meta
def _repr_html_(self):
try:
return self.to_html()
except AttributeError:
return f"<pre>{html.escape(str(self))}</pre>"
@staticmethod
def _validate_table(table):
"""Check that the input GTI fits the gammapy internal model."""
if not isinstance(table, Table):
raise TypeError("GTI table is not an astropy Table.")
colnames = ["START", "STOP"]
if not set(colnames).issubset(table.colnames):
raise ValueError("GTI table not correctly defined.")
if len(table) == 0:
return table
for name in colnames:
if not isinstance(table[name], Time):
raise TypeError(f"Column {name} is not a Time object.")
return table
[docs] def copy(self):
"""Deep copy of the `~gammapy.data.GIT` object."""
return copy.deepcopy(self)
[docs] @classmethod
def create(cls, start, stop, reference_time=None):
"""Create a GTI table from start and stop times.
Parameters
----------
start : `~astropy.time.Time` or `~astropy.units.Quantity`
Start times, if a quantity then w.r.t. reference time.
stop : `~astropy.time.Time` or `~astropy.units.Quantity`
Stop times, if a quantity then w.r.t. reference time.
reference_time : `~astropy.time.Time`, optional
The reference time to use in GTI definition. Default is None.
If None, use TIME_REF_DEFAULT.
"""
if reference_time is None:
reference_time = TIME_REF_DEFAULT
reference_time = Time(reference_time)
reference_time.format = "mjd"
if not isinstance(start, Time):
start = reference_time + u.Quantity(start)
if not isinstance(stop, Time):
stop = reference_time + u.Quantity(stop)
table = Table({"START": np.atleast_1d(start), "STOP": np.atleast_1d(stop)})
return cls(table, reference_time=reference_time)
[docs] @classmethod
def read(cls, filename, hdu="GTI", format="gadf", checksum=False):
"""Read from FITS file.
Parameters
----------
filename : `pathlib.Path` or str
Filename
hdu : str
hdu name. Default is "GTI".
format: str
Input format, currently only "gadf" is supported. Default is "gadf".
checksum : bool
If True checks both DATASUM and CHECKSUM cards in the file headers. Default is False.
"""
filename = make_path(filename)
with fits.open(str(make_path(filename)), memmap=False) as hdulist:
gti_hdu = hdulist[hdu]
if checksum:
if gti_hdu.verify_checksum() != 1:
warnings.warn(
f"Checksum verification failed for HDU {hdu} of {filename}.",
UserWarning,
)
return cls.from_table_hdu(gti_hdu, format=format)
[docs] @classmethod
def from_table_hdu(cls, table_hdu, format="gadf"):
"""Read from table HDU.
Parameters
----------
table_hdu : `~astropy.io.fits.BinTableHDU`
table hdu.
format : {"gadf"}
Input format, currently only "gadf" is supported. Default is "gadf".
"""
if format != "gadf":
raise ValueError(f'Only the "gadf" format supported, got {format}')
table = Table.read(table_hdu)
time_ref = time_ref_from_dict(table.meta, format="mjd", scale="tt")
# Check if TIMEUNIT keyword is present, otherwise assume seconds
unit = table.meta.pop("TIMEUNIT", "s")
start = u.Quantity(table["START"], unit)
stop = u.Quantity(table["STOP"], unit)
return cls.create(start, stop, time_ref)
[docs] def to_table_hdu(self, format="gadf"):
"""
Convert this GTI instance to a `astropy.io.fits.BinTableHDU`.
Parameters
----------
format : str, optional
Output format, currently only "gadf" is supported. Default is "gadf".
Returns
-------
hdu : `astropy.io.fits.BinTableHDU`
GTI table converted to FITS representation.
"""
if format != "gadf":
raise ValueError(f'Only the "gadf" format supported, got {format}')
# Don't impose the scale. GADF does not require it to be TT
meta = time_ref_to_dict(self.time_ref, scale=self.time_ref.scale)
start = self.time_start - self.time_ref
stop = self.time_stop - self.time_ref
table = Table({"START": start.to("s"), "STOP": stop.to("s")}, meta=meta)
return fits.BinTableHDU(table, name="GTI")
[docs] def write(self, filename, **kwargs):
"""Write to file.
Parameters
----------
filename : str or `~pathlib.Path`
Filename.
**kwargs : dict, optional
Keyword arguments passed to `~astropy.io.fits.HDUList.writeto`.
"""
hdu = self.to_table_hdu()
hdulist = fits.HDUList([fits.PrimaryHDU(), hdu])
hdulist.writeto(make_path(filename), **kwargs)
def __str__(self):
t_start_met = self.met_start[0]
t_stop_met = self.met_stop[-1]
t_start = self.time_start[0].fits
t_stop = self.time_stop[-1].fits
return (
"GTI info:\n"
f"- Number of GTIs: {len(self.table)}\n"
f"- Duration: {self.time_sum}\n"
f"- Start: {t_start_met} MET\n"
f"- Start: {t_start} (time standard: {self.time_start[-1].scale.upper()})\n"
f"- Stop: {t_stop_met} MET\n"
f"- Stop: {t_stop} (time standard: {self.time_stop[-1].scale.upper()})\n"
)
@property
def time_delta(self):
"""GTI durations in seconds as a `~astropy.units.Quantity`."""
delta = self.time_stop - self.time_start
return delta.to("s")
@property
def time_ref(self):
"""Time reference as a `~astropy.time.Time` object."""
return self._meta.reference_time
@property
def time_sum(self):
"""Sum of GTIs in seconds as a `~astropy.units.Quantity`."""
return self.time_delta.sum()
@property
def time_start(self):
"""GTI start times as a `~astropy.time.Time` object."""
return self.table["START"]
@property
def time_stop(self):
"""GTI end times as a `~astropy.time.Time` object."""
return self.table["STOP"]
@property
def met_start(self):
"""GTI start time difference with reference time in seconds, MET as a `~astropy.units.Quantity`."""
return (self.time_start - self.time_ref).to("s")
@property
def met_stop(self):
"""GTI start time difference with reference time in seconds, MET as a `~astropy.units.Quantity`."""
return (self.time_stop - self.time_ref).to("s")
@property
def time_intervals(self):
"""List of time intervals."""
return [
(t_start, t_stop)
for t_start, t_stop in zip(self.time_start, self.time_stop)
]
[docs] @classmethod
def from_time_intervals(cls, time_intervals, reference_time=None):
"""From list of time intervals.
Parameters
----------
time_intervals : list of `~astropy.time.Time` objects
Time intervals.
reference_time : `~astropy.time.Time`, optional
Reference time to use in GTI definition. Default is None.
If None, use TIME_REF_DEFAULT.
Returns
-------
gti : `GTI`
GTI table.
"""
start = Time([_[0] for _ in time_intervals])
stop = Time([_[1] for _ in time_intervals])
if reference_time is None:
reference_time = TIME_REF_DEFAULT
return cls.create(start, stop, reference_time)
[docs] def select_time(self, time_interval):
"""Select and crop GTIs in time interval.
Parameters
----------
time_interval : `astropy.time.Time`
Start and stop time for the selection.
Returns
-------
gti : `GTI`
Copy of the GTI table with selection applied.
"""
interval_start, interval_stop = time_interval
interval_start.format = self.time_start.format
interval_stop.format = self.time_stop.format
# get GTIs that fall within the time_interval
mask = self.time_start < interval_stop
mask &= self.time_stop > interval_start
gti_within = self.table[mask]
# crop the GTIs
gti_within["START"] = np.clip(
gti_within["START"], interval_start, interval_stop
)
gti_within["STOP"] = np.clip(gti_within["STOP"], interval_start, interval_stop)
return self.__class__(gti_within)
[docs] def delete_interval(self, time_interval):
"""Select and crop GTIs in time interval.
Parameters
----------
time_interval : [`astropy.time.Time`, `astropy.time.Time`]
Start and stop time for the selection.
Returns
-------
gti : `GTI`
Copy of the GTI table with the bad time interval deleted.
"""
interval_start, interval_stop = time_interval
interval_start.format = self.time_start.format
interval_stop.format = self.time_stop.format
trim_table = self.table.copy()
trim_table["STOP"][
(self.time_start < interval_start) & (self.time_stop > interval_start)
] = interval_start
trim_table["START"][
(self.time_start < interval_stop) & (self.time_stop > interval_stop)
] = interval_stop
mask = (self.time_stop > interval_stop) | (self.time_start < interval_start)
return self.__class__(trim_table[mask])
[docs] def stack(self, other):
"""Stack with another GTI in place.
This simply changes the time reference of the second GTI table
and stack the two tables. No logic is applied to the intervals.
Parameters
----------
other : `~gammapy.data.GTI`
GTI to stack to self.
"""
self.table = self._validate_table(vstack([self.table, other.table]))
[docs] @classmethod
def from_stack(cls, gtis, **kwargs):
"""Stack (concatenate) list of GTIs.
Calls `~astropy.table.vstack`.
Parameters
----------
gtis : list of `GTI`
List of good time intervals to stack.
**kwargs : dict, optional
Keywords passed on to `~astropy.table.vstack`.
Returns
-------
gti : `GTI`
Stacked good time intervals.
"""
tables = [_.table for _ in gtis]
stacked_table = vstack(tables, **kwargs)
return cls(stacked_table)
[docs] def union(self, overlap_ok=True, merge_equal=True):
"""Union of overlapping time intervals.
Returns a new `~gammapy.data.GTI` object.
Parameters
----------
overlap_ok : bool, optional
Whether to raise an error when overlapping time bins are found. Default is True.
merge_equal : bool; optional
Whether to merge touching time bins e.g. ``(1, 2)`` and ``(2, 3)``
will result in ``(1, 3)``. Default is True.
"""
# Algorithm to merge overlapping intervals is well-known,
# see e.g. https://stackoverflow.com/a/43600953/498873
table = self.table.copy()
table.sort("START")
compare = lt if merge_equal else le
# We use Python dict instead of astropy.table.Row objects,
# because on some versions modifying Row entries doesn't behave as expected
merged = [{"START": table[0]["START"], "STOP": table[0]["STOP"]}]
for row in table[1:]:
interval = {"START": row["START"], "STOP": row["STOP"]}
if compare(merged[-1]["STOP"], interval["START"]):
merged.append(interval)
else:
if not overlap_ok:
raise ValueError("Overlapping time bins")
merged[-1]["STOP"] = max(interval["STOP"], merged[-1]["STOP"])
merged = Table(rows=merged, names=["START", "STOP"], meta=self.table.meta)
return self.__class__(merged, reference_time=self.time_ref)
[docs] def group_table(self, time_intervals, atol="1e-6 s"):
"""Compute the table with the info on the group to which belong each time interval.
The t_start and t_stop are stored in MJD from a scale in "utc".
Parameters
----------
time_intervals : list of `astropy.time.Time`
Start and stop time for each interval to compute the LC.
atol : `~astropy.units.Quantity`
Tolerance value for time comparison with different scale. Default is "1e-6 s".
Returns
-------
group_table : `~astropy.table.Table`
Contains the grouping info.
"""
atol = u.Quantity(atol)
group_table = Table(
names=("group_idx", "time_min", "time_max", "bin_type"),
dtype=("i8", "f8", "f8", "S10"),
)
time_intervals_lowedges = Time(
[time_interval[0] for time_interval in time_intervals]
)
time_intervals_upedges = Time(
[time_interval[1] for time_interval in time_intervals]
)
for t_start, t_stop in zip(self.time_start, self.time_stop):
mask1 = t_start >= time_intervals_lowedges - atol
mask2 = t_stop <= time_intervals_upedges + atol
mask = mask1 & mask2
if np.any(mask):
group_index = np.where(mask)[0]
bin_type = ""
else:
group_index = -1
if np.any(mask1):
bin_type = "overflow"
elif np.any(mask2):
bin_type = "underflow"
else:
bin_type = "outflow"
group_table.add_row(
[group_index, t_start.utc.mjd, t_stop.utc.mjd, bin_type]
)
return group_table