Source code for gammapy.estimators.points.lightcurve

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
from itertools import repeat
import numpy as np
import astropy.units as u
import gammapy.utils.parallel as parallel
from gammapy.data import GTI
from gammapy.datasets import Datasets
from gammapy.datasets.actors import DatasetsActor
from gammapy.maps import LabelMapAxis, Map, TimeMapAxis
from gammapy.modeling.models import Models
from gammapy.utils.pbar import progress_bar
from .core import FluxPoints
from .sed import FluxPointsEstimator

__all__ = ["LightCurveEstimator"]

log = logging.getLogger(__name__)


[docs] class LightCurveEstimator(FluxPointsEstimator): """Estimate light curve. The estimator will apply flux point estimation on the source model component to datasets in each of the provided time intervals. The normalisation, ``norm``, is the only parameter of the source model left free to vary. Other model components can be left free to vary with the reoptimize option. If no time intervals are provided, the estimator will use the time intervals defined by the datasets GTIs. To be included in the estimation, the dataset must have their GTI fully overlapping a time interval. Time intervals without any dataset GTI fully overlapping will be dropped. They will not be stored in the final lightcurve `FluxPoints` object. Parameters ---------- time_intervals : list of `~astropy.time.Time` objects Start and stop time for each interval to compute the LC. source : str or int, optional For which source in the model to compute the flux points. Default is 0, i.e. the first source of the models. atol : `~astropy.units.Quantity`, optional Tolerance value for time comparison with different scale. Default 1e-6 sec. n_sigma : float, optional Number of sigma to use for asymmetric error computation. Must be a positive value. Default is 1. n_sigma_ul : float, optional Number of sigma to use for upper limit computation. Must be a positive value. Default is 2. selection_optional : list of str, optional Which steps to execute. Available options are: * "all": all the optional steps are executed. * "errn-errp": estimate asymmetric errors. * "ul": estimate upper limits. * "scan": estimate fit statistic profiles. Default is None so the optional steps are not executed. energy_edges : list of `~astropy.units.Quantity`, optional Edges of the lightcurve energy bins. The resulting bin edges won't be exactly equal to the input ones, but rather the closest values to the energy axis edges of the parent dataset. Default is None: apply the estimator in each energy bin of the parent dataset. For further explanation see :ref:`estimators`. fit : `~gammapy.modeling.Fit`, optional Fit instance specifying the backend and fit options. If None, the `~gammapy.modeling.Fit` instance is created internally. Default is None. reoptimize : bool, optional If True the free parameters of the other models are fitted in each bin independently, together with the norm of the source of interest (but the other parameters of the source of interest are kept frozen). If False only the norm of the source of interest if fitted, and all other parameters are frozen at their current values. Default is False. stack_over_time_interval : bool, optional Whether to stack datasets within each time interval. Default is False. The predicted background counts from all datasets in the given interval will be stacked together, and the final background model (if any) will not have any free parameters. Available only if ``reoptimize`` is False. n_jobs : int, optional Number of processes used in parallel for the computation. Default is one, unless `~gammapy.utils.parallel.N_JOBS_DEFAULT` was modified. The number of jobs is limited to the number of physical CPUs. parallel_backend : {"multiprocessing", "ray"}, optional Which backend to use for multiprocessing. Defaults to `~gammapy.utils.parallel.BACKEND_DEFAULT`. norm : ~gammapy.modeling.Parameter` or dict, optional Norm parameter used for the fit. Default is None and a new parameter is created automatically, with value=1, name="norm", scan_min=0.2, scan_max=5, and scan_n_values = 11. By default, the min and max are not set and derived from the source model, unless the source model does not have one and only one norm parameter. If a dict is given the entries should be a subset of `~gammapy.modeling.Parameter` arguments. Examples -------- For a usage example, see :doc:`/tutorials/analysis-time/light_curve` tutorial. Notes ----- In case of failure of upper limits computation (e.g. nan), see the User Guide: :ref:`dropdown-UL`. """ tag = "LightCurveEstimator"
[docs] def __init__( self, time_intervals=None, atol="1e-6 s", stack_over_time_interval=False, **kwargs, ): self.time_intervals = time_intervals self.atol = u.Quantity(atol) self.stack_over_time_interval = stack_over_time_interval super().__init__(**kwargs)
[docs] def run(self, datasets): """Run light curve extraction. Normalize integral and energy flux between emin and emax. Notes ----- The progress bar can be displayed for this function. Parameters ---------- datasets : list of `~gammapy.datasets.SpectrumDataset` or `~gammapy.datasets.MapDataset` Spectrum or Map datasets. Returns ------- lightcurve : `~gammapy.estimators.FluxPoints` Light curve flux points. """ if not isinstance(datasets, DatasetsActor): datasets = Datasets(datasets) if self.time_intervals is None: gti = datasets.gti else: gti = GTI.from_time_intervals(self.time_intervals) gti = gti.union(overlap_ok=False, merge_equal=False) rows = [] valid_intervals = [] parallel_datasets = [] dataset_names = datasets.names for idx, (t_min, t_max) in enumerate( progress_bar(gti.time_intervals, desc="Time intervals selection") ): datasets_to_fit = datasets.select_time( time_min=t_min, time_max=t_max, atol=self.atol ) if len(datasets_to_fit) == 0: log.info( f"No Dataset for the time interval {t_min} to {t_max}. Skipping interval." ) continue valid_intervals.append([t_min, t_max]) if self.stack_over_time_interval and self.reoptimize: raise ValueError( "It is not possible to stack the light curve if ``reoptimize=True``. You should set this value to False." ) if self.stack_over_time_interval and not self.reoptimize: dataset_reduced = datasets_to_fit.stack_reduce(name="stacked") models = Models(datasets.models.copy()) # Remove background models already applied in stack_reduce for model_name in models.background_models.values(): models.remove(model_name) for dataset in datasets: models.reassign(dataset.name, dataset_reduced.name) dataset_reduced.models = models datasets_to_fit = Datasets([dataset_reduced]) dataset_names = datasets_to_fit.names if self.n_jobs == 1: fp = self.estimate_time_bin_flux(datasets_to_fit, dataset_names) rows.append(fp) else: parallel_datasets.append(datasets_to_fit) if self.n_jobs > 1: self._update_child_jobs() rows = parallel.run_multiprocessing( self.estimate_time_bin_flux, zip( parallel_datasets, repeat(dataset_names), ), backend=self.parallel_backend, pool_kwargs=dict(processes=self.n_jobs), task_name="Time intervals", ) if len(rows) == 0: raise ValueError("LightCurveEstimator: No datasets in time intervals") gti = GTI.from_time_intervals(valid_intervals) axis = TimeMapAxis.from_gti(gti=gti) return FluxPoints.from_stack( maps=rows, axis=axis, )
[docs] @staticmethod def expand_map(m, dataset_names): """Expand map in dataset axis. Parameters ---------- map : `~gammapy.maps.Map` Map to expand. dataset_names : list of str Dataset names. Returns ------- map : `~gammapy.maps.Map` Expanded map. """ label_axis = LabelMapAxis(labels=dataset_names, name="dataset") geom = m.geom.replace_axis(axis=label_axis) result = Map.from_geom(geom, data=np.nan) coords = m.geom.get_coord(sparse=True) result.set_by_coord(coords, vals=m.data) return result
[docs] def estimate_time_bin_flux(self, datasets, dataset_names=None): """Estimate flux point for a single energy group. Parameters ---------- datasets : `~gammapy.datasets.Datasets` List of dataset objects. dataset_names : list of str Dataset names. Returns ------- result : `FluxPoints` Resulting flux points. """ estimator = self.copy() estimator.n_jobs = self._n_child_jobs fp = estimator._run_flux_points(datasets) if dataset_names: for name in ["counts", "npred", "npred_excess"]: fp._data[name] = self.expand_map( fp._data[name], dataset_names=dataset_names ) return fp
def _run_flux_points(self, datasets): return super().run(datasets)