# Licensed under a 3-clause BSD style license - see LICENSE.rst"""Session class driving the high level interface API"""importloggingfromastropy.coordinatesimportSkyCoordfromastropy.tableimportTablefromregionsimportCircleSkyRegionfromgammapy.analysis.configimportAnalysisConfigfromgammapy.dataimportDataStorefromgammapy.datasetsimportDatasets,FluxPointsDataset,MapDataset,SpectrumDatasetfromgammapy.estimatorsimport(ExcessMapEstimator,FluxPointsEstimator,LightCurveEstimator,)fromgammapy.makersimport(DatasetsMaker,FoVBackgroundMaker,MapDatasetMaker,ReflectedRegionsBackgroundMaker,RingBackgroundMaker,SafeMaskMaker,SpectrumDatasetMaker,)fromgammapy.mapsimportMap,MapAxis,RegionGeom,WcsGeomfromgammapy.modelingimportFitfromgammapy.modeling.modelsimportDatasetModels,FoVBackgroundModel,Modelsfromgammapy.utils.pbarimportprogress_barfromgammapy.utils.scriptsimportmake_path__all__=["Analysis"]log=logging.getLogger(__name__)
[docs]classAnalysis:"""Config-driven high level analysis interface. It is initialized by default with a set of configuration parameters and values declared in an internal high level interface model, though the user can also provide configuration parameters passed as a nested dictionary at the moment of instantiation. In that case these parameters will overwrite the default values of those present in the configuration file. Parameters ---------- config : dict or `AnalysisConfig` Configuration options following `AnalysisConfig` schema """def__init__(self,config):self.config=configself.config.set_logging()self.datastore=Noneself.observations=Noneself.datasets=Noneself.fit=Fit()self.fit_result=Noneself.flux_points=None@propertydefmodels(self):ifnotself.datasets:raiseRuntimeError("No datasets defined. Impossible to set models.")returnself.datasets.models@models.setterdefmodels(self,models):self.set_models(models,extend=False)@propertydefconfig(self):"""Analysis configuration (`AnalysisConfig`)"""returnself._config@config.setterdefconfig(self,value):ifisinstance(value,dict):self._config=AnalysisConfig(**value)elifisinstance(value,AnalysisConfig):self._config=valueelse:raiseTypeError("config must be dict or AnalysisConfig.")def_set_data_store(self):"""Set the datastore on the Analysis object."""path=make_path(self.config.observations.datastore)ifpath.is_file():log.debug(f"Setting datastore from file: {path}")self.datastore=DataStore.from_file(path)elifpath.is_dir():log.debug(f"Setting datastore from directory: {path}")self.datastore=DataStore.from_dir(path)else:raiseFileNotFoundError(f"Datastore not found: {path}")def_make_obs_table_selection(self):"""Return list of obs_ids after filtering on datastore observation table."""obs_settings=self.config.observations# Reject configs with list of obs_ids and obs_file set at the same timeiflen(obs_settings.obs_ids)andobs_settings.obs_fileisnotNone:raiseValueError("Values for both parameters obs_ids and obs_file are not accepted.")# First select input list of observations from obs_tableiflen(obs_settings.obs_ids):selected_obs_table=self.datastore.obs_table.select_obs_id(obs_settings.obs_ids)elifobs_settings.obs_fileisnotNone:path=make_path(obs_settings.obs_file)ids=list(Table.read(path,format="ascii",data_start=0).columns[0])selected_obs_table=self.datastore.obs_table.select_obs_id(ids)else:selected_obs_table=self.datastore.obs_table# Apply cone selectionifobs_settings.obs_cone.lonisnotNone:cone=dict(type="sky_circle",frame=obs_settings.obs_cone.frame,lon=obs_settings.obs_cone.lon,lat=obs_settings.obs_cone.lat,radius=obs_settings.obs_cone.radius,border="0 deg",)selected_obs_table=selected_obs_table.select_observations(cone)returnselected_obs_table["OBS_ID"].tolist()
[docs]defget_observations(self):"""Fetch observations from the data store according to criteria defined in the configuration."""observations_settings=self.config.observationsself._set_data_store()log.info("Fetching observations.")ids=self._make_obs_table_selection()required_irf=[_.valuefor_inobservations_settings.required_irf]self.observations=self.datastore.get_observations(ids,skip_missing=True,required_irf=required_irf)ifobservations_settings.obs_time.startisnotNone:start=observations_settings.obs_time.startstop=observations_settings.obs_time.stopiflen(start.shape)==0:time_intervals=[(start,stop)]else:time_intervals=[(tstart,tstop)fortstart,tstopinzip(start,stop)]self.observations=self.observations.select_time(time_intervals)log.info(f"Number of selected observations: {len(self.observations)}")forobsinself.observations:log.debug(obs)
[docs]defget_datasets(self):"""Produce reduced datasets."""datasets_settings=self.config.datasetsifnotself.observationsorlen(self.observations)==0:raiseRuntimeError("No observations have been selected.")ifdatasets_settings.type=="1d":self._spectrum_extraction()else:# 3dself._map_making()
[docs]defset_models(self,models,extend=True):"""Set models on datasets. Adds `FoVBackgroundModel` if not present already Parameters ---------- models : `~gammapy.modeling.models.Models` or str Models object or YAML models string extend : bool Extend the exiting models on the datasets or replace them. """ifnotself.datasetsorlen(self.datasets)==0:raiseRuntimeError("Missing datasets")log.info("Reading model.")ifisinstance(models,str):models=Models.from_yaml(models)elifisinstance(models,Models):passelifisinstance(models,DatasetModels)orisinstance(models,list):models=Models(models)else:raiseTypeError(f"Invalid type: {models!r}")ifextend:models.extend(self.datasets.models)self.datasets.models=modelsbkg_models=[]fordatasetinself.datasets:ifdataset.tag=="MapDataset"anddataset.background_modelisNone:bkg_models.append(FoVBackgroundModel(dataset_name=dataset.name))ifbkg_models:models.extend(bkg_models)self.datasets.models=modelslog.info(models)
[docs]defread_models(self,path,extend=True):"""Read models from YAML file. Parameters ---------- path : str path to the model file extend : bool Extend the exiting models on the datasets or replace them. """path=make_path(path)models=Models.read(path)self.set_models(models,extend=extend)log.info(f"Models loaded from {path}.")
[docs]defwrite_models(self,overwrite=True,write_covariance=True):"""Write models to YAML file. File name is taken from the configuration file. """filename_models=self.config.general.models_fileiffilename_modelsisnotNone:self.models.write(filename_models,overwrite=overwrite,write_covariance=write_covariance)log.info(f"Models loaded from {filename_models}.")else:raiseRuntimeError("Missing models_file in config.general")
[docs]defread_datasets(self):"""Read datasets from YAML file. File names are taken from the configuration file. """filename=self.config.general.datasets_filefilename_models=self.config.general.models_fileiffilenameisnotNone:self.datasets=Datasets.read(filename)log.info(f"Datasets loaded from {filename}.")iffilename_modelsisnotNone:self.read_models(filename_models,extend=False)else:raiseRuntimeError("Missing datasets_file in config.general")
[docs]defwrite_datasets(self,overwrite=True,write_covariance=True):"""Write datasets to YAML file. File names are taken from the configuration file. Parameters ---------- overwrite : bool overwrite datasets FITS files write_covariance : bool save covariance or not """filename=self.config.general.datasets_filefilename_models=self.config.general.models_fileiffilenameisnotNone:self.datasets.write(filename,filename_models,overwrite=overwrite,write_covariance=write_covariance,)log.info(f"Datasets stored to {filename}.")log.info(f"Datasets stored to {filename_models}.")else:raiseRuntimeError("Missing datasets_file in config.general")
[docs]defrun_fit(self):"""Fitting reduced datasets to model."""ifnotself.models:raiseRuntimeError("Missing models")fit_settings=self.config.fitfordatasetinself.datasets:iffit_settings.fit_range:energy_min=fit_settings.fit_range.minenergy_max=fit_settings.fit_range.maxgeom=dataset.counts.geomdataset.mask_fit=geom.energy_mask(energy_min,energy_max)log.info("Fitting datasets.")result=self.fit.run(datasets=self.datasets)self.fit_result=resultlog.info(self.fit_result)
[docs]defget_flux_points(self):"""Calculate flux points for a specific model component."""ifnotself.datasets:raiseRuntimeError("No datasets defined. Impossible to compute flux points.")fp_settings=self.config.flux_pointslog.info("Calculating flux points.")energy_edges=self._make_energy_axis(fp_settings.energy).edgesflux_point_estimator=FluxPointsEstimator(energy_edges=energy_edges,source=fp_settings.source,fit=self.fit,**fp_settings.parameters,)fp=flux_point_estimator.run(datasets=self.datasets)self.flux_points=FluxPointsDataset(data=fp,models=self.models[fp_settings.source])cols=["e_ref","dnde","dnde_ul","dnde_err","sqrt_ts"]table=self.flux_points.data.to_table(sed_type="dnde")log.info("\n{}".format(table[cols]))
[docs]defget_excess_map(self):"""Calculate excess map with respect to the current model."""excess_settings=self.config.excess_maplog.info("Computing excess maps.")# TODO: Here we could possibly stack the datasets if needed# or allow to compute the excess map for each datasetiflen(self.datasets)>1:raiseValueError("Datasets must be stacked to compute the excess map")ifself.datasets[0].tagnotin["MapDataset","MapDatasetOnOff"]:raiseValueError("Cannot compute excess map for 1D dataset")energy_edges=self._make_energy_axis(excess_settings.energy_edges)ifenergy_edgesisnotNone:energy_edges=energy_edges.edgesexcess_map_estimator=ExcessMapEstimator(correlation_radius=excess_settings.correlation_radius,energy_edges=energy_edges,**excess_settings.parameters,)self.excess_map=excess_map_estimator.run(self.datasets[0])
[docs]defget_light_curve(self):"""Calculate light curve for a specific model component."""lc_settings=self.config.light_curvelog.info("Computing light curve.")energy_edges=self._make_energy_axis(lc_settings.energy_edges).edgesif(lc_settings.time_intervals.startisNoneorlc_settings.time_intervals.stopisNone):log.info("Time intervals not defined. Extract light curve on datasets GTIs.")time_intervals=Noneelse:time_intervals=[(t1,t2)fort1,t2inzip(lc_settings.time_intervals.start,lc_settings.time_intervals.stop)]light_curve_estimator=LightCurveEstimator(time_intervals=time_intervals,energy_edges=energy_edges,source=lc_settings.source,fit=self.fit,**lc_settings.parameters,)lc=light_curve_estimator.run(datasets=self.datasets)self.light_curve=lclog.info("\n{}".format(self.light_curve.to_table(format="lightcurve",sed_type="flux")))
@staticmethoddef_create_wcs_geometry(wcs_geom_settings,axes):"""Create the WCS geometry."""geom_params={}skydir_settings=wcs_geom_settings.skydirifskydir_settings.lonisnotNone:skydir=SkyCoord(skydir_settings.lon,skydir_settings.lat,frame=skydir_settings.frame)geom_params["skydir"]=skydirifskydir_settings.framein["icrs","galactic"]:geom_params["frame"]=skydir_settings.frameelse:raiseValueError(f"Incorrect skydir frame: expect 'icrs' or 'galactic'. Got {skydir_settings.frame}")geom_params["axes"]=axesgeom_params["binsz"]=wcs_geom_settings.binsizewidth=wcs_geom_settings.width.width.to("deg").valueheight=wcs_geom_settings.width.height.to("deg").valuegeom_params["width"]=(width,height)returnWcsGeom.create(**geom_params)@staticmethoddef_create_region_geometry(on_region_settings,axes):"""Create the region geometry."""on_lon=on_region_settings.lonon_lat=on_region_settings.laton_center=SkyCoord(on_lon,on_lat,frame=on_region_settings.frame)on_region=CircleSkyRegion(on_center,on_region_settings.radius)returnRegionGeom.create(region=on_region,axes=axes)def_create_geometry(self):"""Create the geometry."""log.debug("Creating geometry.")datasets_settings=self.config.datasetsgeom_settings=datasets_settings.geomaxes=[self._make_energy_axis(geom_settings.axes.energy)]ifdatasets_settings.type=="3d":geom=self._create_wcs_geometry(geom_settings.wcs,axes)elifdatasets_settings.type=="1d":geom=self._create_region_geometry(datasets_settings.on_region,axes)else:raiseValueError(f"Incorrect dataset type. Expect '1d' or '3d'. Got {datasets_settings.type}.")returngeomdef_create_reference_dataset(self,name=None):"""Create the reference dataset for the current analysis."""log.debug("Creating target Dataset.")geom=self._create_geometry()geom_settings=self.config.datasets.geomgeom_irf=dict(energy_axis_true=None,binsz_irf=None)ifgeom_settings.axes.energy_true.minisnotNone:geom_irf["energy_axis_true"]=self._make_energy_axis(geom_settings.axes.energy_true,name="energy_true")ifgeom_settings.wcs.binsize_irfisnotNone:geom_irf["binsz_irf"]=geom_settings.wcs.binsize_irf.to("deg").valueifself.config.datasets.type=="1d":returnSpectrumDataset.create(geom,name=name,**geom_irf)else:returnMapDataset.create(geom,name=name,**geom_irf)def_create_dataset_maker(self):"""Create the Dataset Maker."""log.debug("Creating the target Dataset Maker.")datasets_settings=self.config.datasetsifdatasets_settings.type=="3d":maker=MapDatasetMaker(selection=datasets_settings.map_selection)elifdatasets_settings.type=="1d":maker_config={}ifdatasets_settings.containment_correction:maker_config["containment_correction"]=datasets_settings.containment_correctionmaker_config["selection"]=["counts","exposure","edisp"]maker=SpectrumDatasetMaker(**maker_config)returnmakerdef_create_safe_mask_maker(self):"""Create the SafeMaskMaker."""log.debug("Creating the mask_safe Maker.")safe_mask_selection=self.config.datasets.safe_mask.methodssafe_mask_settings=self.config.datasets.safe_mask.parametersreturnSafeMaskMaker(methods=safe_mask_selection,**safe_mask_settings)def_create_background_maker(self):"""Create the Background maker."""log.info("Creating the background Maker.")datasets_settings=self.config.datasetsbkg_maker_config={}ifdatasets_settings.background.exclusion:path=make_path(datasets_settings.background.exclusion)exclusion_mask=Map.read(path)exclusion_mask.data=exclusion_mask.data.astype(bool)bkg_maker_config["exclusion_mask"]=exclusion_maskbkg_maker_config.update(datasets_settings.background.parameters)bkg_method=datasets_settings.background.methodbkg_maker=Noneifbkg_method=="fov_background":log.debug(f"Creating FoVBackgroundMaker with arguments {bkg_maker_config}")bkg_maker=FoVBackgroundMaker(**bkg_maker_config)elifbkg_method=="ring":bkg_maker=RingBackgroundMaker(**bkg_maker_config)log.debug(f"Creating RingBackgroundMaker with arguments {bkg_maker_config}")ifdatasets_settings.geom.axes.energy.nbins>1:raiseValueError("You need to define a single-bin energy geometry for your dataset.")elifbkg_method=="reflected":bkg_maker=ReflectedRegionsBackgroundMaker(**bkg_maker_config)log.debug(f"Creating ReflectedRegionsBackgroundMaker with arguments {bkg_maker_config}")else:log.warning("No background maker set. Check configuration.")returnbkg_makerdef_map_making(self):"""Make maps and datasets for 3d analysis"""datasets_settings=self.config.datasetsoffset_max=datasets_settings.geom.selection.offset_maxlog.info("Creating reference dataset and makers.")stacked=self._create_reference_dataset(name="stacked")maker=self._create_dataset_maker()maker_safe_mask=self._create_safe_mask_maker()bkg_maker=self._create_background_maker()makers=[maker,maker_safe_mask,bkg_maker]makers=[makerformakerinmakersifmakerisnotNone]log.info("Start the data reduction loop.")datasets_maker=DatasetsMaker(makers,stack_datasets=datasets_settings.stack,n_jobs=self.config.general.n_jobs,cutout_mode="trim",cutout_width=2*offset_max,)self.datasets=datasets_maker.run(stacked,self.observations)# TODO: move progress bar to DatasetsMaker but how with multiprocessing ?def_spectrum_extraction(self):"""Run all steps for the spectrum extraction."""log.info("Reducing spectrum datasets.")datasets_settings=self.config.datasetsdataset_maker=self._create_dataset_maker()safe_mask_maker=self._create_safe_mask_maker()bkg_maker=self._create_background_maker()reference=self._create_reference_dataset()datasets=[]forobsinprogress_bar(self.observations,desc="Observations"):log.debug(f"Processing observation {obs.obs_id}")dataset=dataset_maker.run(reference.copy(),obs)ifbkg_makerisnotNone:dataset=bkg_maker.run(dataset,obs)ifdataset.counts_offisNone:log.debug(f"No OFF region found for observation {obs.obs_id}. Discarding.")continuedataset=safe_mask_maker.run(dataset,obs)log.debug(dataset)datasets.append(dataset)self.datasets=Datasets(datasets)ifdatasets_settings.stack:stacked=self.datasets.stack_reduce(name="stacked")self.datasets=Datasets([stacked])@staticmethoddef_make_energy_axis(axis,name="energy"):ifaxis.minisNoneoraxis.maxisNone:returnNoneelifaxis.nbinsisNoneoraxis.nbins<1:returnNoneelse:returnMapAxis.from_bounds(name=name,lo_bnd=axis.min.value,hi_bnd=axis.max.to_value(axis.min.unit),nbin=axis.nbins,unit=axis.min.unit,interp="log",node_type="edges",)