# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
import astropy.units as u
import astropy.io.fits as fits
from ..irf import EnergyDispersion
from ..maps import Map
__all__ = ["make_edisp_map", "EDispMap"]
[docs]def make_edisp_map(edisp, pointing, geom, max_offset, exposure_map=None):
"""Make a edisp map for a single observation
Expected axes : migra and true energy in this specific order
The name of the migra MapAxis is expected to be 'migra'
Parameters
----------
edisp : `~gammapy.irf.EnergyDispersion2D`
the 2D Energy Dispersion IRF
pointing : `~astropy.coordinates.SkyCoord`
the pointing direction
geom : `~gammapy.maps.MapGeom`
the map geom to be used. It provides the target geometry.
rad and true energy axes should be given in this specific order.
max_offset : `~astropy.coordinates.Angle`
maximum offset w.r.t. fov center
exposure_map : `~gammapy.maps.Map`, optional
the associated exposure map.
default is None
Returns
-------
edispmap : `~gammapy.cube.EDispMap`
the resulting EDisp map
"""
energy = geom.get_axis_by_name("energy").center
migra = geom.get_axis_by_name("migra").center
# Compute separations with pointing position
separations = pointing.separation(geom.to_image().get_coord().skycoord)
valid = np.where(separations < max_offset)
# Compute EDisp values
edisp_values = edisp.data.evaluate(
offset=separations[valid],
e_true=energy[:, np.newaxis],
migra=migra[:, np.newaxis, np.newaxis],
)
# Re-order axes to be consistent with expected geometry
edisp_values = np.transpose(edisp_values, axes=(1, 0, 2))
# Create Map and fill relevant entries
edispmap = Map.from_geom(geom, unit="")
edispmap.data[:, :, valid[0], valid[1]] += edisp_values.to_value(edispmap.unit)
return EDispMap(edispmap, exposure_map)
[docs]class EDispMap:
"""Energy dispersion map.
Parameters
----------
edisp_map : `~gammapy.maps.Map`
the input Energy Dispersion Map. Should be a Map with 2 non spatial axes.
migra and true energy axes should be given in this specific order.
exposure_map : `~gammapy.maps.Map`, optional
Associated exposure map. Needs to have a consistent map geometry.
Examples
--------
::
import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from gammapy.maps import WcsGeom, MapAxis
from gammapy.irf import EnergyDispersion2D, EffectiveAreaTable2D
from gammapy.cube import make_edisp_map, make_map_exposure_true_energy
# Define energy dispersion map geometry
energy_axis = MapAxis.from_edges(np.logspace(-1, 1, 4), unit="TeV", name="energy")
migra_axis = MapAxis.from_edges(np.linspace(0, 3, 100), name="migra")
pointing = SkyCoord(0, 0, unit="deg")
max_offset = 4 * u.deg
geom = WcsGeom.create(
binsz=0.25 * u.deg,
width=10 * u.deg,
skydir=pointing,
axes=[migra_axis, energy_axis],
)
# Extract EnergyDispersion2D from CTA 1DC IRF
filename = "$GAMMAPY_DATA/cta-1dc/caldb/data/cta/1dc/bcf/South_z20_50h/irf_file.fits"
edisp2D = EnergyDispersion2D.read(filename, hdu="ENERGY DISPERSION")
aeff2d = EffectiveAreaTable2D.read(filename, hdu="EFFECTIVE AREA")
# Create the exposure map
exposure_geom = geom.to_image().to_cube([energy_axis])
exposure_map = make_map_exposure_true_energy(pointing, "1 h", aeff2d, exposure_geom)
# create the EDispMap for the specified pointing
edisp_map = make_edisp_map(edisp2D, pointing, geom, max_offset, exposure_map)
# Get an Energy Dispersion (1D) at any position in the image
pos = SkyCoord(2.0, 2.5, unit="deg")
e_reco = np.logspace(-1.0, 1.0, 10) * u.TeV
edisp = edisp_map.get_energy_dispersion(pos=pos, e_reco=e_reco)
# Write map to disk
edisp_map.write("edisp_map.fits")
"""
def __init__(self, edisp_map, exposure_map):
if edisp_map.geom.axes[1].name.upper() != "ENERGY":
raise ValueError("Incorrect energy axis position in input Map")
if edisp_map.geom.axes[0].name.upper() != "MIGRA":
raise ValueError("Incorrect migra axis position in input Map")
self.edisp_map = edisp_map
if exposure_map is not None:
# First adapt geometry, keep only energy axis
expected_geom = edisp_map.geom.to_image().to_cube([edisp_map.geom.axes[1]])
if exposure_map.geom != expected_geom:
raise ValueError(
"EDispMap and exposure_map have inconsistent geometries"
)
self.exposure_map = exposure_map
[docs] @classmethod
def from_hdulist(
cls,
hdulist,
edisp_hdu="EDISPMAP",
edisp_hdubands="BANDSEDISP",
exposure_hdu="EXPMAP",
exposure_hdubands="BANDSEXP",
):
"""Convert to `~astropy.io.fits.HDUList`.
Parameters
----------
edisp_hdu : str
Name or index of the HDU with the edisp_map data.
edisp_hdubands : str
Name or index of the HDU with the edisp_map BANDS table.
exposure_hdu : str
Name or index of the HDU with the exposure_map data.
exposure_hdubands : str
Name or index of the HDU with the exposure_map BANDS table.
"""
edisp_map = Map.from_hdulist(hdulist, edisp_hdu, edisp_hdubands, "auto")
if exposure_hdu in hdulist:
exposure_map = Map.from_hdulist(
hdulist, exposure_hdu, exposure_hdubands, "auto"
)
else:
exposure_map = None
return cls(edisp_map, exposure_map)
[docs] @classmethod
def read(cls, filename, **kwargs):
"""Read an edisp_map from file and create an EDispMap object"""
with fits.open(filename, memmap=False) as hdulist:
return cls.from_hdulist(hdulist, **kwargs)
[docs] def to_hdulist(
self,
edisp_hdu="EDISPMAP",
edisp_hdubands="BANDSEDISP",
exposure_hdu="EXPMAP",
exposure_hdubands="BANDSEXP",
):
"""Convert to `~astropy.io.fits.HDUList`.
Parameters
----------
edisp_hdu : str
Name or index of the HDU with the edisp_map data.
edisp_hdubands : str
Name or index of the HDU with the edisp_map BANDS table.
exposure_hdu : str
Name or index of the HDU with the exposure_map data.
exposure_hdubands : str
Name or index of the HDU with the exposure_map BANDS table.
Returns
-------
hdu_list : `~astropy.io.fits.HDUList`
"""
hdulist = self.edisp_map.to_hdulist(hdu=edisp_hdu, hdu_bands=edisp_hdubands)
if self.exposure_map is not None:
new_hdulist = self.exposure_map.to_hdulist(
hdu=exposure_hdu, hdu_bands=exposure_hdubands
)
hdulist.extend(new_hdulist[1:])
return hdulist
[docs] def write(self, filename, overwrite=False, **kwargs):
"""Write to fits"""
hdulist = self.to_hdulist(**kwargs)
hdulist.writeto(filename, overwrite=overwrite)
[docs] def get_energy_dispersion(self, position, e_reco, migra_step=5e-3):
"""Get energy dispersion at a given position.
Parameters
----------
position : `~astropy.coordinates.SkyCoord`
the target position. Should be a single coordinates
e_reco : `~astropy.units.Quantity`
Reconstructed energy axis binning
migra_step : float
Integration step in migration
Returns
-------
edisp : `~gammapy.irf.EnergyDispersion`
the energy dispersion (i.e. rmf object)
"""
# TODO: reduce code duplication with EnergyDispersion2D.get_response
if position.size != 1:
raise ValueError(
"EnergyDispersion can be extracted at one single position only."
)
# axes ordering fixed. Could be changed.
pix_ener = np.arange(self.edisp_map.geom.axes[1].nbin)
# Define a vector of migration with mig_step step
mrec_min = self.edisp_map.geom.axes[0].edges[0]
mrec_max = self.edisp_map.geom.axes[0].edges[-1]
mig_array = np.arange(mrec_min, mrec_max, migra_step)
pix_migra = (mig_array - mrec_min) / mrec_max * self.edisp_map.geom.axes[0].nbin
# Convert position to pixels
pix_lon, pix_lat = self.edisp_map.geom.to_image().coord_to_pix(position)
# Build the pixels tuple
pix = np.meshgrid(pix_lon, pix_lat, pix_migra, pix_ener)
# Interpolate in the EDisp map. Squeeze to remove dimensions of length 1
edisp_values = np.squeeze(
self.edisp_map.interp_by_pix(pix)
* u.Unit(self.edisp_map.unit) # * migra_step
)
e_trues = self.edisp_map.geom.axes[1].center
data = []
for i, e_true in enumerate(e_trues):
# We now perform integration over migra
# The code is adapted from `~gammapy.EnergyDispersion2D.get_response`
# migration value of e_reco bounds
migra_e_reco = e_reco / e_true
# Compute normalized cumulative sum to prepare integration
tmp = np.nan_to_num(
np.cumsum(edisp_values[:, i]) / np.sum(edisp_values[:, i])
)
# Determine positions (bin indices) of e_reco bounds in migration array
pos_mig = np.digitize(migra_e_reco, mig_array) - 1
# We ensure that no negative values are found
pos_mig = np.maximum(pos_mig, 0)
# We compute the difference between 2 successive bounds in e_reco
# to get integral over reco energy bin
integral = np.diff(tmp[pos_mig])
data.append(integral)
data = np.asarray(data)
# EnergyDispersion uses edges of true energy bins
e_true_edges = self.edisp_map.geom.axes[1].edges
e_lo, e_hi = e_true_edges[:-1], e_true_edges[1:]
ereco_lo, ereco_hi = (e_reco[:-1], e_reco[1:])
return EnergyDispersion(
e_true_lo=e_lo,
e_true_hi=e_hi,
e_reco_lo=ereco_lo,
e_reco_hi=ereco_hi,
data=data,
)
[docs] def stack(self, other):
"""Stack EdispMap with another one.
The current EdispMap is unchanged and a new one is created and returned.
Parameters
----------
other : `~gammapy.cube.EDispMap`
the edispmap to be stacked with this one.
Returns
-------
new : `~gammapy.cube.EDispMap`
the stacked edispmap
"""
if self.exposure_map is None or other.exposure_map is None:
raise ValueError("Missing exposure map for EdispMap.stack")
# Reproject other exposure
exposure_coord = self.exposure_map.geom.get_coord()
reproj_exposure = Map.from_geom(
self.exposure_map.geom, unit=self.exposure_map.unit
)
reproj_exposure.fill_by_coord(
exposure_coord, other.exposure_map.get_by_coord(exposure_coord)
)
# Reproject other psfmap using same geom
edispmap_coord = self.edisp_map.geom.get_coord()
reproj_edispmap = Map.from_geom(self.edisp_map.geom, unit=self.edisp_map.unit)
reproj_edispmap.fill_by_coord(
edispmap_coord, other.edisp_map.get_by_coord(edispmap_coord)
)
exposure = self.exposure_map.quantity[:, np.newaxis, :, :]
stacked_edisp_quantity = self.edisp_map.quantity * exposure
other_exposure = reproj_exposure.quantity[:, np.newaxis, :, :]
stacked_edisp_quantity += reproj_edispmap.quantity * other_exposure
total_exposure = exposure + other_exposure
stacked_edisp_quantity /= total_exposure
reproj_edispmap.quantity = stacked_edisp_quantity
# We need to remove the extra axis in the total exposure
reproj_exposure.quantity = total_exposure[:, 0, :, :]
return EDispMap(reproj_edispmap, reproj_exposure)