# Licensed under a 3-clause BSD style license - see LICENSE.rst
import copy
from operator import le, lt
import numpy as np
import astropy.units as u
from astropy.io import fits
from astropy.table import Table, vstack
from astropy.time import Time
from astropy.units import Quantity
from gammapy.utils.scripts import make_path
from gammapy.utils.time import (
time_ref_from_dict,
time_ref_to_dict,
time_relative_to_ref,
)
__all__ = ["GTI"]
[docs]class GTI:
"""Good time intervals (GTI) `~astropy.table.Table`.
Data format specification: :ref:`gadf:iact-gti`
Note: at the moment dead-time and live-time is in the
EVENTS header ... the GTI header just deals with
observation times.
Parameters
----------
table : `~astropy.table.Table`
GTI table
Examples
--------
Load GTIs for a H.E.S.S. event list:
>>> from gammapy.data import GTI
>>> gti = GTI.read('$GAMMAPY_DATA/hess-dl3-dr1//data/hess_dl3_dr1_obs_id_023523.fits.gz')
>>> print(gti)
GTI info:
- Number of GTIs: 1
- Duration: 1687.0 s
- Start: 53343.92234009259 MET
- Start: 2004-12-04T22:08:10.184 (time standard: TT)
- Stop: 53343.94186555556 MET
- Stop: 2004-12-04T22:36:17.184 (time standard: TT)
Load GTIs for a Fermi-LAT event list:
>>> gti = GTI.read("$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc-events.fits.gz")
>>> print(gti)
GTI info:
- Number of GTIs: 39042
- Duration: 183139597.9032163 s
- Start: 54682.65603794185 MET
- Start: 2008-08-04T15:44:41.678 (time standard: TT)
- Stop: 57236.96833546296 MET
- Stop: 2015-08-02T23:14:24.184 (time standard: TT)
"""
def __init__(self, table):
self.table = table
[docs] def copy(self):
return copy.deepcopy(self)
[docs] @classmethod
def create(cls, start, stop, reference_time="2000-01-01"):
"""Creates a GTI table from start and stop times.
Parameters
----------
start : `~astropy.time.Time` or `~astropy.units.Quantity`
Start times, if a quantity then w.r.t. reference time
stop : `~astropy.time.Time` or `~astropy.units.Quantity`
Stop times, if a quantity then w.r.t. reference time
reference_time : `~astropy.time.Time`
the reference time to use in GTI definition
"""
reference_time = Time(reference_time)
if isinstance(start, Time):
start = (start - reference_time).to(u.s)
if isinstance(stop, Time):
stop = (stop - reference_time).to(u.s)
start = Quantity(start, ndmin=1)
stop = Quantity(stop, ndmin=1)
meta = time_ref_to_dict(reference_time)
table = Table({"START": start.to("s"), "STOP": stop.to("s")}, meta=meta)
return cls(table)
[docs] @classmethod
def read(cls, filename, hdu="GTI"):
"""Read from FITS file.
Parameters
----------
filename : `pathlib.Path`, str
Filename
hdu : str
hdu name. Default GTI.
"""
filename = make_path(filename)
table = Table.read(filename, hdu=hdu)
return cls(table)
[docs] def to_table_hdu(self, format="gadf"):
"""
Convert this GTI instance to a `astropy.io.fits.BinTableHDU`.
Parameters
----------
format: str
Output format, currently only "gadf" is supported
Returns
-------
hdu: `astropy.io.fits.BinTableHDU`
GTI table converted to FITS representation
"""
if format != "gadf":
raise ValueError(f'Only the "gadf" format supported, got {format}')
return fits.BinTableHDU(self.table, name="GTI")
[docs] def write(self, filename, **kwargs):
"""Write to file.
Parameters
----------
filename : str or `Path`
File name to write to.
"""
hdu = self.to_table_hdu()
hdulist = fits.HDUList([fits.PrimaryHDU(), hdu])
hdulist.writeto(make_path(filename), **kwargs)
def __str__(self):
return (
"GTI info:\n"
f"- Number of GTIs: {len(self.table)}\n"
f"- Duration: {self.time_sum}\n"
f"- Start: {self.time_start[0]} MET\n"
f"- Start: {self.time_start[0].fits} (time standard: {self.time_start[-1].scale.upper()})\n"
f"- Stop: {self.time_stop[-1]} MET\n"
f"- Stop: {self.time_stop[-1].fits} (time standard: {self.time_stop[-1].scale.upper()})\n"
)
@property
def time_delta(self):
"""GTI durations in seconds (`~astropy.units.Quantity`)."""
start = self.table["START"].astype("float64")
stop = self.table["STOP"].astype("float64")
return Quantity(stop - start, "second")
@property
def time_ref(self):
"""Time reference (`~astropy.time.Time`)."""
return time_ref_from_dict(self.table.meta)
@property
def time_sum(self):
"""Sum of GTIs in seconds (`~astropy.units.Quantity`)."""
return self.time_delta.sum()
@property
def time_start(self):
"""GTI start times (`~astropy.time.Time`)."""
met = Quantity(self.table["START"].astype("float64"), "second")
return self.time_ref + met
@property
def time_stop(self):
"""GTI end times (`~astropy.time.Time`)."""
met = Quantity(self.table["STOP"].astype("float64"), "second")
return self.time_ref + met
@property
def time_intervals(self):
"""List of time intervals"""
return [
(t_start, t_stop)
for t_start, t_stop in zip(self.time_start, self.time_stop)
]
[docs] @classmethod
def from_time_intervals(cls, time_intervals, reference_time="2000-01-01"):
"""From list of time intervals
Parameters
----------
time_intervals : list of `~astropy.time.Time` objects
Time intervals
reference_time : `~astropy.time.Time`
Reference time to use in GTI definition
Returns
-------
gti : `GTI`
GTI table.
"""
reference_time = Time(reference_time)
start = Time([_[0] for _ in time_intervals]) - reference_time
stop = Time([_[1] for _ in time_intervals]) - reference_time
meta = time_ref_to_dict(reference_time)
table = Table({"START": start.to("s"), "STOP": stop.to("s")}, meta=meta)
return cls(table=table)
[docs] def select_time(self, time_interval):
"""Select and crop GTIs in time interval.
Parameters
----------
time_interval : `astropy.time.Time`
Start and stop time for the selection.
Returns
-------
gti : `GTI`
Copy of the GTI table with selection applied.
"""
# get GTIs that fall within the time_interval
mask = self.time_start < time_interval[1]
mask &= self.time_stop > time_interval[0]
gti_within = self.table[mask]
# crop the GTIs
start_met = time_relative_to_ref(time_interval[0], self.table.meta)
stop_met = time_relative_to_ref(time_interval[1], self.table.meta)
np.clip(
gti_within["START"],
start_met.value,
stop_met.value,
out=gti_within["START"],
)
np.clip(
gti_within["STOP"], start_met.value, stop_met.value, out=gti_within["STOP"]
)
return self.__class__(gti_within)
[docs] def stack(self, other):
"""Stack with another GTI in place.
This simply changes the time reference of the second GTI table
and stack the two tables. No logic is applied to the intervals.
Parameters
----------
other : `~gammapy.data.GTI`
GTI to stack to self
"""
start = (other.time_start - self.time_ref).sec
end = (other.time_stop - self.time_ref).sec
table = Table({"START": start, "STOP": end}, names=["START", "STOP"])
self.table = vstack([self.table, table])
[docs] @classmethod
def from_stack(cls, gtis, **kwargs):
"""Stack (concatenate) list of event lists.
Calls `~astropy.table.vstack`.
Parameters
----------
gtis : list of `GTI`
List of good time intervals to stack
**kwargs : dict
Keywords passed on to `~astropy.table.vstack`
Returns
-------
gti : `GTI`
Stacked good time intervals.
"""
tables = [_.table for _ in gtis]
stacked_table = vstack(tables, **kwargs)
return cls(stacked_table)
[docs] def union(self, overlap_ok=True, merge_equal=True):
"""Union of overlapping time intervals.
Returns a new `~gammapy.data.GTI` object.
Parameters
----------
overlap_ok : bool
Whether to raise an error when overlapping time bins are found.
merge_equal : bool
Whether to merge touching time bins e.g. ``(1, 2)`` and ``(2, 3)``
will result in ``(1, 3)``.
"""
# Algorithm to merge overlapping intervals is well-known,
# see e.g. https://stackoverflow.com/a/43600953/498873
table = self.table.copy()
table.sort("START")
compare = lt if merge_equal else le
# We use Python dict instead of astropy.table.Row objects,
# because on some versions modifying Row entries doesn't behave as expected
merged = [{"START": table[0]["START"], "STOP": table[0]["STOP"]}]
for row in table[1:]:
interval = {"START": row["START"], "STOP": row["STOP"]}
if compare(merged[-1]["STOP"], interval["START"]):
merged.append(interval)
else:
if not overlap_ok:
raise ValueError("Overlapping time bins")
merged[-1]["STOP"] = max(interval["STOP"], merged[-1]["STOP"])
merged = Table(rows=merged, names=["START", "STOP"], meta=self.table.meta)
return self.__class__(merged)
[docs] def group_table(self, time_intervals, atol="1e-6 s"):
"""Compute the table with the info on the group to which belong each time interval.
The t_start and t_stop are stored in MJD from a scale in "utc".
Parameters
----------
time_intervals : list of `astropy.time.Time`
Start and stop time for each interval to compute the LC
atol : `~astropy.units.Quantity`
Tolerance value for time comparison with different scale. Default 1e-6 sec.
Returns
-------
group_table : `~astropy.table.Table`
Contains the grouping info.
"""
atol = Quantity(atol)
group_table = Table(
names=("group_idx", "time_min", "time_max", "bin_type"),
dtype=("i8", "f8", "f8", "S10"),
)
time_intervals_lowedges = Time(
[time_interval[0] for time_interval in time_intervals]
)
time_intervals_upedges = Time(
[time_interval[1] for time_interval in time_intervals]
)
for t_start, t_stop in zip(self.time_start, self.time_stop):
mask1 = t_start >= time_intervals_lowedges - atol
mask2 = t_stop <= time_intervals_upedges + atol
mask = mask1 & mask2
if np.any(mask):
group_index = np.where(mask)[0]
bin_type = ""
else:
group_index = -1
if np.any(mask1):
bin_type = "overflow"
elif np.any(mask2):
bin_type = "underflow"
else:
bin_type = "outflow"
group_table.add_row(
[group_index, t_start.utc.mjd, t_stop.utc.mjd, bin_type]
)
return group_table