# Licensed under a 3-clause BSD style license - see LICENSE.rst"""Time-dependent models."""importnumpyasnpimportscipy.interpolatefromastropyimportunitsasufromastropy.tableimportTablefromastropy.timeimportTimefromastropy.utilsimportlazypropertyfromgammapy.mapsimportRegionNDMap,TimeMapAxisfromgammapy.modelingimportParameterfromgammapy.utils.randomimportInverseCDFSampler,get_random_statefromgammapy.utils.scriptsimportmake_pathfromgammapy.utils.timeimporttime_ref_from_dictfrom.coreimportModelBase__all__=["ConstantTemporalModel","ExpDecayTemporalModel","GaussianTemporalModel","GeneralizedGaussianTemporalModel","LightCurveTemplateTemporalModel","LinearTemporalModel","PowerLawTemporalModel","SineTemporalModel","TemporalModel",]# TODO: make this a small ABC to define a uniform interface.
[docs]classTemporalModel(ModelBase):"""Temporal model base class. evaluates on astropy.time.Time objects"""_type="temporal"
[docs]def__call__(self,time):"""Evaluate model Parameters ---------- time : `~astropy.time.Time` Time object """kwargs={par.name:par.quantityforparinself.parameters}time=u.Quantity(time.mjd,"day")returnself.evaluate(time,**kwargs)
@propertydeftype(self):returnself._type
[docs]@staticmethoddeftime_sum(t_min,t_max):""" Total time between t_min and t_max Parameters ---------- t_min, t_max : `~astropy.time.Time` Lower and upper bound of integration range Returns ------- time_sum : `~astropy.time.TimeDelta` Summed time in the intervals. """diff=t_max-t_min# TODO: this is a work-around for https://github.com/astropy/astropy/issues/10501returnu.Quantity(np.sum(diff.to_value("day")),"day")
[docs]defplot(self,time_range,ax=None,**kwargs):""" Plot Temporal Model. Parameters ---------- time_range : `~astropy.time.Time` times to plot the model ax : `~matplotlib.axes.Axes`, optional Axis to plot on **kwargs : dict Keywords forwarded to `~matplotlib.pyplot.errorbar` Returns ------- ax : `~matplotlib.axes.Axes`, optional axis """time_min,time_max=time_rangetime_axis=TimeMapAxis.from_time_bounds(time_min=time_min,time_max=time_max,nbin=100)m=RegionNDMap.create(region=None,axes=[time_axis])kwargs.setdefault("marker","None")kwargs.setdefault("ls","-")kwargs.setdefault("xerr",None)m.quantity=self(time_axis.time_mid)ax=m.plot(ax=ax,**kwargs)ax.set_ylabel("Norm / A.U.")returnax
[docs]defsample_time(self,n_events,t_min,t_max,t_delta="1 s",random_state=0):"""Sample arrival times of events. Parameters ---------- n_events : int Number of events to sample. t_min : `~astropy.time.Time` Start time of the sampling. t_max : `~astropy.time.Time` Stop time of the sampling. t_delta : `~astropy.units.Quantity` Time step used for sampling of the temporal model. random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`} Defines random number generator initialisation. Passed to `~gammapy.utils.random.get_random_state`. Returns ------- time : `~astropy.units.Quantity` Array with times of the sampled events. """t_min=Time(t_min)t_max=Time(t_max)t_delta=u.Quantity(t_delta)random_state=get_random_state(random_state)ontime=u.Quantity((t_max-t_min).sec,"s")time_unit=(u.Unit(self.table.meta["TIMEUNIT"])ifhasattr(self,"table")elseontime.unit)t_stop=ontime.to_value(time_unit)# TODO: the separate time unit handling is unfortunate, but the quantity support for np.arange and np.interp# is still incomplete, refactor once we change to recent numpy and astropy versionst_step=t_delta.to_value(time_unit)ifhasattr(self,"table"):t=np.arange(0,t_stop,t_step)pdf=self.evaluate(t)sampler=InverseCDFSampler(pdf=pdf,random_state=random_state)time_pix=sampler.sample(n_events)[0]time=np.interp(time_pix,np.arange(len(t)),t)*time_unitelse:t_step=(t_step*u.s).to("d")t=Time(np.arange(t_min.mjd,t_max.mjd,t_step.value),format="mjd")pdf=self(t)sampler=InverseCDFSampler(pdf=pdf,random_state=random_state)time_pix=sampler.sample(n_events)[0]time=(np.interp(time_pix,np.arange(len(t)),t.value-min(t.value))*t_step.unit).to(time_unit)returnt_min+time
[docs]@staticmethoddefevaluate(time):"""Evaluate at given times."""returnnp.ones(time.shape)
[docs]defintegral(self,t_min,t_max):"""Evaluate the integrated flux within the given time intervals Parameters ---------- t_min : `~astropy.time.Time` Start times of observation t_max : `~astropy.time.Time` Stop times of observation Returns ------- norm : `~astropy.units.Quantity` Integrated flux norm on the given time intervals """return(t_max-t_min)/self.time_sum(t_min,t_max)
[docs]classLinearTemporalModel(TemporalModel):"""Temporal model with a linear variation. For more information see :ref:`linear-temporal-model`. Parameters ---------- alpha : float Constant term of the baseline flux beta : `~astropy.units.Quantity` Time variation coefficient of the flux t_ref : `~astropy.units.Quantity` The reference time in mjd. Frozen per default, at 2000-01-01. """tag=["LinearTemporalModel","linear"]alpha=Parameter("alpha",1.0,frozen=False)beta=Parameter("beta",0.0,unit="d-1",frozen=False)_t_ref_default=Time("2000-01-01")t_ref=Parameter("t_ref",_t_ref_default.mjd,unit="day",frozen=True)
[docs]@staticmethoddefevaluate(time,alpha,beta,t_ref):"""Evaluate at given times"""returnalpha+beta*(time-t_ref)
[docs]defintegral(self,t_min,t_max):"""Evaluate the integrated flux within the given time intervals Parameters ---------- t_min : `~astropy.time.Time` Start times of observation t_max : `~astropy.time.Time` Stop times of observation Returns ------- norm : float Integrated flux norm on the given time intervals """pars=self.parametersalpha=pars["alpha"]beta=pars["beta"].quantityt_ref=Time(pars["t_ref"].quantity,format="mjd")value=alpha*(t_max-t_min)+beta/2.0*((t_max-t_ref)*(t_max-t_ref)-(t_min-t_ref)*(t_min-t_ref))returnvalue/self.time_sum(t_min,t_max)
[docs]classExpDecayTemporalModel(TemporalModel):r"""Temporal model with an exponential decay. .. math:: F(t) = exp(-(t - t_ref)/t0) Parameters ---------- t0 : `~astropy.units.Quantity` Decay time scale t_ref : `~astropy.units.Quantity` The reference time in mjd. Frozen per default, at 2000-01-01 . """tag=["ExpDecayTemporalModel","exp-decay"]t0=Parameter("t0","1 d",frozen=False)_t_ref_default=Time("2000-01-01")t_ref=Parameter("t_ref",_t_ref_default.mjd,unit="day",frozen=True)
[docs]@staticmethoddefevaluate(time,t0,t_ref):"""Evaluate at given times"""returnnp.exp(-(time-t_ref)/t0)
[docs]defintegral(self,t_min,t_max):"""Evaluate the integrated flux within the given time intervals Parameters ---------- t_min : `~astropy.time.Time` Start times of observation t_max : `~astropy.time.Time` Stop times of observation Returns ------- norm : float Integrated flux norm on the given time intervals """pars=self.parameterst0=pars["t0"].quantityt_ref=Time(pars["t_ref"].quantity,format="mjd")value=self.evaluate(t_max,t0,t_ref)-self.evaluate(t_min,t0,t_ref)return-t0*value/self.time_sum(t_min,t_max)
[docs]classGaussianTemporalModel(TemporalModel):r"""A Gaussian temporal profile .. math:: F(t) = exp( -0.5 * \frac{ (t - t_{ref})^2 } { \sigma^2 }) Parameters ---------- t_ref : `~astropy.units.Quantity` The reference time in mjd at the peak. sigma : `~astropy.units.Quantity` Width of the gaussian profile. """tag=["GaussianTemporalModel","gauss"]_t_ref_default=Time("2000-01-01")t_ref=Parameter("t_ref",_t_ref_default.mjd,unit="day",frozen=False)sigma=Parameter("sigma","1 d",frozen=False)
[docs]defintegral(self,t_min,t_max,**kwargs):"""Evaluate the integrated flux within the given time intervals Parameters ---------- t_min : `~astropy.time.Time` Start times of observation t_max : `~astropy.time.Time` Stop times of observation Returns ------- norm : float Integrated flux norm on the given time intervals """pars=self.parameterssigma=pars["sigma"].quantityt_ref=Time(pars["t_ref"].quantity,format="mjd")norm=np.sqrt(np.pi/2)*sigmau_min=(t_min-t_ref)/(np.sqrt(2)*sigma)u_max=(t_max-t_ref)/(np.sqrt(2)*sigma)integral=norm*(scipy.special.erf(u_max)-scipy.special.erf(u_min))returnintegral/self.time_sum(t_min,t_max)
[docs]classGeneralizedGaussianTemporalModel(TemporalModel):r"""A generalized Gaussian temporal profile .. math:: F(t) = exp( - 0.5 * (\frac{ \lvert t - t_{ref} \rvert}{t_rise}) ^ {1 / \eta}) for t < t_ref F(t) = exp( - 0.5 * (\frac{ \lvert t - t_{ref} \rvert}{t_decay}) ^ {1 / \eta}) for t > t_ref Parameters ---------- t_ref : `~astropy.units.Quantity` The time of the pulse's maximum intensity. t_rise : `~astropy.units.Quantity` Rise time constant. t_decay : `~astropy.units.Quantity` Decay time constant. eta : `~astropy.units.Quantity` Inverse pulse sharpness -> higher values implies a more peaked pulse """tag=["GeneralizedGaussianTemporalModel","gengauss"]_t_ref_default=Time("2000-01-01")t_ref=Parameter("t_ref",_t_ref_default.mjd,unit="day",frozen=False)t_rise=Parameter("t_rise","1d",frozen=False)t_decay=Parameter("t_decay","1d",frozen=False)eta=Parameter("eta",1/2,unit="",frozen=False)
[docs]defintegral(self,t_min,t_max,**kwargs):"""Evaluate the integrated flux within the given time intervals Parameters ---------- t_min: `~astropy.time.Time` Start times of observation t_max: `~astropy.time.Time` Stop times of observation Returns ------- norm : float Integrated flux norm on the given time intervals """pars=self.parameterst_rise=pars["t_rise"].quantityt_decay=pars["t_decay"].quantityeta=pars["eta"].quantityt_ref=Time(pars["t_ref"].quantity,format="mjd")integral=scipy.integrate.quad(self.evaluate,t_min.mjd,t_max.mjd,args=(t_ref.mjd,t_rise,t_decay,eta))[0]returnintegral/self.time_sum(t_min,t_max).to_value("d")
[docs]classLightCurveTemplateTemporalModel(TemporalModel):"""Temporal light curve model. The lightcurve is given as a table with columns ``time`` and ``norm``. The ``norm`` is supposed to be a unit-less multiplicative factor in the model, to be multiplied with a spectral model. The model does linear interpolation for times between the given ``(time, norm)`` values. The implementation currently uses `scipy.interpolate. InterpolatedUnivariateSpline`, using degree ``k=1`` to get linear interpolation. This class also contains an ``integral`` method, making the computation of mean fluxes for a given time interval a one-liner. Parameters ---------- table : `~astropy.table.Table` A table with 'TIME' vs 'NORM' Examples -------- Read an example light curve object: >>> from gammapy.modeling.models import LightCurveTemplateTemporalModel >>> path = '$GAMMAPY_DATA/tests/models/light_curve/lightcrv_PKSB1222+216.fits' >>> light_curve = LightCurveTemplateTemporalModel.read(path) Show basic information about the lightcurve: >>> print(light_curve) LightCurveTemplateTemporalModel model summary: Start time: 59000.5 MJD End time: 61862.5 MJD Norm min: 0.01551196351647377 Norm max: 1.0 <BLANKLINE> Compute ``norm`` at a given time: >>> light_curve.evaluate(60000) array(0.01551196) Compute mean ``norm`` in a given time interval: >>> from astropy.time import Time >>> times = Time([60000, 61000], format='mjd') >>> light_curve.integral(times[0], times[1]) <Quantity 0.01721725> """tag=["LightCurveTemplateTemporalModel","template"]def__init__(self,table,filename=None):self.table=tableiffilenameisnotNone:filename=str(make_path(filename))self.filename=filenamesuper().__init__()def__str__(self):norm=self.table["NORM"]return(f"{self.__class__.__name__} model summary:\n"f"Start time: {self._time[0].mjd} MJD\n"f"End time: {self._time[-1].mjd} MJD\n"f"Norm min: {norm.min()}\n"f"Norm max: {norm.max()}\n")
[docs]@classmethoddefread(cls,path):"""Read lightcurve model table from FITS file. TODO: This doesn't read the XML part of the model yet. """filename=str(make_path(path))returncls(Table.read(filename),filename=filename)
[docs]defwrite(self,path=None,overwrite=False):ifpathisNone:path=self.filenameifpathisNone:raiseValueError(f"filename is required for {self.tag}")else:self.filename=str(make_path(path))self.table.write(self.filename,overwrite=overwrite)
[docs]defevaluate(self,time,ext=0):"""Evaluate for a given time. Parameters ---------- time : array_like Time since the ``reference`` time. ext : int or str, optional, default: 0 Parameter passed to ~scipy.interpolate.InterpolatedUnivariateSpline Controls the extrapolation mode for GTIs outside the range 0 or "extrapolate", return the extrapolated value. 1 or "zeros", return 0 2 or "raise", raise a ValueError 3 or "const", return the boundary value. Returns ------- norm : array_like Norm at the given times. """returnself._interpolator(time,ext=ext)
[docs]defintegral(self,t_min,t_max):"""Evaluate the integrated flux within the given time intervals Parameters ---------- t_min: `~astropy.time.Time` Start times of observation t_max: `~astropy.time.Time` Stop times of observation Returns ------- norm: The model integrated flux """n1=self._interpolator.antiderivative()(t_max.mjd)n2=self._interpolator.antiderivative()(t_min.mjd)returnu.Quantity(n1-n2,"day")/self.time_sum(t_min,t_max)
[docs]defto_dict(self,full_output=False):"""Create dict for YAML serialisation"""return{self._type:{"type":self.tag[0],"filename":self.filename}}
[docs]classPowerLawTemporalModel(TemporalModel):"""Temporal model with a Power Law decay. For more information see :ref:`powerlaw-temporal-model`. Parameters ---------- alpha : float Decay time power t_ref: `~astropy.units.Quantity` The reference time in mjd. Frozen by default, at 2000-01-01. t0: `~astropy.units.Quantity` The scaling time in mjd. Fixed by default, at 1 day. """tag=["PowerLawTemporalModel","powerlaw"]alpha=Parameter("alpha",1.0,frozen=False)_t_ref_default=Time("2000-01-01")t_ref=Parameter("t_ref",_t_ref_default.mjd,unit="day",frozen=True)t0=Parameter("t0","1 d",frozen=True)
[docs]@staticmethoddefevaluate(time,alpha,t_ref,t0=1*u.day):"""Evaluate at given times"""returnnp.power((time-t_ref)/t0,alpha)
[docs]defintegral(self,t_min,t_max):"""Evaluate the integrated flux within the given time intervals Parameters ---------- t_min: `~astropy.time.Time` Start times of observation t_max: `~astropy.time.Time` Stop times of observation Returns ------- norm : float Integrated flux norm on the given time intervals """pars=self.parametersalpha=pars["alpha"].quantityt0=pars["t0"].quantityt_ref=Time(pars["t_ref"].quantity,format="mjd")ifalpha!=-1:value=self.evaluate(t_max,alpha+1.0,t_ref,t0)-self.evaluate(t_min,alpha+1.0,t_ref,t0)returnt0/(alpha+1.0)*value/self.time_sum(t_min,t_max)else:value=np.log((t_max-t_ref)/(t_min-t_ref))returnt0*value/self.time_sum(t_min,t_max)
[docs]classSineTemporalModel(TemporalModel):"""Temporal model with a sinusoidal modulation. For more information see :ref:`sine-temporal-model`. Parameters ---------- amp : float Amplitude of the sinusoidal function t_ref: `~astropy.units.Quantity` The reference time in mjd. omega: `~astropy.units.Quantity` Pulsation of the signal. """tag=["SineTemporalModel","sinus"]amp=Parameter("amp",1.0,frozen=False)omega=Parameter("omega","1. rad/day",frozen=False)_t_ref_default=Time("2000-01-01")t_ref=Parameter("t_ref",_t_ref_default.mjd,unit="day",frozen=False)
[docs]@staticmethoddefevaluate(time,amp,omega,t_ref):"""Evaluate at given times"""return1.0+amp*np.sin(omega*(time-t_ref))
[docs]defintegral(self,t_min,t_max):"""Evaluate the integrated flux within the given time intervals Parameters ---------- t_min: `~astropy.time.Time` Start times of observation t_max: `~astropy.time.Time` Stop times of observation Returns ------- norm : float Integrated flux norm on the given time intervals """pars=self.parametersomega=pars["omega"].quantity.to_value("rad/day")amp=pars["amp"].valuet_ref=Time(pars["t_ref"].quantity,format="mjd")value=(t_max-t_min)-amp/omega*(np.sin(omega*(t_max-t_ref).to_value("day"))-np.sin(omega*(t_min-t_ref).to_value("day")))returnvalue/self.time_sum(t_min,t_max)