Source code for gammapy.irf.psf_gauss

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
import numpy as np
from astropy.io import fits
from astropy.table import Table
from astropy.units import Quantity, Unit
from astropy.coordinates import Angle
from astropy.convolution import Gaussian2DKernel
from astropy.stats import gaussian_fwhm_to_sigma
from ..extern.validator import validate_physical_type
from ..utils.array import array_stats_str
from ..utils.energy import Energy, EnergyBounds
from ..utils.scripts import make_path
from ..utils.gauss import MultiGauss2D
from ..utils.interpolation import ScaledRegularGridInterpolator
from .psf_3d import PSF3D
from . import EnergyDependentTablePSF

__all__ = ["EnergyDependentMultiGaussPSF"]

log = logging.getLogger(__name__)


[docs]class EnergyDependentMultiGaussPSF: """ Triple Gauss analytical PSF depending on energy and theta. To evaluate the PSF call the ``to_energy_dependent_table_psf`` or ``psf_at_energy_and_theta`` methods. Parameters ---------- energy_lo : `~astropy.units.Quantity` Lower energy boundary of the energy bin. energy_hi : `~astropy.units.Quantity` Upper energy boundary of the energy bin. theta : `~astropy.units.Quantity` Center values of the theta bins. sigmas : list of 'numpy.ndarray' Triple Gauss sigma parameters, where every entry is a two dimensional 'numpy.ndarray' containing the sigma value for every given energy and theta. norms : list of 'numpy.ndarray' Triple Gauss norm parameters, where every entry is a two dimensional 'numpy.ndarray' containing the norm value for every given energy and theta. Norm corresponds to the value of the Gaussian at theta = 0. energy_thresh_lo : `~astropy.units.Quantity` Lower save energy threshold of the psf. energy_thresh_hi : `~astropy.units.Quantity` Upper save energy threshold of the psf. Examples -------- Plot R68 of the PSF vs. theta and energy: .. plot:: :include-source: import matplotlib.pyplot as plt from gammapy.irf import EnergyDependentMultiGaussPSF filename = '$GAMMAPY_DATA/tests/unbundled/irfs/psf.fits' psf = EnergyDependentMultiGaussPSF.read(filename, hdu='POINT SPREAD FUNCTION') psf.plot_containment(0.68, show_safe_energy=False) plt.show() """ def __init__( self, energy_lo, energy_hi, theta, sigmas, norms, energy_thresh_lo=Quantity(0.1, "TeV"), energy_thresh_hi=Quantity(100, "TeV"), ): # Validate input validate_physical_type("energy_lo", energy_lo, "energy") validate_physical_type("energy_hi", energy_hi, "energy") validate_physical_type("theta", theta, "angle") validate_physical_type("energy_thresh_lo", energy_thresh_lo, "energy") validate_physical_type("energy_thresh_hi", energy_thresh_hi, "energy") # Set attributes self.energy_lo = energy_lo.to("TeV") self.energy_hi = energy_hi.to("TeV") ebounds = EnergyBounds.from_lower_and_upper_bounds( self.energy_lo, self.energy_hi ) self.energy = ebounds.log_centers self.theta = theta.to("deg") sigmas[0][sigmas[0] == 0] = 1 sigmas[1][sigmas[1] == 0] = 1 sigmas[2][sigmas[2] == 0] = 1 self.sigmas = sigmas self.norms = norms self.energy_thresh_lo = energy_thresh_lo.to("TeV") self.energy_thresh_hi = energy_thresh_hi.to("TeV") self._interp_norms = self._setup_interpolators(self.norms) self._interp_sigmas = self._setup_interpolators(self.sigmas) def _setup_interpolators(self, values_list): interps = [] for values in values_list: interp = ScaledRegularGridInterpolator( points=(self.theta, self.energy), values=values ) interps.append(interp) return interps
[docs] @classmethod def read(cls, filename, hdu="PSF_2D_GAUSS"): """Create `EnergyDependentMultiGaussPSF` from FITS file. Parameters ---------- filename : str File name """ filename = make_path(filename) with fits.open(str(filename), memmap=False) as hdulist: psf = cls.from_fits(hdulist[hdu]) return psf
[docs] @classmethod def from_fits(cls, hdu): """Create `EnergyDependentMultiGaussPSF` from HDU list. Parameters ---------- hdu : `~astropy.io.fits.BintableHDU` HDU """ energy_lo = Quantity(hdu.data["ENERG_LO"][0], "TeV") energy_hi = Quantity(hdu.data["ENERG_HI"][0], "TeV") theta = Angle(hdu.data["THETA_LO"][0], "deg") # Get sigmas shape = (len(theta), len(energy_hi)) sigmas = [] for key in ["SIGMA_1", "SIGMA_2", "SIGMA_3"]: sigma = hdu.data[key].reshape(shape).copy() sigmas.append(sigma) # Get amplitudes norms = [] for key in ["SCALE", "AMPL_2", "AMPL_3"]: norm = hdu.data[key].reshape(shape).copy() norms.append(norm) opts = {} try: opts["energy_thresh_lo"] = Quantity(hdu.header["LO_THRES"], "TeV") opts["energy_thresh_hi"] = Quantity(hdu.header["HI_THRES"], "TeV") except KeyError: pass return cls(energy_lo, energy_hi, theta, sigmas, norms, **opts)
[docs] def to_fits(self): """ Convert psf table data to FITS hdu list. Returns ------- hdu_list : `~astropy.io.fits.HDUList` PSF in HDU list format. """ # Set up data names = [ "ENERG_LO", "ENERG_HI", "THETA_LO", "THETA_HI", "SCALE", "SIGMA_1", "AMPL_2", "SIGMA_2", "AMPL_3", "SIGMA_3", ] units = ["TeV", "TeV", "deg", "deg", "", "deg", "", "deg", "", "deg"] data = [ self.energy_lo, self.energy_hi, self.theta, self.theta, self.norms[0], self.sigmas[0], self.norms[1], self.sigmas[1], self.norms[2], self.sigmas[2], ] table = Table() for name_, data_, unit_ in zip(names, data, units): table[name_] = [data_] table[name_].unit = unit_ # Create hdu and hdu list hdu = fits.BinTableHDU(table) hdu.header["LO_THRES"] = self.energy_thresh_lo.value hdu.header["HI_THRES"] = self.energy_thresh_hi.value return fits.HDUList([fits.PrimaryHDU(), hdu])
[docs] def write(self, filename, *args, **kwargs): """Write PSF to FITS file. Calls `~astropy.io.fits.HDUList.writeto`, forwarding all arguments. """ self.to_fits().writeto(filename, *args, **kwargs)
[docs] def psf_at_energy_and_theta(self, energy, theta): """ Get `~gammapy.image.models.MultiGauss2D` model for given energy and theta. No interpolation is used. Parameters ---------- energy : `~astropy.units.Quantity` Energy at which a PSF is requested. theta : `~astropy.coordinates.Angle` Offset angle at which a PSF is requested. Returns ------- psf : `~gammapy.morphology.MultiGauss2D` Multigauss PSF object. """ energy = Energy(energy) theta = Quantity(theta) pars = {} for name, interp_norm in zip(["scale", "A_2", "A_3"], self._interp_norms): pars[name] = interp_norm((theta, energy)) for idx, interp_sigma in enumerate(self._interp_sigmas): pars["sigma_{}".format(idx + 1)] = interp_sigma((theta, energy)) psf = HESSMultiGaussPSF(pars) return psf.to_MultiGauss2D(normalize=True)
[docs] def containment_radius(self, energy, theta, fraction=0.68): """Compute containment for all energy and theta values""" # This is a false positive from pylint # See https://github.com/PyCQA/pylint/issues/2435 energies = Energy(energy).flatten() # pylint:disable=assignment-from-no-return thetas = Angle(theta).flatten() radius = np.empty((theta.size, energy.size)) for idx, energy in enumerate(energies): for jdx, theta in enumerate(thetas): try: psf = self.psf_at_energy_and_theta(energy, theta) radius[jdx, idx] = psf.containment_radius(fraction) except ValueError: log.debug( "Computing containment failed for E = {:.2f}" " and Theta={:.2f}".format(energy, theta) ) log.debug("Sigmas: {} Norms: {}".format(psf.sigmas, psf.norms)) radius[jdx, idx] = np.nan return Angle(radius, "deg")
[docs] def plot_containment( self, fraction=0.68, ax=None, show_safe_energy=False, add_cbar=True, **kwargs ): """ Plot containment image with energy and theta axes. Parameters ---------- fraction : float Containment fraction between 0 and 1. add_cbar : bool Add a colorbar """ import matplotlib.pyplot as plt ax = plt.gca() if ax is None else ax energy = self.energy_hi offset = self.theta # Set up and compute data containment = self.containment_radius(energy, offset, fraction) # plotting defaults kwargs.setdefault("cmap", "GnBu") kwargs.setdefault("vmin", np.nanmin(containment.value)) kwargs.setdefault("vmax", np.nanmax(containment.value)) # Plotting x = energy.value y = offset.value caxes = ax.pcolormesh(x, y, containment.value, **kwargs) # Axes labels and ticks, colobar ax.semilogx() ax.set_ylabel("Offset ({unit})".format(unit=offset.unit)) ax.set_xlabel("Energy ({unit})".format(unit=energy.unit)) ax.set_xlim(x.min(), x.max()) ax.set_ylim(y.min(), y.max()) if show_safe_energy: self._plot_safe_energy_range(ax) if add_cbar: label = "Containment radius R{:.0f} ({})".format( 100 * fraction, containment.unit ) ax.figure.colorbar(caxes, ax=ax, label=label) return ax
def _plot_safe_energy_range(self, ax): """add safe energy range lines to the plot""" esafe = self.energy_thresh_lo omin = self.offset.value.min() omax = self.offset.value.max() ax.hlines(y=esafe.value, xmin=omin, xmax=omax) label = "Safe energy threshold: {:3.2f}".format(esafe) ax.text(x=0.1, y=0.9 * esafe.value, s=label, va="top")
[docs] def plot_containment_vs_energy( self, fractions=[0.68, 0.95], thetas=Angle([0, 1], "deg"), ax=None, **kwargs ): """Plot containment fraction as a function of energy. """ import matplotlib.pyplot as plt ax = plt.gca() if ax is None else ax energy = Energy.equal_log_spacing(self.energy_lo[0], self.energy_hi[-1], 100) for theta in thetas: for fraction in fractions: radius = self.containment_radius(energy, theta, fraction).squeeze() label = "{} deg, {:.1f}%".format(theta, 100 * fraction) ax.plot(energy.value, radius.value, label=label) ax.semilogx() ax.legend(loc="best") ax.set_xlabel("Energy (TeV)") ax.set_ylabel("Containment radius (deg)")
[docs] def peek(self, figsize=(15, 5)): """Quick-look summary plots.""" import matplotlib.pyplot as plt fig, axes = plt.subplots(nrows=1, ncols=3, figsize=figsize) self.plot_containment(fraction=0.68, ax=axes[0]) self.plot_containment(fraction=0.95, ax=axes[1]) self.plot_containment_vs_energy(ax=axes[2]) # TODO: implement this plot # psf = self.psf_at_energy_and_theta(energy='1 TeV', theta='1 deg') # psf.plot_components(ax=axes[2]) plt.tight_layout()
[docs] def info( self, fractions=[0.68, 0.95], energies=Quantity([1.0, 10.0], "TeV"), thetas=Quantity([0.0], "deg"), ): """ Print PSF summary info. The containment radius for given fraction, energies and thetas is computed and printed on the command line. Parameters ---------- fractions : list Containment fraction to compute containment radius for. energies : `~astropy.units.Quantity` Energies to compute containment radius for. thetas : `~astropy.units.Quantity` Thetas to compute containment radius for. Returns ------- ss : string Formatted string containing the summary info. """ ss = "\nSummary PSF info\n" ss += "----------------\n" # Summarise data members ss += array_stats_str(self.theta.to("deg"), "Theta") ss += array_stats_str(self.energy_hi, "Energy hi") ss += array_stats_str(self.energy_lo, "Energy lo") ss += "Safe energy threshold lo: {:6.3f}\n".format(self.energy_thresh_lo) ss += "Safe energy threshold hi: {:6.3f}\n".format(self.energy_thresh_hi) for fraction in fractions: containment = self.containment_radius(energies, thetas, fraction) for i, energy in enumerate(energies): for j, theta in enumerate(thetas): radius = containment[j, i] ss += ( "{:2.0f}% containment radius at theta = {} and " "E = {:4.1f}: {:5.8f}\n" "".format(100 * fraction, theta, energy, radius) ) return ss
[docs] def to_energy_dependent_table_psf(self, theta=None, rad=None, exposure=None): """ Convert triple Gaussian PSF ot table PSF. Parameters ---------- theta : `~astropy.coordinates.Angle` Offset in the field of view. Default theta = 0 deg rad : `~astropy.coordinates.Angle` Offset from PSF center used for evaluating the PSF on a grid. Default offset = [0, 0.005, ..., 1.495, 1.5] deg. exposure : `~astropy.units.Quantity` Energy dependent exposure. Should be in units equivalent to 'cm^2 s'. Default exposure = 1. Returns ------- tabe_psf : `~gammapy.irf.EnergyDependentTablePSF` Instance of `EnergyDependentTablePSF`. """ # Convert energies to log center energies = self.energy # Defaults and input handling if theta is None: theta = Angle(0, "deg") else: theta = Angle(theta) if rad is None: rad = Angle(np.arange(0, 1.5, 0.005), "deg") else: rad = Angle(rad).to("deg") psf_value = Quantity(np.zeros((energies.size, rad.size)), "deg^-2") for idx, energy in enumerate(energies): psf_gauss = self.psf_at_energy_and_theta(energy, theta) psf_value[idx] = Quantity(psf_gauss(rad), "deg^-2") return EnergyDependentTablePSF( energy=energies, rad=rad, exposure=exposure, psf_value=psf_value )
[docs] def to_psf3d(self, rad): """Create a PSF3D from an analytical PSF. Parameters ---------- rad : `~astropy.units.Quantity` or `~astropy.coordinates.Angle` the array of position errors (rad) on which the PSF3D will be defined Returns ------- psf3d : `~gammapy.irf.PSF3D` the PSF3D. It will be defined on the same energy and offset values than the input psf. """ offsets = self.theta energy = self.energy energy_lo = self.energy_lo energy_hi = self.energy_hi rad_lo = rad[:-1] rad_hi = rad[1:] psf_values = np.zeros( (rad_lo.shape[0], offsets.shape[0], energy_lo.shape[0]) ) * Unit("sr-1") for i, offset in enumerate(offsets): psftable = self.to_energy_dependent_table_psf(offset) psf_values[:, i, :] = psftable.evaluate(energy, 0.5 * (rad_lo + rad_hi)).T return PSF3D( energy_lo, energy_hi, offsets, rad_lo, rad_hi, psf_values, self.energy_thresh_lo, self.energy_thresh_hi, )
class HESSMultiGaussPSF: """Multi-Gauss PSF as represented in the HESS software. The 2D Gaussian is represented as a 1D exponential probability density function per offset angle squared: dp / dtheta**2 = [0]*(exp(-x/(2*[1]*[1]))+[2]*exp(-x/(2*[3]*[3])) @param source: either a dict of a filename The following two parameters control numerical precision / speed. Usually the defaults are fine. @param theta_max: Maximum offset in numerical computations @param npoints: Number of points in numerical computations @param eps: Allowed tolerance on normalization of total P to 1 """ def __init__(self, source): if isinstance(source, dict): # Assume source is a dict with correct format self.pars = source else: # Assume source is a filename with correct format self.pars = self._read_ascii(source) # Scale will be computed from normalization anyways, # so any default is fine here self.pars["scale"] = self.pars.get("scale", 1) # This avoids handling the first PSF as a special case self.pars["A_1"] = self.pars.get("A_1", 1) def _read_ascii(self, filename): """Parse file with parameters.""" fh = open(filename) # .readlines() pars = {} for line in fh: try: key, value = line.strip().split()[:2] if key.startswith("#"): continue else: pars[key] = float(value) except ValueError: pass fh.close() return pars def n_gauss(self): """Count number of Gaussians.""" return len([_ for _ in self.pars.keys() if "sigma" in _]) def dpdtheta2(self, theta2): """dp / dtheta2 at position theta2 = theta ^ 2.""" theta2 = np.asarray(theta2, "f") total = np.zeros_like(theta2) for ii in range(1, self.n_gauss() + 1): A = self.pars["A_{}".format(ii)] sigma = self.pars["sigma_{}".format(ii)] total += A * np.exp(-theta2 / (2 * sigma ** 2)) return self.pars["scale"] * total def to_MultiGauss2D(self, normalize=True): """Use this to compute containment angles and fractions. Note: We have to set norm = 2 * A * sigma ^ 2, because in MultiGauss2D norm represents the integral, and in HESS A represents the amplitude at 0. """ sigmas, norms = [], [] for ii in range(1, self.n_gauss() + 1): A = self.pars["A_{}".format(ii)] sigma = self.pars["sigma_{}".format(ii)] norm = self.pars["scale"] * 2 * A * sigma ** 2 sigmas.append(sigma) norms.append(norm) m = MultiGauss2D(sigmas, norms) if normalize: m.normalize() return m def multi_gauss_psf_kernel(psf_parameters, BINSZ=0.02, NEW_BINSZ=0.02, **kwargs): """Create multi-Gauss PSF kernel. The Gaussian PSF components are specified via the amplitude at the center and the FWHM. See the example for the exact format. Parameters ---------- psf_parameters : dict PSF parameters BINSZ : float (0.02) Pixel size used for the given parameters in deg. NEW_BINSZ : float (0.02) New pixel size in deg. USed to change the resolution of the PSF. Returns ------- psf_kernel : `astropy.convolution.Kernel2D` PSF kernel Examples -------- >>> psf_pars = dict() >>> psf_pars['psf1'] = dict(ampl=1, fwhm=2.5) >>> psf_pars['psf2'] = dict(ampl=0.06, fwhm=11.14) >>> psf_pars['psf3'] = dict(ampl=0.47, fwhm=5.16) >>> psf_kernel = multi_gauss_psf_kernel(psf_pars, x_size=51) """ psf = None for ii in range(1, 4): # Convert sigma and amplitude pars = psf_parameters["psf{}".format(ii)] sigma = gaussian_fwhm_to_sigma * pars["fwhm"] * BINSZ / NEW_BINSZ ampl = 2 * np.pi * sigma ** 2 * pars["ampl"] if psf is None: psf = float(ampl) * Gaussian2DKernel(sigma, **kwargs) else: psf += float(ampl) * Gaussian2DKernel(sigma, **kwargs) psf.normalize() return psf