Source code for gammapy.data.observations

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import collections.abc
import copy
import logging
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.time import Time
from astropy.units import Quantity
from gammapy.irf import Background3D
from gammapy.utils.fits import LazyFitsData, earth_location_from_dict
from gammapy.utils.testing import Checker
from .event_list import EventListChecker
from .filters import ObservationFilter
from .gti import GTI
from .pointing import FixedPointingInfo

__all__ = ["Observation", "Observations"]

log = logging.getLogger(__name__)


[docs]class Observation: """In-memory observation. Parameters ---------- obs_id : int Observation id obs_info : dict Observation info dict aeff : `~gammapy.irf.EffectiveAreaTable2D` Effective area edisp : `~gammapy.irf.EnergyDispersion2D` Energy dispersion psf : `~gammapy.irf.PSF3D` Point spread function bkg : `~gammapy.irf.Background3D` Background rate model gti : `~gammapy.data.GTI` Table with GTI start and stop time events : `~gammapy.data.EventList` Event list obs_filter : `ObservationFilter` Observation filter. """ aeff = LazyFitsData(cache=False) edisp = LazyFitsData(cache=False) psf = LazyFitsData(cache=False) bkg = LazyFitsData(cache=False) _events = LazyFitsData(cache=False) _gti = LazyFitsData(cache=False) def __init__( self, obs_id=None, obs_info=None, gti=None, aeff=None, edisp=None, psf=None, bkg=None, events=None, obs_filter=None, ): self.obs_id = obs_id self.obs_info = obs_info self.aeff = aeff self.edisp = edisp self.psf = psf self.bkg = bkg self._gti = gti self._events = events self.obs_filter = obs_filter or ObservationFilter() @property def events(self): events = self.obs_filter.filter_events(self._events) return events @property def gti(self): gti = self.obs_filter.filter_gti(self._gti) return gti @staticmethod def _get_obs_info(pointing, deadtime_fraction): """Create obs info dict from in memory data""" return { "RA_PNT": pointing.icrs.ra.deg, "DEC_PNT": pointing.icrs.dec.deg, "DEADC": 1 - deadtime_fraction, }
[docs] @classmethod def create( cls, pointing, obs_id=0, livetime=None, tstart=None, tstop=None, irfs=None, deadtime_fraction=0.0, reference_time="2000-01-01", ): """Create an observation. User must either provide the livetime, or the start and stop times. Parameters ---------- pointing : `~astropy.coordinates.SkyCoord` Pointing position obs_id : int Observation ID as identifier livetime : ~astropy.units.Quantity` Livetime exposure of the simulated observation tstart : `~astropy.units.Quantity` Start time of observation w.r.t reference_time tstop : `~astropy.units.Quantity` w.r.t reference_time Stop time of observation irfs : dict IRFs used for simulating the observation: `bkg`, `aeff`, `psf`, `edisp` deadtime_fraction : float, optional Deadtime fraction, defaults to 0 reference_time : `~astropy.time.Time` the reference time to use in GTI definition Returns ------- obs : `gammapy.data.MemoryObservation` """ if tstart is None: tstart = Quantity(0.0, "hr") if tstop is None: tstop = tstart + Quantity(livetime) gti = GTI.create([tstart], [tstop], reference_time=reference_time) obs_info = cls._get_obs_info( pointing=pointing, deadtime_fraction=deadtime_fraction ) return cls( obs_id=obs_id, obs_info=obs_info, gti=gti, aeff=irfs.get("aeff"), bkg=irfs.get("bkg"), edisp=irfs.get("edisp"), psf=irfs.get("psf"), )
@property def tstart(self): """Observation start time (`~astropy.time.Time`).""" return self.gti.time_start[0] @property def tstop(self): """Observation stop time (`~astropy.time.Time`).""" return self.gti.time_stop[0] @property def observation_time_duration(self): """Observation time duration in seconds (`~astropy.units.Quantity`). The wall time, including dead-time. """ return self.gti.time_sum @property def observation_live_time_duration(self): """Live-time duration in seconds (`~astropy.units.Quantity`). The dead-time-corrected observation time. Computed as ``t_live = t_observation * (1 - f_dead)`` where ``f_dead`` is the dead-time fraction. """ return self.observation_time_duration * ( 1 - self.observation_dead_time_fraction ) @property def observation_dead_time_fraction(self): """Dead-time fraction (float). Defined as dead-time over observation time. Dead-time is defined as the time during the observation where the detector didn't record events: https://en.wikipedia.org/wiki/Dead_time https://ui.adsabs.harvard.edu/abs/2004APh....22..285F The dead-time fraction is used in the live-time computation, which in turn is used in the exposure and flux computation. """ return 1 - self.obs_info["DEADC"] @property def pointing_radec(self): """Pointing RA / DEC sky coordinates (`~astropy.coordinates.SkyCoord`).""" lon, lat = ( self.obs_info.get("RA_PNT", np.nan), self.obs_info.get("DEC_PNT", np.nan), ) return SkyCoord(lon, lat, unit="deg", frame="icrs") @property def pointing_altaz(self): """Pointing ALT / AZ sky coordinates (`~astropy.coordinates.SkyCoord`).""" alt, az = ( self.obs_info.get("ALT_PNT", np.nan), self.obs_info.get("AZ_PNT", np.nan), ) return SkyCoord(az, alt, unit="deg", frame="altaz") @property def pointing_zen(self): """Pointing zenith angle sky (`~astropy.units.Quantity`).""" return Quantity(self.obs_info.get("ZEN_PNT", np.nan), unit="deg") @property def fixed_pointing_info(self): """Fixed pointing info for this observation (`FixedPointingInfo`).""" return FixedPointingInfo(self.events.table.meta) @property def target_radec(self): """Target RA / DEC sky coordinates (`~astropy.coordinates.SkyCoord`).""" lon, lat = ( self.obs_info.get("RA_OBJ", np.nan), self.obs_info.get("DEC_OBJ", np.nan), ) return SkyCoord(lon, lat, unit="deg", frame="icrs") @property def observatory_earth_location(self): """Observatory location (`~astropy.coordinates.EarthLocation`).""" return earth_location_from_dict(self.obs_info) @property def muoneff(self): """Observation muon efficiency.""" return self.obs_info.get("MUONEFF", 1) def __str__(self): ra = self.pointing_radec.ra.deg dec = self.pointing_radec.dec.deg pointing = f"{ra:.1f} deg, {dec:.1f} deg\n" # TODO: Which target was observed? # TODO: print info about available HDUs for this observation ... return ( f"{self.__class__.__name__}\n\n" f"\tobs id : {self.obs_id} \n " f"\ttstart : {self.tstart.mjd:.2f}\n" f"\ttstop : {self.tstop.mjd:.2f}\n" f"\tduration : {self.observation_time_duration:.2f}\n" f"\tpointing (icrs) : {pointing}\n" f"\tdeadtime fraction : {self.observation_dead_time_fraction:.1%}\n" )
[docs] def check(self, checks="all"): """Run checks. This is a generator that yields a list of dicts. """ checker = ObservationChecker(self) return checker.run(checks=checks)
[docs] def peek(self, figsize=(12, 10)): """Quick-look plots in a few panels. Parameters ---------- figszie : tuple Figure size """ import matplotlib.pyplot as plt fig, ((ax_aeff, ax_bkg), (ax_psf, ax_edisp)) = plt.subplots( nrows=2, ncols=2, figsize=figsize, gridspec_kw={"wspace": 0.25, "hspace": 0.25}, ) self.aeff.plot(ax=ax_aeff) try: if isinstance(self.bkg, Background3D): bkg = self.bkg.to_2d() else: bkg = self.bkg bkg.plot(ax=ax_bkg) except AttributeError: logging.warning(f"No background model found for obs {self.obs_id}.") self.psf.plot_containment_vs_energy(ax=ax_psf) self.edisp.plot_bias(ax=ax_edisp, add_cbar=True) ax_aeff.set_title("Effective area") ax_bkg.set_title("Background rate") ax_psf.set_title("Point spread function") ax_edisp.set_title("Energy dispersion")
[docs] def select_time(self, time_interval): """Select a time interval of the observation. Parameters ---------- time_interval : `astropy.time.Time` Start and stop time of the selected time interval. For now we only support a single time interval. Returns ------- new_obs : `~gammapy.data.Observation` A new observation instance of the specified time interval """ new_obs_filter = self.obs_filter.copy() new_obs_filter.time_filter = time_interval obs = copy.deepcopy(self) obs.obs_filter = new_obs_filter return obs
[docs]class Observations(collections.abc.MutableSequence): """Container class that holds a list of observations. Parameters ---------- observations : list A list of `~gammapy.data.Observation` """ def __init__(self, observations=None): self._observations = observations or [] def __getitem__(self, key): return self._observations[self.index(key)] def __delitem__(self, key): del self._observations[self.index(key)] def __setitem__(self, key, obs): if isinstance(obs, Observation): self._observations[self.index(key)] = obs else: raise TypeError(f"Invalid type: {type(obs)!r}")
[docs] def insert(self, idx, obs): if isinstance(obs, Observation): self._observations.insert(idx, obs) else: raise TypeError(f"Invalid type: {type(obs)!r}")
def __len__(self): return len(self._observations) def __str__(self): s = self.__class__.__name__ + "\n" s += "Number of observations: {}\n".format(len(self)) for obs in self: s += str(obs) return s
[docs] def index(self, key): if isinstance(key, (int, slice)): return key elif isinstance(key, str): return self.ids.index(key) elif isinstance(key, Observation): return self._observations.index(key) else: raise TypeError(f"Invalid type: {type(key)!r}")
@property def ids(self): """List of obs IDs (`list`)""" return [str(obs.obs_id) for obs in self]
[docs] def select_time(self, time_intervals): """Select a time interval of the observations. Parameters ---------- time_intervals : `astropy.time.Time` or list of `astropy.time.Time` list of Start and stop time of the time intervals or one Time interval Returns ------- new_observations : `~gammapy.data.Observations` A new Observations instance of the specified time intervals """ new_obs_list = [] if isinstance(time_intervals, Time): time_intervals = [time_intervals] for time_interval in time_intervals: for obs in self: if (obs.tstart < time_interval[1]) & (obs.tstop > time_interval[0]): new_obs = obs.select_time(time_interval) new_obs_list.append(new_obs) return self.__class__(new_obs_list)
def _ipython_key_completions_(self): return self.ids
class ObservationChecker(Checker): """Check an observation. Checks data format and a bit about the content. """ CHECKS = { "events": "check_events", "gti": "check_gti", "aeff": "check_aeff", "edisp": "check_edisp", "psf": "check_psf", } def __init__(self, observation): self.observation = observation def _record(self, level="info", msg=None): return {"level": level, "obs_id": self.observation.obs_id, "msg": msg} def check_events(self): yield self._record(level="debug", msg="Starting events check") try: events = self.observation.events except Exception: yield self._record(level="warning", msg="Loading events failed") return yield from EventListChecker(events).run() # TODO: split this out into a GTIChecker def check_gti(self): yield self._record(level="debug", msg="Starting gti check") try: gti = self.observation.gti except Exception: yield self._record(level="warning", msg="Loading GTI failed") return if len(gti.table) == 0: yield self._record(level="error", msg="GTI table has zero rows") columns_required = ["START", "STOP"] for name in columns_required: if name not in gti.table.colnames: yield self._record(level="error", msg=f"Missing table column: {name!r}") # TODO: Check that header keywords agree with table entries # TSTART, TSTOP, MJDREFI, MJDREFF # Check that START and STOP times are consecutive # times = np.ravel(self.table['START'], self.table['STOP']) # # TODO: not sure this is correct ... add test with a multi-gti table from Fermi. # if not np.all(np.diff(times) >= 0): # yield 'GTIs are not consecutive or sorted.' # TODO: add reference times for all instruments and check for this # Use TELESCOP header key to check which instrument it is. def _check_times(self): """Check if various times are consistent. The headers and tables of the FITS EVENTS and GTI extension contain various observation and event time information. """ # http://fermi.gsfc.nasa.gov/ssc/data/analysis/documentation/Cicerone/Cicerone_Data/Time_in_ScienceTools.html # https://hess-confluence.desy.de/confluence/display/HESS/HESS+FITS+data+-+References+and+checks#HESSFITSdata-Referencesandchecks-Time telescope_met_refs = { "FERMI": Time("2001-01-01T00:00:00"), "HESS": Time("2001-01-01T00:00:00"), } meta = self.dset.event_list.table.meta telescope = meta["TELESCOP"] if telescope in telescope_met_refs.keys(): dt = self.time_ref - telescope_met_refs[telescope] if dt > self.accuracy["time"]: yield self._record( level="error", msg="Reference time incorrect for telescope" ) def check_aeff(self): yield self._record(level="debug", msg="Starting aeff check") try: aeff = self.observation.aeff except Exception: yield self._record(level="warning", msg="Loading aeff failed") return # Check that thresholds are meaningful for aeff if ( "LO_THRES" in aeff.meta and "HI_THRES" in aeff.meta and aeff.meta["LO_THRES"] >= aeff.meta["HI_THRES"] ): yield self._record( level="error", msg="LO_THRES >= HI_THRES in effective area meta data" ) # Check that data isn't all null if np.max(aeff.data.data) <= 0: yield self._record( level="error", msg="maximum entry of effective area is <= 0" ) def check_edisp(self): yield self._record(level="debug", msg="Starting edisp check") try: edisp = self.observation.edisp except Exception: yield self._record(level="warning", msg="Loading edisp failed") return # Check that data isn't all null if np.max(edisp.data.data) <= 0: yield self._record(level="error", msg="maximum entry of edisp is <= 0") def check_psf(self): yield self._record(level="debug", msg="Starting psf check") try: self.observation.psf except Exception: yield self._record(level="warning", msg="Loading psf failed") return # TODO: implement some basic check # The following doesn't work, because EnergyDependentMultiGaussPSF # has no attribute `data` # Check that data isn't all null # if np.max(psf.data.data) <= 0: # yield self._record( # level="error", msg="maximum entry of psf is <= 0" # )