Source code for gammapy.datasets.simulate
# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Simulate observations."""
import numpy as np
import astropy.units as u
from astropy.coordinates import AltAz, SkyCoord, SkyOffsetFrame
from astropy.table import Table
import gammapy
from gammapy.data import EventList, observatory_locations
from gammapy.maps import MapCoord
from gammapy.modeling.models import ConstantTemporalModel
from gammapy.utils.random import get_random_state
__all__ = ["MapDatasetEventSampler"]
[docs]class MapDatasetEventSampler:
"""Sample events from a map dataset.
Parameters
----------
random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`}
Defines random number generator initialisation via the `~gammapy.utils.random.get_random_state` function
"""
def __init__(self, random_state="random-seed"):
self.random_state = get_random_state(random_state)
def _sample_coord_time(self, npred, temporal_model, gti):
data = npred.data[np.isfinite(npred.data)]
n_events = self.random_state.poisson(np.sum(data))
coords = npred.sample_coord(n_events=n_events, random_state=self.random_state)
table = Table()
try:
energy = coords["energy_true"]
except KeyError:
energy = coords["energy"]
table["ENERGY_TRUE"] = energy
table["RA_TRUE"] = coords.skycoord.icrs.ra.to("deg")
table["DEC_TRUE"] = coords.skycoord.icrs.dec.to("deg")
time_start, time_stop, time_ref = (gti.time_start, gti.time_stop, gti.time_ref)
time = temporal_model.sample_time(
n_events=n_events,
t_min=time_start,
t_max=time_stop,
random_state=self.random_state,
)
table["TIME"] = (time - time_ref).to("s")
return table
[docs] def sample_sources(self, dataset):
"""Sample source model components.
Parameters
----------
dataset : `~gammapy.datasets.MapDataset`
Map dataset
Returns
-------
events : `~gammapy.data.EventList`
Event list
"""
events_all = []
for idx, evaluator in enumerate(dataset.evaluators.values()):
if evaluator.needs_update:
evaluator.update(
dataset.exposure,
dataset.psf,
dataset.edisp,
dataset._geom,
dataset.mask,
)
flux = evaluator.compute_flux()
npred = evaluator.apply_exposure(flux)
if evaluator.model.temporal_model is None:
temporal_model = ConstantTemporalModel()
else:
temporal_model = evaluator.model.temporal_model
table = self._sample_coord_time(npred, temporal_model, dataset.gti)
if len(table) == 0:
mcid = table.Column(name="MC_ID", length=0, dtype=int)
table.add_column(mcid)
table["MC_ID"] = idx + 1
table.meta["MID{:05d}".format(idx + 1)] = idx + 1
table.meta["MMN{:05d}".format(idx + 1)] = evaluator.model.name
events_all.append(EventList(table))
return EventList.from_stack(events_all)
[docs] def sample_background(self, dataset):
"""Sample background.
Parameters
----------
dataset : `~gammapy.datasets.MapDataset`
Map dataset
Returns
-------
events : `gammapy.data.EventList`
Background events
"""
background = dataset.npred_background()
temporal_model = ConstantTemporalModel()
table = self._sample_coord_time(background, temporal_model, dataset.gti)
table["MC_ID"] = 0
table["ENERGY"] = table["ENERGY_TRUE"]
table["RA"] = table["RA_TRUE"]
table["DEC"] = table["DEC_TRUE"]
table.meta["MID{:05d}".format(0)] = 0
table.meta["MMN{:05d}".format(0)] = dataset.background_model.name
return EventList(table)
[docs] def sample_edisp(self, edisp_map, events):
"""Sample energy dispersion map.
Parameters
----------
edisp_map : `~gammapy.irf.EDispMap`
Energy dispersion map
events : `~gammapy.data.EventList`
Event list with the true energies
Returns
-------
events : `~gammapy.data.EventList`
Event list with reconstructed energy column
"""
coord = MapCoord(
{
"lon": events.table["RA_TRUE"].quantity,
"lat": events.table["DEC_TRUE"].quantity,
"energy_true": events.table["ENERGY_TRUE"].quantity,
},
frame="icrs",
)
coords_reco = edisp_map.sample_coord(coord, self.random_state)
events.table["ENERGY"] = coords_reco["energy"]
return events
[docs] def sample_psf(self, psf_map, events):
"""Sample psf map.
Parameters
----------
psf_map : `~gammapy.irf.PSFMap`
PSF map
events : `~gammapy.data.EventList`
Event list
Returns
-------
events : `~gammapy.data.EventList`
Event list with reconstructed position columns
"""
coord = MapCoord(
{
"lon": events.table["RA_TRUE"].quantity,
"lat": events.table["DEC_TRUE"].quantity,
"energy_true": events.table["ENERGY_TRUE"].quantity,
},
frame="icrs",
)
coords_reco = psf_map.sample_coord(coord, self.random_state)
events.table["RA"] = coords_reco["lon"] * u.deg
events.table["DEC"] = coords_reco["lat"] * u.deg
return events
[docs] @staticmethod
def event_det_coords(observation, events):
"""Add columns of detector coordinates (DETX-DETY) to the event list.
Parameters
----------
observation : `~gammapy.data.Observation`
In memory observation
events : `~gammapy.data.EventList`
Event list
Returns
-------
events : `~gammapy.data.EventList`
Event list with columns of event detector coordinates
"""
sky_coord = SkyCoord(events.table["RA"], events.table["DEC"], frame="icrs")
frame = SkyOffsetFrame(origin=observation.pointing_radec.icrs)
pseudo_fov_coord = sky_coord.transform_to(frame)
events.table["DETX"] = pseudo_fov_coord.lon
events.table["DETY"] = pseudo_fov_coord.lat
return events
[docs] @staticmethod
def event_list_meta(dataset, observation):
"""Event list meta info.
Parameters
----------
dataset : `~gammapy.datasets.MapDataset`
Map dataset
observation : `~gammapy.data.Observation`
In memory observation
Returns
-------
meta : dict
Meta dictionary
"""
# See: https://gamma-astro-data-formats.readthedocs.io/en/latest/events/events.html#mandatory-header-keywords # noqa: E501
meta = {}
meta["HDUCLAS1"] = "EVENTS"
meta["EXTNAME"] = "EVENTS"
meta[
"HDUDOC"
] = "https://github.com/open-gamma-ray-astro/gamma-astro-data-formats"
meta["HDUVERS"] = "0.2"
meta["HDUCLASS"] = "GADF"
meta["OBS_ID"] = observation.obs_id
meta["TSTART"] = (
((observation.tstart.mjd - dataset.gti.time_ref.mjd) * u.day).to(u.s).value
)
meta["TSTOP"] = (
((observation.tstop.mjd - dataset.gti.time_ref.mjd) * u.day).to(u.s).value
)
meta["ONTIME"] = observation.observation_time_duration.to("s").value
meta["LIVETIME"] = observation.observation_live_time_duration.to("s").value
meta["DEADC"] = 1 - observation.observation_dead_time_fraction
meta["RA_PNT"] = observation.pointing_radec.icrs.ra.deg
meta["DEC_PNT"] = observation.pointing_radec.icrs.dec.deg
meta["EQUINOX"] = "J2000"
meta["RADECSYS"] = "icrs"
meta["CREATOR"] = "Gammapy {}".format(gammapy.__version__)
meta["EUNIT"] = "TeV"
meta["EVTVER"] = ""
meta["OBSERVER"] = "Gammapy user"
meta["DSTYP1"] = "TIME"
meta["DSUNI1"] = "s"
meta["DSVAL1"] = "TABLE"
meta["DSREF1"] = ":GTI"
meta["DSTYP2"] = "ENERGY"
meta["DSUNI2"] = "TeV"
meta[
"DSVAL2"
] = f'{dataset._geom.axes["energy"].edges.min().value}:{dataset._geom.axes["energy"].edges.max().value}' # noqa: E501
meta["DSTYP3"] = "POS(RA,DEC) "
offset_max = np.max(dataset._geom.width).to_value("deg")
meta[
"DSVAL3"
] = f"CIRCLE({observation.pointing_radec.ra.deg},{observation.pointing_radec.dec.deg},{offset_max})" # noqa: E501
meta["DSUNI3"] = "deg "
meta["NDSKEYS"] = " 3 "
# get first non background model component
for model in dataset.models:
if model is not dataset.background_model:
break
else:
model = None
if model:
meta["OBJECT"] = model.name
meta["RA_OBJ"] = model.position.icrs.ra.deg
meta["DEC_OBJ"] = model.position.icrs.dec.deg
meta["TELAPSE"] = dataset.gti.time_sum.to("s").value
meta["MJDREFI"] = int(dataset.gti.time_ref.mjd)
meta["MJDREFF"] = dataset.gti.time_ref.mjd % 1
meta["TIMEUNIT"] = "s"
meta["TIMESYS"] = dataset.gti.time_ref.scale
meta["TIMEREF"] = "LOCAL"
meta["DATE-OBS"] = dataset.gti.time_start.isot[0][0:10]
meta["DATE-END"] = dataset.gti.time_stop.isot[0][0:10]
meta["CONV_DEP"] = 0
meta["CONV_RA"] = 0
meta["CONV_DEC"] = 0
meta["NMCIDS"] = len(dataset.models)
# Necessary for DataStore, but they should be ALT and AZ instead!
telescope = observation.aeff.meta["TELESCOP"]
instrument = observation.aeff.meta["INSTRUME"]
if telescope == "CTA":
if instrument == "Southern Array":
loc = observatory_locations["cta_south"]
elif instrument == "Northern Array":
loc = observatory_locations["cta_north"]
else:
loc = observatory_locations["cta_south"]
else:
loc = observatory_locations[telescope.lower()]
# this is not really correct but maybe OK for now
altaz_frame = AltAz(obstime=dataset.gti.time_start, location=loc)
coord_altaz = observation.pointing_radec.transform_to(altaz_frame)
meta["ALT_PNT"] = str(coord_altaz.alt.deg[0])
meta["AZ_PNT"] = str(coord_altaz.az.deg[0])
# TO DO: these keywords should be taken from the IRF of the dataset
meta["ORIGIN"] = "Gammapy"
meta["TELESCOP"] = observation.aeff.meta["TELESCOP"]
meta["INSTRUME"] = observation.aeff.meta["INSTRUME"]
meta["N_TELS"] = ""
meta["TELLIST"] = ""
meta["CREATED"] = ""
meta["OBS_MODE"] = ""
meta["EV_CLASS"] = ""
return meta
[docs] def run(self, dataset, observation=None):
"""Run the event sampler, applying IRF corrections.
Parameters
----------
dataset : `~gammapy.datasets.MapDataset`
Map dataset
observation : `~gammapy.data.Observation`
In memory observation
edisp : Bool
Whether to include or exclude the Edisp in the simulation
Returns
-------
events : `~gammapy.data.EventList`
Event list
"""
if len(dataset.models) > 1:
events_src = self.sample_sources(dataset)
if len(events_src.table) > 0:
if dataset.psf:
events_src = self.sample_psf(dataset.psf, events_src)
else:
events_src.table["RA"] = events_src.table["RA_TRUE"]
events_src.table["DEC"] = events_src.table["DEC_TRUE"]
if dataset.edisp:
events_src = self.sample_edisp(dataset.edisp, events_src)
else:
events_src.table["ENERGY"] = events_src.table["ENERGY_TRUE"]
if dataset.background:
events_bkg = self.sample_background(dataset)
events = EventList.from_stack([events_bkg, events_src])
else:
events = events_src
if len(dataset.models) == 1 and dataset.background_model is not None:
events_bkg = self.sample_background(dataset)
events = EventList.from_stack([events_bkg])
events = self.event_det_coords(observation, events)
events.table["EVENT_ID"] = np.arange(len(events.table))
events.table.meta.update(self.event_list_meta(dataset, observation))
geom = dataset._geom
selection = geom.contains(events.map_coord(geom))
return events.select_row_subset(selection)