# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""HESS Galactic plane survey (HGPS) catalog."""
from __future__ import absolute_import, division, print_function, unicode_literals
import os
from collections import OrderedDict
import numpy as np
import astropy.units as u
from astropy.table import Table
from astropy.coordinates import Angle
from astropy.modeling.models import Gaussian1D
from ..extern.pathlib import Path
from ..utils.scripts import make_path
from ..utils.table import table_row_to_dict
from ..utils.interpolation import ScaledRegularGridInterpolator
from ..spectrum import FluxPoints
from ..spectrum.models import PowerLaw, PowerLaw2, ExponentialCutoffPowerLaw
from ..image.models import SkyPointSource, SkyGaussian, SkyShell
from ..cube.models import SkyModel, SkyModels
from .core import SourceCatalog, SourceCatalogObject
__all__ = [
"SourceCatalogHGPS",
"SourceCatalogObjectHGPS",
"SourceCatalogObjectHGPSComponent",
"SourceCatalogLargeScaleHGPS",
]
# Flux factor, used for printing
FF = 1e-12
# Multiplicative factor to go from cm^-2 s^-1 to % Crab for integral flux > 1 TeV
# Here we use the same Crab reference that's used in the HGPS paper
# CRAB = crab_integral_flux(energy_min=1, reference='hess_ecpl')
FLUX_TO_CRAB = 100 / 2.26e-11
FLUX_TO_CRAB_DIFF = 100 / 3.5060459323111307e-11
[docs]class SourceCatalogObjectHGPSComponent(object):
"""One Gaussian component from the HGPS catalog.
See also
--------
SourceCatalogHGPS, SourceCatalogObjectHGPS
"""
_source_name_key = "Component_ID"
_source_index_key = "row_index"
def __init__(self, data):
self.data = OrderedDict(data)
def __repr__(self):
return "{}({!r})".format(self.__class__.__name__, self.name)
def __str__(self):
"""Pretty-print source data"""
d = self.data
ss = "Component {}:\n".format(d["Component_ID"])
fmt = "{:<20s} : {:8.3f} +/- {:.3f} deg\n"
ss += fmt.format("GLON", d["GLON"].value, d["GLON_Err"].value)
ss += fmt.format("GLAT", d["GLAT"].value, d["GLAT_Err"].value)
fmt = "{:<20s} : {:.3f} +/- {:.3f} deg\n"
ss += fmt.format("Size", d["Size"].value, d["Size_Err"].value)
val, err = d["Flux_Map"].value, d["Flux_Map_Err"].value
fmt = "{:<20s} : ({:.2f} +/- {:.2f}) x 10^-12 cm^-2 s^-1 = ({:.1f} +/- {:.1f}) % Crab"
ss += fmt.format(
"Flux (>1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB
)
return ss
@property
def name(self):
"""Source name (str)"""
name = self.data[self._source_name_key]
return name.strip()
@property
def index(self):
"""Row index of source in table (int)"""
return self.data[self._source_index_key]
@property
def spatial_model(self):
"""Component spatial model (`~gammapy.image.models.SkyGaussian`)."""
d = self.data
model = SkyGaussian(lon_0=d["GLON"], lat_0=d["GLAT"], sigma=d["Size"])
model.parameters.set_parameter_errors(
dict(lon_0=d["GLON_Err"], lat_0=d["GLAT_Err"], sigma=d["Size_Err"])
)
return model
@property
def spectral_model(self):
"""Component spectral model (`gammapy.spectrum.models.PowerLaw2`)."""
d = self.data
model = PowerLaw2(
amplitude=d["Flux_Map"], index=2.3, emin="1 TeV", emax="1e5 TeV"
)
model.parameters.set_parameter_errors(dict(amplitude=d["Flux_Map_Err"]))
return model
@property
def sky_model(self):
"""Component sky model (`gammapy.cube.models.SkyModel`)."""
return SkyModel(self.spatial_model, self.spectral_model)
[docs]class SourceCatalogObjectHGPS(SourceCatalogObject):
"""One object from the HGPS catalog.
The catalog is represented by `SourceCatalogHGPS`.
Examples are given there.
"""
def __repr__(self):
return "{}({!r})".format(self.__class__.__name__, self.name)
def __str__(self):
return self.info()
[docs] def info(self, info="all"):
"""Info string.
Parameters
----------
info : {'all', 'basic', 'map', 'spec', 'flux_points', 'components', 'associations', 'id'}
Comma separated list of options
"""
if info == "all":
info = "basic,associations,id,map,spec,flux_points,components"
ss = ""
ops = info.split(",")
if "basic" in ops:
ss += self._info_basic()
if "map" in ops:
ss += self._info_map()
if "spec" in ops:
ss += self._info_spec()
if "flux_points" in ops:
ss += self._info_flux_points()
if "components" in ops and hasattr(self, "components"):
ss += self._info_components()
if "associations" in ops:
ss += self._info_associations()
if "id" in ops and hasattr(self, "identification_info"):
ss += self._info_id()
return ss
def _info_basic(self):
"""Print basic info."""
d = self.data
ss = "\n*** Basic info ***\n\n"
ss += "Catalog row index (zero-based) : {}\n".format(d["catalog_row_index"])
ss += "{:<20s} : {}\n".format("Source name", d["Source_Name"])
ss += "{:<20s} : {}\n".format("Analysis reference", d["Analysis_Reference"])
ss += "{:<20s} : {}\n".format("Source class", d["Source_Class"])
ss += "{:<20s} : {}\n".format("Identified object", d["Identified_Object"])
ss += "{:<20s} : {}\n".format("Gamma-Cat id", d["Gamma_Cat_Source_ID"])
ss += "\n"
return ss
def _info_associations(self):
ss = "\n*** Source associations info ***\n\n"
lines = self.associations.pformat(max_width=-1, max_lines=-1)
ss += "\n".join(lines)
return ss + "\n"
def _info_id(self):
ss = "\n*** Source identification info ***\n\n"
ss += "\n".join(
"{}: {}".format(k, v) for k, v in self.identification_info.items()
)
return ss + "\n"
def _info_map(self):
"""Print info from map analysis."""
d = self.data
ss = "\n*** Info from map analysis ***\n\n"
ra_str = Angle(d["RAJ2000"], "deg").to_string(unit="hour", precision=0)
dec_str = Angle(d["DEJ2000"], "deg").to_string(unit="deg", precision=0)
ss += "{:<20s} : {:8.3f} = {}\n".format("RA", d["RAJ2000"], ra_str)
ss += "{:<20s} : {:8.3f} = {}\n".format("DEC", d["DEJ2000"], dec_str)
ss += "{:<20s} : {:8.3f} +/- {:.3f} deg\n".format(
"GLON", d["GLON"].value, d["GLON_Err"].value
)
ss += "{:<20s} : {:8.3f} +/- {:.3f} deg\n".format(
"GLAT", d["GLAT"].value, d["GLAT_Err"].value
)
ss += "{:<20s} : {:.3f}\n".format("Position Error (68%)", d["Pos_Err_68"])
ss += "{:<20s} : {:.3f}\n".format("Position Error (95%)", d["Pos_Err_95"])
ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI_Number"])
ss += "{:<20s} : {}\n".format("Spatial model", d["Spatial_Model"])
ss += "{:<20s} : {}\n".format("Spatial components", d["Components"])
ss += "{:<20s} : {:.1f}\n".format("TS", d["Sqrt_TS"] ** 2)
ss += "{:<20s} : {:.1f}\n".format("sqrt(TS)", d["Sqrt_TS"])
ss += "{:<20s} : {:.3f} +/- {:.3f} (UL: {:.3f}) deg\n".format(
"Size", d["Size"].value, d["Size_Err"].value, d["Size_UL"].value
)
ss += "{:<20s} : {:.3f}\n".format("R70", d["R70"])
ss += "{:<20s} : {:.3f}\n".format("RSpec", d["RSpec"])
ss += "{:<20s} : {:.1f}\n".format("Total model excess", d["Excess_Model_Total"])
ss += "{:<20s} : {:.1f}\n".format("Excess in RSpec", d["Excess_RSpec"])
ss += "{:<20s} : {:.1f}\n".format(
"Model Excess in RSpec", d["Excess_RSpec_Model"]
)
ss += "{:<20s} : {:.1f}\n".format("Background in RSpec", d["Background_RSpec"])
ss += "{:<20s} : {:.1f} hours\n".format("Livetime", d["Livetime"].value)
ss += "{:<20s} : {:.2f}\n".format("Energy threshold", d["Energy_Threshold"])
val, err = d["Flux_Map"].value, d["Flux_Map_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"Source flux (>1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB,
)
ss += "\nFluxes in RSpec (> 1 TeV):\n"
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Map measurement",
d["Flux_Map_RSpec_Data"].value / FF,
d["Flux_Map_RSpec_Data"].value * FLUX_TO_CRAB,
)
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Source model",
d["Flux_Map_RSpec_Source"].value / FF,
d["Flux_Map_RSpec_Source"].value * FLUX_TO_CRAB,
)
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Other component model",
d["Flux_Map_RSpec_Other"].value / FF,
d["Flux_Map_RSpec_Other"].value * FLUX_TO_CRAB,
)
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Large scale component model",
d["Flux_Map_RSpec_LS"].value / FF,
d["Flux_Map_RSpec_LS"].value * FLUX_TO_CRAB,
)
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Total model",
d["Flux_Map_RSpec_Total"].value / FF,
d["Flux_Map_RSpec_Total"].value * FLUX_TO_CRAB,
)
ss += "{:<35s} : {:5.1f} %\n".format(
"Containment in RSpec", 100 * d["Containment_RSpec"]
)
ss += "{:<35s} : {:5.1f} %\n".format(
"Contamination in RSpec", 100 * d["Contamination_RSpec"]
)
label, val = (
"Flux correction (RSpec -> Total)",
100 * d["Flux_Correction_RSpec_To_Total"],
)
ss += "{:<35s} : {:5.1f} %\n".format(label, val)
label, val = (
"Flux correction (Total -> RSpec)",
100 * (1 / d["Flux_Correction_RSpec_To_Total"]),
)
ss += "{:<35s} : {:5.1f} %\n".format(label, val)
return ss
def _info_spec(self):
"""Print info from spectral analysis."""
d = self.data
ss = "\n*** Info from spectral analysis ***\n\n"
ss += "{:<20s} : {:.1f} hours\n".format("Livetime", d["Livetime_Spec"].value)
lo = d["Energy_Range_Spec_Min"].value
hi = d["Energy_Range_Spec_Max"].value
ss += "{:<20s} : {:.2f} to {:.2f} TeV\n".format("Energy range:", lo, hi)
ss += "{:<20s} : {:.1f}\n".format("Background", d["Background_Spec"])
ss += "{:<20s} : {:.1f}\n".format("Excess", d["Excess_Spec"])
ss += "{:<20s} : {}\n".format("Spectral model", d["Spectral_Model"])
val = d["TS_ECPL_over_PL"]
ss += "{:<20s} : {:.1f}\n".format("TS ECPL over PL", val)
val = d["Flux_Spec_Int_1TeV"].value
err = d["Flux_Spec_Int_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"Best-fit model flux(> 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB,
)
val = d["Flux_Spec_Energy_1_10_TeV"].value
err = d["Flux_Spec_Energy_1_10_TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 erg cm^-2 s^-1\n".format(
"Best-fit model energy flux(1 to 10 TeV)", val / FF, err / FF
)
ss += self._info_spec_pl()
ss += self._info_spec_ecpl()
return ss
def _info_spec_pl(self):
d = self.data
ss = "{:<20s} : {:.2f}\n".format("Pivot energy", d["Energy_Spec_PL_Pivot"])
val = d["Flux_Spec_PL_Diff_Pivot"].value
err = d["Flux_Spec_PL_Diff_Pivot_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"Flux at pivot energy",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB_DIFF,
)
val = d["Flux_Spec_PL_Int_1TeV"].value
err = d["Flux_Spec_PL_Int_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"PL Flux(> 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB,
)
val = d["Flux_Spec_PL_Diff_1TeV"].value
err = d["Flux_Spec_PL_Diff_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"PL Flux(@ 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB_DIFF,
)
val = d["Index_Spec_PL"]
err = d["Index_Spec_PL_Err"]
ss += "{:<20s} : {:.2f} +/- {:.2f}\n".format("PL Index", val, err)
return ss
def _info_spec_ecpl(self):
d = self.data
ss = ""
# ss = '{:<20s} : {:.1f}\n'.format('Pivot energy', d['Energy_Spec_ECPL_Pivot'])
val = d["Flux_Spec_ECPL_Diff_1TeV"].value
err = d["Flux_Spec_ECPL_Diff_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"ECPL Flux(@ 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB_DIFF,
)
val = d["Flux_Spec_ECPL_Int_1TeV"].value
err = d["Flux_Spec_ECPL_Int_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"ECPL Flux(> 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB,
)
val = d["Index_Spec_ECPL"]
err = d["Index_Spec_ECPL_Err"]
ss += "{:<20s} : {:.2f} +/- {:.2f}\n".format("ECPL Index", val, err)
val = d["Lambda_Spec_ECPL"].value
err = d["Lambda_Spec_ECPL_Err"].value
ss += "{:<20s} : {:.3f} +/- {:.3f} TeV^-1\n".format("ECPL Lambda", val, err)
# Use Gaussian analytical error propagation,
# tested against the uncertainties package
err = err / val ** 2
val = 1.0 / val
ss += "{:<20s} : {:.2f} +/- {:.2f} TeV\n".format("ECPL E_cut", val, err)
return ss
def _info_flux_points(self):
"""Print flux point results"""
d = self.data
ss = "\n*** Flux points info ***\n\n"
ss += "Number of flux points: {}\n".format(d["N_Flux_Points"])
ss += "Flux points table: \n\n"
lines = self.flux_points.table_formatted.pformat(max_width=-1, max_lines=-1)
ss += "\n".join(lines)
return ss + "\n"
def _info_components(self):
"""Print info about the components."""
ss = "\n*** Gaussian component info ***\n\n"
ss += "Number of components: {}\n".format(len(self.components))
ss += "{:<20s} : {}\n\n".format("Spatial components", self.data["Components"])
for component in self.components:
ss += str(component)
ss += "\n\n"
return ss
@property
def energy_range(self):
"""Spectral model energy range (`~astropy.units.Quantity` with length 2)."""
emin, emax = (
self.data["Energy_Range_Spec_Min"],
self.data["Energy_Range_Spec_Max"],
)
if np.isnan(emin):
emin = u.Quantity(0.2, "TeV")
if np.isnan(emax):
emax = u.Quantity(50, "TeV")
return u.Quantity([emin, emax], "TeV")
@property
def spectral_model_type(self):
"""Spectral model type (str).
One of: 'pl', 'ecpl'
"""
return self.data["Spectral_Model"].strip().lower()
[docs] def spectral_model(self, which="best"):
"""Spectral model (`~gammapy.spectrum.models.SpectralModel`).
One of the following models (given by ``Spectral_Model`` in the catalog):
- ``PL`` : `~gammapy.spectrum.models.PowerLaw`
- ``ECPL`` : `~gammapy.spectrum.models.ExponentialCutoffPowerLaw`
Parameters
----------
which : {'best', 'pl', 'ecpl'}
Which spectral model
"""
data = self.data
if which == "best":
spec_type = self.spectral_model_type
elif which in {"pl", "ecpl"}:
spec_type = which
else:
raise ValueError("Invalid selection: which = {!r}".format(which))
pars, errs = {}, {}
if spec_type == "pl":
pars["index"] = data["Index_Spec_PL"]
pars["amplitude"] = data["Flux_Spec_PL_Diff_Pivot"]
pars["reference"] = data["Energy_Spec_PL_Pivot"]
errs["amplitude"] = data["Flux_Spec_PL_Diff_Pivot_Err"]
errs["index"] = data["Index_Spec_PL_Err"]
model = PowerLaw(**pars)
elif spec_type == "ecpl":
pars["index"] = data["Index_Spec_ECPL"]
pars["amplitude"] = data["Flux_Spec_ECPL_Diff_Pivot"]
pars["reference"] = data["Energy_Spec_ECPL_Pivot"]
pars["lambda_"] = data["Lambda_Spec_ECPL"]
errs["index"] = data["Index_Spec_ECPL_Err"]
errs["amplitude"] = data["Flux_Spec_ECPL_Diff_Pivot_Err"]
errs["lambda_"] = data["Lambda_Spec_ECPL_Err"]
model = ExponentialCutoffPowerLaw(**pars)
else:
raise ValueError("Invalid spectral model: {}".format(spec_type))
model.parameters.set_parameter_errors(errs)
return model
@property
def spatial_model_type(self):
"""Spatial model type (str).
One of: 'point-like', 'shell', 'gaussian', '2-gaussian', '3-gaussian'
"""
return self.data["Spatial_Model"].strip().lower()
@property
def is_pointlike(self):
"""Source is pointlike? (bool)"""
d = self.data
has_size_ul = np.isfinite(d["Size_UL"])
pointlike = d["Spatial_Model"] == "Point-Like"
return pointlike or has_size_ul
@property
def spatial_model(self):
"""Spatial model (`~gammapy.image.models.SkySpatialModel`).
One of the following models (given by ``Spatial_Model`` in the catalog):
- ``Point-Like`` or has a size upper limit : `~gammapy.image.models.SkyPointSource`
- ``Gaussian``: `~gammapy.image.models.SkyGaussian`
- ``2-Gaussian`` or ``3-Gaussian``: composite model (using ``+`` with Gaussians)
- ``Shell``: `~gammapy.image.models.SkyShell`
TODO: add parameter errors
"""
d = self.data
glon = d["GLON"]
glat = d["GLAT"]
spatial_type = self.spatial_model_type
if self.is_pointlike:
model = SkyPointSource(lon_0=glon, lat_0=glat)
elif spatial_type == "gaussian":
model = SkyGaussian(lon_0=glon, lat_0=glat, sigma=d["Size"])
elif spatial_type in {"2-gaussian", "3-gaussian"}:
raise ValueError("For Gaussian or Multi-Gaussian models, use sky_model()!")
elif spatial_type == "shell":
# HGPS contains no information on shell width
# Here we assuma a 5% shell width for all shells.
r_out = d["Size"]
radius = 0.95 * r_out
width = r_out - radius
model = SkyShell(lon_0=glon, lat_0=glat, width=width, radius=radius)
else:
raise ValueError("Not a valid spatial model: {}".format(spatial_type))
return model
@property
def sky_model(self):
"""Source sky model (`~gammapy.cube.models.SkyModel`)."""
if self.spatial_model_type in {"2-gaussian", "3-gaussian"}:
models = [c.sky_model for c in self.components]
return SkyModels(models)
else:
spatial_model = self.spatial_model
# TODO: there are two spectral models
# Here we only expose the default
# Change sky_model into a method and re-expose option to select spectral model?
spectral_model = self.spectral_model()
return SkyModel(spatial_model, spectral_model)
@property
def flux_points(self):
"""Flux points (`~gammapy.spectrum.FluxPoints`)."""
table = Table()
table.meta["SED_TYPE"] = "dnde"
mask = ~np.isnan(self.data["Flux_Points_Energy"])
table["e_ref"] = self.data["Flux_Points_Energy"][mask]
table["e_min"] = self.data["Flux_Points_Energy_Min"][mask]
table["e_max"] = self.data["Flux_Points_Energy_Max"][mask]
table["dnde"] = self.data["Flux_Points_Flux"][mask]
table["dnde_errn"] = self.data["Flux_Points_Flux_Err_Lo"][mask]
table["dnde_errp"] = self.data["Flux_Points_Flux_Err_Hi"][mask]
table["dnde_ul"] = self.data["Flux_Points_Flux_UL"][mask]
table["is_ul"] = self.data["Flux_Points_Flux_Is_UL"][mask].astype("bool")
return FluxPoints(table)
[docs]class SourceCatalogHGPS(SourceCatalog):
"""HESS Galactic plane survey (HGPS) source catalog.
Reference: https://www.mpi-hd.mpg.de/hfm/HESS/hgps/
One source is represented by `SourceCatalogObjectHGPS`.
An extensive tutorial is available here: :gp-extra-notebook:`hgps`
Examples
--------
Let's assume you have downloaded the HGPS catalog FITS file, e.g. via:
.. code-block:: bash
curl -O https://www.mpi-hd.mpg.de/hfm/HESS/hgps/data/hgps_catalog_v1.fits.gz
Then you can load it up like this:
>>> from gammapy.catalog import SourceCatalogHGPS
>>> filename = 'hgps_catalog_v1.fits.gz'
>>> cat = SourceCatalogHGPS(filename)
Access a source by name:
>>> source = cat['HESS J1843-033']
>>> print(source)
Access source spectral data and plot it:
>>> source.spectral_model().plot(source.energy_range)
>>> source.spectral_model().plot_error(source.energy_range)
>>> source.flux_points.plot()
Gaussian component information can be accessed as well,
either via the source, or via the catalog:
>>> source.components
>>> cat.gaussian_component(83)
More examples here: :gp-extra-notebook:`hgps`
"""
name = "hgps"
"""Source catalog name (str)."""
description = "H.E.S.S. Galactic plane survey (HGPS) source catalog"
"""Source catalog description (str)."""
source_object_class = SourceCatalogObjectHGPS
def __init__(self, filename=None, hdu="HGPS_SOURCES"):
if not filename:
filename = (
Path(os.environ["HGPS_ANALYSIS"])
/ "data/catalogs/HGPS3/release/HGPS_v0.4.fits"
)
filename = str(make_path(filename))
table = Table.read(filename, hdu=hdu)
source_name_alias = ("Identified_Object",)
super(SourceCatalogHGPS, self).__init__(
table=table, source_name_alias=source_name_alias
)
self._table_components = Table.read(filename, hdu="HGPS_GAUSS_COMPONENTS")
self._table_associations = Table.read(filename, hdu="HGPS_ASSOCIATIONS")
self._table_identifications = Table.read(filename, hdu="HGPS_IDENTIFICATIONS")
self._table_large_scale_component = Table.read(
filename, hdu="HGPS_LARGE_SCALE_COMPONENT"
)
@property
def table_components(self):
"""Gaussian component table (`~astropy.table.Table`)"""
return self._table_components
@property
def table_associations(self):
"""Source association table (`~astropy.table.Table`)"""
return self._table_associations
@property
def table_identifications(self):
"""Source identification table (`~astropy.table.Table`)"""
return self._table_identifications
@property
def table_large_scale_component(self):
"""Large scale component table (`~astropy.table.Table`)"""
return self._table_large_scale_component
@property
def large_scale_component(self):
"""Large scale component model (`~gammapy.catalog.SourceCatalogLargeScaleHGPS`).
"""
table = self.table_large_scale_component
return SourceCatalogLargeScaleHGPS(table, spline_kwargs=dict(k=3, s=0, ext=0))
def _make_source_object(self, index):
"""Make `SourceCatalogObject` for given row index"""
source = super(SourceCatalogHGPS, self)._make_source_object(index)
if source.data["Components"] != "":
self._attach_component_info(source)
self._attach_association_info(source)
if source.data["Source_Class"] != "Unid":
self._attach_identification_info(source)
return source
def _attach_component_info(self, source):
source.components = []
lookup = SourceCatalog(self.table_components, source_name_key="Component_ID")
for name in source.data["Components"].split(", "):
component = SourceCatalogObjectHGPSComponent(data=lookup[name].data)
source.components.append(component)
def _attach_association_info(self, source):
t = self.table_associations
mask = source.data["Source_Name"] == t["Source_Name"]
source.associations = t[mask]
def _attach_identification_info(self, source):
t = self._table_identifications
idx = np.nonzero(source.name == t["Source_Name"])[0][0]
source.identification_info = table_row_to_dict(t[idx])
[docs] def gaussian_component(self, row_idx):
"""Gaussian component (`SourceCatalogObjectHGPSComponent`)."""
data = table_row_to_dict(self.table_components[row_idx])
data["row_index"] = row_idx
return SourceCatalogObjectHGPSComponent(data=data)
[docs]class SourceCatalogLargeScaleHGPS(object):
"""Gaussian band model.
This 2-dimensional model is Gaussian in ``y`` for a given ``x``,
and the Gaussian parameters can vary in ``x``.
One application of this model is the diffuse emission along the
Galactic plane, i.e. ``x = GLON`` and ``y = GLAT``.
Parameters
----------
table : `~astropy.table.Table`
Table of Gaussian parameters.
``x``, ``amplitude``, ``mean``, ``stddev``.
interp_kwargs : dict
Keyword arguments passed to `ScaledRegularGridInterpolator`
"""
def __init__(self, table, interp_kwargs=None):
interp_kwargs = interp_kwargs or {}
interp_kwargs.setdefault("values_scale", "lin")
self.table = table
glon = Angle(self.table["GLON"]).wrap_at("180d")
interps = {}
for column in table.colnames:
values = self.table[column].quantity
interp = ScaledRegularGridInterpolator(
(glon.degree,), values, **interp_kwargs
)
interps[column] = interp
self._interp = interps
def _interpolate_parameter(self, parname, glon):
glon = glon.wrap_at("180d")
return self._interp[parname]((np.asanyarray(glon.degree),), clip=False)
[docs] def peak_brightness(self, glon):
"""Peak brightness at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Longitude`
Galactic Longitude.
"""
return self._interpolate_parameter("Surface_Brightness", glon)
[docs] def peak_brightness_error(self, glon):
"""Peak brightness error at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("Surface_Brightness_Err", glon)
[docs] def width(self, glon):
"""Width at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("Width", glon)
[docs] def width_error(self, glon):
"""Width error at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("Width_Err", glon)
[docs] def peak_latitude(self, glon):
"""Peak position at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("GLAT", glon)
[docs] def peak_latitude_error(self, glon):
"""Peak position error at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("GLAT_Err", glon)
[docs] def evaluate(self, position):
"""Evaluate model at a given position.
Parameters
----------
position : `~astropy.coordinates.SkyCoord`
Position on the sky.
"""
glon, glat = position.galactic.l, position.galactic.b
width = self.width(glon)
amplitude = self.peak_brightness(glon)
mean = self.peak_latitude(glon)
return Gaussian1D.evaluate(glat, amplitude=amplitude, mean=mean, stddev=width)