# Licensed under a 3-clause BSD style license - see LICENSE.rst
from copy import deepcopy
import numpy as np
import astropy.io.fits as fits
import astropy.units as u
from gammapy.irf import EnergyDependentTablePSF
from gammapy.maps import Map, MapAxis, MapCoord, WcsGeom
from gammapy.utils.random import InverseCDFSampler, get_random_state
from .psf_kernel import PSFKernel
__all__ = ["make_psf_map", "PSFMap"]
[docs]def make_psf_map(psf, pointing, geom, exposure_map=None):
"""Make a psf map for a single observation
Expected axes : rad and true energy in this specific order
The name of the rad MapAxis is expected to be 'rad'
Parameters
----------
psf : `~gammapy.irf.PSF3D`
the PSF IRF
pointing : `~astropy.coordinates.SkyCoord`
the pointing direction
geom : `~gammapy.maps.Geom`
the map geom to be used. It provides the target geometry.
rad and true energy axes should be given in this specific order.
exposure_map : `~gammapy.maps.Map`, optional
the associated exposure map.
default is None
Returns
-------
psfmap : `~gammapy.cube.PSFMap`
the resulting PSF map
"""
energy_axis = geom.get_axis_by_name("energy")
energy = energy_axis.center
rad_axis = geom.get_axis_by_name("theta")
rad = rad_axis.center
# Compute separations with pointing position
offset = geom.separation(pointing)
# Compute PSF values
# TODO: allow broadcasting in PSF3D.evaluate()
psf_values = psf._interpolate(
(
rad[:, np.newaxis, np.newaxis],
offset,
energy[:, np.newaxis, np.newaxis, np.newaxis],
)
)
# TODO: this probably does not ensure that probability is properly normalized in the PSFMap
# Create Map and fill relevant entries
data = psf_values.to_value("sr-1")
psfmap = Map.from_geom(geom, data=data, unit="sr-1")
return PSFMap(psfmap, exposure_map)
[docs]class PSFMap:
"""Class containing the Map of PSFs and allowing to interact with it.
Parameters
----------
psf_map : `~gammapy.maps.Map`
the input PSF Map. Should be a Map with 2 non spatial axes.
rad and true energy axes should be given in this specific order.
exposure_map : `~gammapy.maps.Map`
Associated exposure map. Needs to have a consistent map geometry.
Examples
--------
::
import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from gammapy.maps import Map, WcsGeom, MapAxis
from gammapy.irf import EnergyDependentMultiGaussPSF, EffectiveAreaTable2D
from gammapy.cube import make_psf_map, PSFMap, make_map_exposure_true_energy
# Define energy axis. Note that the name is fixed.
energy_axis = MapAxis.from_edges(np.logspace(-1., 1., 4), unit='TeV', name='energy')
# Define rad axis. Again note the axis name
rads = np.linspace(0., 0.5, 100) * u.deg
rad_axis = MapAxis.from_edges(rads, unit='deg', name='theta')
# Define parameters
pointing = SkyCoord(0., 0., unit='deg')
max_offset = 4 * u.deg
# Create WcsGeom
geom = WcsGeom.create(binsz=0.25*u.deg, width=10*u.deg, skydir=pointing, axes=[rad_axis, energy_axis])
# Extract EnergyDependentTablePSF from CTA 1DC IRF
filename = '$GAMMAPY_DATA/cta-1dc/caldb/data/cta/1dc/bcf/South_z20_50h/irf_file.fits'
psf = EnergyDependentMultiGaussPSF.read(filename, hdu='POINT SPREAD FUNCTION')
psf3d = psf.to_psf3d(rads)
aeff2d = EffectiveAreaTable2D.read(filename, hdu='EFFECTIVE AREA')
# Create the exposure map
exposure_geom = geom.to_image().to_cube([energy_axis])
exposure_map = make_map_exposure_true_energy(pointing, "1 h", aeff2d, exposure_geom)
# create the PSFMap for the specified pointing
psf_map = make_psf_map(psf3d, pointing, geom, max_offset, exposure_map)
# Get an EnergyDependentTablePSF at any position in the image
psf_table = psf_map.get_energy_dependent_table_psf(SkyCoord(2., 2.5, unit='deg'))
# Write map to disk
psf_map.write('psf_map.fits')
"""
def __init__(self, psf_map, exposure_map=None):
if psf_map.geom.axes[1].name.upper() != "ENERGY":
raise ValueError("Incorrect energy axis position in input Map")
if psf_map.geom.axes[0].name.upper() != "THETA":
raise ValueError("Incorrect theta axis position in input Map")
self.psf_map = psf_map
self.exposure_map = exposure_map
[docs] @classmethod
def from_hdulist(
cls,
hdulist,
psf_hdu="PSFMAP",
psf_hdubands="BANDSPSF",
exposure_hdu="EXPMAP",
exposure_hdubands="BANDSEXP",
):
"""Convert to `~astropy.io.fits.HDUList`.
Parameters
----------
psf_hdu : str
Name or index of the HDU with the psf_map data.
psf_hdubands : str
Name or index of the HDU with the psf_map BANDS table.
exposure_hdu : str
Name or index of the HDU with the exposure_map data.
exposure_hdubands : str
Name or index of the HDU with the exposure_map BANDS table.
"""
psf_map = Map.from_hdulist(hdulist, psf_hdu, psf_hdubands, "auto")
if exposure_hdu in hdulist:
exposure_map = Map.from_hdulist(
hdulist, exposure_hdu, exposure_hdubands, "auto"
)
else:
exposure_map = None
return cls(psf_map, exposure_map)
[docs] @classmethod
def read(cls, filename, **kwargs):
"""Read a psf_map from file and create a PSFMap object"""
with fits.open(filename, memmap=False) as hdulist:
return cls.from_hdulist(hdulist, **kwargs)
[docs] def to_hdulist(
self,
psf_hdu="PSFMAP",
psf_hdubands="BANDSPSF",
exposure_hdu="EXPMAP",
exposure_hdubands="BANDSEXP",
):
"""Convert to `~astropy.io.fits.HDUList`.
Parameters
----------
psf_hdu : str
Name or index of the HDU with the psf_map data.
psf_hdubands : str
Name or index of the HDU with the psf_map BANDS table.
exposure_hdu : str
Name or index of the HDU with the exposure_map data.
exposure_hdubands : str
Name or index of the HDU with the exposure_map BANDS table.
Returns
-------
hdu_list : `~astropy.io.fits.HDUList`
"""
hdulist = self.psf_map.to_hdulist(hdu=psf_hdu, hdu_bands=psf_hdubands)
if self.exposure_map is not None:
new_hdulist = self.exposure_map.to_hdulist(
hdu=exposure_hdu, hdu_bands=exposure_hdubands
)
hdulist.extend(new_hdulist[1:])
return hdulist
[docs] def write(self, filename, overwrite=False, **kwargs):
"""Write to fits"""
hdulist = self.to_hdulist(**kwargs)
hdulist.writeto(filename, overwrite=overwrite)
[docs] def get_energy_dependent_table_psf(self, position):
"""Get energy-dependent PSF at a given position.
Parameters
----------
position : `~astropy.coordinates.SkyCoord`
the target position. Should be a single coordinates
Returns
-------
psf_table : `~gammapy.irf.EnergyDependentTablePSF`
the table PSF
"""
if position.size != 1:
raise ValueError(
"EnergyDependentTablePSF can be extracted at one single position only."
)
# axes ordering fixed. Could be changed.
pix_ener = np.arange(self.psf_map.geom.axes[1].nbin)
pix_rad = np.arange(self.psf_map.geom.axes[0].nbin)
# Convert position to pixels
pix_lon, pix_lat = self.psf_map.geom.to_image().coord_to_pix(position)
# Build the pixels tuple
pix = np.meshgrid(pix_lon, pix_lat, pix_rad, pix_ener)
# Interpolate in the PSF map. Squeeze to remove dimensions of length 1
psf_values = np.squeeze(
self.psf_map.interp_by_pix(pix) * u.Unit(self.psf_map.unit)
)
energies = self.psf_map.geom.axes[1].center
rad = self.psf_map.geom.axes[0].center
if self.exposure_map is not None:
exposure_3d = self.exposure_map.slice_by_idx({"theta": 0})
coords = {"skycoord": position, "energy": energies.reshape((-1, 1, 1))}
data = exposure_3d.interp_by_coord(coords).squeeze()
exposure = data * self.exposure_map.unit
else:
exposure = None
# Beware. Need to revert rad and energies to follow the TablePSF scheme.
return EnergyDependentTablePSF(
energy=energies, rad=rad, psf_value=psf_values.T, exposure=exposure
)
[docs] def get_psf_kernel(self, position, geom, max_radius=None, factor=4):
"""Returns a PSF kernel at the given position.
The PSF is returned in the form a WcsNDMap defined by the input Geom.
Parameters
----------
position : `~astropy.coordinates.SkyCoord`
the target position. Should be a single coordinate
geom : `~gammapy.maps.Geom`
the target geometry to use
max_radius : `~astropy.coordinates.Angle`
maximum angular size of the kernel map
factor : int
oversampling factor to compute the PSF
Returns
-------
kernel : `~gammapy.cube.PSFKernel`
the resulting kernel
"""
table_psf = self.get_energy_dependent_table_psf(position)
if max_radius is None:
max_radius = np.max(table_psf.rad)
return PSFKernel.from_table_psf(table_psf, geom, max_radius, factor)
[docs] def containment_radius_map(self, energy, fraction=0.68):
"""Containment radius map.
Parameters
----------
energy : `~astropy.units.Quantity`
Scalar energy at which to compute the containment radius
fraction : float
the containment fraction (range: 0 to 1)
Returns
-------
containment_radius_map : `~gammapy.maps.Map`
Containment radius map
"""
coords = self.psf_map.geom.to_image().get_coord().skycoord.flatten()
m = Map.from_geom(self.psf_map.geom.to_image(), unit="deg")
for coord in coords:
psf_table = self.get_energy_dependent_table_psf(coord)
containment_radius = psf_table.containment_radius(energy, fraction)
m.fill_by_coord(coord, containment_radius)
return m
[docs] def stack(self, other, weights=None):
"""Stack PSFMap with another one in place.
Parameters
----------
other : `~gammapy.cube.PSFMap`
the psfmap to be stacked with this one.
"""
if self.exposure_map is None or other.exposure_map is None:
raise ValueError("Missing exposure map for PSFMap.stack")
cutout_info = other.psf_map.geom.cutout_info
if cutout_info is not None:
slices = cutout_info["parent-slices"]
parent_slices = Ellipsis, slices[0], slices[1]
else:
parent_slices = None
self.psf_map.data[parent_slices] *= self.exposure_map.data[parent_slices]
self.psf_map.stack(other.psf_map * other.exposure_map.data, weights=weights)
# stack exposure map
self.exposure_map.stack(other.exposure_map, weights=weights)
with np.errstate(invalid="ignore"):
self.psf_map.data[parent_slices] /= self.exposure_map.data[parent_slices]
self.psf_map.data = np.nan_to_num(self.psf_map.data)
[docs] def copy(self):
"""Copy PSFMap"""
return deepcopy(self)
[docs] @classmethod
def from_geom(cls, geom):
"""Create psf map from geom.
Parameters
----------
geom : `Geom`
PSF map geometry.
Returns
-------
psf_map : `PSFMap`
Point spread function map.
"""
geom_exposure_psf = geom.squash(axis="theta")
exposure_psf = Map.from_geom(geom_exposure_psf, unit="m2 s")
psf_map = Map.from_geom(geom, unit="sr-1")
return cls(psf_map, exposure_psf)
[docs] def sample_coord(self, map_coord, random_state=0):
"""Apply PSF corrections on the coordinates of a set of simulated events.
Parameters
----------
map_coord : `~gammapy.maps.MapCoord` object.
Sequence of coordinates and energies of sampled events.
random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`}
Defines random number generator initialisation.
Passed to `~gammapy.utils.random.get_random_state`.
Returns
-------
corr_coord : `~gammapy.maps.MapCoord` object.
Sequence of PSF-corrected coordinates of the input map_coord map.
"""
random_state = get_random_state(random_state)
rad_axis = self.psf_map.geom.get_axis_by_name("theta")
coord = {
"skycoord": map_coord.skycoord.reshape(-1, 1),
"energy": map_coord["energy"].reshape(-1, 1),
"theta": rad_axis.center,
}
pdf = self.psf_map.interp_by_coord(coord)
sample_pdf = InverseCDFSampler(pdf, axis=1, random_state=random_state)
pix_coord = sample_pdf.sample_axis()
separation = rad_axis.pix_to_coord(pix_coord)
position_angle = random_state.uniform(360, size=len(map_coord.lon)) * u.deg
event_positions = map_coord.skycoord.directional_offset_by(
position_angle=position_angle, separation=separation
)
return MapCoord.create(
{"skycoord": event_positions, "energy": map_coord["energy"]}
)
[docs] @classmethod
def from_energy_dependent_table_psf(cls, table_psf):
"""Create PSF map from table PSF object.
Helper function to create an allsky PSF map from
table PSF, which does not depend on position.
Parameters
----------
table_psf : `EnergyDependentTablePSF`
Table PSF
Returns
-------
psf_map : `PSFMap`
Point spread function map.
"""
energy_axis = MapAxis.from_nodes(table_psf.energy, name="energy", interp="log")
rad_axis = MapAxis.from_nodes(table_psf.rad, name="theta")
geom = WcsGeom.create(
npix=(4, 2), proj="CAR", binsz=180, axes=[rad_axis, energy_axis]
)
coords = geom.get_coord()
# TODO: support broadcasting in .evaluate()
data = table_psf._interpolate((coords["energy"], coords["theta"])).to_value(
"sr-1"
)
psf_map = Map.from_geom(geom, data=data, unit="sr-1")
geom_exposure = geom.squash(axis="theta")
data = table_psf.exposure.reshape((-1, 1, 1, 1))
exposure_map = Map.from_geom(geom_exposure, unit="cm2 s")
exposure_map.quantity += data
return cls(psf_map=psf_map, exposure_map=exposure_map)
[docs] def cutout(self, position, width, mode="trim"):
"""Cutout map psf map.
Parameters
----------
position : `~astropy.coordinates.SkyCoord`
Center position of the cutout region.
width : tuple of `~astropy.coordinates.Angle`
Angular sizes of the region in (lon, lat) in that specific order.
If only one value is passed, a square region is extracted.
mode : {'trim', 'partial', 'strict'}
Mode option for Cutout2D, for details see `~astropy.nddata.utils.Cutout2D`.
Returns
-------
cutout : `PSFMap`
Cutout psf map.
"""
psf_map = self.psf_map.cutout(position, width, mode)
exposure_map = self.exposure_map.cutout(position, width, mode)
return self.__class__(psf_map=psf_map, exposure_map=exposure_map)