# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function, unicode_literals
import copy
import abc
import logging
import numpy as np
import astropy.units as u
from astropy.coordinates.angle_utilities import angular_separation
from astropy.coordinates import Angle, Longitude, Latitude
from ...extern import six
from ...utils.fitting import Parameter, Parameters
from ...maps import Map
__all__ = [
"SkySpatialModel",
"SkyPointSource",
"SkyGaussian",
"SkyDisk",
"SkyShell",
"SkyDiffuseConstant",
"SkyDiffuseMap",
]
log = logging.getLogger(__name__)
[docs]@six.add_metaclass(abc.ABCMeta)
class SkySpatialModel(object):
"""SkySpatial model base class.
"""
def __str__(self):
ss = self.__class__.__name__
ss += "\n\nParameters: \n\n\t"
table = self.parameters.to_table()
ss += "\n\t".join(table.pformat())
if self.parameters.covariance is not None:
ss += "\n\nCovariance: \n\n\t"
covar = self.parameters.covariance_to_table()
ss += "\n\t".join(covar.pformat())
return ss
[docs] def __call__(self, lon, lat):
"""Call evaluate method"""
kwargs = dict()
for par in self.parameters.parameters:
kwargs[par.name] = par.quantity
return self.evaluate(lon, lat, **kwargs)
[docs] def copy(self):
"""A deep copy."""
return copy.deepcopy(self)
[docs]class SkyPointSource(SkySpatialModel):
r"""Point Source.
.. math::
\phi(lon, lat) = \delta{(lon - lon_0, lat - lat_0)}
A tolerance of 1 arcsecond is accepted for numerical stability
Parameters
----------
lon_0 : `~astropy.coordinates.Longitude`
:math:`lon_0`
lat_0 : `~astropy.coordinates.Latitude`
:math:`lat_0`
"""
def __init__(self, lon_0, lat_0):
self.parameters = Parameters(
[Parameter("lon_0", Longitude(lon_0)), Parameter("lat_0", Latitude(lat_0))]
)
[docs] @staticmethod
def evaluate(lon, lat, lon_0, lat_0):
"""Evaluate the model (static function)."""
wrapval = lon_0 + 180 * u.deg
lon = Angle(lon).wrap_at(wrapval)
_, grad_lon = np.gradient(lon)
grad_lat, _ = np.gradient(lat)
lon_diff = np.abs((lon - lon_0) / grad_lon)
lat_diff = np.abs((lat - lat_0) / grad_lat)
lon_val = np.select([lon_diff < 1], [1 - lon_diff], 0) / np.abs(grad_lon)
lat_val = np.select([lat_diff < 1], [1 - lat_diff], 0) / np.abs(grad_lat)
val = lon_val * lat_val
return val.to("sr-1")
[docs]class SkyGaussian(SkySpatialModel):
r"""Two-dimensional symmetric Gaussian model.
.. math::
\phi(lon, lat) = \frac{1}{2\pi\sigma^2} \exp{\left(-\frac{1}{2}
\frac{\theta^2}{\sigma^2}\right)}
where :math:`\theta` is the sky separation
Parameters
----------
lon_0 : `~astropy.coordinates.Longitude`
:math:`lon_0`
lat_0 : `~astropy.coordinates.Latitude`
:math:`lat_0`
sigma : `~astropy.coordinates.Angle`
:math:`\sigma`
"""
def __init__(self, lon_0, lat_0, sigma):
self.parameters = Parameters(
[
Parameter("lon_0", Longitude(lon_0)),
Parameter("lat_0", Latitude(lat_0)),
Parameter("sigma", Angle(sigma)),
]
)
[docs] @staticmethod
def evaluate(lon, lat, lon_0, lat_0, sigma):
"""Evaluate the model (static function)."""
sep = angular_separation(lon, lat, lon_0, lat_0)
sep = sep.to("rad").value
sigma = sigma.to("rad").value
norm = 1 / (2 * np.pi * sigma ** 2)
exponent = -0.5 * (sep / sigma) ** 2
val = norm * np.exp(exponent)
return val * u.Unit("sr-1")
[docs]class SkyDisk(SkySpatialModel):
r"""Constant radial disk model.
.. math::
\phi(lon, lat) = \frac{1}{2 \pi (1 - \cos{r}) } \cdot
\begin{cases}
1 & \text{for } \theta \leq r_0 \\
0 & \text{for } \theta < r_0
\end{cases}
where :math:`\theta` is the sky separation
Parameters
----------
lon_0 : `~astropy.coordinates.Longitude`
:math:`lon_0`
lat_0 : `~astropy.coordinates.Latitude`
:math:`lat_0`
r_0 : `~astropy.coordinates.Angle`
:math:`r_0`
"""
def __init__(self, lon_0, lat_0, r_0):
self.parameters = Parameters(
[
Parameter("lon_0", Longitude(lon_0)),
Parameter("lat_0", Latitude(lat_0)),
Parameter("r_0", Angle(r_0)),
]
)
[docs] @staticmethod
def evaluate(lon, lat, lon_0, lat_0, r_0):
"""Evaluate the model (static function)."""
sep = angular_separation(lon, lat, lon_0, lat_0)
sep = sep.to("rad").value
r_0 = r_0.to("rad").value
norm = 1. / (2 * np.pi * (1 - np.cos(r_0)))
val = np.where(sep <= r_0, norm, 0)
return val * u.Unit("sr-1")
[docs]class SkyShell(SkySpatialModel):
r"""Shell model
.. math::
\phi(lon, lat) = \frac{3}{2 \pi (r_{out}^3 - r_{in}^3)} \cdot
\begin{cases}
\sqrt{r_{out}^2 - \theta^2} - \sqrt{r_{in}^2 - \theta^2} &
\text{for } \theta \lt r_{in} \\
\sqrt{r_{out}^2 - \theta^2} &
\text{for } r_{in} \leq \theta \lt r_{out} \\
0 & \text{for } \theta > r_{out}
\end{cases}
where :math:`\theta` is the sky separation and :math:`r_out = r_in` + width
Note that the normalization is a small angle approximation,
although that approximation is still very good even for 10 deg radius shells.
Parameters
----------
lon_0 : `~astropy.coordinates.Longitude`
:math:`lon_0`
lat_0 : `~astropy.coordinates.Latitude`
:math:`lat_0`
radius : `~astropy.coordinates.Angle`
Inner radius, :math:`r_{in}`
width : `~astropy.coordinates.Angle`
Shell width
"""
def __init__(self, lon_0, lat_0, radius, width):
self.parameters = Parameters(
[
Parameter("lon_0", Longitude(lon_0)),
Parameter("lat_0", Latitude(lat_0)),
Parameter("radius", Angle(radius)),
Parameter("width", Angle(width)),
]
)
[docs] @staticmethod
def evaluate(lon, lat, lon_0, lat_0, radius, width):
"""Evaluate the model (static function)."""
sep = angular_separation(lon, lat, lon_0, lat_0)
sep = sep.to("rad").value
r_i = radius.to("rad").value
r_o = (radius + width).to("rad").value
norm = 3 / (2 * np.pi * (r_o ** 3 - r_i ** 3))
with np.errstate(invalid="ignore"):
val_out = np.sqrt(r_o ** 2 - sep ** 2)
val_in = val_out - np.sqrt(r_i ** 2 - sep ** 2)
val = np.select([sep < r_i, sep < r_o], [val_in, val_out])
return norm * val * u.Unit("sr-1")
[docs]class SkyDiffuseConstant(SkySpatialModel):
"""Spatially constant (isotropic) spatial model.
Parameters
----------
value : `~astropy.units.Quantity`
Value
"""
def __init__(self, value=1):
self.parameters = Parameters([Parameter("value", value)])
[docs] @staticmethod
def evaluate(lon, lat, value):
return value
[docs]class SkyDiffuseMap(SkySpatialModel):
"""Spatial sky map template model (2D).
This is for a 2D image. Use `~gammapy.cube.SkyDiffuseCube` for 3D cubes with
an energy axis.
Parameters
----------
map : `~gammapy.maps.Map`
Map template
norm : float
Norm parameter (multiplied with map values)
meta : dict, optional
Meta information, meta['filename'] will be used for serialization
normalize : bool
Normalize the input map so that it integrates to unity.
interp_kwargs : dict
Interpolation keyword arguments passed to `Map.interp_by_coord()`.
Default arguments are {'interp': 'linear', 'fill_value': 0}.
"""
def __init__(self, map, norm=1, meta=None, normalize=True, interp_kwargs=None):
if (map.data < 0).any():
log.warn(
"Map template contains negative values, please check the"
" data and fix if needed."
)
self.map = map
if normalize:
self.normalize()
self.parameters = Parameters([Parameter("norm", norm)])
self.meta = dict() if meta is None else meta
interp_kwargs = {} if interp_kwargs is None else interp_kwargs
interp_kwargs.setdefault("interp", "linear")
interp_kwargs.setdefault("fill_value", 0)
self._interp_kwargs = interp_kwargs
[docs] def normalize(self):
"""Normalize the diffuse map model so that it integrates to unity."""
data = self.map.data / self.map.data.sum()
data /= self.map.geom.solid_angle().to("sr").value
self.map = self.map.copy(data=data, unit="sr-1")
[docs] @classmethod
def read(cls, filename, normalize=True, **kwargs):
"""Read spatial template model from FITS image.
The default unit used if none is found in the file is ``sr-1``.
Parameters
----------
filename : str
FITS image filename.
normalize : bool
Normalize the input map so that it integrates to unity.
kwargs : dict
Keyword arguments passed to `Map.read()`.
"""
m = Map.read(filename, **kwargs)
if m.unit == "":
m.unit = "sr-1"
return cls(m, normalize=normalize)
[docs] def evaluate(self, lon, lat, norm):
"""Evaluate model."""
coord = {"lon": lon.to("deg").value, "lat": lat.to("deg").value}
val = self.map.interp_by_coord(coord, **self._interp_kwargs)
return u.Quantity(norm.value * val, self.map.unit, copy=False)