# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
import subprocess
from pathlib import Path
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.io import fits
from gammapy.utils.scripts import make_path
from gammapy.utils.table import table_row_to_dict
from gammapy.utils.testing import Checker
from .hdu_index_table import HDUIndexTable
from .obs_table import ObservationTable, ObservationTableChecker
from .observations import Observation, ObservationChecker, Observations
__all__ = ["DataStore"]
log = logging.getLogger(__name__)
[docs]class DataStore:
"""IACT data store.
The data selection and access happens using an observation
and an HDU index file as described at :ref:`gadf:iact-storage`.
For a usage example see `cta.html <../tutorials/cta.html>`__
Parameters
----------
hdu_table : `~gammapy.data.HDUIndexTable`
HDU index table
obs_table : `~gammapy.data.ObservationTable`
Observation index table
Examples
--------
Here's an example how to create a `DataStore` to access H.E.S.S. data:
>>> from gammapy.data import DataStore
>>> data_store = DataStore.from_dir('$GAMMAPY_DATA/hess-dl3-dr1')
>>> data_store.info()
Data store:
HDU index table:
BASE_DIR: /home/runner/work/gammapy/gammapy/gammapy-datasets/hess-dl3-dr1
Rows: 630
OBS_ID: 20136 -- 47829
HDU_TYPE: ['aeff', 'bkg', 'edisp', 'events', 'gti', 'psf']
HDU_CLASS: ['aeff_2d', 'bkg_3d', 'edisp_2d', 'events', 'gti', 'psf_table']
<BLANKLINE>
<BLANKLINE>
Observation table:
Observatory name: 'N/A'
Number of observations: 105
<BLANKLINE>
"""
DEFAULT_HDU_TABLE = "hdu-index.fits.gz"
"""Default HDU table filename."""
DEFAULT_OBS_TABLE = "obs-index.fits.gz"
"""Default observation table filename."""
def __init__(self, hdu_table=None, obs_table=None):
self.hdu_table = hdu_table
self.obs_table = obs_table
def __str__(self):
return self.info(show=False)
[docs] @classmethod
def from_file(cls, filename, hdu_hdu="HDU_INDEX", hdu_obs="OBS_INDEX"):
"""Create from a FITS file.
The FITS file must contain both index files.
Parameters
----------
filename : str, Path
FITS filename
hdu_hdu : str or int
FITS HDU name or number for the HDU index table
hdu_obs : str or int
FITS HDU name or number for the observation index table
Returns
-------
data_store : `DataStore`
Data store
"""
filename = make_path(filename)
hdu_table = HDUIndexTable.read(filename, hdu=hdu_hdu, format="fits")
obs_table = ObservationTable.read(filename, hdu=hdu_obs, format="fits")
return cls(hdu_table=hdu_table, obs_table=obs_table)
[docs] @classmethod
def from_dir(cls, base_dir, hdu_table_filename=None, obs_table_filename=None):
"""Create from a directory.
Parameters
----------
base_dir : str, Path
Base directory of the data files.
hdu_table_filename : str, Path
Filename of the HDU index file. May be specified either relative
to `base_dir` or as an absolute path. If None, the default filename
will be looked for.
obs_table_filename : str, Path
Filename of the observation index file. May be specified either relative
to `base_dir` or as an absolute path. If None, the default filename
will be looked for.
"""
base_dir = make_path(base_dir)
if hdu_table_filename:
hdu_table_filename = make_path(hdu_table_filename)
if (base_dir / hdu_table_filename).exists():
hdu_table_filename = base_dir / hdu_table_filename
else:
hdu_table_filename = base_dir / cls.DEFAULT_HDU_TABLE
if obs_table_filename:
obs_table_filename = make_path(obs_table_filename)
if (base_dir / obs_table_filename).exists():
obs_table_filename = base_dir / obs_table_filename
else:
obs_table_filename = base_dir / cls.DEFAULT_OBS_TABLE
if not hdu_table_filename.exists():
raise OSError(f"File not found: {hdu_table_filename}")
log.debug(f"Reading {hdu_table_filename}")
hdu_table = HDUIndexTable.read(hdu_table_filename, format="fits")
hdu_table.meta["BASE_DIR"] = str(base_dir)
if not obs_table_filename.exists():
raise OSError(f"File not found: {obs_table_filename}")
log.debug(f"Reading {obs_table_filename}")
obs_table = ObservationTable.read(obs_table_filename, format="fits")
return cls(hdu_table=hdu_table, obs_table=obs_table)
[docs] @classmethod
def from_events_files(cls, paths):
"""Create from a list of event filenames.
HDU and observation index tables will be created from the EVENTS header.
IRFs are found only if you have a ``CALDB`` environment variable set,
and if the EVENTS files contain the following keys:
- ``TELESCOP`` (example: ``TELESCOP = CTA``)
- ``CALDB`` (example: ``CALDB = 1dc``)
- ``IRF`` (example: ``IRF = South_z20_50h``)
This method is useful specifically if you want to load data simulated
with `ctobssim`_
.. _ctobssim: http://cta.irap.omp.eu/ctools/users/reference_manual/ctobssim.html
Examples
--------
This is how you can access a single event list::
from gammapy.data import DataStore
path = "$GAMMAPY_DATA/cta-1dc/data/baseline/gps/gps_baseline_110380.fits"
data_store = DataStore.from_events_files([path])
observations = data_store.get_observations()
You can now analyse this data as usual (see any Gammapy tutorial).
If you have multiple event files, you have to make the list. Here's an example
using ``Path.glob`` to get a list of all events files in a given folder::
import os
from pathlib import Path
path = Path(os.environ["GAMMAPY_DATA"]) / "cta-1dc/data"
paths = list(path.rglob("*.fits"))
data_store = DataStore.from_events_files(paths)
observations = data_store.get_observations()
Note that you have a lot of flexibility to select the observations you want,
by having a few lines of custom code to prepare ``paths``, or to select a
subset via a method on the ``data_store`` or the ``observations`` objects.
If you want to generate HDU and observation index files, write the tables to disk::
data_store.hdu_table.write("hdu-index.fits.gz")
data_store.obs_table.write("obs-index.fits.gz")
"""
return DataStoreMaker(paths).run()
[docs] def info(self, show=True):
"""Print some info."""
s = "Data store:\n"
s += self.hdu_table.summary()
s += "\n\n"
s += self.obs_table.summary()
if show:
print(s)
else:
return s
[docs] def obs(self, obs_id):
"""Access a given `~gammapy.data.Observation`.
Parameters
----------
obs_id : int
Observation ID.
Returns
-------
observation : `~gammapy.data.Observation`
Observation container
"""
if obs_id not in self.obs_table["OBS_ID"]:
raise ValueError(f"OBS_ID = {obs_id} not in obs index table.")
if obs_id not in self.hdu_table["OBS_ID"]:
raise ValueError(f"OBS_ID = {obs_id} not in HDU index table.")
row = self.obs_table.select_obs_id(obs_id=obs_id)[0]
kwargs = {"obs_id": int(obs_id)}
kwargs["obs_info"] = table_row_to_dict(row)
hdu_list = ["events", "gti", "aeff", "edisp", "psf", "bkg", "rad_max"]
for hdu in hdu_list:
kwargs[hdu] = self.hdu_table.hdu_location(obs_id=obs_id, hdu_type=hdu)
return Observation(**kwargs)
[docs] def get_observations(self, obs_id=None, skip_missing=False, required_irf="all"):
"""Generate a `~gammapy.data.Observations`.
Parameters
----------
obs_id : list
Observation IDs (default of ``None`` means "all")
skip_missing : bool, optional
Skip missing observations, default: False
required_irf : list of str
Runs will be added to the list of observations only if the
required IRFs are present. Otherwise, the given run will be skipped
Available options are:
* `aeff` : Effective area
* `bkg` : Background
* `edisp`: Energy dispersion
* `psf` : Point Spread Function
By default, all the IRFs are required.
Returns
-------
observations : `~gammapy.data.Observations`
Container holding a list of `~gammapy.data.Observation`
"""
available_irf = ["aeff", "edisp", "psf", "bkg"]
if required_irf == "all":
required_irf = available_irf
elif required_irf is None:
required_irf = []
if not set(required_irf).issubset(available_irf):
difference = set(required_irf).difference(available_irf)
raise ValueError(
f"{difference} is not a valid irf key. Choose from: {available_irf}"
)
if obs_id is None:
obs_id = self.obs_table["OBS_ID"].data
obs_list = []
for _ in obs_id:
try:
obs = self.obs(_)
except ValueError as err:
if skip_missing:
log.warning(f"Skipping missing obs_id: {_!r}")
continue
else:
raise err
if set(required_irf).issubset(obs.available_irfs):
obs_list.append(obs)
else:
log.warning(f"Skipping run with missing IRFs; obs_id: {_!r}")
return Observations(obs_list)
[docs] def copy_obs(self, obs_id, outdir, hdu_class=None, verbose=False, overwrite=False):
"""Create a new `~gammapy.data.DataStore` containing a subset of observations.
Parameters
----------
obs_id : array-like, `~gammapy.data.ObservationTable`
List of observations to copy
outdir : str, Path
Directory for the new store
hdu_class : list of str
see :attr:`gammapy.data.HDUIndexTable.VALID_HDU_CLASS`
verbose : bool
Print copied files
overwrite : bool
Overwrite
"""
outdir = make_path(outdir)
if not outdir.is_dir():
raise OSError(f"Not a directory: outdir={outdir}")
if isinstance(obs_id, ObservationTable):
obs_id = obs_id["OBS_ID"].data
hdutable = self.hdu_table
hdutable.add_index("OBS_ID")
with hdutable.index_mode("discard_on_copy"):
subhdutable = hdutable.loc[obs_id]
if hdu_class is not None:
subhdutable.add_index("HDU_CLASS")
with subhdutable.index_mode("discard_on_copy"):
subhdutable = subhdutable.loc[hdu_class]
subobstable = self.obs_table.select_obs_id(obs_id)
for idx in range(len(subhdutable)):
# Changes to the file structure could be made here
loc = subhdutable.location_info(idx)
targetdir = outdir / loc.file_dir
targetdir.mkdir(exist_ok=True, parents=True)
cmd = ["cp"]
if verbose:
cmd += ["-v"]
if not overwrite:
cmd += ["-n"]
cmd += [str(loc.path()), str(targetdir)]
subprocess.run(cmd)
filename = outdir / self.DEFAULT_HDU_TABLE
subhdutable.write(filename, format="fits", overwrite=overwrite)
filename = outdir / self.DEFAULT_OBS_TABLE
subobstable.write(str(filename), format="fits", overwrite=overwrite)
[docs] def check(self, checks="all"):
"""Check index tables and data files.
This is a generator that yields a list of dicts.
"""
checker = DataStoreChecker(self)
return checker.run(checks=checks)
class DataStoreChecker(Checker):
"""Check data store.
Checks data format and a bit about the content.
"""
CHECKS = {
"obs_table": "check_obs_table",
"hdu_table": "check_hdu_table",
"observations": "check_observations",
"consistency": "check_consistency",
}
def __init__(self, data_store):
self.data_store = data_store
def check_obs_table(self):
"""Checks for the observation index table."""
yield from ObservationTableChecker(self.data_store.obs_table).run()
def check_hdu_table(self):
"""Checks for the HDU index table."""
t = self.data_store.hdu_table
m = t.meta
if m.get("HDUCLAS1", "") != "INDEX":
yield {
"level": "error",
"hdu": "hdu-index",
"msg": "Invalid header key. Must have HDUCLAS1=INDEX",
}
if m.get("HDUCLAS2", "") != "HDU":
yield {
"level": "error",
"hdu": "hdu-index",
"msg": "Invalid header key. Must have HDUCLAS2=HDU",
}
# Check that all HDU in the data files exist
for idx in range(len(t)):
location_info = t.location_info(idx)
try:
location_info.get_hdu()
except KeyError:
yield {
"level": "error",
"msg": f"HDU not found: {location_info.__dict__!r}",
}
def check_consistency(self):
"""Check consistency between multiple HDUs."""
# obs and HDU index should have the same OBS_ID
obs_table_obs_id = set(self.data_store.obs_table["OBS_ID"])
hdu_table_obs_id = set(self.data_store.hdu_table["OBS_ID"])
if not obs_table_obs_id == hdu_table_obs_id:
yield {
"level": "error",
"msg": "Inconsistent OBS_ID in obs and HDU index tables",
}
# TODO: obs table and events header should have the same times
def check_observations(self):
"""Perform some sanity checks for all observations."""
for obs_id in self.data_store.obs_table["OBS_ID"]:
obs = self.data_store.obs(obs_id)
yield from ObservationChecker(obs).run()
class DataStoreMaker:
"""Create data store index tables.
This is a multi-step process coded as a class.
Users will usually call this via `DataStore.from_events_files`.
"""
def __init__(self, paths):
if isinstance(paths, (str, Path)):
raise TypeError("Need list of paths, not a single string or Path object.")
self.paths = [make_path(path) for path in paths]
# Cache for EVENTS file header information, to avoid multiple reads
self._events_info = {}
def run(self):
hdu_table = self.make_hdu_table()
obs_table = self.make_obs_table()
return DataStore(hdu_table=hdu_table, obs_table=obs_table)
def get_events_info(self, path):
if path not in self._events_info:
self._events_info[path] = self.read_events_info(path)
return self._events_info[path]
def get_obs_info(self, path):
# We could add or remove info here depending on what we want in the obs table
return self.get_events_info(path)
@staticmethod
def read_events_info(path):
"""Read mandatory events header info"""
log.debug(f"Reading {path}")
with fits.open(path, memmap=False) as hdu_list:
header = hdu_list["EVENTS"].header
na_int, na_str = -1, "NOT AVAILABLE"
info = {}
# Note: for some reason `header["OBS_ID"]` is sometimes `str`, maybe trailing whitespace
# mandatory header info:
info["OBS_ID"] = int(header["OBS_ID"])
info["TSTART"] = header["TSTART"] * u.s
info["TSTOP"] = header["TSTOP"] * u.s
info["ONTIME"] = header["ONTIME"] * u.s
info["LIVETIME"] = header["LIVETIME"] * u.s
info["DEADC"] = header["DEADC"]
info["TELESCOP"] = header.get("TELESCOP", na_str)
obs_mode = header.get("OBS_MODE", "POINTING")
if obs_mode == "DRIFT":
info["ALT_PNT"] = header["ALT_PNT"] * u.deg
info["AZ_PNT"] = header["AZ_PNT"] * u.deg
info["ZEN_PNT"] = 90 * u.deg - info["ALT_PNT"]
else:
info["RA_PNT"] = header["RA_PNT"] * u.deg
info["DEC_PNT"] = header["DEC_PNT"] * u.deg
# optional header info
pos = SkyCoord(info["RA_PNT"], info["DEC_PNT"], unit="deg").galactic
info["GLON_PNT"] = pos.l
info["GLAT_PNT"] = pos.b
info["DATE-OBS"] = header.get("DATE_OBS", na_str)
info["TIME-OBS"] = header.get("TIME_OBS", na_str)
info["DATE-END"] = header.get("DATE_END", na_str)
info["TIME-END"] = header.get("TIME_END", na_str)
info["N_TELS"] = header.get("N_TELS", na_int)
info["OBJECT"] = header.get("OBJECT", na_str)
# This is the info needed to link from EVENTS to IRFs
info["CALDB"] = header.get("CALDB", na_str)
info["IRF"] = header.get("IRF", na_str)
# Not part of the spec, but good to know from which file the info comes
info["EVENTS_FILENAME"] = str(path)
info["EVENT_COUNT"] = header["NAXIS2"]
return info
def make_obs_table(self):
rows = []
for path in self.paths:
row = self.get_obs_info(path)
rows.append(row)
names = list(rows[0].keys())
table = ObservationTable(rows=rows, names=names)
# TODO: Values copied from one of the EVENTS headers
# TODO: check consistency for all EVENTS files and handle inconsistent case
# Transform times to first ref time? Or raise error for now?
# Test by combining some HESS & CTA runs?
m = table.meta
m["MJDREFI"] = 51544
m["MJDREFF"] = 5.0000000000e-01
m["TIMEUNIT"] = "s"
m["TIMESYS"] = "TT"
m["TIMEREF"] = "LOCAL"
m["HDUCLASS"] = "GADF"
m["HDUDOC"] = "https://github.com/open-gamma-ray-astro/gamma-astro-data-formats"
m["HDUVERS"] = "0.2"
m["HDUCLAS1"] = "INDEX"
m["HDUCLAS2"] = "OBS"
return table
def make_hdu_table(self):
rows = []
for path in self.paths:
rows.extend(self.get_hdu_table_rows(path))
names = list(rows[0].keys())
# names = ['OBS_ID', 'HDU_TYPE', 'HDU_CLASS', 'FILE_DIR', 'FILE_NAME', 'HDU_NAME']
table = HDUIndexTable(rows=rows, names=names)
m = table.meta
m["HDUCLASS"] = "GADF"
m["HDUDOC"] = "https://github.com/open-gamma-ray-astro/gamma-astro-data-formats"
m["HDUVERS"] = "0.2"
m["HDUCLAS1"] = "INDEX"
m["HDUCLAS2"] = "HDU"
return table
def get_hdu_table_rows(self, path):
events_info = self.get_events_info(path)
info = dict(
OBS_ID=events_info["OBS_ID"],
FILE_DIR=path.parent.as_posix(),
FILE_NAME=path.name,
)
yield dict(HDU_TYPE="events", HDU_CLASS="events", HDU_NAME="EVENTS", **info)
yield dict(HDU_TYPE="gti", HDU_CLASS="gti", HDU_NAME="GTI", **info)
caldb_irf = CalDBIRF.from_meta(events_info)
info = dict(
OBS_ID=events_info["OBS_ID"],
FILE_DIR=caldb_irf.file_dir,
FILE_NAME=caldb_irf.file_name,
)
yield dict(
HDU_TYPE="aeff", HDU_CLASS="aeff_2d", HDU_NAME="EFFECTIVE AREA", **info
)
yield dict(
HDU_TYPE="edisp", HDU_CLASS="edisp_2d", HDU_NAME="ENERGY DISPERSION", **info
)
yield dict(
HDU_TYPE="psf",
HDU_CLASS="psf_3gauss",
HDU_NAME="POINT SPREAD FUNCTION",
**info,
)
yield dict(HDU_TYPE="bkg", HDU_CLASS="bkg_3d", HDU_NAME="BACKGROUND", **info)
# TODO: load IRF file, and infer HDU_CLASS from IRF file contents!
class CalDBIRF:
"""Helper class to work with IRFs in CALDB format."""
def __init__(self, telescop, caldb, irf):
self.telescop = telescop
self.caldb = caldb
self.irf = irf
@classmethod
def from_meta(cls, meta):
return cls(telescop=meta["TELESCOP"], caldb=meta["CALDB"], irf=meta["IRF"])
@property
def file_dir(self):
# In CTA 1DC the header key is "CTA", but the directory is lower-case "cta"
telescop = self.telescop.lower()
return f"$CALDB/data/{telescop}/{self.caldb}/bcf/{self.irf}"
@property
def file_name(self):
return "irf_file.fits"