# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import numpy as np
import astropy.units as u
from astropy.coordinates.angle_utilities import angular_separation
from astropy.coordinates import Angle, Longitude, Latitude
from ...utils.fitting import Parameter, Parameters, Model
from ...maps import Map
__all__ = [
"SkySpatialModel",
"SkyPointSource",
"SkyGaussian",
"SkyDisk",
"SkyShell",
"SkyDiffuseConstant",
"SkyDiffuseMap",
]
log = logging.getLogger(__name__)
[docs]class SkySpatialModel(Model):
"""Sky spatial model base class."""
[docs] def __call__(self, lon, lat):
"""Call evaluate method"""
kwargs = dict()
for par in self.parameters.parameters:
kwargs[par.name] = par.quantity
return self.evaluate(lon, lat, **kwargs)
[docs]class SkyPointSource(SkySpatialModel):
r"""Point Source.
.. math::
\phi(lon, lat) = \delta{(lon - lon_0, lat - lat_0)}
A tolerance of 1 arcsecond is accepted for numerical stability
Parameters
----------
lon_0 : `~astropy.coordinates.Longitude`
:math:`lon_0`
lat_0 : `~astropy.coordinates.Latitude`
:math:`lat_0`
"""
def __init__(self, lon_0, lat_0):
self.parameters = Parameters(
[Parameter("lon_0", Longitude(lon_0)), Parameter("lat_0", Latitude(lat_0))]
)
[docs] @staticmethod
def evaluate(lon, lat, lon_0, lat_0):
"""Evaluate the model (static function)."""
wrapval = lon_0 + 180 * u.deg
lon = Angle(lon).wrap_at(wrapval)
_, grad_lon = np.gradient(lon)
grad_lat, _ = np.gradient(lat)
lon_diff = np.abs((lon - lon_0) / grad_lon)
lat_diff = np.abs((lat - lat_0) / grad_lat)
lon_val = np.select([lon_diff < 1], [1 - lon_diff], 0) / np.abs(grad_lon)
lat_val = np.select([lat_diff < 1], [1 - lat_diff], 0) / np.abs(grad_lat)
return lon_val * lat_val
[docs]class SkyGaussian(SkySpatialModel):
r"""Two-dimensional symmetric Gaussian model.
.. math::
\phi(lon, lat) = \frac{1}{2\pi\sigma^2} \exp{\left(-\frac{1}{2}
\frac{\theta^2}{\sigma^2}\right)}
where :math:`\theta` is the sky separation
Parameters
----------
lon_0 : `~astropy.coordinates.Longitude`
:math:`lon_0`
lat_0 : `~astropy.coordinates.Latitude`
:math:`lat_0`
sigma : `~astropy.coordinates.Angle`
:math:`\sigma`
"""
def __init__(self, lon_0, lat_0, sigma):
self.parameters = Parameters(
[
Parameter("lon_0", Longitude(lon_0)),
Parameter("lat_0", Latitude(lat_0)),
Parameter("sigma", Angle(sigma), min=0),
]
)
[docs] @staticmethod
def evaluate(lon, lat, lon_0, lat_0, sigma):
"""Evaluate the model (static function)."""
sep = angular_separation(lon, lat, lon_0, lat_0)
norm = 1 / (2 * np.pi * sigma ** 2)
exponent = -0.5 * (sep / sigma) ** 2
return norm * np.exp(exponent)
[docs]class SkyDisk(SkySpatialModel):
r"""Constant radial disk model.
.. math::
\phi(lon, lat) = \frac{1}{2 \pi (1 - \cos{r}) } \cdot
\begin{cases}
1 & \text{for } \theta \leq r_0 \\
0 & \text{for } \theta < r_0
\end{cases}
where :math:`\theta` is the sky separation
Parameters
----------
lon_0 : `~astropy.coordinates.Longitude`
:math:`lon_0`
lat_0 : `~astropy.coordinates.Latitude`
:math:`lat_0`
r_0 : `~astropy.coordinates.Angle`
:math:`r_0`
"""
def __init__(self, lon_0, lat_0, r_0):
self.parameters = Parameters(
[
Parameter("lon_0", Longitude(lon_0)),
Parameter("lat_0", Latitude(lat_0)),
Parameter("r_0", Angle(r_0)),
]
)
[docs] @staticmethod
def evaluate(lon, lat, lon_0, lat_0, r_0):
"""Evaluate the model (static function)."""
sep = angular_separation(lon, lat, lon_0, lat_0)
# Surface area of a spherical cap, see https://en.wikipedia.org/wiki/Spherical_cap
norm = 1.0 / (2 * np.pi * (1 - np.cos(r_0)))
return u.Quantity(norm.value * (sep <= r_0), "sr-1", copy=False)
[docs]class SkyShell(SkySpatialModel):
r"""Shell model
.. math::
\phi(lon, lat) = \frac{3}{2 \pi (r_{out}^3 - r_{in}^3)} \cdot
\begin{cases}
\sqrt{r_{out}^2 - \theta^2} - \sqrt{r_{in}^2 - \theta^2} &
\text{for } \theta \lt r_{in} \\
\sqrt{r_{out}^2 - \theta^2} &
\text{for } r_{in} \leq \theta \lt r_{out} \\
0 & \text{for } \theta > r_{out}
\end{cases}
where :math:`\theta` is the sky separation and :math:`r_out = r_in` + width
Note that the normalization is a small angle approximation,
although that approximation is still very good even for 10 deg radius shells.
Parameters
----------
lon_0 : `~astropy.coordinates.Longitude`
:math:`lon_0`
lat_0 : `~astropy.coordinates.Latitude`
:math:`lat_0`
radius : `~astropy.coordinates.Angle`
Inner radius, :math:`r_{in}`
width : `~astropy.coordinates.Angle`
Shell width
"""
def __init__(self, lon_0, lat_0, radius, width):
self.parameters = Parameters(
[
Parameter("lon_0", Longitude(lon_0)),
Parameter("lat_0", Latitude(lat_0)),
Parameter("radius", Angle(radius)),
Parameter("width", Angle(width)),
]
)
[docs] @staticmethod
def evaluate(lon, lat, lon_0, lat_0, radius, width):
"""Evaluate the model (static function)."""
sep = angular_separation(lon, lat, lon_0, lat_0)
radius_out = radius + width
norm = 3 / (2 * np.pi * (radius_out ** 3 - radius ** 3))
with np.errstate(invalid="ignore"):
# np.where and np.select do not work with quantities, so we use the
# workaround with indexing
value = np.sqrt(radius_out ** 2 - sep ** 2)
mask = [sep < radius]
value[mask] = (value - np.sqrt(radius ** 2 - sep ** 2))[mask]
value[sep > radius_out] = 0
return norm * value
[docs]class SkyDiffuseConstant(SkySpatialModel):
"""Spatially constant (isotropic) spatial model.
Parameters
----------
value : `~astropy.units.Quantity`
Value
"""
def __init__(self, value=1):
self.parameters = Parameters([Parameter("value", value)])
[docs] @staticmethod
def evaluate(lon, lat, value):
return value
[docs]class SkyDiffuseMap(SkySpatialModel):
"""Spatial sky map template model (2D).
This is for a 2D image. Use `~gammapy.cube.SkyDiffuseCube` for 3D cubes with
an energy axis.
Parameters
----------
map : `~gammapy.maps.Map`
Map template
norm : float
Norm parameter (multiplied with map values)
meta : dict, optional
Meta information, meta['filename'] will be used for serialization
normalize : bool
Normalize the input map so that it integrates to unity.
interp_kwargs : dict
Interpolation keyword arguments passed to `Map.interp_by_coord()`.
Default arguments are {'interp': 'linear', 'fill_value': 0}.
"""
def __init__(self, map, norm=1, meta=None, normalize=True, interp_kwargs=None):
if (map.data < 0).any():
log.warn(
"Map template contains negative values, please check the"
" data and fix if needed."
)
self.map = map
if normalize:
self.normalize()
self.parameters = Parameters([Parameter("norm", norm)])
self.meta = dict() if meta is None else meta
interp_kwargs = {} if interp_kwargs is None else interp_kwargs
interp_kwargs.setdefault("interp", "linear")
interp_kwargs.setdefault("fill_value", 0)
self._interp_kwargs = interp_kwargs
[docs] def normalize(self):
"""Normalize the diffuse map model so that it integrates to unity."""
data = self.map.data / self.map.data.sum()
data /= self.map.geom.solid_angle().to_value("sr")
self.map = self.map.copy(data=data, unit="sr-1")
[docs] @classmethod
def read(cls, filename, normalize=True, **kwargs):
"""Read spatial template model from FITS image.
The default unit used if none is found in the file is ``sr-1``.
Parameters
----------
filename : str
FITS image filename.
normalize : bool
Normalize the input map so that it integrates to unity.
kwargs : dict
Keyword arguments passed to `Map.read()`.
"""
m = Map.read(filename, **kwargs)
if m.unit == "":
m.unit = "sr-1"
return cls(m, normalize=normalize)
[docs] def evaluate(self, lon, lat, norm):
"""Evaluate model."""
coord = {"lon": lon.to_value("deg"), "lat": lat.to_value("deg")}
val = self.map.interp_by_coord(coord, **self._interp_kwargs)
return u.Quantity(norm.value * val, self.map.unit, copy=False)