Source code for gammapy.catalog.fermi

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Fermi catalog and source classes."""
import abc
import warnings
import numpy as np
import astropy.units as u
from astropy.table import Table
from astropy.wcs import FITSFixedWarning
from gammapy.estimators import FluxPoints
from gammapy.maps import MapAxis, Maps, RegionGeom
from gammapy.modeling.models import (
    DiskSpatialModel,
    GaussianSpatialModel,
    Model,
    PointSpatialModel,
    SkyModel,
    TemplateSpatialModel,
)
from gammapy.utils.gauss import Gauss2DPDF
from gammapy.utils.scripts import make_path
from gammapy.utils.table import table_standardise_units_inplace
from .core import SourceCatalog, SourceCatalogObject, format_flux_points_table

__all__ = [
    "SourceCatalogObject4FGL",
    "SourceCatalogObject3FGL",
    "SourceCatalogObject2FHL",
    "SourceCatalogObject3FHL",
    "SourceCatalog4FGL",
    "SourceCatalog3FGL",
    "SourceCatalog2FHL",
    "SourceCatalog3FHL",
]


def compute_flux_points_ul(quantity, quantity_errp):
    """Compute UL value for fermi flux points.

    See https://arxiv.org/pdf/1501.02003.pdf (page 30)
    """
    return 2 * quantity_errp + quantity


class SourceCatalogObjectFermiBase(SourceCatalogObject, abc.ABC):
    """Base class for Fermi-LAT catalogs."""

    asso = ["ASSOC1", "ASSOC2", "ASSOC_TEV", "ASSOC_GAM1", "ASSOC_GAM2", "ASSOC_GAM3"]
    flux_points_meta = {
        "sed_type_init": "flux",
        "n_sigma": 1,
        "sqrt_ts_threshold_ul": 1,
        "n_sigma_ul": 2,
    }

    def __str__(self):
        return self.info()

    def info(self, info="all"):
        """Summary info string.

        Parameters
        ----------
        info : {'all', 'basic', 'more', 'position', 'spectral','lightcurve'}
            Comma separated list of options
        """
        if info == "all":
            info = "basic,more,position,spectral,lightcurve"

        ss = ""
        ops = info.split(",")
        if "basic" in ops:
            ss += self._info_basic()
        if "more" in ops:
            ss += self._info_more()
        if "position" in ops:
            ss += self._info_position()
            if not self.is_pointlike:
                ss += self._info_morphology()
        if "spectral" in ops:
            ss += self._info_spectral_fit()
            ss += self._info_spectral_points()
        if "lightcurve" in ops:
            ss += self._info_lightcurve()
        return ss

    def _info_basic(self):
        d = self.data
        keys = self.asso
        ss = "\n*** Basic info ***\n\n"
        ss += "Catalog row index (zero-based) : {}\n".format(self.row_index)
        ss += "{:<20s} : {}\n".format("Source name", self.name)
        if "Extended_Source_Name" in d:
            ss += "{:<20s} : {}\n".format("Extended name", d["Extended_Source_Name"])

        def get_nonentry_keys(keys):
            vals = [str(d[_]).strip() for _ in keys]
            return ", ".join([_ for _ in vals if _ != ""])

        associations = get_nonentry_keys(keys)
        ss += "{:<16s} : {}\n".format("Associations", associations)
        try:
            ss += "{:<16s} : {:.3f}\n".format("ASSOC_PROB_BAY", d["ASSOC_PROB_BAY"])
            ss += "{:<16s} : {:.3f}\n".format("ASSOC_PROB_LR", d["ASSOC_PROB_LR"])
        except (KeyError):
            pass
        try:
            ss += "{:<16s} : {}\n".format("Class1", d["CLASS1"])
        except (KeyError):
            ss += "{:<16s} : {}\n".format("Class", d["CLASS"])
        try:
            ss += "{:<16s} : {}\n".format("Class2", d["CLASS2"])
        except (KeyError):
            pass
        ss += "{:<16s} : {}\n".format("TeVCat flag", d.get("TEVCAT_FLAG", "N/A"))
        return ss

    @abc.abstractmethod
    def _info_more(self):
        pass

    def _info_position(self):
        d = self.data
        ss = "\n*** Position info ***\n\n"
        ss += "{:<20s} : {:.3f}\n".format("RA", d["RAJ2000"])
        ss += "{:<20s} : {:.3f}\n".format("DEC", d["DEJ2000"])
        ss += "{:<20s} : {:.3f}\n".format("GLON", d["GLON"])
        ss += "{:<20s} : {:.3f}\n".format("GLAT", d["GLAT"])

        ss += "\n"
        ss += "{:<20s} : {:.4f}\n".format("Semimajor (68%)", d["Conf_68_SemiMajor"])
        ss += "{:<20s} : {:.4f}\n".format("Semiminor (68%)", d["Conf_68_SemiMinor"])
        ss += "{:<20s} : {:.2f}\n".format("Position angle (68%)", d["Conf_68_PosAng"])
        ss += "{:<20s} : {:.4f}\n".format("Semimajor (95%)", d["Conf_95_SemiMajor"])
        ss += "{:<20s} : {:.4f}\n".format("Semiminor (95%)", d["Conf_95_SemiMinor"])
        ss += "{:<20s} : {:.2f}\n".format("Position angle (95%)", d["Conf_95_PosAng"])
        ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI_num"])
        return ss

    def _info_morphology(self):
        e = self.data_extended
        ss = "\n*** Extended source information ***\n\n"
        ss += "{:<16s} : {}\n".format("Model form", e["Model_Form"])
        ss += "{:<16s} : {:.4f}\n".format("Model semimajor", e["Model_SemiMajor"])
        ss += "{:<16s} : {:.4f}\n".format("Model semiminor", e["Model_SemiMinor"])
        ss += "{:<16s} : {:.4f}\n".format("Position angle", e["Model_PosAng"])
        try:
            ss += "{:<16s} : {}\n".format("Spatial function", e["Spatial_Function"])
        except KeyError:
            pass
        ss += "{:<16s} : {}\n\n".format("Spatial filename", e["Spatial_Filename"])
        return ss

    def _info_spectral_fit(self):
        return "\n"

    def _info_spectral_points(self):
        ss = "\n*** Spectral points ***\n\n"
        lines = format_flux_points_table(self.flux_points_table).pformat(
            max_width=-1, max_lines=-1
        )
        ss += "\n".join(lines)
        return ss

    def _info_lightcurve(self):
        return "\n"

    @property
    def is_pointlike(self):
        return self.data["Extended_Source_Name"].strip() == ""

    # FIXME: this should be renamed `set_position_error`,
    # and `phi_0` isn't filled correctly, other parameters missing
    # see https://github.com/gammapy/gammapy/pull/2533#issuecomment-553329049
    def _set_spatial_errors(self, model):
        d = self.data

        if "Pos_err_68" in d:
            percent = 0.68
            semi_minor = d["Pos_err_68"]
            semi_major = d["Pos_err_68"]
            phi_0 = 0.0
        else:
            percent = 0.95
            semi_minor = d["Conf_95_SemiMinor"]
            semi_major = d["Conf_95_SemiMajor"]
            phi_0 = d["Conf_95_PosAng"]

        if np.isnan(phi_0):
            phi_0 = 0.0 * u.deg

        scale_1sigma = Gauss2DPDF().containment_radius(percent)
        lat_err = semi_major / scale_1sigma
        lon_err = semi_minor / scale_1sigma / np.cos(d["DEJ2000"])

        if "TemplateSpatialModel" not in model.tag:
            model.parameters["lon_0"].error = lon_err
            model.parameters["lat_0"].error = lat_err
            model.phi_0 = phi_0

    def sky_model(self, name=None):
        """Sky model (`~gammapy.modeling.models.SkyModel`)."""
        if name is None:
            name = self.name

        return SkyModel(
            spatial_model=self.spatial_model(),
            spectral_model=self.spectral_model(),
            name=name,
        )

    @property
    def flux_points(self):
        """Flux points (`~gammapy.estimators.FluxPoints`)."""

        return FluxPoints.from_table(
            table=self.flux_points_table,
            reference_model=self.sky_model(),
            format="gadf-sed",
        )


[docs]class SourceCatalogObject4FGL(SourceCatalogObjectFermiBase): """One source from the Fermi-LAT 4FGL catalog. Catalog is represented by `~gammapy.catalog.SourceCatalog4FGL`. """ asso = [ "ASSOC1", "ASSOC2", "ASSOC_TEV", "ASSOC_FGL", "ASSOC_FHL", "ASSOC_GAM1", "ASSOC_GAM2", "ASSOC_GAM3", ] _energy_edges = u.Quantity([50, 100, 300, 1000, 3000, 10000, 30000, 300000], "MeV") def _info_more(self): d = self.data ss = "\n*** Other info ***\n\n" fmt = "{:<32s} : {:.3f}\n" ss += fmt.format("Significance (100 MeV - 1 TeV)", d["Signif_Avg"]) ss += "{:<32s} : {:.1f}\n".format("Npred", d["Npred"]) ss += "\n{:<20s} : {}\n".format("Other flags", d["Flags"]) return ss def _info_spectral_fit(self): d = self.data spec_type = d["SpectrumType"].strip() ss = "\n*** Spectral info ***\n\n" ss += "{:<45s} : {}\n".format("Spectrum type", d["SpectrumType"]) fmt = "{:<45s} : {:.3f}\n" ss += fmt.format("Detection significance (100 MeV - 1 TeV)", d["Signif_Avg"]) if spec_type == "PowerLaw": tag = "PL" elif spec_type == "LogParabola": tag = "LP" ss += "{:<45s} : {:.4f} +- {:.5f}\n".format( "beta", d["LP_beta"], d["Unc_LP_beta"] ) ss += "{:<45s} : {:.1f}\n".format("Significance curvature", d["LP_SigCurv"]) elif spec_type == "PLSuperExpCutoff": tag = "PLEC" fmt = "{:<45s} : {:.4f} +- {:.4f}\n" ss += fmt.format( "Exponential factor", d["PLEC_Expfactor"], d["Unc_PLEC_Expfactor"] ) ss += "{:<45s} : {:.4f} +- {:.4f}\n".format( "Super-exponential cutoff index", d["PLEC_Exp_Index"], d["Unc_PLEC_Exp_Index"], ) ss += "{:<45s} : {:.1f}\n".format( "Significance curvature", d["PLEC_SigCurv"] ) else: raise ValueError(f"Invalid spec_type: {spec_type!r}") ss += "{:<45s} : {:.0f} {}\n".format( "Pivot energy", d["Pivot_Energy"].value, d["Pivot_Energy"].unit ) fmt = "{:<45s} : {:.3f} +- {:.3f}\n" ss += fmt.format( "Spectral index", d[tag + "_Index"], d["Unc_" + tag + "_Index"] ) fmt = "{:<45s} : {:.3} +- {:.3} {}\n" ss += fmt.format( "Flux Density at pivot energy", d[tag + "_Flux_Density"].value, d["Unc_" + tag + "_Flux_Density"].value, "cm-2 MeV-1 s-1", ) fmt = "{:<45s} : {:.3} +- {:.3} {}\n" ss += fmt.format( "Integral flux (1 - 100 GeV)", d["Flux1000"].value, d["Unc_Flux1000"].value, "cm-2 s-1", ) fmt = "{:<45s} : {:.3} +- {:.3} {}\n" ss += fmt.format( "Energy flux (100 MeV - 100 GeV)", d["Energy_Flux100"].value, d["Unc_Energy_Flux100"].value, "erg cm-2 s-1", ) return ss def _info_lightcurve(self): d = self.data ss = "\n*** Lightcurve info ***\n\n" ss += "Lightcurve measured in the energy band: 100 MeV - 100 GeV\n\n" ss += "{:<15s} : {:.3f}\n".format("Variability index", d["Variability_Index"]) if np.isfinite(d["Flux_Peak"]): ss += "{:<40s} : {:.3f}\n".format( "Significance peak (100 MeV - 100 GeV)", d["Signif_Peak"] ) fmt = "{:<40s} : {:.3} +- {:.3} cm^-2 s^-1\n" ss += fmt.format( "Integral flux peak (100 MeV - 100 GeV)", d["Flux_Peak"].value, d["Unc_Flux_Peak"].value, ) # TODO: give time as UTC string, not MET ss += "{:<40s} : {:.3} s (Mission elapsed time)\n".format( "Time peak", d["Time_Peak"].value ) peak_interval = d["Peak_Interval"].to_value("day") ss += "{:<40s} : {:.3} day\n".format("Peak interval", peak_interval) else: ss += "\nNo peak measured for this source.\n" # TODO: Add a lightcurve table with d['Flux_History'] and d['Unc_Flux_History'] return ss
[docs] def spatial_model(self): """Spatial model (`~gammapy.modeling.models.SpatialModel`).""" d = self.data ra = d["RAJ2000"] dec = d["DEJ2000"] if self.is_pointlike: model = PointSpatialModel(lon_0=ra, lat_0=dec, frame="icrs") else: de = self.data_extended morph_type = de["Model_Form"].strip() e = (1 - (de["Model_SemiMinor"] / de["Model_SemiMajor"]) ** 2.0) ** 0.5 sigma = de["Model_SemiMajor"] phi = de["Model_PosAng"] if morph_type == "Disk": r_0 = de["Model_SemiMajor"] model = DiskSpatialModel( lon_0=ra, lat_0=dec, r_0=r_0, e=e, phi=phi, frame="icrs" ) elif morph_type in ["Map", "Ring", "2D Gaussian x2"]: filename = de["Spatial_Filename"].strip() path = make_path( "$GAMMAPY_DATA/catalogs/fermi/LAT_extended_sources_8years/Templates/" ) with warnings.catch_warnings(): # ignore FITS units warnings warnings.simplefilter("ignore", FITSFixedWarning) model = TemplateSpatialModel.read(path / filename) elif morph_type == "2D Gaussian": model = GaussianSpatialModel( lon_0=ra, lat_0=dec, sigma=sigma, e=e, phi=phi, frame="icrs" ) else: raise ValueError(f"Invalid spatial model: {morph_type!r}") self._set_spatial_errors(model) return model
[docs] def spectral_model(self): """Best fit spectral model (`~gammapy.modeling.models.SpectralModel`).""" spec_type = self.data["SpectrumType"].strip() if spec_type == "PowerLaw": tag = "PowerLawSpectralModel" pars = { "reference": self.data["Pivot_Energy"], "amplitude": self.data["PL_Flux_Density"], "index": self.data["PL_Index"], } errs = { "amplitude": self.data["Unc_PL_Flux_Density"], "index": self.data["Unc_PL_Index"], } elif spec_type == "LogParabola": tag = "LogParabolaSpectralModel" pars = { "reference": self.data["Pivot_Energy"], "amplitude": self.data["LP_Flux_Density"], "alpha": self.data["LP_Index"], "beta": self.data["LP_beta"], } errs = { "amplitude": self.data["Unc_LP_Flux_Density"], "alpha": self.data["Unc_LP_Index"], "beta": self.data["Unc_LP_beta"], } elif spec_type == "PLSuperExpCutoff": tag = "SuperExpCutoffPowerLaw4FGLSpectralModel" pars = { "reference": self.data["Pivot_Energy"], "amplitude": self.data["PLEC_Flux_Density"], "index_1": self.data["PLEC_Index"], "index_2": self.data["PLEC_Exp_Index"], "expfactor": self.data["PLEC_Expfactor"], } errs = { "amplitude": self.data["Unc_PLEC_Flux_Density"], "index_1": self.data["Unc_PLEC_Index"], "index_2": np.nan_to_num(float(self.data["Unc_PLEC_Exp_Index"])), "expfactor": self.data["Unc_PLEC_Expfactor"], } else: raise ValueError(f"Invalid spec_type: {spec_type!r}") model = Model.create(tag, "spectral", **pars) for name, value in errs.items(): model.parameters[name].error = value return model
@property def flux_points_table(self): """Flux points (`~astropy.table.Table`).""" table = Table() table.meta.update(self.flux_points_meta) table["e_min"] = self._energy_edges[:-1] table["e_max"] = self._energy_edges[1:] flux = self._get_flux_values("Flux_Band") flux_err = self._get_flux_values("Unc_Flux_Band") table["flux"] = flux table["flux_errn"] = np.abs(flux_err[:, 0]) table["flux_errp"] = flux_err[:, 1] nuFnu = self._get_flux_values("nuFnu_Band", "erg cm-2 s-1") table["e2dnde"] = nuFnu table["e2dnde_errn"] = np.abs(nuFnu * flux_err[:, 0] / flux) table["e2dnde_errp"] = nuFnu * flux_err[:, 1] / flux is_ul = np.isnan(table["flux_errn"]) table["is_ul"] = is_ul # handle upper limits table["flux_ul"] = np.nan * flux_err.unit flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"]) table["flux_ul"][is_ul] = flux_ul[is_ul] # handle upper limits table["e2dnde_ul"] = np.nan * nuFnu.unit e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_errp"]) table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul] # Square root of test statistic table["sqrt_ts"] = self.data["Sqrt_TS_Band"] return table def _get_flux_values(self, prefix, unit="cm-2 s-1"): values = self.data[prefix] return u.Quantity(values, unit)
[docs] def lightcurve(self, interval="1-year"): """Lightcurve (`~gammapy.estimators.FluxPoints`). Parameters ---------- interval : {'1-year', '2-month'} Time interval of the lightcurve. Default is '1-year'. Note that '2-month' is not available for all catalogue version. """ if interval == "1-year": tag = "Flux_History" time_axis = self.data["time_axis"] tag_sqrt_ts = "Sqrt_TS_History" elif interval == "2-month": tag = "Flux2_History" if tag not in self.data: raise ValueError( "Only '1-year' interval is available for this catalogue version" ) time_axis = self.data["time_axis_2"] tag_sqrt_ts = "Sqrt_TS2_History" else: raise ValueError("Time intervals available are '1-year' or '2-month'") energy_axis = MapAxis.from_energy_edges([50, 300000] * u.MeV) geom = RegionGeom.create(region=self.position, axes=[energy_axis, time_axis]) names = ["flux", "flux_errp", "flux_errn", "flux_ul", "ts"] maps = Maps.from_geom(geom=geom, names=names) maps["flux"].quantity = self.data[tag] maps["flux_errp"].quantity = self.data[f"Unc_{tag}"][:, 1] maps["flux_errn"].quantity = -self.data[f"Unc_{tag}"][:, 0] maps["flux_ul"].quantity = compute_flux_points_ul( maps["flux"].quantity, maps["flux_errp"].quantity ) maps["ts"].quantity = self.data[tag_sqrt_ts] ** 2 return FluxPoints.from_maps( maps=maps, sed_type="flux", reference_model=self.sky_model(), meta=self.flux_points.meta.copy(), )
[docs]class SourceCatalogObject3FGL(SourceCatalogObjectFermiBase): """One source from the Fermi-LAT 3FGL catalog. Catalog is represented by `~gammapy.catalog.SourceCatalog3FGL`. """ _energy_edges = u.Quantity([100, 300, 1000, 3000, 10000, 100000], "MeV") _energy_edges_suffix = [ "100_300", "300_1000", "1000_3000", "3000_10000", "10000_100000", ] energy_range = u.Quantity([100, 100000], "MeV") """Energy range used for the catalog. Paper says that analysis uses data up to 300 GeV, but results are all quoted up to 100 GeV only to be consistent with previous catalogs. """ def _info_more(self): d = self.data ss = "\n*** Other info ***\n\n" ss += "{:<20s} : {}\n".format("Other flags", d["Flags"]) return ss def _info_spectral_fit(self): d = self.data spec_type = d["SpectrumType"].strip() ss = "\n*** Spectral info ***\n\n" ss += "{:<45s} : {}\n".format("Spectrum type", d["SpectrumType"]) fmt = "{:<45s} : {:.3f}\n" ss += fmt.format("Detection significance (100 MeV - 300 GeV)", d["Signif_Avg"]) ss += "{:<45s} : {:.1f}\n".format("Significance curvature", d["Signif_Curve"]) if spec_type == "PowerLaw": pass elif spec_type == "LogParabola": ss += "{:<45s} : {} +- {}\n".format("beta", d["beta"], d["Unc_beta"]) elif spec_type in ["PLExpCutoff", "PlSuperExpCutoff"]: fmt = "{:<45s} : {:.0f} +- {:.0f} {}\n" ss += fmt.format( "Cutoff energy", d["Cutoff"].value, d["Unc_Cutoff"].value, d["Cutoff"].unit, ) elif spec_type == "PLSuperExpCutoff": ss += "{:<45s} : {} +- {}\n".format( "Super-exponential cutoff index", d["Exp_Index"], d["Unc_Exp_Index"] ) else: raise ValueError(f"Invalid spec_type: {spec_type!r}") ss += "{:<45s} : {:.0f} {}\n".format( "Pivot energy", d["Pivot_Energy"].value, d["Pivot_Energy"].unit ) ss += "{:<45s} : {:.3f}\n".format( "Power law spectral index", d["PowerLaw_Index"] ) fmt = "{:<45s} : {:.3f} +- {:.3f}\n" ss += fmt.format("Spectral index", d["Spectral_Index"], d["Unc_Spectral_Index"]) fmt = "{:<45s} : {:.3} +- {:.3} {}\n" ss += fmt.format( "Flux Density at pivot energy", d["Flux_Density"].value, d["Unc_Flux_Density"].value, "cm-2 MeV-1 s-1", ) fmt = "{:<45s} : {:.3} +- {:.3} {}\n" ss += fmt.format( "Integral flux (1 - 100 GeV)", d["Flux1000"].value, d["Unc_Flux1000"].value, "cm-2 s-1", ) fmt = "{:<45s} : {:.3} +- {:.3} {}\n" ss += fmt.format( "Energy flux (100 MeV - 100 GeV)", d["Energy_Flux100"].value, d["Unc_Energy_Flux100"].value, "erg cm-2 s-1", ) return ss def _info_lightcurve(self): d = self.data ss = "\n*** Lightcurve info ***\n\n" ss += "Lightcurve measured in the energy band: 100 MeV - 100 GeV\n\n" ss += "{:<15s} : {:.3f}\n".format("Variability index", d["Variability_Index"]) if np.isfinite(d["Flux_Peak"]): ss += "{:<40s} : {:.3f}\n".format( "Significance peak (100 MeV - 100 GeV)", d["Signif_Peak"] ) fmt = "{:<40s} : {:.3} +- {:.3} cm^-2 s^-1\n" ss += fmt.format( "Integral flux peak (100 MeV - 100 GeV)", d["Flux_Peak"].value, d["Unc_Flux_Peak"].value, ) # TODO: give time as UTC string, not MET ss += "{:<40s} : {:.3} s (Mission elapsed time)\n".format( "Time peak", d["Time_Peak"].value ) peak_interval = d["Peak_Interval"].to_value("day") ss += "{:<40s} : {:.3} day\n".format("Peak interval", peak_interval) else: ss += "\nNo peak measured for this source.\n" # TODO: Add a lightcurve table with d['Flux_History'] and d['Unc_Flux_History'] return ss
[docs] def spectral_model(self): """Best fit spectral model (`~gammapy.modeling.models.SpectralModel`).""" spec_type = self.data["SpectrumType"].strip() if spec_type == "PowerLaw": tag = "PowerLawSpectralModel" pars = { "amplitude": self.data["Flux_Density"], "reference": self.data["Pivot_Energy"], "index": self.data["Spectral_Index"], } errs = { "amplitude": self.data["Unc_Flux_Density"], "index": self.data["Unc_Spectral_Index"], } elif spec_type == "PLExpCutoff": tag = "ExpCutoffPowerLaw3FGLSpectralModel" pars = { "amplitude": self.data["Flux_Density"], "reference": self.data["Pivot_Energy"], "index": self.data["Spectral_Index"], "ecut": self.data["Cutoff"], } errs = { "amplitude": self.data["Unc_Flux_Density"], "index": self.data["Unc_Spectral_Index"], "ecut": self.data["Unc_Cutoff"], } elif spec_type == "LogParabola": tag = "LogParabolaSpectralModel" pars = { "amplitude": self.data["Flux_Density"], "reference": self.data["Pivot_Energy"], "alpha": self.data["Spectral_Index"], "beta": self.data["beta"], } errs = { "amplitude": self.data["Unc_Flux_Density"], "alpha": self.data["Unc_Spectral_Index"], "beta": self.data["Unc_beta"], } elif spec_type == "PLSuperExpCutoff": tag = "SuperExpCutoffPowerLaw3FGLSpectralModel" pars = { "amplitude": self.data["Flux_Density"], "reference": self.data["Pivot_Energy"], "index_1": self.data["Spectral_Index"], "index_2": self.data["Exp_Index"], "ecut": self.data["Cutoff"], } errs = { "amplitude": self.data["Unc_Flux_Density"], "index_1": self.data["Unc_Spectral_Index"], "index_2": self.data["Unc_Exp_Index"], "ecut": self.data["Unc_Cutoff"], } else: raise ValueError(f"Invalid spec_type: {spec_type!r}") model = Model.create(tag, "spectral", **pars) for name, value in errs.items(): model.parameters[name].error = value return model
[docs] def spatial_model(self): """Spatial model (`~gammapy.modeling.models.SpatialModel`).""" d = self.data ra = d["RAJ2000"] dec = d["DEJ2000"] if self.is_pointlike: model = PointSpatialModel(lon_0=ra, lat_0=dec, frame="icrs") else: de = self.data_extended morph_type = de["Model_Form"].strip() e = (1 - (de["Model_SemiMinor"] / de["Model_SemiMajor"]) ** 2.0) ** 0.5 sigma = de["Model_SemiMajor"] phi = de["Model_PosAng"] if morph_type == "Disk": r_0 = de["Model_SemiMajor"] model = DiskSpatialModel( lon_0=ra, lat_0=dec, r_0=r_0, e=e, phi=phi, frame="icrs" ) elif morph_type in ["Map", "Ring", "2D Gaussian x2"]: filename = de["Spatial_Filename"].strip() path = make_path( "$GAMMAPY_DATA/catalogs/fermi/Extended_archive_v15/Templates/" ) model = TemplateSpatialModel.read(path / filename) elif morph_type == "2D Gaussian": model = GaussianSpatialModel( lon_0=ra, lat_0=dec, sigma=sigma, e=e, phi=phi, frame="icrs" ) else: raise ValueError(f"Invalid spatial model: {morph_type!r}") self._set_spatial_errors(model) return model
@property def flux_points_table(self): """Flux points (`~astropy.table.Table`).""" table = Table() table.meta.update(self.flux_points_meta) table["e_min"] = self._energy_edges[:-1] table["e_max"] = self._energy_edges[1:] flux = self._get_flux_values("Flux") flux_err = self._get_flux_values("Unc_Flux") table["flux"] = flux table["flux_errn"] = np.abs(flux_err[:, 0]) table["flux_errp"] = flux_err[:, 1] nuFnu = self._get_flux_values("nuFnu", "erg cm-2 s-1") table["e2dnde"] = nuFnu table["e2dnde_errn"] = np.abs(nuFnu * flux_err[:, 0] / flux) table["e2dnde_errp"] = nuFnu * flux_err[:, 1] / flux is_ul = np.isnan(table["flux_errn"]) table["is_ul"] = is_ul # handle upper limits table["flux_ul"] = np.nan * flux_err.unit flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"]) table["flux_ul"][is_ul] = flux_ul[is_ul] # handle upper limits table["e2dnde_ul"] = np.nan * nuFnu.unit e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_errp"]) table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul] # Square root of test statistic table["sqrt_ts"] = [self.data["Sqrt_TS" + _] for _ in self._energy_edges_suffix] return table def _get_flux_values(self, prefix, unit="cm-2 s-1"): values = [self.data[prefix + _] for _ in self._energy_edges_suffix] return u.Quantity(values, unit)
[docs] def lightcurve(self): """Lightcurve (`~gammapy.estimators.FluxPoints`).""" time_axis = self.data["time_axis"] tag = "Flux_History" energy_axis = MapAxis.from_energy_edges(self.energy_range) geom = RegionGeom.create(region=self.position, axes=[energy_axis, time_axis]) names = ["flux", "flux_errp", "flux_errn", "flux_ul"] maps = Maps.from_geom(geom=geom, names=names) maps["flux"].quantity = self.data[tag] maps["flux_errp"].quantity = self.data[f"Unc_{tag}"][:, 1] maps["flux_errn"].quantity = -self.data[f"Unc_{tag}"][:, 0] maps["flux_ul"].quantity = compute_flux_points_ul( maps["flux"].quantity, maps["flux_errp"].quantity ) is_ul = np.isnan(maps["flux_errn"]) maps["flux_ul"].data[~is_ul] = np.nan return FluxPoints.from_maps( maps=maps, sed_type="flux", reference_model=self.sky_model(), meta=self.flux_points_meta.copy(), )
[docs]class SourceCatalogObject2FHL(SourceCatalogObjectFermiBase): """One source from the Fermi-LAT 2FHL catalog. Catalog is represented by `~gammapy.catalog.SourceCatalog2FHL`. """ asso = ["ASSOC", "3FGL_Name", "1FHL_Name", "TeVCat_Name"] _energy_edges = u.Quantity([50, 171, 585, 2000], "GeV") _energy_edges_suffix = ["50_171", "171_585", "585_2000"] energy_range = u.Quantity([0.05, 2], "TeV") """Energy range used for the catalog.""" def _info_more(self): d = self.data ss = "\n*** Other info ***\n\n" fmt = "{:<32s} : {:.3f}\n" ss += fmt.format("Test statistic (50 GeV - 2 TeV)", d["TS"]) return ss def _info_position(self): d = self.data ss = "\n*** Position info ***\n\n" ss += "{:<20s} : {:.3f}\n".format("RA", d["RAJ2000"]) ss += "{:<20s} : {:.3f}\n".format("DEC", d["DEJ2000"]) ss += "{:<20s} : {:.3f}\n".format("GLON", d["GLON"]) ss += "{:<20s} : {:.3f}\n".format("GLAT", d["GLAT"]) ss += "\n" ss += "{:<20s} : {:.4f}\n".format("Error on position (68%)", d["Pos_err_68"]) ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI"]) return ss def _info_spectral_fit(self): d = self.data ss = "\n*** Spectral fit info ***\n\n" fmt = "{:<32s} : {:.3f} +- {:.3f}\n" ss += fmt.format( "Power-law spectral index", d["Spectral_Index"], d["Unc_Spectral_Index"] ) ss += "{:<32s} : {:.3} +- {:.3} {}\n".format( "Integral flux (50 GeV - 2 TeV)", d["Flux50"].value, d["Unc_Flux50"].value, "cm-2 s-1", ) ss += "{:<32s} : {:.3} +- {:.3} {}\n".format( "Energy flux (50 GeV - 2 TeV)", d["Energy_Flux50"].value, d["Unc_Energy_Flux50"].value, "erg cm-2 s-1", ) return ss @property def is_pointlike(self): return self.data["Source_Name"].strip()[-1] != "e"
[docs] def spatial_model(self): """Spatial model (`~gammapy.modeling.models.SpatialModel`).""" d = self.data ra = d["RAJ2000"] dec = d["DEJ2000"] if self.is_pointlike: model = PointSpatialModel(lon_0=ra, lat_0=dec, frame="icrs") else: de = self.data_extended morph_type = de["Model_Form"].strip() e = (1 - (de["Model_SemiMinor"] / de["Model_SemiMajor"]) ** 2.0) ** 0.5 sigma = de["Model_SemiMajor"] phi = de["Model_PosAng"] if morph_type in ["Disk", "Elliptical Disk"]: r_0 = de["Model_SemiMajor"] model = DiskSpatialModel( lon_0=ra, lat_0=dec, r_0=r_0, e=e, phi=phi, frame="icrs" ) elif morph_type in ["Map", "Ring", "2D Gaussian x2"]: filename = de["Spatial_Filename"].strip() path = make_path( "$GAMMAPY_DATA/catalogs/fermi/Extended_archive_v15/Templates/" ) return TemplateSpatialModel.read(path / filename) elif morph_type in ["2D Gaussian", "Elliptical 2D Gaussian"]: model = GaussianSpatialModel( lon_0=ra, lat_0=dec, sigma=sigma, e=e, phi=phi, frame="icrs" ) else: raise ValueError(f"Invalid spatial model: {morph_type!r}") self._set_spatial_errors(model) return model
[docs] def spectral_model(self): """Best fit spectral model (`~gammapy.modeling.models.SpectralModel`).""" tag = "PowerLaw2SpectralModel" pars = { "amplitude": self.data["Flux50"], "emin": self.energy_range[0], "emax": self.energy_range[1], "index": self.data["Spectral_Index"], } errs = { "amplitude": self.data["Unc_Flux50"], "index": self.data["Unc_Spectral_Index"], } model = Model.create(tag, "spectral", **pars) for name, value in errs.items(): model.parameters[name].error = value return model
@property def flux_points_table(self): """Flux points (`~astropy.table.Table`).""" table = Table() table.meta.update(self.flux_points_meta) table["e_min"] = self._energy_edges[:-1] table["e_max"] = self._energy_edges[1:] table["flux"] = self._get_flux_values("Flux") flux_err = self._get_flux_values("Unc_Flux") table["flux_errn"] = np.abs(flux_err[:, 0]) table["flux_errp"] = flux_err[:, 1] # handle upper limits is_ul = np.isnan(table["flux_errn"]) table["is_ul"] = is_ul table["flux_ul"] = np.nan * flux_err.unit flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"]) table["flux_ul"][is_ul] = flux_ul[is_ul] return table def _get_flux_values(self, prefix, unit="cm-2 s-1"): values = [self.data[prefix + _ + "GeV"] for _ in self._energy_edges_suffix] return u.Quantity(values, unit)
[docs]class SourceCatalogObject3FHL(SourceCatalogObjectFermiBase): """One source from the Fermi-LAT 3FHL catalog. Catalog is represented by `~gammapy.catalog.SourceCatalog3FHL`. """ asso = ["ASSOC1", "ASSOC2", "ASSOC_TEV", "ASSOC_GAM"] energy_range = u.Quantity([0.01, 2], "TeV") """Energy range used for the catalog.""" _energy_edges = u.Quantity([10, 20, 50, 150, 500, 2000], "GeV") def _info_position(self): d = self.data ss = "\n*** Position info ***\n\n" ss += "{:<20s} : {:.3f}\n".format("RA", d["RAJ2000"]) ss += "{:<20s} : {:.3f}\n".format("DEC", d["DEJ2000"]) ss += "{:<20s} : {:.3f}\n".format("GLON", d["GLON"]) ss += "{:<20s} : {:.3f}\n".format("GLAT", d["GLAT"]) # TODO: All sources are non-elliptical; just give one number for radius? ss += "\n" ss += "{:<20s} : {:.4f}\n".format("Semimajor (95%)", d["Conf_95_SemiMajor"]) ss += "{:<20s} : {:.4f}\n".format("Semiminor (95%)", d["Conf_95_SemiMinor"]) ss += "{:<20s} : {:.2f}\n".format("Position angle (95%)", d["Conf_95_PosAng"]) ss += "{:<20s} : {:.0f}\n".format("ROI number", d["ROI_num"]) return ss def _info_spectral_fit(self): d = self.data spec_type = d["SpectrumType"].strip() ss = "\n*** Spectral fit info ***\n\n" ss += "{:<32s} : {}\n".format("Spectrum type", d["SpectrumType"]) ss += "{:<32s} : {:.1f}\n".format("Significance curvature", d["Signif_Curve"]) # Power-law parameters are always given; give in any case fmt = "{:<32s} : {:.3f} +- {:.3f}\n" ss += fmt.format( "Power-law spectral index", d["PowerLaw_Index"], d["Unc_PowerLaw_Index"] ) if spec_type == "PowerLaw": pass elif spec_type == "LogParabola": fmt = "{:<32s} : {:.3f} +- {:.3f}\n" ss += fmt.format( "LogParabolaSpectralModel spectral index", d["Spectral_Index"], d["Unc_Spectral_Index"], ) ss += "{:<32s} : {:.3f} +- {:.3f}\n".format( "LogParabolaSpectralModel beta", d["beta"], d["Unc_beta"] ) else: raise ValueError(f"Invalid spec_type: {spec_type!r}") ss += "{:<32s} : {:.1f} {}\n".format( "Pivot energy", d["Pivot_Energy"].value, d["Pivot_Energy"].unit ) ss += "{:<32s} : {:.3} +- {:.3} {}\n".format( "Flux Density at pivot energy", d["Flux_Density"].value, d["Unc_Flux_Density"].value, "cm-2 GeV-1 s-1", ) ss += "{:<32s} : {:.3} +- {:.3} {}\n".format( "Integral flux (10 GeV - 1 TeV)", d["Flux"].value, d["Unc_Flux"].value, "cm-2 s-1", ) ss += "{:<32s} : {:.3} +- {:.3} {}\n".format( "Energy flux (10 GeV - TeV)", d["Energy_Flux"].value, d["Unc_Energy_Flux"].value, "erg cm-2 s-1", ) return ss def _info_more(self): d = self.data ss = "\n*** Other info ***\n\n" fmt = "{:<32s} : {:.3f}\n" ss += fmt.format("Significance (10 GeV - 2 TeV)", d["Signif_Avg"]) ss += "{:<32s} : {:.1f}\n".format("Npred", d["Npred"]) ss += "\n{:<16s} : {:.3f} {}\n".format( "HEP Energy", d["HEP_Energy"].value, d["HEP_Energy"].unit ) ss += "{:<16s} : {:.3f}\n".format("HEP Probability", d["HEP_Prob"]) ss += "{:<16s} : {}\n".format("Bayesian Blocks", d["Variability_BayesBlocks"]) ss += "{:<16s} : {:.3f}\n".format("Redshift", d["Redshift"]) ss += "{:<16s} : {:.3} {}\n".format( "NuPeak_obs", d["NuPeak_obs"].value, d["NuPeak_obs"].unit ) return ss
[docs] def spectral_model(self): """Best fit spectral model (`~gammapy.modeling.models.SpectralModel`).""" d = self.data spec_type = self.data["SpectrumType"].strip() if spec_type == "PowerLaw": tag = "PowerLawSpectralModel" pars = { "reference": d["Pivot_Energy"], "amplitude": d["Flux_Density"], "index": d["PowerLaw_Index"], } errs = { "amplitude": d["Unc_Flux_Density"], "index": d["Unc_PowerLaw_Index"], } elif spec_type == "LogParabola": tag = "LogParabolaSpectralModel" pars = { "reference": d["Pivot_Energy"], "amplitude": d["Flux_Density"], "alpha": d["Spectral_Index"], "beta": d["beta"], } errs = { "amplitude": d["Unc_Flux_Density"], "alpha": d["Unc_Spectral_Index"], "beta": d["Unc_beta"], } else: raise ValueError(f"Invalid spec_type: {spec_type!r}") model = Model.create(tag, "spectral", **pars) for name, value in errs.items(): model.parameters[name].error = value return model
@property def flux_points_table(self): """Flux points (`~astropy.table.Table`).""" table = Table() table.meta.update(self.flux_points_meta) table["e_min"] = self._energy_edges[:-1] table["e_max"] = self._energy_edges[1:] flux = self.data["Flux_Band"] flux_err = self.data["Unc_Flux_Band"] e2dnde = self.data["nuFnu"] table["flux"] = flux table["flux_errn"] = np.abs(flux_err[:, 0]) table["flux_errp"] = flux_err[:, 1] table["e2dnde"] = e2dnde table["e2dnde_errn"] = np.abs(e2dnde * flux_err[:, 0] / flux) table["e2dnde_errp"] = e2dnde * flux_err[:, 1] / flux is_ul = np.isnan(table["flux_errn"]) table["is_ul"] = is_ul # handle upper limits table["flux_ul"] = np.nan * flux_err.unit flux_ul = compute_flux_points_ul(table["flux"], table["flux_errp"]) table["flux_ul"][is_ul] = flux_ul[is_ul] table["e2dnde_ul"] = np.nan * e2dnde.unit e2dnde_ul = compute_flux_points_ul(table["e2dnde"], table["e2dnde_errp"]) table["e2dnde_ul"][is_ul] = e2dnde_ul[is_ul] # Square root of test statistic table["sqrt_ts"] = self.data["Sqrt_TS_Band"] return table
[docs] def spatial_model(self): """Source spatial model (`~gammapy.modeling.models.SpatialModel`).""" d = self.data ra = d["RAJ2000"] dec = d["DEJ2000"] if self.is_pointlike: model = PointSpatialModel(lon_0=ra, lat_0=dec, frame="icrs") else: de = self.data_extended morph_type = de["Spatial_Function"].strip() e = (1 - (de["Model_SemiMinor"] / de["Model_SemiMajor"]) ** 2.0) ** 0.5 sigma = de["Model_SemiMajor"] phi = de["Model_PosAng"] if morph_type == "RadialDisk": r_0 = de["Model_SemiMajor"] model = DiskSpatialModel( lon_0=ra, lat_0=dec, r_0=r_0, e=e, phi=phi, frame="icrs" ) elif morph_type in ["SpatialMap"]: filename = de["Spatial_Filename"].strip() path = make_path( "$GAMMAPY_DATA/catalogs/fermi/Extended_archive_v18/Templates/" ) model = TemplateSpatialModel.read(path / filename) elif morph_type == "RadialGauss": model = GaussianSpatialModel( lon_0=ra, lat_0=dec, sigma=sigma, e=e, phi=phi, frame="icrs" ) else: raise ValueError(f"Invalid morph_type: {morph_type!r}") self._set_spatial_errors(model) return model
[docs]class SourceCatalog3FGL(SourceCatalog): """Fermi-LAT 3FGL source catalog. - https://ui.adsabs.harvard.edu/#abs/2015ApJS..218...23A - https://fermi.gsfc.nasa.gov/ssc/data/access/lat/4yr_catalog/ One source is represented by `~gammapy.catalog.SourceCatalogObject3FGL`. """ tag = "3fgl" description = "LAT 4-year point source catalog" source_object_class = SourceCatalogObject3FGL def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psc_v16.fit.gz"): filename = make_path(filename) with warnings.catch_warnings(): # ignore FITS units warnings warnings.simplefilter("ignore", u.UnitsWarning) table = Table.read(filename, hdu="LAT_Point_Source_Catalog") table_standardise_units_inplace(table) source_name_key = "Source_Name" source_name_alias = ( "Extended_Source_Name", "0FGL_Name", "1FGL_Name", "2FGL_Name", "1FHL_Name", "ASSOC_TEV", "ASSOC1", "ASSOC2", ) super().__init__( table=table, source_name_key=source_name_key, source_name_alias=source_name_alias, ) self.extended_sources_table = Table.read(filename, hdu="ExtendedSources") self.hist_table = Table.read(filename, hdu="Hist_Start")
[docs]class SourceCatalog4FGL(SourceCatalog): """Fermi-LAT 4FGL-DR2 source catalog. - https://arxiv.org/abs/1902.10045 (4FGL paper) - https://arxiv.org/abs/2005.11208 (DR2 document) - https://fermi.gsfc.nasa.gov/ssc/data/access/lat/10yr_catalog/ One source is represented by `~gammapy.catalog.SourceCatalogObject4FGL`. """ tag = "4fgl" description = "LAT 8-year point source catalog" source_object_class = SourceCatalogObject4FGL def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psc_v27.fit.gz"): filename = make_path(filename) table = Table.read(filename, hdu="LAT_Point_Source_Catalog") table_standardise_units_inplace(table) source_name_key = "Source_Name" source_name_alias = ( "Extended_Source_Name", "ASSOC_FGL", "ASSOC_FHL", "ASSOC_GAM1", "ASSOC_GAM2", "ASSOC_GAM3", "ASSOC_TEV", "ASSOC1", "ASSOC2", ) super().__init__( table=table, source_name_key=source_name_key, source_name_alias=source_name_alias, ) self.extended_sources_table = Table.read(filename, hdu="ExtendedSources") self.hist_table = Table.read(filename, hdu="Hist_Start") try: self.hist2_table = Table.read(filename, hdu="Hist2_Start") except KeyError: pass
[docs]class SourceCatalog2FHL(SourceCatalog): """Fermi-LAT 2FHL source catalog. - https://ui.adsabs.harvard.edu/abs/2016ApJS..222....5A - https://fermi.gsfc.nasa.gov/ssc/data/access/lat/2FHL/ One source is represented by `~gammapy.catalog.SourceCatalogObject2FHL`. """ tag = "2fhl" description = "LAT second high-energy source catalog" source_object_class = SourceCatalogObject2FHL def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psch_v09.fit.gz"): filename = make_path(filename) with warnings.catch_warnings(): # ignore FITS units warnings warnings.simplefilter("ignore", u.UnitsWarning) table = Table.read(filename, hdu="2FHL Source Catalog") table_standardise_units_inplace(table) source_name_key = "Source_Name" source_name_alias = ("ASSOC", "3FGL_Name", "1FHL_Name", "TeVCat_Name") super().__init__( table=table, source_name_key=source_name_key, source_name_alias=source_name_alias, ) self.extended_sources_table = Table.read(filename, hdu="Extended Sources") self.rois = Table.read(filename, hdu="ROIs")
[docs]class SourceCatalog3FHL(SourceCatalog): """Fermi-LAT 3FHL source catalog. - https://ui.adsabs.harvard.edu/abs/2017ApJS..232...18A - https://fermi.gsfc.nasa.gov/ssc/data/access/lat/3FHL/ One source is represented by `~gammapy.catalog.SourceCatalogObject3FHL`. """ tag = "3fhl" description = "LAT third high-energy source catalog" source_object_class = SourceCatalogObject3FHL def __init__(self, filename="$GAMMAPY_DATA/catalogs/fermi/gll_psch_v13.fit.gz"): filename = make_path(filename) with warnings.catch_warnings(): # ignore FITS units warnings warnings.simplefilter("ignore", u.UnitsWarning) table = Table.read(filename, hdu="LAT_Point_Source_Catalog") table_standardise_units_inplace(table) source_name_key = "Source_Name" source_name_alias = ("ASSOC1", "ASSOC2", "ASSOC_TEV", "ASSOC_GAM") super().__init__( table=table, source_name_key=source_name_key, source_name_alias=source_name_alias, ) self.extended_sources_table = Table.read(filename, hdu="ExtendedSources") self.rois = Table.read(filename, hdu="ROIs") self.energy_bounds_table = Table.read(filename, hdu="EnergyBounds")