Source code for gammapy.image.models.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function, unicode_literals
import copy
import abc
import logging
import numpy as np
import astropy.units as u
from astropy.coordinates.angle_utilities import angular_separation
from astropy.coordinates import Angle, Longitude, Latitude
from ...extern import six
from ...utils.fitting import Parameter, Parameters
from ...maps import Map

__all__ = [
    "SkySpatialModel",
    "SkyPointSource",
    "SkyGaussian",
    "SkyDisk",
    "SkyShell",
    "SkyDiffuseConstant",
    "SkyDiffuseMap",
]


log = logging.getLogger(__name__)


[docs]@six.add_metaclass(abc.ABCMeta) class SkySpatialModel(object): """SkySpatial model base class. """ def __str__(self): ss = self.__class__.__name__ ss += "\n\nParameters: \n\n\t" table = self.parameters.to_table() ss += "\n\t".join(table.pformat()) if self.parameters.covariance is not None: ss += "\n\nCovariance: \n\n\t" covar = self.parameters.covariance_to_table() ss += "\n\t".join(covar.pformat()) return ss
[docs] def __call__(self, lon, lat): """Call evaluate method""" kwargs = dict() for par in self.parameters.parameters: kwargs[par.name] = par.quantity return self.evaluate(lon, lat, **kwargs)
[docs] def copy(self): """A deep copy.""" return copy.deepcopy(self)
[docs]class SkyPointSource(SkySpatialModel): r"""Point Source. .. math:: \phi(lon, lat) = \delta{(lon - lon_0, lat - lat_0)} A tolerance of 1 arcsecond is accepted for numerical stability Parameters ---------- lon_0 : `~astropy.coordinates.Longitude` :math:`lon_0` lat_0 : `~astropy.coordinates.Latitude` :math:`lat_0` """ def __init__(self, lon_0, lat_0): self.parameters = Parameters( [Parameter("lon_0", Longitude(lon_0)), Parameter("lat_0", Latitude(lat_0))] )
[docs] @staticmethod def evaluate(lon, lat, lon_0, lat_0): """Evaluate the model (static function).""" wrapval = lon_0 + 180 * u.deg lon = Angle(lon).wrap_at(wrapval) _, grad_lon = np.gradient(lon) grad_lat, _ = np.gradient(lat) lon_diff = np.abs((lon - lon_0) / grad_lon) lat_diff = np.abs((lat - lat_0) / grad_lat) lon_val = np.select([lon_diff < 1], [1 - lon_diff], 0) / np.abs(grad_lon) lat_val = np.select([lat_diff < 1], [1 - lat_diff], 0) / np.abs(grad_lat) val = lon_val * lat_val return val.to("sr-1")
[docs]class SkyGaussian(SkySpatialModel): r"""Two-dimensional symmetric Gaussian model. .. math:: \phi(lon, lat) = \frac{1}{2\pi\sigma^2} \exp{\left(-\frac{1}{2} \frac{\theta^2}{\sigma^2}\right)} where :math:`\theta` is the sky separation Parameters ---------- lon_0 : `~astropy.coordinates.Longitude` :math:`lon_0` lat_0 : `~astropy.coordinates.Latitude` :math:`lat_0` sigma : `~astropy.coordinates.Angle` :math:`\sigma` """ def __init__(self, lon_0, lat_0, sigma): self.parameters = Parameters( [ Parameter("lon_0", Longitude(lon_0)), Parameter("lat_0", Latitude(lat_0)), Parameter("sigma", Angle(sigma)), ] )
[docs] @staticmethod def evaluate(lon, lat, lon_0, lat_0, sigma): """Evaluate the model (static function).""" sep = angular_separation(lon, lat, lon_0, lat_0) sep = sep.to("rad").value sigma = sigma.to("rad").value norm = 1 / (2 * np.pi * sigma ** 2) exponent = -0.5 * (sep / sigma) ** 2 val = norm * np.exp(exponent) return val * u.Unit("sr-1")
[docs]class SkyDisk(SkySpatialModel): r"""Constant radial disk model. .. math:: \phi(lon, lat) = \frac{1}{2 \pi (1 - \cos{r}) } \cdot \begin{cases} 1 & \text{for } \theta \leq r_0 \\ 0 & \text{for } \theta < r_0 \end{cases} where :math:`\theta` is the sky separation Parameters ---------- lon_0 : `~astropy.coordinates.Longitude` :math:`lon_0` lat_0 : `~astropy.coordinates.Latitude` :math:`lat_0` r_0 : `~astropy.coordinates.Angle` :math:`r_0` """ def __init__(self, lon_0, lat_0, r_0): self.parameters = Parameters( [ Parameter("lon_0", Longitude(lon_0)), Parameter("lat_0", Latitude(lat_0)), Parameter("r_0", Angle(r_0)), ] )
[docs] @staticmethod def evaluate(lon, lat, lon_0, lat_0, r_0): """Evaluate the model (static function).""" sep = angular_separation(lon, lat, lon_0, lat_0) sep = sep.to("rad").value r_0 = r_0.to("rad").value norm = 1. / (2 * np.pi * (1 - np.cos(r_0))) val = np.where(sep <= r_0, norm, 0) return val * u.Unit("sr-1")
[docs]class SkyShell(SkySpatialModel): r"""Shell model .. math:: \phi(lon, lat) = \frac{3}{2 \pi (r_{out}^3 - r_{in}^3)} \cdot \begin{cases} \sqrt{r_{out}^2 - \theta^2} - \sqrt{r_{in}^2 - \theta^2} & \text{for } \theta \lt r_{in} \\ \sqrt{r_{out}^2 - \theta^2} & \text{for } r_{in} \leq \theta \lt r_{out} \\ 0 & \text{for } \theta > r_{out} \end{cases} where :math:`\theta` is the sky separation and :math:`r_out = r_in` + width Note that the normalization is a small angle approximation, although that approximation is still very good even for 10 deg radius shells. Parameters ---------- lon_0 : `~astropy.coordinates.Longitude` :math:`lon_0` lat_0 : `~astropy.coordinates.Latitude` :math:`lat_0` radius : `~astropy.coordinates.Angle` Inner radius, :math:`r_{in}` width : `~astropy.coordinates.Angle` Shell width """ def __init__(self, lon_0, lat_0, radius, width): self.parameters = Parameters( [ Parameter("lon_0", Longitude(lon_0)), Parameter("lat_0", Latitude(lat_0)), Parameter("radius", Angle(radius)), Parameter("width", Angle(width)), ] )
[docs] @staticmethod def evaluate(lon, lat, lon_0, lat_0, radius, width): """Evaluate the model (static function).""" sep = angular_separation(lon, lat, lon_0, lat_0) sep = sep.to("rad").value r_i = radius.to("rad").value r_o = (radius + width).to("rad").value norm = 3 / (2 * np.pi * (r_o ** 3 - r_i ** 3)) with np.errstate(invalid="ignore"): val_out = np.sqrt(r_o ** 2 - sep ** 2) val_in = val_out - np.sqrt(r_i ** 2 - sep ** 2) val = np.select([sep < r_i, sep < r_o], [val_in, val_out]) return norm * val * u.Unit("sr-1")
[docs]class SkyDiffuseConstant(SkySpatialModel): """Spatially constant (isotropic) spatial model. Parameters ---------- value : `~astropy.units.Quantity` Value """ def __init__(self, value=1): self.parameters = Parameters([Parameter("value", value)])
[docs] @staticmethod def evaluate(lon, lat, value): return value
[docs]class SkyDiffuseMap(SkySpatialModel): """Spatial sky map template model (2D). This is for a 2D image. Use `~gammapy.cube.SkyDiffuseCube` for 3D cubes with an energy axis. Parameters ---------- map : `~gammapy.maps.Map` Map template norm : float Norm parameter (multiplied with map values) meta : dict, optional Meta information, meta['filename'] will be used for serialization normalize : bool Normalize the input map so that it integrates to unity. interp_kwargs : dict Interpolation keyword arguments passed to `Map.interp_by_coord()`. Default arguments are {'interp': 'linear', 'fill_value': 0}. """ def __init__(self, map, norm=1, meta=None, normalize=True, interp_kwargs=None): if (map.data < 0).any(): log.warn( "Map template contains negative values, please check the" " data and fix if needed." ) self.map = map if normalize: self.normalize() self.parameters = Parameters([Parameter("norm", norm)]) self.meta = dict() if meta is None else meta interp_kwargs = {} if interp_kwargs is None else interp_kwargs interp_kwargs.setdefault("interp", "linear") interp_kwargs.setdefault("fill_value", 0) self._interp_kwargs = interp_kwargs
[docs] def normalize(self): """Normalize the diffuse map model so that it integrates to unity.""" data = self.map.data / self.map.data.sum() data /= self.map.geom.solid_angle().to("sr").value self.map = self.map.copy(data=data, unit="sr-1")
[docs] @classmethod def read(cls, filename, normalize=True, **kwargs): """Read spatial template model from FITS image. The default unit used if none is found in the file is ``sr-1``. Parameters ---------- filename : str FITS image filename. normalize : bool Normalize the input map so that it integrates to unity. kwargs : dict Keyword arguments passed to `Map.read()`. """ m = Map.read(filename, **kwargs) if m.unit == "": m.unit = "sr-1" return cls(m, normalize=normalize)
[docs] def evaluate(self, lon, lat, norm): """Evaluate model.""" coord = {"lon": lon.to("deg").value, "lat": lat.to("deg").value} val = self.map.interp_by_coord(coord, **self._interp_kwargs) return u.Quantity(norm.value * val, self.map.unit, copy=False)