# Licensed under a 3-clause BSD style license - see LICENSE.rstimportabcimportnumpyasnpfrompathlibimportPathfromastropyimportunitsasufromastropy.ioimportfitsfromastropy.tableimportTablefromgammapy.dataimportGTIfromgammapy.irfimportEDispKernel,EDispKernelMap,PSFMapfromgammapy.mapsimportRegionNDMap,Mapfromgammapy.modeling.modelsimportcreate_fermi_isotropic_diffuse_model,Modelsfromgammapy.utils.scriptsimportread_yaml,make_pathfrom.spectrumimportSpectrumDatasetOnOfffrom.utilsimportcreate_map_dataset_from_dl4__all__=["DatasetReader","DatasetWriter","OGIPDatasetReader","OGIPDatasetWriter","FermipyDatasetsReader",]classDatasetReader(abc.ABC):"""Dataset reader base class."""@property@abc.abstractmethoddeftag(self):pass@abc.abstractmethoddefread(self):passclassDatasetWriter(abc.ABC):"""Dataset writer base class."""@property@abc.abstractmethoddeftag(self):pass@abc.abstractmethoddefwrite(self,dataset):pass
class OGIPDatasetWriter(DatasetWriter):
    """Write OGIP files.

    If you want to use the written files with Sherpa, you have to use the
    ``ogip-sherpa`` format. Then all files will be written in units of 'keV' and
    'cm2'.

    The naming scheme is fixed as following:

    * PHA file is named filename.fits
    * BKG file is named filename_bkg.fits
    * ARF file is named filename_arf.fits
    * RMF file is named filename_rmf.fits

    Parameters
    ----------
    filename : `~pathlib.Path` or str
        Filename.
    format : {"ogip", "ogip-sherpa"}
        Which format to use. Default is 'ogip'.
    overwrite : bool, optional
        Overwrite existing files. Default is False.
    checksum : bool
        When True adds both DATASUM and CHECKSUM cards to the headers written to the files.
        Default is False.
    """

    tag = ["ogip", "ogip-sherpa"]

    def __init__(self, filename, format="ogip", overwrite=False, checksum=False):
        # Normalise to a Path and make sure the target directory exists,
        # so the individual write_* methods can assume it is there.
        filename = make_path(filename)
        filename.parent.mkdir(exist_ok=True, parents=True)

        self.filename = filename
        self.format = format
        self.overwrite = overwrite
        self.checksum = checksum
[docs]defget_ogip_meta(self,dataset,is_bkg=False):"""Meta info for the OGIP data format."""try:livetime=dataset.exposure.meta["livetime"]exceptKeyError:raiseValueError("Storing in ogip format require the livetime ""to be defined in the exposure meta data")hdu_class="BKG"ifis_bkgelse"TOTAL"meta={"HDUCLAS2":hdu_class,"HDUCLAS3":"COUNT","HDUCLAS4":"TYPE:1","EXPOSURE":livetime.to_value("s"),"OBS_ID":dataset.name,}filenames=OGIPDatasetWriter.get_filenames(self.filename)meta["ANCRFILE"]=filenames["ancrfile"]ifdataset.counts_off:meta["BACKFILE"]=filenames["backfile"]ifdataset.edisp:meta["RESPFILE"]=filenames["respfile"]returnmeta
[docs]defwrite(self,dataset):"""Write dataset to file. Parameters ---------- dataset : `SpectrumDatasetOnOff` Dataset to write. """filenames=self.get_filenames(self.filename)self.write_pha(dataset,filename=self.filename)path=self.filename.parentself.write_arf(dataset,filename=path/filenames["ancrfile"])ifdataset.counts_off:self.write_bkg(dataset,filename=path/filenames["backfile"])ifdataset.edisp:self.write_rmf(dataset,filename=path/filenames["respfile"])
[docs]defwrite_rmf(self,dataset,filename):"""Write energy dispersion. Parameters ---------- dataset : `SpectrumDatasetOnOff` Dataset to write. filename : str or `~pathlib.Path` Filename to use. """kernel=dataset.edisp.get_edisp_kernel()kernel.write(filename=filename,format=self.format,checksum=self.checksum,overwrite=self.overwrite,)
[docs]defwrite_arf(self,dataset,filename):"""Write effective area. Parameters ---------- dataset : `SpectrumDatasetOnOff` Dataset to write. filename : str or `~pathlib.Path` Filename to use. """aeff=dataset.exposure/dataset.exposure.meta["livetime"]aeff.write(filename=filename,overwrite=self.overwrite,format=self.format.replace("ogip","ogip-arf"),checksum=self.checksum,)
[docs]defto_counts_hdulist(self,dataset,is_bkg=False):"""Convert counts region map to hdulist. Parameters ---------- dataset : `SpectrumDatasetOnOff` Dataset to write. is_bkg : bool Whether to use counts off. Default is False. """counts=dataset.counts_offifis_bkgelsedataset.countsacceptance=dataset.acceptance_offifis_bkgelsedataset.acceptancehdulist=counts.to_hdulist(format=self.format)table=Table.read(hdulist["SPECTRUM"])meta=self.get_ogip_meta(dataset,is_bkg=is_bkg)ifdataset.mask_safeisnotNone:mask_array=dataset.mask_safe.data[:,0,0]else:mask_array=np.ones(acceptance.data.size)table["QUALITY"]=np.logical_not(mask_array)deltable.meta["QUALITY"]table["BACKSCAL"]=acceptance.data[:,0,0]deltable.meta["BACKSCAL"]# adapt meta datatable.meta.update(meta)hdulist["SPECTRUM"]=fits.BinTableHDU(table)returnhdulist
[docs]defwrite_pha(self,dataset,filename):"""Write counts file. Parameters ---------- dataset : `SpectrumDatasetOnOff` Dataset to write. filename : str or `~pathlib.Path` Filename to use. """hdulist=self.to_counts_hdulist(dataset)ifdataset.gti:hdu=dataset.gti.to_table_hdu()hdulist.append(hdu)hdulist.writeto(filename,overwrite=self.overwrite,checksum=self.checksum)
[docs]defwrite_bkg(self,dataset,filename):"""Write off counts file. Parameters ---------- dataset : `SpectrumDatasetOnOff` Dataset to write. filename : str or `~pathlib.Path` Filename to use. """hdulist=self.to_counts_hdulist(dataset,is_bkg=True)hdulist.writeto(filename,overwrite=self.overwrite,checksum=self.checksum)
class OGIPDatasetReader(DatasetReader):
    """Read `~gammapy.datasets.SpectrumDatasetOnOff` from OGIP files.

    BKG file, ARF, and RMF must be set in the PHA header and be present in
    the same folder.

    The naming scheme is fixed to the following scheme:

    * PHA file is named ``pha_obs{name}.fits``
    * BKG file is named ``bkg_obs{name}.fits``
    * ARF file is named ``arf_obs{name}.fits``
    * RMF file is named ``rmf_obs{name}.fits``

    with ``{name}`` the dataset name.

    Parameters
    ----------
    filename : str or `~pathlib.Path`
        OGIP PHA file to read.
    checksum : bool
        If True checks both DATASUM and CHECKSUM cards in the file headers.
        Default is False.
    name : str, optional
        Name to assign to the dataset being read. Default is None.
        NOTE(review): only stored here; presumably consumed by ``read``,
        which is not visible in this chunk — confirm against full source.
    """

    tag = "ogip"

    def __init__(self, filename, checksum=False, name=None):
        self.filename = make_path(filename)
        self.checksum = checksum
        self.name = name
[docs]defget_valid_path(self,filename):"""Get absolute or relative path. The relative path is with respect to the name of the reference file. Parameters ---------- filename : str or `~pathlib.Path` Filename. Returns ------- filename : `~pathlib.Path` Valid path. """filename=make_path(filename)ifnotfilename.exists():returnself.filename.parent/filenameelse:returnfilename
[docs]defget_filenames(self,pha_meta):"""Get filenames. Parameters ---------- pha_meta : dict Metadata from the PHA file. Returns ------- filenames : dict Dictionary with filenames of "arffile", "rmffile" (optional) and "bkgfile" (optional). """filenames={"arffile":self.get_valid_path(pha_meta["ANCRFILE"])}if"BACKFILE"inpha_meta:filenames["bkgfile"]=self.get_valid_path(pha_meta["BACKFILE"])if"RESPFILE"inpha_meta:filenames["rmffile"]=self.get_valid_path(pha_meta["RESPFILE"])returnfilenames
[docs]@staticmethoddefread_pha(filename,checksum=False):"""Read PHA file. Parameters ---------- filename : str or `~pathlib.Path` PHA file name. checksum : bool If True checks both DATASUM and CHECKSUM cards in the file headers. Default is False. Returns ------- data : dict Dictionary with counts, acceptance and mask_safe. """data={}withfits.open(filename,memmap=False,checksum=checksum)ashdulist:data["counts"]=RegionNDMap.from_hdulist(hdulist,format="ogip")data["acceptance"]=RegionNDMap.from_hdulist(hdulist,format="ogip",ogip_column="BACKSCAL")if"GTI"inhdulist:data["gti"]=GTI.from_table_hdu(hdulist["GTI"])data["mask_safe"]=RegionNDMap.from_hdulist(hdulist,format="ogip",ogip_column="QUALITY")returndata
[docs]@staticmethoddefread_bkg(filename,checksum=False):"""Read PHA background file. Parameters ---------- filename : str or `~pathlib.Path` PHA file name. checksum : bool If True checks both DATASUM and CHECKSUM cards in the file headers. Default is False. Returns ------- data : dict Dictionary with counts_off and acceptance_off. """withfits.open(filename,memmap=False,checksum=checksum)ashdulist:counts_off=RegionNDMap.from_hdulist(hdulist,format="ogip")acceptance_off=RegionNDMap.from_hdulist(hdulist,ogip_column="BACKSCAL",format="ogip")return{"counts_off":counts_off,"acceptance_off":acceptance_off}
[docs]@staticmethoddefread_rmf(filename,exposure,checksum=False):"""Read RMF file. Parameters ---------- filename : str or `~pathlib.Path` PHA file name. exposure : `RegionNDMap` Exposure map. checksum : bool If True checks both DATASUM and CHECKSUM cards in the file headers. Default is False. Returns ------- data : `EDispKernelMap` Dictionary with edisp. """kernel=EDispKernel.read(filename,checksum=checksum)edisp=EDispKernelMap.from_edisp_kernel(kernel,geom=exposure.geom)# TODO: resolve this separate handling of exposure for edispedisp.exposure_map.data=exposure.data[:,:,np.newaxis,:]returnedisp
[docs]@staticmethoddefread_arf(filename,livetime,checksum=False):"""Read ARF file. Parameters ---------- filename : str or `~pathlib.Path` PHA file name. livetime : `Quantity` Livetime. checksum : bool If True checks both DATASUM and CHECKSUM cards in the file headers. Default is False. Returns ------- data : `RegionNDMap` Exposure map. """aeff=RegionNDMap.read(filename,format="ogip-arf",checksum=checksum)exposure=aeff*livetimeexposure.meta["livetime"]=livetimereturnexposure
class FermipyDatasetsReader(DatasetReader):
    """Create datasets from Fermi-LAT files.

    Parameters
    ----------
    filename : str
        Path to Fermipy configuration file (tested only for v1.3.1).
    edisp_bins : int
        Number of margin bins to slice in energy. Default is 0.
        For now only maps created with edisp_bins=0 in fermipy configuration are supported,
        in that case the emin/emax in the fermipy configuration will correspond
        to the true energy range for gammapy, and a value edisp_bins>0 should be set here
        in order to apply the energy dispersion correctly.
        With a binning of 8 to 10 bins per decade, it is recommended to use edisp_bins ≥ 2
        (See https://fermi.gsfc.nasa.gov/ssc/data/analysis/documentation/Pass8_edisp_usage.html)
    """

    tag = "fermipy"

    def __init__(self, filename, edisp_bins=0):
        # Path to the fermipy YAML configuration; parsed by the read method.
        self.filename = make_path(filename)
        self.edisp_bins = edisp_bins
[docs]@staticmethoddefcreate_dataset(counts_file,exposure_file,psf_file,edisp_file,isotropic_file=None,edisp_bins=0,name=None,):"""Create a map dataset from Fermi-LAT files. Parameters ---------- counts_file : str Counts file path. exposure_file : str Exposure file path. psf_file : str Point spread function file path. edisp_file : str Energy dispersion file path. isotropic_file : str, optional Isotropic file path. Default is None edisp_bins : int Number of margin bins to slice in energy. Default is 0. For now only maps created with edisp_bins=0 in fermipy configuration are supported, in that case the emin/emax in the fermipy configuration will correspond to the true energy range for gammapy, and a value edisp_bins>0 should be set here in order to apply the energy dispersion correctly. With a binning of 8 to 10 bins per decade, it is recommended to use edisp_bins ≥ 2 (See https://fermi.gsfc.nasa.gov/ssc/data/analysis/documentation/Pass8_edisp_usage.html) name : str, optional Dataset name. The default is None, and the name is randomly generated. Returns ------- dataset : `~gammapy.datasets.MapDataset` Map dataset. """fromgammapy.datasetsimportMapDatasetcounts=Map.read(counts_file)exposure=Map.read(exposure_file)psf=PSFMap.read(psf_file,format="gtpsf")edisp=EDispKernelMap.read(edisp_file,format="gtdrm")# check that fermipy edisp_bins are matching between edisp and exposure# as we will interp to edisp axis the exposure axis must be larger or equaledisp_axes=edisp.edisp_map.geom.axesiflen(edisp_axes["energy_true"].center)>len(exposure.geom.axes["energy_true"].center):raiseValueError("Energy true axes of exposure and DRM do not match. Check fermipy configuration.")edisp_axes=edisp.edisp_map.geom.axespsf_r68s=psf.containment_radius(0.68,edisp_axes["energy_true"].center,position=counts.geom.center_skydir,)# check that pdf is well defined (fails if edisp_bins>0 in fermipy)ifnp.any(psf_r68s.value)==0.0:raiseValueError("PSF is not defined for all true energies. 
Check fermipy configuration.")# change counts energy axis unit keV->MeVenergy_axis=counts.geom.axes["energy"]._init_copy(nodes=edisp_axes["energy"].edges)geom=counts.geom.to_image().to_cube([energy_axis])counts=Map.from_geom(geom,data=counts.data)# standardize dataset interpolating to same geom and axesdataset=MapDataset(counts=counts,exposure=exposure,psf=psf,edisp=edisp,name=name,)dataset=create_map_dataset_from_dl4(dataset,geom=counts.geom,name=dataset.name)ifedisp_bins>0:# slice edisp_binsdataset=dataset.slice_by_idx(dict(energy=slice(edisp_bins,-edisp_bins)),name=dataset.name)ifisotropic_file:model=create_fermi_isotropic_diffuse_model(isotropic_file,datasets_names=[dataset.name])dataset.models=Models([model])returndataset