Source code for gammapy.maps.hpxsparse

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
from astropy.io import fits
from .sparse import SparseArray
from .geom import pix_tuple_to_idx
from .hpxmap import HpxMap
from .hpx import HpxGeom

__all__ = ["HpxSparseMap"]


[docs]class HpxSparseMap(HpxMap): """HEALPix sparse map with any number of non-spatial dimensions. This class uses a sparse matrix for HEALPix pixel values. Parameters ---------- geom : `~gammapy.maps.HpxGeom` HEALPIX geometry object. data : `~numpy.ndarray` HEALPIX data array. meta : `~collections.OrderedDict` Dictionary to store meta data. unit : `~astropy.units.Unit` The map unit """ def __init__(self, geom, data=None, dtype="float32", meta=None, unit=""): if data is None: shape = tuple([np.max(geom.npix)] + [ax.nbin for ax in geom.axes]) data = SparseArray(shape[::-1], dtype=dtype) elif isinstance(data, np.ndarray): data = SparseArray.from_array(data) super().__init__(geom, data, meta, unit)
[docs] @classmethod def from_hdu(cls, hdu, hdu_bands=None): """Create from a FITS HDU. Parameters ---------- hdu : `~astropy.io.fits.BinTableHDU` The FITS HDU hdu_bands : `~astropy.io.fits.BinTableHDU` The BANDS table HDU """ hpx = HpxGeom.from_header(hdu.header, hdu_bands) shape = tuple([ax.nbin for ax in hpx.axes[::-1]]) # TODO: Should we support extracting slices? meta = cls._get_meta_from_header(hdu.header) map_out = cls(hpx, meta=meta) colnames = hdu.columns.names cnames = [] if hdu.header["INDXSCHM"] == "SPARSE": pix = hdu.data.field("PIX") vals = hdu.data.field("VALUE") if "CHANNEL" in hdu.data.columns.names: chan = hdu.data.field("CHANNEL") chan = np.unravel_index(chan, shape) idx = chan + (pix,) else: idx = (pix,) map_out.set_by_idx(idx[::-1], vals) else: for c in colnames: if c.find(hpx.conv.colstring) == 0: cnames.append(c) if len(cnames) == 1: # Use [...] to force dense array indexing map_out.data[...] = hdu.data.field(cnames[0]) else: for i, cname in enumerate(cnames): idx = np.unravel_index(i, shape) map_out.data[idx + (slice(None),)] = hdu.data.field(cname) return map_out
[docs] def get_by_pix(self, pix, interp=None): if interp is None: return self.get_by_idx(pix) else: raise NotImplementedError
[docs] def get_by_idx(self, idx): # Convert to local pixel indices idx = pix_tuple_to_idx(idx) idx = self.geom.global_to_local(idx) return self.data[idx[::-1]]
[docs] def interp_by_coord(self, coords, interp=None): raise NotImplementedError
[docs] def interp_by_pix(self, pix, interp=None): raise NotImplementedError
[docs] def fill_by_idx(self, idx, weights=None): idx = pix_tuple_to_idx(idx) if weights is None: weights = np.ones(idx[0].shape) idx = self.geom.global_to_local(idx) idx_flat = np.ravel_multi_index(idx, self.data.shape[::-1]) idx_flat, idx_inv = np.unique(idx_flat, return_inverse=True) idx = np.unravel_index(idx_flat, self.data.shape[::-1]) weights = np.bincount(idx_inv, weights=weights) self.data.set(idx[::-1], weights, fill=True)
[docs] def set_by_idx(self, idx, vals): idx = pix_tuple_to_idx(idx) idx = self.geom.global_to_local(idx) self.data[idx[::-1]] = vals
def _make_cols(self, header, conv): shape = self.data.shape cols = [] if header["INDXSCHM"] == "SPARSE": array = self.data.data.astype(float) idx = np.unravel_index(self.data.idx, shape) pix = self.geom.local_to_global(idx[::-1])[0] if len(shape) == 1: cols.append(fits.Column("PIX", "J", array=pix)) cols.append(fits.Column("VALUE", "E", array=array)) else: channel = np.ravel_multi_index(idx[:-1], shape[:-1]) cols.append(fits.Column("PIX", "J", array=pix)) cols.append(fits.Column("CHANNEL", "I", array=channel)) cols.append(fits.Column("VALUE", "E", array=array)) elif len(shape) == 1: name = conv.colname(indx=conv.firstcol) # Use [...] to instantiate a dense array array = self.data[...].astype(float) cols.append(fits.Column(name, "E", array=array)) else: # FIXME: We should be filling undefined pixels here with NaN for i, idx in enumerate(np.ndindex(shape[:-1])): name = conv.colname(indx=i + conv.firstcol) # Use [...] to instantiate a dense array array = self.data[...][idx].astype(float) cols.append(fits.Column(name, "E", array=array)) return cols
[docs] def iter_by_image(self): raise NotImplementedError
[docs] def sum_over_axes(self): raise NotImplementedError
[docs] def pad(self, pad_width): raise NotImplementedError
[docs] def crop(self, crop_width): raise NotImplementedError
[docs] def upsample(self, factor): raise NotImplementedError
[docs] def downsample(self, factor): raise NotImplementedError
[docs] def to_wcs(self, sum_bands=False, normalize=True, proj="AIT", oversample=2): raise NotImplementedError
[docs] def to_swapped(self): raise NotImplementedError
[docs] def to_ud_graded(self): raise NotImplementedError