forked from AliceO2Group/AliceO2
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathChunkedDigitPublisher.cxx
More file actions
371 lines (318 loc) · 13.4 KB
/
ChunkedDigitPublisher.cxx
File metadata and controls
371 lines (318 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
/// @author Sandro Wenzel
/// @since 2021-03-10
/// @brief Takes TPC digit chunks (such as drift times) --> accumulates them into the digit timeframe format --> publishes
#include "Framework/WorkflowSpec.h"
#include "Framework/DataProcessorSpec.h"
#include "Framework/Task.h"
#include "Framework/DataAllocator.h"
#include "Framework/ControlService.h"
#include "DataFormatsTPC/Digit.h"
#include "TPCSimWorkflow/TPCDigitRootWriterSpec.h"
#include "CommonUtils/ConfigurableParam.h"
#include "DetectorsRaw/HBFUtilsInitializer.h"
#include "TPCSimulation/CommonMode.h"
#include "DetectorsBase/Detector.h"
#include <SimulationDataFormat/MCCompLabel.h>
#include <SimulationDataFormat/MCTruthContainer.h>
#include <SimulationDataFormat/ConstMCTruthContainer.h>
#include <CommonUtils/FileSystemUtils.h>
#include "Algorithm/RangeTokenizer.h"
#include "TPCBase/Sector.h"
#include <TFile.h>
#include <TTree.h>
#include <TBranch.h>
#include <stdexcept>
#include <string>
#include <vector>
#include <utility>
#include <numeric>
#include <TROOT.h>
#ifdef NDEBUG
#undef NDEBUG
#endif
#include <cassert>
#ifdef WITH_OPENMP
#include <omp.h>
#endif
#include <TStopwatch.h>
#include "CommonDataFormat/RangeReference.h"
using DigiGroupRef = o2::dataformats::RangeReference<int, int>;
using SubSpecificationType = o2::framework::DataAllocator::SubSpecificationType;
using namespace o2::framework;
using namespace o2::header;
// Callback-policy customization: forwards the policy list to
// HBFUtilsInitializer::addNewTimeSliceCallback, which appends its own entry.
// NOTE(review): presumably picked up by Framework/runDataProcessing.h, which is
// included further below - confirm.
void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
{
o2::raw::HBFUtilsInitializer::addNewTimeSliceCallback(policies);
}
// Workflow options have to be declared before Framework/runDataProcessing.h is included.
// Registers, in this order: tpc-lanes, tpc-sectors, writer-mode, disable-mc,
// configKeyValues and finally the options added by HBFUtilsInitializer.
void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
{
  // The TPC is memory hungry: default to half of the (logical) cores, but never fewer than one lane.
  const unsigned int halfOfCores = std::thread::hardware_concurrency() / 2;
  int defaultLaneCount = std::max(1u, halfOfCores);
  const std::string lanesHelpText("Number of tpc processing lanes. A lane is a pipeline of algorithms.");
  workflowOptions.push_back(ConfigParamSpec{"tpc-lanes", VariantType::Int, defaultLaneCount, {lanesHelpText}});

  // By default all sectors [0, MAXSECTOR - 1] are selected.
  const std::string sectorsHelpText("List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15");
  const std::string allSectors = "0-" + std::to_string(o2::tpc::Sector::MAXSECTOR - 1);
  workflowOptions.push_back(ConfigParamSpec{"tpc-sectors", VariantType::String, allSectors.c_str(), {sectorsHelpText}});

  // Optionally write the merged data to a ROOT file.
  workflowOptions.push_back(ConfigParamSpec{"writer-mode", VariantType::Bool, false, {"enable ROOT file output"}});

  // Optionally switch off the MC truth.
  workflowOptions.push_back(ConfigParamSpec{"disable-mc", VariantType::Bool, false, {"disable mc-truth"}});

  workflowOptions.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings ..."}});

  o2::raw::HBFUtilsInitializer::addConfigOption(workflowOptions);
}
// ------------------------------------------------------------------
#include "Framework/runDataProcessing.h"
#include "DataFormatsTPC/TPCSectorHeader.h"
using MCTruthContainer = o2::dataformats::MCTruthContainer<o2::MCCompLabel>;
namespace o2
{
namespace tpc
{
/// Appends every element of @a origin to the back of @a target.
/// @param origin container that is read from (iterated front to back)
/// @param target container that is appended to via push_back
template <typename T, typename R>
void copyHelper(T const& origin, R& target)
{
  // The target is written to shared memory and it is unclear whether the boost
  // shared-memory allocator is thread-safe; without this critical section the
  // code was crashing, so the whole append is serialized.
#pragma omp critical
  {
    for (auto const& element : origin) {
      target.push_back(element);
    }
  }
}
/// Label containers are not copied element by element: the origin is appended
/// to the target with the container's own mergeAtBack().
template <>
void copyHelper<MCTruthContainer, MCTruthContainer>(const MCTruthContainer& origin, MCTruthContainer& target)
{
  target.mergeAtBack(origin);
}
// A trait mapping a TPC data type to the DPL channel name under which it is published.
// The primary template is only declared, so asking for the channel name of a
// type without a specialization below is a compile-time error.
// `value` is used as the channel name of the Output built in makePublishBuffer
// (and in its log message).
template <typename T>
struct OutputChannelName;
// digits
template <>
struct OutputChannelName<std::vector<o2::tpc::Digit>> {
static constexpr char value[] = "DIGITS";
};
// common mode values
template <>
struct OutputChannelName<std::vector<o2::tpc::CommonMode>> {
static constexpr char value[] = "COMMONMODE";
};
// trigger records (ranges of digits)
template <>
struct OutputChannelName<std::vector<DigiGroupRef>> {
static constexpr char value[] = "DIGTRIGGERS";
};
/// Creates the output object of type T for one sector via the DPL allocator.
/// The channel name is taken from OutputChannelName<T>, the sector is used as
/// sub-specification and a TPCSectorHeader carrying the active sector mask is attached.
/// @return pointer to the object handed out by pc.outputs().make<T>()
template <typename T>
auto makePublishBuffer(framework::ProcessingContext& pc, int sector, uint64_t activeSectors)
{
  LOG(info) << "PUBLISHING SECTOR " << sector << " FOR CHANNEL " << OutputChannelName<T>::value;

  o2::tpc::TPCSectorHeader sectorHeader{sector};
  sectorHeader.activeSectors = activeSectors;

  const auto subSpec = static_cast<SubSpecificationType>(sector);
  auto& buffer = pc.outputs().make<T>(Output{"TPC", OutputChannelName<T>::value, subSpec, sectorHeader});
  return &buffer;
}
/// Label specialization: labels are accumulated in a heap-allocated container
/// instead of a DPL output object; none of the arguments are used here.
/// The returned pointer is owning - publishBuffer<MCTruthContainer> deletes it.
template <>
auto makePublishBuffer<MCTruthContainer>(framework::ProcessingContext& pc, int sector, uint64_t activeSectors)
{
  MCTruthContainer* labelAccumulator = new MCTruthContainer();
  return labelAccumulator;
}
// Generic publish step, intentionally a no-op: for these types the accumulator
// already is the object obtained from pc.outputs().make<T>() in makePublishBuffer.
// NOTE(review): presumably the framework sends that object without further action - confirm.
// Only the MCTruthContainer specialization has work to do.
template <typename T>
void publishBuffer(framework::ProcessingContext& pc, int sector, uint64_t activeSectors, T* accum)
{
// nothing by default
}
/// Label specialization: creates the "DIGITSMCTR" output for the sector,
/// flattens the accumulated labels into it and deletes the accumulator.
/// @param accum owning pointer to the accumulated labels; invalid after the call
template <>
void publishBuffer<MCTruthContainer>(framework::ProcessingContext& pc, int sector, uint64_t activeSectors, MCTruthContainer* accum)
{
  LOG(info) << "PUBLISHING MC LABELS " << accum->getNElements();

  using FlatLabels = o2::dataformats::ConstMCTruthContainer<o2::MCCompLabel>;
  // type of the object the allocator hands out for a flat label container
  using SharedLabels = std::decay_t<decltype(pc.outputs().make<FlatLabels>(Output{"", "", 0}))>;

  o2::tpc::TPCSectorHeader sectorHeader{sector};
  sectorHeader.activeSectors = activeSectors;
  const auto subSpec = static_cast<SubSpecificationType>(sector);

  SharedLabels* flatLabels = nullptr;
  // only the creation of the output object is serialized
#pragma omp critical
  {
    flatLabels = &pc.outputs().make<FlatLabels>(Output{"TPC", "DIGITSMCTR", subSpec, sectorHeader});
  }

  accum->flatten_to(*flatLabels);
  delete accum;
}
template <typename T>
void mergeHelper(const char* brprefix, std::vector<int> const& tpcsectors, uint64_t activeSectors,
TFile& originfile, framework::ProcessingContext& pc)
{
auto keyslist = originfile.GetListOfKeys();
for (int i = 0; i < keyslist->GetEntries(); ++i) {
auto key = keyslist->At(i);
int sector = atoi(key->GetName());
if (std::find(tpcsectors.begin(), tpcsectors.end(), sector) == tpcsectors.end()) {
// do nothing if sector not wanted
continue;
}
auto oldtree = (TTree*)originfile.Get(key->GetName());
assert(oldtree);
std::stringstream digitbrname;
digitbrname << brprefix << key->GetName();
auto br = oldtree->GetBranch(digitbrname.str().c_str());
if (!br) {
continue;
}
T* chunk = nullptr;
br->SetAddress(&chunk);
using AccumType = std::decay_t<decltype(makePublishBuffer<T>(pc, sector, activeSectors))>;
AccumType accum;
#pragma omp critical
accum = makePublishBuffer<T>(pc, sector, activeSectors);
for (auto e = 0; e < br->GetEntries(); ++e) {
br->GetEntry(e);
copyHelper(*chunk, *accum);
delete chunk;
chunk = nullptr;
}
br->ResetAddress();
br->DropBaskets("all");
delete oldtree;
// some data (labels are published slightly differently)
publishBuffer(pc, sector, activeSectors, accum);
}
}
/// Specialization for the TPC trigger records: no data is read from the file.
/// For every key whose name (converted with atoi) is a requested sector, an
/// empty trigger buffer is created and published (continuous mode).
/// @a brprefix is not used; @a originfile only provides the list of keys.
template <>
void mergeHelper<std::vector<DigiGroupRef>>(const char* brprefix, std::vector<int> const& tpcsectors, uint64_t activeSectors,
                                            TFile& originfile, framework::ProcessingContext& pc)
{
  using TriggerBuffer = std::vector<DigiGroupRef>;

  auto keys = originfile.GetListOfKeys();
  const int nkeys = keys->GetEntries();
  for (int idx = 0; idx < nkeys; ++idx) {
    const int sector = atoi(keys->At(idx)->GetName());
    const bool wanted = std::find(tpcsectors.begin(), tpcsectors.end(), sector) != tpcsectors.end();
    if (wanted) {
      using AccumType = std::decay_t<decltype(makePublishBuffer<TriggerBuffer>(pc, sector, activeSectors))>;
      AccumType triggers;
#pragma omp critical
      {
        triggers = makePublishBuffer<TriggerBuffer>(pc, sector, activeSectors);
      }
      // no actual data sent. Continuous mode.
      publishBuffer(pc, sector, activeSectors, triggers);
    }
  }
}
void publishMergedTimeframes(std::vector<int> const& lanes, std::vector<int> const& tpcsectors, bool domctruth, framework::ProcessingContext& pc)
{
uint64_t activeSectors = 0;
for (auto s : tpcsectors) {
activeSectors |= (uint64_t)0x1 << s;
}
ROOT::EnableThreadSafety();
// we determine the exact input list of files
auto digitfilelist = o2::utils::listFiles("tpc_driftime_digits_lane.*.root$");
#ifdef WITH_OPENMP
omp_set_num_threads(std::min(lanes.size(), digitfilelist.size()));
LOG(info) << "Running digit publisher with OpenMP enabled";
#pragma omp parallel for schedule(dynamic)
#endif
for (size_t fi = 0; fi < digitfilelist.size(); ++fi) {
auto& filename = digitfilelist[fi];
LOG(debug) << "MERGING CHUNKED DIGITS FROM FILE " << filename;
auto originfile = new TFile(filename.c_str(), "OPEN");
assert(originfile);
// data definitions
using DigitsType = std::vector<o2::tpc::Digit>;
using LabelType = o2::dataformats::MCTruthContainer<o2::MCCompLabel>;
mergeHelper<DigitsType>("TPCDigit_", tpcsectors, activeSectors, *originfile, pc);
if (domctruth) {
mergeHelper<LabelType>("TPCDigitMCTruth_", tpcsectors, activeSectors, *originfile, pc);
}
// we also merge common modes and publish a (fake) trigger entry
using CommonModeType = std::vector<o2::tpc::CommonMode>;
mergeHelper<CommonModeType>("TPCCommonMode_", tpcsectors, activeSectors, *originfile, pc);
using TriggerType = std::vector<DigiGroupRef>;
mergeHelper<TriggerType>("TPCCommonMode_", tpcsectors, activeSectors, *originfile, pc);
originfile->Close();
delete originfile;
}
}
/// DPL task that merges the chunked digits once, publishes them and then
/// signals end-of-stream and asks to quit.
class Task
{
 public:
  /// @param laneConfig lane configuration forwarded to publishMergedTimeframes
  /// @param tpcsectors sectors to publish
  /// @param mctruth    whether MC truth labels are published as well
  Task(std::vector<int> laneConfig, std::vector<int> tpcsectors, bool mctruth)
    // initializers follow the declaration order of the members; the by-value
    // arguments are moved instead of being copied a second time
    : mDoMCTruth(mctruth), mLanes(std::move(laneConfig)), mTPCSectors(std::move(tpcsectors))
  {
  }

  /// Publishes everything, then requests end of stream and termination of this device.
  void run(framework::ProcessingContext& pc)
  {
    LOG(info) << "Preparing digits (from digit chunks) for reconstruction";

    TStopwatch w;
    w.Start();
    publishMergedTimeframes(mLanes, mTPCSectors, mDoMCTruth, pc);

    pc.services().get<ControlService>().endOfStream();
    pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);

    LOG(info) << "DIGIT PUBLISHING TOOK " << w.RealTime();
  }

  /// Nothing to initialize.
  void init(framework::InitContext& ctx)
  {
  }

 private:
  bool mDoMCTruth = true;       ///< publish MC truth labels
  std::vector<int> mLanes;      ///< lane configuration
  std::vector<int> mTPCSectors; ///< sectors to publish
};
/// Creates the processor spec of the "TPCDigitMerger" device, which runs the
/// Task above to merge the chunked digits and publish them.
///
/// @param laneConfiguration lane configuration handed to the Task
/// @param tpcsectors        sectors handed to the Task
/// @param mctruth           whether MC truth labels are published (adds the DIGITSMCTR outputs)
/// @param publish           if false, the device is declared without any outputs
/// @return spec without inputs; when publishing, the outputs are declared for
///         every sector in [0, Sector::MAXSECTOR), one sub-specification each
DataProcessorSpec getSpec(std::vector<int> const& laneConfiguration, std::vector<int> const& tpcsectors, bool mctruth, bool publish = true)
{
  std::vector<OutputSpec> outputs; // define channel by triple of (origin, type id of data to be sent on this channel, subspecification)
  if (publish) {
    // effectively the input expects one sector per subspecification
    for (int s = 0; s < Sector::MAXSECTOR; ++s) {
      const auto subSpec = static_cast<SubSpecificationType>(s);
      outputs.emplace_back("TPC", "DIGITS", subSpec, Lifetime::Timeframe);
      if (mctruth) {
        outputs.emplace_back("TPC", "DIGITSMCTR", subSpec, Lifetime::Timeframe);
      }
      // common mode
      outputs.emplace_back("TPC", "COMMONMODE", subSpec, Lifetime::Timeframe);
      // trigger records
      outputs.emplace_back("TPC", "DIGTRIGGERS", subSpec, Lifetime::Timeframe);
    }
  }
  return DataProcessorSpec{
    "TPCDigitMerger", {}, outputs, AlgorithmSpec{o2::framework::adaptFromTask<Task>(laneConfiguration, tpcsectors, mctruth)}, Options{}};
}
} // end namespace tpc
} // end namespace o2
/// Builds the workflow: the digit merger/publisher device and, in writer-mode,
/// the TPC digit ROOT writer.
///
/// Options read from @a configcontext: configKeyValues, tpc-lanes, disable-mc,
/// writer-mode and tpc-sectors.
WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
{
  WorkflowSpec specs;
  o2::conf::ConfigurableParam::updateFromString(configcontext.options().get<std::string>("configKeyValues"));

  auto numlanes = configcontext.options().get<int>("tpc-lanes");
  if (numlanes < 1) {
    // a negative value would be converted to a huge vector size below, zero would leave no lane at all
    LOG(error) << "Invalid number of tpc-lanes (" << numlanes << "), using a single lane instead";
    numlanes = 1;
  }
  bool mctruth = !configcontext.options().get<bool>("disable-mc");
  bool writeout = configcontext.options().get<bool>("writer-mode");
  auto tpcsectors = o2::RangeTokenizer::tokenize<int>(configcontext.options().get<std::string>("tpc-sectors"));

  // lanes are simply numbered 0 .. numlanes - 1
  std::vector<int> lanes(numlanes);
  std::iota(lanes.begin(), lanes.end(), 0);
  specs.emplace_back(o2::tpc::getSpec(lanes, tpcsectors, mctruth));

  if (writeout) {
    // for now writeout to a ROOT file only works if all sectors
    // are included
    if (tpcsectors.size() != static_cast<size_t>(o2::tpc::Sector::MAXSECTOR)) {
      LOG(error) << "You currently need to include all TPC sectors in the ROOT writer-mode";
    } else {
      std::vector<int> writerlanes(tpcsectors.size());
      std::iota(writerlanes.begin(), writerlanes.end(), 0);
      specs.emplace_back(o2::tpc::getTPCDigitRootWriterSpec(writerlanes, mctruth));
    }
  }

  // configure dpl timer to inject correct firstTForbit: start from the 1st orbit of TF containing 1st sampled orbit
  o2::raw::HBFUtilsInitializer hbfIni(configcontext, specs);
  return specs;
}