# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Gammacat open TeV source catalog.
https://github.com/gammapy/gamma-cat
"""
from __future__ import absolute_import, division, print_function, unicode_literals
from collections import OrderedDict, namedtuple
import functools
import logging
import numpy as np
from ..extern import six
from astropy import units as u
from astropy.table import Table
from astropy.coordinates import Angle
from astropy.modeling.models import Gaussian2D
from ..utils.modeling import SourceModel, SourceLibrary, UnknownModelError
from ..utils.scripts import make_path
from ..spectrum import FluxPoints
from ..spectrum.models import PowerLaw, PowerLaw2, ExponentialCutoffPowerLaw
from ..image.models import Shell2D, Delta2D
from .core import SourceCatalog, SourceCatalogObject
__all__ = [
'SourceCatalogGammaCat',
'SourceCatalogObjectGammaCat',
'GammaCatDataCollection',
'GammaCatResource',
'GammaCatResourceIndex',
]
log = logging.getLogger(__name__)
class NoDataAvailableError(LookupError):
"""Generic error used in Gammapy, when some data isn't available.
"""
pass
class GammaCatNotFoundError(OSError):
"""The gammapy-cat repo is not available.
You have to set the GAMMA_CAT environment variable so that it's found.
"""
pass
[docs]class SourceCatalogObjectGammaCat(SourceCatalogObject):
"""One object from the gamma-cat source catalog.
Catalog is represented by `~gammapy.catalog.SourceCatalogGammaCat`.
"""
_source_name_key = 'common_name'
_source_index_key = 'catalog_row_index'
def __str__(self):
return self.info()
[docs] def info(self, info='all'):
"""Info string.
Parameters
----------
info : {'all', 'basic', 'position, 'model'}
Comma separated list of options
"""
if info == 'all':
info = 'basic,position,model'
ss = ''
ops = info.split(',')
if 'basic' in ops:
ss += self._info_basic()
if 'position' in ops:
ss += self._info_position()
if 'model' in ops:
ss += self._info_morph()
ss += self._info_spectral_fit()
ss += self._info_spectral_points()
return ss
def _info_basic(self):
"""Print basic info."""
d = self.data
ss = '\n*** Basic info ***\n\n'
ss += 'Catalog row index (zero-based) : {}\n'.format(d['catalog_row_index'])
ss += '{:<15s} : {}\n'.format('Common name', d['common_name'])
# ss += '{:<15s} : {}\n'.format('Gamma names', d['gamma_names'])
# ss += '{:<15s} : {}\n'.format('Fermi names', d['fermi_names'])
# ss += '{:<15s} : {}\n'.format('Other names', d['other_names'])
def get_nonentry_keys(keys):
vals = [d[_].strip() for _ in keys]
return ','.join([_ for _ in vals if _ != ''])
keys = ['gamma_names', 'fermi_names', 'other_names']
other_names = get_nonentry_keys(keys)
ss += '{:<15s} : {}\n'.format('Other names', other_names)
ss += '{:<15s} : {}\n'.format('Location', d['where'])
ss += '{:<15s} : {}\n'.format('Class', d['classes'])
ss += '\n{:<15s} : {}\n'.format('TeVCat ID', d['tevcat_id'])
ss += '{:<15s} : {}\n'.format('TeVCat 2 ID', d['tevcat2_id'])
ss += '{:<15s} : {}\n'.format('TeVCat name', d['tevcat_name'])
ss += '\n{:<15s} : {}\n'.format('TGeVCat ID', d['tgevcat_id'])
ss += '{:<15s} : {}\n'.format('TGeVCat name', d['tgevcat_name'])
ss += '\n{:<15s} : {}\n'.format('Discoverer', d['discoverer'])
ss += '{:<15s} : {}\n'.format('Discovery date', d['discovery_date'])
ss += '{:<15s} : {}\n'.format('Seen by', d['seen_by'])
ss += '{:<15s} : {}\n'.format('Reference', d['reference_id'])
return ss
def _info_position(self):
"""Print position info."""
d = self.data
ss = '\n*** Position info ***\n\n'
ss += 'SIMBAD:\n'
ss += '{:<20s} : {:.3f}\n'.format('RA', d['ra'])
ss += '{:<20s} : {:.3f}\n'.format('DEC', d['dec'])
ss += '{:<20s} : {:.3f}\n'.format('GLON', d['glon'])
ss += '{:<20s} : {:.3f}\n'.format('GLAT', d['glat'])
ss += '\nMeasurement:\n'
ss += '{:<20s} : {:.3f}\n'.format('RA', d['pos_ra'])
ss += '{:<20s} : {:.3f}\n'.format('DEC', d['pos_dec'])
ss += '{:<20s} : {:.3f}\n'.format('GLON', d['pos_glon'])
ss += '{:<20s} : {:.3f}\n'.format('GLAT', d['pos_glat'])
ss += '{:<20s} : {:.3f}\n'.format('Position error', d['pos_err'])
return ss
def _info_morph(self):
"""Print morphology info."""
ss = '\n*** Morphology info ***\n\n'
d = self.data
ss += '{:<25s} : {}\n'.format('Morphology model type', d['morph_type'])
# TODO: change to morphology model dependent printout
# (see spectra printout and `spatial_model` property)
ss += '{:<25s} : {:.3f}\n'.format('Sigma', d['morph_sigma'])
ss += '{:<25s} : {:.3f}\n'.format('Sigma error', d['morph_sigma_err'])
ss += '{:<25s} : {:.3f}\n'.format('Sigma2', d['morph_sigma2'])
ss += '{:<25s} : {:.3f}\n'.format('Sigma2 error', d['morph_sigma2_err'])
ss += '{:<25s} : {:.3f}\n'.format('Position angle', d['morph_pa'])
ss += '{:<25s} : {:.3f}\n'.format('Position angle error', d['morph_pa_err'])
ss += '{:<25s} : {}\n'.format('Position angle frame', d['morph_pa_frame'])
return ss
def _info_spectral_fit(self):
"""Print spectral info."""
d = self.data
ss = '\n*** Spectral info ***\n\n'
ss += '{:<15s} : {:.3f}\n'.format('Significance', d['significance'])
ss += '{:<15s} : {:.3f}\n'.format('Livetime', d['livetime'])
spec_type = d['spec_type']
ss += '\n{:<15s} : {}\n'.format('Spectrum type', spec_type)
# Spectral model parameters
if spec_type == 'pl':
ss += '{:<15s} : {:.3} +- {:.3} (stat) +- {:.3} (sys) {}\n'.format(
'norm', d['spec_pl_norm'].value, d['spec_pl_norm_err'].value,
d['spec_pl_norm_err_sys'].value, 'cm-2 s-1 TeV-1')
ss += '{:<15s} : {:.3} +- {:.3} (stat) +- {:.3} (sys)\n'.format(
'index', d['spec_pl_index'], d['spec_pl_index_err'],
d['spec_pl_index_err_sys'])
ss += '{:<15s} : {:.3}\n'.format('reference', d['spec_pl_e_ref'])
elif spec_type == 'pl2':
ss += '{:<15s} : {:.3} +- {:.3} (stat) +- {:.3} (sys) {}\n'.format(
'flux', d['spec_pl2_flux'].value, d['spec_pl2_flux_err'].value,
d['spec_pl2_flux_err_sys'].value, 'cm-2 s-1')
ss += '{:<15s} : {:.3} +- {:.3} (stat)\n'.format(
'index', d['spec_pl2_index'], d['spec_pl2_index_err'],
d['spec_pl2_index_err_sys'])
ss += '{:<15s} : {:.3}\n'.format('e_min', d['spec_pl2_e_min'])
ss += '{:<15s} : {:.3}\n'.format('e_max', d['spec_pl2_e_max'])
elif spec_type == 'ecpl':
ss += '{:<15s} : {:.3g} +- {:.3g} (stat) +- {:.03g} (sys) {}\n'.format(
'norm', d['spec_ecpl_norm'].value, d['spec_ecpl_norm_err'].value,
d['spec_ecpl_norm_err_sys'].value, 'cm-2 s-1 TeV-1')
ss += '{:<15s} : {:.3} +- {:.3} (stat) +- {:.3} (sys)\n'.format(
'index', d['spec_ecpl_index'], d['spec_ecpl_index_err'],
d['spec_ecpl_index_err_sys'])
ss += '{:<15s} : {:.3} +- {:.3} (stat) +- {:.3} (stat) {}\n'.format(
'e_cut', d['spec_ecpl_e_cut'].value, d['spec_ecpl_e_cut_err'].value,
d['spec_ecpl_e_cut_err_sys'].value, 'TeV')
ss += '{:<15s} : {:.3}\n'.format('reference', d['spec_ecpl_e_ref'])
else:
# raise ValueError('Spectral model printout not implemented: {}'.format(spec))
ss += '\nSpectral model printout not yet implemented.\n'
ss += '\n{:<20s} : ({:.3}, {:.3}) TeV\n'.format(
'Energy range', d['spec_erange_min'].value, d['spec_erange_max'].value)
ss += '{:<20s} : {:.3}\n'.format('theta', d['spec_theta'])
ss += '\n\nDerived fluxes:\n'
ss += '{:<30s} : {:.3} +- {:.3} (stat) {}\n'.format(
'Spectral model norm (1 TeV)', d['spec_dnde_1TeV'].value,
d['spec_dnde_1TeV_err'].value, 'cm-2 s-1 TeV-1')
ss += '{:<30s} : {:.3} +- {:.3} (stat) {}\n'.format(
'Integrated flux (>1 TeV)', d['spec_flux_1TeV'].value,
d['spec_flux_1TeV_err'].value, 'cm-2 s-1')
ss += '{:<30s} : {:.3f} +- {:.3f} {}\n'.format(
'Integrated flux (>1 TeV)', d['spec_flux_1TeV_crab'],
d['spec_flux_1TeV_crab_err'], '(% Crab)')
ss += '{:<30s} : {:.3} +- {:.3} (stat) {}\n'.format(
'Integrated flux (1-10 TeV)', d['spec_eflux_1TeV_10TeV'].value,
d['spec_eflux_1TeV_10TeV_err'].value, 'erg cm-2 s-1')
return ss
def _info_spectral_points(self):
"""Print spectral points info."""
d = self.data
ss = '\n*** Spectral points ***\n\n'
ss += '{:<25s} : {}\n'.format('SED reference id', d['sed_reference_id'])
ss += '{:<25s} : {}\n'.format('Number of spectral points', d['sed_n_points'])
ss += '{:<25s} : {}\n\n'.format('Number of upper limits', d['sed_n_ul'])
try:
lines = self._flux_points_table_formatted.pformat(max_width=-1, max_lines=-1)
ss += '\n'.join(lines)
except NoDataAvailableError:
ss += '\nNo spectral points available for this source.'
return ss + '\n'
@property
def spectral_model(self):
"""Source spectral model (`~gammapy.spectrum.models.SpectralModel`).
Parameter errors are statistical errors only.
"""
data = self.data
spec_type = data['spec_type']
pars, errs = {}, {}
if spec_type == 'pl':
model_class = PowerLaw
pars['amplitude'] = data['spec_pl_norm']
errs['amplitude'] = data['spec_pl_norm_err']
pars['index'] = data['spec_pl_index'] * u.Unit('')
errs['index'] = data['spec_pl_index_err'] * u.Unit('')
pars['reference'] = data['spec_pl_e_ref']
elif spec_type == 'pl2':
model_class = PowerLaw2
pars['amplitude'] = data['spec_pl2_flux']
errs['amplitude'] = data['spec_pl2_flux_err']
pars['index'] = data['spec_pl2_index'] * u.Unit('')
errs['index'] = data['spec_pl2_index_err'] * u.Unit('')
pars['emin'] = data['spec_pl2_e_min']
e_max = data['spec_pl2_e_max']
DEFAULT_E_MAX = u.Quantity(1e5, 'TeV')
if np.isnan(e_max.value):
e_max = DEFAULT_E_MAX
pars['emax'] = e_max
elif spec_type == 'ecpl':
model_class = ExponentialCutoffPowerLaw
pars['amplitude'] = data['spec_ecpl_norm']
errs['amplitude'] = data['spec_ecpl_norm_err']
pars['index'] = data['spec_ecpl_index'] * u.Unit('')
errs['index'] = data['spec_ecpl_index_err'] * u.Unit('')
pars['lambda_'] = 1. / data['spec_ecpl_e_cut']
errs['lambda_'] = data['spec_ecpl_e_cut_err'] / data['spec_ecpl_e_cut'] ** 2
pars['reference'] = data['spec_ecpl_e_ref']
else:
raise ValueError('Invalid spec_type: {}'.format(spec_type))
model = model_class(**pars)
model.parameters.set_parameter_errors(errs)
return model
[docs] def spatial_model(self, emin=1 * u.TeV, emax=10 * u.TeV):
"""Source spatial model."""
d = self.data
flux = self.spectral_model.integral(emin, emax)
morph_type = d['morph_type']
pars = {}
glon = Angle(d['glon']).wrap_at('180d')
glat = Angle(d['glat']).wrap_at('180d')
if morph_type == 'gauss':
pars['x_mean'] = glon.value
pars['y_mean'] = glat.value
pars['x_stddev'] = d['morph_sigma'].value
pars['y_stddev'] = d['morph_sigma'].value
if not np.isnan(d['morph_sigma2']):
pars['y_stddev'] = d['morph_sigma2'].value
if not np.isnan(d['morph_pa']):
# TODO: handle reference frame for rotation angle
pars['theta'] = Angle(d['morph_pa'], 'deg').rad
ampl = flux.to('cm-2 s-1').value
pars['amplitude'] = ampl * 1 / (2 * np.pi * pars['x_stddev'] * pars['y_stddev'])
return Gaussian2D(**pars)
elif morph_type == 'shell':
pars['amplitude'] = flux.to('cm-2 s-1').value
pars['x_0'] = glon.value
pars['y_0'] = glat.value
pars['r_in'] = d['morph_sigma'].value * 0.8
pars['width'] = 0.2 * d['morph_sigma'].value
return Shell2D(**pars)
elif morph_type == 'point':
pars['amplitude'] = flux.to('cm-2 s-1').value
pars['x_0'] = glon.value
pars['y_0'] = glat.value
return Delta2D(**pars)
elif morph_type == 'none':
raise NoDataAvailableError('No spatial model available: {}'.format(self.name))
else:
raise NotImplementedError('Unknown spatial model: {!r}'.format(morph_type))
def _add_source_meta(self, table):
"""Copy over some info to table.meta"""
d = self.data
m = table.meta
m['origin'] = 'Data from gamma-cat'
m['source_id'] = d['source_id']
m['common_name'] = d['common_name']
m['reference_id'] = d['reference_id']
@property
def _flux_points_table_formatted(self):
"""Returns formatted version of self.flux_points.table"""
table = self.flux_points.table.copy()
table['e_ref'].format = '.3f'
flux_cols = ['dnde', 'dnde_errn', 'dnde_errp', 'dnde_err']
for _ in set(table.colnames) & set(flux_cols):
table[_].format = '.3e'
return table
@property
def flux_points(self):
"""Differential flux points (`~gammapy.spectrum.FluxPoints`)."""
d = self.data
table = Table()
table.meta['SED_TYPE'] = 'dnde'
self._add_source_meta(table)
valid = np.isfinite(d['sed_e_ref'].value)
if valid.sum() == 0:
raise NoDataAvailableError('No flux points available: {}'.format(self.name))
table['e_ref'] = d['sed_e_ref']
table['e_min'] = d['sed_e_min']
table['e_max'] = d['sed_e_max']
table['dnde'] = d['sed_dnde']
table['dnde_err'] = d['sed_dnde_err']
table['dnde_errn'] = d['sed_dnde_errn']
table['dnde_errp'] = d['sed_dnde_errp']
table['dnde_ul'] = d['sed_dnde_ul']
# Only keep rows that actually contain information
table = table[valid]
# Only keep columns that actually contain information
def _del_nan_col(table, colname):
if np.isfinite(table[colname]).sum() == 0:
del table[colname]
for colname in table.colnames:
_del_nan_col(table, colname)
return FluxPoints(table)
@property
def is_pointlike(self):
"""
Source is pointlike.
"""
return self.data['morph_type'] == 'point'
[docs]class SourceCatalogGammaCat(SourceCatalog):
"""Gammacat open TeV source catalog.
See: https://github.com/gammapy/gamma-cat
One source is represented by `~gammapy.catalog.SourceCatalogObjectGammaCat`.
Parameters
----------
filename : str
Path to the gamma-cat fits file.
Examples
--------
Load the catalog data:
>>> from gammapy.catalog import SourceCatalogGammaCat
>>> cat = SourceCatalogGammaCat()
Access a source by name:
>>> source = cat['Vela Junior']
Access source spectral data and plot it:
>>> source.spectral_model.plot()
>>> source.spectral_model.plot_error()
>>> source.flux_points.plot()
"""
name = 'gamma-cat'
description = 'An open catalog of gamma-ray sources'
source_object_class = SourceCatalogObjectGammaCat
def __init__(self, filename='$GAMMA_CAT/output/gammacat.fits.gz'):
filename = str(make_path(filename))
table = Table.read(filename, hdu=1)
self.filename = filename
source_name_key = 'common_name'
source_name_alias = ('other_names', 'gamma_names')
super(SourceCatalogGammaCat, self).__init__(
table=table,
source_name_key=source_name_key,
source_name_alias=source_name_alias,
)
[docs] def to_source_library(self):
"""Convert to a `~gammapy.utils.modeling.SourceLibrary`.
TODO: add an option whether to skip or raise on missing models or data.
"""
source_list = []
for source_idx in range(len(self.table)):
source = self[source_idx]
try:
source_model = SourceModel.from_gammacat(source)
except NoDataAvailableError:
log.warning('Skipping source {} (missing data in gamma-cat)'.format(source.name))
continue
except UnknownModelError:
log.warning('Skipping source {} (model not defined in gammapy)'.format(source.name))
continue
source_list.append(source_model)
return SourceLibrary(source_list=source_list)
[docs]class GammaCatDataCollection(object):
"""Data store for gamma-cat.
Gives access to all data from https://github.com/gammapy/gamma-cat .
Holds a `GammaCatResourceIndex` to locate resources,
but also more info about gamma-cat, as well as methods to create
Gammapy objects (spectral models, flux points, lightcurves) from the datasets.
"""
def __init__(self, data_index):
self.data_index = data_index
[docs] @classmethod
def from_index_file(cls, filename='$GAMMA_CAT/docs/data/gammacat-datasets.json'):
"""Create from index file."""
filename = str(make_path(filename))
# TODO: make a list of `GammaCatResource`, as well as a dict by ``resource_id`` for lookup!
data_index = load_json(filename)
return cls(data_index=data_index)
[docs] def info(self):
"""Print some info."""
ss = 'version = {}'.format(self.data_index['info']['version'])
return ss
[docs]@functools.total_ordering
class GammaCatResource(object):
"""Reference for a single resource in gamma-cat.
This can be considered an implementation detail,
used to assign ``global_id`` and to load resources.
TODO: explain how ``global_id``, ``type`` and ``location`` work.
Uses the Python ``hash`` function on the tuple ``(source_id, reference_id, file_id)``
Parameters
----------
source_id : int
Gamma-cat source ID
reference_id : str
Gamma-cat reference ID (usually the ADS paper bibcode)
file_id : int
File ID (a counter for cases with multiple measurements per reference / source)
(use integer -1 if missing)
type : str
Resource type (use string 'none' if missing)
location : str
Resource location (use string 'none' if missing)
Examples
--------
>>> from gammapy.catalog.gammacat import GammaCatResource
>>> resource = GammaCatResource(source_id=42, reference_id='2010A&A...516A..62A', file_id=2)
>>> resource
GammaCatResource(source_id=42, reference_id='2010A&A...516A..62A', file_id=2, type='none', location='none')
"""
_NA_FILL = dict(file_id=-1, type='none', location='none')
def __init__(self, source_id, reference_id, file_id=-1, type='none', location='none'):
self.source_id = int(source_id)
self.reference_id = six.text_type(reference_id)
self.file_id = int(file_id)
self.type = six.text_type(type)
self.location = six.text_type(location)
@property
def global_id(self):
"""Globally unique (within gamma-cat) resource ID (str).
(see class docstring for explanation and example).
"""
return '|'.join((str(self.source_id), self.reference_id, str(self.file_id), self.type))
def __repr__(self):
fmt = '{}(source_id={!r}, reference_id={!r}, file_id={!r}, type={!r}, location={!r})'
return fmt.format(self.__class__.__name__, self.source_id, str(self.reference_id),
self.file_id, str(self.type), str(self.location))
def __eq__(self, other):
return self.to_namedtuple() == other.to_namedtuple()
def __lt__(self, other):
return self.to_namedtuple() < other.to_namedtuple()
[docs] def to_namedtuple(self):
"""Convert to `collections.namedtuple`."""
d = self.to_dict()
return namedtuple('GammaCatResourceNamedTuple', d.keys())(**d)
[docs] def to_dict(self):
"""Convert to `collections.OrderedDict`."""
data = OrderedDict()
data['source_id'] = self.source_id
data['reference_id'] = self.reference_id
data['file_id'] = self.file_id
data['type'] = self.type
data['location'] = self.location
return data
[docs] @classmethod
def from_dict(cls, data):
"""Create from dict."""
return cls(
source_id=data['source_id'],
reference_id=data['reference_id'],
file_id=data.get('file_id', cls._NA_FILL['file_id']),
type=data.get('type', cls._NA_FILL['type']),
location=data.get('location', cls._NA_FILL['location'])
)
[docs]class GammaCatResourceIndex(object):
"""Resource index for gamma-cat.
Parameters
----------
resources : list
List of `GammaCatResource` objects
"""
def __init__(self, resources):
self.resources = resources
def __repr__(self):
return '{}(n_resources={})'.format(self.__class__.__name__, len(self.resources))
def __eq__(self, other):
if len(self.resources) != len(other.resources):
return False
return all(a == b for (a, b) in zip(self.resources, other.resources))
@property
def unique_source_ids(self):
"""Sorted list of unique source IDs (`list(int)`)."""
return sorted(set([resource.source_id for resource in self.resources]))
@property
def unique_reference_ids(self):
"""Sorted list of unique source IDs (`list(str)`)."""
return sorted(set([resource.reference_id for resource in self.resources]))
@property
def global_ids(self):
"""List of global resource IDs (`list(str)`).
In original order, not sorted.
"""
return [resource.global_id for resource in self.resources]
[docs] def sort(self):
"""Return a sorted copy (leave self unchanged)."""
return self.__class__(sorted(self.resources))
[docs] def to_list(self):
"""Convert to list of dict."""
return [resource.to_dict() for resource in self.resources]
[docs] @classmethod
def from_list(cls, data):
"""Create from list of dicts."""
return cls([GammaCatResource.from_dict(_) for _ in data])
[docs] def to_table(self):
"""Convert to `~astropy.table.Table`."""
rows = self.to_list()
return Table(rows=rows, names=list(rows[0].keys()))
[docs] @classmethod
def from_table(cls, table):
"""Create from `~astropy.table.Table`."""
resources = []
for row in table:
data = OrderedDict((k, row[k]) for k in table.colnames)
resources.append(GammaCatResource.from_dict(data))
return cls(resources=resources)
[docs] def to_pandas(self):
"""Convert to `pandas.DataFrame`."""
# This is inefficient. Could implement direct conversion if needed.
table = self.to_table()
return table.to_pandas()
[docs] @classmethod
def from_pandas(cls, dataframe):
"""Create from `pandas.DataFrame`."""
table = Table.from_pandas(dataframe)
return cls.from_table(table)
[docs] def query(self, *args, **kwargs):
"""Query to select subset of resources.
Calls `pandas.DataFrame.query` and passes arguments to that method.
Examples
--------
>>> resource_index = GammaCatResourceIndex(...)
>>> resource_index2 = resource_index.query('type == "sed" and source_id == 42')
"""
df = self.to_pandas()
df2 = df.query(*args, **kwargs)
return self.from_pandas(df2)