Source code for gammapy.cube.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function, unicode_literals
from collections import OrderedDict
import numpy as np
from numpy.testing import assert_allclose
from astropy.io import fits
import astropy.units as u
from astropy.units import Quantity
from astropy.table import Table
from astropy.wcs import WCS
from astropy.utils import lazyproperty
from ..utils.scripts import make_path
from ..utils.energy import EnergyBounds, Energy
from ..utils.fits import SmartHDUList, fits_header_to_meta_dict, table_to_fits_table
from ..image.core import SkyImage, MapBase
from ..spectrum import LogEnergyAxis
from ..spectrum.utils import _trapz_loglog

__all__ = [
    'SkyCube',
]


[docs]class SkyCube(MapBase): """Sky cube with dimensions lon, lat and energy. .. note:: A new set of map and cube classes is being developed in `gammapy.maps` and long-term will replace the existing `gammapy.image.SkyImage` and `gammapy.cube.SkyCube` classes. Please consider trying out `gammapy.maps` and changing your scripts to use those new classes. See :ref:`maps`. The order of the sky cube axes is defined as following: * The ``data`` array axis order is ``(energy, lat, lon)``. * The ``wcs`` object is a two dimensional celestial WCS with axis order ``(lon, lat)``. For further information, see :ref:`cube`. Parameters ---------- name : str Name of the cube. data : `~numpy.ndarray` Data array. wcs : `~astropy.wcs.WCS` WCS transformation object. energy_axis : `~gammapy.spectrum.LogEnergyAxis` Energy axis object, defining the energy transformation. meta : `~collections.OrderedDict` Dictionary to store meta data. """ def __init__(self, name=None, data=None, wcs=None, energy_axis=None, meta=None): # TODO: check validity of inputs self.name = name # TODO: In gammapy SkyCube is used sometimes with ndim = 2 for cubes # with a single energy band if not data.ndim > 1: raise ValueError('Dimension of the data must be ndim = 3, but is ' 'ndim = {}'.format(data.ndim)) self.data = data self.wcs = wcs self.meta = meta self.energy_axis = energy_axis @lazyproperty def _interpolate_data(self): """Interpolate data using `~scipy.interpolate.RegularGridInterpolator`.""" from scipy.interpolate import RegularGridInterpolator # set up log interpolation unit = self.data.unit # TODO: how to deal with zero values? log_data = np.log(self.data.value) z = np.arange(self.data.shape[0]) y = np.arange(self.data.shape[1]) x = np.arange(self.data.shape[2]) f = RegularGridInterpolator((z, y, x), log_data, fill_value=None, bounds_error=False) def interpolate(z, y, x, method='linear'): shape = z.shape coords = np.column_stack([z.flat, y.flat, x.flat]) val = f(coords, method=method) return Quantity(np.exp(val).reshape(shape), unit) return interpolate
[docs] @classmethod def read_hdu(cls, hdu_list): """Read sky cube from HDU list. Parameters ---------- object_hdu : `~astropy.io.fits.ImageHDU` Image HDU object to be read energy_table_hdu : `~astropy.io.fits.TableHDU` Table HDU object giving energies of each slice of the Image HDU object_hdu Returns ------- sky_cube : `SkyCube` Sky cube """ object_hdu = hdu_list[0] energy_table_hdu = hdu_list[1] data = object_hdu.data data = Quantity(data, '1 / (cm2 MeV s sr)') # Note: the energy axis of the FITS cube is unusable. # We only use proj for LON, LAT and call ENERGY from the table extension header = object_hdu.header wcs = WCS(header) energy = energy_table_hdu.data['Energy'] energy = Quantity(energy, 'MeV') meta = OrderedDict(header) return cls(data=data, wcs=wcs, energy=energy, meta=meta)
[docs] @classmethod def read(cls, filename, format='fermi-counts'): """Read sky cube from FITS file. Parameters ---------- filename : str File name format : {'fermi-counts', 'fermi-background', 'fermi-exposure'} Fits file format. Returns ------- sky_cube : `SkyCube` Sky cube """ filename = str(make_path(filename)) hdu_list = SmartHDUList.open(filename) hdu = hdu_list.get_hdu(hdu_type='image') header = hdu.header wcs = WCS(header).celestial meta = fits_header_to_meta_dict(header) data = hdu.data # TODO: check and give reference for fermi data units # TODO: choose format automatically if format == 'fermi-background': energy = Table.read(filename, 'ENERGIES')['Energy'] energy_axis = LogEnergyAxis(Quantity(energy, 'MeV'), mode='center') data = Quantity(data, '1 / (cm2 MeV s sr)') name = 'flux' elif format == 'fermi-counts': energy = EnergyBounds.from_ebounds(fits.open(filename)['EBOUNDS']) energy_axis = LogEnergyAxis(energy, mode='edges') data = Quantity(data, 'count') name = 'counts' elif format == 'fermi-exposure': energy = Table.read(filename, 'ENERGIES')['Energy'] energy_axis = LogEnergyAxis(Quantity(energy, 'MeV'), mode='center') data = Quantity(data, 'cm2 s') name = 'exposure' else: raise ValueError('Not a valid cube fits format') obj = cls(name=name, data=data, wcs=wcs, energy_axis=energy_axis, meta=meta) obj._header = header return obj
[docs] def fill_events(self, events, weights=None): """Fill events (modifies ``data`` attribute). Parameters ---------- events : `~gammapy.data.EventList` Event list weights : str, optional Column to use as weights (none by default) """ if weights is not None: weights = events[weights] xx, yy = self.sky_image_ref.wcs_skycoord_to_pixel(events.radec) zz = events.energy bins = self.energies(mode='edges'), self.sky_image_ref._bins_pix[0], self.sky_image_ref._bins_pix[1] data = np.histogramdd([zz, yy, xx], bins, weights=weights)[0] self.data = self.data + data
[docs] @classmethod def empty(cls, emin=0.5, emax=100, enumbins=10, eunit='TeV', mode='edges', **kwargs): """Create empty sky cube with log equal energy binning from the scratch. Parameters ---------- emin : float Minimum energy. emax : float Maximum energy. enumbins : int Number of energy bins. eunit : str Energy unit. mode : {'edges', 'center'} Whether emin and emax denote the bin centers or edges. kwargs : dict Keyword arguments passed to `~gammapy.image.SkyImage.empty` to create the spatial part of the cube. Examples -------- Create an empty sky cube:: from gammapy.cube import SkyCube cube = SkyCube.empty(nxpix=11, nypix=7, enumbins=3, mode='center', emin=1, emax=100, eunit='TeV') Returns ------- empty_cube : `SkyCube` Empty sky cube object. """ image = SkyImage.empty(**kwargs) if mode == 'edges': energy = EnergyBounds.equal_log_spacing(emin, emax, enumbins, eunit) elif mode == 'center': energy = Energy.equal_log_spacing(emin, emax, enumbins, eunit) else: raise ValueError("Not a valid mode. Choose either 'center' or 'edges'.") energy_axis = LogEnergyAxis(energy, mode=mode) data = Quantity(image.data * np.ones(enumbins).reshape((-1, 1, 1)), image.unit) return cls(data=data, wcs=image.wcs, energy_axis=energy_axis)
[docs] @classmethod def empty_like(cls, reference, energies=None, unit='', fill=0): """Create an empty sky cube with a given WCS and energy specification. Parameters ---------- reference : `~gammapy.cube.SkyCube` or `~gammapy.image.SkyImage` Reference sky cube or image. energies : `~gammapy.utils.energy.Energy` or `~gammapy.utils.energy.EnergyBounds` (optional) Reference energies, mandatory when a `~gammapy.image.SkyImage` is passed. unit : str String specifying the data units. fill : float Value to fill the data array with. Examples -------- Create an empty sky cube from an image and energy center specification:: from astropy import units as u from gammapy.image import SkyImage from gammapy.cube import SkyCube from gammapy.utils.energy import Energy, EnergyBounds # define reference image image = SkyImage.empty(nxpix=11, nypix=7) # define energy binning centers energies = Energy.equal_log_spacing(1 * u.TeV, 100 * u.TeV, 3) cube = SkyCube.empty_like(reference=image, energies=energies) # define energy binning bounds energies = EnergyBounds.equal_log_spacing(1 * u.TeV, 100 * u.TeV, 3) cube = SkyCube.empty_like(reference=image, energies=energies) Returns ------- empty_cube : `SkyCube` Empty sky cube object. """ wcs = reference.wcs.copy() if isinstance(reference, SkyImage): if type(energies) == Energy: mode = 'center' enumbins = len(energies) elif type(energies) == EnergyBounds: mode = 'edges' enumbins = len(energies) - 1 else: raise ValueError("'energies' must be instance of Energy or EnergyBounds, " "but {} was given.".format(type(energies))) energy_axis = LogEnergyAxis(energies, mode=mode) data = np.ones(reference.data.shape) data = data * np.ones(enumbins).reshape((-1, 1, 1)) elif isinstance(reference, SkyCube): energy_axis = reference.energy_axis data = np.ones(reference.data.shape) else: raise ValueError("'reference' must be instance of SkyImage or SkyCube") return cls(data=data * fill * u.Unit(unit), wcs=wcs, energy_axis=energy_axis)
[docs] def energies(self, mode='center'): """Energy coordinate vector. Parameters ---------- mode : {'center', 'edges'} Return coordinate values at the pixels edges or pixel centers. Returns ------- coordinates : `~astropy.units.Quantity` Energy """ if mode == 'center': z = np.arange(self.data.shape[0]) elif mode == 'edges': z = np.arange(self.data.shape[0] + 1) - 0.5 else: raise ValueError('Invalid mode: {}'.format(mode)) return self.energy_axis.wcs_pix2world(z)
@property def energy_width(self): """Energy bin width vector (`~astropy.units.Quantity`)""" ebounds = self.energies(mode='edges') return ebounds[1:] - ebounds[:-1] @property def bin_size(self): """Sky cube element bin size (`~astropy.units.Quantity`) This is a convenience method which computes this:: cube.energy_width * cube.sky_image_ref.solid_angle() Units could be "TeV" (or whatever ``energy_width`` returns) times "sr" """ solid_angle = self.sky_image_ref.solid_angle() de = self.energy_width return solid_angle * de[:, np.newaxis, np.newaxis]
[docs] def cutout(self, position, size): """Cut out rectangular piece of a cube. See `~gammapy.image.SkyImage.cutout()` for details. """ out = [] for energy in self.energies(): image = self.sky_image(energy) cutout = image.cutout(position=position, size=size) out.append(cutout.data) data = Quantity(np.stack(out, axis=0), self.data.unit) wcs = cutout.wcs.copy() return self.__class__(name=self.name, data=data, wcs=wcs, meta=self.meta, energy_axis=self.energy_axis)
[docs] def wcs_skycoord_to_pixel(self, position, energy): """Convert world to pixel coordinates. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Position on the sky. energy : `~astropy.units.Quantity` Energy Returns ------- (x, y, z) : tuple Tuple of x, y, z coordinates. """ x, y = self.sky_image_ref.wcs_skycoord_to_pixel(position) z = self.energy_axis.wcs_world2pix(energy) # TODO: check order, so that it corresponds to data axis order return x, y, z
[docs] def wcs_pixel_to_skycoord(self, x, y, z): """Convert pixel to world coordinates. Parameters ---------- x : `~numpy.ndarry` x coordinate array y : `~numpy.ndarry` y coordinate array z : `~numpy.ndarry` z coordinate array Returns ------- (position, energy) : tuple Tuple of (`~astropy.coordinates.SkyCoord`, `~astropy.units.Quantity`). """ position = self.sky_image_ref.wcs_pixel_to_skycoord(x, y) energy = self.energy_axis.wcs_pix2world(z) return position, energy
[docs] def to_sherpa_data3d(self, dstype='Data3D'): """Convert sky cube to sherpa `Data3D` or `Data3DInt` object. Parameters ---------- dstype : {'Data3D', 'Data3DInt'} Sherpa data type. """ from .sherpa_ import Data3D, Data3DInt energies = self.energies(mode='edges').to("TeV").value elo = energies[:-1] ehi = energies[1:] n_ebins = len(elo) if dstype == 'Data3DInt': coordinates = self.sky_image_ref.coordinates(mode="edges") ra = coordinates.data.lon.degree dec = coordinates.data.lat.degree ra_cube_hi = np.tile(ra[0:-1, 0:-1], (n_ebins, 1, 1)) ra_cube_lo = np.tile(ra[0:-1, 1:], (n_ebins, 1, 1)) dec_cube_hi = np.tile(dec[1:, 0:-1], (n_ebins, 1, 1)) dec_cube_lo = np.tile(dec[0:-1, 0:-1], (n_ebins, 1, 1)) elo_cube = elo.reshape(n_ebins, 1, 1) * np.ones_like(ra[0:-1, 0:-1]) * u.TeV ehi_cube = ehi.reshape(n_ebins, 1, 1) * np.ones_like(ra[0:-1, 0:-1]) * u.TeV return Data3DInt('', elo_cube.ravel(), ra_cube_lo.ravel(), dec_cube_lo.ravel(), ehi_cube.ravel(), ra_cube_hi.ravel(), dec_cube_hi.ravel(), self.data.value.ravel(), self.data.value.ravel().shape) elif dstype == 'Data3D': coordinates = self.sky_image_ref.coordinates() ra = coordinates.data.lon.degree dec = coordinates.data.lat.degree ra_cube = np.tile(ra, (n_ebins, 1, 1)) dec_cube = np.tile(dec, (n_ebins, 1, 1)) elo_cube = elo.reshape(n_ebins, 1, 1) * np.ones_like(ra) * u.TeV ehi_cube = ehi.reshape(n_ebins, 1, 1) * np.ones_like(ra) * u.TeV return Data3D('', elo_cube.ravel(), ehi_cube.ravel(), ra_cube.ravel(), dec_cube.ravel(), self.data.value.ravel(), self.data.value.ravel().shape) else: raise ValueError('Invalid sherpa data type.')
[docs] def sky_image(self, energy, interpolation=None): """Slice a 2-dim `~gammapy.image.SkyImage` from the cube at a given energy. Parameters ---------- energy : `~astropy.units.Quantity` Energy value interpolation : {None, 'linear', 'nearest'} Interpolate data values between energies. None corresponds to 'nearest', but might have advantages in performance, because no interpolator is set up. Returns ------- image : `~gammapy.image.SkyImage` 2-dim sky image """ # TODO: should we pass something in SkyImage (we speak about meta)? z = self.energy_axis.wcs_world2pix(energy) if interpolation: y = np.arange(self.data.shape[1]) x = np.arange(self.data.shape[2]) z, y, x = np.meshgrid(z, y, x, indexing='ij') data = self._interpolate_data(z, y, x)[0] else: data = self.data[int(np.rint(z))].copy() wcs = self.wcs.deepcopy() if self.wcs else None return SkyImage(name=self.name, data=data, wcs=wcs)
[docs] def sky_image_idx(self, idx): """Slice a 2-dim `~gammapy.image.SkyImage` from the cube at a given index. Parameters ---------- idx : int Index of the sky image. Returns ------- image : `~gammapy.image.SkyImage` 2-dim sky image """ data = self.data[idx].copy() wcs = self.wcs.deepcopy() if self.wcs else None return SkyImage(name=self.name, data=data, wcs=wcs)
@lazyproperty def sky_image_ref(self): """Empty reference `~gammapy.image.SkyImage`. Examples -------- Can be used to access the spatial information of the cube: >>> from gammapy.cube import SkyCube >>> cube = SkyCube.empty() >>> coords = cube.sky_image_ref.coordinates() >>> solid_angle = cube.sky_image_ref.solid_angle() """ wcs = self.wcs.celestial data = np.zeros_like(self.data[0]) return SkyImage(name=self.name, data=data, wcs=wcs)
[docs] def lookup(self, position, energy, interpolation=None): """Lookup value in the cube at given sky position and energy. Parameters ---------- position : `~astropy.coordinates.SkyCoord` Position on the sky. energy : `~astropy.units.Quantity` Energy interpolation : {None, 'linear', 'nearest'} Interpolate data values between energies. Returns ------- value : `~astropy.units.Quantity` Value at the given sky position and energy. """ # TODO: add interpolation option using NDDataArray x, y, z = self.wcs_skycoord_to_pixel(position, energy) if interpolation: vals = self._interpolate_data(z, y, x) else: vals = self.data[np.rint(z).astype('int'), np.rint(y).astype('int'), np.rint(x).astype('int')] return vals
[docs] def show(self, viewer='mpl', ds9options=None, **kwargs): """Show sky cube in image viewer. Parameters ---------- viewer : {'mpl', 'ds9'} Which image viewer to use. Option 'ds9' requires ds9 to be installed. ds9options : list, optional List of options passed to ds9. E.g. ['-cmap', 'heat', '-scale', 'log']. Any valid ds9 command line option can be passed. See http://ds9.si.edu/doc/ref/command.html for details. **kwargs : dict Keyword arguments passed to `matplotlib.pyplot.imshow`. """ from ipywidgets import interact if viewer == 'mpl': max_ = self.data.shape[0] - 1 def show_image(idx): energy = self.energy_axis.wcs_pix2world(idx) image = self.sky_image(energy) image.data = image.data.value try: norm = kwargs['norm'] norm.vmax = np.nanmax(image.data) except KeyError: pass image.show(**kwargs) return interact(show_image, idx=(0, max_, 1)) elif viewer == 'ds9': raise NotImplementedError else: raise ValueError('Invalid viewer: {}'.format(viewer))
[docs] def plot_rgb(self, ax=None, fig=None, **kwargs): """Plot sky cube as RGB image. Parameters ---------- ax : `~astropy.visualization.wcsaxes.WCSAxes`, optional WCS axis object to plot on. fig : `~matplotlib.figure.Figure`, optional Figure Returns ------- ax : `~astropy.visualization.wcsaxes.WCSAxes` WCS axis object """ import matplotlib.pyplot as plt if fig is None: fig = plt.gcf() if ax is None: ax = fig.add_subplot(1, 1, 1, projection=self.wcs) kwargs['origin'] = kwargs.get('origin', 'lower') kwargs['interpolation'] = kwargs.get('interpolation', 'None') ne = self.data.shape[0] if ne != 3: raise ValueError("Energy axes must be length 3, but is {}".format(ne)) data_rgb = [] for idx in range(3): data_rgb.append(self.data[idx, :, :]) ax.imshow(np.dstack(data_rgb), **kwargs) try: ax.coords['glon'].set_axislabel('Galactic Longitude') ax.coords['glat'].set_axislabel('Galactic Latitude') except KeyError: ax.coords['ra'].set_axislabel('Right Ascension') ax.coords['dec'].set_axislabel('Declination') # without this the axis limits are changed when calling scatter ax.autoscale(enable=False) return ax
[docs] def sky_image_integral(self, emin, emax, nbins=10, per_decade=False, interpolation='linear'): """Integrate cube along the energy axes using the log-log trapezoidal rule. Parameters ---------- emin : `~astropy.units.Quantity` Integration range minimum. emax : `~astropy.units.Quantity` Integration range maximum. nbins : int, optional Number of grid points used for the integration. per_decade : bool Whether nbins is per decade. interpolation : {None, 'linear', 'nearest'} Interpolate data values between energies. Returns ------- image : `~gammapy.image.SkyImage` Integral image. """ y, x = np.indices(self.data.shape[1:]) if interpolation: energy = Energy.equal_log_spacing(emin, emax, nbins, per_decade=per_decade) z = self.energy_axis.wcs_world2pix(energy).reshape(-1, 1, 1) y = np.arange(self.data.shape[1]) x = np.arange(self.data.shape[2]) z, y, x = np.meshgrid(z, y, x, indexing='ij') data = self._interpolate_data(z, y, x) else: energy_slice = self._get_energy_slice_for_energy_range(emin, emax) energy = self.energies()[energy_slice] data = self.data[energy_slice] integral = _trapz_loglog(data, energy, axis=0) name = 'integrated {}'.format(self.name) return SkyImage(name=name, data=integral, wcs=self.wcs.celestial)
# TODO: is this the rounding we want? document what it does! def _get_energy_slice_for_energy_range(self, emin, emax): idx_min = np.rint(self.energy_axis.wcs_world2pix(emin)).astype('int') idx_max = np.rint(self.energy_axis.wcs_world2pix(emax)).astype('int') return slice(idx_min, idx_max)
[docs] def sky_image_sum(self, emin, emax): """Sum cube along the energy axis. Similar to the ``sky_image_integral`` method, but not doing interpolation / integration. Just selects a subset of energy bins and sums those. This is useful for counts. """ sky_image = self.sky_image_ref() energy_slice = self._get_energy_slice_for_energy_range(emin, emax) sky_image.data = self.data[energy_slice].sum(axis=1) return sky_image
[docs] def reproject(self, reference, mode='interp', *args, **kwargs): """Reproject spatial dimensions onto a reference image. Parameters ---------- reference : `~astropy.io.fits.Header`, `~gammapy.image.SkyImage` or `SkyCube` Reference wcs specification to reproject the data on. mode : {'interp', 'exact'} Interpolation mode. *args : list Arguments passed to `~reproject.reproject_interp` or `~reproject.reproject_exact`. **kwargs : dict Keyword arguments passed to `~reproject.reproject_interp` or `~reproject.reproject_exact`. Returns ------- reprojected_cube : `SkyCube` Cube spatially reprojected to the reference. """ if isinstance(reference, SkyCube): reference = reference.sky_image_ref out = [] for energy in self.energies(): image = self.sky_image(energy) image_out = image.reproject(reference, mode=mode, *args, **kwargs) out.append(image_out.data) data = Quantity(np.stack(out, axis=0), self.data.unit) wcs = image_out.wcs.copy() return self.__class__(name=self.name, data=data, wcs=wcs, meta=self.meta, energy_axis=self.energy_axis)
[docs] def convolve(self, kernels, **kwargs): """Convolve cube with a given set of kernels. Parameters ---------- kernels : list or `~numpy.ndarray` List of 2D convolution kernels or 3D array. The energy axis must correspond to array axis=0. Returns ------- convolved : `SkyCube` Convolved cube. """ data = [] if not len(kernels) == self.data.shape[0]: raise ValueError('Number of kernels must match size of energy axis' ' of the cube.') for idx, kernel in enumerate(kernels): image = self.sky_image_idx(idx) data.append(image.convolve(kernel, **kwargs).data) convolved = u.Quantity(data) wcs = self.wcs.deepcopy() if self.wcs else None return self.__class__(name=self.name, data=convolved, wcs=wcs, energy_axis=self.energy_axis)
[docs] def to_fits(self, format): """Write to FITS HDU list. Parameters ---------- format : {'fermi-counts', 'fermi-background', 'fermi-exposure'} Fits file format. Returns ------- hdu_list : `~astropy.io.fits.HDUList` * hdu_list[0] : `~astropy.io.fits.PrimaryHDU` Image array of data * hdu_list[1] : `~astropy.io.fits.BinTableHDU` Table of energies """ image_hdu = fits.PrimaryHDU(self.data.value, self.wcs.to_header()) image_hdu.header['SPECUNIT'] = '{0.unit:FITS}'.format(self.data) if format == 'fermi-counts': energies = self.energies(mode='edges') # for BinTableHDU's the data must be added via a Table object energy_table = Table() energy_table['E_MIN'] = energies[:-1] energy_table['E_MAX'] = energies[1:] energy_table.meta['name'] = 'EBOUNDS' energy_hdu = table_to_fits_table(energy_table) elif format in ['fermi-exposure', 'fermi-background']: # for BinTableHDU's the data must be added via a Table object energy_table = Table() energy_table['Energy'] = self.energies() energy_table.meta['name'] = 'ENERGIES' energy_hdu = table_to_fits_table(energy_table) else: raise ValueError('Not a valid cube fits format') hdu_list = fits.HDUList([image_hdu, energy_hdu]) return hdu_list
[docs] def to_images(self): """Convert to `~gammapy.cube.SkyCubeImages`.""" from .images import SkyCubeImages energies = self.energies(mode='center') images = [self.sky_image(energy) for energy in energies] return SkyCubeImages(self.name, images, self.wcs, energies)
[docs] def spectrum(self, region): """Extract spectrum in a given sky region. Parameters ---------- region : `~regions.SkyRegion` Sky region to extract the spectrum from. Returns ------- spectrum : `~astropy.table.Table` Summed spectrum of pixels in the mask. """ spectrum = Table() # store energy binning energies = self.energies('edges') e_ref = self.energies('center') spectrum['e_min'] = energies[:-1] spectrum['e_max'] = energies[1:] spectrum['e_ref'] = e_ref # mask region and sum mask = self.region_mask(region) value = (self.data * mask.data).sum(-1).sum(-1) spectrum['value'] = value return spectrum
[docs] def region_mask(self, region): """Create a boolean cube mask for a region. The mask is: - ``True`` for pixels inside the region - ``False`` for pixels outside the region Parameters ---------- region : `~regions.PixelRegion` or `~regions.SkyRegion` Region in pixel or sky coordinates. Returns ------- mask : `SkyCube` A boolean sky cube mask. """ mask = self.sky_image_ref.region_mask(region) data = mask.data * np.ones(self.data.shape, dtype='bool') * u.Unit('') wcs = self.wcs.deepcopy() if self.wcs else None return self.__class__(name=self.name, data=data.astype('bool'), wcs=wcs, energy_axis=self.energy_axis)
[docs] def write(self, filename, format='fermi-counts', **kwargs): """Write to FITS file. Parameters ---------- filename : str Filename format : {'fermi-counts', 'fermi-background', 'fermi-exposure'} Fits file format. """ filename = str(make_path(filename)) self.to_fits(format).writeto(filename, **kwargs)
def __str__(self): ss = "Sky cube {} with shape={}".format(self.name, self.data.shape) if self.data.unit is u.dimensionless_unscaled: ss += ":\n" else: ss += " and unit={}:\n".format(self.data.unit) ss += " n_lon: {:5d} type_lon: {:15s} unit_lon: {}\n".format( self.data.shape[2], self.wcs.wcs.ctype[0], self.wcs.wcs.cunit[0]) ss += " n_lat: {:5d} type_lat: {:15s} unit_lat: {}\n".format( self.data.shape[1], self.wcs.wcs.ctype[1], self.wcs.wcs.cunit[1]) ss += " n_energy: {:5d} unit_energy: {}\n".format( self.data.shape[0], self.energy_axis._eunit) return ss
[docs] def info(self): """Print summary info about the cube.""" print(repr(self))
[docs] @staticmethod def assert_allclose(cube1, cube2): from ..utils.testing import assert_wcs_allclose assert cube1.name == cube2.name assert_allclose(cube1.data, cube2.data) # TODO: add check_unit option, just like SkyImage has it. assert_allclose(cube1.energies(), cube2.energies()) assert_wcs_allclose(cube1.wcs, cube2.wcs)
[docs] def to_wcs_nd_map(self, energy_axis_mode='center'): """Convert to a `gammapy.maps.WcsNDMap`. There is no copy of the ``data`` or ``wcs`` object, this conversion is cheap. This is meant to help migrate code using `SkyCube` over to the new maps classes. """ from gammapy.maps import WcsNDMap, WcsGeom, MapAxis if energy_axis_mode == 'center': energy = self.energies(mode='center') energy_axis = MapAxis.from_nodes(energy.value, unit=energy.unit) elif energy_axis_mode == 'edges': energy = self.energies(mode='edges') energy_axis = MapAxis.from_edges(energy.value, unit=energy.unit) else: raise ValueError('Invalid energy_axis_mode: {}'.format(energy_axis_mode)) # Axis order in SkyCube: energy, lat, lon npix = (self.data.shape[2], self.data.shape[1]) geom = WcsGeom(wcs=self.wcs, npix=npix, axes=[energy_axis]) # TODO: change maps and SkyCube to have a unit attribute # For now, SkyCube is a mix of numpy array and quantity in `data` # and we just strip the unit here data = np.asarray(self.data) # unit = getattr(self.data, 'unit', None) return WcsNDMap(geom=geom, data=data)
[docs] @classmethod def from_wcs_nd_map(cls, wcs_map_nd): """Create from a `gammapy.maps.WcsNDMap`. There is no copy of the ``data`` or ``wcs`` object, this conversion is cheap. This is meant to help migrate code using `SkyCube` over to the new maps classes. """ geom_axis = wcs_map_nd.geom.axes[0] if geom_axis.node_type == 'center': energy = geom_axis.center * geom_axis.unit energy_axis = LogEnergyAxis(energy, mode='center') elif geom_axis.node_type == 'edges': energy = geom_axis.edges * geom_axis.unit energy_axis = LogEnergyAxis(energy, mode='edges') else: raise ValueError('Not supported: node_type: {}'.format(geom_axis.node_type)) data = wcs_map_nd.data # TODO: copy unit once it's added to # if wcs_map_nd.unit is not None: # data = data * wcs_map_nd.unit return cls( data=data, wcs=wcs_map_nd.geom.wcs, energy_axis=energy_axis, )