Skip to content

Commit e89ddf3

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
++
1 parent b704d19 commit e89ddf3

3 files changed

Lines changed: 20 additions & 25 deletions

File tree

include/ipfixprobe/outputPlugin/outputStorage/backoffScheme.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ class BackoffScheme {
1919
bool backoff() noexcept
2020
{
2121
if (m_waitCounter < m_shortWaitThreshold) {
22-
for (const auto _ : std::views::iota(0, 10'000)) {
23-
asm volatile("pause" ::: "memory");
24-
}
22+
asm volatile("pause" ::: "memory");
23+
/*for (const auto _ : std::views::iota(0, 10'000)) {
24+
}*/
2525

2626
} else if (m_waitCounter < m_longWaitThreshold) {
2727
std::this_thread::yield();

include/ipfixprobe/outputPlugin/outputStorage/ffq2OutputStorage.hpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class FFQ2OutputStorage : public FFQOutputStorage<ElementType> {
3131
throw std::runtime_error("Container read more times than there are reader groups.");
3232
}*/
3333
BackoffScheme backoffScheme(70, std::numeric_limits<std::size_t>::max());
34-
const uint64_t writeRank = this->m_writeRank.fetch_add(1, std::memory_order_acq_rel);
34+
const uint64_t writeRank = this->m_writeRank->fetch_add(1, std::memory_order_acq_rel);
3535
const uint64_t writeIndex = writeRank % OutputStorage<ElementType>::STORAGE_CAPACITY;
3636
while (!this->m_cells[writeIndex].state.tryToSetWriter()) {
3737
backoffScheme.backoff();
@@ -56,13 +56,13 @@ class FFQ2OutputStorage : public FFQOutputStorage<ElementType> {
5656
.state.setReadingFinished();
5757
this->m_readersData[readerIndex]->lastReadIndex = std::nullopt;
5858
}
59-
const uint64_t readRank = this->m_readRank.fetch_add(1, std::memory_order_acq_rel);
59+
const uint64_t readRank = this->m_readRank->fetch_add(1, std::memory_order_acq_rel);
6060
const uint64_t readIndex = readRank % OutputStorage<ElementType>::STORAGE_CAPACITY;
61-
while (readRank >= this->m_writeRank.load(std::memory_order_acquire)
61+
while (readRank >= this->m_writeRank->load(std::memory_order_acquire)
6262
&& this->writersPresent()) {
6363
backoffScheme.backoff();
6464
}
65-
if (readRank >= this->m_writeRank.load(std::memory_order_acquire)) {
65+
if (readRank >= this->m_writeRank->load(std::memory_order_acquire)) {
6666
return nullptr;
6767
}
6868

@@ -82,8 +82,8 @@ class FFQ2OutputStorage : public FFQOutputStorage<ElementType> {
8282
return !this->writersPresent() &&
8383
// m_readRanks[readerGroupIndex].get() % ALLOCATION_BUFFER_CAPACITY
8484
//== m_writeRank.load() % ALLOCATION_BUFFER_CAPACITY;
85-
this->m_readRank.load(std::memory_order_acquire)
86-
> this->m_writeRank.load(std::memory_order_acquire);
85+
this->m_readRank->load(std::memory_order_acquire)
86+
> this->m_writeRank->load(std::memory_order_acquire);
8787
}
8888

8989
private:

include/ipfixprobe/outputPlugin/outputStorage/ffqOutputStorage.hpp

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class FFQOutputStorage : public OutputStorage<ElementType> {
2727
{
2828
BackoffScheme backoffScheme(70, 1);
2929
while (true) {
30-
const uint64_t writeRank = this->m_writeRank.fetch_add(1, std::memory_order_acq_rel);
30+
const uint64_t writeRank = this->m_writeRank->fetch_add(1, std::memory_order_acq_rel);
3131
const uint64_t writeIndex = writeRank % OutputStorage<ElementType>::STORAGE_CAPACITY;
3232
if (
3333
/*(m_storage[writeIndex].empty()
@@ -59,7 +59,7 @@ class FFQOutputStorage : public OutputStorage<ElementType> {
5959
m_cells[*m_readersData[readerIndex]->lastReadIndex].state.setReadingFinished();
6060
}
6161
while (!finished()) {
62-
const uint64_t readRank = m_readRank.fetch_add(1, std::memory_order_acq_rel);
62+
const uint64_t readRank = m_readRank->fetch_add(1, std::memory_order_acq_rel);
6363
const uint64_t readIndex = readRank % OutputStorage<ElementType>::STORAGE_CAPACITY;
6464
if (m_cells[readIndex].state.tryToSetReadingStarted()) {
6565
// std::atomic_thread_fence(std::memory_order_acquire);
@@ -77,8 +77,10 @@ class FFQOutputStorage : public OutputStorage<ElementType> {
7777
bool finished() noexcept override
7878
{
7979
return !this->writersPresent()
80-
&& m_readRank % OutputStorage<ElementType>::STORAGE_CAPACITY
81-
== m_writeRank.load() % OutputStorage<ElementType>::STORAGE_CAPACITY;
80+
&& m_readRank->load(std::memory_order_acquire)
81+
% OutputStorage<ElementType>::STORAGE_CAPACITY
82+
== m_writeRank->load(std::memory_order_acquire)
83+
% OutputStorage<ElementType>::STORAGE_CAPACITY;
8284
}
8385

8486
protected:
@@ -166,27 +168,20 @@ class FFQOutputStorage : public OutputStorage<ElementType> {
166168
struct Cell {
167169
constexpr static uint64_t INVALID_RANK = std::numeric_limits<uint64_t>::max();
168170

169-
uint64_t rank;
171+
// uint64_t rank;
170172
ReaderGroupState state;
171-
bool gap;
173+
// bool gap;
172174
};
173175

174176
struct ReaderData {
175177
std::optional<uint16_t> lastReadIndex {0};
176178
};
177179

178-
boost::container::static_vector<Cell, OutputStorage<ElementType>::STORAGE_CAPACITY> m_cells;
179-
/*std::span<Cell> d_cells {
180-
m_cells.data(),
181-
OutputStorage<ElementType>::ALLOCATION_BUFFER_CAPACITY};*/
182-
std::atomic<uint64_t> m_writeRank {0};
183-
/*std::array<
184-
CacheAlligned<std::atomic<uint64_t>>,
185-
OutputStorage<ElementType>::MAX_READER_GROUPS_COUNT>
186-
m_readRanks;*/
187-
std::atomic<uint64_t> m_readRank {0};
188180
std::array<CacheAlligned<ReaderData>, OutputStorage<ElementType>::MAX_READERS_COUNT>
189181
m_readersData;
182+
boost::container::static_vector<Cell, OutputStorage<ElementType>::STORAGE_CAPACITY> m_cells;
183+
CacheAlligned<std::atomic<uint64_t>> m_writeRank {0};
184+
CacheAlligned<std::atomic<uint64_t>> m_readRank {0};
190185
};
191186

192187
} // namespace ipxp::output

0 commit comments

Comments
 (0)