Source code for gammapy.irf.psf_gauss

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
import numpy as np
from astropy.convolution import Gaussian2DKernel
from astropy.coordinates import Angle
from astropy.io import fits
from astropy.stats import gaussian_fwhm_to_sigma
from astropy.table import Table
from astropy.units import Quantity, Unit
from gammapy.maps import MapAxis
from gammapy.utils.array import array_stats_str
from gammapy.utils.gauss import MultiGauss2D
from gammapy.utils.interpolation import ScaledRegularGridInterpolator
from gammapy.utils.scripts import make_path
from .psf_3d import PSF3D
from .psf_table import EnergyDependentTablePSF

__all__ = ["EnergyDependentMultiGaussPSF"]

log = logging.getLogger(__name__)


[docs]class EnergyDependentMultiGaussPSF: """ Triple Gauss analytical PSF depending on energy and theta. To evaluate the PSF call the ``to_energy_dependent_table_psf`` or ``psf_at_energy_and_theta`` methods. Parameters ---------- energy_lo : `~astropy.units.Quantity` Lower energy boundary of the energy bin. energy_hi : `~astropy.units.Quantity` Upper energy boundary of the energy bin. theta : `~astropy.units.Quantity` Center values of the theta bins. sigmas : list of 'numpy.ndarray' Triple Gauss sigma parameters, where every entry is a two dimensional 'numpy.ndarray' containing the sigma value for every given energy and theta. norms : list of 'numpy.ndarray' Triple Gauss norm parameters, where every entry is a two dimensional 'numpy.ndarray' containing the norm value for every given energy and theta. Norm corresponds to the value of the Gaussian at theta = 0. energy_thresh_lo : `~astropy.units.Quantity` Lower save energy threshold of the psf. energy_thresh_hi : `~astropy.units.Quantity` Upper save energy threshold of the psf. Examples -------- Plot R68 of the PSF vs. theta and energy: .. plot:: :include-source: import matplotlib.pyplot as plt from gammapy.irf import EnergyDependentMultiGaussPSF filename = '$GAMMAPY_DATA/cta-1dc/caldb/data/cta/1dc/bcf/South_z20_50h/irf_file.fits' psf = EnergyDependentMultiGaussPSF.read(filename, hdu='POINT SPREAD FUNCTION') psf.plot_containment(0.68, show_safe_energy=False) plt.show() """ def __init__( self, energy_lo, energy_hi, theta, sigmas, norms, energy_thresh_lo="0.1 TeV", energy_thresh_hi="100 TeV", ): self.energy_lo = Quantity(energy_lo, "TeV") self.energy_hi = Quantity(energy_hi, "TeV") self.energy = np.sqrt(self.energy_hi * self.energy_lo) self.theta = Quantity(theta, "deg") sigmas[0][sigmas[0] == 0] = 1 sigmas[1][sigmas[1] == 0] = 1 sigmas[2][sigmas[2] == 0] = 1 self.sigmas = sigmas self.norms = norms self.energy_thresh_lo = Quantity(energy_thresh_lo, "TeV") self.energy_thresh_hi = Quantity(energy_thresh_hi, "TeV") self._interp_norms = self._setup_interpolators(self.norms) self._interp_sigmas = self._setup_interpolators(self.sigmas) def _setup_interpolators(self, values_list): interps = [] for values in values_list: interp = ScaledRegularGridInterpolator( points=(self.theta, self.energy), values=values ) interps.append(interp) return interps
[docs] @classmethod def read(cls, filename, hdu="PSF_2D_GAUSS"): """Create `EnergyDependentMultiGaussPSF` from FITS file. Parameters ---------- filename : str File name """ with fits.open(make_path(filename), memmap=False) as hdulist: return cls.from_fits(hdulist[hdu])
[docs] @classmethod def from_fits(cls, hdu): """Create `EnergyDependentMultiGaussPSF` from HDU list. Parameters ---------- hdu : `~astropy.io.fits.BinTableHDU` HDU """ energy_lo = Quantity(hdu.data["ENERG_LO"][0], "TeV") energy_hi = Quantity(hdu.data["ENERG_HI"][0], "TeV") theta = Angle(hdu.data["THETA_LO"][0], "deg") # Get sigmas shape = (len(theta), len(energy_hi)) sigmas = [] for key in ["SIGMA_1", "SIGMA_2", "SIGMA_3"]: sigma = hdu.data[key].reshape(shape).copy() sigmas.append(sigma) # Get amplitudes norms = [] for key in ["SCALE", "AMPL_2", "AMPL_3"]: norm = hdu.data[key].reshape(shape).copy() norms.append(norm) opts = {} try: opts["energy_thresh_lo"] = Quantity(hdu.header["LO_THRES"], "TeV") opts["energy_thresh_hi"] = Quantity(hdu.header["HI_THRES"], "TeV") except KeyError: pass return cls(energy_lo, energy_hi, theta, sigmas, norms, **opts)
[docs] def to_fits(self): """ Convert psf table data to FITS hdu list. Returns ------- hdu_list : `~astropy.io.fits.HDUList` PSF in HDU list format. """ # Set up data names = [ "ENERG_LO", "ENERG_HI", "THETA_LO", "THETA_HI", "SCALE", "SIGMA_1", "AMPL_2", "SIGMA_2", "AMPL_3", "SIGMA_3", ] units = ["TeV", "TeV", "deg", "deg", "", "deg", "", "deg", "", "deg"] data = [ self.energy_lo, self.energy_hi, self.theta, self.theta, self.norms[0], self.sigmas[0], self.norms[1], self.sigmas[1], self.norms[2], self.sigmas[2], ] table = Table() for name_, data_, unit_ in zip(names, data, units): table[name_] = [data_] table[name_].unit = unit_ # Create hdu and hdu list hdu = fits.BinTableHDU(table) hdu.header["LO_THRES"] = self.energy_thresh_lo.value hdu.header["HI_THRES"] = self.energy_thresh_hi.value return fits.HDUList([fits.PrimaryHDU(), hdu])
[docs] def write(self, filename, *args, **kwargs): """Write PSF to FITS file. Calls `~astropy.io.fits.HDUList.writeto`, forwarding all arguments. """ self.to_fits().writeto(filename, *args, **kwargs)
[docs] def psf_at_energy_and_theta(self, energy, theta): """ Get `~gammapy.modeling.models.MultiGauss2D` model for given energy and theta. No interpolation is used. Parameters ---------- energy : `~astropy.units.Quantity` Energy at which a PSF is requested. theta : `~astropy.coordinates.Angle` Offset angle at which a PSF is requested. Returns ------- psf : `~gammapy.morphology.MultiGauss2D` Multigauss PSF object. """ energy = Quantity(energy) theta = Quantity(theta) pars = {} for name, interp_norm in zip(["scale", "A_2", "A_3"], self._interp_norms): pars[name] = interp_norm((theta, energy)) for idx, interp_sigma in enumerate(self._interp_sigmas): pars[f"sigma_{idx + 1}"] = interp_sigma((theta, energy)) psf = HESSMultiGaussPSF(pars) return psf.to_MultiGauss2D(normalize=True)
[docs] def containment_radius(self, energy, theta, fraction=0.68): """Compute containment for all energy and theta values""" # This is a false positive from pylint # See https://github.com/PyCQA/pylint/issues/2435 energies = Quantity( energy ).flatten() # pylint:disable=assignment-from-no-return thetas = Angle(theta).flatten() radius = np.empty((theta.size, energy.size)) for idx, energy in enumerate(energies): for jdx, theta in enumerate(thetas): try: psf = self.psf_at_energy_and_theta(energy, theta) radius[jdx, idx] = psf.containment_radius(fraction) except ValueError: log.debug( f"Computing containment failed for energy = {energy:.2f}" f" and theta={theta:.2f}" ) log.debug(f"Sigmas: {psf.sigmas} Norms: {psf.norms}") radius[jdx, idx] = np.nan return Angle(radius, "deg")
[docs] def plot_containment( self, fraction=0.68, ax=None, show_safe_energy=False, add_cbar=True, **kwargs ): """ Plot containment image with energy and theta axes. Parameters ---------- fraction : float Containment fraction between 0 and 1. add_cbar : bool Add a colorbar """ import matplotlib.pyplot as plt ax = plt.gca() if ax is None else ax energy = self.energy_hi offset = self.theta # Set up and compute data containment = self.containment_radius(energy, offset, fraction) # plotting defaults kwargs.setdefault("cmap", "GnBu") kwargs.setdefault("vmin", np.nanmin(containment.value)) kwargs.setdefault("vmax", np.nanmax(containment.value)) # Plotting x = energy.value y = offset.value caxes = ax.pcolormesh(x, y, containment.value, **kwargs) # Axes labels and ticks, colobar ax.semilogx() ax.set_ylabel(f"Offset ({offset.unit})") ax.set_xlabel(f"Energy ({energy.unit})") ax.set_xlim(x.min(), x.max()) ax.set_ylim(y.min(), y.max()) if show_safe_energy: self._plot_safe_energy_range(ax) if add_cbar: label = f"Containment radius R{100 * fraction:.0f} ({containment.unit})" ax.figure.colorbar(caxes, ax=ax, label=label) return ax
def _plot_safe_energy_range(self, ax): """add safe energy range lines to the plot""" esafe = self.energy_thresh_lo omin = self.offset.value.min() omax = self.offset.value.max() ax.hlines(y=esafe.value, xmin=omin, xmax=omax) label = f"Safe energy threshold: {esafe:3.2f}" ax.text(x=0.1, y=0.9 * esafe.value, s=label, va="top")
[docs] def plot_containment_vs_energy( self, fractions=[0.68, 0.95], thetas=Angle([0, 1], "deg"), ax=None, **kwargs ): """Plot containment fraction as a function of energy. """ import matplotlib.pyplot as plt ax = plt.gca() if ax is None else ax energy = MapAxis.from_energy_bounds( self.energy_lo[0], self.energy_hi[-1], 100 ).edges for theta in thetas: for fraction in fractions: radius = self.containment_radius(energy, theta, fraction).squeeze() label = f"{theta.deg} deg, {100 * fraction:.1f}%" ax.plot(energy.value, radius.value, label=label) ax.semilogx() ax.legend(loc="best") ax.set_xlabel("Energy (TeV)") ax.set_ylabel("Containment radius (deg)")
[docs] def peek(self, figsize=(15, 5)): """Quick-look summary plots.""" import matplotlib.pyplot as plt fig, axes = plt.subplots(nrows=1, ncols=3, figsize=figsize) self.plot_containment(fraction=0.68, ax=axes[0]) self.plot_containment(fraction=0.95, ax=axes[1]) self.plot_containment_vs_energy(ax=axes[2]) # TODO: implement this plot # psf = self.psf_at_energy_and_theta(energy='1 TeV', theta='1 deg') # psf.plot_components(ax=axes[2]) plt.tight_layout()
[docs] def info( self, fractions=[0.68, 0.95], energies=Quantity([1.0, 10.0], "TeV"), thetas=Quantity([0.0], "deg"), ): """ Print PSF summary info. The containment radius for given fraction, energies and thetas is computed and printed on the command line. Parameters ---------- fractions : list Containment fraction to compute containment radius for. energies : `~astropy.units.Quantity` Energies to compute containment radius for. thetas : `~astropy.units.Quantity` Thetas to compute containment radius for. Returns ------- ss : string Formatted string containing the summary info. """ ss = "\nSummary PSF info\n" ss += "----------------\n" ss += array_stats_str(self.theta.to("deg"), "Theta") ss += array_stats_str(self.energy_hi, "Energy hi") ss += array_stats_str(self.energy_lo, "Energy lo") ss += f"Safe energy threshold lo: {self.energy_thresh_lo:6.3f}\n" ss += f"Safe energy threshold hi: {self.energy_thresh_hi:6.3f}\n" for fraction in fractions: containment = self.containment_radius(energies, thetas, fraction) for i, energy in enumerate(energies): for j, theta in enumerate(thetas): radius = containment[j, i] ss += ( "{:2.0f}% containment radius at theta = {} and " "E = {:4.1f}: {:5.8f}\n" "".format(100 * fraction, theta, energy, radius) ) return ss
[docs] def to_energy_dependent_table_psf(self, theta=None, rad=None, exposure=None): """ Convert triple Gaussian PSF ot table PSF. Parameters ---------- theta : `~astropy.coordinates.Angle` Offset in the field of view. Default theta = 0 deg rad : `~astropy.coordinates.Angle` Offset from PSF center used for evaluating the PSF on a grid. Default offset = [0, 0.005, ..., 1.495, 1.5] deg. exposure : `~astropy.units.Quantity` Energy dependent exposure. Should be in units equivalent to 'cm^2 s'. Default exposure = 1. Returns ------- tabe_psf : `~gammapy.irf.EnergyDependentTablePSF` Instance of `EnergyDependentTablePSF`. """ # Convert energies to log center energies = self.energy # Defaults and input handling if theta is None: theta = Angle(0, "deg") else: theta = Angle(theta) if rad is None: rad = Angle(np.arange(0, 1.5, 0.005), "deg") else: rad = Angle(rad).to("deg") psf_value = Quantity(np.zeros((energies.size, rad.size)), "deg^-2") for idx, energy in enumerate(energies): psf_gauss = self.psf_at_energy_and_theta(energy, theta) psf_value[idx] = Quantity(psf_gauss(rad), "deg^-2") return EnergyDependentTablePSF( energy=energies, rad=rad, exposure=exposure, psf_value=psf_value )
[docs] def to_psf3d(self, rad=None): """Create a PSF3D from an analytical PSF. Parameters ---------- rad : `~astropy.units.Quantity` or `~astropy.coordinates.Angle` the array of position errors (rad) on which the PSF3D will be defined Returns ------- psf3d : `~gammapy.irf.PSF3D` the PSF3D. It will be defined on the same energy and offset values than the input psf. """ offsets = self.theta energy = self.energy energy_lo = self.energy_lo energy_hi = self.energy_hi if rad is None: rad = np.linspace(0, 0.66, 67) * Unit( "deg" ) # Arbitrary binning of 0.01 in rad rad_lo = rad[:-1] rad_hi = rad[1:] psf_values = np.zeros( (rad_lo.shape[0], offsets.shape[0], energy_lo.shape[0]) ) * Unit("sr-1") for i, offset in enumerate(offsets): psftable = self.to_energy_dependent_table_psf(offset) psf_values[:, i, :] = psftable.evaluate(energy, 0.5 * (rad_lo + rad_hi)).T return PSF3D( energy_lo, energy_hi, offsets, rad_lo, rad_hi, psf_values, self.energy_thresh_lo, self.energy_thresh_hi, )
class HESSMultiGaussPSF: """Multi-Gauss PSF as represented in the HESS software. The 2D Gaussian is represented as a 1D exponential probability density function per offset angle squared: dp / dtheta**2 = [0]*(exp(-x/(2*[1]*[1]))+[2]*exp(-x/(2*[3]*[3])) @param source: either a dict of a filename The following two parameters control numerical precision / speed. Usually the defaults are fine. @param theta_max: Maximum offset in numerical computations @param npoints: Number of points in numerical computations @param eps: Allowed tolerance on normalization of total P to 1 """ def __init__(self, source): if isinstance(source, dict): # Assume source is a dict with correct format self.pars = source else: # Assume source is a filename with correct format self.pars = self._read_ascii(source) # Scale will be computed from normalization anyways, # so any default is fine here self.pars["scale"] = self.pars.get("scale", 1) # This avoids handling the first PSF as a special case self.pars["A_1"] = self.pars.get("A_1", 1) def _read_ascii(self, filename): """Parse file with parameters.""" fh = open(filename) # .readlines() pars = {} for line in fh: try: key, value = line.strip().split()[:2] if key.startswith("#"): continue else: pars[key] = float(value) except ValueError: pass fh.close() return pars def n_gauss(self): """Count number of Gaussians.""" return len([_ for _ in self.pars.keys() if "sigma" in _]) def dpdtheta2(self, theta2): """dp / dtheta2 at position theta2 = theta ^ 2.""" theta2 = np.asarray(theta2, "f") total = np.zeros_like(theta2) for ii in range(1, self.n_gauss() + 1): A = self.pars[f"A_{ii}"] sigma = self.pars[f"sigma_{ii}"] total += A * np.exp(-theta2 / (2 * sigma ** 2)) return self.pars["scale"] * total def to_MultiGauss2D(self, normalize=True): """Use this to compute containment angles and fractions. Note: We have to set norm = 2 * A * sigma ^ 2, because in MultiGauss2D norm represents the integral, and in HESS A represents the amplitude at 0. """ sigmas, norms = [], [] for ii in range(1, self.n_gauss() + 1): A = self.pars[f"A_{ii}"] sigma = self.pars[f"sigma_{ii}"] norm = self.pars["scale"] * 2 * A * sigma ** 2 sigmas.append(sigma) norms.append(norm) m = MultiGauss2D(sigmas, norms) if normalize: m.normalize() return m def multi_gauss_psf_kernel(psf_parameters, BINSZ=0.02, NEW_BINSZ=0.02, **kwargs): """Create multi-Gauss PSF kernel. The Gaussian PSF components are specified via the amplitude at the center and the FWHM. See the example for the exact format. Parameters ---------- psf_parameters : dict PSF parameters BINSZ : float (0.02) Pixel size used for the given parameters in deg. NEW_BINSZ : float (0.02) New pixel size in deg. USed to change the resolution of the PSF. Returns ------- psf_kernel : `astropy.convolution.Kernel2D` PSF kernel Examples -------- >>> psf_pars = dict() >>> psf_pars['psf1'] = dict(ampl=1, fwhm=2.5) >>> psf_pars['psf2'] = dict(ampl=0.06, fwhm=11.14) >>> psf_pars['psf3'] = dict(ampl=0.47, fwhm=5.16) >>> psf_kernel = multi_gauss_psf_kernel(psf_pars, x_size=51) """ psf = None for ii in range(1, 4): # Convert sigma and amplitude pars = psf_parameters[f"psf{ii}"] sigma = gaussian_fwhm_to_sigma * pars["fwhm"] * BINSZ / NEW_BINSZ ampl = 2 * np.pi * sigma ** 2 * pars["ampl"] if psf is None: psf = float(ampl) * Gaussian2DKernel(sigma, **kwargs) else: psf += float(ampl) * Gaussian2DKernel(sigma, **kwargs) psf.normalize() return psf