# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Fermi catalog and source classes.
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import warnings
import numpy as np
import astropy.units as u
from astropy.table import Table, Column
from astropy.time import Time
from ..utils.scripts import make_path
from ..utils.energy import EnergyBounds
from ..utils.table import table_standardise_units_inplace
from ..maps import Map
from ..spectrum import FluxPoints
from ..spectrum.models import (
PowerLaw,
PowerLaw2,
ExponentialCutoffPowerLaw3FGL,
PLSuperExpCutoff3FGL,
LogParabola,
)
from ..image.models import SkyPointSource, SkyGaussian, SkyDisk, SkyDiffuseMap
from ..cube.models import SkyModel
from ..time import LightCurve
from .core import SourceCatalog, SourceCatalogObject
__all__ = [
"SourceCatalogObject3FGL",
"SourceCatalogObject1FHL",
"SourceCatalogObject2FHL",
"SourceCatalogObject3FHL",
"SourceCatalog3FGL",
"SourceCatalog1FHL",
"SourceCatalog2FHL",
"SourceCatalog3FHL",
]
def compute_flux_points_ul(quantity, quantity_errp):
"""Compute UL value for fermi flux points.
See https://arxiv.org/pdf/1501.02003.pdf (page 30)
"""
return 2 * quantity_errp + quantity
[docs]class SourceCatalogObject3FGL(SourceCatalogObject):
"""One source from the Fermi-LAT 3FGL catalog.
Catalog is represented by `~gammapy.catalog.SourceCatalog3FGL`.
"""
_ebounds = EnergyBounds([100, 300, 1000, 3000, 10000, 100000], "MeV")
_ebounds_suffix = ["100_300", "300_1000", "1000_3000", "3000_10000", "10000_100000"]
energy_range = u.Quantity([100, 100000], "MeV")
"""Energy range of the catalog.
Paper says that analysis uses data up to 300 GeV,
but results are all quoted up to 100 GeV only to
be consistent with previous catalogs.
"""
def __str__(self):
return self.info()
[docs] def info(self, info="all"):
"""Summary info string.
Parameters
----------
info : {'all', 'basic', 'position', 'spectral', 'lightcurve'}
Comma separated list of options
"""
if info == "all":
info = "basic,position,spectral,lightcurve"
ss = ""
ops = info.split(",")
if "basic" in ops:
ss += self._info_basic()
if "position" in ops:
ss += self._info_position()
if "spectral" in ops:
ss += self._info_spectral_fit()
ss += self._info_spectral_points()
if "lightcurve" in ops:
ss += self._info_lightcurve()
return ss
def _info_basic(self):
"""Print basic info."""
d = self.data
ss = "\n*** Basic info ***\n\n"
ss += "Catalog row index (zero-based) : {}\n".format(d["catalog_row_index"])
ss += "{:<20s} : {}\n".format("Source name", d["Source_Name"])
ss += "{:<20s} : {}\n".format("Extended name", d["Extended_Source_Name"])
def get_nonentry_keys(keys):
vals = [d[_].strip() for _ in keys]
return ", ".join([_ for _ in vals if _ != ""])
keys = [
"ASSOC1",
"ASSOC2",
"ASSOC_TEV",
"ASSOC_GAM1",
"ASSOC_GAM2",
"ASSOC_GAM3",
]
associations = get_nonentry_keys(keys)
ss += "{:<20s} : {}\n".format("Associations", associations)
keys = ["0FGL_Name", "1FGL_Name", "2FGL_Name", "1FHL_Name"]
other_names = get_nonentry_keys(keys)
ss += "{:<20s} : {}\n".format("Other names", other_names)
ss += "{:<20s} : {}\n".format("Class", d["CLASS1"])
tevcat_flag = d["TEVCAT_FLAG"]
if tevcat_flag == "N":
tevcat_message = "No TeV association"
elif tevcat_flag == "P":
tevcat_message = "Small TeV source"
elif tevcat_flag == "E":
tevcat_message = "Extended TeV source (diameter > 40 arcmins)"
else:
tevcat_message = "N/A"
ss += "{:<20s} : {}\n".format("TeVCat flag", tevcat_message)
flag_message = {
0: "None",
1: "Source with TS > 35 which went to TS < 25 when changing the diffuse model. Note that sources with TS < "
"35 are not flagged with this bit because normal statistical fluctuations can push them to TS < 25.",
3: "Flux (> 1 GeV) or energy flux (> 100 MeV) changed by more than 3 sigma when changing the diffuse model."
" Requires also that the flux change by more than 35% (to not flag strong sources).",
4: "Source-to-background ratio less than 10% in highest band in which TS > 25. Background is integrated "
"over the 68%-confidence area (pi*r_682) or 1 square degree, whichever is smaller.",
5: "Closer than theta_ref from a brighter neighbor, where theta_ref is defined in the highest band in which"
" source TS > 25, or the band with highest TS if all are < 25. theta_ref is set to 2.17 degrees (FWHM)"
" below 300 MeV, 1.38 degrees between 300 MeV and 1 GeV, 0.87 degrees between 1 GeV and 3 GeV, 0.67"
" degrees between 3 and 10 GeV and 0.45 degrees about 10 GeV (2*r_68).",
6: "On top of an interstellar gas clump or small-scale defect in the model of diffuse emission. This flag "
'is equivalent to the "c" suffix in the source name.',
7: "Unstable position determination; result from gtfindsrc outside the 95% ellipse from pointlike.",
9: "Localization Quality > 8 in pointlike (see Section 3.1 in catalog paper) or long axis of 95% ellipse >"
" 0.25.",
10: "Spectral Fit Quality > 16.3 (see Equation 3 in 2FGL catalog paper).",
11: "Possibly due to the Sun (see Section 3.6 in catalog paper).",
12: "Highly curved spectrum; LogParabola beta fixed to 1 or PLExpCutoff Spectral Index fixed to 0 (see "
"Section 3.3 in catalog paper).",
}
ss += "{:<20s} : {}\n".format(
"Other flags", flag_message.get(d["Flags"], "N/A")
)
return ss
def _info_position(self):
"""Print position info."""
d = self.data
ss = "\n*** Position info ***\n\n"
ss += "{:<20s} : {:.3f}\n".format("RA", d["RAJ2000"])
ss += "{:<20s} : {:.3f}\n".format("DEC", d["DEJ2000"])
ss += "{:<20s} : {:.3f}\n".format("GLON", d["GLON"])
ss += "{:<20s} : {:.3f}\n".format("GLAT", d["GLAT"])
ss += "\n"
ss += "{:<20s} : {:.4f}\n".format("Semimajor (68%)", d["Conf_68_SemiMajor"])
ss += "{:<20s} : {:.4f}\n".format("Semiminor (68%)", d["Conf_68_SemiMinor"])
ss += "{:<20s} : {:.2f}\n".format("Position angle (68%)", d["Conf_68_PosAng"])
ss += "{:<20s} : {:.4f}\n".format("Semimajor (95%)", d["Conf_95_SemiMajor"])
ss += "{:<20s} : {:.4f}\n".format("Semiminor (95%)", d["Conf_95_SemiMinor"])
ss += "{:<20s} : {:.2f}\n".format("Position angle (95%)", d["Conf_95_PosAng"])
ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI_num"])
return ss
def _info_spectral_fit(self):
"""Print spectral info."""
d = self.data
spec_type = d["SpectrumType"].strip()
ss = "\n*** Spectral info ***\n\n"
ss += "{:<45s} : {}\n".format("Spectrum type", d["SpectrumType"])
fmt = "{:<45s} : {:.3f}\n"
ss += fmt.format("Detection significance (100 MeV - 300 GeV)", d["Signif_Avg"])
ss += "{:<45s} : {:.1f}\n".format("Significance curvature", d["Signif_Curve"])
if spec_type == "PowerLaw":
pass
elif spec_type == "LogParabola":
ss += "{:<45s} : {} +- {}\n".format("beta", d["beta"], d["Unc_beta"])
elif spec_type in ["PLExpCutoff", "PlSuperExpCutoff"]:
fmt = "{:<45s} : {:.0f} +- {:.0f} {}\n"
ss += fmt.format(
"Cutoff energy",
d["Cutoff"].value,
d["Unc_Cutoff"].value,
d["Cutoff"].unit,
)
elif spec_type == "PLSuperExpCutoff":
ss += "{:<45s} : {} +- {}\n".format(
"Super-exponential cutoff index", d["Exp_Index"], d["Unc_Exp_Index"]
)
else:
raise ValueError("Invalid spec_type")
ss += "{:<45s} : {:.0f} {}\n".format(
"Pivot energy", d["Pivot_Energy"].value, d["Pivot_Energy"].unit
)
ss += "{:<45s} : {:.3f}\n".format(
"Power law spectral index", d["PowerLaw_Index"]
)
fmt = "{:<45s} : {:.3f} +- {:.3f}\n"
ss += fmt.format("Spectral index", d["Spectral_Index"], d["Unc_Spectral_Index"])
fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
ss += fmt.format(
"Flux Density at pivot energy",
d["Flux_Density"].value,
d["Unc_Flux_Density"].value,
"cm-2 MeV-1 s-1",
)
fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
ss += fmt.format(
"Integral flux (1 - 100 GeV)",
d["Flux1000"].value,
d["Unc_Flux1000"].value,
"cm-2 s-1",
)
fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
ss += fmt.format(
"Energy flux (100 MeV - 100 GeV)",
d["Energy_Flux100"].value,
d["Unc_Energy_Flux100"].value,
"erg cm-2 s-1",
)
return ss
def _info_spectral_points(self):
"""Print spectral points."""
ss = "\n*** Spectral points ***\n\n"
lines = self._flux_points_table_formatted.pformat(max_width=-1, max_lines=-1)
ss += "\n".join(lines)
return ss + "\n"
def _info_lightcurve(self):
"""Print lightcurve info."""
d = self.data
ss = "\n*** Lightcurve info ***\n\n"
ss += "Lightcurve measured in the energy band: 100 MeV - 100 GeV\n\n"
ss += "{:<15s} : {:.3f}\n".format("Variability index", d["Variability_Index"])
if d["Signif_Peak"] == np.nan:
ss += "{:<40s} : {:.3f}\n".format(
"Significance peak (100 MeV - 100 GeV)", d["Signif_Peak"]
)
fmt = "{:<40s} : {:.3} +- {:.3} cm^-2 s^-1\n"
ss += fmt.format(
"Integral flux peak (100 MeV - 100 GeV)",
d["Flux_Peak"],
d["Unc_Flux_Peak"],
)
# TODO: give time as UTC string, not MET
ss += "{:<40s} : {:.3} s (Mission elapsed time)\n".format(
"Time peak", d["Time_Peak"]
)
peak_interval = d["Peak_Interval"].to("day").value
ss += "{:<40s} : {:.3} day\n".format("Peak interval", peak_interval)
else:
ss += "\nNo peak measured for this source.\n"
# TODO: Add a lightcurve table with d['Flux_History'] and d['Unc_Flux_History']
return ss
@property
def spectral_model(self):
"""Best fit spectral model (`~gammapy.spectrum.models.SpectralModel`)."""
spec_type = self.data["SpectrumType"].strip()
pars, errs = {}, {}
pars["amplitude"] = self.data["Flux_Density"]
errs["amplitude"] = self.data["Unc_Flux_Density"]
pars["reference"] = self.data["Pivot_Energy"]
if spec_type == "PowerLaw":
pars["index"] = self.data["Spectral_Index"] * u.dimensionless_unscaled
errs["index"] = self.data["Unc_Spectral_Index"] * u.dimensionless_unscaled
model = PowerLaw(**pars)
elif spec_type == "PLExpCutoff":
pars["index"] = self.data["Spectral_Index"] * u.dimensionless_unscaled
pars["ecut"] = self.data["Cutoff"]
errs["index"] = self.data["Unc_Spectral_Index"] * u.dimensionless_unscaled
errs["ecut"] = self.data["Unc_Cutoff"]
model = ExponentialCutoffPowerLaw3FGL(**pars)
elif spec_type == "LogParabola":
pars["alpha"] = self.data["Spectral_Index"] * u.dimensionless_unscaled
pars["beta"] = self.data["beta"] * u.dimensionless_unscaled
errs["alpha"] = self.data["Unc_Spectral_Index"] * u.dimensionless_unscaled
errs["beta"] = self.data["Unc_beta"] * u.dimensionless_unscaled
model = LogParabola(**pars)
elif spec_type == "PLSuperExpCutoff":
# TODO: why convert to GeV here? Remove?
pars["reference"] = pars["reference"].to("GeV")
pars["index_1"] = self.data["Spectral_Index"] * u.dimensionless_unscaled
pars["index_2"] = self.data["Exp_Index"] * u.dimensionless_unscaled
pars["ecut"] = self.data["Cutoff"].to("GeV")
errs["index_1"] = self.data["Unc_Spectral_Index"] * u.dimensionless_unscaled
errs["index_2"] = self.data["Unc_Exp_Index"] * u.dimensionless_unscaled
errs["ecut"] = self.data["Unc_Cutoff"].to("GeV")
model = PLSuperExpCutoff3FGL(**pars)
else:
raise ValueError("Invalid spec_type: {!r}".format(spec_type))
model.parameters.set_parameter_errors(errs)
return model
@property
def spatial_model(self):
"""
Source spatial model (`~gammapy.image.models.SkySpatialModel`).
"""
d = self.data
pars = {}
glon = d["GLON"]
glat = d["GLAT"]
if self.is_pointlike:
pars["lon_0"] = glon
pars["lat_0"] = glat
return SkyPointSource(lon_0=glon, lat_0=glat)
else:
de = self.data_extended
morph_type = de["Model_Form"].strip()
if morph_type == "Disk":
r_0 = de["Model_SemiMajor"].to("deg")
return SkyDisk(lon_0=glon, lat_0=glat, r_0=r_0)
elif morph_type in ["Map", "Ring", "2D Gaussian x2"]:
filename = de["Spatial_Filename"].strip()
path = make_path(
"$GAMMAPY_DATA/catalogs/fermi/Extended_archive_v15/Templates/"
)
return SkyDiffuseMap.read(path / filename)
elif morph_type == "2D Gaussian":
# TODO: fill elongation info as soon as model supports it
sigma = de["Model_SemiMajor"].to("deg")
return SkyGaussian(lon_0=glon, lat_0=glat, sigma=sigma)
else:
raise ValueError("Invalid spatial model: {!r}".format(morph_type))
@property
def sky_model(self):
"""Source sky model (`~gammapy.cube.models.SkyModel`)."""
spatial_model = self.spatial_model
spectral_model = self.spectral_model
return SkyModel(spatial_model, spectral_model)
@property
def is_pointlike(self):
return self.data["Extended_Source_Name"].strip() == ""
@property
def _flux_points_table_formatted(self):
"""Returns formatted version of self.flux_points.table"""
table = self.flux_points.table.copy()
flux_cols = [
"flux",
"flux_errn",
"flux_errp",
"e2dnde",
"e2dnde_errn",
"e2dnde_errp",
"flux_ul",
"e2dnde_ul",
"dnde",
]
table["sqrt_TS"].format = ".1f"
table["e_ref"].format = ".1f"
for _ in flux_cols:
table[_].format = ".3"
return table
@property
def flux_points(self):
"""Flux points (`~gammapy.spectrum.FluxPoints`)."""
table = Table()
table.meta["SED_TYPE"] = "flux"
e_ref = self._ebounds.log_centers
table["e_ref"] = e_ref
table["e_min"] = self._ebounds.lower_bounds
table["e_max"] = self._ebounds.upper_bounds
flux = self._get_flux_values("Flux")
flux_err = self._get_flux_values("Unc_Flux")
table["flux"] = flux
table["flux_errn"] = np.abs(flux_err[:, 0])
table["flux_errp"] = flux_err[:, 1]
nuFnu = self._get_flux_values("nuFnu", "erg cm-2 s-1")
table["e2dnde"] = nuFnu
table["e2dnde_errn"] = np.abs(nuFnu * flux_err[:, 0] / flux)
table["e2dnde_errp"] = nuFnu * flux_err[:, 1] / flux
is_ul = np.isnan(table["flux_errn"])
table["is_ul"] = is_ul
# handle upper limits
table["flux_ul"] = np.nan * flux_err.unit
flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"])
table["flux_ul"][is_ul] = flux_ul[is_ul]
# handle upper limits
table["e2dnde_ul"] = np.nan * nuFnu.unit
e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_errp"])
table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul]
# Square root of test statistic
table["sqrt_TS"] = [self.data["Sqrt_TS" + _] for _ in self._ebounds_suffix]
table["dnde"] = (nuFnu * e_ref ** -2).to("TeV-1 cm-2 s-1")
return FluxPoints(table)
def _get_flux_values(self, prefix, unit="cm-2 s-1"):
values = [self.data[prefix + _] for _ in self._ebounds_suffix]
return u.Quantity(values, unit)
@property
def lightcurve(self):
"""Lightcurve (`~gammapy.time.LightCurve`).
Examples
--------
>>> from gammapy.catalog import source_catalogs
>>> source = source_catalogs['3fgl']['3FGL J0349.9-2102']
>>> lc = source.lightcurve
>>> lc.plot()
"""
flux = self.data["Flux_History"]
# Flux error is given as asymmetric high/low
flux_errn = -self.data["Unc_Flux_History"][:, 0]
flux_errp = self.data["Unc_Flux_History"][:, 1]
# Really the time binning is stored in a separate HDU in the FITS
# catalog file called `Hist_Start`, with a single column `Hist_Start`
# giving the time binning in MET (mission elapsed time)
# This is not available here for now.
# TODO: read that info in `SourceCatalog3FGL` and pass it down to the
# `SourceCatalogObject3FGL` object somehow.
# For now, we just hard-code the start and stop time and assume
# equally-spaced time intervals. This is roughly correct,
# for plotting the difference doesn't matter, only for analysis
time_start = Time("2008-08-02T00:33:19")
time_end = Time("2012-07-31T22:45:47")
n_points = len(flux)
time_step = (time_end - time_start) / n_points
time_bounds = time_start + np.arange(n_points + 1) * time_step
table = Table(
[
Column(time_bounds[:-1].utc.mjd, "time_min"),
Column(time_bounds[1:].utc.mjd, "time_max"),
Column(flux, "flux"),
Column(flux_errp, "flux_errp"),
Column(flux_errn, "flux_errn"),
]
)
return LightCurve(table)
[docs]class SourceCatalogObject1FHL(SourceCatalogObject):
"""One source from the Fermi-LAT 1FHL catalog.
Catalog is represented by `~gammapy.catalog.SourceCatalog1FHL`.
"""
_ebounds = EnergyBounds([10, 30, 100, 500], "GeV")
_ebounds_suffix = ["10_30", "30_100", "100_500"]
energy_range = u.Quantity([0.01, 0.5], "TeV")
"""Energy range of the Fermi 1FHL source catalog"""
def __str__(self):
return self.info()
[docs] def info(self):
"""Print summary info."""
# TODO: can we share code with 3FGL summary function?
d = self.data
ss = "Source: {}\n".format(d["Source_Name"])
ss += "\n"
ss += "RA (J2000) : {}\n".format(d["RAJ2000"])
ss += "Dec (J2000) : {}\n".format(d["DEJ2000"])
ss += "GLON : {}\n".format(d["GLON"])
ss += "GLAT : {}\n".format(d["GLAT"])
ss += "\n"
# val, err = d['Energy_Flux100'], d['Unc_Energy_Flux100']
# ss += 'Energy flux (100 MeV - 100 GeV) : {} +- {} erg cm^-2 s^-1\n'.format(val, err)
# ss += 'Detection significance : {}\n'.format(d['Signif_Avg'])
return ss
def _get_flux_values(self, prefix, unit="cm-2 s-1"):
values = [self.data[prefix + _ + "GeV"] for _ in self._ebounds_suffix]
return u.Quantity(values, unit)
@property
def flux_points(self):
"""Integral flux points (`~gammapy.spectrum.FluxPoints`)."""
table = Table()
table.meta["SED_TYPE"] = "flux"
table["e_min"] = self._ebounds.lower_bounds
table["e_max"] = self._ebounds.upper_bounds
table["flux"] = self._get_flux_values("Flux")
flux_err = self._get_flux_values("Unc_Flux")
table["flux_errn"] = np.abs(flux_err[:, 0])
table["flux_errp"] = flux_err[:, 1]
# handle upper limits
is_ul = np.isnan(table["flux_errn"])
table["is_ul"] = is_ul
table["flux_ul"] = np.nan * flux_err.unit
flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"])
table["flux_ul"][is_ul] = flux_ul[is_ul]
flux_points = FluxPoints(table)
# TODO: change this and leave it up to the caller to convert to dnde
# See https://github.com/gammapy/gammapy/issues/1034
return flux_points.to_sed_type("dnde", model=self.spectral_model)
@property
def spectral_model(self):
"""Best fit spectral model `~gammapy.spectrum.models.SpectralModel`."""
pars, errs = {}, {}
pars["amplitude"] = self.data["Flux"]
pars["emin"], pars["emax"] = self.energy_range
pars["index"] = self.data["Spectral_Index"] * u.dimensionless_unscaled
errs["amplitude"] = self.data["Unc_Flux"]
errs["index"] = self.data["Unc_Spectral_Index"] * u.dimensionless_unscaled
model = PowerLaw2(**pars)
model.parameters.set_parameter_errors(errs)
return model
[docs]class SourceCatalogObject2FHL(SourceCatalogObject):
"""One source from the Fermi-LAT 2FHL catalog.
Catalog is represented by `~gammapy.catalog.SourceCatalog2FHL`.
"""
_ebounds = EnergyBounds([50, 171, 585, 2000], "GeV")
_ebounds_suffix = ["50_171", "171_585", "585_2000"]
energy_range = u.Quantity([0.05, 2], "TeV")
"""Energy range of the Fermi 2FHL source catalog"""
def __str__(self):
return self.info()
[docs] def info(self):
"""Print summary info."""
# TODO: can we share code with 3FGL summary funtion?
d = self.data
ss = "Source: {}\n".format(d["Source_Name"])
ss += "\n"
ss += "RA (J2000) : {}\n".format(d["RAJ2000"])
ss += "Dec (J2000) : {}\n".format(d["DEJ2000"])
ss += "GLON : {}\n".format(d["GLON"])
ss += "GLAT : {}\n".format(d["GLAT"])
ss += "\n"
# val, err = d['Energy_Flux100'], d['Unc_Energy_Flux100']
# ss += 'Energy flux (100 MeV - 100 GeV) : {} +- {} erg cm^-2 s^-1\n'.format(val, err)
# ss += 'Detection significance : {}\n'.format(d['Signif_Avg'])
return ss
def _get_flux_values(self, prefix, unit="cm-2 s-1"):
values = [self.data[prefix + _ + "GeV"] for _ in self._ebounds_suffix]
return u.Quantity(values, unit)
@property
def flux_points(self):
"""Integral flux points (`~gammapy.spectrum.FluxPoints`)."""
table = Table()
table.meta["SED_TYPE"] = "flux"
table["e_min"] = self._ebounds.lower_bounds
table["e_max"] = self._ebounds.upper_bounds
table["flux"] = self._get_flux_values("Flux")
flux_err = self._get_flux_values("Unc_Flux")
table["flux_errn"] = np.abs(flux_err[:, 0])
table["flux_errp"] = flux_err[:, 1]
# handle upper limits
is_ul = np.isnan(table["flux_errn"])
table["is_ul"] = is_ul
table["flux_ul"] = np.nan * flux_err.unit
flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"])
table["flux_ul"][is_ul] = flux_ul[is_ul]
flux_points = FluxPoints(table)
# TODO: change this and leave it up to the caller to convert to dnde
# See https://github.com/gammapy/gammapy/issues/1034
return flux_points.to_sed_type("dnde", model=self.spectral_model)
@property
def spectral_model(self):
"""Best fit spectral model (`~gammapy.spectrum.models.SpectralModel`)."""
pars, errs = {}, {}
pars["amplitude"] = self.data["Flux50"]
pars["emin"], pars["emax"] = self.energy_range
pars["index"] = self.data["Spectral_Index"] * u.dimensionless_unscaled
errs["amplitude"] = self.data["Unc_Flux50"]
errs["index"] = self.data["Unc_Spectral_Index"] * u.dimensionless_unscaled
model = PowerLaw2(**pars)
model.parameters.set_parameter_errors(errs)
return model
[docs]class SourceCatalogObject3FHL(SourceCatalogObject):
"""One source from the Fermi-LAT 3FHL catalog.
Catalog is represented by `~gammapy.catalog.SourceCatalog3FHL`.
"""
energy_range = u.Quantity([0.01, 2], "TeV")
"""Energy range of the Fermi 1FHL source catalog"""
_ebounds = EnergyBounds([10, 20, 50, 150, 500, 2000], "GeV")
def __str__(self):
return self.info()
[docs] def info(self, info="all"):
"""Summary info string.
Parameters
----------
info : {'all', 'basic', 'position', 'spectral'}
Comma separated list of options
"""
if info == "all":
info = "basic,position,spectral,other"
ss = ""
ops = info.split(",")
if "basic" in ops:
ss += self._info_basic()
if "position" in ops:
ss += self._info_position()
if not self.is_pointlike:
ss += self._info_morphology()
if "spectral" in ops:
ss += self._info_spectral_fit()
ss += self._info_spectral_points()
if "other" in ops:
ss += self._info_other()
return ss
def _info_basic(self):
"""Print basic info."""
d = self.data
ss = "\n*** Basic info ***\n\n"
ss += "Catalog row index (zero-based) : {}\n".format(d["catalog_row_index"])
ss += "{:<20s} : {}\n".format("Source name", d["Source_Name"])
ss += "{:<20s} : {}\n".format("Extended name", d["Extended_Source_Name"])
def get_nonentry_keys(keys):
vals = [d[_].strip() for _ in keys]
return ", ".join([_ for _ in vals if _ != ""])
keys = ["ASSOC1", "ASSOC2", "ASSOC_TEV", "ASSOC_GAM"]
associations = get_nonentry_keys(keys)
ss += "{:<16s} : {}\n".format("Associations", associations)
ss += "{:<16s} : {:.3f}\n".format("ASSOC_PROB_BAY", d["ASSOC_PROB_BAY"])
ss += "{:<16s} : {:.3f}\n".format("ASSOC_PROB_LR", d["ASSOC_PROB_LR"])
ss += "{:<16s} : {}\n".format("Class", d["CLASS"])
tevcat_flag = d["TEVCAT_FLAG"]
if tevcat_flag == "N":
tevcat_message = "No TeV association"
elif tevcat_flag == "P":
tevcat_message = "Small TeV source"
elif tevcat_flag == "E":
tevcat_message = "Extended TeV source (diameter > 40 arcmins)"
else:
tevcat_message = "N/A"
ss += "{:<16s} : {}\n".format("TeVCat flag", tevcat_message)
fmt = "\n{:<32s} : {:.3f}\n"
ss += fmt.format("Significance (10 GeV - 2 TeV)", d["Signif_Avg"])
ss += "{:<32s} : {:.1f}\n".format("Npred", d["Npred"])
return ss
def _info_position(self):
"""Print position info."""
d = self.data
ss = "\n*** Position info ***\n\n"
ss += "{:<20s} : {:.3f}\n".format("RA", d["RAJ2000"])
ss += "{:<20s} : {:.3f}\n".format("DEC", d["DEJ2000"])
ss += "{:<20s} : {:.3f}\n".format("GLON", d["GLON"])
ss += "{:<20s} : {:.3f}\n".format("GLAT", d["GLAT"])
# TODO: All sources are non-elliptical; just give one number for radius?
ss += "\n"
ss += "{:<20s} : {:.4f}\n".format("Semimajor (95%)", d["Conf_95_SemiMajor"])
ss += "{:<20s} : {:.4f}\n".format("Semiminor (95%)", d["Conf_95_SemiMinor"])
ss += "{:<20s} : {:.2f}\n".format("Position angle (95%)", d["Conf_95_PosAng"])
ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI_num"])
return ss
def _info_morphology(self):
e = self.data_extended
ss = "*** Extended source information ***\n"
ss += "{:<16s} : {}\n".format("Model form", e["Model_Form"])
ss += "{:<16s} : {:.4f}\n".format("Model semimajor", e["Model_SemiMajor"])
ss += "{:<16s} : {:.4f}\n".format("Model semiminor", e["Model_SemiMinor"])
ss += "{:<16s} : {:.4f}\n".format("Position angle", e["Model_PosAng"])
ss += "{:<16s} : {}\n".format("Spatial function", e["Spatial_Function"])
ss += "{:<16s} : {}\n\n".format("Spatial filename", e["Spatial_Filename"])
return ss
def _info_spectral_fit(self):
"""Print model data."""
d = self.data
spec_type = d["SpectrumType"].strip()
ss = "\n*** Spectral fit info ***\n\n"
ss += "{:<32s} : {}\n".format("Spectrum type", d["SpectrumType"])
ss += "{:<32s} : {:.1f}\n".format("Significance curvature", d["Signif_Curve"])
# Power-law parameters are always given; give in any case
fmt = "{:<32s} : {:.3f} +- {:.3f}\n"
ss += fmt.format(
"Power-law spectral index", d["PowerLaw_Index"], d["Unc_PowerLaw_Index"]
)
if spec_type == "PowerLaw":
pass
elif spec_type == "LogParabola":
fmt = "{:<32s} : {:.3f} +- {:.3f}\n"
ss += fmt.format(
"LogParabola spectral index",
d["Spectral_Index"],
d["Unc_Spectral_Index"],
)
ss += "{:<32s} : {:.3f} +- {:.3f}\n".format(
"LogParabola beta", d["beta"], d["Unc_beta"]
)
else:
raise ValueError("Invalid spec_type")
ss += "{:<32s} : {:.1f} {}\n".format(
"Pivot energy", d["Pivot_Energy"].value, d["Pivot_Energy"].unit
)
ss += "{:<32s} : {:.3} +- {:.3} {}\n".format(
"Flux Density at pivot energy",
d["Flux_Density"].value,
d["Unc_Flux_Density"].value,
"cm-2 GeV-1 s-1",
)
ss += "{:<32s} : {:.3} +- {:.3} {}\n".format(
"Integral flux (10 GeV - 1 TeV)",
d["Flux"].value,
d["Unc_Flux"].value,
"cm-2 s-1",
)
ss += "{:<32s} : {:.3} +- {:.3} {}\n".format(
"Energy flux (10 GeV - TeV)",
d["Energy_Flux"].value,
d["Unc_Energy_Flux"].value,
"erg cm-2 s-1",
)
return ss
def _info_spectral_points(self):
"""Print spectral points."""
ss = "\n*** Spectral points ***\n\n"
lines = self._flux_points_table_formatted.pformat(max_width=-1, max_lines=-1)
ss += "\n".join(lines)
return ss + "\n"
def _info_other(self):
"""Print other info."""
d = self.data
ss = "\n*** Other info ***\n\n"
ss += "{:<16s} : {:.3f} {}\n".format(
"HEP Energy", d["HEP_Energy"].value, d["HEP_Energy"].unit
)
ss += "{:<16s} : {:.3f}\n".format("HEP Probability", d["HEP_Prob"])
# This is the number of Bayesian blocks for most sources,
# except -1 means "could not be tested"
msg = d["Variability_BayesBlocks"]
if msg == 1:
msg = "1 (not variable)"
elif msg == -1:
msg = "Could not be tested"
ss += "{:<16s} : {}\n".format("Bayesian Blocks", msg)
ss += "{:<16s} : {:.3f}\n".format("Redshift", d["Redshift"])
ss += "{:<16s} : {:.3} {}\n".format(
"NuPeak_obs", d["NuPeak_obs"].value, d["NuPeak_obs"].unit
)
return ss
@property
def spectral_model(self):
"""Best fit spectral model (`~gammapy.spectrum.models.SpectralModel`)."""
d = self.data
spec_type = self.data["SpectrumType"].strip()
pars, errs = {}, {}
pars["amplitude"] = d["Flux_Density"]
errs["amplitude"] = d["Unc_Flux_Density"]
pars["reference"] = d["Pivot_Energy"]
if spec_type == "PowerLaw":
pars["index"] = d["PowerLaw_Index"] * u.dimensionless_unscaled
errs["index"] = d["Unc_PowerLaw_Index"] * u.dimensionless_unscaled
model = PowerLaw(**pars)
elif spec_type == "LogParabola":
pars["alpha"] = d["Spectral_Index"] * u.dimensionless_unscaled
pars["beta"] = d["beta"] * u.dimensionless_unscaled
errs["alpha"] = d["Unc_Spectral_Index"] * u.dimensionless_unscaled
errs["beta"] = d["Unc_beta"] * u.dimensionless_unscaled
model = LogParabola(**pars)
else:
raise ValueError("Invalid spec_type: {!r}".format(spec_type))
model.parameters.set_parameter_errors(errs)
return model
@property
def _flux_points_table_formatted(self):
"""Returns formatted version of self.flux_points.table"""
table = self.flux_points.table.copy()
flux_cols = [
"flux",
"flux_errn",
"flux_errp",
"e2dnde",
"e2dnde_errn",
"e2dnde_errp",
"flux_ul",
"e2dnde_ul",
"dnde",
]
table["sqrt_ts"].format = ".1f"
table["e_ref"].format = ".1f"
for _ in flux_cols:
table[_].format = ".3"
return table
@property
def flux_points(self):
"""Flux points (`~gammapy.spectrum.FluxPoints`)."""
table = Table()
table.meta["SED_TYPE"] = "flux"
e_ref = self._ebounds.log_centers
table["e_ref"] = e_ref
table["e_min"] = self._ebounds.lower_bounds
table["e_max"] = self._ebounds.upper_bounds
flux = self.data["Flux_Band"]
flux_err = self.data["Unc_Flux_Band"]
e2dnde = self.data["nuFnu"]
table["flux"] = flux
table["flux_errn"] = np.abs(flux_err[:, 0])
table["flux_errp"] = flux_err[:, 1]
table["e2dnde"] = e2dnde
table["e2dnde_errn"] = np.abs(e2dnde * flux_err[:, 0] / flux)
table["e2dnde_errp"] = e2dnde * flux_err[:, 1] / flux
is_ul = np.isnan(table["flux_errn"])
table["is_ul"] = is_ul
# handle upper limits
table["flux_ul"] = np.nan * flux_err.unit
flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"])
table["flux_ul"][is_ul] = flux_ul[is_ul]
table["e2dnde_ul"] = np.nan * e2dnde.unit
e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_errp"])
table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul]
# Square root of test statistic
table["sqrt_ts"] = self.data["Sqrt_TS_Band"]
# TODO: remove this computation here.
# # Instead provide a method on the FluxPoints class like `to_dnde()` or something.
table["dnde"] = (e2dnde * e_ref ** -2).to("cm-2 s-1 TeV-1")
return FluxPoints(table)
@property
def spatial_model(self):
"""Source spatial model (`~gammapy.image.models.SkySpatialModel`)."""
d = self.data
pars = {}
glon = d["GLON"]
glat = d["GLAT"]
if self.is_pointlike:
pars["lon_0"] = glon
pars["lat_0"] = glat
return SkyPointSource(lon_0=glon, lat_0=glat)
else:
de = self.data_extended
morph_type = de["Spatial_Function"].strip()
if morph_type == "RadialDisk":
r_0 = de["Model_SemiMajor"].to("deg")
return SkyDisk(lon_0=glon, lat_0=glat, r_0=r_0)
elif morph_type in ["SpatialMap"]:
filename = de["Spatial_Filename"].strip()
path = make_path(
"$GAMMAPY_DATA/catalogs/fermi/Extended_archive_v18/Templates/"
)
return SkyDiffuseMap.read(path / filename)
elif morph_type == "RadialGauss":
# TODO: fill elongation info as soon as model supports it
sigma = de["Model_SemiMajor"].to("deg")
return SkyGaussian(lon_0=glon, lat_0=glat, sigma=sigma)
else:
raise ValueError("Invalid morph_type: {!r}".format(morph_type))
@property
def sky_model(self):
"""Source sky model (`~gammapy.cube.models.SkyModel`)."""
spatial_model = self.spatial_model
spectral_model = self.spectral_model
return SkyModel(spatial_model, spectral_model)
@property
def is_pointlike(self):
return self.data["Extended_Source_Name"].strip() == ""
[docs]class SourceCatalog3FGL(SourceCatalog):
"""Fermi-LAT 3FGL source catalog.
Reference: https://ui.adsabs.harvard.edu/#abs/2015ApJS..218...23A
One source is represented by `~gammapy.catalog.SourceCatalogObject3FGL`.
"""
name = "3fgl"
description = "LAT 4-year point source catalog"
source_object_class = SourceCatalogObject3FGL
source_categories = {
"galactic": ["psr", "pwn", "snr", "spp", "glc"],
"extra-galactic": [
"css",
"bll",
"fsrq",
"agn",
"nlsy1",
"rdg",
"sey",
"bcu",
"gal",
"sbg",
"ssrq",
],
"GALACTIC": ["PSR", "PWN", "SNR", "HMB", "BIN", "NOV", "SFR"],
"EXTRA-GALACTIC": [
"CSS",
"BLL",
"FSRQ",
"AGN",
"NLSY1",
"RDG",
"SEY",
"BCU",
"GAL",
"SBG",
"SSRQ",
],
"unassociated": [""],
}
def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psc_v16.fit.gz"):
filename = str(make_path(filename))
with warnings.catch_warnings(): # ignore FITS units warnings
warnings.simplefilter("ignore", u.UnitsWarning)
table = Table.read(filename, hdu="LAT_Point_Source_Catalog")
table_standardise_units_inplace(table)
source_name_key = "Source_Name"
source_name_alias = (
"Extended_Source_Name",
"0FGL_Name",
"1FGL_Name",
"2FGL_Name",
"1FHL_Name",
"ASSOC_TEV",
"ASSOC1",
"ASSOC2",
)
super(SourceCatalog3FGL, self).__init__(
table=table,
source_name_key=source_name_key,
source_name_alias=source_name_alias,
)
self.extended_sources_table = Table.read(filename, hdu="ExtendedSources")
[docs] def is_source_class(self, source_class):
"""
Check if source belongs to a given source class.
The classes are described in Table 3 of the 3FGL paper:
http://adsabs.harvard.edu/abs/2015arXiv150102003T
Parameters
----------
source_class : str
Source class designator as defined in Table 3. There are a few extra
selections available:
- 'ALL': all identified objects
- 'all': all objects with associations
- 'galactic': all sources with an associated galactic object
- 'GALACTIC': all identified galactic sources
- 'extra-galactic': all sources with an associated extra-galactic object
- 'EXTRA-GALACTIC': all identified extra-galactic sources
- 'unassociated': all unassociated objects
Returns
-------
selection : `~numpy.ndarray`
Selection mask.
"""
source_class_info = np.array([_.strip() for _ in self.table["CLASS1"]])
cats = self.source_categories
if source_class in cats:
category = set(cats[source_class])
elif source_class == "ALL":
category = set(cats["EXTRA-GALACTIC"] + cats["GALACTIC"])
elif source_class == "all":
category = set(cats["extra-galactic"] + cats["galactic"])
elif source_class in np.unique(source_class_info):
category = set([source_class])
else:
raise ValueError("Invalid source_class: {!r}".format(source_class))
return np.array([_ in category for _ in source_class_info])
[docs] def select_source_class(self, source_class):
"""
Select all sources of a given source class.
See `SourceCatalog3FHL.is_source_class` for further documentation
Parameters
----------
source_class : str
Source class designator.
Returns
-------
selection : `SourceCatalog3FHL`
Subset of the 3FHL catalog containing only the selected source class.
"""
catalog = self.copy()
selection = self.is_source_class(source_class)
catalog.table = catalog.table[selection]
return catalog
[docs]class SourceCatalog1FHL(SourceCatalog):
"""Fermi-LAT 1FHL source catalog.
Reference: http://adsabs.harvard.edu/abs/2013ApJS..209...34A
One source is represented by `~gammapy.catalog.SourceCatalogObject1FHL`.
"""
name = "1fhl"
description = "First Fermi-LAT Catalog of Sources above 10 GeV"
source_object_class = SourceCatalogObject1FHL
def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psch_v07.fit.gz"):
filename = str(make_path(filename))
with warnings.catch_warnings(): # ignore FITS units warnings
warnings.simplefilter("ignore", u.UnitsWarning)
table = Table.read(filename, hdu="LAT_Point_Source_Catalog")
table_standardise_units_inplace(table)
source_name_key = "Source_Name"
source_name_alias = ("ASSOC1", "ASSOC2", "ASSOC_TEV", "ASSOC_GAM")
super(SourceCatalog1FHL, self).__init__(
table=table,
source_name_key=source_name_key,
source_name_alias=source_name_alias,
)
self.extended_sources_table = Table.read(filename, hdu="ExtendedSources")
[docs]class SourceCatalog2FHL(SourceCatalog):
"""Fermi-LAT 2FHL source catalog.
Reference: http://adsabs.harvard.edu/abs/2016ApJS..222....5A
One source is represented by `~gammapy.catalog.SourceCatalogObject2FHL`.
"""
name = "2fhl"
description = "LAT second high-energy source catalog"
source_object_class = SourceCatalogObject2FHL
def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psch_v08.fit.gz"):
filename = str(make_path(filename))
with warnings.catch_warnings(): # ignore FITS units warnings
warnings.simplefilter("ignore", u.UnitsWarning)
table = Table.read(filename, hdu="2FHL Source Catalog")
table_standardise_units_inplace(table)
source_name_key = "Source_Name"
source_name_alias = ("ASSOC", "3FGL_Name", "1FHL_Name", "TeVCat_Name")
super(SourceCatalog2FHL, self).__init__(
table=table,
source_name_key=source_name_key,
source_name_alias=source_name_alias,
)
self.counts_image = Map.read(filename, hdu="Count Map")
self.extended_sources_table = Table.read(filename, hdu="Extended Sources")
self.rois = Table.read(filename, hdu="ROIs")
[docs]class SourceCatalog3FHL(SourceCatalog):
"""Fermi-LAT 3FHL source catalog.
Reference: http://adsabs.harvard.edu/abs/2017ApJS..232...18A
One source is represented by `~gammapy.catalog.SourceCatalogObject3FHL`.
"""
name = "3fhl"
description = "LAT third high-energy source catalog"
source_object_class = SourceCatalogObject3FHL
source_categories = {
"galactic": ["glc", "hmb", "psr", "pwn", "sfr", "snr", "spp"],
"extra-galactic": ["agn", "bcu", "bll", "fsrq", "rdg", "sbg"],
"GALACTIC": ["BIN", "HMB", "PSR", "PWN", "SFR", "SNR"],
"EXTRA-GALACTIC": ["BLL", "FSRQ", "NLSY1", "RDG"],
"unassociated": [""],
}
def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psch_v13.fit.gz"):
filename = str(make_path(filename))
with warnings.catch_warnings(): # ignore FITS units warnings
warnings.simplefilter("ignore", u.UnitsWarning)
table = Table.read(filename, hdu="LAT_Point_Source_Catalog")
table_standardise_units_inplace(table)
source_name_key = "Source_Name"
source_name_alias = ("ASSOC1", "ASSOC2", "ASSOC_TEV", "ASSOC_GAM")
super(SourceCatalog3FHL, self).__init__(
table=table,
source_name_key=source_name_key,
source_name_alias=source_name_alias,
)
self.extended_sources_table = Table.read(filename, hdu="ExtendedSources")
self.rois = Table.read(filename, hdu="ROIs")
self.energy_bounds_table = Table.read(filename, hdu="EnergyBounds")
[docs] def is_source_class(self, source_class):
"""
Check if source belongs to a given source class.
The classes are described in Table 3 of the 3FGL paper:
http://adsabs.harvard.edu/abs/2015arXiv150102003T
Parameters
----------
source_class : str
Source class designator as defined in Table 3. There are a few extra
selections available:
- 'ALL': all identified objects
- 'all': all objects with associations
- 'galactic': all sources with an associated galactic object
- 'GALACTIC': all identified galactic sources
- 'extra-galactic': all sources with an associated extra-galactic object
- 'EXTRA-GALACTIC': all identified extra-galactic sources
- 'unassociated': all unassociated objects
Returns
-------
selection : `~numpy.ndarray`
Selection mask.
"""
source_class_info = np.array([_.strip() for _ in self.table["CLASS"]])
cats = self.source_categories
if source_class in cats:
category = set(cats[source_class])
elif source_class == "ALL":
category = set(cats["EXTRA-GALACTIC"] + cats["GALACTIC"])
elif source_class == "all":
category = set(cats["extra-galactic"] + cats["galactic"])
elif source_class in np.unique(source_class_info):
category = set([source_class])
else:
raise ValueError("Invalid source_class: {!r}".format(source_class))
return np.array([_ in category for _ in source_class_info])
[docs] def select_source_class(self, source_class):
"""
Select all sources of a given source class.
See `SourceCatalog3FHL.is_source_class` for further documentation
Parameters
----------
source_class : str
Source class designator.
Returns
-------
selection : `SourceCatalog3FHL`
Subset of the 3FHL catalog containing only the selected source class.
"""
catalog = self.copy()
selection = self.is_source_class(source_class)
catalog.table = catalog.table[selection]
return catalog