# Licensed under a 3-clause BSD style license - see LICENSE.rstimportnumpyasnpfromastropy.coordinatesimportSkyCoordfromgammapy.dataimportObservationimportastropy.unitsasufromgammapy.mapsimportMap,MapAxis,WcsGeomfromgammapy.maps.utilsimport_check_width,_check_binszfromgammapy.modeling.models.utilsimportcutout_template_modelsfrom.importDatasets__all__=["apply_edisp","split_dataset","create_map_dataset_from_dl4",]
[docs]defapply_edisp(input_map,edisp):"""Apply energy dispersion to map. Requires "energy_true" axis. Parameters ---------- input_map : `~gammapy.maps.Map` The map to be convolved with the energy dispersion. It must have an axis named "energy_true". edisp : `~gammapy.irf.EDispKernel` Energy dispersion matrix. Returns ------- map : `~gammapy.maps.Map` Map with energy dispersion applied. Examples -------- >>> from gammapy.irf.edisp import EDispKernel >>> from gammapy.datasets.utils import apply_edisp >>> from gammapy.maps import MapAxis, Map >>> import numpy as np >>> >>> axis = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=6, name="energy_true") >>> m = Map.create( ... skydir=(0.8, 0.8), ... width=(1, 1), ... binsz=0.02, ... axes=[axis], ... frame="galactic" ... ) >>> e_true = m.geom.axes[0] >>> e_reco = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=3) >>> edisp = EDispKernel.from_diagonal_response(energy_axis_true=e_true, energy_axis=e_reco) >>> map_edisp = apply_edisp(m, edisp) >>> print(map_edisp) WcsNDMap <BLANKLINE> geom : WcsGeom axes : ['lon', 'lat', 'energy'] shape : (np.int64(50), np.int64(50), 3) ndim : 3 unit : dtype : float64 <BLANKLINE> """# TODO: either use sparse matrix multiplication or something like edisp.is_diagonalifedispisnotNone:loc=input_map.geom.axes.index("energy_true")data=np.rollaxis(input_map.data,loc,len(input_map.data.shape))data=np.dot(data,edisp.pdf_matrix)data=np.rollaxis(data,-1,loc)energy_axis=edisp.axes["energy"].copy(name="energy")else:data=input_map.dataenergy_axis=input_map.geom.axes["energy_true"].copy(name="energy")geom=input_map.geom.to_image().to_cube(axes=[energy_axis])returnMap.from_geom(geom=geom,data=data,unit=input_map.unit)
defget_figure(fig,width,height):importmatplotlib.pyplotaspltifplt.get_fignums():ifnotfig:fig=plt.gcf()fig.clf()else:fig=plt.figure(figsize=(width,height))returnfigdefget_axes(ax1,ax2,width,height,args1,args2,kwargs1=None,kwargs2=None):ifnotax1andnotax2:kwargs1=kwargs1or{}kwargs2=kwargs2or{}fig=get_figure(None,width,height)ax1=fig.add_subplot(*args1,**kwargs1)ax2=fig.add_subplot(*args2,**kwargs2)elifnotax1ornotax2:raiseValueError("Either both or no Axes must be provided")returnax1,ax2defget_nearest_valid_exposure_position(exposure,position=None):mask_exposure=exposure>0.0*exposure.unitmask_exposure=mask_exposure.reduce_over_axes(func=np.logical_or)ifnotposition:position=mask_exposure.geom.center_skydirreturnmask_exposure.mask_nearest_position(position)
[docs]defsplit_dataset(dataset,width,margin,split_template_models=True):"""Split dataset in multiple non-overlapping analysis regions. Parameters ---------- dataset : `~gammapy.datasets.Dataset` Dataset to split. width : `~astropy.coordinates.Angle` Angular size of each sub-region. margin : `~astropy.coordinates.Angle` Angular size to be added to the `width`. The margin should be defined such as sources outside the region of interest that contributes inside are well-defined. The mask_fit in the margin region is False and unchanged elsewhere. split_template_models : bool, optional Apply cutout to template models or not. Default is True. Returns ------- datasets : `~gammapy.datasets.Datasets` Split datasets. Examples -------- >>> from gammapy.datasets import MapDataset >>> from gammapy.datasets.utils import split_dataset >>> from gammapy.modeling.models import GaussianSpatialModel, PowerLawSpectralModel, SkyModel >>> import astropy.units as u >>> dataset = MapDataset.read("$GAMMAPY_DATA/cta-1dc-gc/cta-1dc-gc.fits.gz") >>> # Split the dataset >>> width = 4 * u.deg >>> margin = 1 * u.deg >>> split_datasets = split_dataset(dataset, width, margin, split_template_models=False) >>> # Apply a model and split the dataset >>> spatial_model = GaussianSpatialModel() >>> spectral_model = PowerLawSpectralModel() >>> sky_model = SkyModel( ... spatial_model=spatial_model, spectral_model=spectral_model, name="test-model" ... ) >>> dataset.models = [sky_model] >>> split_datasets = split_dataset( ... dataset, width=width, margin=margin, split_template_models=True ... ) """ifmargin>=width/2.0:raiseValueError("margin should be lower than width/2.")geom=dataset.counts.geom.to_image()pixel_width=np.ceil((width/geom.pixel_scales).to_value("")).astype(int)ilon=range(0,geom.data_shape[1],pixel_width[1])ilat=range(0,geom.data_shape[0],pixel_width[0])datasets=Datasets()forilinilon:foribinilat:l,b=geom.pix_to_coord((il,ib))cutout_kwargs=dict(position=SkyCoord(l,b,frame=geom.frame),width=width+2*margin)d=dataset.cutout(**cutout_kwargs)# apply maskgeom_cut=d.counts.geomgeom_cut_image=geom_cut.to_image()ilgrid,ibgrid=np.meshgrid(range(geom_cut_image.data_shape[1]),range(geom_cut_image.data_shape[0]))il_cut,ib_cut=geom_cut_image.coord_to_pix((l,b))mask=(ilgrid>=il_cut-pixel_width[1]/2.0)&(ibgrid>=ib_cut-pixel_width[0]/2.0)ifil==ilon[-1]:mask&=ilgrid<=il_cut+pixel_width[1]/2.0else:mask&=ilgrid<il_cut+pixel_width[1]/2.0ifib==ilat[-1]:mask&=ibgrid<=ib_cut+pixel_width[0]/2.0else:mask&=ibgrid<ib_cut+pixel_width[0]/2.0mask=np.expand_dims(mask,0)mask=np.repeat(mask,geom_cut.data_shape[0],axis=0)d.mask_fit=Map.from_geom(geom_cut,data=mask)ifdataset.mask_fitisnotNone:d.mask_fit&=dataset.mask_fit.interp_to_geom(geom_cut,method="nearest")# template models cutout (should limit memory usage in parallel)ifsplit_template_models:d.models=cutout_template_models(dataset.models,cutout_kwargs,[d.name])else:d.models=dataset.modelsdatasets.append(d)returndatasets
[docs]defcreate_map_dataset_from_dl4(data,geom=None,energy_axis_true=None,name=None):"""Create a map dataset from a map dataset or an observation containing DL4 IRFs Parameters ---------- data : `~gammapy.dataset.MapDataset` or `~gammapy.data.Observation` MapDataset or Observation containing DL4 IRFs geom : `~gammapy.maps.WcsGeom`, optional Output dataset maps geometry. The default is None, and it is derived from IRFs energy_axis_true : `~gammapy.maps.MapAxis`, optional True energy axis used for IRF maps. The default is None, and it is derived from IRFs name : str, optional Dataset name. The default is None, and the name is randomly generated. Returns ------- dataset : `~gammapy.datasets.MapDataset` Map dataset. """fromgammapy.makersimportMapDatasetMakerfromgammapy.datasetsimportMapDataset# define target geomifgeomisNone:ifisinstance(data,Observation):geom_image=data.aeff.geom.to_image()elifisinstance(data,MapDataset):geom_image=data.exposure.geom.to_image()geom=geom_image.to_cube([data.edisp.edisp_map.geom.axes["energy"]])energy_axis=geom.axes["energy"]ifenergy_axis_trueisNone:energy_axis_true=data.edisp.edisp_map.geom.axes["energy_true"]# ensure that DL4 IRFs have the axesrad_axis=data.psf.psf_map.geom.axes["rad"]geom_psf=data.psf.psf_map.geom.to_image().to_cube([rad_axis,energy_axis_true])geom_edisp=data.edisp.edisp_map.geom.to_image().to_cube([energy_axis,energy_axis_true])geom_exposure=geom.to_image().to_cube([energy_axis_true])# create dataset and run data reduction / irfs interpolationdataset=MapDataset.from_geoms(geom,geom_exposure=geom_exposure,geom_psf=geom_psf,geom_edisp=geom_edisp,name=name,)selection=["exposure","edisp","psf"]ifisinstance(data,Observation)anddata.events:selection.append("counts")ifisinstance(data,Observation)anddata.bkg:selection.append("background")maker=MapDatasetMaker(selection=selection)dataset=maker.run(dataset,data)ifisinstance(data,MapDataset)anddata.counts:ifdataset.counts.geom==data.counts.geom:dataset.counts.data=data.counts.dataelse:raiseValueError("Counts geom of input MapDataset and target geom must be identical")ifnotdataset.background:dataset.background=Map.from_geom(geom,data=0.0)ifdataset.edisp.exposure_mapandnp.all(dataset.edisp.exposure_map.data)==0.0:dataset.edisp.exposure_map.data=dataset.psf.exposure_map.datareturndataset
defcreate_global_dataset(datasets,name=None,position=None,binsz=None,width=None,energy_min=None,energy_max=None,energy_true_min=None,energy_true_max=None,nbin_per_decade=None,):"""Create an empty dataset encompassing the input datasets. Parameters ---------- datasets : `~gammapy.datasets.Datasets` Datasets name : str, optional Name of the output dataset. Default is None. position : `~astropy.coordinates.SkyCoord` Center position of the output dataset. Default is None, and the average position is taken. binsz : float or tuple or list, optional Map pixel size in degrees. Default is None, the minimum bin size is taken. width : float or tuple or list or string, optional Width of the map in degrees. Default is None, in that case it is derived as the maximal width + twice the maximal separation between datasets and position. energy_min : `~astropy.units.Quantity` Energy range. Default is None, the minimum energy is taken. energy_max : `~astropy.units.Quantity` Energy range. Default is None, the maximum energy is taken. energy_true_min, energy_true_max : `~astropy.units.Quantity`, `~astropy.units.Quantity` True energy range. If None, minimum and maximum energies of all geometries is used. Default is None. nbin_per_decade : int number of energy bins per decade. Default is None, the maximum is taken. Returns ------- datasets : `~gammapy.datasets.MapDatset` Empy global dataset. """fromgammapy.datasetsimportMapDatasetifpositionisNone:frame=datasets[0].counts.geom.framepositions=SkyCoord([d.counts.geom.center_skydir.transform_to(frame)fordindatasets])position=SkyCoord(positions.cartesian.mean(),frame="icrs")position=SkyCoord(position.ra,position.dec,frame="icrs").transform_to(frame)# drop fake distancebinsz_list=[]width_list=[]energy_min_list=[]energy_max_list=[]energy_true_min_list=[]energy_true_max_list=[]nbin_per_decade_list=[]fordindatasets:binsz_list.append(np.abs(d.counts.geom.pixel_scales).min())width_list.append(d.counts.geom.width.max()+2*d.counts.geom.center_skydir.separation(position))energy_min_list.append(d.counts.geom.axes["energy"].edges.min())energy_max_list.append(d.counts.geom.axes["energy"].edges.max())energy_true_min_list.append(d.exposure.geom.axes["energy_true"].edges.min())energy_true_max_list.append(d.exposure.geom.axes["energy_true"].edges.max())ndecade=np.log10(energy_true_max_list[-1].value)-np.log10(energy_true_min_list[-1].value)nbins=len(d.exposure.geom.axes[0].center)nbin_per_decade_list.append(np.ceil(nbins/ndecade))ifbinszisNone:binsz=np.min(u.Quantity(binsz_list))binsz=_check_binsz(binsz)ifwidthisNone:width=np.max(u.Quantity(width_list))width=_check_width(width)energy_true_min=(energy_true_minifenergy_true_minisnotNoneelseu.Quantity(energy_true_min_list).min())ifenergy_true_maxisNone:energy_true_max=np.max(u.Quantity(energy_true_max_list))ifenergy_minisNone:energy_min=np.min(u.Quantity(energy_min_list))ifenergy_maxisNone:energy_max=np.max(u.Quantity(energy_max_list))nbin_per_decade=(nbin_per_decadeifnbin_per_decadeisnotNoneelsenp.max(nbin_per_decade_list))energy_axis=MapAxis.from_energy_bounds(energy_min,energy_max,nbin_per_decade,unit="TeV",per_decade=True)geom=WcsGeom.create(skydir=position,binsz=binsz,width=width,frame=position.frame,proj=datasets[0].counts.geom.projection,axes=[energy_axis],)energy_axis_true=MapAxis.from_energy_bounds(energy_true_min,energy_true_max,nbin_per_decade,unit="TeV",name="energy_true",per_decade=True,)returnMapDataset.create(geom=geom,energy_axis_true=energy_axis_true,name=name)