Source code for gammapy.data.gti

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import copy
from operator import le, lt
import numpy as np
from astropy.io import fits
from astropy.table import Table, vstack
from astropy.time import Time
from astropy.units import Quantity
from gammapy.utils.scripts import make_path
from gammapy.utils.time import (
    time_ref_from_dict,
    time_ref_to_dict,
    time_relative_to_ref,
)

__all__ = ["GTI"]


[docs]class GTI: """Good time intervals (GTI) `~astropy.table.Table`. Data format specification: :ref:`gadf:iact-gti` Note: at the moment dead-time and live-time is in the EVENTS header ... the GTI header just deals with observation times. Parameters ---------- table : `~astropy.table.Table` GTI table Examples -------- Load GTIs for a H.E.S.S. event list: >>> from gammapy.data import GTI >>> gti = GTI.read('$GAMMAPY_DATA/hess-dl3-dr1//data/hess_dl3_dr1_obs_id_023523.fits.gz') >>> print(gti) GTI info: - Number of GTIs: 1 - Duration: 1687.0 s - Start: 53343.92234009259 MET - Start: 2004-12-04T22:08:10.184 (time standard: TT) - Stop: 53343.94186555556 MET - Stop: 2004-12-04T22:36:17.184 (time standard: TT) Load GTIs for a Fermi-LAT event list: >>> gti = GTI.read("$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc-events.fits.gz") >>> print(gti) GTI info: - Number of GTIs: 39042 - Duration: 183139597.9032163 s - Start: 54682.65603794185 MET - Start: 2008-08-04T15:44:41.678 (time standard: TT) - Stop: 57236.96833546296 MET - Stop: 2015-08-02T23:14:24.184 (time standard: TT) """ def __init__(self, table): self.table = table
[docs] def copy(self): return copy.deepcopy(self)
[docs] @classmethod def create(cls, start, stop, reference_time="2000-01-01"): """Creates a GTI table from start and stop times. Parameters ---------- start : `~astropy.units.Quantity` start times w.r.t. reference time stop : `~astropy.units.Quantity` stop times w.r.t. reference time reference_time : `~astropy.time.Time` the reference time to use in GTI definition """ start = Quantity(start, ndmin=1) stop = Quantity(stop, ndmin=1) reference_time = Time(reference_time) meta = time_ref_to_dict(reference_time) table = Table({"START": start.to("s"), "STOP": stop.to("s")}, meta=meta) return cls(table)
[docs] @classmethod def read(cls, filename, hdu="GTI"): """Read from FITS file. Parameters ---------- filename : `pathlib.Path`, str Filename hdu : str hdu name. Default GTI. """ filename = make_path(filename) table = Table.read(filename, hdu=hdu) return cls(table)
[docs] def write(self, filename, **kwargs): """Write to file. Parameters ---------- filename : str or `Path` File name to write to. """ hdu = fits.BinTableHDU(self.table, name="GTI") hdulist = fits.HDUList([fits.PrimaryHDU(), hdu]) hdulist.writeto(make_path(filename), **kwargs)
def __str__(self): return ( "GTI info:\n" f"- Number of GTIs: {len(self.table)}\n" f"- Duration: {self.time_sum}\n" f"- Start: {self.time_start[0]} MET\n" f"- Start: {self.time_start[0].fits} (time standard: {self.time_start[-1].scale.upper()})\n" f"- Stop: {self.time_stop[-1]} MET\n" f"- Stop: {self.time_stop[-1].fits} (time standard: {self.time_stop[-1].scale.upper()})\n" ) @property def time_delta(self): """GTI durations in seconds (`~astropy.units.Quantity`).""" start = self.table["START"].astype("float64") stop = self.table["STOP"].astype("float64") return Quantity(stop - start, "second") @property def time_ref(self): """Time reference (`~astropy.time.Time`).""" return time_ref_from_dict(self.table.meta) @property def time_sum(self): """Sum of GTIs in seconds (`~astropy.units.Quantity`).""" return self.time_delta.sum() @property def time_start(self): """GTI start times (`~astropy.time.Time`).""" met = Quantity(self.table["START"].astype("float64"), "second") return self.time_ref + met @property def time_stop(self): """GTI end times (`~astropy.time.Time`).""" met = Quantity(self.table["STOP"].astype("float64"), "second") return self.time_ref + met @property def time_intervals(self): """List of time intervals""" return [ (t_start, t_stop) for t_start, t_stop in zip(self.time_start, self.time_stop) ]
[docs] @classmethod def from_time_intervals(cls, time_intervals, reference_time="2000-01-01"): """From list of time intervals Parameters ---------- time_intervals : list of `~astropy.time.Time` objects Time intervals reference_time : `~astropy.time.Time` Reference time to use in GTI definition Returns ------- gti : `GTI` GTI table. """ reference_time = Time(reference_time) start = Time([_[0] for _ in time_intervals]) - reference_time stop = Time([_[1] for _ in time_intervals]) - reference_time meta = time_ref_to_dict(reference_time) table = Table({"START": start.to("s"), "STOP": stop.to("s")}, meta=meta) return cls(table=table)
[docs] def select_time(self, time_interval): """Select and crop GTIs in time interval. Parameters ---------- time_interval : `astropy.time.Time` Start and stop time for the selection. Returns ------- gti : `GTI` Copy of the GTI table with selection applied. """ # get GTIs that fall within the time_interval mask = self.time_start < time_interval[1] mask &= self.time_stop > time_interval[0] gti_within = self.table[mask] # crop the GTIs start_met = time_relative_to_ref(time_interval[0], self.table.meta) stop_met = time_relative_to_ref(time_interval[1], self.table.meta) np.clip( gti_within["START"], start_met.value, stop_met.value, out=gti_within["START"], ) np.clip( gti_within["STOP"], start_met.value, stop_met.value, out=gti_within["STOP"] ) return self.__class__(gti_within)
[docs] def stack(self, other): """Stack with another GTI in place. This simply changes the time reference of the second GTI table and stack the two tables. No logic is applied to the intervals. Parameters ---------- other : `~gammapy.data.GTI` GTI to stack to self """ start = (other.time_start - self.time_ref).sec end = (other.time_stop - self.time_ref).sec table = Table({"START": start, "STOP": end}, names=["START", "STOP"]) self.table = vstack([self.table, table])
[docs] @classmethod def from_stack(cls, gtis, **kwargs): """Stack (concatenate) list of event lists. Calls `~astropy.table.vstack`. Parameters ---------- gtis : list of `GTI` List of good time intervals to stack **kwargs : dict Keywords passed on to `~astropy.table.vstack` Returns ------- gti : `GTI` Stacked good time intervals. """ tables = [_.table for _ in gtis] stacked_table = vstack(tables, **kwargs) return cls(stacked_table)
[docs] def union(self, overlap_ok=True, merge_equal=True): """Union of overlapping time intervals. Returns a new `~gammapy.data.GTI` object. Parameters ---------- overlap_ok : bool Whether to raise an error when overlapping time bins are found. merge_equal : bool Whether to merge touching time bins e.g. ``(1, 2)`` and ``(2, 3)`` will result in ``(1, 3)``. """ # Algorithm to merge overlapping intervals is well-known, # see e.g. https://stackoverflow.com/a/43600953/498873 table = self.table.copy() table.sort("START") compare = lt if merge_equal else le # We use Python dict instead of astropy.table.Row objects, # because on some versions modifying Row entries doesn't behave as expected merged = [{"START": table[0]["START"], "STOP": table[0]["STOP"]}] for row in table[1:]: interval = {"START": row["START"], "STOP": row["STOP"]} if compare(merged[-1]["STOP"], interval["START"]): merged.append(interval) else: if not overlap_ok: raise ValueError("Overlapping time bins") merged[-1]["STOP"] = max(interval["STOP"], merged[-1]["STOP"]) merged = Table(rows=merged, names=["START", "STOP"], meta=self.table.meta) return self.__class__(merged)
[docs] def group_table(self, time_intervals, atol="1e-6 s"): """Compute the table with the info on the group to which belong each time interval. The t_start and t_stop are stored in MJD from a scale in "utc". Parameters ---------- time_intervals : list of `astropy.time.Time` Start and stop time for each interval to compute the LC atol : `~astropy.units.Quantity` Tolerance value for time comparison with different scale. Default 1e-6 sec. Returns ------- group_table : `~astropy.table.Table` Contains the grouping info. """ atol = Quantity(atol) group_table = Table( names=("group_idx", "time_min", "time_max", "bin_type"), dtype=("i8", "f8", "f8", "S10"), ) time_intervals_lowedges = Time( [time_interval[0] for time_interval in time_intervals] ) time_intervals_upedges = Time( [time_interval[1] for time_interval in time_intervals] ) for t_start, t_stop in zip(self.time_start, self.time_stop): mask1 = t_start >= time_intervals_lowedges - atol mask2 = t_stop <= time_intervals_upedges + atol mask = mask1 & mask2 if np.any(mask): group_index = np.where(mask)[0] bin_type = "" else: group_index = -1 if np.any(mask1): bin_type = "overflow" elif np.any(mask2): bin_type = "underflow" else: bin_type = "outflow" group_table.add_row( [group_index, t_start.utc.mjd, t_stop.utc.mjd, bin_type] ) return group_table