diff --git a/include/dbscan/cell.h b/include/dbscan/cell.h
index e4da7a3..ffd90ed 100644
--- a/include/dbscan/cell.h
+++ b/include/dbscan/cell.h
@@ -172,7 +172,11 @@ struct cellHash {
   bool replaceQ(eType c1, eType c2) {return 0;}
   bool cas(eType* p, eType o, eType n) {
+#ifdef _MSC_VER
     return std::atomic_compare_exchange_strong_explicit(
       reinterpret_cast<std::atomic<eType>*>(p), &o, n,
       std::memory_order_acq_rel, std::memory_order_acquire);
+#else
+    return __atomic_compare_exchange(p, &o, &n, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
+#endif
   }
 };
diff --git a/include/dbscan/grid.h b/include/dbscan/grid.h
index cdd2884..c221738 100644
--- a/include/dbscan/grid.h
+++ b/include/dbscan/grid.h
@@ -23,6 +23,7 @@
 
 #pragma once
 
+#include <atomic>
 #include "cell.h"
 #include "point.h"
@@ -77,7 +78,7 @@ struct grid {
   tableT* table=NULL;
   treeT* tree=NULL;
   intT totalPoints;
-  cellBuf **nbrCache;
+  std::atomic<cellBuf*>* nbrCache;
   std::mutex* cacheLocks;
 
   /**
@@ -90,11 +91,11 @@ struct grid {
     r(rr), pMin(pMinn), cellCapacity(cellMax), totalPoints(0) {
     cells = newA(cellT, cellCapacity);
-    nbrCache = newA(cellBuf*, cellCapacity);
+    nbrCache = new std::atomic<cellBuf*>[cellCapacity];
     cacheLocks = (std::mutex*) malloc(cellCapacity * sizeof(std::mutex));
     parallel_for(0, cellCapacity, [&](intT i) {
       new (&cacheLocks[i]) std::mutex();
-      nbrCache[i] = NULL;
+      nbrCache[i].store(nullptr, std::memory_order_relaxed);
       cells[i].init();
     });
     numCells = 0;
@@ -107,9 +108,10 @@ struct grid {
     free(cells);
     free(cacheLocks);
     parallel_for(0, cellCapacity, [&](intT i) {
-      if(nbrCache[i]) delete nbrCache[i];
+      auto cached = nbrCache[i].load(std::memory_order_relaxed);
+      if(cached) delete cached;
     });
-    free(nbrCache);
+    delete[] nbrCache;
     if(myHash) delete myHash;
     if(table) {
       table->del();
@@ -147,22 +149,24 @@ struct grid {
       }
       return false;};//todo, optimize
     int idx = bait - cells;
-    if (nbrCache[idx]) {
-      auto accum = nbrCache[idx];
-      for (auto accum_i : *accum) {
+    // Acquire ensures vector contents are visible if pointer is non-null
+    auto cached = nbrCache[idx].load(std::memory_order_acquire);
+    if (cached) {
+      for (auto accum_i : *cached) {
         if(fWrap(accum_i)) break;
       }
     } else {
-      // wait for other threads to do their thing then try again
       std::lock_guard<std::mutex> lock(cacheLocks[idx]);
-      if (nbrCache[idx]) {
-        auto accum = nbrCache[idx];
-        for (auto accum_i : *accum) {
+      cached = nbrCache[idx].load(std::memory_order_relaxed);
+      if (cached) {
+        for (auto accum_i : *cached) {
           if (fWrap(accum_i)) break;
         }
       } else {
        floatT hop = sqrt(dim + 3) * 1.0000001;
-        nbrCache[idx] = tree->rangeNeighbor(bait, r * hop, fStop, fWrap, true, nbrCache[idx]);
+        auto result = tree->rangeNeighbor(bait, r * hop, fStop, fWrap, true, (cellBuf*)nullptr);
+        // Release ensures vector contents are fully written before pointer is visible
+        nbrCache[idx].store(result, std::memory_order_release);
       }
     }
@@ -176,22 +180,22 @@ struct grid {
       return false;
     };
     int idx = bait - cells;
-    if (nbrCache[idx]) {
-      auto accum = nbrCache[idx];
-      for (auto accum_i : *accum) {
+    auto cached = nbrCache[idx].load(std::memory_order_acquire);
+    if (cached) {
+      for (auto accum_i : *cached) {
         if (fWrap(accum_i)) break;
       }
     } else {
-      // wait for other threads to do their thing then try again
       std::lock_guard<std::mutex> lock(cacheLocks[idx]);
-      if (nbrCache[idx]) {
-        auto accum = nbrCache[idx];
-        for (auto accum_i : *accum) {
+      cached = nbrCache[idx].load(std::memory_order_relaxed);
+      if (cached) {
+        for (auto accum_i : *cached) {
           if (fWrap(accum_i)) break;
         }
       } else {
        floatT hop = sqrt(dim + 3) * 1.0000001;
-        nbrCache[bait-cells] = tree->rangeNeighbor(bait, r * hop, fStop, fWrap, true, nbrCache[idx]);
+        auto result = tree->rangeNeighbor(bait, r * hop, fStop, fWrap, true, (cellBuf*)nullptr);
+        nbrCache[idx].store(result, std::memory_order_release);
       }
     }
diff --git a/include/dbscan/pbbs/ndHash.h b/include/dbscan/pbbs/ndHash.h
index 764df6c..ea128b1 100644
--- a/include/dbscan/pbbs/ndHash.h
+++ b/include/dbscan/pbbs/ndHash.h
@@ -404,8 +404,12 @@ struct hashSimplePair {
   int cmp(intT v, intT b) {return (v > b) ? 1 : ((v == b) ? 0 : -1);}
   bool replaceQ(eType s, eType s2) {return 0;}//return s.second > s2.second;}
   bool cas(eType* p, eType o, eType n) {
+#ifdef _MSC_VER
     return std::atomic_compare_exchange_strong_explicit(
       reinterpret_cast<std::atomic<eType>*>(p), &o, n,
       std::memory_order_acq_rel, std::memory_order_acquire);
+#else
+    return __atomic_compare_exchange(p, &o, &n, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
+#endif
   }
 };
diff --git a/include/dbscan/pbbs/scheduler.h b/include/dbscan/pbbs/scheduler.h
index 43af8fc..50de602 100644
--- a/include/dbscan/pbbs/scheduler.h
+++ b/include/dbscan/pbbs/scheduler.h
@@ -74,6 +74,9 @@ struct Deque {
     tag_t tag;
     qidx top;
   };
+  // Catches layout issues at compile time on platforms where age_t is not exactly 8 bytes,
+  // which would break the lock-free atomic CAS in the work-stealing deque.
+  static_assert(sizeof(age_t) == sizeof(int64_t), "age_t must be 8 bytes for atomic CAS");
 
   // align to avoid false sharing
   struct alignas(64) padded_job {
diff --git a/include/dbscan/pbbs/utils.h b/include/dbscan/pbbs/utils.h
index 565bd5c..bd96222 100644
--- a/include/dbscan/pbbs/utils.h
+++ b/include/dbscan/pbbs/utils.h
@@ -291,14 +291,20 @@ template
 template <class eType>
 bool myCAS(eType* p, eType o, eType n) {
+// Use compiler intrinsic to avoid undefined behaviour from reinterpret_cast to std::atomic<T>*
+// (ARM requires proper atomic instructions for CAS; the cast is not portable)
+#ifdef _MSC_VER
   return std::atomic_compare_exchange_strong_explicit(
     reinterpret_cast<std::atomic<eType>*>(p), &o, n,
     std::memory_order_acq_rel, std::memory_order_acquire);
+#else
+  return __atomic_compare_exchange(p, &o, &n, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
+#endif
 }
 
 template <class ET>
 inline bool writeMin(ET *a, ET b) {
   ET c; bool r=0;
-  do c = *a;
+  do c = *a; // while (c > b && !(r=CAS_GCC(a,c,b)));
   while (c > b &&!(r=myCAS(a,c,b)));
   return r;
@@ -312,4 +318,4 @@ struct addF {
   E operator() (const E& a, const E& b) const {return a+b;}};
 
 }
-#endif
\ No newline at end of file
+#endif