# Licensed under a 3-clause BSD style license - see LICENSE.rst
import collections.abc
import copy
import inspect
import logging
import warnings
from itertools import zip_longest
import numpy as np
import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.time import Time
from astropy.units import Quantity
from astropy.utils import lazyproperty
import matplotlib.pyplot as plt
from gammapy import __version__
from gammapy.utils.deprecation import GammapyDeprecationWarning, deprecated
from gammapy.utils.fits import LazyFitsData, earth_location_to_dict
from gammapy.utils.scripts import make_path
from gammapy.utils.testing import Checker
from gammapy.utils.time import time_ref_to_dict, time_relative_to_ref
from .event_list import EventList, EventListChecker
from .filters import ObservationFilter
from .gti import GTI
from .pointing import FixedPointingInfo
__all__ = ["Observation", "Observations"]
log = logging.getLogger(__name__)
[docs]class Observation:
"""In-memory observation.
Parameters
----------
obs_id : int
Observation id
obs_info : dict
Observation info dict
aeff : `~gammapy.irf.EffectiveAreaTable2D`
Effective area
edisp : `~gammapy.irf.EnergyDispersion2D`
Energy dispersion
psf : `~gammapy.irf.PSF3D`
Point spread function
bkg : `~gammapy.irf.Background3D`
Background rate model
rad_max: `~gammapy.irf.RadMax2D`
Only for point-like IRFs: RAD_MAX table (energy dependent RAD_MAX)
For a fixed RAD_MAX, create a RadMax2D with a single bin.
gti : `~gammapy.data.GTI`
Table with GTI start and stop time
events : `~gammapy.data.EventList`
Event list
obs_filter : `ObservationFilter`
Observation filter.
"""
aeff = LazyFitsData(cache=False)
edisp = LazyFitsData(cache=False)
psf = LazyFitsData(cache=False)
bkg = LazyFitsData(cache=False)
_rad_max = LazyFitsData(cache=False)
_events = LazyFitsData(cache=False)
_gti = LazyFitsData(cache=False)
_pointing = LazyFitsData(cache=True)
def __init__(
self,
obs_id=None,
obs_info=None,
gti=None,
aeff=None,
edisp=None,
psf=None,
bkg=None,
rad_max=None,
events=None,
obs_filter=None,
pointing=None,
location=None,
):
self.obs_id = obs_id
self._obs_info = obs_info
self.aeff = aeff
self.edisp = edisp
self.psf = psf
self.bkg = bkg
self._rad_max = rad_max
self._gti = gti
self._events = events
self._pointing = pointing
self._location = location
self.obs_filter = obs_filter or ObservationFilter()
@property
def rad_max(self):
# prevent circular import
from gammapy.irf import RadMax2D
if self._rad_max is not None:
return self._rad_max
# load once to avoid trigger lazy loading it three times
aeff = self.aeff
if aeff is not None and aeff.is_pointlike:
self._rad_max = RadMax2D.from_irf(aeff)
return self._rad_max
edisp = self.edisp
if edisp is not None and edisp.is_pointlike:
self._rad_max = RadMax2D.from_irf(self.edisp)
return self._rad_max
@property
def available_hdus(self):
"""Which HDUs are available"""
available_hdus = []
keys = ["_events", "_gti", "aeff", "edisp", "psf", "bkg", "_rad_max"]
hdus = ["events", "gti", "aeff", "edisp", "psf", "bkg", "rad_max"]
for key, hdu in zip(keys, hdus):
available = self.__dict__.get(key, False)
available_hdu = self.__dict__.get(f"_{hdu}_hdu", False)
available_hdu_ = self.__dict__.get(f"_{key}_hdu", False)
if available or available_hdu or available_hdu_:
available_hdus.append(hdu)
return available_hdus
@property
def available_irfs(self):
"""Which IRFs are available"""
return [_ for _ in self.available_hdus if _ not in ["events", "gti"]]
@property
def events(self):
events = self.obs_filter.filter_events(self._events)
return events
@property
def gti(self):
gti = self.obs_filter.filter_gti(self._gti)
return gti
@staticmethod
def _get_obs_info(
pointing, deadtime_fraction, time_start, time_stop, reference_time, location
):
"""Create obs info dict from in memory data"""
obs_info = {
"DEADC": 1 - deadtime_fraction,
}
if isinstance(pointing, SkyCoord):
obs_info["RA_PNT"] = pointing.icrs.ra.deg
obs_info["DEC_PNT"] = pointing.icrs.dec.deg
obs_info.update(time_ref_to_dict(reference_time))
obs_info["TSTART"] = time_relative_to_ref(time_start, obs_info).to_value(u.s)
obs_info["TSTOP"] = time_relative_to_ref(time_stop, obs_info).to_value(u.s)
if location is not None:
obs_info.update(earth_location_to_dict(location))
return obs_info
[docs] @classmethod
def create(
cls,
pointing,
location=None,
obs_id=0,
livetime=None,
tstart=None,
tstop=None,
irfs=None,
deadtime_fraction=0.0,
reference_time=Time("2000-01-01 00:00:00"),
):
"""Create an observation.
User must either provide the livetime, or the start and stop times.
Parameters
----------
pointing : `~gammapy.data.FixedPointingInfo` or `~astropy.coordinates.SkyCoord`
Pointing information
obs_id : int
Observation ID as identifier
livetime : ~astropy.units.Quantity`
Livetime exposure of the simulated observation
tstart: `~astropy.time.Time` or `~astropy.units.Quantity`
Start time of observation as `~astropy.time.Time` or duration
relative to `reference_time`
tstop: `astropy.time.Time` or `~astropy.units.Quantity`
Stop time of observation as `~astropy.time.Time` or duration
relative to `reference_time`
irfs: dict
IRFs used for simulating the observation: `bkg`, `aeff`, `psf`, `edisp`, `rad_max`
deadtime_fraction : float, optional
Deadtime fraction, defaults to 0
reference_time : `~astropy.time.Time`
the reference time to use in GTI definition
Returns
-------
obs : `gammapy.data.MemoryObservation`
"""
if tstart is None:
tstart = reference_time.copy()
if tstop is None:
tstop = tstart + Quantity(livetime)
gti = GTI.create(tstart, tstop, reference_time=reference_time)
obs_info = cls._get_obs_info(
pointing=pointing,
deadtime_fraction=deadtime_fraction,
time_start=gti.time_start[0],
time_stop=gti.time_stop[0],
reference_time=reference_time,
location=location,
)
if not isinstance(pointing, FixedPointingInfo):
warnings.warn(
"Pointing will be required to be provided as FixedPointingInfo",
GammapyDeprecationWarning,
)
pointing = FixedPointingInfo.from_fits_header(obs_info)
return cls(
obs_id=obs_id,
obs_info=obs_info,
gti=gti,
aeff=irfs.get("aeff"),
bkg=irfs.get("bkg"),
edisp=irfs.get("edisp"),
psf=irfs.get("psf"),
rad_max=irfs.get("rad_max"),
pointing=pointing,
location=location,
)
@property
def tstart(self):
"""Observation start time (`~astropy.time.Time`)."""
return self.gti.time_start[0]
@property
def tstop(self):
"""Observation stop time (`~astropy.time.Time`)."""
return self.gti.time_stop[0]
@property
def tmid(self):
"""Midpoint between start and stop time"""
return self.tstart + 0.5 * (self.tstop - self.tstart)
@property
def observation_time_duration(self):
"""Observation time duration in seconds (`~astropy.units.Quantity`).
The wall time, including dead-time.
"""
return self.gti.time_sum
@property
def observation_live_time_duration(self):
"""Live-time duration in seconds (`~astropy.units.Quantity`).
The dead-time-corrected observation time.
Computed as ``t_live = t_observation * (1 - f_dead)``
where ``f_dead`` is the dead-time fraction.
"""
return (
self.observation_time_duration
* (1 - self.observation_dead_time_fraction)
* self.obs_filter.livetime_fraction
)
@property
def observation_dead_time_fraction(self):
"""Dead-time fraction (float).
Defined as dead-time over observation time.
Dead-time is defined as the time during the observation
where the detector didn't record events:
https://en.wikipedia.org/wiki/Dead_time
https://ui.adsabs.harvard.edu/abs/2004APh....22..285F
The dead-time fraction is used in the live-time computation,
which in turn is used in the exposure and flux computation.
"""
return 1 - self.obs_info["DEADC"]
@lazyproperty
def obs_info(self):
"""Observation info dictionary."""
meta = self._obs_info.copy() if self._obs_info is not None else {}
if self.events is not None:
meta.update(
{
k: v
for k, v in self.events.table.meta.items()
if not k.startswith("HDU")
}
)
return meta
@property
def pointing(self):
if self._pointing is None:
self._pointing = FixedPointingInfo.from_fits_header(self.obs_info)
return self._pointing
[docs] def get_pointing_altaz(self, time):
"""Get the pointing in altaz for given time"""
return self.pointing.get_altaz(time, self.observatory_earth_location)
[docs] def get_pointing_icrs(self, time):
"""Get the pointing in icrs for given time"""
return self.pointing.get_icrs(time, self.observatory_earth_location)
@lazyproperty
@deprecated(
"v1.1",
message="Use observation.pointing or observation.get_pointing_{{altaz,icrs}} instead",
)
def fixed_pointing_info(self):
"""Fixed pointing info for this observation (`FixedPointingInfo`)."""
return self._pointing
@property
@deprecated("v1.1", message="Use observation.get_pointing_icrs(time) instead")
def pointing_radec(self):
"""Pointing RA / DEC sky coordinates (`~astropy.coordinates.SkyCoord`)."""
return self.fixed_pointing_info.radec
@property
@deprecated("v1.1", message="Use observation.get_pointing_altaz(time) instead")
def pointing_altaz(self):
return self.fixed_pointing_info.altaz
@property
@deprecated("v1.1", message="Use observation.get_pointing_altaz(time).zen instead")
def pointing_zen(self):
"""Pointing zenith angle sky (`~astropy.units.Quantity`)."""
return self.get_pointing_altaz(self.tmid).zen
@property
def observatory_earth_location(self):
"""Observatory location (`~astropy.coordinates.EarthLocation`)."""
if self._location is not None:
return self._location
# for now we catch the deprecation warning until we store location on the observation properly
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=GammapyDeprecationWarning)
return self.fixed_pointing_info.location
@lazyproperty
def target_radec(self):
"""Target RA / DEC sky coordinates (`~astropy.coordinates.SkyCoord`)."""
lon, lat = (
self.obs_info.get("RA_OBJ", np.nan),
self.obs_info.get("DEC_OBJ", np.nan),
)
return SkyCoord(lon, lat, unit="deg", frame="icrs")
@property
def muoneff(self):
"""Observation muon efficiency."""
return self.obs_info.get("MUONEFF", 1)
def __str__(self):
pointing = self.get_pointing_icrs(self.tmid)
ra = pointing.ra.deg
dec = pointing.dec.deg
pointing = f"{ra:.1f} deg, {dec:.1f} deg\n"
# TODO: Which target was observed?
# TODO: print info about available HDUs for this observation ...
return (
f"{self.__class__.__name__}\n\n"
f"\tobs id : {self.obs_id} \n "
f"\ttstart : {self.tstart.mjd:.2f}\n"
f"\ttstop : {self.tstop.mjd:.2f}\n"
f"\tduration : {self.observation_time_duration:.2f}\n"
f"\tpointing (icrs) : {pointing}\n"
f"\tdeadtime fraction : {self.observation_dead_time_fraction:.1%}\n"
)
[docs] def check(self, checks="all"):
"""Run checks.
This is a generator that yields a list of dicts.
"""
checker = ObservationChecker(self)
return checker.run(checks=checks)
[docs] def peek(self, figsize=(15, 10)):
"""Quick-look plots in a few panels.
Parameters
----------
figsize : tuple
Figure size
"""
plottable_hds = ["events", "aeff", "psf", "edisp", "bkg", "rad_max"]
plot_hdus = list(set(plottable_hds) & set(self.available_hdus))
plot_hdus.sort()
n_irfs = len(plot_hdus)
nrows = n_irfs // 2
ncols = 2 + n_irfs % 2
fig, axes = plt.subplots(
nrows=nrows,
ncols=ncols,
figsize=figsize,
gridspec_kw={"wspace": 0.3, "hspace": 0.3},
)
for idx, (ax, name) in enumerate(zip_longest(axes.flat, plot_hdus)):
if name == "aeff":
self.aeff.plot(ax=ax)
ax.set_title("Effective area")
if name == "bkg":
bkg = self.bkg
if not bkg.has_offset_axis:
bkg = bkg.to_2d()
bkg.plot(ax=ax)
ax.set_title("Background rate")
if name == "psf":
self.psf.plot_containment_radius_vs_energy(ax=ax)
ax.set_title("Point spread function")
if name == "edisp":
self.edisp.plot_bias(ax=ax, add_cbar=True)
ax.set_title("Energy dispersion")
if name == "rad_max":
self.rad_max.plot_rad_max_vs_energy(ax=ax)
ax.set_title("Rad max")
if name == "events":
m = self.events._counts_image(allsky=False)
ax.remove()
ax = fig.add_subplot(nrows, ncols, idx + 1, projection=m.geom.wcs)
m.plot(ax=ax, stretch="sqrt", vmin=0, add_cbar=True)
ax.set_title("Events")
if name is None:
ax.set_visible(False)
[docs] def select_time(self, time_interval):
"""Select a time interval of the observation.
Parameters
----------
time_interval : `astropy.time.Time`
Start and stop time of the selected time interval.
For now we only support a single time interval.
Returns
-------
new_obs : `~gammapy.data.Observation`
A new observation instance of the specified time interval
"""
new_obs_filter = self.obs_filter.copy()
new_obs_filter.time_filter = time_interval
obs = copy.deepcopy(self)
obs.obs_filter = new_obs_filter
return obs
[docs] @classmethod
def read(cls, event_file, irf_file=None):
"""Create an Observation from a Event List and an (optional) IRF file.
Parameters
----------
event_file : str, Path
path to the .fits file containing the event list and the GTI
irf_file : str, Path
(optional) path to the .fits file containing the IRF components,
if not provided the IRF will be read from the event file
Returns
-------
observation : `~gammapy.data.Observation`
observation with the events and the irf read from the file
"""
from gammapy.irf.io import load_irf_dict_from_file
events = EventList.read(event_file)
gti = GTI.read(event_file)
irf_file = irf_file if irf_file is not None else event_file
irf_dict = load_irf_dict_from_file(irf_file)
obs_info = events.table.meta
return cls(
events=events,
gti=gti,
obs_info=obs_info,
obs_id=obs_info.get("OBS_ID"),
pointing=FixedPointingInfo.from_fits_header(obs_info),
**irf_dict,
)
[docs] def write(self, path, overwrite=False, format="gadf", include_irfs=True):
"""
Write this observation into `path` using the specified format
Parameters
----------
path: str or `~pathlib.Path`
Path for the output file
overwrite: bool
If true, existing files are overwritten.
format: str
Output format, currently only "gadf" is supported
include_irfs: bool
Whether to include irf components in the output file
"""
if format != "gadf":
raise ValueError(f'Only the "gadf" format supported, got {format}')
path = make_path(path)
primary = fits.PrimaryHDU()
primary.header["CREATOR"] = f"Gammapy {__version__}"
primary.header["DATE"] = Time.now().iso
hdul = fits.HDUList([primary])
events = self.events
if events is not None:
events_hdu = events.to_table_hdu(format=format)
events_hdu.header.update(self.pointing.to_fits_header())
hdul.append(events_hdu)
gti = self.gti
if gti is not None:
hdul.append(gti.to_table_hdu(format=format))
if include_irfs:
for irf_name in self.available_irfs:
irf = getattr(self, irf_name)
if irf is not None:
hdul.append(irf.to_table_hdu(format="gadf-dl3"))
hdul.writeto(path, overwrite=overwrite)
[docs] def copy(self, in_memory=False, **kwargs):
"""Copy observation
Overwriting arguments requires the 'in_memory` argument to be true.
Parameters
----------
in_memory : bool
Copy observation in memory.
**kwargs : dict
Keyword arguments passed to `Observation`
Examples
--------
.. code::
from gammapy.data import Observation
obs = Observation.read(
"$GAMMAPY_DATA/hess-dl3-dr1/data/hess_dl3_dr1_obs_id_020136.fits.gz"
)
obs_copy = obs.copy(obs_id=1234)
print(obs_copy)
Returns
-------
obs : `Observation`
Copied observation
"""
if in_memory:
argnames = inspect.getfullargspec(self.__init__).args
argnames.remove("self")
for name in argnames:
if name == "location":
attr = "observatory_earth_location"
else:
attr = name
value = getattr(self, attr)
kwargs.setdefault(name, copy.deepcopy(value))
return self.__class__(**kwargs)
if kwargs:
raise ValueError("Overwriting arguments requires to set 'in_memory=True'")
return copy.deepcopy(self)
[docs]class Observations(collections.abc.MutableSequence):
"""Container class that holds a list of observations.
Parameters
----------
observations : list
A list of `~gammapy.data.Observation`
"""
def __init__(self, observations=None):
self._observations = observations or []
def __getitem__(self, key):
return self._observations[self.index(key)]
def __delitem__(self, key):
del self._observations[self.index(key)]
def __setitem__(self, key, obs):
if isinstance(obs, Observation):
self._observations[self.index(key)] = obs
else:
raise TypeError(f"Invalid type: {type(obs)!r}")
[docs] def insert(self, idx, obs):
if isinstance(obs, Observation):
self._observations.insert(idx, obs)
else:
raise TypeError(f"Invalid type: {type(obs)!r}")
def __len__(self):
return len(self._observations)
def __str__(self):
s = self.__class__.__name__ + "\n"
s += "Number of observations: {}\n".format(len(self))
for obs in self:
s += str(obs)
return s
[docs] def index(self, key):
if isinstance(key, (int, slice)):
return key
elif isinstance(key, str):
return self.ids.index(key)
elif isinstance(key, Observation):
return self._observations.index(key)
else:
raise TypeError(f"Invalid type: {type(key)!r}")
@property
def ids(self):
"""List of obs IDs (`list`)"""
return [str(obs.obs_id) for obs in self]
[docs] def select_time(self, time_intervals):
"""Select a time interval of the observations.
Parameters
----------
time_intervals : `astropy.time.Time` or list of `astropy.time.Time`
list of Start and stop time of the time intervals or one Time interval
Returns
-------
new_observations : `~gammapy.data.Observations`
A new Observations instance of the specified time intervals
"""
new_obs_list = []
if isinstance(time_intervals, Time):
time_intervals = [time_intervals]
for time_interval in time_intervals:
for obs in self:
if (obs.tstart < time_interval[1]) & (obs.tstop > time_interval[0]):
new_obs = obs.select_time(time_interval)
new_obs_list.append(new_obs)
return self.__class__(new_obs_list)
def _ipython_key_completions_(self):
return self.ids
[docs] def group_by_label(self, labels):
"""Split obsevations in multiple groups of observations
Parameters
----------
labels : array
Array of group labels
Returns
-------
obs_clusters : dict of `~gammapy.data.Observations`
dict of Observations instance, one instance for each group.
"""
obs_groups = {}
for label in np.unique(labels):
observations = self.__class__(
[obs for k, obs in enumerate(self) if labels[k] == label]
)
obs_groups[f"group_{label}"] = observations
return obs_groups
class ObservationChecker(Checker):
"""Check an observation.
Checks data format and a bit about the content.
"""
CHECKS = {
"events": "check_events",
"gti": "check_gti",
"aeff": "check_aeff",
"edisp": "check_edisp",
"psf": "check_psf",
}
def __init__(self, observation):
self.observation = observation
def _record(self, level="info", msg=None):
return {"level": level, "obs_id": self.observation.obs_id, "msg": msg}
def check_events(self):
yield self._record(level="debug", msg="Starting events check")
try:
events = self.observation.events
except Exception:
yield self._record(level="warning", msg="Loading events failed")
return
yield from EventListChecker(events).run()
# TODO: split this out into a GTIChecker
def check_gti(self):
yield self._record(level="debug", msg="Starting gti check")
try:
gti = self.observation.gti
except Exception:
yield self._record(level="warning", msg="Loading GTI failed")
return
if len(gti.table) == 0:
yield self._record(level="error", msg="GTI table has zero rows")
columns_required = ["START", "STOP"]
for name in columns_required:
if name not in gti.table.colnames:
yield self._record(level="error", msg=f"Missing table column: {name!r}")
# TODO: Check that header keywords agree with table entries
# TSTART, TSTOP, MJDREFI, MJDREFF
# Check that START and STOP times are consecutive
# times = np.ravel(self.table['START'], self.table['STOP'])
# # TODO: not sure this is correct ... add test with a multi-gti table from Fermi.
# if not np.all(np.diff(times) >= 0):
# yield 'GTIs are not consecutive or sorted.'
# TODO: add reference times for all instruments and check for this
# Use TELESCOP header key to check which instrument it is.
def _check_times(self):
"""Check if various times are consistent.
The headers and tables of the FITS EVENTS and GTI extension
contain various observation and event time information.
"""
# http://fermi.gsfc.nasa.gov/ssc/data/analysis/documentation/Cicerone/Cicerone_Data/Time_in_ScienceTools.html
# https://hess-confluence.desy.de/confluence/display/HESS/HESS+FITS+data+-+References+and+checks#HESSFITSdata-Referencesandchecks-Time
telescope_met_refs = {
"FERMI": Time("2001-01-01T00:00:00"),
"HESS": Time("2001-01-01T00:00:00"),
}
meta = self.dset.event_list.table.meta
telescope = meta["TELESCOP"]
if telescope in telescope_met_refs.keys():
dt = self.time_ref - telescope_met_refs[telescope]
if dt > self.accuracy["time"]:
yield self._record(
level="error", msg="Reference time incorrect for telescope"
)
def check_aeff(self):
yield self._record(level="debug", msg="Starting aeff check")
try:
aeff = self.observation.aeff
except Exception:
yield self._record(level="warning", msg="Loading aeff failed")
return
# Check that thresholds are meaningful for aeff
if (
"LO_THRES" in aeff.meta
and "HI_THRES" in aeff.meta
and aeff.meta["LO_THRES"] >= aeff.meta["HI_THRES"]
):
yield self._record(
level="error", msg="LO_THRES >= HI_THRES in effective area meta data"
)
# Check that data isn't all null
if np.max(aeff.data.data) <= 0:
yield self._record(
level="error", msg="maximum entry of effective area is <= 0"
)
def check_edisp(self):
yield self._record(level="debug", msg="Starting edisp check")
try:
edisp = self.observation.edisp
except Exception:
yield self._record(level="warning", msg="Loading edisp failed")
return
# Check that data isn't all null
if np.max(edisp.data.data) <= 0:
yield self._record(level="error", msg="maximum entry of edisp is <= 0")
def check_psf(self):
yield self._record(level="debug", msg="Starting psf check")
try:
self.observation.psf
except Exception:
yield self._record(level="warning", msg="Loading psf failed")
return