# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Fermi catalog and source classes."""
import abc
import logging
import warnings
import numpy as np
import astropy.units as u
from astropy.table import Table
from astropy.wcs import FITSFixedWarning
from gammapy.estimators import FluxPoints
from gammapy.maps import MapAxis, Maps, RegionGeom
from gammapy.modeling.models import (
    DiskSpatialModel,
    GaussianSpatialModel,
    Model,
    Models,
    PointSpatialModel,
    SkyModel,
    TemplateSpatialModel,
)
from gammapy.utils.gauss import Gauss2DPDF
from gammapy.utils.scripts import make_path
from gammapy.utils.table import table_standardise_units_inplace
from .core import SourceCatalog, SourceCatalogObject, format_flux_points_table
__all__ = [
    "SourceCatalog2FHL",
    "SourceCatalog3FGL",
    "SourceCatalog3FHL",
    "SourceCatalog4FGL",
    "SourceCatalog2PC",
    "SourceCatalog3PC",
    "SourceCatalogObject2FHL",
    "SourceCatalogObject3FGL",
    "SourceCatalogObject3FHL",
    "SourceCatalogObject4FGL",
    "SourceCatalogObject2PC",
    "SourceCatalogObject3PC",
]
log = logging.getLogger(__name__)
def compute_flux_points_ul(quantity, quantity_errp):
    """Compute UL value for fermi flux points.
    See https://arxiv.org/pdf/1501.02003.pdf (page 30).
    """
    return 2 * quantity_errp + quantity
class SourceCatalogObjectFermiPCBase(SourceCatalogObject, abc.ABC):
    """Base class for Fermi-LAT Pulsar catalogs."""
    def __str__(self):
        return self.info()
    def info(self, info="all"):
        if info == "all":
            info = "basic,more,position,pulsar,spectral,lightcurve"
        ss = ""
        ops = info.split(",")
        if "basic" in ops:
            ss += self._info_basic()
        if "more" in ops:
            ss += self._info_more()
        if "pulsar" in ops:
            ss += self._info_pulsar()
        if "position" in ops:
            ss += self._info_position()
        if "spectral" in ops:
            ss += self._info_spectral_fit()
            ss += self._info_spectral_points()
        if "lightcurve" in ops:
            ss += self._info_phasogram()
        return ss
    def _info_basic(self):
        ss = "\n*** Basic info ***\n\n"
        ss += "Catalog row index (zero-based) : {}\n".format(self.row_index)
        ss += "{:<20s} : {}\n".format("Source name", self.name)
        return ss
    def _info_more(self):
        return ""
    def _info_pulsar(self):
        return "\n"
    def _info_position(self):
        source_pos = self.position
        ss = "\n*** Position info ***\n\n"
        ss += "{:<20s} : {:.3f}\n".format("RA", source_pos.ra)
        ss += "{:<20s} : {:.3f}\n".format("DEC", source_pos.dec)
        ss += "{:<20s} : {:.3f}\n".format("GLON", source_pos.galactic.l)
        ss += "{:<20s} : {:.3f}\n".format("GLAT", source_pos.galactic.b)
        return ss
    def _info_spectral_fit(self):
        return "\n"
    def _info_spectral_points(self):
        ss = "\n*** Spectral points ***\n\n"
        if self.flux_points_table is None:
            ss += "No spectral points available.\n"
            return ss
        lines = format_flux_points_table(self.flux_points_table).pformat(
            max_width=-1, max_lines=-1
        )
        ss += "\n".join(lines)
        ss += "\n"
        return ss
    def _info_phasogram(self):
        return ""
    def spatial_model(self):
        source_pos = self.position
        ra = source_pos.ra
        dec = source_pos.dec
        model = PointSpatialModel(lon_0=ra, lat_0=dec, frame="icrs")
        return model
    def sky_model(self, name=None):
        """Sky model (`~gammapy.modeling.models.SkyModel`)."""
        spectral_model = self.spectral_model()
        if spectral_model is None:
            return None
        if name is None:
            name = self.name
        return SkyModel(
            spatial_model=self.spatial_model(),
            spectral_model=spectral_model,
            name=name,
        )
    @property
    def flux_points(self):
        """Flux points (`~gammapy.estimators.FluxPoints`)."""
        if self.flux_points_table is None:
            return None
        return FluxPoints.from_table(
            table=self.flux_points_table,
            reference_model=self.sky_model(),
            format="gadf-sed",
        )
    @property
    def lightcurve(self):
        """Light-curve."""
        pass
class SourceCatalogObjectFermiBase(SourceCatalogObject, abc.ABC):
    """Base class for Fermi-LAT catalogs."""
    asso = ["ASSOC1", "ASSOC2", "ASSOC_TEV", "ASSOC_GAM1", "ASSOC_GAM2", "ASSOC_GAM3"]
    flux_points_meta = {
        "sed_type_init": "flux",
        "n_sigma": 1,
        "sqrt_ts_threshold_ul": 1,
        "n_sigma_ul": 2,
    }
    def __str__(self):
        return self.info()
    def info(self, info="all"):
        """Summary information string.
        Parameters
        ----------
        info : {'all', 'basic', 'more', 'position', 'spectral', 'lightcurve'}
            Comma separated list of options.
        """
        if info == "all":
            info = "basic,more,position,spectral,lightcurve"
        ss = ""
        ops = info.split(",")
        if "basic" in ops:
            ss += self._info_basic()
        if "more" in ops:
            ss += self._info_more()
        if "position" in ops:
            ss += self._info_position()
            if not self.is_pointlike:
                ss += self._info_morphology()
        if "spectral" in ops:
            ss += self._info_spectral_fit()
            ss += self._info_spectral_points()
        if "lightcurve" in ops:
            ss += self._info_lightcurve()
        return ss
    def _info_basic(self):
        d = self.data
        keys = self.asso
        ss = "\n*** Basic info ***\n\n"
        ss += "Catalog row index (zero-based) : {}\n".format(self.row_index)
        ss += "{:<20s} : {}\n".format("Source name", self.name)
        if "Extended_Source_Name" in d:
            ss += "{:<20s} : {}\n".format("Extended name", d["Extended_Source_Name"])
        def get_nonentry_keys(keys):
            vals = [str(d[_]).strip() for _ in keys]
            return ", ".join([_ for _ in vals if _ != ""])
        associations = get_nonentry_keys(keys)
        ss += "{:<16s} : {}\n".format("Associations", associations)
        try:
            ss += "{:<16s} : {:.3f}\n".format("ASSOC_PROB_BAY", d["ASSOC_PROB_BAY"])
            ss += "{:<16s} : {:.3f}\n".format("ASSOC_PROB_LR", d["ASSOC_PROB_LR"])
        except KeyError:
            pass
        try:
            ss += "{:<16s} : {}\n".format("Class1", d["CLASS1"])
        except KeyError:
            ss += "{:<16s} : {}\n".format("Class", d["CLASS"])
        try:
            ss += "{:<16s} : {}\n".format("Class2", d["CLASS2"])
        except KeyError:
            pass
        ss += "{:<16s} : {}\n".format("TeVCat flag", d.get("TEVCAT_FLAG", "N/A"))
        return ss
    @abc.abstractmethod
    def _info_more(self):
        pass
    def _info_position(self):
        d = self.data
        ss = "\n*** Position info ***\n\n"
        ss += "{:<20s} : {:.3f}\n".format("RA", d["RAJ2000"])
        ss += "{:<20s} : {:.3f}\n".format("DEC", d["DEJ2000"])
        ss += "{:<20s} : {:.3f}\n".format("GLON", d["GLON"])
        ss += "{:<20s} : {:.3f}\n".format("GLAT", d["GLAT"])
        ss += "\n"
        ss += "{:<20s} : {:.4f}\n".format("Semimajor (68%)", d["Conf_68_SemiMajor"])
        ss += "{:<20s} : {:.4f}\n".format("Semiminor (68%)", d["Conf_68_SemiMinor"])
        ss += "{:<20s} : {:.2f}\n".format("Position angle (68%)", d["Conf_68_PosAng"])
        ss += "{:<20s} : {:.4f}\n".format("Semimajor (95%)", d["Conf_95_SemiMajor"])
        ss += "{:<20s} : {:.4f}\n".format("Semiminor (95%)", d["Conf_95_SemiMinor"])
        ss += "{:<20s} : {:.2f}\n".format("Position angle (95%)", d["Conf_95_PosAng"])
        ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI_num"])
        return ss
    def _info_morphology(self):
        e = self.data_extended
        ss = "\n*** Extended source information ***\n\n"
        ss += "{:<16s} : {}\n".format("Model form", e["Model_Form"])
        ss += "{:<16s} : {:.4f}\n".format("Model semimajor", e["Model_SemiMajor"])
        ss += "{:<16s} : {:.4f}\n".format("Model semiminor", e["Model_SemiMinor"])
        ss += "{:<16s} : {:.4f}\n".format("Position angle", e["Model_PosAng"])
        try:
            ss += "{:<16s} : {}\n".format("Spatial function", e["Spatial_Function"])
        except KeyError:
            pass
        ss += "{:<16s} : {}\n\n".format("Spatial filename", e["Spatial_Filename"])
        return ss
    def _info_spectral_fit(self):
        return "\n"
    def _info_spectral_points(self):
        ss = "\n*** Spectral points ***\n\n"
        lines = format_flux_points_table(self.flux_points_table).pformat(
            max_width=-1, max_lines=-1
        )
        ss += "\n".join(lines)
        return ss
    def _info_lightcurve(self):
        return "\n"
    @property
    def is_pointlike(self):
        return self.data["Extended_Source_Name"].strip() == ""
    # FIXME: this should be renamed `set_position_error`,
    # and `phi_0` isn't filled correctly, other parameters missing
    # see https://github.com/gammapy/gammapy/pull/2533#issuecomment-553329049
    def _set_spatial_errors(self, model):
        d = self.data
        if "Pos_err_68" in d:
            percent = 0.68
            semi_minor = d["Pos_err_68"]
            semi_major = d["Pos_err_68"]
            phi_0 = 0.0
        else:
            percent = 0.95
            semi_minor = d["Conf_95_SemiMinor"]
            semi_major = d["Conf_95_SemiMajor"]
            phi_0 = d["Conf_95_PosAng"]
        if np.isnan(phi_0):
            phi_0 = 0.0 * u.deg
        scale_1sigma = Gauss2DPDF().containment_radius(percent)
        lat_err = semi_major / scale_1sigma
        lon_err = semi_minor / scale_1sigma / np.cos(d["DEJ2000"])
        if "TemplateSpatialModel" not in model.tag:
            model.parameters["lon_0"].error = lon_err
            model.parameters["lat_0"].error = lat_err
            model.phi_0 = phi_0
    def sky_model(self, name=None):
        """Sky model as a `~gammapy.modeling.models.SkyModel` object."""
        if name is None:
            name = self.name
        return SkyModel(
            spatial_model=self.spatial_model(),
            spectral_model=self.spectral_model(),
            name=name,
        )
    @property
    def flux_points(self):
        """Flux points as a `~gammapy.estimators.FluxPoints` object."""
        return FluxPoints.from_table(
            table=self.flux_points_table,
            reference_model=self.sky_model(),
            format="gadf-sed",
        )
[docs]
class SourceCatalogObject4FGL(SourceCatalogObjectFermiBase):
    """One source from the Fermi-LAT 4FGL catalog.
    Catalog is represented by `~gammapy.catalog.SourceCatalog4FGL`.
    """
    asso = [
        "ASSOC1",
        "ASSOC2",
        "ASSOC_TEV",
        "ASSOC_FGL",
        "ASSOC_FHL",
        "ASSOC_GAM1",
        "ASSOC_GAM2",
        "ASSOC_GAM3",
    ]
    def _info_more(self):
        d = self.data
        ss = "\n*** Other info ***\n\n"
        fmt = "{:<32s} : {:.3f}\n"
        ss += fmt.format("Significance (100 MeV - 1 TeV)", d["Signif_Avg"])
        ss += "{:<32s} : {:.1f}\n".format("Npred", d["Npred"])
        ss += "\n{:<20s} : {}\n".format("Other flags", d["Flags"])
        return ss
    def _info_spectral_fit(self):
        d = self.data
        spec_type = d["SpectrumType"].strip()
        ss = "\n*** Spectral info ***\n\n"
        ss += "{:<45s} : {}\n".format("Spectrum type", d["SpectrumType"])
        fmt = "{:<45s} : {:.3f}\n"
        ss += fmt.format("Detection significance (100 MeV - 1 TeV)", d["Signif_Avg"])
        if spec_type == "PowerLaw":
            tag = "PL"
        elif spec_type == "LogParabola":
            tag = "LP"
            ss += "{:<45s} : {:.4f} +- {:.5f}\n".format(
                "beta", d["LP_beta"], d["Unc_LP_beta"]
            )
            ss += "{:<45s} : {:.1f}\n".format("Significance curvature", d["LP_SigCurv"])
        elif spec_type == "PLSuperExpCutoff":
            tag = "PLEC"
            fmt = "{:<45s} : {:.4f} +- {:.4f}\n"
            if "PLEC_ExpfactorS" in d:
                ss += fmt.format(
                    "Exponential factor", d["PLEC_ExpfactorS"], d["Unc_PLEC_ExpfactorS"]
                )
            else:
                ss += fmt.format(
                    "Exponential factor", d["PLEC_Expfactor"], d["Unc_PLEC_Expfactor"]
                )
            ss += "{:<45s} : {:.4f} +- {:.4f}\n".format(
                "Super-exponential cutoff index",
                d["PLEC_Exp_Index"],
                d["Unc_PLEC_Exp_Index"],
            )
            ss += "{:<45s} : {:.1f}\n".format(
                "Significance curvature", d["PLEC_SigCurv"]
            )
        else:
            raise ValueError(f"Invalid spec_type: {spec_type!r}")
        ss += "{:<45s} : {:.0f} {}\n".format(
            "Pivot energy", d["Pivot_Energy"].value, d["Pivot_Energy"].unit
        )
        fmt = "{:<45s} : {:.3f} +- {:.3f}\n"
        if f"{tag}_ExpfactorS" in d:
            ss += fmt.format(
                "Spectral index", d[tag + "_IndexS"], d["Unc_" + tag + "_IndexS"]
            )
        else:
            ss += fmt.format(
                "Spectral index", d[tag + "_Index"], d["Unc_" + tag + "_Index"]
            )
        fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
        ss += fmt.format(
            "Flux Density at pivot energy",
            d[tag + "_Flux_Density"].value,
            d["Unc_" + tag + "_Flux_Density"].value,
            "cm-2 MeV-1 s-1",
        )
        fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
        ss += fmt.format(
            "Integral flux (1 - 100 GeV)",
            d["Flux1000"].value,
            d["Unc_Flux1000"].value,
            "cm-2 s-1",
        )
        fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
        ss += fmt.format(
            "Energy flux (100 MeV - 100 GeV)",
            d["Energy_Flux100"].value,
            d["Unc_Energy_Flux100"].value,
            "erg cm-2 s-1",
        )
        return ss
    def _info_lightcurve(self):
        d = self.data
        ss = "\n*** Lightcurve info ***\n\n"
        ss += "Lightcurve measured in the energy band: 100 MeV - 100 GeV\n\n"
        ss += "{:<15s} : {:.3f}\n".format("Variability index", d["Variability_Index"])
        if np.isfinite(d["Flux_Peak"]):
            ss += "{:<40s} : {:.3f}\n".format(
                "Significance peak (100 MeV - 100 GeV)", d["Signif_Peak"]
            )
            fmt = "{:<40s} : {:.3} +- {:.3} cm^-2 s^-1\n"
            ss += fmt.format(
                "Integral flux peak (100 MeV - 100 GeV)",
                d["Flux_Peak"].value,
                d["Unc_Flux_Peak"].value,
            )
            # TODO: give time as UTC string, not MET
            ss += "{:<40s} : {:.3} s (Mission elapsed time)\n".format(
                "Time peak", d["Time_Peak"].value
            )
            peak_interval = d["Peak_Interval"].to_value("day")
            ss += "{:<40s} : {:.3} day\n".format("Peak interval", peak_interval)
        else:
            ss += "\nNo peak measured for this source.\n"
        # TODO: Add a lightcurve table with d['Flux_History'] and d['Unc_Flux_History']
        return ss
[docs]
    def spatial_model(self):
        """Spatial model as a `~gammapy.modeling.models.SpatialModel` object."""
        d = self.data
        ra = d["RAJ2000"]
        dec = d["DEJ2000"]
        if self.is_pointlike:
            model = PointSpatialModel(lon_0=ra, lat_0=dec, frame="icrs")
        else:
            de = self.data_extended
            morph_type = de["Model_Form"].strip()
            e = (1 - (de["Model_SemiMinor"] / de["Model_SemiMajor"]) ** 2.0) ** 0.5
            sigma = de["Model_SemiMajor"]
            phi = de["Model_PosAng"]
            if morph_type == "Disk":
                r_0 = de["Model_SemiMajor"]
                model = DiskSpatialModel(
                    lon_0=ra, lat_0=dec, r_0=r_0, e=e, phi=phi, frame="icrs"
                )
            elif morph_type in ["Map", "Ring", "2D Gaussian x2"]:
                filename = de["Spatial_Filename"].strip() + ".gz"
                if de["version"] < 28:
                    path_extended = "$GAMMAPY_DATA/catalogs/fermi/LAT_extended_sources_8years/Templates/"
                elif de["version"] < 32:
                    path_extended = (
                        "$GAMMAPY_DATA/catalogs/fermi/Extended_12years/Templates/"
                    )
                else:
                    path_extended = (
                        "$GAMMAPY_DATA/catalogs/fermi/Extended_14years/Templates/"
                    )
                path = make_path(path_extended)
                with warnings.catch_warnings():  # ignore FITS units warnings
                    warnings.simplefilter("ignore", FITSFixedWarning)
                    model = TemplateSpatialModel.read(path / filename)
            elif morph_type == "2D Gaussian":
                model = GaussianSpatialModel(
                    lon_0=ra, lat_0=dec, sigma=sigma, e=e, phi=phi, frame="icrs"
                )
            else:
                raise ValueError(f"Invalid spatial model: {morph_type!r}")
        self._set_spatial_errors(model)
        return model 
[docs]
    def spectral_model(self):
        """Best fit spectral model as a `~gammapy.modeling.models.SpectralModel` object."""
        spec_type = self.data["SpectrumType"].strip()
        if spec_type == "PowerLaw":
            tag = "PowerLawSpectralModel"
            pars = {
                "reference": self.data["Pivot_Energy"],
                "amplitude": self.data["PL_Flux_Density"],
                "index": self.data["PL_Index"],
            }
            errs = {
                "amplitude": self.data["Unc_PL_Flux_Density"],
                "index": self.data["Unc_PL_Index"],
            }
        elif spec_type == "LogParabola":
            tag = "LogParabolaSpectralModel"
            pars = {
                "reference": self.data["Pivot_Energy"],
                "amplitude": self.data["LP_Flux_Density"],
                "alpha": self.data["LP_Index"],
                "beta": self.data["LP_beta"],
            }
            errs = {
                "amplitude": self.data["Unc_LP_Flux_Density"],
                "alpha": self.data["Unc_LP_Index"],
                "beta": self.data["Unc_LP_beta"],
            }
        elif spec_type == "PLSuperExpCutoff":
            if "PLEC_ExpfactorS" in self.data:
                tag = "SuperExpCutoffPowerLaw4FGLDR3SpectralModel"
                expfactor = self.data["PLEC_ExpfactorS"]
                expfactor_err = self.data["Unc_PLEC_ExpfactorS"]
                index_1 = self.data["PLEC_IndexS"]
                index_1_err = self.data["Unc_PLEC_IndexS"]
            else:
                tag = "SuperExpCutoffPowerLaw4FGLSpectralModel"
                expfactor = self.data["PLEC_Expfactor"]
                expfactor_err = self.data["Unc_PLEC_Expfactor"]
                index_1 = self.data["PLEC_Index"]
                index_1_err = self.data["Unc_PLEC_Index"]
            pars = {
                "reference": self.data["Pivot_Energy"],
                "amplitude": self.data["PLEC_Flux_Density"],
                "index_1": index_1,
                "index_2": self.data["PLEC_Exp_Index"],
                "expfactor": expfactor,
            }
            errs = {
                "amplitude": self.data["Unc_PLEC_Flux_Density"],
                "index_1": index_1_err,
                "index_2": np.nan_to_num(float(self.data["Unc_PLEC_Exp_Index"])),
                "expfactor": expfactor_err,
            }
        else:
            raise ValueError(f"Invalid spec_type: {spec_type!r}")
        model = Model.create(tag, "spectral", **pars)
        for name, value in errs.items():
            model.parameters[name].error = value
        return model 
    @property
    def flux_points_table(self):
        """Flux points as a `~astropy.table.Table`."""
        table = Table()
        table.meta.update(self.flux_points_meta)
        table["e_min"] = self.data["fp_energy_edges"][:-1]
        table["e_max"] = self.data["fp_energy_edges"][1:]
        flux = self._get_flux_values("Flux_Band")
        flux_err = self._get_flux_values("Unc_Flux_Band")
        table["flux"] = flux
        table["flux_errn"] = np.abs(flux_err[:, 0])
        table["flux_errp"] = flux_err[:, 1]
        nuFnu = self._get_flux_values("nuFnu_Band", "erg cm-2 s-1")
        table["e2dnde"] = nuFnu
        table["e2dnde_errn"] = np.abs(nuFnu * flux_err[:, 0] / flux)
        table["e2dnde_errp"] = nuFnu * flux_err[:, 1] / flux
        is_ul = np.isnan(table["flux_errn"])
        table["is_ul"] = is_ul
        # handle upper limits
        table["flux_ul"] = np.nan * flux_err.unit
        flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"])
        table["flux_ul"][is_ul] = flux_ul[is_ul]
        # handle upper limits
        table["e2dnde_ul"] = np.nan * nuFnu.unit
        e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_errp"])
        table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul]
        # Square root of test statistic
        table["sqrt_ts"] = self.data["Sqrt_TS_Band"]
        return table
    def _get_flux_values(self, prefix, unit="cm-2 s-1"):
        values = self.data[prefix]
        return u.Quantity(values, unit)
[docs]
    def lightcurve(self, interval="1-year"):
        """Lightcurve as a `~gammapy.estimators.FluxPoints` object.
        Parameters
        ----------
        interval : {'1-year', '2-month'}
            Time interval of the lightcurve. Default is '1-year'.
            Note that '2-month' is not available for all catalogue version.
        """
        if interval == "1-year":
            tag = "Flux_History"
            if tag not in self.data or "time_axis" not in self.data:
                raise ValueError(
                    "'1-year' interval is not available for this catalogue version"
                )
            time_axis = self.data["time_axis"]
            tag_sqrt_ts = "Sqrt_TS_History"
        elif interval == "2-month":
            tag = "Flux2_History"
            if tag not in self.data or "time_axis_2" not in self.data:
                raise ValueError(
                    "2-month interval is not available for this catalog version"
                )
            time_axis = self.data["time_axis_2"]
            tag_sqrt_ts = "Sqrt_TS2_History"
        else:
            raise ValueError("Time intervals available are '1-year' or '2-month'")
        energy_axis = MapAxis.from_energy_edges([50, 300000] * u.MeV)
        geom = RegionGeom.create(region=self.position, axes=[energy_axis, time_axis])
        names = ["flux", "flux_errp", "flux_errn", "flux_ul", "ts"]
        maps = Maps.from_geom(geom=geom, names=names)
        maps["flux"].quantity = self.data[tag].reshape(geom.data_shape)
        maps["flux_errp"].quantity = self.data[f"Unc_{tag}"][:, 1].reshape(
            geom.data_shape
        )
        maps["flux_errn"].quantity = -self.data[f"Unc_{tag}"][:, 0].reshape(
            geom.data_shape
        )
        maps["flux_ul"].quantity = compute_flux_points_ul(
            maps["flux"].quantity, maps["flux_errp"].quantity
        ).reshape(geom.data_shape)
        maps["ts"].quantity = (self.data[tag_sqrt_ts] ** 2).reshape(geom.data_shape)
        return FluxPoints.from_maps(
            maps=maps,
            sed_type="flux",
            reference_model=self.sky_model(),
            meta=self.flux_points.meta.copy(),
        ) 
 
[docs]
class SourceCatalogObject3FGL(SourceCatalogObjectFermiBase):
    """One source from the Fermi-LAT 3FGL catalog.
    Catalog is represented by `~gammapy.catalog.SourceCatalog3FGL`.
    """
    _energy_edges = u.Quantity([100, 300, 1000, 3000, 10000, 100000], "MeV")
    _energy_edges_suffix = [
        "100_300",
        "300_1000",
        "1000_3000",
        "3000_10000",
        "10000_100000",
    ]
    energy_range = u.Quantity([100, 100000], "MeV")
    """Energy range used for the catalog.
    Paper says that analysis uses data up to 300 GeV,
    but results are all quoted up to 100 GeV only to
    be consistent with previous catalogs.
    """
    def _info_more(self):
        d = self.data
        ss = "\n*** Other info ***\n\n"
        ss += "{:<20s} : {}\n".format("Other flags", d["Flags"])
        return ss
    def _info_spectral_fit(self):
        d = self.data
        spec_type = d["SpectrumType"].strip()
        ss = "\n*** Spectral info ***\n\n"
        ss += "{:<45s} : {}\n".format("Spectrum type", d["SpectrumType"])
        fmt = "{:<45s} : {:.3f}\n"
        ss += fmt.format("Detection significance (100 MeV - 300 GeV)", d["Signif_Avg"])
        ss += "{:<45s} : {:.1f}\n".format("Significance curvature", d["Signif_Curve"])
        if spec_type == "PowerLaw":
            pass
        elif spec_type == "LogParabola":
            ss += "{:<45s} : {} +- {}\n".format("beta", d["beta"], d["Unc_beta"])
        elif spec_type in ["PLExpCutoff", "PlSuperExpCutoff"]:
            fmt = "{:<45s} : {:.0f} +- {:.0f} {}\n"
            ss += fmt.format(
                "Cutoff energy",
                d["Cutoff"].value,
                d["Unc_Cutoff"].value,
                d["Cutoff"].unit,
            )
        elif spec_type == "PLSuperExpCutoff":
            ss += "{:<45s} : {} +- {}\n".format(
                "Super-exponential cutoff index", d["Exp_Index"], d["Unc_Exp_Index"]
            )
        else:
            raise ValueError(f"Invalid spec_type: {spec_type!r}")
        ss += "{:<45s} : {:.0f} {}\n".format(
            "Pivot energy", d["Pivot_Energy"].value, d["Pivot_Energy"].unit
        )
        ss += "{:<45s} : {:.3f}\n".format(
            "Power law spectral index", d["PowerLaw_Index"]
        )
        fmt = "{:<45s} : {:.3f} +- {:.3f}\n"
        ss += fmt.format("Spectral index", d["Spectral_Index"], d["Unc_Spectral_Index"])
        fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
        ss += fmt.format(
            "Flux Density at pivot energy",
            d["Flux_Density"].value,
            d["Unc_Flux_Density"].value,
            "cm-2 MeV-1 s-1",
        )
        fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
        ss += fmt.format(
            "Integral flux (1 - 100 GeV)",
            d["Flux1000"].value,
            d["Unc_Flux1000"].value,
            "cm-2 s-1",
        )
        fmt = "{:<45s} : {:.3} +- {:.3} {}\n"
        ss += fmt.format(
            "Energy flux (100 MeV - 100 GeV)",
            d["Energy_Flux100"].value,
            d["Unc_Energy_Flux100"].value,
            "erg cm-2 s-1",
        )
        return ss
    def _info_lightcurve(self):
        d = self.data
        ss = "\n*** Lightcurve info ***\n\n"
        ss += "Lightcurve measured in the energy band: 100 MeV - 100 GeV\n\n"
        ss += "{:<15s} : {:.3f}\n".format("Variability index", d["Variability_Index"])
        if np.isfinite(d["Flux_Peak"]):
            ss += "{:<40s} : {:.3f}\n".format(
                "Significance peak (100 MeV - 100 GeV)", d["Signif_Peak"]
            )
            fmt = "{:<40s} : {:.3} +- {:.3} cm^-2 s^-1\n"
            ss += fmt.format(
                "Integral flux peak (100 MeV - 100 GeV)",
                d["Flux_Peak"].value,
                d["Unc_Flux_Peak"].value,
            )
            # TODO: give time as UTC string, not MET
            ss += "{:<40s} : {:.3} s (Mission elapsed time)\n".format(
                "Time peak", d["Time_Peak"].value
            )
            peak_interval = d["Peak_Interval"].to_value("day")
            ss += "{:<40s} : {:.3} day\n".format("Peak interval", peak_interval)
        else:
            ss += "\nNo peak measured for this source.\n"
        # TODO: Add a lightcurve table with d['Flux_History'] and d['Unc_Flux_History']
        return ss
[docs]
    def spectral_model(self):
        """Best fit spectral model as a `~gammapy.modeling.models.SpectralModel` object."""
        spec_type = self.data["SpectrumType"].strip()
        if spec_type == "PowerLaw":
            tag = "PowerLawSpectralModel"
            pars = {
                "amplitude": self.data["Flux_Density"],
                "reference": self.data["Pivot_Energy"],
                "index": self.data["Spectral_Index"],
            }
            errs = {
                "amplitude": self.data["Unc_Flux_Density"],
                "index": self.data["Unc_Spectral_Index"],
            }
        elif spec_type == "PLExpCutoff":
            tag = "ExpCutoffPowerLaw3FGLSpectralModel"
            pars = {
                "amplitude": self.data["Flux_Density"],
                "reference": self.data["Pivot_Energy"],
                "index": self.data["Spectral_Index"],
                "ecut": self.data["Cutoff"],
            }
            errs = {
                "amplitude": self.data["Unc_Flux_Density"],
                "index": self.data["Unc_Spectral_Index"],
                "ecut": self.data["Unc_Cutoff"],
            }
        elif spec_type == "LogParabola":
            tag = "LogParabolaSpectralModel"
            pars = {
                "amplitude": self.data["Flux_Density"],
                "reference": self.data["Pivot_Energy"],
                "alpha": self.data["Spectral_Index"],
                "beta": self.data["beta"],
            }
            errs = {
                "amplitude": self.data["Unc_Flux_Density"],
                "alpha": self.data["Unc_Spectral_Index"],
                "beta": self.data["Unc_beta"],
            }
        elif spec_type == "PLSuperExpCutoff":
            tag = "SuperExpCutoffPowerLaw3FGLSpectralModel"
            pars = {
                "amplitude": self.data["Flux_Density"],
                "reference": self.data["Pivot_Energy"],
                "index_1": self.data["Spectral_Index"],
                "index_2": self.data["Exp_Index"],
                "ecut": self.data["Cutoff"],
            }
            errs = {
                "amplitude": self.data["Unc_Flux_Density"],
                "index_1": self.data["Unc_Spectral_Index"],
                "index_2": self.data["Unc_Exp_Index"],
                "ecut": self.data["Unc_Cutoff"],
            }
        else:
            raise ValueError(f"Invalid spec_type: {spec_type!r}")
        model = Model.create(tag, "spectral", **pars)
        for name, value in errs.items():
            model.parameters[name].error = value
        return model 
[docs]
    def spatial_model(self):
        """Spatial model as a `~gammapy.modeling.models.SpatialModel` object."""
        d = self.data
        ra = d["RAJ2000"]
        dec = d["DEJ2000"]
        if self.is_pointlike:
            model = PointSpatialModel(lon_0=ra, lat_0=dec, frame="icrs")
        else:
            de = self.data_extended
            morph_type = de["Model_Form"].strip()
            e = (1 - (de["Model_SemiMinor"] / de["Model_SemiMajor"]) ** 2.0) ** 0.5
            sigma = de["Model_SemiMajor"]
            phi = de["Model_PosAng"]
            if morph_type == "Disk":
                r_0 = de["Model_SemiMajor"]
                model = DiskSpatialModel(
                    lon_0=ra, lat_0=dec, r_0=r_0, e=e, phi=phi, frame="icrs"
                )
            elif morph_type in ["Map", "Ring", "2D Gaussian x2"]:
                filename = de["Spatial_Filename"].strip()
                path = make_path(
                    "$GAMMAPY_DATA/catalogs/fermi/Extended_archive_v15/Templates/"
                )
                model = TemplateSpatialModel.read(path / filename)
            elif morph_type == "2D Gaussian":
                model = GaussianSpatialModel(
                    lon_0=ra, lat_0=dec, sigma=sigma, e=e, phi=phi, frame="icrs"
                )
            else:
                raise ValueError(f"Invalid spatial model: {morph_type!r}")
        self._set_spatial_errors(model)
        return model 
    @property
    def flux_points_table(self):
        """Flux points as a `~astropy.table.Table`."""
        table = Table()
        table.meta.update(self.flux_points_meta)
        table["e_min"] = self._energy_edges[:-1]
        table["e_max"] = self._energy_edges[1:]
        flux = self._get_flux_values("Flux")
        flux_err = self._get_flux_values("Unc_Flux")
        table["flux"] = flux
        table["flux_errn"] = np.abs(flux_err[:, 0])
        table["flux_errp"] = flux_err[:, 1]
        nuFnu = self._get_flux_values("nuFnu", "erg cm-2 s-1")
        table["e2dnde"] = nuFnu
        table["e2dnde_errn"] = np.abs(nuFnu * flux_err[:, 0] / flux)
        table["e2dnde_errp"] = nuFnu * flux_err[:, 1] / flux
        is_ul = np.isnan(table["flux_errn"])
        table["is_ul"] = is_ul
        # handle upper limits
        table["flux_ul"] = np.nan * flux_err.unit
        flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"])
        table["flux_ul"][is_ul] = flux_ul[is_ul]
        # handle upper limits
        table["e2dnde_ul"] = np.nan * nuFnu.unit
        e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_errp"])
        table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul]
        # Square root of test statistic
        table["sqrt_ts"] = [self.data["Sqrt_TS" + _] for _ in self._energy_edges_suffix]
        return table
    def _get_flux_values(self, prefix, unit="cm-2 s-1"):
        values = [self.data[prefix + _] for _ in self._energy_edges_suffix]
        return u.Quantity(values, unit)
[docs]
    def lightcurve(self):
        """Lightcurve as a `~gammapy.estimators.FluxPoints` object."""
        time_axis = self.data["time_axis"]
        tag = "Flux_History"
        energy_axis = MapAxis.from_energy_edges(self.energy_range)
        geom = RegionGeom.create(region=self.position, axes=[energy_axis, time_axis])
        names = ["flux", "flux_errp", "flux_errn", "flux_ul"]
        maps = Maps.from_geom(geom=geom, names=names)
        maps["flux"].quantity = self.data[tag].reshape(geom.data_shape)
        maps["flux_errp"].quantity = self.data[f"Unc_{tag}"][:, 1].reshape(
            geom.data_shape
        )
        maps["flux_errn"].quantity = -self.data[f"Unc_{tag}"][:, 0].reshape(
            geom.data_shape
        )
        maps["flux_ul"].quantity = compute_flux_points_ul(
            maps["flux"].quantity, maps["flux_errp"].quantity
        ).reshape(geom.data_shape)
        is_ul = np.isnan(maps["flux_errn"])
        maps["flux_ul"].data[~is_ul] = np.nan
        return FluxPoints.from_maps(
            maps=maps,
            sed_type="flux",
            reference_model=self.sky_model(),
            meta=self.flux_points_meta.copy(),
        ) 
 
[docs]
class SourceCatalogObject2FHL(SourceCatalogObjectFermiBase):
    """One source from the Fermi-LAT 2FHL catalog.
    Catalog is represented by `~gammapy.catalog.SourceCatalog2FHL`.
    """
    asso = ["ASSOC", "3FGL_Name", "1FHL_Name", "TeVCat_Name"]
    _energy_edges = u.Quantity([50, 171, 585, 2000], "GeV")
    _energy_edges_suffix = ["50_171", "171_585", "585_2000"]
    energy_range = u.Quantity([0.05, 2], "TeV")
    """Energy range used for the catalog."""
    def _info_more(self):
        d = self.data
        ss = "\n*** Other info ***\n\n"
        fmt = "{:<32s} : {:.3f}\n"
        ss += fmt.format("Test statistic (50 GeV - 2 TeV)", d["TS"])
        return ss
    def _info_position(self):
        d = self.data
        ss = "\n*** Position info ***\n\n"
        ss += "{:<20s} : {:.3f}\n".format("RA", d["RAJ2000"])
        ss += "{:<20s} : {:.3f}\n".format("DEC", d["DEJ2000"])
        ss += "{:<20s} : {:.3f}\n".format("GLON", d["GLON"])
        ss += "{:<20s} : {:.3f}\n".format("GLAT", d["GLAT"])
        ss += "\n"
        ss += "{:<20s} : {:.4f}\n".format("Error on position (68%)", d["Pos_err_68"])
        ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI"])
        return ss
    def _info_spectral_fit(self):
        d = self.data
        ss = "\n*** Spectral fit info ***\n\n"
        fmt = "{:<32s} : {:.3f} +- {:.3f}\n"
        ss += fmt.format(
            "Power-law spectral index", d["Spectral_Index"], d["Unc_Spectral_Index"]
        )
        ss += "{:<32s} : {:.3} +- {:.3} {}\n".format(
            "Integral flux (50 GeV - 2 TeV)",
            d["Flux50"].value,
            d["Unc_Flux50"].value,
            "cm-2 s-1",
        )
        ss += "{:<32s} : {:.3} +- {:.3} {}\n".format(
            "Energy flux (50 GeV - 2 TeV)",
            d["Energy_Flux50"].value,
            d["Unc_Energy_Flux50"].value,
            "erg cm-2 s-1",
        )
        return ss
    @property
    def is_pointlike(self):
        return self.data["Source_Name"].strip()[-1] != "e"
[docs]
    def spatial_model(self):
        """Spatial model as a `~gammapy.modeling.models.SpatialModel` object."""
        d = self.data
        ra = d["RAJ2000"]
        dec = d["DEJ2000"]
        if self.is_pointlike:
            model = PointSpatialModel(lon_0=ra, lat_0=dec, frame="icrs")
        else:
            de = self.data_extended
            morph_type = de["Model_Form"].strip()
            e = (1 - (de["Model_SemiMinor"] / de["Model_SemiMajor"]) ** 2.0) ** 0.5
            sigma = de["Model_SemiMajor"]
            phi = de["Model_PosAng"]
            if morph_type in ["Disk", "Elliptical Disk"]:
                r_0 = de["Model_SemiMajor"]
                model = DiskSpatialModel(
                    lon_0=ra, lat_0=dec, r_0=r_0, e=e, phi=phi, frame="icrs"
                )
            elif morph_type in ["Map", "Ring", "2D Gaussian x2"]:
                filename = de["Spatial_Filename"].strip()
                path = make_path(
                    "$GAMMAPY_DATA/catalogs/fermi/Extended_archive_v15/Templates/"
                )
                return TemplateSpatialModel.read(path / filename)
            elif morph_type in ["2D Gaussian", "Elliptical 2D Gaussian"]:
                model = GaussianSpatialModel(
                    lon_0=ra, lat_0=dec, sigma=sigma, e=e, phi=phi, frame="icrs"
                )
            else:
                raise ValueError(f"Invalid spatial model: {morph_type!r}")
        self._set_spatial_errors(model)
        return model 
[docs]
    def spectral_model(self):
        """Best fit spectral model as a `~gammapy.modeling.models.SpectralModel`."""
        tag = "PowerLaw2SpectralModel"
        pars = {
            "amplitude": self.data["Flux50"],
            "emin": self.energy_range[0],
            "emax": self.energy_range[1],
            "index": self.data["Spectral_Index"],
        }
        errs = {
            "amplitude": self.data["Unc_Flux50"],
            "index": self.data["Unc_Spectral_Index"],
        }
        model = Model.create(tag, "spectral", **pars)
        for name, value in errs.items():
            model.parameters[name].error = value
        return model 
    @property
    def flux_points_table(self):
        """Flux points as a `~astropy.table.Table`."""
        table = Table()
        table.meta.update(self.flux_points_meta)
        table["e_min"] = self._energy_edges[:-1]
        table["e_max"] = self._energy_edges[1:]
        table["flux"] = self._get_flux_values("Flux")
        flux_err = self._get_flux_values("Unc_Flux")
        table["flux_errn"] = np.abs(flux_err[:, 0])
        table["flux_errp"] = flux_err[:, 1]
        # handle upper limits
        is_ul = np.isnan(table["flux_errn"])
        table["is_ul"] = is_ul
        table["flux_ul"] = np.nan * flux_err.unit
        flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"])
        table["flux_ul"][is_ul] = flux_ul[is_ul]
        return table
    def _get_flux_values(self, prefix, unit="cm-2 s-1"):
        values = [self.data[prefix + _ + "GeV"] for _ in self._energy_edges_suffix]
        return u.Quantity(values, unit) 
[docs]
class SourceCatalogObject3FHL(SourceCatalogObjectFermiBase):
    """One source from the Fermi-LAT 3FHL catalog.
    Catalog is represented by `~gammapy.catalog.SourceCatalog3FHL`.
    """
    asso = ["ASSOC1", "ASSOC2", "ASSOC_TEV", "ASSOC_GAM"]
    energy_range = u.Quantity([0.01, 2], "TeV")
    """Energy range used for the catalog."""
    _energy_edges = u.Quantity([10, 20, 50, 150, 500, 2000], "GeV")
    def _info_position(self):
        d = self.data
        ss = "\n*** Position info ***\n\n"
        ss += "{:<20s} : {:.3f}\n".format("RA", d["RAJ2000"])
        ss += "{:<20s} : {:.3f}\n".format("DEC", d["DEJ2000"])
        ss += "{:<20s} : {:.3f}\n".format("GLON", d["GLON"])
        ss += "{:<20s} : {:.3f}\n".format("GLAT", d["GLAT"])
        # TODO: All sources are non-elliptical; just give one number for radius?
        ss += "\n"
        ss += "{:<20s} : {:.4f}\n".format("Semimajor (95%)", d["Conf_95_SemiMajor"])
        ss += "{:<20s} : {:.4f}\n".format("Semiminor (95%)", d["Conf_95_SemiMinor"])
        ss += "{:<20s} : {:.2f}\n".format("Position angle (95%)", d["Conf_95_PosAng"])
        ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI_num"])
        return ss
    def _info_spectral_fit(self):
        d = self.data
        spec_type = d["SpectrumType"].strip()
        ss = "\n*** Spectral fit info ***\n\n"
        ss += "{:<32s} : {}\n".format("Spectrum type", d["SpectrumType"])
        ss += "{:<32s} : {:.1f}\n".format("Significance curvature", d["Signif_Curve"])
        # Power-law parameters are always given; give in any case
        fmt = "{:<32s} : {:.3f} +- {:.3f}\n"
        ss += fmt.format(
            "Power-law spectral index", d["PowerLaw_Index"], d["Unc_PowerLaw_Index"]
        )
        if spec_type == "PowerLaw":
            pass
        elif spec_type == "LogParabola":
            fmt = "{:<32s} : {:.3f} +- {:.3f}\n"
            ss += fmt.format(
                "LogParabolaSpectralModel spectral index",
                d["Spectral_Index"],
                d["Unc_Spectral_Index"],
            )
            ss += "{:<32s} : {:.3f} +- {:.3f}\n".format(
                "LogParabolaSpectralModel beta", d["beta"], d["Unc_beta"]
            )
        else:
            raise ValueError(f"Invalid spec_type: {spec_type!r}")
        ss += "{:<32s} : {:.1f} {}\n".format(
            "Pivot energy", d["Pivot_Energy"].value, d["Pivot_Energy"].unit
        )
        ss += "{:<32s} : {:.3} +- {:.3} {}\n".format(
            "Flux Density at pivot energy",
            d["Flux_Density"].value,
            d["Unc_Flux_Density"].value,
            "cm-2 GeV-1 s-1",
        )
        ss += "{:<32s} : {:.3} +- {:.3} {}\n".format(
            "Integral flux (10 GeV - 1 TeV)",
            d["Flux"].value,
            d["Unc_Flux"].value,
            "cm-2 s-1",
        )
        ss += "{:<32s} : {:.3} +- {:.3} {}\n".format(
            "Energy flux (10 GeV - TeV)",
            d["Energy_Flux"].value,
            d["Unc_Energy_Flux"].value,
            "erg cm-2 s-1",
        )
        return ss
    def _info_more(self):
        d = self.data
        ss = "\n*** Other info ***\n\n"
        fmt = "{:<32s} : {:.3f}\n"
        ss += fmt.format("Significance (10 GeV - 2 TeV)", d["Signif_Avg"])
        ss += "{:<32s} : {:.1f}\n".format("Npred", d["Npred"])
        ss += "\n{:<16s} : {:.3f} {}\n".format(
            "HEP Energy", d["HEP_Energy"].value, d["HEP_Energy"].unit
        )
        ss += "{:<16s} : {:.3f}\n".format("HEP Probability", d["HEP_Prob"])
        ss += "{:<16s} : {}\n".format("Bayesian Blocks", d["Variability_BayesBlocks"])
        ss += "{:<16s} : {:.3f}\n".format("Redshift", d["Redshift"])
        ss += "{:<16s} : {:.3} {}\n".format(
            "NuPeak_obs", d["NuPeak_obs"].value, d["NuPeak_obs"].unit
        )
        return ss
[docs]
    def spectral_model(self):
        """Best fit spectral model as a `~gammapy.modeling.models.SpectralModel` object."""
        d = self.data
        spec_type = self.data["SpectrumType"].strip()
        if spec_type == "PowerLaw":
            tag = "PowerLawSpectralModel"
            pars = {
                "reference": d["Pivot_Energy"],
                "amplitude": d["Flux_Density"],
                "index": d["PowerLaw_Index"],
            }
            errs = {
                "amplitude": d["Unc_Flux_Density"],
                "index": d["Unc_PowerLaw_Index"],
            }
        elif spec_type == "LogParabola":
            tag = "LogParabolaSpectralModel"
            pars = {
                "reference": d["Pivot_Energy"],
                "amplitude": d["Flux_Density"],
                "alpha": d["Spectral_Index"],
                "beta": d["beta"],
            }
            errs = {
                "amplitude": d["Unc_Flux_Density"],
                "alpha": d["Unc_Spectral_Index"],
                "beta": d["Unc_beta"],
            }
        else:
            raise ValueError(f"Invalid spec_type: {spec_type!r}")
        model = Model.create(tag, "spectral", **pars)
        for name, value in errs.items():
            model.parameters[name].error = value
        return model 
    @property
    def flux_points_table(self):
        """Flux points as a `~astropy.table.Table`."""
        table = Table()
        table.meta.update(self.flux_points_meta)
        table["e_min"] = self._energy_edges[:-1]
        table["e_max"] = self._energy_edges[1:]
        flux = self.data["Flux_Band"]
        flux_err = self.data["Unc_Flux_Band"]
        e2dnde = self.data["nuFnu"]
        table["flux"] = flux
        table["flux_errn"] = np.abs(flux_err[:, 0])
        table["flux_errp"] = flux_err[:, 1]
        table["e2dnde"] = e2dnde
        table["e2dnde_errn"] = np.abs(e2dnde * flux_err[:, 0] / flux)
        table["e2dnde_errp"] = e2dnde * flux_err[:, 1] / flux
        is_ul = np.isnan(table["flux_errn"])
        table["is_ul"] = is_ul
        # handle upper limits
        table["flux_ul"] = np.nan * flux_err.unit
        flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"])
        table["flux_ul"][is_ul] = flux_ul[is_ul]
        table["e2dnde_ul"] = np.nan * e2dnde.unit
        e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_errp"])
        table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul]
        # Square root of test statistic
        table["sqrt_ts"] = self.data["Sqrt_TS_Band"]
        return table
[docs]
    def spatial_model(self):
        """Source spatial model as a `~gammapy.modeling.models.SpatialModel` object."""
        d = self.data
        ra = d["RAJ2000"]
        dec = d["DEJ2000"]
        if self.is_pointlike:
            model = PointSpatialModel(lon_0=ra, lat_0=dec, frame="icrs")
        else:
            de = self.data_extended
            morph_type = de["Spatial_Function"].strip()
            e = (1 - (de["Model_SemiMinor"] / de["Model_SemiMajor"]) ** 2.0) ** 0.5
            sigma = de["Model_SemiMajor"]
            phi = de["Model_PosAng"]
            if morph_type == "RadialDisk":
                r_0 = de["Model_SemiMajor"]
                model = DiskSpatialModel(
                    lon_0=ra, lat_0=dec, r_0=r_0, e=e, phi=phi, frame="icrs"
                )
            elif morph_type in ["SpatialMap"]:
                filename = de["Spatial_Filename"].strip()
                path = make_path(
                    "$GAMMAPY_DATA/catalogs/fermi/Extended_archive_v18/Templates/"
                )
                model = TemplateSpatialModel.read(path / filename)
            elif morph_type == "RadialGauss":
                model = GaussianSpatialModel(
                    lon_0=ra, lat_0=dec, sigma=sigma, e=e, phi=phi, frame="icrs"
                )
            else:
                raise ValueError(f"Invalid morph_type: {morph_type!r}")
        self._set_spatial_errors(model)
        return model 
 
[docs]
class SourceCatalogObject2PC(SourceCatalogObjectFermiPCBase):
    """One source from the 2PC catalog."""
    @property
    def _auxiliary_filename(self):
        return make_path(
            f"$GAMMAPY_DATA/catalogs/fermi/2PC_auxiliary/PSR{self.name}_2PC_data.fits.gz"
        )
    def _info_more(self):
        d = self.data
        ss = "\n*** Other info ***\n\n"
        ss += "{:<20s} : {:s}\n".format("Binary", d["Binary"])
        return ss
    def _info_pulsar(self):
        d = self.data
        ss = "\n*** Pulsar info ***\n\n"
        ss += "{:<20s} : {:.3f}\n".format("Period", d["Period"])
        ss += "{:<20s} : {:.3e}\n".format("P_Dot", d["P_Dot"])
        ss += "{:<20s} : {:.3e}\n".format("E_Dot", d["E_Dot"])
        ss += "{:<20s} : {}\n".format("Type", d["Type"])
        return ss
    def _info_spectral_fit(self):
        d = self.data_spectral
        ss = "\n*** Spectral info ***\n\n"
        if d is None:
            ss += "No spectral info available.\n"
            return ss
        ss += "{:<20s} : {}\n".format("On peak", d["On_Peak"])
        ss += "{:<20s} : {:.0f}\n".format("TS DC", d["TS_DC"])
        ss += "{:<20s} : {:.0f}\n".format("TS cutoff", d["TS_Cutoff"])
        ss += "{:<20s} : {:.0f}\n".format("TS b free", d["TS_bfree"])
        indentation = " " * 4
        fmt_e = "{}{:<20s} : {:.3e} +- {:.3e}\n"
        fmt_f = "{}{:<20s} : {:.3f} +- {:.3f}\n"
        if not isinstance(d["PLEC1_Prefactor"], np.ma.core.MaskedConstant):
            ss += "\n{}* PLSuperExpCutoff b = 1 *\n\n".format(indentation)
            ss += fmt_e.format(
                indentation, "Amplitude", d["PLEC1_Prefactor"], d["Unc_PLEC1_Prefactor"]
            )
            ss += fmt_f.format(
                indentation,
                "Index 1",
                d["PLEC1_Photon_Index"],
                d["Unc_PLEC1_Photon_Index"],
            )
            ss += "{}{:<20s} : {:.3f}\n".format(indentation, "Index 2", 1)
            ss += "{}{:<20s} : {:.3f}\n".format(
                indentation, "Reference", d["PLEC1_Scale"]
            )
            ss += fmt_f.format(
                indentation, "Ecut", d["PLEC1_Cutoff"], d["Unc_PLEC1_Cutoff"]
            )
        if not isinstance(d["PLEC_Prefactor"], np.ma.core.MaskedConstant):
            ss += "\n{}* PLSuperExpCutoff b free *\n\n".format(indentation)
            ss += fmt_e.format(
                indentation, "Amplitude", d["PLEC_Prefactor"], d["Unc_PLEC_Prefactor"]
            )
            ss += fmt_f.format(
                indentation,
                "Index 1",
                d["PLEC_Photon_Index"],
                d["Unc_PLEC_Photon_Index"],
            )
            ss += fmt_f.format(
                indentation,
                "Index 2",
                d["PLEC_Exponential_Index"],
                d["Unc_PLEC_Exponential_Index"],
            )
            ss += "{}{:<20s} : {:.3f}\n".format(
                indentation, "Reference", d["PLEC_Scale"]
            )
            ss += fmt_f.format(
                indentation, "Ecut", d["PLEC_Cutoff"], d["Unc_PLEC_Cutoff"]
            )
        if not isinstance(d["PL_Prefactor"], np.ma.core.MaskedConstant):
            ss += "\n{}* PowerLaw *\n\n".format(indentation)
            ss += fmt_e.format(
                indentation, "Amplitude", d["PL_Prefactor"], d["Unc_PL_Prefactor"]
            )
            ss += fmt_f.format(
                indentation, "Index", d["PL_Photon_Index"], d["Unc_PL_Photon_Index"]
            )
            ss += "{}{:<20s} : {:.3f}\n".format(indentation, "Reference", d["PL_Scale"])
        return ss
    def _info_phasogram(self):
        d = self.data
        ss = "\n*** Phasogram info ***\n\n"
        ss += "{:<20s} : {:d}\n".format("Number of peaks", d["Num_Peaks"])
        ss += "{:<20s} : {:.3f}\n".format("Peak separation", d["Peak_Sep"])
        return ss
[docs]
    def spectral_model(self):
        d = self.data_spectral
        if d is None:
            log.warning(f"No spectral model available for source {self.name}")
            return None
        if d["TS_Cutoff"] < 9:
            tag = "PowerLawSpectralModel"
            pars = {
                "reference": d["PL_Scale"],
                "amplitude": d["PL_Prefactor"],
                "index": d["PL_Photon_Index"],
            }
            errs = {
                "amplitude": d["Unc_PL_Prefactor"],
                "index": d["Unc_PL_Photon_Index"],
            }
        elif d["TS_bfree"] >= 9:
            tag = "SuperExpCutoffPowerLaw3FGLSpectralModel"
            pars = {
                "index_1": d["PLEC_Photon_Index"],
                "index_2": d["PLEC_Exponential_Index"],
                "amplitude": d["PLEC_Prefactor"],
                "reference": d["PLEC_Scale"],
                "ecut": d["PLEC_Cutoff"],
            }
            errs = {
                "index_1": d["Unc_PLEC_Photon_Index"],
                "index_2": d["Unc_PLEC_Exponential_Index"],
                "amplitude": d["Unc_PLEC_Prefactor"],
                "ecut": d["Unc_PLEC_Cutoff"],
            }
        elif d["TS_bfree"] < 9:
            tag = "SuperExpCutoffPowerLaw3FGLSpectralModel"
            pars = {
                "index_1": d["PLEC1_Photon_Index"],
                "index_2": 1,
                "amplitude": d["PLEC1_Prefactor"],
                "reference": d["PLEC1_Scale"],
                "ecut": d["PLEC1_Cutoff"],
            }
            errs = {
                "index_1": d["Unc_PLEC1_Photon_Index"],
                "amplitude": d["Unc_PLEC1_Prefactor"],
                "ecut": d["Unc_PLEC1_Cutoff"],
            }
        else:
            log.warning(f"No spectral model available for source {self.name}")
            return None
        model = Model.create(tag, "spectral", **pars)
        for name, value in errs.items():
            model.parameters[name].error = value
        return model 
    @property
    def flux_points_table(self):
        """Flux points (`~astropy.table.Table`)."""
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", u.UnitsWarning)
                fp_data = Table.read(self._auxiliary_filename, hdu="PULSAR_SED")
        except (KeyError, FileNotFoundError):
            log.warning(f"No flux points available for source {self.name}")
            return None
        table = Table()
        table["e_min"] = fp_data["Energy_Min"]
        table["e_max"] = fp_data["Energy_Max"]
        table["e_ref"] = fp_data["Center_Energy"]
        table["flux"] = fp_data["PhotonFlux"]
        table["flux_err"] = fp_data["Unc_PhotonFlux"]
        table["e2dnde"] = fp_data["EnergyFlux"]
        table["e2dnde_err"] = fp_data["Unc_EnergyFlux"]
        is_ul = np.where(table["e2dnde_err"] == 0, True, False)
        table["is_ul"] = is_ul
        table["flux_ul"] = np.nan * table["flux_err"].unit
        flux_ul = compute_flux_points_ul(table["flux"], table["flux_err"])
        table["flux_ul"][is_ul] = flux_ul[is_ul]
        table["e2dnde_ul"] = np.nan * table["e2dnde"].unit
        e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_err"])
        table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul]
        return table 
[docs]
class SourceCatalogObject3PC(SourceCatalogObjectFermiPCBase):
    """One source from the 3PC catalog."""
    asso = ["assoc_new"]
    _energy_edges = u.Quantity([50, 100, 300, 1_000, 3e3, 1e4, 3e4, 1e5, 1e6], "MeV")
    @property
    def _auxiliary_filename(self):
        return make_path(
            f"$GAMMAPY_DATA/catalogs/fermi/3PC_auxiliary_20230728/{self.name}_3PC_data.fits.gz"
        )
    def _info_pulsar(self):
        d = self.data
        ss = "\n*** Pulsar info ***\n\n"
        ss += "{:<20s} : {:.3f}\n".format("Period", d["P0"])
        ss += "{:<20s} : {:.3e}\n".format("P_Dot", d["P1"])
        ss += "{:<20s} : {:.3e}\n".format("E_Dot", d["EDOT"])
        return ss
    def _info_phasogram(self):
        d = self.data
        ss = "\n*** Phasogram info ***\n\n"
        if not isinstance(d["NPEAK"], np.ma.core.MaskedConstant):
            npeak = d["NPEAK"]
            ss += "{:<20s} : {:.3f}\n".format("Number of peaks", npeak)
            if npeak > 1:
                ss += "{:<20s} : {:.3f}\n".format("Ph1 (peak one)", d["PHI1"])
                ss += "{:<20s} : {:.3f}\n".format(
                    "Ph2 (peak two)", d["PHI1"] + d["PKSEP"]
                )
                ss += "{:<20s} : {:.3f}\n".format("Peak separation", d["PKSEP"])
            else:
                if not isinstance(d["PHI1"], np.ma.core.MaskedConstant):
                    ss += "{:<20s} : {:.3f}\n".format("Ph1 (peak one)", d["PHI1"])
        else:
            ss += "No phasogram info available.\n"
        return ss
    def _info_spectral_fit(self):
        d = self.data_spectral
        ss = "\n*** Spectral info ***\n\n"
        if d is None:
            ss += "No spectral info available.\n"
            return ss
        ss += "{:<20s} : {:.0f}\n".format("TS", d["Test_Statistic"])
        ss += "{:<20s} : {:.0f}\n".format("Significance (DC)", d["Signif_Avg"])
        ss += "{:<20s} : {:s}\n".format("Spectrum Type", d["SpectrumType"])
        indentation = " " * 4
        fmt_e = "{}{:<20s} : {:.3e} +- {:.3e}\n"
        fmt_f = "{}{:<20s} : {:.3f} +- {:.3f}\n"
        if not isinstance(d["PLEC_Flux_Density_b23"], np.ma.core.MaskedConstant):
            ss += "\n{}* SuperExpCutoffPowerLaw4FGLDR3 b = 2/3 *\n\n".format(
                indentation
            )
            ss += fmt_e.format(
                indentation,
                "Amplitude",
                d["PLEC_Flux_Density_b23"],
                d["Unc_PLEC_Flux_Density_b23"],
            )
            ss += fmt_f.format(
                indentation,
                "Index 1",
                -d["PLEC_IndexS_b23"],
                d["Unc_PLEC_IndexS_b23"],
            )
            ss += "{}{:<20s} : {:.3f}\n".format(indentation, "Index 2", 0.6667)
            ss += "{}{:<20s} : {:.3f}\n".format(
                indentation, "Reference", d["Pivot_Energy_b23"]
            )
            ss += fmt_f.format(
                indentation,
                "Expfactor",
                d["PLEC_ExpfactorS_b23"],
                d["Unc_PLEC_ExpfactorS_b23"],
            )
        if not isinstance(d["PLEC_Flux_Density_bfr"], np.ma.core.MaskedConstant):
            ss += "\n{}* SuperExpCutoffPowerLaw4FGLDR3 b free *\n\n".format(indentation)
            ss += fmt_e.format(
                indentation,
                "Amplitude",
                d["PLEC_Flux_Density_bfr"],
                d["Unc_PLEC_Flux_Density_bfr"],
            )
            ss += fmt_f.format(
                indentation,
                "Index 1",
                -d["PLEC_IndexS_bfr"],
                d["Unc_PLEC_IndexS_bfr"],
            )
            ss += fmt_f.format(
                indentation,
                "Index 2",
                d["PLEC_Exp_Index_bfr"],
                d["Unc_PLEC_Exp_Index_bfr"],
            )
            ss += "{}{:<20s} : {:.3f}\n".format(
                indentation, "Reference", d["Pivot_Energy_bfr"]
            )
            ss += fmt_f.format(
                indentation,
                "Expfactor",
                d["PLEC_ExpfactorS_bfr"],
                d["Unc_PLEC_ExpfactorS_bfr"],
            )
        return ss
[docs]
    def spectral_model(self, fit="auto"):
        """
        In the 3PC, Fermi-LAT collaboration tried to fit a
        `~gammapy.modelling.models.SuperExpCutoffPowerLaw4FGLDR3SpectralModel` with the
        exponential index `index_2` free, or fixed to 2/3. These two models are referred
        as "b free" and "b 23". For most pulsars, both models are available. However,
        in some cases the "b free" model did not fit correctly.
        Parameters
        ----------
        fit : str, optional
            Which fitted model to return. The user can choose between "auto", "b free"
            and "b 23". "auto" will always try to return the "b free" first and fall
            back to the "b 23" fit if "b free" is not available. Default is "auto".
        """
        d = self.data_spectral
        if d is None or d["SpectrumType"] != "PLSuperExpCutoff4":
            log.warning(f"No spectral model available for source {self.name}")
            return None
        tag = "SuperExpCutoffPowerLaw4FGLDR3SpectralModel"
        if not (
            isinstance(d["PLEC_IndexS_bfr"], np.ma.core.masked_array) or (fit == "b 23")
        ):
            pars = {
                "reference": d["Pivot_Energy_bfr"],
                "amplitude": d["PLEC_Flux_Density_bfr"],
                "index_1": -d["PLEC_IndexS_bfr"],
                "index_2": d["PLEC_Exp_Index_bfr"],
                "expfactor": d["PLEC_ExpfactorS_bfr"],
            }
            errs = {
                "amplitude": d["Unc_PLEC_Flux_Density_bfr"],
                "index_1": d["Unc_PLEC_IndexS_bfr"],
                "index_2": d["Unc_PLEC_Exp_Index_bfr"],
                "expfactor": d["Unc_PLEC_ExpfactorS_bfr"],
            }
        else:
            pars = {
                "reference": d["Pivot_Energy_b23"],
                "amplitude": d["PLEC_Flux_Density_b23"],
                "index_1": -d["PLEC_IndexS_b23"],
                "index_2": d["PLEC_Exp_Index_b23"],
                "expfactor": d["PLEC_ExpfactorS_b23"],
            }
            errs = {
                "amplitude": d["Unc_PLEC_Flux_Density_b23"],
                "index_1": d["Unc_PLEC_IndexS_b23"],
                "expfactor": d["Unc_PLEC_ExpfactorS_b23"],
            }
        model = Model.create(tag, "spectral", **pars)
        for name, value in errs.items():
            model.parameters[name].error = value
        return model 
    @property
    def flux_points_table(self):
        """Flux points (`~astropy.table.Table`). Flux point is an upper limit if
        its significance is less than 2."""
        fp_data = self.data_spectral
        if fp_data is None:
            log.warning(f"No flux points available for source {self.name}")
            return None
        table = Table()
        table["e_min"] = self._energy_edges[:-1]
        table["e_max"] = self._energy_edges[1:]
        table["e_ref"] = np.sqrt(table["e_min"] * table["e_max"])
        fgl_cols = ["Flux_Band", "Unc_Flux_Band", "Sqrt_TS_Band", "nuFnu_Band"]
        flux, flux_err, sig, nuFnu = [fp_data[col] for col in fgl_cols]
        table["flux"] = flux
        table["flux_errn"] = np.abs(flux_err[:, 0])
        table["flux_errp"] = flux_err[:, 1]
        table["e2dnde"] = nuFnu
        table["e2dnde_errn"] = np.abs(nuFnu * flux_err[:, 0] / flux)
        table["e2dnde_errp"] = nuFnu * flux_err[:, 1] / flux
        is_ul = np.isnan(flux_err[:, 0]) | (sig < 2)
        table["is_ul"] = is_ul
        table["flux_ul"] = np.nan * flux_err.unit
        flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"])
        table["flux_ul"][is_ul] = flux_ul[is_ul]
        table["e2dnde_ul"] = np.nan * table["e2dnde"].unit
        e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_errp"])
        table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul]
        table["sqrt_ts"] = fp_data["Sqrt_TS_Band"]
        return table 
[docs]
class SourceCatalog3FGL(SourceCatalog):
    """Fermi-LAT 3FGL source catalog.
    - https://ui.adsabs.harvard.edu/abs/2015ApJS..218...23A
    - https://fermi.gsfc.nasa.gov/ssc/data/access/lat/4yr_catalog/
    One source is represented by `~gammapy.catalog.SourceCatalogObject3FGL`.
    """
    tag = "3fgl"
    description = "LAT 4-year point source catalog"
    source_object_class = SourceCatalogObject3FGL
    def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psc_v16.fit.gz"):
        filename = make_path(filename)
        with warnings.catch_warnings():  # ignore FITS units warnings
            warnings.simplefilter("ignore", u.UnitsWarning)
            table = Table.read(filename, hdu="LAT_Point_Source_Catalog")
        table_standardise_units_inplace(table)
        source_name_key = "Source_Name"
        source_name_alias = (
            "Extended_Source_Name",
            "0FGL_Name",
            "1FGL_Name",
            "2FGL_Name",
            "1FHL_Name",
            "ASSOC_TEV",
            "ASSOC1",
            "ASSOC2",
        )
        super().__init__(
            table=table,
            source_name_key=source_name_key,
            source_name_alias=source_name_alias,
        )
        self.extended_sources_table = Table.read(filename, hdu="ExtendedSources")
        self.hist_table = Table.read(filename, hdu="Hist_Start") 
[docs]
class SourceCatalog4FGL(SourceCatalog):
    """Fermi-LAT 4FGL source catalog.
    - https://arxiv.org/abs/1902.10045 (DR1)
    - https://arxiv.org/abs/2005.11208 (DR2)
    - https://arxiv.org/abs/2201.11184 (DR3)
    - https://arxiv.org/abs/2307.12546 (DR4)
    By default we use the file of the DR4 initial release
    from https://fermi.gsfc.nasa.gov/ssc/data/access/lat/14yr_catalog/
    One source is represented by `~gammapy.catalog.SourceCatalogObject4FGL`.
    """
    tag = "4fgl"
    description = "LAT 14-year point source catalog"
    source_object_class = SourceCatalogObject4FGL
    def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psc_v32.fit.gz"):
        filename = make_path(filename)
        table = Table.read(filename, hdu="LAT_Point_Source_Catalog")
        table_standardise_units_inplace(table)
        source_name_key = "Source_Name"
        source_name_alias = (
            "Extended_Source_Name",
            "ASSOC_FGL",
            "ASSOC_FHL",
            "ASSOC_GAM1",
            "ASSOC_GAM2",
            "ASSOC_GAM3",
            "ASSOC_TEV",
            "ASSOC1",
            "ASSOC2",
        )
        super().__init__(
            table=table,
            source_name_key=source_name_key,
            source_name_alias=source_name_alias,
        )
        self.extended_sources_table = Table.read(filename, hdu="ExtendedSources")
        self.extended_sources_table["version"] = int(
            "".join(filter(str.isdigit, table.meta["VERSION"]))
        )
        try:
            self.hist_table = Table.read(filename, hdu="Hist_Start")
            if "MJDREFI" not in self.hist_table.meta:
                self.hist_table.meta = Table.read(filename, hdu="GTI").meta
        except KeyError:
            pass
        try:
            self.hist2_table = Table.read(filename, hdu="Hist2_Start")
            if "MJDREFI" not in self.hist_table.meta:
                self.hist2_table.meta = Table.read(filename, hdu="GTI").meta
        except KeyError:
            pass
        table = Table.read(filename, hdu="EnergyBounds")
        self.flux_points_energy_edges = np.unique(
            np.c_[table["LowerEnergy"].quantity, table["UpperEnergy"].quantity]
        ) 
[docs]
class SourceCatalog2FHL(SourceCatalog):
    """Fermi-LAT 2FHL source catalog.
    - https://ui.adsabs.harvard.edu/abs/2016ApJS..222....5A
    - https://fermi.gsfc.nasa.gov/ssc/data/access/lat/2FHL/
    One source is represented by `~gammapy.catalog.SourceCatalogObject2FHL`.
    """
    tag = "2fhl"
    description = "LAT second high-energy source catalog"
    source_object_class = SourceCatalogObject2FHL
    def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psch_v09.fit.gz"):
        filename = make_path(filename)
        with warnings.catch_warnings():  # ignore FITS units warnings
            warnings.simplefilter("ignore", u.UnitsWarning)
            table = Table.read(filename, hdu="2FHL Source Catalog")
        table_standardise_units_inplace(table)
        source_name_key = "Source_Name"
        source_name_alias = ("ASSOC", "3FGL_Name", "1FHL_Name", "TeVCat_Name")
        super().__init__(
            table=table,
            source_name_key=source_name_key,
            source_name_alias=source_name_alias,
        )
        self.extended_sources_table = Table.read(filename, hdu="Extended Sources")
        self.rois = Table.read(filename, hdu="ROIs") 
[docs]
class SourceCatalog3FHL(SourceCatalog):
    """Fermi-LAT 3FHL source catalog.
    - https://ui.adsabs.harvard.edu/abs/2017ApJS..232...18A
    - https://fermi.gsfc.nasa.gov/ssc/data/access/lat/3FHL/
    One source is represented by `~gammapy.catalog.SourceCatalogObject3FHL`.
    """
    tag = "3fhl"
    description = "LAT third high-energy source catalog"
    source_object_class = SourceCatalogObject3FHL
    def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psch_v13.fit.gz"):
        filename = make_path(filename)
        with warnings.catch_warnings():  # ignore FITS units warnings
            warnings.simplefilter("ignore", u.UnitsWarning)
            table = Table.read(filename, hdu="LAT_Point_Source_Catalog")
        table_standardise_units_inplace(table)
        source_name_key = "Source_Name"
        source_name_alias = ("ASSOC1", "ASSOC2", "ASSOC_TEV", "ASSOC_GAM")
        super().__init__(
            table=table,
            source_name_key=source_name_key,
            source_name_alias=source_name_alias,
        )
        self.extended_sources_table = Table.read(filename, hdu="ExtendedSources")
        self.rois = Table.read(filename, hdu="ROIs")
        self.energy_bounds_table = Table.read(filename, hdu="EnergyBounds") 
[docs]
class SourceCatalog2PC(SourceCatalog):
    """Fermi-LAT 2nd pulsar catalog.
    - https://ui.adsabs.harvard.edu/abs/2013ApJS..208...17A
    - https://fermi.gsfc.nasa.gov/ssc/data/access/lat/2nd_PSR_catalog/
    One source is represented by `~gammapy.catalog.SourceCatalogObject2PC`.
    """
    tag = "2PC"
    description = "LAT 2nd pulsar catalog"
    source_object_class = SourceCatalogObject2PC
    def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/2PC_catalog_v04.fits.gz"):
        filename = make_path(filename)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", u.UnitsWarning)
            table_psr = Table.read(filename, hdu="PULSAR_CATALOG")
            table_spectral = Table.read(filename, hdu="SPECTRAL")
            table_off_peak = Table.read(filename, hdu="OFF_PEAK")
        table_standardise_units_inplace(table_psr)
        table_standardise_units_inplace(table_spectral)
        table_standardise_units_inplace(table_off_peak)
        source_name_key = "PSR_Name"
        super().__init__(table=table_psr, source_name_key=source_name_key)
        self.source_object_class._source_name_key = source_name_key
        self.off_peak_table = table_off_peak
        self.spectral_table = table_spectral
[docs]
    def to_models(self, **kwargs):
        models = Models()
        for m in self:
            sky_model = m.sky_model()
            if sky_model is not None:
                models.append(sky_model)
        return models 
    def _get_name_spectral(self, data):
        return f"{data[self._source_name_key].strip()}" 
[docs]
class SourceCatalog3PC(SourceCatalog):
    """Fermi-LAT 3rd pulsar catalog.
    - https://arxiv.org/abs/2307.11132
    - https://fermi.gsfc.nasa.gov/ssc/data/access/lat/3rd_PSR_catalog/
    One source is represented by `~gammapy.catalog.SourceCatalogObject3PC`.
    """
    tag = "3PC"
    description = "LAT 3rd pulsar catalog"
    source_object_class = SourceCatalogObject3PC
    def __init__(
        self, filename="$GAMMAPY_DATA/catalogs/fermi/3PC_Catalog+SEDs_20230803.fits.gz"
    ):
        filename = make_path(filename)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", u.UnitsWarning)
            table_psr = Table.read(filename, hdu="PULSARS_BIGFILE")
            table_spectral = Table.read(filename, hdu="LAT_Point_Source_Catalog")
            table_bigfile_config = Table.read(filename, hdu="BIGFILE_CONFIG")
        table_standardise_units_inplace(table_psr)
        table_standardise_units_inplace(table_spectral)
        table_standardise_units_inplace(table_bigfile_config)
        source_name_key = "PSRJ"
        super().__init__(table=table_psr, source_name_key=source_name_key)
        self.source_object_class._source_name_key = source_name_key
        self.spectral_table = table_spectral
        self.off_bigfile_config = table_bigfile_config
[docs]
    def to_models(self, **kwargs):
        models = Models()
        for m in self:
            sky_model = m.sky_model()
            if sky_model is not None:
                models.append(sky_model)
        return models 
    @property
    def _get_source_name_key(self):
        return "NickName"
    def _get_name_spectral(self, data):
        return f"PSR{data[self._source_name_key].strip()}"