Source code for gammapy.modeling.models.temporal

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Time-dependent models."""
import logging
import numpy as np
import scipy.interpolate
from astropy import units as u
from astropy.io import fits
from astropy.table import Table
from astropy.time import Time
from astropy.utils import lazyproperty
from gammapy.maps import MapAxis, RegionNDMap, TimeMapAxis
from gammapy.modeling import Parameter
from gammapy.utils.random import InverseCDFSampler, get_random_state
from gammapy.utils.scripts import make_path
from gammapy.utils.time import time_ref_from_dict, time_ref_to_dict
from .core import ModelBase, _build_parameters_from_dict

__all__ = [
    "ConstantTemporalModel",
    "ExpDecayTemporalModel",
    "GaussianTemporalModel",
    "GeneralizedGaussianTemporalModel",
    "LightCurveTemplateTemporalModel",
    "LinearTemporalModel",
    "PowerLawTemporalModel",
    "SineTemporalModel",
    "TemplatePhaseCurveTemporalModel",
    "TemporalModel",
]

log = logging.getLogger(__name__)


# TODO: make this a small ABC to define a uniform interface.
[docs]class TemporalModel(ModelBase): """Temporal model base class. Evaluates on `~astropy.time.Time` objects. """ _type = "temporal" def __init__(self, **kwargs): scale = kwargs.pop("scale", "utc") if scale not in Time.SCALES: raise ValueError( f"{scale} is not a valid time scale. Choose from {Time.SCALES}" ) self.scale = scale super().__init__(**kwargs)
[docs] def __call__(self, time, energy=None): """Evaluate model. Parameters ---------- time : `~astropy.time.Time` Time object. energy : `~astropy.units.Quantity`, optional Energy. Default is None. Returns ------- values : `~astropy.units.Quantity` Model values. """ kwargs = {par.name: par.quantity for par in self.parameters} if energy is not None: kwargs["energy"] = energy time = Time(time, scale=self.scale).mjd * u.d return self.evaluate(time, **kwargs)
@property def type(self): return self._type @property def is_energy_dependent(self): return False @property def reference_time(self): """Reference time in MJD.""" return Time(self.t_ref.value, format="mjd", scale=self.scale) @reference_time.setter def reference_time(self, t_ref): """Reference time.""" if not isinstance(t_ref, Time): raise TypeError(f"{t_ref} is not a {Time} object") self.t_ref.value = Time(t_ref, scale=self.scale).mjd
[docs] def to_dict(self, full_output=False): """Create dictionary for YAML serilisation.""" data = super().to_dict(full_output) data["temporal"]["scale"] = self.scale return data
[docs] @classmethod def from_dict(cls, data, **kwargs): """Create a temporal model from a dictionary. Parameters ---------- data : dict Dictionary containing the model parameters. **kwargs : dict Keyword arguments passed to `~TemporalModel.from_parameters`. """ kwargs = kwargs or {} temporal_data = data.get("temporal", data) if "scale" in temporal_data: kwargs["scale"] = temporal_data["scale"] return super().from_dict(data, **kwargs)
[docs] @staticmethod def time_sum(t_min, t_max): """Total time between t_min and t_max. Parameters ---------- t_min, t_max : `~astropy.time.Time` Lower and upper bound of integration range. Returns ------- time_sum : `~astropy.time.TimeDelta` Summed time in the intervals. """ diff = t_max - t_min return np.sum(diff).to(u.day)
[docs] def plot(self, time_range, ax=None, n_points=100, **kwargs): """ Plot the temporal model. Parameters ---------- time_range : `~astropy.time.Time` Times to plot the model. ax : `~matplotlib.axes.Axes`, optional Axis to plot on. n_points : int Number of bins to plot model. Default is 100. **kwargs : dict Keywords forwarded to `~matplotlib.pyplot.errorbar`. Returns ------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. """ time_min, time_max = time_range time_axis = TimeMapAxis.from_time_bounds( time_min=time_min, time_max=time_max, nbin=n_points ) m = RegionNDMap.create(region=None, axes=[time_axis]) kwargs.setdefault("marker", "None") kwargs.setdefault("ls", "-") kwargs.setdefault("xerr", None) m.quantity = self(time_axis.time_mid).to(u.one) ax = m.plot(ax=ax, **kwargs) ax.set_ylabel("Norm / A.U.") return ax
[docs] def sample_time(self, n_events, t_min, t_max, t_delta="1 s", random_state=0): """Sample arrival times of events. Parameters ---------- n_events : int Number of events to sample. t_min : `~astropy.time.Time` Start time of the sampling. t_max : `~astropy.time.Time` Stop time of the sampling. t_delta : `~astropy.units.Quantity`, optional Time step used for sampling of the temporal model. Default is 1 s. random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`} Defines random number generator initialisation. Passed to `~gammapy.utils.random.get_random_state`. Default is 0. Returns ------- time : `~astropy.units.Quantity` Array with times of the sampled events. """ t_min = Time(t_min, scale=self.scale) t_max = Time(t_max, scale=self.scale) t_delta = u.Quantity(t_delta) random_state = get_random_state(random_state) ontime = u.Quantity((t_max - t_min).sec, t_delta.unit) n_step = (ontime / t_delta).to_value("").item() t_step = ontime / n_step indices = np.arange(n_step + 1) steps = indices * t_step t = Time(t_min + steps, format="mjd") pdf = self(t) sampler = InverseCDFSampler(pdf=pdf, random_state=random_state) time_pix = sampler.sample(n_events)[0] time = np.interp(time_pix, indices, steps) return t_min + time
[docs] def integral(self, t_min, t_max, oversampling_factor=100, **kwargs): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min: `~astropy.time.Time` Start times of observation. t_max: `~astropy.time.Time` Stop times of observation. oversampling_factor : int, optional Oversampling factor to be used for numerical integration. Default is 100. Returns ------- norm : float Integrated flux norm on the given time intervals. """ t_values, steps = np.linspace( t_min.mjd, t_max.mjd, oversampling_factor, retstep=True, axis=-1 ) times = Time(t_values, format="mjd", scale=self.scale) values = self(times) integral = np.sum(values, axis=-1) * steps return integral / self.time_sum(t_min, t_max).to_value("d")
[docs]class ConstantTemporalModel(TemporalModel): """Constant temporal model. For more information see :ref:`constant-temporal-model`. """ tag = ["ConstantTemporalModel", "const"]
[docs] @staticmethod def evaluate(time): """Evaluate at given times.""" return np.ones(time.shape) * u.one
[docs] def integral(self, t_min, t_max): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min : `~astropy.time.Time` Start times of observation. t_max : `~astropy.time.Time` Stop times of observation. Returns ------- norm : `~astropy.units.Quantity` Integrated flux norm on the given time intervals. """ return (t_max - t_min) / self.time_sum(t_min, t_max)
[docs]class LinearTemporalModel(TemporalModel): """Temporal model with a linear variation. For more information see :ref:`linear-temporal-model`. Parameters ---------- alpha : float Constant term of the baseline flux. Default is 1. beta : `~astropy.units.Quantity` Time variation coefficient of the flux. Default is 0. t_ref : `~astropy.units.Quantity` The reference time in mjd. Frozen per default, at 2000-01-01. """ tag = ["LinearTemporalModel", "linear"] alpha = Parameter("alpha", 1.0, frozen=False) beta = Parameter("beta", 0.0, unit="d-1", frozen=False) _t_ref_default = Time("2000-01-01") t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True)
[docs] @staticmethod def evaluate(time, alpha, beta, t_ref): """Evaluate at given times.""" return alpha + beta * (time - t_ref)
[docs] def integral(self, t_min, t_max): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min : `~astropy.time.Time` Start times of observation. t_max : `~astropy.time.Time` Stop times of observation. Returns ------- norm : float Integrated flux norm on the given time intervals. """ pars = self.parameters alpha = pars["alpha"] beta = pars["beta"].quantity t_ref = self.reference_time value = alpha * (t_max - t_min) + beta / 2.0 * ( (t_max - t_ref) * (t_max - t_ref) - (t_min - t_ref) * (t_min - t_ref) ) return value / self.time_sum(t_min, t_max)
[docs]class ExpDecayTemporalModel(TemporalModel): r"""Temporal model with an exponential decay. For more information see :ref:`expdecay-temporal-model`. Parameters ---------- t0 : `~astropy.units.Quantity` Decay timescale. Default is 1 day. t_ref : `~astropy.units.Quantity` The reference time in mjd. Frozen per default, at 2000-01-01. """ tag = ["ExpDecayTemporalModel", "exp-decay"] t0 = Parameter("t0", "1 d", frozen=False) _t_ref_default = Time("2000-01-01") t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True)
[docs] @staticmethod def evaluate(time, t0, t_ref): """Evaluate at given times.""" return np.exp(-(time - t_ref) / t0)
[docs] def integral(self, t_min, t_max): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min : `~astropy.time.Time` Start times of observation. t_max : `~astropy.time.Time` Stop times of observation. Returns ------- norm : float Integrated flux norm on the given time intervals. """ pars = self.parameters t0 = pars["t0"].quantity t_ref = self.reference_time value = self.evaluate(t_max, t0, t_ref) - self.evaluate(t_min, t0, t_ref) return -t0 * value / self.time_sum(t_min, t_max)
[docs]class GaussianTemporalModel(TemporalModel): r"""A Gaussian temporal profile. For more information see :ref:`gaussian-temporal-model`. Parameters ---------- t_ref : `~astropy.units.Quantity` The reference time in mjd at the peak. Default is 2000-01-01. sigma : `~astropy.units.Quantity` Width of the gaussian profile. Default is 1 day. """ tag = ["GaussianTemporalModel", "gauss"] _t_ref_default = Time("2000-01-01") t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=False) sigma = Parameter("sigma", "1 d", frozen=False)
[docs] @staticmethod def evaluate(time, t_ref, sigma): return np.exp(-((time - t_ref) ** 2) / (2 * sigma**2))
[docs] def integral(self, t_min, t_max, **kwargs): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min : `~astropy.time.Time` Start times of observation. t_max : `~astropy.time.Time` Stop times of observation. Returns ------- norm : float Integrated flux norm on the given time intervals. """ pars = self.parameters sigma = pars["sigma"].quantity t_ref = self.reference_time norm = np.sqrt(np.pi / 2) * sigma u_min = (t_min - t_ref) / (np.sqrt(2) * sigma) u_max = (t_max - t_ref) / (np.sqrt(2) * sigma) integral = norm * (scipy.special.erf(u_max) - scipy.special.erf(u_min)) return integral / self.time_sum(t_min, t_max)
[docs]class GeneralizedGaussianTemporalModel(TemporalModel): r"""A generalized Gaussian temporal profile. For more information see :ref:`generalized-gaussian-temporal-model`. Parameters ---------- t_ref : `~astropy.units.Quantity` The time of the pulse's maximum intensity. Default is 2000-01-01. t_rise : `~astropy.units.Quantity` Rise time constant. Default is 1 day. t_decay : `~astropy.units.Quantity` Decay time constant. Default is 1 day. eta : `~astropy.units.Quantity` Inverse pulse sharpness -> higher values implies a more peaked pulse. Default is 1/2. """ tag = ["GeneralizedGaussianTemporalModel", "gengauss"] _t_ref_default = Time("2000-01-01") t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=False) t_rise = Parameter("t_rise", "1d", frozen=False) t_decay = Parameter("t_decay", "1d", frozen=False) eta = Parameter("eta", 1 / 2, unit="", frozen=False)
[docs] @staticmethod def evaluate(time, t_ref, t_rise, t_decay, eta): val_rise = np.exp( -0.5 * (np.abs(u.Quantity(time - t_ref, "d")) ** (1 / eta)) / (t_rise ** (1 / eta)) ) val_decay = np.exp( -0.5 * (np.abs(u.Quantity(time - t_ref, "d")) ** (1 / eta)) / (t_decay ** (1 / eta)) ) val = np.where(time < t_ref, val_rise, val_decay) return val
[docs]class LightCurveTemplateTemporalModel(TemporalModel): """Temporal light curve model. The lightcurve is given at specific times (and optionally energies) as a ``norm`` It can be serialised either as an astropy table or a `~gammapy.maps.RegionNDMap` The ``norm`` is supposed to be a unit-less multiplicative factor in the model, to be multiplied with a spectral model. The model does linear interpolation for times between the given ``(time, energy, norm)`` values. When the temporal model is energy-dependent, the default interpolation scheme is linear with a log scale for the values. The interpolation method and scale values can be changed with the ``method`` and ``values_scale`` arguments. For more information see :ref:`LightCurve-temporal-model`. Examples -------- Read an example light curve object: >>> from gammapy.modeling.models import LightCurveTemplateTemporalModel >>> path = '$GAMMAPY_DATA/tests/models/light_curve/lightcrv_PKSB1222+216.fits' >>> light_curve = LightCurveTemplateTemporalModel.read(path) Show basic information about the lightcurve: >>> print(light_curve) LightCurveTemplateTemporalModel model summary: Reference time: 59000.49919925926 MJD Start time: 58999.99919925926 MJD End time: 61862.99919925926 MJD Norm min: 0.01551196351647377 Norm max: 1.0 <BLANKLINE> Compute ``norm`` at a given time: >>> from astropy.time import Time >>> t = Time(59001.195, format="mjd") >>> light_curve.evaluate(t) <Quantity [0.02288737]> Compute mean ``norm`` in a given time interval: >>> import astropy.units as u >>> t_r = Time(59000.5, format='mjd') >>> t_min = t_r + [1, 4, 8] * u.d >>> t_max = t_r + [1.5, 6, 9] * u.d >>> light_curve.integral(t_min, t_max) <Quantity [0.00375698, 0.0143724 , 0.00688029]> """ tag = ["LightCurveTemplateTemporalModel", "template"] _t_ref_default = Time("2000-01-01") t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True) def __init__(self, map, t_ref=None, filename=None, method=None, values_scale=None): if (map.data < 0).any(): log.warning("Map has negative values. Check and fix this!") self.map = map.copy() super().__init__() if t_ref: self.reference_time = t_ref self.filename = filename if method is None: method = "linear" if values_scale is None: if self.is_energy_dependent: values_scale = "log" else: values_scale = "lin" self.method = method self.values_scale = values_scale def __str__(self): start_time = self.t_ref.quantity + self.map.geom.axes["time"].edges[0] end_time = self.t_ref.quantity + self.map.geom.axes["time"].edges[-1] norm_min = np.nanmin(self.map.data) norm_max = np.nanmax(self.map.data) prnt = ( f"{self.__class__.__name__} model summary:\n " f"Reference time: {self.t_ref.value} MJD \n " f"Start time: {start_time.value} MJD \n " f"End time: {end_time.value} MJD \n " f"Norm min: {norm_min} \n" f"Norm max: {norm_max}" ) if self.is_energy_dependent: energy_min = self.map.geom.axes["energy"].center[0] energy_max = self.map.geom.axes["energy"].center[-1] prnt1 = f"Energy min: {energy_min} \n" f"Energy max: {energy_max} \n" prnt = prnt + prnt1 return prnt @property def is_energy_dependent(self): """Whether the model is energy dependent.""" return self.map.geom.has_energy_axis
[docs] @classmethod def from_table(cls, table, filename=None): """Create a template model from an astropy table. Parameters ---------- table : `~astropy.table.Table` Table containing the template model. filename : str, optional Name of input file. Default is None. Returns ------- model : `LightCurveTemplateTemporalModel` Light curve template model. """ columns = [_.lower() for _ in table.colnames] if "time" not in columns: raise ValueError("A TIME column is necessary") t_ref = time_ref_from_dict(table.meta, scale="utc") nodes = table["TIME"] ax_unit = nodes.quantity.unit if not ax_unit.is_equivalent("d"): try: ax_unit = u.Unit(table.meta["TIMEUNIT"]) except KeyError: raise ValueError("Time unit not found in the table") time_axis = MapAxis.from_nodes(nodes=nodes, name="time", unit=ax_unit) axes = [time_axis] m = RegionNDMap.create(region=None, axes=axes, data=table["NORM"]) return cls(m, t_ref=t_ref, filename=filename)
[docs] @classmethod def read(cls, filename, format="table"): """Read a template model. Parameters ---------- filename : str Name of file to read. format : {"table", "map"} Format of the input file. Returns ------- model : `LightCurveTemplateTemporalModel` Light curve template model. """ filename = str(make_path(filename)) if format == "table": table = Table.read(filename) return cls.from_table(table, filename=filename) elif format == "map": with fits.open(filename) as hdulist: header = hdulist["SKYMAP_BANDS"].header t_ref = time_ref_from_dict(header) # TODO : Ugly hack to prevent creating a TimeMapAxis # By default, MapAxis.from_table tries to create a # TimeMapAxis, failing which, it creates a normal MapAxis. # This ugly hack forces the fail. We need a normal Axis to # have the evaluate method work hdulist["SKYMAP_BANDS"].header.pop("MJDREFI") m = RegionNDMap.from_hdulist(hdulist) return cls(m, t_ref=t_ref, filename=filename) else: raise ValueError( f"Not a valid format: '{format}', choose from: {'table', 'map'}" )
[docs] def to_table(self): """Convert model to an astropy table.""" if self.is_energy_dependent: raise NotImplementedError("Not supported for energy dependent models") table = Table( data=[self.map.geom.axes["time"].center, self.map.quantity], names=["TIME", "NORM"], meta=time_ref_to_dict(self.reference_time, scale=self.scale), ) return table
[docs] def write(self, filename, format="table", overwrite=False): """Write a model to disk as per the specified format. Parameters: filename : str Name of output file. format : {"table" or "map"} If format is "table", it is serialised as a `~astropy.table.Table`. If "map", then it is serialised as a `~gammapy.maps.RegionNDMap`. Default is "table". overwrite : bool, optional Overwrite existing file. Default is False. """ if self.filename is None: raise IOError("Missing filename") if format == "table": table = self.to_table() table.write(filename, overwrite=overwrite) elif format == "map": # RegionNDMap.from_hdulist does not update the header hdulist = self.map.to_hdulist() hdulist["SKYMAP_BANDS"].header.update( time_ref_to_dict(self.reference_time, scale=self.scale) ) hdulist.writeto(filename, overwrite=overwrite) else: raise ValueError("Not a valid format, choose from ['map', 'table']")
[docs] def evaluate(self, time, t_ref=None, energy=None): """Evaluate the model at given coordinates. Parameters ---------- time: `~astropy.time.Time` Time. t_ref: `~gammapy.modeling.Parameter`, optional Reference time for the model. Default is None. energy: `~astropy.units.Quantity`, optional Energy. Default is None. Returns ------- values : `~astropy.units.Quantity` Model values. """ if t_ref is None: t_ref = self.reference_time t = (time - t_ref).to_value(self.map.geom.axes["time"].unit) coords = {"time": t} if self.is_energy_dependent: if energy is None: energy = self.map.geom.axes["energy"].center coords["energy"] = energy.reshape(-1, 1) val = self.map.interp_by_coord( coords, method=self.method, values_scale=self.values_scale ) val = np.clip(val, 0, a_max=None) return u.Quantity(val, unit=self.map.unit, copy=False)
[docs] def integral(self, t_min, t_max, oversampling_factor=100, **kwargs): if self.is_energy_dependent: raise NotImplementedError( "Integral not supported for energy dependent models" ) return super().integral(t_min, t_max, oversampling_factor, **kwargs)
[docs] @classmethod def from_dict(cls, data): data = data["temporal"] filename = data["filename"] format = data.get("format", "table") return cls.read(filename, format)
def _guess_format(self): if self.is_energy_dependent: format = "map" else: format = "table" log.info("Inferred format: " + format) return format
[docs] def to_dict(self, full_output=False, format=None): """Create dictionary for YAML serialisation.""" data = super().to_dict(full_output) if format is None: format = self._guess_format() data["temporal"]["filename"] = self.filename data["temporal"]["format"] = format data["temporal"]["unit"] = str(self.map.unit) return data
[docs] def plot(self, time_range, ax=None, n_points=100, energy=None, **kwargs): """ Plot the temporal model. Parameters ---------- time_range : `~astropy.time.Time` Times to plot the model. ax : `~matplotlib.axes.Axes`, optional Axis to plot on. Default is None. n_points : int, optional Number of bins to plot model. Default is 100. energy : `~astropy.units.quantity`, optional Energies to compute the model at for energy dependent models. Default is None. **kwargs : dict Keywords forwarded to `~matplotlib.pyplot.errorbar`. Returns ------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. """ if not self.is_energy_dependent: super().plot(time_range=time_range, ax=ax, n_points=n_points, **kwargs) else: time_min, time_max = Time(time_range, scale=self.scale) time_axis = TimeMapAxis.from_time_bounds( time_min=time_min, time_max=time_max, nbin=n_points ) if energy is None: energy_axis = self.map.geom.axes["energy"] else: energy_axis = MapAxis.from_nodes( nodes=energy, name="energy", interp="log" ) m = RegionNDMap.create(region=None, axes=[time_axis, energy_axis]) kwargs.setdefault("marker", "None") kwargs.setdefault("ls", "-") m.quantity = self.evaluate( time=time_axis.time_mid, energy=energy_axis.center ) ax = m.plot(axis_name="time", ax=ax, **kwargs) ax.set_ylabel("Norm / A.U.") return ax, m
[docs]class PowerLawTemporalModel(TemporalModel): """Temporal model with a Power Law decay. For more information see :ref:`powerlaw-temporal-model`. Parameters ---------- alpha : float Decay time power. Default is 1. t_ref: `~astropy.units.Quantity` The reference time in mjd. Frozen by default, at 2000-01-01. t0: `~astropy.units.Quantity` The scaling time in mjd. Fixed by default, at 1 day. """ tag = ["PowerLawTemporalModel", "powerlaw"] alpha = Parameter("alpha", 1.0, frozen=False) _t_ref_default = Time("2000-01-01") t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True) t0 = Parameter("t0", "1 d", frozen=True)
[docs] @staticmethod def evaluate(time, alpha, t_ref, t0=1 * u.day): """Evaluate at given times.""" return np.power((time - t_ref) / t0, alpha)
[docs] def integral(self, t_min, t_max): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min: `~astropy.time.Time` Start times of observation. t_max: `~astropy.time.Time` Stop times of observation. Returns ------- norm : float Integrated flux norm on the given time intervals. """ pars = self.parameters alpha = pars["alpha"].quantity t0 = pars["t0"].quantity t_ref = self.reference_time if alpha != -1: value = self.evaluate(t_max, alpha + 1.0, t_ref, t0) - self.evaluate( t_min, alpha + 1.0, t_ref, t0 ) return t0 / (alpha + 1.0) * value / self.time_sum(t_min, t_max) else: value = np.log((t_max - t_ref) / (t_min - t_ref)) return t0 * value / self.time_sum(t_min, t_max)
[docs]class SineTemporalModel(TemporalModel): """Temporal model with a sinusoidal modulation. For more information see :ref:`sine-temporal-model`. Parameters ---------- amp : float Amplitude of the sinusoidal function. Default is 1. t_ref: `~astropy.units.Quantity` The reference time in mjd. Default is 2000-01-01. omega: `~astropy.units.Quantity` Pulsation of the signal. Default is 1 rad/day. """ tag = ["SineTemporalModel", "sinus"] amp = Parameter("amp", 1.0, frozen=False) omega = Parameter("omega", "1. rad/day", frozen=False) _t_ref_default = Time("2000-01-01") t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=False)
[docs] @staticmethod def evaluate(time, amp, omega, t_ref): """Evaluate at given times.""" return 1.0 + amp * np.sin(omega * (time - t_ref))
[docs] def integral(self, t_min, t_max): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min: `~astropy.time.Time` Start times of observation. t_max: `~astropy.time.Time` Stop times of observation. Returns ------- norm : float Integrated flux norm on the given time intervals. """ pars = self.parameters omega = pars["omega"].quantity.to_value("rad/day") amp = pars["amp"].value t_ref = self.reference_time value = (t_max - t_min).to_value(u.day) - amp / omega * ( np.sin(omega * (t_max - t_ref).to_value(u.day)) - np.sin(omega * (t_min - t_ref).to_value(u.day)) ) return value / self.time_sum(t_min, t_max).to_value(u.day)
[docs]class TemplatePhaseCurveTemporalModel(TemporalModel): """Temporal phase curve model. A timing solution is used to compute the phase corresponding to time and a template phase curve is used to determine the associated ``norm``. The phasecurve is given as a table with columns ``phase`` and ``norm``. The ``norm`` is supposed to be a unit-less multiplicative factor in the model, to be multiplied with a spectral model. The model does linear interpolation for times between the given ``(phase, norm)`` values. The implementation currently uses `scipy.interpolate. InterpolatedUnivariateSpline`, using degree ``k=1`` to get linear interpolation. This class also contains an ``integral`` method, making the computation of mean fluxes for a given time interval a one-liner. Parameters ---------- table : `~astropy.table.Table` A table with 'PHASE' vs 'NORM'. filename : str The name of the file containing the phase curve. t_ref : `~astropy.units.Quantity` The reference time in mjd. Default is 48442.5 mjd. phi_ref : `~astropy.units.Quantity` The phase at reference time. Default is 0. f0 : `~astropy.units.Quantity` The frequency at t_ref in s-1. Default is 29.946923 s-1. f1 : `~astropy.units.Quantity` The frequency derivative at t_ref in s-2. Default is 0 s-2. f2 : `~astropy.units.Quantity` The frequency second derivative at t_ref in s-3. Default is 0 s-3. """ tag = ["TemplatePhaseCurveTemporalModel", "template-phase"] _t_ref_default = Time(48442.5, format="mjd") _phi_ref_default = 0 _f0_default = 29.946923 * u.s**-1 _f1_default = 0 * u.s**-2 _f2_default = 0 * u.s**-3 t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True) phi_ref = Parameter("phi_ref", _phi_ref_default, unit="", frozen=True) f0 = Parameter("f0", _f0_default, frozen=True) f1 = Parameter("f1", _f1_default, frozen=True) f2 = Parameter("f2", _f2_default, frozen=True) def __init__(self, table, filename=None, **kwargs): self.table = table if filename is not None: filename = str(make_path(filename)) self.filename = filename super().__init__(**kwargs)
[docs] @classmethod def read( cls, path, t_ref=_t_ref_default.mjd * u.d, phi_ref=_phi_ref_default, f0=_f0_default, f1=_f1_default, f2=_f2_default, ): """Read phasecurve model table from FITS file. Beware : this does **not** read parameters. They will be set to defaults. Parameters ---------- path : str or `~pathlib.Path` Filename with path. """ filename = str(make_path(path)) return cls( Table.read(filename), filename=filename, t_ref=t_ref, phi_ref=phi_ref, f0=f0, f1=f1, f2=f2, )
@staticmethod def _time_to_phase(time, t_ref, phi_ref, f0, f1, f2): """Convert time to phase given timing solution parameters. Parameters ---------- time : `~astropy.units.Quantity` The time at which to compute the phase. t_ref : `~astropy.units.Quantity` The reference time in mjd. phi_ref : `~astropy.units.Quantity` The phase at reference time. Default is 0. f0 : `~astropy.units.Quantity` The frequency at t_ref in s-1. f1 : `~astropy.units.Quantity` The frequency derivative at t_ref in s-2. f2 : `~astropy.units.Quantity` The frequency second derivative at t_ref in s-3. Returns ------- phase : float Phase. period_number : int Number of period since t_ref. """ delta_t = time - t_ref phase = ( phi_ref + delta_t * (f0 + delta_t / 2.0 * (f1 + delta_t / 3 * f2)) ).to_value("") period_number = np.floor(phase) phase -= period_number return phase, period_number
[docs] def write(self, path=None, overwrite=False): if path is None: path = self.filename if path is None: raise ValueError(f"filename is required for {self.tag}") else: self.filename = str(make_path(path)) self.table.write(self.filename, overwrite=overwrite)
@lazyproperty def _interpolator(self): x = self.table["PHASE"].data y = self.table["NORM"].data return scipy.interpolate.InterpolatedUnivariateSpline( x, y, k=1, ext=2, bbox=[0.0, 1.0] )
[docs] def evaluate(self, time, t_ref, phi_ref, f0, f1, f2): phase, _ = self._time_to_phase(time, t_ref, phi_ref, f0, f1, f2) return self._interpolator(phase) * u.one
[docs] def integral(self, t_min, t_max): """Evaluate the integrated flux within the given time intervals. Parameters ---------- t_min: `~astropy.time.Time` Start times of observation. t_max: `~astropy.time.Time` Stop times of observation. Returns ------- norm: The model integrated flux. """ kwargs = {par.name: par.quantity for par in self.parameters} ph_min, n_min = self._time_to_phase(t_min.mjd * u.d, **kwargs) ph_max, n_max = self._time_to_phase(t_max.mjd * u.d, **kwargs) # here we assume that the frequency does not change during the integration boundaries delta_t = (t_min - self.reference_time).to(u.d) frequency = self.f0.quantity + delta_t * ( self.f1.quantity + delta_t * self.f2.quantity / 2 ) # Compute integral of one phase phase_integral = self._interpolator.antiderivative()( 1 ) - self._interpolator.antiderivative()(0) # Multiply by the total number of phases phase_integral *= n_max - n_min - 1 # Compute integrals before first full phase and after the last full phase end_integral = self._interpolator.antiderivative()( ph_max ) - self._interpolator.antiderivative()(0) start_integral = self._interpolator.antiderivative()( 1 ) - self._interpolator.antiderivative()(ph_min) # Divide by Jacobian (here we neglect variations of frequency during the integration period) total = (phase_integral + start_integral + end_integral) / frequency # Normalize by total integration time integral_norm = total / self.time_sum(t_min, t_max) return integral_norm.to("")
[docs] @classmethod def from_dict(cls, data): params = _build_parameters_from_dict( data["temporal"]["parameters"], cls.default_parameters ) filename = data["temporal"]["filename"] kwargs = {par.name: par for par in params} return cls.read(filename, **kwargs)
[docs] def to_dict(self, full_output=False): """Create dictionary for YAML serialisation.""" model_dict = super().to_dict() model_dict["temporal"]["filename"] = self.filename return model_dict
[docs] def plot_phasogram(self, ax=None, n_points=100, **kwargs): """ Plot phasogram of the phase model. Parameters ---------- ax : `~matplotlib.axes.Axes`, optional Axis to plot on. Default is None. n_points : int, optional Number of bins to plot model. Default is 100. **kwargs : dict Keywords forwarded to `~matplotlib.pyplot.errorbar`. Returns ------- ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. """ phase_axis = MapAxis.from_bounds(0.0, 1, nbin=n_points, name="Phase", unit="") m = RegionNDMap.create(region=None, axes=[phase_axis]) kwargs.setdefault("marker", "None") kwargs.setdefault("ls", "-") kwargs.setdefault("xerr", None) m.quantity = self._interpolator(phase_axis.center) ax = m.plot(ax=ax, **kwargs) ax.set_ylabel("Norm / A.U.") return ax
[docs] def sample_time(self, n_events, t_min, t_max, t_delta="1 s", random_state=0): """Sample arrival times of events. To fully cover the phase range, t_delta is the minimum between the input and product of the period at 0.5*(t_min + t_max) and the table bin size. Parameters ---------- n_events : int Number of events to sample. t_min : `~astropy.time.Time` Start time of the sampling. t_max : `~astropy.time.Time` Stop time of the sampling. t_delta : `~astropy.units.Quantity` Time step used for sampling of the temporal model. random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`} Defines random number generator initialisation. Passed to `~gammapy.utils.random.get_random_state`. Returns ------- time : `~astropy.units.Quantity` Array with times of the sampled events. """ t_delta = u.Quantity(t_delta) # Determine period at the mid time t_mid = Time(t_min, scale=self.scale) + 0.5 * (t_max - t_min) delta_t = (t_mid - self.reference_time).to(u.d) frequency = self.f0.quantity + delta_t * ( self.f1.quantity + delta_t * self.f2.quantity / 2 ) period = 1 / frequency # Take minimum time delta between user input and the period divided by the number of rows in the model table # this assumes that phase values are evenly spaced. t_delta = np.minimum(period / len(self.table), t_delta) return super().sample_time(n_events, t_min, t_max, t_delta, random_state)