# Licensed under a 3-clause BSD style license - see LICENSE.rst"""Source catalog and object base classes."""importabcimporthtmlimportnumbersfromcopyimportdeepcopyimportnumpyasnpfromastropy.coordinatesimportSkyCoordfromastropy.tableimportTablefromastropy.utilsimportlazypropertyfromgammapy.mapsimportTimeMapAxisfromgammapy.modeling.modelsimportModelsfromgammapy.utils.tableimporttable_row_to_dict__all__=["SourceCatalog","SourceCatalogObject"]# https://pydanny.blogspot.com/2011/11/loving-bunch-class.htmlclassBunch(dict):def__init__(self,**kw):dict.__init__(self,kw)self.__dict__.update(kw)defformat_flux_points_table(table):forcolumnintable.colnames:ifcolumn.startswith(("dnde","eflux","flux","e2dnde","ref")):table[column].format=".3e"elifcolumn.startswith(("e_min","e_max","e_ref","sqrt_ts","norm","ts","stat")):table[column].format=".3f"returntable
[docs]classSourceCatalogObject:"""Source catalog object. This class can be used directly, but it is mostly used as a base class for the other source catalog classes. The catalog data on this source is stored in the `source.data` attribute as a dict. The source catalog object is decoupled from the source catalog, it doesn't hold a reference back to it, except for a key ``_row_index`` of type ``int`` that links to the catalog table row the source information comes from. """_source_name_key="Source_Name"_row_index_key="_row_index"def__init__(self,data,data_extended=None):self.data=Bunch(**data)ifdata_extended:self.data_extended=Bunch(**data_extended)def_repr_html_(self):try:returnself.to_html()exceptAttributeError:returnf"<pre>{html.escape(str(self))}</pre>"@propertydefname(self):"""Source name as a string."""name=self.data[self._source_name_key]returnname.strip()@propertydefrow_index(self):"""Row index of source in catalog as an integer."""returnself.data[self._row_index_key]@propertydefposition(self):"""Source position as an `~astropy.coordinates.SkyCoord` object."""table=Table([self.data])return_skycoord_from_table(table)[0]
[docs]classSourceCatalog(abc.ABC):"""Generic source catalog. This class can be used directly, but it is mostly used as a base class for the other source catalog classes. This is a thin wrapper around `~astropy.table.Table`, which is stored in the ``catalog.table`` attribute. Parameters ---------- table : `~astropy.table.Table` Table with catalog data. source_name_key : str Column with source name information. source_name_alias : tuple of str Columns with source name aliases. This will allow accessing the source row by alias names as well. """
[docs]@classmethod@abc.abstractmethoddefdescription(cls):"""Catalog description as a string."""pass
@property@abc.abstractmethoddeftag(self):passsource_object_class=SourceCatalogObject"""Source class (`SourceCatalogObject`)."""def__init__(self,table,source_name_key="Source_Name",source_name_alias=()):self.table=tableself._source_name_key=source_name_keyself._source_name_alias=source_name_aliasdef__str__(self):return(f"{self.__class__.__name__}:\n"f" name: {self.tag}\n"f" description: {self.description}\n"f" sources: {len(self.table)}\n")@lazypropertydef_name_to_index_cache(self):# Make a dict for quick lookup: source name -> row indexnames={}foridx,rowinenumerate(self.table):name=row[self._source_name_key]names[name.strip()]=idxforalias_columninself._source_name_alias:foraliasinstr(row[alias_column]).split(","):ifnotalias=="":names[alias.strip()]=idxreturnnamesdef_repr_html_(self):try:returnself.to_html()exceptAttributeError:returnf"<pre>{html.escape(str(self))}</pre>"
[docs]defrow_index(self,name):"""Look up row index of source by name. Parameters ---------- name : str Source name. Returns ------- index : int Row index of source in table. """index=self._name_to_index_cache[name]row=self.table[index]# check if name lookup is correct other wise recompute _name_to_index_cachepossible_names=[row[self._source_name_key]]foralias_columninself._source_name_alias:possible_names+=str(row[alias_column]).split(",")ifnamenotinpossible_names:self.__dict__.pop("_name_to_index_cache")index=self._name_to_index_cache[name]returnindex
[docs]defsource_name(self,index):"""Look up source name by row index. Parameters ---------- index : int Row index of source in table. """source_name_col=self.table[self._source_name_key]name=source_name_col[index]returnname.strip()
def__getitem__(self,key):"""Get source by name. Parameters ---------- key : str or int Source name or row index. Returns ------- source : `SourceCatalogObject` An object representing one source. """ifisinstance(key,str):index=self.row_index(key)elifisinstance(key,numbers.Integral):index=keyelifisinstance(key,np.ndarray)andkey.dtype==bool:new=deepcopy(self)new.table=self.table[key]returnnewelse:raiseTypeError(f"Invalid key: {key!r}, {type(key)}\n")returnself._make_source_object(index)def_make_source_object(self,index):"""Make one source object. Parameters ---------- index : int Row index. Returns ------- source : `SourceCatalogObject` Source object. """data=table_row_to_dict(self.table[index])data[SourceCatalogObject._row_index_key]=indexfp_energy_edges=getattr(self,"flux_points_energy_edges",None)iffp_energy_edgesisnotNone:data["fp_energy_edges"]=fp_energy_edgeshist_table=getattr(self,"hist_table",None)hist2_table=getattr(self,"hist2_table",None)ifhist_table:try:data["time_axis"]=TimeMapAxis.from_table(hist_table,format="fermi-fgl")exceptKeyError:passifhist2_table:try:data["time_axis_2"]=TimeMapAxis.from_table(hist2_table,format="fermi-fgl")exceptKeyError:passif"Extended_Source_Name"indata:name_extended=data["Extended_Source_Name"].strip()elif"Source_Name"indata:name_extended=data["Source_Name"].strip()else:name_extended=Nonetry:idx=self._lookup_extended_source_idx[name_extended]data_extended=table_row_to_dict(self.extended_sources_table[idx])except(KeyError,AttributeError):data_extended=Nonesource=self.source_object_class(data,data_extended)returnsource@lazypropertydef_lookup_extended_source_idx(self):names=[_.strip()for_inself.extended_sources_table["Source_Name"]]idx=range(len(names))returndict(zip(names,idx))@propertydefpositions(self):"""Source positions as a `~astropy.coordinates.SkyCoord` object."""return_skycoord_from_table(self.table)
[docs]defto_models(self,**kwargs):"""Create Models object from catalog."""returnModels([_.sky_model(**kwargs)for_inself])
def_skycoord_from_table(table):keys=table.colnamesif{"RAJ2000","DEJ2000"}.issubset(keys):lon,lat,frame="RAJ2000","DEJ2000","icrs"elif{"RAJ2000","DECJ2000"}.issubset(keys):lon,lat,frame="RAJ2000","DECJ2000","fk5"elif{"RA","DEC"}.issubset(keys):lon,lat,frame="RA","DEC","icrs"elif{"ra","dec"}.issubset(keys):lon,lat,frame="ra","dec","icrs"else:raiseKeyError("No column GLON / GLAT or RA / DEC or RAJ2000 / DEJ2000 found.")unit=table[lon].unit.to_string()iftable[lon].unitelse"deg"returnSkyCoord(table[lon],table[lat],unit=unit,frame=frame)