Source code for gammapy.catalog.hess

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""HESS Galactic plane survey (HGPS) catalog."""
from collections import OrderedDict
import numpy as np
import astropy.units as u
from astropy.table import Table
from astropy.coordinates import Angle
from astropy.modeling.models import Gaussian1D
from ..utils.scripts import make_path
from ..utils.table import table_row_to_dict
from ..utils.interpolation import ScaledRegularGridInterpolator
from ..spectrum import FluxPoints
from ..spectrum.models import PowerLaw, ExponentialCutoffPowerLaw
from ..image.models import SkyPointSource, SkyGaussian, SkyShell
from ..cube.models import SkyModel, SkyModels
from .core import SourceCatalog, SourceCatalogObject

__all__ = [
    "SourceCatalogHGPS",
    "SourceCatalogObjectHGPS",
    "SourceCatalogObjectHGPSComponent",
    "SourceCatalogLargeScaleHGPS",
]

# Flux factor, used for printing
FF = 1e-12

# Multiplicative factor to go from cm^-2 s^-1 to % Crab for integral flux > 1 TeV
# Here we use the same Crab reference that's used in the HGPS paper
# CRAB = crab_integral_flux(energy_min=1, reference='hess_ecpl')
FLUX_TO_CRAB = 100 / 2.26e-11
FLUX_TO_CRAB_DIFF = 100 / 3.5060459323111307e-11


[docs]class SourceCatalogObjectHGPSComponent: """One Gaussian component from the HGPS catalog. See Also -------- SourceCatalogHGPS, SourceCatalogObjectHGPS """ _source_name_key = "Component_ID" _source_index_key = "row_index" def __init__(self, data): self.data = OrderedDict(data) def __repr__(self): return "{}({!r})".format(self.__class__.__name__, self.name) def __str__(self): """Pretty-print source data""" d = self.data ss = "Component {}:\n".format(d["Component_ID"]) fmt = "{:<20s} : {:8.3f} +/- {:.3f} deg\n" ss += fmt.format("GLON", d["GLON"].value, d["GLON_Err"].value) ss += fmt.format("GLAT", d["GLAT"].value, d["GLAT_Err"].value) fmt = "{:<20s} : {:.3f} +/- {:.3f} deg\n" ss += fmt.format("Size", d["Size"].value, d["Size_Err"].value) val, err = d["Flux_Map"].value, d["Flux_Map_Err"].value fmt = "{:<20s} : ({:.2f} +/- {:.2f}) x 10^-12 cm^-2 s^-1 = ({:.1f} +/- {:.1f}) % Crab" ss += fmt.format( "Flux (>1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB ) return ss @property def name(self): """Source name (str)""" name = self.data[self._source_name_key] return name.strip() @property def index(self): """Row index of source in table (int)""" return self.data[self._source_index_key] @property def spatial_model(self): """Component spatial model (`~gammapy.image.models.SkyGaussian`).""" d = self.data model = SkyGaussian(lon_0=d["GLON"], lat_0=d["GLAT"], sigma=d["Size"]) model.parameters.set_parameter_errors( dict(lon_0=d["GLON_Err"], lat_0=d["GLAT_Err"], sigma=d["Size_Err"]) ) return model
[docs]class SourceCatalogObjectHGPS(SourceCatalogObject): """One object from the HGPS catalog. The catalog is represented by `SourceCatalogHGPS`. Examples are given there. """ def __repr__(self): return "{}({!r})".format(self.__class__.__name__, self.name) def __str__(self): return self.info()
[docs] def info(self, info="all"): """Info string. Parameters ---------- info : {'all', 'basic', 'map', 'spec', 'flux_points', 'components', 'associations', 'id'} Comma separated list of options """ if info == "all": info = "basic,associations,id,map,spec,flux_points,components" ss = "" ops = info.split(",") if "basic" in ops: ss += self._info_basic() if "map" in ops: ss += self._info_map() if "spec" in ops: ss += self._info_spec() if "flux_points" in ops: ss += self._info_flux_points() if "components" in ops and hasattr(self, "components"): ss += self._info_components() if "associations" in ops: ss += self._info_associations() if "id" in ops and hasattr(self, "identification_info"): ss += self._info_id() return ss
def _info_basic(self): """Print basic info.""" d = self.data ss = "\n*** Basic info ***\n\n" ss += "Catalog row index (zero-based) : {}\n".format(d["catalog_row_index"]) ss += "{:<20s} : {}\n".format("Source name", d["Source_Name"]) ss += "{:<20s} : {}\n".format("Analysis reference", d["Analysis_Reference"]) ss += "{:<20s} : {}\n".format("Source class", d["Source_Class"]) ss += "{:<20s} : {}\n".format("Identified object", d["Identified_Object"]) ss += "{:<20s} : {}\n".format("Gamma-Cat id", d["Gamma_Cat_Source_ID"]) ss += "\n" return ss def _info_associations(self): ss = "\n*** Source associations info ***\n\n" lines = self.associations.pformat(max_width=-1, max_lines=-1) ss += "\n".join(lines) return ss + "\n" def _info_id(self): ss = "\n*** Source identification info ***\n\n" ss += "\n".join( "{}: {}".format(k, v) for k, v in self.identification_info.items() ) return ss + "\n" def _info_map(self): """Print info from map analysis.""" d = self.data ss = "\n*** Info from map analysis ***\n\n" ra_str = Angle(d["RAJ2000"], "deg").to_string(unit="hour", precision=0) dec_str = Angle(d["DEJ2000"], "deg").to_string(unit="deg", precision=0) ss += "{:<20s} : {:8.3f} = {}\n".format("RA", d["RAJ2000"], ra_str) ss += "{:<20s} : {:8.3f} = {}\n".format("DEC", d["DEJ2000"], dec_str) ss += "{:<20s} : {:8.3f} +/- {:.3f} deg\n".format( "GLON", d["GLON"].value, d["GLON_Err"].value ) ss += "{:<20s} : {:8.3f} +/- {:.3f} deg\n".format( "GLAT", d["GLAT"].value, d["GLAT_Err"].value ) ss += "{:<20s} : {:.3f}\n".format("Position Error (68%)", d["Pos_Err_68"]) ss += "{:<20s} : {:.3f}\n".format("Position Error (95%)", d["Pos_Err_95"]) ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI_Number"]) ss += "{:<20s} : {}\n".format("Spatial model", d["Spatial_Model"]) ss += "{:<20s} : {}\n".format("Spatial components", d["Components"]) ss += "{:<20s} : {:.1f}\n".format("TS", d["Sqrt_TS"] ** 2) ss += "{:<20s} : {:.1f}\n".format("sqrt(TS)", d["Sqrt_TS"]) ss += "{:<20s} : {:.3f} +/- {:.3f} (UL: {:.3f}) deg\n".format( "Size", d["Size"].value, d["Size_Err"].value, d["Size_UL"].value ) ss += "{:<20s} : {:.3f}\n".format("R70", d["R70"]) ss += "{:<20s} : {:.3f}\n".format("RSpec", d["RSpec"]) ss += "{:<20s} : {:.1f}\n".format("Total model excess", d["Excess_Model_Total"]) ss += "{:<20s} : {:.1f}\n".format("Excess in RSpec", d["Excess_RSpec"]) ss += "{:<20s} : {:.1f}\n".format( "Model Excess in RSpec", d["Excess_RSpec_Model"] ) ss += "{:<20s} : {:.1f}\n".format("Background in RSpec", d["Background_RSpec"]) ss += "{:<20s} : {:.1f} hours\n".format("Livetime", d["Livetime"].value) ss += "{:<20s} : {:.2f}\n".format("Energy threshold", d["Energy_Threshold"]) val, err = d["Flux_Map"].value, d["Flux_Map_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format( "Source flux (>1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB, ) ss += "\nFluxes in RSpec (> 1 TeV):\n" ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format( "Map measurement", d["Flux_Map_RSpec_Data"].value / FF, d["Flux_Map_RSpec_Data"].value * FLUX_TO_CRAB, ) ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format( "Source model", d["Flux_Map_RSpec_Source"].value / FF, d["Flux_Map_RSpec_Source"].value * FLUX_TO_CRAB, ) ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format( "Other component model", d["Flux_Map_RSpec_Other"].value / FF, d["Flux_Map_RSpec_Other"].value * FLUX_TO_CRAB, ) ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format( "Large scale component model", d["Flux_Map_RSpec_LS"].value / FF, d["Flux_Map_RSpec_LS"].value * FLUX_TO_CRAB, ) ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format( "Total model", d["Flux_Map_RSpec_Total"].value / FF, d["Flux_Map_RSpec_Total"].value * FLUX_TO_CRAB, ) ss += "{:<35s} : {:5.1f} %\n".format( "Containment in RSpec", 100 * d["Containment_RSpec"] ) ss += "{:<35s} : {:5.1f} %\n".format( "Contamination in RSpec", 100 * d["Contamination_RSpec"] ) label, val = ( "Flux correction (RSpec -> Total)", 100 * d["Flux_Correction_RSpec_To_Total"], ) ss += "{:<35s} : {:5.1f} %\n".format(label, val) label, val = ( "Flux correction (Total -> RSpec)", 100 * (1 / d["Flux_Correction_RSpec_To_Total"]), ) ss += "{:<35s} : {:5.1f} %\n".format(label, val) return ss def _info_spec(self): """Print info from spectral analysis.""" d = self.data ss = "\n*** Info from spectral analysis ***\n\n" ss += "{:<20s} : {:.1f} hours\n".format("Livetime", d["Livetime_Spec"].value) lo = d["Energy_Range_Spec_Min"].value hi = d["Energy_Range_Spec_Max"].value ss += "{:<20s} : {:.2f} to {:.2f} TeV\n".format("Energy range:", lo, hi) ss += "{:<20s} : {:.1f}\n".format("Background", d["Background_Spec"]) ss += "{:<20s} : {:.1f}\n".format("Excess", d["Excess_Spec"]) ss += "{:<20s} : {}\n".format("Spectral model", d["Spectral_Model"]) val = d["TS_ECPL_over_PL"] ss += "{:<20s} : {:.1f}\n".format("TS ECPL over PL", val) val = d["Flux_Spec_Int_1TeV"].value err = d["Flux_Spec_Int_1TeV_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format( "Best-fit model flux(> 1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB, ) val = d["Flux_Spec_Energy_1_10_TeV"].value err = d["Flux_Spec_Energy_1_10_TeV_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 erg cm^-2 s^-1\n".format( "Best-fit model energy flux(1 to 10 TeV)", val / FF, err / FF ) ss += self._info_spec_pl() ss += self._info_spec_ecpl() return ss def _info_spec_pl(self): d = self.data ss = "{:<20s} : {:.2f}\n".format("Pivot energy", d["Energy_Spec_PL_Pivot"]) val = d["Flux_Spec_PL_Diff_Pivot"].value err = d["Flux_Spec_PL_Diff_Pivot_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format( "Flux at pivot energy", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB_DIFF, ) val = d["Flux_Spec_PL_Int_1TeV"].value err = d["Flux_Spec_PL_Int_1TeV_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format( "PL Flux(> 1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB, ) val = d["Flux_Spec_PL_Diff_1TeV"].value err = d["Flux_Spec_PL_Diff_1TeV_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format( "PL Flux(@ 1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB_DIFF, ) val = d["Index_Spec_PL"] err = d["Index_Spec_PL_Err"] ss += "{:<20s} : {:.2f} +/- {:.2f}\n".format("PL Index", val, err) return ss def _info_spec_ecpl(self): d = self.data ss = "" # ss = '{:<20s} : {:.1f}\n'.format('Pivot energy', d['Energy_Spec_ECPL_Pivot']) val = d["Flux_Spec_ECPL_Diff_1TeV"].value err = d["Flux_Spec_ECPL_Diff_1TeV_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format( "ECPL Flux(@ 1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB_DIFF, ) val = d["Flux_Spec_ECPL_Int_1TeV"].value err = d["Flux_Spec_ECPL_Int_1TeV_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format( "ECPL Flux(> 1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB, ) val = d["Index_Spec_ECPL"] err = d["Index_Spec_ECPL_Err"] ss += "{:<20s} : {:.2f} +/- {:.2f}\n".format("ECPL Index", val, err) val = d["Lambda_Spec_ECPL"].value err = d["Lambda_Spec_ECPL_Err"].value ss += "{:<20s} : {:.3f} +/- {:.3f} TeV^-1\n".format("ECPL Lambda", val, err) # Use Gaussian analytical error propagation, # tested against the uncertainties package err = err / val ** 2 val = 1.0 / val ss += "{:<20s} : {:.2f} +/- {:.2f} TeV\n".format("ECPL E_cut", val, err) return ss def _info_flux_points(self): """Print flux point results""" d = self.data ss = "\n*** Flux points info ***\n\n" ss += "Number of flux points: {}\n".format(d["N_Flux_Points"]) ss += "Flux points table: \n\n" lines = self.flux_points.table_formatted.pformat(max_width=-1, max_lines=-1) ss += "\n".join(lines) return ss + "\n" def _info_components(self): """Print info about the components.""" ss = "\n*** Gaussian component info ***\n\n" ss += "Number of components: {}\n".format(len(self.components)) ss += "{:<20s} : {}\n\n".format("Spatial components", self.data["Components"]) for component in self.components: ss += str(component) ss += "\n\n" return ss @property def energy_range(self): """Spectral model energy range (`~astropy.units.Quantity` with length 2).""" emin, emax = ( self.data["Energy_Range_Spec_Min"], self.data["Energy_Range_Spec_Max"], ) if np.isnan(emin): emin = u.Quantity(0.2, "TeV") if np.isnan(emax): emax = u.Quantity(50, "TeV") return u.Quantity([emin, emax], "TeV") @property def spectral_model_type(self): """Spectral model type (str). One of: 'pl', 'ecpl' """ return self.data["Spectral_Model"].strip().lower()
[docs] def spectral_model(self, which="best"): """Spectral model (`~gammapy.spectrum.models.SpectralModel`). One of the following models (given by ``Spectral_Model`` in the catalog): - ``PL`` : `~gammapy.spectrum.models.PowerLaw` - ``ECPL`` : `~gammapy.spectrum.models.ExponentialCutoffPowerLaw` Parameters ---------- which : {'best', 'pl', 'ecpl'} Which spectral model """ data = self.data if which == "best": spec_type = self.spectral_model_type elif which in {"pl", "ecpl"}: spec_type = which else: raise ValueError("Invalid selection: which = {!r}".format(which)) pars, errs = {}, {} if spec_type == "pl": pars["index"] = data["Index_Spec_PL"] pars["amplitude"] = data["Flux_Spec_PL_Diff_Pivot"] pars["reference"] = data["Energy_Spec_PL_Pivot"] errs["amplitude"] = data["Flux_Spec_PL_Diff_Pivot_Err"] errs["index"] = data["Index_Spec_PL_Err"] model = PowerLaw(**pars) elif spec_type == "ecpl": pars["index"] = data["Index_Spec_ECPL"] pars["amplitude"] = data["Flux_Spec_ECPL_Diff_Pivot"] pars["reference"] = data["Energy_Spec_ECPL_Pivot"] pars["lambda_"] = data["Lambda_Spec_ECPL"] errs["index"] = data["Index_Spec_ECPL_Err"] errs["amplitude"] = data["Flux_Spec_ECPL_Diff_Pivot_Err"] errs["lambda_"] = data["Lambda_Spec_ECPL_Err"] model = ExponentialCutoffPowerLaw(**pars) else: raise ValueError("Invalid spectral model: {}".format(spec_type)) model.parameters.set_parameter_errors(errs) return model
@property def spatial_model_type(self): """Spatial model type (str). One of: 'point-like', 'shell', 'gaussian', '2-gaussian', '3-gaussian' """ return self.data["Spatial_Model"].strip().lower() @property def is_pointlike(self): """Source is pointlike? (bool)""" d = self.data has_size_ul = np.isfinite(d["Size_UL"]) pointlike = d["Spatial_Model"] == "Point-Like" return pointlike or has_size_ul @property def spatial_model(self): """Spatial model (`~gammapy.image.models.SkySpatialModel`). One of the following models (given by ``Spatial_Model`` in the catalog): - ``Point-Like`` or has a size upper limit : `~gammapy.image.models.SkyPointSource` - ``Gaussian``: `~gammapy.image.models.SkyGaussian` - ``2-Gaussian`` or ``3-Gaussian``: composite model (using ``+`` with Gaussians) - ``Shell``: `~gammapy.image.models.SkyShell` """ d = self.data glon = d["GLON"] glat = d["GLAT"] spatial_type = self.spatial_model_type if self.is_pointlike: model = SkyPointSource(lon_0=glon, lat_0=glat) elif spatial_type == "gaussian": model = SkyGaussian(lon_0=glon, lat_0=glat, sigma=d["Size"]) elif spatial_type in {"2-gaussian", "3-gaussian"}: raise ValueError("For Gaussian or Multi-Gaussian models, use sky_model()!") elif spatial_type == "shell": # HGPS contains no information on shell width # Here we assume a 5% shell width for all shells. r_out = d["Size"] radius = 0.95 * r_out width = r_out - radius model = SkyShell(lon_0=glon, lat_0=glat, width=width, radius=radius) else: raise ValueError("Not a valid spatial model: {}".format(spatial_type)) return model
[docs] def sky_model(self, which="best"): """Source sky model. Parameters ---------- which : {'best', 'pl', 'ecpl'} Which spectral model Returns ------- sky_model : `~gammapy.cube.models.SkyModel` Sky model of the catalog object. """ if self.spatial_model_type in {"2-gaussian", "3-gaussian"}: models = [] spectral_model = self.spectral_model(which=which) for component in self.components: weight = component.data["Flux_Map"] / self.data["Flux_Map"] spectral_model_comp = spectral_model.copy() # weight amplitude of the component spectral_model_comp.parameters["amplitude"].value *= weight models.append( SkyModel( component.spatial_model, spectral_model_comp, name=component.name, ) ) return SkyModels(models) else: spatial_model = self.spatial_model spectral_model = self.spectral_model(which=which) return SkyModel(spatial_model, spectral_model, name=self.name)
@property def flux_points(self): """Flux points (`~gammapy.spectrum.FluxPoints`).""" table = Table() table.meta["SED_TYPE"] = "dnde" mask = ~np.isnan(self.data["Flux_Points_Energy"]) table["e_ref"] = self.data["Flux_Points_Energy"][mask] table["e_min"] = self.data["Flux_Points_Energy_Min"][mask] table["e_max"] = self.data["Flux_Points_Energy_Max"][mask] table["dnde"] = self.data["Flux_Points_Flux"][mask] table["dnde_errn"] = self.data["Flux_Points_Flux_Err_Lo"][mask] table["dnde_errp"] = self.data["Flux_Points_Flux_Err_Hi"][mask] table["dnde_ul"] = self.data["Flux_Points_Flux_UL"][mask] table["is_ul"] = self.data["Flux_Points_Flux_Is_UL"][mask].astype("bool") return FluxPoints(table)
[docs]class SourceCatalogHGPS(SourceCatalog): """HESS Galactic plane survey (HGPS) source catalog. Reference: https://www.mpi-hd.mpg.de/hfm/HESS/hgps/ One source is represented by `SourceCatalogObjectHGPS`. An extensive tutorial is available here: :gp-notebook:`hgps` Examples -------- Let's assume you have downloaded the HGPS catalog FITS file, e.g. via: .. code-block:: bash curl -O https://www.mpi-hd.mpg.de/hfm/HESS/hgps/data/hgps_catalog_v1.fits.gz Then you can load it up like this: >>> from gammapy.catalog import SourceCatalogHGPS >>> filename = 'hgps_catalog_v1.fits.gz' >>> cat = SourceCatalogHGPS(filename) Access a source by name: >>> source = cat['HESS J1843-033'] >>> print(source) Access source spectral data and plot it: >>> source.spectral_model().plot(source.energy_range) >>> source.spectral_model().plot_error(source.energy_range) >>> source.flux_points.plot() Gaussian component information can be accessed as well, either via the source, or via the catalog: >>> source.components >>> cat.gaussian_component(83) More examples here: :gp-notebook:`hgps` """ name = "hgps" """Source catalog name (str).""" description = "H.E.S.S. Galactic plane survey (HGPS) source catalog" """Source catalog description (str).""" source_object_class = SourceCatalogObjectHGPS def __init__(self, filename=None, hdu="HGPS_SOURCES"): if not filename: filename = "$GAMMAPY_DATA/catalogs/hgps_catalog_v1.fits.gz" filename = str(make_path(filename)) table = Table.read(filename, hdu=hdu) source_name_alias = ("Identified_Object",) super().__init__(table=table, source_name_alias=source_name_alias) self._table_components = Table.read(filename, hdu="HGPS_GAUSS_COMPONENTS") self._table_associations = Table.read(filename, hdu="HGPS_ASSOCIATIONS") self._table_identifications = Table.read(filename, hdu="HGPS_IDENTIFICATIONS") self._table_large_scale_component = Table.read( filename, hdu="HGPS_LARGE_SCALE_COMPONENT" ) @property def table_components(self): """Gaussian component table (`~astropy.table.Table`)""" return self._table_components @property def table_associations(self): """Source association table (`~astropy.table.Table`)""" return self._table_associations @property def table_identifications(self): """Source identification table (`~astropy.table.Table`)""" return self._table_identifications @property def table_large_scale_component(self): """Large scale component table (`~astropy.table.Table`)""" return self._table_large_scale_component @property def large_scale_component(self): """Large scale component model (`~gammapy.catalog.SourceCatalogLargeScaleHGPS`).""" return SourceCatalogLargeScaleHGPS(self.table_large_scale_component) def _make_source_object(self, index): """Make `SourceCatalogObject` for given row index""" source = super()._make_source_object(index) if source.data["Components"] != "": self._attach_component_info(source) self._attach_association_info(source) if source.data["Source_Class"] != "Unid": self._attach_identification_info(source) return source def _attach_component_info(self, source): source.components = [] lookup = SourceCatalog(self.table_components, source_name_key="Component_ID") for name in source.data["Components"].split(", "): component = SourceCatalogObjectHGPSComponent(data=lookup[name].data) source.components.append(component) def _attach_association_info(self, source): t = self.table_associations mask = source.data["Source_Name"] == t["Source_Name"] source.associations = t[mask] def _attach_identification_info(self, source): t = self._table_identifications idx = np.nonzero(source.name == t["Source_Name"])[0][0] source.identification_info = table_row_to_dict(t[idx])
[docs] def gaussian_component(self, row_idx): """Gaussian component (`SourceCatalogObjectHGPSComponent`).""" data = table_row_to_dict(self.table_components[row_idx]) data["row_index"] = row_idx return SourceCatalogObjectHGPSComponent(data=data)
[docs]class SourceCatalogLargeScaleHGPS: """Gaussian band model. This 2-dimensional model is Gaussian in ``y`` for a given ``x``, and the Gaussian parameters can vary in ``x``. One application of this model is the diffuse emission along the Galactic plane, i.e. ``x = GLON`` and ``y = GLAT``. Parameters ---------- table : `~astropy.table.Table` Table of Gaussian parameters. ``x``, ``amplitude``, ``mean``, ``stddev``. interp_kwargs : dict Keyword arguments passed to `ScaledRegularGridInterpolator` """ def __init__(self, table, interp_kwargs=None): interp_kwargs = interp_kwargs or {} interp_kwargs.setdefault("values_scale", "lin") self.table = table glon = Angle(self.table["GLON"]).wrap_at("180d") interps = {} for column in table.colnames: values = self.table[column].quantity interp = ScaledRegularGridInterpolator((glon,), values, **interp_kwargs) interps[column] = interp self._interp = interps def _interpolate_parameter(self, parname, glon): glon = glon.wrap_at("180d") return self._interp[parname]((np.asanyarray(glon),), clip=False)
[docs] def peak_brightness(self, glon): """Peak brightness at a given longitude. Parameters ---------- glon : `~astropy.coordinates.Longitude` Galactic Longitude. """ return self._interpolate_parameter("Surface_Brightness", glon)
[docs] def peak_brightness_error(self, glon): """Peak brightness error at a given longitude. Parameters ---------- glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle` Galactic Longitude. """ return self._interpolate_parameter("Surface_Brightness_Err", glon)
[docs] def width(self, glon): """Width at a given longitude. Parameters ---------- glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle` Galactic Longitude. """ return self._interpolate_parameter("Width", glon)
[docs] def width_error(self, glon): """Width error at a given longitude. Parameters ---------- glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle` Galactic Longitude. """ return self._interpolate_parameter("Width_Err", glon)
[docs] def peak_latitude(self, glon): """Peak position at a given longitude. Parameters ---------- glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle` Galactic Longitude. """ return self._interpolate_parameter("GLAT", glon)
[docs] def peak_latitude_error(self, glon): """Peak position error at a given longitude. Parameters ---------- glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle` Galactic Longitude. """ return self._interpolate_parameter("GLAT_Err", glon)
[docs] def evaluate(self, position): """Evaluate model at a given position. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Position on the sky. """ glon, glat = position.galactic.l, position.galactic.b width = self.width(glon) amplitude = self.peak_brightness(glon) mean = self.peak_latitude(glon) return Gaussian1D.evaluate(glat, amplitude=amplitude, mean=mean, stddev=width)