Skip to content

Commit f654ded

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
++
1 parent 45a31e3 commit f654ded

1 file changed

Lines changed: 34 additions & 8 deletions

File tree

include/ipfixprobe/outputPlugin/outputStorage/allocationBufferB.hpp

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,18 @@ namespace ipxp::output {
1919

2020
template<typename ElementType>
2121
class AllocationBufferB : public AllocationBufferBase<ElementType> {
22-
constexpr static std::size_t BUCKET_SIZE = 64;
22+
constexpr static std::size_t BUCKET_SIZE = 16;
2323
constexpr static std::size_t INDEXES_IN_CACHE_LINE = 64 / sizeof(uint16_t);
24-
constexpr static std::size_t WINDOW_SIZE = 4;
24+
constexpr static std::size_t WINDOW_SIZE = 16;
2525

2626
public:
27+
__attribute__((noinline)) std::size_t d_test(auto& container)
28+
{
29+
return std::ranges::count_if(container, [](const auto& bucket) {
30+
return bucket.load(std::memory_order_acquire) != std::numeric_limits<uint16_t>::max();
31+
});
32+
}
33+
2734
explicit AllocationBufferB(const std::size_t capacity, const uint8_t writersCount) noexcept
2835
: m_objectPool(capacity + writersCount * BUCKET_SIZE)
2936
, m_buckets(m_objectPool.size() / BUCKET_SIZE)
@@ -46,6 +53,10 @@ class AllocationBufferB : public AllocationBufferBase<ElementType> {
4653
m_fullBuckets[i].store(i);
4754
m_emptyBuckets[i].store(Bucket::PLACEHOLDER);
4855
}
56+
for (std::size_t i = m_buckets.size(); i < m_fullBuckets.size(); i++) {
57+
m_fullBuckets[i].store(Bucket::PLACEHOLDER);
58+
m_emptyBuckets[i].store(Bucket::PLACEHOLDER);
59+
}
4960
m_writersData.resize(writersCount);
5061
for (uint8_t writerIndex = 0; writerIndex < writersCount; writerIndex++) {
5162
m_writersData[writerIndex]->fullPushRank = writerIndex * INDEXES_IN_CACHE_LINE;
@@ -124,39 +135,54 @@ class AllocationBufferB : public AllocationBufferBase<ElementType> {
124135

125136
void pushBucket(auto& buckets, const std::size_t bucketIndex, std::size_t& pushRank) noexcept
126137
{
138+
std::size_t offset = 0;
127139
while (true) {
128140
uint16_t expected = buckets[pushRank].load(std::memory_order_acquire);
141+
if (++offset % 100'000'000 == 0) {
142+
std::cout << "d_test(push)=" << d_test(buckets) << "\n";
143+
}
129144
if (expected != Bucket::PLACEHOLDER) {
130-
pushRank = ((pushRank / INDEXES_IN_CACHE_LINE + 1) * INDEXES_IN_CACHE_LINE)
131-
% m_buckets.size();
145+
const std::size_t newPushRank
146+
= ((pushRank / INDEXES_IN_CACHE_LINE + 1) * INDEXES_IN_CACHE_LINE + offset)
147+
% buckets.size();
148+
if (newPushRank < pushRank) {
149+
// offset++;
150+
}
151+
pushRank = newPushRank;
132152
continue;
133153
}
134154
if (buckets[pushRank].compare_exchange_weak(
135155
expected,
136156
bucketIndex,
137157
std::memory_order_release,
138158
std::memory_order_acquire)) {
139-
pushRank = (pushRank + 1) % m_buckets.size();
159+
pushRank = (pushRank + 1) % buckets.size();
140160
return;
141161
}
142162
}
143163
}
144164

145165
uint16_t popBucket(auto& buckets, std::size_t& popRank) noexcept
146166
{
167+
std::size_t offset = 0;
168+
popRank = (popRank - 1 + buckets.size()) % buckets.size();
147169
while (true) {
170+
if (++offset % 100'000'000 == 0) {
171+
std::cout << "d_test(pop)=" << d_test(buckets) << "\n";
172+
}
148173
uint16_t expected = buckets[popRank].load(std::memory_order_acquire);
149174
if (expected == Bucket::PLACEHOLDER) {
150-
popRank = ((popRank / INDEXES_IN_CACHE_LINE + 1) * INDEXES_IN_CACHE_LINE)
151-
% m_buckets.size();
175+
popRank
176+
= (popRank / INDEXES_IN_CACHE_LINE * INDEXES_IN_CACHE_LINE - 1 + buckets.size())
177+
% buckets.size();
152178
continue;
153179
}
154180
if (buckets[popRank].compare_exchange_weak(
155181
expected,
156182
Bucket::PLACEHOLDER,
157183
std::memory_order_release,
158184
std::memory_order_acquire)) {
159-
popRank = (popRank + 1) % m_buckets.size();
185+
// popRank = (popRank - 1 + buckets.size()) % buckets.size();
160186
return expected;
161187
}
162188
}

0 commit comments

Comments
 (0)