This repository was archived by the owner on May 31, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathMixer.cpp
More file actions
77 lines (66 loc) · 2.55 KB
/
Mixer.cpp
File metadata and controls
77 lines (66 loc) · 2.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#include "Mixer.h"
#include <cstddef>
#include <gflags/gflags.h>
#include <jemalloc/jemalloc.h>
// Command-line flag handed to every producer created in
// Mixer::initProducers() as its duration argument.
DEFINE_int32(producer_duration, 10000,
             "scales the length of producers. Making "
             "this number higher means each producer runs for a long time.");
using std::make_unique;
using std::unique_ptr;
using std::shared_ptr;
using std::vector;
// Builds a mixer.
//
// numProducers  - stored as the number of producer runs run() will perform.
// distr         - distribution entries forwarded to initProducers().
// me            - index of this mixer's own queue within toFreeQueues.
// toFreeQueues  - queues that allocations may be handed to; copied into the
//                 mixer.
//
// NOTE(review): consumerIdPicker_ is bounded by toFreeQueues.size() - 1, which
// wraps around for an empty vector; callers presumably always pass at least
// one queue - confirm.
Mixer::Mixer(int numProducers, const Distribution &distr, int me,
             vector<shared_ptr<ToFreeQueue>> toFreeQueues)
    : producersRemaining_(numProducers),
      toFreeQueues_(toFreeQueues),
      me_(me),
      consumerIdPicker_(0, toFreeQueues.size() - 1) {
  // The running total has to start at zero before initProducers()
  // accumulates the individual weights into it.
  totalWeight_ = 0.0;
  initProducers(distr);

  // Only now is the final total known, so the weighted picker is built last.
  std::uniform_real_distribution<double> weightRange(0.0, totalWeight_);
  producerWeightPicker_ = weightRange;
}
void Mixer::addProducer(double weight, unique_ptr<Producer> p) {
this->totalWeight_ += weight;
this->producers_.push_back(std::move(p));
this->weightArray_.push_back(this->totalWeight_);
}
void Mixer::initProducers(const Distribution &distr) {
auto oneSecond = std::chrono::duration<double>(1.0);
std::uniform_int_distribution<int> vectorInitFuzzer(1, 100);
for (auto it = begin(distr); it != end(distr); ++it) {
addProducer(it->freq / 3.0,
std::move(make_unique<SimpleProducer>(it->size, FLAGS_producer_duration)));
// provide a bit of fuzziness to vector initial value
int vectorInit = vectorInitFuzzer(this->generator_);
addProducer(it->freq / 3.0,
std::move(make_unique<VectorProducer>(FLAGS_producer_duration, oneSecond, vectorInit)));
addProducer(it->freq / 3.0,
std::move(make_unique<LinkedListProducer>(it->size, FLAGS_producer_duration, oneSecond)));
}
}
// Chooses a producer at random, using the weights registered through
// addProducer().
//
// A value r is drawn from producerWeightPicker_ and the first producer whose
// cumulative weight in weightArray_ is >= r is returned.
//
// Returns a reference to the chosen producer; the mixer keeps ownership.
// The assert guards against no cumulative weight covering r (for example
// when no producers were registered).
const Producer &Mixer::pickProducer() {
  const double r = producerWeightPicker_(generator_);

  // Unsigned index: it is compared against the container's size().
  std::size_t producerIndex = 0;
  for (; producerIndex < weightArray_.size(); ++producerIndex) {
    if (r <= weightArray_[producerIndex]) {
      break;
    }
  }

  assert(producerIndex != weightArray_.size());
  return *(producers_[producerIndex]);
}
// Picks the queue that will receive the next allocation to be freed.
// The index is drawn from consumerIdPicker_, whose bounds are set in the
// constructor to cover every entry of toFreeQueues_.
ToFreeQueue& Mixer::pickConsumer() {
  const int chosenQueue = consumerIdPicker_(generator_);
  auto &queue = toFreeQueues_[chosenQueue];
  return *queue;
}
void Mixer::run() {
while (this->producersRemaining_ > 0) {
this->toFreeQueues_[this->me_]->free();
// otherwise run a random producer
Allocation a = this->pickProducer().run();
if (!a.isEmpty()) {
this->pickConsumer().addToFree(std::move(a));
}
producersRemaining_--;
}
je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0);
// Main loop will eventually cleanup memory
}