-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathprocess_stream.cpp
More file actions
142 lines (119 loc) · 5.84 KB
/
process_stream.cpp
File metadata and controls
142 lines (119 loc) · 5.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#include <sys/resource.h> // for rusage

#include <atomic>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>

#include <graph_sketch_driver.h>
#include <cc_sketch_alg.h>
#include <binary_file_stream.h>
// Stop flag for the progress thread running track_insertions(): main() sets it
// to true once the stream has been fully processed. It is atomic because it is
// written by the main thread while the tracker thread reads it concurrently.
static std::atomic<bool> shutdown{false};
// Returns the peak resident set size of this process as reported by
// getrusage(RUSAGE_SELF), divided by 1024.
// NOTE(review): ru_maxrss is in kilobytes on Linux, which makes the result MiB
// there; other platforms may use a different unit -- confirm before porting.
static double get_max_mem_used() {
  struct rusage usage;
  getrusage(RUSAGE_SELF, &usage);
  const double max_rss = static_cast<double>(usage.ru_maxrss);
  return max_rss / 1024.0;
}
// Produces a seed from the current high_resolution_clock reading, expressed as
// the number of nanoseconds since that clock's epoch.
static size_t get_seed() {
  using namespace std::chrono;
  const auto since_epoch = high_resolution_clock::now().time_since_epoch();
  return duration_cast<nanoseconds>(since_epoch).count();
}
/*
 * Function which is run in a separate thread. Once per second it queries the
 * driver for the number of updates it has processed and prints a 20-column
 * progress bar together with the insertion rate over the last interval.
 * Returns once all updates have been processed or `shutdown` is set.
 * @param total       the total number of edges in the stream
 * @param driver      the driver object to query for processed updates
 * @param start_time  the time that we started stream ingestion
 */
void track_insertions(uint64_t total, GraphSketchDriver<CCSketchAlg> *driver,
                      std::chrono::steady_clock::time_point start_time) {
  total = total * 2; // we insert 2 edge updates per edge

  printf("Insertions\n");
  printf("Progress: | 0%%\r"); fflush(stdout);

  // State of the previous sample, used to compute the rate of the last interval
  std::chrono::steady_clock::time_point prev = start_time;
  uint64_t prev_updates = 0;

  while (true) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    const uint64_t updates = driver->get_total_updates();
    const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();

    if (updates >= total || shutdown)
      break;

    // calculate the insertion rate since the previous sample
    const std::chrono::duration<double> interval = now - prev;
    const uint64_t upd_delta = updates - prev_updates;
    // divide insertions per second by 2 because each edge is split into two updates
    // we care about edges per second not about stream updates
    const size_t ins_per_sec =
        static_cast<size_t>((static_cast<double>(upd_delta) / interval.count()) / 2);

    // advance the sample window so the next rate covers only the next interval
    prev = now;
    prev_updates = updates;

    // display the progress: each '=' stands for 5% of the stream
    int progress = static_cast<int>(updates / (total * .05));
    // keep the bar within its 20 columns so the padding length is never negative
    if (progress > 20) progress = 20;
    if (progress < 0) progress = 0;
    printf("Progress:%s%s", std::string(progress, '=').c_str(), std::string(20 - progress, ' ').c_str());
    printf("| %i%% -- %zu per second\r", progress * 5, ins_per_sec); fflush(stdout);
  }

  printf("Progress:====================| Done \n");
  return;
}
int main(int argc, char **argv) {
if (argc != 5) {
std::cerr << "ERROR: Incorrect number of arguments!" << std::endl;
std::cerr << "Arguments: stream_file, num_queries, graph_workers, reader_threads" << std::endl;
exit(EXIT_FAILURE);
}
shutdown = false;
std::string stream_file = argv[1];
int num_queries = std::atoi(argv[2]);
if (num_queries < 1 || num_queries > 1000) {
std::cerr << "ERROR: Invalid number of queries! Must be > 0 and <= 1000" << std::endl;
exit(EXIT_FAILURE);
}
int num_threads = std::atoi(argv[3]);
if (num_threads < 1) {
std::cerr << "ERROR: Invalid number of graph workers! Must be > 0." << std::endl;
exit(EXIT_FAILURE);
}
size_t reader_threads = std::atol(argv[4]);
double query_percent = 1.0 / num_queries;
size_t queries_in_stream = num_queries - 1;
BinaryFileStream stream(stream_file);
node_id_t num_nodes = stream.vertices();
size_t num_updates = stream.edges();
std::cout << "Processing stream: " << stream_file << std::endl;
std::cout << "nodes = " << num_nodes << std::endl;
std::cout << "num_updates = " << num_updates << std::endl;
std::cout << std::endl;
auto driver_config = DriverConfiguration().gutter_sys(CACHETREE).worker_threads(num_threads);
auto cc_config = CCAlgConfiguration().batch_factor(1.0);
CCSketchAlg cc_alg{num_nodes, get_seed(), cc_config};
GraphSketchDriver<CCSketchAlg> driver{&cc_alg, &stream, driver_config, reader_threads};
auto ins_start = std::chrono::steady_clock::now();
std::thread querier(track_insertions, num_updates, &driver, ins_start);
for (size_t q = 0; q < queries_in_stream; q++) {
driver.process_stream_until((q+1) * query_percent * num_updates);
auto cc_start = std::chrono::steady_clock::now();
driver.prep_query(CONNECTIVITY);
auto CC_num = cc_alg.connected_components().size();
std::chrono::duration<double> cc_time = std::chrono::steady_clock::now() - cc_start;
std::chrono::duration<double> flush_time = driver.flush_end - driver.flush_start;
std::chrono::duration<double> cc_alg_time = cc_alg.cc_alg_end - cc_alg.cc_alg_start;
std::cout << "Query " << q + 1 << std::endl;
std::cout << "Total CC query latency: " << cc_time.count() << std::endl;
std::cout << " Flush Gutters(sec): " << flush_time.count() << std::endl;
std::cout << " Boruvka's Algorithm(sec): " << cc_alg_time.count() << std::endl;
std::cout << "Connected Components: " << CC_num << std::endl;
}
// finish the stream
driver.process_stream_until(END_OF_STREAM);
auto cc_start = std::chrono::steady_clock::now();
driver.prep_query(CONNECTIVITY);
auto CC_num = cc_alg.connected_components().size();
std::chrono::duration<double> cc_time = std::chrono::steady_clock::now() - cc_start;
std::chrono::duration<double> flush_time = driver.flush_end - driver.flush_start;
std::chrono::duration<double> cc_alg_time = cc_alg.cc_alg_end - cc_alg.cc_alg_start;
std::chrono::duration<double> insert_time = driver.flush_end - ins_start;
shutdown = true;
querier.join();
double num_seconds = insert_time.count();
std::cout << "Total insertion time(sec): " << num_seconds << std::endl;
std::cout << "Updates per second: " << stream.edges() / num_seconds << std::endl;
std::cout << "Total CC query latency: " << cc_time.count() << std::endl;
std::cout << " Flush Gutters(sec): " << flush_time.count() << std::endl;
std::cout << " Boruvka's Algorithm(sec): " << cc_alg_time.count() << std::endl;
std::cout << "Connected Components: " << CC_num << std::endl;
std::cout << "Maximum Memory Usage(MiB): " << get_max_mem_used() << std::endl;
}