# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""HESS Galactic plane survey (HGPS) catalog."""
import numpy as np
import astropy.units as u
from astropy.coordinates import Angle
from astropy.modeling.models import Gaussian1D
from astropy.table import Table
from gammapy.modeling.models import Model, Models, SkyModel
from gammapy.spectrum import FluxPoints
from gammapy.utils.interpolation import ScaledRegularGridInterpolator
from gammapy.utils.scripts import make_path
from gammapy.utils.table import table_row_to_dict
from .core import SourceCatalog, SourceCatalogObject
__all__ = [
"SourceCatalogHGPS",
"SourceCatalogObjectHGPS",
"SourceCatalogObjectHGPSComponent",
"SourceCatalogLargeScaleHGPS",
]
# Flux factor, used for printing
FF = 1e-12
# Multiplicative factor to go from cm^-2 s^-1 to % Crab for integral flux > 1 TeV
# Here we use the same Crab reference that's used in the HGPS paper
# CRAB = crab_integral_flux(energy_min=1, reference='hess_ecpl')
FLUX_TO_CRAB = 100 / 2.26e-11
FLUX_TO_CRAB_DIFF = 100 / 3.5060459323111307e-11
[docs]class SourceCatalogObjectHGPSComponent(SourceCatalogObject):
"""One Gaussian component from the HGPS catalog.
See Also
--------
SourceCatalogHGPS, SourceCatalogObjectHGPS
"""
_source_name_key = "Component_ID"
def __init__(self, data):
self.data = data
def __repr__(self):
return f"{self.__class__.__name__}({self.name!r})"
def __str__(self):
"""Pretty-print source data"""
d = self.data
ss = "Component {}:\n".format(d["Component_ID"])
fmt = "{:<20s} : {:8.3f} +/- {:.3f} deg\n"
ss += fmt.format("GLON", d["GLON"].value, d["GLON_Err"].value)
ss += fmt.format("GLAT", d["GLAT"].value, d["GLAT_Err"].value)
fmt = "{:<20s} : {:.3f} +/- {:.3f} deg\n"
ss += fmt.format("Size", d["Size"].value, d["Size_Err"].value)
val, err = d["Flux_Map"].value, d["Flux_Map_Err"].value
fmt = "{:<20s} : ({:.2f} +/- {:.2f}) x 10^-12 cm^-2 s^-1 = ({:.1f} +/- {:.1f}) % Crab"
ss += fmt.format(
"Flux (>1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB
)
return ss
@property
def name(self):
"""Source name (str)"""
return self.data[self._source_name_key]
[docs] def spatial_model(self):
"""Component spatial model (`~gammapy.modeling.models.GaussianSpatialModel`)."""
d = self.data
tag = "GaussianSpatialModel"
pars = {
"lon_0": d["GLON"],
"lat_0": d["GLAT"],
"sigma": d["Size"],
"frame": "galactic",
}
errs = {"lon_0": d["GLON_Err"], "lat_0": d["GLAT_Err"], "sigma": d["Size_Err"]}
model = Model.create(tag, **pars)
model.parameters.set_error(**errs)
return model
[docs]class SourceCatalogObjectHGPS(SourceCatalogObject):
"""One object from the HGPS catalog.
The catalog is represented by `SourceCatalogHGPS`.
Examples are given there.
"""
def __repr__(self):
return f"{self.__class__.__name__}({self.name!r})"
def __str__(self):
return self.info()
[docs] def info(self, info="all"):
"""Info string.
Parameters
----------
info : {'all', 'basic', 'map', 'spec', 'flux_points', 'components', 'associations', 'id'}
Comma separated list of options
"""
if info == "all":
info = "basic,associations,id,map,spec,flux_points,components"
ss = ""
ops = info.split(",")
if "basic" in ops:
ss += self._info_basic()
if "map" in ops:
ss += self._info_map()
if "spec" in ops:
ss += self._info_spec()
if "flux_points" in ops:
ss += self._info_flux_points()
if "components" in ops and hasattr(self, "components"):
ss += self._info_components()
if "associations" in ops:
ss += self._info_associations()
if "id" in ops and hasattr(self, "identification_info"):
ss += self._info_id()
return ss
def _info_basic(self):
"""Print basic info."""
d = self.data
ss = "\n*** Basic info ***\n\n"
ss += "Catalog row index (zero-based) : {}\n".format(self.row_index)
ss += "{:<20s} : {}\n".format("Source name", self.name)
ss += "{:<20s} : {}\n".format("Analysis reference", d["Analysis_Reference"])
ss += "{:<20s} : {}\n".format("Source class", d["Source_Class"])
ss += "{:<20s} : {}\n".format("Identified object", d["Identified_Object"])
ss += "{:<20s} : {}\n".format("Gamma-Cat id", d["Gamma_Cat_Source_ID"])
ss += "\n"
return ss
def _info_associations(self):
ss = "\n*** Source associations info ***\n\n"
lines = self.associations.pformat(max_width=-1, max_lines=-1)
ss += "\n".join(lines)
return ss + "\n"
def _info_id(self):
ss = "\n*** Source identification info ***\n\n"
ss += "\n".join(f"{k}: {v}" for k, v in self.identification_info.items())
return ss + "\n"
def _info_map(self):
"""Print info from map analysis."""
d = self.data
ss = "\n*** Info from map analysis ***\n\n"
ra_str = Angle(d["RAJ2000"], "deg").to_string(unit="hour", precision=0)
dec_str = Angle(d["DEJ2000"], "deg").to_string(unit="deg", precision=0)
ss += "{:<20s} : {:8.3f} = {}\n".format("RA", d["RAJ2000"], ra_str)
ss += "{:<20s} : {:8.3f} = {}\n".format("DEC", d["DEJ2000"], dec_str)
ss += "{:<20s} : {:8.3f} +/- {:.3f} deg\n".format(
"GLON", d["GLON"].value, d["GLON_Err"].value
)
ss += "{:<20s} : {:8.3f} +/- {:.3f} deg\n".format(
"GLAT", d["GLAT"].value, d["GLAT_Err"].value
)
ss += "{:<20s} : {:.3f}\n".format("Position Error (68%)", d["Pos_Err_68"])
ss += "{:<20s} : {:.3f}\n".format("Position Error (95%)", d["Pos_Err_95"])
ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI_Number"])
ss += "{:<20s} : {}\n".format("Spatial model", d["Spatial_Model"])
ss += "{:<20s} : {}\n".format("Spatial components", d["Components"])
ss += "{:<20s} : {:.1f}\n".format("TS", d["Sqrt_TS"] ** 2)
ss += "{:<20s} : {:.1f}\n".format("sqrt(TS)", d["Sqrt_TS"])
ss += "{:<20s} : {:.3f} +/- {:.3f} (UL: {:.3f}) deg\n".format(
"Size", d["Size"].value, d["Size_Err"].value, d["Size_UL"].value
)
ss += "{:<20s} : {:.3f}\n".format("R70", d["R70"])
ss += "{:<20s} : {:.3f}\n".format("RSpec", d["RSpec"])
ss += "{:<20s} : {:.1f}\n".format("Total model excess", d["Excess_Model_Total"])
ss += "{:<20s} : {:.1f}\n".format("Excess in RSpec", d["Excess_RSpec"])
ss += "{:<20s} : {:.1f}\n".format(
"Model Excess in RSpec", d["Excess_RSpec_Model"]
)
ss += "{:<20s} : {:.1f}\n".format("Background in RSpec", d["Background_RSpec"])
ss += "{:<20s} : {:.1f} hours\n".format("Livetime", d["Livetime"].value)
ss += "{:<20s} : {:.2f}\n".format("Energy threshold", d["Energy_Threshold"])
val, err = d["Flux_Map"].value, d["Flux_Map_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"Source flux (>1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB,
)
ss += "\nFluxes in RSpec (> 1 TeV):\n"
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Map measurement",
d["Flux_Map_RSpec_Data"].value / FF,
d["Flux_Map_RSpec_Data"].value * FLUX_TO_CRAB,
)
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Source model",
d["Flux_Map_RSpec_Source"].value / FF,
d["Flux_Map_RSpec_Source"].value * FLUX_TO_CRAB,
)
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Other component model",
d["Flux_Map_RSpec_Other"].value / FF,
d["Flux_Map_RSpec_Other"].value * FLUX_TO_CRAB,
)
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Large scale component model",
d["Flux_Map_RSpec_LS"].value / FF,
d["Flux_Map_RSpec_LS"].value * FLUX_TO_CRAB,
)
ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format(
"Total model",
d["Flux_Map_RSpec_Total"].value / FF,
d["Flux_Map_RSpec_Total"].value * FLUX_TO_CRAB,
)
ss += "{:<35s} : {:5.1f} %\n".format(
"Containment in RSpec", 100 * d["Containment_RSpec"]
)
ss += "{:<35s} : {:5.1f} %\n".format(
"Contamination in RSpec", 100 * d["Contamination_RSpec"]
)
label, val = (
"Flux correction (RSpec -> Total)",
100 * d["Flux_Correction_RSpec_To_Total"],
)
ss += f"{label:<35s} : {val:5.1f} %\n"
label, val = (
"Flux correction (Total -> RSpec)",
100 * (1 / d["Flux_Correction_RSpec_To_Total"]),
)
ss += f"{label:<35s} : {val:5.1f} %\n"
return ss
def _info_spec(self):
"""Print info from spectral analysis."""
d = self.data
ss = "\n*** Info from spectral analysis ***\n\n"
ss += "{:<20s} : {:.1f} hours\n".format("Livetime", d["Livetime_Spec"].value)
lo = d["Energy_Range_Spec_Min"].value
hi = d["Energy_Range_Spec_Max"].value
ss += "{:<20s} : {:.2f} to {:.2f} TeV\n".format("Energy range:", lo, hi)
ss += "{:<20s} : {:.1f}\n".format("Background", d["Background_Spec"])
ss += "{:<20s} : {:.1f}\n".format("Excess", d["Excess_Spec"])
ss += "{:<20s} : {}\n".format("Spectral model", d["Spectral_Model"])
val = d["TS_ECPL_over_PL"]
ss += "{:<20s} : {:.1f}\n".format("TS ECPL over PL", val)
val = d["Flux_Spec_Int_1TeV"].value
err = d["Flux_Spec_Int_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"Best-fit model flux(> 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB,
)
val = d["Flux_Spec_Energy_1_10_TeV"].value
err = d["Flux_Spec_Energy_1_10_TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 erg cm^-2 s^-1\n".format(
"Best-fit model energy flux(1 to 10 TeV)", val / FF, err / FF
)
ss += self._info_spec_pl()
ss += self._info_spec_ecpl()
return ss
def _info_spec_pl(self):
d = self.data
ss = "{:<20s} : {:.2f}\n".format("Pivot energy", d["Energy_Spec_PL_Pivot"])
val = d["Flux_Spec_PL_Diff_Pivot"].value
err = d["Flux_Spec_PL_Diff_Pivot_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"Flux at pivot energy",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB_DIFF,
)
val = d["Flux_Spec_PL_Int_1TeV"].value
err = d["Flux_Spec_PL_Int_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"PL Flux(> 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB,
)
val = d["Flux_Spec_PL_Diff_1TeV"].value
err = d["Flux_Spec_PL_Diff_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"PL Flux(@ 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB_DIFF,
)
val = d["Index_Spec_PL"]
err = d["Index_Spec_PL_Err"]
ss += "{:<20s} : {:.2f} +/- {:.2f}\n".format("PL Index", val, err)
return ss
def _info_spec_ecpl(self):
d = self.data
ss = ""
# ss = '{:<20s} : {:.1f}\n'.format('Pivot energy', d['Energy_Spec_ECPL_Pivot'])
val = d["Flux_Spec_ECPL_Diff_1TeV"].value
err = d["Flux_Spec_ECPL_Diff_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"ECPL Flux(@ 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB_DIFF,
)
val = d["Flux_Spec_ECPL_Int_1TeV"].value
err = d["Flux_Spec_ECPL_Int_1TeV_Err"].value
ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format(
"ECPL Flux(> 1 TeV)",
val / FF,
err / FF,
val * FLUX_TO_CRAB,
err * FLUX_TO_CRAB,
)
val = d["Index_Spec_ECPL"]
err = d["Index_Spec_ECPL_Err"]
ss += "{:<20s} : {:.2f} +/- {:.2f}\n".format("ECPL Index", val, err)
val = d["Lambda_Spec_ECPL"].value
err = d["Lambda_Spec_ECPL_Err"].value
ss += "{:<20s} : {:.3f} +/- {:.3f} TeV^-1\n".format("ECPL Lambda", val, err)
# Use Gaussian analytical error propagation,
# tested against the uncertainties package
err = err / val ** 2
val = 1.0 / val
ss += "{:<20s} : {:.2f} +/- {:.2f} TeV\n".format("ECPL E_cut", val, err)
return ss
def _info_flux_points(self):
"""Print flux point results"""
d = self.data
ss = "\n*** Flux points info ***\n\n"
ss += "Number of flux points: {}\n".format(d["N_Flux_Points"])
ss += "Flux points table: \n\n"
lines = self.flux_points.table_formatted.pformat(max_width=-1, max_lines=-1)
ss += "\n".join(lines)
return ss + "\n"
def _info_components(self):
"""Print info about the components."""
ss = "\n*** Gaussian component info ***\n\n"
ss += "Number of components: {}\n".format(len(self.components))
ss += "{:<20s} : {}\n\n".format("Spatial components", self.data["Components"])
for component in self.components:
ss += str(component)
ss += "\n\n"
return ss
@property
def energy_range(self):
"""Spectral model energy range (`~astropy.units.Quantity` with length 2)."""
emin, emax = (
self.data["Energy_Range_Spec_Min"],
self.data["Energy_Range_Spec_Max"],
)
if np.isnan(emin):
emin = u.Quantity(0.2, "TeV")
if np.isnan(emax):
emax = u.Quantity(50, "TeV")
return u.Quantity([emin, emax], "TeV")
[docs] def spectral_model(self, which="best"):
"""Spectral model (`~gammapy.modeling.models.SpectralModel`).
One of the following models (given by ``Spectral_Model`` in the catalog):
- ``PL`` : `~gammapy.modeling.models.PowerLawSpectralModel`
- ``ECPL`` : `~gammapy.modeling.models.ExpCutoffPowerLawSpectralModel`
Parameters
----------
which : {'best', 'pl', 'ecpl'}
Which spectral model
"""
data = self.data
if which == "best":
spec_type = self.data["Spectral_Model"].strip().lower()
elif which in {"pl", "ecpl"}:
spec_type = which
else:
raise ValueError(f"Invalid selection: which = {which!r}")
if spec_type == "pl":
tag = "PowerLawSpectralModel"
pars = {
"index": data["Index_Spec_PL"],
"amplitude": data["Flux_Spec_PL_Diff_Pivot"],
"reference": data["Energy_Spec_PL_Pivot"],
}
errs = {
"amplitude": data["Flux_Spec_PL_Diff_Pivot_Err"],
"index": data["Index_Spec_PL_Err"],
}
elif spec_type == "ecpl":
tag = "ExpCutoffPowerLawSpectralModel"
pars = {
"index": data["Index_Spec_ECPL"],
"amplitude": data["Flux_Spec_ECPL_Diff_Pivot"],
"reference": data["Energy_Spec_ECPL_Pivot"],
"lambda_": data["Lambda_Spec_ECPL"],
}
errs = {
"index": data["Index_Spec_ECPL_Err"],
"amplitude": data["Flux_Spec_ECPL_Diff_Pivot_Err"],
"lambda_": data["Lambda_Spec_ECPL_Err"],
}
else:
raise ValueError(f"Invalid spec_type: {spec_type}")
model = Model.create(tag, **pars)
model.parameters.set_error(**errs)
return model
[docs] def spatial_model(self):
"""Spatial model (`~gammapy.modeling.models.SpatialModel`).
One of the following models (given by ``Spatial_Model`` in the catalog):
- ``Point-Like`` or has a size upper limit : `~gammapy.modeling.models.PointSpatialModel`
- ``Gaussian``: `~gammapy.modeling.models.GaussianSpatialModel`
- ``2-Gaussian`` or ``3-Gaussian``: composite model (using ``+`` with Gaussians)
- ``Shell``: `~gammapy.modeling.models.ShellSpatialModel`
"""
d = self.data
pars = {"lon_0": d["GLON"], "lat_0": d["GLAT"], "frame": "galactic"}
errs = {"lon_0": d["GLON_Err"], "lat_0": d["GLAT_Err"]}
spatial_type = self.data["Spatial_Model"]
if spatial_type == "Point-Like":
tag = "PointSpatialModel"
elif spatial_type == "Gaussian":
tag = "GaussianSpatialModel"
pars["sigma"] = d["Size"]
errs["sigma"] = d["Size_Err"]
elif spatial_type in {"2-Gaussian", "3-Gaussian"}:
raise ValueError("For Gaussian or Multi-Gaussian models, use sky_model()!")
elif spatial_type == "Shell":
# HGPS contains no information on shell width
# Here we assume a 5% shell width for all shells.
tag = "ShellSpatialModel"
pars["radius"] = 0.95 * d["Size"]
pars["width"] = d["Size"] - pars["radius"]
errs["radius"] = d["Size_Err"]
else:
raise ValueError(f"Invalid spatial_type: {spatial_type}")
model = Model.create(tag, **pars)
model.parameters.set_error(**errs)
return model
[docs] def sky_model(self, which="best"):
"""Source sky model.
Parameters
----------
which : {'best', 'pl', 'ecpl'}
Which spectral model
Returns
-------
sky_model : `~gammapy.modeling.models.SkyModel`
Sky model of the catalog object.
"""
spatial_type = self.data["Spatial_Model"]
if spatial_type in {"2-Gaussian", "3-Gaussian"}:
models = []
for component in self.components:
spectral_model = self.spectral_model(which=which)
weight = component.data["Flux_Map"] / self.data["Flux_Map"]
spectral_model.parameters["amplitude"].value *= weight
model = SkyModel(
spatial_model=component.spatial_model(),
spectral_model=spectral_model,
name=component.name,
)
models.append(model)
return Models(models)
else:
return SkyModel(
spatial_model=self.spatial_model(),
spectral_model=self.spectral_model(which=which),
name=self.name,
)
@property
def flux_points(self):
"""Flux points (`~gammapy.spectrum.FluxPoints`)."""
table = Table()
table.meta["SED_TYPE"] = "dnde"
mask = ~np.isnan(self.data["Flux_Points_Energy"])
table["e_ref"] = self.data["Flux_Points_Energy"][mask]
table["e_min"] = self.data["Flux_Points_Energy_Min"][mask]
table["e_max"] = self.data["Flux_Points_Energy_Max"][mask]
table["dnde"] = self.data["Flux_Points_Flux"][mask]
table["dnde_errn"] = self.data["Flux_Points_Flux_Err_Lo"][mask]
table["dnde_errp"] = self.data["Flux_Points_Flux_Err_Hi"][mask]
table["dnde_ul"] = self.data["Flux_Points_Flux_UL"][mask]
table["is_ul"] = self.data["Flux_Points_Flux_Is_UL"][mask].astype("bool")
return FluxPoints(table)
[docs]class SourceCatalogHGPS(SourceCatalog):
"""HESS Galactic plane survey (HGPS) source catalog.
Reference: https://www.mpi-hd.mpg.de/hfm/HESS/hgps/
One source is represented by `SourceCatalogObjectHGPS`.
Examples
--------
Let's assume you have downloaded the HGPS catalog FITS file, e.g. via:
.. code-block:: bash
curl -O https://www.mpi-hd.mpg.de/hfm/HESS/hgps/data/hgps_catalog_v1.fits.gz
Then you can load it up like this:
>>> from gammapy.catalog import SourceCatalogHGPS
>>> filename = 'hgps_catalog_v1.fits.gz'
>>> cat = SourceCatalogHGPS(filename)
Access a source by name:
>>> source = cat['HESS J1843-033']
>>> print(source)
Access source spectral data and plot it:
>>> source.spectral_model().plot(source.energy_range)
>>> source.spectral_model().plot_error(source.energy_range)
>>> source.flux_points.plot()
Gaussian component information can be accessed as well,
either via the source, or via the catalog:
>>> source.components
>>> cat.gaussian_component(83)
"""
name = "hgps"
"""Source catalog name (str)."""
description = "H.E.S.S. Galactic plane survey (HGPS) source catalog"
"""Source catalog description (str)."""
source_object_class = SourceCatalogObjectHGPS
def __init__(
self,
filename="$GAMMAPY_DATA/catalogs/hgps_catalog_v1.fits.gz",
hdu="HGPS_SOURCES",
):
filename = make_path(filename)
table = Table.read(filename, hdu=hdu)
source_name_alias = ("Identified_Object",)
super().__init__(table=table, source_name_alias=source_name_alias)
self._table_components = Table.read(filename, hdu="HGPS_GAUSS_COMPONENTS")
self._table_associations = Table.read(filename, hdu="HGPS_ASSOCIATIONS")
self._table_associations["Separation"].format = ".6f"
self._table_identifications = Table.read(filename, hdu="HGPS_IDENTIFICATIONS")
self._table_large_scale_component = Table.read(
filename, hdu="HGPS_LARGE_SCALE_COMPONENT"
)
@property
def table_components(self):
"""Gaussian component table (`~astropy.table.Table`)"""
return self._table_components
@property
def table_associations(self):
"""Source association table (`~astropy.table.Table`)"""
return self._table_associations
@property
def table_identifications(self):
"""Source identification table (`~astropy.table.Table`)"""
return self._table_identifications
@property
def table_large_scale_component(self):
"""Large scale component table (`~astropy.table.Table`)"""
return self._table_large_scale_component
@property
def large_scale_component(self):
"""Large scale component model (`~gammapy.catalog.SourceCatalogLargeScaleHGPS`)."""
return SourceCatalogLargeScaleHGPS(self.table_large_scale_component)
def _make_source_object(self, index):
"""Make `SourceCatalogObject` for given row index"""
source = super()._make_source_object(index)
if source.data["Components"] != "":
source.components = list(self._get_gaussian_components(source))
self._attach_association_info(source)
if source.data["Source_Class"] != "Unid":
self._attach_identification_info(source)
return source
def _get_gaussian_components(self, source):
for name in source.data["Components"].split(", "):
row_index = int(name.split()[-1]) - 1
yield self.gaussian_component(row_index)
def _attach_association_info(self, source):
t = self.table_associations
mask = source.data["Source_Name"] == t["Source_Name"]
source.associations = t[mask]
def _attach_identification_info(self, source):
t = self._table_identifications
idx = np.nonzero(source.name == t["Source_Name"])[0][0]
source.identification_info = table_row_to_dict(t[idx])
[docs] def gaussian_component(self, row_idx):
"""Gaussian component (`SourceCatalogObjectHGPSComponent`)."""
data = table_row_to_dict(self.table_components[row_idx])
data[SourceCatalogObject._row_index_key] = row_idx
return SourceCatalogObjectHGPSComponent(data=data)
[docs]class SourceCatalogLargeScaleHGPS:
"""Gaussian band model.
This 2-dimensional model is Gaussian in ``y`` for a given ``x``,
and the Gaussian parameters can vary in ``x``.
One application of this model is the diffuse emission along the
Galactic plane, i.e. ``x = GLON`` and ``y = GLAT``.
Parameters
----------
table : `~astropy.table.Table`
Table of Gaussian parameters.
``x``, ``amplitude``, ``mean``, ``stddev``.
interp_kwargs : dict
Keyword arguments passed to `ScaledRegularGridInterpolator`
"""
def __init__(self, table, interp_kwargs=None):
interp_kwargs = interp_kwargs or {}
interp_kwargs.setdefault("values_scale", "lin")
self.table = table
glon = Angle(self.table["GLON"]).wrap_at("180d")
interps = {}
for column in table.colnames:
values = self.table[column].quantity
interp = ScaledRegularGridInterpolator((glon,), values, **interp_kwargs)
interps[column] = interp
self._interp = interps
def _interpolate_parameter(self, parname, glon):
glon = glon.wrap_at("180d")
return self._interp[parname]((np.asanyarray(glon),), clip=False)
[docs] def peak_brightness(self, glon):
"""Peak brightness at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("Surface_Brightness", glon)
[docs] def peak_brightness_error(self, glon):
"""Peak brightness error at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("Surface_Brightness_Err", glon)
[docs] def width(self, glon):
"""Width at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("Width", glon)
[docs] def width_error(self, glon):
"""Width error at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("Width_Err", glon)
[docs] def peak_latitude(self, glon):
"""Peak position at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("GLAT", glon)
[docs] def peak_latitude_error(self, glon):
"""Peak position error at a given longitude.
Parameters
----------
glon : `~astropy.coordinates.Angle`
Galactic Longitude.
"""
return self._interpolate_parameter("GLAT_Err", glon)
[docs] def evaluate(self, position):
"""Evaluate model at a given position.
Parameters
----------
position : `~astropy.coordinates.SkyCoord`
Position on the sky.
"""
glon, glat = position.galactic.l, position.galactic.b
width = self.width(glon)
amplitude = self.peak_brightness(glon)
mean = self.peak_latitude(glon)
return Gaussian1D.evaluate(glat, amplitude=amplitude, mean=mean, stddev=width)