class DatasetsMaker(Maker, parallel.ParallelMixin):
    """Run makers in a chain.

    Parameters
    ----------
    makers : list of `Maker` objects
        Makers.
    stack_datasets : bool
        If True stack into the reference dataset (see `run` method arguments).
        Default is True.
    n_jobs : int
        Number of processes to run in parallel.
        Default is one, unless `~gammapy.utils.parallel.N_JOBS_DEFAULT` was modified.
    cutout_mode : {'trim', 'partial', 'strict'}
        Used only to cutout the reference `MapDataset` around each processed observation.
        Mode is an option for Cutout2D, for details see `~astropy.nddata.utils.Cutout2D`.
        Default is "trim".
    cutout_width : tuple of `~astropy.coordinates.Angle`
        Angular sizes of the region in (lon, lat) in that specific order.
        If only one value is passed, a square region is extracted.
        If None it returns an error, except if the list of makers includes a
        `SafeMaskMaker` with the offset-max method defined. In that case it is
        set to two times `offset_max`.
    parallel_backend : {'multiprocessing', 'ray'}
        Which backend to use for multiprocessing.
    """

    tag = "DatasetsMaker"

    def __init__(
        self,
        makers,
        stack_datasets=True,
        n_jobs=None,
        cutout_mode="trim",
        cutout_width=None,
        parallel_backend=None,
    ):
        self.log = logging.getLogger(__name__)
        self.makers = makers
        self.cutout_mode = cutout_mode

        if cutout_width is not None:
            cutout_width = Angle(cutout_width)

        self.cutout_width = cutout_width
        self._apply_cutout = True

        # No explicit width: fall back to 2 * offset_max of a SafeMaskMaker in
        # the chain, or disable the cutout entirely if none is available.
        if self.cutout_width is None:
            if self.offset_max is None:
                self._apply_cutout = False
            else:
                self.cutout_width = 2 * self.offset_max

        self.n_jobs = n_jobs
        self.parallel_backend = parallel_backend
        self.stack_datasets = stack_datasets

        # Per-observation results collected by the run callback, and the
        # error flag raised by `error_callback` on sub-process failure.
        self._datasets = []
        self._error = False

    @property
    def offset_max(self):
        """`offset_max` of the chain's `SafeMaskMaker`, or None if absent."""
        maker = self.safe_mask_maker
        if maker is not None and hasattr(maker, "offset_max"):
            return maker.offset_max

    @property
    def safe_mask_maker(self):
        """First `SafeMaskMaker` found in the maker chain, or None."""
        for maker in self.makers:
            if isinstance(maker, SafeMaskMaker):
                return maker

    def make_dataset(self, dataset, observation):
        """Make a single dataset by running all makers on one observation.

        Parameters
        ----------
        dataset : `~gammapy.datasets.MapDataset`
            Reference dataset.
        observation : `Observation`
            Observation.

        Returns
        -------
        dataset : `~gammapy.datasets.MapDataset`
            Reduced dataset for this observation.
        """
        if self._apply_cutout:
            cutouts_kwargs = {
                "position": observation.get_pointing_icrs(observation.tmid).galactic,
                "width": self.cutout_width,
                "mode": self.cutout_mode,
            }
            dataset_obs = dataset.cutout(**cutouts_kwargs)
        else:
            dataset_obs = dataset.copy()

        # Re-attach the reference models under the name of the per-observation
        # dataset so the makers see a consistent model/dataset pairing.
        if dataset.models is not None:
            models = dataset.models.copy()
            models.reassign(dataset.name, dataset_obs.name)
            dataset_obs.models = models

        # Use the instance logger created in __init__; the original referenced
        # a module-level ``log`` that is not defined in this module's scope.
        self.log.info(f"Computing dataset for observation {observation.obs_id}")

        for maker in self.makers:
            self.log.info(f"Running {maker.tag}")
            dataset_obs = maker.run(dataset=dataset_obs, observation=observation)
        return dataset_obs

    def error_callback(self, dataset):
        # A parallel run can fail with a memory error carrying a non-explicit
        # message; just record that a sub-process failed so `run` can raise.
        self._error = True

    def run(self, dataset, observations, datasets=None):
        """Run data reduction.

        Parameters
        ----------
        dataset : `~gammapy.datasets.MapDataset`
            Reference dataset (used only for stacking if datasets are provided).
        observations : `Observations`
            Observations.
        datasets : `~gammapy.datasets.Datasets`
            Base datasets; if provided its length must be the same as the
            observations.

        Returns
        -------
        datasets : `~gammapy.datasets.Datasets`
            Datasets.

        Raises
        ------
        TypeError
            If ``dataset`` is not a `MapDataset` (or subclass).
        ValueError
            If ``datasets`` is given with a length different from ``observations``.
        RuntimeError
            If any sub-process of the parallel reduction failed.
        """
        if isinstance(dataset, MapDataset):
            # also valid for Spectrum as it inherits from MapDataset
            self._dataset = dataset
        else:
            raise TypeError("Invalid reference dataset.")

        # Cutouts make no sense for 1D spectrum datasets.
        if isinstance(dataset, SpectrumDataset):
            self._apply_cutout = False

        if datasets is not None:
            self._apply_cutout = False
            # Enforce the contract stated in the docstring instead of letting
            # zip() silently truncate a mismatched pairing below.
            if len(datasets) != len(observations):
                raise ValueError(
                    "Number of base datasets must match the number of observations."
                )
        else:
            datasets = len(observations) * [dataset]

        # Reset state so consecutive calls to `run` do not accumulate results
        # or keep a stale error flag from a previous call.
        self._datasets = []
        self._error = False

        # Guard against n_jobs=None (the constructor default) which would make
        # min() raise a TypeError; fall back to a single process. NOTE(review):
        # ParallelMixin may already translate None to a default — if so this
        # guard is a no-op; confirm against the mixin.
        n_jobs = self.n_jobs if self.n_jobs is not None else 1
        n_jobs = min(n_jobs, len(observations))

        parallel.run_multiprocessing(
            self.make_dataset,
            zip(datasets, observations),
            backend=self.parallel_backend,
            pool_kwargs=dict(processes=n_jobs),
            method="apply_async",
            method_kwargs=dict(
                callback=self.callback,
                error_callback=self.error_callback,
            ),
            task_name="Data reduction",
        )

        if self._error:
            raise RuntimeError("Execution of a sub-process failed")

        if self.stack_datasets:
            return Datasets([self._dataset])

        # Callbacks may complete out of order: re-sort the collected datasets
        # to match the input observation order via their OBS_ID.
        lookup = {
            d.meta_table["OBS_ID"][0]: idx for idx, d in enumerate(self._datasets)
        }
        return Datasets([self._datasets[lookup[obs.obs_id]] for obs in observations])