Skip to content

Commit 6adcccb

Browse files
authored
Merge 0959691 into sapling-pr-archive-ktf
2 parents e39bf20 + 0959691 commit 6adcccb

File tree

2 files changed

+200
-0
lines changed

2 files changed

+200
-0
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ add_executable(o2-test-framework-core
238238
test/test_IndexBuilder.cxx
239239
test/test_InputRecord.cxx
240240
test/test_InputRecordWalker.cxx
241+
test/test_DataModelViews.cxx
241242
test/test_InputSpan.cxx
242243
test/test_InputSpec.cxx
243244
test/test_LogParsingHelpers.cxx
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// Copyright 2019-2026 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include "Framework/DataModelViews.h"
13+
#include "Framework/DataProcessingHeader.h"
14+
#include "Headers/DataHeader.h"
15+
#include "Headers/Stack.h"
16+
#include <fairmq/TransportFactory.h>
17+
#include <cstring>
18+
#include <catch_amalgamated.hpp>
19+
20+
using namespace o2::framework;
21+
using DataHeader = o2::header::DataHeader;
22+
using Stack = o2::header::Stack;
23+
24+
namespace
25+
{
26+
// Build a header message containing a DataHeader with the given split-payload fields.
27+
fair::mq::MessagePtr makeHeader(fair::mq::TransportFactory& transport,
28+
uint32_t splitPayloadParts, uint32_t splitPayloadIndex)
29+
{
30+
DataHeader dh;
31+
dh.dataDescription = "TEST";
32+
dh.dataOrigin = "TST";
33+
dh.subSpecification = 0;
34+
dh.splitPayloadParts = splitPayloadParts;
35+
dh.splitPayloadIndex = splitPayloadIndex;
36+
DataProcessingHeader dph{0, 1};
37+
Stack stack{dh, dph};
38+
auto msg = transport.CreateMessage(stack.size());
39+
memcpy(msg->GetData(), stack.data(), stack.size());
40+
return msg;
41+
}
42+
43+
fair::mq::MessagePtr makePayload(fair::mq::TransportFactory& transport)
44+
{
45+
return transport.CreateMessage(4);
46+
}
47+
} // namespace
48+
49+
// ---------------------------------------------------------------------------
50+
// Single [header, payload] pair (splitPayloadParts == 0)
51+
// ---------------------------------------------------------------------------
52+
TEST_CASE("SinglePair")
53+
{
54+
auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
55+
56+
std::vector<fair::mq::MessagePtr> msgs;
57+
msgs.emplace_back(makeHeader(*transport, 0, 0));
58+
msgs.emplace_back(makePayload(*transport));
59+
60+
REQUIRE((msgs | count_parts{}) == 1);
61+
REQUIRE((msgs | count_payloads{}) == 1);
62+
REQUIRE((msgs | get_num_payloads{0}) == 1);
63+
64+
auto idx = msgs | get_pair{0};
65+
REQUIRE(idx.headerIdx == 0);
66+
REQUIRE(idx.payloadIdx == 1);
67+
68+
// Advancing past the only pair goes out of range.
69+
auto next = msgs | get_next_pair{idx};
70+
REQUIRE(next.headerIdx >= msgs.size());
71+
}
72+
73+
// ---------------------------------------------------------------------------
74+
// Old-style multipart: N [header, payload] pairs, each with splitPayloadParts=N
75+
// and splitPayloadIndex running 0..N-1 (0-indexed).
76+
// The new-style sentinel is splitPayloadIndex == splitPayloadParts, which is
77+
// never true for old-style (max index is N-1 < N).
78+
// Layout: [h0,p0, h1,p1, h2,p2]
79+
// count_parts returns N because each [h,p] pair is a separate logical part.
80+
// ---------------------------------------------------------------------------
81+
TEST_CASE("OldStyleMultipart")
82+
{
83+
auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
84+
constexpr uint32_t N = 3;
85+
86+
std::vector<fair::mq::MessagePtr> msgs;
87+
for (uint32_t i = 0; i < N; ++i) {
88+
msgs.emplace_back(makeHeader(*transport, N, i)); // 0-indexed
89+
msgs.emplace_back(makePayload(*transport));
90+
}
91+
92+
REQUIRE((msgs | count_parts{}) == N); // N separate logical parts
93+
REQUIRE((msgs | count_payloads{}) == N); // one payload each
94+
for (uint32_t i = 0; i < N; ++i) {
95+
REQUIRE((msgs | get_num_payloads{i}) == 1);
96+
}
97+
98+
// get_pair reaches each sub-part directly.
99+
for (uint32_t i = 0; i < N; ++i) {
100+
auto idx = msgs | get_pair{i};
101+
REQUIRE(idx.headerIdx == 2 * i);
102+
REQUIRE(idx.payloadIdx == 2 * i + 1);
103+
}
104+
105+
// get_next_pair advances sequentially through all pairs.
106+
DataRefIndices idx{0, 1};
107+
for (uint32_t i = 1; i < N; ++i) {
108+
idx = msgs | get_next_pair{idx};
109+
REQUIRE(idx.headerIdx == 2 * i);
110+
REQUIRE(idx.payloadIdx == 2 * i + 1);
111+
}
112+
// One more step goes out of range.
113+
idx = msgs | get_next_pair{idx};
114+
REQUIRE(idx.headerIdx >= msgs.size());
115+
}
116+
117+
// ---------------------------------------------------------------------------
118+
// New-style multipart: one header followed by N contiguous payloads.
119+
// splitPayloadParts == splitPayloadIndex == N (the sentinel for new style).
120+
// Layout: [h, p0, p1, p2]
121+
// ---------------------------------------------------------------------------
122+
TEST_CASE("NewStyleMultiPayload")
123+
{
124+
auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
125+
constexpr uint32_t N = 3;
126+
127+
std::vector<fair::mq::MessagePtr> msgs;
128+
msgs.emplace_back(makeHeader(*transport, N, N));
129+
for (uint32_t i = 0; i < N; ++i) {
130+
msgs.emplace_back(makePayload(*transport));
131+
}
132+
133+
REQUIRE((msgs | count_parts{}) == 1);
134+
REQUIRE((msgs | count_payloads{}) == N);
135+
REQUIRE((msgs | get_num_payloads{0}) == N); // all payloads belong to part 0
136+
137+
// get_pair returns the same header for every sub-part, advancing payloadIdx.
138+
for (uint32_t i = 0; i < N; ++i) {
139+
auto idx = msgs | get_pair{i};
140+
REQUIRE(idx.headerIdx == 0);
141+
REQUIRE(idx.payloadIdx == 1 + i);
142+
}
143+
144+
// get_next_pair advances payloadIdx within the block, then moves to next block.
145+
DataRefIndices idx{0, 1};
146+
for (uint32_t i = 1; i < N; ++i) {
147+
idx = msgs | get_next_pair{idx};
148+
REQUIRE(idx.headerIdx == 0);
149+
REQUIRE(idx.payloadIdx == 1 + i);
150+
}
151+
// One more step exits the block.
152+
idx = msgs | get_next_pair{idx};
153+
REQUIRE(idx.headerIdx >= msgs.size());
154+
}
155+
156+
// ---------------------------------------------------------------------------
157+
// Mixed message set: two routes, one single-pair and one new-style block.
158+
// Layout: [h0, p0, h1, p1_0, p1_1]
159+
// ---------------------------------------------------------------------------
160+
TEST_CASE("MixedLayout")
161+
{
162+
auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
163+
164+
std::vector<fair::mq::MessagePtr> msgs;
165+
// Route 0: single pair
166+
msgs.emplace_back(makeHeader(*transport, 0, 0));
167+
msgs.emplace_back(makePayload(*transport));
168+
// Route 1: new-style 2-payload block
169+
msgs.emplace_back(makeHeader(*transport, 2, 2));
170+
msgs.emplace_back(makePayload(*transport));
171+
msgs.emplace_back(makePayload(*transport));
172+
173+
REQUIRE((msgs | count_parts{}) == 2);
174+
REQUIRE((msgs | count_payloads{}) == 3);
175+
176+
// get_pair across routes
177+
auto idx0 = msgs | get_pair{0};
178+
REQUIRE(idx0.headerIdx == 0);
179+
REQUIRE(idx0.payloadIdx == 1);
180+
181+
auto idx1 = msgs | get_pair{1};
182+
REQUIRE(idx1.headerIdx == 2);
183+
REQUIRE(idx1.payloadIdx == 3);
184+
185+
auto idx2 = msgs | get_pair{2};
186+
REQUIRE(idx2.headerIdx == 2);
187+
REQUIRE(idx2.payloadIdx == 4);
188+
189+
// get_next_pair traversal from the first element
190+
DataRefIndices idx{0, 1};
191+
idx = msgs | get_next_pair{idx};
192+
REQUIRE(idx.headerIdx == 2);
193+
REQUIRE(idx.payloadIdx == 3);
194+
idx = msgs | get_next_pair{idx};
195+
REQUIRE(idx.headerIdx == 2);
196+
REQUIRE(idx.payloadIdx == 4);
197+
idx = msgs | get_next_pair{idx};
198+
REQUIRE(idx.headerIdx >= msgs.size());
199+
}

0 commit comments

Comments
 (0)