Source code for gammapy.catalog.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Source catalog and object base classes."""
import abc
import numbers
from astropy.coordinates import SkyCoord
from astropy.utils import lazyproperty
from gammapy.utils.table import table_from_row_data, table_row_to_dict

__all__ = ["SourceCatalog", "SourceCatalogObject"]


# https://pydanny.blogspot.com/2011/11/loving-bunch-class.html
class Bunch(dict):
    def __init__(self, **kw):
        dict.__init__(self, kw)
        self.__dict__.update(kw)


[docs]class SourceCatalogObject: """Source catalog object. This class can be used directly, but it's mostly used as a base class for the other source catalog classes. The catalog data on this source is stored in the `source.data` attribute as a dict. The source catalog object is decoupled from the source catalog, it doesn't hold a reference back to it, except for a key ``_row_index`` of type ``int`` that links to the catalog table row the source information comes from. """ _source_name_key = "Source_Name" _row_index_key = "_row_index" def __init__(self, data, data_extended=None): self.data = Bunch(**data) if data_extended: self.data_extended = Bunch(**data_extended) @property def name(self): """Source name (str)""" name = self.data[self._source_name_key] return name.strip() @property def row_index(self): """Row index of source in catalog (int)""" return self.data[self._row_index_key] @property def position(self): """Source position (`~astropy.coordinates.SkyCoord`).""" table = table_from_row_data([self.data]) return _skycoord_from_table(table)[0]
[docs]class SourceCatalog(abc.ABC): """Generic source catalog. This class can be used directly, but it's mostly used as a base class for the other source catalog classes. This is a thin wrapper around `~astropy.table.Table`, which is stored in the ``catalog.table`` attribute. Parameters ---------- table : `~astropy.table.Table` Table with catalog data. source_name_key : str Column with source name information source_name_alias : tuple of str Columns with source name aliases. This will allow accessing the source row by alias names as well. """
[docs] @classmethod @abc.abstractmethod def description(cls): """Catalog description (str).""" pass
source_object_class = SourceCatalogObject """Source class (`SourceCatalogObject`).""" def __init__(self, table, source_name_key="Source_Name", source_name_alias=()): self.table = table self._source_name_key = source_name_key self._source_name_alias = source_name_alias def __str__(self): return ( f"{self.__class__.__name__}:\n" f" name: {self.name}\n" f" description: {self.description}\n" f" sources: {len(self.table)}\n" ) @lazyproperty def _name_to_index_cache(self): # Make a dict for quick lookup: source name -> row index names = dict() for idx, row in enumerate(self.table): name = row[self._source_name_key] names[name.strip()] = idx for alias_column in self._source_name_alias: for alias in row[alias_column].split(","): if not alias == "": names[alias.strip()] = idx return names
[docs] def row_index(self, name): """Look up row index of source by name. Parameters ---------- name : str Source name Returns ------- index : int Row index of source in table """ index = self._name_to_index_cache[name] row = self.table[index] # check if name lookup is correct other wise recompute _name_to_index_cache possible_names = [row[self._source_name_key]] for alias_column in self._source_name_alias: possible_names += row[alias_column].split(",") if name not in possible_names: self.__dict__.pop("_name_to_index_cache") index = self._name_to_index_cache[name] return index
[docs] def source_name(self, index): """Look up source name by row index. Parameters ---------- index : int Row index of source in table """ source_name_col = self.table[self._source_name_key] name = source_name_col[index] return name.strip()
def __getitem__(self, key): """Get source by name. Parameters ---------- key : str or int Source name or row index Returns ------- source : `SourceCatalogObject` An object representing one source """ if isinstance(key, str): index = self.row_index(key) elif isinstance(key, numbers.Integral): index = key else: raise TypeError(f"Invalid key: {key!r}, {type(key)}\n") return self._make_source_object(index) def _make_source_object(self, index): """Make one source object. Parameters ---------- index : int Row index Returns ------- source : `SourceCatalogObject` Source object """ data = table_row_to_dict(self.table[index]) data[SourceCatalogObject._row_index_key] = index if "Extended_Source_Name" in data: name_extended = data["Extended_Source_Name"].strip() elif "Source_Name" in data: name_extended = data["Source_Name"].strip() else: name_extended = None try: idx = self._lookup_extended_source_idx[name_extended] data_extended = table_row_to_dict(self.extended_sources_table[idx]) except (KeyError, AttributeError): data_extended = None source = self.source_object_class(data, data_extended) return source @lazyproperty def _lookup_extended_source_idx(self): names = [_.strip() for _ in self.extended_sources_table["Source_Name"]] idx = range(len(names)) return dict(zip(names, idx)) @property def positions(self): """Source positions (`~astropy.coordinates.SkyCoord`).""" return _skycoord_from_table(self.table)
def _skycoord_from_table(table): keys = table.colnames if {"RAJ2000", "DEJ2000"}.issubset(keys): lon, lat, frame = "RAJ2000", "DEJ2000", "icrs" elif {"RA", "DEC"}.issubset(keys): lon, lat, frame = "RA", "DEC", "icrs" elif {"ra", "dec"}.issubset(keys): lon, lat, frame = "ra", "dec", "icrs" else: raise KeyError("No column GLON / GLAT or RA / DEC or RAJ2000 / DEJ2000 found.") unit = table[lon].unit.to_string() if table[lon].unit else "deg" return SkyCoord(table[lon], table[lat], unit=unit, frame=frame)