# Licensed under a 3-clause BSD style license - see LICENSE.rst"""Functions to compute test statistic images."""importwarningsfromitertoolsimportrepeatimportnumpyasnpimportscipy.optimizefromastropy.coordinatesimportAnglefromastropy.utilsimportlazypropertyimportgammapy.utils.parallelasparallelfromgammapy.datasets.mapimportMapEvaluatorfromgammapy.datasets.utilsimportget_nearest_valid_exposure_positionfromgammapy.mapsimportMap,Mapsfromgammapy.modeling.modelsimportPointSpatialModel,PowerLawSpectralModel,SkyModelfromgammapy.statsimportcash_sum_cython,f_cash_root_cython,norm_bounds_cythonfromgammapy.utils.arrayimportshape_2N,symmetric_crop_pad_widthfromgammapy.utils.pbarimportprogress_barfromgammapy.utils.rootsimportfind_rootsfrom..coreimportEstimatorfrom..utilsimportestimate_exposure_reco_energyfrom.coreimportFluxMaps__all__=["TSMapEstimator"]def_extract_array(array,shape,position):"""Helper function to extract parts of a larger array. Simple implementation of an array extract function , because `~astropy.ndata.utils.extract_array` introduces too much overhead.` Parameters ---------- array : `~numpy.ndarray` The array from which to extract. shape : tuple or int The shape of the extracted array. position : tuple of numbers or number The position of the small array's center with respect to the large array. """x_width=shape[2]//2y_width=shape[1]//2y_lo=position[0]-y_widthy_hi=position[0]+y_width+1x_lo=position[1]-x_widthx_hi=position[1]+x_width+1returnarray[:,y_lo:y_hi,x_lo:x_hi]
[docs]classTSMapEstimator(Estimator,parallel.ParallelMixin):r"""Compute test statistic map from a MapDataset using different optimization methods. The map is computed fitting by a single parameter norm fit. The fit is simplified by finding roots of the derivative of the fit statistics using various root finding algorithms. The approach is described in Appendix A in Stewart (2009). Parameters ---------- model : `~gammapy.modeling.model.SkyModel` Source model kernel. If set to None, assume spatail model: point source model, PointSpatialModel. spectral model: PowerLawSpectral Model of index 2 kernel_width : `~astropy.coordinates.Angle` Width of the kernel to use: the kernel will be truncated at this size n_sigma : int Number of sigma for flux error. Default is 1. n_sigma_ul : int Number of sigma for flux upper limits. Default is 2. downsampling_factor : int Sample down the input maps to speed up the computation. Only integer values that are a multiple of 2 are allowed. Note that the kernel is not sampled down, but must be provided with the downsampled bin size. threshold : float, optional If the test statistic value corresponding to the initial flux estimate is not above this threshold, the optimizing step is omitted to save computing time. Default is None. rtol : float Relative precision of the flux estimate. Used as a stopping criterion for the norm fit. Default is 0.01. selection_optional : list of str, optional Which maps to compute besides TS, sqrt(TS), flux and symmetric error on flux. Available options are: * "all": all the optional steps are executed * "errn-errp": estimate asymmetric error on flux. * "ul": estimate upper limits on flux. Default is None so the optional steps are not executed. energy_edges : list of `~astropy.units.Quantity`, optional Edges of the target maps energy bins. The resulting bin edges won't be exactly equal to the input ones, but rather the closest values to the energy axis edges of the parent dataset. Default is None: apply the estimator in each energy bin of the parent dataset. For further explanation see :ref:`estimators`. sum_over_energy_groups : bool Whether to sum over the energy groups or fit the norm on the full energy cube. n_jobs : int Number of processes used in parallel for the computation. Default is one, unless `~gammapy.utils.parallel.N_JOBS_DEFAULT` was modified. The number of jobs limited to the number of physical CPUs. parallel_backend : {"multiprocessing", "ray"} Which backend to use for multiprocessing. Defaults to `~gammapy.utils.parallel.BACKEND_DEFAULT`. Notes ----- Negative :math:`TS` values are defined as following: .. math:: TS = \left \{ \begin{array}{ll} -TS \text{ if } F < 0 \\ TS \text{ else} \end{array} \right. Where :math:`F` is the fitted flux norm. Examples -------- >>> import astropy.units as u >>> from gammapy.estimators import TSMapEstimator >>> from gammapy.datasets import MapDataset >>> from gammapy.modeling.models import (SkyModel, PowerLawSpectralModel,PointSpatialModel) >>> spatial_model = PointSpatialModel() >>> spectral_model = PowerLawSpectralModel(amplitude="1e-22 cm-2 s-1 keV-1", index=2) >>> model = SkyModel(spatial_model=spatial_model, spectral_model=spectral_model) >>> dataset = MapDataset.read("$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc.fits.gz") >>> estimator = TSMapEstimator( ... model, kernel_width="1 deg", energy_edges=[10, 100] * u.GeV, downsampling_factor=4 ... ) >>> maps = estimator.run(dataset) >>> print(maps) FluxMaps -------- <BLANKLINE> geom : WcsGeom axes : ['lon', 'lat', 'energy'] shape : (400, 200, 1) quantities : ['ts', 'norm', 'niter', 'norm_err', 'npred', 'npred_excess', 'stat', 'stat_null', 'success'] ref. model : pl n_sigma : 1 n_sigma_ul : 2 sqrt_ts_threshold_ul : 2 sed type init : likelihood References ---------- [Stewart2009]_ """tag="TSMapEstimator"_available_selection_optional=["errn-errp","ul"]def__init__(self,model=None,kernel_width=None,downsampling_factor=None,n_sigma=1,n_sigma_ul=2,threshold=None,rtol=0.01,selection_optional=None,energy_edges=None,sum_over_energy_groups=True,n_jobs=None,parallel_backend=None,):ifkernel_widthisnotNone:kernel_width=Angle(kernel_width)self.kernel_width=kernel_widthifmodelisNone:model=SkyModel(spectral_model=PowerLawSpectralModel(),spatial_model=PointSpatialModel(),name="ts-kernel",)self.model=modelself.downsampling_factor=downsampling_factorself.n_sigma=n_sigmaself.n_sigma_ul=n_sigma_ulself.threshold=thresholdself.rtol=rtolself.n_jobs=n_jobsself.parallel_backend=parallel_backendself.sum_over_energy_groups=sum_over_energy_groupsself.selection_optional=selection_optionalself.energy_edges=energy_edgesself._flux_estimator=BrentqFluxEstimator(rtol=self.rtol,n_sigma=self.n_sigma,n_sigma_ul=self.n_sigma_ul,selection_optional=selection_optional,ts_threshold=threshold,)@propertydefselection_all(self):"""Which quantities are computed."""selection=["ts","norm","niter","norm_err","npred","npred_excess","stat","stat_null","success",]if"errn-errp"inself.selection_optional:selection+=["norm_errp","norm_errn"]if"ul"inself.selection_optional:selection+=["norm_ul"]returnselection
[docs]defestimate_kernel(self,dataset):"""Get the convolution kernel for the input dataset. Convolves the model with the IRFs at the center of the dataset, or at the nearest position with non-zero exposure. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Input dataset. Returns ------- kernel : `~gammapy.maps.Map` Kernel map. """geom=dataset.exposure.geomifself.kernel_widthisnotNone:geom=geom.to_odd_npix(max_radius=self.kernel_width/2)model=self.model.copy()model.spatial_model.position=geom.center_skydir# Creating exposure map with the mean non-null exposureexposure=Map.from_geom(geom,unit=dataset.exposure.unit)position=get_nearest_valid_exposure_position(dataset.exposure,geom.center_skydir)exposure_position=dataset.exposure.to_region_nd_map(position)ifnotnp.any(exposure_position.data):raiseValueError("No valid exposure. Impossible to compute kernel for TS Map.")exposure.data[...]=exposure_position.data# We use global evaluation mode to not modify the geometryevaluator=MapEvaluator(model=model)evaluator.update(exposure=exposure,psf=dataset.psf,edisp=dataset.edisp,geom=dataset.counts.geom,mask=dataset.mask_image,)kernel=evaluator.compute_npred()kernel.data/=kernel.data.sum()returnkernel
[docs]defestimate_flux_default(self,dataset,kernel=None,exposure=None):"""Estimate default flux map using a given kernel. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Input dataset. kernel : `~gammapy.maps.WcsNDMap` Source model kernel. exposure : `~gammapy.maps.WcsNDMap` Exposure map on reconstructed energy. Returns ------- flux : `~gammapy.maps.WcsNDMap` Approximate flux map. """ifexposureisNone:exposure=estimate_exposure_reco_energy(dataset,self.model.spectral_model)ifkernelisNone:kernel=self.estimate_kernel(dataset=dataset)kernel=kernel.data/np.sum(kernel.data**2)withnp.errstate(invalid="ignore",divide="ignore"):flux=(dataset.counts-dataset.npred())/exposureflux.data=np.nan_to_num(flux.data)flux.quantity=flux.quantity.to("1 / (cm2 s)")flux=flux.convolve(kernel)returnflux.sum_over_axes()
[docs]@staticmethoddefestimate_mask_default(dataset):"""Compute default mask where to estimate test statistic values. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Input dataset. Returns ------- mask : `WcsNDMap` Mask map. """geom=dataset.counts.geom.to_image()mask=np.ones(geom.data_shape,dtype=bool)ifdataset.maskisnotNone:mask&=dataset.mask.reduce_over_axes(func=np.logical_or,keepdims=False)# in some image there are pixels, which have exposure, but zero# background, which doesn't make sense and causes the TS computation# to fail, this is a temporary fixbackground=dataset.npred().sum_over_axes(keepdims=False)mask[background.data==0]=FalsereturnMap.from_geom(data=mask,geom=geom)
[docs]defestimate_pad_width(self,dataset,kernel=None):"""Estimate pad width of the dataset. Parameters ---------- dataset : `MapDataset` Input MapDataset. kernel : `WcsNDMap` Source model kernel. Returns ------- pad_width : tuple Padding width. """ifkernelisNone:kernel=self.estimate_kernel(dataset=dataset)geom=dataset.counts.geom.to_image()geom_kernel=kernel.geom.to_image()pad_width=np.array(geom_kernel.data_shape)//2ifself.downsampling_factorandself.downsampling_factor>1:shape=tuple(np.array(geom.data_shape)+2*pad_width)pad_width=symmetric_crop_pad_width(geom.data_shape,shape_2N(shape))[0]returntuple(pad_width)
[docs]defestimate_fit_input_maps(self,dataset):"""Estimate fit input maps. Parameters ---------- dataset : `MapDataset` Map dataset. Returns ------- maps : dict of `Map` Maps dictionary. """# First create 2D map arrayscounts=dataset.countsbackground=dataset.npred()exposure=estimate_exposure_reco_energy(dataset,self.model.spectral_model)kernel=self.estimate_kernel(dataset)mask=self.estimate_mask_default(dataset=dataset)flux=self.estimate_flux_default(dataset=dataset,kernel=kernel,exposure=exposure)energy_axis=counts.geom.axes["energy"]flux_ref=self.model.spectral_model.integral(energy_axis.edges[0],energy_axis.edges[-1])exposure_npred=(exposure*flux_ref*mask.data).to_unit("")norm=(flux/flux_ref).to_unit("")return{"counts":counts,"background":background,"norm":norm,"mask":mask,"exposure":exposure_npred,"kernel":kernel,}
[docs]defestimate_flux_map(self,dataset):"""Estimate flux and test statistic maps for single dataset. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Map dataset. """maps=self.estimate_fit_input_maps(dataset=dataset)x,y=np.where(np.squeeze(maps["mask"].data))positions=list(zip(x,y))inputs=zip(positions,repeat(maps["counts"].data.astype(float)),repeat(maps["exposure"].data.astype(float)),repeat(maps["background"].data.astype(float)),repeat(maps["kernel"].data),repeat(maps["norm"].data),repeat(self._flux_estimator),)results=parallel.run_multiprocessing(_ts_value,inputs,pool_kwargs=dict(processes=self.n_jobs),task_name="TS map",)result={}j,i=zip(*positions)geom=maps["counts"].geom.squash(axis_name="energy")fornameinself.selection_all:m=Map.from_geom(geom=geom,data=np.nan,unit="")m.data[0,j,i]=[_[name]for_inresults]result[name]=mreturnresult
[docs]defrun(self,dataset):""" Run test statistic map estimation. Requires a MapDataset with counts, exposure and background_model properly set to run. Notes ----- The progress bar can be displayed for this function. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Input MapDataset. Returns ------- maps : dict Dictionary containing result maps. Keys are: * ts : delta(TS) map * sqrt_ts : sqrt(delta(TS)), or significance map * flux : flux map * flux_err : symmetric error map * flux_ul : upper limit map. """ifdataset.stat_type!="cash":raiseTypeError(f"{type(dataset)} is not a valid type for {self.__class__}")dataset_models=dataset.modelspad_width=self.estimate_pad_width(dataset=dataset)dataset=dataset.pad(pad_width,name=dataset.name)dataset=dataset.downsample(self.downsampling_factor,name=dataset.name)energy_axis=self._get_energy_axis(dataset=dataset)results=[]forenergy_min,energy_maxinprogress_bar(energy_axis.iter_by_edges,desc="Energy bins"):dataset_sliced=dataset.slice_by_energy(energy_min=energy_min,energy_max=energy_max,name=dataset.name)ifself.sum_over_energy_groups:dataset_sliced=dataset_sliced.to_image(name=dataset.name)ifdataset_modelsisnotNone:models_sliced=dataset_models._slice_by_energy(energy_min=energy_min,energy_max=energy_max,sum_over_energy_groups=self.sum_over_energy_groups,)dataset_sliced.models=models_slicedresult=self.estimate_flux_map(dataset_sliced)results.append(result)maps=Maps()fornameinself.selection_all:m=Map.from_stack(maps=[_[name]for_inresults],axis_name="energy")order=0ifnamein["niter","success"]else1m=m.upsample(factor=self.downsampling_factor,preserve_counts=False,order=order)maps[name]=m.crop(crop_width=pad_width)maps["success"].data=maps["success"].data.astype(bool)meta={"n_sigma":self.n_sigma,"n_sigma_ul":self.n_sigma_ul}returnFluxMaps(data=maps,reference_model=self.model,gti=dataset.gti,meta=meta,)
# TODO: merge with MapDataset?classSimpleMapDataset:"""Simple map dataset. Parameters ---------- counts : `~numpy.ndarray` Counts array. background : `~numpy.ndarray` Background array. model : `~numpy.ndarray` Kernel array. """def__init__(self,model,counts,background,norm_guess):self.model=modelself.counts=countsself.background=backgroundself.norm_guess=norm_guess@lazypropertydefnorm_bounds(self):"""Bounds for x"""returnnorm_bounds_cython(self.counts.ravel(),self.background.ravel(),self.model.ravel())defnpred(self,norm):"""Predicted number of counts."""returnself.background+norm*self.modeldefstat_sum(self,norm):"""Statistics sum."""returncash_sum_cython(self.counts.ravel(),self.npred(norm).ravel())defstat_derivative(self,norm):"""Statistics derivative."""returnf_cash_root_cython(norm,self.counts.ravel(),self.background.ravel(),self.model.ravel())defstat_2nd_derivative(self,norm):"""Statistics 2nd derivative."""term_top=self.model**2*self.countsterm_bottom=(self.background+norm*self.model)**2mask=term_bottom==0return(term_top/term_bottom)[~mask].sum()@classmethoddeffrom_arrays(cls,counts,background,exposure,norm,position,kernel):""""""counts_cutout=_extract_array(counts,kernel.shape,position)background_cutout=_extract_array(background,kernel.shape,position)exposure_cutout=_extract_array(exposure,kernel.shape,position)norm_guess=norm[0,position[0],position[1]]returncls(counts=counts_cutout,background=background_cutout,model=kernel*exposure_cutout,norm_guess=norm_guess,)# TODO: merge with `FluxEstimator`?classBrentqFluxEstimator(Estimator):"""Single parameter flux estimator."""_available_selection_optional=["errn-errp","ul"]tag="BrentqFluxEstimator"def__init__(self,rtol,n_sigma,n_sigma_ul,selection_optional=None,max_niter=20,ts_threshold=None,):self.rtol=rtolself.n_sigma=n_sigmaself.n_sigma_ul=n_sigma_ulself.selection_optional=selection_optionalself.max_niter=max_niterself.ts_threshold=ts_thresholddefestimate_best_fit(self,dataset):"""Estimate best fit norm parameter. Parameters ---------- dataset : `SimpleMapDataset` Simple map dataset. Returns ------- result : dict Result dictionary including 'norm' and 'norm_err'. """# Compute norm bounds and assert counts > 0norm_min,norm_max,norm_min_total=dataset.norm_boundsifnotdataset.counts.sum()>0:norm,niter,success=norm_min_total,0,Trueelse:withwarnings.catch_warnings():warnings.simplefilter("ignore")try:# here we do not use avoid find_roots for performanceresult_fit=scipy.optimize.brentq(f=dataset.stat_derivative,a=norm_min,b=norm_max,maxiter=self.max_niter,full_output=True,rtol=self.rtol,)norm=max(result_fit[0],norm_min_total)niter=result_fit[1].iterationssuccess=result_fit[1].convergedexcept(RuntimeError,ValueError):norm,niter,success=norm_min_total,self.max_niter,Falsewithnp.errstate(invalid="ignore",divide="ignore"):norm_err=np.sqrt(1/dataset.stat_2nd_derivative(norm))*self.n_sigmastat=dataset.stat_sum(norm=norm)stat_null=dataset.stat_sum(norm=0)return{"norm":norm,"norm_err":norm_err,"niter":niter,"ts":stat_null-stat,"stat":stat,"stat_null":stat_null,"success":success,}def_confidence(self,dataset,n_sigma,result,positive):stat_best=result["stat"]norm=result["norm"]norm_err=result["norm_err"]defts_diff(x):return(stat_best+n_sigma**2)-dataset.stat_sum(x)ifpositive:min_norm=normmax_norm=norm+1e2*norm_errfactor=1else:min_norm=norm-1e2*norm_errmax_norm=normfactor=-1withwarnings.catch_warnings():warnings.simplefilter("ignore")roots,res=find_roots(ts_diff,[min_norm],[max_norm],nbin=1,maxiter=self.max_niter,rtol=self.rtol,)# Where the root finding fails NaN is set as normreturn(roots[0]-norm)*factordefestimate_ul(self,dataset,result):"""Compute upper limit using likelihood profile method. Parameters ---------- dataset : `SimpleMapDataset` Simple map dataset. Returns ------- result : dict Result dictionary including 'norm_ul'. """flux_ul=result["norm"]+self._confidence(dataset=dataset,n_sigma=self.n_sigma_ul,result=result,positive=True)return{"norm_ul":flux_ul}defestimate_errn_errp(self,dataset,result):"""Compute norm errors using likelihood profile method. Parameters ---------- dataset : `SimpleMapDataset` Simple map dataset. Returns ------- result : dict Result dictionary including 'norm_errp' and 'norm_errn'. """flux_errn=self._confidence(dataset=dataset,result=result,n_sigma=self.n_sigma,positive=False)flux_errp=self._confidence(dataset=dataset,result=result,n_sigma=self.n_sigma,positive=True)return{"norm_errn":flux_errn,"norm_errp":flux_errp}defestimate_default(self,dataset):"""Estimate default norm. Parameters ---------- dataset : `SimpleMapDataset` Simple map dataset. Returns ------- result : dict Result dictionary including 'norm', 'norm_err' and "niter". """norm=dataset.norm_guesswithnp.errstate(invalid="ignore",divide="ignore"):norm_err=np.sqrt(1/dataset.stat_2nd_derivative(norm))*self.n_sigmastat=dataset.stat_sum(norm=norm)stat_null=dataset.stat_sum(norm=0)return{"norm":norm,"norm_err":norm_err,"niter":0,"ts":stat_null-stat,"stat":stat,"stat_null":stat_null,"success":True,}defrun(self,dataset):"""Run flux estimator. Parameters ---------- dataset : `SimpleMapDataset` Simple map dataset. Returns ------- result : dict Result dictionary. """ifself.ts_thresholdisnotNone:result=self.estimate_default(dataset)ifresult["ts"]>self.ts_threshold:result=self.estimate_best_fit(dataset)else:result=self.estimate_best_fit(dataset)norm=result["norm"]result["npred"]=dataset.npred(norm=norm).sum()result["npred_excess"]=result["npred"]-dataset.npred(norm=0).sum()if"ul"inself.selection_optional:result.update(self.estimate_ul(dataset,result))if"errn-errp"inself.selection_optional:result.update(self.estimate_errn_errp(dataset,result))returnresultdef_ts_value(position,counts,exposure,background,kernel,norm,flux_estimator):"""Compute test statistic value at a given pixel position. Uses approach described in Stewart (2009). Parameters ---------- position : tuple (i, j) Pixel position. counts : `~numpy.ndarray` Counts image. background : `~numpy.ndarray` Background image. exposure : `~numpy.ndarray` Exposure image. kernel : `astropy.convolution.Kernel2D` Source model kernel. norm : `~numpy.ndarray` Norm image. The flux value at the given pixel position is used as starting value for the minimization. Returns ------- TS : float Test statistic value at the given pixel position. """dataset=SimpleMapDataset.from_arrays(counts=counts,background=background,exposure=exposure,kernel=kernel,position=position,norm=norm,)returnflux_estimator.run(dataset)