Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/dbscan/cell.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,11 @@ struct cellHash {
bool replaceQ(eType c1, eType c2) {return 0;}

// Strong compare-and-swap on a table slot: stores n into *p only when *p
// still equals o, and reports whether the store took place.
// Ordering is acquire-release on success and acquire on failure.
bool cas(eType* p, eType o, eType n) {
#if !defined(_MSC_VER)
  // GCC/Clang generic builtin; the `false` argument selects the strong form.
  return __atomic_compare_exchange(p, &o, &n, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
#else
  // MSVC path: the slot is accessed through a std::atomic<eType> view.
  auto* slot = reinterpret_cast<std::atomic<eType>*>(p);
  return std::atomic_compare_exchange_strong_explicit(
      slot, &o, n, std::memory_order_acq_rel, std::memory_order_acquire);
#endif
}
};
46 changes: 25 additions & 21 deletions include/dbscan/grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#pragma once

#include <atomic>
#include <mutex>
#include "cell.h"
#include "point.h"
Expand Down Expand Up @@ -77,7 +78,7 @@ struct grid {
tableT* table=NULL;
treeT* tree=NULL;
intT totalPoints;
cellBuf **nbrCache;
std::atomic<cellBuf*>* nbrCache;
std::mutex* cacheLocks;

/**
Expand All @@ -90,11 +91,11 @@ struct grid {
r(rr), pMin(pMinn), cellCapacity(cellMax), totalPoints(0) {

cells = newA(cellT, cellCapacity);
nbrCache = newA(cellBuf*, cellCapacity);
nbrCache = new std::atomic<cellBuf*>[cellCapacity];
cacheLocks = (std::mutex*) malloc(cellCapacity * sizeof(std::mutex));
parallel_for(0, cellCapacity, [&](intT i) {
new (&cacheLocks[i]) std::mutex();
nbrCache[i] = NULL;
nbrCache[i].store(nullptr, std::memory_order_relaxed);
cells[i].init();
});
numCells = 0;
Expand All @@ -107,9 +108,10 @@ struct grid {
free(cells);
free(cacheLocks);
parallel_for(0, cellCapacity, [&](intT i) {
if(nbrCache[i]) delete nbrCache[i];
auto cached = nbrCache[i].load(std::memory_order_relaxed);
if(cached) delete cached;
});
free(nbrCache);
delete[] nbrCache;
if(myHash) delete myHash;
if(table) {
table->del();
Expand Down Expand Up @@ -147,22 +149,24 @@ struct grid {
}
return false;};//todo, optimize
int idx = bait - cells;
if (nbrCache[idx]) {
auto accum = nbrCache[idx];
for (auto accum_i : *accum) {
// Acquire ensures vector contents are visible if pointer is non-null
auto cached = nbrCache[idx].load(std::memory_order_acquire);
if (cached) {
for (auto accum_i : *cached) {
if(fWrap(accum_i)) break;
}
} else {
// wait for other threads to do their thing then try again
std::lock_guard<std::mutex> lock(cacheLocks[idx]);
if (nbrCache[idx]) {
auto accum = nbrCache[idx];
for (auto accum_i : *accum) {
cached = nbrCache[idx].load(std::memory_order_relaxed);
if (cached) {
for (auto accum_i : *cached) {
if (fWrap(accum_i)) break;
}
} else {
floatT hop = sqrt(dim + 3) * 1.0000001;
nbrCache[idx] = tree->rangeNeighbor(bait, r * hop, fStop, fWrap, true, nbrCache[idx]);
auto result = tree->rangeNeighbor(bait, r * hop, fStop, fWrap, true, (cellBuf*)nullptr);
// Release ensures vector contents are fully written before pointer is visible
nbrCache[idx].store(result, std::memory_order_release);
}
}
}
Expand All @@ -176,22 +180,22 @@ struct grid {
return false;
};
int idx = bait - cells;
if (nbrCache[idx]) {
auto accum = nbrCache[idx];
for (auto accum_i : *accum) {
auto cached = nbrCache[idx].load(std::memory_order_acquire);
if (cached) {
for (auto accum_i : *cached) {
if (fWrap(accum_i)) break;
}
} else {
// wait for other threads to do their thing then try again
std::lock_guard<std::mutex> lock(cacheLocks[idx]);
if (nbrCache[idx]) {
auto accum = nbrCache[idx];
for (auto accum_i : *accum) {
cached = nbrCache[idx].load(std::memory_order_relaxed);
if (cached) {
for (auto accum_i : *cached) {
if (fWrap(accum_i)) break;
}
} else {
floatT hop = sqrt(dim + 3) * 1.0000001;
nbrCache[bait-cells] = tree->rangeNeighbor(bait, r * hop, fStop, fWrap, true, nbrCache[idx]);
auto result = tree->rangeNeighbor(bait, r * hop, fStop, fWrap, true, (cellBuf*)nullptr);
nbrCache[idx].store(result, std::memory_order_release);
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions include/dbscan/pbbs/ndHash.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,12 @@ struct hashSimplePair {
int cmp(intT v, intT b) {return (v > b) ? 1 : ((v == b) ? 0 : -1);}
bool replaceQ(eType s, eType s2) {return 0;}//return s.second > s2.second;}
// Strong compare-and-swap used by the hash table: writes n to *p only if
// *p currently equals o; the return value tells whether the write happened.
// Ordering is acquire-release on success and acquire on failure.
bool cas(eType* p, eType o, eType n) {
#if !defined(_MSC_VER)
  // GCC/Clang generic builtin; the `false` argument selects the strong form.
  return __atomic_compare_exchange(p, &o, &n, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
#else
  // MSVC path: the slot is accessed through a std::atomic<eType> view.
  auto* slot = reinterpret_cast<std::atomic<eType>*>(p);
  return std::atomic_compare_exchange_strong_explicit(
      slot, &o, n, std::memory_order_acq_rel, std::memory_order_acquire);
#endif
}
};

Expand Down
3 changes: 3 additions & 0 deletions include/dbscan/pbbs/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ struct Deque {
tag_t tag;
qidx top;
};
// Catches layout issues at compile time on platforms where age_t is not exactly 8 bytes,
// which would break the lock-free atomic CAS in the work-stealing deque.
static_assert(sizeof(age_t) == sizeof(int64_t), "age_t must be 8 bytes for atomic CAS");

// align to avoid false sharing
struct alignas(64) padded_job {
Expand Down
10 changes: 8 additions & 2 deletions include/dbscan/pbbs/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,20 @@ template <class E1, class E2>

// Strong compare-and-swap on a plain (non-std::atomic) object.
//   p: address of the value to update
//   o: value *p is expected to hold
//   n: value to store when the expectation is met
// Returns true when *p was replaced by n, false when *p differed from o.
// Ordering is acquire-release on success and acquire on failure.
template<typename eType>
bool myCAS(eType* p, eType o, eType n) {
#if !defined(_MSC_VER)
  // GCC/Clang: the generic builtin works directly on the raw object, so no
  // cast to std::atomic* is needed; `false` selects the strong form.
  return __atomic_compare_exchange(p, &o, &n, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
#else
  // MSVC: this branch still goes through a std::atomic<eType> view of *p.
  auto* slot = reinterpret_cast<std::atomic<eType>*>(p);
  return std::atomic_compare_exchange_strong_explicit(
      slot, &o, n, std::memory_order_acq_rel, std::memory_order_acquire);
#endif
}

template <class ET>
inline bool writeMin(ET *a, ET b) {
ET c; bool r=0;
do c = *a;
do c = *a;
// while (c > b && !(r=CAS_GCC(a,c,b)));
while (c > b &&!(r=myCAS(a,c,b)));
return r;
Expand All @@ -312,4 +318,4 @@ struct addF { E operator() (const E& a, const E& b) const {return a+b;}};

}

#endif
#endif
Loading