# Licensed under a 3-clause BSD style license - see LICENSE.rst
import abc

import numpy as np
from astropy import units as u
from astropy.io import fits
from astropy.table import Table

from gammapy.data import GTI
from gammapy.irf import EDispKernel, EDispKernelMap
from gammapy.maps import RegionNDMap
from gammapy.utils.scripts import make_path

from .spectrum import SpectrumDatasetOnOff

__all__ = [
    "DatasetReader",
    "DatasetWriter",
    "OGIPDatasetReader",
    "OGIPDatasetWriter",
]


class DatasetReader(abc.ABC):
    """Dataset reader base class."""

    @property
    @abc.abstractmethod
    def tag(self):
        """Format tag handled by this reader."""
        pass

    @abc.abstractmethod
    def read(self):
        """Read and return a dataset."""
        pass


class DatasetWriter(abc.ABC):
    """Dataset writer base class."""

    @property
    @abc.abstractmethod
    def tag(self):
        """Format tag handled by this writer."""
        pass

    @abc.abstractmethod
    def write(self, dataset):
        """Write the given dataset."""
        pass
class OGIPDatasetWriter(DatasetWriter):
    """Write OGIP files.

    If you want to use the written files with Sherpa, you have to use the
    ``ogip-sherpa`` format. Then all files will be written in units of
    'keV' and 'cm2'.

    The naming scheme is fixed as following:

    * PHA file is named filename.fits
    * BKG file is named filename_bkg.fits
    * ARF file is named filename_arf.fits
    * RMF file is named filename_rmf.fits

    Parameters
    ----------
    filename : `pathlib.Path` or str
        Filename
    format : {"ogip", "ogip-sherpa"}
        Which format to use
    overwrite : bool
        Overwrite existing files?
    """

    tag = ["ogip", "ogip-sherpa"]

    def __init__(self, filename, format="ogip", overwrite=False):
        filename = make_path(filename)
        # Create the destination folder up-front so every companion file
        # (PHA/BKG/ARF/RMF) can be written without further checks.
        filename.parent.mkdir(exist_ok=True, parents=True)

        self.filename = filename
        self.format = format
        self.overwrite = overwrite

    @staticmethod
    def get_filenames(filename):
        """Get filenames of the companion OGIP files.

        NOTE(review): this method is referenced by ``write`` and
        ``get_ogip_meta`` but was missing from the class body; it has been
        reconstructed from the naming scheme documented in the class
        docstring (filename.fits -> filename_bkg/_arf/_rmf.fits).

        Parameters
        ----------
        filename : `pathlib.Path`
            PHA filename

        Returns
        -------
        filenames : dict
            Dict with "respfile", "backfile" and "ancrfile" entries
        """
        # Strip the (possibly multi-part) suffix, insert the role marker,
        # then re-append the suffix: "spec.fits" -> "spec_rmf.fits".
        suffix = "".join(filename.suffixes)
        stem = filename.name.replace(suffix, "")
        template = f"{stem}{{}}{suffix}"
        return {
            "respfile": template.format("_rmf"),
            "backfile": template.format("_bkg"),
            "ancrfile": template.format("_arf"),
        }
[docs]defget_ogip_meta(self,dataset,is_bkg=False):"""Meta info for the OGIP data format."""try:livetime=dataset.exposure.meta["livetime"]exceptKeyError:raiseValueError("Storing in ogip format require the livetime ""to be defined in the exposure meta data")hdu_class="BKG"ifis_bkgelse"TOTAL"meta={"HDUCLAS2":hdu_class,"HDUCLAS3":"COUNT","HDUCLAS4":"TYPE:1","EXPOSURE":livetime.to_value("s"),"OBS_ID":dataset.name,}filenames=OGIPDatasetWriter.get_filenames(self.filename)meta["ANCRFILE"]=filenames["ancrfile"]ifdataset.edisp:meta["BACKFILE"]=filenames["backfile"]ifdataset.counts_off:meta["RESPFILE"]=filenames["respfile"]returnmeta
[docs]defwrite(self,dataset):"""Write dataset to file. Parameters ---------- dataset : `SpectrumDatasetOnOff` Dataset to write """filenames=self.get_filenames(self.filename)self.write_pha(dataset,filename=self.filename)path=self.filename.parentself.write_arf(dataset,filename=path/filenames["ancrfile"])ifdataset.counts_off:self.write_bkg(dataset,filename=path/filenames["backfile"])ifdataset.edisp:self.write_rmf(dataset,filename=path/filenames["respfile"])
[docs]defwrite_rmf(self,dataset,filename):"""Write energy dispersion. Parameters ---------- dataset : `SpectrumDatasetOnOff` Dataset to write filename : str or `Path` Filename to use """kernel=dataset.edisp.get_edisp_kernel()kernel.write(filename=filename,overwrite=self.overwrite,format=self.format)
[docs]defwrite_arf(self,dataset,filename):"""Write effective area. Parameters ---------- dataset : `SpectrumDatasetOnOff` Dataset to write filename : str or `Path` Filename to use """aeff=dataset.exposure/dataset.exposure.meta["livetime"]aeff.write(filename=filename,overwrite=self.overwrite,format=self.format.replace("ogip","ogip-arf"),)
[docs]defto_counts_hdulist(self,dataset,is_bkg=False):"""Convert counts region map to hdulist. Parameters ---------- dataset : `SpectrumDatasetOnOff` Dataset to write is_bkg : bool Whether to use counts off """counts=dataset.counts_offifis_bkgelsedataset.countsacceptance=dataset.acceptance_offifis_bkgelsedataset.acceptancehdulist=counts.to_hdulist(format=self.format)table=Table.read(hdulist["SPECTRUM"])meta=self.get_ogip_meta(dataset,is_bkg=is_bkg)ifdataset.mask_safeisnotNone:mask_array=dataset.mask_safe.data[:,0,0]else:mask_array=np.ones(acceptance.data.size)table["QUALITY"]=np.logical_not(mask_array)deltable.meta["QUALITY"]table["BACKSCAL"]=acceptance.data[:,0,0]deltable.meta["BACKSCAL"]# adapt meta datatable.meta.update(meta)hdulist["SPECTRUM"]=fits.BinTableHDU(table)returnhdulist
[docs]defwrite_pha(self,dataset,filename):"""Write counts file. Parameters ---------- dataset : `SpectrumDatasetOnOff` Dataset to write filename : str or `Path` Filename to use """hdulist=self.to_counts_hdulist(dataset)ifdataset.gti:hdu=dataset.gti.to_table_hdu()hdulist.append(hdu)hdulist.writeto(filename,overwrite=self.overwrite)
[docs]defwrite_bkg(self,dataset,filename):"""Write off counts file. Parameters ---------- dataset : `SpectrumDatasetOnOff` Dataset to write filename : str or `Path` Filename to use """hdulist=self.to_counts_hdulist(dataset,is_bkg=True)hdulist.writeto(filename,overwrite=self.overwrite)
class OGIPDatasetReader(DatasetReader):
    """Read `~gammapy.datasets.SpectrumDatasetOnOff` from OGIP files.

    BKG file, ARF, and RMF must be set in the PHA header and be present in
    the same folder.

    The naming scheme is fixed to the following scheme:

    * PHA file is named ``pha_obs{name}.fits``
    * BKG file is named ``bkg_obs{name}.fits``
    * ARF file is named ``arf_obs{name}.fits``
    * RMF file is named ``rmf_obs{name}.fits``

    with ``{name}`` the dataset name.

    Parameters
    ----------
    filename : str or `~pathlib.Path`
        OGIP PHA file to read
    """

    tag = "ogip"

    def __init__(self, filename):
        # Normalise to a Path object; companion files are resolved
        # relative to this reference file.
        self.filename = make_path(filename)
[docs]defget_valid_path(self,filename):"""Get absolute or relative path. The relative path is with respect to the name of the reference file. Parameters ---------- filename : str or `Path` Filename Returns ------- filename : `Path` Valid path """filename=make_path(filename)ifnotfilename.exists():returnself.filename.parent/filenameelse:returnfilename
[docs]defget_filenames(self,pha_meta):"""Get filenames. Parameters ---------- pha_meta : dict Meta data from the PHA file Returns ------- filenames : dict Dict with filenames of "arffile", "rmffile" (optional) and "bkgfile" (optional) """filenames={"arffile":self.get_valid_path(pha_meta["ANCRFILE"])}if"BACKFILE"inpha_meta:filenames["bkgfile"]=self.get_valid_path(pha_meta["BACKFILE"])if"RESPFILE"inpha_meta:filenames["rmffile"]=self.get_valid_path(pha_meta["RESPFILE"])returnfilenames
[docs]@staticmethoddefread_pha(filename):"""Read PHA file. Parameters ---------- filename : str or `Path` PHA file name Returns ------- data : dict Dict with counts, acceptance and mask_safe """data={}withfits.open(filename,memmap=False)ashdulist:data["counts"]=RegionNDMap.from_hdulist(hdulist,format="ogip")data["acceptance"]=RegionNDMap.from_hdulist(hdulist,format="ogip",ogip_column="BACKSCAL")if"GTI"inhdulist:data["gti"]=GTI.from_table_hdu(hdulist["GTI"])data["mask_safe"]=RegionNDMap.from_hdulist(hdulist,format="ogip",ogip_column="QUALITY")returndata
[docs]@staticmethoddefread_bkg(filename):"""Read PHA background file. Parameters ---------- filename : str or `Path` PHA file name Returns ------- data : dict Dict with counts_off and acceptance_off """withfits.open(filename,memmap=False)ashdulist:counts_off=RegionNDMap.from_hdulist(hdulist,format="ogip")acceptance_off=RegionNDMap.from_hdulist(hdulist,ogip_column="BACKSCAL",format="ogip")return{"counts_off":counts_off,"acceptance_off":acceptance_off}
[docs]@staticmethoddefread_rmf(filename,exposure):"""Read RMF file. Parameters ---------- filename : str or `Path` PHA file name exposure : `RegionNDMap` Exposure map Returns ------- data : `EDispKernelMap` Dict with edisp """kernel=EDispKernel.read(filename)edisp=EDispKernelMap.from_edisp_kernel(kernel,geom=exposure.geom)# TODO: resolve this separate handling of exposure for edispedisp.exposure_map.data=exposure.data[:,:,np.newaxis,:]returnedisp
[docs]@staticmethoddefread_arf(filename,livetime):"""Read ARF file. Parameters ---------- filename : str or `Path` PHA file name livetime : `Quantity` Livetime Returns ------- data : `RegionNDMap` Exposure map """aeff=RegionNDMap.read(filename,format="ogip-arf")exposure=aeff*livetimeexposure.meta["livetime"]=livetimereturnexposure