Skip to content

Commit a0ca6fe

Browse files
committed
more optimizations and simplifications
1 parent ae1bb25 commit a0ca6fe

File tree

7 files changed

+66
-25
lines changed

7 files changed

+66
-25
lines changed

include/driver_configuration.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ class DriverConfiguration {
1717
// Where to place on-disk datastructures
1818
std::string _disk_dir = ".";
1919

20-
// The number of worker threads
20+
// The number of worker threads. These perform the algorithm updates.
2121
size_t _num_worker_threads = 1;
2222

23+
// The number of threads that read from the stream
24+
size_t _num_stream_threads = 1;
25+
2326
// Configuration for the guttering system
2427
GutteringConfiguration _gutter_conf;
2528

@@ -29,13 +32,15 @@ class DriverConfiguration {
2932
// setters
3033
DriverConfiguration& gutter_sys(GutterSystem gutter_sys);
3134
DriverConfiguration& disk_dir(std::string disk_dir);
32-
DriverConfiguration& worker_threads(size_t num_groups);
35+
DriverConfiguration& worker_threads(size_t num_threads);
36+
DriverConfiguration& stream_threads(size_t num_threads);
3337
GutteringConfiguration& gutter_conf();
3438

3539
// getters
3640
GutterSystem get_gutter_sys() { return _gutter_sys; }
3741
std::string get_disk_dir() { return _disk_dir; }
3842
size_t get_worker_threads() { return _num_worker_threads; }
43+
size_t get_stream_threads() { return _num_stream_threads; }
3944

4045
friend std::ostream& operator<< (std::ostream &out, const DriverConfiguration &conf);
4146

include/graph_sketch_driver.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,10 @@ class GraphSketchDriver {
8585

8686
std::atomic<size_t> total_updates;
8787
public:
88-
GraphSketchDriver(Alg *sketching_alg, GraphStream *stream, DriverConfiguration config,
89-
size_t num_stream_threads = 1)
90-
: sketching_alg(sketching_alg), stream(stream), num_stream_threads(num_stream_threads) {
88+
GraphSketchDriver(Alg *sketching_alg, GraphStream *stream, DriverConfiguration config)
89+
: sketching_alg(sketching_alg),
90+
stream(stream),
91+
num_stream_threads(config.get_stream_threads()) {
9192
sketching_alg->allocate_worker_memory(config.get_worker_threads());
9293
// set the leaf size of the guttering system appropriately
9394
if (config.gutter_conf().get_gutter_bytes() == GutteringConfiguration::uninit_param) {

include/sparse_sketch.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ class SparseSketch {
5656
// Allocated buckets
5757
Bucket* buckets;
5858

59+
// number of dense buckets to sample from when sparse region is populated
60+
static constexpr size_t num_dense_to_sample = 2;
61+
62+
// minimum number of dense rows in a sketch no matter how sparse the vector is.
5963
static constexpr size_t min_num_dense_rows = 6;
6064
size_t num_dense_rows = min_num_dense_rows;
6165

@@ -131,7 +135,7 @@ class SparseSketch {
131135
}
132136

133137
inline size_t position_func(size_t col, size_t row, size_t num_rows) const {
134-
return col * num_rows + row + 1;
138+
return col * num_rows + row + 1; // column-major
135139
}
136140

137141
// return the bucket at a particular index in bucket array

src/driver_configuration.cpp

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,22 @@ DriverConfiguration& DriverConfiguration::disk_dir(std::string disk_dir) {
1212
return *this;
1313
}
1414

15-
DriverConfiguration& DriverConfiguration::worker_threads(size_t num_worker_threads) {
16-
_num_worker_threads = num_worker_threads;
17-
if (_num_worker_threads < 1) {
18-
std::cout << "num_worker_threads="<< _num_worker_threads << " is out of bounds. [1, infty)"
15+
DriverConfiguration& DriverConfiguration::worker_threads(size_t num_threads) {
16+
if (num_threads < 1) {
17+
std::cout << "num_worker_threads = "<< num_threads << " is out of bounds. [1, infty)"
1918
<< "Defaulting to 1." << std::endl;
20-
_num_worker_threads = 1;
19+
} else {
20+
_num_worker_threads = num_threads;
21+
}
22+
return *this;
23+
}
24+
25+
DriverConfiguration& DriverConfiguration::stream_threads(size_t num_threads) {
26+
if (num_threads < 1) {
27+
std::cout << "num_stream_threads = "<< num_threads << " is out of bounds. [1, infty)"
28+
<< "Defaulting to 1." << std::endl;
29+
} else {
30+
_num_stream_threads = num_threads;
2131
}
2232
return *this;
2333
}
@@ -35,6 +45,7 @@ std::ostream& operator<< (std::ostream &out, const DriverConfiguration &conf) {
3545
gutter_system = "CacheTree";
3646
out << " Guttering system = " << gutter_system << std::endl;
3747
out << " Worker thread count = " << conf._num_worker_threads << std::endl;
48+
out << " Stream thread count = " << conf._num_stream_threads << std::endl;
3849
out << " On disk data location = " << conf._disk_dir;
3950
return out;
4051
}

src/sparse_sketch.cpp

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -252,17 +252,28 @@ void SparseSketch::update(const vec_t update_idx) {
252252
SketchBucket::Depths depths;
253253

254254
// Update higher depth buckets
255-
for (size_t i = 0; i < num_columns; i++) {
256-
size_t bit = i & 0x1;
257-
if (bit == 0) {
258-
depths = SketchBucket::get_index_depths(update_idx, column_seed(i), bkt_per_col);
255+
for (size_t i = 0; i < num_columns - 1; i += 2) {
256+
depths = SketchBucket::get_index_depths(update_idx, column_seed(i), bkt_per_col);
257+
for (size_t j = 0; j < 2; j++) {
258+
col_hash_t depth = depths[j];
259+
likely_if(depth < bkt_per_col) {
260+
likely_if(depth < num_dense_rows) {
261+
SketchBucket::update(bucket(i + j, depth), update_idx, checksum);
262+
} else {
263+
update_sparse(i + j, {uint8_t(-1), uint8_t(depth), {update_idx, checksum}});
264+
}
265+
}
259266
}
260-
col_hash_t depth = depths[bit];
267+
}
268+
if ((num_columns & 0x1) == 1) {
269+
size_t col = num_columns - 1;
270+
271+
size_t depth = SketchBucket::get_index_depth(update_idx, column_seed(col), bkt_per_col);
261272
likely_if(depth < bkt_per_col) {
262273
likely_if(depth < num_dense_rows) {
263-
SketchBucket::update(bucket(i, depth), update_idx, checksum);
274+
SketchBucket::update(bucket(col, depth), update_idx, checksum);
264275
} else {
265-
update_sparse(i, {uint8_t(-1), uint8_t(depth), {update_idx, checksum}});
276+
update_sparse(col, {uint8_t(-1), uint8_t(depth), {update_idx, checksum}});
266277
}
267278
}
268279
}
@@ -325,8 +336,14 @@ SketchSample SparseSketch::sample() {
325336
return sample;
326337
}
327338

339+
// if dense region is densely populated then only check the "deepest" few rows
340+
int dense_row_min = 0;
341+
if (number_of_sparse_buckets > num_columns || num_dense_rows > min_num_dense_rows) {
342+
dense_row_min = num_dense_rows - num_dense_to_sample;
343+
}
344+
328345
for (size_t c = 0; c < cols_per_sample; ++c) {
329-
for (int r = num_dense_rows - 1; r >= 0; --r) { // TODO: Consider reducing the number of dense rows checked to 1 or 2
346+
for (int r = num_dense_rows - 1; r >= dense_row_min; --r) {
330347
if (SketchBucket::is_good(bucket(c + first_column, r), checksum_seed())) {
331348
// std::cout << "Found GOOD dense bucket" << std::endl;
332349
return {bucket(c + first_column, r).alpha, GOOD};

test/cc_alg_test.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ TEST(CCAlgTest, InsertOnlyStream) {
299299

300300
TEST(CCAlgTest, MTStreamWithMultipleQueries) {
301301
for (int t = 1; t <= 3; t++) {
302-
auto driver_config = DriverConfiguration().gutter_sys(STANDALONE);
302+
auto driver_config = DriverConfiguration().gutter_sys(STANDALONE).stream_threads(4);
303303

304304
const std::string fname = __FILE__;
305305
size_t pos = fname.find_last_of("\\/");
@@ -313,7 +313,7 @@ TEST(CCAlgTest, MTStreamWithMultipleQueries) {
313313
std::cerr << num_nodes << " " << num_edges << std::endl;
314314

315315
CCSketchAlg cc_alg{num_nodes, get_seed()};
316-
GraphSketchDriver<CCSketchAlg> driver(&cc_alg, &stream, driver_config, 4);
316+
GraphSketchDriver<CCSketchAlg> driver(&cc_alg, &stream, driver_config);
317317
GraphVerifier verify(num_nodes);
318318

319319
size_t num_queries = 10;

tools/process_stream.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,14 @@ int main(int argc, char **argv) {
8484
std::cout << "num_updates = " << num_updates << std::endl;
8585
std::cout << std::endl;
8686

87-
auto driver_config = DriverConfiguration().gutter_sys(CACHETREE).worker_threads(num_threads);
88-
driver_config.gutter_conf().wq_batch_per_elm(4);
89-
auto cc_config = CCAlgConfiguration().batch_factor(1);
87+
auto driver_config = DriverConfiguration()
88+
.gutter_sys(CACHETREE)
89+
.worker_threads(num_threads)
90+
.stream_threads(reader_threads);
91+
driver_config.gutter_conf().wq_batch_per_elm(2);
92+
auto cc_config = CCAlgConfiguration().batch_factor(1.2);
9093
CCSketchAlg cc_alg{num_nodes, get_seed(), cc_config};
91-
GraphSketchDriver<CCSketchAlg> driver{&cc_alg, &stream, driver_config, reader_threads};
94+
GraphSketchDriver<CCSketchAlg> driver{&cc_alg, &stream, driver_config};
9295

9396
auto ins_start = std::chrono::steady_clock::now();
9497
std::thread querier(track_insertions, num_updates, &driver, ins_start);

0 commit comments

Comments
 (0)