# Licensed under a 3-clause BSD style license - see LICENSE.rst"""Ring background estimation."""importitertoolsimportnumpyasnpfromastropy.convolutionimportRing2DKernel,Tophat2DKernelfromastropy.coordinatesimportAnglefromgammapy.mapsimportMapfromgammapy.utils.arrayimportscale_cubefrom..coreimportMaker__all__=["AdaptiveRingBackgroundMaker","RingBackgroundMaker"]
[docs]classAdaptiveRingBackgroundMaker(Maker):"""Adaptive ring background algorithm. This algorithm extends the `RingBackgroundMaker` method by adapting the size of the ring to achieve a minimum on / off exposure ratio (alpha) in regions where the area to estimate the background from is limited. Parameters ---------- r_in : `~astropy.units.Quantity` Inner radius of the ring. r_out_max : `~astropy.units.Quantity` Maximum outer radius of the ring. width : `~astropy.units.Quantity` Width of the ring. stepsize : `~astropy.units.Quantity` Stepsize used for increasing the radius. threshold_alpha : float Threshold on alpha above which the adaptive ring takes action. theta : `~astropy.units.Quantity` Integration radius used for alpha computation. method : {'fixed_width', 'fixed_r_in'} Adaptive ring method. Default is 'fixed_width'. exclusion_mask : `~gammapy.maps.WcsNDMap` Exclusion mask. See Also -------- RingBackgroundMaker. """tag="AdaptiveRingBackgroundMaker"def__init__(self,r_in,r_out_max,width,stepsize="0.02 deg",threshold_alpha=0.1,theta="0.22 deg",method="fixed_width",exclusion_mask=None,):ifmethodnotin["fixed_width","fixed_r_in"]:raiseValueError("Not a valid adaptive ring method.")self.r_in=Angle(r_in)self.r_out_max=Angle(r_out_max)self.width=Angle(width)self.stepsize=Angle(stepsize)self.threshold_alpha=threshold_alphaself.theta=Angle(theta)self.method=methodself.exclusion_mask=exclusion_mask
[docs]defkernels(self,image):"""Ring kernels according to the specified method. Parameters ---------- image : `~gammapy.maps.WcsNDMap` Map specifying the WCS information. Returns ------- kernels : list List of `~astropy.convolution.Ring2DKernel`. """scale=image.geom.pixel_scales[0]r_in=(self.r_in/scale).to_value("")r_out_max=(self.r_out_max/scale).to_value("")width=(self.width/scale).to_value("")stepsize=(self.stepsize/scale).to_value("")ifself.method=="fixed_width":r_ins=np.arange(r_in,(r_out_max-width),stepsize)widths=[width]elifself.method=="fixed_r_in":widths=np.arange(width,(r_out_max-r_in),stepsize)r_ins=[r_in]else:raiseValueError(f"Invalid method: {self.method!r}")kernels=[]forr_in,widthinitertools.product(r_ins,widths):kernel=Ring2DKernel(r_in,width)kernel.normalize("peak")kernels.append(kernel)returnkernels
@staticmethoddef_alpha_approx_cube(cubes):acceptance=cubes["acceptance"]acceptance_off=cubes["acceptance_off"]withnp.errstate(divide="ignore",invalid="ignore"):alpha_approx=np.where(acceptance_off>0,acceptance/acceptance_off,np.inf)returnalpha_approxdef_reduce_cubes(self,cubes,dataset):"""Compute off and off acceptance map. Calculated by reducing the cubes. The data is iterated along the third axis (i.e. increasing ring sizes), the value with the first approximate alpha < threshold is taken. """threshold=self.threshold_alphaalpha_approx_cube=self._alpha_approx_cube(cubes)counts_off_cube=cubes["counts_off"]acceptance_off_cube=cubes["acceptance_off"]acceptance_cube=cubes["acceptance"]shape=alpha_approx_cube.shape[:2]counts_off=np.tile(np.nan,shape)acceptance_off=np.tile(np.nan,shape)acceptance=np.tile(np.nan,shape)foridxinnp.arange(alpha_approx_cube.shape[-1]):mask=(alpha_approx_cube[:,:,idx]<=threshold)&np.isnan(counts_off)counts_off[mask]=counts_off_cube[:,:,idx][mask]acceptance_off[mask]=acceptance_off_cube[:,:,idx][mask]acceptance[mask]=acceptance_cube[:,:,idx][mask]counts=dataset.countsacceptance=counts.copy(data=acceptance[np.newaxis,Ellipsis])acceptance_off=counts.copy(data=acceptance_off[np.newaxis,Ellipsis])counts_off=counts.copy(data=counts_off[np.newaxis,Ellipsis])returnacceptance,acceptance_off,counts_off
[docs]defmake_cubes(self,dataset):"""Make acceptance, off acceptance, off counts cubes. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Input map dataset. Returns ------- cubes : dict of `~gammapy.maps.WcsNDMap` Dictionary containing ``counts_off``, ``acceptance`` and ``acceptance_off`` cubes. """counts=dataset.countsbackground=dataset.npred_background()kernels=self.kernels(counts)ifself.exclusion_mask:exclusion=self.exclusion_mask.interp_to_geom(geom=counts.geom)else:exclusion=Map.from_geom(geom=counts.geom,data=True,dtype=bool)cubes={}cubes["counts_off"]=scale_cube((counts.data*exclusion.data)[0,Ellipsis],kernels)cubes["acceptance_off"]=scale_cube((background.data*exclusion.data)[0,Ellipsis],kernels)scale=background.geom.pixel_scales[0].to("deg")theta=self.theta*scaletophat=Tophat2DKernel(theta.value)tophat.normalize("peak")acceptance=background.convolve(tophat.array)acceptance_data=acceptance.data[0,Ellipsis]cubes["acceptance"]=np.repeat(acceptance_data[Ellipsis,np.newaxis],len(kernels),axis=2)returncubes
[docs]defrun(self,dataset,observation=None):"""Run adaptive ring background maker. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Input map dataset. Returns ------- dataset_on_off : `~gammapy.datasets.MapDatasetOnOff` On off dataset. """fromgammapy.datasetsimportMapDatasetOnOffcubes=self.make_cubes(dataset)acceptance,acceptance_off,counts_off=self._reduce_cubes(cubes,dataset)mask_safe=dataset.mask_safe.copy()not_has_off_acceptance=acceptance_off.data<=0mask_safe.data[not_has_off_acceptance]=0dataset_on_off=MapDatasetOnOff.from_map_dataset(dataset=dataset,counts_off=counts_off,acceptance=acceptance,acceptance_off=acceptance_off,name=dataset.name,)dataset_on_off.mask_safe=mask_safereturndataset_on_off
[docs]classRingBackgroundMaker(Maker):"""Perform a local renormalisation of the existing background template, using a ring kernel. Expected signal regions should be removed by passing an exclusion mask. Parameters ---------- r_in : `~astropy.units.Quantity` Inner ring radius. width : `~astropy.units.Quantity` Ring width. exclusion_mask : `~gammapy.maps.WcsNDMap` Exclusion mask. Examples -------- For a usage example, see :doc:`/tutorials/analysis-2d/ring_background` tutorial. See Also -------- AdaptiveRingBackgroundEstimator. """tag="RingBackgroundMaker"def__init__(self,r_in,width,exclusion_mask=None):self.r_in=Angle(r_in)self.width=Angle(width)self.exclusion_mask=exclusion_mask
[docs]defkernel(self,image):"""Ring kernel. Parameters ---------- image : `~gammapy.maps.WcsNDMap` Input map. Returns ------- ring : `~astropy.convolution.Ring2DKernel` Ring kernel. """scale=image.geom.pixel_scales[0].to("deg")r_in=self.r_in.to("deg")/scalewidth=self.width.to("deg")/scalering=Ring2DKernel(r_in.value,width.value)ring.normalize("peak")returnring
[docs]defmake_maps_off(self,dataset):"""Make off maps. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Input map dataset. Returns ------- maps_off : dict of `~gammapy.maps.WcsNDMap` Dictionary containing `counts_off` and `acceptance_off` maps. """counts=dataset.countsbackground=dataset.npred_background()ifself.exclusion_maskisnotNone:# reproject exclusion maskcoords=counts.geom.get_coord()data=self.exclusion_mask.get_by_coord(coords)exclusion=Map.from_geom(geom=counts.geom,data=data)else:data=np.ones(counts.geom.data_shape,dtype=bool)exclusion=Map.from_geom(geom=counts.geom,data=data)maps_off={}ring=self.kernel(counts)counts_excluded=counts*exclusionmaps_off["counts_off"]=counts_excluded.convolve(ring.array)background_excluded=background*exclusionmaps_off["acceptance_off"]=background_excluded.convolve(ring.array)returnmaps_off
[docs]defrun(self,dataset,observation=None):"""Run ring background maker. Parameters ---------- dataset : `~gammapy.datasets.MapDataset` Input map dataset. Returns ------- dataset_on_off : `~gammapy.datasets.MapDatasetOnOff` On off dataset. """fromgammapy.datasetsimportMapDatasetOnOffmaps_off=self.make_maps_off(dataset)maps_off["acceptance"]=dataset.npred_background()mask_safe=dataset.mask_safe.copy()not_has_off_acceptance=maps_off["acceptance_off"].data<=0mask_safe.data[not_has_off_acceptance]=0dataset_on_off=MapDatasetOnOff.from_map_dataset(dataset=dataset,name=dataset.name,**maps_off)dataset_on_off.mask_safe=mask_safereturndataset_on_off