Source code for gammapy.modeling.models.temporal

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Time-dependent models."""

import logging
import numpy as np
import scipy.interpolate
from astropy import units as u
from astropy.io import fits
from astropy.table import Table
from astropy.time import Time
from gammapy.maps import MapAxis, RegionNDMap, TimeMapAxis
from astropy.utils import lazyproperty
from gammapy.modeling import Parameter
from gammapy.utils.compat import COPY_IF_NEEDED
from gammapy.utils.random import InverseCDFSampler, get_random_state
from gammapy.utils.scripts import make_path
from gammapy.utils.time import time_ref_from_dict, time_ref_to_dict
from .core import ModelBase, _build_parameters_from_dict

__all__ = [
    "ConstantTemporalModel",
    "ExpDecayTemporalModel",
    "GaussianTemporalModel",
    "GeneralizedGaussianTemporalModel",
    "LightCurveTemplateTemporalModel",
    "LinearTemporalModel",
    "PowerLawTemporalModel",
    "SineTemporalModel",
    "TemplatePhaseCurveTemporalModel",
    "TemporalModel",
]

log = logging.getLogger(__name__)


# TODO: make this a small ABC to define a uniform interface.
[docs] class TemporalModel(ModelBase): """Temporal model base class. Evaluates on `~astropy.time.Time` objects. """ _type = "temporal" def __init__(self, **kwargs): scale = kwargs.pop("scale", "utc") if scale not in Time.SCALES: raise ValueError( f"{scale} is not a valid time scale. Choose from {Time.SCALES}" ) self.scale = scale super().__init__(**kwargs)
[docs] def __call__(self, time, energy=None): """Evaluate model. Parameters ---------- time : `~astropy.time.Time` Time object. energy : `~astropy.units.Quantity`, optional Energy. Default is None. Returns ------- values : `~astropy.units.Quantity` Model values. """ kwargs = {par.name: par.quantity for par in self.parameters} if energy is not None: kwargs["energy"] = energy time = Time(time, scale=self.scale).mjd * u.d return self.evaluate(time, **kwargs)
@property def type(self): return self._type @property def is_energy_dependent(self): return False @property def reference_time(self): """Reference time in MJD.""" return Time(self.t_ref.value, format="mjd", scale=self.scale) @reference_time.setter def reference_time(self, t_ref): """Reference time.""" if not isinstance(t_ref, Time): raise TypeError(f"{t_ref} is not a {Time} object") self.t_ref.value = Time(t_ref, scale=self.scale).mjd
[docs] def to_dict(self, full_output=False): """Create dictionary for YAML serilisation.""" data = super().to_dict(full_output) data["temporal"]["scale"] = self.scale return data
[docs] @classmethod def from_dict(cls, data, **kwargs): """Create a temporal model from a dictionary. Parameters ---------- data : dict Dictionary containing the model parameters. **kwargs : dict Keyword arguments passed to `~TemporalModel.from_parameters`. """ kwargs = kwargs or {} temporal_data = data.get("temporal", data) if "scale" in temporal_data: kwargs["scale"] = temporal_data["scale"] return super().from_dict(data, **kwargs)
[docs] @staticmethod def time_sum(t_min, t_max): """Total time between t_min and t_max. Parameters ---------- t_min, t_max : `~astropy.time.Time` Lower and upper bound of integration range. Returns ------- time_sum : `~astropy.time.TimeDelta` Summed time in the intervals. """ diff = t_max - t_min return np.sum(diff).to(u.day)
[docs] def plot(self, time_range, ax=None, n_points=100, **kwargs): """ Plot the temporal model. Parameters ---------- time_range : `~astropy.time.Time` Times to plot the model. ax : `~matplotlib.axes.Axes`, optional Axis to plot on. n_points : int Number of bins to plot model. Default is 100. **kwargs : dict Keywords forwarded to `~matplotlib.pyplot.errorbar`. Returns ------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. """ time_min, time_max = time_range time_axis = TimeMapAxis.from_time_bounds( time_min=time_min, time_max=time_max, nbin=n_points ) m = RegionNDMap.create(region=None, axes=[time_axis]) kwargs.setdefault("marker", "None") kwargs.setdefault("ls", "-") kwargs.setdefault("xerr", None) m.quantity = self(time_axis.time_mid).to(u.one) ax = m.plot(ax=ax, **kwargs) ax.set_ylabel("Norm / A.U.") return ax
[docs] def sample_time(self, n_events, t_min, t_max, t_delta="1 s", random_state=0): """Sample arrival times of events. Parameters ---------- n_events : int Number of events to sample. t_min : `~astropy.time.Time` Start time of the sampling. t_max : `~astropy.time.Time` Stop time of the sampling. t_delta : `~astropy.units.Quantity`, optional Time step used for sampling of the temporal model. Default is 1 s. random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`} Defines random number generator initialisation. Passed to `~gammapy.utils.random.get_random_state`. Default is 0. Returns ------- time : `~astropy.units.Quantity` Array with times of the sampled events. """ t_min = Time(t_min, scale=self.scale) t_max = Time(t_max, scale=self.scale) t_delta = u.Quantity(t_delta) random_state = get_random_state(random_state) ontime = (t_max - t_min).to("s") n_step = (ontime / t_delta).to_value("").item() t_step = ontime / n_step indices = np.arange(n_step + 1) steps = indices * t_step t = Time(t_min + steps, format="mjd") pdf = self(t) sampler = InverseCDFSampler(pdf=pdf, random_state=random_state) time_pix = sampler.sample(n_events)[0] time = np.interp(time_pix, indices, steps) return t_min + time
[docs] def integral(self, t_min, t_max, oversampling_factor=100, **kwargs): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min: `~astropy.time.Time` Start times of observation. t_max: `~astropy.time.Time` Stop times of observation. oversampling_factor : int, optional Oversampling factor to be used for numerical integration. Default is 100. Returns ------- norm : float Integrated flux norm on the given time intervals. """ t_values, steps = np.linspace( t_min.mjd, t_max.mjd, oversampling_factor, retstep=True, axis=-1 ) times = Time(t_values, format="mjd", scale=self.scale) values = self(times) integral = np.sum(values, axis=-1) * steps return integral / self.time_sum(t_min, t_max).to_value("d")
[docs] class ConstantTemporalModel(TemporalModel): """Constant temporal model. For more information see :ref:`constant-temporal-model`. """ tag = ["ConstantTemporalModel", "const"]
[docs] @staticmethod def evaluate(time): """Evaluate at given times.""" return np.ones(time.shape) * u.one
[docs] def integral(self, t_min, t_max): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min : `~astropy.time.Time` Start times of observation. t_max : `~astropy.time.Time` Stop times of observation. Returns ------- norm : `~astropy.units.Quantity` Integrated flux norm on the given time intervals. """ return (t_max - t_min) / self.time_sum(t_min, t_max)
[docs] class LinearTemporalModel(TemporalModel): """Temporal model with a linear variation. For more information see :ref:`linear-temporal-model`. Parameters ---------- alpha : float Constant term of the baseline flux. Default is 1. beta : `~astropy.units.Quantity` Time variation coefficient of the flux. Default is 0. t_ref : `~astropy.units.Quantity` The reference time in mjd. Frozen per default, at 2000-01-01. """ tag = ["LinearTemporalModel", "linear"] alpha = Parameter("alpha", 1.0, frozen=False) beta = Parameter("beta", 0.0, unit="d-1", frozen=False) _t_ref_default = Time("2000-01-01") t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True)
[docs] @staticmethod def evaluate(time, alpha, beta, t_ref): """Evaluate at given times.""" return alpha + beta * (time - t_ref)
[docs] def integral(self, t_min, t_max): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min : `~astropy.time.Time` Start times of observation. t_max : `~astropy.time.Time` Stop times of observation. Returns ------- norm : float Integrated flux norm on the given time intervals. """ pars = self.parameters alpha = pars["alpha"] beta = pars["beta"].quantity t_ref = self.reference_time value = alpha * (t_max - t_min) + beta / 2.0 * ( (t_max - t_ref) * (t_max - t_ref) - (t_min - t_ref) * (t_min - t_ref) ) return value / self.time_sum(t_min, t_max)
[docs] class ExpDecayTemporalModel(TemporalModel): r"""Temporal model with an exponential decay. For more information see :ref:`expdecay-temporal-model`. Parameters ---------- t0 : `~astropy.units.Quantity` Decay timescale. Default is 1 day. t_ref : `~astropy.units.Quantity` The reference time in mjd. Frozen per default, at 2000-01-01. """ tag = ["ExpDecayTemporalModel", "exp-decay"] t0 = Parameter("t0", "1 d", frozen=False) _t_ref_default = Time("2000-01-01") t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True)
[docs] @staticmethod def evaluate(time, t0, t_ref): """Evaluate at given times.""" return np.exp(-(time - t_ref) / t0)
[docs] def integral(self, t_min, t_max): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min : `~astropy.time.Time` Start times of observation. t_max : `~astropy.time.Time` Stop times of observation. Returns ------- norm : float Integrated flux norm on the given time intervals. """ pars = self.parameters t0 = pars["t0"].quantity t_ref = self.reference_time value = self.evaluate(t_max, t0, t_ref) - self.evaluate(t_min, t0, t_ref) return -t0 * value / self.time_sum(t_min, t_max)
[docs] class GaussianTemporalModel(TemporalModel): r"""A Gaussian temporal profile. For more information see :ref:`gaussian-temporal-model`. Parameters ---------- t_ref : `~astropy.units.Quantity` The reference time in mjd at the peak. Default is 2000-01-01. sigma : `~astropy.units.Quantity` Width of the gaussian profile. Default is 1 day. """ tag = ["GaussianTemporalModel", "gauss"] _t_ref_default = Time("2000-01-01") t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=False) sigma = Parameter("sigma", "1 d", frozen=False)
[docs] @staticmethod def evaluate(time, t_ref, sigma): return np.exp(-((time - t_ref) ** 2) / (2 * sigma**2))
[docs] def integral(self, t_min, t_max, **kwargs): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min : `~astropy.time.Time` Start times of observation. t_max : `~astropy.time.Time` Stop times of observation. Returns ------- norm : float Integrated flux norm on the given time intervals. """ pars = self.parameters sigma = pars["sigma"].quantity t_ref = self.reference_time norm = np.sqrt(np.pi / 2) * sigma u_min = (t_min - t_ref) / (np.sqrt(2) * sigma) u_max = (t_max - t_ref) / (np.sqrt(2) * sigma) integral = norm * (scipy.special.erf(u_max) - scipy.special.erf(u_min)) return integral / self.time_sum(t_min, t_max)
[docs] class GeneralizedGaussianTemporalModel(TemporalModel): r"""A generalized Gaussian temporal profile. For more information see :ref:`generalized-gaussian-temporal-model`. Parameters ---------- t_ref : `~astropy.units.Quantity` The time of the pulse's maximum intensity. Default is 2000-01-01. t_rise : `~astropy.units.Quantity` Rise time constant. Default is 1 day. t_decay : `~astropy.units.Quantity` Decay time constant. Default is 1 day. eta : `~astropy.units.Quantity` Inverse pulse sharpness -> higher values implies a more peaked pulse. Default is 1/2. """ tag = ["GeneralizedGaussianTemporalModel", "gengauss"] _t_ref_default = Time("2000-01-01") t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=False) t_rise = Parameter("t_rise", "1d", frozen=False) t_decay = Parameter("t_decay", "1d", frozen=False) eta = Parameter("eta", 1 / 2, unit="", frozen=False)
[docs] @staticmethod def evaluate(time, t_ref, t_rise, t_decay, eta): val_rise = np.exp( -0.5 * (np.abs(u.Quantity(time - t_ref, "d")) ** (1 / eta)) / (t_rise ** (1 / eta)) ) val_decay = np.exp( -0.5 * (np.abs(u.Quantity(time - t_ref, "d")) ** (1 / eta)) / (t_decay ** (1 / eta)) ) val = np.where(time < t_ref, val_rise, val_decay) return val
[docs] class LightCurveTemplateTemporalModel(TemporalModel): """Temporal light curve model. The lightcurve is given at specific times (and optionally energies) as a ``norm`` It can be serialised either as an astropy table or a `~gammapy.maps.RegionNDMap` The ``norm`` is supposed to be a unit-less multiplicative factor in the model, to be multiplied with a spectral model. The model does linear interpolation for times between the given ``(time, energy, norm)`` values. When the temporal model is energy-dependent, the default interpolation scheme is linear with a log scale for the values. The interpolation method and scale values can be changed with the ``method`` and ``values_scale`` arguments. For more information see :ref:`LightCurve-temporal-model`. Examples -------- Read an example light curve object: >>> from gammapy.modeling.models import LightCurveTemplateTemporalModel >>> path = '$GAMMAPY_DATA/tests/models/light_curve/lightcrv_PKSB1222+216.fits' >>> light_curve = LightCurveTemplateTemporalModel.read(path) Show basic information about the lightcurve: >>> print(light_curve) LightCurveTemplateTemporalModel model summary: Reference time: 59000.49919925926 MJD Start time: 58999.99919925926 MJD End time: 61862.99919925926 MJD Norm min: 0.01551196351647377 Norm max: 1.0 <BLANKLINE> Compute ``norm`` at a given time: >>> from astropy.time import Time >>> t = Time(59001.195, format="mjd") >>> light_curve.evaluate(t) <Quantity [0.02288737]> Compute mean ``norm`` in a given time interval: >>> import astropy.units as u >>> t_r = Time(59000.5, format='mjd') >>> t_min = t_r + [1, 4, 8] * u.d >>> t_max = t_r + [1.5, 6, 9] * u.d >>> light_curve.integral(t_min, t_max) <Quantity [0.00375698, 0.0143724 , 0.00688029]> """ tag = ["LightCurveTemplateTemporalModel", "template"] _t_ref_default = Time("2000-01-01") t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True) def __init__(self, map, t_ref=None, filename=None, method=None, values_scale=None): if (map.data < 0).any(): log.warning("Map has negative values. Check and fix this!") self.map = map.copy() super().__init__() if t_ref: self.reference_time = t_ref self.filename = filename if method is None: method = "linear" if values_scale is None: if self.is_energy_dependent: values_scale = "log" else: values_scale = "lin" self.method = method self.values_scale = values_scale def __str__(self): start_time = self.t_ref.quantity + self.map.geom.axes["time"].edges[0] end_time = self.t_ref.quantity + self.map.geom.axes["time"].edges[-1] norm_min = np.nanmin(self.map.data) norm_max = np.nanmax(self.map.data) prnt = ( f"{self.__class__.__name__} model summary:\n " f"Reference time: {self.t_ref.value} MJD \n " f"Start time: {start_time.value} MJD \n " f"End time: {end_time.value} MJD \n " f"Norm min: {norm_min} \n" f"Norm max: {norm_max}" ) if self.is_energy_dependent: energy_min = self.map.geom.axes["energy"].center[0] energy_max = self.map.geom.axes["energy"].center[-1] prnt1 = f"Energy min: {energy_min} \n" f"Energy max: {energy_max} \n" prnt = prnt + prnt1 return prnt @property def is_energy_dependent(self): """Whether the model is energy dependent.""" return self.map.geom.has_energy_axis
[docs] @classmethod def from_table(cls, table, filename=None): """Create a template model from an astropy table. Parameters ---------- table : `~astropy.table.Table` Table containing the template model. filename : str, optional Name of input file. Default is None. Returns ------- model : `LightCurveTemplateTemporalModel` Light curve template model. """ columns = [_.lower() for _ in table.colnames] if "time" not in columns: raise ValueError("A TIME column is necessary") t_ref = time_ref_from_dict(table.meta, scale="utc") nodes = table["TIME"] ax_unit = nodes.quantity.unit if not ax_unit.is_equivalent("d"): try: ax_unit = u.Unit(table.meta["TIMEUNIT"]) except KeyError: raise ValueError("Time unit not found in the table") time_axis = MapAxis.from_nodes(nodes=nodes, name="time", unit=ax_unit) axes = [time_axis] m = RegionNDMap.create(region=None, axes=axes, data=table["NORM"]) return cls(m, t_ref=t_ref, filename=filename)
[docs] @classmethod def read(cls, filename, format="table"): """Read a template model. Parameters ---------- filename : str Name of file to read. format : {"table", "map"} Format of the input file. Returns ------- model : `LightCurveTemplateTemporalModel` Light curve template model. """ filename = str(make_path(filename)) if format == "table": table = Table.read(filename) return cls.from_table(table, filename=filename) elif format == "map": with fits.open(filename) as hdulist: header = hdulist["SKYMAP_BANDS"].header t_ref = time_ref_from_dict(header) # TODO : Ugly hack to prevent creating a TimeMapAxis # By default, MapAxis.from_table tries to create a # TimeMapAxis, failing which, it creates a normal MapAxis. # This ugly hack forces the fail. We need a normal Axis to # have the evaluate method work hdulist["SKYMAP_BANDS"].header.pop("MJDREFI") m = RegionNDMap.from_hdulist(hdulist) return cls(m, t_ref=t_ref, filename=filename) else: raise ValueError( f"Not a valid format: '{format}', choose from: {'table', 'map'}" )
[docs] def to_table(self): """Convert model to an astropy table.""" if self.is_energy_dependent: raise NotImplementedError("Not supported for energy dependent models") table = Table( data=[self.map.geom.axes["time"].center, self.map.quantity], names=["TIME", "NORM"], meta=time_ref_to_dict(self.reference_time, scale=self.scale), ) return table
[docs] def write(self, filename, format="table", overwrite=False): """Write a model to disk as per the specified format. Parameters: filename : str Name of output file. format : {"table" or "map"} If format is "table", it is serialised as a `~astropy.table.Table`. If "map", then it is serialised as a `~gammapy.maps.RegionNDMap`. Default is "table". overwrite : bool, optional Overwrite existing file. Default is False. """ if self.filename is None: raise IOError("Missing filename") if format == "table": table = self.to_table() table.write(filename, overwrite=overwrite) elif format == "map": # RegionNDMap.from_hdulist does not update the header hdulist = self.map.to_hdulist() hdulist["SKYMAP_BANDS"].header.update( time_ref_to_dict(self.reference_time, scale=self.scale) ) hdulist.writeto(filename, overwrite=overwrite) else: raise ValueError("Not a valid format, choose from ['map', 'table']")
[docs] def evaluate(self, time, t_ref=None, energy=None): """Evaluate the model at given coordinates. Parameters ---------- time: `~astropy.time.Time` Time. t_ref: `~gammapy.modeling.Parameter`, optional Reference time for the model. Default is None. energy: `~astropy.units.Quantity`, optional Energy. Default is None. Returns ------- values : `~astropy.units.Quantity` Model values. """ if t_ref is None: t_ref = self.reference_time t = (time - t_ref).to_value(self.map.geom.axes["time"].unit) coords = {"time": t} if self.is_energy_dependent: if energy is None: energy = self.map.geom.axes["energy"].center coords["energy"] = energy.reshape(-1, 1) val = self.map.interp_by_coord( coords, method=self.method, values_scale=self.values_scale ) val = np.clip(val, 0, a_max=None) return u.Quantity(val, unit=self.map.unit, copy=COPY_IF_NEEDED)
[docs] def integral(self, t_min, t_max, oversampling_factor=100, **kwargs): if self.is_energy_dependent: raise NotImplementedError( "Integral not supported for energy dependent models" ) return super().integral(t_min, t_max, oversampling_factor, **kwargs)
[docs] @classmethod def from_dict(cls, data): data = data["temporal"] filename = data["filename"] format = data.get("format", "table") return cls.read(filename, format)
def _guess_format(self): if self.is_energy_dependent: format = "map" else: format = "table" log.info("Inferred format: " + format) return format
[docs] def to_dict(self, full_output=False, format=None): """Create dictionary for YAML serialisation.""" data = super().to_dict(full_output) if format is None: format = self._guess_format() data["temporal"]["filename"] = self.filename data["temporal"]["format"] = format data["temporal"]["unit"] = str(self.map.unit) return data
[docs] def plot(self, time_range, ax=None, n_points=100, energy=None, **kwargs): """ Plot the temporal model. Parameters ---------- time_range : `~astropy.time.Time` Times to plot the model. ax : `~matplotlib.axes.Axes`, optional Axis to plot on. Default is None. n_points : int, optional Number of bins to plot model. Default is 100. energy : `~astropy.units.quantity`, optional Energies to compute the model at for energy dependent models. Default is None. **kwargs : dict Keywords forwarded to `~matplotlib.pyplot.errorbar`. Returns ------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. """ if not self.is_energy_dependent: super().plot(time_range=time_range, ax=ax, n_points=n_points, **kwargs) else: time_min, time_max = Time(time_range, scale=self.scale) time_axis = TimeMapAxis.from_time_bounds( time_min=time_min, time_max=time_max, nbin=n_points ) if energy is None: energy_axis = self.map.geom.axes["energy"] else: energy_axis = MapAxis.from_nodes( nodes=energy, name="energy", interp="log" ) m = RegionNDMap.create(region=None, axes=[time_axis, energy_axis]) kwargs.setdefault("marker", "None") kwargs.setdefault("ls", "-") m.quantity = self.evaluate( time=time_axis.time_mid, energy=energy_axis.center ) ax = m.plot(axis_name="time", ax=ax, **kwargs) ax.set_ylabel("Norm / A.U.") return ax, m
[docs] class PowerLawTemporalModel(TemporalModel): """Temporal model with a Power Law decay. For more information see :ref:`powerlaw-temporal-model`. Parameters ---------- alpha : float Decay time power. Default is 1. t_ref: `~astropy.units.Quantity` The reference time in mjd. Frozen by default, at 2000-01-01. t0: `~astropy.units.Quantity` The scaling time in mjd. Fixed by default, at 1 day. """ tag = ["PowerLawTemporalModel", "powerlaw"] alpha = Parameter("alpha", 1.0, frozen=False) _t_ref_default = Time("2000-01-01") t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True) t0 = Parameter("t0", "1 d", frozen=True)
[docs] @staticmethod def evaluate(time, alpha, t_ref, t0=1 * u.day): """Evaluate at given times.""" return np.power((time - t_ref) / t0, alpha)
[docs] def integral(self, t_min, t_max): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min: `~astropy.time.Time` Start times of observation. t_max: `~astropy.time.Time` Stop times of observation. Returns ------- norm : float Integrated flux norm on the given time intervals. """ pars = self.parameters alpha = pars["alpha"].quantity t0 = pars["t0"].quantity t_ref = self.reference_time if alpha != -1: value = self.evaluate(t_max, alpha + 1.0, t_ref, t0) - self.evaluate( t_min, alpha + 1.0, t_ref, t0 ) return t0 / (alpha + 1.0) * value / self.time_sum(t_min, t_max) else: value = np.log((t_max - t_ref) / (t_min - t_ref)) return t0 * value / self.time_sum(t_min, t_max)
[docs] class SineTemporalModel(TemporalModel): """Temporal model with a sinusoidal modulation. For more information see :ref:`sine-temporal-model`. Parameters ---------- amp : float Amplitude of the sinusoidal function. Default is 1. t_ref: `~astropy.units.Quantity` The reference time in mjd. Default is 2000-01-01. omega: `~astropy.units.Quantity` Pulsation of the signal. Default is 1 rad/day. """ tag = ["SineTemporalModel", "sinus"] amp = Parameter("amp", 1.0, frozen=False) omega = Parameter("omega", "1. rad/day", frozen=False) _t_ref_default = Time("2000-01-01") t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=False)
[docs] @staticmethod def evaluate(time, amp, omega, t_ref): """Evaluate at given times.""" return 1.0 + amp * np.sin(omega * (time - t_ref))
[docs] def integral(self, t_min, t_max): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min: `~astropy.time.Time` Start times of observation. t_max: `~astropy.time.Time` Stop times of observation. Returns ------- norm : float Integrated flux norm on the given time intervals. """ pars = self.parameters omega = pars["omega"].quantity.to_value("rad/day") amp = pars["amp"].value t_ref = self.reference_time value = (t_max - t_min).to_value(u.day) - amp / omega * ( np.sin(omega * (t_max - t_ref).to_value(u.day)) - np.sin(omega * (t_min - t_ref).to_value(u.day)) ) return value / self.time_sum(t_min, t_max).to_value(u.day)
[docs] class TemplatePhaseCurveTemporalModel(TemporalModel): """Temporal phase curve model. A timing solution is used to compute the phase corresponding to time and a template phase curve is used to determine the associated ``norm``. The phasecurve is given as a table with columns ``phase`` and ``norm``. The ``norm`` is supposed to be a unit-less multiplicative factor in the model, to be multiplied with a spectral model. The model does linear interpolation for times between the given ``(phase, norm)`` values. The implementation currently uses `scipy.interpolate. InterpolatedUnivariateSpline`, using degree ``k=1`` to get linear interpolation. This class also contains an ``integral`` method, making the computation of mean fluxes for a given time interval a one-liner. Parameters ---------- table : `~astropy.table.Table` A table with 'PHASE' vs 'NORM'. filename : str The name of the file containing the phase curve. t_ref : `~astropy.units.Quantity` The reference time in mjd. Default is 48442.5 mjd. phi_ref : `~astropy.units.Quantity` The phase at reference time. Default is 0. f0 : `~astropy.units.Quantity` The frequency at t_ref in s-1. Default is 29.946923 s-1. f1 : `~astropy.units.Quantity` The frequency derivative at t_ref in s-2. Default is 0 s-2. f2 : `~astropy.units.Quantity` The frequency second derivative at t_ref in s-3. Default is 0 s-3. """ tag = ["TemplatePhaseCurveTemporalModel", "template-phase"] _t_ref_default = Time(48442.5, format="mjd") _phi_ref_default = 0 _f0_default = 29.946923 * u.s**-1 _f1_default = 0 * u.s**-2 _f2_default = 0 * u.s**-3 t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True) phi_ref = Parameter("phi_ref", _phi_ref_default, unit="", frozen=True) f0 = Parameter("f0", _f0_default, frozen=True) f1 = Parameter("f1", _f1_default, frozen=True) f2 = Parameter("f2", _f2_default, frozen=True) def __init__(self, table, filename=None, **kwargs): self.table = self._normalise_table(table) if filename is not None: filename = str(make_path(filename)) self.filename = filename super().__init__(**kwargs) @staticmethod def _normalise_table(table): x = table["PHASE"].data y = table["NORM"].data interpolator = scipy.interpolate.InterpolatedUnivariateSpline( x, y, k=1, ext=2, bbox=[0.0, 1.0] ) integral = interpolator.integral(0, 1) table["NORM"] *= 1 / integral return table
[docs] @classmethod def read( cls, path, t_ref=_t_ref_default.mjd * u.d, phi_ref=_phi_ref_default, f0=_f0_default, f1=_f1_default, f2=_f2_default, ): """Read phasecurve model table from FITS file. Beware : this does **not** read parameters. They will be set to defaults. Parameters ---------- path : str or `~pathlib.Path` Filename with path. """ filename = str(make_path(path)) return cls( Table.read(filename), filename=filename, t_ref=t_ref, phi_ref=phi_ref, f0=f0, f1=f1, f2=f2, )
@staticmethod def _time_to_phase(time, t_ref, phi_ref, f0, f1, f2): """Convert time to phase given timing solution parameters. Parameters ---------- time : `~astropy.units.Quantity` The time at which to compute the phase. t_ref : `~astropy.units.Quantity` The reference time in mjd. phi_ref : `~astropy.units.Quantity` The phase at reference time. Default is 0. f0 : `~astropy.units.Quantity` The frequency at t_ref in s-1. f1 : `~astropy.units.Quantity` The frequency derivative at t_ref in s-2. f2 : `~astropy.units.Quantity` The frequency second derivative at t_ref in s-3. Returns ------- phase : float Phase. period_number : int Number of period since t_ref. """ delta_t = time - t_ref phase = ( phi_ref + delta_t * (f0 + delta_t / 2.0 * (f1 + delta_t / 3 * f2)) ).to_value("") period_number = np.floor(phase) phase -= period_number return phase, period_number
[docs] def write(self, path=None, overwrite=False): if path is None: path = self.filename if path is None: raise ValueError(f"filename is required for {self.tag}") else: self.filename = str(make_path(path)) self.table.write(self.filename, overwrite=overwrite)
@lazyproperty def _interpolator(self): x = self.table["PHASE"].data y = self.table["NORM"].data return scipy.interpolate.InterpolatedUnivariateSpline( x, y, k=1, ext=2, bbox=[0.0, 1.0] )
[docs] def evaluate(self, time, t_ref, phi_ref, f0, f1, f2): phase, _ = self._time_to_phase(time, t_ref, phi_ref, f0, f1, f2) return self._interpolator(phase) * u.one
[docs] def integral(self, t_min, t_max): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min: `~astropy.time.Time` Start times of observation. t_max: `~astropy.time.Time` Stop times of observation. Returns ------- norm: The model integrated flux. """ kwargs = {par.name: par.quantity for par in self.parameters} ph_min, n_min = self._time_to_phase(t_min.mjd * u.d, **kwargs) ph_max, n_max = self._time_to_phase(t_max.mjd * u.d, **kwargs) # here we assume that the frequency does not change during the integration boundaries delta_t = (t_min - self.reference_time).to(u.d) frequency = self.f0.quantity + delta_t * ( self.f1.quantity + delta_t * self.f2.quantity / 2 ) # Compute integral of one phase phase_integral = self._interpolator.antiderivative()( 1 ) - self._interpolator.antiderivative()(0) # Multiply by the total number of phases phase_integral *= n_max - n_min - 1 # Compute integrals before first full phase and after the last full phase end_integral = self._interpolator.antiderivative()( ph_max ) - self._interpolator.antiderivative()(0) start_integral = self._interpolator.antiderivative()( 1 ) - self._interpolator.antiderivative()(ph_min) # Divide by Jacobian (here we neglect variations of frequency during the integration period) total = phase_integral + start_integral + end_integral # Normalize by total integration time n_period = (self.time_sum(t_min, t_max) * frequency).to("") if int(n_period) == 0: n_period = 1 integral_norm = total / n_period return integral_norm
[docs] @classmethod def from_dict(cls, data): params = _build_parameters_from_dict( data["temporal"]["parameters"], cls.default_parameters ) filename = data["temporal"]["filename"] kwargs = {par.name: par for par in params} return cls.read(filename, **kwargs)
[docs] def to_dict(self, full_output=False): """Create dictionary for YAML serialisation.""" model_dict = super().to_dict() model_dict["temporal"]["filename"] = self.filename return model_dict
[docs] def plot_phasogram(self, ax=None, n_points=100, **kwargs): """ Plot phasogram of the phase model. Parameters ---------- ax : `~matplotlib.axes.Axes`, optional Axis to plot on. Default is None. n_points : int, optional Number of bins to plot model. Default is 100. **kwargs : dict Keywords forwarded to `~matplotlib.pyplot.errorbar`. Returns ------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. """ phase_axis = MapAxis.from_bounds(0.0, 1, nbin=n_points, name="Phase", unit="") m = RegionNDMap.create(region=None, axes=[phase_axis]) kwargs.setdefault("marker", "None") kwargs.setdefault("ls", "-") kwargs.setdefault("xerr", None) m.quantity = self._interpolator(phase_axis.center) ax = m.plot(ax=ax, **kwargs) ax.set_ylabel("Norm / A.U.") return ax
[docs] def sample_time(self, n_events, t_min, t_max, t_delta="1 s", random_state=0): """Sample arrival times of events. To fully cover the phase range, t_delta is the minimum between the input and product of the period at 0.5*(t_min + t_max) and the table bin size. Parameters ---------- n_events : int Number of events to sample. t_min : `~astropy.time.Time` Start time of the sampling. t_max : `~astropy.time.Time` Stop time of the sampling. t_delta : `~astropy.units.Quantity` Time step used for sampling of the temporal model. random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`} Defines random number generator initialisation. Passed to `~gammapy.utils.random.get_random_state`. Returns ------- time : `~astropy.units.Quantity` Array with times of the sampled events. """ t_delta = u.Quantity(t_delta) # Determine period at the mid time t_mid = Time(t_min, scale=self.scale) + 0.5 * (t_max - t_min) delta_t = (t_mid - self.reference_time).to(u.d) frequency = self.f0.quantity + delta_t * ( self.f1.quantity + delta_t * self.f2.quantity / 2 ) period = 1 / frequency # Take minimum time delta between user input and the period divided by the number of rows in the model table # this assumes that phase values are evenly spaced. t_delta = np.minimum(period / len(self.table), t_delta) return super().sample_time(n_events, t_min, t_max, t_delta, random_state)