Skip to content

Commit 917c4f6

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
++
1 parent c0f103d commit 917c4f6

2 files changed

Lines changed: 21 additions & 9 deletions

File tree

include/ipfixprobe/outputPlugin/outputStorage/mqOutputStorage.hpp

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,22 @@ class MQOutputStorage : public OutputStorage<ElementType> {
196196
const uint64_t readPos = currentState.read.fetch_add(1, std::memory_order_acq_rel);
197197
if (readPos >= m_buffersSize) {
198198
const uint64_t readPosOfWriteBuffer = readPos - m_buffersSize;
199+
if (m_writerFinished.load(std::memory_order_acquire)) {
200+
if (readPosOfWriteBuffer
201+
< currentState.written.load(std::memory_order_acquire)) {
202+
return &currentState.writeBuffer[readPosOfWriteBuffer].getData();
203+
}
204+
return nullptr;
205+
}
206+
currentState.read.fetch_sub(1, std::memory_order_acq_rel);
207+
return nullptr;
208+
/*const uint64_t readPosOfWriteBuffer = readPos - m_buffersSize;
199209
if (m_writerFinished.load(std::memory_order_acquire)
200210
&& readPosOfWriteBuffer < currentState.written.load(std::memory_order_acquire))
201211
[[unlikely]] {
202212
return &currentState.writeBuffer[readPosOfWriteBuffer].getData();
203213
}
204-
return nullptr;
214+
return nullptr;*/
205215
}
206216
// ElementType* res = currentState.readBuffer[readPos];
207217
return &currentState.readBuffer[readPos].getData();
@@ -221,10 +231,14 @@ class MQOutputStorage : public OutputStorage<ElementType> {
221231

222232
bool finished() const noexcept
223233
{
224-
// const State& currentState = m_stateBuffer.getCurrentValue();
234+
const State& currentState = m_stateBuffer.getCurrentValue();
225235
return m_writerFinished.load(std::memory_order_acquire)
236+
&& currentState.read.load(std::memory_order_acquire)
237+
>= m_buffersSize + currentState.written.load(std::memory_order_acquire);
238+
// const State& currentState = m_stateBuffer.getCurrentValue();
239+
/*return m_writerFinished.load(std::memory_order_acquire)
226240
&& m_stateBuffer.getCurrentValue().read.load(std::memory_order_acquire)
227-
>= 2 * m_buffersSize;
241+
>= 2 * m_buffersSize;*/
228242
/*&& std::ranges::all_of(
229243
m_stateBuffer.getCurrentValue().,
230244
[&](const CacheAlligned<std::atomic<uint64_t>>& readPos) {

include/ipfixprobe/outputPlugin/outputStorage/ringOutputStorage.hpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,9 @@ class RingOutputStorage : public OutputStorage<ElementType> {
5151
if (pop == nullptr) {
5252
return nullptr;
5353
}
54-
OutputContainer<ElementType>* element
55-
= reinterpret_cast<OutputContainer<ElementType>*>(pop);
56-
m_lastReadContainer = element;
57-
return nullptr;
58-
return m_lastReadContainer;
54+
m_container.storage.clear();
55+
m_container.storage.push_back(nullptr);
56+
return &m_container;
5957
}
6058

6159
bool finished() noexcept override
@@ -65,7 +63,7 @@ class RingOutputStorage : public OutputStorage<ElementType> {
6563

6664
private:
6765
std::unique_ptr<ipx_ring_t, decltype(&ipx_ring_destroy)> m_ring;
68-
OutputContainer<ElementType>* m_lastReadContainer {nullptr};
66+
OutputContainer<ElementType> m_container;
6967
};
7068

7169
} // namespace ipxp::output

0 commit comments

Comments
 (0)