Source code for gammapy.catalog.hess

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""HESS Galactic plane survey (HGPS) catalog.

(Not released yet.)

TODO:
- [ ] Load HGPS maps
- [ ] Source object should contain info on components and associations
- [ ] Links to SNRCat
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import os
from collections import OrderedDict
import numpy as np
import astropy.units as u
from astropy.table import Table
from astropy.coordinates import Angle
from astropy.modeling.models import Gaussian2D
from ..extern.pathlib import Path
from ..utils.scripts import make_path
from ..spectrum import FluxPoints
from ..spectrum.models import PowerLaw, ExponentialCutoffPowerLaw
from ..image.models import Delta2D, Shell2D
from ..background.models import GaussianBand2D
from .core import SourceCatalog, SourceCatalogObject

__all__ = [
    'SourceCatalogHGPS',
    'SourceCatalogObjectHGPS',
]

# Flux factor, used for printing
FF = 1e-12

# Multiplicative factor to go from cm^-2 s^-1 to % Crab for integral flux > 1 TeV
# Here we use the same Crab reference that's used in the HGPS paper
# CRAB = crab_integral_flux(energy_min=1, reference='hess_ecpl')
FLUX_TO_CRAB = 100 / 2.26e-11
FLUX_TO_CRAB_DIFF = 100 / 3.5060459323111307e-11


class HGPSGaussComponent(object):
    """One Gaussian component from the HGPS catalog.
    """
    _source_name_key = 'Source_Name'
    _source_index_key = 'catalog_row_index'

    def __init__(self, data):
        self.data = OrderedDict(data)

    def __str__(self):
        """Pretty-print source data"""
        d = self.data
        ss = 'Component {}:\n'.format(d['Component_ID'])
        fmt = '{:<20s} : {:8.3f} +/- {:.3f} deg\n'
        ss += fmt.format('GLON', d['GLON'].value, d['GLON_Err'].value)
        ss += fmt.format('GLAT', d['GLAT'].value, d['GLAT_Err'].value)
        fmt = '{:<20s} : {:.3f} +/- {:.3f} deg\n'
        ss += fmt.format('Size', d['Size'].value, d['Size_Err'].value)
        val, err = d['Flux_Map'].value, d['Flux_Map_Err'].value
        fmt = '{:<20s} : ({:.2f} +/- {:.2f}) x 10^-12 cm^-2 s^-1 = ({:.1f} +/- {:.1f}) % Crab'
        ss += fmt.format('Flux (>1 TeV)', val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB)
        return ss

    @property
    def name(self):
        """Source name"""
        name = self.data[self._source_name_key]
        return name.strip()

    @property
    def index(self):
        """Row index of source in catalog"""
        return self.data[self._source_index_key]

    @property
    def spatial_model(self):
        """
        Component spatial model.
        """
        d = self.data
        # At the moment spatial models are normalised by deg^2
        amplitude = d['Flux_Map'].value / (2 * np.pi * d['Size'].value ** 2)
        glon = Angle(d['GLON']).wrap_at('180d')
        glat = Angle(d['GLAT']).wrap_at('180d')

        return Gaussian2D(
            x_mean=glon.value,
            y_mean=glat.value,
            x_stddev=d['Size'].value,
            y_stddev=d['Size'].value,
            amplitude=amplitude,
        )


[docs]class SourceCatalogObjectHGPS(SourceCatalogObject): """One object from the HGPS catalog.""" def __str__(self): return self.info()
[docs] def info(self, info='all'): """Info string. Parameters ---------- info : {'all', 'basic', 'map', 'spec', 'flux_points', 'components', 'associations'} Comma separated list of options """ if info == 'all': info = 'basic,associations,map,spec,flux_points,components' ss = '' ops = info.split(',') if 'basic' in ops: ss += self._info_basic() if 'map' in ops: ss += self._info_map() if 'spec' in ops: ss += self._info_spec() if 'flux_points' in ops: ss += self._info_flux_points() if 'components' in ops: ss += self._info_components() if 'associations' in ops: ss += self._info_associations() return ss
def _info_basic(self): """Print basic info.""" d = self.data ss = '\n*** Basic info ***\n\n' ss += 'Catalog row index (zero-based) : {}\n'.format(d['catalog_row_index']) ss += '{:<20s} : {}\n'.format('Source name', d['Source_Name']) ss += '{:<20s} : {}\n'.format('Analysis reference', d['Analysis_Reference']) ss += '{:<20s} : {}\n'.format('Source class', d['Source_Class']) ss += '{:<20s} : {}\n'.format('Identified object', d['Identified_Object']) ss += '{:<20s} : {}\n'.format('Gamma-Cat id', d['Gamma_Cat_Source_ID']) ss += '\n' return ss def _info_associations(self): ss = '\n*** Source associations info ***\n\n' associations = ', '.join(self.associations) ss += 'List of associated objects: {}\n'.format(associations) return ss def _info_map(self): """Print info from map analysis.""" d = self.data ss = '\n*** Info from map analysis ***\n\n' ra_str = Angle(d['RAJ2000'], 'deg').to_string(unit='hour', precision=0) dec_str = Angle(d['DEJ2000'], 'deg').to_string(unit='deg', precision=0) ss += '{:<20s} : {:8.3f} = {}\n'.format('RA', d['RAJ2000'], ra_str) ss += '{:<20s} : {:8.3f} = {}\n'.format('DEC', d['DEJ2000'], dec_str) ss += '{:<20s} : {:8.3f} +/- {:.3f} deg\n'.format('GLON', d['GLON'].value, d['GLON_Err'].value) ss += '{:<20s} : {:8.3f} +/- {:.3f} deg\n'.format('GLAT', d['GLAT'].value, d['GLAT_Err'].value) ss += '{:<20s} : {:.3f}\n'.format('Position Error (68%)', d['Pos_Err_68']) ss += '{:<20s} : {:.3f}\n'.format('Position Error (95%)', d['Pos_Err_95']) ss += '{:<20s} : {:.0f}\n'.format('ROI number', d['ROI_Number']) ss += '{:<20s} : {}\n'.format('Spatial model', d['Spatial_Model']) ss += '{:<20s} : {}\n'.format('Spatial components', d['Components']) ss += '{:<20s} : {:.1f}\n'.format('TS', d['Sqrt_TS'] ** 2) ss += '{:<20s} : {:.1f}\n'.format('sqrt(TS)', d['Sqrt_TS']) ss += '{:<20s} : {:.3f} +/- {:.3f} (UL: {:.3f}) deg\n'.format( 'Size', d['Size'].value, d['Size_Err'].value, d['Size_UL'].value) ss += '{:<20s} : {:.3f}\n'.format('R70', d['R70']) ss += '{:<20s} : {:.3f}\n'.format('RSpec', d['RSpec']) ss += '{:<20s} : {:.1f}\n'.format('Total model excess', d['Excess_Model_Total']) ss += '{:<20s} : {:.1f}\n'.format('Excess in RSpec', d['Excess_RSpec']) ss += '{:<20s} : {:.1f}\n'.format('Model Excess in RSpec', d['Excess_RSpec_Model']) ss += '{:<20s} : {:.1f}\n'.format('Background in RSpec', d['Background_RSpec']) ss += '{:<20s} : {:.1f} hours\n'.format('Livetime', d['Livetime'].value) ss += '{:<20s} : {:.2f}\n'.format('Energy threshold', d['Energy_Threshold']) val, err = d['Flux_Map'].value, d['Flux_Map_Err'].value ss += '{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n'.format( 'Source flux (>1 TeV)', val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB) ss += '\nFluxes in RSpec (> 1 TeV):\n' ss += '{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n'.format( 'Map measurement', d['Flux_Map_RSpec_Data'].value / FF, d['Flux_Map_RSpec_Data'].value * FLUX_TO_CRAB, ) ss += '{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n'.format( 'Source model', d['Flux_Map_RSpec_Source'].value / FF, d['Flux_Map_RSpec_Source'].value * FLUX_TO_CRAB, ) ss += '{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n'.format( 'Other component model', d['Flux_Map_RSpec_Other'].value / FF, d['Flux_Map_RSpec_Other'].value * FLUX_TO_CRAB, ) ss += '{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n'.format( 'Large scale component model', d['Flux_Map_RSpec_LS'].value / FF, d['Flux_Map_RSpec_LS'].value * FLUX_TO_CRAB, ) ss += '{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n'.format( 'Total model', d['Flux_Map_RSpec_Total'].value / FF, d['Flux_Map_RSpec_Total'].value * FLUX_TO_CRAB, ) ss += '{:<35s} : {:5.1f} %\n'.format('Containment in RSpec', 100 * d['Containment_RSpec']) ss += '{:<35s} : {:5.1f} %\n'.format('Contamination in RSpec', 100 * d['Contamination_RSpec']) label, val = 'Flux correction (RSpec -> Total)', 100 * d['Flux_Correction_RSpec_To_Total'] ss += '{:<35s} : {:5.1f} %\n'.format(label, val) label, val = 'Flux correction (Total -> RSpec)', 100 * (1 / d['Flux_Correction_RSpec_To_Total']) ss += '{:<35s} : {:5.1f} %\n'.format(label, val) return ss def _info_spec(self): """Print info from spectral analysis.""" d = self.data ss = '\n*** Info from spectral analysis ***\n\n' ss += '{:<20s} : {:.1f} hours\n'.format('Livetime', d['Livetime_Spec'].value) lo = d['Energy_Range_Spec_Min'].value hi = d['Energy_Range_Spec_Max'].value ss += '{:<20s} : {:.2f} to {:.2f} TeV\n'.format('Energy range:', lo, hi) ss += '{:<20s} : {:.1f}\n'.format('Background', d['Background_Spec']) ss += '{:<20s} : {:.1f}\n'.format('Excess', d['Excess_Spec']) ss += '{:<20s} : {}\n'.format('Spectral model', d['Spectral_Model']) val = d['TS_ECPL_over_PL'] ss += '{:<20s} : {:.1f}\n'.format('TS ECPL over PL', val) val = d['Flux_Spec_Int_1TeV'].value err = d['Flux_Spec_Int_1TeV_Err'].value ss += '{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n'.format( 'Best-fit model flux(> 1 TeV)', val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB) val = d['Flux_Spec_Energy_1_10_TeV'].value err = d['Flux_Spec_Energy_1_10_TeV_Err'].value ss += '{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 erg cm^-2 s^-1\n'.format( 'Best-fit model energy flux(1 to 10 TeV)', val / FF, err / FF) # TODO: can we just use the Gammapy model classes here instead of duplicating the code? ss += self._info_spec_pl() ss += self._info_spec_ecpl() return ss def _info_spec_pl(self): d = self.data ss = '{:<20s} : {:.2f}\n'.format('Pivot energy', d['Energy_Spec_PL_Pivot']) val = d['Flux_Spec_PL_Diff_Pivot'].value err = d['Flux_Spec_PL_Diff_Pivot_Err'].value ss += '{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n'.format( 'Flux at pivot energy', val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB_DIFF) val = d['Flux_Spec_PL_Int_1TeV'].value err = d['Flux_Spec_PL_Int_1TeV_Err'].value ss += '{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n'.format( 'PL Flux(> 1 TeV)', val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB) val = d['Flux_Spec_PL_Diff_1TeV'].value err = d['Flux_Spec_PL_Diff_1TeV_Err'].value ss += '{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n'.format( 'PL Flux(@ 1 TeV)', val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB_DIFF) val = d['Index_Spec_PL'] err = d['Index_Spec_PL_Err'] ss += '{:<20s} : {:.2f} +/- {:.2f}\n'.format('PL Index', val, err) return ss def _info_spec_ecpl(self): d = self.data ss = '' # ss = '{:<20s} : {:.1f}\n'.format('Pivot energy', d['Energy_Spec_ECPL_Pivot']) val = d['Flux_Spec_ECPL_Diff_1TeV'].value err = d['Flux_Spec_ECPL_Diff_1TeV_Err'].value ss += '{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n'.format( 'ECPL Flux(@ 1 TeV)', val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB_DIFF) val = d['Flux_Spec_ECPL_Int_1TeV'].value err = d['Flux_Spec_ECPL_Int_1TeV_Err'].value ss += '{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n'.format( 'ECPL Flux(> 1 TeV)', val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB) val = d['Index_Spec_ECPL'] err = d['Index_Spec_ECPL_Err'] ss += '{:<20s} : {:.2f} +/- {:.2f}\n'.format('ECPL Index', val, err) val = d['Lambda_Spec_ECPL'].value err = d['Lambda_Spec_ECPL_Err'].value ss += '{:<20s} : {:.3f} +/- {:.3f} TeV^-1\n'.format('ECPL Lambda', val, err) # Use Gaussian analytical error propagation, # tested against the uncertainties package err = err / val ** 2 val = 1. / val ss += '{:<20s} : {:.2f} +/- {:.2f} TeV\n'.format('ECPL E_cut', val, err) return ss def _info_flux_points(self): """Print flux point results""" d = self.data ss = '\n*** Flux points info ***\n\n' ss += 'Number of flux points: {}\n'.format(d['N_Flux_Points']) ss += 'Flux points table: \n\n\t' flux_points = self.flux_points.table.copy() energy_cols = ['e_ref', 'e_min', 'e_max'] flux_cols = ['dnde', 'dnde_errn', 'dnde_errp', 'dnde_ul'] cols = energy_cols + flux_cols + ['is_ul'] flux_points = flux_points[cols] for _ in energy_cols: flux_points[_].format = '.3f' for _ in flux_cols: flux_points[_].format = '.3e' # convert table to string lines = flux_points.pformat(max_width=-1, max_lines=-1) ss += '\n'.join(lines) return ss + '\n' def _info_components(self): """Print info about the components.""" if not hasattr(self, 'components'): return '' ss = '\n*** Gaussian component info ***\n\n' ss += 'Number of components: {}\n'.format(len(self.components)) ss += '{:<20s} : {}\n\n'.format('Spatial components', self.data['Components']) for component in self.components: ss += str(component) ss += '\n\n' return ss @property def energy_range(self): """Spectral model energy range (`~astropy.units.Quantity` with length 2).""" e = u.Quantity([ self.data['Energy_Range_Spec_Min'], self.data['Energy_Range_Spec_Max'], ]) # Some EXTERN sources have no energy range information. # In those cases, we put a default use_default = np.isnan(e) e_default = [0.2, 50] * u.TeV e[use_default] = e_default[use_default] return e @property def spectral_model(self): """Spectral model (`~gammapy.spectrum.models.SpectralModel`). One of the following models: - ``Spectral_Model="PL"`` : `~gammapy.spectrum.models.PowerLaw` - ``Spectral_Model="ECPL"`` : `~gammapy.spectrum.models.ExponentialCutoffPowerLaw` """ data = self.data spec_type = data['Spectral_Model'].strip() return self._spectral_model(spec_type) def _spectral_model(self, spec_type): """ Create spectral model from source parameters. Parameters ---------- spec_type : str Spectral model type either 'PL' or 'ECPL'. Returns ------- model : `~gammapy.spectrum.models.SpectralModel` Spectral model instance. """ data = self.data pars, errs = {}, {} if spec_type == 'PL': pars['index'] = data['Index_Spec_PL'] pars['amplitude'] = data['Flux_Spec_PL_Diff_Pivot'] pars['reference'] = data['Energy_Spec_PL_Pivot'] errs['amplitude'] = data['Flux_Spec_PL_Diff_Pivot_Err'] errs['index'] = data['Index_Spec_PL_Err'] * u.dimensionless_unscaled model = PowerLaw(**pars) elif spec_type == 'ECPL': pars['index'] = data['Index_Spec_ECPL'] pars['amplitude'] = data['Flux_Spec_ECPL_Diff_Pivot'] pars['reference'] = data['Energy_Spec_ECPL_Pivot'] pars['lambda_'] = data['Lambda_Spec_ECPL'] errs['index'] = data['Index_Spec_ECPL_Err'] * u.dimensionless_unscaled errs['amplitude'] = data['Flux_Spec_ECPL_Diff_Pivot_Err'] errs['lambda_'] = data['Lambda_Spec_ECPL_Err'] model = ExponentialCutoffPowerLaw(**pars) else: raise ValueError('Invalid spectral model: {}'.format(spec_type)) model.parameters.set_parameter_errors(errs) return model
[docs] def spatial_model(self, emin=1 * u.TeV, emax=1E5 * u.TeV): """ Source spatial model. """ d = self.data flux = self.spectral_model.integral(emin, emax) amplitude = flux.to('cm-2 s-1').value pars = {} glon = Angle(d['GLON']).wrap_at('180d') glat = Angle(d['GLAT']).wrap_at('180d') spatial_model = d['Spatial_Model'] if self.is_pointlike: pars['amplitude'] = amplitude pars['x_0'] = glon.value pars['y_0'] = glat.value return Delta2D(**pars) elif spatial_model == 'Shell': # HGPS contains no information on shell width # Here we assuma a 5% shell width for all shells. shell_width_fraction = 0.05 pars['x_0'] = glon.value pars['y_0'] = glat.value r_out = d['Size'].to('deg').value pars['width'] = r_out * shell_width_fraction pars['r_in'] = r_out * (1 - shell_width_fraction) pars['amplitude'] = amplitude return Shell2D(**pars) elif spatial_model in ['Gaussian', '2-Gaussian', '3-Gaussian']: amplitude_total = d['Flux_Map'].to('cm-2 s-1').value try: models = [component.spatial_model for component in self.components] except AttributeError: # there is one external source (HESS J1801-233) where there is no # component info available, so we create it here locally models = [HGPSGaussComponent(d).spatial_model] for model in models: # weight total flux according to relative amplitude model.amplitude = model.amplitude / amplitude_total * amplitude if spatial_model == 'Gaussian': return models[0] elif spatial_model == '2-Gaussian': model = models[0] + models[1] # use generic bounding box of 2 deg width and height model.bounding_box = ((glat.deg - 2, glat.deg + 2), (glon.deg - 2, glon.deg + 2)) return model elif spatial_model == '3-Gaussian': model = models[0] + models[1] + models[2] # use generic bounding box of 2 deg width and height model.bounding_box = ((glat.deg - 2, glat.deg + 2), (glon.deg - 2, glon.deg + 2)) return model else: raise ValueError('Not a valid spatial model{}'.format(spatial_model))
@property def is_pointlike(self): """Source is pointlike""" d = self.data has_size_ul = np.isfinite(d['Size_UL']) pointlike = d['Spatial_Model'] == 'Point-Like' return pointlike or has_size_ul @property def flux_points(self): """Flux points (`~gammapy.spectrum.FluxPoints`).""" table = Table() table.meta['SED_TYPE'] = 'dnde' mask = ~np.isnan(self.data['Flux_Points_Energy']) table['e_ref'] = self.data['Flux_Points_Energy'][mask] table['e_min'] = self.data['Flux_Points_Energy_Min'][mask] table['e_max'] = self.data['Flux_Points_Energy_Max'][mask] table['dnde'] = self.data['Flux_Points_Flux'][mask] table['dnde_errn'] = self.data['Flux_Points_Flux_Err_Lo'][mask] table['dnde_errp'] = self.data['Flux_Points_Flux_Err_Hi'][mask] table['dnde_ul'] = self.data['Flux_Points_Flux_UL'][mask] table['is_ul'] = self.data['Flux_Points_Flux_Is_UL'][mask].astype('bool') return FluxPoints(table)
[docs]class SourceCatalogHGPS(SourceCatalog): """HESS Galactic plane survey (HGPS) source catalog. Note: this catalog isn't publicly available yet. H.E.S.S. members have access and can use this. """ name = 'hgps' description = 'H.E.S.S. Galactic plane survey (HGPS) source catalog' source_object_class = SourceCatalogObjectHGPS def __init__(self, filename=None, hdu='HGPS_SOURCES'): if not filename: filename = Path(os.environ['HGPS_ANALYSIS']) / 'data/catalogs/HGPS3/release/HGPS_v0.4.fits' filename = str(make_path(filename)) table = Table.read(filename, hdu=hdu) source_name_alias = ('Identified_Object',) super(SourceCatalogHGPS, self).__init__( table=table, source_name_alias=source_name_alias, ) self.components = Table.read(filename, hdu='HGPS_GAUSS_COMPONENTS') self.associations = Table.read(filename, hdu='HGPS_ASSOCIATIONS') self.identifications = Table.read(filename, hdu='HGPS_IDENTIFICATIONS') self._large_scale_component = Table.read(filename, hdu='HGPS_LARGE_SCALE_COMPONENT') @property def large_scale_component(self): """Large sclae component model (`~gammapy.background.models.GaussianBand2D`). """ table = self._large_scale_component return GaussianBand2D(table, spline_kwargs=dict(k=3, s=0, ext=0)) def _make_source_object(self, index): """Make one source object. Parameters ---------- index : int Row index Returns ------- source : `SourceCatalogObject` Source object """ source = super(SourceCatalogHGPS, self)._make_source_object(index) if hasattr(self, 'components'): if source.data['Components'] != '': self._attach_component_info(source) if hasattr(self, 'associations'): self._attach_association_info(source) # TODO: implement or remove: # if source.data['Source_Class'] != 'Unid': # self._attach_identification_info(source) return source def _attach_component_info(self, source): source.components = [] lookup = SourceCatalog(self.components, source_name_key='Component_ID') for name in source.data['Components'].split(', '): component = HGPSGaussComponent(data=lookup[name].data) source.components.append(component) def _attach_association_info(self, source): source.associations = [] _ = source.data['Source_Name'] == self.associations['Source_Name'] source.associations = list(self.associations['Association_Name'][_])