Source code for gammapy.maps.hpx.ndmap

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
import numpy as np
import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.io import fits
from regions import PointSkyRegion
import matplotlib.pyplot as plt
from gammapy.utils.units import unit_from_fits_image_hdu
from ..coord import MapCoord
from ..geom import pix_tuple_to_idx
from ..utils import INVALID_INDEX
from .core import HpxMap
from .geom import HpxGeom
from .io import HPX_FITS_CONVENTIONS, HpxConv
from .utils import HpxToWcsMapping, get_pix_size_from_nside, get_superpixels

__all__ = ["HpxNDMap"]

log = logging.getLogger(__name__)


[docs]class HpxNDMap(HpxMap): """HEALPix map with any number of non-spatial dimensions. This class uses an N+1D numpy array to represent the sequence of HEALPix image planes. Following the convention of WCS-based maps this class uses a column-wise ordering for the data array with the spatial dimension being tied to the last index of the array. Parameters ---------- geom : `~gammapy.maps.HpxGeom` HEALPix geometry object. data : `~numpy.ndarray` HEALPix data array. If None, then an empty array will be allocated. meta : `dict` Dictionary to store metadata. unit : str or `~astropy.units.Unit` The map unit. """ def __init__(self, geom, data=None, dtype="float32", meta=None, unit=""): data_shape = geom.data_shape if data is None: data = self._make_default_data(geom, data_shape, dtype) super().__init__(geom, data, meta, unit) @staticmethod def _make_default_data(geom, shape_np, dtype): if geom.npix.size > 1: data = np.full(shape_np, np.nan, dtype=dtype) idx = geom.get_idx(local=True) data[idx[::-1]] = 0 else: data = np.zeros(shape_np, dtype=dtype) return data
[docs] @classmethod def from_wcs_tiles(cls, wcs_tiles, nest=True): """Create HEALPix map from WCS tiles. Parameters ---------- wcs_tiles : list of `WcsNDMap` WCS map tiles. nest : bool, optional Indexing scheme. If True, "NESTED" scheme. If False, "RING" scheme. Default is True. Returns ------- hpx_map : `HpxNDMap` HEALPix map. """ import healpy as hp geom_wcs = wcs_tiles[0].geom geom_hpx = HpxGeom.create( binsz=geom_wcs.pixel_scales[0], frame=geom_wcs.frame, nest=nest, axes=geom_wcs.axes, ) map_hpx = cls.from_geom(geom=geom_hpx, unit=wcs_tiles[0].unit) coords = map_hpx.geom.get_coord().skycoord nside_superpix = hp.npix2nside(len(wcs_tiles)) hpx_ref = HpxGeom(nside=nside_superpix, nest=nest, frame=geom_wcs.frame) idx = np.arange(map_hpx.geom.to_image().npix.item()) indices = get_superpixels(idx, map_hpx.geom.nside, nside_superpix, nest=nest) for wcs_tile in wcs_tiles: hpx_idx = hpx_ref.coord_to_idx(wcs_tile.geom.center_skydir)[0] hpx_idx = int(hpx_idx.item()) mask = indices == hpx_idx map_hpx.data[mask] = wcs_tile.interp_by_coord(coords[mask]) return map_hpx
[docs] def to_wcs_tiles( self, nside_tiles=4, margin="0 deg", method="nearest", oversampling_factor=1 ): """Convert HpxNDMap to a list of WCS tiles. Parameters ---------- nside_tiles : int, optional HEALPix NSIDE parameter for super pixel tiles. Default is 4. margin : Angle, optional Width margin of the WCS tile. Default is "0 deg". method : {'nearest', 'linear'} Interpolation method. Default is "nearest". oversampling_factor : int, optional Oversampling factor. Default is 1. Returns ------- wcs_tiles : list of `WcsNDMap` WCS tiles. """ wcs_tiles = [] wcs_geoms = self.geom.to_wcs_tiles(nside_tiles=nside_tiles, margin=margin) for geom in wcs_geoms: if oversampling_factor > 1: geom = geom.upsample(oversampling_factor) wcs_map = self.interp_to_geom(geom=geom, method=method) wcs_tiles.append(wcs_map) return wcs_tiles
[docs] @classmethod def from_hdu(cls, hdu, hdu_bands=None, format=None, colname=None): """Make a HpxNDMap object from a FITS HDU. Parameters ---------- hdu : `~astropy.io.fits.BinTableHDU` The FITS HDU. hdu_bands : `~astropy.io.fits.BinTableHDU`, optional The BANDS table HDU. Default is None. format : str, optional FITS convention. Default is None. If None the format is guessed. The following formats are supported: - "gadf" - "fgst-ccube" - "fgst-ltcube" - "fgst-bexpcube" - "fgst-srcmap" - "fgst-template" - "fgst-srcmap-sparse" - "galprop" - "galprop2" colname : str, optional Data column name to be used for the HEALPix map. Default is None. Returns ------- map : `HpxMap` HEALPix map. """ if format is None: format = HpxConv.identify_hpx_format(hdu.header) geom = HpxGeom.from_header(hdu.header, hdu_bands, format=format) hpx_conv = HPX_FITS_CONVENTIONS[format] shape = geom.axes.shape[::-1] # TODO: Should we support extracting slices? meta = cls._get_meta_from_header(hdu.header) unit = unit_from_fits_image_hdu(hdu.header) map_out = cls(geom, None, meta=meta, unit=unit) colnames = hdu.columns.names cnames = [] if hdu.header.get("INDXSCHM", None) == "SPARSE": pix = hdu.data.field("PIX") vals = hdu.data.field("VALUE") if "CHANNEL" in hdu.data.columns.names: chan = hdu.data.field("CHANNEL") chan = np.unravel_index(chan, shape) idx = chan + (pix,) else: idx = (pix,) map_out.set_by_idx(idx[::-1], vals) else: if colname is not None: cnames.append(colname) else: for c in colnames: if c.find(hpx_conv.colstring) == 0: cnames.append(c) nbin = len(cnames) if nbin == 1: map_out.data = hdu.data.field(cnames[0]) else: for idx, cname in enumerate(cnames): idx = np.unravel_index(idx, shape) map_out.data[idx + (slice(None),)] = hdu.data.field(cname) return map_out
[docs] def to_wcs( self, sum_bands=False, normalize=True, proj="AIT", oversample=2, width_pix=None, hpx2wcs=None, fill_nan=True, ): from gammapy.maps import WcsNDMap if sum_bands and self.geom.nside.size > 1: map_sum = self.sum_over_axes() return map_sum.to_wcs( sum_bands=False, normalize=normalize, proj=proj, oversample=oversample, width_pix=width_pix, ) # FIXME: Check whether the old mapping is still valid and reuse it if hpx2wcs is None: geom_wcs_image = self.geom.to_wcs_geom( proj=proj, oversample=oversample, width_pix=width_pix ).to_image() hpx2wcs = HpxToWcsMapping.create(self.geom, geom_wcs_image) # FIXME: Need a function to extract a valid shape from npix property if sum_bands: axes = np.arange(self.data.ndim - 1) hpx_data = np.apply_over_axes(np.sum, self.data, axes=axes) hpx_data = np.squeeze(hpx_data) wcs_shape = tuple([t.flat[0] for t in hpx2wcs.npix]) wcs_data = np.zeros(wcs_shape).T wcs = hpx2wcs.wcs.to_image() else: hpx_data = self.data wcs_shape = tuple([t.flat[0] for t in hpx2wcs.npix]) + self.geom.shape_axes wcs_data = np.zeros(wcs_shape).T wcs = hpx2wcs.wcs.to_cube(self.geom.axes) # FIXME: Should reimplement instantiating map first and fill data array hpx2wcs.fill_wcs_map_from_hpx_data(hpx_data, wcs_data, normalize, fill_nan) return WcsNDMap(wcs, wcs_data, unit=self.unit)
def _pad_spatial(self, pad_width, mode="constant", cval=0): geom = self.geom._pad_spatial(pad_width=pad_width) map_out = self._init_copy(geom=geom, data=None) map_out.coadd(self) coords = geom.get_coord(flat=True) m = self.geom.contains(coords) coords = tuple([c[~m] for c in coords]) if mode == "constant": map_out.set_by_coord(coords, cval) elif mode == "interp": raise ValueError("Method 'interp' not supported for HpxMap") else: raise ValueError(f"Unrecognized pad mode: {mode!r}") return map_out
[docs] def crop(self, crop_width): geom = self.geom.crop(crop_width) map_out = self._init_copy(geom=geom, data=None) map_out.coadd(self) return map_out
[docs] def upsample(self, factor, order=0, preserve_counts=True, axis_name=None): if axis_name: raise NotImplementedError( "HpxNDMap.upsample does currently not support upsampling of non-spatial axes." ) if order != 0: raise ValueError( "HpxNDMap.upsample currently only supports nearest upsampling" ) geom = self.geom.upsample(factor) coords = geom.get_coord() data = self.get_by_coord(coords) if preserve_counts: data /= factor**2 return self._init_copy(geom=geom, data=data)
[docs] def downsample(self, factor, preserve_counts=True, axis_name=None): if axis_name: raise NotImplementedError( "HpxNDMap does currently not support upsampling of non-spatial axes." ) geom = self.geom.downsample(factor) coords = self.geom.get_coord() vals = self.get_by_coord(coords) map_out = self._init_copy(geom=geom, data=None) map_out.fill_by_coord(coords, vals) if not preserve_counts: map_out.data /= factor**2 return map_out
[docs] def to_nside(self, nside, preserve_counts=True): """Upsample or downsample the map to a given nside. Parameters ---------- nside : int HEALPix NSIDE parameter. preserve_counts : bool, optional Preserve the integral over each bin. This should be true if the map is an integral quantity (e.g. counts) and false if the map is a differential quantity (e.g. intensity). Default is True. Returns ------- geom : `~HpxNDMap` HEALPix map with new NSIDE. """ if len(self.geom.nside) > 1: raise NotImplementedError( "to_nside() is not supported for an irregular map." ) factor = nside / self.geom.nside.item() if factor > 1: return self.upsample(factor=int(factor), preserve_counts=preserve_counts) elif factor < 1: return self.downsample( factor=int(1 / factor), preserve_counts=preserve_counts ) else: return self.copy()
[docs] def interp_by_coord(self, coords, method="linear", fill_value=None): # inherited docstring coords = MapCoord.create(coords, frame=self.geom.frame) if method == "linear": return self._interp_by_coord(coords) elif method == "nearest": return self.get_by_coord(coords) else: raise ValueError(f"Invalid interpolation method: {method!r}")
[docs] def interp_by_pix(self, pix, method=None, fill_value=None): """Interpolate map values at the given pixel coordinates.""" raise NotImplementedError
[docs] def cutout(self, position, width, *args, **kwargs): """Create a cutout around a given position. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Center position of the cutout region. width : `~astropy.coordinates.Angle` or `~astropy.units.Quantity` Diameter of the circular cutout region. Returns ------- cutout : `~gammapy.maps.HpxNDMap` Cutout map. """ geom = self.geom.cutout(position=position, width=width) if self.geom.is_allsky: idx = geom._ipix else: idx = self.geom.to_image().global_to_local((geom._ipix,)) data = self.data[..., idx] return self.__class__(geom=geom, data=data, unit=self.unit, meta=self.meta)
[docs] def stack(self, other, weights=None, nan_to_num=True): """Stack cutout into map. Parameters ---------- other : `HpxNDMap` Other map to stack. weights : `HpxNDMap`, optional Array to be used as weights. The spatial geometry must be equivalent to `other` and additional axes must be broadcastable. Default is None. nan_to_num: bool, optional Non-finite values are replaced by zero if True. Default is True. """ if self.geom == other.geom: idx = None elif self.geom.is_aligned(other.geom): if self.geom.is_allsky: idx = other.geom._ipix else: idx = self.geom.to_image().global_to_local((other.geom._ipix,))[0] else: raise ValueError( "Can only stack equivalent maps or cutout of the same map." ) data = other.quantity.to_value(self.unit) if nan_to_num: not_finite = ~np.isfinite(data) if np.any(not_finite): data = data.copy() data[not_finite] = 0 if weights is not None: if not other.geom.to_image() == weights.geom.to_image(): raise ValueError("Incompatible spatial geoms between map and weights") data = data * weights.data if idx is None: self.data += data else: self.data[..., idx] += data
[docs] def smooth(self, width, kernel="gauss"): """Smooth the map. Iterates over 2D image planes, processing one at a time. Parameters ---------- width : `~astropy.units.Quantity`, str or float Smoothing width given as quantity or float. If a float is given it is interpreted as smoothing width in pixels. If an (angular) quantity is given it is converted to pixels using `~healpy.nside2resol`. It corresponds to the standard deviation in case of a Gaussian kernel, and the radius in case of a disk kernel. kernel : {'gauss', 'disk'}, optional Kernel shape. Default is "gauss". Returns ------- image : `HpxNDMap` Smoothed image (a copy, the original object is unchanged). """ import healpy as hp if len(self.geom.nside) > 1: raise NotImplementedError("smooth is not supported for an irregular map.") nside = self.geom.nside.item() lmax = int(3 * nside - 1) # maximum l of the power spectrum ipix = self.geom._ipix if not self.geom.is_allsky: # stack into an all sky map full_sky_geom = HpxGeom.create( nside=self.geom.nside, nest=self.geom.nest, frame=self.geom.frame, axes=self.geom.axes, ) full_sky_map = HpxNDMap.from_geom(full_sky_geom) for img, idx in self.iter_by_image_data(): full_sky_map.data[idx][ipix] = img else: full_sky_map = self # The smoothing width is expected by healpy in radians if isinstance(width, (u.Quantity, str)): width = u.Quantity(width) width = width.to_value("rad") else: binsz = np.degrees(hp.nside2resol(nside)) width = width * binsz width = np.deg2rad(width) smoothed_data = np.empty(self.data.shape, dtype=float) for img, idx in full_sky_map.iter_by_image_data(): img = img.astype(float) if self.geom.nest: # reorder to ring to do the smoothing img = hp.pixelfunc.reorder(img, n2r=True) if kernel == "gauss": data = hp.sphtfunc.smoothing(img, sigma=width, pol=False, lmax=lmax) elif kernel == "disk": # create the step function in angular space theta = np.linspace(0, width) beam = np.ones(len(theta)) beam[theta > width] = 0 # convert to the spherical harmonics space window_beam = hp.sphtfunc.beam2bl(beam, theta, lmax) # normalize the window beam window_beam = window_beam / window_beam.max() data = hp.sphtfunc.smoothing( img, beam_window=window_beam, pol=False, lmax=lmax ) else: raise ValueError(f"Invalid kernel: {kernel!r}") if self.geom.nest: # reorder back to nest after the smoothing data = hp.pixelfunc.reorder(data, r2n=True) smoothed_data[idx] = data[ipix] return self._init_copy(data=smoothed_data)
[docs] def convolve(self, kernel, convolution_method="wcs-tan", **kwargs): """Convolve map with a WCS kernel. Project the map into a WCS geometry, convolve with a WCS kernel and project back into the initial HEALPix geometry. If the kernel is two-dimensional, it is applied to all image planes likewise. If the kernel is higher dimensional it must match the map in the number of dimensions and the corresponding kernel is selected for every image plane. Parameters ---------- kernel : `~gammapy.irf.PSFKernel` Convolution kernel. The pixel size must be upsampled by a factor 2 or bigger with respect to the input map to prevent artifacts in the projection. convolution_method : {"wcs-tan", ""} Convolution method. If "wcs-tan", project on WCS geometry and convolve with WCS kernel. See `~gammapy.maps.HpxNDMap.convolve_wcs`. If "", convolve map with a symmetrical WCS kernel. See `~gammapy.maps.HpxNDMap.convolve_full`. Default is "wcs-tan". **kwargs : dict Keyword arguments passed to `~gammapy.maps.WcsNDMap.convolve`. Returns ------- map : `HpxNDMap` Convolved map. """ if convolution_method == "wcs-tan": return self.convolve_wcs(kernel, **kwargs) elif convolution_method == "": return self.convolve_full(kernel) else: raise ValueError( f"Not a valid method for HPX convolution: {convolution_method}" )
[docs] def convolve_wcs(self, kernel, **kwargs): """Convolve map with a WCS kernel. Project the map into a WCS geometry, convolve with a WCS kernel and project back into the initial HEALPix geometry. If the kernel is two-dimensional, it is applied to all image planes likewise. If the kernel is higher dimensional should either match the map in the number of dimensions or the map must be an image (no non-spatial axes). In that case, the corresponding kernel is selected and applied to every image plane or to the single input image respectively. Parameters ---------- kernel : `~gammapy.irf.PSFKernel` Convolution kernel. The pixel size must be upsampled by a factor 2 or bigger with respect to the input map to prevent artifacts in the projection. **kwargs : dict Keyword arguments passed to `~gammapy.maps.WcsNDMap.convolve`. Returns ------- map : `HpxNDMap` Convolved map. """ # TODO: maybe go through `.to_wcs_tiles()` to make this work for allsky maps if self.geom.is_allsky: raise ValueError( "Convolution via WCS projection is not supported for allsky maps." ) if self.geom.width > 10 * u.deg: log.warning( "Convolution via WCS projection is not recommended for large " "maps (> 10 deg). Perhaps the method `convolve_full()` is more suited for " "this case." ) geom_kernel = kernel.psf_kernel_map.geom wcs_size = np.max(geom_kernel.to_image().pixel_scales.deg) hpx_size = get_pix_size_from_nside(self.geom.nside[0]) if wcs_size > 0.5 * hpx_size: raise ValueError( f"The kernel pixel size of {wcs_size} has to be smaller by at least" f" a factor 2 than the pixel size of the input map of {hpx_size}" ) geom_wcs = self.geom.to_wcs_geom(proj="TAN").to_image() hpx2wcs = HpxToWcsMapping.create( hpx=self.geom, wcs=geom_wcs.to_binsz(binsz=wcs_size) ) # Project to WCS and convolve wcs_map = self.to_wcs(hpx2wcs=hpx2wcs, fill_nan=False) conv_wcs_map = wcs_map.convolve(kernel=kernel, **kwargs) if self.geom.is_image and geom_kernel.ndim > 2: target_geom = self.geom.to_cube(geom_kernel.axes) else: target_geom = self.geom # and back to hpx data = np.zeros(target_geom.data_shape) data = hpx2wcs.fill_hpx_map_from_wcs_data( wcs_data=conv_wcs_map.data, hpx_data=data ) return HpxNDMap.from_geom(target_geom, data=data)
[docs] def convolve_full(self, kernel): """Convolve map with a symmetrical WCS kernel. Extract the radial profile of the kernel (assuming radial symmetry) and convolve via `~healpy.sphtfunc.smoothing`. Since no projection is applied, this is suited for full-sky and large maps. If the kernel is two-dimensional, it is applied to all image planes likewise. If the kernel is higher dimensional it must match the map in the number of dimensions and the corresponding kernel is selected for every image plane. Parameters ---------- kernel : `~gammapy.irf.PSFKernel` Convolution kernel. The pixel size must be upsampled by a factor 2 or bigger with respect to the input map to prevent artifacts in the projection. Returns ------- map : `HpxNDMap` Convolved map. """ import healpy as hp if len(self.geom.nside) > 1: raise NotImplementedError( "convolve_full() is not supported for an irregular map." ) nside = self.geom.nside.item() lmax = int(3 * nside - 1) # maximum l of the power spectrum nest = self.geom.nest allsky = self.geom.is_allsky ipix = self.geom._ipix if not allsky: # stack into an all sky map full_sky_geom = HpxGeom.create( nside=self.geom.nside, nest=self.geom.nest, frame=self.geom.frame, axes=self.geom.axes, ) full_sky_map = HpxNDMap.from_geom(full_sky_geom) for img, idx in self.iter_by_image_data(): full_sky_map.data[idx][ipix] = img else: full_sky_map = self # Get radial profile from the kernel psf_kernel = kernel.psf_kernel_map center_pix = psf_kernel.geom.center_pix[:2] center = max(center_pix) dim = np.argmax(center_pix) pixels = [0, 0] pixels[dim] = np.linspace( 0, center, int(center + 1) ) # assuming radially symmetric kernel pixels[abs(1 - dim)] = center_pix[abs(1 - dim)] * np.ones(int(center + 1)) coords = psf_kernel.geom.pix_to_coord(pixels) coordinates = SkyCoord(coords[0], coords[1], frame=psf_kernel.geom.frame) angles = coordinates.separation(psf_kernel.geom.center_skydir).rad values = psf_kernel.get_by_pix(pixels) # Do the convolution in each image plane convolved_data = np.empty(self.data.shape, dtype=float) for img, idx in full_sky_map.iter_by_image_data(): img = img.astype(float) if nest: # reorder to ring to do the convolution img = hp.pixelfunc.reorder(img, n2r=True) radial_profile = np.reshape(values[:, idx], (values.shape[0],)) window_beam = hp.sphtfunc.beam2bl( np.flip(radial_profile), np.flip(angles), lmax ) window_beam = window_beam / window_beam.max() data = hp.sphtfunc.smoothing( img, beam_window=window_beam, pol=False, lmax=lmax ) if nest: # reorder back to nest after the convolution data = hp.pixelfunc.reorder(data, r2n=True) convolved_data[idx] = data[ipix] return self._init_copy(data=convolved_data)
[docs] def get_by_idx(self, idx): # inherited docstring idx = pix_tuple_to_idx(idx) idx = self.geom.global_to_local(idx) return self.data.T[idx]
def _interp_by_coord(self, coords): """Linearly interpolate map values.""" pix, wts = self.geom.interp_weights(coords) if self.geom.is_image: return np.sum(self.data.T[tuple(pix)] * wts, axis=0) val = np.zeros(pix[0].shape[1:]) # Loop over function values at corners for i in range(2 ** len(self.geom.axes)): pix_i = [] wt = np.ones(pix[0].shape[1:])[np.newaxis, ...] for j, ax in enumerate(self.geom.axes): idx = ax.coord_to_idx(coords[ax.name]) idx = np.clip(idx, 0, len(ax.center) - 2) w = ax.center[idx + 1] - ax.center[idx] c = u.Quantity(coords[ax.name], ax.center.unit, copy=False).value if i & (1 << j): wt *= (c - ax.center[idx].value) / w.value pix_i += [idx + 1] else: wt *= 1.0 - (c - ax.center[idx].value) / w.value pix_i += [idx] if not self.geom.is_regular: pix, wts = self.geom.interp_weights(coords, idxs=pix_i) wts[pix[0] == INVALID_INDEX.int] = 0 wt[~np.isfinite(wt)] = 0 val += np.nansum(wts * wt * self.data.T[tuple(pix[:1] + pix_i)], axis=0) return val def _resample_by_idx(self, idx, weights=None, preserve_counts=False): idx = pix_tuple_to_idx(idx) msk = np.all(np.stack([t != INVALID_INDEX.int for t in idx]), axis=0) if weights is not None: weights = weights[msk] idx = [t[msk] for t in idx] idx_local = list(self.geom.global_to_local(idx)) msk = idx_local[0] >= 0 idx_local = [t[msk] for t in idx_local] if weights is not None: if isinstance(weights, u.Quantity): weights = weights.to_value(self.unit) weights = weights[msk] idx_local = np.ravel_multi_index(idx_local, self.data.T.shape) idx_local, idx_inv = np.unique(idx_local, return_inverse=True) weights = np.bincount(idx_inv, weights=weights) if not preserve_counts: weights /= np.bincount(idx_inv).astype(self.data.dtype) self.data.T.flat[idx_local] += weights
[docs] def fill_by_idx(self, idx, weights=None): return self._resample_by_idx(idx, weights=weights, preserve_counts=True)
[docs] def set_by_idx(self, idx, vals): idx = pix_tuple_to_idx(idx) idx_local = self.geom.global_to_local(idx) self.data.T[idx_local] = vals
def _make_cols(self, header, conv): shape = self.data.shape cols = [] if header["INDXSCHM"] == "SPARSE": data = self.data.copy() data[~np.isfinite(data)] = 0 nonzero = np.where(data > 0) value = data[nonzero].astype(float) pix = self.geom.local_to_global(nonzero[::-1])[0] if len(shape) == 1: cols.append(fits.Column("PIX", "J", array=pix)) cols.append(fits.Column("VALUE", "E", array=value)) else: channel = np.ravel_multi_index(nonzero[:-1], shape[:-1]) cols.append(fits.Column("PIX", "J", array=pix)) cols.append(fits.Column("CHANNEL", "I", array=channel)) cols.append(fits.Column("VALUE", "E", array=value)) elif len(shape) == 1: name = conv.colname(indx=conv.firstcol) array = self.data.astype(float) cols.append(fits.Column(name, "E", array=array)) else: for i, idx in enumerate(np.ndindex(shape[:-1])): name = conv.colname(indx=i + conv.firstcol) array = self.data[idx].astype(float) cols.append(fits.Column(name, "E", array=array)) return cols
[docs] def to_swapped(self): import healpy as hp hpx_out = self.geom.to_swapped() map_out = self._init_copy(geom=hpx_out, data=None) idx = self.geom.get_idx(flat=True) vals = self.get_by_idx(idx) if self.geom.nside.size > 1: nside = self.geom.nside[idx[1:]] else: nside = self.geom.nside if self.geom.nest: idx_new = tuple([hp.nest2ring(nside, idx[0])]) + idx[1:] else: idx_new = tuple([hp.ring2nest(nside, idx[0])]) + idx[1:] map_out.set_by_idx(idx_new, vals) return map_out
[docs] def to_region_nd_map(self, region, func=np.nansum, weights=None, method="nearest"): """Get region ND map in a given region. By default, the whole map region is considered. Parameters ---------- region: `~regions.Region` or `~astropy.coordinates.SkyCoord` Region. func : numpy.func, optional Function to reduce the data. Default is np.nansum. For boolean Map, use np.any or np.all. weights : `WcsNDMap`, optional Array to be used as weights. The geometry must be equivalent. Default is None. method : {"nearest", "linear"} How to interpolate if a position is given. Default is "neraest". Returns ------- spectrum : `~gammapy.maps.RegionNDMap` Spectrum in the given region. """ from gammapy.maps import RegionGeom, RegionNDMap if isinstance(region, SkyCoord): region = PointSkyRegion(region) if weights is not None: if not self.geom == weights.geom: raise ValueError("Incompatible spatial geoms between map and weights") geom = RegionGeom(region=region, axes=self.geom.axes) if isinstance(region, PointSkyRegion): coords = geom.get_coord() data = self.interp_by_coord(coords=coords, method=method) if weights is not None: data *= weights.interp_by_coord(coords=coords, method=method) else: cutout = self.cutout(position=geom.center_skydir, width=np.max(geom.width)) if weights is not None: weights_cutout = weights.cutout( position=geom.center_skydir, width=geom.width ) cutout.data *= weights_cutout.data mask = cutout.geom.to_image().region_mask([region]).data data = func(cutout.data[..., mask], axis=-1) return RegionNDMap(geom=geom, data=data, unit=self.unit, meta=self.meta.copy())
[docs] def plot( self, method="raster", ax=None, normalize=False, proj="AIT", oversample=2, width_pix=1000, **kwargs, ): """Quickplot method. This will generate a visualization of the map by converting to a rasterized WCS image (method='raster') or drawing polygons for each pixel (method='poly'). Parameters ---------- method : {'raster','poly'} Method for mapping HEALPix pixels to a two-dimensional image. Can be set to 'raster' (rasterization to cartesian image plane) or 'poly' (explicit polygons for each pixel). WARNING: The 'poly' method is much slower than 'raster' and only suitable for maps with less than ~10k pixels. Default is "raster". proj : string, optional Any valid WCS projection type. Default is "AIT". oversample : float, optional Oversampling factor for WCS map. This will be the approximate ratio of the width of a HPX pixel to a WCS pixel. If this parameter is None then the width will be set from ``width_pix``. Default is 2. width_pix : int, optional Width of the WCS geometry in pixels. The pixel size will be set to the number of pixels satisfying ``oversample`` or ``width_pix`` whichever is smaller. If this parameter is None then the width will be set from ``oversample``. Default is 1000. **kwargs : dict Keyword arguments passed to `~matplotlib.pyplot.imshow`. Returns ------- ax : `~astropy.visualization.wcsaxes.WCSAxes` WCS axes object. """ if method == "raster": m = self.to_wcs( sum_bands=True, normalize=normalize, proj=proj, oversample=oversample, width_pix=width_pix, ) return m.plot(ax, **kwargs) elif method == "poly": return self._plot_poly(proj=proj, ax=ax) else: raise ValueError(f"Invalid method: {method!r}")
def _plot_poly(self, proj="AIT", step=1, ax=None): """Plot the map using a collection of polygons. Parameters ---------- proj : string, optional Any valid WCS projection type. Default is "AIT". step : int, optional Set the number vertices that will be computed for each pixel in multiples of 4. Default is 1. ax : `~matplotlib.axes.Axes`, optional Matplotlib axes. Default is None. """ # FIXME: At the moment this only works for all-sky maps if the # projection is centered at (0,0) # FIXME: Figure out how to force a square aspect-ratio like imshow import healpy as hp from matplotlib.collections import PatchCollection from matplotlib.patches import Polygon wcs = self.geom.to_wcs_geom(proj=proj, oversample=1) if ax is None: fig = plt.gcf() ax = fig.add_subplot(111, projection=wcs.wcs, aspect="equal") wcs_lonlat = wcs.center_coord[:2] idx = self.geom.get_idx() vtx = hp.boundaries( self.geom.nside.item(), idx[0], nest=self.geom.nest, step=step ) theta, phi = hp.vec2ang(np.rollaxis(vtx, 2)) theta = theta.reshape((4 * step, -1)).T phi = phi.reshape((4 * step, -1)).T patches = [] data = [] def get_angle(x, t): return 180.0 - (180.0 - x + t) % 360.0 for i, (x, y) in enumerate(zip(phi, theta)): lon, lat = np.degrees(x), np.degrees(np.pi / 2.0 - y) # Add a small offset to avoid vertices wrapping to the # other size of the projection if get_angle(np.median(lon), wcs_lonlat[0].to_value("deg")) > 0: idx = wcs.coord_to_pix((lon - 1e-4, lat)) else: idx = wcs.coord_to_pix((lon + 1e-4, lat)) dist = np.max(np.abs(idx[0][0] - idx[0])) # Split pixels that wrap around the edges of the projection if dist > wcs.npix[0] / 1.5: lon, lat = np.degrees(x), np.degrees(np.pi / 2.0 - y) lon0 = lon - 1e-4 lon1 = lon + 1e-4 pix0 = wcs.coord_to_pix((lon0, lat)) pix1 = wcs.coord_to_pix((lon1, lat)) idx0 = np.argsort(pix0[0]) idx1 = np.argsort(pix1[0]) pix0 = (pix0[0][idx0][:3], pix0[1][idx0][:3]) pix1 = (pix1[0][idx1][1:], pix1[1][idx1][1:]) patches.append(Polygon(np.vstack((pix0[0], pix0[1])).T, closed=True)) patches.append(Polygon(np.vstack((pix1[0], pix1[1])).T, closed=True)) data.append(self.data[i]) data.append(self.data[i]) else: polygon = Polygon(np.vstack((idx[0], idx[1])).T, closed=True) patches.append(polygon) data.append(self.data[i]) p = PatchCollection(patches, linewidths=0, edgecolors="None") p.set_array(np.array(data)) ax.add_collection(p) ax.autoscale_view() ax.coords.grid(color="w", linestyle=":", linewidth=0.5) return ax
[docs] def plot_mask( self, method="raster", ax=None, proj="AIT", oversample=2, width_pix=1000, **kwargs, ): """Plot the mask as a shaded area. Parameters ---------- method : {'raster','poly'} Method for mapping HEALPix pixels to a two-dimensional image. Can be set to 'raster' (rasterization to cartesian image plane) or 'poly' (explicit polygons for each pixel). WARNING: The 'poly' method is much slower than 'raster' and only suitable for maps with less than ~10k pixels. Default is "raster". proj : string, optional Any valid WCS projection type. Default is "AIT". oversample : float, optional Oversampling factor for WCS map. This will be the approximate ratio of the width of a HPX pixel to a WCS pixel. If this parameter is None then the width will be set from ``width_pix``. Default is 2. width_pix : int, optional Width of the WCS geometry in pixels. The pixel size will be set to the number of pixels satisfying ``oversample`` or ``width_pix`` whichever is smaller. If this parameter is None then the width will be set from ``oversample``. Default is 1000. **kwargs : dict Keyword arguments passed to `~matplotlib.pyplot.imshow`. Returns ------- ax : `~astropy.visualization.wcsaxes.WCSAxes` WCS axis object. """ if not self.is_mask: raise ValueError( "`.plot_mask()` only supports maps containing boolean values." ) if method == "raster": m = self.to_wcs( sum_bands=True, normalize=False, proj=proj, oversample=oversample, width_pix=width_pix, ) m.data = np.nan_to_num(m.data).astype(bool) return m.plot_mask(ax=ax, **kwargs) else: raise ValueError(f"Invalid method: {method!r}")
[docs] def sample_coord(self, n_events, random_state=0): raise NotImplementedError("HpXNDMap.sample_coord is not implemented yet.")