# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""HESS Galactic plane survey (HGPS) catalog."""
import numpy as np
import astropy.units as u
from astropy.coordinates import Angle
from astropy.modeling.models import Gaussian1D
from astropy.table import Table
from gammapy.modeling.models import (
ExpCutoffPowerLawSpectralModel,
GaussianSpatialModel,
PointSpatialModel,
PowerLawSpectralModel,
ShellSpatialModel,
SkyModel,
SkyModels,
)
from gammapy.spectrum import FluxPoints
from gammapy.utils.interpolation import ScaledRegularGridInterpolator
from gammapy.utils.scripts import make_path
from gammapy.utils.table import table_row_to_dict
from .core import SourceCatalog, SourceCatalogObject
__all__ = [
"SourceCatalogHGPS",
"SourceCatalogObjectHGPS",
"SourceCatalogObjectHGPSComponent",
"SourceCatalogLargeScaleHGPS",
]
# Flux factor, used for printing
FF = 1e-12
# Multiplicative factor to go from cm^-2 s^-1 to % Crab for integral flux > 1 TeV
# Here we use the same Crab reference that's used in the HGPS paper
# CRAB = crab_integral_flux(energy_min=1, reference='hess_ecpl')
FLUX_TO_CRAB = 100 / 2.26e-11
FLUX_TO_CRAB_DIFF = 100 / 3.5060459323111307e-11
[docs]class SourceCatalogObjectHGPSComponent:
"""One Gaussian component from the HGPS catalog.
See Also
--------
SourceCatalogHGPS, SourceCatalogObjectHGPS
"""
_source_name_key = "Component_ID"
_source_index_key = "row_index"
def __init__(self, data):
self.data = data
def __repr__(self):
return f"{self.__class__.__name__}({self.name!r})"
def __str__(self):
"""Pretty-print source data"""
d = self.data
ss = "Component {}:\n".format(d["Component_ID"])
fmt = "{:<20s} : {:8.3f} +/- {:.3f} deg\n"
ss += fmt.format("GLON", d["GLON"].value, d["GLON_Err"].value)
ss += fmt.format("GLAT", d["GLAT"].value, d["GLAT_Err"].value)
fmt = "{:<20s} : {:.3f} +/- {:.3f} deg\n"
ss += fmt.format("Size", d["Size"].value, d["Size_Err"].value)
val, err = d["Flux_Map"].value, d["Flux_Map_Err"].value
fmt = "{:<20s} : ({:.2f} +/- {:.2f}) x 10^-12 cm^-2 s^-1 = ({:.1f} +/- {:.1f}) % Crab"
ss += fmt.format(
"Flux (>1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB
)
return ss
@property
def name(self):
"""Source name (str)"""
name = self.data[self._source_name_key]
return name.strip()
@property
def index(self):
"""Row index of source in table (int)"""
return self.data[self._source_index_key]
@property
def spatial_model(self):
"""Component spatial model (`~gammapy.modeling.models.GaussianSpatialModel`)."""
d = self.data
model = GaussianSpatialModel(
lon_0=d["GLON"], lat_0=d["GLAT"], sigma=d["Size"], frame="galactic"
)
model.parameters.set_parameter_errors(
dict(lon_0=d["GLON_Err"], lat_0=d["GLAT_Err"], sigma=d["Size_Err"])
)
return model
[docs]class SourceCatalogObjectHGPS(SourceCatalogObject):
"""One object from the HGPS catalog.
The catalog is represented by `SourceCatalogHGPS`.
Examples are given there.
"""
def __repr__(self):
return f"{self.__class__.__name__}({self.name!r})"
def __str__(self):
return self.info()
[docs] def info(self, info="all"):
"""Info string.
Parameters
----------
info : {'all', 'basic', 'map', 'spec', 'flux_points', 'components', 'associations', 'id'}
Comma separated list of options
"""
if info == "all":
info = "basic,associations,id,map,spec,flux_points,components"
ss = ""
ops = info.split(",")
if "basic" in ops:
ss += self._info_basic()
if "map" in ops:
ss += self._info_map()
if "spec" in ops:
ss += self._info_spec()
if "flux_points" in ops:
ss += self._info_flux_points()
if "components" in ops and hasattr(self, "components"):
ss += self._info_components()
if "associations" in ops:
ss += self._info_associations()
if "id" in ops and hasattr(self, "identification_info"):
ss += self._info_id()
return ss
def _info_basic(self):
"""Print basic info."""
d = self.data
ss = "\n*** Basic info ***\n\n"
ss += "Catalog row index (zero-based) : {}\n".format(d["catalog_row_index"])
ss += "{:<20s} : {}\n".format("Source name", d["Source_Name"])
ss += "{:<20s} : {}\n".format("Analysis reference", d["Analysis_Reference"])
ss += "{:<20s} : {}\n".format("Source class", d["Source_Class"])
ss += "{:<20s} : {}\n".format("Identified object", d["Identified_Object"])
ss += "{:<20s} : {}\n".format("Gamma-Cat id", d["Gamma_Cat_Source_ID"])
ss += "\n"
return ss
def _info_associations(self):
ss = "\n*** Source associations info ***\n\n"
lines = self.associations.pformat(max_width=-1, max_lines=-1)
ss += "\n".join(lines)
return ss + "\n"
def _info_id(self):
ss = "\n*** Source identification info ***\n\n"
ss += "\n".join(f"{k}: {v}" for k, v in self.identification_info.items())
return ss + "\n"
def _info_map(self):
"""Print info from map analysis."""
d = self.data
ss = "\n*** Info from map analysis ***\n\n"
ra_str = Angle(d["RAJ2000"], "deg").to_string(unit="hour", precision=0)
dec_str = Angle(d["DEJ2000"], "deg").to_string(unit="deg", precision=0)
ss += "{:<20s} : {:8.3f} = {}\n".format("RA", d["RAJ2000"], ra_str)
ss += "{:<20s} : {:8.3f} = {}\n".format("DEC", d["DEJ2000"], dec_str)
ss += "{:<20s} : {:8.3f} +/- {:.3f} deg\n".format(
"GLON", d["GLON"].value, d["GLON_Err"].value
)
ss += "{:<20s} : {:8.3f} +/- {:.3f} deg\n".format(
"GLAT", d["GLAT"].value, d["GLAT_Err"].value
)
ss += "{:<20s} : {:.3f}\n".format("Position Error (68%)", d["Pos_Err_68"])
ss += "{:<20s} : {:.3f}\n".format("Position Error (95%)", d["Pos_Err_95"])
ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI_Number"])
ss += "{:<20s} : {}\n".format("Spatial model", d["Spatial_Model"])
ss += "{:<20s} : {}\n".format("Spatial components", d["Components"])
ss += "{:<20s} : {:.1f}\n".format("TS", d["Sqrt_TS"] ** 2)
ss += "{:<20s} : {:.1f}\n".format("sqrt(TS)", d["Sqrt_TS"])
ss += "{:<20s} : {:.3f} +/- {:.3f} (UL: {:.3f}) deg\n".format(
"Size", d["Size"].value, d["Size_Err"].value, d["Size_UL"].value
)
ss += "{:<20s} : {:.3f}\n".format("R70", d["R70"])
ss += "{:<20s} : {:.3f}\n".format("RSpec", d["RSpec"])
ss += "{:<20s} : {:.1f}\n".format("Total model excess", d["Excess_Model_Total"])
ss += "{:<20s} : {:.1f}\n".format("Excess in RSpec", d["Excess_RSpec"])
ss += "{:<20s} : {:.1f}\n".format(
"Model Excess in RSpec", d["Excess_RSpec_Model"]
)
ss += "{:<20s} : {:.1f}\n".format("Background in RSpec", d["Background_RSpec"])
ss += "{:<20s} : {:.1f} hours\n".format("Livetime", d["Livetime"].value)
ss += "{:<20s} : {:.2f}\n".format("Energy threshold", d["Energy_Threshold"])
val, err = d["Flux_Map"].value, d["Flux_Map_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"Source flux (>1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB,
)
ss += "\nFluxes in RSpec (> 1 TeV):\n"
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Map measurement",
d["Flux_Map_RSpec_Data"].value / FF,
d["Flux_Map_RSpec_Data"].value * FLUX_TO_CRAB,
)
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Source model",
d["Flux_Map_RSpec_Source"].value / FF,
d["Flux_Map_RSpec_Source"].value * FLUX_TO_CRAB,
)
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Other component model",
d["Flux_Map_RSpec_Other"].value / FF,
d["Flux_Map_RSpec_Other"].value * FLUX_TO_CRAB,
)
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Large scale component model",
d["Flux_Map_RSpec_LS"].value / FF,
d["Flux_Map_RSpec_LS"].value * FLUX_TO_CRAB,
)
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Total model",
d["Flux_Map_RSpec_Total"].value / FF,
d["Flux_Map_RSpec_Total"].value * FLUX_TO_CRAB,
)
ss += "{:<35s} : {:5.1f} %\n".format(
"Containment in RSpec", 100 * d["Containment_RSpec"]
)
ss += "{:<35s} : {:5.1f} %\n".format(
"Contamination in RSpec", 100 * d["Contamination_RSpec"]
)
label, val = (
"Flux correction (RSpec -> Total)",
100 * d["Flux_Correction_RSpec_To_Total"],
)
ss += f"{label:<35s} : {val:5.1f} %\n"
label, val = (
"Flux correction (Total -> RSpec)",
100 * (1 / d["Flux_Correction_RSpec_To_Total"]),
)
ss += f"{label:<35s} : {val:5.1f} %\n"
return ss
def _info_spec(self):
"""Print info from spectral analysis."""
d = self.data
ss = "\n*** Info from spectral analysis ***\n\n"
ss += "{:<20s} : {:.1f} hours\n".format("Livetime", d["Livetime_Spec"].value)
lo = d["Energy_Range_Spec_Min"].value
hi = d["Energy_Range_Spec_Max"].value
ss += "{:<20s} : {:.2f} to {:.2f} TeV\n".format("Energy range:", lo, hi)
ss += "{:<20s} : {:.1f}\n".format("Background", d["Background_Spec"])
ss += "{:<20s} : {:.1f}\n".format("Excess", d["Excess_Spec"])
ss += "{:<20s} : {}\n".format("Spectral model", d["Spectral_Model"])
val = d["TS_ECPL_over_PL"]
ss += "{:<20s} : {:.1f}\n".format("TS ECPL over PL", val)
val = d["Flux_Spec_Int_1TeV"].value
err = d["Flux_Spec_Int_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"Best-fit model flux(> 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB,
)
val = d["Flux_Spec_Energy_1_10_TeV"].value
err = d["Flux_Spec_Energy_1_10_TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 erg cm^-2 s^-1\n".format(
"Best-fit model energy flux(1 to 10 TeV)", val / FF, err / FF
)
ss += self._info_spec_pl()
ss += self._info_spec_ecpl()
return ss
def _info_spec_pl(self):
d = self.data
ss = "{:<20s} : {:.2f}\n".format("Pivot energy", d["Energy_Spec_PL_Pivot"])
val = d["Flux_Spec_PL_Diff_Pivot"].value
err = d["Flux_Spec_PL_Diff_Pivot_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"Flux at pivot energy",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB_DIFF,
)
val = d["Flux_Spec_PL_Int_1TeV"].value
err = d["Flux_Spec_PL_Int_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"PL Flux(> 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB,
)
val = d["Flux_Spec_PL_Diff_1TeV"].value
err = d["Flux_Spec_PL_Diff_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"PL Flux(@ 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB_DIFF,
)
val = d["Index_Spec_PL"]
err = d["Index_Spec_PL_Err"]
ss += "{:<20s} : {:.2f} +/- {:.2f}\n".format("PL Index", val, err)
return ss
def _info_spec_ecpl(self):
d = self.data
ss = ""
# ss = '{:<20s} : {:.1f}\n'.format('Pivot energy', d['Energy_Spec_ECPL_Pivot'])
val = d["Flux_Spec_ECPL_Diff_1TeV"].value
err = d["Flux_Spec_ECPL_Diff_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"ECPL Flux(@ 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB_DIFF,
)
val = d["Flux_Spec_ECPL_Int_1TeV"].value
err = d["Flux_Spec_ECPL_Int_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"ECPL Flux(> 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB,
)
val = d["Index_Spec_ECPL"]
err = d["Index_Spec_ECPL_Err"]
ss += "{:<20s} : {:.2f} +/- {:.2f}\n".format("ECPL Index", val, err)
val = d["Lambda_Spec_ECPL"].value
err = d["Lambda_Spec_ECPL_Err"].value
ss += "{:<20s} : {:.3f} +/- {:.3f} TeV^-1\n".format("ECPL Lambda", val, err)
# Use Gaussian analytical error propagation,
# tested against the uncertainties package
err = err / val ** 2
val = 1.0 / val
ss += "{:<20s} : {:.2f} +/- {:.2f} TeV\n".format("ECPL E_cut", val, err)
return ss
def _info_flux_points(self):
"""Print flux point results"""
d = self.data
ss = "\n*** Flux points info ***\n\n"
ss += "Number of flux points: {}\n".format(d["N_Flux_Points"])
ss += "Flux points table: \n\n"
lines = self.flux_points.table_formatted.pformat(max_width=-1, max_lines=-1)
ss += "\n".join(lines)
return ss + "\n"
def _info_components(self):
"""Print info about the components."""
ss = "\n*** Gaussian component info ***\n\n"
ss += "Number of components: {}\n".format(len(self.components))
ss += "{:<20s} : {}\n\n".format("Spatial components", self.data["Components"])
for component in self.components:
ss += str(component)
ss += "\n\n"
return ss
@property
def energy_range(self):
"""Spectral model energy range (`~astropy.units.Quantity` with length 2)."""
emin, emax = (
self.data["Energy_Range_Spec_Min"],
self.data["Energy_Range_Spec_Max"],
)
if np.isnan(emin):
emin = u.Quantity(0.2, "TeV")
if np.isnan(emax):
emax = u.Quantity(50, "TeV")
return u.Quantity([emin, emax], "TeV")
@property
def spectral_model_type(self):
"""Spectral model type (str).
One of: 'pl', 'ecpl'
"""
return self.data["Spectral_Model"].strip().lower()
[docs] def spectral_model(self, which="best"):
"""Spectral model (`~gammapy.modeling.models.SpectralModel`).
One of the following models (given by ``Spectral_Model`` in the catalog):
- ``PL`` : `~gammapy.modeling.models.PowerLawSpectralModel`
- ``ECPL`` : `~gammapy.modeling.models.ExpCutoffPowerLawSpectralModel`
Parameters
----------
which : {'best', 'pl', 'ecpl'}
Which spectral model
"""
data = self.data
if which == "best":
spec_type = self.spectral_model_type
elif which in {"pl", "ecpl"}:
spec_type = which
else:
raise ValueError(f"Invalid selection: which = {which!r}")
pars, errs = {}, {}
if spec_type == "pl":
pars["index"] = data["Index_Spec_PL"]
pars["amplitude"] = data["Flux_Spec_PL_Diff_Pivot"]
pars["reference"] = data["Energy_Spec_PL_Pivot"]
errs["amplitude"] = data["Flux_Spec_PL_Diff_Pivot_Err"]
errs["index"] = data["Index_Spec_PL_Err"]
model = PowerLawSpectralModel(**pars)
elif spec_type == "ecpl":
pars["index"] = data["Index_Spec_ECPL"]
pars["amplitude"] = data["Flux_Spec_ECPL_Diff_Pivot"]
pars["reference"] = data["Energy_Spec_ECPL_Pivot"]
pars["lambda_"] = data["Lambda_Spec_ECPL"]
errs["index"] = data["Index_Spec_ECPL_Err"]
errs["amplitude"] = data["Flux_Spec_ECPL_Diff_Pivot_Err"]
errs["lambda_"] = data["Lambda_Spec_ECPL_Err"]
model = ExpCutoffPowerLawSpectralModel(**pars)
else:
raise ValueError(f"Invalid spectral model: {spec_type}")
model.parameters.set_parameter_errors(errs)
return model
@property
def spatial_model_type(self):
"""Spatial model type (str).
One of: 'point-like', 'shell', 'gaussian', '2-gaussian', '3-gaussian'
"""
return self.data["Spatial_Model"].strip().lower()
@property
def is_pointlike(self):
"""Source is pointlike? (bool)"""
d = self.data
has_size_ul = np.isfinite(d["Size_UL"])
pointlike = d["Spatial_Model"] == "Point-Like"
return pointlike or has_size_ul
@property
def spatial_model(self):
"""Spatial model (`~gammapy.modeling.models.SpatialModel`).
One of the following models (given by ``Spatial_Model`` in the catalog):
- ``Point-Like`` or has a size upper limit : `~gammapy.modeling.models.PointSpatialModel`
- ``Gaussian``: `~gammapy.modeling.models.GaussianSpatialModel`
- ``2-Gaussian`` or ``3-Gaussian``: composite model (using ``+`` with Gaussians)
- ``Shell``: `~gammapy.modeling.models.ShellSpatialModel`
"""
d = self.data
glon = d["GLON"]
glat = d["GLAT"]
spatial_type = self.spatial_model_type
if self.is_pointlike:
model = PointSpatialModel(lon_0=glon, lat_0=glat, frame="galactic")
elif spatial_type == "gaussian":
model = GaussianSpatialModel(
lon_0=glon, lat_0=glat, sigma=d["Size"], frame="galactic"
)
elif spatial_type in {"2-gaussian", "3-gaussian"}:
raise ValueError("For Gaussian or Multi-Gaussian models, use sky_model()!")
elif spatial_type == "shell":
# HGPS contains no information on shell width
# Here we assume a 5% shell width for all shells.
r_out = d["Size"]
radius = 0.95 * r_out
width = r_out - radius
model = ShellSpatialModel(
lon_0=glon, lat_0=glat, width=width, radius=radius, frame="galactic"
)
else:
raise ValueError(f"Not a valid spatial model: {spatial_type}")
return model
[docs] def sky_model(self, which="best"):
"""Source sky model.
Parameters
----------
which : {'best', 'pl', 'ecpl'}
Which spectral model
Returns
-------
sky_model : `~gammapy.modeling.models.SkyModel`
Sky model of the catalog object.
"""
if self.spatial_model_type in {"2-gaussian", "3-gaussian"}:
models = []
spectral_model = self.spectral_model(which=which)
for component in self.components:
weight = component.data["Flux_Map"] / self.data["Flux_Map"]
spectral_model_comp = spectral_model.copy()
# weight amplitude of the component
spectral_model_comp.parameters["amplitude"].value *= weight
models.append(
SkyModel(
component.spatial_model,
spectral_model_comp,
name=component.name,
)
)
return SkyModels(models)
else:
spatial_model = self.spatial_model
spectral_model = self.spectral_model(which=which)
return SkyModel(spatial_model, spectral_model, name=self.name)
@property
def flux_points(self):
"""Flux points (`~gammapy.spectrum.FluxPoints`)."""
table = Table()
table.meta["SED_TYPE"] = "dnde"
mask = ~np.isnan(self.data["Flux_Points_Energy"])
table["e_ref"] = self.data["Flux_Points_Energy"][mask]
table["e_min"] = self.data["Flux_Points_Energy_Min"][mask]
table["e_max"] = self.data["Flux_Points_Energy_Max"][mask]
table["dnde"] = self.data["Flux_Points_Flux"][mask]
table["dnde_errn"] = self.data["Flux_Points_Flux_Err_Lo"][mask]
table["dnde_errp"] = self.data["Flux_Points_Flux_Err_Hi"][mask]
table["dnde_ul"] = self.data["Flux_Points_Flux_UL"][mask]
table["is_ul"] = self.data["Flux_Points_Flux_Is_UL"][mask].astype("bool")
return FluxPoints(table)
[docs]class SourceCatalogHGPS(SourceCatalog):
"""HESS Galactic plane survey (HGPS) source catalog.
Reference: https://www.mpi-hd.mpg.de/hfm/HESS/hgps/
One source is represented by `SourceCatalogObjectHGPS`.
An extensive tutorial is available here: :gp-notebook:`hgps`
Examples
--------
Let's assume you have downloaded the HGPS catalog FITS file, e.g. via:
.. code-block:: bash
curl -O https://www.mpi-hd.mpg.de/hfm/HESS/hgps/data/hgps_catalog_v1.fits.gz
Then you can load it up like this:
>>> from gammapy.catalog import SourceCatalogHGPS
>>> filename = 'hgps_catalog_v1.fits.gz'
>>> cat = SourceCatalogHGPS(filename)
Access a source by name:
>>> source = cat['HESS J1843-033']
>>> print(source)
Access source spectral data and plot it:
>>> source.spectral_model().plot(source.energy_range)
>>> source.spectral_model().plot_error(source.energy_range)
>>> source.flux_points.plot()
Gaussian component information can be accessed as well,
either via the source, or via the catalog:
>>> source.components
>>> cat.gaussian_component(83)
More examples here: :gp-notebook:`hgps`
"""
name = "hgps"
"""Source catalog name (str)."""
description = "H.E.S.S. Galactic plane survey (HGPS) source catalog"
"""Source catalog description (str)."""
source_object_class = SourceCatalogObjectHGPS
def __init__(self, filename=None, hdu="HGPS_SOURCES"):
if not filename:
filename = "$GAMMAPY_DATA/catalogs/hgps_catalog_v1.fits.gz"
filename = str(make_path(filename))
table = Table.read(filename, hdu=hdu)
source_name_alias = ("Identified_Object",)
super().__init__(table=table, source_name_alias=source_name_alias)
self._table_components = Table.read(filename, hdu="HGPS_GAUSS_COMPONENTS")
self._table_associations = Table.read(filename, hdu="HGPS_ASSOCIATIONS")
self._table_associations["Separation"].format = ".6f"
self._table_identifications = Table.read(filename, hdu="HGPS_IDENTIFICATIONS")
self._table_large_scale_component = Table.read(
filename, hdu="HGPS_LARGE_SCALE_COMPONENT"
)
@property
def table_components(self):
"""Gaussian component table (`~astropy.table.Table`)"""
return self._table_components
@property
def table_associations(self):
"""Source association table (`~astropy.table.Table`)"""
return self._table_associations
@property
def table_identifications(self):
"""Source identification table (`~astropy.table.Table`)"""
return self._table_identifications
@property
def table_large_scale_component(self):
"""Large scale component table (`~astropy.table.Table`)"""
return self._table_large_scale_component
@property
def large_scale_component(self):
"""Large scale component model (`~gammapy.catalog.SourceCatalogLargeScaleHGPS`)."""
return SourceCatalogLargeScaleHGPS(self.table_large_scale_component)
def _make_source_object(self, index):
"""Make `SourceCatalogObject` for given row index"""
source = super()._make_source_object(index)
if source.data["Components"] != "":
self._attach_component_info(source)
self._attach_association_info(source)
if source.data["Source_Class"] != "Unid":
self._attach_identification_info(source)
return source
def _attach_component_info(self, source):
source.components = []
lookup = SourceCatalog(self.table_components, source_name_key="Component_ID")
for name in source.data["Components"].split(", "):
component = SourceCatalogObjectHGPSComponent(data=lookup[name].data)
source.components.append(component)
def _attach_association_info(self, source):
t = self.table_associations
mask = source.data["Source_Name"] == t["Source_Name"]
source.associations = t[mask]
def _attach_identification_info(self, source):
t = self._table_identifications
idx = np.nonzero(source.name == t["Source_Name"])[0][0]
source.identification_info = table_row_to_dict(t[idx])
[docs] def gaussian_component(self, row_idx):
"""Gaussian component (`SourceCatalogObjectHGPSComponent`)."""
data = table_row_to_dict(self.table_components[row_idx])
data["row_index"] = row_idx
return SourceCatalogObjectHGPSComponent(data=data)
[docs]class SourceCatalogLargeScaleHGPS:
"""Gaussian band model.
This 2-dimensional model is Gaussian in ``y`` for a given ``x``,
and the Gaussian parameters can vary in ``x``.
One application of this model is the diffuse emission along the
Galactic plane, i.e. ``x = GLON`` and ``y = GLAT``.
Parameters
----------
table : `~astropy.table.Table`
Table of Gaussian parameters.
``x``, ``amplitude``, ``mean``, ``stddev``.
interp_kwargs : dict
Keyword arguments passed to `ScaledRegularGridInterpolator`
"""
def __init__(self, table, interp_kwargs=None):
interp_kwargs = interp_kwargs or {}
interp_kwargs.setdefault("values_scale", "lin")
self.table = table
glon = Angle(self.table["GLON"]).wrap_at("180d")
interps = {}
for column in table.colnames:
values = self.table[column].quantity
interp = ScaledRegularGridInterpolator((glon,), values, **interp_kwargs)
interps[column] = interp
self._interp = interps
def _interpolate_parameter(self, parname, glon):
glon = glon.wrap_at("180d")
return self._interp[parname]((np.asanyarray(glon),), clip=False)
[docs] def peak_brightness(self, glon):
"""Peak brightness at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("Surface_Brightness", glon)
[docs] def peak_brightness_error(self, glon):
"""Peak brightness error at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("Surface_Brightness_Err", glon)
[docs] def width(self, glon):
"""Width at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("Width", glon)
[docs] def width_error(self, glon):
"""Width error at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("Width_Err", glon)
[docs] def peak_latitude(self, glon):
"""Peak position at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("GLAT", glon)
[docs] def peak_latitude_error(self, glon):
"""Peak position error at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("GLAT_Err", glon)
[docs] def evaluate(self, position):
"""Evaluate model at a given position.
Parameters
----------
position : `~astropy.coordinates.SkyCoord`
Position on the sky.
"""
glon, glat = position.galactic.l, position.galactic.b
width = self.width(glon)
amplitude = self.peak_brightness(glon)
mean = self.peak_latitude(glon)
return Gaussian1D.evaluate(glat, amplitude=amplitude, mean=mean, stddev=width)