Source code for gammapy.cube.fit

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
import numpy as np
from astropy.utils import lazyproperty
import astropy.units as u
from ..utils.fitting import Parameters
from ..stats import cash, cstat
from ..maps import Map, MapAxis
from .models import SkyModel, SkyModels

__all__ = ["MapEvaluator", "MapDataset"]

log = logging.getLogger(__name__)


CUTOUT_MARGIN = 0.1 * u.deg


[docs]class MapDataset: """Perform sky model likelihood fit on maps. Parameters ---------- model : `~gammapy.cube.models.SkyModel` or `~gammapy.cube.models.SkyModels` Source sky models. counts : `~gammapy.maps.WcsNDMap` Counts cube exposure : `~gammapy.maps.WcsNDMap` Exposure cube mask : `~gammapy.maps.WcsNDMap` Mask to apply to the likelihood. psf : `~gammapy.cube.PSFKernel` PSF kernel edisp : `~gammapy.irf.EnergyDispersion` Energy dispersion background_model: `~gammapy.cube.models.BackgroundModel` or `~gammapy.cube.models.BackgroundModel` Background models to use for the fit. likelihood : {"cash", "cstat"} Likelihood function to use for the fit. evaluation_mode : {"local", "global"} Model evaluation mode. The "local" mode evaluates the model components on smaller grids to save computation time. This mode is recommended for local optimization algorithms. The "global" evaluation mode evaluates the model components on the full map. This mode is recommended for global optimization algorithms. """ def __init__( self, model, counts=None, exposure=None, mask=None, psf=None, edisp=None, background_model=None, likelihood="cash", evaluation_mode="local", ): if mask is not None and mask.data.dtype != np.dtype("bool"): raise ValueError("mask data must have dtype bool") self.evaluation_mode = evaluation_mode self.model = model self.counts = counts self.exposure = exposure self.mask = mask self.psf = psf self.edisp = edisp self.background_model = background_model if likelihood == "cash": self._stat = cash elif likelihood == "cstat": self._stat = cstat else: raise ValueError( "Not a valid fit statistic. Choose between 'cash' and 'cstat'." ) @property def model(self): """Sky model to fit instance of `SkyModel` or `SkyModels`""" return self._model @model.setter def model(self, model): """Set sky model to fit""" if isinstance(model, SkyModel): model = SkyModels([model]) self._model = model evaluators = [] for component in model.skymodels: evaluator = MapEvaluator(component, evaluation_mode=self.evaluation_mode) evaluators.append(evaluator) self._evaluators = evaluators @lazyproperty def parameters(self): """List of parameters (`~gammapy.utils.fitting.Parameters`)""" if self.background_model: parameters = Parameters( self.model.parameters.parameters + self.background_model.parameters.parameters ) else: parameters = Parameters(self.model.parameters.parameters) return parameters @property def _geom(self): if self.counts is not None: return self.counts.geom else: return self.background_model.map.geom @property def data_shape(self): """Shape of the counts data""" return self.counts.data.shape
[docs] def npred(self): """Compute predicted counts from the source and background model. Returns ------- npred : `Map` Map of predicted counts. """ if self.background_model: npred_total = self.background_model.evaluate() else: npred_total = Map.from_geom(self._geom) for evaluator in self._evaluators: # if the model component drifts out of its support the evaluator has # has to be updated if evaluator.needs_update: evaluator.update(self.exposure, self.psf, self.edisp, self._geom) npred = evaluator.compute_npred() # avoid slow fancy indexing, when the shape is equivalent if npred.data.shape == npred_total.data.shape: npred_total += npred.data else: npred_total.data[evaluator.coords_idx] += npred.data return npred_total
[docs] def likelihood_per_bin(self): """Likelihood per bin given the current model parameters""" return self._stat(n_on=self.counts.data, mu_on=self.npred().data)
[docs] def likelihood(self, parameters, mask=None): """Total likelihood given the current model parameters. Parameters ---------- mask : `~numpy.ndarray` Mask to be combined with the dataset mask. """ if self.mask is None and mask is None: stat = self.likelihood_per_bin() elif self.mask is None: stat = self.likelihood_per_bin()[mask] elif mask is None: stat = self.likelihood_per_bin()[self.mask.data] else: stat = self.likelihood_per_bin()[mask & self.mask.data] return np.sum(stat, dtype=np.float64)
[docs]class MapEvaluator: """Sky model evaluation on maps. This evaluates a sky model on a 3D map and convolves with the IRFs, and returns a map of the predicted counts. Note that background counts are not added. For now, we only make it work for 3D WCS maps with an energy axis. No HPX, no other axes, those can be added later here or via new separate model evaluator classes. Parameters ---------- model : `~gammapy.cube.models.SkyModel` Sky model exposure : `~gammapy.maps.Map` Exposure map psf : `~gammapy.cube.PSFKernel` PSF kernel edisp : `~gammapy.irf.EnergyDispersion` Energy dispersion evaluation_mode : {"local", "global"} Model evaluation mode. """ _cached_properties = [ "lon_lat", "solid_angle", "bin_volume", "geom_reco", "energy_bin_width", "energy_edges", "energy_center", ] def __init__(self, model=None, exposure=None, psf=None, edisp=None, evaluation_mode="local"): self.model = model self.exposure = exposure self.psf = psf self.edisp = edisp if evaluation_mode not in ["local", "global"]: raise ValueError("Not a valid model evaluation mode. Choose between 'local' and 'global'") self.evaluation_mode = evaluation_mode @property def geom(self): """True energy map geometry (`~gammapy.maps.MapGeom`)""" return self.exposure.geom @lazyproperty def geom_reco(self): """Reco energy map geometry (`~gammapy.maps.MapGeom`)""" edges = self.edisp.e_reco.bins e_reco_axis = MapAxis.from_edges( edges=edges, name="energy", unit=self.edisp.e_reco.unit, interp=self.edisp.e_reco.interpolation_mode, ) return self.geom_image.to_cube(axes=[e_reco_axis]) @property def geom_image(self): """Image map geometry (`~gammapy.maps.MapGeom`)""" return self.geom.to_image() @lazyproperty def energy_center(self): """True energy axis bin centers (`~astropy.units.Quantity`)""" energy_axis = self.geom.get_axis_by_name("energy") energy = energy_axis.center * energy_axis.unit return energy[:, np.newaxis, np.newaxis] @lazyproperty def energy_edges(self): """True energy axis bin edges (`~astropy.units.Quantity`)""" energy_axis = self.geom.get_axis_by_name("energy") energy = energy_axis.edges * energy_axis.unit return energy[:, np.newaxis, np.newaxis] @lazyproperty def energy_bin_width(self): """Energy axis bin widths (`astropy.units.Quantity`)""" return np.diff(self.energy_edges, axis=0) @lazyproperty def lon_lat(self): """Spatial coordinate pixel centers (``lon, lat`` tuple of `~astropy.units.Quantity`). """ coord = self.geom_image.get_coord() frame = self.model.frame if frame is not None: coordsys = "CEL" if frame == "icrs" else "GAL" if not coord.coordsys == coordsys: coord = coord.to_coordsys(coordsys) return ( u.Quantity(coord.lon, "deg", copy=False), u.Quantity(coord.lat, "deg", copy=False), ) @property def lon(self): return self.lon_lat[0] @property def lat(self): return self.lon_lat[1] @lazyproperty def solid_angle(self): """Solid angle per pixel""" return self.geom.solid_angle() @lazyproperty def bin_volume(self): """Map pixel bin volume (solid angle times energy bin width).""" omega = self.solid_angle de = self.energy_bin_width return omega * de @property def coords(self): """Return evaluator coords""" lon, lat = self.lon_lat if self.edisp: energy = self.edisp.e_reco.nodes[:, np.newaxis, np.newaxis] else: energy = self.energy_center return {"lon": lon.value, "lat": lat.value, "energy": energy} @property def needs_update(self): """Check whether the model component has drifted away from its support.""" if self.exposure is None: update = True else: position = self.model.position separation = self._init_position.separation(position) update = separation > (self.model.evaluation_radius + CUTOUT_MARGIN) return update
[docs] def update(self, exposure, psf, edisp, geom): """Update MapEvaluator, based on the current position of the model component. Parameters ---------- exposure : `Map` Exposure map. psf : `PSFMap` PSF map. edisp : `EdispMap` Edisp map. geom : `MapGeom` Reference geometry of the data. """ log.info("Updating model evaluator") # cache current position of the model component self._init_position = self.model.position # TODO: lookup correct Edisp for this component self.edisp = edisp # TODO: lookup correct PSF for this component self.psf = psf if self.evaluation_mode == "local": width = np.max(psf.psf_kernel_map.geom.width) + 2 * (self.model.evaluation_radius + CUTOUT_MARGIN) self.exposure = exposure.cutout(position=self.model.position, width=width) # Reset cached quantities for cached_property in self._cached_properties: self.__dict__.pop(cached_property, None) self.coords_idx = geom.coord_to_idx(self.coords)[::-1] else: self.exposure = exposure
[docs] def compute_dnde(self): """Compute model differential flux at map pixel centers. Returns ------- model_map : `~gammapy.maps.Map` Sky cube with data filled with evaluated model values. Units: ``cm-2 s-1 TeV-1 deg-2`` """ coord = (self.lon, self.lat, self.energy_center) dnde = self.model.evaluate(*coord) return dnde
[docs] def compute_flux(self): """Compute model integral flux over map pixel volumes. For now, we simply multiply dnde with bin volume. """ dnde = self.compute_dnde() volume = self.bin_volume flux = dnde * volume return flux
[docs] def apply_exposure(self, flux): """Compute npred cube For now just divide flux cube by exposure """ npred = (flux * self.exposure.quantity).to_value("") return Map.from_geom(self.geom, data=npred, unit="")
[docs] def apply_psf(self, npred): """Convolve npred cube with PSF""" return npred.convolve(self.psf)
[docs] def apply_edisp(self, npred): """Convolve map data with energy dispersion. Parameters ---------- npred : `~gammapy.maps.Map` Predicted counts in true energy bins Returns ------- npred_reco : `~gammapy.maps.Map` Predicted counts in reco energy bins """ loc = npred.geom.get_axis_index_by_name("energy") data = np.rollaxis(npred.data, loc, len(npred.data.shape)) data = np.dot(data, self.edisp.pdf_matrix) data = np.rollaxis(data, -1, loc) return Map.from_geom(self.geom_reco, data=data, unit="")
[docs] def compute_npred(self): """ Evaluate model predicted counts. Returns ------- npred : `~gammapy.maps.Map` Predicted counts on the map (in reco energy bins) """ flux = self.compute_flux() npred = self.apply_exposure(flux) if self.psf is not None: npred = self.apply_psf(npred) if self.edisp is not None: npred = self.apply_edisp(npred) return npred