
# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Utilities for hierarchical/agglomerative clustering."""
import numpy as np
import scipy.cluster.hierarchy as sch

__all__ = ["standard_scaler", "hierarchical_clustering"]


def standard_scaler(features):
    r"""Compute standardized features by removing the mean and scaling to unit variance.

    Calculated through:

    .. math::
        f_\text{scaled} = \frac{f-\text{mean}(f)}{\text{std}(f)}

    Parameters
    ----------
    features : `~astropy.table.Table`
        Table containing the features.

    Returns
    -------
    scaled_features : `~astropy.table.Table`
        Table containing the scaled features (dimensionless).

    Examples
    --------
    Compute standardized features of a set of observations based on their IRF quantities::

        >>> from gammapy.data.utils import get_irfs_features
        >>> from gammapy.data import DataStore
        >>> from gammapy.utils.cluster import standard_scaler
        >>> from astropy.coordinates import SkyCoord
        >>> import astropy.units as u

        >>> data_store = DataStore.from_dir("$GAMMAPY_DATA/hess-dl3-dr1/")
        >>> obs_ids = data_store.obs_table["OBS_ID"][:3]
        >>> obs = data_store.get_observations(obs_ids)

        >>> position = SkyCoord(329.716 * u.deg, -30.225 * u.deg, frame="icrs")
        >>> names = ["edisp-res", "psf-radius"]
        >>> features_irfs = get_irfs_features(
        ...     obs,
        ...     energy_true="1 TeV",
        ...     position=position,
        ...     names=names
        ... )
        >>> scaled_features_irfs = standard_scaler(features_irfs)
        >>> print(scaled_features_irfs)
             edisp-res      obs_id      psf-radius
        ------------------- ------ --------------------
        -0.1379190199428797  20136 -0.18046952655570045
         1.2878662980210884  20137   1.3049664466089965
        -1.1499472780781963  20151  -1.1244969200533408
    """
    scaled_features = features.copy()
    for col in scaled_features.columns:
        if col not in ["obs_id", "dataset_name"]:
            data = scaled_features[col].data
            scaled_features[col] = (data - data.mean()) / data.std()
    return scaled_features
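
# A minimal, self-contained sketch of what ``standard_scaler`` does to a plain
# `~astropy.table.Table` (illustration only: the column names and values below
# are made up and not taken from any Gammapy data set):
#
#     from astropy.table import Table
#     from gammapy.utils.cluster import standard_scaler
#
#     table = Table({"obs_id": [1, 2, 3], "zenith": [10.0, 20.0, 30.0]})
#     scaled = standard_scaler(table)  # "obs_id" is skipped by design
#     # scaled["zenith"] is now approximately [-1.2247, 0.0, 1.2247],
#     # i.e. zero mean and unit (population) standard deviation.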


def hierarchical_clustering(features, linkage_kwargs=None, fcluster_kwargs=None):
    """Hierarchical clustering using given features.

    Parameters
    ----------
    features : `~astropy.table.Table`
        Table containing the features.
    linkage_kwargs : dict, optional
        Arguments forwarded to `scipy.cluster.hierarchy.linkage`.
        Default is None, which uses method="ward" and metric="euclidean".
    fcluster_kwargs : dict, optional
        Arguments forwarded to `scipy.cluster.hierarchy.fcluster`.
        Default is None, which uses criterion="maxclust" and t=3.

    Returns
    -------
    features : `~astropy.table.Table`
        Table containing the features and an extra column with the group labels.

    Examples
    --------
    Cluster features into t=2 groups with a corresponding label for each group::

        >>> from gammapy.data.utils import get_irfs_features
        >>> from gammapy.data import DataStore
        >>> from gammapy.utils.cluster import standard_scaler, hierarchical_clustering
        >>> from astropy.coordinates import SkyCoord
        >>> import astropy.units as u

        >>> data_store = DataStore.from_dir("$GAMMAPY_DATA/hess-dl3-dr1/")
        >>> obs_ids = data_store.obs_table["OBS_ID"][13:20]
        >>> obs = data_store.get_observations(obs_ids)

        >>> position = SkyCoord(329.716 * u.deg, -30.225 * u.deg, frame="icrs")
        >>> names = ["edisp-res", "psf-radius"]
        >>> features_irfs = get_irfs_features(
        ...     obs,
        ...     energy_true="1 TeV",
        ...     position=position,
        ...     names=names
        ... )
        >>> scaled_features_irfs = standard_scaler(features_irfs)
        >>> features = hierarchical_clustering(scaled_features_irfs, fcluster_kwargs={"t": 2})
        >>> print(features)
             edisp-res      obs_id      psf-radius     labels
        ------------------- ------ ------------------- ------
        -1.3020791585772495  20326 -1.2471938975366008      2
        -1.3319831545301117  20327 -1.4586649826004114      2
        -0.7763307219821931  20339 -0.6705024680435898      2
         0.9677107409819438  20343  0.9500979841335693      1
          0.820562952023891  20344  0.8160964882165554      1
         0.7771617763704126  20345  0.7718272408581743      1
         0.8449575657133206  20346  0.8383396349722769      1
    """
    features = features.copy()
    features_array = np.array(
        [
            features[col].data
            for col in features.columns
            if col not in ["obs_id", "dataset_name"]
        ]
    ).T

    default_linkage_kwargs = dict(method="ward", metric="euclidean")
    if linkage_kwargs is not None:
        default_linkage_kwargs.update(linkage_kwargs)

    pairwise_distances = sch.distance.pdist(features_array)
    linkage = sch.linkage(pairwise_distances, **default_linkage_kwargs)

    default_fcluster_kwargs = dict(criterion="maxclust", t=3)
    if fcluster_kwargs is not None:
        default_fcluster_kwargs.update(fcluster_kwargs)
    labels = sch.fcluster(linkage, **default_fcluster_kwargs)

    features["labels"] = labels
    return features
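

# The block below is a minimal, self-contained usage sketch of the full
# scaling + clustering pipeline. Illustration only: the synthetic feature
# values are made up and do not correspond to any Gammapy data set.
if __name__ == "__main__":
    from astropy.table import Table

    rng = np.random.default_rng(seed=0)

    # Two well-separated synthetic groups of three "observations" each
    features = Table(
        {
            "obs_id": np.arange(6),
            "edisp-res": np.concatenate(
                [rng.normal(0.1, 0.01, 3), rng.normal(0.4, 0.01, 3)]
            ),
            "psf-radius": np.concatenate(
                [rng.normal(0.05, 0.005, 3), rng.normal(0.2, 0.005, 3)]
            ),
        }
    )

    scaled = standard_scaler(features)
    clustered = hierarchical_clustering(scaled, fcluster_kwargs={"t": 2})

    # The extra "labels" column separates the two synthetic groups (labels 1 and 2)
    print(clustered)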