# Licensed under a 3-clause BSD style license - see LICENSE.rst
from collections import namedtuple, OrderedDict
import numpy as np
from astropy.table import Table
from astropy.units import Unit, Quantity
from astropy.coordinates import Angle, SkyCoord
from astropy.time import Time
from astropy.utils import lazyproperty
from ..utils.scripts import make_path
from ..utils.time import time_relative_to_ref
from ..utils.testing import Checker
from ..utils.regions import SphericalCircleSkyRegion
from .gti import GTI
__all__ = ["ObservationTable"]
[docs]class ObservationTable(Table):
"""Observation table.
Data format specification: :ref:`gadf:obs-index`
"""
[docs] @classmethod
def read(cls, filename, **kwargs):
"""Read an observation table from file.
Parameters
----------
filename : `pathlib.Path`, str
Filename
"""
filename = make_path(filename)
return super().read(str(filename), **kwargs)
@property
def pointing_radec(self):
"""Pointing positions as ICRS (`~astropy.coordinates.SkyCoord`)."""
return SkyCoord(self["RA_PNT"], self["DEC_PNT"], unit="deg", frame="icrs")
@property
def pointing_galactic(self):
"""Pointing positions as Galactic (`~astropy.coordinates.SkyCoord`)."""
return SkyCoord(
self["GLON_PNT"], self["GLAT_PNT"], unit="deg", frame="galactic"
)
@lazyproperty
def _index_dict(self):
"""Dict containing row index for all obs ids."""
# TODO: Switch to http://docs.astropy.org/en/latest/table/indexing.html once it is more stable
temp = zip(self["OBS_ID"], np.arange(len(self)))
return dict(temp)
[docs] def get_obs_idx(self, obs_id):
"""Get row index for given ``obs_id``.
Raises KeyError if observation is not available.
Parameters
----------
obs_id : int, list
observation ids
Returns
-------
idx : list
indices corresponding to obs_id
"""
idx = [self._index_dict[key] for key in np.atleast_1d(obs_id)]
return idx
[docs] def select_obs_id(self, obs_id):
"""Get `~gammapy.data.ObservationTable` containing only ``obs_id``.
Raises KeyError if observation is not available.
Parameters
----------
obs_id: int, list
observation ids
"""
return self[self.get_obs_idx(obs_id)]
[docs] def summary(self):
"""Summary info string (str)."""
obs_name = self.meta.get("OBSERVATORY_NAME", "N/A")
return "\n".join(
[
"Observation table:",
"Observatory name: {!r}".format(obs_name),
"Number of observations: {}".format(len(self)),
# TODO: clean this up. Make those properties?
# ontime = Quantity(self['ONTIME'].sum(), self['ONTIME'].unit)
#
# ss += 'Total observation time: {}\n'.format(ontime)
# livetime = Quantity(self['LIVETIME'].sum(), self['LIVETIME'].unit)
# ss += 'Total live time: {}\n'.format(livetime)
# dtf = 100. * (1 - livetime / ontime)
# ss += 'Average dead time fraction: {:5.2f}%\n'.format(dtf)
# time_ref = time_ref_from_dict(self.meta)
# time_ref_unit = time_ref_from_dict(self.meta).format
# ss += 'Time reference: {} {}'.format(time_ref, time_ref_unit)
]
)
[docs] def select_range(self, selection_variable, value_range, inverted=False):
"""Make an observation table, applying some selection.
Generic function to apply a 1D box selection (min, max) to a
table on any variable that is in the observation table and can
be casted into a `~astropy.units.Quantity` object.
If the range length is 0 (min = max), the selection is applied
to the exact value indicated by the min value. This is useful
for selection of exact values, for instance in discrete
variables like the number of telescopes.
If the inverted flag is activated, the selection is applied to
keep all elements outside the selected range.
Parameters
----------
selection_variable : str
Name of variable to apply a cut (it should exist on the table).
value_range : `~astropy.units.Quantity`-like
Allowed range of values (min, max). The type should be
consistent with the selection_variable.
inverted : bool, optional
Invert selection: keep all entries outside the (min, max) range.
Returns
-------
obs_table : `~gammapy.data.ObservationTable`
Observation table after selection.
"""
value_range = Quantity(value_range)
# read values into a quantity in case units have to be taken into account
value = Quantity(self[selection_variable])
mask = (value_range[0] <= value) & (value < value_range[1])
if np.allclose(value_range[0].value, value_range[1].value):
mask = value_range[0] == value
if inverted:
mask = np.invert(mask)
return self[mask]
[docs] def select_time_range(self, selection_variable, time_range, inverted=False):
"""Make an observation table, applying a time selection.
Apply a 1D box selection (min, max) to a
table on any time variable that is in the observation table.
It supports both fomats: absolute times in
`~astropy.time.Time` variables and [MET]_.
If the inverted flag is activated, the selection is applied to
keep all elements outside the selected range.
Parameters
----------
selection_variable : str
Name of variable to apply a cut (it should exist on the table).
time_range : `~astropy.time.Time`
Allowed time range (min, max).
inverted : bool, optional
Invert selection: keep all entries outside the (min, max) range.
Returns
-------
obs_table : `~gammapy.data.ObservationTable`
Observation table after selection.
"""
if self.meta["TIME_FORMAT"] == "absolute":
# read times into a Time object
time = Time(self[selection_variable])
else:
# transform time to MET
time_range = time_relative_to_ref(time_range, self.meta)
# read values into a quantity in case units have to be taken into account
time = Quantity(self[selection_variable])
mask = (time_range[0] <= time) & (time < time_range[1])
if inverted:
mask = np.invert(mask)
return self[mask]
[docs] def select_observations(self, selection=None):
"""Select subset of observations.
Returns a new observation table representing the subset.
There are 3 main kinds of selection criteria, according to the
value of the **type** keyword in the **selection** dictionary:
- sky regions
- time intervals (min, max)
- intervals (min, max) on any other parameter present in the
observation table, that can be casted into an
`~astropy.units.Quantity` object
Allowed selection criteria are interpreted using the following
keywords in the **selection** dictionary under the **type** key.
- ``sky_circle`` is a circular region centered in the coordinate
marked by the **lon** and **lat** keywords, and radius **radius**;
uses `~gammapy.catalog.select_sky_circle`
- ``time_box`` is a 1D selection criterion acting on the observation
start time (**TSTART**); the interval is set via the
**time_range** keyword; uses
`~gammapy.data.ObservationTable.select_time_range`
- ``par_box`` is a 1D selection criterion acting on any
parameter defined in the observation table that can be casted
into an `~astropy.units.Quantity` object; the parameter name
and interval can be specified using the keywords **variable** and
**value_range** respectively; min = max selects exact
values of the parameter; uses
`~gammapy.data.ObservationTable.select_range`
In all cases, the selection can be inverted by activating the
**inverted** flag, in which case, the selection is applied to keep all
elements outside the selected range.
A few examples of selection criteria are given below.
Parameters
----------
selection : dict
Dictionary with a few keywords for applying selection cuts.
Returns
-------
obs_table : `~gammapy.data.ObservationTable`
Observation table after selection.
Examples
--------
>>> selection = dict(type='sky_circle', frame='galactic',
... lon=Angle(0, 'deg'),
... lat=Angle(0, 'deg'),
... radius=Angle(5, 'deg'),
... border=Angle(2, 'deg'))
>>> selected_obs_table = obs_table.select_observations(selection)
>>> selection = dict(type='time_box',
... time_range=Time(['2012-01-01T01:00:00', '2012-01-01T02:00:00']))
>>> selected_obs_table = obs_table.select_observations(selection)
>>> selection = dict(type='par_box', variable='ALT',
... value_range=Angle([60., 70.], 'deg'))
>>> selected_obs_table = obs_table.select_observations(selection)
>>> selection = dict(type='par_box', variable='OBS_ID',
... value_range=[2, 5])
>>> selected_obs_table = obs_table.select_observations(selection)
>>> selection = dict(type='par_box', variable='N_TELS',
... value_range=[4, 4])
>>> selected_obs_table = obs_table.select_observations(selection)
"""
if "inverted" not in selection.keys():
selection["inverted"] = False
if selection["type"] == "sky_circle":
lon = Angle(selection["lon"], "deg")
lat = Angle(selection["lat"], "deg")
radius = Angle(selection["radius"])
border = Angle(selection["border"])
region = SphericalCircleSkyRegion(
center=SkyCoord(lon, lat, frame=selection["frame"]),
radius=radius + border,
)
mask = region.contains(self.pointing_radec)
if selection["inverted"]:
mask = np.invert(mask)
return self[mask]
elif selection["type"] == "time_box":
return self.select_time_range(
"TSTART", selection["time_range"], selection["inverted"]
)
elif selection["type"] == "par_box":
return self.select_range(
selection["variable"], selection["value_range"], selection["inverted"]
)
else:
raise ValueError("Invalid selection type: {}".format(selection["type"]))
[docs] def create_gti(self, obs_id):
"""Returns a GTI table containing TSTART and TSTOP from the table.
TODO: This method can be removed once GTI tables are explicitly required in Gammapy.
Parameters
----------
obs_id : int
ID of the observation for which the GTI table will be created
Returns
-------
gti : `~gammapy.data.GTI`
GTI table containing one row (TSTART and TSTOP of the observation with ``obs_id``)
"""
meta = OrderedDict(
EXTNAME="GTI",
HDUCLASS="GADF",
HDUDOC="https://github.com/open-gamma-ray-astro/gamma-astro-data-formats",
HDUVERS="0.2",
HDUCLAS1="GTI",
MJDREFI=self.meta["MJDREFI"],
MJDREFF=self.meta["MJDREFF"],
TIMEUNIT=self.meta.get("TIMEUNIT", "s"),
TIMESYS=self.meta["TIMESYS"],
TIMEREF=self.meta.get("TIMEREF", "LOCAL"),
)
obs = self.select_obs_id(obs_id)
gti = Table(meta=meta)
gti["START"] = obs["TSTART"].quantity.to("s")
gti["STOP"] = obs["TSTOP"].quantity.to("s")
return GTI(gti)
class ObservationTableChecker(Checker):
"""Event list checker.
Data format specification: ref:`gadf:iact-events`
Parameters
----------
event_list : `~gammapy.data.EventList`
Event list
"""
CHECKS = {
"meta": "check_meta",
"columns": "check_columns",
# "times": "check_times",
# "coordinates_galactic": "check_coordinates_galactic",
# "coordinates_altaz": "check_coordinates_altaz",
}
# accuracy = {"angle": Angle("1 arcsec"), "time": Quantity(1, "microsecond")}
# https://gamma-astro-data-formats.readthedocs.io/en/latest/events/events.html#mandatory-header-keywords
meta_required = [
"HDUCLASS",
"HDUDOC",
"HDUVERS",
"HDUCLAS1",
"HDUCLAS2",
# https://gamma-astro-data-formats.readthedocs.io/en/latest/general/time.html#time-formats
"MJDREFI",
"MJDREFF",
"TIMEUNIT",
"TIMESYS",
"TIMEREF",
# https://gamma-astro-data-formats.readthedocs.io/en/latest/general/coordinates.html#coords-location
"GEOLON",
"GEOLAT",
"ALTITUDE",
]
_col = namedtuple("col", ["name", "unit"])
columns_required = [
_col(name="OBS_ID", unit=""),
_col(name="RA_PNT", unit="deg"),
_col(name="DEC_PNT", unit="deg"),
_col(name="TSTART", unit="s"),
_col(name="TSTOP", unit="s"),
]
def __init__(self, obs_table):
self.obs_table = obs_table
def _record(self, level="info", msg=None):
return {"level": level, "hdu": "obs-index", "msg": msg}
def check_meta(self):
m = self.obs_table.meta
meta_missing = sorted(set(self.meta_required) - set(m))
if meta_missing:
yield self._record(
level="error", msg="Missing meta keys: {!r}".format(meta_missing)
)
if m.get("HDUCLAS1", "") != "INDEX":
yield self._record(level="error", msg="HDUCLAS1 must be INDEX")
if m.get("HDUCLAS2", "") != "OBS":
yield self._record(level="error", msg="HDUCLAS2 must be OBS")
def check_columns(self):
t = self.obs_table
if len(t) == 0:
yield self._record(level="error", msg="Observation table has zero rows")
for name, unit in self.columns_required:
if name not in t.colnames:
yield self._record(
level="error", msg="Missing table column: {!r}".format(name)
)
else:
if Unit(unit) != (t[name].unit or ""):
yield self._record(
level="error", msg="Invalid unit for column: {!r}".format(name)
)