# Licensed under a 3-clause BSD style license - see LICENSE.rstimportloggingimportnumpyasnpfromscipy.specialimporterfcfromastropyimportunitsasufromastropy.ioimportfitsfromastropy.tableimportTablefromastropy.visualizationimportquantity_supportimportmatplotlib.pyplotaspltfrommatplotlib.gridspecimportGridSpecfromgammapy.maps.axesimportUNIT_STRING_FORMAT,MapAxisfromgammapy.modeling.modelsimport(DatasetModels,Models,SkyModel,TemplateSpatialModel,)fromgammapy.utils.interpolationimportinterpolate_profilefromgammapy.utils.scriptsimportmake_name,make_pathfrom.coreimportDatasetlog=logging.getLogger(__name__)__all__=["FluxPointsDataset"]def_get_reference_model(model,energy_bounds,margin_percent=70):ifisinstance(model.spatial_model,TemplateSpatialModel):geom=model.spatial_model.map.geomemin=energy_bounds[0]*(1-margin_percent/100)emax=energy_bounds[-1]*(1+margin_percent/100)energy_axis=MapAxis.from_energy_bounds(emin,emax,nbin=20,per_decade=True,name="energy_true")geom=geom.to_image().to_cube([energy_axis])returnModels([model]).to_template_spectral_model(geom)else:returnmodel.spectral_model
[docs]classFluxPointsDataset(Dataset):"""Bundle a set of flux points with a parametric model, to compute fit statistic function using different statistics (see ``stat_type``). For more information see :ref:`datasets`. Parameters ---------- models : `~gammapy.modeling.models.Models` Models (only spectral part needs to be set). data : `~gammapy.estimators.FluxPoints` Flux points. Must be sorted along the energy axis. mask_fit : `numpy.ndarray` Mask to apply for fitting. mask_safe : `numpy.ndarray` Mask defining the safe data range. By default, upper limit values are excluded. meta_table : `~astropy.table.Table` Table listing information on observations used to create the dataset. One line per observation for stacked datasets. stat_type : str Method used to compute the statistics: * chi2 : estimate from chi2 statistics. * profile : estimate from interpolation of the likelihood profile. * distrib : Assuming gaussian errors the likelihood is given by the probability density function of the normal distribution. For the upper limit case it is necessary to marginalize over the unknown measurement, so we integrate the normal distribution up to the upper limit value which gives the complementary error function. See eq. C7 of `Mohanty et al (2013) <https://iopscience.iop.org/article/10.1088/0004-637X/773/2/168/pdf>`__ Default is `chi2`, in that case upper limits are ignored and the mean of asymetrics error is used. However, it is recommended to use `profile` if `stat_scan` is available on flux points. The `distrib` case provides an approximation if the profile is not available. stat_kwargs : dict Extra arguments specifying the interpolation scheme of the likelihood profile. Used only if `stat_type=="profile"`. In that case the default is : `stat_kwargs={"interp_scale":"sqrt", "extrapolate":True}` Examples -------- Load flux points from file and fit with a power-law model:: >>> from gammapy.modeling import Fit >>> from gammapy.modeling.models import PowerLawSpectralModel, SkyModel >>> from gammapy.estimators import FluxPoints >>> from gammapy.datasets import FluxPointsDataset >>> filename = "$GAMMAPY_DATA/tests/spectrum/flux_points/diff_flux_points.fits" >>> dataset = FluxPointsDataset.read(filename) >>> model = SkyModel(spectral_model=PowerLawSpectralModel()) >>> dataset.models = model Make the fit: >>> fit = Fit() >>> result = fit.run([dataset]) >>> print(result) OptimizeResult <BLANKLINE> backend : minuit method : migrad success : True message : Optimization terminated successfully. nfev : 135 total stat : 25.21 <BLANKLINE> CovarianceResult <BLANKLINE> backend : minuit method : hesse success : True message : Hesse terminated successfully. >>> print(result.parameters.to_table()) type name value unit ... frozen link prior ---- --------- ---------- -------------- ... ------ ---- ----- index 2.2159e+00 ... False amplitude 2.1619e-13 TeV-1 s-1 cm-2 ... False reference 1.0000e+00 TeV ... True Note: In order to reproduce the example, you need the tests datasets folder. You may download it with the command: ``gammapy download datasets --tests --out $GAMMAPY_DATA`` """tag="FluxPointsDataset"def__init__(self,models=None,data=None,mask_fit=None,mask_safe=None,name=None,meta_table=None,stat_type="chi2",stat_kwargs=None,):ifnotdata.geom.has_energy_axis:raiseValueError("FluxPointsDataset needs an energy axis")self.data=dataself.mask_fit=mask_fitself._name=make_name(name)self.models=modelsself.meta_table=meta_tableself._available_stat_type=dict(chi2=self._stat_array_chi2,profile=self._stat_array_profile,distrib=self._stat_array_distrib,)ifstat_kwargsisNone:stat_kwargs=dict()self.stat_kwargs=stat_kwargsself.stat_type=stat_typeifmask_safeisNone:mask_safe=np.ones(self.data.dnde.data.shape,dtype=bool)self.mask_safe=mask_safe@propertydefavailable_stat_type(self):returnlist(self._available_stat_type.keys())@propertydefstat_type(self):returnself._stat_type@stat_type.setterdefstat_type(self,stat_type):ifstat_typenotinself.available_stat_type:raiseValueError(f"Invalid stat_type: possible options are {self.available_stat_type}")ifstat_type=="chi2":self._mask_valid=(~self.data.is_ul).data&np.isfinite(self.data.dnde)elifstat_type=="distrib":self._mask_valid=(self.data.is_ul.data&np.isfinite(self.data.dnde_ul))|np.isfinite(self.data.dnde)elifstat_type=="profile":self.stat_kwargs.setdefault("interp_scale","sqrt")self.stat_kwargs.setdefault("extrapolate",True)self._profile_interpolators=self._get_valid_profile_interpolators()self._stat_type=stat_type@propertydefmask_valid(self):returnself._mask_valid@propertydefmask_safe(self):returnself._mask_safe&self.mask_valid@mask_safe.setterdefmask_safe(self,mask_safe):self._mask_safe=mask_safe@propertydefname(self):returnself._name@propertydefgti(self):"""Good time interval info (`GTI`)."""returnself.data.gti@propertydefmodels(self):returnself._models@models.setterdefmodels(self,models):ifmodelsisNone:self._models=Noneelse:models=DatasetModels(models)self._models=models.select(datasets_names=self.name)
[docs]defwrite(self,filename,overwrite=False,checksum=False,**kwargs):"""Write flux point dataset to file. Parameters ---------- filename : str Filename to write to. overwrite : bool, optional Overwrite existing file. Default is False. checksum : bool When True adds both DATASUM and CHECKSUM cards to the headers written to the FITS file. Applies only if filename has .fits suffix. Default is False. **kwargs : dict, optional Keyword arguments passed to `~astropy.table.Table.write`. """table=self.data.to_table()ifself.mask_fitisNone:mask_fit=self.mask_safeelse:mask_fit=self.mask_fittable["mask_fit"]=mask_fittable["mask_safe"]=self.mask_safefilename=make_path(filename)if"fits"infilename.suffixes:primary_hdu=fits.PrimaryHDU()hdu_table=fits.BinTableHDU(table,name="TABLE")fits.HDUList([primary_hdu,hdu_table]).writeto(filename,overwrite=overwrite,checksum=checksum)else:table.write(make_path(filename),overwrite=overwrite,**kwargs)
[docs]@classmethoddefread(cls,filename,name=None):"""Read pre-computed flux points and create a dataset. Parameters ---------- filename : str Filename to read from. name : str, optional Name of the new dataset. Default is None. Returns ------- dataset : `FluxPointsDataset` FluxPointsDataset. """fromgammapy.estimatorsimportFluxPointsfilename=make_path(filename)table=Table.read(filename)mask_fit=Nonemask_safe=Noneif"mask_safe"intable.colnames:mask_safe=table["mask_safe"].data.astype("bool")if"mask_fit"intable.colnames:mask_fit=table["mask_fit"].data.astype("bool")returncls(name=make_name(name),data=FluxPoints.from_table(table),mask_fit=mask_fit,mask_safe=mask_safe,)
[docs]@classmethoddeffrom_dict(cls,data,**kwargs):"""Create flux point dataset from dict. Parameters ---------- data : dict Dictionary containing data to create dataset from. Returns ------- dataset : `FluxPointsDataset` Flux point datasets. """fromgammapy.estimatorsimportFluxPointsfilename=make_path(data["filename"])table=Table.read(filename)mask_fit=table["mask_fit"].data.astype("bool")mask_safe=table["mask_safe"].data.astype("bool")table.remove_columns(["mask_fit","mask_safe"])returncls(name=data["name"],data=FluxPoints.from_table(table),mask_fit=mask_fit,mask_safe=mask_safe,)
def__str__(self):str_=f"{self.__class__.__name__}\n"str_+="-"*len(self.__class__.__name__)+"\n"str_+="\n"str_+="\t{:32}: {}\n\n".format("Name",self.name)# data sectionn_bins=0ifself.dataisnotNone:n_bins=np.prod(self.data.geom.data_shape)str_+="\t{:32}: {}\n".format("Number of total flux points",n_bins)n_fit_bins=0ifself.maskisnotNone:n_fit_bins=np.sum(self.mask.data)str_+="\t{:32}: {}\n\n".format("Number of fit bins",n_fit_bins)# likelihood sectionstr_+="\t{:32}: {}\n".format("Fit statistic type",self.stat_type)stat=np.nanifself.dataisnotNoneandself.modelsisnotNone:stat=self.stat_sum()str_+="\t{:32}: {:.2f}\n\n".format("Fit statistic value (-2 log(L))",stat)# model sectionn_models=0ifself.modelsisnotNone:n_models=len(self.models)str_+="\t{:32}: {}\n".format("Number of models",n_models)ifself.modelsisnotNone:str_+="\t{:32}: {}\n".format("Number of parameters",len(self.models.parameters))str_+="\t{:32}: {}\n\n".format("Number of free parameters",len(self.models.parameters.free_parameters))str_+="\t"+"\n\t".join(str(self.models).split("\n")[2:])returnstr_.expandtabs(tabsize=2)
[docs]defdata_shape(self):"""Shape of the flux points data (tuple)."""returnself.data.energy_ref.shape
def_stat_array_chi2(self):"""Chi2 statistics."""model=self.flux_pred()data=self.data.dnde.quantitytry:sigma=self.data.dnde_err.quantityexceptAttributeError:sigma=(self.data.dnde_errn+self.data.dnde_errp).quantity/2return((data-model)/sigma).to_value("")**2def_stat_array_profile(self):"""Estimate statitistic from interpolation of the likelihood profile."""model=np.zeros(self.data.dnde.data.shape)+(self.flux_pred()/self.data.dnde_ref).to_value("")stat=np.zeros(model.shape)foridxinnp.ndindex(self._profile_interpolators.shape):stat[idx]=self._profile_interpolators[idx](model[idx])returnstatdef_get_valid_profile_interpolators(self):value_scan=self.data.stat_scan.geom.axes["norm"].centershape_axes=self.data.stat_scan.geom._shape[slice(3,None)][::-1]interpolators=np.empty(shape_axes,dtype=object)self._mask_valid=np.ones(self.data.dnde.data.shape,dtype=bool)foridxinnp.ndindex(shape_axes):stat_scan=np.abs(self.data.stat_scan.data[idx].squeeze()-self.data.stat.data[idx].squeeze())self._mask_valid[idx]=np.all(np.isfinite(stat_scan))interpolators[idx]=interpolate_profile(value_scan,stat_scan,interp_scale=self.stat_kwargs["interp_scale"],extrapolate=self.stat_kwargs["extrapolate"],)returninterpolatorsdef_stat_array_distrib(self):"""Estimate statistic from probability distributions, assumes that flux points correspond to asymmetric gaussians and upper limits complementary error functions. """model=np.zeros(self.data.dnde.data.shape)+self.flux_pred().to_value(self.data.dnde.unit)stat=np.zeros(model.shape)mask_valid=~np.isnan(self.data.dnde.data)loc=self.data.dnde.data[mask_valid]value=model[mask_valid]try:mask_p=(model>=self.data.dnde.data)[mask_valid]scale=np.zeros(mask_p.shape)scale[mask_p]=self.data.dnde_errp.data[mask_valid][mask_p]scale[~mask_p]=self.data.dnde_errn.data[mask_valid][~mask_p]mask_invalid=np.isnan(scale)scale[mask_invalid]=self.data.dnde_err.data[mask_valid][mask_invalid]exceptAttributeError:scale=self.data.dnde_err.data[mask_valid]stat[mask_valid]=((value-loc)/scale)**2mask_ul=self.data.is_ul.datavalue=model[mask_ul]loc_ul=self.data.dnde_ul.data[mask_ul]scale_ul=self.data.dnde_ul.data[mask_ul]stat[mask_ul]=2*np.log((erfc((loc_ul-value)/scale_ul)/2)/(erfc((loc_ul-0)/scale_ul)/2))stat[np.isnan(stat.data)]=0returnstat
[docs]defresiduals(self,method="diff"):"""Compute flux point residuals. Parameters ---------- method: {"diff", "diff/model"} Method used to compute the residuals. Available options are: - ``"diff"`` (default): data - model. - ``"diff/model"``: (data - model) / model. Returns ------- residuals : `~numpy.ndarray` Residuals array. """fp=self.datamodel=self.flux_pred()residuals=self._compute_residuals(fp.dnde.quantity,model,method)# Remove residuals for upper_limitsresiduals[fp.is_ul.data]=np.nanreturnresiduals
[docs]defplot_fit(self,ax_spectrum=None,ax_residuals=None,kwargs_spectrum=None,kwargs_residuals=None,):"""Plot flux points, best fit model and residuals in two panels. Calls `~FluxPointsDataset.plot_spectrum` and `~FluxPointsDataset.plot_residuals`. Parameters ---------- ax_spectrum : `~matplotlib.axes.Axes`, optional Axes to plot flux points and best fit model on. Default is None. ax_residuals : `~matplotlib.axes.Axes`, optional Axes to plot residuals on. Default is None. kwargs_spectrum : dict, optional Keyword arguments passed to `~FluxPointsDataset.plot_spectrum`. Default is None. kwargs_residuals : dict, optional Keyword arguments passed to `~FluxPointsDataset.plot_residuals`. Default is None. Returns ------- ax_spectrum, ax_residuals : `~matplotlib.axes.Axes` Flux points, best fit model and residuals plots. Examples -------- >>> from gammapy.modeling.models import PowerLawSpectralModel, SkyModel >>> from gammapy.estimators import FluxPoints >>> from gammapy.datasets import FluxPointsDataset >>> #load precomputed flux points >>> filename = "$GAMMAPY_DATA/tests/spectrum/flux_points/diff_flux_points.fits" >>> flux_points = FluxPoints.read(filename) >>> model = SkyModel(spectral_model=PowerLawSpectralModel()) >>> dataset = FluxPointsDataset(model, flux_points) >>> #configuring optional parameters >>> kwargs_spectrum = {"kwargs_model": {"color":"red", "ls":"--"}, "kwargs_fp":{"color":"green", "marker":"o"}} >>> kwargs_residuals = {"color": "blue", "markersize":4, "marker":'s', } >>> dataset.plot_fit(kwargs_residuals=kwargs_residuals, kwargs_spectrum=kwargs_spectrum) # doctest: +SKIP """ifself.data.geom.ndim>3:raiseValueError("Plot fit works with only one energy axis")fig=plt.figure(figsize=(9,7))gs=GridSpec(7,1)ifax_spectrumisNone:ax_spectrum=fig.add_subplot(gs[:5,:])ifax_residualsisNone:plt.setp(ax_spectrum.get_xticklabels(),visible=False)ifax_residualsisNone:ax_residuals=fig.add_subplot(gs[5:,:],sharex=ax_spectrum)kwargs_spectrum=kwargs_spectrumor{}kwargs_residuals=kwargs_residualsor{}kwargs_residuals.setdefault("method","diff/model")self.plot_spectrum(ax=ax_spectrum,**kwargs_spectrum)self.plot_residuals(ax=ax_residuals,**kwargs_residuals)returnax_spectrum,ax_residuals
[docs]defplot_residuals(self,ax=None,method="diff",**kwargs):"""Plot flux point residuals. Parameters ---------- ax : `~matplotlib.axes.Axes`, optional Axes to plot on. Default is None. method : {"diff", "diff/model"} Normalization used to compute the residuals, see `FluxPointsDataset.residuals`. Default is "diff". **kwargs : dict Keyword arguments passed to `~matplotlib.axes.Axes.errorbar`. Returns ------- ax : `~matplotlib.axes.Axes` Axes object. """ifself.data.geom.ndim>3:raiseValueError("Plot residuals works with only one energy axis")ax=axorplt.gca()fp=self.dataresiduals=self.residuals(method)xerr=self.data.energy_axis.as_plot_xerryerr=fp._plot_get_flux_err(sed_type="dnde")ifmethod=="diff/model":model=self.flux_pred()yerr=((yerr[0].quantity/model).squeeze(),(yerr[1].quantity/model).squeeze(),)elifmethod=="diff":yerr=yerr[0].quantity.squeeze(),yerr[1].quantity.squeeze()else:raiseValueError('Invalid method, choose between "diff" and "diff/model"')kwargs.setdefault("color",kwargs.pop("c","black"))kwargs.setdefault("marker","+")kwargs.setdefault("linestyle",kwargs.pop("ls","none"))withquantity_support():ax.errorbar(fp.energy_ref,residuals.squeeze(),xerr=xerr,yerr=yerr,**kwargs)ax.axhline(0,color=kwargs["color"],lw=0.5)# format axesax.set_xlabel(f"Energy [{self._energy_unit.to_string(UNIT_STRING_FORMAT)}]")ax.set_xscale("log")label=self._residuals_labels[method]ax.set_ylabel(f"Residuals\n{label}")ymin=np.nanmin(residuals-yerr[0])ymax=np.nanmax(residuals+yerr[1])ymax=max(abs(ymin),ymax)ax.set_ylim(-1.05*ymax,1.05*ymax)returnax
[docs]defplot_spectrum(self,ax=None,kwargs_fp=None,kwargs_model=None,axis_name="energy"):"""Plot flux points and model. Parameters ---------- ax : `~matplotlib.axes.Axes`, optional Axes to plot on. Default is None. kwargs_fp : dict, optional Keyword arguments passed to `gammapy.estimators.FluxPoints.plot` to configure the plot style. Default is None. kwargs_model : dict, optional Keyword arguments passed to `gammapy.modeling.models.SpectralModel.plot` and `gammapy.modeling.models.SpectralModel.plot_error` to configure the plot style. Default is None. axis_name : str Axis along which to plot the flux points for multiple axes. Default is energy. Returns ------- ax : `~matplotlib.axes.Axes` Axes object. Examples -------- >>> from gammapy.modeling.models import PowerLawSpectralModel, SkyModel >>> from gammapy.estimators import FluxPoints >>> from gammapy.datasets import FluxPointsDataset >>> #load precomputed flux points >>> filename = "$GAMMAPY_DATA/tests/spectrum/flux_points/diff_flux_points.fits" >>> flux_points = FluxPoints.read(filename) >>> model = SkyModel(spectral_model=PowerLawSpectralModel()) >>> dataset = FluxPointsDataset(model, flux_points) >>> #configuring optional parameters >>> kwargs_model = {"color":"red", "ls":"--"} >>> kwargs_fp = {"color":"green", "marker":"o"} >>> dataset.plot_spectrum(kwargs_fp=kwargs_fp, kwargs_model=kwargs_model) # doctest: +SKIP """kwargs_fp=(kwargs_fpor{}).copy()kwargs_model=(kwargs_modelor{}).copy()# plot flux pointskwargs_fp.setdefault("sed_type","e2dnde")ifaxis_name=="time":kwargs_fp["sed_type"]="norm"ax=self.data.plot(ax=ax,**kwargs_fp,axis_name=axis_name)kwargs_model.setdefault("label","Best fit model")kwargs_model.setdefault("zorder",10)formodelinself.models:ifmodel.datasets_namesisNoneorself.nameinmodel.datasets_names:ifaxis_name=="energy":kwargs_model.setdefault("sed_type","e2dnde")kwargs_model.setdefault("energy_bounds",self._energy_bounds)model.spectral_model.plot(ax=ax,**kwargs_model)ifaxis_name=="time":kwargs_model.setdefault("time_range",self.data.geom.axes["time"].time_bounds)model.temporal_model.plot(ax=ax,**kwargs_model)ifaxis_name=="energy":kwargs_model["color"]=ax.lines[-1].get_color()kwargs_model.pop("label")formodelinself.models:ifmodel.datasets_namesisNoneorself.nameinmodel.datasets_names:model.spectral_model.plot_error(ax=ax,**kwargs_model)returnax