Source code for gammapy.datasets.make

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Make example datasets.
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import os
import numpy as np
from astropy.units import Quantity
from astropy.time import Time, TimeDelta
from astropy.coordinates import SkyCoord, AltAz, Angle
from astropy.table import Table
from ..extern.pathlib import Path
from ..utils.random import sample_sphere, sample_powerlaw, get_random_state
from ..utils.time import time_ref_from_dict, time_relative_to_ref
from ..utils.fits import table_to_fits_table

__all__ = [
    'make_test_psf',
    'make_test_observation_table',
    'make_test_bg_cube_model',
    'make_test_dataset',
    'make_test_eventlist',
]


[docs]def make_test_psf(energy_bins=15, theta_bins=12): """Create a test FITS PSF file. A log-linear dependency in energy is assumed, where the size of the PSF decreases by a factor of tow over tow decades. The theta dependency is a parabola where at theta = 2 deg the size of the PSF has increased by 30%. Parameters ---------- energy_bins : int Number of energy bins. theta_bins : int Number of theta bins. Returns ------- psf : `~gammapy.irf.EnergyDependentMultiGaussPSF` PSF. """ from ..irf import EnergyDependentMultiGaussPSF energies_all = np.logspace(-1, 2, energy_bins + 1) energies_lo = energies_all[:-1] energies_hi = energies_all[1:] theta_lo = theta_hi = np.linspace(0, 2.2, theta_bins) def sigma_energy_theta(energy, theta, sigma): # log-linear dependency of sigma with energy # m and b are choosen such, that at 100 TeV # we have sigma and at 0.1 TeV we have sigma/2 m = -sigma / 6. b = sigma + m return (2 * b + m * np.log10(energy)) * (0.3 / 4 * theta ** 2 + 1) # Compute norms and sigmas values are taken from the psf.txt in # irf/test/data energies, thetas = np.meshgrid(energies_lo, theta_lo) sigmas = [] for sigma in [0.0219206, 0.0905762, 0.0426358]: sigmas.append(sigma_energy_theta(energies, thetas, sigma)) norms = [] for norm in 302.654 * np.array([1, 0.0406003, 0.444632]): norms.append(norm * np.ones((theta_bins, energy_bins))) psf = EnergyDependentMultiGaussPSF(Quantity(energies_lo, 'TeV'), Quantity(energies_hi, 'TeV'), Quantity(theta_lo, 'deg'), sigmas, norms ) return psf
[docs]def make_test_observation_table(observatory_name='HESS', n_obs=10, az_range=Angle([0, 360], 'deg'), alt_range=Angle([45, 90], 'deg'), date_range=(Time('2010-01-01'), Time('2015-01-01')), use_abs_time=False, n_tels_range=(3, 4), random_state='random-seed'): """Make a test observation table. Create an observation table following a specific pattern. For the moment, only random observation tables are created. The observation table is created according to a specific observatory, and randomizing the observation pointingpositions in a specified az-alt range. If a *date_range* is specified, the starting time of the observations will be restricted to the specified interval. These parameters are interpreted as date, the precise hour of the day is ignored, unless the end date is closer than 1 day to the starting date, in which case, the precise time of the day is also considered. In addition, a range can be specified for the number of telescopes. Parameters ---------- observatory_name : str, optional Name of the observatory; a list of choices is given in `~gammapy.data.observatory_locations`. n_obs : int, optional Number of observations for the obs table. az_range : `~astropy.coordinates.Angle`, optional Azimuth angle range (start, end) for random generation of observation pointing positions. alt_range : `~astropy.coordinates.Angle`, optional Altitude angle range (start, end) for random generation of observation pointing positions. date_range : `~astropy.time.Time`, optional Date range (start, end) for random generation of observation start time. use_abs_time : bool, optional Use absolute UTC times instead of [MET]_ seconds after the reference. n_tels_range : int, optional Range (start, end) of number of telescopes participating in the observations. random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`}, optional Defines random number generator initialisation. Passed to `~gammapy.utils.random.get_random_state`. Returns ------- obs_table : `~gammapy.data.ObservationTable` Observation table. """ from ..data import ObservationTable, observatory_locations random_state = get_random_state(random_state) n_obs_start = 1 obs_table = ObservationTable() # build a time reference as the start of 2010 dateref = Time('2010-01-01T00:00:00') dateref_mjd_fra, dateref_mjd_int = np.modf(dateref.mjd) # define table header obs_table.meta['OBSERVATORY_NAME'] = observatory_name obs_table.meta['MJDREFI'] = dateref_mjd_int obs_table.meta['MJDREFF'] = dateref_mjd_fra if use_abs_time: # show the observation times in UTC obs_table.meta['TIME_FORMAT'] = 'absolute' else: # show the observation times in seconds after the reference obs_table.meta['TIME_FORMAT'] = 'relative' header = obs_table.meta # obs id obs_id = np.arange(n_obs_start, n_obs_start + n_obs) obs_table['OBS_ID'] = obs_id # obs time: 30 min ontime = Quantity(30. * np.ones_like(obs_id), 'minute').to('second') obs_table['ONTIME'] = ontime # livetime: 25 min time_live = Quantity(25. * np.ones_like(obs_id), 'minute').to('second') obs_table['LIVETIME'] = time_live # start time # - random points between the start of 2010 and the end of 2014 (unless # otherwise specified) # - using the start of 2010 as a reference time for the header of the table # - observations restrict to night time (only if specified time interval is # more than 1 day) # - considering start of astronomical day at midday: implicit in setting # the start of the night, when generating random night hours datestart = date_range[0] dateend = date_range[1] time_start = random_state.uniform(datestart.mjd, dateend.mjd, len(obs_id)) time_start = Time(time_start, format='mjd', scale='utc') # check if time interval selected is more than 1 day if (dateend - datestart).jd > 1.: # keep only the integer part (i.e. the day, not the fraction of the day) time_start_f, time_start_i = np.modf(time_start.mjd) time_start = Time(time_start_i, format='mjd', scale='utc') # random generation of night hours: 6 h (from 22 h to 4 h), leaving 1/2 h # time for the last run to finish night_start = Quantity(22., 'hour') night_duration = Quantity(5.5, 'hour') hour_start = random_state.uniform(night_start.value, night_start.value + night_duration.value, len(obs_id)) hour_start = Quantity(hour_start, 'hour') # add night hour to integer part of MJD time_start += hour_start if use_abs_time: # show the observation times in UTC time_start = Time(time_start.isot) else: # show the observation times in seconds after the reference time_start = time_relative_to_ref(time_start, header) # converting to quantity (better treatment of units) time_start = Quantity(time_start.sec, 'second') obs_table['TSTART'] = time_start # stop time # calculated as TSTART + ONTIME if use_abs_time: time_stop = Time(obs_table['TSTART']) time_stop += TimeDelta(obs_table['ONTIME']) else: time_stop = TimeDelta(obs_table['TSTART']) time_stop += TimeDelta(obs_table['ONTIME']) # converting to quantity (better treatment of units) time_stop = Quantity(time_stop.sec, 'second') obs_table['TSTOP'] = time_stop # az, alt # random points in a portion of sphere; default: above 45 deg altitude az, alt = sample_sphere(size=len(obs_id), lon_range=az_range, lat_range=alt_range, random_state=random_state) az = Angle(az, 'deg') alt = Angle(alt, 'deg') obs_table['AZ'] = az obs_table['ALT'] = alt # RA, dec # derive from az, alt taking into account that alt, az represent the values # at the middle of the observation, i.e. at time_ref + (TIME_START + TIME_STOP)/2 # (or better: time_ref + TIME_START + (TIME_OBSERVATION/2)) # in use_abs_time mode, the time_ref should not be added, since it's already included # in TIME_START and TIME_STOP az = Angle(obs_table['AZ']) alt = Angle(obs_table['ALT']) if use_abs_time: obstime = Time(obs_table['TSTART']) obstime += TimeDelta(obs_table['ONTIME']) / 2. else: obstime = time_ref_from_dict(obs_table.meta) obstime += TimeDelta(obs_table['TSTART']) obstime += TimeDelta(obs_table['ONTIME']) / 2. location = observatory_locations[observatory_name] altaz_frame = AltAz(obstime=obstime, location=location) alt_az_coord = SkyCoord(az, alt, frame=altaz_frame) sky_coord = alt_az_coord.transform_to('icrs') obs_table['RA'] = sky_coord.ra obs_table['DEC'] = sky_coord.dec # positions # number of telescopes # random integers in a specified range; default: between 3 and 4 n_tels = random_state.randint(n_tels_range[0], n_tels_range[1] + 1, len(obs_id)) obs_table['N_TELS'] = n_tels # muon efficiency # random between 0.6 and 1.0 muoneff = random_state.uniform(low=0.6, high=1.0, size=len(obs_id)) obs_table['MUONEFF'] = muoneff return obs_table
[docs]def make_test_bg_cube_model(detx_range=Angle([-10., 10.], 'deg'), ndetx_bins=24, dety_range=Angle([-10., 10.], 'deg'), ndety_bins=24, energy_band=Quantity([0.01, 100.], 'TeV'), nenergy_bins=14, altitude=Angle(70., 'deg'), sigma=Angle(5., 'deg'), spectral_index=2.7, apply_mask=False, do_not_force_mev_units=False): """Make a test bg cube model. The background counts cube is created following a 2D symmetric gaussian model for the spatial coordinates (X, Y) and a power-law in energy. The gaussian width varies in energy from sigma/2 to sigma. The power-law slope in log-log representation is given by the spectral_index parameter. The norm depends linearly on the livetime and on the altitude angle of the observation. It is possible to mask 1/4th of the image (for **x > x_center** and **y > y_center**). Useful for testing coordinate rotations. Per default units of *1 / (MeV sr s)* for the bg rate are enforced, unless *do_not_force_mev_units* is set. This is in agreement to the convention applied in `~gammapy.background.make_bg_cube_model`. This method is useful for instance to produce true (simulated) background cube models to compare to the reconstructed ones produced with `~gammapy.background.make_bg_cube_model`. For details on how to do this, please refer to :ref:`background_make_background_models_datasets_for_testing`. Parameters ---------- detx_range : `~astropy.coordinates.Angle`, optional X coordinate range (min, max). ndetx_bins : int, optional Number of (linear) bins in X coordinate. dety_range : `~astropy.coordinates.Angle`, optional Y coordinate range (min, max). ndety_bins : int, optional Number of (linear) bins in Y coordinate. energy_band : `~astropy.units.Quantity`, optional Energy range (min, max). nenergy_bins : int, optional Number of (logarithmic) bins in energy. altitude : `~astropy.coordinates.Angle`, optional observation altitude angle for the model. sigma : `~astropy.coordinates.Angle`, optional Width of the gaussian model used for the spatial coordinates. spectral_index : float, optional Index for the power-law model used for the energy coordinate. apply_mask : bool, optional If set, 1/4th of the image is masked (for **x > x_center** and **y > y_center**). do_not_force_mev_units : bool, optional Set to ``True`` to use the same energy units as the energy binning for the bg rate. Returns ------- bg_cube_model : `~gammapy.background.FOVCubeBackgroundModel` Bacground cube model. """ from ..background import FOVCubeBackgroundModel # spatial bins (linear) delta_x = (detx_range[1] - detx_range[0]) / ndetx_bins detx_bin_edges = np.arange(ndetx_bins + 1) * delta_x + detx_range[0] delta_y = (dety_range[1] - dety_range[0]) / ndety_bins dety_bin_edges = np.arange(ndety_bins + 1) * delta_y + dety_range[0] # energy bins (logarithmic) log_delta_energy = (np.log(energy_band[1].value) - np.log(energy_band[0].value)) / nenergy_bins energy_bin_edges = np.exp(np.arange(nenergy_bins + 1) * log_delta_energy + np.log(energy_band[0].value)) energy_bin_edges = Quantity(energy_bin_edges, energy_band[0].unit) # TODO: this function should be reviewed/re-written, when # the following PR is completed: # https://github.com/gammapy/gammapy/pull/290 # define empty bg cube model and set bins bg_cube_model = FOVCubeBackgroundModel.set_cube_binning(detx_edges=detx_bin_edges, dety_edges=dety_bin_edges, energy_edges=energy_bin_edges) # counts # define coordinate grids for the calculations det_bin_centers = bg_cube_model.counts_cube.image_bin_centers energy_bin_centers = bg_cube_model.counts_cube.energy_edges.log_centers energy_points, dety_points, detx_points = np.meshgrid(energy_bin_centers, det_bin_centers[1], det_bin_centers[0], indexing='ij') E_0 = Quantity(1., 'TeV') # reference energy for the model # norm of the model # taking as reference for now a dummy value of 1 # it is linearly dependent on the zenith angle (90 deg - altitude) # it is norm_max at alt = 90 deg and norm_max/2 at alt = 0 deg norm_max = Quantity(1, '') alt_min = Angle(0., 'deg') alt_max = Angle(90., 'deg') slope = (norm_max - norm_max / 2) / (alt_max - alt_min) free_term = norm_max / 2 - slope * alt_min norm = altitude * slope + free_term # define E dependent sigma # it is defined via a PL, in order to be log-linear # it is equal to the parameter sigma at E max # and sigma/2. at E min sigma_min = sigma / 2. # at E min sigma_max = sigma # at E max s_index = np.log(sigma_max / sigma_min) s_index /= np.log(energy_bin_edges[-1] / energy_bin_edges[0]) s_norm = sigma_min * ((energy_bin_edges[0] / E_0) ** -s_index) sigma = s_norm * ((energy_points / E_0) ** s_index) # calculate counts gaussian = np.exp(-((detx_points) ** 2 + (dety_points) ** 2) / sigma ** 2) powerlaw = (energy_points / E_0) ** -spectral_index counts = norm * gaussian * powerlaw bg_cube_model.counts_cube.data = Quantity(counts, '') # livetime # taking as reference for now a dummy value of 1 s livetime = Quantity(1., 'second') bg_cube_model.livetime_cube.data += livetime # background bg_cube_model.background_cube.data = bg_cube_model.counts_cube.data.copy() bg_cube_model.background_cube.data /= bg_cube_model.livetime_cube.data bg_cube_model.background_cube.data /= bg_cube_model.background_cube.bin_volume # bg_cube_model.background_cube.set_zero_level() if not do_not_force_mev_units: # use units of 1 / (MeV sr s) for the bg rate bg_rate = bg_cube_model.background_cube.data.to('1 / (MeV sr s)') bg_cube_model.background_cube.data = bg_rate # apply mask if requested if apply_mask: # find central coordinate detx_center = (detx_range[1] + detx_range[0]) / 2. dety_center = (dety_range[1] + dety_range[0]) / 2. mask = (detx_points <= detx_center) & (dety_points <= dety_center) bg_cube_model.counts_cube.data *= mask bg_cube_model.livetime_cube.data *= mask bg_cube_model.background_cube.data *= mask return bg_cube_model
[docs]def make_test_dataset(outdir, overwrite=False, observatory_name='HESS', n_obs=10, az_range=Angle([0, 360], 'deg'), alt_range=Angle([45, 90], 'deg'), date_range=(Time('2010-01-01'), Time('2015-01-01')), n_tels_range=(3, 4), sigma=Angle(5., 'deg'), spectral_index=2.7, random_state='random-seed'): """ Make a test dataset and save it to disk. Uses: * `~gammapy.datasets.make_test_observation_table` to generate an observation table * `~gammapy.datasets.make_test_eventlist` to generate an event list and effective area table for each observation * `~gammapy.data.DataStore` to handle the file naming scheme; currently only the H.E.S.S. naming scheme is supported This method is useful for instance to produce samples in order to test the machinery for reconstructing background (cube) models. See also :ref:`datasets_obssim`. Parameters ---------- outdir : str Path to store the files. overwrite : bool, optional Flag to remove previous datasets in ``outdir`` (if existing). observatory_name : str, optional Name of the observatory; a list of choices is given in `~gammapy.data.observatory_locations`. n_obs : int Number of observations for the obs table. az_range : `~astropy.coordinates.Angle`, optional Azimuth angle range (start, end) for random generation of observation pointing positions. alt_range : `~astropy.coordinates.Angle`, optional Altitude angle range (start, end) for random generation of observation pointing positions. date_range : `~astropy.time.Time`, optional Date range (start, end) for random generation of observation start time. n_tels_range : int, optional Range (start, end) of number of telescopes participating in the observations. sigma : `~astropy.coordinates.Angle`, optional Width of the gaussian model used for the spatial coordinates. spectral_index : float, optional Index for the power-law model used for the energy coordinate. random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`}, optional Defines random number generator initialisation. Passed to `~gammapy.utils.random.get_random_state`. """ from ..data import DataStore random_state = get_random_state(random_state) # create output folder Path(outdir).mkdir(exist_ok=overwrite) # generate observation table observation_table = make_test_observation_table(observatory_name=observatory_name, n_obs=n_obs, az_range=az_range, alt_range=alt_range, date_range=date_range, use_abs_time=False, n_tels_range=n_tels_range, random_state=random_state) # save observation list to disk outfile = Path(outdir) / 'runinfo.fits' observation_table.write(str(outfile)) # create data store for the organization of the files # using H.E.S.S.-like dir/file naming scheme if observatory_name == 'HESS': scheme = 'HESS' else: s_error = "Warning! Storage scheme for {}".format(observatory_name) s_error += "not implemented. Only H.E.S.S. scheme is available." raise ValueError(s_error) data_store = DataStore(dir=outdir, scheme=scheme) # loop over observations for obs_id in observation_table['OBS_ID']: event_list, aeff_hdu = make_test_eventlist(observation_table=observation_table, obs_id=obs_id, sigma=sigma, spectral_index=spectral_index, random_state=random_state) # save event list and effective area table to disk outfile = data_store.filename(obs_id, filetype='events') outfile_split = outfile.rsplit("/", 1) os.makedirs(outfile_split[0]) # recursively event_list.write(outfile) outfile = data_store.filename(obs_id, filetype='effective area') aeff_hdu.writeto(outfile)
[docs]def make_test_eventlist(observation_table, obs_id, sigma=Angle(5., 'deg'), spectral_index=2.7, random_state='random-seed'): """ Make a test event list for a specified observation. The observation can be specified with an observation table object and the observation ID pointing to the correct observation in the table. For now, only a very rudimentary event list is generated, containing only detector X, Y coordinates (a.k.a. nominal system) and energy columns for the events. And the livetime of the observations stored in the header. The model used to simulate events is also very simple. Only dummy background is created (no signal). The background is created following a 2D symmetric gaussian model for the spatial coordinates (X, Y) and a power-law in energy. The gaussian width varies in energy from sigma/2 to sigma. The number of events generated depends linearly on the livetime and on the altitude angle of the observation. The model can be tuned via the sigma and spectral_index parameters. In addition, an effective area table is produced. For the moment only the low energy threshold is filled. See also :ref:`datasets_obssim`. Parameters ---------- observation_table : `~gammapy.data.ObservationTable` Observation table containing the observation to fake. obs_id : int Observation ID of the observation to fake inside the observation table. sigma : `~astropy.coordinates.Angle`, optional Width of the gaussian model used for the spatial coordinates. spectral_index : float, optional Index for the power-law model used for the energy coordinate. random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`}, optional Defines random number generator initialisation. Passed to `~gammapy.utils.random.get_random_state`. Returns ------- event_list : `~gammapy.data.EventList` Event list. aeff_hdu : `~astropy.io.fits.BinTableHDU` Effective area table. """ from ..data import EventList random_state = get_random_state(random_state) # find obs row in obs table obs_ids = observation_table['OBS_ID'].data obs_index = np.where(obs_ids == obs_id) row = obs_index[0][0] # get observation information alt = Angle(observation_table['ALT'])[row] livetime = Quantity(observation_table['LIVETIME'])[row] # number of events to simulate # it is linearly dependent on the livetime, taking as reference # a trigger rate of 300 Hz # it is linearly dependent on the zenith angle (90 deg - altitude) # it is n_events_max at alt = 90 deg and n_events_max/2 at alt = 0 deg n_events_max = Quantity(300., 'Hz') * livetime alt_min = Angle(0., 'deg') alt_max = Angle(90., 'deg') slope = (n_events_max - n_events_max / 2) / (alt_max - alt_min) free_term = n_events_max / 2 - slope * alt_min n_events = alt * slope + free_term # simulate energy # the index of `~numpy.random.RandomState.power` has to be # positive defined, so it is necessary to translate the (0, 1) # interval of the random variable to (emax, e_min) in order to # have a decreasing power-law e_min = Quantity(0.1, 'TeV') e_max = Quantity(100., 'TeV') energy = sample_powerlaw(e_min.value, e_max.value, spectral_index, size=n_events, random_state=random_state) energy = Quantity(energy, 'TeV') E_0 = Quantity(1., 'TeV') # reference energy for the model # define E dependent sigma # it is defined via a PL, in order to be log-linear # it is equal to the parameter sigma at E max # and sigma/2. at E min sigma_min = sigma / 2. # at E min sigma_max = sigma # at E max s_index = np.log(sigma_max / sigma_min) s_index /= np.log(e_max / e_min) s_norm = sigma_min * ((e_min / E_0) ** -s_index) sigma = s_norm * ((energy / E_0) ** s_index) # simulate detx, dety detx = Angle(random_state.normal(loc=0, scale=sigma.deg, size=n_events), 'deg') dety = Angle(random_state.normal(loc=0, scale=sigma.deg, size=n_events), 'deg') # fill events in an event list event_list = EventList() event_list['DETX'] = detx event_list['DETY'] = dety event_list['ENERGY'] = energy # store important info in header event_list.meta['LIVETIME'] = livetime.to('second').value event_list.meta['EUNIT'] = str(energy.unit) # effective area table aeff_table = Table() # fill threshold, for now, a default 100 GeV will be set # independently of observation parameters energy_threshold = Quantity(0.1, 'TeV') aeff_table.meta['LO_THRES'] = energy_threshold.value aeff_table.meta['name'] = 'EFFECTIVE AREA' # convert to BinTableHDU and add necessary comment for the units aeff_hdu = table_to_fits_table(aeff_table) aeff_hdu.header.comments['LO_THRES'] = '[' + str(energy_threshold.unit) + ']' return event_list, aeff_hdu