# Licensed under a 3-clause BSD style license - see LICENSE.rstimportcollections.abcimportcopyimporthtmlimportinspectimportitertoolsimportloggingimportwarningsfromitertoolsimportzip_longestimportnumpyasnpimportastropy.unitsasufromastropy.coordinatesimportSkyCoordfromastropy.ioimportfitsfromastropy.timeimportTimefromastropy.unitsimportQuantityfromastropy.utilsimportlazypropertyimportmatplotlib.pyplotaspltfromgammapy.utils.deprecationimportGammapyDeprecationWarning,deprecatedfromgammapy.utils.fitsimportLazyFitsData,earth_location_to_dictfromgammapy.utils.metadataimportCreatorMetaData,TargetMetaData,TimeInfoMetaDatafromgammapy.utils.scriptsimportmake_pathfromgammapy.utils.testingimportCheckerfromgammapy.utils.timeimporttime_ref_to_dict,time_relative_to_reffrom.event_listimportEventList,EventListCheckerfrom.filtersimportObservationFilterfrom.gtiimportGTIfrom.metadataimportObservationMetaDatafrom.pointingimportFixedPointingInfo__all__=["Observation","Observations"]log=logging.getLogger(__name__)
[docs]classObservation:"""In-memory observation. Parameters ---------- obs_id : int, optional Observation id. Default is None obs_info : dict, optional Observation info dict. Default is None. aeff : `~gammapy.irf.EffectiveAreaTable2D`, optional Effective area. Default is None. edisp : `~gammapy.irf.EnergyDispersion2D`, optional Energy dispersion. Default is None. psf : `~gammapy.irf.PSF3D`, optional Point spread function. Default is None. bkg : `~gammapy.irf.Background3D`, optional Background rate model. Default is None. rad_max : `~gammapy.irf.RadMax2D`, optional Only for point-like IRFs: RAD_MAX table (energy dependent RAD_MAX) For a fixed RAD_MAX, create a RadMax2D with a single bin. Default is None. gti : `~gammapy.data.GTI`, optional Table with GTI start and stop time. Default is None. events : `~gammapy.data.EventList`, optional Event list. Default is None. obs_filter : `ObservationFilter`, optional Observation filter. Default is None. pointing : `~gammapy.data.FixedPointingInfo`, optional Pointing information. Default is None. location : `~astropy.coordinates.EarthLocation`, optional Earth location of the observatory. Default is None. """aeff=LazyFitsData(cache=False)edisp=LazyFitsData(cache=False)psf=LazyFitsData(cache=False)bkg=LazyFitsData(cache=False)_rad_max=LazyFitsData(cache=True)_events=LazyFitsData(cache=False)_gti=LazyFitsData(cache=True)_pointing=LazyFitsData(cache=True)def__init__(self,obs_id=None,meta=None,obs_info=None,gti=None,aeff=None,edisp=None,psf=None,bkg=None,rad_max=None,events=None,obs_filter=None,pointing=None,location=None,):ifobs_infoisnotNone:warnings.warn("obs_info argument is deprecated since v1.2. Use meta instead.",GammapyDeprecationWarning,)ifmetaisNone:meta=ObservationMetaData.from_header(obs_info)self.obs_id=obs_idself.aeff=aeffself.edisp=edispself.psf=psfself.bkg=bkgself._rad_max=rad_maxself._gti=gtiself._events=eventsself._pointing=pointingself._location=location# this is part of the meta or is it data?self.obs_filter=obs_filterorObservationFilter()self._meta=metadef_repr_html_(self):try:returnself.to_html()exceptAttributeError:returnf"<pre>{html.escape(str(self))}</pre>"@propertydefmeta(self):"""Return metadata container."""ifself._metaisNoneandself.events:self._meta=ObservationMetaData.from_header(self.events.table.meta)returnself._meta@propertydefrad_max(self):"""Rad max IRF. None if not available."""# prevent circular importfromgammapy.irfimportRadMax2Difself._rad_maxisnotNone:returnself._rad_max# load once to avoid trigger lazy loading it three timesaeff=self.aeffifaeffisnotNoneandaeff.is_pointlike:self._rad_max=RadMax2D.from_irf(aeff)returnself._rad_maxedisp=self.edispifedispisnotNoneandedisp.is_pointlike:self._rad_max=RadMax2D.from_irf(self.edisp)returnself._rad_max@propertydefavailable_hdus(self):"""Which HDUs are available."""available_hdus=[]keys=["_events","_gti","aeff","edisp","psf","bkg","_rad_max"]hdus=["events","gti","aeff","edisp","psf","bkg","rad_max"]forkey,hduinzip(keys,hdus):available=self.__dict__.get(key,False)available_hdu=self.__dict__.get(f"_{hdu}_hdu",False)available_hdu_=self.__dict__.get(f"_{key}_hdu",False)ifavailableoravailable_hduoravailable_hdu_:available_hdus.append(hdu)returnavailable_hdus@propertydefavailable_irfs(self):"""Which IRFs are available."""return[_for_inself.available_hdusif_notin["events","gti"]]@propertydefevents(self):"""Event list of the observation as an `~gammapy.data.EventList`."""events=self.obs_filter.filter_events(self._events)returnevents@events.setterdefevents(self,value):ifnotisinstance(value,EventList):raiseTypeError(f"events must be an EventList instance, got: {type(value)}")self._events=value@propertydefgti(self):"""GTI of the observation as a `~gammapy.data.GTI`."""gti=self.obs_filter.filter_gti(self._gti)returngti@staticmethoddef_get_obs_info(pointing,deadtime_fraction,time_start,time_stop,reference_time,location):"""Create observation information dictionary from in memory data."""obs_info={"DEADC":1-deadtime_fraction,}ifisinstance(pointing,SkyCoord):obs_info["RA_PNT"]=pointing.icrs.ra.degobs_info["DEC_PNT"]=pointing.icrs.dec.degobs_info.update(time_ref_to_dict(reference_time))obs_info["TSTART"]=time_relative_to_ref(time_start,obs_info).to_value(u.s)obs_info["TSTOP"]=time_relative_to_ref(time_stop,obs_info).to_value(u.s)iflocationisnotNone:obs_info.update(earth_location_to_dict(location))returnobs_info
[docs]@classmethoddefcreate(cls,pointing,location=None,obs_id=0,livetime=None,tstart=None,tstop=None,irfs=None,deadtime_fraction=0.0,reference_time=Time("2000-01-01 00:00:00"),):"""Create an observation. User must either provide the livetime, or the start and stop times. Parameters ---------- pointing : `~gammapy.data.FixedPointingInfo` or `~astropy.coordinates.SkyCoord` Pointing information. location : `~astropy.coordinates.EarthLocation`, optional Earth location of the observatory. Default is None. obs_id : int, optional Observation ID as identifier. Default is 0. livetime : ~astropy.units.Quantity`, optional Livetime exposure of the simulated observation. Default is None. tstart : `~astropy.time.Time` or `~astropy.units.Quantity`, optional Start time of observation as `~astropy.time.Time` or duration relative to `reference_time`. Default is None. tstop : `astropy.time.Time` or `~astropy.units.Quantity`, optional Stop time of observation as `~astropy.time.Time` or duration relative to `reference_time`. Default is None. irfs : dict, optional IRFs used for simulating the observation: `bkg`, `aeff`, `psf`, `edisp`, `rad_max`. Default is None. deadtime_fraction : float, optional Deadtime fraction. Default is 0. reference_time : `~astropy.time.Time`, optional the reference time to use in GTI definition. Default is `~astropy.time.Time("2000-01-01 00:00:00")`. Returns ------- obs : `gammapy.data.MemoryObservation` Observation. """iftstartisNone:tstart=reference_time.copy()iftstopisNone:tstop=tstart+Quantity(livetime)gti=GTI.create(tstart,tstop,reference_time=reference_time)obs_info=cls._get_obs_info(pointing=pointing,deadtime_fraction=deadtime_fraction,time_start=gti.time_start[0],time_stop=gti.time_stop[0],reference_time=reference_time,location=location,)time_info=TimeInfoMetaData(time_start=gti.time_start[0],time_stop=gti.time_stop[-1],reference_time=reference_time,)meta=ObservationMetaData(deadtime_fraction=deadtime_fraction,location=location,time_info=time_info,creation=CreatorMetaData(),target=TargetMetaData(),)ifnotisinstance(pointing,FixedPointingInfo):warnings.warn("Pointing will be required to be provided as FixedPointingInfo",GammapyDeprecationWarning,)pointing=FixedPointingInfo.from_fits_header(obs_info)returncls(obs_id=obs_id,meta=meta,gti=gti,aeff=irfs.get("aeff"),bkg=irfs.get("bkg"),edisp=irfs.get("edisp"),psf=irfs.get("psf"),rad_max=irfs.get("rad_max"),pointing=pointing,location=location,)
@propertydeftstart(self):"""Observation start time as a `~astropy.time.Time` object."""returnself.gti.time_start[0]@propertydeftstop(self):"""Observation stop time as a `~astropy.time.Time` object."""returnself.gti.time_stop[0]@propertydeftmid(self):"""Midpoint between start and stop time as a `~astropy.time.Time` object."""returnself.tstart+0.5*(self.tstop-self.tstart)@propertydefobservation_time_duration(self):"""Observation time duration in seconds as a `~astropy.units.Quantity`. The wall time, including dead-time. """returnself.gti.time_sum@propertydefobservation_live_time_duration(self):"""Live-time duration in seconds as a `~astropy.units.Quantity`. The dead-time-corrected observation time. Computed as ``t_live = t_observation * (1 - f_dead)`` where ``f_dead`` is the dead-time fraction. """return(self.observation_time_duration*(1-self.observation_dead_time_fraction)*self.obs_filter.livetime_fraction)@propertydefobservation_dead_time_fraction(self):"""Dead-time fraction (float). Defined as dead-time over observation time. Dead-time is defined as the time during the observation where the detector didn't record events: https://en.wikipedia.org/wiki/Dead_time https://ui.adsabs.harvard.edu/abs/2004APh....22..285F The dead-time fraction is used in the live-time computation, which in turn is used in the exposure and flux computation. """returnself.meta.deadtime_fraction@propertydefobs_info(self):"""Observation information dictionary."""warnings.warn("obs_info property is deprecated since v1.2. Use meta instead.",GammapyDeprecationWarning,)returnself.meta.to_header()@propertydefpointing(self):"""Get the pointing for the observation as a `~gammapy.data.FixedPointingInfo` object."""ifself._pointingisNone:self._pointing=FixedPointingInfo.from_fits_header(self.events.table.meta)returnself._pointing
[docs]defget_pointing_altaz(self,time):"""Get the pointing in alt-az for given time."""returnself.pointing.get_altaz(time,self.observatory_earth_location)
[docs]defget_pointing_icrs(self,time):"""Get the pointing in ICRS for given time."""returnself.pointing.get_icrs(time,self.observatory_earth_location)
@propertydefobservatory_earth_location(self):"""Observatory location as an `~astropy.coordinates.EarthLocation` object."""returnself._location@lazypropertydeftarget_radec(self):"""Target RA / DEC sky coordinates as a `~astropy.coordinates.SkyCoord` object."""returnself.meta.target.position@property@deprecated("v1.2",message="Access additional metadata directly in obs.meta.opt.",)defmuoneff(self):"""Observation muon efficiency."""if"MUONEFF"inself.meta.optional:returnself.meta.optional["MUONEFF"]else:raiseKeyError("No muon efficiency information.")def__str__(self):pointing=self.get_pointing_icrs(self.tmid)ra=pointing.ra.degdec=pointing.dec.degpointing=f"{ra:.1f} deg, {dec:.1f} deg\n"# TODO: Which target was observed?# TODO: print info about available HDUs for this observation ...return(f"{self.__class__.__name__}\n\n"f"\tobs id : {self.obs_id}\n "f"\ttstart : {self.tstart.mjd:.2f}\n"f"\ttstop : {self.tstop.mjd:.2f}\n"f"\tduration : {self.observation_time_duration:.2f}\n"f"\tpointing (icrs) : {pointing}\n"f"\tdeadtime fraction : {self.observation_dead_time_fraction:.1%}\n")
[docs]defcheck(self,checks="all"):"""Run checks. This is a generator that yields a list of dictionary. """checker=ObservationChecker(self)returnchecker.run(checks=checks)
[docs]defpeek(self,figsize=(15,10)):"""Quick-look plots in a few panels. Parameters ---------- figsize : tuple, optional Figure size. Default is (15, 10). """plottable_hds=["events","aeff","psf","edisp","bkg","rad_max"]plot_hdus=list(set(plottable_hds)&set(self.available_hdus))plot_hdus.sort()n_irfs=len(plot_hdus)nrows=n_irfs//2ncols=2+n_irfs%2fig,axes=plt.subplots(nrows=nrows,ncols=ncols,figsize=figsize,gridspec_kw={"wspace":0.3,"hspace":0.3},)foridx,(ax,name)inenumerate(zip_longest(axes.flat,plot_hdus)):ifname=="aeff":self.aeff.plot(ax=ax)ax.set_title("Effective area")ifname=="bkg":bkg=self.bkgifnotbkg.has_offset_axis:bkg=bkg.to_2d()bkg.plot(ax=ax)ax.set_title("Background rate")ifname=="psf":self.psf.plot_containment_radius_vs_energy(ax=ax)ax.set_title("Point spread function")ifname=="edisp":self.edisp.plot_bias(ax=ax,add_cbar=True)ax.set_title("Energy dispersion")ifname=="rad_max":self.rad_max.plot_rad_max_vs_energy(ax=ax)ax.set_title("Rad max")ifname=="events":m=self.events._counts_image(allsky=False)ax.remove()ax=fig.add_subplot(nrows,ncols,idx+1,projection=m.geom.wcs)m.plot(ax=ax,stretch="sqrt",vmin=0,add_cbar=True)ax.set_title("Events")ifnameisNone:ax.set_visible(False)
[docs]defselect_time(self,time_interval):"""Select a time interval of the observation. Parameters ---------- time_interval : `astropy.time.Time` Start and stop time of the selected time interval. For now, we only support a single time interval. Returns ------- new_obs : `~gammapy.data.Observation` A new observation instance of the specified time interval. """new_obs_filter=self.obs_filter.copy()new_obs_filter.time_filter=time_intervalobs=copy.deepcopy(self)obs.obs_filter=new_obs_filterreturnobs
[docs]@classmethoddefread(cls,event_file,irf_file=None,checksum=False):"""Create an Observation from a Event List and an (optional) IRF file. Parameters ---------- event_file : str or `~pathlib.Path` Path to the FITS file containing the event list and the GTI. irf_file : str or `~pathlib.Path`, optional Path to the FITS file containing the IRF components. Default is None. If None, the IRFs will be read from the event file. checksum : bool If True checks both DATASUM and CHECKSUM cards in the file headers. Default is False. Returns ------- observation : `~gammapy.data.Observation` Observation with the events and the IRF read from the file. """fromgammapy.irf.ioimportload_irf_dict_from_fileevents=EventList.read(event_file,checksum=checksum)gti=GTI.read(event_file,checksum=checksum)irf_file=irf_fileifirf_fileisnotNoneelseevent_fileirf_dict=load_irf_dict_from_file(irf_file)obs_info=events.table.metameta=ObservationMetaData.from_header(obs_info)returncls(events=events,gti=gti,obs_id=meta.obs_info.obs_id,pointing=FixedPointingInfo.from_fits_header(obs_info),meta=meta,location=meta.location,**irf_dict,)
[docs]defwrite(self,path,overwrite=False,format="gadf",include_irfs=True,checksum=False):""" Write this observation into `~pathlib.Path` using the specified format. Parameters ---------- path : str or `~pathlib.Path` Path for the output file. overwrite : bool, optional Overwrite existing file. Default is False. format : {"gadf"} Output format, currently only "gadf" is supported. Default is "gadf". include_irfs : bool, optional Whether to include irf components in the output file. Default is True. checksum : bool, optional When True adds both DATASUM and CHECKSUM cards to the headers written to the file. Default is False. """ifformat!="gadf":raiseValueError(f'Only the "gadf" format is supported, got {format}')path=make_path(path)primary=fits.PrimaryHDU()primary.header.update(self.meta.creation.to_header(format))hdul=fits.HDUList([primary])events=self.eventsifeventsisnotNone:events_hdu=events.to_table_hdu(format=format)events_hdu.header.update(self.pointing.to_fits_header())hdul.append(events_hdu)gti=self.gtiifgtiisnotNone:hdul.append(gti.to_table_hdu(format=format))ifinclude_irfs:forirf_nameinself.available_irfs:irf=getattr(self,irf_name)ifirfisnotNone:hdul.append(irf.to_table_hdu(format="gadf-dl3"))hdul.writeto(path,overwrite=overwrite,checksum=checksum)
[docs]defcopy(self,in_memory=False,**kwargs):"""Copy observation. Overwriting `Observation` arguments requires the ``in_memory`` argument to be true. Parameters ---------- in_memory : bool, optional Copy observation in memory. Default is False. **kwargs : dict, optional Keyword arguments passed to `Observation`. Examples -------- >>> from gammapy.data import Observation >>> obs = Observation.read("$GAMMAPY_DATA/hess-dl3-dr1/data/hess_dl3_dr1_obs_id_020136.fits.gz") >>> obs_copy = obs.copy(in_memory=True, obs_id=1234) >>> print(obs_copy) # doctest: +SKIP Returns ------- obs : `Observation` Copied observation. """ifin_memory:argnames=inspect.getfullargspec(self.__init__).args# TODO: remove once obs_info is removed from the list of arguments in __init__argnames.remove("obs_info")argnames.remove("self")fornameinargnames:ifname=="location":attr="observatory_earth_location"else:attr=namevalue=getattr(self,attr)kwargs.setdefault(name,copy.deepcopy(value))returnself.__class__(**kwargs)ifkwargs:raiseValueError("Overwriting arguments requires to set 'in_memory=True'")returncopy.deepcopy(self)
[docs]classObservations(collections.abc.MutableSequence):"""Container class that holds a list of observations. Parameters ---------- observations : list A list of `~gammapy.data.Observation`. """def__init__(self,observations=None):self._observations=[]ifobservationsisNone:observations=[]forobsinobservations:self.append(obs)def__getitem__(self,key):ifisinstance(key,slice):returnself.__class__(self._observations[key])else:returnself._observations[self.index(key)]def__delitem__(self,key):delself._observations[self.index(key)]def__setitem__(self,key,obs):ifisinstance(obs,Observation):ifobsinself:log.warning(f"Observation with obs_id {obs.obs_id} already belongs to Observations.")self._observations[self.index(key)]=obselse:raiseTypeError(f"Invalid type: {type(obs)!r}")
[docs]definsert(self,idx,obs):ifisinstance(obs,Observation):ifobsinself:log.warning(f"Observation with obs_id {obs.obs_id} already belongs to Observations.")self._observations.insert(idx,obs)else:raiseTypeError(f"Invalid type: {type(obs)!r}")
def__len__(self):returnlen(self._observations)def__str__(self):s=self.__class__.__name__+"\n"s+="Number of observations: {}\n".format(len(self))forobsinself:s+=str(obs)returns
@propertydefids(self):"""List of observation IDs (`list`)."""return[str(obs.obs_id)forobsinself]
[docs]defselect_time(self,time_intervals):"""Select a time interval of the observations. Parameters ---------- time_intervals : `astropy.time.Time` or list of `astropy.time.Time` List of start and stop time of the time intervals or one time interval. Returns ------- new_observations : `~gammapy.data.Observations` A new Observations instance of the specified time intervals. """new_obs_list=[]ifisinstance(time_intervals,Time):time_intervals=[time_intervals]fortime_intervalintime_intervals:forobsinself:if(obs.tstart<time_interval[1])&(obs.tstop>time_interval[0]):new_obs=obs.select_time(time_interval)new_obs_list.append(new_obs)returnself.__class__(new_obs_list)
def_ipython_key_completions_(self):returnself.ids
[docs]defgroup_by_label(self,labels):"""Split observations in multiple groups of observations. Parameters ---------- labels : list or `numpy.ndarray` Array of group labels. Returns ------- obs_clusters : dict of `~gammapy.data.Observations` Dictionary of Observations instance, one instance for each group. """obs_groups={}forlabelinnp.unique(labels):observations=self.__class__([obsfork,obsinenumerate(self)iflabels[k]==label])obs_groups[f"group_{label}"]=observationsreturnobs_groups
[docs]@classmethoddeffrom_stack(cls,observations_list):# TODO : Do more check when stacking observations when we have metadata."""Create a new `Observations` instance by concatenating a list of `Observations` objects. Parameters ---------- observations_list : list of `~gammapy.data.Observations` The list of `Observations` to stack. Returns ------- observations : `~gammapy.data.Observations` The `Observations` object resulting from stacking all the `Observations` in `observation_list`. """obs=itertools.chain(*observations_list)returncls(list(obs))
[docs]defin_memory_generator(self):"""A generator that iterates over observation. Yield an in memory copy of the observation."""forobsinself:obs_copy=obs.copy(in_memory=True)yieldobs_copy
classObservationChecker(Checker):"""Check an observation. Checks data format and a bit about the content. """CHECKS={"events":"check_events","gti":"check_gti","aeff":"check_aeff","edisp":"check_edisp","psf":"check_psf",}def__init__(self,observation):self.observation=observationdef_record(self,level="info",msg=None):return{"level":level,"obs_id":self.observation.obs_id,"msg":msg}defcheck_events(self):yieldself._record(level="debug",msg="Starting events check")try:events=self.observation.eventsexceptException:yieldself._record(level="warning",msg="Loading events failed")returnyield fromEventListChecker(events).run()# TODO: split this out into a GTICheckerdefcheck_gti(self):yieldself._record(level="debug",msg="Starting gti check")try:gti=self.observation.gtiexceptException:yieldself._record(level="warning",msg="Loading GTI failed")returniflen(gti.table)==0:yieldself._record(level="error",msg="GTI table has zero rows")columns_required=["START","STOP"]fornameincolumns_required:ifnamenotingti.table.colnames:yieldself._record(level="error",msg=f"Missing table column: {name!r}")# TODO: Check that header keywords agree with table entries# TSTART, TSTOP, MJDREFI, MJDREFF# Check that START and STOP times are consecutive# times = np.ravel(self.table['START'], self.table['STOP'])# # TODO: not sure this is correct ... add test with a multi-gti table from Fermi.# if not np.all(np.diff(times) >= 0):# yield 'GTIs are not consecutive or sorted.'# TODO: add reference times for all instruments and check for this# Use TELESCOP header key to check which instrument it is.def_check_times(self):"""Check if various times are consistent. The headers and tables of the FITS EVENTS and GTI extension contain various observation and event time information. """# http://fermi.gsfc.nasa.gov/ssc/data/analysis/documentation/Cicerone/Cicerone_Data/Time_in_ScienceTools.html# https://hess-confluence.desy.de/confluence/display/HESS/HESS+FITS+data+-+References+and+checks#HESSFITSdata-Referencesandchecks-Timetelescope_met_refs={"FERMI":Time("2001-01-01T00:00:00"),"HESS":Time("2001-01-01T00:00:00"),}meta=self.dset.event_list.table.metatelescope=meta["TELESCOP"]iftelescopeintelescope_met_refs.keys():dt=self.time_ref-telescope_met_refs[telescope]ifdt>self.accuracy["time"]:yieldself._record(level="error",msg="Reference time incorrect for telescope")defcheck_aeff(self):yieldself._record(level="debug",msg="Starting aeff check")try:aeff=self.observation.aeffexceptException:yieldself._record(level="warning",msg="Loading aeff failed")return# Check that thresholds are meaningful for aeffif("LO_THRES"inaeff.metaand"HI_THRES"inaeff.metaandaeff.meta["LO_THRES"]>=aeff.meta["HI_THRES"]):yieldself._record(level="error",msg="LO_THRES >= HI_THRES in effective area meta data")# Check that data isn't all nullifnp.max(aeff.data.data)<=0:yieldself._record(level="error",msg="maximum entry of effective area is <= 0")defcheck_edisp(self):yieldself._record(level="debug",msg="Starting edisp check")try:edisp=self.observation.edispexceptException:yieldself._record(level="warning",msg="Loading edisp failed")return# Check that data isn't all nullifnp.max(edisp.data.data)<=0:yieldself._record(level="error",msg="maximum entry of edisp is <= 0")defcheck_psf(self):yieldself._record(level="debug",msg="Starting psf check")try:self.observation.psfexceptException:yieldself._record(level="warning",msg="Loading psf failed")return