Source code for gammapy.catalog.hess

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""HESS Galactic plane survey (HGPS) catalog."""
from __future__ import absolute_import, division, print_function, unicode_literals
import os
from collections import OrderedDict
import numpy as np
import astropy.units as u
from astropy.table import Table
from astropy.coordinates import Angle
from astropy.modeling.models import Gaussian1D
from ..extern.pathlib import Path
from ..utils.scripts import make_path
from ..utils.table import table_row_to_dict
from ..utils.interpolation import ScaledRegularGridInterpolator
from ..spectrum import FluxPoints
from ..spectrum.models import PowerLaw, PowerLaw2, ExponentialCutoffPowerLaw
from ..image.models import SkyPointSource, SkyGaussian, SkyShell
from ..cube.models import SkyModel, SkyModels
from .core import SourceCatalog, SourceCatalogObject

__all__ = [
    "SourceCatalogHGPS",
    "SourceCatalogObjectHGPS",
    "SourceCatalogObjectHGPSComponent",
    "SourceCatalogLargeScaleHGPS",
]

# Flux factor, used for printing
FF = 1e-12

# Multiplicative factor to go from cm^-2 s^-1 to % Crab for integral flux > 1 TeV
# Here we use the same Crab reference that's used in the HGPS paper
# CRAB = crab_integral_flux(energy_min=1, reference='hess_ecpl')
FLUX_TO_CRAB = 100 / 2.26e-11
FLUX_TO_CRAB_DIFF = 100 / 3.5060459323111307e-11


[docs]class SourceCatalogObjectHGPSComponent(object): """One Gaussian component from the HGPS catalog. See also -------- SourceCatalogHGPS, SourceCatalogObjectHGPS """ _source_name_key = "Component_ID" _source_index_key = "row_index" def __init__(self, data): self.data = OrderedDict(data) def __repr__(self): return "{}({!r})".format(self.__class__.__name__, self.name) def __str__(self): """Pretty-print source data""" d = self.data ss = "Component {}:\n".format(d["Component_ID"]) fmt = "{:<20s} : {:8.3f} +/- {:.3f} deg\n" ss += fmt.format("GLON", d["GLON"].value, d["GLON_Err"].value) ss += fmt.format("GLAT", d["GLAT"].value, d["GLAT_Err"].value) fmt = "{:<20s} : {:.3f} +/- {:.3f} deg\n" ss += fmt.format("Size", d["Size"].value, d["Size_Err"].value) val, err = d["Flux_Map"].value, d["Flux_Map_Err"].value fmt = "{:<20s} : ({:.2f} +/- {:.2f}) x 10^-12 cm^-2 s^-1 = ({:.1f} +/- {:.1f}) % Crab" ss += fmt.format( "Flux (>1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB ) return ss @property def name(self): """Source name (str)""" name = self.data[self._source_name_key] return name.strip() @property def index(self): """Row index of source in table (int)""" return self.data[self._source_index_key] @property def spatial_model(self): """Component spatial model (`~gammapy.image.models.SkyGaussian`).""" d = self.data model = SkyGaussian(lon_0=d["GLON"], lat_0=d["GLAT"], sigma=d["Size"]) model.parameters.set_parameter_errors( dict(lon_0=d["GLON_Err"], lat_0=d["GLAT_Err"], sigma=d["Size_Err"]) ) return model @property def spectral_model(self): """Component spectral model (`gammapy.spectrum.models.PowerLaw2`).""" d = self.data model = PowerLaw2( amplitude=d["Flux_Map"], index=2.3, emin="1 TeV", emax="1e5 TeV" ) model.parameters.set_parameter_errors(dict(amplitude=d["Flux_Map_Err"])) return model @property def sky_model(self): """Component sky model (`gammapy.cube.models.SkyModel`).""" return SkyModel(self.spatial_model, self.spectral_model)
[docs]class SourceCatalogObjectHGPS(SourceCatalogObject): """One object from the HGPS catalog. The catalog is represented by `SourceCatalogHGPS`. Examples are given there. """ def __repr__(self): return "{}({!r})".format(self.__class__.__name__, self.name) def __str__(self): return self.info()
[docs] def info(self, info="all"): """Info string. Parameters ---------- info : {'all', 'basic', 'map', 'spec', 'flux_points', 'components', 'associations', 'id'} Comma separated list of options """ if info == "all": info = "basic,associations,id,map,spec,flux_points,components" ss = "" ops = info.split(",") if "basic" in ops: ss += self._info_basic() if "map" in ops: ss += self._info_map() if "spec" in ops: ss += self._info_spec() if "flux_points" in ops: ss += self._info_flux_points() if "components" in ops and hasattr(self, "components"): ss += self._info_components() if "associations" in ops: ss += self._info_associations() if "id" in ops and hasattr(self, "identification_info"): ss += self._info_id() return ss
def _info_basic(self): """Print basic info.""" d = self.data ss = "\n*** Basic info ***\n\n" ss += "Catalog row index (zero-based) : {}\n".format(d["catalog_row_index"]) ss += "{:<20s} : {}\n".format("Source name", d["Source_Name"]) ss += "{:<20s} : {}\n".format("Analysis reference", d["Analysis_Reference"]) ss += "{:<20s} : {}\n".format("Source class", d["Source_Class"]) ss += "{:<20s} : {}\n".format("Identified object", d["Identified_Object"]) ss += "{:<20s} : {}\n".format("Gamma-Cat id", d["Gamma_Cat_Source_ID"]) ss += "\n" return ss def _info_associations(self): ss = "\n*** Source associations info ***\n\n" lines = self.associations.pformat(max_width=-1, max_lines=-1) ss += "\n".join(lines) return ss + "\n" def _info_id(self): ss = "\n*** Source identification info ***\n\n" ss += "\n".join( "{}: {}".format(k, v) for k, v in self.identification_info.items() ) return ss + "\n" def _info_map(self): """Print info from map analysis.""" d = self.data ss = "\n*** Info from map analysis ***\n\n" ra_str = Angle(d["RAJ2000"], "deg").to_string(unit="hour", precision=0) dec_str = Angle(d["DEJ2000"], "deg").to_string(unit="deg", precision=0) ss += "{:<20s} : {:8.3f} = {}\n".format("RA", d["RAJ2000"], ra_str) ss += "{:<20s} : {:8.3f} = {}\n".format("DEC", d["DEJ2000"], dec_str) ss += "{:<20s} : {:8.3f} +/- {:.3f} deg\n".format( "GLON", d["GLON"].value, d["GLON_Err"].value ) ss += "{:<20s} : {:8.3f} +/- {:.3f} deg\n".format( "GLAT", d["GLAT"].value, d["GLAT_Err"].value ) ss += "{:<20s} : {:.3f}\n".format("Position Error (68%)", d["Pos_Err_68"]) ss += "{:<20s} : {:.3f}\n".format("Position Error (95%)", d["Pos_Err_95"]) ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI_Number"]) ss += "{:<20s} : {}\n".format("Spatial model", d["Spatial_Model"]) ss += "{:<20s} : {}\n".format("Spatial components", d["Components"]) ss += "{:<20s} : {:.1f}\n".format("TS", d["Sqrt_TS"] ** 2) ss += "{:<20s} : {:.1f}\n".format("sqrt(TS)", d["Sqrt_TS"]) ss += "{:<20s} : {:.3f} +/- {:.3f} (UL: {:.3f}) deg\n".format( "Size", d["Size"].value, d["Size_Err"].value, d["Size_UL"].value ) ss += "{:<20s} : {:.3f}\n".format("R70", d["R70"]) ss += "{:<20s} : {:.3f}\n".format("RSpec", d["RSpec"]) ss += "{:<20s} : {:.1f}\n".format("Total model excess", d["Excess_Model_Total"]) ss += "{:<20s} : {:.1f}\n".format("Excess in RSpec", d["Excess_RSpec"]) ss += "{:<20s} : {:.1f}\n".format( "Model Excess in RSpec", d["Excess_RSpec_Model"] ) ss += "{:<20s} : {:.1f}\n".format("Background in RSpec", d["Background_RSpec"]) ss += "{:<20s} : {:.1f} hours\n".format("Livetime", d["Livetime"].value) ss += "{:<20s} : {:.2f}\n".format("Energy threshold", d["Energy_Threshold"]) val, err = d["Flux_Map"].value, d["Flux_Map_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format( "Source flux (>1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB, ) ss += "\nFluxes in RSpec (> 1 TeV):\n" ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format( "Map measurement", d["Flux_Map_RSpec_Data"].value / FF, d["Flux_Map_RSpec_Data"].value * FLUX_TO_CRAB, ) ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format( "Source model", d["Flux_Map_RSpec_Source"].value / FF, d["Flux_Map_RSpec_Source"].value * FLUX_TO_CRAB, ) ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format( "Other component model", d["Flux_Map_RSpec_Other"].value / FF, d["Flux_Map_RSpec_Other"].value * FLUX_TO_CRAB, ) ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format( "Large scale component model", d["Flux_Map_RSpec_LS"].value / FF, d["Flux_Map_RSpec_LS"].value * FLUX_TO_CRAB, ) ss += "{:<30s} : {:.3f} x 10^-12 cm^-2 s^-1 = {:5.2f} % Crab\n".format( "Total model", d["Flux_Map_RSpec_Total"].value / FF, d["Flux_Map_RSpec_Total"].value * FLUX_TO_CRAB, ) ss += "{:<35s} : {:5.1f} %\n".format( "Containment in RSpec", 100 * d["Containment_RSpec"] ) ss += "{:<35s} : {:5.1f} %\n".format( "Contamination in RSpec", 100 * d["Contamination_RSpec"] ) label, val = ( "Flux correction (RSpec -> Total)", 100 * d["Flux_Correction_RSpec_To_Total"], ) ss += "{:<35s} : {:5.1f} %\n".format(label, val) label, val = ( "Flux correction (Total -> RSpec)", 100 * (1 / d["Flux_Correction_RSpec_To_Total"]), ) ss += "{:<35s} : {:5.1f} %\n".format(label, val) return ss def _info_spec(self): """Print info from spectral analysis.""" d = self.data ss = "\n*** Info from spectral analysis ***\n\n" ss += "{:<20s} : {:.1f} hours\n".format("Livetime", d["Livetime_Spec"].value) lo = d["Energy_Range_Spec_Min"].value hi = d["Energy_Range_Spec_Max"].value ss += "{:<20s} : {:.2f} to {:.2f} TeV\n".format("Energy range:", lo, hi) ss += "{:<20s} : {:.1f}\n".format("Background", d["Background_Spec"]) ss += "{:<20s} : {:.1f}\n".format("Excess", d["Excess_Spec"]) ss += "{:<20s} : {}\n".format("Spectral model", d["Spectral_Model"]) val = d["TS_ECPL_over_PL"] ss += "{:<20s} : {:.1f}\n".format("TS ECPL over PL", val) val = d["Flux_Spec_Int_1TeV"].value err = d["Flux_Spec_Int_1TeV_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format( "Best-fit model flux(> 1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB, ) val = d["Flux_Spec_Energy_1_10_TeV"].value err = d["Flux_Spec_Energy_1_10_TeV_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 erg cm^-2 s^-1\n".format( "Best-fit model energy flux(1 to 10 TeV)", val / FF, err / FF ) ss += self._info_spec_pl() ss += self._info_spec_ecpl() return ss def _info_spec_pl(self): d = self.data ss = "{:<20s} : {:.2f}\n".format("Pivot energy", d["Energy_Spec_PL_Pivot"]) val = d["Flux_Spec_PL_Diff_Pivot"].value err = d["Flux_Spec_PL_Diff_Pivot_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format( "Flux at pivot energy", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB_DIFF, ) val = d["Flux_Spec_PL_Int_1TeV"].value err = d["Flux_Spec_PL_Int_1TeV_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format( "PL Flux(> 1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB, ) val = d["Flux_Spec_PL_Diff_1TeV"].value err = d["Flux_Spec_PL_Diff_1TeV_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format( "PL Flux(@ 1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB_DIFF, ) val = d["Index_Spec_PL"] err = d["Index_Spec_PL_Err"] ss += "{:<20s} : {:.2f} +/- {:.2f}\n".format("PL Index", val, err) return ss def _info_spec_ecpl(self): d = self.data ss = "" # ss = '{:<20s} : {:.1f}\n'.format('Pivot energy', d['Energy_Spec_ECPL_Pivot']) val = d["Flux_Spec_ECPL_Diff_1TeV"].value err = d["Flux_Spec_ECPL_Diff_1TeV_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 TeV^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format( "ECPL Flux(@ 1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB_DIFF, ) val = d["Flux_Spec_ECPL_Int_1TeV"].value err = d["Flux_Spec_ECPL_Int_1TeV_Err"].value ss += "{:<20s} : ({:.3f} +/- {:.3f}) x 10^-12 cm^-2 s^-1 = ({:.2f} +/- {:.2f}) % Crab\n".format( "ECPL Flux(> 1 TeV)", val / FF, err / FF, val * FLUX_TO_CRAB, err * FLUX_TO_CRAB, ) val = d["Index_Spec_ECPL"] err = d["Index_Spec_ECPL_Err"] ss += "{:<20s} : {:.2f} +/- {:.2f}\n".format("ECPL Index", val, err) val = d["Lambda_Spec_ECPL"].value err = d["Lambda_Spec_ECPL_Err"].value ss += "{:<20s} : {:.3f} +/- {:.3f} TeV^-1\n".format("ECPL Lambda", val, err) # Use Gaussian analytical error propagation, # tested against the uncertainties package err = err / val ** 2 val = 1.0 / val ss += "{:<20s} : {:.2f} +/- {:.2f} TeV\n".format("ECPL E_cut", val, err) return ss def _info_flux_points(self): """Print flux point results""" d = self.data ss = "\n*** Flux points info ***\n\n" ss += "Number of flux points: {}\n".format(d["N_Flux_Points"]) ss += "Flux points table: \n\n" lines = self.flux_points.table_formatted.pformat(max_width=-1, max_lines=-1) ss += "\n".join(lines) return ss + "\n" def _info_components(self): """Print info about the components.""" ss = "\n*** Gaussian component info ***\n\n" ss += "Number of components: {}\n".format(len(self.components)) ss += "{:<20s} : {}\n\n".format("Spatial components", self.data["Components"]) for component in self.components: ss += str(component) ss += "\n\n" return ss @property def energy_range(self): """Spectral model energy range (`~astropy.units.Quantity` with length 2).""" emin, emax = ( self.data["Energy_Range_Spec_Min"], self.data["Energy_Range_Spec_Max"], ) if np.isnan(emin): emin = u.Quantity(0.2, "TeV") if np.isnan(emax): emax = u.Quantity(50, "TeV") return u.Quantity([emin, emax], "TeV") @property def spectral_model_type(self): """Spectral model type (str). One of: 'pl', 'ecpl' """ return self.data["Spectral_Model"].strip().lower()
[docs] def spectral_model(self, which="best"): """Spectral model (`~gammapy.spectrum.models.SpectralModel`). One of the following models (given by ``Spectral_Model`` in the catalog): - ``PL`` : `~gammapy.spectrum.models.PowerLaw` - ``ECPL`` : `~gammapy.spectrum.models.ExponentialCutoffPowerLaw` Parameters ---------- which : {'best', 'pl', 'ecpl'} Which spectral model """ data = self.data if which == "best": spec_type = self.spectral_model_type elif which in {"pl", "ecpl"}: spec_type = which else: raise ValueError("Invalid selection: which = {!r}".format(which)) pars, errs = {}, {} if spec_type == "pl": pars["index"] = data["Index_Spec_PL"] pars["amplitude"] = data["Flux_Spec_PL_Diff_Pivot"] pars["reference"] = data["Energy_Spec_PL_Pivot"] errs["amplitude"] = data["Flux_Spec_PL_Diff_Pivot_Err"] errs["index"] = data["Index_Spec_PL_Err"] model = PowerLaw(**pars) elif spec_type == "ecpl": pars["index"] = data["Index_Spec_ECPL"] pars["amplitude"] = data["Flux_Spec_ECPL_Diff_Pivot"] pars["reference"] = data["Energy_Spec_ECPL_Pivot"] pars["lambda_"] = data["Lambda_Spec_ECPL"] errs["index"] = data["Index_Spec_ECPL_Err"] errs["amplitude"] = data["Flux_Spec_ECPL_Diff_Pivot_Err"] errs["lambda_"] = data["Lambda_Spec_ECPL_Err"] model = ExponentialCutoffPowerLaw(**pars) else: raise ValueError("Invalid spectral model: {}".format(spec_type)) model.parameters.set_parameter_errors(errs) return model
@property def spatial_model_type(self): """Spatial model type (str). One of: 'point-like', 'shell', 'gaussian', '2-gaussian', '3-gaussian' """ return self.data["Spatial_Model"].strip().lower() @property def is_pointlike(self): """Source is pointlike? (bool)""" d = self.data has_size_ul = np.isfinite(d["Size_UL"]) pointlike = d["Spatial_Model"] == "Point-Like" return pointlike or has_size_ul @property def spatial_model(self): """Spatial model (`~gammapy.image.models.SkySpatialModel`). One of the following models (given by ``Spatial_Model`` in the catalog): - ``Point-Like`` or has a size upper limit : `~gammapy.image.models.SkyPointSource` - ``Gaussian``: `~gammapy.image.models.SkyGaussian` - ``2-Gaussian`` or ``3-Gaussian``: composite model (using ``+`` with Gaussians) - ``Shell``: `~gammapy.image.models.SkyShell` TODO: add parameter errors """ d = self.data glon = d["GLON"] glat = d["GLAT"] spatial_type = self.spatial_model_type if self.is_pointlike: model = SkyPointSource(lon_0=glon, lat_0=glat) elif spatial_type == "gaussian": model = SkyGaussian(lon_0=glon, lat_0=glat, sigma=d["Size"]) elif spatial_type in {"2-gaussian", "3-gaussian"}: raise ValueError("For Gaussian or Multi-Gaussian models, use sky_model()!") elif spatial_type == "shell": # HGPS contains no information on shell width # Here we assuma a 5% shell width for all shells. r_out = d["Size"] radius = 0.95 * r_out width = r_out - radius model = SkyShell(lon_0=glon, lat_0=glat, width=width, radius=radius) else: raise ValueError("Not a valid spatial model: {}".format(spatial_type)) return model @property def sky_model(self): """Source sky model (`~gammapy.cube.models.SkyModel`).""" if self.spatial_model_type in {"2-gaussian", "3-gaussian"}: models = [c.sky_model for c in self.components] return SkyModels(models) else: spatial_model = self.spatial_model # TODO: there are two spectral models # Here we only expose the default # Change sky_model into a method and re-expose option to select spectral model? spectral_model = self.spectral_model() return SkyModel(spatial_model, spectral_model) @property def flux_points(self): """Flux points (`~gammapy.spectrum.FluxPoints`).""" table = Table() table.meta["SED_TYPE"] = "dnde" mask = ~np.isnan(self.data["Flux_Points_Energy"]) table["e_ref"] = self.data["Flux_Points_Energy"][mask] table["e_min"] = self.data["Flux_Points_Energy_Min"][mask] table["e_max"] = self.data["Flux_Points_Energy_Max"][mask] table["dnde"] = self.data["Flux_Points_Flux"][mask] table["dnde_errn"] = self.data["Flux_Points_Flux_Err_Lo"][mask] table["dnde_errp"] = self.data["Flux_Points_Flux_Err_Hi"][mask] table["dnde_ul"] = self.data["Flux_Points_Flux_UL"][mask] table["is_ul"] = self.data["Flux_Points_Flux_Is_UL"][mask].astype("bool") return FluxPoints(table)
[docs]class SourceCatalogHGPS(SourceCatalog): """HESS Galactic plane survey (HGPS) source catalog. Reference: https://www.mpi-hd.mpg.de/hfm/HESS/hgps/ One source is represented by `SourceCatalogObjectHGPS`. An extensive tutorial is available here: :gp-extra-notebook:`hgps` Examples -------- Let's assume you have downloaded the HGPS catalog FITS file, e.g. via: .. code-block:: bash curl -O https://www.mpi-hd.mpg.de/hfm/HESS/hgps/data/hgps_catalog_v1.fits.gz Then you can load it up like this: >>> from gammapy.catalog import SourceCatalogHGPS >>> filename = 'hgps_catalog_v1.fits.gz' >>> cat = SourceCatalogHGPS(filename) Access a source by name: >>> source = cat['HESS J1843-033'] >>> print(source) Access source spectral data and plot it: >>> source.spectral_model().plot(source.energy_range) >>> source.spectral_model().plot_error(source.energy_range) >>> source.flux_points.plot() Gaussian component information can be accessed as well, either via the source, or via the catalog: >>> source.components >>> cat.gaussian_component(83) More examples here: :gp-extra-notebook:`hgps` """ name = "hgps" """Source catalog name (str).""" description = "H.E.S.S. Galactic plane survey (HGPS) source catalog" """Source catalog description (str).""" source_object_class = SourceCatalogObjectHGPS def __init__(self, filename=None, hdu="HGPS_SOURCES"): if not filename: filename = ( Path(os.environ["HGPS_ANALYSIS"]) / "data/catalogs/HGPS3/release/HGPS_v0.4.fits" ) filename = str(make_path(filename)) table = Table.read(filename, hdu=hdu) source_name_alias = ("Identified_Object",) super(SourceCatalogHGPS, self).__init__( table=table, source_name_alias=source_name_alias ) self._table_components = Table.read(filename, hdu="HGPS_GAUSS_COMPONENTS") self._table_associations = Table.read(filename, hdu="HGPS_ASSOCIATIONS") self._table_identifications = Table.read(filename, hdu="HGPS_IDENTIFICATIONS") self._table_large_scale_component = Table.read( filename, hdu="HGPS_LARGE_SCALE_COMPONENT" ) @property def table_components(self): """Gaussian component table (`~astropy.table.Table`)""" return self._table_components @property def table_associations(self): """Source association table (`~astropy.table.Table`)""" return self._table_associations @property def table_identifications(self): """Source identification table (`~astropy.table.Table`)""" return self._table_identifications @property def table_large_scale_component(self): """Large scale component table (`~astropy.table.Table`)""" return self._table_large_scale_component @property def large_scale_component(self): """Large scale component model (`~gammapy.catalog.SourceCatalogLargeScaleHGPS`). """ table = self.table_large_scale_component return SourceCatalogLargeScaleHGPS(table, spline_kwargs=dict(k=3, s=0, ext=0)) def _make_source_object(self, index): """Make `SourceCatalogObject` for given row index""" source = super(SourceCatalogHGPS, self)._make_source_object(index) if source.data["Components"] != "": self._attach_component_info(source) self._attach_association_info(source) if source.data["Source_Class"] != "Unid": self._attach_identification_info(source) return source def _attach_component_info(self, source): source.components = [] lookup = SourceCatalog(self.table_components, source_name_key="Component_ID") for name in source.data["Components"].split(", "): component = SourceCatalogObjectHGPSComponent(data=lookup[name].data) source.components.append(component) def _attach_association_info(self, source): t = self.table_associations mask = source.data["Source_Name"] == t["Source_Name"] source.associations = t[mask] def _attach_identification_info(self, source): t = self._table_identifications idx = np.nonzero(source.name == t["Source_Name"])[0][0] source.identification_info = table_row_to_dict(t[idx])
[docs] def gaussian_component(self, row_idx): """Gaussian component (`SourceCatalogObjectHGPSComponent`).""" data = table_row_to_dict(self.table_components[row_idx]) data["row_index"] = row_idx return SourceCatalogObjectHGPSComponent(data=data)
[docs]class SourceCatalogLargeScaleHGPS(object): """Gaussian band model. This 2-dimensional model is Gaussian in ``y`` for a given ``x``, and the Gaussian parameters can vary in ``x``. One application of this model is the diffuse emission along the Galactic plane, i.e. ``x = GLON`` and ``y = GLAT``. Parameters ---------- table : `~astropy.table.Table` Table of Gaussian parameters. ``x``, ``amplitude``, ``mean``, ``stddev``. interp_kwargs : dict Keyword arguments passed to `ScaledRegularGridInterpolator` """ def __init__(self, table, interp_kwargs=None): interp_kwargs = interp_kwargs or {} interp_kwargs.setdefault("values_scale", "lin") self.table = table glon = Angle(self.table["GLON"]).wrap_at("180d") interps = {} for column in table.colnames: values = self.table[column].quantity interp = ScaledRegularGridInterpolator( (glon.degree,), values, **interp_kwargs ) interps[column] = interp self._interp = interps def _interpolate_parameter(self, parname, glon): glon = glon.wrap_at("180d") return self._interp[parname]((np.asanyarray(glon.degree),), clip=False)
[docs] def peak_brightness(self, glon): """Peak brightness at a given longitude. Parameters ---------- glon : `~astropy.coordinates.Longitude` Galactic Longitude. """ return self._interpolate_parameter("Surface_Brightness", glon)
[docs] def peak_brightness_error(self, glon): """Peak brightness error at a given longitude. Parameters ---------- glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle` Galactic Longitude. """ return self._interpolate_parameter("Surface_Brightness_Err", glon)
[docs] def width(self, glon): """Width at a given longitude. Parameters ---------- glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle` Galactic Longitude. """ return self._interpolate_parameter("Width", glon)
[docs] def width_error(self, glon): """Width error at a given longitude. Parameters ---------- glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle` Galactic Longitude. """ return self._interpolate_parameter("Width_Err", glon)
[docs] def peak_latitude(self, glon): """Peak position at a given longitude. Parameters ---------- glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle` Galactic Longitude. """ return self._interpolate_parameter("GLAT", glon)
[docs] def peak_latitude_error(self, glon): """Peak position error at a given longitude. Parameters ---------- glon : `~astropy.coordinates.Longitude` or `~astropy.coordinates.Angle` Galactic Longitude. """ return self._interpolate_parameter("GLAT_Err", glon)
[docs] def evaluate(self, position): """Evaluate model at a given position. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Position on the sky. """ glon, glat = position.galactic.l, position.galactic.b width = self.width(glon) amplitude = self.peak_brightness(glon) mean = self.peak_latitude(glon) return Gaussian1D.evaluate(glat, amplitude=amplitude, mean=mean, stddev=width)