Skip to content

Commit 462edd6

Browse files
authored
Merge 75221fb into sapling-pr-archive-ktf
2 parents 36c29f4 + 75221fb commit 462edd6

17 files changed

+290
-182
lines changed

Framework/Core/include/Framework/DataModelViews.h

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "DomainInfoHeader.h"
1717
#include "SourceInfoHeader.h"
1818
#include "Headers/DataHeader.h"
19+
#include "Framework/DataRef.h"
1920
#include "Framework/TimesliceSlot.h"
2021
#include <ranges>
2122
#include <span>
@@ -80,10 +81,7 @@ struct count_parts {
8081
}
8182
};
8283

83-
struct DataRefIndices {
84-
size_t headerIdx;
85-
size_t payloadIdx;
86-
};
84+
// DataRefIndices is defined in Framework/DataRef.h
8785

8886
struct get_pair {
8987
size_t pairId;
@@ -127,6 +125,43 @@ struct get_pair {
127125
}
128126
};
129127

128+
// Advance from a DataRefIndices to the next one in O(1), reading only the
129+
// current header. Intended for use in iterators so that ++ is O(1) rather
130+
// than the O(n) while-loop that get_pair requires.
131+
//
132+
// New-style block (splitPayloadIndex == splitPayloadParts > 1):
133+
// layout: [header, payload_0, payload_1, ..., payload_{N-1}]
134+
// advance within block while payloads remain, then jump to the next block.
135+
//
136+
// Old-style block (splitPayloadIndex != splitPayloadParts, splitPayloadParts > 1)
137+
// or single pair (splitPayloadParts == 0):
138+
// layout: [header, payload] – always advance by two messages.
139+
struct get_next_pair {
140+
DataRefIndices current;
141+
template <typename R>
142+
requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
143+
friend DataRefIndices operator|(R&& r, get_next_pair self)
144+
{
145+
size_t hIdx = self.current.headerIdx;
146+
auto* header = o2::header::get<o2::header::DataHeader*>(r[hIdx]->GetData());
147+
if (!header) {
148+
throw std::runtime_error("Not a DataHeader");
149+
}
150+
if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
151+
// New-style block: one header followed by splitPayloadParts contiguous payloads.
152+
if (self.current.payloadIdx < hIdx + header->splitPayloadParts) {
153+
// More sub-payloads remain in this block.
154+
return {hIdx, self.current.payloadIdx + 1};
155+
}
156+
// Last sub-payload consumed; move to the first pair of the next block.
157+
size_t nextHIdx = hIdx + header->splitPayloadParts + 1;
158+
return {nextHIdx, nextHIdx + 1};
159+
}
160+
// Old-style [header, payload] pairs or a single pair: advance by two messages.
161+
return {hIdx + 2, hIdx + 3};
162+
}
163+
};
164+
130165
struct get_dataref_indices {
131166
size_t part;
132167
size_t subPart;

Framework/Core/include/Framework/DataRef.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#define FRAMEWORK_DATAREF_H
1313

1414
#include <cstddef> // for size_t
15+
#include <compare>
1516

1617
namespace o2
1718
{
@@ -29,6 +30,15 @@ struct DataRef {
2930
size_t payloadSize = 0;
3031
};
3132

33+
/// Raw indices into the message vector for one (header, payload) pair.
34+
/// Kept in a lightweight header so InputSpan can use it without pulling in FairMQ.
35+
struct DataRefIndices {
36+
size_t headerIdx;
37+
size_t payloadIdx;
38+
bool operator==(const DataRefIndices&) const = default;
39+
auto operator<=>(const DataRefIndices&) const = default;
40+
};
41+
3242
} // namespace framework
3343
} // namespace o2
3444

Framework/Core/include/Framework/InputRecord.h

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
#include "Framework/DataRef.h"
1515
#include "Framework/DataRefUtils.h"
16+
#include "Framework/InputSpan.h"
1617
#include "Framework/InputRoute.h"
1718
#include "Framework/TypeTraits.h"
1819
#include "Framework/TableConsumer.h"
@@ -202,6 +203,15 @@ class InputRecord
202203

203204
[[nodiscard]] size_t getNofParts(int pos) const;
204205

206+
/// O(1) access to the part described by @a indices in slot @a pos.
207+
[[nodiscard]] DataRef getAtIndices(int pos, DataRefIndices indices) const;
208+
209+
/// O(1) advance from @a current to the next part's indices in slot @a pos.
210+
[[nodiscard]] DataRefIndices nextIndices(int pos, DataRefIndices current) const
211+
{
212+
return mSpan.nextIndices(pos, current);
213+
}
214+
205215
// Given a binding by string, return the associated DataRef
206216
DataRef getDataRefByString(const char* bindingName, int part = 0) const
207217
{
@@ -568,8 +578,8 @@ class InputRecord
568578

569579
Iterator() = delete;
570580

571-
Iterator(ParentType const* parent, size_t position = 0, size_t size = 0)
572-
: mPosition(position), mSize(size > position ? size : position), mParent(parent), mElement{nullptr, nullptr, nullptr}
581+
Iterator(ParentType const* parent, bool isEnd = false)
582+
: mPosition(isEnd ? parent->size() : 0), mSize(parent->size()), mParent(parent), mElement{nullptr, nullptr, nullptr}
573583
{
574584
if (mPosition < mSize) {
575585
if (mParent->isValid(mPosition)) {
@@ -678,18 +688,29 @@ class InputRecord
678688
using reference = typename BaseType::reference;
679689
using pointer = typename BaseType::pointer;
680690
using ElementType = typename std::remove_const<value_type>::type;
681-
using iterator = Iterator<SelfType, T>;
682-
using const_iterator = Iterator<SelfType, const T>;
691+
using iterator = InputSpan::Iterator<SelfType, T>;
692+
using const_iterator = InputSpan::Iterator<SelfType, const T>;
693+
694+
InputRecordIterator(InputRecord const* parent, bool isEnd = false)
695+
: BaseType(parent, isEnd)
696+
{
697+
}
698+
699+
/// Initial indices for part-level iteration: first part starts at {headerIdx=0, payloadIdx=1}.
700+
[[nodiscard]] DataRefIndices initialIndices() const { return {0, 1}; }
701+
/// Sentinel used by nextIndicesGetter to signal end-of-slot.
702+
[[nodiscard]] DataRefIndices endIndices() const { return {size_t(-1), size_t(-1)}; }
683703

684-
InputRecordIterator(InputRecord const* parent, size_t position = 0, size_t size = 0)
685-
: BaseType(parent, position, size)
704+
/// Get element at the given raw message indices in O(1).
705+
[[nodiscard]] ElementType getAtIndices(DataRefIndices indices) const
686706
{
707+
return this->parent()->getAtIndices(this->position(), indices);
687708
}
688709

689-
/// Get element at {slotindex, partindex}
690-
[[nodiscard]] ElementType getByPos(size_t pos) const
710+
/// Advance @a current to the next part's indices in O(1).
711+
[[nodiscard]] DataRefIndices nextIndices(DataRefIndices current) const
691712
{
692-
return this->parent()->getByPos(this->position(), pos);
713+
return this->parent()->nextIndices(this->position(), current);
693714
}
694715

695716
/// Check if slot is valid, index of part is not used
@@ -709,12 +730,12 @@ class InputRecord
709730

710731
[[nodiscard]] const_iterator begin() const
711732
{
712-
return const_iterator(this, 0, size());
733+
return const_iterator(this, size() == 0);
713734
}
714735

715736
[[nodiscard]] const_iterator end() const
716737
{
717-
return const_iterator(this, size());
738+
return const_iterator(this, true);
718739
}
719740
};
720741

@@ -723,12 +744,12 @@ class InputRecord
723744

724745
[[nodiscard]] const_iterator begin() const
725746
{
726-
return {this, 0, size()};
747+
return {this, false};
727748
}
728749

729750
[[nodiscard]] const_iterator end() const
730751
{
731-
return {this, size()};
752+
return {this, true};
732753
}
733754

734755
InputSpan& span()

0 commit comments

Comments
 (0)