-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathkmedoids.py
91 lines (77 loc) · 3.56 KB
/
kmedoids.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import numpy as np
import random
class KMedoids:
def __init__(self, distance_matrix, n_clusters=2, start_prob=0.90, end_prob=0.99):
if not 0 <= start_prob < end_prob <= 1:
raise ValueError('start_prob must be smaller than end_prob.')
if not n_clusters < len(distance_matrix):
raise ValueError('number of clusters must not exceed number of data points.')
self.distance_matrix = distance_matrix
self.n_clusters = n_clusters
self.n_points = len(distance_matrix)
self.start_prob = start_prob
self.end_prob = end_prob
self.clusters = None
self.medoids = None
def initialize_medoids(self):
# K-means++ initialization
medoids = [random.randint(0, self.n_points - 1)]
while len(medoids) != self.n_clusters:
distances = [self.get_closest_medoid(medoids, point)[1] for point in range(self.n_points)]
distances_index = np.argsort(distances)
start_index = int(self.start_prob * len(distances_index))
end_index = round(self.end_prob * (len(distances_index) - 1))
new_medoid = distances_index[random.randint(start_index, end_index)]
medoids.append(new_medoid)
return medoids
def get_distance(self, point1, point2):
return self.distance_matrix[point1][point2]
def get_closest_medoid(self, medoids, point):
closest_medoid = None
closest_distance = float('inf')
for medoid in medoids:
distance = self.get_distance(point, medoid)
if distance < closest_distance:
closest_medoid = medoid
closest_distance = distance
return closest_medoid, closest_distance
def associate_points_to_closest_medoid(self, medoids):
clusters = {medoid: [medoid] for medoid in medoids}
for point in range(self.n_points):
medoid, _ = self.get_closest_medoid(medoids, point)
clusters[medoid].append(point)
return clusters
def get_medoid_cost(self, medoid, clusters):
return np.mean([self.get_distance(point, medoid) for point in clusters[medoid]])
def get_configuration_cost(self, medoids, clusters):
return np.sum([self.get_medoid_cost(medoid, clusters) for medoid in medoids])
def get_non_medoids(self, medoids, clusters):
return [pt for points in clusters.values() for pt in points if pt not in medoids]
# https://en.wikipedia.org/wiki/K-medoids
def run(self, max_iterations=10, tolerance=0.01):
# 1- Initialize: select k of the n data points as the medoids.
self.medoids = self.initialize_medoids()
# 2- Associate each data point to the closest medoid.
self.clusters = self.associate_points_to_closest_medoid(self.medoids)
# 3- While the cost of the configuration decreases:
# 3.1- For each medoid m, for each non-medoid data point o:
# 3.1.1- Swap m and o, associate each data point to the closest medoid, recompute the cost (sum of distances of points to their medoid)
# 3.1.2- If the total cost of the configuration increased in the previous step, undo the swap
cost_change = float('inf')
current_cost = self.get_configuration_cost(self.medoids, self.clusters)
for _ in range(max_iterations):
if cost_change > tolerance:
cost_change = 0
for m in self.medoids:
for o in self.get_non_medoids(self.medoids, self.clusters):
new_medoids = [o] + [med for med in self.medoids if med != m]
new_clusters = self.associate_points_to_closest_medoid(new_medoids)
new_cost = self.get_configuration_cost(new_medoids, new_clusters)
if new_cost < current_cost:
self.medoids = new_medoids
self.clusters = new_clusters
cost_change = current_cost - new_cost
current_cost = new_cost
break
else:
break