diff --git a/include/dbscan/algo.h b/include/dbscan/algo.h index 693e736..56f73e4 100644 --- a/include/dbscan/algo.h +++ b/include/dbscan/algo.h @@ -57,7 +57,7 @@ int DBSCAN(intT n, floatT* PF, double epsilon, intT minPts, bool* coreFlagOut, i intT count = 0; auto isCore = [&] (pointT *p) { if(count >= minPts) return true; - if(p->distSqr(P[i]) <= epsSqr) {//todo sqrt opt + if(p->distSqr(P[i]) <= epsSqr) { count ++;} return false;}; G->nghPointMap(P[i].coordinate(), isCore); @@ -165,67 +165,30 @@ int DBSCAN(intT n, floatT* PF, double epsilon, intT minPts, bool* coreFlagOut, i delete G; //improving cluster representation - auto cluster2 = newA(intT, n); - auto flag = newA(intT, n+1); - parallel_for(0, n, [&](intT i){cluster2[i] = cluster[i];}); - sampleSort(cluster, n, std::less()); - - flag[0] = 1; - parallel_for(1, n, [&](intT i){ - if (cluster[i] != cluster[i-1]) - flag[i] = 1; - else - flag[i] = 0; - }); - flag[n] = sequence::prefixSum(flag, 0, n); - - // typedef pair eType; - struct myPair { - intT first; - intT second; - myPair(intT _first, intT _second): first(_first), second(_second) {} - myPair(): first(-1), second(-1) {} - inline bool operator==(myPair a) { - if(a.first==first && a.second== second) - return true; - else - return false; - } - }; - - typedef Table,intT> tableT; - auto T = new tableT(n, hashSimplePair()); - parallel_for(0, n, [&] (intT i) { - if (flag[i] != flag[i+1]) { - // T->insert(make_pair(cluster[i], flag[i])); - T->insert(myPair(cluster[i], flag[i])); - } - }); + // O(n) renumbering: cluster IDs are point indices ∈ [0,n), use prefix sum + // instead of O(n log n) sort + hash table. 
+ auto idMap = newA(intT, n); + parallel_for(0, n, [&](intT i) { idMap[i] = 0; }); + parallel_for(0, n, [&](intT i) { + if (cluster[i] >= 0) idMap[cluster[i]] = 1; + }); + sequence::prefixSum(idMap, 0, n); // exclusive: idMap[cid] = sequential ID for cid - if(T->find(-1).second < 0) { - parallel_for(0, n, [&](intT i){ - cluster2[i] = T->find(cluster2[i]).second; - }); - } else { - parallel_for(0, n, [&](intT i){ - if (cluster2[i] > 0) - cluster2[i] = T->find(cluster2[i]).second-1; - }); - } + // Remap to sequential IDs (separate buffer to avoid read/write conflict) + auto cluster2 = newA(intT, n); + parallel_for(0, n, [&](intT i) { + cluster2[i] = (cluster[i] >= 0) ? idMap[cluster[i]] : cluster[i]; + }); //restoring order - parallel_for(0, n, [&](intT i){ - cluster[I[i]] = cluster2[i]; - }); - parallel_for(0, n, [&](intT i){ - coreFlagOut[I[i]] = coreFlag[i]; - }); + parallel_for(0, n, [&](intT i) { + cluster[I[i]] = cluster2[i]; + coreFlagOut[I[i]] = coreFlag[i]; + }); free(I); free(cluster2); - free(flag); - T->del(); // Required to clean-up T's internals - delete T; + free(idMap); free(P); #ifdef VERBOSE cout << "output-time = " << tt.stop() << endl; diff --git a/include/dbscan/coreBccp.h b/include/dbscan/coreBccp.h index f163183..3f7bd65 100644 --- a/include/dbscan/coreBccp.h +++ b/include/dbscan/coreBccp.h @@ -29,9 +29,10 @@ #include "pbbs/parallel.h" #include "pbbs/utils.h" +// r holds squared distance; using distSqr and nodeDistanceSqr avoids sqrt in hot path template inline void compBcpCoreHSerial(nodeT* n1, nodeT* n2, floatT* r, intT* coreFlag, objT* P) { - if (n1->nodeDistance(n2) > *r) return; + if (n1->nodeDistanceSqr(n2) > *r) return; if (n1->isLeaf() && n2->isLeaf()) {//basecase for (intT i=0; isize(); ++i) { @@ -39,7 +40,7 @@ inline void compBcpCoreHSerial(nodeT* n1, nodeT* n2, floatT* r, intT* coreFlag, auto pi = n1->getItem(i); auto pj = n2->getItem(j); if (coreFlag[pi - P] && coreFlag[pj - P]) { - floatT dist = pi->dist(*pj); + floatT dist = 
pi->distSqr(*pj); r[0] = min(r[0], dist); } } @@ -78,7 +79,7 @@ inline void compBcpCoreHSerial(nodeT* n1, nodeT* n2, floatT* r, intT* coreFlag, template inline void compBcpCoreHBase(nodeT* n1, nodeT* n2, floatT* r, intT* coreFlag, objT* P) { - if (n1->nodeDistance(n2) > *r) return; + if (n1->nodeDistanceSqr(n2) > *r) return; if (n1->isLeaf() && n2->isLeaf()) {//basecase for (intT i=0; isize(); ++i) { @@ -86,14 +87,15 @@ inline void compBcpCoreHBase(nodeT* n1, nodeT* n2, floatT* r, intT* coreFlag, ob auto pi = n1->getItem(i); auto pj = n2->getItem(j); if (coreFlag[pi - P] && coreFlag[pj - P]) { - floatT dist = pi->dist(*pj); + floatT dist = pi->distSqr(*pj); utils::writeMin(r, dist); } } } - } else {//recursive, todo consider call order, might help + } else {//recursive if (n1->isLeaf()) { - if (n1->nodeDistance(n2->L()) < n1->nodeDistance(n2->R())) { + // nodeDistanceSqr avoids sqrt; monotonicity preserves ordering + if (n1->nodeDistanceSqr(n2->L()) < n1->nodeDistanceSqr(n2->R())) { compBcpCoreH(n1, n2->L(), r, coreFlag, P); compBcpCoreH(n1, n2->R(), r, coreFlag, P); } else { @@ -101,7 +103,7 @@ inline void compBcpCoreHBase(nodeT* n1, nodeT* n2, floatT* r, intT* coreFlag, ob compBcpCoreH(n1, n2->L(), r, coreFlag, P); } } else if (n2->isLeaf()) { - if (n2->nodeDistance(n1->L()) < n2->nodeDistance(n1->R())) { + if (n2->nodeDistanceSqr(n1->L()) < n2->nodeDistanceSqr(n1->R())) { compBcpCoreH(n2, n1->L(), r, coreFlag, P); compBcpCoreH(n2, n1->R(), r, coreFlag, P); } else { @@ -115,7 +117,7 @@ inline void compBcpCoreHBase(nodeT* n1, nodeT* n2, floatT* r, intT* coreFlag, ob ordering[2] = make_pair(n2->L(), n1->R()); ordering[3] = make_pair(n2->R(), n1->R()); auto bbd = [&](pair p1, pair p2) { - return p1.first->nodeDistance(p1.second) < p2.first->nodeDistance(p2.second);}; + return p1.first->nodeDistanceSqr(p1.second) < p2.first->nodeDistanceSqr(p2.second);}; quickSortSerial(ordering, 4, bbd); for (intT o=0; o<4; ++o) { compBcpCoreH(ordering[o].first, ordering[o].second, 
r, coreFlag, P);} @@ -125,13 +127,13 @@ inline void compBcpCoreHBase(nodeT* n1, nodeT* n2, floatT* r, intT* coreFlag, ob template inline void compBcpCoreH(nodeT* n1, nodeT* n2, floatT* r, intT* coreFlag, objT* P) { - if (n1->nodeDistance(n2) > *r) return; + if (n1->nodeDistanceSqr(n2) > *r) return; if ((n1->isLeaf() && n2->isLeaf()) || (n1->size()+n2->size() < 2000)) { return compBcpCoreHBase(n1, n2, r, coreFlag, P); - } else {//recursive, todo consider call order, might help + } else {//recursive if (n1->isLeaf()) { - if (n1->nodeDistance(n2->L()) < n1->nodeDistance(n2->R())) { + if (n1->nodeDistanceSqr(n2->L()) < n1->nodeDistanceSqr(n2->R())) { par_do([&](){compBcpCoreH(n1, n2->L(), r, coreFlag, P);}, [&](){compBcpCoreH(n1, n2->R(), r, coreFlag, P);}); } else { @@ -139,7 +141,7 @@ inline void compBcpCoreH(nodeT* n1, nodeT* n2, floatT* r, intT* coreFlag, objT* [&](){compBcpCoreH(n1, n2->L(), r, coreFlag, P);}); } } else if (n2->isLeaf()) { - if (n2->nodeDistance(n1->L()) < n2->nodeDistance(n1->R())) { + if (n2->nodeDistanceSqr(n1->L()) < n2->nodeDistanceSqr(n1->R())) { par_do([&](){compBcpCoreH(n2, n1->L(), r, coreFlag, P);}, [&](){compBcpCoreH(n2, n1->R(), r, coreFlag, P);}); } else { @@ -153,7 +155,7 @@ inline void compBcpCoreH(nodeT* n1, nodeT* n2, floatT* r, intT* coreFlag, objT* ordering[2] = make_pair(n2->L(), n1->R()); ordering[3] = make_pair(n2->R(), n1->R()); auto bbd = [&](pair p1, pair p2) { - return p1.first->nodeDistance(p1.second) < p2.first->nodeDistance(p2.second);}; + return p1.first->nodeDistanceSqr(p1.second) < p2.first->nodeDistanceSqr(p2.second);}; quickSortSerial(ordering, 4, bbd); parallel_for (0, 4, [&](intT o) { compBcpCoreH(ordering[o].first, ordering[o].second, r, coreFlag, P);}, 1); @@ -179,11 +181,11 @@ inline bool hasEdge(intT n1, intT n2, intT* coreFlag, objT* P, floatT epsilon, c if (!trees[n1]) trees[n1] = new treeT(cells[n1].getItem(), cells[n1].size(), false);//todo allocation, parallel - if (!trees[n2]) + if (!trees[n2]) 
trees[n2] = new treeT(cells[n2].getItem(), cells[n2].size(), false);//todo allocation, parallel floatT r = floatMax(); compBcpCoreH(trees[n1]->rootNode(), trees[n2]->rootNode(), &r, coreFlag, P); - return r <= epsilon; + return r <= epsilon * epsilon; // r holds squared distance now } #endif diff --git a/include/dbscan/grid.h b/include/dbscan/grid.h index cdd2884..9b18c3a 100644 --- a/include/dbscan/grid.h +++ b/include/dbscan/grid.h @@ -92,21 +92,19 @@ struct grid { cells = newA(cellT, cellCapacity); nbrCache = newA(cellBuf*, cellCapacity); cacheLocks = (std::mutex*) malloc(cellCapacity * sizeof(std::mutex)); - parallel_for(0, cellCapacity, [&](intT i) { - new (&cacheLocks[i]) std::mutex(); - nbrCache[i] = NULL; - cells[i].init(); - }); numCells = 0; myHash = new cellHashT(pMinn, r); - table = new tableT(cellMax*2, cellHash(myHash));//todo load + // Hash table sized for expected number of cells, with safety rebuild + // in insertParallel if numCells exceeds the estimate. + intT tableHint = std::max((intT)2048, cellMax / 4); + table = new tableT(tableHint, cellHash(myHash)); } ~grid() { free(cells); free(cacheLocks); - parallel_for(0, cellCapacity, [&](intT i) { + parallel_for(0, numCells, [&](intT i) { if(nbrCache[i]) delete nbrCache[i]; }); free(nbrCache); @@ -205,9 +203,24 @@ struct grid { freeFlag=true;} parallel_for(0, nn, [&](intT i){I[i] = i;}); + + // Pre-compute integer cell coordinates to avoid floor() in every sort comparison + auto cellKeys = newA(intT, nn * dim); + floatT invR = 1.0 / r; + parallel_for(0, nn, [&](intT i) { + for (int d = 0; d < dim; d++) { + cellKeys[i * dim + d] = (intT)floor((P[i][d] - pMin[d]) * invR); + } + }); auto ipLess = [&] (intT a, intT b) { - return pointGridCmp(P[a], P[b], pMin, r);}; + for (int d = 0; d < dim; d++) { + intT ca = cellKeys[a * dim + d]; + intT cb = cellKeys[b * dim + d]; + if (ca != cb) return ca < cb; + } + return false;}; sampleSort(I, nn, ipLess); + free(cellKeys); parallel_for(0, nn, [&](intT i){PP[i] = 
P[I[i]];}); flag[0] = 1; @@ -224,6 +237,19 @@ struct grid { if (numCells > cellCapacity) { cout << "error, grid insert exceeded cell capacity, abort()" << endl;abort();} + // Rebuild hash table if initial estimate was too small + if (numCells > cellCapacity / 8) { + table->del(); delete table; + table = new tableT(numCells * 2, cellHash(myHash)); + } + + // Initialize only the cells that will actually be used + parallel_for(0, numCells, [&](intT i) { + new (&cacheLocks[i]) std::mutex(); + nbrCache[i] = NULL; + cells[i].init(); + }); + parallel_for(0, nn, [&](intT i) { if (flag[i] != flag[i+1]) { auto c = &cells[flag[i]]; diff --git a/include/dbscan/kdNode.h b/include/dbscan/kdNode.h index 0dea9d5..cdf35fa 100644 --- a/include/dbscan/kdNode.h +++ b/include/dbscan/kdNode.h @@ -47,11 +47,10 @@ class kdNode { }} inline void boundingBoxParallel() { - // intT P = getWorkers()*8; - static const intT P = 36 * 8; + intT P = getWorkers() * 8; intT blockSize = (n+P-1)/P; - pointT localMin[P]; - pointT localMax[P]; + auto localMin = newA(pointT, P); + auto localMax = newA(pointT, P); for (intT i=0; icoordinate()); localMax[i] = pointT(items[0]->coordinate());} @@ -68,6 +67,8 @@ class kdNode { for(intT p=0; ppMax[d]; + floatT gapB = n2->pMin[d] - pMax[d]; + floatT gap = gapA > gapB ? gapA : gapB; + gap = gap > 0 ? 
gap : 0; + rsqr += gap * gap; + } + return rsqr; + } + //return the far bb distance between n1 and n2 inline floatT nodeFarDistance(nodeT* n2) { floatT result = 0; @@ -296,44 +311,45 @@ class kdNode { } //vecT need to be vector + // rSqr is the squared query radius; use distSqr to avoid sqrt per point template - void rangeNeighbor(pointT queryPt, floatT r, pointT pMin1, pointT pMax1, vecT* accum) { + void rangeNeighbor(pointT queryPt, floatT rSqr, pointT pMin1, pointT pMax1, vecT* accum) { int relation = boxCompare(pMin1, pMax1, pMin, pMax); if (relation == boxInclude) { for(intT i=0; igetCoordObj()->dist(queryPt) <= r) + if (items[i]->getCoordObj()->distSqr(queryPt) <= rSqr) accum->push_back(items[i]); } } else if (relation == boxOverlap) { if (isLeaf()) { for(intT i=0; igetCoordObj()->dist(queryPt) <= r && + if (items[i]->getCoordObj()->distSqr(queryPt) <= rSqr && itemInBox(pMin1, pMax1, items[i])) accum->push_back(items[i]); } } else { - left->rangeNeighbor(queryPt, r, pMin1, pMax1, accum); - right->rangeNeighbor(queryPt, r, pMin1, pMax1, accum);} + left->rangeNeighbor(queryPt, rSqr, pMin1, pMax1, accum); + right->rangeNeighbor(queryPt, rSqr, pMin1, pMax1, accum);} } } template - void rangeNeighbor(pointT queryPt, floatT r, pointT pMin1, pointT pMax1, func term, func2 doTerm) { + void rangeNeighbor(pointT queryPt, floatT rSqr, pointT pMin1, pointT pMax1, func term, func2 doTerm) { if (term()) return; int relation = boxCompare(pMin1, pMax1, pMin, pMax); if (relation == boxInclude) { for(intT i=0; igetCoordObj()->dist(queryPt) <= r && + if (items[i]->getCoordObj()->distSqr(queryPt) <= rSqr && doTerm(items[i])) break; } } else if (relation == boxOverlap) { if (isLeaf()) { for(intT i=0; igetCoordObj()->dist(queryPt) <= r && + if (items[i]->getCoordObj()->distSqr(queryPt) <= rSqr && doTerm(items[i])) break; } } else { - left->rangeNeighbor(queryPt, r, pMin1, pMax1, term, doTerm); - right->rangeNeighbor(queryPt, r, pMin1, pMax1, term, doTerm);} + 
left->rangeNeighbor(queryPt, rSqr, pMin1, pMax1, term, doTerm); + right->rangeNeighbor(queryPt, rSqr, pMin1, pMax1, term, doTerm);} } } diff --git a/include/dbscan/kdTree.h b/include/dbscan/kdTree.h index 2909e70..78e2d9f 100644 --- a/include/dbscan/kdTree.h +++ b/include/dbscan/kdTree.h @@ -84,15 +84,16 @@ class kdTree { queryPt.updateX(i, center[i]); pMin1.updateX(i, center[i]-r); pMax1.updateX(i, center[i]+r);} + floatT rSqr = r * r; if(cache) { if(!accum) accum = new vecT(); - root->rangeNeighbor(queryPt, r, pMin1, pMax1, accum); + root->rangeNeighbor(queryPt, rSqr, pMin1, pMax1, accum); for (auto accum_i : *accum) { if(doTerm(accum_i)) break; } return accum; } else { - root->rangeNeighbor(queryPt, r, pMin1, pMax1, term, doTerm); + root->rangeNeighbor(queryPt, rSqr, pMin1, pMax1, term, doTerm); return NULL; } } diff --git a/include/dbscan/pbbs/scheduler.h b/include/dbscan/pbbs/scheduler.h index 43af8fc..25d7483 100644 --- a/include/dbscan/pbbs/scheduler.h +++ b/include/dbscan/pbbs/scheduler.h @@ -49,8 +49,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -159,7 +161,8 @@ struct scheduler { deques(num_deques), attempts(num_deques), spawned_threads(), - finished_flag(false) { + finished_flag(false), + num_sleeping(0) { // Stopping condition auto finished = [this]() { return finished_flag.load(std::memory_order_relaxed); @@ -177,29 +180,43 @@ struct scheduler { ~scheduler() { finished_flag.store(true, std::memory_order_relaxed); + // Wake any sleeping threads so they see the finished flag + wake_sleeping(); for (unsigned int i = 1; i < num_threads; i++) { spawned_threads[i - 1].join(); } } - // Push onto local stack. + // Push onto local stack. Wakes sleeping threads only if any exist, + // so this is zero-cost during active computation. 
void spawn(Job* job) { int id = worker_id(); deques[id].push_bottom(job); + if (num_sleeping.load(std::memory_order_relaxed) > 0) + wake_sleeping(); } // Wait for condition: finished(). + // Does NOT enter Phase 3 (CV sleep) — instead spins/yields while + // opportunistically executing other stolen work. This prevents deadlock + // where a waiting thread sleeps on the CV but nobody notifies it. template void wait(F finished, bool conservative = false) { - // Conservative avoids deadlock if scheduler is used in conjunction - // with user locks enclosing a wait. if (conservative) { while (!finished()) std::this_thread::yield(); + } else { + while (!finished()) { + Job* job = try_pop(); + if (!job) { + size_t id = worker_id(); + job = try_steal(id); + } + if (job) + (*job)(); + else + std::this_thread::yield(); + } } - // If not conservative, schedule within the wait. - // Can deadlock if a stolen job uses same lock as encloses the wait. - else - start(finished); } // All scheduler threads quit after this is called. @@ -247,6 +264,23 @@ struct scheduler { std::vector attempts; std::vector spawned_threads; std::atomic finished_flag; + // Sleep/wake mechanism: threads entering Phase 3 sleep on the CV. + // spawn() only signals when num_sleeping > 0, making it zero-cost + // during active computation. wake_gen is bumped to break the CV wait. + std::mutex sleep_mutex; + std::condition_variable sleep_cv; + std::atomic num_sleeping; + std::atomic wake_gen{0}; + + void wake_sleeping() { + // Must lock to prevent race: a thread could be between incrementing + // num_sleeping and entering wait(), missing our notify. + { + std::lock_guard lk(sleep_mutex); + wake_gen.fetch_add(1, std::memory_order_release); + } + sleep_cv.notify_all(); + } // Start an individual scheduler task. Runs until finished(). template @@ -266,6 +300,8 @@ struct scheduler { } // Find a job, first trying local stack, then random steals. 
+ // Uses progressive backoff: spin aggressively at first (for latency when + // work is about to appear), then yield, then sleep with increasing duration. template Job* get_job(F finished) { if (finished()) return nullptr; @@ -273,14 +309,36 @@ struct scheduler { if (job) return job; size_t id = worker_id(); while (true) { - // By coupon collector's problem, this should touch all. + // Phase 1: aggressive spin — covers all deques several times for (int i = 0; i <= num_deques * 100; i++) { if (finished()) return nullptr; job = try_steal(id); if (job) return job; } - // If haven't found anything, take a breather. - std::this_thread::sleep_for(std::chrono::nanoseconds(num_deques * 100)); + // Phase 2: yield + short spins (transition period) + for (int round = 0; round < 40; round++) { + std::this_thread::yield(); + for (int i = 0; i <= num_deques * 20; i++) { + if (finished()) return nullptr; + job = try_steal(id); + if (job) return job; + } + } + // Phase 3: CV sleep (truly idle — no work found after phases 1+2). + // Uses CV with short timeout so threads wake for both new work + // (notified by spawn()) and job completions (checked via finished()). 
+ { + std::unique_lock lk(sleep_mutex); + auto gen_before = wake_gen.load(std::memory_order_acquire); + num_sleeping.fetch_add(1, std::memory_order_release); + sleep_cv.wait_for(lk, std::chrono::milliseconds(1), [&]() { + return finished() || + wake_gen.load(std::memory_order_acquire) != gen_before; + }); + num_sleeping.fetch_sub(1, std::memory_order_release); + } + if (finished()) return nullptr; + // After wake, loop back to Phase 1 (aggressive steal) } } @@ -336,33 +394,21 @@ class fork_join_scheduler { #pragma warning(disable: 4267) // conversion from 'size_t' to *, possible loss of data #endif - template - size_t get_granularity(size_t start, size_t end, F f) { - size_t done = 0; - size_t sz = 1; - int ticks = 0; - do { - sz = std::min(sz, end - (start + done)); - auto tstart = std::chrono::high_resolution_clock::now(); - for (size_t i = 0; i < sz; i++) f(start + done + i); - auto tstop = std::chrono::high_resolution_clock::now(); - ticks = static_cast((tstop - tstart).count()); - done += sz; - sz *= 2; - } while (ticks < 1000 && done < (end - start)); - return done; - } - template void parfor(size_t start, size_t end, F f, size_t granularity = 0, bool conservative = false) { if (end <= start) return; + size_t n = end - start; if (granularity == 0) { - size_t done = get_granularity(start, end, f); - granularity = std::max(done, (end - start) / (128 * sched->num_threads)); - parfor_(start + done, end, f, granularity, conservative); - } else - parfor_(start, end, f, granularity, conservative); + // Aim for ~4 chunks per thread to balance load without excessive splitting. 
+ granularity = std::max<size_t>(1, n / (4 * sched->num_threads)); + } + // Sequential fast-path: skip task machinery for small ranges + if (n <= granularity) { + for (size_t i = start; i < end; i++) f(i); + return; + } + parfor_(start, end, f, granularity, conservative); } private: diff --git a/include/dbscan/shared.h index 737437c..69450f4 100644 --- a/include/dbscan/shared.h +++ b/include/dbscan/shared.h @@ -126,10 +126,9 @@ point pMinSerial(point* items, intT n) { template point pMinParallel(point* items, intT n) { point pMin = point(items[0].x); - // intT P = getWorkers()*8; - static const intT P = 36 * 8; + intT P = getWorkers() * 8; intT blockSize = (n+P-1)/P; - point localMin[P]; + auto localMin = newA(point, P); for (intT i=0; i(items[0].x);} parallel_for(0, P, [&](intT p) { @@ -141,5 +140,6 @@ point pMinParallel(point* items, intT n) { pMin = point(items[0].x); for(intT p=0; p