Source code for gammapy.irf.edisp_map

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
from scipy.interpolate import interp1d
from gammapy.maps import Map, MapAxis, MapCoord, WcsGeom
from gammapy.utils.random import InverseCDFSampler, get_random_state
from .edisp_kernel import EDispKernel
from .irf_map import IRFMap

__all__ = ["EDispMap", "EDispKernelMap"]


def get_overlap_fraction(energy_axis, energy_axis_true):
    a_min = energy_axis.edges[:-1]
    a_max = energy_axis.edges[1:]

    b_min = energy_axis_true.edges[:-1][:, np.newaxis]
    b_max = energy_axis_true.edges[1:][:, np.newaxis]

    xmin = np.fmin(a_max, b_max)
    xmax = np.fmax(a_min, b_min)
    return np.clip(xmin - xmax, 0, np.inf) / (b_max - b_min)


[docs]class EDispMap(IRFMap): """Energy dispersion map. Parameters ---------- edisp_map : `~gammapy.maps.Map` the input Energy Dispersion Map. Should be a Map with 2 non spatial axes. migra and true energy axes should be given in this specific order. exposure_map : `~gammapy.maps.Map`, optional Associated exposure map. Needs to have a consistent map geometry. Examples -------- :: import numpy as np from astropy import units as u from astropy.coordinates import SkyCoord from gammapy.maps import WcsGeom, MapAxis from gammapy.irf import EnergyDispersion2D, EffectiveAreaTable2D from gammapy.cube import make_edisp_map, make_map_exposure_true_energy # Define energy dispersion map geometry energy_axis = MapAxis.from_edges(np.logspace(-1, 1, 4), unit="TeV", name="energy") migra_axis = MapAxis.from_edges(np.linspace(0, 3, 100), name="migra") pointing = SkyCoord(0, 0, unit="deg") max_offset = 4 * u.deg geom = WcsGeom.create( binsz=0.25 * u.deg, width=10 * u.deg, skydir=pointing, axes=[migra_axis, energy_axis], ) # Extract EnergyDispersion2D from CTA 1DC IRF filename = "$GAMMAPY_DATA/cta-1dc/caldb/data/cta/1dc/bcf/South_z20_50h/irf_file.fits" edisp2D = EnergyDispersion2D.read(filename, hdu="ENERGY DISPERSION") aeff2d = EffectiveAreaTable2D.read(filename, hdu="EFFECTIVE AREA") # Create the exposure map exposure_geom = geom.to_image().to_cube([energy_axis]) exposure_map = make_map_exposure_true_energy(pointing, "1 h", aeff2d, exposure_geom) # create the EDispMap for the specified pointing edisp_map = make_edisp_map(edisp2D, pointing, geom, max_offset, exposure_map) # Get an Energy Dispersion (1D) at any position in the image pos = SkyCoord(2.0, 2.5, unit="deg") e_reco = np.logspace(-1.0, 1.0, 10) * u.TeV edisp = edisp_map.get_edisp_kernel(pos=pos, e_reco=e_reco) # Write map to disk edisp_map.write("edisp_map.fits") """ _hdu_name = "edisp" def __init__(self, edisp_map, exposure_map): if edisp_map.geom.axes[1].name.upper() != "ENERGY_TRUE": raise ValueError("Incorrect energy axis position in input Map") if edisp_map.geom.axes[0].name.upper() != "MIGRA": raise ValueError("Incorrect migra axis position in input Map") super().__init__(irf_map=edisp_map, exposure_map=exposure_map) @property def edisp_map(self): return self._irf_map @edisp_map.setter def edisp_map(self, value): self._irf_map = value
[docs] def get_edisp_kernel(self, position, e_reco): """Get energy dispersion at a given position. Parameters ---------- position : `~astropy.coordinates.SkyCoord` the target position. Should be a single coordinates e_reco : `~astropy.units.Quantity` Reconstructed energy axis binning Returns ------- edisp : `~gammapy.irf.EnergyDispersion` the energy dispersion (i.e. rmf object) """ if position.size != 1: raise ValueError( "EnergyDispersion can be extracted at one single position only." ) energy_axis = self.edisp_map.geom.get_axis_by_name("energy_true") migra_axis = self.edisp_map.geom.get_axis_by_name("migra") coords = { "skycoord": position, "migra": migra_axis.center.reshape((-1, 1, 1, 1)), "energy_true": energy_axis.center.reshape((1, -1, 1, 1)), } # Interpolate in the EDisp map. Squeeze to remove dimensions of length 1 values = self.edisp_map.interp_by_coord(coords) * self.edisp_map.unit edisp_values = values[:, :, 0, 0] data = [] for idx, e_true in enumerate(energy_axis.center): # migration value of e_reco bounds migra = e_reco / e_true cumsum = np.insert(edisp_values[:, idx], 0, 0).cumsum() with np.errstate(invalid="ignore"): cumsum = np.nan_to_num(cumsum / cumsum[-1]) f = interp1d( migra_axis.edges.value, cumsum, kind="linear", bounds_error=False, fill_value=(0, 1), ) # We compute the difference between 2 successive bounds in e_reco # to get integral over reco energy bin integral = np.diff(np.clip(f(migra), a_min=0, a_max=1)) data.append(integral) return EDispKernel( e_true_lo=energy_axis.edges[:-1], e_true_hi=energy_axis.edges[1:], e_reco_lo=e_reco[:-1], e_reco_hi=e_reco[1:], data=data, )
[docs] @classmethod def from_geom(cls, geom): """Create edisp map from geom. By default a diagonal edisp matrix is created. Parameters ---------- geom : `Geom` Edisp map geometry. Returns ------- edisp_map : `EDispMap` Energy dispersion map. """ if "energy_true" not in [ax.name for ax in geom.axes]: raise ValueError("EDispMap requires true energy axis") geom_exposure_edisp = geom.squash(axis="migra") exposure_edisp = Map.from_geom(geom_exposure_edisp, unit="m2 s") migra_axis = geom.get_axis_by_name("migra") edisp_map = Map.from_geom(geom, unit="") migra_0 = migra_axis.coord_to_pix(1) # distribute over two pixels migra = geom.get_idx()[2] data = np.abs(migra - migra_0) data = np.where(data < 1, 1 - data, 0) edisp_map.quantity = data return cls(edisp_map, exposure_edisp)
[docs] def sample_coord(self, map_coord, random_state=0): """Apply the energy dispersion corrections on the coordinates of a set of simulated events. Parameters ---------- map_coord : `~gammapy.maps.MapCoord` object. Sequence of coordinates and energies of sampled events. random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`} Defines random number generator initialisation. Passed to `~gammapy.utils.random.get_random_state`. Returns ------- `~gammapy.maps.MapCoord`. Sequence of Edisp-corrected coordinates of the input map_coord map. """ random_state = get_random_state(random_state) migra_axis = self.edisp_map.geom.get_axis_by_name("migra") coord = { "skycoord": map_coord.skycoord.reshape(-1, 1), "energy_true": map_coord["energy_true"].reshape(-1, 1), "migra": migra_axis.center, } pdf_edisp = self.edisp_map.interp_by_coord(coord) sample_edisp = InverseCDFSampler(pdf_edisp, axis=1, random_state=random_state) pix_edisp = sample_edisp.sample_axis() migra = migra_axis.pix_to_coord(pix_edisp) energy_reco = map_coord["energy_true"] * migra return MapCoord.create({"skycoord": map_coord.skycoord, "energy": energy_reco})
[docs] @classmethod def from_diagonal_response(cls, energy_axis_true, migra_axis=None): """Create an allsky EDisp map with diagonal response. Parameters ---------- energy_axis_true : `MapAxis` True energy axis migra_axis : `MapAxis` Migra axis Returns ------- edisp_map : `EDispMap` Energy dispersion map. """ migra_res = 1e-5 migra_axis_default = MapAxis.from_bounds( 1 - migra_res, 1 + migra_res, nbin=3, name="migra", node_type="edges" ) migra_axis = migra_axis or migra_axis_default geom = WcsGeom.create( npix=(2, 1), proj="CAR", binsz=180, axes=[migra_axis, energy_axis_true] ) return cls.from_geom(geom)
[docs] def to_edisp_kernel_map(self, energy_axis): """Convert to map with edisp kernels Parameters ---------- e_reco : `MapAxis` Reconstructed enrgy axis. Returns ------- edisp : `EDispKernelMap` Energy dispersion kernel map. """ axis = 0 energy_axis_true = self.edisp_map.geom.get_axis_by_name("energy_true") migra_axis = self.edisp_map.geom.get_axis_by_name("migra") data = [] for idx, e_true in enumerate(energy_axis_true.center): # migration value of e_reco bounds migra = energy_axis.edges / e_true edisp_e_true = self.edisp_map.slice_by_idx({"energy_true": idx}) cumsum = np.insert(edisp_e_true.data, 0, 0, axis=axis).cumsum(axis=axis) with np.errstate(invalid="ignore"): cumsum = np.nan_to_num(cumsum / cumsum[slice(-2, -1)]) f = interp1d( migra_axis.edges.value, cumsum, kind="linear", bounds_error=False, fill_value=(0, 1), axis=axis, ) integral = np.diff(np.clip(f(migra), a_min=0, a_max=1), axis=axis) data.append(integral) data = np.stack(data) geom_image = self.edisp_map.geom.to_image() geom = geom_image.to_cube([energy_axis, energy_axis_true]) edisp_kernel_map = Map.from_geom(geom=geom, data=data) return EDispKernelMap( edisp_kernel_map=edisp_kernel_map, exposure_map=self.exposure_map )
[docs]class EDispKernelMap(IRFMap): """Energy dispersion kernel map. Parameters ---------- edisp_kernel_map : `~gammapy.maps.Map` The input energy dispersion kernel map. Should be a Map with 2 non spatial axes. Reconstructed and and true energy axes should be given in this specific order. exposure_map : `~gammapy.maps.Map`, optional Associated exposure map. Needs to have a consistent map geometry. """ _hdu_name = "edisp" def __init__(self, edisp_kernel_map, exposure_map): if edisp_kernel_map.geom.axes[1].name.upper() != "ENERGY_TRUE": raise ValueError("Incorrect energy axis position in input Map") if edisp_kernel_map.geom.axes[0].name.upper() != "ENERGY": raise ValueError("Incorrect migra axis position in input Map") super().__init__(irf_map=edisp_kernel_map, exposure_map=exposure_map) @property def edisp_map(self): return self._irf_map @edisp_map.setter def edisp_map(self, value): self._irf_map = value
[docs] @classmethod def from_geom(cls, geom): """Create edisp map from geom. By default a diagonal edisp matrix is created. Parameters ---------- geom : `Geom` Edisp map geometry. Returns ------- edisp_map : `EDispKernelMap` Energy dispersion kernel map. """ axis_names = [ax.name for ax in geom.axes] if "energy_true" not in axis_names: raise ValueError("EDispKernelMap requires true energy axis") if "energy" not in axis_names: raise ValueError("EDispKernelMap requires energy axis") geom_exposure = geom.squash(axis="energy") exposure = Map.from_geom(geom_exposure, unit="m2 s") energy_axis = geom.get_axis_by_name("energy") energy_axis_true = geom.get_axis_by_name("energy_true") data = get_overlap_fraction(energy_axis, energy_axis_true) edisp_kernel_map = Map.from_geom(geom, unit="") edisp_kernel_map.quantity += data[:, :, np.newaxis, np.newaxis] return cls(edisp_kernel_map=edisp_kernel_map, exposure_map=exposure)
[docs] def get_edisp_kernel(self, position): """Get energy dispersion at a given position. Parameters ---------- position : `~astropy.coordinates.SkyCoord` the target position. Should be a single coordinates Returns ------- edisp : `~gammapy.irf.EnergyDispersion` the energy dispersion (i.e. rmf object) """ energy_true_axis = self.edisp_map.geom.get_axis_by_name("energy_true") energy_axis = self.edisp_map.geom.get_axis_by_name("energy") coords = { "skycoord": position, "energy": energy_axis.center, "energy_true": energy_true_axis.center.reshape((-1, 1)), } data = self.edisp_map.get_by_coord(coords) return EDispKernel( e_true_lo=energy_true_axis.edges[:-1], e_true_hi=energy_true_axis.edges[1:], e_reco_lo=energy_axis.edges[:-1], e_reco_hi=energy_axis.edges[1:], data=data, )
[docs] @classmethod def from_diagonal_response(cls, energy_axis, energy_axis_true): """Create an all-sky energy dispersion map with diagonal response. Parameters ---------- energy_axis : `MapAxis` Energy axis. energy_axis_true : `MapAxis` True energy axis Returns ------- edisp_map : `EDispKernelMap` Energy dispersion kernel map. """ geom = WcsGeom.create( npix=(2, 1), proj="CAR", binsz=180, axes=[energy_axis, energy_axis_true] ) return cls.from_geom(geom)