# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Spatial models."""
import logging
import os
import numpy as np
import scipy.integrate
import scipy.special
from scipy.interpolate import griddata
import astropy.units as u
from astropy.coordinates import Angle, SkyCoord
from astropy.coordinates.angle_utilities import angular_separation, position_angle
from astropy.utils import lazyproperty
from regions import (
CircleAnnulusSkyRegion,
CircleSkyRegion,
EllipseSkyRegion,
PointSkyRegion,
RectangleSkyRegion,
)
import matplotlib.pyplot as plt
from gammapy.maps import Map, MapCoord, WcsGeom
from gammapy.modeling import Parameter, Parameters
from gammapy.utils.deprecation import deprecated
from gammapy.utils.gauss import Gauss2DPDF
from gammapy.utils.interpolation import interpolation_scale
from gammapy.utils.regions import region_circle_to_ellipse, region_to_frame
from gammapy.utils.scripts import make_path
from .core import ModelBase
__all__ = [
"ConstantFluxSpatialModel",
"ConstantSpatialModel",
"DiskSpatialModel",
"GaussianSpatialModel",
"GeneralizedGaussianSpatialModel",
"PointSpatialModel",
"Shell2SpatialModel",
"ShellSpatialModel",
"SpatialModel",
"TemplateSpatialModel",
"PiecewiseNormSpatialModel",
]
log = logging.getLogger(__name__)
MAX_OVERSAMPLING = 200
def compute_sigma_eff(lon_0, lat_0, lon, lat, phi, major_axis, e):
"""Effective radius, used for the evaluation of elongated models"""
phi_0 = position_angle(lon_0, lat_0, lon, lat)
d_phi = phi - phi_0
minor_axis = Angle(major_axis * np.sqrt(1 - e**2))
a2 = (major_axis * np.sin(d_phi)) ** 2
b2 = (minor_axis * np.cos(d_phi)) ** 2
denominator = np.sqrt(a2 + b2)
sigma_eff = major_axis * minor_axis / denominator
return minor_axis, sigma_eff
[docs]class SpatialModel(ModelBase):
"""Spatial model base class."""
_type = "spatial"
def __init__(self, **kwargs):
frame = kwargs.pop("frame", "icrs")
super().__init__(**kwargs)
if not hasattr(self, "frame"):
self.frame = frame
[docs] def __call__(self, lon, lat, energy=None):
"""Call evaluate method"""
kwargs = {par.name: par.quantity for par in self.parameters}
if energy is None and self.is_energy_dependent:
raise ValueError("Missing energy value for evaluation")
if energy is not None:
kwargs["energy"] = energy
return self.evaluate(lon, lat, **kwargs)
@property
def evaluation_bin_size_min(self):
return None
# TODO: make this a hard-coded class attribute?
@lazyproperty
def is_energy_dependent(self):
varnames = self.evaluate.__code__.co_varnames
return "energy" in varnames
@property
def position(self):
"""Spatial model center position (`~astropy.coordinates.SkyCoord`)"""
lon = self.lon_0.quantity
lat = self.lat_0.quantity
return SkyCoord(lon, lat, frame=self.frame)
@position.setter
def position(self, skycoord):
"""Spatial model center position"""
coord = skycoord.transform_to(self.frame)
self.lon_0.quantity = coord.data.lon
self.lat_0.quantity = coord.data.lat
@property
def position_lonlat(self):
"""Spatial model center position `(lon, lat)` in rad and frame of the model"""
lon = self.lon_0.quantity.to_value(u.rad)
lat = self.lat_0.quantity.to_value(u.rad)
return lon, lat
# TODO: get rid of this!
_phi_0 = 0.0
@property
def phi_0(self):
return self._phi_0
@phi_0.setter
def phi_0(self, phi_0=0.0):
self._phi_0 = phi_0
@property
def position_error(self):
"""Get 95% containment position error as (`~regions.EllipseSkyRegion`)"""
if self.covariance is None:
raise ValueError("No position error information available.")
pars = self.parameters
sub_covar = self.covariance.get_subcovariance(["lon_0", "lat_0"]).data.copy()
cos_lat = np.cos(self.lat_0.quantity.to_value("rad"))
sub_covar[0, 0] *= cos_lat**2.0
sub_covar[0, 1] *= cos_lat
sub_covar[1, 0] *= cos_lat
eig_vals, eig_vecs = np.linalg.eig(sub_covar)
lon_err, lat_err = np.sqrt(eig_vals)
y_vec = eig_vecs[:, 0]
phi = (np.arctan2(y_vec[1], y_vec[0]) * u.rad).to("deg") + self.phi_0
err = np.sort([lon_err, lat_err])
scale_r95 = Gauss2DPDF(sigma=1).containment_radius(0.95)
err *= scale_r95
if err[1] == lon_err * scale_r95:
phi += 90 * u.deg
height = 2 * err[1] * pars["lon_0"].unit
width = 2 * err[0] * pars["lat_0"].unit
else:
height = 2 * err[1] * pars["lat_0"].unit
width = 2 * err[0] * pars["lon_0"].unit
return EllipseSkyRegion(
center=self.position, height=height, width=width, angle=phi
)
[docs] def evaluate_geom(self, geom):
"""Evaluate model on `~gammapy.maps.Geom`
Parameters
----------
geom : `~gammapy.maps.WcsGeom`
Map geometry
Returns
-------
map : `~gammapy.maps.Map`
Map containing the value in each spatial bin.
"""
coords = geom.get_coord(frame=self.frame, sparse=True)
if self.is_energy_dependent:
return self(coords.lon, coords.lat, energy=coords["energy_true"])
else:
return self(coords.lon, coords.lat)
[docs] def integrate_geom(self, geom, oversampling_factor=None):
"""Integrate model on `~gammapy.maps.Geom` or `~gammapy.maps.RegionGeom`.
Integration is performed by simple rectangle approximation, the pixel center model value
is multiplied by the pixel solid angle.
An oversampling factor can be used for precision. By default, this parameter is set to None
and an oversampling factor is automatically estimated based on the model estimation maximal
bin width.
For a RegionGeom, the model is integrated on a tangent WCS projection in the region.
Parameters
----------
geom : `~gammapy.maps.WcsGeom` or `~gammapy.maps.RegionGeom`
The geom on which the integration is performed
oversampling_factor : int or None
The oversampling factor to use for integration.
Default is None: the factor is estimated from the model minimimal bin size
Returns
-------
map : `~gammapy.maps.Map` or `gammapy.maps.RegionNDMap`
Map containing the integral value in each spatial bin.
"""
wcs_geom = geom
mask = None
if geom.is_region:
wcs_geom = geom.to_wcs_geom().to_image()
result = Map.from_geom(geom=wcs_geom)
pix_scale = np.max(wcs_geom.pixel_scales.to_value("deg"))
if oversampling_factor is None:
if self.evaluation_bin_size_min is not None:
res_scale = self.evaluation_bin_size_min.to_value("deg")
if res_scale > 0:
oversampling_factor = np.minimum(
int(np.ceil(pix_scale / res_scale)), MAX_OVERSAMPLING
)
else:
oversampling_factor = MAX_OVERSAMPLING
else:
oversampling_factor = 1
if oversampling_factor > 1:
if self.evaluation_radius is not None:
# Is it still needed?
width = 2 * np.maximum(
self.evaluation_radius.to_value("deg"), pix_scale
)
wcs_geom = wcs_geom.cutout(self.position, width)
upsampled_geom = wcs_geom.upsample(oversampling_factor, axis_name=None)
# assume the upsampled solid angles are approximately factor**2 smaller
values = self.evaluate_geom(upsampled_geom) / oversampling_factor**2
upsampled = Map.from_geom(upsampled_geom, unit=values.unit)
upsampled += values
if geom.is_region:
mask = geom.contains(upsampled_geom.get_coord()).astype("int")
integrated = upsampled.downsample(
oversampling_factor, preserve_counts=True, weights=mask
)
# Finally stack result
result._unit = integrated.unit
result.stack(integrated)
else:
values = self.evaluate_geom(wcs_geom)
result._unit = values.unit
result += values
result *= result.geom.solid_angle()
if geom.is_region:
mask = result.geom.region_mask([geom.region])
result = Map.from_geom(
geom, data=np.sum(result.data[mask]), unit=result.unit
)
return result
[docs] def to_dict(self, full_output=False):
"""Create dict for YAML serilisation"""
data = super().to_dict(full_output)
data["spatial"]["frame"] = self.frame
data["spatial"]["parameters"] = data["spatial"].pop("parameters")
return data
def _get_plot_map(self, geom):
if self.evaluation_radius is None and geom is None:
raise ValueError(
f"{self.__class__.__name__} requires geom to be defined for plotting."
)
if geom is None:
width = 2 * max(self.evaluation_radius, 0.1 * u.deg)
geom = WcsGeom.create(
skydir=self.position, frame=self.frame, width=width, binsz=0.02
)
data = self.evaluate_geom(geom)
return Map.from_geom(geom, data=data.value, unit=data.unit)
[docs] def plot(self, ax=None, geom=None, **kwargs):
"""Plot spatial model.
Parameters
----------
ax : `~matplotlib.axes.Axes`, optional
Axis
geom : `~gammapy.maps.WcsGeom`, optional
Geom to use for plotting.
**kwargs : dict
Keyword arguments passed to `~gammapy.maps.WcsMap.plot()`
Returns
-------
ax : `~matplotlib.axes.Axes`, optional
Axis
"""
m = self._get_plot_map(geom)
if not m.geom.is_flat:
raise TypeError(
"Use .plot_interactive() or .plot_grid() for Map dimension > 2"
)
return m.plot(ax=ax, **kwargs)
[docs] @deprecated("v1.0.1", alternative="plot_interactive")
def plot_interative(self, ax=None, geom=None, **kwargs):
"""Plot spatial model.
Parameters
----------
ax : `~matplotlib.axes.Axes`, optional
Axis
geom : `~gammapy.maps.WcsGeom`, optional
Geom to use for plotting.
**kwargs : dict
Keyword arguments passed to `~gammapy.maps.WcsMap.plot()`
Returns
-------
ax : `~matplotlib.axes.Axes`, optional
Axis
"""
m = self._get_plot_map(geom)
if m.geom.is_image:
raise TypeError("Use .plot() for 2D Maps")
m.plot_interactive(ax=ax, **kwargs)
[docs] def plot_interactive(self, ax=None, geom=None, **kwargs):
"""Plot spatial model.
Parameters
----------
ax : `~matplotlib.axes.Axes`, optional
Axis
geom : `~gammapy.maps.WcsGeom`, optional
Geom to use for plotting.
**kwargs : dict
Keyword arguments passed to `~gammapy.maps.WcsMap.plot()`
Returns
-------
ax : `~matplotlib.axes.Axes`, optional
Axis
"""
m = self._get_plot_map(geom)
if m.geom.is_image:
raise TypeError("Use .plot() for 2D Maps")
m.plot_interactive(ax=ax, **kwargs)
[docs] def plot_error(self, ax=None, **kwargs):
"""Plot position error
Parameters
----------
ax : `~matplotlib.axes.Axes`, optional
Axis
**kwargs : dict
Keyword arguments passed to `~gammapy.maps.WcsMap.plot()`
Returns
-------
ax : `~matplotlib.axes.Axes`, optional
Axis
"""
# plot center position
lon, lat = self.lon_0.value, self.lat_0.value
ax = plt.gca() if ax is None else ax
kwargs.setdefault("marker", "x")
kwargs.setdefault("color", "red")
kwargs.setdefault("label", "position")
ax.scatter(lon, lat, transform=ax.get_transform(self.frame), **kwargs)
# plot position error
if not np.all(self.covariance.data == 0):
region = self.position_error.to_pixel(ax.wcs)
artist = region.as_artist(facecolor="none", edgecolor=kwargs["color"])
ax.add_artist(artist)
return ax
[docs] def plot_grid(self, geom=None, **kwargs):
"""Plot spatial model energy slices in a grid.
Parameters
----------
geom : `~gammapy.maps.WcsGeom`, optional
Geom to use for plotting.
**kwargs : dict
Keyword arguments passed to `~gammapy.maps.WcsMap.plot()`
Returns
-------
ax : `~matplotlib.axes.Axes`, optional
Axis
"""
if (geom is None) or geom.is_image:
raise TypeError("Use .plot() for 2D Maps")
m = self._get_plot_map(geom)
m.plot_grid(**kwargs)
[docs] @classmethod
def from_position(cls, position, **kwargs):
"""Define the position of the model using a sky coord
The model will be created in the frame of the sky coord
Parameters
----------
position : `~astropy.coordinates.SkyCoord`
Position
Returns
-------
model : `SpatialModel`
Spatial model
"""
lon_0, lat_0 = position.data.lon, position.data.lat
return cls(lon_0=lon_0, lat_0=lat_0, frame=position.frame.name, **kwargs)
@property
def evaluation_radius(self):
"""Evaluation radius"""
return None
@property
def evaluation_region(self):
"""Evaluation region"""
if hasattr(self, "to_region"):
return self.to_region()
elif self.evaluation_radius is not None:
return CircleSkyRegion(
center=self.position,
radius=self.evaluation_radius,
)
else:
return None
[docs]class PointSpatialModel(SpatialModel):
r"""Point Source.
For more information see :ref:`point-spatial-model`.
Parameters
----------
lon_0, lat_0 : `~astropy.coordinates.Angle`
Center position
frame : {"icrs", "galactic"}
Center position coordinate frame
"""
tag = ["PointSpatialModel", "point"]
lon_0 = Parameter("lon_0", "0 deg")
lat_0 = Parameter("lat_0", "0 deg", min=-90, max=90)
is_energy_dependent = False
@property
def evaluation_bin_size_min(self):
"""Minimal evaluation bin size (`~astropy.coordinates.Angle`)."""
return 0 * u.deg
@property
def evaluation_radius(self):
"""Evaluation radius (`~astropy.coordinates.Angle`).
Set as zero degrees.
"""
return 0 * u.deg
@staticmethod
def _grid_weights(x, y, x0, y0):
"""Compute 4-pixel weights such that centroid is preserved."""
dx = np.abs(x - x0)
dx = np.where(dx < 1, 1 - dx, 0)
dy = np.abs(y - y0)
dy = np.where(dy < 1, 1 - dy, 0)
return dx * dy
[docs] def is_energy_dependent(self):
return False
[docs] def evaluate_geom(self, geom):
"""Evaluate model on `~gammapy.maps.Geom`."""
values = self.integrate_geom(geom).data
return values / geom.solid_angle()
[docs] def integrate_geom(self, geom, oversampling_factor=None):
"""Integrate model on `~gammapy.maps.Geom`
Parameters
----------
geom : `Geom`
Map geometry
Returns
-------
flux : `Map`
Predicted flux map
"""
geom_image = geom.to_image()
if geom.is_hpx:
idx, weights = geom_image.interp_weights({"skycoord": self.position})
data = np.zeros(geom_image.data_shape)
data[tuple(idx)] = weights
else:
x, y = geom_image.get_pix()
x0, y0 = self.position.to_pixel(geom.wcs)
data = self._grid_weights(x, y, x0, y0)
return Map.from_geom(geom=geom_image, data=data, unit="")
[docs] def to_region(self, **kwargs):
"""Model outline (`~regions.PointSkyRegion`)."""
return PointSkyRegion(center=self.position, **kwargs)
[docs]class GaussianSpatialModel(SpatialModel):
r"""Two-dimensional Gaussian model.
For more information see :ref:`gaussian-spatial-model`.
Parameters
----------
lon_0, lat_0 : `~astropy.coordinates.Angle`
Center position
sigma : `~astropy.coordinates.Angle`
Length of the major semiaxis of the Gaussian, in angular units.
e : `float`
Eccentricity of the Gaussian (:math:`0< e< 1`).
phi : `~astropy.coordinates.Angle`
Rotation angle :math:`\phi`: of the major semiaxis.
Increases counter-clockwise from the North direction.
frame : {"icrs", "galactic"}
Center position coordinate frame
"""
tag = ["GaussianSpatialModel", "gauss"]
lon_0 = Parameter("lon_0", "0 deg")
lat_0 = Parameter("lat_0", "0 deg", min=-90, max=90)
sigma = Parameter("sigma", "1 deg", min=0)
e = Parameter("e", 0, min=0, max=1, frozen=True)
phi = Parameter("phi", "0 deg", frozen=True)
@property
def evaluation_bin_size_min(self):
"""Minimal evaluation bin size (`~astropy.coordinates.Angle`) chosen as sigma/3."""
return self.parameters["sigma"].quantity / 3.0
@property
def evaluation_radius(self):
r"""Evaluation radius (`~astropy.coordinates.Angle`).
Set as :math:`5\sigma`.
"""
return 5 * self.parameters["sigma"].quantity
[docs] @staticmethod
def evaluate(lon, lat, lon_0, lat_0, sigma, e, phi):
"""Evaluate model."""
sep = angular_separation(lon, lat, lon_0, lat_0)
if e == 0:
a = 1.0 - np.cos(sigma)
norm = (1 / (4 * np.pi * a * (1.0 - np.exp(-1.0 / a)))).value
else:
minor_axis, sigma_eff = compute_sigma_eff(
lon_0, lat_0, lon, lat, phi, sigma, e
)
a = 1.0 - np.cos(sigma_eff)
norm = (1 / (2 * np.pi * sigma * minor_axis)).to_value("sr-1")
exponent = -0.5 * ((1 - np.cos(sep)) / a)
return u.Quantity(norm * np.exp(exponent).value, "sr-1", copy=False)
[docs] def to_region(self, x_sigma=1.5, **kwargs):
r"""Model outline at a given number of :math:`\sigma`.
Parameters
----------
x_sigma : float
Number of :math:`\sigma
Default is :math:`1.5\sigma` which corresponds to about 68%
containment for a 2D symmetric Gaussian.
Returns
-------
region : `~regions.EllipseSkyRegion`
Model outline.
"""
minor_axis = Angle(self.sigma.quantity * np.sqrt(1 - self.e.quantity**2))
return EllipseSkyRegion(
center=self.position,
height=2 * x_sigma * self.sigma.quantity,
width=2 * x_sigma * minor_axis,
angle=self.phi.quantity,
**kwargs,
)
@property
def evaluation_region(self):
"""Evaluation region consistent with evaluation radius"""
return self.to_region(x_sigma=5)
[docs]class GeneralizedGaussianSpatialModel(SpatialModel):
r"""Two-dimensional Generealized Gaussian model.
For more information see :ref:`generalized-gaussian-spatial-model`.
Parameters
----------
lon_0, lat_0 : `~astropy.coordinates.Angle`
Center position
r_0 : `~astropy.coordinates.Angle`
Length of the major semiaxis, in angular units.
eta : `float`
Shape parameter whitin (0, 1]. Special cases for disk: ->0, Gaussian: 0.5, Laplace:1
e : `float`
Eccentricity (:math:`0< e< 1`).
phi : `~astropy.coordinates.Angle`
Rotation angle :math:`\phi`: of the major semiaxis.
Increases counter-clockwise from the North direction.
frame : {"icrs", "galactic"}
Center position coordinate frame
"""
tag = ["GeneralizedGaussianSpatialModel", "gauss-general"]
lon_0 = Parameter("lon_0", "0 deg")
lat_0 = Parameter("lat_0", "0 deg", min=-90, max=90)
r_0 = Parameter("r_0", "1 deg")
eta = Parameter("eta", 0.5, min=0.01, max=1.0)
e = Parameter("e", 0.0, min=0.0, max=1.0, frozen=True)
phi = Parameter("phi", "0 deg", frozen=True)
[docs] @staticmethod
def evaluate(lon, lat, lon_0, lat_0, r_0, eta, e, phi):
sep = angular_separation(lon, lat, lon_0, lat_0)
if isinstance(eta, u.Quantity):
eta = eta.value # gamma function does not allow quantities
minor_axis, r_eff = compute_sigma_eff(lon_0, lat_0, lon, lat, phi, r_0, e)
z = sep / r_eff
norm = 1 / (2 * np.pi * minor_axis * r_0 * eta * scipy.special.gamma(2 * eta))
return (norm * np.exp(-(z ** (1 / eta)))).to("sr-1")
@property
def evaluation_bin_size_min(self):
"""Minimal evaluation bin size (`~astropy.coordinates.Angle`).
The bin min size is defined as r_0/(3+8*eta)/(e+1).
"""
return self.r_0.quantity / (3 + 8 * self.eta.value) / (self.e.value + 1)
@property
def evaluation_radius(self):
r"""Evaluation radius (`~astropy.coordinates.Angle`).
The evaluation radius is defined as r_eval = r_0*(1+8*eta) so it verifies:
r_eval -> r_0 if eta -> 0
r_eval = 5*r_0 > 5*sigma_gauss = 5*r_0/sqrt(2) ~ 3.5*r_0 if eta=0.5
r_eval = 9*r_0 > 5*sigma_laplace = 5*sqrt(2)*r_0 ~ 7*r_0 if eta = 1
r_eval -> inf if eta -> inf
"""
return self.r_0.quantity * (1 + 8 * self.eta.value)
[docs] def to_region(self, x_r_0=1, **kwargs):
"""Model outline at a given number of r_0.
Parameters
----------
x_r_0 : float
Number of r_0 (Default is 1).
Returns
-------
region : `~regions.EllipseSkyRegion`
Model outline.
"""
minor_axis = Angle(self.r_0.quantity * np.sqrt(1 - self.e.quantity**2))
return EllipseSkyRegion(
center=self.position,
height=2 * x_r_0 * self.r_0.quantity,
width=2 * x_r_0 * minor_axis,
angle=self.phi.quantity,
**kwargs,
)
@property
def evaluation_region(self):
"""Evaluation region consistent with evaluation radius"""
scale = self.evaluation_radius / self.r_0.quantity
return self.to_region(x_r_0=scale)
[docs]class DiskSpatialModel(SpatialModel):
r"""Constant disk model.
For more information see :ref:`disk-spatial-model`.
Parameters
----------
lon_0, lat_0 : `~astropy.coordinates.Angle`
Center position
r_0 : `~astropy.coordinates.Angle`
:math:`a`: length of the major semiaxis, in angular units.
e : `float`
Eccentricity of the ellipse (:math:`0< e< 1`).
phi : `~astropy.coordinates.Angle`
Rotation angle :math:`\phi`: of the major semiaxis.
Increases counter-clockwise from the North direction.
edge_width : float
Width of the edge. The width is defined as the range within which
the smooth edge of the model drops from 95% to 5% of its amplitude.
It is given as fraction of r_0.
frame : {"icrs", "galactic"}
Center position coordinate frame
"""
tag = ["DiskSpatialModel", "disk"]
lon_0 = Parameter("lon_0", "0 deg")
lat_0 = Parameter("lat_0", "0 deg", min=-90, max=90)
r_0 = Parameter("r_0", "1 deg", min=0)
e = Parameter("e", 0, min=0, max=1, frozen=True)
phi = Parameter("phi", "0 deg", frozen=True)
edge_width = Parameter("edge_width", value=0.01, min=0, max=1, frozen=True)
@property
def evaluation_bin_size_min(self):
"""Minimal evaluation bin size (`~astropy.coordinates.Angle`).
The bin min size is defined as r_0*(1-edge_width)/10.
"""
return self.r_0.quantity * (1 - self.edge_width.quantity) / 10.0
@property
def evaluation_radius(self):
"""Evaluation radius (`~astropy.coordinates.Angle`).
Set to the length of the semi-major axis plus the edge width.
"""
return 1.1 * self.r_0.quantity * (1 + self.edge_width.quantity)
@staticmethod
def _evaluate_norm_factor(r_0, e):
"""Compute the normalization factor."""
semi_minor = r_0 * np.sqrt(1 - e**2)
def integral_fcn(x, a, b):
A = 1 / np.sin(a) ** 2
B = 1 / np.sin(b) ** 2
C = A - B
cs2 = np.cos(x) ** 2
return 1 - np.sqrt(1 - 1 / (B + C * cs2))
return (
2
* scipy.integrate.quad(
lambda x: integral_fcn(x, r_0, semi_minor), 0, np.pi
)[0]
) ** -1
@staticmethod
def _evaluate_smooth_edge(x, width):
value = (x / width).to_value("")
edge_width_95 = 2.326174307353347
return 0.5 * (1 - scipy.special.erf(value * edge_width_95))
[docs] @staticmethod
def evaluate(lon, lat, lon_0, lat_0, r_0, e, phi, edge_width):
"""Evaluate model."""
sep = angular_separation(lon, lat, lon_0, lat_0)
if e == 0:
sigma_eff = r_0
else:
sigma_eff = compute_sigma_eff(lon_0, lat_0, lon, lat, phi, r_0, e)[1]
norm = DiskSpatialModel._evaluate_norm_factor(r_0, e)
in_ellipse = DiskSpatialModel._evaluate_smooth_edge(
sep - sigma_eff, sigma_eff * edge_width
)
return u.Quantity(norm * in_ellipse, "sr-1", copy=False)
[docs] def to_region(self, **kwargs):
"""Model outline (`~regions.EllipseSkyRegion`)."""
minor_axis = Angle(self.r_0.quantity * np.sqrt(1 - self.e.quantity**2))
return EllipseSkyRegion(
center=self.position,
height=2 * self.r_0.quantity,
width=2 * minor_axis,
angle=self.phi.quantity,
**kwargs,
)
[docs] @classmethod
def from_region(cls, region, **kwargs):
"""Create a `DiskSpatialModel from a ~regions.EllipseSkyRegion`
Parameters
----------
region : `~regions.EllipseSkyRegion` or ~regions.CircleSkyRegion`
region to create model from
kwargs : keywords passed to `~gammapy.modeling.models.DiskSpatialModel`
Returns
-------
spatial_model : `~gammapy.modeling.models.DiskSpatialModel`
"""
if isinstance(region, CircleSkyRegion):
region = region_circle_to_ellipse(region)
if not isinstance(region, EllipseSkyRegion):
raise ValueError(
f"Please provide a `CircleSkyRegion` "
f"or `EllipseSkyRegion`, got {type(region)} instead."
)
frame = kwargs.pop("frame", region.center.frame)
region = region_to_frame(region, frame=frame)
if region.height > region.width:
major_axis, minor_axis = region.height, region.width
phi = region.angle
else:
minor_axis, major_axis = region.height, region.width
phi = 90 * u.deg + region.angle
kwargs.setdefault("phi", phi)
kwargs.setdefault("e", np.sqrt(1.0 - np.power(minor_axis / major_axis, 2)))
kwargs.setdefault("r_0", major_axis / 2.0)
return cls.from_position(region.center, **kwargs)
[docs]class ShellSpatialModel(SpatialModel):
r"""Shell model.
For more information see :ref:`shell-spatial-model`.
Parameters
----------
lon_0, lat_0 : `~astropy.coordinates.Angle`
Center position
radius : `~astropy.coordinates.Angle`
Inner radius, :math:`r_{in}`
width : `~astropy.coordinates.Angle`
Shell width
frame : {"icrs", "galactic"}
Center position coordinate frame
See Also
--------
Shell2SpatialModel
"""
tag = ["ShellSpatialModel", "shell"]
lon_0 = Parameter("lon_0", "0 deg")
lat_0 = Parameter("lat_0", "0 deg", min=-90, max=90)
radius = Parameter("radius", "1 deg")
width = Parameter("width", "0.2 deg")
@property
def evaluation_bin_size_min(self):
"""Minimal evaluation bin size (`~astropy.coordinates.Angle`).
The bin min size is defined as the shell width.
"""
return self.width.quantity
@property
def evaluation_radius(self):
r"""Evaluation radius (`~astropy.coordinates.Angle`).
Set to :math:`r_\text{out}`.
"""
return self.radius.quantity + self.width.quantity
[docs] @staticmethod
def evaluate(lon, lat, lon_0, lat_0, radius, width):
"""Evaluate model."""
sep = angular_separation(lon, lat, lon_0, lat_0)
radius_out = radius + width
norm = 3 / (2 * np.pi * (radius_out**3 - radius**3))
with np.errstate(invalid="ignore"):
# np.where and np.select do not work with quantities, so we use the
# workaround with indexing
value = np.sqrt(radius_out**2 - sep**2)
mask = sep < radius
value[mask] = (value - np.sqrt(radius**2 - sep**2))[mask]
value[sep > radius_out] = 0
return norm * value
[docs] def to_region(self, **kwargs):
"""Model outline (`~regions.CircleAnnulusSkyRegion`)."""
return CircleAnnulusSkyRegion(
center=self.position,
inner_radius=self.radius.quantity,
outer_radius=self.radius.quantity + self.width.quantity,
**kwargs,
)
[docs]class Shell2SpatialModel(SpatialModel):
r"""Shell model with outer radius and relative width parametrization
For more information see :ref:`shell2-spatial-model`.
Parameters
----------
lon_0, lat_0 : `~astropy.coordinates.Angle`
Center position
r_0 : `~astropy.coordinates.Angle`
Outer radius, :math:`r_{out}`
eta : float
Shell width relative to outer radius, r_0, should be within (0,1]
frame : {"icrs", "galactic"}
Center position coordinate frame
See Also
--------
ShellSpatialModel
"""
tag = ["Shell2SpatialModel", "shell2"]
lon_0 = Parameter("lon_0", "0 deg")
lat_0 = Parameter("lat_0", "0 deg", min=-90, max=90)
r_0 = Parameter("r_0", "1 deg")
eta = Parameter("eta", 0.2, min=0.02, max=1)
@property
def evaluation_bin_size_min(self):
"""Minimal evaluation bin size (`~astropy.coordinates.Angle`).
The bin min size is defined as r_0*eta.
"""
return self.eta.value * self.r_0.quantity
@property
def evaluation_radius(self):
r"""Evaluation radius (`~astropy.coordinates.Angle`).
Set to :math:`r_\text{out}`.
"""
return self.r_0.quantity
@property
def r_in(self):
return (1 - self.eta.quantity) * self.r_0.quantity
[docs] @staticmethod
def evaluate(lon, lat, lon_0, lat_0, r_0, eta):
"""Evaluate model."""
sep = angular_separation(lon, lat, lon_0, lat_0)
r_in = (1 - eta) * r_0
norm = 3 / (2 * np.pi * (r_0**3 - r_in**3))
with np.errstate(invalid="ignore"):
# np.where and np.select do not work with quantities, so we use the
# workaround with indexing
value = np.sqrt(r_0**2 - sep**2)
mask = sep < r_in
value[mask] = (value - np.sqrt(r_in**2 - sep**2))[mask]
value[sep > r_0] = 0
return norm * value
[docs] def to_region(self, **kwargs):
"""Model outline (`~regions.CircleAnnulusSkyRegion`)."""
return CircleAnnulusSkyRegion(
center=self.position,
inner_radius=self.r_in,
outer_radius=self.r_0.quantity,
**kwargs,
)
[docs]class ConstantSpatialModel(SpatialModel):
"""Spatially constant (isotropic) spatial model.
For more information see :ref:`constant-spatial-model`.
Parameters
----------
value : `~astropy.units.Quantity`
Value
"""
tag = ["ConstantSpatialModel", "const"]
value = Parameter("value", "1 sr-1", frozen=True)
frame = "icrs"
evaluation_radius = None
position = None
[docs] def to_dict(self, full_output=False):
"""Create dict for YAML serilisation"""
# redefined to ignore frame attribute from parent class
data = super().to_dict(full_output)
data["spatial"].pop("frame")
data["spatial"]["parameters"] = []
return data
[docs] @staticmethod
def evaluate(lon, lat, value):
"""Evaluate model."""
return value
[docs] def to_region(self, **kwargs):
"""Model outline (`~regions.RectangleSkyRegion`)."""
return RectangleSkyRegion(
center=SkyCoord(0 * u.deg, 0 * u.deg, frame=self.frame),
height=180 * u.deg,
width=360 * u.deg,
**kwargs,
)
[docs]class ConstantFluxSpatialModel(SpatialModel):
"""Spatially constant flux spatial model.
For more information see :ref:`constant-spatial-model`.
"""
tag = ["ConstantFluxSpatialModel", "const-flux"]
frame = "icrs"
evaluation_radius = None
position = None
[docs] def to_dict(self, full_output=False):
"""Create dict for YAML serilisation"""
# redefined to ignore frame attribute from parent class
data = super().to_dict(full_output)
data["spatial"].pop("frame")
return data
[docs] @staticmethod
def evaluate(lon, lat):
"""Evaluate model."""
return 1 / u.sr
[docs] @staticmethod
def evaluate_geom(geom):
"""Evaluate model."""
return 1 / geom.solid_angle()
[docs] @staticmethod
def integrate_geom(geom, oversampling_factor=None):
"""Evaluate model."""
return Map.from_geom(geom=geom, data=1)
[docs] def to_region(self, **kwargs):
"""Model outline (`~regions.RectangleSkyRegion`)."""
return RectangleSkyRegion(
center=SkyCoord(0 * u.deg, 0 * u.deg, frame=self.frame),
height=180 * u.deg,
width=360 * u.deg,
**kwargs,
)
[docs]class TemplateSpatialModel(SpatialModel):
"""Spatial sky map template model.
For more information see :ref:`template-spatial-model`.
Parameters
----------
map : `~gammapy.maps.Map`
Map template.
meta : dict, optional
Meta information, meta['filename'] will be used for serialization
normalize : bool
Normalize the input map so that it integrates to unity.
interp_kwargs : dict
Interpolation keyword arguments passed to `gammapy.maps.Map.interp_by_coord`.
Default arguments are {'method': 'linear', 'fill_value': 0, "values_scale": "log"}.
Filename : str
Name of the map file
copy_data : bool
Create a deepcopy of the map data or directly use the original. True by
default, can be turned to False to save memory in case of large maps.
**kwargs : dict
Keyword arguments forwarded to `SpatialModel.__init__`.
"""
tag = ["TemplateSpatialModel", "template"]
def __init__(
self,
map,
meta=None,
normalize=True,
interp_kwargs=None,
filename=None,
copy_data=True,
**kwargs,
):
if (map.data < 0).any():
log.warning("Map has negative values. Check and fix this!")
if filename is not None:
filename = str(make_path(filename))
self.normalize = normalize
if normalize:
# Normalize the diffuse map model so that it integrates to unity
if map.geom.is_image:
data_sum = map.data.sum()
else:
# Normalize in each energy bin
data_sum = map.data.sum(axis=(1, 2)).reshape((-1, 1, 1))
data = map.data / data_sum
data /= map.geom.solid_angle().to_value("sr")
map = map.copy(data=data, unit="sr-1")
if map.unit.is_equivalent(""):
map = map.copy(data=map.data, unit="sr-1")
log.warning("Missing spatial template unit, assuming sr^-1")
if copy_data:
self._map = map.copy()
else:
self._map = map.copy(data=map.data)
self.meta = {} if meta is None else meta
interp_kwargs = {} if interp_kwargs is None else interp_kwargs
interp_kwargs.setdefault("method", "linear")
interp_kwargs.setdefault("fill_value", 0)
interp_kwargs.setdefault("values_scale", "log")
self._interp_kwargs = interp_kwargs
self.filename = filename
super().__init__(**kwargs)
[docs] def copy(self, copy_data=False, **kwargs):
"""Copy model
Parameters
----------
copy_data : bool
Whether to copy the data.
**kwargs : dict
Keyword arguments forwarded to `TemplateSpatialModel`
Returns
-------
model : `TemplateSpatialModel`
Copied template spatial model.
"""
kwargs.setdefault("map", self.map)
kwargs.setdefault("meta", self.meta.copy())
kwargs.setdefault("normalize", self.normalize)
kwargs.setdefault("interp_kwargs", self._interp_kwargs)
kwargs.setdefault("filename", self.filename)
return self.__class__(copy_data=copy_data, **kwargs)
@property
def map(self):
"""Template map (`~gammapy.maps.Map`)"""
return self._map
@property
def is_energy_dependent(self):
return "energy_true" in self.map.geom.axes.names
@property
def evaluation_radius(self):
"""Evaluation radius (`~astropy.coordinates.Angle`).
Set to half of the maximal dimension of the map.
"""
return np.max(self.map.geom.width) / 2.0
[docs] @classmethod
def read(cls, filename, normalize=True, **kwargs):
"""Read spatial template model from FITS image.
If unit is not given in the FITS header the default is ``sr-1``.
Parameters
----------
filename : str
FITS image filename.
normalize : bool
Normalize the input map so that it integrates to unity.
kwargs : dict
Keyword arguments passed to `Map.read()`.
"""
m = Map.read(filename, **kwargs)
return cls(m, normalize=normalize, filename=filename)
[docs] def evaluate(self, lon, lat, energy=None):
"""Evaluate the model at given coordinates.
Note that, if the map data assume negative values, these are
clipped to zero.
"""
coord = {
"lon": lon.to_value("deg"),
"lat": lat.to_value("deg"),
}
if energy is not None:
coord["energy_true"] = energy
val = self.map.interp_by_coord(coord, **self._interp_kwargs)
val = np.clip(val, 0, a_max=None)
return u.Quantity(val, self.map.unit, copy=False)
@property
def position(self):
"""`~astropy.coordinates.SkyCoord`"""
return self.map.geom.center_skydir
@property
def position_lonlat(self):
"""Spatial model center position `(lon, lat)` in rad and frame of the model"""
lon = self.position.data.lon.rad
lat = self.position.data.lat.rad
return lon, lat
@property
def frame(self):
return self.position.frame.name
[docs] @classmethod
def from_dict(cls, data):
data = data["spatial"]
filename = data["filename"]
normalize = data.get("normalize", True)
m = Map.read(filename)
return cls(m, normalize=normalize, filename=filename)
[docs] def to_dict(self, full_output=False):
"""Create dict for YAML serilisation"""
data = super().to_dict(full_output)
data["spatial"]["filename"] = self.filename
data["spatial"]["normalize"] = self.normalize
data["spatial"]["unit"] = str(self.map.unit)
return data
[docs] def write(self, overwrite=False):
if self.filename is None:
raise IOError("Missing filename")
elif os.path.isfile(make_path(self.filename)) and not overwrite:
log.warning("Template file already exits, and overwrite is False")
else:
self.map.write(self.filename, overwrite=overwrite)
[docs] def to_region(self, **kwargs):
"""Model outline from template map boundary (`~regions.RectangleSkyRegion`)."""
return RectangleSkyRegion(
center=self.map.geom.center_skydir,
width=self.map.geom.width[0][0],
height=self.map.geom.width[1][0],
**kwargs,
)
[docs] def plot(self, ax=None, geom=None, **kwargs):
if geom is None:
geom = self.map.geom
super().plot(ax=ax, geom=geom, **kwargs)
[docs] @deprecated("v1.0.1", alternative="plot_interactive")
def plot_interative(self, ax=None, geom=None, **kwargs):
if geom is None:
geom = self.map.geom
super().plot_interactive(ax=ax, geom=geom, **kwargs)
[docs] def plot_interactive(self, ax=None, geom=None, **kwargs):
if geom is None:
geom = self.map.geom
super().plot_interactive(ax=ax, geom=geom, **kwargs)
[docs]class PiecewiseNormSpatialModel(SpatialModel):
"""Piecewise spatial correction
with a free normalization at each fixed nodes.
For more information see :ref:`piecewise-norm-spatial`.
Parameters
----------
coord : `gammapy.maps.MapCoord`
Flat coordinates list at which the model values are given (nodes).
norms : `~numpy.ndarray` or list of `Parameter`
Array with the initial norms of the model at energies ``energy``.
A normalisation parameters is created for each value.
Default is one at each node.
interp : str
Interpolation scaling in {"log", "lin"}. Default is "lin"
"""
tag = ["PiecewiseNormSpatialModel", "piecewise-norm"]
def __init__(self, coords, norms=None, interp="lin", **kwargs):
self._coords = coords
self._interp = interp
if norms is None:
norms = np.ones(coords.shape)
if len(norms) != coords.shape[0]:
raise ValueError("dimension mismatch")
if len(norms) < 4:
raise ValueError("Input arrays must contain at least 4 elements")
if self.is_energy_dependent:
raise ValueError("Energy dependent nodes are not supported")
if not isinstance(norms[0], Parameter):
parameters = Parameters(
[Parameter(f"norm_{k}", norm) for k, norm in enumerate(norms)]
)
else:
parameters = Parameters(norms)
self.default_parameters = parameters
super().__init__(**kwargs)
@property
def coords(self):
"""Energy nodes"""
return self._coords
@property
def norms(self):
"""Norm values"""
return u.Quantity([p.quantity for p in self.parameters])
@property
def is_energy_dependent(self):
keys = self.coords._data.keys()
return "energy" in keys or "energy_true" in keys
[docs] def evaluate(self, lon, lat, energy=None, **norms):
"""Evaluate the model at given coordinates."""
scale = interpolation_scale(scale=self._interp)
v_nodes = scale(self.norms.value)
coords = [value.value for value in self.coords._data.values()]
# TODO: apply axes scaling in this loop
coords = list(zip(*coords))
# by default rely on CloughTocher2DInterpolator
# (Piecewise cubic, C1 smooth, curvature-minimizing interpolant)
interpolated = griddata(coords, v_nodes, (lon, lat), method="cubic")
return scale.inverse(interpolated) * self.norms.unit
[docs] def evaluate_geom(self, geom):
"""Evaluate model on `~gammapy.maps.Geom`
Parameters
----------
geom : `~gammapy.maps.WcsGeom`
Map geometry
Returns
-------
map : `~gammapy.maps.Map`
Map containing the value in each spatial bin.
"""
coords = geom.get_coord(frame=self.frame, sparse=True)
return self(coords.lon, coords.lat)
[docs] def to_dict(self, full_output=False):
data = super().to_dict(full_output=full_output)
for key, value in self.coords._data.items():
data["spatial"][key] = {
"data": value.data.tolist(),
"unit": str(value.unit),
}
return data
[docs] @classmethod
def from_dict(cls, data):
"""Create model from dict"""
data = data["spatial"]
lon = u.Quantity(data["lon"]["data"], data["lon"]["unit"])
lat = u.Quantity(data["lat"]["data"], data["lat"]["unit"])
coords = MapCoord.create((lon, lat))
parameters = Parameters.from_dict(data["parameters"])
return cls.from_parameters(parameters, coords=coords, frame=data["frame"])
[docs] @classmethod
def from_parameters(cls, parameters, **kwargs):
"""Create model from parameters"""
return cls(norms=parameters, **kwargs)