# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
from astropy.coordinates import SkyCoord
from gammapy.maps import Map
from gammapy.modeling.models.utils import cutout_template_models
from . import Datasets
__all__ = ["apply_edisp", "split_dataset"]
[docs]
def apply_edisp(input_map, edisp):
"""Apply energy dispersion to map. Requires "energy_true" axis.
Parameters
----------
input_map : `~gammapy.maps.Map`
The map to be convolved with the energy dispersion.
It must have an axis named "energy_true".
edisp : `~gammapy.irf.EDispKernel`
Energy dispersion matrix.
Returns
-------
map : `~gammapy.maps.Map`
Map with energy dispersion applied.
Examples
--------
>>> from gammapy.irf.edisp import EDispKernel
>>> from gammapy.datasets.utils import apply_edisp
>>> from gammapy.maps import MapAxis, Map
>>> import numpy as np
>>>
>>> axis = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=6, name="energy_true")
>>> m = Map.create(
... skydir=(0.8, 0.8),
... width=(1, 1),
... binsz=0.02,
... axes=[axis],
... frame="galactic"
... )
>>> e_true = m.geom.axes[0]
>>> e_reco = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=3)
>>> edisp = EDispKernel.from_diagonal_response(energy_axis_true=e_true, energy_axis=e_reco)
>>> map_edisp = apply_edisp(m, edisp)
>>> print(map_edisp)
WcsNDMap
<BLANKLINE>
geom : WcsGeom
axes : ['lon', 'lat', 'energy']
shape : (np.int64(50), np.int64(50), 3)
ndim : 3
unit :
dtype : float64
<BLANKLINE>
"""
# TODO: either use sparse matrix multiplication or something like edisp.is_diagonal
if edisp is not None:
loc = input_map.geom.axes.index("energy_true")
data = np.rollaxis(input_map.data, loc, len(input_map.data.shape))
data = np.dot(data, edisp.pdf_matrix)
data = np.rollaxis(data, -1, loc)
energy_axis = edisp.axes["energy"].copy(name="energy")
else:
data = input_map.data
energy_axis = input_map.geom.axes["energy_true"].copy(name="energy")
geom = input_map.geom.to_image().to_cube(axes=[energy_axis])
return Map.from_geom(geom=geom, data=data, unit=input_map.unit)
def get_figure(fig, width, height):
import matplotlib.pyplot as plt
if plt.get_fignums():
if not fig:
fig = plt.gcf()
fig.clf()
else:
fig = plt.figure(figsize=(width, height))
return fig
def get_axes(ax1, ax2, width, height, args1, args2, kwargs1=None, kwargs2=None):
if not ax1 and not ax2:
kwargs1 = kwargs1 or {}
kwargs2 = kwargs2 or {}
fig = get_figure(None, width, height)
ax1 = fig.add_subplot(*args1, **kwargs1)
ax2 = fig.add_subplot(*args2, **kwargs2)
elif not ax1 or not ax2:
raise ValueError("Either both or no Axes must be provided")
return ax1, ax2
def get_nearest_valid_exposure_position(exposure, position=None):
mask_exposure = exposure > 0.0 * exposure.unit
mask_exposure = mask_exposure.reduce_over_axes(func=np.logical_or)
if not position:
position = mask_exposure.geom.center_skydir
return mask_exposure.mask_nearest_position(position)
[docs]
def split_dataset(dataset, width, margin, split_template_models=True):
"""Split dataset in multiple non-overlapping analysis regions.
Parameters
----------
dataset : `~gammapy.datasets.Dataset`
Dataset to split.
width : `~astropy.coordinates.Angle`
Angular size of each sub-region.
margin : `~astropy.coordinates.Angle`
Angular size to be added to the `width`.
The margin should be defined such as sources outside the region of interest
that contributes inside are well-defined.
The mask_fit in the margin region is False and unchanged elsewhere.
split_template_models : bool, optional
Apply cutout to template models or not. Default is True.
Returns
-------
datasets : `~gammapy.datasets.Datasets`
Split datasets.
Examples
--------
>>> from gammapy.datasets import MapDataset
>>> from gammapy.datasets.utils import split_dataset
>>> from gammapy.modeling.models import GaussianSpatialModel, PowerLawSpectralModel, SkyModel
>>> import astropy.units as u
>>> dataset = MapDataset.read("$GAMMAPY_DATA/cta-1dc-gc/cta-1dc-gc.fits.gz")
>>> # Split the dataset
>>> width = 4 * u.deg
>>> margin = 1 * u.deg
>>> split_datasets = split_dataset(dataset, width, margin, split_template_models=False)
>>> # Apply a model and split the dataset
>>> spatial_model = GaussianSpatialModel()
>>> spectral_model = PowerLawSpectralModel()
>>> sky_model = SkyModel(
... spatial_model=spatial_model, spectral_model=spectral_model, name="test-model"
... )
>>> dataset.models = [sky_model]
>>> split_datasets = split_dataset(
... dataset, width=width, margin=margin, split_template_models=True
... )
"""
if margin >= width / 2.0:
raise ValueError("margin should be lower than width/2.")
geom = dataset.counts.geom.to_image()
pixel_width = np.ceil((width / geom.pixel_scales).to_value("")).astype(int)
ilon = range(0, geom.data_shape[1], pixel_width[1])
ilat = range(0, geom.data_shape[0], pixel_width[0])
datasets = Datasets()
for il in ilon:
for ib in ilat:
l, b = geom.pix_to_coord((il, ib))
cutout_kwargs = dict(
position=SkyCoord(l, b, frame=geom.frame), width=width + 2 * margin
)
d = dataset.cutout(**cutout_kwargs)
# apply mask
geom_cut = d.counts.geom
geom_cut_image = geom_cut.to_image()
ilgrid, ibgrid = np.meshgrid(
range(geom_cut_image.data_shape[1]), range(geom_cut_image.data_shape[0])
)
il_cut, ib_cut = geom_cut_image.coord_to_pix((l, b))
mask = (ilgrid >= il_cut - pixel_width[1] / 2.0) & (
ibgrid >= ib_cut - pixel_width[0] / 2.0
)
if il == ilon[-1]:
mask &= ilgrid <= il_cut + pixel_width[1] / 2.0
else:
mask &= ilgrid < il_cut + pixel_width[1] / 2.0
if ib == ilat[-1]:
mask &= ibgrid <= ib_cut + pixel_width[0] / 2.0
else:
mask &= ibgrid < ib_cut + pixel_width[0] / 2.0
mask = np.expand_dims(mask, 0)
mask = np.repeat(mask, geom_cut.data_shape[0], axis=0)
d.mask_fit = Map.from_geom(geom_cut, data=mask)
if dataset.mask_fit is not None:
d.mask_fit &= dataset.mask_fit.interp_to_geom(
geom_cut, method="nearest"
)
# template models cutout (should limit memory usage in parallel)
if split_template_models:
d.models = cutout_template_models(
dataset.models, cutout_kwargs, [d.name]
)
else:
d.models = dataset.models
datasets.append(d)
return datasets