Parallelized time series k-means barycenter update procedure #321
enricofallacara wants to merge 4 commits into tslearn-team:main from
Conversation
|
I forgot to mention that some redundant calls to to_time_series_dataset() were also removed, because they caused memory allocation problems and were basically not useful. |
|
Hi @enricofallacara, I'm not a maintainer of tslearn, but I thought I'd share some comments since this would be useful for me.
|
You are completely right, sorry for the psutil mistake. I used it for some things and forgot to remove it. Also, I know that I hard-coded the number of threads, but I was in a hurry and uploaded my implementation as it was. I will fix these things tomorrow, thanks! |
|
Your PR is now in conflict with the master branch since we have refactored the code. I can take care of the merge to resolve the conflicts if you want, after which I could do a review of your code. |
|
Hello,
No problem with that. Sorry, I didn't have time to fix the code due to work issues; I really appreciate your help.
|
# Conflicts:
#   tslearn/barycenters/dba.py
#   tslearn/clustering/kmeans.py
#   tslearn/metrics/dtw_variants.py
Co-authored-by: enricofallacara <enrico.fallacara@gmail.com>
|        for dynamic time warping, with applications to clustering. Pattern
|        Recognition, Elsevier, 2011, Vol. 44, Num. 3, pp. 678-693
|     """
|     X_ = to_time_series_dataset(X)
I understand that you want to avoid a copy here, but making sure that the data is in the right format is important. Maybe we should tweak the to_time_series_dataset function to add an avoid_copy_if_possible parameter, WDYT?
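For illustration, here is a minimal sketch of what such a flag could look like, written as a wrapper around the existing function (the parameter name avoid_copy_if_possible and the early-return condition are assumptions, not part of the current tslearn API):

```python
import numpy

from tslearn.utils import to_time_series_dataset


def to_time_series_dataset_maybe_no_copy(dataset, dtype=float,
                                         avoid_copy_if_possible=False):
    """Hypothetical variant: skip conversion (and the copy) when the input
    is already a well-formed (n_ts, sz, d) array of the requested dtype."""
    if (avoid_copy_if_possible
            and isinstance(dataset, numpy.ndarray)
            and dataset.ndim == 3
            and dataset.dtype == numpy.dtype(dtype)):
        return dataset  # already in tslearn's dataset layout, no copy needed
    return to_time_series_dataset(dataset, dtype=dtype)
```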
|
| def _mm_assignment(X, barycenter, weights, metric_params=None):
| def _mm_assignment(X, barycenter, weights, metric_params=None, n_treads=15):
We use n_jobs everywhere as a parameter name, with None as its default; we should stick to the same convention here, I think.
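As a sketch of what that convention would look like on this function (signature only, the body is elided; swapping n_treads for n_jobs is the assumed change):

```python
def _mm_assignment(X, barycenter, weights, metric_params=None, n_jobs=None):
    # With joblib, n_jobs=None is treated as a single job, so the default
    # behaviour stays sequential; n_jobs=-1 would use all available cores.
    ...
```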
|     #with parallel_backend('threading'):
|     res = Parallel(backend='threading',prefer="threads",require='sharedmem',n_jobs=n_treads,verbose=10)(delayed(dtw_path)(barycenter, X[i], global_constraint="sakoe_chiba",sakoe_chiba_radius=1) for i in range(n))
|     paths, dists = zip(*res)
|     paths = list(paths)
Do you really need these 2 casts?
|         cost += dist_i ** 2 * weights[i]
|         list_p_k.append(path)
|     #with parallel_backend('threading'):
|     res = Parallel(backend='threading',prefer="threads",require='sharedmem',n_jobs=n_treads,verbose=10)(delayed(dtw_path)(barycenter, X[i], global_constraint="sakoe_chiba",sakoe_chiba_radius=1) for i in range(n))
Should we define a cdist_dtw_path function instead?
I think we should design a high-efficiency cdist_dtw_path function instead. In my view, the main bottlenecks are _mm_assignment and _subgradient_valence_warping. However, since DBA is communication-intensive, parallelizing its implementation needs careful tuning; a good target would be to keep all available computing units at the desired utilization level.
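For what it is worth, a minimal sketch of such a helper could look like the following (the name cdist_dtw_path comes from the suggestion above, but the signature and pairwise layout are assumptions, not an existing tslearn API):

```python
from joblib import Parallel, delayed

from tslearn.metrics import dtw_path


def cdist_dtw_path(dataset1, dataset2, n_jobs=None, **kwargs):
    """Return DTW paths and distances for every (s1, s2) pair."""
    results = Parallel(n_jobs=n_jobs, prefer="threads")(
        delayed(dtw_path)(s1, s2, **kwargs)
        for s1 in dataset1 for s2 in dataset2
    )
    paths, dists = zip(*results)
    return list(paths), list(dists)
```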
|
| def _subgradient_valence_warping(list_p_k, barycenter_size, weights):
| def _subgradient_valence_warping(list_p_k, barycenter_size, weights, n_treads):
We use n_jobs everywhere as a parameter name, with None as its default; we should stick to the same convention here, I think.
|         list_w_k.append(w_k * weights[k])
|         list_v_k.append(w_k.sum(axis=1) * weights[k])
|         return w_k
|     w_k_ones = Parallel(backend='threading',prefer='threads',require='sharedmem', n_jobs=n_treads, verbose = True)(delayed(create_w_k)(p_k,barycenter_size) for p_k in list_p_k)
Is this part really crucial to parallelize? I would expect it to be negligible in terms of running times.
This function cannot fully utilize the CPU cores even if we force DBA to be parallelized over the different clusters, and it then becomes the bottleneck.
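For comparison, a serial construction of these matrices is short and already numpy-bound; a sketch along the lines of the original helper (paraphrased, details may differ from the actual dba.py code):

```python
import numpy


def _valence_warping_serial(list_p_k, barycenter_size, weights):
    """Serial sketch: build the w_k / v_k terms without any joblib overhead."""
    list_v_k, list_w_k = [], []
    for k, p_k in enumerate(list_p_k):
        sz_k = p_k[-1][1] + 1  # length of the k-th series along the path
        w_k = numpy.zeros((barycenter_size, sz_k))
        for i, j in p_k:
            w_k[i, j] = 1.
        list_w_k.append(w_k * weights[k])
        list_v_k.append(w_k.sum(axis=1) * weights[k])
    return list_v_k, list_w_k
```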
|
| def _mm_valence_warping(list_p_k, barycenter_size, weights):
| def _mm_valence_warping(list_p_k, barycenter_size, weights, n_treads):
We use n_jobs everywhere as a parameter name, with None as its default; we should stick to the same convention here, I think.
|     #diag_sum_v_k = numpy.zeros(list_v_k[0].shape)
|     #for v_k in list_v_k:
|     #    diag_sum_v_k += v_k
|     diag_sum_v_k = Parallel(backend='threading',prefer='threads',require='sharedmem', n_jobs=n_treads, verbose = True)(delayed(numpy.sum)(x) for x in zip(*numpy.array(list_v_k)))
Once again, I'm not sure we need to parallelize this part.
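If the entries of list_v_k all have the same length (they do once the barycenter size is fixed), a single vectorized reduction does the same job with no parallel machinery at all; a sketch with toy data:

```python
import numpy

# Toy stand-in for list_v_k: equal-length valence vectors, one per series.
list_v_k = [numpy.ones(4) * w for w in (0.5, 1.0, 2.0)]

# Element-wise sum across the list, equivalent to the accumulating loop,
# done in one vectorized call instead of a parallel map.
diag_sum_v_k = numpy.stack(list_v_k).sum(axis=0)
print(diag_sum_v_k)  # -> [3.5 3.5 3.5 3.5]
```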
|
| def _mm_update_barycenter(X, diag_sum_v_k, list_w_k):
| def _mm_update_barycenter(X, diag_sum_v_k, list_w_k, n_treads):
We use n_jobs everywhere as a parameter name, with None as its default; we should stick to the same convention here, I think.
|     #sum_w_x = numpy.zeros((barycenter_size, d))
|     #for k, (w_k, x_k) in enumerate(zip(list_w_k, X)):
|     #    sum_w_x += w_k.dot(x_k[:ts_size(x_k)])
|     sum_w_x = Parallel(backend='threading',prefer='threads',require='sharedmem', n_jobs=n_treads, verbose = True)(delayed(numpy.dot)(w_k,x_k[:ts_size(x_k)]) for (w_k, x_k) in zip(list_w_k, X))
Once again, I'm not sure we need to parallelize this part.
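Likewise, a serial accumulation of these products is a one-liner over dot products that are already BLAS-backed; a sketch with toy shapes standing in for list_w_k and X:

```python
import numpy

# Toy stand-in: three warping matrices (barycenter_size x sz_k) and three
# series (sz_k x d), mimicking list_w_k and X inside _mm_update_barycenter.
barycenter_size, sz_k, d = 5, 5, 2
list_w_k = [numpy.eye(barycenter_size, sz_k) for _ in range(3)]
X = [numpy.random.rand(sz_k, d) for _ in range(3)]

# Serial accumulation of the weighted projections; numpy.dot typically
# releases the GIL inside BLAS anyway, so explicit threading gains little.
sum_w_x = sum(w_k.dot(x_k) for w_k, x_k in zip(list_w_k, X))
print(sum_w_x.shape)  # -> (5, 2)
```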
|
| def dtw_barycenter_averaging(X, barycenter_size=None, init_barycenter=None,
|                              max_iter=30, tol=1e-5, weights=None,
|                              max_iter=30, tol=1e-5, weights=None, n_treads=15,
We use n_jobs everywhere as a parameter name, with None as its default; we should stick to the same convention here, I think.
| def dtw_barycenter_averaging_one_init(X, barycenter_size=None,
|                                       init_barycenter=None,
|                                       max_iter=30, tol=1e-5, weights=None,
|                                       n_treads=15,
We use n_jobs everywhere as a parameter name, with None as its default; we should stick to the same convention here, I think.
|        for Averaging in Dynamic Time Warping Spaces.
|        Pattern Recognition, 74, 340-358.
|     """
|     X_ = to_time_series_dataset(X)
See comment above about the array copy issue.
|
|     def _update_centroids(self, X):
|         metric_params = self._get_metric_params()
|         X_list = []
Here, I guess we could use a list comprehension for better readability.
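For instance, a sketch of that grouping as a list comprehension, with toy data standing in for the estimator state inside _update_centroids:

```python
import numpy

# Toy stand-in for the fitted attributes: 6 series, 3 clusters.
X = numpy.random.rand(6, 10, 1)
labels_ = numpy.array([0, 1, 0, 2, 1, 2])
n_clusters = 3

# One sub-dataset per cluster, gathered in a single readable expression
# instead of an explicit append loop.
X_list = [X[labels_ == k] for k in range(n_clusters)]
print([x.shape for x in X_list])  # -> [(2, 10, 1), (2, 10, 1), (2, 10, 1)]
```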
|                 self.cluster_centers_[k] = euclidean_barycenter(
|                     X=X[self.labels_ == k])
|             X_list.append(X[self.labels_ == k])
|         cluster_centers = Parallel(backend='threading', prefer='threads', require='sharedmem', n_jobs=self.n_clusters, verbose=5)(delayed(dtw_barycenter_averaging)(
Take care that here, the function to be called depends on the value of self.metric
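A sketch of that dispatch, using the barycenter functions tslearn exposes (the helper name is hypothetical, and per-metric keyword arguments such as metric_params are omitted for brevity):

```python
from tslearn.barycenters import (euclidean_barycenter,
                                 dtw_barycenter_averaging,
                                 softdtw_barycenter)


def _barycenter_function(metric):
    """Pick the barycenter routine matching the k-means metric."""
    if metric == "euclidean":
        return euclidean_barycenter
    elif metric == "dtw":
        return dtw_barycenter_averaging
    elif metric == "softdtw":
        return softdtw_barycenter
    raise ValueError("Unknown metric: {}".format(metric))
```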
Simply parallelizing DBA over the clusters gives a speedup of less than 0.5x; more effort is required.
|                    compute_diagonal=True, dtype=numpy.float, *args, **kwargs):
|     """Compute cross-similarity matrix with joblib parallelization for a given
|     similarity function.
|
Could you please revert all these line deletions? They tend to cram the docstrings together.
|                                 k=0 if compute_diagonal else 1,
|                                 m=len(dataset1))
|     matrix[indices] = Parallel(n_jobs=n_jobs,
|                                prefer="threads",
Should we always prefer this? Or should it be specified when calling the function?
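One option, sketched here on a hypothetical generic helper (not the actual _cdist_generic code), is to expose the hint to the caller instead of hard-coding it, mirroring joblib's own parameter name:

```python
import numpy
from joblib import Parallel, delayed


def cdist_generic_sketch(dist_fun, dataset1, dataset2=None,
                         n_jobs=None, prefer=None, **kwargs):
    """Cross-distance sketch: the joblib 'prefer' hint is a plain argument,
    so callers can pick "threads", "processes", or leave it unset (None)."""
    if dataset2 is None:
        dataset2 = dataset1
    values = Parallel(n_jobs=n_jobs, prefer=prefer)(
        delayed(dist_fun)(s1, s2, **kwargs)
        for s1 in dataset1 for s2 in dataset2
    )
    return numpy.array(values).reshape((len(dataset1), len(dataset2)))
```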
|         dataset1[i], dataset2[j],
|         *args, **kwargs
|     #dataset2 = to_time_series_dataset(dataset2, dtype=dtype)
|     with parallel_backend('loky'):
Is it a good idea to specify the parallel_backend here? Will loky always be the better alternative, or could it depend on some specifics of one's data?
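Rather than hard-coding the backend inside the library, callers can already opt into one around any joblib-parallelized call with joblib's context manager; a sketch using public tslearn functions:

```python
from joblib import parallel_backend

from tslearn.generators import random_walks
from tslearn.metrics import cdist_dtw

X = random_walks(n_ts=20, sz=64, d=1)

# The backend choice stays in user code: threads suit GIL-releasing kernels,
# loky (processes) helps when per-call Python overhead dominates.
with parallel_backend("loky"):
    D = cdist_dtw(X, n_jobs=-1)
print(D.shape)  # -> (20, 20)
```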
|
I have left a few comments. Maybe the main point would be to assess the improvement (in terms of running times) that these changes offer on some benchmark. |
|
Also, tests seem to fail at the moment (let me know if it's due to a bug in my merge). |
First of all, I want to thank you for the amazing job you have done creating this Python package, which is awesome and very useful. I used it in my master's thesis, in which I had to cluster certain types of players based on their behaviors. The problem with using time series k-means on my dataset came from the fact that the current implementation only parallelizes the computation of the distance matrices (DTW in my case). I therefore rewrote the code related to the MM algorithm and to the barycenter update procedure as follows: the barycenter update procedure creates a process for each cluster of the algorithm (i.e., k processes), and each process then creates a pool of threads (15 in my implementation) that are used to perform the barycenter update in a parallel and efficient way.
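A minimal sketch of the outer level of that scheme (one worker per cluster, each computing the DBA barycenter of its own cluster), using only public tslearn functions; this is an illustration of the idea, not the PR's actual code, and the inner pool of 15 threads described above is omitted:

```python
import numpy
from joblib import Parallel, delayed

from tslearn.barycenters import dtw_barycenter_averaging
from tslearn.generators import random_walks

# Toy labelled dataset: 30 series of length 32 spread over k = 3 clusters.
k = 3
X = random_walks(n_ts=30, sz=32, d=1)
labels = numpy.arange(len(X)) % k

# One worker process per cluster, each running DBA on its own sub-dataset.
centers = Parallel(n_jobs=k, prefer="processes")(
    delayed(dtw_barycenter_averaging)(X[labels == c], max_iter=5)
    for c in range(k)
)
print([c.shape for c in centers])  # -> [(32, 1), (32, 1), (32, 1)]
```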