# Licensed under a 3-clause BSD style license - see LICENSE.rst"""Implementation of adaptive smoothing algorithms."""importnumpyasnpfromastropy.convolutionimportGaussian2DKernel,Tophat2DKernelfromastropy.coordinatesimportAnglefromgammapy.datasetsimportMapDatasetOnOfffromgammapy.mapsimportMap,Maps,WcsNDMapfromgammapy.modeling.modelsimportPowerLawSpectralModelfromgammapy.statsimportCashCountsStatisticfromgammapy.utils.arrayimportscale_cubefromgammapy.utils.pbarimportprogress_barfrom..coreimportEstimatorfrom..utilsimportestimate_exposure_reco_energyfromgammapy.utils.deprecationimportdeprecated_renamed_argument__all__=["ASmoothMapEstimator"]def_sqrt_ts_asmooth(counts,background):"""Significance according to formula (5) in asmooth paper."""return(counts-background)/np.sqrt(counts+background)
[docs]classASmoothMapEstimator(Estimator):"""Adaptively smooth counts image. Achieves a roughly constant sqrt(TS) of features across the whole image. Algorithm based on [1]_. The algorithm was slightly adapted to also allow Li & Ma to estimate the sqrt(TS) of a feature in the image. Parameters ---------- scales : `~astropy.units.Quantity` Smoothing scales. kernel : `astropy.convolution.Kernel` Smoothing kernel. spectral_model : `~gammapy.modeling.models.SpectralModel`, optional Spectral model assumption. Default is power-law with spectral index of 2. method : {'lima', 'asmooth'} Significance estimation method. Default is 'lima'. threshold : float Significance threshold. Default is 5. energy_edges : list of `~astropy.units.Quantity`, optional Edges of the target maps energy bins. The resulting bin edges won't be exactly equal to the input ones, but rather the closest values to the energy axis edges of the parent dataset. Default is None: apply the estimator in each energy bin of the parent dataset. For further explanation see :ref:`estimators`. Examples -------- >>> import astropy.units as u >>> import numpy as np >>> from gammapy.estimators import ASmoothMapEstimator >>> from gammapy.datasets import MapDataset >>> dataset = MapDataset.read("$GAMMAPY_DATA/cta-1dc-gc/cta-1dc-gc.fits.gz") >>> scales = u.Quantity(np.arange(0.1, 1, 0.1), unit="deg") >>> smooth = ASmoothMapEstimator(threshold=3, scales=scales, energy_edges=[1, 10] * u.TeV) >>> images = smooth.run(dataset) References ---------- .. [1] `Ebeling et al. (2006), "ASMOOTH: a simple and efficient algorithm for adaptive kernel smoothing of two-dimensional imaging data" <https://ui.adsabs.harvard.edu/abs/2006MNRAS.368…65E>`_ """tag="ASmoothMapEstimator"@deprecated_renamed_argument("spectrum","spectral_model","v1.3")def__init__(self,scales=None,kernel=Gaussian2DKernel,spectral_model=None,method="lima",threshold=5,energy_edges=None,):ifspectral_modelisNone:spectral_model=PowerLawSpectralModel(index=2)self.spectral_model=spectral_modelifscalesisNone:scales=self.get_scales(n_scales=9,kernel=kernel)self.scales=scalesself.kernel=kernelself.threshold=thresholdself.method=methodself.energy_edges=energy_edges
[docs]defselection_all(self):"""Which quantities are computed."""return
[docs]@staticmethoddefget_scales(n_scales,factor=np.sqrt(2),kernel=Gaussian2DKernel):"""Create list of Gaussian widths. Parameters ---------- n_scales : int Number of scales. factor : float Incremental factor. Returns ------- scales : `~numpy.ndarray` Scale array. """ifkernel==Gaussian2DKernel:sigma_0=1.0/np.sqrt(9*np.pi)elifkernel==Tophat2DKernel:sigma_0=1.0/np.sqrt(np.pi)returnsigma_0*factor**np.arange(n_scales)
[docs]defget_kernels(self,pixel_scale):"""Get kernels according to the specified method. Parameters ---------- pixel_scale : `~astropy.coordinates.Angle` Sky image pixel scale. Returns ------- kernels : list List of `~astropy.convolution.Kernel`. """scales=self.scales.to_value("deg")/Angle(pixel_scale).degkernels=[]forscaleinscales:# .value:kernel=self.kernel(scale,mode="oversample")# TODO: check if normalizing here makes sensekernel.normalize("peak")kernels.append(kernel)returnkernels
@staticmethoddef_sqrt_ts_cube(cubes,method):ifmethodin{"lima"}:scube=CashCountsStatistic(cubes["counts"],cubes["background"]).sqrt_tselifmethod=="asmooth":scube=_sqrt_ts_asmooth(cubes["counts"],cubes["background"])elifmethod=="ts":raiseNotImplementedError()else:raiseValueError("Not a valid sqrt_ts estimation method."" Choose one of the following: 'lima' or 'asmooth'")returnscube
[docs]defrun(self,dataset):"""Run adaptive smoothing on input MapDataset. Notes ----- The progress bar can be displayed for this function. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` or `~gammapy.datasets.MapDatasetOnOff` The input dataset (with one bin in energy at most). Returns ------- images : dict of `~gammapy.maps.WcsNDMap` Smoothed images; keys are: * 'counts' * 'background' * 'flux' (optional) * 'scales' * 'sqrt_ts'. """energy_axis=self._get_energy_axis(dataset)results=[]forenergy_min,energy_maxinprogress_bar(energy_axis.iter_by_edges,desc="Energy bins"):dataset_sliced=dataset.slice_by_energy(energy_min=energy_min,energy_max=energy_max,name=dataset.name)ifdataset.modelsisnotNone:models_sliced=dataset.models._slice_by_energy(energy_min=energy_min,energy_max=energy_max,)dataset_sliced.models=models_slicedresult=self.estimate_maps(dataset_sliced)results.append(result)maps=Maps()fornameinresults[0].keys():maps[name]=Map.from_stack(maps=[_[name]for_inresults],axis_name="energy")returnmaps
[docs]defestimate_maps(self,dataset):"""Run adaptive smoothing on input Maps. Parameters ---------- dataset : `MapDataset` Dataset. Returns ------- images : dict of `~gammapy.maps.WcsNDMap` Smoothed images; keys are: * 'counts' * 'background' * 'flux' (optional) * 'scales' * 'sqrt_ts'. """dataset_image=dataset.to_image(name=dataset.name)dataset_image.models=dataset.models# extract 2d arrayscounts=dataset_image.counts.data[0].astype(float)background=dataset_image.npred_background().data[0]ifisinstance(dataset_image,MapDatasetOnOff):background=dataset_image.background.data[0]ifdataset_image.exposureisnotNone:exposure=estimate_exposure_reco_energy(dataset_image,self.spectral_model)else:exposure=Nonepixel_scale=dataset_image.counts.geom.pixel_scales.mean()kernels=self.get_kernels(pixel_scale)cubes={}cubes["counts"]=scale_cube(counts,kernels)cubes["background"]=scale_cube(background,kernels)ifexposureisnotNone:flux=(dataset_image.counts-background)/exposurecubes["flux"]=scale_cube(flux.data[0],kernels)cubes["sqrt_ts"]=self._sqrt_ts_cube(cubes,method=self.method)smoothed=self._reduce_cubes(cubes,kernels)result={}geom=dataset_image.counts.geomforname,datainsmoothed.items():# set remaining pixels with sqrt_ts < threshold to mean valueifnamein["counts","background"]:mask=np.isnan(data)data[mask]=np.mean(locals()[name][mask])result[name]=WcsNDMap(geom,data,unit="")else:unit="deg"ifname=="scale"else""result[name]=WcsNDMap(geom,data,unit=unit)ifexposureisnotNone:data=smoothed["flux"]mask=np.isnan(data)data[mask]=np.mean(flux.data[0][mask])result["flux"]=WcsNDMap(geom,data,unit=flux.unit)returnresult
def_reduce_cubes(self,cubes,kernels):""" Combine scale cube to image. Parameters ---------- cubes : dict Data cubes. """shape=cubes["counts"].shape[:2]smoothed={}# Init smoothed data arraysforkeyin["counts","background","scale","sqrt_ts"]:smoothed[key]=np.tile(np.nan,shape)if"flux"incubes:smoothed["flux"]=np.tile(np.nan,shape)foridx,scaleinenumerate(self.scales):# slice out 2D image at index idx out of cubeslice_=np.s_[:,:,idx]mask=np.isnan(smoothed["counts"])mask=(cubes["sqrt_ts"][slice_]>self.threshold)&masksmoothed["scale"][mask]=scalesmoothed["sqrt_ts"][mask]=cubes["sqrt_ts"][slice_][mask]# renormalize smoothed data arraysnorm=kernels[idx].array.sum()forkeyin["counts","background"]:smoothed[key][mask]=cubes[key][slice_][mask]/normif"flux"incubes:smoothed["flux"][mask]=cubes["flux"][slice_][mask]/normreturnsmoothed