# Licensed under a 3-clause BSD style license - see LICENSE.rstimportcollections.abcimportcopyimportinspectimportloggingimportwarningsfromitertoolsimportzip_longestimportnumpyasnpimportastropy.unitsasufromastropy.coordinatesimportSkyCoordfromastropy.ioimportfitsfromastropy.timeimportTimefromastropy.unitsimportQuantityfromastropy.utilsimportlazypropertyimportmatplotlib.pyplotaspltfromgammapyimport__version__fromgammapy.utils.deprecationimportGammapyDeprecationWarning,deprecatedfromgammapy.utils.fitsimportLazyFitsData,earth_location_to_dictfromgammapy.utils.scriptsimportmake_pathfromgammapy.utils.testingimportCheckerfromgammapy.utils.timeimporttime_ref_to_dict,time_relative_to_reffrom.event_listimportEventList,EventListCheckerfrom.filtersimportObservationFilterfrom.gtiimportGTIfrom.pointingimportFixedPointingInfo__all__=["Observation","Observations"]log=logging.getLogger(__name__)
[docs]classObservation:"""In-memory observation. Parameters ---------- obs_id : int Observation id obs_info : dict Observation info dict aeff : `~gammapy.irf.EffectiveAreaTable2D` Effective area edisp : `~gammapy.irf.EnergyDispersion2D` Energy dispersion psf : `~gammapy.irf.PSF3D` Point spread function bkg : `~gammapy.irf.Background3D` Background rate model rad_max: `~gammapy.irf.RadMax2D` Only for point-like IRFs: RAD_MAX table (energy dependent RAD_MAX) For a fixed RAD_MAX, create a RadMax2D with a single bin. gti : `~gammapy.data.GTI` Table with GTI start and stop time events : `~gammapy.data.EventList` Event list obs_filter : `ObservationFilter` Observation filter. """aeff=LazyFitsData(cache=False)edisp=LazyFitsData(cache=False)psf=LazyFitsData(cache=False)bkg=LazyFitsData(cache=False)_rad_max=LazyFitsData(cache=False)_events=LazyFitsData(cache=False)_gti=LazyFitsData(cache=False)_pointing=LazyFitsData(cache=True)def__init__(self,obs_id=None,obs_info=None,gti=None,aeff=None,edisp=None,psf=None,bkg=None,rad_max=None,events=None,obs_filter=None,pointing=None,location=None,):self.obs_id=obs_idself._obs_info=obs_infoself.aeff=aeffself.edisp=edispself.psf=psfself.bkg=bkgself._rad_max=rad_maxself._gti=gtiself._events=eventsself._pointing=pointingself._location=locationself.obs_filter=obs_filterorObservationFilter()@propertydefrad_max(self):# prevent circular importfromgammapy.irfimportRadMax2Difself._rad_maxisnotNone:returnself._rad_max# load once to avoid trigger lazy loading it three timesaeff=self.aeffifaeffisnotNoneandaeff.is_pointlike:self._rad_max=RadMax2D.from_irf(aeff)returnself._rad_maxedisp=self.edispifedispisnotNoneandedisp.is_pointlike:self._rad_max=RadMax2D.from_irf(self.edisp)returnself._rad_max@propertydefavailable_hdus(self):"""Which HDUs are available"""available_hdus=[]keys=["_events","_gti","aeff","edisp","psf","bkg","_rad_max"]hdus=["events","gti","aeff","edisp","psf","bkg","rad_max"]forkey,hduinzip(keys,hdus):available=self.__dict__.get(key,False)available_hdu=self.__dict__.get(f"_{hdu}_hdu",False)available_hdu_=self.__dict__.get(f"_{key}_hdu",False)ifavailableoravailable_hduoravailable_hdu_:available_hdus.append(hdu)returnavailable_hdus@propertydefavailable_irfs(self):"""Which IRFs are available"""return[_for_inself.available_hdusif_notin["events","gti"]]@propertydefevents(self):events=self.obs_filter.filter_events(self._events)returnevents@propertydefgti(self):gti=self.obs_filter.filter_gti(self._gti)returngti@staticmethoddef_get_obs_info(pointing,deadtime_fraction,time_start,time_stop,reference_time,location):"""Create obs info dict from in memory data"""obs_info={"DEADC":1-deadtime_fraction,}ifisinstance(pointing,SkyCoord):obs_info["RA_PNT"]=pointing.icrs.ra.degobs_info["DEC_PNT"]=pointing.icrs.dec.degobs_info.update(time_ref_to_dict(reference_time))obs_info["TSTART"]=time_relative_to_ref(time_start,obs_info).to_value(u.s)obs_info["TSTOP"]=time_relative_to_ref(time_stop,obs_info).to_value(u.s)iflocationisnotNone:obs_info.update(earth_location_to_dict(location))returnobs_info
[docs]@classmethoddefcreate(cls,pointing,location=None,obs_id=0,livetime=None,tstart=None,tstop=None,irfs=None,deadtime_fraction=0.0,reference_time=Time("2000-01-01 00:00:00"),):"""Create an observation. User must either provide the livetime, or the start and stop times. Parameters ---------- pointing : `~gammapy.data.FixedPointingInfo` or `~astropy.coordinates.SkyCoord` Pointing information obs_id : int Observation ID as identifier livetime : ~astropy.units.Quantity` Livetime exposure of the simulated observation tstart: `~astropy.time.Time` or `~astropy.units.Quantity` Start time of observation as `~astropy.time.Time` or duration relative to `reference_time` tstop: `astropy.time.Time` or `~astropy.units.Quantity` Stop time of observation as `~astropy.time.Time` or duration relative to `reference_time` irfs: dict IRFs used for simulating the observation: `bkg`, `aeff`, `psf`, `edisp`, `rad_max` deadtime_fraction : float, optional Deadtime fraction, defaults to 0 reference_time : `~astropy.time.Time` the reference time to use in GTI definition Returns ------- obs : `gammapy.data.MemoryObservation` """iftstartisNone:tstart=reference_time.copy()iftstopisNone:tstop=tstart+Quantity(livetime)gti=GTI.create(tstart,tstop,reference_time=reference_time)obs_info=cls._get_obs_info(pointing=pointing,deadtime_fraction=deadtime_fraction,time_start=gti.time_start[0],time_stop=gti.time_stop[0],reference_time=reference_time,location=location,)ifnotisinstance(pointing,FixedPointingInfo):warnings.warn("Pointing will be required to be provided as FixedPointingInfo",GammapyDeprecationWarning,)pointing=FixedPointingInfo.from_fits_header(obs_info)returncls(obs_id=obs_id,obs_info=obs_info,gti=gti,aeff=irfs.get("aeff"),bkg=irfs.get("bkg"),edisp=irfs.get("edisp"),psf=irfs.get("psf"),rad_max=irfs.get("rad_max"),pointing=pointing,location=location,)
@propertydeftstart(self):"""Observation start time (`~astropy.time.Time`)."""returnself.gti.time_start[0]@propertydeftstop(self):"""Observation stop time (`~astropy.time.Time`)."""returnself.gti.time_stop[0]@propertydeftmid(self):"""Midpoint between start and stop time"""returnself.tstart+0.5*(self.tstop-self.tstart)@propertydefobservation_time_duration(self):"""Observation time duration in seconds (`~astropy.units.Quantity`). The wall time, including dead-time. """returnself.gti.time_sum@propertydefobservation_live_time_duration(self):"""Live-time duration in seconds (`~astropy.units.Quantity`). The dead-time-corrected observation time. Computed as ``t_live = t_observation * (1 - f_dead)`` where ``f_dead`` is the dead-time fraction. """return(self.observation_time_duration*(1-self.observation_dead_time_fraction)*self.obs_filter.livetime_fraction)@propertydefobservation_dead_time_fraction(self):"""Dead-time fraction (float). Defined as dead-time over observation time. Dead-time is defined as the time during the observation where the detector didn't record events: https://en.wikipedia.org/wiki/Dead_time https://ui.adsabs.harvard.edu/abs/2004APh....22..285F The dead-time fraction is used in the live-time computation, which in turn is used in the exposure and flux computation. """return1-self.obs_info["DEADC"]@lazypropertydefobs_info(self):"""Observation info dictionary."""meta=self._obs_info.copy()ifself._obs_infoisnotNoneelse{}ifself.eventsisnotNone:meta.update({k:vfork,vinself.events.table.meta.items()ifnotk.startswith("HDU")})returnmeta@propertydefpointing(self):ifself._pointingisNone:self._pointing=FixedPointingInfo.from_fits_header(self.obs_info)returnself._pointing
[docs]defget_pointing_altaz(self,time):"""Get the pointing in altaz for given time"""returnself.pointing.get_altaz(time,self.observatory_earth_location)
[docs]defget_pointing_icrs(self,time):"""Get the pointing in icrs for given time"""returnself.pointing.get_icrs(time,self.observatory_earth_location)
@lazyproperty@deprecated("v1.1",message="Use observation.pointing or observation.get_pointing_{{altaz,icrs}} instead",)deffixed_pointing_info(self):"""Fixed pointing info for this observation (`FixedPointingInfo`)."""returnself._pointing@property@deprecated("v1.1",message="Use observation.get_pointing_icrs(time) instead")defpointing_radec(self):"""Pointing RA / DEC sky coordinates (`~astropy.coordinates.SkyCoord`)."""returnself.fixed_pointing_info.radec@property@deprecated("v1.1",message="Use observation.get_pointing_altaz(time) instead")defpointing_altaz(self):returnself.fixed_pointing_info.altaz@property@deprecated("v1.1",message="Use observation.get_pointing_altaz(time).zen instead")defpointing_zen(self):"""Pointing zenith angle sky (`~astropy.units.Quantity`)."""returnself.get_pointing_altaz(self.tmid).zen@propertydefobservatory_earth_location(self):"""Observatory location (`~astropy.coordinates.EarthLocation`)."""ifself._locationisnotNone:returnself._location# for now we catch the deprecation warning until we store location on the observation properlywithwarnings.catch_warnings():warnings.simplefilter("ignore",category=GammapyDeprecationWarning)returnself.fixed_pointing_info.location@lazypropertydeftarget_radec(self):"""Target RA / DEC sky coordinates (`~astropy.coordinates.SkyCoord`)."""lon,lat=(self.obs_info.get("RA_OBJ",np.nan),self.obs_info.get("DEC_OBJ",np.nan),)returnSkyCoord(lon,lat,unit="deg",frame="icrs")@propertydefmuoneff(self):"""Observation muon efficiency."""returnself.obs_info.get("MUONEFF",1)def__str__(self):pointing=self.get_pointing_icrs(self.tmid)ra=pointing.ra.degdec=pointing.dec.degpointing=f"{ra:.1f} deg, {dec:.1f} deg\n"# TODO: Which target was observed?# TODO: print info about available HDUs for this observation ...return(f"{self.__class__.__name__}\n\n"f"\tobs id : {self.obs_id}\n "f"\ttstart : {self.tstart.mjd:.2f}\n"f"\ttstop : {self.tstop.mjd:.2f}\n"f"\tduration : {self.observation_time_duration:.2f}\n"f"\tpointing (icrs) : {pointing}\n"f"\tdeadtime fraction : {self.observation_dead_time_fraction:.1%}\n")
[docs]defcheck(self,checks="all"):"""Run checks. This is a generator that yields a list of dicts. """checker=ObservationChecker(self)returnchecker.run(checks=checks)
[docs]defpeek(self,figsize=(15,10)):"""Quick-look plots in a few panels. Parameters ---------- figsize : tuple Figure size """plottable_hds=["events","aeff","psf","edisp","bkg","rad_max"]plot_hdus=list(set(plottable_hds)&set(self.available_hdus))plot_hdus.sort()n_irfs=len(plot_hdus)nrows=n_irfs//2ncols=2+n_irfs%2fig,axes=plt.subplots(nrows=nrows,ncols=ncols,figsize=figsize,gridspec_kw={"wspace":0.3,"hspace":0.3},)foridx,(ax,name)inenumerate(zip_longest(axes.flat,plot_hdus)):ifname=="aeff":self.aeff.plot(ax=ax)ax.set_title("Effective area")ifname=="bkg":bkg=self.bkgifnotbkg.has_offset_axis:bkg=bkg.to_2d()bkg.plot(ax=ax)ax.set_title("Background rate")ifname=="psf":self.psf.plot_containment_radius_vs_energy(ax=ax)ax.set_title("Point spread function")ifname=="edisp":self.edisp.plot_bias(ax=ax,add_cbar=True)ax.set_title("Energy dispersion")ifname=="rad_max":self.rad_max.plot_rad_max_vs_energy(ax=ax)ax.set_title("Rad max")ifname=="events":m=self.events._counts_image(allsky=False)ax.remove()ax=fig.add_subplot(nrows,ncols,idx+1,projection=m.geom.wcs)m.plot(ax=ax,stretch="sqrt",vmin=0,add_cbar=True)ax.set_title("Events")ifnameisNone:ax.set_visible(False)
[docs]defselect_time(self,time_interval):"""Select a time interval of the observation. Parameters ---------- time_interval : `astropy.time.Time` Start and stop time of the selected time interval. For now we only support a single time interval. Returns ------- new_obs : `~gammapy.data.Observation` A new observation instance of the specified time interval """new_obs_filter=self.obs_filter.copy()new_obs_filter.time_filter=time_intervalobs=copy.deepcopy(self)obs.obs_filter=new_obs_filterreturnobs
[docs]@classmethoddefread(cls,event_file,irf_file=None):"""Create an Observation from a Event List and an (optional) IRF file. Parameters ---------- event_file : str, Path path to the .fits file containing the event list and the GTI irf_file : str, Path (optional) path to the .fits file containing the IRF components, if not provided the IRF will be read from the event file Returns ------- observation : `~gammapy.data.Observation` observation with the events and the irf read from the file """fromgammapy.irf.ioimportload_irf_dict_from_fileevents=EventList.read(event_file)gti=GTI.read(event_file)irf_file=irf_fileifirf_fileisnotNoneelseevent_fileirf_dict=load_irf_dict_from_file(irf_file)obs_info=events.table.metareturncls(events=events,gti=gti,obs_info=obs_info,obs_id=obs_info.get("OBS_ID"),pointing=FixedPointingInfo.from_fits_header(obs_info),**irf_dict,)
[docs]defwrite(self,path,overwrite=False,format="gadf",include_irfs=True):""" Write this observation into `path` using the specified format Parameters ---------- path: str or `~pathlib.Path` Path for the output file overwrite: bool If true, existing files are overwritten. format: str Output format, currently only "gadf" is supported include_irfs: bool Whether to include irf components in the output file """ifformat!="gadf":raiseValueError(f'Only the "gadf" format supported, got {format}')path=make_path(path)primary=fits.PrimaryHDU()primary.header["CREATOR"]=f"Gammapy {__version__}"primary.header["DATE"]=Time.now().isohdul=fits.HDUList([primary])events=self.eventsifeventsisnotNone:events_hdu=events.to_table_hdu(format=format)events_hdu.header.update(self.pointing.to_fits_header())hdul.append(events_hdu)gti=self.gtiifgtiisnotNone:hdul.append(gti.to_table_hdu(format=format))ifinclude_irfs:forirf_nameinself.available_irfs:irf=getattr(self,irf_name)ifirfisnotNone:hdul.append(irf.to_table_hdu(format="gadf-dl3"))hdul.writeto(path,overwrite=overwrite)
[docs]defcopy(self,in_memory=False,**kwargs):"""Copy observation Overwriting arguments requires the 'in_memory` argument to be true. Parameters ---------- in_memory : bool Copy observation in memory. **kwargs : dict Keyword arguments passed to `Observation` Examples -------- .. code:: from gammapy.data import Observation obs = Observation.read( "$GAMMAPY_DATA/hess-dl3-dr1/data/hess_dl3_dr1_obs_id_020136.fits.gz" ) obs_copy = obs.copy(obs_id=1234) print(obs_copy) Returns ------- obs : `Observation` Copied observation """ifin_memory:argnames=inspect.getfullargspec(self.__init__).argsargnames.remove("self")fornameinargnames:ifname=="location":attr="observatory_earth_location"else:attr=namevalue=getattr(self,attr)kwargs.setdefault(name,copy.deepcopy(value))returnself.__class__(**kwargs)ifkwargs:raiseValueError("Overwriting arguments requires to set 'in_memory=True'")returncopy.deepcopy(self)
[docs]classObservations(collections.abc.MutableSequence):"""Container class that holds a list of observations. Parameters ---------- observations : list A list of `~gammapy.data.Observation` """def__init__(self,observations=None):self._observations=observationsor[]def__getitem__(self,key):returnself._observations[self.index(key)]def__delitem__(self,key):delself._observations[self.index(key)]def__setitem__(self,key,obs):ifisinstance(obs,Observation):self._observations[self.index(key)]=obselse:raiseTypeError(f"Invalid type: {type(obs)!r}")
def__len__(self):returnlen(self._observations)def__str__(self):s=self.__class__.__name__+"\n"s+="Number of observations: {}\n".format(len(self))forobsinself:s+=str(obs)returns
@propertydefids(self):"""List of obs IDs (`list`)"""return[str(obs.obs_id)forobsinself]
[docs]defselect_time(self,time_intervals):"""Select a time interval of the observations. Parameters ---------- time_intervals : `astropy.time.Time` or list of `astropy.time.Time` list of Start and stop time of the time intervals or one Time interval Returns ------- new_observations : `~gammapy.data.Observations` A new Observations instance of the specified time intervals """new_obs_list=[]ifisinstance(time_intervals,Time):time_intervals=[time_intervals]fortime_intervalintime_intervals:forobsinself:if(obs.tstart<time_interval[1])&(obs.tstop>time_interval[0]):new_obs=obs.select_time(time_interval)new_obs_list.append(new_obs)returnself.__class__(new_obs_list)
def_ipython_key_completions_(self):returnself.ids
[docs]defgroup_by_label(self,labels):"""Split obsevations in multiple groups of observations Parameters ---------- labels : array Array of group labels Returns ------- obs_clusters : dict of `~gammapy.data.Observations` dict of Observations instance, one instance for each group. """obs_groups={}forlabelinnp.unique(labels):observations=self.__class__([obsfork,obsinenumerate(self)iflabels[k]==label])obs_groups[f"group_{label}"]=observationsreturnobs_groups
classObservationChecker(Checker):"""Check an observation. Checks data format and a bit about the content. """CHECKS={"events":"check_events","gti":"check_gti","aeff":"check_aeff","edisp":"check_edisp","psf":"check_psf",}def__init__(self,observation):self.observation=observationdef_record(self,level="info",msg=None):return{"level":level,"obs_id":self.observation.obs_id,"msg":msg}defcheck_events(self):yieldself._record(level="debug",msg="Starting events check")try:events=self.observation.eventsexceptException:yieldself._record(level="warning",msg="Loading events failed")returnyield fromEventListChecker(events).run()# TODO: split this out into a GTICheckerdefcheck_gti(self):yieldself._record(level="debug",msg="Starting gti check")try:gti=self.observation.gtiexceptException:yieldself._record(level="warning",msg="Loading GTI failed")returniflen(gti.table)==0:yieldself._record(level="error",msg="GTI table has zero rows")columns_required=["START","STOP"]fornameincolumns_required:ifnamenotingti.table.colnames:yieldself._record(level="error",msg=f"Missing table column: {name!r}")# TODO: Check that header keywords agree with table entries# TSTART, TSTOP, MJDREFI, MJDREFF# Check that START and STOP times are consecutive# times = np.ravel(self.table['START'], self.table['STOP'])# # TODO: not sure this is correct ... add test with a multi-gti table from Fermi.# if not np.all(np.diff(times) >= 0):# yield 'GTIs are not consecutive or sorted.'# TODO: add reference times for all instruments and check for this# Use TELESCOP header key to check which instrument it is.def_check_times(self):"""Check if various times are consistent. The headers and tables of the FITS EVENTS and GTI extension contain various observation and event time information. """# http://fermi.gsfc.nasa.gov/ssc/data/analysis/documentation/Cicerone/Cicerone_Data/Time_in_ScienceTools.html# https://hess-confluence.desy.de/confluence/display/HESS/HESS+FITS+data+-+References+and+checks#HESSFITSdata-Referencesandchecks-Timetelescope_met_refs={"FERMI":Time("2001-01-01T00:00:00"),"HESS":Time("2001-01-01T00:00:00"),}meta=self.dset.event_list.table.metatelescope=meta["TELESCOP"]iftelescopeintelescope_met_refs.keys():dt=self.time_ref-telescope_met_refs[telescope]ifdt>self.accuracy["time"]:yieldself._record(level="error",msg="Reference time incorrect for telescope")defcheck_aeff(self):yieldself._record(level="debug",msg="Starting aeff check")try:aeff=self.observation.aeffexceptException:yieldself._record(level="warning",msg="Loading aeff failed")return# Check that thresholds are meaningful for aeffif("LO_THRES"inaeff.metaand"HI_THRES"inaeff.metaandaeff.meta["LO_THRES"]>=aeff.meta["HI_THRES"]):yieldself._record(level="error",msg="LO_THRES >= HI_THRES in effective area meta data")# Check that data isn't all nullifnp.max(aeff.data.data)<=0:yieldself._record(level="error",msg="maximum entry of effective area is <= 0")defcheck_edisp(self):yieldself._record(level="debug",msg="Starting edisp check")try:edisp=self.observation.edispexceptException:yieldself._record(level="warning",msg="Loading edisp failed")return# Check that data isn't all nullifnp.max(edisp.data.data)<=0:yieldself._record(level="error",msg="maximum entry of edisp is <= 0")defcheck_psf(self):yieldself._record(level="debug",msg="Starting psf check")try:self.observation.psfexceptException:yieldself._record(level="warning",msg="Loading psf failed")return