Source code for gammapy.cube.edisp_map

# Licensed under a 3-clause BSD style license - see LICENSE.rst
from copy import deepcopy
import numpy as np
import astropy.io.fits as fits
import astropy.units as u
from gammapy.irf import EnergyDispersion
from gammapy.maps import Map, MapCoord, WcsGeom
from gammapy.utils.random import InverseCDFSampler, get_random_state

__all__ = ["make_edisp_map", "EDispMap"]


[docs]def make_edisp_map(edisp, pointing, geom, exposure_map=None): """Make a edisp map for a single observation Expected axes : migra and true energy in this specific order The name of the migra MapAxis is expected to be 'migra' Parameters ---------- edisp : `~gammapy.irf.EnergyDispersion2D` the 2D Energy Dispersion IRF pointing : `~astropy.coordinates.SkyCoord` the pointing direction geom : `~gammapy.maps.Geom` the map geom to be used. It provides the target geometry. rad and true energy axes should be given in this specific order. exposure_map : `~gammapy.maps.Map`, optional the associated exposure map. default is None Returns ------- edispmap : `~gammapy.cube.EDispMap` the resulting EDisp map """ energy_axis = geom.get_axis_by_name("energy") energy = energy_axis.center migra_axis = geom.get_axis_by_name("migra") migra = migra_axis.center # Compute separations with pointing position offset = geom.separation(pointing) # Compute EDisp values edisp_values = edisp.data.evaluate( offset=offset, e_true=energy[:, np.newaxis, np.newaxis, np.newaxis], migra=migra[:, np.newaxis, np.newaxis], ) # Create Map and fill relevant entries data = edisp_values.to_value("") edispmap = Map.from_geom(geom, data=data, unit="") return EDispMap(edispmap, exposure_map)
[docs]class EDispMap: """Energy dispersion map. Parameters ---------- edisp_map : `~gammapy.maps.Map` the input Energy Dispersion Map. Should be a Map with 2 non spatial axes. migra and true energy axes should be given in this specific order. exposure_map : `~gammapy.maps.Map`, optional Associated exposure map. Needs to have a consistent map geometry. Examples -------- :: import numpy as np from astropy import units as u from astropy.coordinates import SkyCoord from gammapy.maps import WcsGeom, MapAxis from gammapy.irf import EnergyDispersion2D, EffectiveAreaTable2D from gammapy.cube import make_edisp_map, make_map_exposure_true_energy # Define energy dispersion map geometry energy_axis = MapAxis.from_edges(np.logspace(-1, 1, 4), unit="TeV", name="energy") migra_axis = MapAxis.from_edges(np.linspace(0, 3, 100), name="migra") pointing = SkyCoord(0, 0, unit="deg") max_offset = 4 * u.deg geom = WcsGeom.create( binsz=0.25 * u.deg, width=10 * u.deg, skydir=pointing, axes=[migra_axis, energy_axis], ) # Extract EnergyDispersion2D from CTA 1DC IRF filename = "$GAMMAPY_DATA/cta-1dc/caldb/data/cta/1dc/bcf/South_z20_50h/irf_file.fits" edisp2D = EnergyDispersion2D.read(filename, hdu="ENERGY DISPERSION") aeff2d = EffectiveAreaTable2D.read(filename, hdu="EFFECTIVE AREA") # Create the exposure map exposure_geom = geom.to_image().to_cube([energy_axis]) exposure_map = make_map_exposure_true_energy(pointing, "1 h", aeff2d, exposure_geom) # create the EDispMap for the specified pointing edisp_map = make_edisp_map(edisp2D, pointing, geom, max_offset, exposure_map) # Get an Energy Dispersion (1D) at any position in the image pos = SkyCoord(2.0, 2.5, unit="deg") e_reco = np.logspace(-1.0, 1.0, 10) * u.TeV edisp = edisp_map.get_energy_dispersion(pos=pos, e_reco=e_reco) # Write map to disk edisp_map.write("edisp_map.fits") """ def __init__(self, edisp_map, exposure_map): if edisp_map.geom.axes[1].name.upper() != "ENERGY": raise ValueError("Incorrect energy axis position in input Map") if edisp_map.geom.axes[0].name.upper() != "MIGRA": raise ValueError("Incorrect migra axis position in input Map") self.edisp_map = edisp_map self.exposure_map = exposure_map
[docs] @classmethod def from_hdulist( cls, hdulist, edisp_hdu="EDISPMAP", edisp_hdubands="BANDSEDISP", exposure_hdu="EXPMAP", exposure_hdubands="BANDSEXP", ): """Convert to `~astropy.io.fits.HDUList`. Parameters ---------- edisp_hdu : str Name or index of the HDU with the edisp_map data. edisp_hdubands : str Name or index of the HDU with the edisp_map BANDS table. exposure_hdu : str Name or index of the HDU with the exposure_map data. exposure_hdubands : str Name or index of the HDU with the exposure_map BANDS table. """ edisp_map = Map.from_hdulist(hdulist, edisp_hdu, edisp_hdubands, "auto") if exposure_hdu in hdulist: exposure_map = Map.from_hdulist( hdulist, exposure_hdu, exposure_hdubands, "auto" ) else: exposure_map = None return cls(edisp_map, exposure_map)
[docs] @classmethod def read(cls, filename, **kwargs): """Read an edisp_map from file and create an EDispMap object""" with fits.open(filename, memmap=False) as hdulist: return cls.from_hdulist(hdulist, **kwargs)
[docs] def to_hdulist( self, edisp_hdu="EDISPMAP", edisp_hdubands="BANDSEDISP", exposure_hdu="EXPMAP", exposure_hdubands="BANDSEXP", ): """Convert to `~astropy.io.fits.HDUList`. Parameters ---------- edisp_hdu : str Name or index of the HDU with the edisp_map data. edisp_hdubands : str Name or index of the HDU with the edisp_map BANDS table. exposure_hdu : str Name or index of the HDU with the exposure_map data. exposure_hdubands : str Name or index of the HDU with the exposure_map BANDS table. Returns ------- hdu_list : `~astropy.io.fits.HDUList` """ hdulist = self.edisp_map.to_hdulist(hdu=edisp_hdu, hdu_bands=edisp_hdubands) if self.exposure_map is not None: new_hdulist = self.exposure_map.to_hdulist( hdu=exposure_hdu, hdu_bands=exposure_hdubands ) hdulist.extend(new_hdulist[1:]) return hdulist
[docs] def write(self, filename, overwrite=False, **kwargs): """Write to fits""" hdulist = self.to_hdulist(**kwargs) hdulist.writeto(filename, overwrite=overwrite)
[docs] def get_energy_dispersion(self, position, e_reco, migra_step=5e-3): """Get energy dispersion at a given position. Parameters ---------- position : `~astropy.coordinates.SkyCoord` the target position. Should be a single coordinates e_reco : `~astropy.units.Quantity` Reconstructed energy axis binning migra_step : float Integration step in migration Returns ------- edisp : `~gammapy.irf.EnergyDispersion` the energy dispersion (i.e. rmf object) """ # TODO: reduce code duplication with EnergyDispersion2D.get_response if position.size != 1: raise ValueError( "EnergyDispersion can be extracted at one single position only." ) # axes ordering fixed. Could be changed. pix_ener = np.arange(self.edisp_map.geom.axes[1].nbin) # Define a vector of migration with mig_step step mrec_min = self.edisp_map.geom.axes[0].edges[0] mrec_max = self.edisp_map.geom.axes[0].edges[-1] mig_array = np.arange(mrec_min, mrec_max, migra_step) pix_migra = (mig_array - mrec_min) / mrec_max * self.edisp_map.geom.axes[0].nbin # Convert position to pixels pix_lon, pix_lat = self.edisp_map.geom.to_image().coord_to_pix(position) # Build the pixels tuple pix = np.meshgrid(pix_lon, pix_lat, pix_migra, pix_ener) # Interpolate in the EDisp map. Squeeze to remove dimensions of length 1 edisp_values = self.edisp_map.interp_by_pix(pix) * u.Unit(self.edisp_map.unit) edisp_values = np.squeeze(edisp_values, axis=(0, 1)) e_trues = self.edisp_map.geom.axes[1].center data = [] for i, e_true in enumerate(e_trues): # We now perform integration over migra # The code is adapted from `~gammapy.EnergyDispersion2D.get_response` # migration value of e_reco bounds migra_e_reco = e_reco / e_true # Compute normalized cumulative sum to prepare integration tmp = np.nan_to_num( np.cumsum(edisp_values[:, i]) / np.sum(edisp_values[:, i]) ) # Determine positions (bin indices) of e_reco bounds in migration array pos_mig = np.digitize(migra_e_reco, mig_array) - 1 # We ensure that no negative values are found pos_mig = np.maximum(pos_mig, 0) # We compute the difference between 2 successive bounds in e_reco # to get integral over reco energy bin integral = np.diff(tmp[pos_mig]) data.append(integral) data = np.asarray(data) # EnergyDispersion uses edges of true energy bins e_true_edges = self.edisp_map.geom.axes[1].edges e_lo, e_hi = e_true_edges[:-1], e_true_edges[1:] ereco_lo, ereco_hi = (e_reco[:-1], e_reco[1:]) return EnergyDispersion( e_true_lo=e_lo, e_true_hi=e_hi, e_reco_lo=ereco_lo, e_reco_hi=ereco_hi, data=data, )
[docs] def stack(self, other, weights=None): """Stack EDispMap with another one in place. Parameters ---------- other : `~gammapy.cube.EDispMap` Energy dispersion map to be stacked with this one. """ if self.exposure_map is None or other.exposure_map is None: raise ValueError("Missing exposure map for PSFMap.stack") cutout_info = other.edisp_map.geom.cutout_info if cutout_info is not None: slices = cutout_info["parent-slices"] parent_slices = Ellipsis, slices[0], slices[1] else: parent_slices = None self.edisp_map.data[parent_slices] *= self.exposure_map.data[parent_slices] self.edisp_map.stack(other.edisp_map * other.exposure_map.data, weights=weights) # stack exposure map self.exposure_map.stack(other.exposure_map, weights=weights) with np.errstate(invalid="ignore"): self.edisp_map.data[parent_slices] /= self.exposure_map.data[parent_slices] self.edisp_map.data = np.nan_to_num(self.edisp_map.data)
[docs] def copy(self): """Copy EDispMap""" return deepcopy(self)
[docs] @classmethod def from_geom(cls, geom): """Create edisp map from geom. By default a diagonal edisp matrix is created. Parameters ---------- geom : `Geom` Edisp map geometry. Returns ------- edisp_map : `EDispMap` Energy dispersion map. """ geom_exposure_edisp = geom.squash(axis="migra") exposure_edisp = Map.from_geom(geom_exposure_edisp, unit="m2 s") migra_axis = geom.get_axis_by_name("migra") edisp_map = Map.from_geom(geom, unit="") loc = migra_axis.edges.searchsorted(1.0) edisp_map.data[:, loc, :, :] = 1.0 return cls(edisp_map, exposure_edisp)
[docs] def sample_coord(self, map_coord, random_state=0): """Apply the energy dispersion corrections on the coordinates of a set of simulated events. Parameters ---------- map_coord : `~gammapy.maps.MapCoord` object. Sequence of coordinates and energies of sampled events. random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`} Defines random number generator initialisation. Passed to `~gammapy.utils.random.get_random_state`. Returns ------- `~gammapy.maps.MapCoord`. Sequence of Edisp-corrected coordinates of the input map_coord map. """ random_state = get_random_state(random_state) migra_axis = self.edisp_map.geom.get_axis_by_name("migra") coord = { "skycoord": map_coord.skycoord.reshape(-1, 1), "energy": map_coord["energy"].reshape(-1, 1), "migra": migra_axis.center, } pdf_edisp = self.edisp_map.interp_by_coord(coord) sample_edisp = InverseCDFSampler(pdf_edisp, axis=1, random_state=random_state) pix_edisp = sample_edisp.sample_axis() energy_reco = migra_axis.pix_to_coord(pix_edisp) return MapCoord.create({"skycoord": map_coord.skycoord, "energy": energy_reco})
[docs] @classmethod def from_diagonal_response(cls, energy_axis_true, migra_axis=None): """Create an allsky EDisp map with diagonal response. Parameters ---------- energy_axis_true : `MapAxis` True energy axis migra_axis : `MapAxis` Migra axis Returns ------- edisp_map : `EDispMap` Energy dispersion map. """ from .fit import MIGRA_AXIS_DEFAULT migra_axis = migra_axis or MIGRA_AXIS_DEFAULT geom = WcsGeom.create( npix=(4, 2), proj="CAR", binsz=90, axes=[migra_axis, energy_axis_true] ) return cls.from_geom(geom)
[docs] def cutout(self, position, width, mode="trim"): """Cutout edisp map. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Center position of the cutout region. width : tuple of `~astropy.coordinates.Angle` Angular sizes of the region in (lon, lat) in that specific order. If only one value is passed, a square region is extracted. mode : {'trim', 'partial', 'strict'} Mode option for Cutout2D, for details see `~astropy.nddata.utils.Cutout2D`. Returns ------- cutout : `EdispMap` Cutout edisp map. """ edisp_map = self.edisp_map.cutout(position, width, mode) exposure_map = self.exposure_map.cutout(position, width, mode) return self.__class__(edisp_map=edisp_map, exposure_map=exposure_map)