class DatasetsMaker(Maker):
    """Run makers in a chain.

    Parameters
    ----------
    makers : list of `Maker` objects
        Makers
    stack_datasets : bool
        If True stack into the reference dataset (see `run` method arguments).
    n_jobs : int
        Number of processes to run in parallel
    cutout_mode : {'trim', 'partial', 'strict'}
        Used only to cutout the reference `MapDataset` around each processed
        observation. Mode is an option for Cutout2D, for details see
        `~astropy.nddata.utils.Cutout2D`. Default is "trim".
    cutout_width : tuple of `~astropy.coordinates.Angle`
        Angular sizes of the region in (lon, lat) in that specific order.
        If only one value is passed, a square region is extracted.
        If None it returns an error, except if the list of makers includes a
        `SafeMaskMaker` with the offset-max method defined. In that case it is
        set to two times `offset_max`.
    """

    tag = "DatasetsMaker"

    def __init__(
        self,
        makers,
        stack_datasets=True,
        n_jobs=None,
        cutout_mode="trim",
        cutout_width=None,
    ):
        self.log = logging.getLogger(__name__)
        self.makers = makers
        self.cutout_mode = cutout_mode
        if cutout_width is not None:
            cutout_width = Angle(cutout_width)
        self.cutout_width = cutout_width
        # Cutouts are applied by default; disabled when no width can be
        # determined (neither given explicitly nor derivable from a
        # SafeMaskMaker's offset_max).
        self._apply_cutout = True
        if self.cutout_width is None:
            if self.offset_max is None:
                self._apply_cutout = False
            else:
                self.cutout_width = 2 * self.offset_max
        self.n_jobs = n_jobs
        self.stack_datasets = stack_datasets
        self._datasets = []
        # Set by `error_callback` if any parallel sub-process fails.
        self._error = False

    @property
    def offset_max(self):
        """`offset_max` of the chain's `SafeMaskMaker`, or None if absent."""
        maker = self.safe_mask_maker
        if maker is not None and hasattr(maker, "offset_max"):
            return maker.offset_max

    @property
    def safe_mask_maker(self):
        """First `SafeMaskMaker` in the maker chain, or None if absent."""
        for maker in self.makers:
            if isinstance(maker, SafeMaskMaker):
                return maker

    def make_dataset(self, dataset, observation):
        """Make single dataset.

        Parameters
        ----------
        dataset : `~gammapy.datasets.MapDataset`
            Reference dataset
        observation : `Observation`
            Observation

        Returns
        -------
        dataset : `~gammapy.datasets.MapDataset`
            Dataset reduced with all makers in the chain.
        """
        if self._apply_cutout:
            cutout_kwargs = {
                "position": observation.pointing_radec.galactic,
                "width": self.cutout_width,
                "mode": self.cutout_mode,
            }
            dataset_obs = dataset.cutout(**cutout_kwargs)
        else:
            dataset_obs = dataset.copy()

        if dataset.models is not None:
            # Models are attached to the per-observation dataset name so the
            # makers below evaluate them on the cutout geometry.
            models = dataset.models.copy()
            models.reassign(dataset.name, dataset_obs.name)
            dataset_obs.models = models

        # Use the instance logger set up in __init__ (the original referenced
        # a bare module-level `log` while __init__ created `self.log`).
        self.log.info(f"Computing dataset for observation {observation.obs_id}")
        for maker in self.makers:
            self.log.info(f"Running {maker.tag}")
            dataset_obs = maker.run(dataset=dataset_obs, observation=observation)
        return dataset_obs

    def error_callback(self, dataset):
        # parallel run could cause a memory error with non-explicit message,
        # so just record that a sub-process failed; `run` raises afterwards.
        self._error = True

    def run(self, dataset, observations, datasets=None):
        """Run data reduction.

        Parameters
        ----------
        dataset : `~gammapy.datasets.MapDataset`
            Reference dataset (used only for stacking if datasets are provided)
        observations : `Observations`
            Observations
        datasets : `~gammapy.datasets.Datasets`
            Base datasets, if provided its length must be the same than
            the observations.

        Returns
        -------
        datasets : `~gammapy.datasets.Datasets`
            Datasets

        Raises
        ------
        TypeError
            If the reference dataset is not a `MapDataset`.
        RuntimeError
            If a parallel sub-process failed.
        """
        if isinstance(dataset, MapDataset):
            # also valid for Spectrum as it inherits from MapDataset
            self._dataset = dataset
        else:
            raise TypeError("Invalid reference dataset.")

        if isinstance(dataset, SpectrumDataset):
            self._apply_cutout = False

        if datasets is not None:
            # Per-observation base datasets supplied by the caller already
            # have their own geometry: no cutout of the reference needed.
            self._apply_cutout = False
        else:
            datasets = len(observations) * [dataset]

        # NOTE(review): `self.callback` is expected to be defined on this
        # class (it collects each finished dataset) but is not visible in
        # this chunk — confirm it exists alongside `error_callback`.
        if self.n_jobs is not None and self.n_jobs > 1:
            n_jobs = min(self.n_jobs, len(observations))
            with Pool(processes=n_jobs) as pool:
                self.log.info("Using {} jobs.".format(n_jobs))
                results = []
                for base, obs in zip(datasets, observations):
                    result = pool.apply_async(
                        self.make_dataset,
                        (base, obs),
                        callback=self.callback,
                        error_callback=self.error_callback,
                    )
                    results.append(result)
                # wait async run is done
                for result in results:
                    result.wait()
            if self._error:
                raise RuntimeError("Execution of a sub-process failed")
        else:
            for base, obs in zip(datasets, observations):
                dataset = self.make_dataset(base, obs)
                self.callback(dataset)

        if self.stack_datasets:
            return Datasets([self._dataset])
        else:
            # have to sort datasets because of async: callbacks may have
            # appended them out of observation order.
            obs_ids = [d.meta_table["OBS_ID"][0] for d in self._datasets]
            ordered = []
            for obs in observations:
                ind = np.where(np.array(obs_ids) == obs.obs_id)[0][0]
                ordered.append(self._datasets[ind])
            self._datasets = ordered
            return Datasets(self._datasets)