Source code for gammapy.modeling.models.spatial

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Spatial models."""
import logging
import os
import numpy as np
import scipy.integrate
import scipy.special
from scipy.interpolate import griddata
import astropy.units as u
from astropy.coordinates import Angle, SkyCoord, angular_separation, position_angle
from astropy.nddata import NoOverlapError
from astropy.utils import lazyproperty
from regions import (
    CircleAnnulusSkyRegion,
    CircleSkyRegion,
    EllipseAnnulusSkyRegion,
    EllipseSkyRegion,
    PointSkyRegion,
    RectangleSkyRegion,
)
import matplotlib.pyplot as plt
from gammapy.maps import HpxNDMap, Map, MapCoord, WcsGeom, WcsNDMap
from gammapy.modeling import Parameter, Parameters
from gammapy.utils.gauss import Gauss2DPDF
from gammapy.utils.interpolation import interpolation_scale
from gammapy.utils.regions import region_circle_to_ellipse, region_to_frame
from gammapy.utils.scripts import make_path
from .core import ModelBase, _build_parameters_from_dict

__all__ = [
    "ConstantFluxSpatialModel",
    "ConstantSpatialModel",
    "DiskSpatialModel",
    "GaussianSpatialModel",
    "GeneralizedGaussianSpatialModel",
    "PointSpatialModel",
    "Shell2SpatialModel",
    "ShellSpatialModel",
    "SpatialModel",
    "TemplateSpatialModel",
    "TemplateNDSpatialModel",
    "PiecewiseNormSpatialModel",
]


log = logging.getLogger(__name__)

MAX_OVERSAMPLING = 200


def compute_sigma_eff(lon_0, lat_0, lon, lat, phi, major_axis, e):
    """Effective radius, used for the evaluation of elongated models."""
    phi_0 = position_angle(lon_0, lat_0, lon, lat)
    d_phi = phi - phi_0
    minor_axis = Angle(major_axis * np.sqrt(1 - e**2))

    a2 = (major_axis * np.sin(d_phi)) ** 2
    b2 = (minor_axis * np.cos(d_phi)) ** 2
    denominator = np.sqrt(a2 + b2)
    sigma_eff = major_axis * minor_axis / denominator
    return minor_axis, sigma_eff


[docs]class SpatialModel(ModelBase): """Spatial model base class.""" _type = "spatial" def __init__(self, **kwargs): frame = kwargs.pop("frame", "icrs") super().__init__(**kwargs) if not hasattr(self, "frame"): self.frame = frame
[docs] def __call__(self, lon, lat, energy=None): """Call evaluate method.""" kwargs = {par.name: par.quantity for par in self.parameters} if energy is None and self.is_energy_dependent: raise ValueError("Missing energy value for evaluation") if energy is not None: kwargs["energy"] = energy return self.evaluate(lon, lat, **kwargs)
@property def evaluation_bin_size_min(self): return None # TODO: make this a hard-coded class attribute? @lazyproperty def is_energy_dependent(self): varnames = self.evaluate.__code__.co_varnames return "energy" in varnames @property def position(self): """Spatial model center position as a `~astropy.coordinates.SkyCoord`.""" lon = self.lon_0.quantity lat = self.lat_0.quantity return SkyCoord(lon, lat, frame=self.frame) @position.setter def position(self, skycoord): """Spatial model center position.""" coord = skycoord.transform_to(self.frame) self.lon_0.quantity = coord.data.lon self.lat_0.quantity = coord.data.lat @property def position_lonlat(self): """Spatial model center position `(lon, lat)` in radians and frame of the model.""" lon = self.lon_0.quantity.to_value(u.rad) lat = self.lat_0.quantity.to_value(u.rad) return lon, lat # TODO: get rid of this! _phi_0 = 0.0 @property def phi_0(self): return self._phi_0 @phi_0.setter def phi_0(self, phi_0=0.0): self._phi_0 = phi_0 @property def position_error(self): """Get 95% containment position error as `~regions.EllipseSkyRegion`.""" if self.covariance is None: raise ValueError("No position error information available.") pars = self.parameters sub_covar = self.covariance.get_subcovariance(["lon_0", "lat_0"]).data.copy() cos_lat = np.cos(self.lat_0.quantity.to_value("rad")) sub_covar[0, 0] *= cos_lat**2.0 sub_covar[0, 1] *= cos_lat sub_covar[1, 0] *= cos_lat eig_vals, eig_vecs = np.linalg.eig(sub_covar) lon_err, lat_err = np.sqrt(eig_vals) y_vec = eig_vecs[:, 0] phi = (np.arctan2(y_vec[1], y_vec[0]) * u.rad).to("deg") + self.phi_0 err = np.sort([lon_err, lat_err]) scale_r95 = Gauss2DPDF(sigma=1).containment_radius(0.95) err *= scale_r95 if err[1] == lon_err * scale_r95: phi += 90 * u.deg height = 2 * err[1] * pars["lon_0"].unit width = 2 * err[0] * pars["lat_0"].unit else: height = 2 * err[1] * pars["lat_0"].unit width = 2 * err[0] * pars["lon_0"].unit return EllipseSkyRegion( center=self.position, height=height, width=width, angle=phi )
[docs] def evaluate_geom(self, geom): """Evaluate model on `~gammapy.maps.Geom`. Parameters ---------- geom : `~gammapy.maps.WcsGeom` Map geometry. Returns ------- map : `~gammapy.maps.Map` Map containing the value in each spatial bin. """ coords = geom.get_coord(frame=self.frame, sparse=True) if self.is_energy_dependent: return self(coords.lon, coords.lat, energy=coords["energy_true"]) else: return self(coords.lon, coords.lat)
[docs] def integrate_geom(self, geom, oversampling_factor=None): """Integrate model on `~gammapy.maps.Geom` or `~gammapy.maps.RegionGeom`. Integration is performed by simple rectangle approximation, the pixel center model value is multiplied by the pixel solid angle. An oversampling factor can be used for precision. By default, this parameter is set to None and an oversampling factor is automatically estimated based on the model estimation maximal bin width. For a RegionGeom, the model is integrated on a tangent WCS projection in the region. Parameters ---------- geom : `~gammapy.maps.WcsGeom` or `~gammapy.maps.RegionGeom` The geom on which the integration is performed. oversampling_factor : int or None The oversampling factor to use for integration. Default is None: the factor is estimated from the model minimal bin size. Returns ------- map : `~gammapy.maps.Map` or `gammapy.maps.RegionNDMap` Map containing the integral value in each spatial bin. """ wcs_geom = geom mask = None if geom.is_region: wcs_geom = geom.to_wcs_geom().to_image() result = Map.from_geom(geom=wcs_geom) integrated = Map.from_geom(wcs_geom) pix_scale = np.max(wcs_geom.pixel_scales.to_value("deg")) if self.evaluation_radius is not None: try: width = 2 * np.maximum( self.evaluation_radius.to_value("deg"), pix_scale ) wcs_geom_cut = wcs_geom.cutout(self.position, width) integrated = Map.from_geom(wcs_geom_cut) except (NoOverlapError, ValueError): oversampling_factor = 1 if oversampling_factor is None: if self.evaluation_bin_size_min is not None: res_scale = self.evaluation_bin_size_min.to_value("deg") if res_scale > 0: oversampling_factor = np.minimum( int(np.ceil(pix_scale / res_scale)), MAX_OVERSAMPLING ) else: oversampling_factor = MAX_OVERSAMPLING else: oversampling_factor = 1 if oversampling_factor > 1: upsampled_geom = integrated.geom.upsample( oversampling_factor, axis_name=None ) # assume the upsampled solid angles are approximately factor**2 smaller values = self.evaluate_geom(upsampled_geom) / oversampling_factor**2 upsampled = Map.from_geom(upsampled_geom, unit=values.unit) upsampled += values if geom.is_region: mask = geom.contains(upsampled_geom.get_coord()).astype("int") integrated.quantity = upsampled.downsample( oversampling_factor, preserve_counts=True, weights=mask ).quantity # Finally stack result result._unit = integrated.unit result.stack(integrated) else: values = self.evaluate_geom(wcs_geom) result._unit = values.unit result += values result *= result.geom.solid_angle() if geom.is_region: mask = result.geom.region_mask([geom.region]) result = Map.from_geom( geom, data=np.sum(result.data[mask]), unit=result.unit ) return result
[docs] def to_dict(self, full_output=False): """Create dictionary for YAML serilisation.""" data = super().to_dict(full_output) data["spatial"]["frame"] = self.frame data["spatial"]["parameters"] = data["spatial"].pop("parameters") return data
[docs] @classmethod def from_dict(cls, data, **kwargs): """Create a spatial model from a dictionary. Parameters ---------- data : dict Dictionary containing model parameters. kwargs : dict Keyword arguments passed to `~SpatialModel.from_parameters`. """ kwargs = kwargs or {} spatial_data = data.get("spatial", data) if "frame" in spatial_data: kwargs["frame"] = spatial_data["frame"] return super().from_dict(data, **kwargs)
@property def _evaluation_geom(self): if isinstance(self, TemplateSpatialModel): geom = self.map.geom else: width = 2 * max(self.evaluation_radius, 0.1 * u.deg) geom = WcsGeom.create( skydir=self.position, frame=self.frame, width=width, binsz=0.02 ) return geom def _get_plot_map(self, geom): if self.evaluation_radius is None and geom is None: raise ValueError( f"{self.__class__.__name__} requires geom to be defined for plotting." ) if geom is None: geom = self._evaluation_geom data = self.evaluate_geom(geom) return Map.from_geom(geom, data=data.value, unit=data.unit)
[docs] def plot(self, ax=None, geom=None, **kwargs): """Plot spatial model. Parameters ---------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. Default is None. geom : `~gammapy.maps.WcsGeom`, optional Geometry to use for plotting. Default is None. **kwargs : dict Keyword arguments passed to `~gammapy.maps.WcsMap.plot()`. Returns ------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. """ m = self._get_plot_map(geom) if not m.geom.is_flat: raise TypeError( "Use .plot_interactive() or .plot_grid() for Map dimension > 2" ) return m.plot(ax=ax, **kwargs)
[docs] def plot_interactive(self, ax=None, geom=None, **kwargs): """Plot spatial model. Parameters ---------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. Default is None. geom : `~gammapy.maps.WcsGeom`, optional Geom to use for plotting. Default is None. **kwargs : dict Keyword arguments passed to `~gammapy.maps.WcsMap.plot()`. Returns ------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. """ m = self._get_plot_map(geom) if m.geom.is_image: raise TypeError("Use .plot() for 2D Maps") m.plot_interactive(ax=ax, **kwargs)
[docs] def plot_position_error(self, ax=None, **kwargs): """Plot position error. Parameters ---------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes to plot the position error on. Default is None. **kwargs : dict Keyword arguments passed to `~gammapy.maps.WcsMap.plot()`. Returns ------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. """ # plot center position lon, lat = self.lon_0.value, self.lat_0.value ax = plt.gca() if ax is None else ax kwargs.setdefault("marker", "x") kwargs.setdefault("color", "red") kwargs.setdefault("label", "position") ax.scatter(lon, lat, transform=ax.get_transform(self.frame), **kwargs) # plot position error if not np.all(self.covariance.data == 0): region = self.position_error.to_pixel(ax.wcs) artist = region.as_artist(facecolor="none", edgecolor=kwargs["color"]) ax.add_artist(artist) return ax
def _to_region_error(self): pass
[docs] def plot_error( self, ax=None, which="position", kwargs_position=None, kwargs_extension=None ): """Plot the errors of the spatial model. Parameters ---------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes to plot the errors on. Default is None. which: list of str Which errors to plot. Available options are: * "all": all the optional steps are plotted * "position": plot the position error of the spatial model * "extension": plot the extension error of the spatial model kwargs_position : dict, optional Keyword arguments passed to `~SpatialModel.plot_position_error`. Default is None. kwargs_extension : dict, optional Keyword arguments passed to `~SpatialModel.plot_extension_error`. Default is None. Returns ------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. """ kwargs_position = kwargs_position or {} kwargs_extension = kwargs_extension or {} ax = plt.gca() if ax is None else ax kwargs_extension.setdefault("edgecolor", "red") kwargs_extension.setdefault("facecolor", "red") kwargs_extension.setdefault("alpha", 0.15) kwargs_extension.setdefault("fill", True) if "all" in which: self.plot_position_error(ax, **kwargs_position) region = self._to_region_error() if region is not None: artist = region.to_pixel(ax.wcs).as_artist(**kwargs_extension) ax.add_artist(artist) if "position" in which: self.plot_position_error(ax, **kwargs_position) if "extension" in which: region = self._to_region_error() if region is not None: artist = region.to_pixel(ax.wcs).as_artist(**kwargs_extension) ax.add_artist(artist)
[docs] def plot_grid(self, geom=None, **kwargs): """Plot spatial model energy slices in a grid. Parameters ---------- geom : `~gammapy.maps.WcsGeom`, optional Geometry to use for plotting. Default is None. **kwargs : dict Keyword arguments passed to `~gammapy.maps.WcsMap.plot()`. Returns ------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. """ m = self._get_plot_map(geom) if (m.geom is None) or m.geom.is_image: raise TypeError("Use .plot() for 2D Maps") m.plot_grid(**kwargs)
[docs] @classmethod def from_position(cls, position, **kwargs): """Define the position of the model using a `~astropy.coordinates.SkyCoord`. The model will be created in the frame of the `~astropy.coordinates.SkyCoord`. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Position. Returns ------- model : `SpatialModel` Spatial model. """ lon_0, lat_0 = position.data.lon, position.data.lat return cls(lon_0=lon_0, lat_0=lat_0, frame=position.frame.name, **kwargs)
@property def evaluation_radius(self): """Evaluation radius.""" return None @property def evaluation_region(self): """Evaluation region.""" if hasattr(self, "to_region"): return self.to_region() elif self.evaluation_radius is not None: return CircleSkyRegion( center=self.position, radius=self.evaluation_radius, ) else: return None
[docs]class PointSpatialModel(SpatialModel): """Point Source. For more information see :ref:`point-spatial-model`. Parameters ---------- lon_0, lat_0 : `~astropy.coordinates.Angle` Center position. Default is "0 deg", "0 deg". frame : {"icrs", "galactic"} Center position coordinate frame. """ tag = ["PointSpatialModel", "point"] lon_0 = Parameter("lon_0", "0 deg") lat_0 = Parameter("lat_0", "0 deg", min=-90, max=90) @property def evaluation_bin_size_min(self): """Minimal evaluation bin size as an `~astropy.coordinates.Angle`.""" return 0 * u.deg @property def evaluation_radius(self): """Evaluation radius as an `~astropy.coordinates.Angle`. Set as zero degrees. """ return 0 * u.deg @staticmethod def _grid_weights(x, y, x0, y0): """Compute 4-pixel weights such that centroid is preserved.""" dx = np.abs(x - x0) dx = np.where(dx < 1, 1 - dx, 0) dy = np.abs(y - y0) dy = np.where(dy < 1, 1 - dy, 0) return dx * dy @property def is_energy_dependent(self): return False
[docs] def evaluate_geom(self, geom): """Evaluate model on `~gammapy.maps.Geom`.""" values = self.integrate_geom(geom).data return values / geom.solid_angle()
[docs] def integrate_geom(self, geom, oversampling_factor=None): """Integrate model on `~gammapy.maps.Geom`. Parameters ---------- geom : `Geom` Map geometry. Returns ------- flux : `Map` Predicted flux map. """ geom_image = geom.to_image() if geom.is_hpx: idx, weights = geom_image.interp_weights({"skycoord": self.position}) data = np.zeros(geom_image.data_shape) data[tuple(idx)] = weights else: x, y = geom_image.get_pix() x0, y0 = self.position.to_pixel(geom.wcs) data = self._grid_weights(x, y, x0, y0) return Map.from_geom(geom=geom_image, data=data, unit="")
[docs] def to_region(self, **kwargs): """Model outline as a `~regions.PointSkyRegion`.""" return PointSkyRegion(center=self.position, **kwargs)
[docs]class GaussianSpatialModel(SpatialModel): r"""Two-dimensional Gaussian model. For more information see :ref:`gaussian-spatial-model`. Parameters ---------- lon_0, lat_0 : `~astropy.coordinates.Angle` Center position. Default is "0 deg", "0 deg". sigma : `~astropy.coordinates.Angle` Length of the major semiaxis of the Gaussian, in angular units. Default is 1 deg. e : `float` Eccentricity of the Gaussian (:math:`0< e< 1`). Default is 0. phi : `~astropy.coordinates.Angle` Rotation angle :math:`\phi`: of the major semiaxis. Increases counter-clockwise from the North direction. Default is 0 deg. frame : {"icrs", "galactic"} Center position coordinate frame. """ tag = ["GaussianSpatialModel", "gauss"] lon_0 = Parameter("lon_0", "0 deg") lat_0 = Parameter("lat_0", "0 deg", min=-90, max=90) sigma = Parameter("sigma", "1 deg", min=0) e = Parameter("e", 0, min=0, max=1, frozen=True) phi = Parameter("phi", "0 deg", frozen=True) @property def evaluation_bin_size_min(self): """Minimal evaluation bin size as an `~astropy.coordinates.Angle`, chosen as sigma/3.""" return self.parameters["sigma"].quantity / 3.0 @property def evaluation_radius(self): r"""Evaluation radius as an `~astropy.coordinates.Angle`. Set as :math:`5\sigma`. """ return 5 * self.parameters["sigma"].quantity
[docs] @staticmethod def evaluate(lon, lat, lon_0, lat_0, sigma, e, phi): """Evaluate model.""" sep = angular_separation(lon, lat, lon_0, lat_0) if e == 0: a = 1.0 - np.cos(sigma) norm = (1 / (4 * np.pi * a * (1.0 - np.exp(-1.0 / a)))).value else: minor_axis, sigma_eff = compute_sigma_eff( lon_0, lat_0, lon, lat, phi, sigma, e ) a = 1.0 - np.cos(sigma_eff) norm = (1 / (2 * np.pi * sigma * minor_axis)).to_value("sr-1") exponent = -0.5 * ((1 - np.cos(sep)) / a) return u.Quantity(norm * np.exp(exponent).value, "sr-1", copy=False)
[docs] def to_region(self, x_sigma=1.5, **kwargs): r"""Model outline at a given number of :math:`\sigma`. Parameters ---------- x_sigma : float Number of :math:`\sigma Default is :math:`1.5\sigma` which corresponds to about 68% containment for a 2D symmetric Gaussian. Returns ------- region : `~regions.EllipseSkyRegion` Model outline. """ minor_axis = Angle(self.sigma.quantity * np.sqrt(1 - self.e.quantity**2)) return EllipseSkyRegion( center=self.position, height=2 * x_sigma * self.sigma.quantity, width=2 * x_sigma * minor_axis, angle=self.phi.quantity, **kwargs, )
@property def evaluation_region(self): """Evaluation region consistent with evaluation radius.""" return self.to_region(x_sigma=5) def _to_region_error(self, x_sigma=1.5): r"""Plot model error at a given number of :math:`\sigma`. Parameters ---------- x_sigma : float Number of :math:`\sigma Default is :math:`1.5\sigma` which corresponds to about 68% containment for a 2D symmetric Gaussian. Returns ------- region : `~regions.EllipseSkyRegion` Model error region. """ sigma_hi = self.sigma.quantity + (self.sigma.error * self.sigma.unit) sigma_lo = self.sigma.quantity - (self.sigma.error * self.sigma.unit) minor_axis_hi = Angle( sigma_hi * np.sqrt(1 - (self.e.quantity + self.e.error) ** 2) ) minor_axis_lo = Angle( sigma_lo * np.sqrt(1 - (self.e.quantity - self.e.error) ** 2) ) return EllipseAnnulusSkyRegion( center=self.position, inner_height=2 * x_sigma * sigma_lo, outer_height=2 * x_sigma * sigma_hi, inner_width=2 * x_sigma * minor_axis_lo, outer_width=2 * x_sigma * minor_axis_hi, angle=self.phi.quantity, )
[docs]class GeneralizedGaussianSpatialModel(SpatialModel): r"""Two-dimensional Generalized Gaussian model. For more information see :ref:`generalized-gaussian-spatial-model`. Parameters ---------- lon_0, lat_0 : `~astropy.coordinates.Angle` Center position. Default is "0 deg", "0 deg". r_0 : `~astropy.coordinates.Angle` Length of the major semiaxis, in angular units. Default is 1 deg. eta : `float` Shape parameter within (0, 1). Special cases for disk: ->0, Gaussian: 0.5, Laplace:1 Default is 0.5. e : `float` Eccentricity (:math:`0< e< 1`). Default is 0. phi : `~astropy.coordinates.Angle` Rotation angle :math:`\phi`: of the major semiaxis. Increases counter-clockwise from the North direction. Default is 0 deg. frame : {"icrs", "galactic"} Center position coordinate frame. """ tag = ["GeneralizedGaussianSpatialModel", "gauss-general"] lon_0 = Parameter("lon_0", "0 deg") lat_0 = Parameter("lat_0", "0 deg", min=-90, max=90) r_0 = Parameter("r_0", "1 deg") eta = Parameter("eta", 0.5, min=0.01, max=1.0) e = Parameter("e", 0.0, min=0.0, max=1.0, frozen=True) phi = Parameter("phi", "0 deg", frozen=True)
[docs] @staticmethod def evaluate(lon, lat, lon_0, lat_0, r_0, eta, e, phi): sep = angular_separation(lon, lat, lon_0, lat_0) if isinstance(eta, u.Quantity): eta = eta.value # gamma function does not allow quantities minor_axis, r_eff = compute_sigma_eff(lon_0, lat_0, lon, lat, phi, r_0, e) z = sep / r_eff norm = 1 / (2 * np.pi * minor_axis * r_0 * eta * scipy.special.gamma(2 * eta)) return (norm * np.exp(-(z ** (1 / eta)))).to("sr-1")
@property def evaluation_bin_size_min(self): """Minimal evaluation bin size as an `~astropy.coordinates.Angle`. The bin min size is defined as r_0/(3+8*eta)/(e+1). """ return self.r_0.quantity / (3 + 8 * self.eta.value) / (self.e.value + 1) @property def evaluation_radius(self): r"""Evaluation radius as an `~astropy.coordinates.Angle`. The evaluation radius is defined as r_eval = r_0*(1+8*eta) so it verifies: r_eval -> r_0 if eta -> 0 r_eval = 5*r_0 > 5*sigma_gauss = 5*r_0/sqrt(2) ~ 3.5*r_0 if eta=0.5 r_eval = 9*r_0 > 5*sigma_laplace = 5*sqrt(2)*r_0 ~ 7*r_0 if eta = 1 r_eval -> inf if eta -> inf """ return self.r_0.quantity * (1 + 8 * self.eta.value)
[docs] def to_region(self, x_r_0=1, **kwargs): r"""Model outline at a given number of :math:`r_0`. Parameters ---------- x_r_0 : float, optional Number of :math:`r_0`. Default is 1. Returns ------- region : `~regions.EllipseSkyRegion` Model outline. """ minor_axis = Angle(self.r_0.quantity * np.sqrt(1 - self.e.quantity**2)) return EllipseSkyRegion( center=self.position, height=2 * x_r_0 * self.r_0.quantity, width=2 * x_r_0 * minor_axis, angle=self.phi.quantity, **kwargs, )
@property def evaluation_region(self): """Evaluation region consistent with evaluation radius.""" scale = self.evaluation_radius / self.r_0.quantity return self.to_region(x_r_0=scale) def _to_region_error(self, x_r_0=1): r"""Model error at a given number of :math:`r_0`. Parameters ---------- x_r_0 : float, optional Number of :math:`r_0`. Default is 1. Returns ------- region : `~regions.EllipseSkyRegion` Model error region. """ r_0_lo = self.r_0.quantity - self.r_0.error * self.r_0.unit r_0_hi = self.r_0.quantity + self.r_0.error * self.r_0.unit minor_axis_hi = Angle( r_0_hi * np.sqrt(1 - (self.e.quantity + self.e.error) ** 2) ) minor_axis_lo = Angle( r_0_lo * np.sqrt(1 - (self.e.quantity - self.e.error) ** 2) ) return EllipseAnnulusSkyRegion( center=self.position, inner_height=2 * x_r_0 * r_0_lo, outer_height=2 * x_r_0 * r_0_hi, inner_width=2 * x_r_0 * minor_axis_lo, outer_width=2 * x_r_0 * minor_axis_hi, angle=self.phi.quantity, )
[docs]class DiskSpatialModel(SpatialModel): r"""Constant disk model. For more information see :ref:`disk-spatial-model`. Parameters ---------- lon_0, lat_0 : `~astropy.coordinates.Angle` Center position. Default is "0 deg", "0 deg". r_0 : `~astropy.coordinates.Angle` :math:`a`: length of the major semiaxis, in angular units. Default is 1 deg. e : `float` Eccentricity of the ellipse (:math:`0< e< 1`). Default is 0. phi : `~astropy.coordinates.Angle` Rotation angle :math:`\phi`: of the major semiaxis. Increases counter-clockwise from the North direction. Default is 0 deg. edge_width : float Width of the edge. The width is defined as the range within which the smooth edge of the model drops from 95% to 5% of its amplitude. It is given as fraction of r_0. Default is 0.01. frame : {"icrs", "galactic"} Center position coordinate frame. """ tag = ["DiskSpatialModel", "disk"] lon_0 = Parameter("lon_0", "0 deg") lat_0 = Parameter("lat_0", "0 deg", min=-90, max=90) r_0 = Parameter("r_0", "1 deg", min=0) e = Parameter("e", 0, min=0, max=1, frozen=True) phi = Parameter("phi", "0 deg", frozen=True) edge_width = Parameter("edge_width", value=0.01, min=0, max=1, frozen=True) @property def evaluation_bin_size_min(self): """Minimal evaluation bin size as an `~astropy.coordinates.Angle`. The bin min size is defined as r_0*(1-edge_width)/10. """ return self.r_0.quantity * (1 - self.edge_width.quantity) / 10.0 @property def evaluation_radius(self): """Evaluation radius as an `~astropy.coordinates.Angle`. Set to the length of the semi-major axis plus the edge width. """ return 1.1 * self.r_0.quantity * (1 + self.edge_width.quantity) @staticmethod def _evaluate_norm_factor(r_0, e): """Compute the normalization factor.""" semi_minor = r_0 * np.sqrt(1 - e**2) def integral_fcn(x, a, b): A = 1 / np.sin(a) ** 2 B = 1 / np.sin(b) ** 2 C = A - B cs2 = np.cos(x) ** 2 return 1 - np.sqrt(1 - 1 / (B + C * cs2)) return ( 2 * scipy.integrate.quad( lambda x: integral_fcn(x, r_0, semi_minor), 0, np.pi )[0] ) ** -1 @staticmethod def _evaluate_smooth_edge(x, width): value = (x / width).to_value("") edge_width_95 = 2.326174307353347 return 0.5 * (1 - scipy.special.erf(value * edge_width_95))
[docs] @staticmethod def evaluate(lon, lat, lon_0, lat_0, r_0, e, phi, edge_width): """Evaluate model.""" sep = angular_separation(lon, lat, lon_0, lat_0) if e == 0: sigma_eff = r_0 else: sigma_eff = compute_sigma_eff(lon_0, lat_0, lon, lat, phi, r_0, e)[1] norm = DiskSpatialModel._evaluate_norm_factor(r_0, e) in_ellipse = DiskSpatialModel._evaluate_smooth_edge( sep - sigma_eff, sigma_eff * edge_width ) return u.Quantity(norm * in_ellipse, "sr-1", copy=False)
[docs] def to_region(self, **kwargs): """Model outline as a `~regions.EllipseSkyRegion`.""" minor_axis = Angle(self.r_0.quantity * np.sqrt(1 - self.e.quantity**2)) return EllipseSkyRegion( center=self.position, height=2 * self.r_0.quantity, width=2 * minor_axis, angle=self.phi.quantity, **kwargs, )
[docs] @classmethod def from_region(cls, region, **kwargs): """Create a `DiskSpatialModel from a ~regions.EllipseSkyRegion`. Parameters ---------- region : `~regions.EllipseSkyRegion` or ~regions.CircleSkyRegion` Region to create model from. kwargs : dict Keyword arguments passed to `~gammapy.modeling.models.DiskSpatialModel`. Returns ------- spatial_model : `~gammapy.modeling.models.DiskSpatialModel` Spatial model. """ if isinstance(region, CircleSkyRegion): region = region_circle_to_ellipse(region) if not isinstance(region, EllipseSkyRegion): raise ValueError( f"Please provide a `CircleSkyRegion` " f"or `EllipseSkyRegion`, got {type(region)} instead." ) frame = kwargs.pop("frame", region.center.frame) region = region_to_frame(region, frame=frame) if region.height > region.width: major_axis, minor_axis = region.height, region.width phi = region.angle else: minor_axis, major_axis = region.height, region.width phi = 90 * u.deg + region.angle kwargs.setdefault("phi", phi) kwargs.setdefault("e", np.sqrt(1.0 - np.power(minor_axis / major_axis, 2))) kwargs.setdefault("r_0", major_axis / 2.0) return cls.from_position(region.center, **kwargs)
def _to_region_error(self): """Model error. Returns ------- region : `~regions.EllipseSkyRegion` Model error region. """ r_0_lo = self.r_0.quantity - self.r_0.error * self.r_0.unit r_0_hi = self.r_0.quantity + self.r_0.error * self.r_0.unit minor_axis_hi = Angle( r_0_hi * np.sqrt(1 - (self.e.quantity + self.e.error) ** 2) ) minor_axis_lo = Angle( r_0_lo * np.sqrt(1 - (self.e.quantity - self.e.error) ** 2) ) return EllipseAnnulusSkyRegion( center=self.position, inner_height=2 * r_0_lo, outer_height=2 * r_0_hi, inner_width=2 * minor_axis_lo, outer_width=2 * minor_axis_hi, angle=self.phi.quantity, )
[docs]class ShellSpatialModel(SpatialModel): r"""Shell model. For more information see :ref:`shell-spatial-model`. Parameters ---------- lon_0, lat_0 : `~astropy.coordinates.Angle` Center position. Default is "0 deg", "0 deg". radius : `~astropy.coordinates.Angle` Inner radius, :math:`r_{in}`. Default is 1 deg. width : `~astropy.coordinates.Angle` Shell width. Default is 0.2 deg. frame : {"icrs", "galactic"} Center position coordinate frame. See Also -------- Shell2SpatialModel """ tag = ["ShellSpatialModel", "shell"] lon_0 = Parameter("lon_0", "0 deg") lat_0 = Parameter("lat_0", "0 deg", min=-90, max=90) radius = Parameter("radius", "1 deg") width = Parameter("width", "0.2 deg") @property def evaluation_bin_size_min(self): """Minimal evaluation bin size as an `~astropy.coordinates.Angle`. The bin min size is defined as the shell width. """ return self.width.quantity @property def evaluation_radius(self): r"""Evaluation radius as an `~astropy.coordinates.Angle`. Set to :math:`r_\text{out}`. """ return self.radius.quantity + self.width.quantity
[docs] @staticmethod def evaluate(lon, lat, lon_0, lat_0, radius, width): """Evaluate model.""" sep = angular_separation(lon, lat, lon_0, lat_0) radius_out = radius + width norm = 3 / (2 * np.pi * (radius_out**3 - radius**3)) with np.errstate(invalid="ignore"): # np.where and np.select do not work with quantities, so we use the # workaround with indexing value = np.sqrt(radius_out**2 - sep**2) mask = sep < radius value[mask] = (value - np.sqrt(radius**2 - sep**2))[mask] value[sep > radius_out] = 0 return norm * value
[docs] def to_region(self, **kwargs): """Model outline as a `~regions.CircleAnnulusSkyRegion`.""" return CircleAnnulusSkyRegion( center=self.position, inner_radius=self.radius.quantity, outer_radius=self.radius.quantity + self.width.quantity, **kwargs, )
[docs]class Shell2SpatialModel(SpatialModel): r"""Shell model with outer radius and relative width parametrization. For more information see :ref:`shell2-spatial-model`. Parameters ---------- lon_0, lat_0 : `~astropy.coordinates.Angle` Center position. Default is "0 deg", "0 deg". r_0 : `~astropy.coordinates.Angle` Outer radius, :math:`r_{out}`. Default is 1 deg. eta : float Shell width relative to outer radius, r_0, should be within (0,1). Default is 0.2. frame : {"icrs", "galactic"} Center position coordinate frame. See Also -------- ShellSpatialModel """ tag = ["Shell2SpatialModel", "shell2"] lon_0 = Parameter("lon_0", "0 deg") lat_0 = Parameter("lat_0", "0 deg", min=-90, max=90) r_0 = Parameter("r_0", "1 deg") eta = Parameter("eta", 0.2, min=0.02, max=1) @property def evaluation_bin_size_min(self): """Minimal evaluation bin size as an `~astropy.coordinates.Angle`. The bin min size is defined as r_0*eta. """ return self.eta.value * self.r_0.quantity @property def evaluation_radius(self): r"""Evaluation radius as an `~astropy.coordinates.Angle`. Set to :math:`r_\text{out}`. """ return self.r_0.quantity @property def r_in(self): return (1 - self.eta.quantity) * self.r_0.quantity
[docs] @staticmethod def evaluate(lon, lat, lon_0, lat_0, r_0, eta): """Evaluate model.""" sep = angular_separation(lon, lat, lon_0, lat_0) r_in = (1 - eta) * r_0 norm = 3 / (2 * np.pi * (r_0**3 - r_in**3)) with np.errstate(invalid="ignore"): # np.where and np.select do not work with quantities, so we use the # workaround with indexing value = np.sqrt(r_0**2 - sep**2) mask = sep < r_in value[mask] = (value - np.sqrt(r_in**2 - sep**2))[mask] value[sep > r_0] = 0 return norm * value
[docs] def to_region(self, **kwargs): """Model outline as a `~regions.CircleAnnulusSkyRegion`.""" return CircleAnnulusSkyRegion( center=self.position, inner_radius=self.r_in, outer_radius=self.r_0.quantity, **kwargs, )
[docs]class ConstantSpatialModel(SpatialModel): """Spatially constant (isotropic) spatial model. For more information see :ref:`constant-spatial-model`. Parameters ---------- value : `~astropy.units.Quantity` Value. Default is 1 sr-1. """ tag = ["ConstantSpatialModel", "const"] value = Parameter("value", "1 sr-1", frozen=True) frame = "icrs" evaluation_radius = None position = None
[docs] def to_dict(self, full_output=False): """Create dictionary for YAML serilisation.""" # redefined to ignore frame attribute from parent class data = super().to_dict(full_output) data["spatial"].pop("frame") data["spatial"]["parameters"] = [] return data
[docs] @staticmethod def evaluate(lon, lat, value): """Evaluate model.""" return value
[docs] def to_region(self, **kwargs): """Model outline as a `~regions.RectangleSkyRegion`.""" return RectangleSkyRegion( center=SkyCoord(0 * u.deg, 0 * u.deg, frame=self.frame), height=180 * u.deg, width=360 * u.deg, **kwargs, )
[docs]class ConstantFluxSpatialModel(SpatialModel): """Spatially constant flux spatial model. For more information see :ref:`constant-spatial-model`. """ tag = ["ConstantFluxSpatialModel", "const-flux"] frame = "icrs" evaluation_radius = None position = None
[docs] def to_dict(self, full_output=False): """Create dictionary for YAML serilisation.""" # redefined to ignore frame attribute from parent class data = super().to_dict(full_output) data["spatial"].pop("frame") return data
[docs] @staticmethod def evaluate(lon, lat): """Evaluate model.""" return 1 / u.sr
[docs] @staticmethod def evaluate_geom(geom): """Evaluate model.""" return 1 / geom.solid_angle()
[docs] @staticmethod def integrate_geom(geom, oversampling_factor=None): """Evaluate model.""" return Map.from_geom(geom=geom, data=1)
[docs] def to_region(self, **kwargs): """Model outline as a `~regions.RectangleSkyRegion`.""" return RectangleSkyRegion( center=SkyCoord(0 * u.deg, 0 * u.deg, frame=self.frame), height=180 * u.deg, width=360 * u.deg, **kwargs, )
[docs]class TemplateSpatialModel(SpatialModel): """Spatial sky map template model. For more information see :ref:`template-spatial-model`. By default the position of the model is fixed at the center of the map. The position can be fittted by unfreezing the `lon_0` and `lat_0 `parameters. In that case, the coordinate of every pixel is shifted in lon and lat in the frame of the map. NOTE: planar distances are calculated, so the results are correct only when the fitted position is close to the map center. Parameters ---------- map : `~gammapy.maps.Map` Map template. meta : dict, optional Meta information, meta['filename'] will be used for serialisation. normalize : bool Normalize the input map so that it integrates to unity. interp_kwargs : dict Interpolation keyword arguments passed to `gammapy.maps.Map.interp_by_coord`. Default arguments are {'method': 'linear', 'fill_value': 0, "values_scale": "log"}. Filename : str Name of the map file. copy_data : bool Create a deepcopy of the map data or directly use the original. Default is True. Use False to save memory in case of large maps. **kwargs : dict Keyword arguments forwarded to `SpatialModel.__init__`. """ tag = ["TemplateSpatialModel", "template"] lon_0 = Parameter("lon_0", np.nan, unit="deg", frozen=True) lat_0 = Parameter("lat_0", np.nan, unit="deg", min=-90, max=90, frozen=True) def __init__( self, map, meta=None, normalize=True, interp_kwargs=None, filename=None, copy_data=True, **kwargs, ): if (map.data < 0).any(): log.warning("Map has negative values. Check and fix this!") if (map.data == 0.0).all(): log.warning("Map values are all zeros. Check and fix this!") if not map.geom.is_image and (map.data.sum(axis=(1, 2)) == 0).any(): log.warning( "Map values are all zeros in at least one energy bin. Check and fix this!" ) if filename is not None: filename = str(make_path(filename)) self.normalize = normalize if normalize: # Normalize the diffuse map model so that it integrates to unity if map.geom.is_image: data_sum = map.data.sum() else: # Normalize in each energy bin data_sum = map.data.sum(axis=(1, 2)).reshape((-1, 1, 1)) data = np.divide( map.data, data_sum, out=np.zeros_like(map.data), where=data_sum != 0 ) data /= map.geom.solid_angle().to_value("sr") map = map.copy(data=data, unit="sr-1") if map.unit.is_equivalent(""): map = map.copy(data=map.data, unit="sr-1") log.warning("Missing spatial template unit, assuming sr^-1") if copy_data: self._map = map.copy() else: self._map = map.copy(data=map.data) self.meta = {} if meta is None else meta interp_kwargs = {} if interp_kwargs is None else interp_kwargs interp_kwargs.setdefault("method", "linear") interp_kwargs.setdefault("fill_value", 0) interp_kwargs.setdefault("values_scale", "log") self._interp_kwargs = interp_kwargs self.filename = filename kwargs["frame"] = self.map.geom.frame if "lon_0" not in kwargs or ( isinstance(kwargs["lon_0"], Parameter) and np.isnan(kwargs["lon_0"].value) ): kwargs["lon_0"] = self.map_center.data.lon if "lat_0" not in kwargs or ( isinstance(kwargs["lat_0"], Parameter) and np.isnan(kwargs["lat_0"].value) ): kwargs["lat_0"] = self.map_center.data.lat super().__init__(**kwargs) def __str__(self): width = self.map.geom.width data_min = np.nanmin(self.map.data) data_max = np.nanmax(self.map.data) prnt = ( f"{self.__class__.__name__} model summary:\n " f"Model center: {self.position} \n " f"Map center: {self.map_center} \n " f"Map width: {width} \n " f"Data min: {data_min} \n" f"Data max: {data_max} \n" f"Data unit: {self.map.unit}" ) if self.is_energy_dependent: energy_min = self.map.geom.axes["energy_true"].center[0] energy_max = self.map.geom.axes["energy_true"].center[-1] prnt1 = f"Energy min: {energy_min} \n" f"Energy max: {energy_max} \n" prnt = prnt + prnt1 return prnt
[docs] def copy(self, copy_data=False, **kwargs): """Copy model. Parameters ---------- copy_data : bool Whether to copy the data. Default is False. **kwargs : dict Keyword arguments forwarded to `TemplateSpatialModel`. Returns ------- model : `TemplateSpatialModel` Copied template spatial model. """ kwargs.setdefault("map", self.map) kwargs.setdefault("meta", self.meta.copy()) kwargs.setdefault("normalize", self.normalize) kwargs.setdefault("interp_kwargs", self._interp_kwargs) kwargs.setdefault("filename", self.filename) kwargs.setdefault("lon_0", self.parameters["lon_0"].copy()) kwargs.setdefault("lat_0", self.parameters["lat_0"].copy()) return self.__class__(copy_data=copy_data, **kwargs)
@property def map(self): """Template map as a `~gammapy.maps.Map` object.""" return self._map @property def is_energy_dependent(self): return "energy_true" in self.map.geom.axes.names @property def evaluation_radius(self): """Evaluation radius as an `~astropy.coordinates.Angle`. Set to half of the maximal dimension of the map. """ return np.max(self.map.geom.width) / 2.0 @property def map_center(self): return self.map.geom.center_skydir
[docs] @classmethod def read(cls, filename, normalize=True, **kwargs): """Read spatial template model from FITS image. If unit is not given in the FITS header the default is ``sr-1``. Parameters ---------- filename : str FITS image filename. normalize : bool Normalize the input map so that it integrates to unity. kwargs : dict Keyword arguments passed to `Map.read()`. """ m = Map.read(filename, **kwargs) return cls(m, normalize=normalize, filename=filename)
[docs] def evaluate(self, lon, lat, energy=None, lon_0=None, lat_0=None): """Evaluate the model at given coordinates. Note that, if the map data assume negative values, these are clipped to zero. """ offset_lon = 0.0 * u.deg if lon_0 is None else lon_0 - self.map_center.data.lon offset_lat = 0.0 * u.deg if lat_0 is None else lat_0 - self.map_center.data.lat coord = { "lon": (lon - offset_lon).to_value("deg"), "lat": (lat - offset_lat).to_value("deg"), } if energy is not None: coord["energy_true"] = energy val = self.map.interp_by_coord(coord, **self._interp_kwargs) val = np.clip(val, 0, a_max=None) return u.Quantity(val, self.map.unit, copy=False)
@property def position_lonlat(self): """Spatial model center position `(lon, lat)` in radians and frame of the model.""" lon = self.position.data.lon.rad lat = self.position.data.lat.rad return lon, lat
[docs] @classmethod def from_dict(cls, data): data = data["spatial"] filename = data["filename"] normalize = data.get("normalize", True) m = Map.read(filename) pars = data.get("parameters") if pars is not None: parameters = _build_parameters_from_dict(pars, cls.default_parameters) kwargs = {par.name: par for par in parameters} else: kwargs = {} return cls(m, normalize=normalize, filename=filename, **kwargs)
[docs] def to_dict(self, full_output=False): """Create dictionary for YAML serilisation.""" data = super().to_dict(full_output) data["spatial"]["filename"] = self.filename data["spatial"]["normalize"] = self.normalize data["spatial"]["unit"] = str(self.map.unit) return data
[docs] def write(self, overwrite=False): """ Write the map. Parameters ---------- overwrite: bool, optional Overwrite existing file. Default is False, which will raise a warning if the template file exists already. """ if self.filename is None: raise IOError("Missing filename") elif os.path.isfile(make_path(self.filename)) and not overwrite: log.warning("Template file already exits, and overwrite is False") else: self.map.write(self.filename, overwrite=overwrite)
[docs] def to_region(self, **kwargs): """Model outline from template map boundary as a `~regions.RectangleSkyRegion`.""" return RectangleSkyRegion( center=self.map.geom.center_skydir, width=self.map.geom.width[0][0], height=self.map.geom.width[1][0], **kwargs, )
[docs] def plot(self, ax=None, geom=None, **kwargs): if geom is None: geom = self.map.geom super().plot(ax=ax, geom=geom, **kwargs)
[docs] def plot_interactive(self, ax=None, geom=None, **kwargs): if geom is None: geom = self.map.geom super().plot_interactive(ax=ax, geom=geom, **kwargs)
[docs]class TemplateNDSpatialModel(SpatialModel): """A model generated from a ND map where extra dimensions define the parameter space. Parameters ---------- map : `~gammapy.maps.WcsNDMap` or `~gammapy.maps.HpxNDMap` Map template. meta : dict, optional Meta information, meta['filename'] will be used for serialisation. interp_kwargs : dict Interpolation keyword arguments passed to `gammapy.maps.Map.interp_by_pix`. Default arguments are {'method': 'linear', 'fill_value': 0, "values_scale": "log"}. copy_data : bool Create a deepcopy of the map data or directly use the original. Default is True. Use False to save memory in case of large maps. """ tag = ["TemplateNDSpatialModel", "templateND"] def __init__( self, map, interp_kwargs=None, meta=None, filename=None, copy_data=True, ): if not isinstance(map, (HpxNDMap, WcsNDMap)): raise TypeError("Map should be a HpxNDMap or WcsNDMap") if copy_data: self._map = map.copy() else: self._map = map.copy(data=map.data) self.meta = dict() if meta is None else meta if filename is not None: filename = str(make_path(filename)) self.filename = filename parameters = [] for axis in map.geom.axes: if axis.name not in ["energy_true", "energy"]: center = (axis.bounds[1] + axis.bounds[0]) / 2 parameter = Parameter( name=axis.name, value=center, unit=axis.unit, scale_method="scale10", min=axis.bounds[0], max=axis.bounds[-1], interp=axis.interp, ) parameters.append(parameter) self.default_parameters = Parameters(parameters) interp_kwargs = interp_kwargs or {} interp_kwargs.setdefault("method", "linear") interp_kwargs.setdefault("fill_value", 0) interp_kwargs.setdefault("values_scale", "log") self._interp_kwargs = interp_kwargs super().__init__() @property def map(self): """Template map as a `~gammapy.maps.WcsNDMap` or `~gammapy.maps.HpxNDMap`.""" return self._map @property def is_energy_dependent(self): return "energy_true" in self.map.geom.axes.names
[docs] def evaluate(self, lon, lat, energy=None, **kwargs): coord = { "lon": lon.to_value("deg"), "lat": lat.to_value("deg"), } if energy is not None: coord["energy_true"] = energy coord.update(kwargs) val = self.map.interp_by_coord(coord, **self._interp_kwargs) val = np.clip(val, 0, a_max=None) return u.Quantity(val, self.map.unit, copy=False)
[docs] def write(self, overwrite=False): """ Write the map. Parameters ---------- overwrite: bool, optional Overwrite existing file. Default is False, which will raise a warning if the template file exists already. """ if self.filename is None: raise IOError("Missing filename") elif os.path.isfile(self.filename) and not overwrite: log.warning("Template file already exits, and overwrite is False") else: self.map.write(self.filename)
[docs] @classmethod def from_dict(cls, data): data = data["spatial"] filename = data["filename"] m = Map.read(filename) model = cls(m, filename=filename) for idx, p in enumerate(model.parameters): par = p.to_dict() par.update(data["parameters"][idx]) setattr(model, p.name, Parameter(**par)) return model
[docs] def to_dict(self, full_output=False): """Create dictionary for YAML serialisation.""" data = super().to_dict(full_output) data["spatial"]["filename"] = self.filename data["spatial"]["unit"] = str(self.map.unit) return data
[docs]class PiecewiseNormSpatialModel(SpatialModel): """Piecewise spatial correction with a free normalization at each fixed nodes. For more information see :ref:`piecewise-norm-spatial`. Parameters ---------- coord : `gammapy.maps.MapCoord` Flat coordinates list at which the model values are given (nodes). norms : `~numpy.ndarray` or list of `Parameter` Array with the initial norms of the model at energies ``energy``. Normalisation parameters are created for each value. Default is one at each node. interp : {"lin", "log"} Interpolation scaling. Default is "lin". """ tag = ["PiecewiseNormSpatialModel", "piecewise-norm"] def __init__(self, coords, norms=None, interp="lin", **kwargs): self._coords = coords.copy() self._coords["lon"] = Angle(coords.lon).wrap_at(0 * u.deg) self._wrap_angle = (coords.lon.max() - coords.lon.min()) / 2 self._coords["lon"] = Angle(coords.lon).wrap_at(self._wrap_angle) self._interp = interp if norms is None: norms = np.ones(coords.shape) if len(norms) != coords.shape[0]: raise ValueError("dimension mismatch") if len(norms) < 4: raise ValueError("Input arrays must contain at least 4 elements") if self.is_energy_dependent: raise ValueError("Energy dependent nodes are not supported") if not isinstance(norms[0], Parameter): parameters = Parameters( [Parameter(f"norm_{k}", norm) for k, norm in enumerate(norms)] ) else: parameters = Parameters(norms) self.default_parameters = parameters super().__init__(**kwargs) @property def coords(self): """Energy nodes.""" return self._coords @property def norms(self): """Norm values.""" return u.Quantity([p.quantity for p in self.parameters]) @property def is_energy_dependent(self): keys = self.coords._data.keys() return "energy" in keys or "energy_true" in keys
[docs] def evaluate(self, lon, lat, energy=None, **norms): """Evaluate the model at given coordinates.""" scale = interpolation_scale(scale=self._interp) v_nodes = scale(self.norms.value) coords = [value.value for value in self.coords._data.values()] # TODO: apply axes scaling in this loop coords = list(zip(*coords)) lon = Angle(lon).wrap_at(self._wrap_angle) # by default rely on CloughTocher2DInterpolator # (Piecewise cubic, C1 smooth, curvature-minimizing interpolant) interpolated = griddata(coords, v_nodes, (lon, lat), method="cubic") return scale.inverse(interpolated) * self.norms.unit
[docs] def evaluate_geom(self, geom): """Evaluate model on `~gammapy.maps.Geom`. Parameters ---------- geom : `~gammapy.maps.WcsGeom` Map geometry. Returns ------- map : `~gammapy.maps.Map` Map containing the value in each spatial bin. """ coords = geom.get_coord(frame=self.frame, sparse=True) return self(coords.lon, coords.lat)
[docs] def to_dict(self, full_output=False): data = super().to_dict(full_output=full_output) for key, value in self.coords._data.items(): data["spatial"][key] = { "data": value.data.tolist(), "unit": str(value.unit), } return data
[docs] @classmethod def from_dict(cls, data): """Create model from dictionary.""" data = data["spatial"] lon = u.Quantity(data["lon"]["data"], data["lon"]["unit"]) lat = u.Quantity(data["lat"]["data"], data["lat"]["unit"]) coords = MapCoord.create((lon, lat)) parameters = Parameters.from_dict(data["parameters"]) return cls.from_parameters(parameters, coords=coords, frame=data["frame"])
[docs] @classmethod def from_parameters(cls, parameters, **kwargs): """Create model from parameters.""" return cls(norms=parameters, **kwargs)