Source code for kmean

import numpy as np 
from src.clustering.clustering import Clustering
from math import isnan

class KCluster(Clustering):
    '''
    A class to represent a k-means clustering model.

    Attributes
    ----------
    k : int
        Number of clusters used by the algorithm.
    final_clusters_idx : numpy.ndarray
        Stores the index of the cluster each observation belongs to.
    final_centers : numpy.ndarray
        A self.k by self.dimension matrix storing the positions of the
        cluster centers.
    number_iterations : int
        How many steps the algorithm took to converge.
    threshold : float
        How small the change in the centers must be for the iteration to stop.
    feature_means : numpy.ndarray
        Stores the predicted position (cluster center) each observation is
        centered around.
    '''

    def __init__(self, features, standardized=True, k=3, threshold=0.01):
        self.train_features = features
        self.test_features = np.array([[]])
        super().__init__(features, standardized)
        self.k = k
        self.final_clusters_idx = None
        self.initial_means = None
        self.number_iterations = None
        self.threshold = threshold
        self.feature_means = None
        self.feature_clusters = None
        # Fitting happens immediately on construction.
        self.fit()
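    # A minimal usage sketch (illustrative only; it assumes `features` is an
    # n-by-d numpy array and that the Clustering base class provides the
    # self.features, self.sample_size and self.dimension attributes used
    # below):
    #
    #     model = KCluster(features, standardized=True, k=3, threshold=0.01)
    #     model.final_centers        # k by d array of cluster centers
    #     model.final_clusters_idx   # cluster index assigned to each row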
    @staticmethod  # Note, REUSE THIS FOR KNN!!
    def compute_distance_matrix(features, centers):
        '''
        Compute distances of each of n row vectors in a matrix to each of k
        vectors representing 'centers'.

        Parameters
        ----------
        features : numpy.ndarray
            n by d array, where d is the dimension of the dataset and n is
            the sample size.
        centers : numpy.ndarray
            k by d array, where k is the number of clusters and d is the
            dimension.

        Returns
        -------
        distance_matrix : numpy.ndarray
            An n by k array. The (i, j) entry is the distance from the i-th
            row of features to the j-th row of centers. We use the l_2
            Euclidean norm for d dimensions.

        Notes
        -----
        An efficient numpy procedure (using its broadcasting functionality)
        to compute all pairwise differences between two collections of data
        points is given in [4]_. We use this as an alternative to a manual
        nested for-loop procedure.

        References
        ----------
        .. [4] https://sparrow.dev/pairwise-distance-in-numpy/
        '''
        pairwise_differences = features[:, None, :] - centers[None, :, :]
        distance_matrix = np.linalg.norm(pairwise_differences, axis=-1)
        return distance_matrix
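    # Shape sketch for the broadcasting step above (illustrative values, not
    # part of the original module): with n = 2 points and k = 2 centers in
    # d = 2 dimensions,
    #
    #     X = np.array([[0., 0.], [3., 4.]])
    #     C = np.array([[0., 0.], [0., 1.]])
    #     KCluster.compute_distance_matrix(X, C)
    #     # -> approximately array([[0.   , 1.   ],
    #     #                         [5.   , 4.243]])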
    @staticmethod
    def update_clusters(features, centers):
        '''
        Given a current set of k centers, compute which cluster each row
        vector of the data matrix is closest to.

        Parameters
        ----------
        features : numpy.ndarray
            n by d array, where d is the dimension of the dataset and n is
            the sample size.
        centers : numpy.ndarray
            k by d array, where k is the number of centers.

        Returns
        -------
        closest_center_idx : numpy.ndarray
            A vector of length n whose i-th entry is the index p of the
            center minimizing the distance from the i-th row of features to
            the p-th cluster point. We use the d-dimensional l_2 Euclidean
            norm.
        '''
        distance_matrix = KCluster.compute_distance_matrix(features, centers)
        n = features.shape[0]
        closest_center_idx = np.zeros(n, dtype=int)
        for row in range(n):
            distance_to_centers = distance_matrix[row, :]
            closest_center_indices = np.where(
                distance_to_centers == np.amin(distance_to_centers))[0]
            # Ties are broken in favour of the lowest-index center.
            closest_center_idx[row] = closest_center_indices[0]
        return closest_center_idx
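    # The loop above amounts to a row-wise argmin over the distance matrix;
    # an equivalent one-liner (a sketch, not the original implementation)
    # would be:
    #
    #     closest_center_idx = np.argmin(
    #         KCluster.compute_distance_matrix(features, centers), axis=1)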
    @staticmethod
    def update_means(features, closest_center_idx, k):
        '''
        Compute the updated means given a set of new clustering centers.

        Parameters
        ----------
        features : numpy.ndarray
            n by d array, where d is the dimension of the dataset and n is
            the sample size.
        closest_center_idx : numpy.ndarray
            n by 1 array.
        k : int
            The number of centers.

        Returns
        -------
        cluster_means : numpy.ndarray
            A k by d array. The i-th row is the d-dimensional element-wise
            mean of all row vectors of features in cluster i.
        '''
        d = features.shape[1]
        cluster_means = np.zeros((k, d))
        # Take the d-dimensional mean of all points belonging to cluster i
        # and store it in row i.
        for cluster_index in range(k):
            cluster_subset = np.where(closest_center_idx == cluster_index)[0]
            if cluster_subset.size == 0:
                # Empty cluster: re-seed its center at a random position.
                cluster_means[cluster_index, :] = np.random.rand(1, d)
            else:
                cluster_means[cluster_index, :] = np.mean(
                    features[cluster_subset, :], axis=0)
        return cluster_means
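    # Small worked example for the mean update (assumed values, not from the
    # original source): two points assigned to cluster 0, one to cluster 1,
    #
    #     X = np.array([[0., 0.], [2., 2.], [5., 5.]])
    #     idx = np.array([0, 0, 1])
    #     KCluster.update_means(X, idx, k=2)
    #     # -> array([[1., 1.],
    #     #           [5., 5.]])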
    def fit(self):
        '''
        Fit k-means to a dataset up to a predetermined threshold of
        convergence.
        '''
        # Pick which rows of our data we will initialize as clusters
        # (uniformly at random, without replacement so the starting centers
        # are distinct).
        initial_data_idx = np.random.choice(self.sample_size, self.k,
                                            replace=False)
        initial_data_idx.sort()

        # Store these row vectors as our first set of means, comprising the
        # initial cluster centers. A k by d array.
        initial_means = self.features[initial_data_idx, :]
        # Keep a copy of the starting centers for later inspection.
        self.initial_means = initial_means

        # Initialize an arbitrarily large error, so the first step will
        # always run.
        error = np.inf
        # Error of each step is stored here.
        errors = []
        # How many iterations get performed?
        total_steps = 0

        while error > self.threshold:
            total_steps += 1
            updated_indices = KCluster.update_clusters(self.features,
                                                       initial_means)
            updated_means = KCluster.update_means(self.features,
                                                  updated_indices, self.k)
            # How much did the centers move?
            error = np.linalg.norm(initial_means - updated_means)
            errors.append(error)
            # Make the update and begin the next round.
            initial_means = updated_means

        self.final_clusters_idx = updated_indices
        self.final_centers = updated_means
        self.errors = errors
        self.number_iterations = total_steps

        feature_means = np.zeros((self.sample_size, self.dimension))
        for i in range(self.sample_size):
            # The i-th prediction is the j-th row of the means, where
            # j = updated_indices[i].
            feature_means[i] = self.final_centers[updated_indices[i], :]
        self.feature_means = feature_means
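
# ---------------------------------------------------------------------------
# Minimal demonstration of the static helpers above on a toy dataset. This is
# an illustrative sketch only: the blob locations, the random seed and k = 2
# are assumptions, not part of the original module.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    rng = np.random.default_rng(0)

    # Two well separated blobs in 2-D.
    blob_a = rng.normal(loc=0.0, scale=0.5, size=(20, 2))
    blob_b = rng.normal(loc=5.0, scale=0.5, size=(20, 2))
    toy_features = np.vstack([blob_a, blob_b])

    # Use two arbitrary rows of the data as the starting centers.
    centers = toy_features[[0, -1], :]

    # One full k-means step: assign every point to its nearest center, then
    # recompute each center as the mean of its assigned points.
    labels = KCluster.update_clusters(toy_features, centers)
    new_centers = KCluster.update_means(toy_features, labels, k=2)

    print("distance matrix shape:",
          KCluster.compute_distance_matrix(toy_features, centers).shape)
    print("updated centers:\n", new_centers)

    # Fitting the full model would be `KCluster(toy_features, k=2)`, assuming
    # the Clustering base class is importable from src.clustering.clustering.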