# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Time-dependent models."""
import logging
import numpy as np
import scipy.interpolate
from astropy import units as u
from astropy.io import fits
from astropy.table import Table
from astropy.time import Time
from astropy.utils import lazyproperty
from gammapy.maps import MapAxis, RegionNDMap, TimeMapAxis
from gammapy.modeling import Parameter
from gammapy.utils.random import InverseCDFSampler, get_random_state
from gammapy.utils.scripts import make_path
from gammapy.utils.time import time_ref_from_dict, time_ref_to_dict
from .core import ModelBase, _build_parameters_from_dict
__all__ = [
"ConstantTemporalModel",
"ExpDecayTemporalModel",
"GaussianTemporalModel",
"GeneralizedGaussianTemporalModel",
"LightCurveTemplateTemporalModel",
"LinearTemporalModel",
"PowerLawTemporalModel",
"SineTemporalModel",
"TemplatePhaseCurveTemporalModel",
"TemporalModel",
]
log = logging.getLogger(__name__)
# TODO: make this a small ABC to define a uniform interface.
[docs]class TemporalModel(ModelBase):
"""Temporal model base class.
Evaluates on astropy.time.Time objects"""
_type = "temporal"
def __init__(self, **kwargs):
scale = kwargs.pop("scale", "utc")
if scale not in Time.SCALES:
raise ValueError(
f"{scale} is not a valid time scale. Choose from {Time.SCALES}"
)
self.scale = scale
super().__init__(**kwargs)
[docs] def __call__(self, time, energy=None):
"""Evaluate model
Parameters
----------
time : `~astropy.time.Time`
Time object
energy : `~astropy.units.Quantity`
Energy (optional)
Returns
-------
values : `~astropy.units.Quantity`
Model values
"""
kwargs = {par.name: par.quantity for par in self.parameters}
if energy is not None:
kwargs["energy"] = energy
time = Time(time, scale=self.scale).mjd * u.d
return self.evaluate(time, **kwargs)
@property
def type(self):
return self._type
@property
def is_energy_dependent(self):
return False
@property
def reference_time(self):
"""Reference time in mjd"""
return Time(self.t_ref.value, format="mjd", scale=self.scale)
@reference_time.setter
def reference_time(self, t_ref):
"""Reference time"""
if not isinstance(t_ref, Time):
raise TypeError(f"{t_ref} is not a {Time} object")
self.t_ref.value = Time(t_ref, scale=self.scale).mjd
[docs] def to_dict(self, full_output=False):
"""Create dict for YAML serilisation"""
data = super().to_dict(full_output)
data["temporal"]["scale"] = self.scale
return data
[docs] @staticmethod
def time_sum(t_min, t_max):
"""Total time between t_min and t_max
Parameters
----------
t_min, t_max : `~astropy.time.Time`
Lower and upper bound of integration range
Returns
-------
time_sum : `~astropy.time.TimeDelta`
Summed time in the intervals.
"""
diff = t_max - t_min
# TODO: this is a work-around for https://github.com/astropy/astropy/issues/10501
return u.Quantity(np.sum(diff.to_value("day")), "day")
[docs] def plot(self, time_range, ax=None, n_points=100, **kwargs):
"""
Plot Temporal Model.
Parameters
----------
time_range : `~astropy.time.Time`
times to plot the model
ax : `~matplotlib.axes.Axes`, optional
Axis to plot on
n_points : int
Number of bins to plot model
**kwargs : dict
Keywords forwarded to `~matplotlib.pyplot.errorbar`
Returns
-------
ax : `~matplotlib.axes.Axes`, optional
axis
"""
time_min, time_max = time_range
time_axis = TimeMapAxis.from_time_bounds(
time_min=time_min, time_max=time_max, nbin=n_points
)
m = RegionNDMap.create(region=None, axes=[time_axis])
kwargs.setdefault("marker", "None")
kwargs.setdefault("ls", "-")
kwargs.setdefault("xerr", None)
m.quantity = self(time_axis.time_mid)
ax = m.plot(ax=ax, **kwargs)
ax.set_ylabel("Norm / A.U.")
return ax
[docs] def sample_time(self, n_events, t_min, t_max, t_delta="1 s", random_state=0):
"""Sample arrival times of events.
Parameters
----------
n_events : int
Number of events to sample.
t_min : `~astropy.time.Time`
Start time of the sampling.
t_max : `~astropy.time.Time`
Stop time of the sampling.
t_delta : `~astropy.units.Quantity`
Time step used for sampling of the temporal model.
random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`}
Defines random number generator initialisation.
Passed to `~gammapy.utils.random.get_random_state`.
Returns
-------
time : `~astropy.units.Quantity`
Array with times of the sampled events.
"""
t_min = Time(t_min, scale=self.scale)
t_max = Time(t_max, scale=self.scale)
t_delta = u.Quantity(t_delta)
random_state = get_random_state(random_state)
ontime = u.Quantity((t_max - t_min).sec, t_delta.unit)
n_step = (ontime / t_delta).to_value("")
t_step = ontime / n_step
indices = np.arange(n_step + 1)
steps = indices * t_step
t = Time(t_min + steps, format="mjd")
pdf = self(t)
sampler = InverseCDFSampler(pdf=pdf, random_state=random_state)
time_pix = sampler.sample(n_events)[0]
time = np.interp(time_pix, indices, steps)
return t_min + time
[docs] def integral(self, t_min, t_max, oversampling_factor=100, **kwargs):
"""Evaluate the integrated flux within the given time intervals
Parameters
----------
t_min: `~astropy.time.Time`
Start times of observation
t_max: `~astropy.time.Time`
Stop times of observation
oversampling_factor : int
Oversampling factor to be used for numerical integration.
Returns
-------
norm : float
Integrated flux norm on the given time intervals
"""
t_values, steps = np.linspace(
t_min.mjd, t_max.mjd, oversampling_factor, retstep=True, axis=-1
)
times = Time(t_values, format="mjd", scale=self.scale)
values = self(times)
integral = np.sum(values, axis=-1) * steps
return integral / self.time_sum(t_min, t_max).to_value("d")
[docs]class ConstantTemporalModel(TemporalModel):
"""Constant temporal model.
For more information see :ref:`constant-temporal-model`.
"""
tag = ["ConstantTemporalModel", "const"]
[docs] @staticmethod
def evaluate(time):
"""Evaluate at given times."""
return np.ones(time.shape)
[docs] def integral(self, t_min, t_max):
"""Evaluate the integrated flux within the given time intervals
Parameters
----------
t_min : `~astropy.time.Time`
Start times of observation
t_max : `~astropy.time.Time`
Stop times of observation
Returns
-------
norm : `~astropy.units.Quantity`
Integrated flux norm on the given time intervals
"""
return (t_max - t_min) / self.time_sum(t_min, t_max)
[docs]class LinearTemporalModel(TemporalModel):
"""Temporal model with a linear variation.
For more information see :ref:`linear-temporal-model`.
Parameters
----------
alpha : float
Constant term of the baseline flux
beta : `~astropy.units.Quantity`
Time variation coefficient of the flux
t_ref : `~astropy.units.Quantity`
The reference time in mjd. Frozen per default, at 2000-01-01.
"""
tag = ["LinearTemporalModel", "linear"]
alpha = Parameter("alpha", 1.0, frozen=False)
beta = Parameter("beta", 0.0, unit="d-1", frozen=False)
_t_ref_default = Time("2000-01-01")
t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True)
[docs] @staticmethod
def evaluate(time, alpha, beta, t_ref):
"""Evaluate at given times"""
return alpha + beta * (time - t_ref)
[docs] def integral(self, t_min, t_max):
"""Evaluate the integrated flux within the given time intervals
Parameters
----------
t_min : `~astropy.time.Time`
Start times of observation
t_max : `~astropy.time.Time`
Stop times of observation
Returns
-------
norm : float
Integrated flux norm on the given time intervals
"""
pars = self.parameters
alpha = pars["alpha"]
beta = pars["beta"].quantity
t_ref = self.reference_time
value = alpha * (t_max - t_min) + beta / 2.0 * (
(t_max - t_ref) * (t_max - t_ref) - (t_min - t_ref) * (t_min - t_ref)
)
return value / self.time_sum(t_min, t_max)
[docs]class ExpDecayTemporalModel(TemporalModel):
r"""Temporal model with an exponential decay.
For more information see :ref:`expdecay-temporal-model`.
Parameters
----------
t0 : `~astropy.units.Quantity`
Decay time scale
t_ref : `~astropy.units.Quantity`
The reference time in mjd. Frozen per default, at 2000-01-01 .
"""
tag = ["ExpDecayTemporalModel", "exp-decay"]
t0 = Parameter("t0", "1 d", frozen=False)
_t_ref_default = Time("2000-01-01")
t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True)
[docs] @staticmethod
def evaluate(time, t0, t_ref):
"""Evaluate at given times"""
return np.exp(-(time - t_ref) / t0)
[docs] def integral(self, t_min, t_max):
"""Evaluate the integrated flux within the given time intervals
Parameters
----------
t_min : `~astropy.time.Time`
Start times of observation
t_max : `~astropy.time.Time`
Stop times of observation
Returns
-------
norm : float
Integrated flux norm on the given time intervals
"""
pars = self.parameters
t0 = pars["t0"].quantity
t_ref = self.reference_time
value = self.evaluate(t_max, t0, t_ref) - self.evaluate(t_min, t0, t_ref)
return -t0 * value / self.time_sum(t_min, t_max)
[docs]class GaussianTemporalModel(TemporalModel):
r"""A Gaussian temporal profile
For more information see :ref:`gaussian-temporal-model`.
Parameters
----------
t_ref : `~astropy.units.Quantity`
The reference time in mjd at the peak.
sigma : `~astropy.units.Quantity`
Width of the gaussian profile.
"""
tag = ["GaussianTemporalModel", "gauss"]
_t_ref_default = Time("2000-01-01")
t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=False)
sigma = Parameter("sigma", "1 d", frozen=False)
[docs] @staticmethod
def evaluate(time, t_ref, sigma):
return np.exp(-((time - t_ref) ** 2) / (2 * sigma**2))
[docs] def integral(self, t_min, t_max, **kwargs):
"""Evaluate the integrated flux within the given time intervals
Parameters
----------
t_min : `~astropy.time.Time`
Start times of observation
t_max : `~astropy.time.Time`
Stop times of observation
Returns
-------
norm : float
Integrated flux norm on the given time intervals
"""
pars = self.parameters
sigma = pars["sigma"].quantity
t_ref = self.reference_time
norm = np.sqrt(np.pi / 2) * sigma
u_min = (t_min - t_ref) / (np.sqrt(2) * sigma)
u_max = (t_max - t_ref) / (np.sqrt(2) * sigma)
integral = norm * (scipy.special.erf(u_max) - scipy.special.erf(u_min))
return integral / self.time_sum(t_min, t_max)
[docs]class GeneralizedGaussianTemporalModel(TemporalModel):
r"""A generalized Gaussian temporal profile
For more information see :ref:`generalized-gaussian-temporal-model`.
Parameters
----------
t_ref : `~astropy.units.Quantity`
The time of the pulse's maximum intensity.
t_rise : `~astropy.units.Quantity`
Rise time constant.
t_decay : `~astropy.units.Quantity`
Decay time constant.
eta : `~astropy.units.Quantity`
Inverse pulse sharpness -> higher values implies a more peaked pulse
"""
tag = ["GeneralizedGaussianTemporalModel", "gengauss"]
_t_ref_default = Time("2000-01-01")
t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=False)
t_rise = Parameter("t_rise", "1d", frozen=False)
t_decay = Parameter("t_decay", "1d", frozen=False)
eta = Parameter("eta", 1 / 2, unit="", frozen=False)
[docs] @staticmethod
def evaluate(time, t_ref, t_rise, t_decay, eta):
val_rise = np.exp(
-0.5
* (np.abs(u.Quantity(time - t_ref, "d")) ** (1 / eta))
/ (t_rise ** (1 / eta))
)
val_decay = np.exp(
-0.5
* (np.abs(u.Quantity(time - t_ref, "d")) ** (1 / eta))
/ (t_decay ** (1 / eta))
)
val = np.where(time < t_ref, val_rise, val_decay)
return val
[docs]class LightCurveTemplateTemporalModel(TemporalModel):
"""Temporal light curve model.
The lightcurve is given at specific times (and optionally energies) as a ``norm``
It can be serialised either as an astropy table or a `~gammapy.maps.RegionNDMap`
The ``norm`` is supposed to be a unit-less multiplicative factor in the model,
to be multiplied with a spectral model.
The model does linear interpolation for times between the given ``(time, energy, norm)``
values.
When the temporal model is energy-dependent, the default interpolation scheme is
linear with a log scale for the values. The interpolation method and scale values
can be changed with the ``method`` and ``values_scale`` arguments.
For more information see :ref:`LightCurve-temporal-model`.
Examples
--------
Read an example light curve object:
>>> from gammapy.modeling.models import LightCurveTemplateTemporalModel
>>> path = '$GAMMAPY_DATA/tests/models/light_curve/lightcrv_PKSB1222+216.fits'
>>> light_curve = LightCurveTemplateTemporalModel.read(path)
Show basic information about the lightcurve:
>>> print(light_curve)
LightCurveTemplateTemporalModel model summary:
Reference time: 59000.5 MJD
Start time: 59000.5 MJD
End time: 61862.5 MJD
Norm min: 0.01551196351647377
Norm max: 1.0
<BLANKLINE>
Compute ``norm`` at a given time:
>>> t = Time(59001.195, format="mjd")
>>> light_curve.evaluate(t)
array(0.02287888)
Compute mean ``norm`` in a given time interval:
>>> from astropy.time import Time
>>> import astropy.units as u
>>> t_r = Time(59000.5, format='mjd')
>>> t_min = t_r + [1, 4, 8] * u.d
>>> t_max = t_r + [1.5, 6, 9] * u.d
>>> light_curve.integral(t_min, t_max)
array([0.0074388942, 0.0071144081, 0.0068115544])
"""
tag = ["LightCurveTemplateTemporalModel", "template"]
_t_ref_default = Time("2000-01-01")
t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True)
def __init__(self, map, t_ref=None, filename=None, method=None, values_scale=None):
if (map.data < 0).any():
log.warning("Map has negative values. Check and fix this!")
self.map = map.copy()
super().__init__()
if t_ref:
self.reference_time = t_ref
self.filename = filename
if method is None:
method = "linear"
if values_scale is None:
if self.is_energy_dependent:
values_scale = "log"
else:
values_scale = "lin"
self.method = method
self.values_scale = values_scale
def __str__(self):
start_time = self.t_ref.quantity + self.map.geom.axes["time"].edges[0]
end_time = self.t_ref.quantity + self.map.geom.axes["time"].edges[-1]
norm_min = np.nanmin(self.map.data)
norm_max = np.nanmax(self.map.data)
prnt = (
f"{self.__class__.__name__} model summary:\n "
f"Reference time: {self.t_ref.value} MJD \n "
f"Start time: {start_time.value} MJD \n "
f"End time: {end_time.value} MJD \n "
f"Norm min: {norm_min} \n"
f"Norm max: {norm_max}"
)
if self.is_energy_dependent:
energy_min = self.map.geom.axes["energy"].center[0]
energy_max = self.map.geom.axes["energy"].center[-1]
prnt1 = f"Energy min: {energy_min} \n" f"Energy max: {energy_max} \n"
prnt = prnt + prnt1
return prnt
@property
def is_energy_dependent(self):
"""Whether the model is energy dependent"""
return self.map.geom.has_energy_axis
[docs] @classmethod
def from_table(cls, table, filename=None):
"""Create a template model from an astropy table
Parameters
----------
table : `~astropy.table.Table`
Table containing the template model.
filename : str
Name of input file
Returns
-------
model : `LightCurveTemplateTemporalModel`
Light curve template model
"""
columns = [_.lower() for _ in table.colnames]
if "time" not in columns:
raise ValueError("A TIME column is necessary")
t_ref = time_ref_from_dict(table.meta, scale="utc")
nodes = table["TIME"]
ax_unit = nodes.quantity.unit
if not ax_unit.is_equivalent("d"):
try:
ax_unit = u.Unit(table.meta["TIMEUNIT"])
except KeyError:
raise ValueError("Time unit not found in the table")
time_axis = MapAxis.from_nodes(nodes=nodes, name="time", unit=ax_unit)
axes = [time_axis]
m = RegionNDMap.create(region=None, axes=axes, data=table["NORM"])
return cls(m, t_ref=t_ref, filename=filename)
[docs] @classmethod
def read(cls, filename, format="table"):
"""Read a template model
Parameters
----------
filename : str
Name of file to read
format : {"table", "map"}
Format of the input file.
Returns
-------
model : `LightCurveTemplateTemporalModel`
Light curve template model
"""
filename = str(make_path(filename))
if format == "table":
table = Table.read(filename)
return cls.from_table(table, filename=filename)
elif format == "map":
with fits.open(filename) as hdulist:
header = hdulist["SKYMAP_BANDS"].header
t_ref = time_ref_from_dict(header)
# TODO : Ugly hack to prevent creating a TimeMapAxis
# By default, MapAxis.from_table tries to create a
# TimeMapAxis, failing which, it creates a normal MapAxis.
# This ugly hack forces the fail. We need a normal Axis to
# have the evaluate method work
hdulist["SKYMAP_BANDS"].header.pop("MJDREFI")
m = RegionNDMap.from_hdulist(hdulist)
return cls(m, t_ref=t_ref, filename=filename)
else:
raise ValueError(
f"Not a valid format: '{format}', choose from: {'table', 'map'}"
)
[docs] def to_table(self):
"""Convert model to an astropy table"""
if self.is_energy_dependent:
raise NotImplementedError("Not supported for energy dependent models")
table = Table(
data=[self.map.geom.axes["time"].center, self.map.quantity],
names=["TIME", "NORM"],
meta=time_ref_to_dict(self.reference_time, scale=self.scale),
)
return table
[docs] def write(self, filename, format="table", overwrite=False):
"""Write a model to disk as per the specified format
Parameters:
filename : str
name of output file
format : str, either "table" or "map"
if format is table, it is serialised as an astropy Table
if map, then it is serialised as a RegionNDMap
overwrite : bool
Overwrite file on disk if present
"""
if self.filename is None:
raise IOError("Missing filename")
if format == "table":
table = self.to_table()
table.write(filename, overwrite=overwrite)
elif format == "map":
# RegionNDMap.from_hdulist does not update the header
hdulist = self.map.to_hdulist()
hdulist["SKYMAP_BANDS"].header.update(
time_ref_to_dict(self.reference_time, scale=self.scale)
)
hdulist.writeto(filename, overwrite=overwrite)
else:
raise ValueError("Not a valid format, choose from ['map', 'table']")
[docs] def evaluate(self, time, t_ref=None, energy=None):
"""Evaluate the model at given coordinates.
Parameters
----------
time: `~astropy.time.Time`
array of times where the model is evaluated;
t_ref: `~gammapy.modeling.Parameter`
Reference time for the model;
energy: `~astropy.units.Quantity`
array of energies where the model is evaluated;
Returns
-------
values : `~astropy.units.Quantity`
Model values
"""
if t_ref is None:
t_ref = self.reference_time
t = (time - t_ref).to_value(self.map.geom.axes["time"].unit)
coords = {"time": t}
if self.is_energy_dependent:
if energy is None:
energy = self.map.geom.axes["energy"].center
coords["energy"] = energy.reshape(-1, 1)
val = self.map.interp_by_coord(
coords, method=self.method, values_scale=self.values_scale
)
val = np.clip(val, 0, a_max=None)
return u.Quantity(val, unit=self.map.unit, copy=False)
[docs] def integral(self, t_min, t_max, oversampling_factor=100, **kwargs):
if self.is_energy_dependent:
raise NotImplementedError(
"Integral not supported for energy dependent models"
)
return super().integral(t_min, t_max, oversampling_factor, **kwargs)
[docs] @classmethod
def from_dict(cls, data):
data = data["temporal"]
filename = data["filename"]
format = data.get("format", "table")
return cls.read(filename, format)
[docs] def to_dict(self, full_output=False, format="table"):
"""Create dict for YAML serialisation"""
data = super().to_dict(full_output)
data["temporal"]["filename"] = self.filename
data["temporal"]["format"] = format
data["temporal"]["unit"] = str(self.map.unit)
return data
[docs] def plot(self, time_range, ax=None, n_points=100, energy=None, **kwargs):
"""
Plot Temporal Model.
Parameters
----------
time_range : `~astropy.time.Time`
times to plot the model
ax : `~matplotlib.axes.Axes`, optional
Axis to plot on
n_points : int
Number of bins to plot model
energy : `~astropy.units.quantity`
energies to compute the model at for energy dependent models, optional
**kwargs : dict
Keywords forwarded to `~matplotlib.pyplot.errorbar`
Returns
-------
ax : `~matplotlib.axes.Axes`, optional
axis
"""
if not self.is_energy_dependent:
super().plot(time_range=time_range, ax=ax, n_points=n_points, **kwargs)
else:
time_min, time_max = Time(time_range, scale=self.scale)
time_axis = TimeMapAxis.from_time_bounds(
time_min=time_min, time_max=time_max, nbin=n_points
)
if energy is None:
energy_axis = self.map.geom.axes["energy"]
else:
energy_axis = MapAxis.from_nodes(
nodes=energy, name="energy", interp="log"
)
m = RegionNDMap.create(region=None, axes=[time_axis, energy_axis])
kwargs.setdefault("marker", "None")
kwargs.setdefault("ls", "-")
m.quantity = self.evaluate(
time=time_axis.time_mid, energy=energy_axis.center
)
ax = m.plot(axis_name="time", ax=ax, **kwargs)
ax.set_ylabel("Norm / A.U.")
return ax, m
[docs]class PowerLawTemporalModel(TemporalModel):
"""Temporal model with a Power Law decay.
For more information see :ref:`powerlaw-temporal-model`.
Parameters
----------
alpha : float
Decay time power
t_ref: `~astropy.units.Quantity`
The reference time in mjd. Frozen by default, at 2000-01-01.
t0: `~astropy.units.Quantity`
The scaling time in mjd. Fixed by default, at 1 day.
"""
tag = ["PowerLawTemporalModel", "powerlaw"]
alpha = Parameter("alpha", 1.0, frozen=False)
_t_ref_default = Time("2000-01-01")
t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True)
t0 = Parameter("t0", "1 d", frozen=True)
[docs] @staticmethod
def evaluate(time, alpha, t_ref, t0=1 * u.day):
"""Evaluate at given times"""
return np.power((time - t_ref) / t0, alpha)
[docs] def integral(self, t_min, t_max):
"""Evaluate the integrated flux within the given time intervals
Parameters
----------
t_min: `~astropy.time.Time`
Start times of observation
t_max: `~astropy.time.Time`
Stop times of observation
Returns
-------
norm : float
Integrated flux norm on the given time intervals
"""
pars = self.parameters
alpha = pars["alpha"].quantity
t0 = pars["t0"].quantity
t_ref = self.reference_time
if alpha != -1:
value = self.evaluate(t_max, alpha + 1.0, t_ref, t0) - self.evaluate(
t_min, alpha + 1.0, t_ref, t0
)
return t0 / (alpha + 1.0) * value / self.time_sum(t_min, t_max)
else:
value = np.log((t_max - t_ref) / (t_min - t_ref))
return t0 * value / self.time_sum(t_min, t_max)
[docs]class SineTemporalModel(TemporalModel):
"""Temporal model with a sinusoidal modulation.
For more information see :ref:`sine-temporal-model`.
Parameters
----------
amp : float
Amplitude of the sinusoidal function
t_ref: `~astropy.units.Quantity`
The reference time in mjd.
omega: `~astropy.units.Quantity`
Pulsation of the signal.
"""
tag = ["SineTemporalModel", "sinus"]
amp = Parameter("amp", 1.0, frozen=False)
omega = Parameter("omega", "1. rad/day", frozen=False)
_t_ref_default = Time("2000-01-01")
t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=False)
[docs] @staticmethod
def evaluate(time, amp, omega, t_ref):
"""Evaluate at given times"""
return 1.0 + amp * np.sin(omega * (time - t_ref))
[docs] def integral(self, t_min, t_max):
"""Evaluate the integrated flux within the given time intervals
Parameters
----------
t_min: `~astropy.time.Time`
Start times of observation
t_max: `~astropy.time.Time`
Stop times of observation
Returns
-------
norm : float
Integrated flux norm on the given time intervals
"""
pars = self.parameters
omega = pars["omega"].quantity.to_value("rad/day")
amp = pars["amp"].value
t_ref = self.reference_time
value = (t_max - t_min).to_value(u.day) - amp / omega * (
np.sin(omega * (t_max - t_ref).to_value(u.day))
- np.sin(omega * (t_min - t_ref).to_value(u.day))
)
return value / self.time_sum(t_min, t_max).to_value(u.day)
[docs]class TemplatePhaseCurveTemporalModel(TemporalModel):
"""Temporal phase curve model.
A timing solution is used to compute the phase corresponding to time and
a template phase curve is used to determine the associated ``norm``.
The phasecurve is given as a table with columns ``phase`` and ``norm``.
The ``norm`` is supposed to be a unit-less multiplicative factor in the model,
to be multiplied with a spectral model.
The model does linear interpolation for times between the given ``(phase, norm)`` values.
The implementation currently uses `scipy.interpolate. InterpolatedUnivariateSpline`,
using degree ``k=1`` to get linear interpolation.
This class also contains an ``integral`` method, making the computation of
mean fluxes for a given time interval a one-liner.
Parameters
----------
table : `~astropy.table.Table`
A table with 'PHASE' vs 'NORM'
filename : str
The name of the file containing the phase curve
t_ref : `~astropy.units.Quantity`
The reference time in mjd
phi_ref : `~astropy.units.Quantity`
The phase at reference time. Default is 0.
f0 : `~astropy.units.Quantity`
The frequency at t_ref in s-1
f1 : `~astropy.units.Quantity`
The frequency derivative at t_ref in s-2. Default is 0 s-2.
f2 : `~astropy.units.Quantity`
The frequency second derivative at t_ref in s-3. Default is 0 s-3.
"""
tag = ["TemplatePhaseCurveTemporalModel", "template-phase"]
_t_ref_default = Time(48442.5, format="mjd")
_phi_ref_default = 0
_f0_default = 29.946923 * u.s**-1
_f1_default = 0 * u.s**-2
_f2_default = 0 * u.s**-3
t_ref = Parameter("t_ref", _t_ref_default.mjd, unit="day", frozen=True)
phi_ref = Parameter("phi_ref", _phi_ref_default, unit="", frozen=True)
f0 = Parameter("f0", _f0_default, frozen=True)
f1 = Parameter("f1", _f1_default, frozen=True)
f2 = Parameter("f2", _f2_default, frozen=True)
def __init__(self, table, filename=None, **kwargs):
self.table = table
if filename is not None:
filename = str(make_path(filename))
self.filename = filename
super().__init__(**kwargs)
[docs] @classmethod
def read(
cls,
path,
t_ref=_t_ref_default.mjd * u.d,
phi_ref=_phi_ref_default,
f0=_f0_default,
f1=_f1_default,
f2=_f2_default,
):
"""Read phasecurve model table from FITS file.
Beware : this does **not** read parameters.
They will be set to defaults.
Parameters
----------
path : str or `~pathlib.Path`
filename with path
"""
filename = str(make_path(path))
return cls(
Table.read(filename),
filename=filename,
t_ref=t_ref,
phi_ref=phi_ref,
f0=f0,
f1=f1,
f2=f2,
)
@staticmethod
def _time_to_phase(time, t_ref, phi_ref, f0, f1, f2):
"""Convert time to phase given timing solution parameters.
Parameters
----------
time : `~astropy.units.Quantity`
The time at which to compute the phase
t_ref : `~astropy.units.Quantity`
The reference time in mjd.
phi_ref : `~astropy.units.Quantity`
The phase at reference time. Default is 0.
f0 : `~astropy.units.Quantity`
The frequency at t_ref in s-1
f1 : `~astropy.units.Quantity`
The frequency derivative at t_ref in s-2.
f2 : `~astropy.units.Quantity`
The frequency second derivative at t_ref in s-3.
Returns
-------
phase : float
Phase.
period_number : int
Number of period since t_ref.
"""
delta_t = time - t_ref
phase = (
phi_ref + delta_t * (f0 + delta_t / 2.0 * (f1 + delta_t / 3 * f2))
).to_value("")
period_number = np.floor(phase)
phase -= period_number
return phase, period_number
[docs] def write(self, path=None, overwrite=False):
if path is None:
path = self.filename
if path is None:
raise ValueError(f"filename is required for {self.tag}")
else:
self.filename = str(make_path(path))
self.table.write(self.filename, overwrite=overwrite)
@lazyproperty
def _interpolator(self):
x = self.table["PHASE"].data
y = self.table["NORM"].data
return scipy.interpolate.InterpolatedUnivariateSpline(
x, y, k=1, ext=2, bbox=[0.0, 1.0]
)
[docs] def evaluate(self, time, t_ref, phi_ref, f0, f1, f2):
phase, _ = self._time_to_phase(time, t_ref, phi_ref, f0, f1, f2)
return self._interpolator(phase)
[docs] def integral(self, t_min, t_max):
"""Evaluate the integrated flux within the given time intervals
Parameters
----------
t_min: `~astropy.time.Time`
Start times of observation
t_max: `~astropy.time.Time`
Stop times of observation
Returns
-------
norm: The model integrated flux
"""
kwargs = {par.name: par.quantity for par in self.parameters}
ph_min, n_min = self._time_to_phase(t_min.mjd * u.d, **kwargs)
ph_max, n_max = self._time_to_phase(t_max.mjd * u.d, **kwargs)
# here we assume that the frequency does not change during the integration boundaries
delta_t = (t_min - self.reference_time).to(u.d)
frequency = self.f0.quantity + delta_t * (
self.f1.quantity + delta_t * self.f2.quantity / 2
)
# Compute integral of one phase
phase_integral = self._interpolator.antiderivative()(
1
) - self._interpolator.antiderivative()(0)
# Multiply by the total number of phases
phase_integral *= n_max - n_min - 1
# Compute integrals before first full phase and after the last full phase
end_integral = self._interpolator.antiderivative()(
ph_max
) - self._interpolator.antiderivative()(0)
start_integral = self._interpolator.antiderivative()(
1
) - self._interpolator.antiderivative()(ph_min)
# Divide by Jacobian (here we neglect variations of frequency during the integration period)
total = (phase_integral + start_integral + end_integral) / frequency
# Normalize by total integration time
integral_norm = total / self.time_sum(t_min, t_max)
return integral_norm.to("")
[docs] @classmethod
def from_dict(cls, data):
params = _build_parameters_from_dict(
data["temporal"]["parameters"], cls.default_parameters
)
filename = data["temporal"]["filename"]
kwargs = {par.name: par for par in params}
return cls.read(filename, **kwargs)
[docs] def to_dict(self, full_output=False):
"""Create dict for YAML serialisation"""
model_dict = super().to_dict()
model_dict["temporal"]["filename"] = self.filename
return model_dict
[docs] def plot_phasogram(self, ax=None, n_points=100, **kwargs):
"""
Plot phasogram of the phase model.
Parameters
----------
ax : `~matplotlib.axes.Axes`, optional
Axis to plot on
n_points : int
Number of bins to plot model
**kwargs : dict
Keywords forwarded to `~matplotlib.pyplot.errorbar`
Returns
-------
ax : `~matplotlib.axes.Axes`, optional
axis
"""
phase_axis = MapAxis.from_bounds(0.0, 1, nbin=n_points, name="Phase", unit="")
m = RegionNDMap.create(region=None, axes=[phase_axis])
kwargs.setdefault("marker", "None")
kwargs.setdefault("ls", "-")
kwargs.setdefault("xerr", None)
m.quantity = self._interpolator(phase_axis.center)
ax = m.plot(ax=ax, **kwargs)
ax.set_ylabel("Norm / A.U.")
return ax
[docs] def sample_time(self, n_events, t_min, t_max, t_delta="1 s", random_state=0):
"""Sample arrival times of events.
To fully cover the phase range, t_delta is the minimum between the input
and product of the period at 0.5*(t_min + t_max) and the table bin size.
Parameters
----------
n_events : int
Number of events to sample.
t_min : `~astropy.time.Time`
Start time of the sampling.
t_max : `~astropy.time.Time`
Stop time of the sampling.
t_delta : `~astropy.units.Quantity`
Time step used for sampling of the temporal model.
random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`}
Defines random number generator initialisation.
Passed to `~gammapy.utils.random.get_random_state`.
Returns
-------
time : `~astropy.units.Quantity`
Array with times of the sampled events.
"""
t_delta = u.Quantity(t_delta)
# Determine period at the mid time
t_mid = Time(t_min, scale=self.scale) + 0.5 * (t_max - t_min)
delta_t = (t_mid - self.reference_time).to(u.d)
frequency = self.f0.quantity + delta_t * (
self.f1.quantity + delta_t * self.f2.quantity / 2
)
period = 1 / frequency
# Take minimum time delta between user input and the period divied by the number of raows in the model table
# this assumes that phase values are evenly spaced.
t_delta = np.minimum(period / len(self.table), t_delta)
return super().sample_time(n_events, t_min, t_max, t_delta, random_state)