# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Fermi catalog and source classes."""
import abc
import logging
import warnings
import numpy as np
import astropy.units as u
from astropy.table import Table
from astropy.wcs import FITSFixedWarning
from gammapy.estimators import FluxPoints
from gammapy.maps import MapAxis, Maps, RegionGeom
from gammapy.modeling.models import (
DiskSpatialModel,
GaussianSpatialModel,
Model,
Models,
PointSpatialModel,
SkyModel,
TemplateSpatialModel,
)
from gammapy.utils.gauss import Gauss2DPDF
from gammapy.utils.scripts import make_path
from gammapy.utils.table import table_standardise_units_inplace
from .core import SourceCatalog, SourceCatalogObject, format_flux_points_table
__all__ = [
"SourceCatalog2FHL",
"SourceCatalog3FGL",
"SourceCatalog3FHL",
"SourceCatalog4FGL",
"SourceCatalog2PC",
"SourceCatalog3PC",
"SourceCatalogObject2FHL",
"SourceCatalogObject3FGL",
"SourceCatalogObject3FHL",
"SourceCatalogObject4FGL",
"SourceCatalogObject2PC",
"SourceCatalogObject3PC",
]
log = logging.getLogger(__name__)
def compute_flux_points_ul(quantity, quantity_errp):
"""Compute UL value for fermi flux points.
See https://arxiv.org/pdf/1501.02003.pdf (page 30).
"""
return 2 * quantity_errp + quantity
class SourceCatalogObjectFermiPCBase(SourceCatalogObject, abc.ABC):
"""Base class for Fermi-LAT Pulsar catalogs."""
def __str__(self):
return self.info()
def info(self, info="all"):
if info == "all":
info = "basic,more,position,pulsar,spectral,lightcurve"
ss = ""
ops = info.split(",")
if "basic" in ops:
ss += self._info_basic()
if "more" in ops:
ss += self._info_more()
if "pulsar" in ops:
ss += self._info_pulsar()
if "position" in ops:
ss += self._info_position()
if "spectral" in ops:
ss += self._info_spectral_fit()
ss += self._info_spectral_points()
if "lightcurve" in ops:
ss += self._info_phasogram()
return ss
def _info_basic(self):
ss = "\n*** Basic info ***\n\n"
ss += "Catalog row index (zero-based) : {}\n".format(self.row_index)
ss += "{:<20s} : {}\n".format("Source name", self.name)
return ss
def _info_more(self):
return ""
def _info_pulsar(self):
return "\n"
def _info_position(self):
source_pos = self.position
ss = "\n*** Position info ***\n\n"
ss += "{:<20s} : {:.3f}\n".format("RA", source_pos.ra)
ss += "{:<20s} : {:.3f}\n".format("DEC", source_pos.dec)
ss += "{:<20s} : {:.3f}\n".format("GLON", source_pos.galactic.l)
ss += "{:<20s} : {:.3f}\n".format("GLAT", source_pos.galactic.b)
return ss
def _info_spectral_fit(self):
return "\n"
def _info_spectral_points(self):
ss = "\n*** Spectral points ***\n\n"
if self.flux_points_table is None:
ss += "No spectral points available.\n"
return ss
lines = format_flux_points_table(self.flux_points_table).pformat(
max_width=-1, max_lines=-1
)
ss += "\n".join(lines)
ss += "\n"
return ss
def _info_phasogram(self):
return ""
def spatial_model(self):
source_pos = self.position
ra = source_pos.ra
dec = source_pos.dec
model = PointSpatialModel(lon_0=ra, lat_0=dec, frame="icrs")
return model
def sky_model(self, name=None):
"""Sky model (`~gammapy.modeling.models.SkyModel`)."""
spectral_model = self.spectral_model()
if spectral_model is None:
return None
if name is None:
name = self.name
return SkyModel(
spatial_model=self.spatial_model(),
spectral_model=spectral_model,
name=name,
)
@property
def flux_points(self):
"""Flux points (`~gammapy.estimators.FluxPoints`)."""
if self.flux_points_table is None:
return None
return FluxPoints.from_table(
table=self.flux_points_table,
reference_model=self.sky_model(),
format="gadf-sed",
)
@property
def lightcurve(self):
"""Light-curve."""
pass
class SourceCatalogObjectFermiBase(SourceCatalogObject, abc.ABC):
"""Base class for Fermi-LAT catalogs."""
asso = ["ASSOC1", "ASSOC2", "ASSOC_TEV", "ASSOC_GAM1", "ASSOC_GAM2", "ASSOC_GAM3"]
flux_points_meta = {
"sed_type_init": "flux",
"n_sigma": 1,
"sqrt_ts_threshold_ul": 1,
"n_sigma_ul": 2,
}
def __str__(self):
return self.info()
def info(self, info="all"):
"""Summary information string.
Parameters
----------
info : {'all', 'basic', 'more', 'position', 'spectral', 'lightcurve'}
Comma separated list of options.
"""
if info == "all":
info = "basic,more,position,spectral,lightcurve"
ss = ""
ops = info.split(",")
if "basic" in ops:
ss += self._info_basic()
if "more" in ops:
ss += self._info_more()
if "position" in ops:
ss += self._info_position()
if not self.is_pointlike:
ss += self._info_morphology()
if "spectral" in ops:
ss += self._info_spectral_fit()
ss += self._info_spectral_points()
if "lightcurve" in ops:
ss += self._info_lightcurve()
return ss
def _info_basic(self):
d = self.data
keys = self.asso
ss = "\n*** Basic info ***\n\n"
ss += "Catalog row index (zero-based) : {}\n".format(self.row_index)
ss += "{:<20s} : {}\n".format("Source name", self.name)
if "Extended_Source_Name" in d:
ss += "{:<20s} : {}\n".format("Extended name", d["Extended_Source_Name"])
def get_nonentry_keys(keys):
vals = [str(d[_]).strip() for _ in keys]
return ", ".join([_ for _ in vals if _ != ""])
associations = get_nonentry_keys(keys)
ss += "{:<16s} : {}\n".format("Associations", associations)
try:
ss += "{:<16s} : {:.3f}\n".format("ASSOC_PROB_BAY", d["ASSOC_PROB_BAY"])
ss += "{:<16s} : {:.3f}\n".format("ASSOC_PROB_LR", d["ASSOC_PROB_LR"])
except KeyError:
pass
try:
ss += "{:<16s} : {}\n".format("Class1", d["CLASS1"])
except KeyError:
ss += "{:<16s} : {}\n".format("Class", d["CLASS"])
try:
ss += "{:<16s} : {}\n".format("Class2", d["CLASS2"])
except KeyError:
pass
ss += "{:<16s} : {}\n".format("TeVCat flag", d.get("TEVCAT_FLAG", "N/A"))
return ss
@abc.abstractmethod
def _info_more(self):
pass
def _info_position(self):
d = self.data
ss = "\n*** Position info ***\n\n"
ss += "{:<20s} : {:.3f}\n".format("RA", d["RAJ2000"])
ss += "{:<20s} : {:.3f}\n".format("DEC", d["DEJ2000"])
ss += "{:<20s} : {:.3f}\n".format("GLON", d["GLON"])
ss += "{:<20s} : {:.3f}\n".format("GLAT", d["GLAT"])
ss += "\n"
ss += "{:<20s} : {:.4f}\n".format("Semimajor (68%)", d["Conf_68_SemiMajor"])
ss += "{:<20s} : {:.4f}\n".format("Semiminor (68%)", d["Conf_68_SemiMinor"])
ss += "{:<20s} : {:.2f}\n".format("Position angle (68%)", d["Conf_68_PosAng"])
ss += "{:<20s} : {:.4f}\n".format("Semimajor (95%)", d["Conf_95_SemiMajor"])
ss += "{:<20s} : {:.4f}\n".format("Semiminor (95%)", d["Conf_95_SemiMinor"])
ss += "{:<20s} : {:.2f}\n".format("Position angle (95%)", d["Conf_95_PosAng"])
ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI_num"])
return ss
def _info_morphology(self):
e = self.data_extended
ss = "\n*** Extended source information ***\n\n"
ss += "{:<16s} : {}\n".format("Model form", e["Model_Form"])
ss += "{:<16s} : {:.4f}\n".format("Model semimajor", e["Model_SemiMajor"])
ss += "{:<16s} : {:.4f}\n".format("Model semiminor", e["Model_SemiMinor"])
ss += "{:<16s} : {:.4f}\n".format("Position angle", e["Model_PosAng"])
try:
ss += "{:<16s} : {}\n".format("Spatial function", e["Spatial_Function"])
except KeyError:
pass
ss += "{:<16s} : {}\n\n".format("Spatial filename", e["Spatial_Filename"])
return ss
def _info_spectral_fit(self):
return "\n"
def _info_spectral_points(self):
ss = "\n*** Spectral points ***\n\n"
lines = format_flux_points_table(self.flux_points_table).pformat(
max_width=-1, max_lines=-1
)
ss += "\n".join(lines)
return ss
def _info_lightcurve(self):
return "\n"
@property
def is_pointlike(self):
return self.data["Extended_Source_Name"].strip() == ""
# FIXME: this should be renamed `set_position_error`,
# and `phi_0` isn't filled correctly, other parameters missing
# see https://github.com/gammapy/gammapy/pull/2533#issuecomment-553329049
def _set_spatial_errors(self, model):
d = self.data
if "Pos_err_68" in d:
percent = 0.68
semi_minor = d["Pos_err_68"]
semi_major = d["Pos_err_68"]
phi_0 = 0.0
else:
percent = 0.95
semi_minor = d["Conf_95_SemiMinor"]
semi_major = d["Conf_95_SemiMajor"]
phi_0 = d["Conf_95_PosAng"]
if np.isnan(phi_0):
phi_0 = 0.0 * u.deg
scale_1sigma = Gauss2DPDF().containment_radius(percent)
lat_err = semi_major / scale_1sigma
lon_err = semi_minor / scale_1sigma / np.cos(d["DEJ2000"])
if "TemplateSpatialModel" not in model.tag:
model.parameters["lon_0"].error = lon_err
model.parameters["lat_0"].error = lat_err
model.phi_0 = phi_0
def sky_model(self, name=None):
"""Sky model as a `~gammapy.modeling.models.SkyModel` object."""
if name is None:
name = self.name
return SkyModel(
spatial_model=self.spatial_model(),
spectral_model=self.spectral_model(),
name=name,
)
@property
def flux_points(self):
"""Flux points as a `~gammapy.estimators.FluxPoints` object."""
return FluxPoints.from_table(
table=self.flux_points_table,
reference_model=self.sky_model(),
format="gadf-sed",
)
[docs]
class SourceCatalogObject4FGL(SourceCatalogObjectFermiBase):
"""One source from the Fermi-LAT 4FGL catalog.
Catalog is represented by `~gammapy.catalog.SourceCatalog4FGL`.
"""
asso = [
"ASSOC1",
"ASSOC2",
"ASSOC_TEV",
"ASSOC_FGL",
"ASSOC_FHL",
"ASSOC_GAM1",
"ASSOC_GAM2",
"ASSOC_GAM3",
]
def _info_more(self):
d = self.data
ss = "\n*** Other info ***\n\n"
fmt = "{:<32s} : {:.3f}\n"
ss += fmt.format("Significance (100 MeV - 1 TeV)", d["Signif_Avg"])
ss += "{:<32s} : {:.1f}\n".format("Npred", d["Npred"])
ss += "\n{:<20s} : {}\n".format("Other flags", d["Flags"])
return ss
def _info_spectral_fit(self):
d = self.data
spec_type = d["SpectrumType"].strip()
ss = "\n*** Spectral info ***\n\n"
ss += "{:<45s} : {}\n".format("Spectrum type", d["SpectrumType"])
fmt = "{:<45s} : {:.3f}\n"
ss += fmt.format("Detection significance (100 MeV - 1 TeV)", d["Signif_Avg"])
if spec_type == "PowerLaw":
tag = "PL"
elif spec_type == "LogParabola":
tag = "LP"
ss += "{:<45s} : {:.4f} +- {:.5f}\n".format(
"beta", d["LP_beta"], d["Unc_LP_beta"]
)
ss += "{:<45s} : {:.1f}\n".format("Significance curvature", d["LP_SigCurv"])
elif spec_type == "PLSuperExpCutoff":
tag = "PLEC"
fmt = "{:<45s} : {:.4f} +- {:.4f}\n"
if "PLEC_ExpfactorS" in d:
ss += fmt.format(
"Exponential factor", d["PLEC_ExpfactorS"], d["Unc_PLEC_ExpfactorS"]
)
else:
ss += fmt.format(
"Exponential factor", d["PLEC_Expfactor"], d["Unc_PLEC_Expfactor"]
)
ss += "{:<45s} : {:.4f} +- {:.4f}\n".format(
"Super-exponential cutoff index",
d["PLEC_Exp_Index"],
d["Unc_PLEC_Exp_Index"],
)
ss += "{:<45s} : {:.1f}\n".format(
"Significance curvature", d["PLEC_SigCurv"]
)
else:
raise ValueError(f"Invalid spec_type: {spec_type!r}")
ss += "{:<45s} : {:.0f} {}\n".format(
"Pivot energy", d["Pivot_Energy"].value, d["Pivot_Energy"].unit
)
fmt = "{:<45s} : {:.3f} +- {:.3f}\n"
if f"{tag}_ExpfactorS" in d:
ss += fmt.format(
"Spectral index", d[tag + "_IndexS"], d["Unc_" + tag + "_IndexS"]
)
else:
ss += fmt.format(
"Spectral index", d[tag + "_Index"], d["Unc_" + tag + "_Index"]
)
fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
ss += fmt.format(
"Flux Density at pivot energy",
d[tag + "_Flux_Density"].value,
d["Unc_" + tag + "_Flux_Density"].value,
"cm-2 MeV-1 s-1",
)
fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
ss += fmt.format(
"Integral flux (1 - 100 GeV)",
d["Flux1000"].value,
d["Unc_Flux1000"].value,
"cm-2 s-1",
)
fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
ss += fmt.format(
"Energy flux (100 MeV - 100 GeV)",
d["Energy_Flux100"].value,
d["Unc_Energy_Flux100"].value,
"erg cm-2 s-1",
)
return ss
def _info_lightcurve(self):
d = self.data
ss = "\n*** Lightcurve info ***\n\n"
ss += "Lightcurve measured in the energy band: 100 MeV - 100 GeV\n\n"
ss += "{:<15s} : {:.3f}\n".format("Variability index", d["Variability_Index"])
if np.isfinite(d["Flux_Peak"]):
ss += "{:<40s} : {:.3f}\n".format(
"Significance peak (100 MeV - 100 GeV)", d["Signif_Peak"]
)
fmt = "{:<40s} : {:.3} +- {:.3} cm^-2 s^-1\n"
ss += fmt.format(
"Integral flux peak (100 MeV - 100 GeV)",
d["Flux_Peak"].value,
d["Unc_Flux_Peak"].value,
)
# TODO: give time as UTC string, not MET
ss += "{:<40s} : {:.3} s (Mission elapsed time)\n".format(
"Time peak", d["Time_Peak"].value
)
peak_interval = d["Peak_Interval"].to_value("day")
ss += "{:<40s} : {:.3} day\n".format("Peak interval", peak_interval)
else:
ss += "\nNo peak measured for this source.\n"
# TODO: Add a lightcurve table with d['Flux_History'] and d['Unc_Flux_History']
return ss
[docs]
def spatial_model(self):
"""Spatial model as a `~gammapy.modeling.models.SpatialModel` object."""
d = self.data
ra = d["RAJ2000"]
dec = d["DEJ2000"]
if self.is_pointlike:
model = PointSpatialModel(lon_0=ra, lat_0=dec, frame="icrs")
else:
de = self.data_extended
morph_type = de["Model_Form"].strip()
e = (1 - (de["Model_SemiMinor"] / de["Model_SemiMajor"]) ** 2.0) ** 0.5
sigma = de["Model_SemiMajor"]
phi = de["Model_PosAng"]
if morph_type == "Disk":
r_0 = de["Model_SemiMajor"]
model = DiskSpatialModel(
lon_0=ra, lat_0=dec, r_0=r_0, e=e, phi=phi, frame="icrs"
)
elif morph_type in ["Map", "Ring", "2D Gaussian x2"]:
filename = de["Spatial_Filename"].strip() + ".gz"
if de["version"] < 28:
path_extended = "$GAMMAPY_DATA/catalogs/fermi/LAT_extended_sources_8years/Templates/"
elif de["version"] < 32:
path_extended = (
"$GAMMAPY_DATA/catalogs/fermi/Extended_12years/Templates/"
)
else:
path_extended = (
"$GAMMAPY_DATA/catalogs/fermi/Extended_14years/Templates/"
)
path = make_path(path_extended)
with warnings.catch_warnings(): # ignore FITS units warnings
warnings.simplefilter("ignore", FITSFixedWarning)
model = TemplateSpatialModel.read(path / filename)
elif morph_type == "2D Gaussian":
model = GaussianSpatialModel(
lon_0=ra, lat_0=dec, sigma=sigma, e=e, phi=phi, frame="icrs"
)
else:
raise ValueError(f"Invalid spatial model: {morph_type!r}")
self._set_spatial_errors(model)
return model
[docs]
def spectral_model(self):
"""Best fit spectral model as a `~gammapy.modeling.models.SpectralModel` object."""
spec_type = self.data["SpectrumType"].strip()
if spec_type == "PowerLaw":
tag = "PowerLawSpectralModel"
pars = {
"reference": self.data["Pivot_Energy"],
"amplitude": self.data["PL_Flux_Density"],
"index": self.data["PL_Index"],
}
errs = {
"amplitude": self.data["Unc_PL_Flux_Density"],
"index": self.data["Unc_PL_Index"],
}
elif spec_type == "LogParabola":
tag = "LogParabolaSpectralModel"
pars = {
"reference": self.data["Pivot_Energy"],
"amplitude": self.data["LP_Flux_Density"],
"alpha": self.data["LP_Index"],
"beta": self.data["LP_beta"],
}
errs = {
"amplitude": self.data["Unc_LP_Flux_Density"],
"alpha": self.data["Unc_LP_Index"],
"beta": self.data["Unc_LP_beta"],
}
elif spec_type == "PLSuperExpCutoff":
if "PLEC_ExpfactorS" in self.data:
tag = "SuperExpCutoffPowerLaw4FGLDR3SpectralModel"
expfactor = self.data["PLEC_ExpfactorS"]
expfactor_err = self.data["Unc_PLEC_ExpfactorS"]
index_1 = self.data["PLEC_IndexS"]
index_1_err = self.data["Unc_PLEC_IndexS"]
else:
tag = "SuperExpCutoffPowerLaw4FGLSpectralModel"
expfactor = self.data["PLEC_Expfactor"]
expfactor_err = self.data["Unc_PLEC_Expfactor"]
index_1 = self.data["PLEC_Index"]
index_1_err = self.data["Unc_PLEC_Index"]
pars = {
"reference": self.data["Pivot_Energy"],
"amplitude": self.data["PLEC_Flux_Density"],
"index_1": index_1,
"index_2": self.data["PLEC_Exp_Index"],
"expfactor": expfactor,
}
errs = {
"amplitude": self.data["Unc_PLEC_Flux_Density"],
"index_1": index_1_err,
"index_2": np.nan_to_num(float(self.data["Unc_PLEC_Exp_Index"])),
"expfactor": expfactor_err,
}
else:
raise ValueError(f"Invalid spec_type: {spec_type!r}")
model = Model.create(tag, "spectral", **pars)
for name, value in errs.items():
model.parameters[name].error = value
return model
@property
def flux_points_table(self):
"""Flux points as a `~astropy.table.Table`."""
table = Table()
table.meta.update(self.flux_points_meta)
table["e_min"] = self.data["fp_energy_edges"][:-1]
table["e_max"] = self.data["fp_energy_edges"][1:]
flux = self._get_flux_values("Flux_Band")
flux_err = self._get_flux_values("Unc_Flux_Band")
table["flux"] = flux
table["flux_errn"] = np.abs(flux_err[:, 0])
table["flux_errp"] = flux_err[:, 1]
nuFnu = self._get_flux_values("nuFnu_Band", "erg cm-2 s-1")
table["e2dnde"] = nuFnu
table["e2dnde_errn"] = np.abs(nuFnu * flux_err[:, 0] / flux)
table["e2dnde_errp"] = nuFnu * flux_err[:, 1] / flux
is_ul = np.isnan(table["flux_errn"])
table["is_ul"] = is_ul
# handle upper limits
table["flux_ul"] = np.nan * flux_err.unit
flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"])
table["flux_ul"][is_ul] = flux_ul[is_ul]
# handle upper limits
table["e2dnde_ul"] = np.nan * nuFnu.unit
e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_errp"])
table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul]
# Square root of test statistic
table["sqrt_ts"] = self.data["Sqrt_TS_Band"]
return table
def _get_flux_values(self, prefix, unit="cm-2 s-1"):
values = self.data[prefix]
return u.Quantity(values, unit)
[docs]
def lightcurve(self, interval="1-year"):
"""Lightcurve as a `~gammapy.estimators.FluxPoints` object.
Parameters
----------
interval : {'1-year', '2-month'}
Time interval of the lightcurve. Default is '1-year'.
Note that '2-month' is not available for all catalogue version.
"""
if interval == "1-year":
tag = "Flux_History"
if tag not in self.data or "time_axis" not in self.data:
raise ValueError(
"'1-year' interval is not available for this catalogue version"
)
time_axis = self.data["time_axis"]
tag_sqrt_ts = "Sqrt_TS_History"
elif interval == "2-month":
tag = "Flux2_History"
if tag not in self.data or "time_axis_2" not in self.data:
raise ValueError(
"2-month interval is not available for this catalog version"
)
time_axis = self.data["time_axis_2"]
tag_sqrt_ts = "Sqrt_TS2_History"
else:
raise ValueError("Time intervals available are '1-year' or '2-month'")
energy_axis = MapAxis.from_energy_edges([50, 300000] * u.MeV)
geom = RegionGeom.create(region=self.position, axes=[energy_axis, time_axis])
names = ["flux", "flux_errp", "flux_errn", "flux_ul", "ts"]
maps = Maps.from_geom(geom=geom, names=names)
maps["flux"].quantity = self.data[tag].reshape(geom.data_shape)
maps["flux_errp"].quantity = self.data[f"Unc_{tag}"][:, 1].reshape(
geom.data_shape
)
maps["flux_errn"].quantity = -self.data[f"Unc_{tag}"][:, 0].reshape(
geom.data_shape
)
maps["flux_ul"].quantity = compute_flux_points_ul(
maps["flux"].quantity, maps["flux_errp"].quantity
).reshape(geom.data_shape)
maps["ts"].quantity = (self.data[tag_sqrt_ts] ** 2).reshape(geom.data_shape)
return FluxPoints.from_maps(
maps=maps,
sed_type="flux",
reference_model=self.sky_model(),
meta=self.flux_points.meta.copy(),
)
[docs]
class SourceCatalogObject3FGL(SourceCatalogObjectFermiBase):
"""One source from the Fermi-LAT 3FGL catalog.
Catalog is represented by `~gammapy.catalog.SourceCatalog3FGL`.
"""
_energy_edges = u.Quantity([100, 300, 1000, 3000, 10000, 100000], "MeV")
_energy_edges_suffix = [
"100_300",
"300_1000",
"1000_3000",
"3000_10000",
"10000_100000",
]
energy_range = u.Quantity([100, 100000], "MeV")
"""Energy range used for the catalog.
Paper says that analysis uses data up to 300 GeV,
but results are all quoted up to 100 GeV only to
be consistent with previous catalogs.
"""
def _info_more(self):
d = self.data
ss = "\n*** Other info ***\n\n"
ss += "{:<20s} : {}\n".format("Other flags", d["Flags"])
return ss
def _info_spectral_fit(self):
d = self.data
spec_type = d["SpectrumType"].strip()
ss = "\n*** Spectral info ***\n\n"
ss += "{:<45s} : {}\n".format("Spectrum type", d["SpectrumType"])
fmt = "{:<45s} : {:.3f}\n"
ss += fmt.format("Detection significance (100 MeV - 300 GeV)", d["Signif_Avg"])
ss += "{:<45s} : {:.1f}\n".format("Significance curvature", d["Signif_Curve"])
if spec_type == "PowerLaw":
pass
elif spec_type == "LogParabola":
ss += "{:<45s} : {} +- {}\n".format("beta", d["beta"], d["Unc_beta"])
elif spec_type in ["PLExpCutoff", "PlSuperExpCutoff"]:
fmt = "{:<45s} : {:.0f} +- {:.0f} {}\n"
ss += fmt.format(
"Cutoff energy",
d["Cutoff"].value,
d["Unc_Cutoff"].value,
d["Cutoff"].unit,
)
elif spec_type == "PLSuperExpCutoff":
ss += "{:<45s} : {} +- {}\n".format(
"Super-exponential cutoff index", d["Exp_Index"], d["Unc_Exp_Index"]
)
else:
raise ValueError(f"Invalid spec_type: {spec_type!r}")
ss += "{:<45s} : {:.0f} {}\n".format(
"Pivot energy", d["Pivot_Energy"].value, d["Pivot_Energy"].unit
)
ss += "{:<45s} : {:.3f}\n".format(
"Power law spectral index", d["PowerLaw_Index"]
)
fmt = "{:<45s} : {:.3f} +- {:.3f}\n"
ss += fmt.format("Spectral index", d["Spectral_Index"], d["Unc_Spectral_Index"])
fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
ss += fmt.format(
"Flux Density at pivot energy",
d["Flux_Density"].value,
d["Unc_Flux_Density"].value,
"cm-2 MeV-1 s-1",
)
fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
ss += fmt.format(
"Integral flux (1 - 100 GeV)",
d["Flux1000"].value,
d["Unc_Flux1000"].value,
"cm-2 s-1",
)
fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
ss += fmt.format(
"Energy flux (100 MeV - 100 GeV)",
d["Energy_Flux100"].value,
d["Unc_Energy_Flux100"].value,
"erg cm-2 s-1",
)
return ss
def _info_lightcurve(self):
d = self.data
ss = "\n*** Lightcurve info ***\n\n"
ss += "Lightcurve measured in the energy band: 100 MeV - 100 GeV\n\n"
ss += "{:<15s} : {:.3f}\n".format("Variability index", d["Variability_Index"])
if np.isfinite(d["Flux_Peak"]):
ss += "{:<40s} : {:.3f}\n".format(
"Significance peak (100 MeV - 100 GeV)", d["Signif_Peak"]
)
fmt = "{:<40s} : {:.3} +- {:.3} cm^-2 s^-1\n"
ss += fmt.format(
"Integral flux peak (100 MeV - 100 GeV)",
d["Flux_Peak"].value,
d["Unc_Flux_Peak"].value,
)
# TODO: give time as UTC string, not MET
ss += "{:<40s} : {:.3} s (Mission elapsed time)\n".format(
"Time peak", d["Time_Peak"].value
)
peak_interval = d["Peak_Interval"].to_value("day")
ss += "{:<40s} : {:.3} day\n".format("Peak interval", peak_interval)
else:
ss += "\nNo peak measured for this source.\n"
# TODO: Add a lightcurve table with d['Flux_History'] and d['Unc_Flux_History']
return ss
[docs]
def spectral_model(self):
"""Best fit spectral model as a `~gammapy.modeling.models.SpectralModel` object."""
spec_type = self.data["SpectrumType"].strip()
if spec_type == "PowerLaw":
tag = "PowerLawSpectralModel"
pars = {
"amplitude": self.data["Flux_Density"],
"reference": self.data["Pivot_Energy"],
"index": self.data["Spectral_Index"],
}
errs = {
"amplitude": self.data["Unc_Flux_Density"],
"index": self.data["Unc_Spectral_Index"],
}
elif spec_type == "PLExpCutoff":
tag = "ExpCutoffPowerLaw3FGLSpectralModel"
pars = {
"amplitude": self.data["Flux_Density"],
"reference": self.data["Pivot_Energy"],
"index": self.data["Spectral_Index"],
"ecut": self.data["Cutoff"],
}
errs = {
"amplitude": self.data["Unc_Flux_Density"],
"index": self.data["Unc_Spectral_Index"],
"ecut": self.data["Unc_Cutoff"],
}
elif spec_type == "LogParabola":
tag = "LogParabolaSpectralModel"
pars = {
"amplitude": self.data["Flux_Density"],
"reference": self.data["Pivot_Energy"],
"alpha": self.data["Spectral_Index"],
"beta": self.data["beta"],
}
errs = {
"amplitude": self.data["Unc_Flux_Density"],
"alpha": self.data["Unc_Spectral_Index"],
"beta": self.data["Unc_beta"],
}
elif spec_type == "PLSuperExpCutoff":
tag = "SuperExpCutoffPowerLaw3FGLSpectralModel"
pars = {
"amplitude": self.data["Flux_Density"],
"reference": self.data["Pivot_Energy"],
"index_1": self.data["Spectral_Index"],
"index_2": self.data["Exp_Index"],
"ecut": self.data["Cutoff"],
}
errs = {
"amplitude": self.data["Unc_Flux_Density"],
"index_1": self.data["Unc_Spectral_Index"],
"index_2": self.data["Unc_Exp_Index"],
"ecut": self.data["Unc_Cutoff"],
}
else:
raise ValueError(f"Invalid spec_type: {spec_type!r}")
model = Model.create(tag, "spectral", **pars)
for name, value in errs.items():
model.parameters[name].error = value
return model
[docs]
def spatial_model(self):
"""Spatial model as a `~gammapy.modeling.models.SpatialModel` object."""
d = self.data
ra = d["RAJ2000"]
dec = d["DEJ2000"]
if self.is_pointlike:
model = PointSpatialModel(lon_0=ra, lat_0=dec, frame="icrs")
else:
de = self.data_extended
morph_type = de["Model_Form"].strip()
e = (1 - (de["Model_SemiMinor"] / de["Model_SemiMajor"]) ** 2.0) ** 0.5
sigma = de["Model_SemiMajor"]
phi = de["Model_PosAng"]
if morph_type == "Disk":
r_0 = de["Model_SemiMajor"]
model = DiskSpatialModel(
lon_0=ra, lat_0=dec, r_0=r_0, e=e, phi=phi, frame="icrs"
)
elif morph_type in ["Map", "Ring", "2D Gaussian x2"]:
filename = de["Spatial_Filename"].strip()
path = make_path(
"$GAMMAPY_DATA/catalogs/fermi/Extended_archive_v15/Templates/"
)
model = TemplateSpatialModel.read(path / filename)
elif morph_type == "2D Gaussian":
model = GaussianSpatialModel(
lon_0=ra, lat_0=dec, sigma=sigma, e=e, phi=phi, frame="icrs"
)
else:
raise ValueError(f"Invalid spatial model: {morph_type!r}")
self._set_spatial_errors(model)
return model
@property
def flux_points_table(self):
"""Flux points as a `~astropy.table.Table`."""
table = Table()
table.meta.update(self.flux_points_meta)
table["e_min"] = self._energy_edges[:-1]
table["e_max"] = self._energy_edges[1:]
flux = self._get_flux_values("Flux")
flux_err = self._get_flux_values("Unc_Flux")
table["flux"] = flux
table["flux_errn"] = np.abs(flux_err[:, 0])
table["flux_errp"] = flux_err[:, 1]
nuFnu = self._get_flux_values("nuFnu", "erg cm-2 s-1")
table["e2dnde"] = nuFnu
table["e2dnde_errn"] = np.abs(nuFnu * flux_err[:, 0] / flux)
table["e2dnde_errp"] = nuFnu * flux_err[:, 1] / flux
is_ul = np.isnan(table["flux_errn"])
table["is_ul"] = is_ul
# handle upper limits
table["flux_ul"] = np.nan * flux_err.unit
flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"])
table["flux_ul"][is_ul] = flux_ul[is_ul]
# handle upper limits
table["e2dnde_ul"] = np.nan * nuFnu.unit
e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_errp"])
table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul]
# Square root of test statistic
table["sqrt_ts"] = [self.data["Sqrt_TS" + _] for _ in self._energy_edges_suffix]
return table
def _get_flux_values(self, prefix, unit="cm-2 s-1"):
values = [self.data[prefix + _] for _ in self._energy_edges_suffix]
return u.Quantity(values, unit)
[docs]
def lightcurve(self):
"""Lightcurve as a `~gammapy.estimators.FluxPoints` object."""
time_axis = self.data["time_axis"]
tag = "Flux_History"
energy_axis = MapAxis.from_energy_edges(self.energy_range)
geom = RegionGeom.create(region=self.position, axes=[energy_axis, time_axis])
names = ["flux", "flux_errp", "flux_errn", "flux_ul"]
maps = Maps.from_geom(geom=geom, names=names)
maps["flux"].quantity = self.data[tag].reshape(geom.data_shape)
maps["flux_errp"].quantity = self.data[f"Unc_{tag}"][:, 1].reshape(
geom.data_shape
)
maps["flux_errn"].quantity = -self.data[f"Unc_{tag}"][:, 0].reshape(
geom.data_shape
)
maps["flux_ul"].quantity = compute_flux_points_ul(
maps["flux"].quantity, maps["flux_errp"].quantity
).reshape(geom.data_shape)
is_ul = np.isnan(maps["flux_errn"])
maps["flux_ul"].data[~is_ul] = np.nan
return FluxPoints.from_maps(
maps=maps,
sed_type="flux",
reference_model=self.sky_model(),
meta=self.flux_points_meta.copy(),
)
[docs]
class SourceCatalogObject2FHL(SourceCatalogObjectFermiBase):
"""One source from the Fermi-LAT 2FHL catalog.
Catalog is represented by `~gammapy.catalog.SourceCatalog2FHL`.
"""
asso = ["ASSOC", "3FGL_Name", "1FHL_Name", "TeVCat_Name"]
_energy_edges = u.Quantity([50, 171, 585, 2000], "GeV")
_energy_edges_suffix = ["50_171", "171_585", "585_2000"]
energy_range = u.Quantity([0.05, 2], "TeV")
"""Energy range used for the catalog."""
def _info_more(self):
d = self.data
ss = "\n*** Other info ***\n\n"
fmt = "{:<32s} : {:.3f}\n"
ss += fmt.format("Test statistic (50 GeV - 2 TeV)", d["TS"])
return ss
def _info_position(self):
d = self.data
ss = "\n*** Position info ***\n\n"
ss += "{:<20s} : {:.3f}\n".format("RA", d["RAJ2000"])
ss += "{:<20s} : {:.3f}\n".format("DEC", d["DEJ2000"])
ss += "{:<20s} : {:.3f}\n".format("GLON", d["GLON"])
ss += "{:<20s} : {:.3f}\n".format("GLAT", d["GLAT"])
ss += "\n"
ss += "{:<20s} : {:.4f}\n".format("Error on position (68%)", d["Pos_err_68"])
ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI"])
return ss
def _info_spectral_fit(self):
d = self.data
ss = "\n*** Spectral fit info ***\n\n"
fmt = "{:<32s} : {:.3f} +- {:.3f}\n"
ss += fmt.format(
"Power-law spectral index", d["Spectral_Index"], d["Unc_Spectral_Index"]
)
ss += "{:<32s} : {:.3} +- {:.3} {}\n".format(
"Integral flux (50 GeV - 2 TeV)",
d["Flux50"].value,
d["Unc_Flux50"].value,
"cm-2 s-1",
)
ss += "{:<32s} : {:.3} +- {:.3} {}\n".format(
"Energy flux (50 GeV - 2 TeV)",
d["Energy_Flux50"].value,
d["Unc_Energy_Flux50"].value,
"erg cm-2 s-1",
)
return ss
@property
def is_pointlike(self):
return self.data["Source_Name"].strip()[-1] != "e"
[docs]
def spatial_model(self):
"""Spatial model as a `~gammapy.modeling.models.SpatialModel` object."""
d = self.data
ra = d["RAJ2000"]
dec = d["DEJ2000"]
if self.is_pointlike:
model = PointSpatialModel(lon_0=ra, lat_0=dec, frame="icrs")
else:
de = self.data_extended
morph_type = de["Model_Form"].strip()
e = (1 - (de["Model_SemiMinor"] / de["Model_SemiMajor"]) ** 2.0) ** 0.5
sigma = de["Model_SemiMajor"]
phi = de["Model_PosAng"]
if morph_type in ["Disk", "Elliptical Disk"]:
r_0 = de["Model_SemiMajor"]
model = DiskSpatialModel(
lon_0=ra, lat_0=dec, r_0=r_0, e=e, phi=phi, frame="icrs"
)
elif morph_type in ["Map", "Ring", "2D Gaussian x2"]:
filename = de["Spatial_Filename"].strip()
path = make_path(
"$GAMMAPY_DATA/catalogs/fermi/Extended_archive_v15/Templates/"
)
return TemplateSpatialModel.read(path / filename)
elif morph_type in ["2D Gaussian", "Elliptical 2D Gaussian"]:
model = GaussianSpatialModel(
lon_0=ra, lat_0=dec, sigma=sigma, e=e, phi=phi, frame="icrs"
)
else:
raise ValueError(f"Invalid spatial model: {morph_type!r}")
self._set_spatial_errors(model)
return model
[docs]
def spectral_model(self):
"""Best fit spectral model as a `~gammapy.modeling.models.SpectralModel`."""
tag = "PowerLaw2SpectralModel"
pars = {
"amplitude": self.data["Flux50"],
"emin": self.energy_range[0],
"emax": self.energy_range[1],
"index": self.data["Spectral_Index"],
}
errs = {
"amplitude": self.data["Unc_Flux50"],
"index": self.data["Unc_Spectral_Index"],
}
model = Model.create(tag, "spectral", **pars)
for name, value in errs.items():
model.parameters[name].error = value
return model
@property
def flux_points_table(self):
"""Flux points as a `~astropy.table.Table`."""
table = Table()
table.meta.update(self.flux_points_meta)
table["e_min"] = self._energy_edges[:-1]
table["e_max"] = self._energy_edges[1:]
table["flux"] = self._get_flux_values("Flux")
flux_err = self._get_flux_values("Unc_Flux")
table["flux_errn"] = np.abs(flux_err[:, 0])
table["flux_errp"] = flux_err[:, 1]
# handle upper limits
is_ul = np.isnan(table["flux_errn"])
table["is_ul"] = is_ul
table["flux_ul"] = np.nan * flux_err.unit
flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"])
table["flux_ul"][is_ul] = flux_ul[is_ul]
return table
def _get_flux_values(self, prefix, unit="cm-2 s-1"):
values = [self.data[prefix + _ + "GeV"] for _ in self._energy_edges_suffix]
return u.Quantity(values, unit)
[docs]
class SourceCatalogObject3FHL(SourceCatalogObjectFermiBase):
"""One source from the Fermi-LAT 3FHL catalog.
Catalog is represented by `~gammapy.catalog.SourceCatalog3FHL`.
"""
asso = ["ASSOC1", "ASSOC2", "ASSOC_TEV", "ASSOC_GAM"]
energy_range = u.Quantity([0.01, 2], "TeV")
"""Energy range used for the catalog."""
_energy_edges = u.Quantity([10, 20, 50, 150, 500, 2000], "GeV")
def _info_position(self):
d = self.data
ss = "\n*** Position info ***\n\n"
ss += "{:<20s} : {:.3f}\n".format("RA", d["RAJ2000"])
ss += "{:<20s} : {:.3f}\n".format("DEC", d["DEJ2000"])
ss += "{:<20s} : {:.3f}\n".format("GLON", d["GLON"])
ss += "{:<20s} : {:.3f}\n".format("GLAT", d["GLAT"])
# TODO: All sources are non-elliptical; just give one number for radius?
ss += "\n"
ss += "{:<20s} : {:.4f}\n".format("Semimajor (95%)", d["Conf_95_SemiMajor"])
ss += "{:<20s} : {:.4f}\n".format("Semiminor (95%)", d["Conf_95_SemiMinor"])
ss += "{:<20s} : {:.2f}\n".format("Position angle (95%)", d["Conf_95_PosAng"])
ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI_num"])
return ss
def _info_spectral_fit(self):
d = self.data
spec_type = d["SpectrumType"].strip()
ss = "\n*** Spectral fit info ***\n\n"
ss += "{:<32s} : {}\n".format("Spectrum type", d["SpectrumType"])
ss += "{:<32s} : {:.1f}\n".format("Significance curvature", d["Signif_Curve"])
# Power-law parameters are always given; give in any case
fmt = "{:<32s} : {:.3f} +- {:.3f}\n"
ss += fmt.format(
"Power-law spectral index", d["PowerLaw_Index"], d["Unc_PowerLaw_Index"]
)
if spec_type == "PowerLaw":
pass
elif spec_type == "LogParabola":
fmt = "{:<32s} : {:.3f} +- {:.3f}\n"
ss += fmt.format(
"LogParabolaSpectralModel spectral index",
d["Spectral_Index"],
d["Unc_Spectral_Index"],
)
ss += "{:<32s} : {:.3f} +- {:.3f}\n".format(
"LogParabolaSpectralModel beta", d["beta"], d["Unc_beta"]
)
else:
raise ValueError(f"Invalid spec_type: {spec_type!r}")
ss += "{:<32s} : {:.1f} {}\n".format(
"Pivot energy", d["Pivot_Energy"].value, d["Pivot_Energy"].unit
)
ss += "{:<32s} : {:.3} +- {:.3} {}\n".format(
"Flux Density at pivot energy",
d["Flux_Density"].value,
d["Unc_Flux_Density"].value,
"cm-2 GeV-1 s-1",
)
ss += "{:<32s} : {:.3} +- {:.3} {}\n".format(
"Integral flux (10 GeV - 1 TeV)",
d["Flux"].value,
d["Unc_Flux"].value,
"cm-2 s-1",
)
ss += "{:<32s} : {:.3} +- {:.3} {}\n".format(
"Energy flux (10 GeV - TeV)",
d["Energy_Flux"].value,
d["Unc_Energy_Flux"].value,
"erg cm-2 s-1",
)
return ss
def _info_more(self):
d = self.data
ss = "\n*** Other info ***\n\n"
fmt = "{:<32s} : {:.3f}\n"
ss += fmt.format("Significance (10 GeV - 2 TeV)", d["Signif_Avg"])
ss += "{:<32s} : {:.1f}\n".format("Npred", d["Npred"])
ss += "\n{:<16s} : {:.3f} {}\n".format(
"HEP Energy", d["HEP_Energy"].value, d["HEP_Energy"].unit
)
ss += "{:<16s} : {:.3f}\n".format("HEP Probability", d["HEP_Prob"])
ss += "{:<16s} : {}\n".format("Bayesian Blocks", d["Variability_BayesBlocks"])
ss += "{:<16s} : {:.3f}\n".format("Redshift", d["Redshift"])
ss += "{:<16s} : {:.3} {}\n".format(
"NuPeak_obs", d["NuPeak_obs"].value, d["NuPeak_obs"].unit
)
return ss
[docs]
def spectral_model(self):
"""Best fit spectral model as a `~gammapy.modeling.models.SpectralModel` object."""
d = self.data
spec_type = self.data["SpectrumType"].strip()
if spec_type == "PowerLaw":
tag = "PowerLawSpectralModel"
pars = {
"reference": d["Pivot_Energy"],
"amplitude": d["Flux_Density"],
"index": d["PowerLaw_Index"],
}
errs = {
"amplitude": d["Unc_Flux_Density"],
"index": d["Unc_PowerLaw_Index"],
}
elif spec_type == "LogParabola":
tag = "LogParabolaSpectralModel"
pars = {
"reference": d["Pivot_Energy"],
"amplitude": d["Flux_Density"],
"alpha": d["Spectral_Index"],
"beta": d["beta"],
}
errs = {
"amplitude": d["Unc_Flux_Density"],
"alpha": d["Unc_Spectral_Index"],
"beta": d["Unc_beta"],
}
else:
raise ValueError(f"Invalid spec_type: {spec_type!r}")
model = Model.create(tag, "spectral", **pars)
for name, value in errs.items():
model.parameters[name].error = value
return model
@property
def flux_points_table(self):
"""Flux points as a `~astropy.table.Table`."""
table = Table()
table.meta.update(self.flux_points_meta)
table["e_min"] = self._energy_edges[:-1]
table["e_max"] = self._energy_edges[1:]
flux = self.data["Flux_Band"]
flux_err = self.data["Unc_Flux_Band"]
e2dnde = self.data["nuFnu"]
table["flux"] = flux
table["flux_errn"] = np.abs(flux_err[:, 0])
table["flux_errp"] = flux_err[:, 1]
table["e2dnde"] = e2dnde
table["e2dnde_errn"] = np.abs(e2dnde * flux_err[:, 0] / flux)
table["e2dnde_errp"] = e2dnde * flux_err[:, 1] / flux
is_ul = np.isnan(table["flux_errn"])
table["is_ul"] = is_ul
# handle upper limits
table["flux_ul"] = np.nan * flux_err.unit
flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"])
table["flux_ul"][is_ul] = flux_ul[is_ul]
table["e2dnde_ul"] = np.nan * e2dnde.unit
e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_errp"])
table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul]
# Square root of test statistic
table["sqrt_ts"] = self.data["Sqrt_TS_Band"]
return table
[docs]
def spatial_model(self):
"""Source spatial model as a `~gammapy.modeling.models.SpatialModel` object."""
d = self.data
ra = d["RAJ2000"]
dec = d["DEJ2000"]
if self.is_pointlike:
model = PointSpatialModel(lon_0=ra, lat_0=dec, frame="icrs")
else:
de = self.data_extended
morph_type = de["Spatial_Function"].strip()
e = (1 - (de["Model_SemiMinor"] / de["Model_SemiMajor"]) ** 2.0) ** 0.5
sigma = de["Model_SemiMajor"]
phi = de["Model_PosAng"]
if morph_type == "RadialDisk":
r_0 = de["Model_SemiMajor"]
model = DiskSpatialModel(
lon_0=ra, lat_0=dec, r_0=r_0, e=e, phi=phi, frame="icrs"
)
elif morph_type in ["SpatialMap"]:
filename = de["Spatial_Filename"].strip()
path = make_path(
"$GAMMAPY_DATA/catalogs/fermi/Extended_archive_v18/Templates/"
)
model = TemplateSpatialModel.read(path / filename)
elif morph_type == "RadialGauss":
model = GaussianSpatialModel(
lon_0=ra, lat_0=dec, sigma=sigma, e=e, phi=phi, frame="icrs"
)
else:
raise ValueError(f"Invalid morph_type: {morph_type!r}")
self._set_spatial_errors(model)
return model
[docs]
class SourceCatalogObject2PC(SourceCatalogObjectFermiPCBase):
"""One source from the 2PC catalog."""
@property
def _auxiliary_filename(self):
return make_path(
f"$GAMMAPY_DATA/catalogs/fermi/2PC_auxiliary/PSR{self.name}_2PC_data.fits.gz"
)
def _info_more(self):
d = self.data
ss = "\n*** Other info ***\n\n"
ss += "{:<20s} : {:s}\n".format("Binary", d["Binary"])
return ss
def _info_pulsar(self):
d = self.data
ss = "\n*** Pulsar info ***\n\n"
ss += "{:<20s} : {:.3f}\n".format("Period", d["Period"])
ss += "{:<20s} : {:.3e}\n".format("P_Dot", d["P_Dot"])
ss += "{:<20s} : {:.3e}\n".format("E_Dot", d["E_Dot"])
ss += "{:<20s} : {}\n".format("Type", d["Type"])
return ss
def _info_spectral_fit(self):
d = self.data_spectral
ss = "\n*** Spectral info ***\n\n"
if d is None:
ss += "No spectral info available.\n"
return ss
ss += "{:<20s} : {}\n".format("On peak", d["On_Peak"])
ss += "{:<20s} : {:.0f}\n".format("TS DC", d["TS_DC"])
ss += "{:<20s} : {:.0f}\n".format("TS cutoff", d["TS_Cutoff"])
ss += "{:<20s} : {:.0f}\n".format("TS b free", d["TS_bfree"])
indentation = " " * 4
fmt_e = "{}{:<20s} : {:.3e} +- {:.3e}\n"
fmt_f = "{}{:<20s} : {:.3f} +- {:.3f}\n"
if not isinstance(d["PLEC1_Prefactor"], np.ma.core.MaskedConstant):
ss += "\n{}* PLSuperExpCutoff b = 1 *\n\n".format(indentation)
ss += fmt_e.format(
indentation, "Amplitude", d["PLEC1_Prefactor"], d["Unc_PLEC1_Prefactor"]
)
ss += fmt_f.format(
indentation,
"Index 1",
d["PLEC1_Photon_Index"],
d["Unc_PLEC1_Photon_Index"],
)
ss += "{}{:<20s} : {:.3f}\n".format(indentation, "Index 2", 1)
ss += "{}{:<20s} : {:.3f}\n".format(
indentation, "Reference", d["PLEC1_Scale"]
)
ss += fmt_f.format(
indentation, "Ecut", d["PLEC1_Cutoff"], d["Unc_PLEC1_Cutoff"]
)
if not isinstance(d["PLEC_Prefactor"], np.ma.core.MaskedConstant):
ss += "\n{}* PLSuperExpCutoff b free *\n\n".format(indentation)
ss += fmt_e.format(
indentation, "Amplitude", d["PLEC_Prefactor"], d["Unc_PLEC_Prefactor"]
)
ss += fmt_f.format(
indentation,
"Index 1",
d["PLEC_Photon_Index"],
d["Unc_PLEC_Photon_Index"],
)
ss += fmt_f.format(
indentation,
"Index 2",
d["PLEC_Exponential_Index"],
d["Unc_PLEC_Exponential_Index"],
)
ss += "{}{:<20s} : {:.3f}\n".format(
indentation, "Reference", d["PLEC_Scale"]
)
ss += fmt_f.format(
indentation, "Ecut", d["PLEC_Cutoff"], d["Unc_PLEC_Cutoff"]
)
if not isinstance(d["PL_Prefactor"], np.ma.core.MaskedConstant):
ss += "\n{}* PowerLaw *\n\n".format(indentation)
ss += fmt_e.format(
indentation, "Amplitude", d["PL_Prefactor"], d["Unc_PL_Prefactor"]
)
ss += fmt_f.format(
indentation, "Index", d["PL_Photon_Index"], d["Unc_PL_Photon_Index"]
)
ss += "{}{:<20s} : {:.3f}\n".format(indentation, "Reference", d["PL_Scale"])
return ss
def _info_phasogram(self):
d = self.data
ss = "\n*** Phasogram info ***\n\n"
ss += "{:<20s} : {:d}\n".format("Number of peaks", d["Num_Peaks"])
ss += "{:<20s} : {:.3f}\n".format("Peak separation", d["Peak_Sep"])
return ss
[docs]
def spectral_model(self):
d = self.data_spectral
if d is None:
log.warning(f"No spectral model available for source {self.name}")
return None
if d["TS_Cutoff"] < 9:
tag = "PowerLawSpectralModel"
pars = {
"reference": d["PL_Scale"],
"amplitude": d["PL_Prefactor"],
"index": d["PL_Photon_Index"],
}
errs = {
"amplitude": d["Unc_PL_Prefactor"],
"index": d["Unc_PL_Photon_Index"],
}
elif d["TS_bfree"] >= 9:
tag = "SuperExpCutoffPowerLaw3FGLSpectralModel"
pars = {
"index_1": d["PLEC_Photon_Index"],
"index_2": d["PLEC_Exponential_Index"],
"amplitude": d["PLEC_Prefactor"],
"reference": d["PLEC_Scale"],
"ecut": d["PLEC_Cutoff"],
}
errs = {
"index_1": d["Unc_PLEC_Photon_Index"],
"index_2": d["Unc_PLEC_Exponential_Index"],
"amplitude": d["Unc_PLEC_Prefactor"],
"ecut": d["Unc_PLEC_Cutoff"],
}
elif d["TS_bfree"] < 9:
tag = "SuperExpCutoffPowerLaw3FGLSpectralModel"
pars = {
"index_1": d["PLEC1_Photon_Index"],
"index_2": 1,
"amplitude": d["PLEC1_Prefactor"],
"reference": d["PLEC1_Scale"],
"ecut": d["PLEC1_Cutoff"],
}
errs = {
"index_1": d["Unc_PLEC1_Photon_Index"],
"amplitude": d["Unc_PLEC1_Prefactor"],
"ecut": d["Unc_PLEC1_Cutoff"],
}
else:
log.warning(f"No spectral model available for source {self.name}")
return None
model = Model.create(tag, "spectral", **pars)
for name, value in errs.items():
model.parameters[name].error = value
return model
@property
def flux_points_table(self):
"""Flux points (`~astropy.table.Table`)."""
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore", u.UnitsWarning)
fp_data = Table.read(self._auxiliary_filename, hdu="PULSAR_SED")
except (KeyError, FileNotFoundError):
log.warning(f"No flux points available for source {self.name}")
return None
table = Table()
table["e_min"] = fp_data["Energy_Min"]
table["e_max"] = fp_data["Energy_Max"]
table["e_ref"] = fp_data["Center_Energy"]
table["flux"] = fp_data["PhotonFlux"]
table["flux_err"] = fp_data["Unc_PhotonFlux"]
table["e2dnde"] = fp_data["EnergyFlux"]
table["e2dnde_err"] = fp_data["Unc_EnergyFlux"]
is_ul = np.where(table["e2dnde_err"] == 0, True, False)
table["is_ul"] = is_ul
table["flux_ul"] = np.nan * table["flux_err"].unit
flux_ul = compute_flux_points_ul(table["flux"], table["flux_err"])
table["flux_ul"][is_ul] = flux_ul[is_ul]
table["e2dnde_ul"] = np.nan * table["e2dnde"].unit
e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_err"])
table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul]
return table
[docs]
class SourceCatalogObject3PC(SourceCatalogObjectFermiPCBase):
"""One source from the 3PC catalog."""
asso = ["assoc_new"]
_energy_edges = u.Quantity([50, 100, 300, 1_000, 3e3, 1e4, 3e4, 1e5, 1e6], "MeV")
@property
def _auxiliary_filename(self):
return make_path(
f"$GAMMAPY_DATA/catalogs/fermi/3PC_auxiliary_20230728/{self.name}_3PC_data.fits.gz"
)
def _info_pulsar(self):
d = self.data
ss = "\n*** Pulsar info ***\n\n"
ss += "{:<20s} : {:.3f}\n".format("Period", d["P0"])
ss += "{:<20s} : {:.3e}\n".format("P_Dot", d["P1"])
ss += "{:<20s} : {:.3e}\n".format("E_Dot", d["EDOT"])
return ss
def _info_phasogram(self):
d = self.data
ss = "\n*** Phasogram info ***\n\n"
if not isinstance(d["NPEAK"], np.ma.core.MaskedConstant):
npeak = d["NPEAK"]
ss += "{:<20s} : {:.3f}\n".format("Number of peaks", npeak)
if npeak > 1:
ss += "{:<20s} : {:.3f}\n".format("Ph1 (peak one)", d["PHI1"])
ss += "{:<20s} : {:.3f}\n".format(
"Ph2 (peak two)", d["PHI1"] + d["PKSEP"]
)
ss += "{:<20s} : {:.3f}\n".format("Peak separation", d["PKSEP"])
else:
if not isinstance(d["PHI1"], np.ma.core.MaskedConstant):
ss += "{:<20s} : {:.3f}\n".format("Ph1 (peak one)", d["PHI1"])
else:
ss += "No phasogram info available.\n"
return ss
def _info_spectral_fit(self):
d = self.data_spectral
ss = "\n*** Spectral info ***\n\n"
if d is None:
ss += "No spectral info available.\n"
return ss
ss += "{:<20s} : {:.0f}\n".format("TS", d["Test_Statistic"])
ss += "{:<20s} : {:.0f}\n".format("Significance (DC)", d["Signif_Avg"])
ss += "{:<20s} : {:s}\n".format("Spectrum Type", d["SpectrumType"])
indentation = " " * 4
fmt_e = "{}{:<20s} : {:.3e} +- {:.3e}\n"
fmt_f = "{}{:<20s} : {:.3f} +- {:.3f}\n"
if not isinstance(d["PLEC_Flux_Density_b23"], np.ma.core.MaskedConstant):
ss += "\n{}* SuperExpCutoffPowerLaw4FGLDR3 b = 2/3 *\n\n".format(
indentation
)
ss += fmt_e.format(
indentation,
"Amplitude",
d["PLEC_Flux_Density_b23"],
d["Unc_PLEC_Flux_Density_b23"],
)
ss += fmt_f.format(
indentation,
"Index 1",
-d["PLEC_IndexS_b23"],
d["Unc_PLEC_IndexS_b23"],
)
ss += "{}{:<20s} : {:.3f}\n".format(indentation, "Index 2", 0.6667)
ss += "{}{:<20s} : {:.3f}\n".format(
indentation, "Reference", d["Pivot_Energy_b23"]
)
ss += fmt_f.format(
indentation,
"Expfactor",
d["PLEC_ExpfactorS_b23"],
d["Unc_PLEC_ExpfactorS_b23"],
)
if not isinstance(d["PLEC_Flux_Density_bfr"], np.ma.core.MaskedConstant):
ss += "\n{}* SuperExpCutoffPowerLaw4FGLDR3 b free *\n\n".format(indentation)
ss += fmt_e.format(
indentation,
"Amplitude",
d["PLEC_Flux_Density_bfr"],
d["Unc_PLEC_Flux_Density_bfr"],
)
ss += fmt_f.format(
indentation,
"Index 1",
-d["PLEC_IndexS_bfr"],
d["Unc_PLEC_IndexS_bfr"],
)
ss += fmt_f.format(
indentation,
"Index 2",
d["PLEC_Exp_Index_bfr"],
d["Unc_PLEC_Exp_Index_bfr"],
)
ss += "{}{:<20s} : {:.3f}\n".format(
indentation, "Reference", d["Pivot_Energy_bfr"]
)
ss += fmt_f.format(
indentation,
"Expfactor",
d["PLEC_ExpfactorS_bfr"],
d["Unc_PLEC_ExpfactorS_bfr"],
)
return ss
[docs]
def spectral_model(self, fit="auto"):
"""
In the 3PC, Fermi-LAT collaboration tried to fit a
`~gammapy.modelling.models.SuperExpCutoffPowerLaw4FGLDR3SpectralModel` with the
exponential index `index_2` free, or fixed to 2/3. These two models are referred
as "b free" and "b 23". For most pulsars, both models are available. However,
in some cases the "b free" model did not fit correctly.
Parameters
----------
fit : str, optional
Which fitted model to return. The user can choose between "auto", "b free"
and "b 23". "auto" will always try to return the "b free" first and fall
back to the "b 23" fit if "b free" is not available. Default is "auto".
"""
d = self.data_spectral
if d is None or d["SpectrumType"] != "PLSuperExpCutoff4":
log.warning(f"No spectral model available for source {self.name}")
return None
tag = "SuperExpCutoffPowerLaw4FGLDR3SpectralModel"
if not (
isinstance(d["PLEC_IndexS_bfr"], np.ma.core.masked_array) or (fit == "b 23")
):
pars = {
"reference": d["Pivot_Energy_bfr"],
"amplitude": d["PLEC_Flux_Density_bfr"],
"index_1": -d["PLEC_IndexS_bfr"],
"index_2": d["PLEC_Exp_Index_bfr"],
"expfactor": d["PLEC_ExpfactorS_bfr"],
}
errs = {
"amplitude": d["Unc_PLEC_Flux_Density_bfr"],
"index_1": d["Unc_PLEC_IndexS_bfr"],
"index_2": d["Unc_PLEC_Exp_Index_bfr"],
"expfactor": d["Unc_PLEC_ExpfactorS_bfr"],
}
else:
pars = {
"reference": d["Pivot_Energy_b23"],
"amplitude": d["PLEC_Flux_Density_b23"],
"index_1": -d["PLEC_IndexS_b23"],
"index_2": d["PLEC_Exp_Index_b23"],
"expfactor": d["PLEC_ExpfactorS_b23"],
}
errs = {
"amplitude": d["Unc_PLEC_Flux_Density_b23"],
"index_1": d["Unc_PLEC_IndexS_b23"],
"expfactor": d["Unc_PLEC_ExpfactorS_b23"],
}
model = Model.create(tag, "spectral", **pars)
for name, value in errs.items():
model.parameters[name].error = value
return model
@property
def flux_points_table(self):
"""Flux points (`~astropy.table.Table`). Flux point is an upper limit if
its significance is less than 2."""
fp_data = self.data_spectral
if fp_data is None:
log.warning(f"No flux points available for source {self.name}")
return None
table = Table()
table["e_min"] = self._energy_edges[:-1]
table["e_max"] = self._energy_edges[1:]
table["e_ref"] = np.sqrt(table["e_min"] * table["e_max"])
fgl_cols = ["Flux_Band", "Unc_Flux_Band", "Sqrt_TS_Band", "nuFnu_Band"]
flux, flux_err, sig, nuFnu = [fp_data[col] for col in fgl_cols]
table["flux"] = flux
table["flux_errn"] = np.abs(flux_err[:, 0])
table["flux_errp"] = flux_err[:, 1]
table["e2dnde"] = nuFnu
table["e2dnde_errn"] = np.abs(nuFnu * flux_err[:, 0] / flux)
table["e2dnde_errp"] = nuFnu * flux_err[:, 1] / flux
is_ul = np.isnan(flux_err[:, 0]) | (sig < 2)
table["is_ul"] = is_ul
table["flux_ul"] = np.nan * flux_err.unit
flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"])
table["flux_ul"][is_ul] = flux_ul[is_ul]
table["e2dnde_ul"] = np.nan * table["e2dnde"].unit
e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_errp"])
table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul]
table["sqrt_ts"] = fp_data["Sqrt_TS_Band"]
return table
[docs]
class SourceCatalog3FGL(SourceCatalog):
"""Fermi-LAT 3FGL source catalog.
- https://ui.adsabs.harvard.edu/abs/2015ApJS..218...23A
- https://fermi.gsfc.nasa.gov/ssc/data/access/lat/4yr_catalog/
One source is represented by `~gammapy.catalog.SourceCatalogObject3FGL`.
"""
tag = "3fgl"
description = "LAT 4-year point source catalog"
source_object_class = SourceCatalogObject3FGL
def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psc_v16.fit.gz"):
filename = make_path(filename)
with warnings.catch_warnings(): # ignore FITS units warnings
warnings.simplefilter("ignore", u.UnitsWarning)
table = Table.read(filename, hdu="LAT_Point_Source_Catalog")
table_standardise_units_inplace(table)
source_name_key = "Source_Name"
source_name_alias = (
"Extended_Source_Name",
"0FGL_Name",
"1FGL_Name",
"2FGL_Name",
"1FHL_Name",
"ASSOC_TEV",
"ASSOC1",
"ASSOC2",
)
super().__init__(
table=table,
source_name_key=source_name_key,
source_name_alias=source_name_alias,
)
self.extended_sources_table = Table.read(filename, hdu="ExtendedSources")
self.hist_table = Table.read(filename, hdu="Hist_Start")
[docs]
class SourceCatalog4FGL(SourceCatalog):
"""Fermi-LAT 4FGL source catalog.
- https://arxiv.org/abs/1902.10045 (DR1)
- https://arxiv.org/abs/2005.11208 (DR2)
- https://arxiv.org/abs/2201.11184 (DR3)
- https://arxiv.org/abs/2307.12546 (DR4)
By default we use the file of the DR4 initial release
from https://fermi.gsfc.nasa.gov/ssc/data/access/lat/14yr_catalog/
One source is represented by `~gammapy.catalog.SourceCatalogObject4FGL`.
"""
tag = "4fgl"
description = "LAT 14-year point source catalog"
source_object_class = SourceCatalogObject4FGL
def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psc_v32.fit.gz"):
filename = make_path(filename)
table = Table.read(filename, hdu="LAT_Point_Source_Catalog")
table_standardise_units_inplace(table)
source_name_key = "Source_Name"
source_name_alias = (
"Extended_Source_Name",
"ASSOC_FGL",
"ASSOC_FHL",
"ASSOC_GAM1",
"ASSOC_GAM2",
"ASSOC_GAM3",
"ASSOC_TEV",
"ASSOC1",
"ASSOC2",
)
super().__init__(
table=table,
source_name_key=source_name_key,
source_name_alias=source_name_alias,
)
self.extended_sources_table = Table.read(filename, hdu="ExtendedSources")
self.extended_sources_table["version"] = int(
"".join(filter(str.isdigit, table.meta["VERSION"]))
)
try:
self.hist_table = Table.read(filename, hdu="Hist_Start")
if "MJDREFI" not in self.hist_table.meta:
self.hist_table.meta = Table.read(filename, hdu="GTI").meta
except KeyError:
pass
try:
self.hist2_table = Table.read(filename, hdu="Hist2_Start")
if "MJDREFI" not in self.hist_table.meta:
self.hist2_table.meta = Table.read(filename, hdu="GTI").meta
except KeyError:
pass
table = Table.read(filename, hdu="EnergyBounds")
self.flux_points_energy_edges = np.unique(
np.c_[table["LowerEnergy"].quantity, table["UpperEnergy"].quantity]
)
[docs]
class SourceCatalog2FHL(SourceCatalog):
"""Fermi-LAT 2FHL source catalog.
- https://ui.adsabs.harvard.edu/abs/2016ApJS..222....5A
- https://fermi.gsfc.nasa.gov/ssc/data/access/lat/2FHL/
One source is represented by `~gammapy.catalog.SourceCatalogObject2FHL`.
"""
tag = "2fhl"
description = "LAT second high-energy source catalog"
source_object_class = SourceCatalogObject2FHL
def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psch_v09.fit.gz"):
filename = make_path(filename)
with warnings.catch_warnings(): # ignore FITS units warnings
warnings.simplefilter("ignore", u.UnitsWarning)
table = Table.read(filename, hdu="2FHL Source Catalog")
table_standardise_units_inplace(table)
source_name_key = "Source_Name"
source_name_alias = ("ASSOC", "3FGL_Name", "1FHL_Name", "TeVCat_Name")
super().__init__(
table=table,
source_name_key=source_name_key,
source_name_alias=source_name_alias,
)
self.extended_sources_table = Table.read(filename, hdu="Extended Sources")
self.rois = Table.read(filename, hdu="ROIs")
[docs]
class SourceCatalog3FHL(SourceCatalog):
"""Fermi-LAT 3FHL source catalog.
- https://ui.adsabs.harvard.edu/abs/2017ApJS..232...18A
- https://fermi.gsfc.nasa.gov/ssc/data/access/lat/3FHL/
One source is represented by `~gammapy.catalog.SourceCatalogObject3FHL`.
"""
tag = "3fhl"
description = "LAT third high-energy source catalog"
source_object_class = SourceCatalogObject3FHL
def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psch_v13.fit.gz"):
filename = make_path(filename)
with warnings.catch_warnings(): # ignore FITS units warnings
warnings.simplefilter("ignore", u.UnitsWarning)
table = Table.read(filename, hdu="LAT_Point_Source_Catalog")
table_standardise_units_inplace(table)
source_name_key = "Source_Name"
source_name_alias = ("ASSOC1", "ASSOC2", "ASSOC_TEV", "ASSOC_GAM")
super().__init__(
table=table,
source_name_key=source_name_key,
source_name_alias=source_name_alias,
)
self.extended_sources_table = Table.read(filename, hdu="ExtendedSources")
self.rois = Table.read(filename, hdu="ROIs")
self.energy_bounds_table = Table.read(filename, hdu="EnergyBounds")
[docs]
class SourceCatalog2PC(SourceCatalog):
"""Fermi-LAT 2nd pulsar catalog.
- https://ui.adsabs.harvard.edu/abs/2013ApJS..208...17A
- https://fermi.gsfc.nasa.gov/ssc/data/access/lat/2nd_PSR_catalog/
One source is represented by `~gammapy.catalog.SourceCatalogObject2PC`.
"""
tag = "2PC"
description = "LAT 2nd pulsar catalog"
source_object_class = SourceCatalogObject2PC
def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/2PC_catalog_v04.fits.gz"):
filename = make_path(filename)
with warnings.catch_warnings():
warnings.simplefilter("ignore", u.UnitsWarning)
table_psr = Table.read(filename, hdu="PULSAR_CATALOG")
table_spectral = Table.read(filename, hdu="SPECTRAL")
table_off_peak = Table.read(filename, hdu="OFF_PEAK")
table_standardise_units_inplace(table_psr)
table_standardise_units_inplace(table_spectral)
table_standardise_units_inplace(table_off_peak)
source_name_key = "PSR_Name"
super().__init__(table=table_psr, source_name_key=source_name_key)
self.source_object_class._source_name_key = source_name_key
self.off_peak_table = table_off_peak
self.spectral_table = table_spectral
[docs]
def to_models(self, **kwargs):
models = Models()
for m in self:
sky_model = m.sky_model()
if sky_model is not None:
models.append(sky_model)
return models
def _get_name_spectral(self, data):
return f"{data[self._source_name_key].strip()}"
[docs]
class SourceCatalog3PC(SourceCatalog):
"""Fermi-LAT 3rd pulsar catalog.
- https://arxiv.org/abs/2307.11132
- https://fermi.gsfc.nasa.gov/ssc/data/access/lat/3rd_PSR_catalog/
One source is represented by `~gammapy.catalog.SourceCatalogObject3PC`.
"""
tag = "3PC"
description = "LAT 3rd pulsar catalog"
source_object_class = SourceCatalogObject3PC
def __init__(
self, filename="$GAMMAPY_DATA/catalogs/fermi/3PC_Catalog+SEDs_20230803.fits.gz"
):
filename = make_path(filename)
with warnings.catch_warnings():
warnings.simplefilter("ignore", u.UnitsWarning)
table_psr = Table.read(filename, hdu="PULSARS_BIGFILE")
table_spectral = Table.read(filename, hdu="LAT_Point_Source_Catalog")
table_bigfile_config = Table.read(filename, hdu="BIGFILE_CONFIG")
table_standardise_units_inplace(table_psr)
table_standardise_units_inplace(table_spectral)
table_standardise_units_inplace(table_bigfile_config)
source_name_key = "PSRJ"
super().__init__(table=table_psr, source_name_key=source_name_key)
self.source_object_class._source_name_key = source_name_key
self.spectral_table = table_spectral
self.off_bigfile_config = table_bigfile_config
[docs]
def to_models(self, **kwargs):
models = Models()
for m in self:
sky_model = m.sky_model()
if sky_model is not None:
models.append(sky_model)
return models
@property
def _get_source_name_key(self):
return "NickName"
def _get_name_spectral(self, data):
return f"PSR{data[self._source_name_key].strip()}"