Source code for gammapy.cube.edisp_map

# Licensed under a 3-clause BSD style license - see LICENSE.rst
from copy import deepcopy
import numpy as np
from scipy.interpolate import interp1d
import astropy.io.fits as fits
from gammapy.irf import EDispKernel
from gammapy.maps import Map, MapAxis, MapCoord, WcsGeom
from gammapy.utils.random import InverseCDFSampler, get_random_state

__all__ = ["make_edisp_map", "EDispMap"]


[docs]def make_edisp_map(edisp, pointing, geom, exposure_map=None): """Make a edisp map for a single observation Expected axes : migra and true energy in this specific order The name of the migra MapAxis is expected to be 'migra' Parameters ---------- edisp : `~gammapy.irf.EnergyDispersion2D` the 2D Energy Dispersion IRF pointing : `~astropy.coordinates.SkyCoord` the pointing direction geom : `~gammapy.maps.Geom` the map geom to be used. It provides the target geometry. rad and true energy axes should be given in this specific order. exposure_map : `~gammapy.maps.Map`, optional the associated exposure map. default is None Returns ------- edispmap : `~gammapy.cube.EDispMap` the resulting EDisp map """ energy_axis = geom.get_axis_by_name("energy") energy = energy_axis.center migra_axis = geom.get_axis_by_name("migra") migra = migra_axis.center # Compute separations with pointing position offset = geom.separation(pointing) # Compute EDisp values edisp_values = edisp.data.evaluate( offset=offset, e_true=energy[:, np.newaxis, np.newaxis, np.newaxis], migra=migra[:, np.newaxis, np.newaxis], ) # Create Map and fill relevant entries data = edisp_values.to_value("") edispmap = Map.from_geom(geom, data=data, unit="") return EDispMap(edispmap, exposure_map)
[docs]class EDispMap: """Energy dispersion map. Parameters ---------- edisp_map : `~gammapy.maps.Map` the input Energy Dispersion Map. Should be a Map with 2 non spatial axes. migra and true energy axes should be given in this specific order. exposure_map : `~gammapy.maps.Map`, optional Associated exposure map. Needs to have a consistent map geometry. Examples -------- :: import numpy as np from astropy import units as u from astropy.coordinates import SkyCoord from gammapy.maps import WcsGeom, MapAxis from gammapy.irf import EnergyDispersion2D, EffectiveAreaTable2D from gammapy.cube import make_edisp_map, make_map_exposure_true_energy # Define energy dispersion map geometry energy_axis = MapAxis.from_edges(np.logspace(-1, 1, 4), unit="TeV", name="energy") migra_axis = MapAxis.from_edges(np.linspace(0, 3, 100), name="migra") pointing = SkyCoord(0, 0, unit="deg") max_offset = 4 * u.deg geom = WcsGeom.create( binsz=0.25 * u.deg, width=10 * u.deg, skydir=pointing, axes=[migra_axis, energy_axis], ) # Extract EnergyDispersion2D from CTA 1DC IRF filename = "$GAMMAPY_DATA/cta-1dc/caldb/data/cta/1dc/bcf/South_z20_50h/irf_file.fits" edisp2D = EnergyDispersion2D.read(filename, hdu="ENERGY DISPERSION") aeff2d = EffectiveAreaTable2D.read(filename, hdu="EFFECTIVE AREA") # Create the exposure map exposure_geom = geom.to_image().to_cube([energy_axis]) exposure_map = make_map_exposure_true_energy(pointing, "1 h", aeff2d, exposure_geom) # create the EDispMap for the specified pointing edisp_map = make_edisp_map(edisp2D, pointing, geom, max_offset, exposure_map) # Get an Energy Dispersion (1D) at any position in the image pos = SkyCoord(2.0, 2.5, unit="deg") e_reco = np.logspace(-1.0, 1.0, 10) * u.TeV edisp = edisp_map.get_edisp_kernel(pos=pos, e_reco=e_reco) # Write map to disk edisp_map.write("edisp_map.fits") """ def __init__(self, edisp_map, exposure_map): if edisp_map.geom.axes[1].name.upper() != "ENERGY": raise ValueError("Incorrect energy axis position in input Map") if edisp_map.geom.axes[0].name.upper() != "MIGRA": raise ValueError("Incorrect migra axis position in input Map") self.edisp_map = edisp_map self.exposure_map = exposure_map
[docs] @classmethod def from_hdulist( cls, hdulist, edisp_hdu="EDISPMAP", edisp_hdubands="BANDSEDISP", exposure_hdu="EXPMAP", exposure_hdubands="BANDSEXP", ): """Convert to `~astropy.io.fits.HDUList`. Parameters ---------- edisp_hdu : str Name or index of the HDU with the edisp_map data. edisp_hdubands : str Name or index of the HDU with the edisp_map BANDS table. exposure_hdu : str Name or index of the HDU with the exposure_map data. exposure_hdubands : str Name or index of the HDU with the exposure_map BANDS table. """ edisp_map = Map.from_hdulist(hdulist, edisp_hdu, edisp_hdubands, "auto") if exposure_hdu in hdulist: exposure_map = Map.from_hdulist( hdulist, exposure_hdu, exposure_hdubands, "auto" ) else: exposure_map = None return cls(edisp_map, exposure_map)
[docs] @classmethod def read(cls, filename, **kwargs): """Read an edisp_map from file and create an EDispMap object""" with fits.open(filename, memmap=False) as hdulist: return cls.from_hdulist(hdulist, **kwargs)
[docs] def to_hdulist( self, edisp_hdu="EDISPMAP", edisp_hdubands="BANDSEDISP", exposure_hdu="EXPMAP", exposure_hdubands="BANDSEXP", ): """Convert to `~astropy.io.fits.HDUList`. Parameters ---------- edisp_hdu : str Name or index of the HDU with the edisp_map data. edisp_hdubands : str Name or index of the HDU with the edisp_map BANDS table. exposure_hdu : str Name or index of the HDU with the exposure_map data. exposure_hdubands : str Name or index of the HDU with the exposure_map BANDS table. Returns ------- hdu_list : `~astropy.io.fits.HDUList` """ hdulist = self.edisp_map.to_hdulist(hdu=edisp_hdu, hdu_bands=edisp_hdubands) if self.exposure_map is not None: new_hdulist = self.exposure_map.to_hdulist( hdu=exposure_hdu, hdu_bands=exposure_hdubands ) hdulist.extend(new_hdulist[1:]) return hdulist
[docs] def write(self, filename, overwrite=False, **kwargs): """Write to fits""" hdulist = self.to_hdulist(**kwargs) hdulist.writeto(filename, overwrite=overwrite)
[docs] def get_edisp_kernel(self, position, e_reco): """Get energy dispersion at a given position. Parameters ---------- position : `~astropy.coordinates.SkyCoord` the target position. Should be a single coordinates e_reco : `~astropy.units.Quantity` Reconstructed energy axis binning Returns ------- edisp : `~gammapy.irf.EnergyDispersion` the energy dispersion (i.e. rmf object) """ if position.size != 1: raise ValueError( "EnergyDispersion can be extracted at one single position only." ) energy_axis = self.edisp_map.geom.get_axis_by_name("energy") migra_axis = self.edisp_map.geom.get_axis_by_name("migra") coords = { "skycoord": position, "migra": migra_axis.center.reshape((-1, 1, 1, 1)), "energy": energy_axis.center.reshape((1, -1, 1, 1)), } # Interpolate in the EDisp map. Squeeze to remove dimensions of length 1 values = self.edisp_map.interp_by_coord(coords) * self.edisp_map.unit edisp_values = values.squeeze() data = [] for idx, e_true in enumerate(energy_axis.center): # migration value of e_reco bounds migra = e_reco / e_true cumsum = np.insert(edisp_values[:, idx], 0, 0).cumsum() with np.errstate(invalid="ignore"): cumsum = np.nan_to_num(cumsum / cumsum[-1]) f = interp1d( migra_axis.edges.value, cumsum, kind="linear", bounds_error=False, fill_value=(0, 1), ) # We compute the difference between 2 successive bounds in e_reco # to get integral over reco energy bin integral = np.diff(np.clip(f(migra), a_min=0, a_max=1)) data.append(integral) return EDispKernel( e_true_lo=energy_axis.edges[:-1], e_true_hi=energy_axis.edges[1:], e_reco_lo=e_reco[:-1], e_reco_hi=e_reco[1:], data=data, )
[docs] def stack(self, other, weights=None): """Stack EDispMap with another one in place. Parameters ---------- other : `~gammapy.cube.EDispMap` Energy dispersion map to be stacked with this one. """ if self.exposure_map is None or other.exposure_map is None: raise ValueError("Missing exposure map for PSFMap.stack") cutout_info = other.edisp_map.geom.cutout_info if cutout_info is not None: slices = cutout_info["parent-slices"] parent_slices = Ellipsis, slices[0], slices[1] else: parent_slices = None self.edisp_map.data[parent_slices] *= self.exposure_map.data[parent_slices] self.edisp_map.stack(other.edisp_map * other.exposure_map.data, weights=weights) # stack exposure map self.exposure_map.stack(other.exposure_map, weights=weights) with np.errstate(invalid="ignore"): self.edisp_map.data[parent_slices] /= self.exposure_map.data[parent_slices] self.edisp_map.data = np.nan_to_num(self.edisp_map.data)
[docs] def copy(self): """Copy EDispMap""" return deepcopy(self)
[docs] @classmethod def from_geom(cls, geom): """Create edisp map from geom. By default a diagonal edisp matrix is created. Parameters ---------- geom : `Geom` Edisp map geometry. Returns ------- edisp_map : `EDispMap` Energy dispersion map. """ geom_exposure_edisp = geom.squash(axis="migra") exposure_edisp = Map.from_geom(geom_exposure_edisp, unit="m2 s") migra_axis = geom.get_axis_by_name("migra") edisp_map = Map.from_geom(geom, unit="") migra_0 = migra_axis.coord_to_pix(1) # distribute over two pixels migra = geom.get_idx()[2] data = np.abs(migra - migra_0) data = np.where(data < 1, 1 - data, 0) edisp_map.quantity = data return cls(edisp_map, exposure_edisp)
[docs] def sample_coord(self, map_coord, random_state=0): """Apply the energy dispersion corrections on the coordinates of a set of simulated events. Parameters ---------- map_coord : `~gammapy.maps.MapCoord` object. Sequence of coordinates and energies of sampled events. random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`} Defines random number generator initialisation. Passed to `~gammapy.utils.random.get_random_state`. Returns ------- `~gammapy.maps.MapCoord`. Sequence of Edisp-corrected coordinates of the input map_coord map. """ random_state = get_random_state(random_state) migra_axis = self.edisp_map.geom.get_axis_by_name("migra") coord = { "skycoord": map_coord.skycoord.reshape(-1, 1), "energy": map_coord["energy"].reshape(-1, 1), "migra": migra_axis.center, } pdf_edisp = self.edisp_map.interp_by_coord(coord) sample_edisp = InverseCDFSampler(pdf_edisp, axis=1, random_state=random_state) pix_edisp = sample_edisp.sample_axis() migra = migra_axis.pix_to_coord(pix_edisp) energy_reco = map_coord["energy"] * migra return MapCoord.create({"skycoord": map_coord.skycoord, "energy": energy_reco})
[docs] @classmethod def from_diagonal_response(cls, energy_axis_true, migra_axis=None): """Create an allsky EDisp map with diagonal response. Parameters ---------- energy_axis_true : `MapAxis` True energy axis migra_axis : `MapAxis` Migra axis Returns ------- edisp_map : `EDispMap` Energy dispersion map. """ migra_res = 1e-5 migra_axis_default = MapAxis.from_bounds( 1 - migra_res, 1 + migra_res, nbin=3, name="migra", node_type="edges" ) migra_axis = migra_axis or migra_axis_default geom = WcsGeom.create( npix=(2, 1), proj="CAR", binsz=180, axes=[migra_axis, energy_axis_true] ) return cls.from_geom(geom)
[docs] def cutout(self, position, width, mode="trim"): """Cutout edisp map. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Center position of the cutout region. width : tuple of `~astropy.coordinates.Angle` Angular sizes of the region in (lon, lat) in that specific order. If only one value is passed, a square region is extracted. mode : {'trim', 'partial', 'strict'} Mode option for Cutout2D, for details see `~astropy.nddata.utils.Cutout2D`. Returns ------- cutout : `EdispMap` Cutout edisp map. """ edisp_map = self.edisp_map.cutout(position, width, mode) exposure_map = self.exposure_map.cutout(position, width, mode) return self.__class__(edisp_map=edisp_map, exposure_map=exposure_map)