Optimizing K-Means Clustering for Time Series Data
Given the amount of data we collect, faster clustering times are crucial. Here's how we sped up our k-means clustering process!
Join the DZone community and get the full member experience.
Join For FreeHere at New Relic, we collect 1.37 billion data points per minute. A vast amount of the data we collect, analyze, and display for our customers is stored as time series. In an effort to build relationships between applications and other entities, such as servers and containers, for new, intelligent products like New Relic Radar, we're constantly exploring faster and more efficient methods of grouping time series data. Given the amount of data we collect, faster clustering times are crucial.
Speeding Up K-Means Clustering
A popular method of grouping data is k-means clustering. The basic principle of k-means involves determining the distances between each data point and grouping them into meaningful clusters. This process is usually demonstrated using two-dimensional data on a plane. It's certainly possible to cluster in more than two dimensions — but our ability to visualize the data quickly becomes more complicated. For example, the following graphic shows the convergence of k-means clustering in two arbitrary dimensions over several iterations:
Unfortunately, this method does not map well to time series since they are generally one-dimensional over time. However, we can still compute a distance factor between two time series using a number of different functions. In these examples, we use the mean squared error (MSE) to explore different k-means implementations. In testing these implementations, we noticed many suffered from serious performance issues, but we were still able to demonstrate how it's possible to speed up k-means clustering, in some cases, by an order of magnitude.
For this exercise, we'll use Python's NumPy package. If you're following along on your own, you can copy and paste the code right into a Jupyter Notebook. Let's start by importing the packages we'll use along the way:
import time
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
In the following tests, we start by generating 10,000 random time series, 500 samples long. Then we add noise to sine waves of random lengths. While this kind of data isn't really ideal for k-means clustering, it should be enough to strain unoptimized implementations.
n = 10000
ts_len = 500
phases = np.array(np.random.randint(0, 50, [n, 2]))
pure = np.sin([np.linspace(-np.pi * x[0], -np.pi * x[1], ts_len) for x in phases])
noise = np.array([np.random.normal(0, 1, ts_len) for x in range(n)])
signals = pure * noise
# Normalize everything between 0 and 1
signals += np.abs(np.min(signals))
signals /= np.max(signals)
plt.plot(signals[0])
The First Implementation
Let's start with the most basic and straightforward implementation we encountered. euclid_dist
implements a simple MSE estimator for the distance function, and k_means
implements the basic k-means algorithm. We chose num_clust
random time series from our initial dataset as centroids (which represent the middle of each cluster). For num_iter
iterations, we continually moved the centroids around while minimizing the distance between them and the other time series.
def euclid_dist(t1, t2):
return np.sqrt(((t1-t2)**2).sum())
def k_means(data, num_clust, num_iter):
centroids = signals[np.random.randint(0, signals.shape[0], num_clust)]
for n in range(num_iter):
assignments={}
for ind, i in enumerate(data):
min_dist = float('inf')
closest_clust = None
for c_ind, j in enumerate(centroids):
dist = euclid_dist(i, j)
if dist < min_dist:
min_dist = dist
closest_clust = c_ind
if closest_clust in assignments:
assignments[closest_clust].append(ind)
else:
assignments[closest_clust]=[]
assignments[closest_clust].append(ind)
for key in assignments:
clust_sum = 0
for k in assignments[key]:
clust_sum = clust_sum + data[k]
centroids[key] = [m / len(assignments[key]) for m in clust_sum]
return centroids
t1 = time.time()
centroids = k_means(signals, 100, 100)
t2 = time.time()
print("Took {} seconds".format(t2 - t1))
Took 1138.8745470046997 seconds
It took almost 20 minutes to cluster the data. That isn't terrible, but it's certainly not great. To speed this up for the next implementation, we decided to eliminate as many for loops as possible.
The Vectorized Implementation
One of the awesome benefits of using NumPy is vectorized operations. (If you're unfamiliar with vector operations, check out this excellent review.)
The k-means algorithm calls for pairwise comparisons between each centroid and data point. This means, in our previous iteration, we compared each of our 100 centroids to 10,000 time series for a total of one million comparisons per iteration. Keep in mind that each comparison involved two sets of 500 samples. Since we iterated 100 times, that gave us a total of 100 million comparisons-quite a bit of work for a single CPU. While Python is a reasonably efficient language, it's hard to beat operations written in C. For this reason, most of NumPy's core operations are written in C and vectorized to minimize overhead incurred by loops.
Let's explore how we vectorized our code in order to eliminate as many for
loops as possible.
First, we divided the code into functional blocks. This gave us a better idea of what each section was responsible for. Next, we altered the calc_centroids
step to only iterate over centroids (instead of each time series). This way, we were passing all the time series and one centroid to euclid_dist
. We also allocated the dist
matrix up front instead of treating it like a dictionary and expanding it over time. NumPy's argmin
compared each vector pair in one shot.
In move_centroids
, we collapsed another for loop using vector operations, and we iterated only over the unique set of centroids. If we lost a centroid, we added the appropriate number by randomly choosing from our time series dataset (this rarely happens in the wild).
Lastly, we added an early stopping check to k_means
to halt our iterations if there were no updates to the centroids.
Here's a look at the code:
def euclid_dist(t1, t2):
return np.sqrt(((t1-t2)**2).sum(axis = 1))
def calc_centroids(data, centroids):
dist = np.zeros([data.shape[0], centroids.shape[0]])
for idx, centroid in enumerate(centroids):
dist[:, idx] = euclid_dist(centroid, data)
return np.array(dist)
def closest_centroids(data, centroids):
dist = calc_centroids(data, centroids)
return np.argmin(dist, axis = 1)
def move_centroids(data, closest, centroids):
k = centroids.shape[0]
new_centroids = np.array([data[closest == c].mean(axis = 0) for c in np.unique(closest)])
if k - new_centroids.shape[0] > 0:
print("adding {} centroid(s)".format(k - new_centroids.shape[0]))
additional_centroids = data[np.random.randint(0, data.shape[0], k - new_centroids.shape[0])]
new_centroids = np.append(new_centroids, additional_centroids, axis = 0)
return new_centroids
def k_means(data, num_clust, num_iter):
centroids = signals[np.random.randint(0, signals.shape[0], num_clust)]
last_centroids = centroids
for n in range(num_iter):
closest = closest_centroids(data, centroids)
centroids = move_centroids(data, closest, centroids)
if not np.any(last_centroids != centroids):
print("early finish!")
break
last_centroids = centroids
return centroids
t1 = time.time()
centroids = k_means(signals, 100, 100)
t2 = time.time()
print("Took {} seconds".format(t2 - t1))
adding 1 centroid(s)
early finish!
took 206.72993397712708 seconds
Just a bit longer than 3.5 minutes. Not bad! But we wanted to finish even faster.
The K-Means++ Implementation
For the next iteration, we applied the k-means++ algorithm. This algorithm seeks to choose better initial centroids. Let's see if this optimization helped...
def init_centroids(data, num_clust):
centroids = np.zeros([num_clust, data.shape[1]])
centroids[0,:] = data[np.random.randint(0, data.shape[0], 1)]
for i in range(1, num_clust):
D2 = np.min([np.linalg.norm(data - c, axis = 1)**2 for c in centroids[0:i, :]], axis = 0)
probs = D2/D2.sum()
cumprobs = probs.cumsum()
ind = np.where(cumprobs >= np.random.random())[0][0]
centroids[i, :] = np.expand_dims(data[ind], axis = 0)
return centroids
def k_means(data, num_clust, num_iter):
centroids = init_centroids(data, num_clust)
last_centroids = centroids
for n in range(num_iter):
closest = closest_centroids(data, centroids)
centroids = move_centroids(data, closest, centroids)
if not np.any(last_centroids != centroids):
print("Early finish!")
break
last_centroids = centroids
return centroids
t1 = time.time()
centroids = k_means(signals, 100, 100)
t2 = time.time()
print("Took {} seconds".format(t2 - t1))
early finish!
took 180.91435194015503 seconds
Adding the k-means++ algorithm yielded slightly better performance compared to our previous iteration. However, this optimization really started to pay off once we parallelized it.
The Parallel Implementation
Until this point, all of our implementations were single-threaded, so we decided to explore parallelizing parts of the k-means++ algorithm. Since we were using a Jupyter Notebook, we opted to manage parallelism with ipyparallel for parallel computing. With ipyparallel, we didn't have to worry about forking the entire server, but there were a few idiosyncrasies we needed to address. For instance, we had to inform our worker nodes to load NumPy.
import ipyparallel as ipp
c = ipp.Client()
v = c[:]
v.use_cloudpickle()
with v.sync_imports():
import numpy as np
(For details on launching workers, see the pyparallel getting started guide.)
In this iteration, we focused on two areas for parallelization. First, calc_centroids
had a loop that iterated over each centroid and compared it with our time series data. We used map_sync
to send each of these iterations to our workers.
Next, we parallelized a similar loop in the k-means++ centroid search. Note the call to v.push
: since our lambda referenced data
, we needed to make sure it was available on the worker nodes. We accomplished this by calling ipyparallel's push
method to copy the variable into the workers' global scope.
Take a look:
def calc_centroids(data, centroids):
return np.array(v.map_sync(lambda x: np.sqrt(((x - data)**2).sum(axis = 1)), centroids))
def closest_centroids(points, centroids):
dist = calc_centroids(points, centroids)
return np.argmin(dist, axis=0)
def init_centroids(data, num_clust):
v.push(dict(data=data))
centroids = np.zeros([num_clust, data.shape[1]])
centroids[0,:] = data[np.random.randint(0, data.shape[0], 1)]
for i in range(1, num_clust):
D2 = np.min(v.map_sync(lambda c: np.linalg.norm(data - c, axis = 1)**2, centroids[0:i,:]), axis = 0)
probs = D2/D2.sum()
cumprobs = probs.cumsum()
ind = np.where(cumprobs >= np.random.random())[0][0]
centroids[i, :] = np.expand_dims(data[ind], axis = 0)
return centroids
t1 = time.time()
centroids = k_means(signals, 100, 100)
t2 = time.time()
print("Took {} seconds".format(t2 - t1))
adding 2 centroid(s)
early finish!
took 143.49819207191467 seconds
At just a bit more than two minutes, this iteration achieved our fastest time yet!
Next Steps: Go Faster!
In these tests, we relied on central processing unit (CPU) cores. CPUs offer convenient parallelization, but we suspect that with a little more effort, we could implement clustering orders of a magnitude faster using graphical processing units (GPUs). We might be able to do this using TensorFlow, open-source software for numerical computations and machine learning. In fact, TensorFlow already includes a k-means implementation, but we'll almost certainly have to tweak it to support time-series clustering. At any rate, we'll never stop looking for more efficient and faster clustering algorithms to help manage our users' data.
This article was originally posted here.
Published at DZone with permission of Jason Snell, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments