Source code for gammapy.scripts.cube_background

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
TODO: review this code, move what's useful to `background_model.py`.


"""
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import numpy as np
from astropy.coordinates import Angle, SkyCoord
from ..extern.pathlib import Path
from ..utils.scripts import get_parser, set_up_logging_from_args
from ..data import (ObservationTable, DataStore, ObservationGroups,
                   ObservationGroupAxis)
# from ..background import make_bg_cube_model

__all__ = ['make_bg_cube_models',
           'create_bg_observation_list',
           'group_observations',
           'stack_observations',
           ]

log = logging.getLogger(__name__)


def make_bg_cube_models_main(args=None):
    parser = get_parser(make_bg_cube_models)
    parser.add_argument('indir', type=str,
                        help='Input directory (that contains the event lists)')
    parser.add_argument('outdir', type=str,
                        help='Dir path to store the results.')
    parser.add_argument('--overwrite', action='store_true',
                        help='Overwrite existing output file?')
    parser.add_argument('--test', action='store_true',
                        help='If activated, use a subset of '
                             'observations for testing purposes')
    parser.add_argument('--method', type=str, default='default',
                        choices=['default', 'michi'],
                        help='Bg cube model calculation method to apply.')
    parser.add_argument("-l", "--loglevel", default='info',
                        choices=['debug', 'info', 'warning', 'error', 'critical'],
                        help="Set the logging level")
    args = parser.parse_args(args)

    set_up_logging_from_args(args)

    make_bg_cube_models(**vars(args))


[docs]def make_bg_cube_models(indir, outdir, overwrite=False, test=False, method='default'): """Create background cube models from the complete dataset of an experiment. Starting with gamma-ray event lists and effective area IRFs, make background templates. Steps 1. make a global event list from a datastore 2. filter the runs keeping only the ones far from known sources 3. group the runs according to similar observation conditions (i.e. alt, az) * using `~gammapy.data.ObservationGroups` 4. create a bg cube model for each group using: * the `~gammapy.background.make_bg_cube_model` method * and `~gammapy.background.FOVCubeBackgroundModel` objects as containers The models are stored into FITS files. It can take a few minutes to run. For a quicker test, please activate the **test** flag. Parameters ---------- indir : str Input directory (that contains the event lists) outdir : str Dir path to store the results. overwrite : bool If true, run fast (not recommended for analysis). test : bool If true, run fast (not recommended for analysis). method : {'default', 'michi'} Bg cube model calculation method to apply. Examples -------- $ gammapy-make-bg-cube-models -h $ gammapy-make-bg-cube-models <indir> HESS bg_cube_models $ gammapy-make-bg-cube-models <indir> HESS bg_cube_models --test $ gammapy-make-bg-cube-models <indir> HESS bg_cube_models --test --overwrite $ gammapy-make-bg-cube-models <indir> HESS bg_cube_models --method michi """ Path(outdir).mkdir(exist_ok=overwrite) create_bg_observation_list(indir, outdir, overwrite, test) group_observations(outdir, overwrite, test) stack_observations(indir, outdir, overwrite, method)
[docs]def create_bg_observation_list(indir, outdir, overwrite, test): """Make total observation list and filter the observations. In a first version, all obs taken within 3 deg of a known source will be rejected. If a source is extended, twice the extension is added to the corresponding exclusion region radius of 3 deg. Parameters ---------- indir : str Input directory (that contains the event lists) outdir : str Dir path to store the results. overwrite : bool If true, run fast (not recommended for analysis). test : bool If true, run fast: skip many runs and catalog sources. """ log.info(' ') log.info("#######################################") log.info("# Starting create_bg_observation_list #") log.info("#######################################") # get full list of observations data_store = DataStore.from_dir(indir) observation_table = data_store.obs_table # for testing, only process a small subset of observations if test and len(observation_table) > 100: observation_table = observation_table.select_linspace_subset(num=100) log.debug(' ') log.debug("Full observation table:") log.debug(observation_table) # TODO: filter observations: load catalog and reject obs too close to sources # load catalog: TeVCAT (no H.E.S.S. catalog) catalog = load_catalog_tevcat() # for testing, only process a small subset of sources if test: catalog = catalog[:5] # sources coordinates sources_coord = SkyCoord(catalog['coord_ra'], catalog['coord_dec']) # sources sizes (x, y): radius sources_size = Angle([catalog['size_x'], catalog['size_y']]) sources_size = sources_size.reshape(len(catalog), 2) # substitute nan with 0 sources_size[np.isnan(sources_size)] = 0 # sources max size sources_max_size = np.amax(sources_size, axis=1) # sources exclusion radius = 2x max size + 3 deg (fov + 0.5 deg?) sources_excl_radius = 2 * sources_max_size + Angle(3., 'deg') # mask all obs taken within the excl radius of any of the sources # loop over sources obs_coords = SkyCoord(observation_table['RA'], observation_table['DEC']) for i_source in range(len(catalog)): selection = dict(type='sky_circle', frame='icrs', lon=sources_coord[i_source].ra, lat=sources_coord[i_source].dec, radius=sources_excl_radius[i_source], inverted=True, border=Angle(0., 'deg')) observation_table = observation_table.select_observations(selection) # save the bg observation list to a fits file outfile = Path(outdir) / 'bg_observation_table.fits.gz' log.info("Writing {}".format(outfile)) observation_table.write(str(outfile), overwrite=overwrite)
[docs]def group_observations(outdir, overwrite, test): """Group list of observations runs according to observation properties. The observations are grouped into observation groups (bins) according to their altitude and azimuth angle. Parameters ---------- outdir : str Dir path to store the results. overwrite : bool If true, run fast (not recommended for analysis). test : bool If true, run fast: define coarse binning for observation grouping. """ log.info(' ') log.info("###############################") log.info("# Starting group_observations #") log.info("###############################") # read bg observation table from file infile = Path(outdir) / 'bg_observation_table.fits.gz' observation_table = ObservationTable.read(str(infile)) # define observation binning altitude_edges = Angle([0, 20, 23, 27, 30, 33, 37, 40, 44, 49, 53, 58, 64, 72, 90], 'deg') azimuth_edges = Angle([-90, 90, 270], 'deg') # for testing, only process a small subset of bins if test: altitude_edges = Angle([0, 45, 90], 'deg') azimuth_edges = Angle([90, 270], 'deg') # define axis for the grouping list_obs_group_axis = [ObservationGroupAxis('ALT', altitude_edges, 'bin_edges'), ObservationGroupAxis('AZ', azimuth_edges, 'bin_edges')] # create observation groups observation_groups = ObservationGroups(list_obs_group_axis) log.info(' ') log.info("Observation group axes:") log.info(observation_groups.info) log.debug("Observation groups table (group definitions):") log.debug(observation_groups.obs_groups_table) # group observations in the obs table according to the obs groups observation_table_grouped = observation_table # wrap azimuth angles to [-90, 270) deg because of the definition # of the obs group azimuth axis azimuth = Angle(observation_table_grouped['AZ']).wrap_at(Angle(270., 'deg')) observation_table_grouped['AZ'] = azimuth # apply grouping observation_table_grouped = observation_groups.apply(observation_table_grouped) # wrap azimuth angles back to [0, 360) deg azimuth = Angle(observation_table_grouped['AZ']).wrap_at(Angle(360., 'deg')) observation_table_grouped['AZ'] = azimuth log.debug(' ') log.debug("Observation table grouped:") log.debug(observation_table_grouped) # save the observation groups and the grouped bg observation list to file outfile = Path(outdir) / 'bg_observation_groups.ecsv' log.info("Writing {}".format(outfile)) observation_groups.write(str(outfile), overwrite=overwrite) outfile = Path(outdir) / 'bg_observation_table_grouped.fits.gz' log.info("Writing {}".format(outfile)) observation_table_grouped.write(str(outfile), overwrite=overwrite)
[docs]def stack_observations(indir, outdir, overwrite, method='default'): """Stack events for each observation group (bin) and make background model. The models are stored into FITS files. Parameters ---------- indir : str Input directory (that contains the event lists) outdir : str Dir path to store the results. overwrite : bool If true, run fast (not recommended for analysis). method : {'default', 'michi'}, optional Bg cube model calculation method to apply. """ log.info(' ') log.info("###############################") log.info("# Starting stack_observations #") log.info("###############################") # read observation grouping and grouped observation table infile = Path(outdir) / 'bg_observation_groups.ecsv' observation_groups = ObservationGroups.read(str(infile)) infile = Path(outdir) / 'bg_observation_table_grouped.fits.gz' observation_table_grouped = ObservationTable.read(str(infile)) # loop over observation groups groups = observation_groups.list_of_groups log.info(' ') log.info("List of groups to process: {}".format(groups)) for group in groups: log.info(' ') log.info("Processing group: {}".format(group)) # get group of observations observation_table = observation_groups.get_group_of_observations( observation_table_grouped, group) log.debug(observation_table) # skip bins with no observations if len(observation_table) == 0: log.warning("Group {} is empty.".format(group)) continue # skip the rest # create bg cube model bg_cube_model = make_bg_cube_model(observation_table, indir, method) # save model to file for format in ['table', 'image']: filename = 'bg_cube_model_group{}_{}.fits.gz'.format(group, format) filename = Path(outdir) / filename log.info("Writing {}".format(filename)) bg_cube_model.write(str(filename), format=format, clobber=overwrite)