@@ -19,31 +19,41 @@ namespace ipxp::output {
1919template <typename ElementType>
2020class B2OutputStorage : public BOutputStorage <ElementType> {
2121public:
22- explicit B2OutputStorage (const uint8_t writersCount) noexcept
23- : BOutputStorage<ElementType>(writersCount)
22+ explicit B2OutputStorage (
23+ const uint8_t expectedWritersCount,
24+ const uint8_t expectedReadersCount,
25+ std::shared_ptr<AllocationBufferBase<ReferenceCounter<OutputContainer<ElementType>>>>
26+ allocationBuffer) noexcept
27+ : BOutputStorage<ElementType>(expectedWritersCount, expectedReadersCount, allocationBuffer)
2428 {
2529 }
2630
27- bool write (ElementType* element, const uint8_t writerIndex) noexcept override
31+ bool write (
32+ const Reference<OutputContainer<ElementType>>& container,
33+ const uint8_t writerIndex) noexcept override
2834 {
2935 typename BOutputStorage<ElementType>::WriterData& writerData
3036 = this ->m_writersData [writerIndex].get ();
3137 const uint16_t containersLeft = writerData.bucketAllocation .containersLeft ();
3238 switch (containersLeft) {
3339 case 1 : {
34- this ->m_allocationBuffer ->replace (
40+ /* this->m_allocationBuffer->replace(
3541 this->getNextElement(writerData.bucketAllocation),
36- element,
37- writerIndex);
42+ container,
43+ writerIndex);*/
44+ this ->getNextElement (writerData.bucketAllocation )
45+ .assign (container, this ->makeDeallocationCallback (writerIndex));
3846 }
3947 [[fallthrough]];
4048 case 0 :
4149 break ;
4250 default : {
43- this ->m_allocationBuffer ->replace (
51+ /* this->m_allocationBuffer->replace(
4452 this->getNextElement(writerData.bucketAllocation),
45- element,
46- writerIndex);
53+ container,
54+ writerIndex);*/
55+ this ->getNextElement (writerData.bucketAllocation )
56+ .assign (container, this ->makeDeallocationCallback (writerIndex));
4757 return true ;
4858 }
4959 }
@@ -53,19 +63,19 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
5363 do {
5464 const bool overflowed = writerData.randomShift ();
5565 if (overflowed) {
56- writerData.cachedLowestReaderGeneration = this -> m_lowestReaderGeneration . load ();
57- if (containersLeft == 0 ) {}
66+ writerData.cachedLowestReaderGeneration
67+ = this -> m_lowestReaderGeneration . load (std::memory_order_acquire);
5868 backoffScheme.backoff ();
5969 }
6070
61- if (this ->m_buckets [writerData.writePosition ].generation
71+ if (this ->m_buckets [writerData.writePosition ].generation . load (std::memory_order_acquire)
6272 >= writerData.cachedLowestReaderGeneration
6373 || !BOutputStorage<ElementType>::BucketAllocation::isValidBucketIndex (
6474 this ->m_buckets [writerData.writePosition ].bucketIndex )
6575 || !this ->m_buckets [writerData.writePosition ].lock .try_lock ()) {
6676 continue ;
6777 }
68- if (this ->m_buckets [writerData.writePosition ].generation
78+ if (this ->m_buckets [writerData.writePosition ].generation . load (std::memory_order_acquire)
6979 >= writerData.cachedLowestReaderGeneration
7080 || !BOutputStorage<ElementType>::BucketAllocation::isValidBucketIndex (
7181 this ->m_buckets [writerData.writePosition ].bucketIndex )) {
@@ -77,39 +87,37 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
7787
7888 this ->m_buckets [writerData.writePosition ].bucketIndex = writerData.bucketAllocation .reset (
7989 this ->m_buckets [writerData.writePosition ].bucketIndex );
80- std::atomic_thread_fence (std::memory_order_release);
90+ // std::atomic_thread_fence(std::memory_order_release);
8191
8292 const uint64_t highestReaderGeneration
8393 = this ->m_highestReaderGeneration .load (std::memory_order_acquire);
84- if (writerData.generation
94+ if (writerData.generation . load (std::memory_order_acquire)
8595 < highestReaderGeneration + BOutputStorage<ElementType>::WINDOW_SIZE) {
86- writerData.generation
87- = highestReaderGeneration + BOutputStorage<ElementType>::WINDOW_SIZE;
96+ writerData.generation .store (
97+ highestReaderGeneration + BOutputStorage<ElementType>::WINDOW_SIZE,
98+ std::memory_order_release);
8899 // casMax(m_highestWriterGeneration, writerData.generation);
89100 }
90- this ->m_buckets [writerData.writePosition ].generation = writerData.generation ;
101+ this ->m_buckets [writerData.writePosition ].generation .store (
102+ writerData.generation .load (std::memory_order_acquire),
103+ std::memory_order_release);
91104
92105 this ->m_buckets [writerData.writePosition ].lock .unlock ();
93106
94107 if (containersLeft == 0 ) {
95- this ->m_allocationBuffer ->replace (
96- this ->getNextElement (writerData.bucketAllocation ),
97- element,
98- writerIndex);
108+ this ->getNextElement (writerData.bucketAllocation )
109+ .assign (container, this ->makeDeallocationCallback (writerIndex));
99110 }
100111 return true ;
101112 }
102113
103- ElementType* read (
104- const std::size_t readerGroupIndex,
105- const uint8_t localReaderIndex,
106- const uint8_t globalReaderIndex) noexcept override
114+ OutputContainer<ElementType>* read (const uint8_t readerIndex) noexcept override
107115 {
108116 typename BOutputStorage<ElementType>::ReaderData& readerData
109- = this ->m_readersData [globalReaderIndex ].get ();
117+ = this ->m_readersData [readerIndex ].get ();
110118 // const uint64_t readPosition = readerData.readPosition;
111119 if (readerData.bucketAllocation .containersLeft ()) {
112- return this ->getNextElement (readerData.bucketAllocation );
120+ return & this ->getNextElement (readerData.bucketAllocation ). getData ( );
113121 }
114122
115123 // uint8_t loopCounter = 0;
@@ -118,12 +126,13 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
118126 // const uint16_t initialPosition = readerData.readPosition;
119127 BackoffScheme backoffScheme (0 , std::numeric_limits<std::size_t >::max ());
120128 do {
121- readerData.shift (this ->m_readerGroupSizes [readerGroupIndex], localReaderIndex );
129+ const bool overflowed = readerData.shift (this ->m_expectedReadersCount , readerIndex );
122130
123131 // auto& y = this->m_buckets[readerData.readPosition];
124- if (readerData.isOnBufferBegin (this ->m_readerGroupSizes [readerGroupIndex])) {
132+ // if (readerData.isOnBufferBegin(this->m_expectedReadersCount)) {
133+ if (overflowed) {
125134 if (!this ->writersPresent ()) {
126- readerData.generation ++ ;
135+ readerData.generation . fetch_add ( 1 , std::memory_order_release) ;
127136 updateLowestReaderGeneration ();
128137 return nullptr ;
129138 }
@@ -133,25 +142,27 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
133142 readerData.skipLoop = true ;
134143 return nullptr ;
135144 }
136- readerData.generation ++ ;
145+ readerData.generation . fetch_add ( 1 , std::memory_order_release) ;
137146 readerData.seenValidBucket = false ;
138147 readerData.skipLoop = false ;
139148 updateLowestReaderGeneration ();
140149 }
141- cachedGeneration = this ->m_buckets [readerData.readPosition ].generation ;
142- std::atomic_thread_fence (std::memory_order_acquire);
150+ cachedGeneration = this ->m_buckets [readerData.readPosition ].generation .load (
151+ std::memory_order_acquire);
152+ // std::atomic_thread_fence(std::memory_order_acquire);
143153 cachedBucketIndex = this ->m_buckets [readerData.readPosition ].bucketIndex ;
144- if (cachedGeneration >= readerData.generation + 2 ) {
154+ if (cachedGeneration >= readerData.generation .load (std::memory_order_acquire)
155+ + BOutputStorage<ElementType>::WINDOW_SIZE) {
145156 readerData.seenValidBucket = true ;
146157 }
147- } while (cachedGeneration != readerData.generation
158+ } while (cachedGeneration != readerData.generation . load (std::memory_order_acquire)
148159 || !BOutputStorage<ElementType>::BucketAllocation::isValidBucketIndex (
149160 cachedBucketIndex));
150161
151162 readerData.seenValidBucket = true ;
152163 readerData.bucketAllocation .reset (this ->m_buckets [readerData.readPosition ].bucketIndex );
153164
154- return this ->getNextElement (readerData.bucketAllocation );
165+ return & this ->getNextElement (readerData.bucketAllocation ). getData ( );
155166 }
156167
157168 /* bool finished([[maybe_unused]] const std::size_t readerGroupIndex) noexcept override
@@ -166,7 +177,9 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
166177 = this ->m_readersData
167178 | std::views::transform (
168179 [](const CacheAlligned<typename BOutputStorage<ElementType>::ReaderData>&
169- readerDataAlligned) { return readerDataAlligned->generation ; })
180+ readerDataAlligned) {
181+ return readerDataAlligned->generation .load (std::memory_order_acquire);
182+ })
170183 | std::ranges::to<boost::container::static_vector<
171184 uint64_t ,
172185 OutputStorage<ElementType>::MAX_READERS_COUNT>>();
@@ -175,7 +188,7 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
175188 // m_highestReaderGeneration = highestReaderGeneration;
176189 const uint64_t lowestReaderGeneration = *std::ranges::min_element (readerGenerations);
177190 // casMin(m_lowestReaderGeneration, lowestReaderGeneration);
178- this ->m_lowestReaderGeneration = lowestReaderGeneration;
191+ this ->m_lowestReaderGeneration . store ( lowestReaderGeneration, std::memory_order_release) ;
179192 }
180193
181194 std::atomic<uint64_t > m_highestWriterGeneration {0 };
0 commit comments