# Licensed under a 3-clause BSD style license - see LICENSE.rst
import copy
from pathlib import Path
import numpy as np
import astropy.units as u
from ..utils.fitting import Parameter, Model, Parameters
from ..spectrum.models import SpectralModel
from ..image.models import SkySpatialModel
from ..utils.scripts import make_path, write_yaml
from ..maps import Map
__all__ = [
"SkyModelBase",
"SkyModels",
"SkyModel",
"SkyDiffuseCube",
"BackgroundModel",
"BackgroundModels",
]
[docs]class SkyModelBase(Model):
"""Sky model base class"""
def __add__(self, skymodel):
skymodels = [self]
if isinstance(skymodel, SkyModels):
skymodels += skymodel.skymodels
elif isinstance(skymodel, (SkyModel, SkyDiffuseCube)):
skymodels += [skymodel]
else:
raise NotImplementedError
return SkyModels(skymodels)
def __radd__(self, model):
return self.__add__(model)
[docs] def __call__(self, lon, lat, energy):
return self.evaluate(lon, lat, energy)
[docs]class SkyModels:
"""Collection of `~gammapy.cube.models.SkyModel`
Parameters
----------
skymodels : list of `~gammapy.cube.models.SkyModel`
Sky models
Examples
--------
Read from an XML file::
from gammapy.cube import SkyModels
filename = '$GAMMAPY_DATA/tests/models/fermi_model.xml'
sourcelib = SkyModels.read(filename)
"""
frame = None
__slots__ = ["skymodels"]
def __init__(self, skymodels):
existing_names = []
for model in skymodels:
if model.name in existing_names:
raise ValueError(
"SkyModel '{}' already exists, please choose"
" another name.".format(model.name)
)
existing_names.append(model.name)
self.skymodels = skymodels
@property
def parameters(self):
parameters = []
for skymodel in self.skymodels:
for p in skymodel.parameters:
parameters.append(p)
return Parameters(parameters)
@property
def names(self):
"""Sky model names"""
return [_.name for _ in self.skymodels]
[docs] @classmethod
def from_xml(cls, xml):
"""Read from XML string."""
from ..utils.serialization import xml_to_sky_models
return xml_to_sky_models(xml)
[docs] @classmethod
def read(cls, filename):
"""Read from XML file.
The XML definition of some models is uncompatible with the models
currently implemented in gammapy. Therefore the following modifications
happen to the XML model definition
* PowerLaw: The spectral index is negative in XML but positive in
gammapy. Parameter limits are ignored
* ExponentialCutoffPowerLaw: The cutoff energy is transferred to
lambda = 1 / cutof energy on read
"""
path = make_path(filename)
xml = path.read_text()
return cls.from_xml(xml)
[docs] def to_xml(self, filename):
"""Write to XML file."""
from ..utils.serialization import sky_models_to_xml
xml = sky_models_to_xml(self)
filename = make_path(filename)
with filename.open("w") as output:
output.write(xml)
[docs] @classmethod
def from_yaml(cls, filename):
"""Write to YAML file."""
from ..utils.serialization import dict_to_models
from ..utils.scripts import read_yaml
data = read_yaml(filename)
skymodels = dict_to_models(data)
return cls(skymodels)
[docs] def to_yaml(self, filename, selection="all"):
"""Write to YAML file."""
from ..utils.serialization import models_to_dict
components_dict = models_to_dict(self.skymodels, selection)
write_yaml(components_dict, filename)
[docs] def evaluate(self, lon, lat, energy):
out = self.skymodels[0].evaluate(lon, lat, energy)
for skymodel in self.skymodels[1:]:
out += skymodel.evaluate(lon, lat, energy)
return out
def __str__(self):
str_ = self.__class__.__name__ + "\n\n"
for idx, skymodel in enumerate(self.skymodels):
str_ += "Component {idx}: {skymodel}\n\n\t".format(
idx=idx, skymodel=skymodel
)
str_ += "\n\n"
if self.parameters.covariance is not None:
str_ += "\n\nCovariance: \n\n\t"
covariance = self.parameters.covariance_to_table()
str_ += "\n\t".join(covariance.pformat())
return str_
def __iadd__(self, skymodel):
if isinstance(skymodel, SkyModels):
self.skymodels += skymodel.skymodels
elif isinstance(skymodel, (SkyModel, SkyDiffuseCube)):
self.skymodels += [skymodel]
else:
raise NotImplementedError
return self
def __add__(self, skymodel):
skymodels = self.skymodels.copy()
if isinstance(skymodel, SkyModels):
skymodels += skymodel.skymodels
elif isinstance(skymodel, (SkyModel, SkyDiffuseCube)):
skymodels += [skymodel]
else:
raise NotImplementedError
return SkyModels(skymodels)
def __getitem__(self, item):
idx = self.names.index(item)
return self.skymodels[idx]
[docs]class SkyModel(SkyModelBase):
"""Sky model component.
This model represents a factorised sky model.
It has a `~gammapy.utils.modeling.Parameters`
combining the spatial and spectral parameters.
TODO: add possibility to have a temporal model component also.
Parameters
----------
spatial_model : `~gammapy.image.models.SkySpatialModel`
Spatial model (must be normalised to integrate to 1)
spectral_model : `~gammapy.spectrum.models.SpectralModel`
Spectral model
name : str
Model identifier
"""
__slots__ = ["name", "_spatial_model", "_spectral_model"]
def __init__(self, spatial_model, spectral_model, name="source"):
self.name = name
if not isinstance(spatial_model, SkySpatialModel):
raise ValueError(
"Spatial model must be instance / subclass "
" of `SkySpatialModel` and not {}.".format(
spatial_model.__class__.__name__
)
)
self._spatial_model = spatial_model
if not isinstance(spectral_model, SpectralModel):
raise ValueError(
"Spectral model model must be instance / subclass "
"of `SpectralModel` and not {}.".format(
spatial_model.__class__.__name__
)
)
self._spectral_model = spectral_model
parameters = (
spatial_model.parameters.parameters + spectral_model.parameters.parameters
)
super().__init__(parameters)
@property
def spatial_model(self):
"""`~gammapy.image.models.SkySpatialModel`"""
return self._spatial_model
@property
def spectral_model(self):
"""`~gammapy.spectrum.models.SpectralModel`"""
return self._spectral_model
@spectral_model.setter
def spectral_model(self, model):
"""`~gammapy.spectrum.models.SpectralModel`"""
self._spectral_model = model
self._parameters = Parameters(
self.spatial_model.parameters.parameters
+ self.spectral_model.parameters.parameters
)
@property
def position(self):
"""`~astropy.coordinates.SkyCoord`"""
return self.spatial_model.position
@property
def evaluation_radius(self):
"""`~astropy.coordinates.Angle`"""
return self.spatial_model.evaluation_radius
@property
def frame(self):
return self.spatial_model.frame
def __repr__(self):
fmt = "{}(spatial_model={!r}, spectral_model={!r})"
return fmt.format(
self.__class__.__name__, self.spatial_model, self.spectral_model
)
[docs] def evaluate(self, lon, lat, energy):
"""Evaluate the model at given points.
The model evaluation follows numpy broadcasting rules.
Return differential surface brightness cube.
At the moment in units: ``cm-2 s-1 TeV-1 deg-2``
Parameters
----------
lon, lat : `~astropy.units.Quantity`
Spatial coordinates
energy : `~astropy.units.Quantity`
Energy coordinate
Returns
-------
value : `~astropy.units.Quantity`
Model value at the given point.
"""
val_spatial = self.spatial_model(lon, lat) # pylint:disable=not-callable
val_spectral = self.spectral_model(energy) # pylint:disable=not-callable
return val_spatial * val_spectral
[docs] def copy(self, **kwargs):
"""Copy SkyModel"""
kwargs.setdefault("spatial_model", self.spatial_model.copy())
kwargs.setdefault("spectral_model", self.spectral_model.copy())
kwargs.setdefault("name", self.name + "-copy")
return self.__class__(**kwargs)
[docs]class SkyDiffuseCube(SkyModelBase):
"""Cube sky map template model (3D).
This is for a 3D map with an energy axis. Use `~gammapy.image.models.SkyDiffuseMap`
for 2D maps.
Parameters
----------
map : `~gammapy.maps.Map`
Map template
norm : float
Norm parameter (multiplied with map values)
meta : dict, optional
Meta information, meta['filename'] will be used for serialization
interp_kwargs : dict
Interpolation keyword arguments passed to `gammapy.maps.Map.interp_by_coord`.
Default arguments are {'interp': 'linear', 'fill_value': 0}.
"""
__slots__ = ["map", "norm", "meta", "_interp_kwargs"]
def __init__(
self,
map,
norm=1,
meta=None,
interp_kwargs=None,
name="diffuse",
filename=None,
obs_id="Global",
):
self.name = name
axis = map.geom.get_axis_by_name("energy")
if axis.node_type != "center":
raise ValueError('Need a map with energy axis node_type="center"')
self.map = map
self.norm = Parameter("norm", norm)
self.meta = {} if meta is None else meta
self.filename = filename
self.obs_id = obs_id
interp_kwargs = {} if interp_kwargs is None else interp_kwargs
interp_kwargs.setdefault("interp", "linear")
interp_kwargs.setdefault("fill_value", 0)
self._interp_kwargs = interp_kwargs
super().__init__([self.norm])
[docs] @classmethod
def read(cls, filename, **kwargs):
"""Read map from FITS file.
The default unit used if none is found in the file is ``cm-2 s-1 MeV-1 sr-1``.
Parameters
----------
filename : str
FITS image filename.
"""
m = Map.read(filename, **kwargs)
if m.unit == "":
m.unit = "cm-2 s-1 MeV-1 sr-1"
name = Path(filename).stem
return cls(m, name=name, filename=filename)
[docs] def evaluate(self, lon, lat, energy):
"""Evaluate model."""
coord = {
"lon": lon.to_value("deg"),
"lat": lat.to_value("deg"),
"energy": energy,
}
val = self.map.interp_by_coord(coord, **self._interp_kwargs)
norm = self.parameters["norm"].value
return u.Quantity(norm * val, self.map.unit, copy=False)
[docs] def copy(self):
"""A shallow copy"""
return copy.copy(self)
@property
def position(self):
"""`~astropy.coordinates.SkyCoord`"""
return self.map.geom.center_skydir
@property
def evaluation_radius(self):
"""`~astropy.coordinates.Angle`"""
radius = np.max(self.map.geom.width) / 2.0
return radius
@property
def frame(self):
return self.position.frame.name
[docs]class BackgroundModel(Model):
"""Background model.
Create a new map by a tilt and normalisation on the available map
Parameters
----------
background : `~gammapy.maps.Map`
Background model map
norm : float
Background normalisation
tilt : float
Additional tilt in the spectrum
reference : `~astropy.units.Quantity`
Reference energy of the tilt.
"""
__slots__ = ["map", "norm", "tilt", "reference", "name", "filename", "obs_id"]
def __init__(
self,
background,
norm=1,
tilt=0,
reference="1 TeV",
name="background",
filename=None,
obs_id=None,
):
axis = background.geom.get_axis_by_name("energy")
if axis.node_type != "edges":
raise ValueError('Need an integrated map, energy axis node_type="edges"')
self.map = background
self.norm = Parameter("norm", norm, unit="", min=0)
self.tilt = Parameter("tilt", tilt, unit="", frozen=True)
self.reference = Parameter("reference", reference, frozen=True)
self.name = name
self.filename = filename
self.obs_id = obs_id
super().__init__([self.norm, self.tilt, self.reference])
@property
def energy_center(self):
"""True energy axis bin centers (`~astropy.units.Quantity`)"""
energy_axis = self.map.geom.get_axis_by_name("energy")
energy = energy_axis.center
return energy[:, np.newaxis, np.newaxis]
[docs] def evaluate(self):
"""Evaluate background model.
Returns
-------
background_map : `~gammapy.maps.Map`
Background evaluated on the Map
"""
norm = self.parameters["norm"].value
tilt = self.parameters["tilt"].value
reference = self.parameters["reference"].quantity
tilt_factor = np.power((self.energy_center / reference).to(""), -tilt)
back_values = norm * self.map.data * tilt_factor.value
return self.map.copy(data=back_values)
[docs] @classmethod
def from_skymodel(cls, skymodel, exposure, edisp=None, psf=None, **kwargs):
"""Create background model from sky model by applying IRFs.
Typically used for diffuse Galactic or constant emission models.
Parameters
----------
skymodel : `~gammapy.cube.models.SkyModel` or `~gammapy.cube.models.SkyDiffuseCube`
Sky model
exposure : `~gammapy.maps.Map`
Exposure map
edisp : `~gammapy.irf.EnergyDispersion`
Energy dispersion
psf : `~gammapy.cube.PSFKernel`
PSF kernel
"""
from .fit import MapEvaluator
evaluator = MapEvaluator(
model=skymodel, exposure=exposure, edisp=edisp, psf=psf
)
background = evaluator.compute_npred()
background_model = cls(background=background, **kwargs)
background_model.name = skymodel.name
if skymodel.__class__.__name__ == "SkyDiffuseCube":
background_model.filename = skymodel.filename
return background_model
def __add__(self, model):
models = [self]
if isinstance(model, BackgroundModels):
models += model.models
elif isinstance(model, BackgroundModel):
models += [model]
else:
raise NotImplementedError
return BackgroundModels(models)
[docs]class BackgroundModels(Model):
"""Background models.
Parameters
----------
models : list of `BackgroundModel`
List of background models.
"""
__slots__ = ["models", "_parameters"]
def __init__(self, models):
self.models = models
parameters = []
for model in models:
for p in model.parameters:
parameters.append(p)
super().__init__(parameters)
[docs] def evaluate(self):
"""Evaluate background models."""
for idx, model in enumerate(self.models):
if idx == 0:
vals = model.evaluate()
else:
vals += model.evaluate()
return vals
def __iadd__(self, model):
if isinstance(model, BackgroundModels):
self.models += model.models
elif isinstance(model, BackgroundModel):
self.models += [model]
else:
raise NotImplementedError
return self
def __add__(self, model):
model_ = self.copy()
model_ += model
return model_
[docs] def to_yaml(self, filename, selection="all"):
"""Write to yaml file."""
from ..utils.serialization import models_to_dict
components_dict = models_to_dict(self.models, selection)
write_yaml(components_dict, filename)