#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <exception>
#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

#include "types.h"
89
9- struct update_batch {
10- node_id_t node_idx;
11- std::vector<node_id_t > upd_vec;
12- };
13-
10+ template <class T > // templatized by data type we're storing
1411class WorkQueue {
1512 public:
1613 class DataNode {
1714 private:
1815 // LL next pointer
1916 DataNode *next = nullptr ;
20- std::vector<update_batch> batches ;
17+ std::vector<T> data_batch ;
2118
22- DataNode (const size_t batch_per_elm, const size_t vec_size) {
23- batches.resize (batch_per_elm);
24- for (size_t i = 0 ; i < batch_per_elm; i++) {
25- batches[i].upd_vec .reserve (vec_size);
26- }
27- }
2819 friend class WorkQueue ;
2920 public:
30- const std::vector<update_batch >& get_batches () { return batches ; }
21+ const std::vector<T >& get_batches () { return data_batch ; }
3122 };
3223
33- /*
24+ /* *
3425 * Construct a work queue
35- * The number of elements in the queue is num_batches / batch_per_elm
36- * As a consequence num_batches is rounded up to the nearest multiple of batch_per_elm
37- * @param num_batches the rough number of batches to have in the queue
38- * @param max_batch_size the maximum size of a batch
39- * @param batch_per_elm number of batches per queue element.
26+ * @param num_queue_elements the rough number of batches to have in the queue
27+ * @param data_per_elm number of batches per queue element.
28+ */
29+ WorkQueue (size_t num_queue_elements, size_t data_per_elm)
30+ : len(num_queue_elements), batch_per_elm(data_per_elm) {
31+ non_block = false ;
32+
33+ // place all nodes of linked list in the producer queue and reserve
34+ // memory for the vectors
35+ for (size_t i = 0 ; i < len; i++) {
36+ // create and reserve space for updates
37+ DataNode *node = new DataNode ();
38+ node->next = producer_list; // next of node is head
39+ producer_list = node; // set head to new node
40+ }
41+ }
42+ ~WorkQueue () {
43+ // free data from the queues
44+ // grab locks to ensure that list variables aren't old due to cpu caching
45+ producer_list_lock.lock ();
46+ consumer_list_lock.lock ();
47+ while (producer_list != nullptr ) {
48+ DataNode *temp = producer_list;
49+ producer_list = producer_list->next ;
50+ delete temp;
51+ }
52+ while (consumer_list != nullptr ) {
53+ DataNode *temp = consumer_list;
54+ consumer_list = consumer_list->next ;
55+ delete temp;
56+ }
57+ producer_list_lock.unlock ();
58+ consumer_list_lock.unlock ();
59+ }
60+
61+ /* *
62+ * Initialize the queue pointers to point at actual data instead of nullptrs
63+ * If this function is called, IT MUST be called before performing any operations with the queue
64+ * The queue can also work without initializing pointers, so long as the pointers returned from
65+ * push being null is acceptable. (i.e. user initializes after push or does not need the returned
66+ * pointer)
67+ * @param data_batches a vector of data batches that will start in the queue but is swapped with
68+ * data that is pushed into the queue.
4069 */
41- WorkQueue (size_t num_batches, size_t max_batch_size, size_t batch_per_elm);
42- ~WorkQueue ();
70+ void populate_queue (std::vector<std::vector<T>> data_batches) {
71+ if (data_batches.size () != len) {
72+ throw std::invalid_argument (" WQ: Error number of initialized data batches incorrect" );
73+ }
74+ DataNode *data = producer_list; // head of producer list
75+ for (size_t i = 0 ; i < len; i++) {
76+ if (data_batches[i].size () != batch_per_elm) {
77+ throw std::invalid_argument (" WQ: Error number of data elements per batch incorrect" );
78+ }
79+ data->data_batch = data_batches[i];
80+ data = data->next ;
81+ }
82+ }
4383
44- /*
84+ /* *
4585 * Add a data element to the queue
4686 * @param upd_vec_batch vector of graph node id the associated updates
4787 */
48- void push (std::vector<update_batch> &upd_vec_batch);
88+ void push (std::vector<T> &upd_vec_batch) {
89+ if (upd_vec_batch.size () > batch_per_elm) {
90+ throw std::runtime_error (" WQ: Too many batches in call to push " +
91+ std::to_string (upd_vec_batch.size ()) + " > " + std::to_string (batch_per_elm));
92+ }
93+ std::unique_lock<std::mutex> lk (producer_list_lock);
94+ producer_condition.wait (lk, [this ]{return !full ();});
95+
96+ // printf("WQ: Push:\n");
97+ // print();
98+
99+ // remove head from produce_list
100+ DataNode *node = producer_list;
101+ producer_list = producer_list->next ;
102+ lk.unlock ();
103+
104+ // swap the batch vectors to perform the update
105+ std::swap (node->data_batch , upd_vec_batch);
106+
107+ // add this block to the consumer queue for processing
108+ consumer_list_lock.lock ();
109+ node->next = consumer_list;
110+ consumer_list = node;
111+ consumer_list_lock.unlock ();
112+ consumer_condition.notify_one ();
113+ }
49114
50- /*
115+ /* *
51116 * Get data from the queue for processing
52117 * @param data where to place the Data
53118 * @return true if we were able to get good data, false otherwise
54119 */
55- bool peek (DataNode *&data);
120+ bool pop (DataNode *&data) {
121+ // wait while queue is empty
122+ // printf("waiting to peek\n");
123+ std::unique_lock<std::mutex> lk (consumer_list_lock);
124+ consumer_condition.wait (lk, [this ]{return !empty () || non_block;});
125+
126+ // printf("WQ: Peek\n");
127+ // print();
128+
129+ // if non_block and queue is empty then there is no data to get
130+ // so inform the caller of this
131+ if (empty ()) {
132+ lk.unlock ();
133+ return false ;
134+ }
56135
57- /*
58- * Wait until the work queue has enough items in it to satisfy the request and then
59- * @param node_vec where to place the batch of Data
60- * @param batch_size the amount of Data requested
61- * return true if able to get good data, false otherwise
62- */
63- bool peek_batch (std::vector<DataNode *> &node_vec, size_t batch_size);
64-
65- /*
136+ // remove head from consumer_list and release lock
137+ DataNode *node = consumer_list;
138+ consumer_list = consumer_list->next ;
139+ lk.unlock ();
140+
141+ data = node;
142+ return true ;
143+ }
144+
145+ /* *
66146 * After processing data taken from the work queue call this function
67147 * to mark the node as ready to be overwritten
68148 * @param data the LL node that we have finished processing
69149 */
70- void peek_callback (DataNode *data);
71-
72- /*
73- * A batched version of peek_callback that avoids locking on every DataNode
74- */
75- void peek_batch_callback (const std::vector<DataNode *> &node_vec);
150+ void pop_callback (DataNode *node) {
151+ producer_list_lock.lock ();
152+ // printf("WQ: Callback\n");
153+ // print();
154+ node->next = producer_list;
155+ producer_list = node;
156+ producer_list_lock.unlock ();
157+ producer_condition.notify_one ();
158+ // printf("WQ: Callback done\n");
159+ }
76160
77- void set_non_block (bool _block);
161+ void set_non_block (bool _block) {
162+ consumer_list_lock.lock ();
163+ non_block = _block;
164+ consumer_list_lock.unlock ();
165+ consumer_condition.notify_all ();
166+ }
78167
79- /*
168+ /* *
80169 * Function which prints the work queue
81170 * Used for debugging
82171 */
83- void print ();
172+ void print () {
173+ std::string to_print = " " ;
174+
175+ int p_size = 0 ;
176+ DataNode *temp = producer_list;
177+ while (temp != nullptr ) {
178+ to_print += std::to_string (p_size) + " : " + std::to_string ((uint64_t )temp) + " \n " ;
179+ temp = temp->next ;
180+ ++p_size;
181+ }
182+ int c_size = 0 ;
183+ temp = consumer_list;
184+ while (temp != nullptr ) {
185+ to_print += std::to_string (c_size) + " : " + std::to_string ((uint64_t )temp) + " \n " ;
186+ temp = temp->next ;
187+ ++c_size;
188+ }
189+ printf (" WQ: producer_queue size = %i consumer_queue size = %i\n %s" , p_size, c_size, to_print.c_str ());
190+ }
84191
85192 // functions for checking if the queue is empty or full
86193 inline bool full () {return producer_list == nullptr ;} // if producer queue empty, wq full
@@ -91,7 +198,6 @@ class WorkQueue {
91198 DataNode *consumer_list = nullptr ; // list of nodes with data for reading
92199
93200 const size_t len; // number of elments in queue
94- const size_t max_batch_size; // maximum batch size
95201 const size_t batch_per_elm; // number of batches per work queue element
96202
97203 // locks and condition variables for producer list
@@ -106,16 +212,3 @@ class WorkQueue {
106212 // or return false on failure (true)
107213 bool non_block;
108214};
109-
/**
 * Exception thrown when a write exceeds its permitted size.
 */
class WriteTooBig : public std::exception {
 private:
  const std::string message;  // explanatory text returned by what()

 public:
  /**
   * @param message description of the offending write (taken by value, moved)
   */
  WriteTooBig(std::string message) : message(std::move(message)) {}

  // noexcept replaces the deprecated dynamic exception specification throw()
  const char *what() const noexcept override {
    return message.c_str();
  }
};