# Licensed under a 3-clause BSD style license - see LICENSE.rst"""Functions to compute TS images."""importwarningsfromitertoolsimportrepeatimportnumpyasnpimportscipy.optimizefromastropy.coordinatesimportAnglefromastropy.utilsimportlazypropertyimportgammapy.utils.parallelasparallelfromgammapy.datasets.mapimportMapEvaluatorfromgammapy.datasets.utilsimportget_nearest_valid_exposure_positionfromgammapy.mapsimportMap,Mapsfromgammapy.modeling.modelsimportPointSpatialModel,PowerLawSpectralModel,SkyModelfromgammapy.statsimportcash_sum_cython,f_cash_root_cython,norm_bounds_cythonfromgammapy.utils.arrayimportshape_2N,symmetric_crop_pad_widthfromgammapy.utils.pbarimportprogress_barfromgammapy.utils.rootsimportfind_rootsfrom..coreimportEstimatorfrom..utilsimportestimate_exposure_reco_energyfrom.coreimportFluxMaps__all__=["TSMapEstimator"]def_extract_array(array,shape,position):"""Helper function to extract parts of a larger array. Simple implementation of an array extract function , because `~astropy.ndata.utils.extract_array` introduces too much overhead.` Parameters ---------- array : `~numpy.ndarray` The array from which to extract. shape : tuple or int The shape of the extracted array. position : tuple of numbers or number The position of the small array's center with respect to the large array. """x_width=shape[2]//2y_width=shape[1]//2y_lo=position[0]-y_widthy_hi=position[0]+y_width+1x_lo=position[1]-x_widthx_hi=position[1]+x_width+1returnarray[:,y_lo:y_hi,x_lo:x_hi]
[docs]classTSMapEstimator(Estimator,parallel.ParallelMixin):r"""Compute TS map from a MapDataset using different optimization methods. The map is computed fitting by a single parameter norm fit. The fit is simplified by finding roots of the derivative of the fit statistics using various root finding algorithms. The approach is described in Appendix A in Stewart (2009). Parameters ---------- model : `~gammapy.modeling.model.SkyModel` Source model kernel. If set to None, assume spatail model: point source model, PointSpatialModel. spectral model: PowerLawSpectral Model of index 2 kernel_width : `~astropy.coordinates.Angle` Width of the kernel to use: the kernel will be truncated at this size n_sigma : int Number of sigma for flux error. Default is 1. n_sigma_ul : int Number of sigma for flux upper limits. Default is 2. downsampling_factor : int Sample down the input maps to speed up the computation. Only integer values that are a multiple of 2 are allowed. Note that the kernel is not sampled down, but must be provided with the downsampled bin size. threshold : float (None) If the TS value corresponding to the initial flux estimate is not above this threshold, the optimizing step is omitted to save computing time. rtol : float (0.01) Relative precision of the flux estimate. Used as a stopping criterion for the norm fit. selection_optional : list of str Which maps to compute besides TS, sqrt(TS), flux and symmetric error on flux. Available options are: * "all": all the optional steps are executed * "errn-errp": estimate asymmetric error on flux. * "ul": estimate upper limits on flux. Default is None so the optional steps are not executed. energy_edges : `~astropy.units.Quantity` Energy edges of the maps bins. sum_over_energy_groups : bool Whether to sum over the energy groups or fit the norm on the full energy cube. n_jobs : int Number of processes used in parallel for the computation. Default is one, unless `~gammapy.utils.parallel.N_JOBS_DEFAULT` was modified. The number of jobs limited to the number of physical CPUs. parallel_backend : {"multiprocessing", "ray"} Which backend to use for multiprocessing. Defaults to `~gammapy.utils.parallel.BACKEND_DEFAULT`. Notes ----- Negative :math:`TS` values are defined as following: .. math:: TS = \left \{ \begin{array}{ll} -TS \text{ if } F < 0 \\ TS \text{ else} \end{array} \right. Where :math:`F` is the fitted flux norm. Examples -------- >>> import astropy.units as u >>> from gammapy.estimators import TSMapEstimator >>> from gammapy.datasets import MapDataset >>> from gammapy.modeling.models import (SkyModel, PowerLawSpectralModel,PointSpatialModel) >>> spatial_model = PointSpatialModel() >>> spectral_model = PowerLawSpectralModel(amplitude="1e-22 cm-2 s-1 keV-1", index=2) >>> model = SkyModel(spatial_model=spatial_model, spectral_model=spectral_model) >>> dataset = MapDataset.read("$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc.fits.gz") >>> estimator = TSMapEstimator( model, kernel_width="1 deg",energy_edges=[10, 100] * u.GeV, downsampling_factor=4 ) >>> maps = estimator.run(dataset) >>> print(maps) FluxMaps -------- <BLANKLINE> geom : WcsGeom axes : ['lon', 'lat', 'energy'] shape : (400, 200, 1) quantities : ['ts', 'norm', 'niter', 'norm_err', 'npred', 'npred_excess', 'stat', 'stat_null', 'success'] # noqa: E501 ref. model : pl n_sigma : 1 n_sigma_ul : 2 sqrt_ts_threshold_ul : 2 sed type init : likelihood References ---------- [Stewart2009]_ """tag="TSMapEstimator"_available_selection_optional=["errn-errp","ul"]def__init__(self,model=None,kernel_width=None,downsampling_factor=None,n_sigma=1,n_sigma_ul=2,threshold=None,rtol=0.01,selection_optional=None,energy_edges=None,sum_over_energy_groups=True,n_jobs=None,parallel_backend=None,):ifkernel_widthisnotNone:kernel_width=Angle(kernel_width)self.kernel_width=kernel_widthifmodelisNone:model=SkyModel(spectral_model=PowerLawSpectralModel(),spatial_model=PointSpatialModel(),name="ts-kernel",)self.model=modelself.downsampling_factor=downsampling_factorself.n_sigma=n_sigmaself.n_sigma_ul=n_sigma_ulself.threshold=thresholdself.rtol=rtolself.n_jobs=n_jobsself.parallel_backend=parallel_backendself.sum_over_energy_groups=sum_over_energy_groupsself.selection_optional=selection_optionalself.energy_edges=energy_edgesself._flux_estimator=BrentqFluxEstimator(rtol=self.rtol,n_sigma=self.n_sigma,n_sigma_ul=self.n_sigma_ul,selection_optional=selection_optional,ts_threshold=threshold,)@propertydefselection_all(self):"""Which quantities are computed"""selection=["ts","norm","niter","norm_err","npred","npred_excess","stat","stat_null","success",]if"errn-errp"inself.selection_optional:selection+=["norm_errp","norm_errn"]if"ul"inself.selection_optional:selection+=["norm_ul"]returnselection
[docs]defestimate_kernel(self,dataset):"""Get the convolution kernel for the input dataset. Convolves the model with the IRFs at the center of the dataset, or at the nearest position with non-zero exposure. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Input dataset. Returns ------- kernel : `Map` Kernel map """geom=dataset.exposure.geomifself.kernel_widthisnotNone:geom=geom.to_odd_npix(max_radius=self.kernel_width/2)model=self.model.copy()model.spatial_model.position=geom.center_skydir# Creating exposure map with the mean non-null exposureexposure=Map.from_geom(geom,unit=dataset.exposure.unit)position=get_nearest_valid_exposure_position(dataset.exposure,geom.center_skydir)exposure_position=dataset.exposure.to_region_nd_map(position)ifnotnp.any(exposure_position.data):raiseValueError("No valid exposure. Impossible to compute kernel for TS Map.")exposure.data[...]=exposure_position.data# We use global evaluation mode to not modify the geometryevaluator=MapEvaluator(model=model)evaluator.update(exposure=exposure,psf=dataset.psf,edisp=dataset.edisp,geom=dataset.counts.geom,mask=dataset.mask_image,)kernel=evaluator.compute_npred()kernel.data/=kernel.data.sum()returnkernel
[docs]defestimate_flux_default(self,dataset,kernel=None,exposure=None):"""Estimate default flux map using a given kernel. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Input dataset. kernel : `~gammapy.maps.WcsNDMap` Source model kernel. exposure : `~gammapy.maps.WcsNDMap` Exposure map on reconstructed energy. Returns ------- flux : `~gammapy.maps.WcsNDMap` Approximate flux map. """ifexposureisNone:exposure=estimate_exposure_reco_energy(dataset,self.model.spectral_model)ifkernelisNone:kernel=self.estimate_kernel(dataset=dataset)kernel=kernel.data/np.sum(kernel.data**2)withnp.errstate(invalid="ignore",divide="ignore"):flux=(dataset.counts-dataset.npred())/exposureflux.data=np.nan_to_num(flux.data)flux.quantity=flux.quantity.to("1 / (cm2 s)")flux=flux.convolve(kernel)returnflux.sum_over_axes()
[docs]@staticmethoddefestimate_mask_default(dataset):"""Compute default mask where to estimate TS values. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Input dataset. Returns ------- mask : `WcsNDMap` Mask map. """geom=dataset.counts.geom.to_image()mask=np.ones(geom.data_shape,dtype=bool)ifdataset.maskisnotNone:mask&=dataset.mask.reduce_over_axes(func=np.logical_or,keepdims=False)# in some image there are pixels, which have exposure, but zero# background, which doesn't make sense and causes the TS computation# to fail, this is a temporary fixbackground=dataset.npred().sum_over_axes(keepdims=False)mask[background.data==0]=FalsereturnMap.from_geom(data=mask,geom=geom)
[docs]defestimate_pad_width(self,dataset,kernel=None):"""Estimate pad width of the dataset Parameters ---------- dataset : `MapDataset` Input MapDataset. kernel : `WcsNDMap` Source model kernel. Returns ------- pad_width : tuple Padding width """ifkernelisNone:kernel=self.estimate_kernel(dataset=dataset)geom=dataset.counts.geom.to_image()geom_kernel=kernel.geom.to_image()pad_width=np.array(geom_kernel.data_shape)//2ifself.downsampling_factorandself.downsampling_factor>1:shape=tuple(np.array(geom.data_shape)+2*pad_width)pad_width=symmetric_crop_pad_width(geom.data_shape,shape_2N(shape))[0]returntuple(pad_width)
[docs]defestimate_fit_input_maps(self,dataset):"""Estimate fit input maps Parameters ---------- dataset : `MapDataset` Map dataset Returns ------- maps : dict of `Map` Maps dict """# First create 2D map arrayscounts=dataset.countsbackground=dataset.npred()exposure=estimate_exposure_reco_energy(dataset,self.model.spectral_model)kernel=self.estimate_kernel(dataset)mask=self.estimate_mask_default(dataset=dataset)flux=self.estimate_flux_default(dataset=dataset,kernel=kernel,exposure=exposure)energy_axis=counts.geom.axes["energy"]flux_ref=self.model.spectral_model.integral(energy_axis.edges[0],energy_axis.edges[-1])exposure_npred=(exposure*flux_ref*mask.data).to_unit("")norm=(flux/flux_ref).to_unit("")return{"counts":counts,"background":background,"norm":norm,"mask":mask,"exposure":exposure_npred,"kernel":kernel,}
[docs]defestimate_flux_map(self,dataset):"""Estimate flux and ts maps for single dataset Parameters ---------- dataset : `MapDataset` Map dataset """maps=self.estimate_fit_input_maps(dataset=dataset)x,y=np.where(np.squeeze(maps["mask"].data))positions=list(zip(x,y))inputs=zip(positions,repeat(maps["counts"].data.astype(float)),repeat(maps["exposure"].data.astype(float)),repeat(maps["background"].data.astype(float)),repeat(maps["kernel"].data),repeat(maps["norm"].data),repeat(self._flux_estimator),)results=parallel.run_multiprocessing(_ts_value,inputs,pool_kwargs=dict(processes=self.n_jobs),task_name="TS map",)result={}j,i=zip(*positions)geom=maps["counts"].geom.squash(axis_name="energy")fornameinself.selection_all:m=Map.from_geom(geom=geom,data=np.nan,unit="")m.data[0,j,i]=[_[name]for_inresults]result[name]=mreturnresult
[docs]defrun(self,dataset):""" Run TS map estimation. Requires a MapDataset with counts, exposure and background_model properly set to run. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Input MapDataset. Returns ------- maps : dict Dictionary containing result maps. Keys are: * ts : delta TS map * sqrt_ts : sqrt(delta TS), or significance map * flux : flux map * flux_err : symmetric error map * flux_ul : upper limit map """ifdataset.stat_type!="cash":raiseTypeError(f"{type(dataset)} is not a valid type for {self.__class__}")dataset_models=dataset.modelspad_width=self.estimate_pad_width(dataset=dataset)dataset=dataset.pad(pad_width,name=dataset.name)dataset=dataset.downsample(self.downsampling_factor,name=dataset.name)energy_axis=self._get_energy_axis(dataset=dataset)results=[]forenergy_min,energy_maxinprogress_bar(energy_axis.iter_by_edges,desc="Energy bins"):sliced_dataset=dataset.slice_by_energy(energy_min=energy_min,energy_max=energy_max,name=dataset.name)ifself.sum_over_energy_groups:sliced_dataset=sliced_dataset.to_image(name=dataset.name)sliced_dataset.models=dataset_modelsresult=self.estimate_flux_map(sliced_dataset)results.append(result)maps=Maps()fornameinself.selection_all:m=Map.from_stack(maps=[_[name]for_inresults],axis_name="energy")order=0ifnamein["niter","success"]else1m=m.upsample(factor=self.downsampling_factor,preserve_counts=False,order=order)maps[name]=m.crop(crop_width=pad_width)maps["success"].data=maps["success"].data.astype(bool)meta={"n_sigma":self.n_sigma,"n_sigma_ul":self.n_sigma_ul}returnFluxMaps(data=maps,reference_model=self.model,gti=dataset.gti,meta=meta,)
# TODO: merge with MapDataset?classSimpleMapDataset:"""Simple map dataset Parameters ---------- counts : `~numpy.ndarray` Counts array background : `~numpy.ndarray` Background array model : `~numpy.ndarray` Kernel array """def__init__(self,model,counts,background,norm_guess):self.model=modelself.counts=countsself.background=backgroundself.norm_guess=norm_guess@lazypropertydefnorm_bounds(self):"""Bounds for x"""returnnorm_bounds_cython(self.counts.ravel(),self.background.ravel(),self.model.ravel())defnpred(self,norm):"""Predicted number of counts"""returnself.background+norm*self.modeldefstat_sum(self,norm):"""Stat sum"""returncash_sum_cython(self.counts.ravel(),self.npred(norm).ravel())defstat_derivative(self,norm):"""Stat derivative"""returnf_cash_root_cython(norm,self.counts.ravel(),self.background.ravel(),self.model.ravel())defstat_2nd_derivative(self,norm):"""Stat 2nd derivative"""term_top=self.model**2*self.countsterm_bottom=(self.background+norm*self.model)**2mask=term_bottom==0return(term_top/term_bottom)[~mask].sum()@classmethoddeffrom_arrays(cls,counts,background,exposure,norm,position,kernel):""""""counts_cutout=_extract_array(counts,kernel.shape,position)background_cutout=_extract_array(background,kernel.shape,position)exposure_cutout=_extract_array(exposure,kernel.shape,position)norm_guess=norm[0,position[0],position[1]]returncls(counts=counts_cutout,background=background_cutout,model=kernel*exposure_cutout,norm_guess=norm_guess,)# TODO: merge with `FluxEstimator`?classBrentqFluxEstimator(Estimator):"""Single parameter flux estimator"""_available_selection_optional=["errn-errp","ul"]tag="BrentqFluxEstimator"def__init__(self,rtol,n_sigma,n_sigma_ul,selection_optional=None,max_niter=20,ts_threshold=None,):self.rtol=rtolself.n_sigma=n_sigmaself.n_sigma_ul=n_sigma_ulself.selection_optional=selection_optionalself.max_niter=max_niterself.ts_threshold=ts_thresholddefestimate_best_fit(self,dataset):"""Estimate best fit norm parameter Parameters ---------- dataset : `SimpleMapDataset` Simple map dataset Returns ------- result : dict Result dict including 'norm' and 'norm_err' """# Compute norm bounds and assert counts > 0norm_min,norm_max,norm_min_total=dataset.norm_boundsifnotdataset.counts.sum()>0:norm,niter,success=norm_min_total,0,Trueelse:withwarnings.catch_warnings():warnings.simplefilter("ignore")try:# here we do not use avoid find_roots for performanceresult_fit=scipy.optimize.brentq(f=dataset.stat_derivative,a=norm_min,b=norm_max,maxiter=self.max_niter,full_output=True,rtol=self.rtol,)norm=max(result_fit[0],norm_min_total)niter=result_fit[1].iterationssuccess=result_fit[1].convergedexcept(RuntimeError,ValueError):norm,niter,success=norm_min_total,self.max_niter,Falsewithnp.errstate(invalid="ignore",divide="ignore"):norm_err=np.sqrt(1/dataset.stat_2nd_derivative(norm))*self.n_sigmastat=dataset.stat_sum(norm=norm)stat_null=dataset.stat_sum(norm=0)return{"norm":norm,"norm_err":norm_err,"niter":niter,"ts":stat_null-stat,"stat":stat,"stat_null":stat_null,"success":success,}def_confidence(self,dataset,n_sigma,result,positive):stat_best=result["stat"]norm=result["norm"]norm_err=result["norm_err"]defts_diff(x):return(stat_best+n_sigma**2)-dataset.stat_sum(x)ifpositive:min_norm=normmax_norm=norm+1e2*norm_errfactor=1else:min_norm=norm-1e2*norm_errmax_norm=normfactor=-1withwarnings.catch_warnings():warnings.simplefilter("ignore")roots,res=find_roots(ts_diff,[min_norm],[max_norm],nbin=1,maxiter=self.max_niter,rtol=self.rtol,)# Where the root finding fails NaN is set as normreturn(roots[0]-norm)*factordefestimate_ul(self,dataset,result):"""Compute upper limit using likelihood profile method. Parameters ---------- dataset : `SimpleMapDataset` Simple map dataset Returns ------- result : dict Result dict including 'norm_ul' """flux_ul=result["norm"]+self._confidence(dataset=dataset,n_sigma=self.n_sigma_ul,result=result,positive=True)return{"norm_ul":flux_ul}defestimate_errn_errp(self,dataset,result):"""Compute norm errors using likelihood profile method. Parameters ---------- dataset : `SimpleMapDataset` Simple map dataset Returns ------- result : dict Result dict including 'norm_errp' and 'norm_errn' """flux_errn=self._confidence(dataset=dataset,result=result,n_sigma=self.n_sigma,positive=False)flux_errp=self._confidence(dataset=dataset,result=result,n_sigma=self.n_sigma,positive=True)return{"norm_errn":flux_errn,"norm_errp":flux_errp}defestimate_default(self,dataset):"""Estimate default norm Parameters ---------- dataset : `SimpleMapDataset` Simple map dataset Returns ------- result : dict Result dict including 'norm', 'norm_err' and "niter" """norm=dataset.norm_guesswithnp.errstate(invalid="ignore",divide="ignore"):norm_err=np.sqrt(1/dataset.stat_2nd_derivative(norm))*self.n_sigmastat=dataset.stat_sum(norm=norm)stat_null=dataset.stat_sum(norm=0)return{"norm":norm,"norm_err":norm_err,"niter":0,"ts":stat_null-stat,"stat":stat,"stat_null":stat_null,"success":True,}defrun(self,dataset):"""Run flux estimator Parameters ---------- dataset : `SimpleMapDataset` Simple map dataset Returns ------- result : dict Result dict """ifself.ts_thresholdisnotNone:result=self.estimate_default(dataset)ifresult["ts"]>self.ts_threshold:result=self.estimate_best_fit(dataset)else:result=self.estimate_best_fit(dataset)norm=result["norm"]result["npred"]=dataset.npred(norm=norm).sum()result["npred_excess"]=result["npred"]-dataset.npred(norm=0).sum()if"ul"inself.selection_optional:result.update(self.estimate_ul(dataset,result))if"errn-errp"inself.selection_optional:result.update(self.estimate_errn_errp(dataset,result))returnresultdef_ts_value(position,counts,exposure,background,kernel,norm,flux_estimator):"""Compute TS value at a given pixel position. Uses approach described in Stewart (2009). Parameters ---------- position : tuple (i, j) Pixel position. counts : `~numpy.ndarray` Counts image background : `~numpy.ndarray` Background image exposure : `~numpy.ndarray` Exposure image kernel : `astropy.convolution.Kernel2D` Source model kernel norm : `~numpy.ndarray` Norm image. The flux value at the given pixel position is used as starting value for the minimization. Returns ------- TS : float TS value at the given pixel position. """dataset=SimpleMapDataset.from_arrays(counts=counts,background=background,exposure=exposure,kernel=kernel,position=position,norm=norm,)returnflux_estimator.run(dataset)