# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Implementation of adaptive smoothing algorithms."""
import numpy as np
from astropy.convolution import Gaussian2DKernel, Tophat2DKernel
from astropy.coordinates import Angle
from gammapy.datasets import MapDatasetOnOff
from gammapy.maps import Map, Maps, WcsNDMap
from gammapy.modeling.models import PowerLawSpectralModel
from gammapy.stats import CashCountsStatistic
from gammapy.utils.array import scale_cube
from gammapy.utils.pbar import progress_bar
from ..core import Estimator
from ..utils import estimate_exposure_reco_energy

__all__ = ["ASmoothMapEstimator"]


def _sqrt_ts_asmooth(counts, background):
    """Significance according to formula (5) in the asmooth paper."""
    return (counts - background) / np.sqrt(counts + background)
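# Note: the estimate above is the simple signal-to-noise ratio
# S = (N - B) / sqrt(N + B), formula (5) of Ebeling, White & Rangarajan (2006).
# As a quick numerical check: 100 counts over an expected background of 64
# give (100 - 64) / sqrt(164) ~ 2.8 sigma.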
class ASmoothMapEstimator(Estimator):
    """Adaptively smooth counts image.

    Achieves a roughly constant sqrt_ts of features across the whole image.

    Algorithm based on https://ui.adsabs.harvard.edu/abs/2006MNRAS.368...65E
    The algorithm was slightly adapted to also allow Li & Ma to estimate the
    sqrt_ts of a feature in the image.

    Parameters
    ----------
    scales : `~astropy.units.Quantity`
        Smoothing scales.
    kernel : `astropy.convolution.Kernel`
        Smoothing kernel.
    spectrum : `SpectralModel`
        Spectral model assumption.
    method : {'lima', 'asmooth'}
        Significance estimation method. Default is 'lima'.
    threshold : float
        Significance threshold. Default is 5.
    energy_edges : `~astropy.units.Quantity`
        Energy edges of the maps to be computed.

    Examples
    --------
    >>> import astropy.units as u
    >>> import numpy as np
    >>> from gammapy.estimators import ASmoothMapEstimator
    >>> from gammapy.datasets import MapDataset
    >>> dataset = MapDataset.read("$GAMMAPY_DATA/cta-1dc-gc/cta-1dc-gc.fits.gz")
    >>> scales = u.Quantity(np.arange(0.1, 1, 0.1), unit="deg")
    >>> smooth = ASmoothMapEstimator(threshold=3, scales=scales, energy_edges=[1, 10] * u.TeV)
    >>> images = smooth.run(dataset)
    """

    tag = "ASmoothMapEstimator"

    def __init__(
        self,
        scales=None,
        kernel=Gaussian2DKernel,
        spectrum=None,
        method="lima",
        threshold=5,
        energy_edges=None,
    ):
        if spectrum is None:
            spectrum = PowerLawSpectralModel()

        self.spectrum = spectrum

        if scales is None:
            scales = self.get_scales(n_scales=9, kernel=kernel)

        self.scales = scales
        self.kernel = kernel
        self.threshold = threshold
        self.method = method
        self.energy_edges = energy_edges
    def selection_all(self):
        """Which quantities are computed."""
        return
    @staticmethod
    def get_scales(n_scales, factor=np.sqrt(2), kernel=Gaussian2DKernel):
        """Create list of Gaussian widths.

        Parameters
        ----------
        n_scales : int
            Number of scales.
        factor : float
            Incremental factor.
        kernel : `astropy.convolution.Kernel`
            Kernel type; `Gaussian2DKernel` or `Tophat2DKernel`.

        Returns
        -------
        scales : `~numpy.ndarray`
            Scale array.
        """
        if kernel == Gaussian2DKernel:
            sigma_0 = 1.0 / np.sqrt(9 * np.pi)
        elif kernel == Tophat2DKernel:
            sigma_0 = 1.0 / np.sqrt(np.pi)
        else:
            raise ValueError("kernel must be Gaussian2DKernel or Tophat2DKernel")

        return sigma_0 * factor ** np.arange(n_scales)
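    # Worked example for the default Gaussian kernel: sigma_0 = 1 / sqrt(9 * pi)
    # ~ 0.188, so get_scales(n_scales=4) yields approximately
    # [0.188, 0.266, 0.376, 0.532], each scale sqrt(2) times the previous one.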
    def get_kernels(self, pixel_scale):
        """Get kernels according to the specified method.

        Parameters
        ----------
        pixel_scale : `~astropy.coordinates.Angle`
            Sky image pixel scale.

        Returns
        -------
        kernels : list
            List of `~astropy.convolution.Kernel`.
        """
        scales = self.scales.to_value("deg") / Angle(pixel_scale).deg

        kernels = []
        for scale in scales:
            kernel = self.kernel(scale, mode="oversample")
            # TODO: check if normalizing here makes sense
            kernel.normalize("peak")
            kernels.append(kernel)

        return kernels
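    # The scales are converted from angular to pixel units above: e.g. a
    # 0.2 deg smoothing scale on a map with 0.02 deg pixels yields
    # self.kernel(10.0), i.e. a kernel with a width parameter of 10 pixels.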
    @staticmethod
    def _sqrt_ts_cube(cubes, method):
        if method == "lima":
            scube = CashCountsStatistic(cubes["counts"], cubes["background"]).sqrt_ts
        elif method == "asmooth":
            scube = _sqrt_ts_asmooth(cubes["counts"], cubes["background"])
        elif method == "ts":
            raise NotImplementedError()
        else:
            raise ValueError(
                "Not a valid sqrt_ts estimation method."
                " Choose one of the following: 'lima' or 'asmooth'"
            )
        return scube
    def run(self, dataset):
        """Run adaptive smoothing on input MapDataset.

        Parameters
        ----------
        dataset : `~gammapy.datasets.MapDataset` or `~gammapy.datasets.MapDatasetOnOff`
            The input dataset.

        Returns
        -------
        images : dict of `~gammapy.maps.WcsNDMap`
            Smoothed images; keys are:
                * 'counts'
                * 'background'
                * 'flux' (optional)
                * 'scale'
                * 'sqrt_ts'
        """
        energy_axis = self._get_energy_axis(dataset)

        results = []

        for energy_min, energy_max in progress_bar(
            energy_axis.iter_by_edges, desc="Energy bins"
        ):
            dataset_sliced = dataset.slice_by_energy(
                energy_min=energy_min, energy_max=energy_max, name=dataset.name
            )
            dataset_sliced.models = dataset.models
            result = self.estimate_maps(dataset_sliced)
            results.append(result)

        maps = Maps()

        for name in results[0].keys():
            maps[name] = Map.from_stack(
                maps=[_[name] for _ in results], axis_name="energy"
            )

        return maps
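    # The stacked result of `run` is a `~gammapy.maps.Maps` object with one
    # image per energy bin, e.g. images["sqrt_ts"] for the significance maps
    # and images["scale"] for the smoothing scale selected per pixel.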
    def estimate_maps(self, dataset):
        """Run adaptive smoothing on input Maps.

        Parameters
        ----------
        dataset : `MapDataset`
            Dataset.

        Returns
        -------
        images : dict of `~gammapy.maps.WcsNDMap`
            Smoothed images; keys are:
                * 'counts'
                * 'background'
                * 'flux' (optional)
                * 'scale'
                * 'sqrt_ts'
        """
        dataset_image = dataset.to_image(name=dataset.name)
        dataset_image.models = dataset.models

        # extract 2d arrays
        counts = dataset_image.counts.data[0].astype(float)
        background = dataset_image.npred_background().data[0]

        if isinstance(dataset_image, MapDatasetOnOff):
            background = dataset_image.background.data[0]

        if dataset_image.exposure is not None:
            exposure = estimate_exposure_reco_energy(dataset_image, self.spectrum)
        else:
            exposure = None

        pixel_scale = dataset_image.counts.geom.pixel_scales.mean()
        kernels = self.get_kernels(pixel_scale)

        cubes = {}
        cubes["counts"] = scale_cube(counts, kernels)
        cubes["background"] = scale_cube(background, kernels)

        if exposure is not None:
            flux = (dataset_image.counts - background) / exposure
            cubes["flux"] = scale_cube(flux.data[0], kernels)

        cubes["sqrt_ts"] = self._sqrt_ts_cube(cubes, method=self.method)

        smoothed = self._reduce_cubes(cubes, kernels)

        result = {}
        geom = dataset_image.counts.geom

        # keep references to the unsmoothed arrays to fill pixels left at NaN
        arrays = {"counts": counts, "background": background}

        for name, data in smoothed.items():
            # set remaining pixels with sqrt_ts < threshold to mean value
            if name in ["counts", "background"]:
                mask = np.isnan(data)
                data[mask] = np.mean(arrays[name][mask])
                result[name] = WcsNDMap(geom, data, unit="")
            else:
                unit = "deg" if name == "scale" else ""
                result[name] = WcsNDMap(geom, data, unit=unit)

        if exposure is not None:
            data = smoothed["flux"]
            mask = np.isnan(data)
            data[mask] = np.mean(flux.data[0][mask])
            result["flux"] = WcsNDMap(geom, data, unit=flux.unit)

        return result
    def _reduce_cubes(self, cubes, kernels):
        """Combine scale cube to image.

        Parameters
        ----------
        cubes : dict
            Data cubes.
        kernels : list
            List of `~astropy.convolution.Kernel`, one per scale.
        """
        shape = cubes["counts"].shape[:2]
        smoothed = {}

        # Init smoothed data arrays
        for key in ["counts", "background", "scale", "sqrt_ts"]:
            smoothed[key] = np.tile(np.nan, shape)

        if "flux" in cubes:
            smoothed["flux"] = np.tile(np.nan, shape)

        for idx, scale in enumerate(self.scales):
            # slice out 2D image at index idx out of cube
            slice_ = np.s_[:, :, idx]

            # select pixels not yet filled that exceed the threshold at this scale
            mask = np.isnan(smoothed["counts"])
            mask = (cubes["sqrt_ts"][slice_] > self.threshold) & mask

            smoothed["scale"][mask] = scale
            smoothed["sqrt_ts"][mask] = cubes["sqrt_ts"][slice_][mask]

            # renormalize smoothed data arrays
            norm = kernels[idx].array.sum()
            for key in ["counts", "background"]:
                smoothed[key][mask] = cubes[key][slice_][mask] / norm

            if "flux" in cubes:
                smoothed["flux"][mask] = cubes["flux"][slice_][mask] / norm

        return smoothed
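

if __name__ == "__main__":
    # Minimal usage sketch, mirroring the class docstring example; it assumes
    # the $GAMMAPY_DATA environment variable points to the gammapy datasets.
    import astropy.units as u

    from gammapy.datasets import MapDataset

    dataset = MapDataset.read("$GAMMAPY_DATA/cta-1dc-gc/cta-1dc-gc.fits.gz")
    scales = u.Quantity(np.arange(0.1, 1, 0.1), unit="deg")
    estimator = ASmoothMapEstimator(
        threshold=3, scales=scales, energy_edges=[1, 10] * u.TeV
    )
    images = estimator.run(dataset)
    # images["sqrt_ts"] holds the adaptively smoothed significance maps
    print(images)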