Source code for gammapy.utils.modeling

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Model classes to generate XML.

This is prototype code.

The goal was to be able to save gamma-cat in XML format
for the CTA data challenge GPS sky model.

TODO (needs a bit of experimentation / discussion / thought and a few days of coding):

* add repr to all classes
* integrate this the existing Gammapy model classes to make analysis possible.
* don't couple this to gamma-cat. Gamma-cat should change to create model classes that support XML I/O.
* sub-class Astropy Parameter and ParameterSet classes instead of starting from scratch?
* implement spatial and spectral mode registries instead of `if-elif` set on type to make SourceLibrary extensible.
* write test and docs
* Once modeling setup OK, ask new people to add missing models
  (see Gammalib, Fermi ST, naima, Sherpa, HESS)
  (it's one of the simplest and nicest things to get started with)

For XML model format definitions, see here:

* http://cta.irap.omp.eu/ctools/user_manual/getting_started/models.html#spectral-model-components
* http://fermi.gsfc.nasa.gov/ssc/data/analysis/scitools/source_models.html
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import abc
import numpy as np
from astropy.extern import six
from astropy import units as u
from astropy.table import Table, Column, vstack
from ..extern import xmltodict
from .scripts import make_path

__all__ = [
    'Parameter',
    'ParameterList',
    'SourceLibrary',
    'SourceModel',

    'SpectralModel',
    'SpectralModelPowerLaw',
    'SpectralModelPowerLaw2',
    'SpectralModelExpCutoff',

    'SpatialModel',
    'SpatialModelPoint',
    'SpatialModelGauss',
    'SpatialModelShell',

    'UnknownModelError',
]


[docs]class UnknownModelError(ValueError): """ Error when encountering unknown model types. """
[docs]class Parameter(object): """ Class representing model parameters. Parameters ---------- name : str Name of the parameter. value : float Value of the parameter. unit : str Unit of the parameter. parmin : float Parameter value minimum. Used as minimum boundary value in a model fit. parmax : float Parameter value maximum. Used as minimum boundary value in a model fit. frozen : bool Whether the parameter is free to be varied in a model fit. """ def __init__(self, name, value, unit='', parmin=None, parmax=None, frozen=False): self.name = name if isinstance(value, u.Quantity): self.quantity = value else: self.value = value self.unit = unit self.parmin = parmin self.parmax = parmax self.frozen = frozen @property def quantity(self): retval = self.value * u.Unit(self.unit) return retval @quantity.setter def quantity(self, par): self.value = par.value self.unit = par.unit def __str__(self): ss = 'Parameter(name={name!r}, value={value!r}, unit={unit!r}, ' ss += 'min={parmin!r}, max={parmax!r}, frozen={frozen!r})' return ss.format(**self.__dict__)
[docs] def to_dict(self): return dict(name=self.name, value=float(self.value), unit=str(self.unit), frozen=self.frozen, min=self.parmin, max=self.parmax)
# TODO: I think this method is not very useful, because the same can be just # achieved with `Parameter(**data)`. Why duplicate? @classmethod
[docs] def from_dict(cls, data): return cls( name=data['name'], value=data['val'], unit=data['unit'], )
@classmethod
[docs] def from_dict_gammacat(cls, data): return cls( name=data['name'], value=float(data['value']), unit=data['unit'], )
@classmethod
[docs] def from_dict_xml(cls, data): unit = data.get('@unit', '') return cls( name=data['@name'], value=float(data['@value']), unit=unit, )
[docs] def to_xml(self): return ' <parameter name="{name}" value="{value}" unit="{unit}"/>'.format(**self.__dict__)
[docs] def to_sherpa(self, modelname='Default'): """Convert to sherpa parameter""" from sherpa.models import Parameter par = Parameter(modelname=modelname, name=self.name, val=self.value, units=self.unit, min=self.parmin, max=self.parmax, frozen=self.frozen) return par
[docs]class ParameterList(object): """List of `~gammapy.spectrum.models.Parameters` Holds covariance matrix Parameters ---------- parameters : list of `Parameter` List of parameters covariance : `~numpy.ndarray` Parameters covariance matrix. Order of values as specified by `parameters`. """ def __init__(self, parameters, covariance=None): self.parameters = parameters self.covariance = covariance def __str__(self): ss = self.__class__.__name__ for par in self.parameters: ss += '\n{}'.format(par) ss += '\n\nCovariance: \n{}'.format(self.covariance) return ss def __getitem__(self, name): """Access parameter by name""" for par in self.parameters: if name == par.name: return par raise IndexError('Parameter {} not found for : {}'.format(name, self)) @classmethod
[docs] def from_list_of_dict_gammacat(cls, data): return cls([Parameter.from_dict_gammacat(_) for _ in data])
@classmethod
[docs] def from_list_of_dict_xml(cls, data): return cls([Parameter.from_dict_xml(_) for _ in data])
[docs] def to_xml(self): xml = [_.to_xml() for _ in self.parameters] return '\n'.join(xml)
[docs] def to_dict(self): retval = dict(parameters=list(), covariance=None) for par in self.parameters: retval['parameters'].append(par.to_dict()) if self.covariance is not None: retval['covariance'] = self.covariance.tolist() return retval
[docs] def to_list_of_dict(self): result = [] for i, parameter in enumerate(self.parameters): vals = parameter.to_dict() if self.covariance is None: vals['error'] = np.nan else: vals['error'] = np.sqrt(self.covariance[i, i]) result.append(vals) return result
[docs] def to_table(self): """ Serialize parameter list into `~astropy.table.Table` """ names = ['name', 'value', 'error', 'unit', 'min', 'max', 'frozen'] formats = {'value': '.3e', 'error': '.3e',} table = Table(self.to_list_of_dict(), names=names) for name in formats: table[name].format = formats[name] return table
# TODO: this is a temporary solution until we have a better way # to handle covariance matrices via a class
[docs] def covariance_to_table(self): """ Serialize parameter covariance into `~astropy.table.Table` """ t = Table(self.covariance, names=self.names)[self.free] for name in t.colnames: t[name].format = '.3' col = Column(name='name/name', data=self.names) t.add_column(col, index=0) rows = [row for row in t if row['name/name'] in self.free] return vstack(rows)
@classmethod
[docs] def from_dict(cls, val): pars = list() for par in val['parameters']: pars.append(Parameter(name=par['name'], value=par['value'], unit=par['unit'])) try: covariance = np.array(val['covariance']) except KeyError: covariance = None return cls(parameters=pars, covariance=covariance)
@property def names(self): """List of parameter names""" return [par.name for par in self.parameters] @property def _ufloats(self): """ Return dict of ufloats with covariance """ from uncertainties import correlated_values values = [_.value for _ in self.parameters] try: # convert existing parameters to ufloats uarray = correlated_values(values, self.covariance) except np.linalg.LinAlgError: raise ValueError('Covariance matrix not set.') upars = {} for par, upar in zip(self.parameters, uarray): upars[par.name] = upar return upars @property def free(self): """ Return list of free parameters names. """ free_pars = [par.name for par in self.parameters if not par.frozen] return free_pars @property def frozen(self): """ Return list of frozen parameters names. """ frozen_pars = [par.name for par in self.parameters if par.frozen] return frozen_pars # TODO: this is a temporary solution until we have a better way # to handle covariance matrices via a class
[docs] def set_parameter_errors(self, errors): """ Set uncorrelated parameters errors. Parameters ---------- errors : dict of `~astropy.units.Quantity` Dict of parameter errors. """ values = [] for par in self.parameters: quantity = errors.get(par.name, 0 * u.Unit(par.unit)) values.append(quantity.to(par.unit).value) self.covariance = np.diag(values) ** 2
# TODO: this is a temporary solution until we have a better way # to handle covariance matrices via a class
[docs] def set_parameter_covariance(self, covariance, covar_axis): """ Set full correlated parameters errors. Parameters ---------- covariance : array-like Covariance matrix covar_axis : list List of strings defining the parameter order in covariance """ shape = (len(self.parameters), len(self.parameters)) covariance_new = np.zeros(shape) idx_lookup = dict([(par.name, idx) for idx, par in enumerate(self.parameters)]) # TODO: make use of covariance matrix symmetry for i, par in enumerate(covar_axis): i_new = idx_lookup[par] for j, par_other in enumerate(covar_axis): j_new = idx_lookup[par_other] covariance_new[i_new, j_new] = covariance[i, j] self.covariance = covariance_new
[docs]class SourceLibrary(object): def __init__(self, source_list): self.source_list = source_list @classmethod
[docs] def read(cls, filename): path = make_path(filename) xml = path.read_text() return cls.from_xml(xml)
@classmethod
[docs] def from_xml(cls, xml): full_dict = xmltodict.parse(xml) sources_dict = full_dict['source_library']['source'] return cls.from_list_of_dict(sources_dict)
@classmethod
[docs] def from_list_of_dict(cls, data): source_list = [] for source_data in data: source_model = SourceModel.from_xml_dict(source_data) source_list.append(source_model) return cls(source_list=source_list)
[docs] def to_xml(self, title='sources', header='\n'): sources_xml = [] for source in self.source_list: sources_xml.append(source.to_xml()) sources_xml = '\n'.join(sources_xml) fmt = '<?xml version="1.0" ?>\n' fmt += '{header}\n' fmt += '<source_library title="{title}">\n' fmt += '\n' fmt += '{sources_xml}\n' fmt += '</source_library>\n' return fmt.format( title=title, header=header, sources_xml=sources_xml, )
[docs]class SourceModel(object): """ TODO: having "source_type" separate, but often inferred from the spatial model is weird. -> how to improve this? """ def __init__(self, source_name, source_type, spatial_model, spectral_model): self.source_name = source_name self.source_type = source_type self.spatial_model = spatial_model self.spectral_model = spectral_model @classmethod
[docs] def from_gammacat(cls, source): source_name = source.data['common_name'] spectral_model = SpectralModel.from_gammacat(source) spatial_model = SpatialModel.from_gammacat(source) source_type = spatial_model.xml_source_type return cls( source_name=source_name, source_type=source_type, spatial_model=spatial_model, spectral_model=spectral_model, )
@classmethod
[docs] def from_xml_dict(cls, data): source_name = data['@name'] source_type = data['@type'] spectral_model = SpectralModel.from_xml_dict(data['spectrum']) if 'radialModel' in data: spatial_model = SpatialModel.from_xml_dict(data['radialModel']) elif 'spatialModel' in data: spatial_model = SpatialModel.from_xml_dict(data['spatialModel']) else: raise UnknownModelError('No spatial / radial model found for: {}'.format(data)) return cls( source_name=source_name, source_type=source_type, spatial_model=spatial_model, spectral_model=spectral_model, )
[docs] def to_xml(self): fmt = '<source name="{source_name}" type="{source_type}">\n' fmt += ' {spectrum_xml}\n' fmt += ' {spatial_xml}\n' fmt += '</source>\n' data = dict( source_name=self.source_name, source_type=self.source_type, spectrum_xml=self.spectral_model.to_xml(), spatial_xml=self.spatial_model.to_xml(), ) return fmt.format(**data)
@six.add_metaclass(abc.ABCMeta) class BaseModel(object): """ Abstract base class to avoid code duplication between `SpectralModel` and `SpatialModel`. """ @abc.abstractproperty def xml_type(self): """The XML type string""" pass @abc.abstractproperty def xml_types(self): """List of XML type strings (to support older versions of the format)""" pass def __init__(self, parameters): if isinstance(parameters, ParameterList): pass elif isinstance(parameters, list): parameters = ParameterList(parameters) else: raise ValueError('Need list of Parameter or ParameterList. ' 'Invalid input: {}'.format(parameters)) self.parameters = parameters @six.add_metaclass(abc.ABCMeta)
[docs]class SpectralModel(BaseModel): """ Spectral model abstract base class. """ @classmethod
[docs] def from_xml_dict(cls, data): model_type = data['@type'] parameters = ParameterList.from_list_of_dict_xml(data['parameter']) if model_type == SpectralModelPowerLaw.xml_type: model = SpectralModelPowerLaw(parameters=parameters) elif model_type == SpectralModelPowerLaw2.xml_type: model = SpectralModelPowerLaw2(parameters=parameters) elif model_type == SpectralModelExpCutoff.xml_type: model = SpectralModelExpCutoff(parameters=parameters) else: raise UnknownModelError('Unknown spatial model: {}'.format(model_type)) return model
@classmethod
[docs] def from_gammacat(cls, source): try: data = source.spectral_model.to_dict() except ValueError: from ..catalog.gammacat import NoDataAvailableError raise NoDataAvailableError(source) plist = ParameterList.from_list_of_dict_gammacat(data['parameters']) if data['name'] == 'PowerLaw': model = SpectralModelPowerLaw.from_plist(plist) elif data['name'] == 'PowerLaw2': model = SpectralModelPowerLaw2.from_plist(plist) elif data['name'] == 'ExponentialCutoffPowerLaw': model = SpectralModelExpCutoff.from_plist(plist) else: raise UnknownModelError('Unknown spectral model: {}'.format(data)) return model
[docs] def to_xml(self): return '<spectrum type="{}">\n{}\n </spectrum>'.format(self.xml_type, self.parameters.to_xml())
[docs]class SpectralModelPowerLaw(SpectralModel): xml_type = 'PowerLaw' xml_types = [xml_type] @classmethod
[docs] def from_plist(cls, plist): par = plist['amplitude'] prefactor = Parameter(name='Prefactor', value=par.value, unit=par.unit) par = plist['index'] index = Parameter(name='Index', value=-par.value, unit=par.unit) par = plist['reference'] scale = Parameter(name='Scale', value=par.value, unit=par.unit) parameters = [prefactor, index, scale] return cls(parameters=parameters)
[docs]class SpectralModelPowerLaw2(SpectralModel): xml_type = 'PowerLaw2' xml_types = [xml_type] @classmethod
[docs] def from_plist(cls, plist): par = plist['amplitude'] integral = Parameter(name='Integral', value=par.value, unit=par.unit) par = plist['index'] index = Parameter(name='Index', value=-par.value, unit=par.unit) par = plist['emin'] lower_limit = Parameter(name='LowerLimit', value=par.value, unit=par.unit) par = plist['emax'] upper_limit = Parameter(name='UpperLimit', value=par.value, unit=par.unit) parameters = [integral, index, lower_limit, upper_limit] return cls(parameters=parameters)
[docs]class SpectralModelExpCutoff(SpectralModel): xml_type = 'ExpCutoff' xml_types = [xml_type] @classmethod
[docs] def from_plist(cls, plist): par = plist['amplitude'] prefactor = Parameter(name='Prefactor', value=par.value, unit=par.unit) par = plist['index'] index = Parameter(name='Index', value=par.value, unit=par.unit) par = plist['lambda_'] cutoff = Parameter(name='Cutoff', value=1 / par.value, unit=par.unit) par = plist['reference'] scale = Parameter(name='Scale', value=par.value, unit=par.unit) parameters = [prefactor, index, cutoff, scale] return cls(parameters=parameters)
@six.add_metaclass(abc.ABCMeta)
[docs]class SpatialModel(BaseModel): """ Spatial model abstract base class """ @classmethod
[docs] def from_xml_dict(cls, data): model_type = data['@type'] if model_type in SpatialModelPoint.xml_types: model = SpatialModelPoint(parameters=[ ]) elif model_type in SpatialModelGauss.xml_types: model = SpatialModelGauss(parameters=[ ]) elif model_type in SpatialModelShell.xml_types: model = SpatialModelShell(parameters=[ ]) else: raise UnknownModelError('Unknown spatial model: {}'.format(model_type)) return model
@classmethod
[docs] def from_gammacat(cls, source): data = source.data if data['morph_type'] == 'point': model = SpatialModelPoint.from_gammacat(source) elif data['morph_type'] == 'gauss': model = SpatialModelGauss.from_gammacat(source) elif data['morph_type'] == 'shell': model = SpatialModelShell.from_gammacat(source) else: raise UnknownModelError('Unknown spatial model: {}'.format(source)) return model
[docs] def to_xml(self): return '<spatialModel type="{}">\n{}\n </spatialModel>'.format(self.xml_type, self.parameters.to_xml())
[docs]class SpatialModelPoint(SpatialModel): xml_source_type = 'PointSource' xml_type = 'Point' xml_types = [xml_type] @classmethod
[docs] def from_gammacat(cls, source): d = source.data glon = Parameter(name='GLON', value=d['glon'].value, unit=d['glon'].unit) glat = Parameter(name='GLAT', value=d['glat'].value, unit=d['glat'].unit) parameters = [glon, glat] return cls(parameters=parameters)
[docs]class SpatialModelGauss(SpatialModel): xml_source_type = 'ExtendedSource' xml_type = 'Gauss' xml_types = [xml_type, 'Gaussian'] @classmethod
[docs] def from_gammacat(cls, source): d = source.data glon = Parameter(name='GLON', value=d['glon'].value, unit=d['glon'].unit) glat = Parameter(name='GLAT', value=d['glat'].value, unit=d['glat'].unit) sigma = Parameter(name='Sigma', value=d['morph_sigma'].value, unit=d['morph_sigma'].unit) # TODO: fill `morph_sigma2` and `morph_pa` info parameters = [glon, glat, sigma] return cls(parameters=parameters)
[docs]class SpatialModelShell(SpatialModel): xml_source_type = 'ExtendedSource' xml_type = 'Shell' xml_types = [xml_type, 'RadialShell'] @classmethod
[docs] def from_gammacat(cls, source): d = source.data glon = Parameter(name='GLON', value=d['glon'].value, unit=d['glon'].unit) glat = Parameter(name='GLAT', value=d['glat'].value, unit=d['glat'].unit) radius = Parameter(name='Radius', value=d['morph_sigma'].value, unit=d['morph_sigma'].unit) width = Parameter(name='Width', value=0, unit='deg') parameters = [glon, glat, radius, width] return cls(parameters=parameters)