Source code for gammapy.spectrum.dataset

# Licensed under a 3-clause BSD style license - see LICENSE.rst
from collections import OrderedDict
import numpy as np
from pathlib import Path
from astropy import units as u
from astropy.table import Table
from astropy.io import fits
from .utils import SpectrumEvaluator
from ..utils.scripts import make_path
from ..utils.fitting import Dataset, Parameters
from ..utils.fits import energy_axis_to_ebounds
from ..stats import wstat, cash
from ..utils.random import get_random_state
from ..data import ObservationStats
from .core import CountsSpectrum
from ..irf import EffectiveAreaTable, EnergyDispersion, IRFStacker

__all__ = ["SpectrumDatasetOnOff", "SpectrumDataset", "SpectrumDatasetOnOffStacker"]


class SpectrumDataset(Dataset):
    """Spectrum dataset for likelihood fitting.

    The spectrum dataset bundles reduced counts data, with a spectral model,
    background model and instrument response function to compute the
    fit-statistic given the current model and data.

    Parameters
    ----------
    model : `~gammapy.spectrum.models.SpectralModel`
        Fit model
    counts : `~gammapy.spectrum.CountsSpectrum`
        Counts spectrum
    livetime : `~astropy.units.Quantity`
        Livetime
    aeff : `~gammapy.irf.EffectiveAreaTable`
        Effective area
    edisp : `~gammapy.irf.EnergyDispersion`
        Energy dispersion
    background : `~gammapy.spectrum.CountsSpectrum`
        Background to use for the fit.
    mask_safe : `~numpy.ndarray`
        Mask defining the safe data range.
    mask_fit : `~numpy.ndarray`
        Mask to apply to the likelihood for fitting.
    obs_id : int or list of int
        Observation id(s) corresponding to the (stacked) dataset.
    gti : `~gammapy.data.GTI`
        GTI of the observation, or union of GTIs if it is a stacked observation.

    See Also
    --------
    SpectrumDatasetOnOff, FluxPointsDataset, MapDataset
    """

    likelihood_type = "cash"

    def __init__(
        self,
        model=None,
        counts=None,
        livetime=None,
        aeff=None,
        edisp=None,
        background=None,
        mask_safe=None,
        mask_fit=None,
        obs_id=None,
        gti=None,
    ):
        if mask_fit is not None and mask_fit.dtype != np.dtype("bool"):
            raise ValueError("mask data must have dtype bool")

        self.counts = counts

        if livetime is not None:
            livetime = u.Quantity(livetime)

        self.livetime = livetime
        self.mask_fit = mask_fit
        self.aeff = aeff
        self.edisp = edisp
        self.background = background
        self.model = model
        self.mask_safe = mask_safe
        self.obs_id = obs_id
        self.gti = gti

    def __repr__(self):
        return self.__class__.__name__

    def __str__(self):
        str_ = self.__class__.__name__
        str_ += "\n\n"

        counts = np.nan
        if self.counts is not None:
            counts = np.sum(self.counts.data)
        str_ += "\t{:32}: {:.0f} \n".format("Total counts", counts)

        npred = np.nan
        if self.model is not None:
            npred = np.sum(self.npred().data)
        str_ += "\t{:32}: {:.2f}\n".format("Total predicted counts", npred)

        counts_off = np.nan
        if getattr(self, "counts_off", None) is not None:
            counts_off = np.sum(self.counts_off.data)
        str_ += "\t{:32}: {:.2f}\n\n".format("Total off counts", counts_off)

        aeff_min, aeff_max, aeff_unit = np.nan, np.nan, ""
        if self.aeff is not None:
            aeff_min = np.min(self.aeff.data.data.value[self.aeff.data.data.value > 0])
            aeff_max = np.max(self.aeff.data.data.value)
            aeff_unit = self.aeff.data.data.unit

        str_ += "\t{:32}: {:.2e} {}\n".format("Effective area min", aeff_min, aeff_unit)
        str_ += "\t{:32}: {:.2e} {}\n\n".format(
            "Effective area max", aeff_max, aeff_unit
        )

        livetime = np.nan
        if self.livetime is not None:
            livetime = self.livetime
        str_ += "\t{:32}: {:.2e}\n\n".format("Livetime", livetime)

        # data section
        n_bins = 0
        if self.counts is not None:
            n_bins = self.counts.data.size
        str_ += "\t{:32}: {} \n".format("Number of total bins", n_bins)

        n_fit_bins = 0
        if self.mask is not None:
            n_fit_bins = np.sum(self.mask)
        str_ += "\t{:32}: {} \n\n".format("Number of fit bins", n_fit_bins)

        # likelihood section
        str_ += "\t{:32}: {}\n".format("Fit statistic type", self.likelihood_type)

        stat = np.nan
        if self.model is not None:
            stat = self.likelihood()
        str_ += "\t{:32}: {:.2f}\n\n".format("Fit statistic value (-2 log(L))", stat)

        n_pars, n_free_pars = 0, 0
        if self.model is not None:
            n_pars = len(self.model.parameters.parameters)
            n_free_pars = len(self.parameters.free_parameters)

        str_ += "\t{:32}: {}\n".format("Number of parameters", n_pars)
        str_ += "\t{:32}: {}\n\n".format("Number of free parameters", n_free_pars)

        if self.model is not None:
            str_ += "\t{:32}: {}\n".format("Model type", self.model.__class__.__name__)
            info = str(self.model.parameters)
            lines = info.split("\n")
            for line in lines[2:-1]:
                str_ += "\t" + line.replace(":", "\t:") + "\n"

        return str_.expandtabs(tabsize=4)

    @property
    def model(self):
        return self._model

    @model.setter
    def model(self, model):
        self._model = model
        if model is not None:
            self._parameters = Parameters(self._model.parameters.parameters)
            self._predictor = SpectrumEvaluator(
                model=self.model,
                livetime=self.livetime,
                aeff=self.aeff,
                e_true=self._energy_axis.edges,
                edisp=self.edisp,
            )
        else:
            self._parameters = None
            self._predictor = None

    @property
    def parameters(self):
        if self._parameters is None:
            raise AttributeError("No model set for Dataset")
        else:
            return self._parameters

    @property
    def _energy_axis(self):
        if self.counts is not None:
            e_axis = self.counts.energy
        elif self.edisp is not None:
            e_axis = self.edisp.data.axis("e_reco")
        elif self.aeff is not None:
            # assume e_reco = e_true
            e_axis = self.aeff.data.axis("energy")
        return e_axis

    @property
    def data_shape(self):
        """Shape of the counts data"""
        return (self._energy_axis.nbin,)
    def npred(self):
        """Return the total predicted counts spectrum (model + background)."""
        if self._predictor is None:
            raise AttributeError("No model set for Dataset")
        npred = self._predictor.compute_npred()
        if self.background:
            npred.data += self.background.data
        return npred
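    # Usage sketch (the ``dataset`` variable is hypothetical and must already
    # have a model, livetime, IRFs and background set; not part of the class):
    #
    #     total = dataset.npred()   # predicted source + background counts
    #     print(total.data.sum())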
    def likelihood_per_bin(self):
        """Likelihood per bin given the current model parameters"""
        return cash(n_on=self.counts.data, mu_on=self.npred().data)
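    # For reference, a hedged numeric sketch of the Cash statistic used above,
    # which per bin is 2 * (mu_on - n_on * log(mu_on)); the example values are
    # illustrative:
    #
    #     import numpy as np
    #     from gammapy.stats import cash
    #     stat = cash(n_on=np.array([10]), mu_on=np.array([12.0]))
    #     assert np.allclose(stat, 2 * (12.0 - 10 * np.log(12.0)))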
    def _as_counts_spectrum(self, data):
        energy = self.counts.energy.edges
        return CountsSpectrum(data=data, energy_lo=energy[:-1], energy_hi=energy[1:])

    @property
    def excess(self):
        """Excess (counts - alpha * counts_off)"""
        excess = self.counts.data - self.background.data
        return self._as_counts_spectrum(excess)
    def fake(self, random_state="random-seed"):
        """Simulate fake counts for the current model and reduced IRFs.

        This method overwrites the counts defined on the dataset object.

        Parameters
        ----------
        random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`}
            Defines random number generator initialisation.
            Passed to `~gammapy.utils.random.get_random_state`.
        """
        random_state = get_random_state(random_state)
        npred = self.npred()
        npred.data = random_state.poisson(npred.data)
        self.counts = npred
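    # Minimal usage sketch for ``fake`` (hypothetical ``dataset`` variable
    # with model and IRFs already set):
    #
    #     dataset.fake(random_state=42)   # overwrites dataset.counts in place
    #     print(dataset.counts.data.sum())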
    @property
    def energy_range(self):
        """Energy range defined by the safe mask"""
        energy = self.counts.energy.edges
        e_lo = energy[:-1][self.mask_safe]
        e_hi = energy[1:][self.mask_safe]
        return u.Quantity([e_lo.min(), e_hi.max()])
    def plot_fit(self):
        """Plot counts and residuals in two panels.

        Calls ``plot_counts`` and ``plot_residuals``.
        """
        from matplotlib.gridspec import GridSpec
        import matplotlib.pyplot as plt

        gs = GridSpec(7, 1)

        ax_spectrum = plt.subplot(gs[:5, :])
        self.plot_counts(ax=ax_spectrum)

        ax_spectrum.set_xticks([])

        ax_residuals = plt.subplot(gs[5:, :])
        self.plot_residuals(ax=ax_residuals)
        return ax_spectrum, ax_residuals
    @property
    def _e_unit(self):
        return self.counts.energy.unit
    def plot_counts(self, ax=None):
        """Plot predicted and detected counts.

        Parameters
        ----------
        ax : `~matplotlib.pyplot.Axes`
            Axes object.

        Returns
        -------
        ax : `~matplotlib.pyplot.Axes`
            Axes object.
        """
        import matplotlib.pyplot as plt

        ax = plt.gca() if ax is None else ax

        self.npred().plot(ax=ax, label="mu_src", energy_unit=self._e_unit)
        self.excess.plot(ax=ax, label="Excess", fmt=".", energy_unit=self._e_unit)

        e_min, e_max = self.energy_range
        kwargs = {"color": "black", "linestyle": "dashed"}
        ax.axvline(e_min.to_value(self._e_unit), label="fit range", **kwargs)
        ax.axvline(e_max.to_value(self._e_unit), **kwargs)

        ax.legend(numpoints=1)
        ax.set_title("")
        return ax
    def residuals(self, method="diff"):
        """Compute the spectral residuals.

        Parameters
        ----------
        method : {"diff", "diff/model", "diff/sqrt(model)"}
            Method used to compute the residuals. Available options are:

            - "diff" (default): data - model
            - "diff/model": (data - model) / model
            - "diff/sqrt(model)": (data - model) / sqrt(model)

        Returns
        -------
        residuals : `CountsSpectrum`
            Residual spectrum.
        """
        residuals = self._compute_residuals(self.counts, self.npred(), method)
        return residuals
    def plot_residuals(self, method="diff", ax=None, **kwargs):
        """Plot residuals.

        Parameters
        ----------
        ax : `~matplotlib.pyplot.Axes`
            Axes object.
        method : {"diff", "diff/model", "diff/sqrt(model)"}
            Normalization used to compute the residuals, see
            `SpectrumDataset.residuals`.
        **kwargs : dict
            Keywords passed to `CountsSpectrum.plot`.

        Returns
        -------
        ax : `~matplotlib.pyplot.Axes`
            Axes object.
        """
        import matplotlib.pyplot as plt

        ax = plt.gca() if ax is None else ax

        residuals = self.residuals(method=method)
        label = self._residuals_labels[method]

        residuals.plot(
            ax=ax, ecolor="black", fmt="none", energy_unit=self._e_unit, **kwargs
        )
        ax.axhline(0, color="black", lw=0.5)

        ymax = 1.2 * np.nanmax(residuals.data)
        ax.set_ylim(-ymax, ymax)

        ax.set_xlabel("Energy [{}]".format(self._e_unit))
        ax.set_ylabel("Residuals ({})".format(label))
        return ax
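# Plotting usage sketch (hypothetical ``dataset`` variable; requires
# matplotlib):
#
#     import matplotlib.pyplot as plt
#     ax_spectrum, ax_residuals = dataset.plot_fit()
#     dataset.plot_residuals(method="diff/sqrt(model)")
#     plt.show()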
class SpectrumDatasetOnOff(SpectrumDataset):
    """Spectrum dataset for on-off likelihood fitting.

    The on-off spectrum dataset bundles reduced counts data, off counts data,
    with a spectral model, relative background efficiency and instrument
    response functions to compute the fit-statistic given the current model
    and data.

    Parameters
    ----------
    model : `~gammapy.spectrum.models.SpectralModel`
        Fit model
    counts : `~gammapy.spectrum.CountsSpectrum`
        ON counts spectrum
    counts_off : `~gammapy.spectrum.CountsSpectrum`
        OFF counts spectrum
    livetime : `~astropy.units.Quantity`
        Livetime
    aeff : `~gammapy.irf.EffectiveAreaTable`
        Effective area
    edisp : `~gammapy.irf.EnergyDispersion`
        Energy dispersion
    mask_safe : `~numpy.ndarray`
        Mask defining the safe data range.
    mask_fit : `~numpy.ndarray`
        Mask to apply to the likelihood for fitting.
    acceptance : `~numpy.ndarray` or float
        Relative background efficiency in the on region.
    acceptance_off : `~numpy.ndarray` or float
        Relative background efficiency in the off region.
    obs_id : int or list of int
        Observation id(s) corresponding to the (stacked) dataset.
    gti : `~gammapy.data.GTI`
        GTI of the observation, or union of GTIs if it is a stacked observation.

    See Also
    --------
    SpectrumDataset, FluxPointsDataset, MapDataset
    """

    likelihood_type = "wstat"

    def __init__(
        self,
        model=None,
        counts=None,
        counts_off=None,
        livetime=None,
        aeff=None,
        edisp=None,
        mask_safe=None,
        mask_fit=None,
        acceptance=None,
        acceptance_off=None,
        obs_id=None,
        gti=None,
    ):
        self.counts = counts
        self.counts_off = counts_off

        if livetime is not None:
            livetime = u.Quantity(livetime)

        self.livetime = livetime
        self.mask_fit = mask_fit
        self.aeff = aeff
        self.edisp = edisp
        self.model = model
        self.mask_safe = mask_safe

        if np.isscalar(acceptance):
            acceptance = np.ones(self.data_shape) * acceptance

        if np.isscalar(acceptance_off):
            acceptance_off = np.ones(self.data_shape) * acceptance_off

        self.acceptance = acceptance
        self.acceptance_off = acceptance_off
        self.obs_id = obs_id
        self.gti = gti

    def __repr__(self):
        return self.__class__.__name__

    def __str__(self):
        str_ = super().__str__()

        acceptance = np.nan
        if self.acceptance is not None:
            acceptance = np.mean(self.acceptance)
        str_ += "\t{:32}: {}\n".format("Acceptance mean", acceptance)

        return str_.expandtabs(tabsize=4)

    @property
    def background(self):
        """Background counts estimated from the off region (alpha * counts_off)."""
        background = self.alpha * self.counts_off.data
        return self._as_counts_spectrum(background)

    @property
    def alpha(self):
        """Exposure ratio between signal and background regions"""
        return self.acceptance / self.acceptance_off
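    # Illustrative sketch: with one on region and three equivalent off
    # regions, acceptance=1 and acceptance_off=3, so alpha = 1/3 and the
    # estimated background in the on region is counts_off / 3.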
    def npred_sig(self):
        """Predicted counts from the source model (`CountsSpectrum`)."""
        if self._predictor is None:
            raise AttributeError("No model set for this Dataset")
        return self._predictor.compute_npred()
    def likelihood_per_bin(self):
        """Likelihood per bin given the current model parameters"""
        mu_sig = self.npred_sig().data
        on_stat_ = wstat(
            n_on=self.counts.data,
            n_off=self.counts_off.data,
            alpha=self.alpha,
            mu_sig=mu_sig,
        )
        return np.nan_to_num(on_stat_)
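    # Hedged sketch of the WStat call made above: the unknown background is
    # profiled out of the Poisson likelihood, so only n_on, n_off, alpha and
    # the predicted signal counts are needed (example values are
    # illustrative):
    #
    #     import numpy as np
    #     from gammapy.stats import wstat
    #     stat = wstat(n_on=np.array([10]), n_off=np.array([20]),
    #                  alpha=np.array([0.25]), mu_sig=np.array([5.0]))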
    def fake(self, background_model, random_state="random-seed"):
        """Simulate fake counts for the current model and reduced IRFs.

        This method overwrites the counts and off counts defined on the
        dataset object.

        Parameters
        ----------
        background_model : `~gammapy.spectrum.CountsSpectrum`
            Background model. For now this is a `CountsSpectrum`; in the
            future it will be part of the `SpectrumDataset` class.
        random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`}
            Defines random number generator initialisation.
            Passed to `~gammapy.utils.random.get_random_state`.
        """
        random_state = get_random_state(random_state)

        npred_sig = self.npred_sig()
        npred_sig.data = random_state.poisson(npred_sig.data)

        npred_bkg = background_model.copy()
        npred_bkg.data = random_state.poisson(npred_bkg.data)

        self.counts = npred_sig + npred_bkg

        npred_off = background_model / self.alpha
        npred_off.data = random_state.poisson(npred_off.data)
        self.counts_off = npred_off
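    # Usage sketch (hypothetical ``dataset`` and ``bkg_model`` variables;
    # ``bkg_model`` is a CountsSpectrum of expected background counts per
    # bin):
    #
    #     dataset.fake(background_model=bkg_model, random_state=42)
    #     # dataset.counts and dataset.counts_off are now Poisson realisations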
    @classmethod
    def read(cls, filename):
        """Read from file.

        For now, ``filename`` is assumed to be the name of a PHA file, where
        the BKG file, ARF, and RMF names must be set in the PHA header and be
        present in the same folder.

        Parameters
        ----------
        filename : str
            OGIP PHA file to read
        """
        raise NotImplementedError(
            "To read from an OGIP fits file use SpectrumDatasetOnOff.from_ogip_files."
        )
    def peek(self, figsize=(10, 10)):
        """Quick-look summary plots."""
        import matplotlib.pyplot as plt

        e_min, e_max = self.energy_range

        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(nrows=2, ncols=2, figsize=figsize)

        ax1.set_title("Counts")
        energy_unit = "TeV"

        if self.counts_off is not None:
            self.background.plot_hist(
                ax=ax1, label="alpha * n_off", color="darkblue", energy_unit=energy_unit
            )

        self.counts.plot_hist(
            ax=ax1,
            label="n_on",
            color="darkred",
            energy_unit=energy_unit,
            show_energy=(e_min, e_max),
        )

        ax1.set_xlim(
            0.7 * e_min.to_value(energy_unit), 1.3 * e_max.to_value(energy_unit)
        )
        ax1.legend(numpoints=1)

        ax2.set_title("Effective Area")
        e_unit = self.aeff.energy.unit
        self.aeff.plot(ax=ax2, show_energy=(e_min, e_max))
        ax2.set_xlim(0.7 * e_min.to_value(e_unit), 1.3 * e_max.to_value(e_unit))

        ax3.axis("off")

        if self.counts_off is not None:
            stats = ObservationStats(**self._info_dict(in_safe_energy_range=True))
            ax3.text(0, 0.2, "{}".format(stats), fontsize=12)

        ax4.set_title("Energy Dispersion")

        if self.edisp is not None:
            self.edisp.plot_matrix(ax=ax4)

        # TODO: optimize layout
        plt.subplots_adjust(wspace=0.3)
    def to_ogip_files(self, outdir=None, use_sherpa=False, overwrite=False):
        """Write OGIP files.

        If you want to use the written files with Sherpa you have to set the
        ``use_sherpa`` flag. Then all files will be written in units of 'keV'
        and 'cm2'.

        Parameters
        ----------
        outdir : `pathlib.Path`
            Output directory, default: pwd
        use_sherpa : bool, optional
            Write Sherpa compliant files, default: False
        overwrite : bool
            Overwrite existing files?
        """
        # TODO: refactor and reduce amount of code duplication
        outdir = Path.cwd() if outdir is None else make_path(outdir)
        outdir.mkdir(exist_ok=True, parents=True)

        if isinstance(self.obs_id, list):
            phafile = "pha_stacked.fits"
        else:
            phafile = "pha_obs{}.fits".format(self.obs_id)

        bkgfile = phafile.replace("pha", "bkg")
        arffile = phafile.replace("pha", "arf")
        rmffile = phafile.replace("pha", "rmf")

        counts_table = self.counts.to_table()
        counts_table["QUALITY"] = np.logical_not(self.mask_safe)
        counts_table["BACKSCAL"] = self.acceptance
        counts_table["AREASCAL"] = np.ones(self.acceptance.size)
        meta = self._ogip_meta()

        meta["respfile"] = rmffile
        meta["backfile"] = bkgfile
        meta["ancrfile"] = arffile
        meta["hduclas2"] = "TOTAL"
        counts_table.meta = meta

        name = counts_table.meta["name"]
        hdu = fits.BinTableHDU(counts_table, name=name)
        hdulist = fits.HDUList([fits.PrimaryHDU(), hdu, self._ebounds_hdu(use_sherpa)])
        hdulist.writeto(str(outdir / phafile), overwrite=overwrite)

        self.aeff.write(outdir / arffile, overwrite=overwrite, use_sherpa=use_sherpa)

        if self.counts_off is not None:
            counts_off_table = self.counts_off.to_table()
            counts_off_table["QUALITY"] = np.logical_not(self.mask_safe)
            counts_off_table["BACKSCAL"] = self.acceptance_off
            counts_off_table["AREASCAL"] = np.ones(self.acceptance.size)
            meta = self._ogip_meta()
            meta["hduclas2"] = "BKG"

            counts_off_table.meta = meta
            name = counts_off_table.meta["name"]
            hdu = fits.BinTableHDU(counts_off_table, name=name)
            hdulist = fits.HDUList(
                [fits.PrimaryHDU(), hdu, self._ebounds_hdu(use_sherpa)]
            )
            hdulist.writeto(str(outdir / bkgfile), overwrite=overwrite)

        if self.edisp is not None:
            self.edisp.write(
                str(outdir / rmffile), overwrite=overwrite, use_sherpa=use_sherpa
            )
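    # Usage sketch: for a hypothetical ``dataset`` with obs_id=23523, this
    # writes the four OGIP files to the current working directory:
    #
    #     dataset.to_ogip_files(outdir=".", overwrite=True)
    #     # creates pha_obs23523.fits, arf_obs23523.fits,
    #     # rmf_obs23523.fits and bkg_obs23523.fits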
    def _ebounds_hdu(self, use_sherpa):
        energy = self.counts.energy.edges

        if use_sherpa:
            energy = energy.to("keV")

        return energy_axis_to_ebounds(energy)

    def _ogip_meta(self):
        """Meta info for the OGIP data format"""
        meta = OrderedDict()
        meta["name"] = "SPECTRUM"
        meta["hduclass"] = "OGIP"
        meta["hduclas1"] = "SPECTRUM"
        meta["corrscal"] = ""
        meta["chantype"] = "PHA"
        meta["detchans"] = self.counts.energy.nbin
        meta["filter"] = "None"
        meta["corrfile"] = ""
        meta["poisserr"] = True
        meta["hduclas3"] = "COUNT"
        meta["hduclas4"] = "TYPE:1"
        meta["lo_thres"] = self.energy_range[0].to_value("TeV")
        meta["hi_thres"] = self.energy_range[1].to_value("TeV")
        meta["exposure"] = self.livetime.to_value("s")
        meta["obs_id"] = self.obs_id
        return meta
    @classmethod
    def from_ogip_files(cls, filename):
        """Read `~gammapy.spectrum.SpectrumDatasetOnOff` from OGIP files.

        BKG file, ARF, and RMF must be set in the PHA header and be present
        in the same folder.

        Parameters
        ----------
        filename : str
            OGIP PHA file to read
        """
        filename = make_path(filename)
        dirname = filename.parent

        with fits.open(str(filename), memmap=False) as hdulist:
            data = _read_ogip_hdulist(hdulist)

        counts = CountsSpectrum(
            energy_hi=data["energy_hi"], energy_lo=data["energy_lo"], data=data["data"]
        )

        phafile = filename.name

        try:
            rmffile = phafile.replace("pha", "rmf")
            energy_dispersion = EnergyDispersion.read(str(dirname / rmffile))
        except IOError:
            # TODO: add logger and echo warning
            energy_dispersion = None

        try:
            bkgfile = phafile.replace("pha", "bkg")
            filename = str(dirname / bkgfile)

            with fits.open(str(filename), memmap=False) as hdulist:
                data_bkg = _read_ogip_hdulist(hdulist)
                counts_off = CountsSpectrum(
                    energy_hi=data_bkg["energy_hi"],
                    energy_lo=data_bkg["energy_lo"],
                    data=data_bkg["data"],
                )
                acceptance_off = data_bkg["backscal"]
        except IOError:
            # TODO: add logger and echo warning
            counts_off, acceptance_off = None, None

        arffile = phafile.replace("pha", "arf")
        aeff = EffectiveAreaTable.read(str(dirname / arffile))

        mask_safe = np.logical_not(data["quality"])

        return cls(
            counts=counts,
            aeff=aeff,
            counts_off=counts_off,
            edisp=energy_dispersion,
            livetime=data["livetime"],
            mask_safe=mask_safe,
            acceptance=data["backscal"],
            acceptance_off=acceptance_off,
            obs_id=data["obs_id"],
        )
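    # Usage sketch, following the file layout used in the stacker example
    # below (the ARF/RMF/BKG files must sit next to the PHA file):
    #
    #     path = "$GAMMAPY_DATA/joint-crab/spectra/hess/pha_obs23523.fits"
    #     dataset = SpectrumDatasetOnOff.from_ogip_files(path)
    #     print(dataset)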
    # TODO: decide on a design for dataset info tables / dicts and make it
    # part of the public API
    def _info_dict(self, in_safe_energy_range=False):
        """Info dict"""
        info = dict()
        mask = self.mask_safe if in_safe_energy_range else slice(None)

        # TODO: handle energy dependent a_on / a_off
        info["a_on"] = self.acceptance[0]
        info["n_on"] = self.counts.data[mask].sum()

        if self.counts_off is not None:
            info["n_off"] = self.counts_off.data[mask].sum()
            info["a_off"] = self.acceptance_off[0]
        else:
            info["n_off"] = 0
            info["a_off"] = 1

        info["livetime"] = self.livetime
        info["obs_id"] = self.obs_id
        return info
def _read_ogip_hdulist(hdulist, hdu1="SPECTRUM", hdu2="EBOUNDS"):
    """Create from `~astropy.io.fits.HDUList`."""
    counts_table = Table.read(hdulist[hdu1])
    ebounds = Table.read(hdulist[hdu2])
    emin = ebounds["E_MIN"].quantity
    emax = ebounds["E_MAX"].quantity

    # Check if the optional columns are present in the table
    quality = None
    areascal = None
    backscal = None

    if "QUALITY" in counts_table.colnames:
        quality = counts_table["QUALITY"].data
    if "AREASCAL" in counts_table.colnames:
        areascal = counts_table["AREASCAL"].data
    if "BACKSCAL" in counts_table.colnames:
        backscal = counts_table["BACKSCAL"].data

    return dict(
        data=counts_table["COUNTS"],
        backscal=backscal,
        energy_lo=emin,
        energy_hi=emax,
        quality=quality,
        areascal=areascal,
        livetime=counts_table.meta["EXPOSURE"] * u.s,
        obs_id=counts_table.meta["OBS_ID"],
        is_bkg=False,
    )
class SpectrumDatasetOnOffStacker:
    r"""Stack a list of homogeneous datasets.

    The stacking of :math:`j` datasets is implemented as follows. :math:`k`
    and :math:`l` denote a bin in reconstructed and true energy, respectively.

    .. math::

        \epsilon_{jk} = \left\{\begin{array}{cl}
            1, & \mbox{if bin k is inside the energy thresholds} \\
            0, & \mbox{otherwise}
        \end{array}\right.

        \overline{\mathrm{n_{on}}}_k = \sum_{j} \mathrm{n_{on}}_{jk} \cdot
            \epsilon_{jk}

        \overline{\mathrm{n_{off}}}_k = \sum_{j} \mathrm{n_{off}}_{jk} \cdot
            \epsilon_{jk}

        \overline{\alpha}_k = \frac{\overline{{b_{on}}}_k}{\overline{{b_{off}}}_k}

        \overline{{b}_{on}}_k = 1

        \overline{{b}_{off}}_k = \frac{1}{\sum_{j}\alpha_{jk} \cdot
            \mathrm{n_{off}}_{jk} \cdot \epsilon_{jk}} \cdot \overline{\mathrm{n_{off}}}

    Please refer to `~gammapy.irf.IRFStacker` for the description of how the
    IRFs are stacked.

    Parameters
    ----------
    obs_list : list of `~gammapy.spectrum.SpectrumDatasetOnOff`
        Observations to stack

    Examples
    --------
    >>> from gammapy.spectrum import SpectrumDatasetOnOff, SpectrumDatasetOnOffStacker
    >>> obs_ids = [23523, 23526, 23559, 23592]
    >>> datasets = []
    >>> for obs in obs_ids:
    ...     filename = "$GAMMAPY_DATA/joint-crab/spectra/hess/pha_obs{}.fits"
    ...     ds = SpectrumDatasetOnOff.from_ogip_files(filename.format(obs))
    ...     datasets.append(ds)
    >>> obs_stacker = SpectrumDatasetOnOffStacker(datasets)
    >>> stacked = obs_stacker.run()
    >>> print(stacked.livetime)
    6313.8116406202325 s
    """

    def __init__(self, obs_list):
        self.obs_list = obs_list
        self.stacked_on_vector = None
        self.stacked_off_vector = None
        self.stacked_aeff = None
        self.stacked_edisp = None
        self.stacked_bkscal_on = None
        self.stacked_bkscal_off = None
        self.stacked_obs = None
        self.stacked_gti = None

    def __str__(self):
        ss = self.__class__.__name__
        ss += "\n{}".format(self.obs_list)
        return ss
    def run(self):
        """Run all steps in the correct order."""
        self.stack_counts_vectors()
        self.stack_aeff()
        self.stack_edisp()
        self.stack_gti()
        self.stack_obs()
        return self.stacked_obs
    def stack_counts_vectors(self):
        """Stack on and off vectors."""
        self.stack_on_vector()
        self.stack_off_vector()
        self.stack_backscal()
        self.setup_counts_vectors()
    def stack_on_vector(self):
        """Stack the on count vector."""
        on_vector_list = [o.counts for o in self.obs_list]
        self.stacked_on_vector = self.stack_counts_spectrum(on_vector_list)
    def stack_off_vector(self):
        """Stack the off count vector."""
        off_vector_list = [o.counts_off for o in self.obs_list]
        self.stacked_off_vector = self.stack_counts_spectrum(off_vector_list)
    def stack_counts_spectrum(self, counts_spectrum_list):
        """Stack `~gammapy.spectrum.CountsSpectrum`.

        * Bins outside the safe energy range are set to 0.
        * Attributes are set to None.
        * The quality vectors of the observations are combined with a logical
          or, such that the low (high) threshold of the stacked observation is
          the minimum low (maximum high) threshold of the observation list to
          be stacked.
        """
        template = counts_spectrum_list[0].copy()
        energy = template.energy
        stacked_data = np.zeros(energy.nbin)
        stacked_quality = np.ones(energy.nbin)

        for spec, obs in zip(counts_spectrum_list, self.obs_list):
            stacked_data[obs.mask_safe] += spec.data[obs.mask_safe]
            temp = np.logical_and(stacked_quality, ~obs.mask_safe)
            stacked_quality = np.array(temp, dtype=int)

        self.stacked_quality = stacked_quality
        return CountsSpectrum(
            data=stacked_data, energy_lo=energy.edges[:-1], energy_hi=energy.edges[1:]
        )
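    # Worked sketch of the quality combination above: for two runs with safe
    # masks [True, True, False] and [False, True, True], the loop leaves
    # stacked_quality = [0, 0, 0], since every bin is safe in at least one
    # run; a bin unsafe in every run would keep quality 1 and be excluded
    # downstream.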
    def stack_backscal(self):
        """Stack ``backscal`` for on and off vector."""
        nbins = self.obs_list[0].counts.energy.nbin
        bkscal_on = np.ones(nbins)
        bkscal_off = np.zeros(nbins)

        alpha_sum = 0.0

        for obs in self.obs_list:
            bkscal_off[obs.mask_safe] += (obs.alpha * obs.counts_off.data)[
                obs.mask_safe
            ]
            alpha_sum += (obs.alpha * obs.counts_off.data)[obs.mask_safe].sum()

        with np.errstate(divide="ignore", invalid="ignore"):
            stacked_bkscal_off = self.stacked_off_vector.data / bkscal_off
            alpha_average = (
                alpha_sum / self.stacked_off_vector.data[obs.mask_safe].sum()
            )

        # There should be no NaN values in backscal_on or backscal_off, since
        # this leads to problems when fitting the data. Use 1 for the
        # backscale of the on vector and 1 / alpha_average for the backscale
        # of the off vector.
        alpha_correction = 1
        idx = np.where(self.stacked_off_vector.data == 0)[0]
        bkscal_on[idx] = alpha_correction
        # For the bins where the stacked OFF counts equal 0, the alpha value
        # is computed by weighting with the total OFF counts of each run
        stacked_bkscal_off[idx] = alpha_correction / alpha_average

        self.stacked_bkscal_on = bkscal_on
        self.stacked_bkscal_off = stacked_bkscal_off
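    # Sketch of the zero-off-counts fallback above: where the stacked off
    # counts are zero, alpha cannot be formed bin by bin, so the off-counts
    # weighted average alpha_average = sum_j(alpha_j * n_off_j) /
    # sum_j(n_off_j) over safe bins is used instead, with backscal_on = 1 and
    # backscal_off = 1 / alpha_average.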
    def setup_counts_vectors(self):
        """Add correct attributes to stacked counts vectors."""
        livetimes = [obs.livetime.to_value("s") for obs in self.obs_list]
        self.total_livetime = u.Quantity(np.sum(livetimes), "s")
        self.stacked_on_vector.livetime = self.total_livetime
        self.stacked_off_vector.livetime = self.total_livetime
        self.stacked_on_vector.backscal = self.stacked_bkscal_on
        self.stacked_off_vector.backscal = self.stacked_bkscal_off
    def stack_aeff(self):
        """Stack effective areas (weighted by livetime).

        Calls `~gammapy.irf.IRFStacker.stack_aeff`.
        """
        irf_stacker = IRFStacker(
            list_aeff=[obs.aeff for obs in self.obs_list],
            list_livetime=[obs.livetime for obs in self.obs_list],
        )
        irf_stacker.stack_aeff()
        self.stacked_aeff = irf_stacker.stacked_aeff
    def stack_edisp(self):
        """Stack energy dispersion (weighted by exposure).

        Calls `~gammapy.irf.IRFStacker.stack_edisp`.
        """
        irf_stacker = IRFStacker(
            list_aeff=[obs.aeff for obs in self.obs_list],
            list_livetime=[obs.livetime for obs in self.obs_list],
            list_edisp=[obs.edisp for obs in self.obs_list],
            list_low_threshold=[obs.energy_range[0] for obs in self.obs_list],
            list_high_threshold=[obs.energy_range[1] for obs in self.obs_list],
        )
        irf_stacker.stack_edisp()
        self.stacked_edisp = irf_stacker.stacked_edisp
    def stack_gti(self):
        """Stack GTIs."""
        first_gti = self.obs_list[0].gti

        if first_gti is None:
            self.stacked_gti = None
        else:
            stack_gti = first_gti.copy()
            for obs in self.obs_list[1:]:
                stack_gti = stack_gti.stack(obs.gti)
            self.stacked_gti = stack_gti.union()
    def stack_obs(self):
        """Create the stacked `~gammapy.spectrum.SpectrumDatasetOnOff`."""
        self.stacked_obs = SpectrumDatasetOnOff(
            counts=self.stacked_on_vector,
            counts_off=self.stacked_off_vector,
            aeff=self.stacked_aeff,
            edisp=self.stacked_edisp,
            livetime=self.total_livetime,
            mask_safe=np.logical_not(self.stacked_quality),
            acceptance=self.stacked_on_vector.backscal,
            acceptance_off=self.stacked_off_vector.backscal,
            obs_id=[obs.obs_id for obs in self.obs_list],
            gti=self.stacked_gti,
        )