Source code for gammapy.data.observations

# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import numpy as np
from collections import OrderedDict
from astropy.coordinates import SkyCoord
from astropy.units import Quantity
from astropy.utils import lazyproperty
from ..extern.six.moves import UserList  # pylint:disable=import-error
from ..irf import EnergyDependentTablePSF, PSF3D, IRFStacker
from .event_list import EventListChecker
from ..utils.testing import Checker
from ..utils.energy import Energy
from ..utils.fits import earth_location_from_dict
from ..utils.table import table_row_to_dict
from ..utils.time import time_ref_from_dict

__all__ = ["ObservationCTA", "DataStoreObservation", "ObservationList"]

log = logging.getLogger(__name__)


[docs]class ObservationCTA(object): """Container class for an CTA observation Parameters follow loosely the "Required columns" here: http://gamma-astro-data-formats.readthedocs.io/en/latest/data_storage/obs_index/index.html#required-columns Parameters ---------- obs_id : int Observation ID gti : `~gammapy.data.GTI` Good Time Intervals events : `~gammapy.data.EventList` Event list aeff : `~gammapy.irf.EffectiveAreaTable2D` Effective area edisp : `~gammapy.irf.EnergyDispersion2D` Energy dispersion psf : `~gammapy.irf.PSF3D` or `~gammapy.irf.EnergyDependentMultiGaussPSF` or `~gammapy.irf.PSFKing` Tabled Point Spread Function bkg : `~gammapy.irf.Background2D` or `~gammapy.irf.Background3D` Background rate pointing_radec : `~astropy.coordinates.SkyCoord` Pointing RA / DEC sky coordinates observation_live_time_duration : `~astropy.units.Quantity` Live-time duration in seconds The dead-time-corrected observation time. Computed as ``t_live = t_observation * (1 - f_dead)`` where ``f_dead`` is the dead-time fraction. observation_dead_time_fraction : float Dead-time fraction Defined as dead-time over observation time. Dead-time is defined as the time during the observation where the detector did not record events: https://en.wikipedia.org/wiki/Dead_time https://adsabs.harvard.edu/abs/2004APh....22..285F The dead-time fraction is used in the live-time computation, which in turn is used in the exposure and flux computation. meta : `~collections.OrderedDict` Dictionary to store metadata Examples -------- A minimal working example of how to create an observation from CTA's 1DC is given in examples/example_observation_cta.py """ def __init__( self, obs_id=None, gti=None, events=None, aeff=None, edisp=None, psf=None, bkg=None, pointing_radec=None, observation_live_time_duration=None, observation_dead_time_fraction=None, meta=None, ): self.obs_id = obs_id self.gti = gti self.events = events self.aeff = aeff self.edisp = edisp self.psf = psf self.bkg = bkg self.pointing_radec = pointing_radec self.observation_live_time_duration = observation_live_time_duration self.observation_dead_time_fraction = observation_dead_time_fraction self.meta = meta or OrderedDict() def __str__(self): ss = "Info for OBS_ID = {}\n".format(self.obs_id) ss += "- Pointing pos: RA {:.2f} / Dec {:.2f}\n".format( self.pointing_radec.ra if self.pointing_radec else "None", self.pointing_radec.dec if self.pointing_radec else "None", ) tstart = np.atleast_1d(self.gti.time_start.fits)[0] if self.gti else "None" ss += "- Start time: {}\n".format(tstart) ss += "- Observation duration: {}\n".format( self.gti.time_sum if self.gti else "None" ) ss += "- Dead-time fraction: {:5.3f} %\n".format( 100 * self.observation_dead_time_fraction ) return ss
[docs]class DataStoreObservation(object): """IACT data store observation. Parameters ---------- obs_id : int Observation ID data_store : `~gammapy.data.DataStore` Data store """ def __init__(self, obs_id, data_store): # Assert that `obs_id` is available if obs_id not in data_store.obs_table["OBS_ID"]: raise ValueError("OBS_ID = {} not in obs index table.".format(obs_id)) if obs_id not in data_store.hdu_table["OBS_ID"]: raise ValueError("OBS_ID = {} not in HDU index table.".format(obs_id)) self.obs_id = obs_id self.data_store = data_store def __str__(self): ss = "Info for OBS_ID = {}\n".format(self.obs_id) ss += "- Start time: {:.2f}\n".format(self.tstart.mjd) ss += "- Pointing pos: RA {:.2f} / Dec {:.2f}\n".format( self.pointing_radec.ra, self.pointing_radec.dec ) ss += "- Observation duration: {}\n".format(self.observation_time_duration) ss += "- Dead-time fraction: {:5.3f} %\n".format( 100 * self.observation_dead_time_fraction ) # TODO: Which target was observed? # TODO: print info about available HDUs for this observation ... return ss
[docs] def location(self, hdu_type=None, hdu_class=None): """HDU location object. Parameters ---------- hdu_type : str HDU type (see `~gammapy.data.HDUIndexTable.VALID_HDU_TYPE`) hdu_class : str HDU class (see `~gammapy.data.HDUIndexTable.VALID_HDU_CLASS`) Returns ------- location : `~gammapy.data.HDULocation` HDU location """ return self.data_store.hdu_table.hdu_location( obs_id=self.obs_id, hdu_type=hdu_type, hdu_class=hdu_class )
[docs] def load(self, hdu_type=None, hdu_class=None): """Load data file as appropriate object. Parameters ---------- hdu_type : str HDU type (see `~gammapy.data.HDUIndexTable.VALID_HDU_TYPE`) hdu_class : str HDU class (see `~gammapy.data.HDUIndexTable.VALID_HDU_CLASS`) Returns ------- object : object Object depends on type, e.g. for `events` it's a `~gammapy.data.EventList`. """ location = self.location(hdu_type=hdu_type, hdu_class=hdu_class) return location.load()
@property def events(self): """Load `gammapy.data.EventList` object.""" return self.load(hdu_type="events") @property def gti(self): """Load `gammapy.data.GTI` object.""" return self.load(hdu_type="gti") @property def aeff(self): """Load effective area object.""" return self.load(hdu_type="aeff") @property def edisp(self): """Load energy dispersion object.""" return self.load(hdu_type="edisp") @property def psf(self): """Load point spread function object.""" return self.load(hdu_type="psf") @property def bkg(self): """Load background object.""" return self.load(hdu_type="bkg") @lazyproperty def obs_info(self): """Observation information (`~collections.OrderedDict`).""" row = self.data_store.obs_table.select_obs_id(obs_id=self.obs_id)[0] return table_row_to_dict(row) @lazyproperty def tstart(self): """Observation start time (`~astropy.time.Time`).""" met_ref = time_ref_from_dict(self.data_store.obs_table.meta) met = Quantity(self.obs_info["TSTART"].astype("float64"), "second") time = met_ref + met return time @lazyproperty def tstop(self): """Observation stop time (`~astropy.time.Time`).""" met_ref = time_ref_from_dict(self.data_store.obs_table.meta) met = Quantity(self.obs_info["TSTOP"].astype("float64"), "second") time = met_ref + met return time @lazyproperty def observation_time_duration(self): """Observation time duration in seconds (`~astropy.units.Quantity`). The wall time, including dead-time. """ return Quantity(self.obs_info["ONTIME"], "second") @lazyproperty def observation_live_time_duration(self): """Live-time duration in seconds (`~astropy.units.Quantity`). The dead-time-corrected observation time. Computed as ``t_live = t_observation * (1 - f_dead)`` where ``f_dead`` is the dead-time fraction. """ return Quantity(self.obs_info["LIVETIME"], "second") @lazyproperty def observation_dead_time_fraction(self): """Dead-time fraction (float). Defined as dead-time over observation time. Dead-time is defined as the time during the observation where the detector didn't record events: https://en.wikipedia.org/wiki/Dead_time https://adsabs.harvard.edu/abs/2004APh....22..285F The dead-time fraction is used in the live-time computation, which in turn is used in the exposure and flux computation. """ return 1 - self.obs_info["DEADC"] @lazyproperty def pointing_radec(self): """Pointing RA / DEC sky coordinates (`~astropy.coordinates.SkyCoord`).""" lon, lat = self.obs_info["RA_PNT"], self.obs_info["DEC_PNT"] return SkyCoord(lon, lat, unit="deg", frame="icrs") @lazyproperty def pointing_altaz(self): """Pointing ALT / AZ sky coordinates (`~astropy.coordinates.SkyCoord`).""" alt, az = self.obs_info["ALT_PNT"], self.obs_info["AZ_PNT"] return SkyCoord(az, alt, unit="deg", frame="altaz") @lazyproperty def pointing_zen(self): """Pointing zenith angle sky (`~astropy.units.Quantity`).""" return Quantity(self.obs_info["ZEN_PNT"], unit="deg") @lazyproperty def target_radec(self): """Target RA / DEC sky coordinates (`~astropy.coordinates.SkyCoord`).""" lon, lat = self.obs_info["RA_OBJ"], self.obs_info["DEC_OBJ"] return SkyCoord(lon, lat, unit="deg", frame="icrs") @lazyproperty def observatory_earth_location(self): """Observatory location (`~astropy.coordinates.EarthLocation`).""" return earth_location_from_dict(self.obs_info) @lazyproperty def muoneff(self): """Observation muon efficiency.""" return self.obs_info["MUONEFF"]
[docs] def peek(self): """Quick-look plots in a few panels.""" raise NotImplementedError
[docs] def make_psf(self, position, energy=None, rad=None): """Make energy-dependent PSF for a given source position. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Position at which to compute the PSF energy : `~astropy.units.Quantity` 1-dim energy array for the output PSF. If none is given, the energy array of the PSF from the observation is used. rad : `~astropy.coordinates.Angle` 1-dim offset wrt source position array for the output PSF. If none is given, the offset array of the PSF from the observation is used. Returns ------- psf : `~gammapy.irf.EnergyDependentTablePSF` Energy dependent psf table """ offset = position.separation(self.pointing_radec) if energy is None: energy = self.psf.to_energy_dependent_table_psf(theta=offset).energy if rad is None: rad = self.psf.to_energy_dependent_table_psf(theta=offset).rad if isinstance(self.psf, PSF3D): # PSF3D is a table PSF, so we use the native RAD binning by default # TODO: should handle this via a uniform caller API psf_value = self.psf.to_energy_dependent_table_psf(theta=offset).evaluate( energy ) else: psf_value = self.psf.to_energy_dependent_table_psf( theta=offset, rad=rad ).evaluate(energy) arf = self.aeff.data.evaluate(offset=offset, energy=energy) exposure = arf * self.observation_live_time_duration psf = EnergyDependentTablePSF( energy=energy, rad=rad, exposure=exposure, psf_value=psf_value ) return psf
[docs] def to_observation_cta(self): """Convert to `~gammapy.data.ObservationCTA`. This loads all observation-related info from disk and stores it in the in-memory ``ObservationCTA``. Returns ------- obs : `~gammapy.data.ObservationCTA` Observation """ # maps the ObservationCTA class attributes to the DataStoreObservation properties props = { "obs_id": "obs_id", "gti": "gti", "events": "events", "aeff": "aeff", "edisp": "edisp", "psf": "psf", "bkg": "bkg", "pointing_radec": "pointing_radec", "observation_live_time_duration": "observation_live_time_duration", "observation_dead_time_fraction": "observation_dead_time_fraction", } for obs_cta_kwarg, ds_obs_prop in props.items(): try: props[obs_cta_kwarg] = getattr(self, ds_obs_prop) except Exception as exception: log.warning(exception) props[obs_cta_kwarg] = None return ObservationCTA(**props)
[docs] def check(self, checks="all"): """Run checks. This is a generator that yields a list of dicts. """ checker = ObservationChecker(self) return checker.run(checks=checks)
[docs]class ObservationList(UserList): """List of `~gammapy.data.DataStoreObservation`. Could be extended to hold a more generic class of observations. """ def __str__(self): s = self.__class__.__name__ + "\n" s += "Number of observations: {}\n".format(len(self)) for obs in self: s += str(obs) return s
[docs] def make_mean_psf(self, position, energy=None, rad=None): """Compute mean energy-dependent PSF. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Position at which to compute the PSF energy : `~astropy.units.Quantity` 1-dim energy array for the output PSF. If none is given, the energy array of the PSF from the first observation is used. rad : `~astropy.coordinates.Angle` 1-dim offset wrt source position array for the output PSF. If none is given, the energy array of the PSF from the first observation is used. Returns ------- psf : `~gammapy.irf.EnergyDependentTablePSF` Mean PSF """ psf = self[0].make_psf(position, energy, rad) if rad is None: rad = psf.rad if energy is None: energy = psf.energy exposure = psf.exposure psf_value = psf.psf_value.T * psf.exposure for obs in self[1:]: psf = obs.make_psf(position, energy, rad) exposure += psf.exposure psf_value += psf.psf_value.T * psf.exposure psf_value /= exposure psf_tot = EnergyDependentTablePSF( energy=energy, rad=rad, exposure=exposure, psf_value=psf_value.T ) return psf_tot
[docs] def make_mean_edisp( self, position, e_true, e_reco, low_reco_threshold=Energy(0.002, "TeV"), high_reco_threshold=Energy(150, "TeV"), ): """Compute mean energy dispersion. Compute the mean edisp of a set of observations j at a given position The stacking is implemented in :func:`~gammapy.irf.IRFStacker.stack_edisp` Parameters ---------- position : `~astropy.coordinates.SkyCoord` Position at which to compute the mean EDISP e_true : `~gammapy.utils.energy.EnergyBounds` True energy axis e_reco : `~gammapy.utils.energy.EnergyBounds` Reconstructed energy axis low_reco_threshold : `~gammapy.utils.energy.Energy` low energy threshold in reco energy, default 0.002 TeV high_reco_threshold : `~gammapy.utils.energy.Energy` high energy threshold in reco energy , default 150 TeV Returns ------- stacked_edisp : `~gammapy.irf.EnergyDispersion` Stacked EDISP for a set of observation """ list_aeff = [] list_edisp = [] list_livetime = [] list_low_threshold = [low_reco_threshold] * len(self) list_high_threshold = [high_reco_threshold] * len(self) for obs in self: offset = position.separation(obs.pointing_radec) list_aeff.append(obs.aeff.to_effective_area_table(offset, energy=e_true)) list_edisp.append( obs.edisp.to_energy_dispersion(offset, e_reco=e_reco, e_true=e_true) ) list_livetime.append(obs.observation_live_time_duration) irf_stack = IRFStacker( list_aeff=list_aeff, list_edisp=list_edisp, list_livetime=list_livetime, list_low_threshold=list_low_threshold, list_high_threshold=list_high_threshold, ) irf_stack.stack_edisp() return irf_stack.stacked_edisp
class ObservationChecker(Checker): """Check an observation. Checks data format and a bit about the content. """ CHECKS = { "events": "check_events", "gti": "check_gti", "aeff": "check_aeff", "edisp": "check_edisp", "psf": "check_psf", } def __init__(self, obs): self.obs = obs def _record(self, level="info", msg=None): return {"level": level, "obs_id": self.obs.obs_id, "msg": msg} def check_events(self): yield self._record(level="debug", msg="Starting events check") try: events = self.obs.load("events") except Exception: yield self._record(level="warning", msg="Loading events failed") return for record in EventListChecker(events).run(): yield record # TODO: split this out into a GTIChecker def check_gti(self): yield self._record(level="debug", msg="Starting gti check") try: gti = self.obs.load("gti") except Exception: yield self._record(level="warning", msg="Loading GTI failed") return if len(gti.table) == 0: yield self._record(level="error", msg="GTI table has zero rows") columns_required = ["START", "STOP"] for name in columns_required: if name not in gti.table.colnames: yield self._record( level="error", msg="Missing table column: {!r}".format(name) ) # TODO: Check that header keywords agree with table entries # TSTART, TSTOP, MJDREFI, MJDREFF # Check that START and STOP times are consecutive # times = np.ravel(self.table['START'], self.table['STOP']) # # TODO: not sure this is correct ... add test with a multi-gti table from Fermi. # if not np.all(np.diff(times) >= 0): # yield 'GTIs are not consecutive or sorted.' # TODO: add reference times for all instruments and check for this # Use TELESCOP header key to check which instrument it is. def _check_times(self): """Check if various times are consistent. The headers and tables of the FITS EVENTS and GTI extension contain various observation and event time information. """ # http://fermi.gsfc.nasa.gov/ssc/data/analysis/documentation/Cicerone/Cicerone_Data/Time_in_ScienceTools.html # https://hess-confluence.desy.de/confluence/display/HESS/HESS+FITS+data+-+References+and+checks#HESSFITSdata-Referencesandchecks-Time telescope_met_refs = OrderedDict( FERMI=Time("2001-01-01T00:00:00"), HESS=Time("2001-01-01T00:00:00") ) meta = self.dset.event_list.table.meta telescope = meta["TELESCOP"] if telescope in telescope_met_refs.keys(): dt = self.time_ref - telescope_met_refs[telescope] if dt > self.accuracy["time"]: yield self._record( level="error", msg="Reference time incorrect for telescope" ) def check_aeff(self): yield self._record(level="debug", msg="Starting aeff check") try: aeff = self.obs.load("aeff") except Exception: yield self._record(level="warning", msg="Loading aeff failed") return # Check that thresholds are meaningful for aeff if ( "LO_THRES" in aeff.meta and "HI_THRES" in aeff.meta and aeff.meta["LO_THRES"] >= aeff.meta["HI_THRES"] ): yield self._record( level="error", msg="LO_THRES >= HI_THRES in effective area meta data" ) # Check that data isn't all null if np.max(aeff.data.data) <= 0: yield self._record( level="error", msg="maximum entry of effective area is <= 0" ) def check_edisp(self): yield self._record(level="debug", msg="Starting edisp check") try: edisp = self.obs.load("edisp") except Exception: yield self._record(level="warning", msg="Loading edisp failed") return # Check that data isn't all null if np.max(edisp.data.data) <= 0: yield self._record(level="error", msg="maximum entry of edisp is <= 0") def check_psf(self): yield self._record(level="debug", msg="Starting psf check") try: psf = self.obs.load("psf") except Exception: yield self._record(level="warning", msg="Loading psf failed") return # TODO: implement some basic check # The following doesn't work, because EnergyDependentMultiGaussPSF # has no attribute `data` # Check that data isn't all null # if np.max(psf.data.data) <= 0: # yield self._record( # level="error", msg="maximum entry of psf is <= 0" # )