# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Cube models (axes: lon, lat, energy)."""
import logging
import warnings
import os
import numpy as np
import astropy.units as u
from astropy.nddata import NoOverlapError
from astropy.time import Time
from gammapy.maps import Map, MapAxis, WcsGeom
from gammapy.modeling import Parameters
from gammapy.modeling.covariance import CovarianceMixin
from gammapy.modeling.parameter import _get_parameters_str
from gammapy.utils.compat import COPY_IF_NEEDED
from gammapy.utils.fits import LazyFitsData
from gammapy.utils.scripts import make_name, make_path
from gammapy.utils.deprecation import GammapyDeprecationWarning
from .core import Model, ModelBase, Models
from .spatial import ConstantSpatialModel, SpatialModel
from .spectral import PowerLawNormSpectralModel, SpectralModel, TemplateSpectralModel
from .temporal import TemporalModel
log = logging.getLogger(__name__)
__all__ = [
"create_fermi_isotropic_diffuse_model",
"FoVBackgroundModel",
"SkyModel",
"TemplateNPredModel",
]
[docs]
class SkyModel(CovarianceMixin, ModelBase):
"""Sky model component.
This model represents a factorised sky model.
It has `~gammapy.modeling.Parameters` combining the spatial and spectral parameters.
Parameters
----------
spectral_model : `~gammapy.modeling.models.SpectralModel`
Spectral model.
spatial_model : `~gammapy.modeling.models.SpatialModel`
Spatial model (must be normalised to integrate to 1).
temporal_model : `~gammapy.modeling.models.TemporalModel`
Temporal model.
name : str
Model identifier.
apply_irf : dict
Dictionary declaring which IRFs should be applied to this model. Options
are {"exposure": True, "psf": True, "edisp": True}.
datasets_names : list of str
Which datasets this model is applied to.
"""
tag = ["SkyModel", "sky-model"]
_apply_irf_default = {"exposure": True, "psf": True, "edisp": True}
def __init__(
self,
spectral_model,
spatial_model=None,
temporal_model=None,
name=None,
apply_irf=None,
datasets_names=None,
covariance_data=None,
):
self.spatial_model = spatial_model
self.spectral_model = spectral_model
self.temporal_model = temporal_model
self._name = make_name(name)
if apply_irf is None:
apply_irf = self._apply_irf_default.copy()
self.apply_irf = apply_irf
self.datasets_names = datasets_names
self._check_unit()
super().__init__(covariance_data=covariance_data)
@property
def _models(self):
models = self.spectral_model, self.spatial_model, self.temporal_model
return [model for model in models if model is not None]
def _check_unit(self):
axis = MapAxis.from_energy_bounds(
"0.1 TeV", "10 TeV", nbin=1, name="energy_true"
)
geom = WcsGeom.create(skydir=self.position, npix=(2, 2), axes=[axis])
time = Time(55555, format="mjd")
if self.apply_irf["exposure"]:
ref_unit = u.Unit("cm-2 s-1 MeV-1")
else:
ref_unit = u.Unit("")
obt_unit = self.spectral_model(axis.center).unit
if self.spatial_model:
obt_unit = obt_unit * self.spatial_model.evaluate_geom(geom).unit
ref_unit = ref_unit / u.sr
if self.temporal_model:
if u.Quantity(self.temporal_model(time)).unit.is_equivalent(
self.spectral_model(axis.center).unit
):
obt_unit = (
(
self.temporal_model(time)
* self.spatial_model.evaluate_geom(geom).unit
)
.to(obt_unit.to_string())
.unit
)
else:
obt_unit = obt_unit * u.Quantity(self.temporal_model(time)).unit
if not obt_unit.is_equivalent(ref_unit):
raise ValueError(
f"SkyModel unit {obt_unit} is not equivalent to {ref_unit}"
)
@property
def name(self):
return self._name
@property
def parameters(self):
parameters = []
parameters.append(self.spectral_model.parameters)
if self.spatial_model is not None:
parameters.append(self.spatial_model.parameters)
if self.temporal_model is not None:
parameters.append(self.temporal_model.parameters)
return Parameters.from_stack(parameters)
@property
def parameters_unique_names(self):
"""List of unique parameter names. Return formatted as par_type.par_name."""
names = []
for model in self._models:
for par_name in model.parameters_unique_names:
components = [model.type, par_name]
name = ".".join(components)
names.append(name)
return names
@property
def spatial_model(self):
"""Spatial model as a `~gammapy.modeling.models.SpatialModel` object."""
return self._spatial_model
@spatial_model.setter
def spatial_model(self, model):
if not (model is None or isinstance(model, SpatialModel)):
raise TypeError(f"Invalid type: {model!r}")
self._spatial_model = model
@property
def spectral_model(self):
"""Spectral model as a `~gammapy.modeling.models.SpectralModel` object."""
return self._spectral_model
@spectral_model.setter
def spectral_model(self, model):
if not (model is None or isinstance(model, SpectralModel)):
raise TypeError(f"Invalid type: {model!r}")
self._spectral_model = model
@property
def temporal_model(self):
"""Temporal model as a `~gammapy.modeling.models.TemporalModel` object."""
return self._temporal_model
@temporal_model.setter
def temporal_model(self, model):
if not (model is None or isinstance(model, TemporalModel)):
raise TypeError(f"Invalid type: {model!r}")
self._temporal_model = model
@property
def position(self):
"""Position as a `~astropy.coordinates.SkyCoord`."""
return getattr(self.spatial_model, "position", None)
@property
def position_lonlat(self):
"""Spatial model center position `(lon, lat)` in radians and frame of the model."""
return getattr(self.spatial_model, "position_lonlat", None)
@property
def evaluation_bin_size_min(self):
"""Minimal spatial bin size for spatial model evaluation."""
if (
self.spatial_model is not None
and self.spatial_model.evaluation_bin_size_min is not None
):
return self.spatial_model.evaluation_bin_size_min
else:
return None
@property
def evaluation_radius(self):
"""Evaluation radius as an `~astropy.coordinates.Angle`."""
return self.spatial_model.evaluation_radius
@property
def evaluation_region(self):
"""Evaluation region as an `~astropy.coordinates.Angle`."""
return self.spatial_model.evaluation_region
@property
def frame(self):
return self.spatial_model.frame
def __add__(self, other):
if isinstance(other, (Models, list)):
return Models([self, *other])
elif isinstance(other, (SkyModel, TemplateNPredModel)):
return Models([self, other])
else:
raise TypeError(f"Invalid type: {other!r}")
def __radd__(self, model):
return self.__add__(model)
[docs]
def __call__(self, lon, lat, energy, time=None):
return self.evaluate(lon, lat, energy, time)
def __repr__(self):
return (
f"{self.__class__.__name__}("
f"spatial_model={self.spatial_model!r}, "
f"spectral_model={self.spectral_model!r})"
f"temporal_model={self.temporal_model!r})"
)
[docs]
def contributes(self, mask, margin="0 deg"):
"""Check if a sky model contributes within a mask map.
Parameters
----------
mask : `~gammapy.maps.WcsNDMap` of boolean type
Map containing a boolean mask.
margin : `~astropy.units.Quantity`
Add a margin in degree to the source evaluation radius.
Used to take into account PSF width.
Returns
-------
models : `DatasetModels`
Selected models contributing inside the region where mask is True.
"""
from gammapy.datasets.evaluator import CUTOUT_MARGIN
margin = u.Quantity(margin)
if not mask.geom.is_image:
mask = mask.reduce_over_axes(func=np.logical_or)
if mask.geom.is_region and mask.geom.region is not None:
if mask.geom.is_all_point_sky_regions:
return True
geom = mask.geom.to_wcs_geom()
mask = geom.region_mask([mask.geom.region])
try:
mask_cutout = mask.cutout(
position=self.position,
width=(2 * self.evaluation_radius + CUTOUT_MARGIN + margin),
)
contributes = np.any(mask_cutout.data)
except (NoOverlapError, ValueError):
contributes = False
return contributes
[docs]
def evaluate(self, lon, lat, energy, time=None):
"""Evaluate the model at given points.
The model evaluation follows numpy broadcasting rules.
Return differential surface brightness cube.
At the moment in units: ``cm-2 s-1 TeV-1 deg-2``.
Parameters
----------
lon, lat : `~astropy.units.Quantity`
Spatial coordinates.
energy : `~astropy.units.Quantity`
Energy coordinate.
time: `~astropy.time.Time`, optional
Time coordinate. Default is None.
Returns
-------
value : `~astropy.units.Quantity`
Model value at the given point.
"""
value = self.spectral_model(energy) # pylint:disable=not-callable
# TODO: case if self.temporal_model is not None, introduce time in arguments ?
if self.spatial_model is not None:
if self.spatial_model.is_energy_dependent:
spatial = self.spatial_model(lon, lat, energy)
else:
spatial = self.spatial_model(lon, lat)
value = value * spatial # pylint:disable=not-callable
if (self.temporal_model is not None) and (time is not None):
value = value * self.temporal_model(time)
return value
[docs]
def evaluate_geom(self, geom, gti=None):
"""Evaluate model on `~gammapy.maps.Geom`."""
coords = geom.get_coord(sparse=True)
value = self.spectral_model(coords["energy_true"])
additional_axes = set(coords.axis_names) - {
"lon",
"lat",
"energy_true",
"time",
}
for axis in additional_axes:
value = value * np.ones_like(coords[axis])
if self.spatial_model:
value = value * self.spatial_model.evaluate_geom(geom)
if self.temporal_model:
value = self._compute_time_integral(value, geom, gti)
return value
[docs]
def integrate_geom(self, geom, gti=None, oversampling_factor=None):
"""Integrate model on `~gammapy.maps.Geom`.
See `~gammapy.modeling.models.SpatialModel.integrate_geom` and
`~gammapy.modeling.models.SpectralModel.integral`.
Parameters
----------
geom : `Geom` or `~gammapy.maps.RegionGeom`
Map geometry.
gti : `GTI`, optional
GIT table. Default is None.
oversampling_factor : int, optional
The oversampling factor to use for spatial integration.
Default is None: the factor is estimated from the model minimal bin size.
Returns
-------
flux : `Map`
Predicted flux map.
"""
energy = geom.axes["energy_true"].edges
shape = len(geom.data_shape) * [
1,
]
shape[geom.axes.index_data("energy_true")] = -1
value = self.spectral_model.integral(
energy[:-1],
energy[1:],
).reshape(shape)
if self.spatial_model:
value = (
value
* self.spatial_model.integrate_geom(
geom, oversampling_factor=oversampling_factor
).quantity
)
if self.temporal_model:
value = self._compute_time_integral(value, geom, gti)
value = value * np.ones(geom.data_shape)
return Map.from_geom(geom=geom, data=value.value, unit=value.unit)
def _compute_time_integral(self, value, geom, gti):
"""Multiply input value with time integral for the given geometry and GTI."""
if "time" in geom.axes.names:
if geom.axes.names[-1] != "time":
raise ValueError(
"Incorrect axis order. The time axis must be the last axis"
)
time_axis = geom.axes["time"]
temp_eval = np.ones(time_axis.nbin)
for idx in range(time_axis.nbin):
if gti is None:
t1, t2 = time_axis.time_min[idx], time_axis.time_max[idx]
else:
gti_in_bin = gti.select_time(
time_interval=[
time_axis.time_min[idx],
time_axis.time_max[idx],
]
)
t1, t2 = gti_in_bin.time_start, gti_in_bin.time_stop
integral = self.temporal_model.integral(t1, t2)
temp_eval[idx] = np.sum(integral)
value = (value.T * temp_eval).T
else:
if gti is not None:
integral = self.temporal_model.integral(gti.time_start, gti.time_stop)
value = value * np.sum(integral)
return value
[docs]
def copy(self, name=None, copy_data=False, **kwargs):
"""Copy sky model.
Parameters
----------
name : str, optional
Assign a new name to the copied model. Default is None.
copy_data : bool, optional
Copy the data arrays attached to models. Default is False.
**kwargs : dict
Keyword arguments forwarded to `SkyModel`.
Returns
-------
model : `SkyModel`
Copied sky model.
"""
if self.spatial_model is not None:
spatial_model = self.spatial_model.copy(copy_data=copy_data)
else:
spatial_model = None
if self.temporal_model is not None:
temporal_model = self.temporal_model.copy()
else:
temporal_model = None
kwargs.setdefault("name", make_name(name))
kwargs.setdefault("spectral_model", self.spectral_model.copy())
kwargs.setdefault("spatial_model", spatial_model)
kwargs.setdefault("temporal_model", temporal_model)
kwargs.setdefault("apply_irf", self.apply_irf.copy())
kwargs.setdefault("datasets_names", self.datasets_names)
kwargs.setdefault("covariance_data", self.covariance.data.copy())
return self.__class__(**kwargs)
[docs]
def to_dict(self, full_output=False):
"""Create dictionary for YAML serilisation."""
data = {}
data["name"] = self.name
data["type"] = self.tag[0]
if self.apply_irf != self._apply_irf_default:
data["apply_irf"] = self.apply_irf
if self.datasets_names is not None:
data["datasets_names"] = self.datasets_names
data.update(self.spectral_model.to_dict(full_output))
if self.spatial_model is not None:
data.update(self.spatial_model.to_dict(full_output))
if self.temporal_model is not None:
data.update(self.temporal_model.to_dict(full_output))
return data
[docs]
@classmethod
def from_dict(cls, data, **kwargs):
"""Create SkyModel from dictionary."""
from gammapy.modeling.models import (
SPATIAL_MODEL_REGISTRY,
SPECTRAL_MODEL_REGISTRY,
TEMPORAL_MODEL_REGISTRY,
)
model_class = SPECTRAL_MODEL_REGISTRY.get_cls(data["spectral"]["type"])
spectral_model = model_class.from_dict({"spectral": data["spectral"]})
spatial_data = data.get("spatial")
if spatial_data is not None:
model_class = SPATIAL_MODEL_REGISTRY.get_cls(spatial_data["type"])
spatial_model = model_class.from_dict({"spatial": spatial_data})
else:
spatial_model = None
temporal_data = data.get("temporal")
if temporal_data is not None:
model_class = TEMPORAL_MODEL_REGISTRY.get_cls(temporal_data["type"])
temporal_model = model_class.from_dict({"temporal": temporal_data})
else:
temporal_model = None
return cls(
name=data["name"],
spatial_model=spatial_model,
spectral_model=spectral_model,
temporal_model=temporal_model,
apply_irf=data.get("apply_irf", cls._apply_irf_default),
datasets_names=data.get("datasets_names"),
)
def __str__(self):
str_ = f"{self.__class__.__name__}\n\n"
str_ += "\t{:26}: {}\n".format("Name", self.name)
str_ += "\t{:26}: {}\n".format("Datasets names", self.datasets_names)
str_ += "\t{:26}: {}\n".format(
"Spectral model type", self.spectral_model.__class__.__name__
)
if self.spatial_model is not None:
spatial_type = self.spatial_model.__class__.__name__
else:
spatial_type = ""
str_ += "\t{:26}: {}\n".format("Spatial model type", spatial_type)
if self.temporal_model is not None:
temporal_type = self.temporal_model.__class__.__name__
else:
temporal_type = ""
str_ += "\t{:26}: {}\n".format("Temporal model type", temporal_type)
str_ += "\tParameters:\n"
info = _get_parameters_str(self.parameters)
lines = info.split("\n")
str_ += "\t" + "\n\t".join(lines[:-1])
str_ += "\n\n"
return str_.expandtabs(tabsize=2)
[docs]
@classmethod
def create(cls, spectral_model, spatial_model=None, temporal_model=None, **kwargs):
"""Create a model instance.
Parameters
----------
spectral_model : str
Tag to create spectral model.
spatial_model : str, optional
Tag to create spatial model. Default is None.
temporal_model : str, optional
Tag to create temporal model. Default is None.
**kwargs : dict
Keyword arguments passed to `SkyModel`.
Returns
-------
model : SkyModel
Sky model.
"""
spectral_model = Model.create(spectral_model, model_type="spectral")
if spatial_model:
spatial_model = Model.create(spatial_model, model_type="spatial")
if temporal_model:
temporal_model = Model.create(temporal_model, model_type="temporal")
return cls(
spectral_model=spectral_model,
spatial_model=spatial_model,
temporal_model=temporal_model,
**kwargs,
)
[docs]
def freeze(self, model_type=None):
"""Freeze parameters depending on model type.
Parameters
----------
model_type : {None, "spatial", "spectral", "temporal"}
Freeze all parameters or only spatial/spectral/temporal.
Default is None, such that all parameters are frozen.
"""
if model_type is None:
self.parameters.freeze_all()
else:
model = getattr(self, f"{model_type}_model")
model.freeze()
[docs]
def unfreeze(self, model_type=None):
"""Restore parameters frozen status to default depending on model type.
Parameters
----------
model_type : {None, "spatial", "spectral", "temporal"}
Restore frozen status to default for all parameters or only spatial/spectral/temporal.
Default is None, such that all parameters are restored to default frozen status.
"""
if model_type is None:
for model_type in ["spectral", "spatial", "temporal"]:
self.unfreeze(model_type)
else:
model = getattr(self, f"{model_type}_model")
if model:
model.unfreeze()
[docs]
class FoVBackgroundModel(ModelBase):
"""Field of view background model.
The background model holds the correction parameters applied to
the instrumental background attached to a `MapDataset` or
`SpectrumDataset`.
Parameters
----------
dataset_name : str
Dataset name.
spectral_model : `~gammapy.modeling.models.SpectralModel`, Optional
Normalized spectral model.
Default is `~gammapy.modeling.models.PowerLawNormSpectralModel`
spatial_model : `~gammapy.modeling.models.SpatialModel`, Optional
Unitless Spatial model (unit is dropped on evaluation if defined).
Default is None.
"""
tag = ["FoVBackgroundModel", "fov-bkg"]
def __init__(
self,
dataset_name,
spectral_model=None,
spatial_model=None,
covariance_data=None,
):
# TODO: remove this in v2.0
if isinstance(dataset_name, SpectralModel):
warnings.warn(
"dataset_name has been made first argument since v1.3.",
GammapyDeprecationWarning,
stacklevel=2,
)
buf = dataset_name
dataset_name = spectral_model
spectral_model = buf
self.datasets_names = [dataset_name]
if spectral_model is None:
spectral_model = PowerLawNormSpectralModel()
if not spectral_model.is_norm_spectral_model:
raise ValueError("A norm spectral model is required.")
self._spatial_model = spatial_model
self._spectral_model = spectral_model
super().__init__(covariance_data=covariance_data)
[docs]
@staticmethod
def contributes(*args, **kwargs):
"""FoV background models always contribute."""
return True
@property
def spectral_model(self):
"""Spectral norm model."""
return self._spectral_model
@property
def spatial_model(self):
"""Spatial norm model."""
return self._spatial_model
@property
def _models(self):
models = self.spectral_model, self.spatial_model
return [model for model in models if model is not None]
@property
def name(self):
"""Model name."""
return self.datasets_names[0] + "-bkg"
@property
def parameters(self):
"""Model parameters."""
parameters = []
parameters.append(self.spectral_model.parameters)
if self.spatial_model is not None:
parameters.append(self.spatial_model.parameters)
return Parameters.from_stack(parameters)
@property
def parameters_unique_names(self):
"""List of unique parameter names. Return formatted as par_type.par_name."""
names = []
for model in self._models:
for par_name in model.parameters_unique_names:
components = [model.type, par_name]
name = ".".join(components)
names.append(name)
return names
def __str__(self):
str_ = f"{self.__class__.__name__}\n\n"
str_ += "\t{:26}: {}\n".format("Name", self.name)
str_ += "\t{:26}: {}\n".format("Datasets names", self.datasets_names)
str_ += "\t{:26}: {}\n".format(
"Spectral model type", self.spectral_model.__class__.__name__
)
if self.spatial_model is not None:
str_ += "\t{:26}: {}\n".format(
"Spatial model type", self.spatial_model.__class__.__name__
)
str_ += "\tParameters:\n"
info = _get_parameters_str(self.parameters)
lines = info.split("\n")
str_ += "\t" + "\n\t".join(lines[:-1])
str_ += "\n\n"
return str_.expandtabs(tabsize=2)
[docs]
def evaluate_geom(self, geom):
"""Evaluate map."""
coords = geom.get_coord(sparse=True)
return self.evaluate(**coords._data)
[docs]
def evaluate(self, energy, lon=None, lat=None):
"""Evaluate model."""
value = self.spectral_model(energy)
if self.spatial_model is not None:
if lon is not None and lat is not None:
if self.spatial_model.is_energy_dependent:
return self.spatial_model(lon, lat, energy).value * value
else:
return self.spatial_model(lon, lat).value * value
else:
raise ValueError(
"lon and lat are required if a spatial model is defined"
)
else:
return value
[docs]
def copy(self, name=None, copy_data=False, **kwargs):
"""Copy the `FoVBackgroundModel` instance.
Parameters
----------
name : str, optional
Ignored, present for API compatibility.
Default is None.
copy_data : bool, optional
Ignored, present for API compatibility.
Default is False.
**kwargs : dict
Keyword arguments forwarded to `FoVBackgroundModel`.
Returns
-------
model : `FoVBackgroundModel`
Copied FoV background model.
"""
kwargs.setdefault("spectral_model", self.spectral_model.copy())
kwargs.setdefault("dataset_name", self.datasets_names[0])
kwargs.setdefault("covariance_data", self.covariance.data.copy())
if self.spatial_model is not None:
kwargs.setdefault("spatial_model", self.spatial_model.copy())
return self.__class__(**kwargs)
[docs]
def to_dict(self, full_output=False):
data = {}
data["type"] = self.tag[0]
data["datasets_names"] = self.datasets_names
data.update(self.spectral_model.to_dict(full_output=full_output))
if self.spatial_model is not None:
data.update(self.spatial_model.to_dict(full_output))
return data
[docs]
@classmethod
def from_dict(cls, data, **kwargs):
"""Create model from dictionary.
Parameters
----------
data : dict
Data dictionary.
"""
from gammapy.modeling.models import (
SPATIAL_MODEL_REGISTRY,
SPECTRAL_MODEL_REGISTRY,
)
spectral_data = data.get("spectral")
if spectral_data is not None:
model_class = SPECTRAL_MODEL_REGISTRY.get_cls(spectral_data["type"])
spectral_model = model_class.from_dict({"spectral": spectral_data})
else:
spectral_model = None
spatial_data = data.get("spatial")
if spatial_data is not None:
model_class = SPATIAL_MODEL_REGISTRY.get_cls(spatial_data["type"])
spatial_model = model_class.from_dict({"spatial": spatial_data})
else:
spatial_model = None
datasets_names = data.get("datasets_names")
if datasets_names is None:
raise ValueError("FoVBackgroundModel must define a dataset name")
if len(datasets_names) > 1:
raise ValueError("FoVBackgroundModel can only be assigned to one dataset")
return cls(
spatial_model=spatial_model,
spectral_model=spectral_model,
dataset_name=datasets_names[0],
)
[docs]
def reset_to_default(self):
"""Reset parameter values to default."""
values = self.spectral_model.default_parameters.value
self.spectral_model.parameters.value = values
[docs]
def freeze(self, model_type="spectral"):
"""Freeze model parameters."""
if model_type is None or model_type == "spectral":
self._spectral_model.freeze()
[docs]
def unfreeze(self, model_type="spectral"):
"""Restore parameters frozen status to default."""
if model_type is None or model_type == "spectral":
self._spectral_model.unfreeze()
[docs]
class TemplateNPredModel(ModelBase):
"""Background model.
Create a new map by a tilt and normalization on the available map.
Parameters
----------
map : `~gammapy.maps.Map`
Background model map.
spectral_model : `~gammapy.modeling.models.SpectralModel`
Normalized spectral model.
Default is `~gammapy.modeling.models.PowerLawNormSpectralModel`.
copy_data : bool
Create a deepcopy of the map data or directly use the original. Default is True.
Use False to save memory in case of large maps.
spatial_model : `~gammapy.modeling.models.SpatialModel`
Unitless Spatial model (unit is dropped on evaluation if defined).
Default is None.
"""
tag = "TemplateNPredModel"
map = LazyFitsData(cache=True)
def __init__(
self,
map,
spectral_model=None,
name=None,
filename=None,
datasets_names=None,
copy_data=True,
spatial_model=None,
covariance_data=None,
):
if isinstance(map, Map):
axis = map.geom.axes["energy"]
if axis.node_type != "edges":
raise ValueError(
'Need an integrated map, energy axis node_type="edges"'
)
if copy_data:
self.map = map.copy()
else:
self.map = map
self._name = make_name(name)
self.filename = filename
if spectral_model is None:
spectral_model = PowerLawNormSpectralModel()
spectral_model.tilt.frozen = True
self.spatial_model = spatial_model
self.spectral_model = spectral_model
if isinstance(datasets_names, str):
datasets_names = [datasets_names]
if isinstance(datasets_names, list):
if len(datasets_names) != 1:
raise ValueError(
"Currently background models can only be assigned to one dataset."
)
self.datasets_names = datasets_names
super().__init__(covariance_data=covariance_data)
[docs]
def copy(self, name=None, copy_data=False, **kwargs):
"""Copy template npred model.
Parameters
----------
name : str, optional
Assign a new name to the copied model.
Default is None.
copy_data : bool, optional
Copy the data arrays attached to models.
Default is False.
**kwargs : dict
Keyword arguments forwarded to `TemplateNPredModel`.
Returns
-------
model : `TemplateNPredModel`
Copied template npred model.
"""
name = make_name(name)
kwargs.setdefault("map", self.map)
kwargs.setdefault("spectral_model", self.spectral_model.copy())
kwargs.setdefault("filename", self.filename)
kwargs.setdefault("datasets_names", self.datasets_names)
kwargs.setdefault("covariance_data", self.covariance.data.copy())
return self.__class__(name=name, copy_data=copy_data, **kwargs)
@property
def name(self):
return self._name
@property
def energy_center(self):
"""True energy axis bin centers as a `~astropy.units.Quantity`."""
energy_axis = self.map.geom.axes["energy"]
energy = energy_axis.center
return energy[:, np.newaxis, np.newaxis]
@property
def spectral_model(self):
"""Spectral model as a `~gammapy.modeling.models.SpectralModel` object."""
return self._spectral_model
@spectral_model.setter
def spectral_model(self, model):
if not (model is None or isinstance(model, SpectralModel)):
raise TypeError(f"Invalid type: {model!r}")
self._spectral_model = model
@property
def _models(self):
models = self.spectral_model, self.spatial_model
return [model for model in models if model is not None]
@property
def parameters(self):
parameters = []
parameters.append(self.spectral_model.parameters)
return Parameters.from_stack(parameters)
@property
def parameters_unique_names(self):
"""List of unique parameter names. Return formatted as par_type.par_name."""
names = []
for model in self._models:
for par_name in model.parameters_unique_names:
components = [model.type, par_name]
name = ".".join(components)
names.append(name)
return names
[docs]
def evaluate(self):
"""Evaluate background model.
Returns
-------
background_map : `~gammapy.maps.Map`
Background evaluated on the Map.
"""
value = self.spectral_model(self.energy_center).value
back_values = self.map.data * value
if self.spatial_model is not None:
value = self.spatial_model.evaluate_geom(self.map.geom).value
back_values *= value
return self.map.copy(data=back_values)
[docs]
def to_dict(self, full_output=False):
data = {}
data["name"] = self.name
data["type"] = self.tag
if self.spatial_model is not None:
data["spatial"] = self.spatial_model.to_dict(full_output)["spatial"]
data["spectral"] = self.spectral_model.to_dict(full_output)["spectral"]
if self.filename is not None:
data["filename"] = self.filename
if self.datasets_names is not None:
data["datasets_names"] = self.datasets_names
return data
[docs]
def write(self, overwrite=False):
"""
Write the map.
Parameters
----------
overwrite: bool, optional
Overwrite existing file.
Default is False, which will raise a warning if the template file exists already.
"""
if self.filename is None:
raise IOError("Missing filename")
elif os.path.isfile(make_path(self.filename)) and not overwrite:
log.warning("Template file already exits, and overwrite is False")
else:
self.map.write(self.filename, overwrite=overwrite)
[docs]
@classmethod
def from_dict(cls, data, **kwargs):
from gammapy.modeling.models import (
SPATIAL_MODEL_REGISTRY,
SPECTRAL_MODEL_REGISTRY,
)
spectral_data = data.get("spectral")
if spectral_data is not None:
model_class = SPECTRAL_MODEL_REGISTRY.get_cls(spectral_data["type"])
spectral_model = model_class.from_dict({"spectral": spectral_data})
else:
spectral_model = None
spatial_data = data.get("spatial")
if spatial_data is not None:
model_class = SPATIAL_MODEL_REGISTRY.get_cls(spatial_data["type"])
spatial_model = model_class.from_dict({"spatial": spatial_data})
else:
spatial_model = None
if "filename" in data:
bkg_map = Map.read(data["filename"])
else:
raise IOError("Missing filename")
return cls(
map=bkg_map,
spatial_model=spatial_model,
spectral_model=spectral_model,
name=data["name"],
datasets_names=data.get("datasets_names"),
filename=data.get("filename"),
)
[docs]
def cutout(self, position, width, mode="trim", name=None):
"""Cutout background model.
Parameters
----------
position : `~astropy.coordinates.SkyCoord`
Center position of the cutout region.
width : tuple of `~astropy.coordinates.Angle`
Angular sizes of the region in (lon, lat) in that specific order.
If only one value is passed, a square region is extracted.
mode : {'trim', 'partial', 'strict'}
Mode option for Cutout2D, for details see `~astropy.nddata.utils.Cutout2D`.
Default is "trim".
name : str, optional
Name of the returned background model. Default is None.
Returns
-------
cutout : `TemplateNPredModel`
Cutout background model.
"""
cutout_kwargs = {"position": position, "width": width, "mode": mode}
bkg_map = self.map.cutout(**cutout_kwargs)
spectral_model = self.spectral_model.copy()
return self.__class__(bkg_map, spectral_model=spectral_model, name=name)
[docs]
def stack(self, other, weights=None, nan_to_num=True):
"""Stack background model in place.
Stacking the background model resets the current parameters values.
Parameters
----------
other : `TemplateNPredModel`
Other background model.
weights : float, optional
Weights. Default is None.
nan_to_num: bool, optional
Non-finite values are replaced by zero if True. Default is True.
"""
bkg = self.evaluate()
if nan_to_num:
bkg.data[~np.isfinite(bkg.data)] = 0
other_bkg = other.evaluate()
bkg.stack(other_bkg, weights=weights, nan_to_num=nan_to_num)
self.map = bkg
# reset parameter values
self.spectral_model.norm.value = 1
self.spectral_model.tilt.value = 0
[docs]
def slice_by_energy(self, energy_min=None, energy_max=None, name=None):
"""Select and slice model template in energy range
Parameters
----------
energy_min, energy_max : `~astropy.units.Quantity`
Energy bounds of the slice. Default is None.
name : str
Name of the sliced model. Default is None.
Returns
-------
model : `TemplateNpredModel`
Sliced Model.
"""
name = make_name(name)
energy_axis = self.map._geom.axes["energy"]
if energy_min is None:
energy_min = energy_axis.bounds[0]
if energy_max is None:
energy_max = energy_axis.bounds[1]
if name is None:
name = self.name
energy_min, energy_max = u.Quantity(energy_min), u.Quantity(energy_max)
group = energy_axis.group_table(edges=[energy_min, energy_max])
is_normal = group["bin_type"] == "normal "
group = group[is_normal]
slices = {
"energy": slice(int(group["idx_min"][0]), int(group["idx_max"][0]) + 1)
}
model = self.copy(name=name)
model.map = model.map.slice_by_idx(slices=slices)
return model
def __str__(self):
str_ = self.__class__.__name__ + "\n\n"
str_ += "\t{:26}: {}\n".format("Name", self.name)
str_ += "\t{:26}: {}\n".format("Datasets names", self.datasets_names)
str_ += "\tParameters:\n"
info = _get_parameters_str(self.parameters)
lines = info.split("\n")
str_ += "\t" + "\n\t".join(lines[:-1])
str_ += "\n\n"
return str_.expandtabs(tabsize=2)
@property
def position(self):
"""Position as a `~astropy.coordinates.SkyCoord`."""
return self.map.geom.center_skydir
@property
def evaluation_radius(self):
"""Evaluation radius as a `~astropy.coordinates.Angle`."""
return np.max(self.map.geom.width) / 2.0
[docs]
def freeze(self, model_type="spectral"):
"""Freeze model parameters."""
if model_type is None or model_type == "spectral":
self._spectral_model.freeze()
[docs]
def unfreeze(self, model_type="spectral"):
"""Restore parameters frozen status to default."""
if model_type is None or model_type == "spectral":
self._spectral_model.unfreeze()
[docs]
def create_fermi_isotropic_diffuse_model(filename, **kwargs):
"""Read Fermi isotropic diffuse model.
See `LAT Background models <https://fermi.gsfc.nasa.gov/ssc/data/access/lat/BackgroundModels.html>`__.
Parameters
----------
filename : str
Filename.
kwargs : dict
Keyword arguments forwarded to `TemplateSpectralModel`.
Returns
-------
diffuse_model : `SkyModel`
Fermi isotropic diffuse sky model.
"""
vals = np.loadtxt(make_path(filename))
energy = u.Quantity(vals[:, 0], "MeV", copy=COPY_IF_NEEDED)
values = u.Quantity(vals[:, 1], "MeV-1 s-1 cm-2", copy=COPY_IF_NEEDED)
kwargs.setdefault("interp_kwargs", {"fill_value": None})
spatial_model = ConstantSpatialModel()
spectral_model = (
TemplateSpectralModel(energy=energy, values=values, **kwargs)
* PowerLawNormSpectralModel()
)
return SkyModel(
spatial_model=spatial_model,
spectral_model=spectral_model,
name="fermi-diffuse-iso",
apply_irf={"psf": False, "exposure": True, "edisp": True},
)