Source code for gammapy.image.models.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import numpy as np
import astropy.units as u
from astropy.coordinates.angle_utilities import angular_separation
from astropy.coordinates import Angle, Longitude, Latitude
from ...utils.fitting import Parameter, Parameters, Model
from ...maps import Map

__all__ = [
    "SkySpatialModel",
    "SkyPointSource",
    "SkyGaussian",
    "SkyDisk",
    "SkyShell",
    "SkyDiffuseConstant",
    "SkyDiffuseMap",
]


log = logging.getLogger(__name__)


[docs]class SkySpatialModel(Model): """Sky spatial model base class."""
[docs] def __call__(self, lon, lat): """Call evaluate method""" kwargs = dict() for par in self.parameters.parameters: kwargs[par.name] = par.quantity return self.evaluate(lon, lat, **kwargs)
[docs]class SkyPointSource(SkySpatialModel): r"""Point Source. .. math:: \phi(lon, lat) = \delta{(lon - lon_0, lat - lat_0)} A tolerance of 1 arcsecond is accepted for numerical stability Parameters ---------- lon_0 : `~astropy.coordinates.Longitude` :math:`lon_0` lat_0 : `~astropy.coordinates.Latitude` :math:`lat_0` """ def __init__(self, lon_0, lat_0): self.parameters = Parameters( [Parameter("lon_0", Longitude(lon_0)), Parameter("lat_0", Latitude(lat_0))] )
[docs] @staticmethod def evaluate(lon, lat, lon_0, lat_0): """Evaluate the model (static function).""" wrapval = lon_0 + 180 * u.deg lon = Angle(lon).wrap_at(wrapval) _, grad_lon = np.gradient(lon) grad_lat, _ = np.gradient(lat) lon_diff = np.abs((lon - lon_0) / grad_lon) lat_diff = np.abs((lat - lat_0) / grad_lat) lon_val = np.select([lon_diff < 1], [1 - lon_diff], 0) / np.abs(grad_lon) lat_val = np.select([lat_diff < 1], [1 - lat_diff], 0) / np.abs(grad_lat) return lon_val * lat_val
[docs]class SkyGaussian(SkySpatialModel): r"""Two-dimensional symmetric Gaussian model. .. math:: \phi(lon, lat) = \frac{1}{2\pi\sigma^2} \exp{\left(-\frac{1}{2} \frac{\theta^2}{\sigma^2}\right)} where :math:`\theta` is the sky separation Parameters ---------- lon_0 : `~astropy.coordinates.Longitude` :math:`lon_0` lat_0 : `~astropy.coordinates.Latitude` :math:`lat_0` sigma : `~astropy.coordinates.Angle` :math:`\sigma` """ def __init__(self, lon_0, lat_0, sigma): self.parameters = Parameters( [ Parameter("lon_0", Longitude(lon_0)), Parameter("lat_0", Latitude(lat_0)), Parameter("sigma", Angle(sigma)), ] )
[docs] @staticmethod def evaluate(lon, lat, lon_0, lat_0, sigma): """Evaluate the model (static function).""" sep = angular_separation(lon, lat, lon_0, lat_0) norm = 1 / (2 * np.pi * sigma ** 2) exponent = -0.5 * (sep / sigma) ** 2 return norm * np.exp(exponent)
[docs]class SkyDisk(SkySpatialModel): r"""Constant radial disk model. .. math:: \phi(lon, lat) = \frac{1}{2 \pi (1 - \cos{r}) } \cdot \begin{cases} 1 & \text{for } \theta \leq r_0 \\ 0 & \text{for } \theta < r_0 \end{cases} where :math:`\theta` is the sky separation Parameters ---------- lon_0 : `~astropy.coordinates.Longitude` :math:`lon_0` lat_0 : `~astropy.coordinates.Latitude` :math:`lat_0` r_0 : `~astropy.coordinates.Angle` :math:`r_0` """ def __init__(self, lon_0, lat_0, r_0): self.parameters = Parameters( [ Parameter("lon_0", Longitude(lon_0)), Parameter("lat_0", Latitude(lat_0)), Parameter("r_0", Angle(r_0)), ] )
[docs] @staticmethod def evaluate(lon, lat, lon_0, lat_0, r_0): """Evaluate the model (static function).""" sep = angular_separation(lon, lat, lon_0, lat_0) # Surface area of a spherical cap, see https://en.wikipedia.org/wiki/Spherical_cap norm = 1.0 / (2 * np.pi * (1 - np.cos(r_0))) return u.Quantity(norm.value * (sep <= r_0), "sr-1", copy=False)
[docs]class SkyShell(SkySpatialModel): r"""Shell model .. math:: \phi(lon, lat) = \frac{3}{2 \pi (r_{out}^3 - r_{in}^3)} \cdot \begin{cases} \sqrt{r_{out}^2 - \theta^2} - \sqrt{r_{in}^2 - \theta^2} & \text{for } \theta \lt r_{in} \\ \sqrt{r_{out}^2 - \theta^2} & \text{for } r_{in} \leq \theta \lt r_{out} \\ 0 & \text{for } \theta > r_{out} \end{cases} where :math:`\theta` is the sky separation and :math:`r_out = r_in` + width Note that the normalization is a small angle approximation, although that approximation is still very good even for 10 deg radius shells. Parameters ---------- lon_0 : `~astropy.coordinates.Longitude` :math:`lon_0` lat_0 : `~astropy.coordinates.Latitude` :math:`lat_0` radius : `~astropy.coordinates.Angle` Inner radius, :math:`r_{in}` width : `~astropy.coordinates.Angle` Shell width """ def __init__(self, lon_0, lat_0, radius, width): self.parameters = Parameters( [ Parameter("lon_0", Longitude(lon_0)), Parameter("lat_0", Latitude(lat_0)), Parameter("radius", Angle(radius)), Parameter("width", Angle(width)), ] )
[docs] @staticmethod def evaluate(lon, lat, lon_0, lat_0, radius, width): """Evaluate the model (static function).""" sep = angular_separation(lon, lat, lon_0, lat_0) radius_out = radius + width norm = 3 / (2 * np.pi * (radius_out ** 3 - radius ** 3)) with np.errstate(invalid="ignore"): # np.where and np.select do not work with quantities, so we use the # workaround with indexing value = np.sqrt(radius_out ** 2 - sep ** 2) mask = [sep < radius] value[mask] = (value - np.sqrt(radius ** 2 - sep ** 2))[mask] value[sep > radius_out] = 0 return norm * value
[docs]class SkyDiffuseConstant(SkySpatialModel): """Spatially constant (isotropic) spatial model. Parameters ---------- value : `~astropy.units.Quantity` Value """ def __init__(self, value=1): self.parameters = Parameters([Parameter("value", value)])
[docs] @staticmethod def evaluate(lon, lat, value): return value
[docs]class SkyDiffuseMap(SkySpatialModel): """Spatial sky map template model (2D). This is for a 2D image. Use `~gammapy.cube.SkyDiffuseCube` for 3D cubes with an energy axis. Parameters ---------- map : `~gammapy.maps.Map` Map template norm : float Norm parameter (multiplied with map values) meta : dict, optional Meta information, meta['filename'] will be used for serialization normalize : bool Normalize the input map so that it integrates to unity. interp_kwargs : dict Interpolation keyword arguments passed to `Map.interp_by_coord()`. Default arguments are {'interp': 'linear', 'fill_value': 0}. """ def __init__(self, map, norm=1, meta=None, normalize=True, interp_kwargs=None): if (map.data < 0).any(): log.warn( "Map template contains negative values, please check the" " data and fix if needed." ) self.map = map if normalize: self.normalize() self.parameters = Parameters([Parameter("norm", norm)]) self.meta = dict() if meta is None else meta interp_kwargs = {} if interp_kwargs is None else interp_kwargs interp_kwargs.setdefault("interp", "linear") interp_kwargs.setdefault("fill_value", 0) self._interp_kwargs = interp_kwargs
[docs] def normalize(self): """Normalize the diffuse map model so that it integrates to unity.""" data = self.map.data / self.map.data.sum() data /= self.map.geom.solid_angle().to_value("sr") self.map = self.map.copy(data=data, unit="sr-1")
[docs] @classmethod def read(cls, filename, normalize=True, **kwargs): """Read spatial template model from FITS image. The default unit used if none is found in the file is ``sr-1``. Parameters ---------- filename : str FITS image filename. normalize : bool Normalize the input map so that it integrates to unity. kwargs : dict Keyword arguments passed to `Map.read()`. """ m = Map.read(filename, **kwargs) if m.unit == "": m.unit = "sr-1" return cls(m, normalize=normalize)
[docs] def evaluate(self, lon, lat, norm): """Evaluate model.""" coord = {"lon": lon.to_value("deg"), "lat": lat.to_value("deg")} val = self.map.interp_by_coord(coord, **self._interp_kwargs) return u.Quantity(norm.value * val, self.map.unit, copy=False)