Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 35 additions & 97 deletions DBCV/DBCV.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Society for Industrial and Applied Mathematics, 2014.
"""

from functools import reduce, partial
import numpy as np
from scipy.spatial.distance import euclidean, cdist
from scipy.sparse.csgraph import minimum_spanning_tree
Expand All @@ -19,10 +20,12 @@ def DBCV(X, labels, dist_function=euclidean):

Args:
X (np.ndarray): ndarray with dimensions [n_samples, n_features]
data to check validity of clustering
data to check validity of clustering, or a precomputed distances
matrix with dimensins [n_samples, n_samples]
labels (np.array): clustering assignments for data X
dist_dunction (func): function to determine distance between objects
func args must be [np.array, np.array] where each array is a point
dist_dunction (func or int): function to determine distance between objects
func args must be [np.array, np.array] where each array is a point.
If X is a precomputed distances matrix, pass an int of number of features.

Returns: cluster_validity (float)
score in range[-1, 1] indicating validity of clustering assignments
Expand All @@ -33,96 +36,50 @@ def DBCV(X, labels, dist_function=euclidean):
return cluster_validity


def _core_dist(point, neighbors, dist_function):
"""
Computes the core distance of a point.
Core distance is the inverse density of an object.

Args:
point (np.array): array of dimensions (n_features,)
point to compute core distance of
neighbors (np.ndarray): array of dimensions (n_neighbors, n_features):
array of all other points in object class
dist_dunction (func): function to determine distance between objects
func args must be [np.array, np.array] where each array is a point

Returns: core_dist (float)
inverse density of point
"""
n_features = np.shape(point)[0]
n_neighbors = np.shape(neighbors)[0]

distance_vector = cdist(point.reshape(1, -1), neighbors)
distance_vector = distance_vector[distance_vector != 0]
numerator = ((1/distance_vector)**n_features).sum()
core_dist = (numerator / (n_neighbors - 1)) ** (-1/n_features)
return core_dist


def _mutual_reachability_dist(point_i, point_j, neighbors_i,
neighbors_j, dist_function):
""".
Computes the mutual reachability distance between points

Args:
point_i (np.array): array of dimensions (n_features,)
point i to compare to point j
point_j (np.array): array of dimensions (n_features,)
point i to compare to point i
neighbors_i (np.ndarray): array of dims (n_neighbors, n_features):
array of all other points in object class of point i
neighbors_j (np.ndarray): array of dims (n_neighbors, n_features):
array of all other points in object class of point j
dist_dunction (func): function to determine distance between objects
func args must be [np.array, np.array] where each array is a point

Returns: mutual_reachability (float)
mutual reachability between points i and j

"""
core_dist_i = _core_dist(point_i, neighbors_i, dist_function)
core_dist_j = _core_dist(point_j, neighbors_j, dist_function)
dist = dist_function(point_i, point_j)
mutual_reachability = np.max([core_dist_i, core_dist_j, dist])
return mutual_reachability


def _mutual_reach_dist_graph(X, labels, dist_function):
"""
Computes the mutual reach distance complete graph.
Graph of all pair-wise mutual reachability distances between points

Args:
X (np.ndarray): ndarray with dimensions [n_samples, n_features]
data to check validity of clustering
data to check validity of clustering or precomputed distances
labels (np.array): clustering assignments for data X
dist_dunction (func): function to determine distance between objects
func args must be [np.array, np.array] where each array is a point
func args must be [np.array, np.array] where each array is a point.
If X is a distance matrix, pass an int of number of features.

Returns: graph (np.ndarray)
array of dimensions (n_samples, n_samples)
Graph of all pair-wise mutual reachability distances between points.

"""
n_samples = np.shape(X)[0]
graph = []
counter = 0
for row in range(n_samples):
graph_row = []
for col in range(n_samples):
point_i = X[row]
point_j = X[col]
class_i = labels[row]
class_j = labels[col]
members_i = _get_label_members(X, labels, class_i)
members_j = _get_label_members(X, labels, class_j)
dist = _mutual_reachability_dist(point_i, point_j,
members_i, members_j,
dist_function)
graph_row.append(dist)
counter += 1
graph.append(graph_row)
graph = np.array(graph)
if isinstance(dist_function, int):
n_samples = np.shape(X)[0]
n_features = dist_function
dists = X
graph = np.empty_like(X)
else:
n_samples = np.shape(X)[0]
n_features = np.shape(X)[1]
dists = cdist(X, X, dist_function)
# If we're calculating the distances ourselves, might as well reuse the array for later
graph = dists

core_dists = np.empty(n_samples)
for label in np.unique(labels):
mask = labels == label
distance_vectors = dists[mask, :][:, mask]
n_neighbors = distance_vectors.shape[0]
z = distance_vectors == 0
distance_vectors[z] = np.nan
numerator = np.nansum((1/distance_vectors)**n_features, axis=1)
cluster_core_dists = (numerator / (n_neighbors - 1)) ** (-1/n_features)
core_dists[mask] = cluster_core_dists

distances = np.broadcast_arrays(dists, core_dists[:, np.newaxis], core_dists[np.newaxis, :])
reduce(partial(np.maximum, out=graph), distances)

return graph


Expand Down Expand Up @@ -238,22 +195,3 @@ def _clustering_validity_index(MST, labels):
cluster_validity = _cluster_validity_index(MST, labels, label)
validity_index += fraction * cluster_validity
return validity_index


def _get_label_members(X, labels, cluster):
"""
Helper function to get samples of a specified cluster.

Args:
X (np.ndarray): ndarray with dimensions [n_samples, n_features]
data to check validity of clustering
labels (np.array): clustering assignments for data X
cluster (int): cluster of interest

Returns: members (np.ndarray)
array of dimensions (n_samples, n_features) of samples of the
specified cluster.
"""
indices = np.where(labels == cluster)[0]
members = X[indices]
return members