Source code for gammapy.maps.sparse

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
from ._sparse import find_in_array, merge_sparse_arrays

__all__ = ["SparseArray"]


def slices_to_idxs(slices, shape, ndim):
    if slices == Ellipsis:
        slices = tuple([slice(None)] * ndim)
    elif not isinstance(slices, tuple):
        slices = tuple([slices])

    if len(slices) < ndim:
        slices = tuple(list(slices) + [slice(None)] * (ndim - len(slices)))

    # nslice = min(1, sum([not isinstance(s, slice) for s in slices]))
    # nslice += sum([isinstance(s, slice) for s in slices])
    nslice = len(slices)
    idim = None
    idx = []
    for i, s in enumerate(slices):

        si = [None] * nslice
        if isinstance(s, slice):
            si[i] = slice(None)
            start = 0 if s.start is None else s.start
            stop = shape[i] if s.stop is None else s.stop

            idx += [np.arange(start, stop, 1)[tuple(si)]]
        else:
            if idim is None:
                idim = i
            si[idim] = slice(None)
            idx += [np.array(s, ndmin=1)[tuple(si)]]

    return idx


[docs]class SparseArray: """Sparse N-dimensional array object. This class implements a data structure for sparse n-dimensional arrays such that only non-zero data values are allocated in memory. Supports numpy conventions for indexing and slicing logic. Parameters ---------- shape : tuple of ints Shape of array. idx : `~numpy.ndarray`, optional Flattened index vector that initializes the array. If none then an empty array will be created. data : `~numpy.ndarray`, optional Flattened data vector that initializes the array. If none then an empty array will be created. dtype : data-type, optional Type of data vector. fill_value : scalar, optional Value assigned to array elements that are not allocated in memory. Examples -------- A SparseArray is created in the same way as `~numpy.ndarray` by passing the array shape to the constructor with an optional argument for the array type: >>> import numpy as np >>> from gammapy.maps import SparseArray >>> v = SparseArray((10,20), dtype=float) Alternatively you can create a new SparseArray from an `~numpy.ndarray` with `~SparseArray.from_array`: >>> x = np.ones((10,20)) >>> v = SparseArray.from_array(x) SparseArray follows numpy indexing and slicing conventions for setting/getting array elements. The primary difference with respect to the behavior of `~numpy.ndarray` is that indexing always returns a copy rather than a view. >>> v[0,0] = 1.0 >>> print(v[0,0]) 1.0 >>> v[:,0] = 1.0 >>> print(v[:,0]) [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] """ def __init__(self, shape, idx=None, data=None, dtype=float, fill_value=0.0): self._shape = tuple(shape) self._fill_value = fill_value if idx is not None: self._idx = idx self._data = data else: self._idx = np.zeros(0, dtype=np.int64) self._data = np.zeros(0, dtype=dtype) def __getitem__(self, slices): idx = slices_to_idxs(slices, self.shape, self.ndim) idx = np.broadcast_arrays(*idx) return self.get(idx) def __setitem__(self, slices, vals): idx = slices_to_idxs(slices, self.shape, self.ndim) idx = np.broadcast_arrays(*idx) return self.set(idx, vals) @property def size(self): """Return current number of elements.""" return len(self._data) @property def data(self): """Return the sparsified data array.""" return self._data @property def dtype(self): """Return the type of the data array member.""" return self._data.dtype @property def idx(self): """Return flattened index vector.""" return self._idx @property def shape(self): """Array shape.""" return self._shape @property def ndim(self): """Array number of dimensions (int).""" return len(self._shape)
[docs] @classmethod def from_array(cls, data, min_value=0.0): """Create a `~SparseArray` from a numpy array. Parameters ---------- data : `numpy.ndarray` Input data array. min_value : float Threshold for sparsifying the data vector. Returns ------- out : `~SparseArray` Output sparse array. """ shape = data.shape idx = np.where(data > min_value) idx = np.ravel_multi_index(idx, shape) data = data[data > min_value] return cls(shape, idx, data)
def _to_flat_index(self, idx_in): """Convert index tuple to a flattened index.""" idx_in = tuple([np.array(z, ndmin=1, copy=False) for z in idx_in]) msk = np.all(np.stack([t < n for t, n in zip(idx_in, self.shape)]), axis=0) idx = np.ravel_multi_index( tuple([t[msk] for t in idx_in]), self.shape, mode="wrap" ) return idx, msk
[docs] def set(self, idx_in, vals, fill=False): """Set array values at indices ``idx_in``.""" o = np.broadcast_arrays(vals, *idx_in) vals = np.ravel(o[0]) # TODO: Determine whether new vs. existing indices are being # addressed, in the latter case we only need to update data # array vals = np.array(vals, ndmin=1) idx_flat_in, msk_in = self._to_flat_index(idx_in) vals = np.asanyarray(vals, dtype=self.data.dtype) idx, data = merge_sparse_arrays( idx_flat_in, vals, self.idx, self.data, fill=fill ) # Remove elements equal to fill value msk = data != self._fill_value idx = idx[msk] data = data[msk] self._idx = idx self._data = data
# idx, msk = find_in_array(idx_flat_in, self.idx) # self._data[idx[msk]] = vals[msk]
[docs] def get(self, idx_in): """Get array values at indices ``idx_in``.""" shape_out = idx_in[0].shape idx_flat_in, msk_in = self._to_flat_index(idx_in) idx, msk = find_in_array(idx_flat_in, self.idx) val_out = np.full(shape_out, self._fill_value) val_out.flat[np.flatnonzero(msk_in)[msk]] = self._data[idx[msk]] return np.squeeze(val_out)
[docs] def sum(self, axis=None, dtype=None, out=None, keepdims=False, **unused_kwargs): # FIXME: Figure out how to correctly support np.apply_over_axes if axis is None: return np.sum(self._data) else: shape = list(self.shape) if keepdims: shape[axis] = 1 else: del shape[axis] out = SparseArray(shape) return out