Skip to content

Commit a38eb12

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
++
1 parent 7898371 commit a38eb12

2 files changed

Lines changed: 32 additions & 8 deletions

File tree

include/ipfixprobe/outputPlugin/outputStorage/allocationBufferS.hpp

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ template<typename ElementType>
1919
class AllocationBufferS : public AllocationBufferBase<ElementType> {
2020
public:
2121
explicit AllocationBufferS(const std::size_t capacity, const uint8_t writersCount) noexcept
22-
: m_objectPool(capacity + writersCount)
22+
: m_objectPool(capacity + writersCount * 4)
2323
{
2424
static_assert(
2525
std::atomic<HelpState>::is_always_lock_free,
@@ -73,10 +73,21 @@ class AllocationBufferS : public AllocationBufferBase<ElementType> {
7373
private:
7474
void handleStealRequest(const uint8_t writerIndex) noexcept
7575
{
76-
if (!m_helpStates[writerIndex]->load(std::memory_order_acquire).stealingRequested) {
77-
return;
78-
}
79-
m_helpStates[writerIndex]->store(HelpState {true, true}, std::memory_order_release);
76+
HelpState expected;
77+
HelpState desired;
78+
do {
79+
expected = m_helpStates[writerIndex]->load(std::memory_order_acquire);
80+
if (!expected.stealingRequested) {
81+
return;
82+
}
83+
desired = expected;
84+
desired.stealingAllowed = true;
85+
} while (!m_helpStates[writerIndex]->compare_exchange_weak(
86+
expected,
87+
desired,
88+
std::memory_order_release,
89+
std::memory_order_acquire));
90+
8091
while (m_helpStates[writerIndex]->load(std::memory_order_acquire).stealingRequested) {
8192
BackoffScheme(1, 0).backoff();
8293
}
@@ -95,8 +106,9 @@ class AllocationBufferS : public AllocationBufferBase<ElementType> {
95106
if (!setStealRequest(stealVictimIndex)) {
96107
continue;
97108
}
98-
while (!isStealAllowed(stealVictimIndex)) {
99-
BackoffScheme(1, 0).backoff();
109+
if (!waitForStealApproval(stealVictimIndex)) {
110+
clearStealRequest(stealVictimIndex);
111+
continue;
100112
}
101113
WriterData& stealVictimData = m_writersData[stealVictimIndex].get();
102114
for (std::size_t i = 0; i < stealVictimData.storage.size() / 2; i++) {
@@ -108,6 +120,17 @@ class AllocationBufferS : public AllocationBufferBase<ElementType> {
108120
}
109121
}
110122

123+
bool waitForStealApproval(const uint8_t victimIndex) noexcept
124+
{
125+
BackoffScheme backoff(200, 5);
126+
while (!isStealAllowed(victimIndex)) {
127+
if (!backoff.backoff()) {
128+
return false;
129+
}
130+
}
131+
return true;
132+
}
133+
111134
bool setStealRequest(const uint8_t victimIndex) noexcept
112135
{
113136
HelpState expected;

include/ipfixprobe/outputPlugin/outputStorage/outputStorageRegistrar.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ class OutputStorageRegistrar {
2424
m_allocationBuffer
2525
= std::make_shared<AllocationBufferS<ReferenceCounter<OutputContainer<ElementType>>>>(
2626
OutputStorage<ElementType>::STORAGE_CAPACITY
27-
* OutputStorage<ElementType>::MAX_READER_GROUPS_COUNT,
27+
//* OutputStorage<ElementType>::MAX_READER_GROUPS_COUNT
28+
,
2829
writersCount);
2930
}
3031

0 commit comments

Comments
 (0)