# Licensed under a 3-clause BSD style license - see LICENSE.rst"""Simulate observations."""importnumpyasnpimportastropy.unitsasufromastropy.coordinatesimportSkyCoord,SkyOffsetFramefromastropy.tableimportTablefromastropy.timeimportTimeimportgammapyfromgammapy.dataimportEventList,observatory_locationsfromgammapy.mapsimportMapAxis,MapCoord,RegionNDMap,TimeMapAxisfromgammapy.modeling.modelsimport(ConstantSpectralModel,ConstantTemporalModel,PointSpatialModel,)fromgammapy.utils.randomimportget_random_state__all__=["MapDatasetEventSampler"]
[docs]classMapDatasetEventSampler:"""Sample events from a map dataset. Parameters ---------- random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`} Defines random number generator initialisation via the `~gammapy.utils.random.get_random_state` function oversample_energy_factor: {int} Defines an oversampling factor for the energies; it is used only when sampling an energy-dependent time-varying source t_delta : `~astropy.units.Quantity` Time interval used to sample the time-dependent source """def__init__(self,random_state="random-seed",oversample_energy_factor=10,t_delta=0.5*u.s):self.random_state=get_random_state(random_state)self.oversample_energy_factor=oversample_energy_factorself.t_delta=t_deltadef_make_table(self,coords,time_ref):"""Create a table for sampled events. Parameters ---------- coords : `~gammapy.maps.MapCoord` Coordinates of the sampled events time_ref : `~astropy.time.Time` Reference time of the event list Returns ------- table : `~astropy.table.Table` Table of the sampled events """table=Table()try:energy=coords["energy_true"]exceptKeyError:energy=coords["energy"]table["TIME"]=(coords["time"]-time_ref).to("s")table["ENERGY_TRUE"]=energytable["RA_TRUE"]=coords.skycoord.icrs.ra.to("deg")table["DEC_TRUE"]=coords.skycoord.icrs.dec.to("deg")returntabledef_evaluate_timevar_source(self,dataset,model,):"""Calculate Npred for a given `dataset.model` by evaluating it on a region geometry. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Map dataset model : `~gammapy.modeling.models.SkyModel` Sky model instance Returns ------- npred : `~gammapy.maps.RegionNDMap` Npred map """energy_true=dataset.edisp.edisp_map.geom.axes["energy_true"]energy_new=energy_true.upsample(self.oversample_energy_factor)position=model.spatial_model.positionregion_exposure=dataset.exposure.to_region_nd_map(position)time_axis_eval=TimeMapAxis.from_gti_bounds(gti=dataset.gti,t_delta=self.t_delta)time_min,time_max=time_axis_eval.time_boundstime_axis=MapAxis.from_bounds(time_min.mjd*u.d,time_max.mjd*u.d,nbin=time_axis_eval.nbin,name="time",)temp_eval=model.temporal_model.evaluate(time_axis_eval.time_mid,energy=energy_new.center)norm_parameters=model.spectral_model.parameters.norm_parametersnorm=norm_parameters[0].quantityiftemp_eval.unit.is_equivalent(norm.unit):flux_diff=temp_eval.to(norm.unit)else:flux_diff=temp_eval*normflux_inte=flux_diff*energy_new.bin_width[:,None]flux_pred=RegionNDMap.create(region=position,axes=[time_axis,energy_new],data=np.array(flux_inte),unit=flux_inte.unit,)mapcoord=flux_pred.geom.get_coord()mapcoord["energy_true"]=energy_true.center[:,None,None,None]flux_values=flux_pred.interp_by_coord(mapcoord)*flux_pred.unitdata=flux_values*region_exposure.quantity[:,None,:,:]data/=time_axis.nbin/self.oversample_energy_factornpred=RegionNDMap.create(region=position,axes=[time_axis,energy_true],data=data.to_value(""),)returnnpreddef_sample_coord_time_energy(self,dataset,model):"""Sample model components of a source with time-dependent spectrum. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Map dataset model : `~gammapy.modeling.models.SkyModel` Sky model instance Returns ------- table : `~astropy.table.Table` Table of sampled events """ifnotisinstance(model.spatial_model,PointSpatialModel):raiseTypeError(f"Event sampler expects PointSpatialModel for a time varying source. Got {model.spatial_model} instead.")ifnotisinstance(model.spectral_model,ConstantSpectralModel):raiseTypeError(f"Event sampler expects ConstantSpectralModel for a time varying source. Got {model.spectral_model} instead.")npred=self._evaluate_timevar_source(dataset,model=model)data=npred.data[np.isfinite(npred.data)]n_events=self.random_state.poisson(np.sum(data))coords=npred.sample_coord(n_events=n_events,random_state=self.random_state)coords["time"]=Time(coords["time"],format="mjd",scale="tt")table=self._make_table(coords,dataset.gti.time_ref)returntabledef_sample_coord_time(self,npred,temporal_model,gti):"""Sample model components of a time-varying source. Parameters ---------- npred : `~gammapy.dataset.MapDataset` Npred map temporal_model : `~gammapy.modeling.models\ Temporal model of the source gti : `~gammapy.data.GTI` GTI of the dataset Returns ------- table : `~astropy.table.Table` Table of sampled events """data=npred.data[np.isfinite(npred.data)]n_events=self.random_state.poisson(np.sum(data))coords=npred.sample_coord(n_events=n_events,random_state=self.random_state)time_start,time_stop,time_ref=(gti.time_start,gti.time_stop,gti.time_ref)coords["time"]=temporal_model.sample_time(n_events=n_events,t_min=time_start,t_max=time_stop,random_state=self.random_state,t_delta=self.t_delta,)table=self._make_table(coords,time_ref)returntable
[docs]defsample_sources(self,dataset):"""Sample source model components. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Map dataset Returns ------- events : `~gammapy.data.EventList` Event list """events_all=[]foridx,evaluatorinenumerate(dataset.evaluators.values()):ifevaluator.needs_update:evaluator.update(dataset.exposure,dataset.psf,dataset.edisp,dataset._geom,dataset.mask,)ifevaluator.model.temporal_modelisNone:temporal_model=ConstantTemporalModel()else:temporal_model=evaluator.model.temporal_modeliftemporal_model.is_energy_dependent:table=self._sample_coord_time_energy(dataset,evaluator.model)else:flux=evaluator.compute_flux()npred=evaluator.apply_exposure(flux)table=self._sample_coord_time(npred,temporal_model,dataset.gti)iflen(table)==0:mcid=table.Column(name="MC_ID",length=0,dtype=int)table.add_column(mcid)table["MC_ID"]=idx+1table.meta["MID{:05d}".format(idx+1)]=idx+1table.meta["MMN{:05d}".format(idx+1)]=evaluator.model.nameevents_all.append(EventList(table))returnEventList.from_stack(events_all)
[docs]defsample_edisp(self,edisp_map,events):"""Sample energy dispersion map. Parameters ---------- edisp_map : `~gammapy.irf.EDispMap` Energy dispersion map events : `~gammapy.data.EventList` Event list with the true energies Returns ------- events : `~gammapy.data.EventList` Event list with reconstructed energy column """coord=MapCoord({"lon":events.table["RA_TRUE"].quantity,"lat":events.table["DEC_TRUE"].quantity,"energy_true":events.table["ENERGY_TRUE"].quantity,},frame="icrs",)coords_reco=edisp_map.sample_coord(coord,self.random_state)events.table["ENERGY"]=coords_reco["energy"]returnevents
[docs]defsample_psf(self,psf_map,events):"""Sample psf map. Parameters ---------- psf_map : `~gammapy.irf.PSFMap` PSF map events : `~gammapy.data.EventList` Event list Returns ------- events : `~gammapy.data.EventList` Event list with reconstructed position columns """coord=MapCoord({"lon":events.table["RA_TRUE"].quantity,"lat":events.table["DEC_TRUE"].quantity,"energy_true":events.table["ENERGY_TRUE"].quantity,},frame="icrs",)coords_reco=psf_map.sample_coord(coord,self.random_state)events.table["RA"]=coords_reco["lon"]*u.degevents.table["DEC"]=coords_reco["lat"]*u.degreturnevents
[docs]@staticmethoddefevent_det_coords(observation,events):"""Add columns of detector coordinates (DETX-DETY) to the event list. Parameters ---------- observation : `~gammapy.data.Observation` In memory observation events : `~gammapy.data.EventList` Event list Returns ------- events : `~gammapy.data.EventList` Event list with columns of event detector coordinates """sky_coord=SkyCoord(events.table["RA"],events.table["DEC"],frame="icrs")frame=SkyOffsetFrame(origin=observation.get_pointing_icrs(observation.tmid))pseudo_fov_coord=sky_coord.transform_to(frame)events.table["DETX"]=pseudo_fov_coord.lonevents.table["DETY"]=pseudo_fov_coord.latreturnevents
[docs]@staticmethoddefevent_list_meta(dataset,observation):"""Event list meta info. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Map dataset observation : `~gammapy.data.Observation` In memory observation Returns ------- meta : dict Meta dictionary """# See: https://gamma-astro-data-formats.readthedocs.io/en/latest/events/events.html#mandatory-header-keywords # noqa: E501meta={}meta["HDUCLAS1"]="EVENTS"meta["EXTNAME"]="EVENTS"meta["HDUDOC"]="https://github.com/open-gamma-ray-astro/gamma-astro-data-formats"meta["HDUVERS"]="0.2"meta["HDUCLASS"]="GADF"meta["OBS_ID"]=observation.obs_idmeta["TSTART"]=(observation.tstart-dataset.gti.time_ref).to_value("s")meta["TSTOP"]=(observation.tstop-dataset.gti.time_ref).to_value("s")meta["ONTIME"]=observation.observation_time_duration.to("s").valuemeta["LIVETIME"]=observation.observation_live_time_duration.to("s").valuemeta["DEADC"]=1-observation.observation_dead_time_fractionfixed_icrs=observation.pointing.fixed_icrsmeta["RA_PNT"]=fixed_icrs.ra.degmeta["DEC_PNT"]=fixed_icrs.dec.degmeta["EQUINOX"]="J2000"meta["RADECSYS"]="icrs"meta["CREATOR"]="Gammapy {}".format(gammapy.__version__)meta["EUNIT"]="TeV"meta["EVTVER"]=""meta["OBSERVER"]="Gammapy user"meta["DSTYP1"]="TIME"meta["DSUNI1"]="s"meta["DSVAL1"]="TABLE"meta["DSREF1"]=":GTI"meta["DSTYP2"]="ENERGY"meta["DSUNI2"]="TeV"meta["DSVAL2"]=f'{dataset._geom.axes["energy"].edges.min().value}:{dataset._geom.axes["energy"].edges.max().value}'# noqa: E501meta["DSTYP3"]="POS(RA,DEC) "offset_max=np.max(dataset._geom.width).to_value("deg")meta["DSVAL3"]=f"CIRCLE({fixed_icrs.ra.deg},{fixed_icrs.dec.deg},{offset_max})"# noqa: E501meta["DSUNI3"]="deg "meta["NDSKEYS"]=" 3 "# get first non background model componentformodelindataset.models:ifmodelisnotdataset.background_model:breakelse:model=Noneifmodel:meta["OBJECT"]=model.namemeta["RA_OBJ"]=model.position.icrs.ra.degmeta["DEC_OBJ"]=model.position.icrs.dec.degmeta["TELAPSE"]=dataset.gti.time_sum.to("s").valuemeta["MJDREFI"]=int(dataset.gti.time_ref.mjd)meta["MJDREFF"]=dataset.gti.time_ref.mjd%1meta["TIMEUNIT"]="s"meta["TIMESYS"]=dataset.gti.time_ref.scalemeta["TIMEREF"]="LOCAL"meta["DATE-OBS"]=dataset.gti.time_start.isot[0][0:10]meta["DATE-END"]=dataset.gti.time_stop.isot[0][0:10]meta["CONV_DEP"]=0meta["CONV_RA"]=0meta["CONV_DEC"]=0meta["NMCIDS"]=len(dataset.models)# Necessary for DataStore, but they should be ALT and AZ instead!telescope=observation.aeff.meta["TELESCOP"]instrument=observation.aeff.meta["INSTRUME"]iftelescope=="CTA":ifinstrument=="Southern Array":loc=observatory_locations["cta_south"]elifinstrument=="Northern Array":loc=observatory_locations["cta_north"]else:loc=observatory_locations["cta_south"]else:loc=observatory_locations[telescope.lower()]# this is not really correct but maybe OK for nowcoord_altaz=observation.pointing.get_altaz(dataset.gti.time_start,loc)meta["ALT_PNT"]=str(coord_altaz.alt.deg[0])meta["AZ_PNT"]=str(coord_altaz.az.deg[0])# TO DO: these keywords should be taken from the IRF of the datasetmeta["ORIGIN"]="Gammapy"meta["TELESCOP"]=observation.aeff.meta["TELESCOP"]meta["INSTRUME"]=observation.aeff.meta["INSTRUME"]meta["N_TELS"]=""meta["TELLIST"]=""meta["CREATED"]=""meta["OBS_MODE"]=""meta["EV_CLASS"]=""returnmeta
[docs]defrun(self,dataset,observation=None):"""Run the event sampler, applying IRF corrections. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Map dataset observation : `~gammapy.data.Observation` In memory observation edisp : Bool Whether to include or exclude the Edisp in the simulation Returns ------- events : `~gammapy.data.EventList` Event list """iflen(dataset.models)>1:events_src=self.sample_sources(dataset)iflen(events_src.table)>0:ifdataset.psf:events_src=self.sample_psf(dataset.psf,events_src)else:events_src.table["RA"]=events_src.table["RA_TRUE"]events_src.table["DEC"]=events_src.table["DEC_TRUE"]ifdataset.edisp:events_src=self.sample_edisp(dataset.edisp,events_src)else:events_src.table["ENERGY"]=events_src.table["ENERGY_TRUE"]ifdataset.background:events_bkg=self.sample_background(dataset)events=EventList.from_stack([events_bkg,events_src])else:events=events_srciflen(dataset.models)==1anddataset.background_modelisnotNone:events_bkg=self.sample_background(dataset)events=EventList.from_stack([events_bkg])events=self.event_det_coords(observation,events)events.table["EVENT_ID"]=np.arange(len(events.table))events.table.meta.update(self.event_list_meta(dataset,observation))geom=dataset._geomselection=geom.contains(events.map_coord(geom))returnevents.select_row_subset(selection)