diff --git a/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h b/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h new file mode 100644 index 0000000000000..109eff2654466 --- /dev/null +++ b/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h @@ -0,0 +1,122 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file CMV.h +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief Common mode values data format definition + +/// The data is sent by the CRU as 256+16 bit words. The CMV data layout is as follows: +/// - 256-bit Header: [version:8][packetID:8][errorCode:8][magicWord:8][heartbeatOrbit:32][heartbeatBC:16][padding:176] +/// - 16-bit CMV value: [sign:1][I8F7:15] where bit 15 is the sign (1=positive, 0=negative) and the lower 15 bits are a fixed point I8F7 value (8 integer bits, 7 fractional bits) +/// Float conversion: sign ? (value & 0x7FFF) / 128.0 : -(value & 0x7FFF) / 128.0 + +#ifndef ALICEO2_DATAFORMATSTPC_CMV_H +#define ALICEO2_DATAFORMATSTPC_CMV_H + +#include +#include + +namespace o2::tpc::cmv +{ + +static constexpr uint32_t NTimeBinsPerPacket = 3564; ///< number of time bins (covering 8 heartbeats) +static constexpr uint32_t NPacketsPerTFPerCRU = 4; ///< 4 packets per timeframe +static constexpr uint32_t NTimeBinsPerTF = NTimeBinsPerPacket * NPacketsPerTFPerCRU; ///< maximum number of timebins per timeframe (14256) + +/// Data padding: NTimeBinsPerPacket * sizeof(Data) = 3564 * 2 = 7128 bytes +static constexpr uint32_t DataSizeBytes = NTimeBinsPerPacket * sizeof(uint16_t); ///< 7128 bytes +static constexpr uint32_t DataPaddingBytes = (32 - (DataSizeBytes % 32)) % 32; ///< 8 bytes + +/// Header definition of the CMVs +struct Header { + static constexpr uint8_t MagicWord = 0xDC; + union { + uint64_t word0 = 0; ///< bits 0 - 63 + struct { + uint8_t version : 8; ///< version + uint8_t packetID : 8; ///< packet id + uint8_t errorCode : 8; ///< errors + uint8_t magicWord : 8; ///< magic word + uint32_t heartbeatOrbit : 32; ///< first heart beat timing of the package + }; + }; + union { + uint64_t word1 = 0; ///< bits 64 - 127 + struct { + uint16_t heartbeatBC : 16; ///< first BC id of the package + uint16_t unused1 : 16; ///< reserved + uint32_t unused2 : 32; ///< reserved + }; + }; + union { + uint64_t word3 = 0; ///< bits 128 - 191 + struct { + uint64_t unused3 : 64; ///< reserved + }; + }; + union { + uint64_t word4 = 0; ///< bits 192 - 255 + struct { + uint64_t unused4 : 64; ///< reserved + }; + }; +}; + +/// CMV single data container +struct Data { + uint16_t cmv{0}; ///< 16-bit signed fixed point value: bit 15 = sign (1=positive, 0=negative), bits 14-0 = I8F7 magnitude + + uint16_t getCMV() const { return cmv; } ///< raw 16-bit integer representation + void setCMV(uint16_t value) { cmv = value; } ///< set raw 16-bit integer representation + + // Decode to float: sign-magnitude with 7 fractional bits, range ±255.992 + float getCMVFloat() const + { + const bool positive = (cmv >> 15) & 1; // bit 15: sign (1=positive, 0=negative) + const float magnitude = (cmv & 0x7FFF) / 128.f; // lower 15 bits, shift right by 7 (divide by 2^7) + return positive ? magnitude : -magnitude; + } + + // Encode from float: clamps magnitude to 15 bits, range ±255.992 + void setCMVFloat(float value) + { + const bool positive = (value >= 0.f); + const uint16_t magnitude = static_cast(std::abs(value) * 128.f + 0.5f) & 0x7FFF; + cmv = (positive ? 0x8000 : 0x0000) | magnitude; + } +}; + +/// CMV full data container: one packet carries NTimeBinsPerPacket CMV values followed by padding +/// Layout: Header (32 bytes) + Data[NTimeBinsPerPacket] (7128 bytes) + padding (8 bytes) = 7168 bytes total (224 * 32 = 7168) +/// The padding bytes at the end of the data array are rubbish/unused and must not be interpreted as CMV values +struct Container { + Header header; ///< CMV data header + Data data[NTimeBinsPerPacket]; ///< data values + uint8_t padding[DataPaddingBytes]{}; ///< trailing padding to align data to 32-byte boundary + + // Header and data accessors + const Header& getHeader() const { return header; } + Header& getHeader() { return header; } + + const Data* getData() const { return data; } + Data* getData() { return data; } + + // Per timebin CMV accessors + uint16_t getCMV(uint32_t timeBin) const { return data[timeBin].getCMV(); } + void setCMV(uint32_t timeBin, uint16_t value) { data[timeBin].setCMV(value); } + + float getCMVFloat(uint32_t timeBin) const { return data[timeBin].getCMVFloat(); } + void setCMVFloat(uint32_t timeBin, float value) { data[timeBin].setCMVFloat(value); } +}; + +} // namespace o2::tpc::cmv + +#endif \ No newline at end of file diff --git a/Detectors/TPC/base/include/TPCBase/RDHUtils.h b/Detectors/TPC/base/include/TPCBase/RDHUtils.h index adfd94cf6b703..71b5d16b85702 100644 --- a/Detectors/TPC/base/include/TPCBase/RDHUtils.h +++ b/Detectors/TPC/base/include/TPCBase/RDHUtils.h @@ -13,7 +13,7 @@ #define AliceO2_TPC_RDHUtils_H #include "DetectorsRaw/RDHUtils.h" -//#include "Headers/RAWDataHeader.h" +// #include "Headers/RAWDataHeader.h" namespace o2 { @@ -28,6 +28,7 @@ static constexpr FEEIDType UserLogicLinkID = 15; ///< virtual link ID for ZS dat static constexpr FEEIDType IDCLinkID = 20; ///< Identifier for integrated digital currents static constexpr FEEIDType ILBZSLinkID = 21; ///< Identifier for improved link-based ZS static constexpr FEEIDType DLBZSLinkID = 22; ///< Identifier for dense link-based ZS +static constexpr FEEIDType CMVLinkID = 23; ///< Identifier for common mode values static constexpr FEEIDType SACLinkID = 25; ///< Identifier for sampled analog currents /// compose feeid from cru, endpoint and link diff --git a/Detectors/TPC/calibration/CMakeLists.txt b/Detectors/TPC/calibration/CMakeLists.txt index 27f7f0200bb92..a1068b928780d 100644 --- a/Detectors/TPC/calibration/CMakeLists.txt +++ b/Detectors/TPC/calibration/CMakeLists.txt @@ -58,6 +58,7 @@ o2_add_library(TPCCalibration src/DigitAdd.cxx src/CorrectdEdxDistortions.cxx src/PressureTemperatureHelper.cxx + src/CMVContainer.cxx PUBLIC_LINK_LIBRARIES O2::DataFormatsTPC O2::TPCBaseRecSim O2::TPCReconstruction ROOT::Minuit Microsoft.GSL::GSL @@ -115,7 +116,8 @@ o2_target_root_dictionary(TPCCalibration include/TPCCalibration/TPCMShapeCorrection.h include/TPCCalibration/DigitAdd.h include/TPCCalibration/CorrectdEdxDistortions.h - include/TPCCalibration/PressureTemperatureHelper.h) + include/TPCCalibration/PressureTemperatureHelper.h + include/TPCCalibration/CMVContainer.h) o2_add_test_root_macro(macro/comparePedestalsAndNoise.C PUBLIC_LINK_LIBRARIES O2::TPCBaseRecSim @@ -153,6 +155,10 @@ o2_add_test_root_macro(macro/prepareITFiles.C COMPILE_ONLY PUBLIC_LINK_LIBRARIES O2::TPCCalibration LABELS tpc) +o2_add_test_root_macro(macro/drawCMV.C + COMPILE_ONLY + PUBLIC_LINK_LIBRARIES O2::TPCCalibration O2::TPCBase + LABELS tpc) o2_add_test(IDCFourierTransform COMPONENT_NAME calibration diff --git a/Detectors/TPC/calibration/include/TPCCalibration/CMVContainer.h b/Detectors/TPC/calibration/include/TPCCalibration/CMVContainer.h new file mode 100644 index 0000000000000..f1904c3db8f8d --- /dev/null +++ b/Detectors/TPC/calibration/include/TPCCalibration/CMVContainer.h @@ -0,0 +1,141 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file CMVContainer.h +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief Structs for storing CMVs to the CCDB + +#ifndef ALICEO2_TPC_CMVCONTAINER_H_ +#define ALICEO2_TPC_CMVCONTAINER_H_ + +#include +#include +#include +#include + +#include "TTree.h" +#include "TPCBase/CRU.h" +#include "DataFormatsTPC/CMV.h" + +namespace o2::tpc +{ + +struct CMVPerTF; // forward declaration +struct CMVPerTFCompressed; // forward declaration + +/// Bitmask flags describing which encoding stages are applied in CMVPerTFCompressed +struct CMVEncoding { + static constexpr uint8_t kNone = 0x00; ///< No compression — raw uint16 values stored flat + static constexpr uint8_t kSparse = 0x01; ///< Non-zero positions stored sparsely (varint-encoded deltas) + static constexpr uint8_t kDelta = 0x02; ///< Delta coding between consecutive values (dense only) + static constexpr uint8_t kZigzag = 0x04; ///< Zigzag encoding of deltas or signed values + static constexpr uint8_t kVarint = 0x08; ///< Varint compression of the value stream + static constexpr uint8_t kHuffman = 0x10; ///< Canonical Huffman compression of the value stream +}; + +/// Single compressed representation for one TF across all CRUs, stored in a TTree +/// mFlags is a bitmask of CMVEncoding values that fully describes the encoding pipeline +/// mData holds the encoded payload whose binary layout depends on mFlags: +/// +/// Dense path (!kSparse): +/// kZigzag absent → N × uint16_t LE (raw values, CRU-major order) +/// kZigzag + kVarint → N × varint(zigzag(delta(signed(raw)))) +/// kZigzag + kHuffman → [Huffman table] + [bitstream] of zigzag(delta(signed(raw))) +/// +/// Sparse path (kSparse): +/// 4 bytes LE uint32_t : posStreamSize +/// posStream: for each CRU: varint(N), N × varint(tb_delta) +/// valStream (one entry per non-zero): +/// default → uint16_t LE raw value +/// kZigzag + kVarint → varint(zigzag(signed(raw))) +/// kZigzag + kHuffman → [Huffman table] + [bitstream] of zigzag(signed(raw)) +struct CMVPerTFCompressed { + uint32_t firstOrbit{0}; ///< First orbit of this TF + uint16_t firstBC{0}; ///< First bunch crossing of this TF + uint8_t mFlags{0}; ///< Bitmask of CMVEncoding values + + std::vector mData; ///< Encoded payload + + /// Restore a CMVPerTF from this compressed object into *cmv (must not be null) + void decompress(CMVPerTF* cmv) const; + + /// Serialise into a TTree; each Fill() call appends one entry (one TF) + std::unique_ptr toTTree() const; + + private: + /// Decode the sparse position stream; advances ptr past the position block + /// Returns (cru, timeBin) pairs for every non-zero entry, in CRU-major order + static std::vector> decodeSparsePositions(const uint8_t*& ptr, const uint8_t* end); + + /// Decode the value stream into raw uint32_t symbols + /// Dispatches to Huffman, varint, or raw uint16 based on flags + static std::vector decodeValueStream(const uint8_t*& ptr, const uint8_t* end, uint32_t N, uint8_t flags); + + /// Apply inverse zigzag and scatter decoded values into the sparse positions of *cmv + static void decodeSparseValues(const std::vector& symbols, + const std::vector>& positions, + uint8_t flags, CMVPerTF* cmv); + + /// Apply inverse zigzag and inverse delta, then fill the full dense CMV array in *cmv + static void decodeDenseValues(const std::vector& symbols, uint8_t flags, CMVPerTF* cmv); + + public: + ClassDefNV(CMVPerTFCompressed, 1) +}; + +/// CMV data for one TF across all CRUs +/// Raw 16-bit CMV values are stored in a flat C array indexed as [cru * NTimeBinsPerTF + timeBin] +struct CMVPerTF { + uint32_t firstOrbit{0}; ///< First orbit of this TF, from heartbeatOrbit of the first CMV packet + uint16_t firstBC{0}; ///< First bunch crossing of this TF, from heartbeatBC of the first CMV packet + + // Raw 16-bit CMV values, flat array indexed as [cru * NTimeBinsPerTF + timeBin] + uint16_t mDataPerTF[CRU::MaxCRU * cmv::NTimeBinsPerTF]{}; + + /// Return the raw 16-bit CMV value for a given CRU and timebin within this TF + uint16_t getCMV(const int cru, const int timeBin) const; + + /// Return the float CMV value for a given CRU and timebin within this TF + float getCMVFloat(const int cru, const int timeBin) const; + + /// Zero out raw CMV values whose float magnitude is below threshold + void zeroSmallValues(float threshold = 1.0f); + + /// Round values to the nearest integer ADC for all values whose rounded magnitude is <= threshold + void roundToIntegers(uint16_t threshold); + + /// Quantise |v| with a Gaussian-CDF recovery profile: + /// Coarse decimal-style precision below and around mean, then a smooth return to the full native I8F7 precision as the magnitude increases with width sigma + void trimGaussianPrecision(float mean, float sigma); + + /// Compress this object into a CMVPerTFCompressed using the encoding pipeline described by flags + /// Quantisation (trimGaussianPrecision / roundToIntegers / zeroSmallValues) should be applied to this object before calling compress(); it is not part of the flags pipeline + CMVPerTFCompressed compress(uint8_t flags) const; + + /// Serialise into a TTree; each Fill() call appends one entry (one TF) + std::unique_ptr toTTree() const; + + /// Write the TTree to a ROOT file + static void writeToFile(const std::string& filename, const std::unique_ptr& tree); + + private: + static int32_t cmvToSigned(uint16_t raw); ///< Sign-magnitude uint16_t → signed integer + static uint16_t quantizeBelowThreshold(uint16_t raw, float quantizationMean, float quantizationSigma); ///< Quantise sub-threshold values with a Gaussian-shaped recovery to full precision + static uint32_t zigzagEncode(int32_t value); ///< Zigzag encode + static void encodeVarintInto(uint32_t value, std::vector& out); ///< Varint encode + + public: + ClassDefNV(CMVPerTF, 1) +}; + +} // namespace o2::tpc + +#endif // ALICEO2_TPC_CMVCONTAINER_H_ diff --git a/Detectors/TPC/calibration/macro/drawCMV.C b/Detectors/TPC/calibration/macro/drawCMV.C new file mode 100644 index 0000000000000..8a89157b75721 --- /dev/null +++ b/Detectors/TPC/calibration/macro/drawCMV.C @@ -0,0 +1,160 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#if !defined(__CLING__) || defined(__ROOTCLING__) +#include +#include +#include +#include + +#include "TFile.h" +#include "TParameter.h" +#include "TTree.h" +#include "TH1F.h" +#include "TH2F.h" +#include "TCanvas.h" + +#include "TPCCalibration/CMVContainer.h" +#include "TPCBase/Utils.h" +#endif + +using namespace o2::tpc; + +/// Draw CMV (Common Mode Values) vs timebin from a CCDB TTree file +/// \param filename input ROOT file containing the ccdb_object TTree +/// \param outDir output directory for saved plots; nothing is saved if empty +/// \return array of canvases +TObjArray* drawCMV(std::string_view filename, std::string_view outDir) +{ + TObjArray* arrCanvases = new TObjArray; + arrCanvases->SetName("CMV"); + + // open file + TFile f(filename.data(), "READ"); + if (f.IsZombie()) { + fmt::print("ERROR: cannot open '{}'\n", filename); + return arrCanvases; + } + fmt::print("Opened file: {}\n", filename); + + // get TTree + TTree* tree = nullptr; + f.GetObject("ccdb_object", tree); + if (!tree) { + fmt::print("ERROR: TTree 'ccdb_object' not found\n"); + return arrCanvases; + } + fmt::print("Tree 'ccdb_object' found, entries: {}\n", tree->GetEntries()); + + // read metadata + long firstTF = -1, lastTF = -1; + if (auto* userInfo = tree->GetUserInfo()) { + for (int i = 0; i < userInfo->GetSize(); ++i) { + if (auto* p = dynamic_cast*>(userInfo->At(i))) { + if (std::string(p->GetName()) == "firstTF") + firstTF = p->GetVal(); + if (std::string(p->GetName()) == "lastTF") + lastTF = p->GetVal(); + } + } + } + fmt::print("firstTF: {}, lastTF: {}\n", firstTF, lastTF); + + const int nEntries = tree->GetEntries(); + if (nEntries == 0) { + fmt::print("ERROR: no entries in tree\n"); + return arrCanvases; + } + + constexpr int nCRUs = CRU::MaxCRU; + constexpr int nTimeBins = cmv::NTimeBinsPerTF; + + TH2F* h2d = new TH2F("hCMVvsTimeBin", ";Timebin (200 ns);Common Mode Values (ADC)", + 100, 0, nTimeBins, + 110, -100.5, 9.5); + h2d->SetStats(1); + TH1F* h1d = new TH1F("hCMV", ";Common Mode Values (ADC);Counts", + 1100, -100.5, 9.5); + h1d->SetStats(1); + + // auto-detect branch format: compressed or raw + const bool isCompressed = (tree->GetBranch("CMVPerTFCompressed") != nullptr); + const bool isRaw = (tree->GetBranch("CMVPerTF") != nullptr); + if (!isCompressed && !isRaw) { + fmt::print("ERROR: no recognised branch found (expected 'CMVPerTFCompressed' or 'CMVPerTF')\n"); + return arrCanvases; + } + fmt::print("Branch format: {}\n", isCompressed ? "CMVPerTFCompressed" : "CMVPerTF (raw)"); + + o2::tpc::CMVPerTFCompressed* tfCompressed = nullptr; + o2::tpc::CMVPerTF* tfRaw = nullptr; + CMVPerTF* tfDecoded = isCompressed ? new CMVPerTF() : nullptr; + + if (isCompressed) { + tree->SetBranchAddress("CMVPerTFCompressed", &tfCompressed); + } else { + tree->SetBranchAddress("CMVPerTF", &tfRaw); + } + + long firstOrbit = -1; + + for (int i = 0; i < nEntries; ++i) { + tree->GetEntry(i); + + // Decompress if needed; resolve to a unified CMVPerTF pointer + const CMVPerTF* tf = nullptr; + if (isCompressed) { + tfCompressed->decompress(tfDecoded); + tf = tfDecoded; + } else { + tf = tfRaw; + } + + if (i == 0) { + firstOrbit = tf->firstOrbit; + } + + for (int cru = 0; cru < nCRUs; ++cru) { + for (int tb = 0; tb < nTimeBins; ++tb) { + const float cmvValue = tf->getCMVFloat(cru, tb); + h2d->Fill(tb, cmvValue); + h1d->Fill(cmvValue); + // fmt::print("cru: {}, tb: {}, cmv: {}\n", cru, tb, cmvValue); + } + } + } + + delete tfDecoded; + tree->ResetBranchAddresses(); + delete tfCompressed; + + fmt::print("firstOrbit: {}\n", firstOrbit); + + // draw + auto* c = new TCanvas("cCMVvsTimeBin", ""); + c->SetLogz(); + h2d->Draw("colz"); + + arrCanvases->Add(c); + + auto* c1 = new TCanvas("cCMVDistribution", ""); + c1->SetLogy(); + h1d->Draw(); + + arrCanvases->Add(c1); + + if (outDir.size()) { + utils::saveCanvases(*arrCanvases, outDir, "png,pdf", "CMVCanvases.root"); + } + + f.Close(); + return arrCanvases; +} diff --git a/Detectors/TPC/calibration/src/CMVContainer.cxx b/Detectors/TPC/calibration/src/CMVContainer.cxx new file mode 100644 index 0000000000000..5a3b8f1c63c3a --- /dev/null +++ b/Detectors/TPC/calibration/src/CMVContainer.cxx @@ -0,0 +1,729 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file CMVContainer.cxx +/// @author Tuba Gündem, tuba.gundem@cern.ch + +#include +#include +#include +#include +#include +#include + +#include "TFile.h" + +#include "TPCCalibration/CMVContainer.h" +#include "TPCBase/CRU.h" +#include "DataFormatsTPC/CMV.h" + +namespace o2::tpc +{ + +// CMVPerTF private helpers + +int32_t CMVPerTF::cmvToSigned(uint16_t raw) +{ + const int32_t mag = raw & 0x7FFF; + return (raw >> 15) ? mag : -mag; +} + +uint16_t CMVPerTF::quantizeBelowThreshold(uint16_t raw, float quantizationMean, float quantizationSigma) +{ + if (raw == 0u) { + return raw; + } + + if (quantizationSigma <= 0.f) { + return raw; + } + + const float adc = (raw & 0x7FFFu) / 128.f; + const float distance = (adc - quantizationMean) / quantizationSigma; + const float lossStrength = std::exp(-0.5f * distance * distance); + + // A true Gaussian bell: strongest trimming around the mean, then gradual recovery away from it + float quantizedADC = adc; + if (lossStrength > 0.85f) { + quantizedADC = std::round(adc * 10.f) / 10.f; + } else if (lossStrength > 0.60f) { + quantizedADC = std::round(adc * 100.f) / 100.f; + } else if (lossStrength > 0.30f) { + quantizedADC = std::round(adc * 1000.f) / 1000.f; + } else if (lossStrength > 0.12f) { + quantizedADC = std::round(adc * 10000.f) / 10000.f; + } else if (lossStrength > 0.03f) { + quantizedADC = std::round(adc * 1000000.f) / 1000000.f; + } + + // Snap the chosen decimal-style value back to the nearest raw I8F7 level + const uint16_t quantizedMagnitude = static_cast(std::clamp(std::lround(quantizedADC * 128.f), 0l, 0x7FFFl)); + return static_cast((raw & 0x8000u) | quantizedMagnitude); +} + +uint32_t CMVPerTF::zigzagEncode(int32_t value) +{ + return (static_cast(value) << 1) ^ static_cast(value >> 31); +} + +void CMVPerTF::encodeVarintInto(uint32_t value, std::vector& out) +{ + while (value > 0x7F) { + out.push_back(static_cast((value & 0x7F) | 0x80)); + value >>= 7; + } + out.push_back(static_cast(value)); +} + +// Shared file-local helpers + +namespace +{ + +int32_t zigzagDecodeLocal(uint32_t value) +{ + return static_cast((value >> 1) ^ -(value & 1)); +} + +uint16_t signedToCmvLocal(int32_t val) +{ + const uint16_t mag = static_cast(std::abs(val)) & 0x7FFF; + return static_cast((val >= 0 ? 0x8000u : 0u) | mag); +} + +uint32_t decodeVarintLocal(const uint8_t*& data, const uint8_t* end) +{ + uint32_t value = 0; + int shift = 0; + while (data < end && (*data & 0x80)) { + value |= static_cast(*data & 0x7F) << shift; + shift += 7; + ++data; + } + if (data >= end) { + throw std::runtime_error("decodeVarintLocal: unexpected end of varint data"); + } + value |= static_cast(*data) << shift; + ++data; + return value; +} + +/// Build and serialise a canonical Huffman table + bitstream over `symbols` into `buf` +/// Format: +/// 4 bytes LE uint32_t : numSymbols +/// numSymbols × 5 bytes: symbol (4 bytes LE) + code length (1 byte) +/// 8 bytes LE uint64_t : totalBits +/// ceil(totalBits/8) bytes: MSB-first bitstream +void huffmanEncode(const std::vector& symbols, std::vector& buf) +{ + // Frequency count + std::map freq; + for (const uint32_t z : symbols) { + ++freq[z]; + } + + // Build tree using index-based min-heap + struct HNode { + uint64_t freq{0}; + uint32_t sym{0}; + int left{-1}, right{-1}; + bool isLeaf{true}; + }; + std::vector nodes; + nodes.reserve(freq.size() * 2); + for (const auto& [sym, f] : freq) { + nodes.push_back({f, sym, -1, -1, true}); + } + + auto cmp = [&](int a, int b) { + return nodes[a].freq != nodes[b].freq ? nodes[a].freq > nodes[b].freq : nodes[a].sym > nodes[b].sym; + }; + std::vector heap; + heap.reserve(nodes.size()); + for (int i = 0; i < static_cast(nodes.size()); ++i) { + heap.push_back(i); + } + std::make_heap(heap.begin(), heap.end(), cmp); + + while (heap.size() > 1) { + std::pop_heap(heap.begin(), heap.end(), cmp); + const int a = heap.back(); + heap.pop_back(); + std::pop_heap(heap.begin(), heap.end(), cmp); + const int b = heap.back(); + heap.pop_back(); + nodes.push_back({nodes[a].freq + nodes[b].freq, 0, a, b, false}); + heap.push_back(static_cast(nodes.size()) - 1); + std::push_heap(heap.begin(), heap.end(), cmp); + } + + // Assign code lengths via iterative DFS + std::map codeLens; + { + const int root = heap[0]; + std::vector> stack; + stack.push_back({root, 0}); + while (!stack.empty()) { + auto [idx, depth] = stack.back(); + stack.pop_back(); + if (nodes[idx].isLeaf) { + codeLens[nodes[idx].sym] = static_cast(depth == 0 ? 1 : depth); + } else { + stack.push_back({nodes[idx].left, depth + 1}); + stack.push_back({nodes[idx].right, depth + 1}); + } + } + } + + // Sort by (codeLen ASC, symbol ASC) for canonical assignment + struct SymLen { + uint32_t sym; + uint8_t len; + }; + std::vector symLens; + symLens.reserve(codeLens.size()); + for (const auto& [sym, len] : codeLens) { + symLens.push_back({sym, len}); + } + std::sort(symLens.begin(), symLens.end(), [](const SymLen& a, const SymLen& b) { + return a.len != b.len ? a.len < b.len : a.sym < b.sym; + }); + + // Assign canonical codes + std::map> codeTable; + { + uint32_t code = 0; + uint8_t prevLen = 0; + for (const auto& sl : symLens) { + if (prevLen != 0) { + code = (code + 1) << (sl.len - prevLen); + } + codeTable[sl.sym] = {code, sl.len}; + prevLen = sl.len; + } + } + + // Serialise table header + buf.reserve(buf.size() + 4 + symLens.size() * 5 + 8 + (symbols.size() / 8 + 1)); + const uint32_t numSym = static_cast(symLens.size()); + for (int i = 0; i < 4; ++i) { + buf.push_back(static_cast((numSym >> (8 * i)) & 0xFF)); + } + for (const auto& sl : symLens) { + for (int i = 0; i < 4; ++i) { + buf.push_back(static_cast((sl.sym >> (8 * i)) & 0xFF)); + } + buf.push_back(sl.len); + } + + // Placeholder for totalBits + const size_t totalBitsOffset = buf.size(); + for (int i = 0; i < 8; ++i) { + buf.push_back(0); + } + + // Encode bitstream (MSB-first) + uint64_t totalBits = 0; + uint8_t curByte = 0; + int bitsInByte = 0; + for (const uint32_t z : symbols) { + const auto& [code, len] = codeTable.at(z); + for (int b = static_cast(len) - 1; b >= 0; --b) { + curByte = static_cast(curByte | (((code >> b) & 1u) << (7 - bitsInByte))); + ++bitsInByte; + ++totalBits; + if (bitsInByte == 8) { + buf.push_back(curByte); + curByte = 0; + bitsInByte = 0; + } + } + } + if (bitsInByte > 0) { + buf.push_back(curByte); + } + + // Backfill totalBits + for (int i = 0; i < 8; ++i) { + buf[totalBitsOffset + i] = static_cast((totalBits >> (8 * i)) & 0xFF); + } +} + +/// Decode `N` symbols from a canonical Huffman payload at [ptr, end) +/// `ptr` must point to the start of the Huffman table header (numSymbols field) +/// After return, `ptr` is advanced past the bitstream +std::vector huffmanDecode(const uint8_t*& ptr, const uint8_t* end, uint32_t N) +{ + auto readU32 = [&]() -> uint32_t { + if (ptr + 4 > end) { + throw std::runtime_error("huffmanDecode: unexpected end reading uint32"); + } + const uint32_t v = static_cast(ptr[0]) | (static_cast(ptr[1]) << 8) | + (static_cast(ptr[2]) << 16) | (static_cast(ptr[3]) << 24); + ptr += 4; + return v; + }; + + const uint32_t numSym = readU32(); + struct SymLen { + uint32_t sym; + uint8_t len; + }; + std::vector symLens(numSym); + for (uint32_t i = 0; i < numSym; ++i) { + symLens[i].sym = readU32(); + if (ptr >= end) { + throw std::runtime_error("huffmanDecode: unexpected end reading code length"); + } + symLens[i].len = *ptr++; + } + + std::map firstCode; + std::map> symsByLen; + { + uint32_t code = 0; + uint8_t prevLen = 0; + for (const auto& sl : symLens) { + if (prevLen != 0) { + code = (code + 1) << (sl.len - prevLen); + } + if (!firstCode.count(sl.len)) { + firstCode[sl.len] = code; + } + symsByLen[sl.len].push_back(sl.sym); + prevLen = sl.len; + } + } + + if (ptr + 8 > end) { + throw std::runtime_error("huffmanDecode: unexpected end reading totalBits"); + } + uint64_t totalBits = 0; + for (int i = 0; i < 8; ++i) { + totalBits |= static_cast(ptr[i]) << (8 * i); + } + ptr += 8; + + const uint8_t minLen = symLens.empty() ? 1 : symLens.front().len; + const uint8_t maxLen = symLens.empty() ? 1 : symLens.back().len; + uint64_t bitsRead = 0; + uint8_t curByte = 0; + int bitPos = -1; + + auto nextBit = [&]() -> int { + if (bitPos < 0) { + if (ptr >= end) { + throw std::runtime_error("huffmanDecode: unexpected end of bitstream"); + } + curByte = *ptr++; + bitPos = 7; + } + const int bit = (curByte >> bitPos) & 1; + --bitPos; + return bit; + }; + + std::vector out; + out.reserve(N); + while (out.size() < N) { + uint32_t accum = 0; + bool found = false; + for (uint8_t curLen = 1; curLen <= maxLen; ++curLen) { + if (bitsRead >= totalBits) { + throw std::runtime_error("huffmanDecode: bitstream exhausted before all symbols decoded"); + } + accum = (accum << 1) | static_cast(nextBit()); + ++bitsRead; + if (curLen < minLen) { + continue; + } + const auto fcIt = firstCode.find(curLen); + if (fcIt == firstCode.end()) { + continue; + } + if (accum >= fcIt->second) { + const uint32_t idx = accum - fcIt->second; + const auto& sv = symsByLen.at(curLen); + if (idx < sv.size()) { + out.push_back(sv[idx]); + found = true; + break; + } + } + } + if (!found) { + throw std::runtime_error("huffmanDecode: invalid Huffman code in bitstream"); + } + } + return out; +} + +} // anonymous namespace + +// CMVPerTF public methods + +uint16_t CMVPerTF::getCMV(const int cru, const int timeBin) const +{ + if (cru < 0 || cru >= static_cast(CRU::MaxCRU)) { + throw std::out_of_range(fmt::format("CMVPerTF::getCMV: cru {} out of range [0, {})", cru, static_cast(CRU::MaxCRU))); + } + if (timeBin < 0 || static_cast(timeBin) >= cmv::NTimeBinsPerTF) { + throw std::out_of_range(fmt::format("CMVPerTF::getCMV: timeBin {} out of range [0, {})", timeBin, static_cast(cmv::NTimeBinsPerTF))); + } + return mDataPerTF[cru * cmv::NTimeBinsPerTF + timeBin]; +} + +float CMVPerTF::getCMVFloat(const int cru, const int timeBin) const +{ + const uint16_t raw = getCMV(cru, timeBin); + const uint16_t mag = raw & 0x7FFF; + if (mag == 0) { + return 0.0f; // 0x0000 and 0x8000 both represent zero; return +0 to avoid -0 display + } + const bool positive = (raw >> 15) & 1; // bit 15: sign (1=positive, 0=negative) + return positive ? mag / 128.f : -mag / 128.f; +} + +void CMVPerTF::zeroSmallValues(float threshold) +{ + if (threshold <= 0.f) { + return; + } + for (uint32_t i = 0; i < static_cast(CRU::MaxCRU) * cmv::NTimeBinsPerTF; ++i) { + const float mag = (mDataPerTF[i] & 0x7FFF) / 128.f; + if (mag < threshold) { + mDataPerTF[i] = 0; + } + } +} + +void CMVPerTF::roundToIntegers(uint16_t threshold) +{ + if (threshold == 0) { + return; + } + for (uint32_t i = 0; i < static_cast(CRU::MaxCRU) * cmv::NTimeBinsPerTF; ++i) { + const uint16_t raw = mDataPerTF[i]; + if (raw == 0) { + continue; + } + const uint16_t rounded = static_cast(((raw & 0x7FFFu) + 64u) >> 7); + if (rounded > threshold) { + continue; // above range: keep full precision + } + mDataPerTF[i] = (rounded == 0) ? 0 : static_cast((raw & 0x8000u) | (rounded << 7)); + } +} + +void CMVPerTF::trimGaussianPrecision(float mean, float sigma) +{ + if (sigma <= 0.f) { + return; + } + + for (uint32_t i = 0; i < static_cast(CRU::MaxCRU) * cmv::NTimeBinsPerTF; ++i) { + mDataPerTF[i] = quantizeBelowThreshold(mDataPerTF[i], mean, sigma); + } +} + +CMVPerTFCompressed CMVPerTF::compress(uint8_t flags) const +{ + CMVPerTFCompressed out; + out.firstOrbit = firstOrbit; + out.firstBC = firstBC; + out.mFlags = flags; + + if (flags & CMVEncoding::kSparse) { + // --- Sparse path: position stream + value stream --- + + // Single pass per CRU: build the position stream and collect raw non-zero values. + std::vector posStream; + std::vector rawValues; + + for (int cru = 0; cru < static_cast(CRU::MaxCRU); ++cru) { + struct Entry { + uint32_t tb; + uint16_t val; + }; + std::vector entries; + for (uint32_t tb = 0; tb < cmv::NTimeBinsPerTF; ++tb) { + const uint16_t val = mDataPerTF[cru * cmv::NTimeBinsPerTF + tb]; + if (val != 0) { + entries.push_back({tb, val}); + } + } + + encodeVarintInto(static_cast(entries.size()), posStream); + uint32_t prevTB = 0; + bool first = true; + for (const auto& e : entries) { + encodeVarintInto(first ? e.tb : (e.tb - prevTB), posStream); + rawValues.push_back(e.val); + prevTB = e.tb; + first = false; + } + } + + // Encode the value stream based on flags. + std::vector valStream; + if (flags & CMVEncoding::kZigzag) { + std::vector zigzags; + zigzags.reserve(rawValues.size()); + for (const uint16_t v : rawValues) { + zigzags.push_back(zigzagEncode(cmvToSigned(v))); + } + if (flags & CMVEncoding::kHuffman) { + huffmanEncode(zigzags, valStream); + } else { // kVarint + for (const uint32_t z : zigzags) { + encodeVarintInto(z, valStream); + } + } + } else { + // Raw uint16 LE + for (const uint16_t v : rawValues) { + valStream.push_back(static_cast(v & 0xFF)); + valStream.push_back(static_cast(v >> 8)); + } + } + + // Assemble: [4 bytes posStreamSize][posStream][valStream] + const uint32_t posStreamSize = static_cast(posStream.size()); + out.mData.reserve(4 + posStream.size() + valStream.size()); + for (int i = 0; i < 4; ++i) { + out.mData.push_back(static_cast((posStreamSize >> (8 * i)) & 0xFF)); + } + out.mData.insert(out.mData.end(), posStream.begin(), posStream.end()); + out.mData.insert(out.mData.end(), valStream.begin(), valStream.end()); + + } else { + // --- Dense path: all CRU * TimeBin values --- + const uint32_t total = static_cast(CRU::MaxCRU) * cmv::NTimeBinsPerTF; + + if (!(flags & CMVEncoding::kZigzag)) { + // No encoding: raw uint16 LE + out.mData.reserve(total * 2); + for (uint32_t i = 0; i < total; ++i) { + out.mData.push_back(static_cast(mDataPerTF[i] & 0xFF)); + out.mData.push_back(static_cast(mDataPerTF[i] >> 8)); + } + } else { + // Zigzag + optional delta (CRU-major, time-minor) + const bool useDelta = (flags & CMVEncoding::kDelta) != 0; + std::vector zigzags; + zigzags.reserve(total); + for (int cru = 0; cru < static_cast(CRU::MaxCRU); ++cru) { + int32_t prev = 0; + for (uint32_t tb = 0; tb < cmv::NTimeBinsPerTF; ++tb) { + const int32_t val = cmvToSigned(mDataPerTF[cru * cmv::NTimeBinsPerTF + tb]); + const int32_t encoded = useDelta ? (val - prev) : val; + if (useDelta) { + prev = val; + } + zigzags.push_back(zigzagEncode(encoded)); + } + } + + if (flags & CMVEncoding::kHuffman) { + huffmanEncode(zigzags, out.mData); + } else { // kVarint + for (const uint32_t z : zigzags) { + encodeVarintInto(z, out.mData); + } + } + } + } + + return out; +} + +// CMVPerTFCompressed::decompress staged pipeline + +std::vector> CMVPerTFCompressed::decodeSparsePositions(const uint8_t*& ptr, const uint8_t* end) +{ + // Read 4-byte LE posStreamSize + if (ptr + 4 > end) { + throw std::runtime_error("CMVPerTFCompressed::decompress: truncated position header"); + } + const uint32_t posStreamSize = static_cast(ptr[0]) | (static_cast(ptr[1]) << 8) | + (static_cast(ptr[2]) << 16) | (static_cast(ptr[3]) << 24); + ptr += 4; + + const uint8_t* posEnd = ptr + posStreamSize; + if (posEnd > end) { + throw std::runtime_error("CMVPerTFCompressed::decompress: posStream overflows payload"); + } + + // Decode per-CRU varint(N) + N×varint(tb_delta) + std::vector> positions; + const uint8_t* p = ptr; + for (int cru = 0; cru < static_cast(CRU::MaxCRU); ++cru) { + const uint32_t count = decodeVarintLocal(p, posEnd); + uint32_t tb = 0; + bool first = true; + for (uint32_t i = 0; i < count; ++i) { + const uint32_t delta = decodeVarintLocal(p, posEnd); + tb = first ? delta : (tb + delta); + first = false; + positions.emplace_back(cru, tb); + } + } + ptr = posEnd; // advance past the entire position block + return positions; +} + +std::vector CMVPerTFCompressed::decodeValueStream(const uint8_t*& ptr, const uint8_t* end, uint32_t N, uint8_t flags) +{ + if (flags & CMVEncoding::kHuffman) { + // Huffman-encoded symbols + return huffmanDecode(ptr, end, N); + } + + if (flags & CMVEncoding::kVarint) { + // Varint-encoded symbols + std::vector out; + out.reserve(N); + for (uint32_t i = 0; i < N; ++i) { + out.push_back(decodeVarintLocal(ptr, end)); + } + return out; + } + + // Raw uint16 LE (no value encoding) + std::vector out; + out.reserve(N); + for (uint32_t i = 0; i < N; ++i) { + if (ptr + 2 > end) { + throw std::runtime_error("CMVPerTFCompressed::decompress: unexpected end in raw value stream"); + } + const uint16_t v = static_cast(ptr[0]) | (static_cast(ptr[1]) << 8); + ptr += 2; + out.push_back(v); + } + return out; +} + +void CMVPerTFCompressed::decodeSparseValues(const std::vector& symbols, + const std::vector>& positions, + uint8_t flags, CMVPerTF* cmv) +{ + const bool useZigzag = (flags & CMVEncoding::kZigzag) != 0; + for (uint32_t i = 0; i < static_cast(positions.size()); ++i) { + uint16_t raw; + if (useZigzag) { + raw = signedToCmvLocal(zigzagDecodeLocal(symbols[i])); + } else { + raw = static_cast(symbols[i]); + } + cmv->mDataPerTF[positions[i].first * cmv::NTimeBinsPerTF + positions[i].second] = raw; + } +} + +void CMVPerTFCompressed::decodeDenseValues(const std::vector& symbols, uint8_t flags, CMVPerTF* cmv) +{ + const bool useZigzag = (flags & CMVEncoding::kZigzag) != 0; + const bool useDelta = (flags & CMVEncoding::kDelta) != 0; + + if (!useZigzag) { + // Symbols are raw uint16 values; write directly + for (uint32_t i = 0; i < static_cast(symbols.size()); ++i) { + cmv->mDataPerTF[i] = static_cast(symbols[i]); + } + return; + } + + // Inverse zigzag + optional inverse delta (CRU-major, time-minor) + uint32_t s = 0; + for (int cru = 0; cru < static_cast(CRU::MaxCRU); ++cru) { + int32_t prev = 0; + for (uint32_t tb = 0; tb < cmv::NTimeBinsPerTF; ++tb, ++s) { + int32_t val = zigzagDecodeLocal(symbols[s]); + if (useDelta) { + val += prev; + prev = val; + } + cmv->mDataPerTF[s] = signedToCmvLocal(val); + } + } +} + +void CMVPerTFCompressed::decompress(CMVPerTF* cmv) const +{ + if (!cmv) { + throw std::invalid_argument("CMVPerTFCompressed::decompress: cmv pointer is null"); + } + cmv->firstOrbit = firstOrbit; + cmv->firstBC = firstBC; + std::fill(std::begin(cmv->mDataPerTF), std::end(cmv->mDataPerTF), uint16_t(0)); + + const uint8_t* ptr = mData.data(); + const uint8_t* end = ptr + mData.size(); + + if (mFlags & CMVEncoding::kSparse) { + // Stage 1: decode position stream + auto positions = decodeSparsePositions(ptr, end); + const uint32_t N = static_cast(positions.size()); + + // Stage 2: decode value stream (Huffman / varint / raw) + auto symbols = decodeValueStream(ptr, end, N, mFlags); + + // Stage 3: inverse zigzag and scatter into CMV array + decodeSparseValues(symbols, positions, mFlags, cmv); + } else { + const uint32_t N = static_cast(CRU::MaxCRU) * cmv::NTimeBinsPerTF; + + // Stage 1: decode value stream (Huffman / varint / raw) + auto symbols = decodeValueStream(ptr, end, N, mFlags); + + // Stage 2: inverse zigzag, inverse delta, fill CMV array + decodeDenseValues(symbols, mFlags, cmv); + } +} + +std::unique_ptr CMVPerTF::toTTree() const +{ + auto tree = std::make_unique("ccdb_object", "ccdb_object"); + tree->SetAutoSave(0); + tree->SetDirectory(nullptr); + + const CMVPerTF* ptr = this; + tree->Branch("CMVPerTF", &ptr); + tree->Fill(); + + tree->ResetBranchAddresses(); + return tree; +} + +std::unique_ptr CMVPerTFCompressed::toTTree() const +{ + auto tree = std::make_unique("ccdb_object", "ccdb_object"); + tree->SetAutoSave(0); + tree->SetDirectory(nullptr); + + const CMVPerTFCompressed* ptr = this; + tree->Branch("CMVPerTFCompressed", &ptr); + tree->Fill(); + + tree->ResetBranchAddresses(); + return tree; +} + +void CMVPerTF::writeToFile(const std::string& filename, const std::unique_ptr& tree) +{ + TFile f(filename.c_str(), "RECREATE"); + if (f.IsZombie()) { + throw std::runtime_error(fmt::format("CMVPerTF::writeToFile: cannot open '{}'", filename)); + } + tree->Write(); + f.Close(); +} + +} // namespace o2::tpc diff --git a/Detectors/TPC/calibration/src/TPCCalibrationLinkDef.h b/Detectors/TPC/calibration/src/TPCCalibrationLinkDef.h index 6e15e2dd0427a..14d3d0a8ffb8e 100644 --- a/Detectors/TPC/calibration/src/TPCCalibrationLinkDef.h +++ b/Detectors/TPC/calibration/src/TPCCalibrationLinkDef.h @@ -123,4 +123,8 @@ #pragma link C++ class o2::tpc::DigitAdd + ; #pragma link C++ class std::vector < o2::tpc::DigitAdd> + ; #pragma link C++ class o2::tpc::PressureTemperatureHelper + ; + +#pragma link C++ class o2::tpc::CMVPerTF + ; +#pragma link C++ class o2::tpc::CMVPerTFCompressed + ; + #endif diff --git a/Detectors/TPC/workflow/CMakeLists.txt b/Detectors/TPC/workflow/CMakeLists.txt index 6930f332bfbf1..0f8d73b1cbe7e 100644 --- a/Detectors/TPC/workflow/CMakeLists.txt +++ b/Detectors/TPC/workflow/CMakeLists.txt @@ -25,6 +25,7 @@ o2_add_library(TPCWorkflow src/KryptonRawFilterSpec.cxx src/OccupancyFilterSpec.cxx src/SACProcessorSpec.cxx + src/CMVToVectorSpec.cxx src/IDCToVectorSpec.cxx src/CalibdEdxSpec.cxx src/CalibratordEdxSpec.cxx @@ -288,4 +289,19 @@ o2_add_executable(pressure-temperature SOURCES src/tpc-pressure-temperature.cxx PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) -add_subdirectory(readers) +o2_add_executable(cmv-to-vector + COMPONENT_NAME tpc + SOURCES src/tpc-cmv-to-vector.cxx + PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) + +o2_add_executable(cmv-flp + COMPONENT_NAME tpc + SOURCES src/tpc-flp-cmv.cxx + PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) + +o2_add_executable(cmv-distribute + COMPONENT_NAME tpc + SOURCES src/tpc-distribute-cmv.cxx + PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) + +add_subdirectory(readers) \ No newline at end of file diff --git a/Detectors/TPC/workflow/README.md b/Detectors/TPC/workflow/README.md index e34faa2813edf..b7a19da121e9b 100644 --- a/Detectors/TPC/workflow/README.md +++ b/Detectors/TPC/workflow/README.md @@ -274,3 +274,191 @@ To directly dump the digits to file for inspection use for the reco workflow ```bash | o2-tpc-reco-workflow --input-type digitizer --output-type digits --disable-mc ``` + +## TPC Common Mode Value (CMV) Workflows + +The CMV workflows parse raw TPC data, buffer Common Mode Values per CRU on FLPs, then merge and aggregate them on a calibration node before serializing the CMVContainer in a TTree. The resulting object can be uploaded to the CCDB or written to the disk. + +### Workflow components + +| Executable | Output | Description | +|---|---|---| +| `o2-tpc-cmv-to-vector` | `TPC/CMVVECTOR` | Parses raw TPC data and creates vectors of CMVs per CRU | +| `o2-tpc-cmv-flp` | `TPC/CMVGROUP` | Buffers N TFs per CRU on the FLP and groups them for forwarding | +| `o2-tpc-cmv-distribute` | TTree / CCDB payload | Merges CRUs over N TFs on the calibration node, serializes the CMVContainer into a TTree, and either writes it to disk (`--dump-cmvs`) or forwards it as a CCDB object (`--enable-CCDB-output`) | + +#### `o2-tpc-cmv-to-vector` + +| Option | Default | Description | +|---|---|---| +| `--input-spec` | `A:TPC/RAWDATA` | DPL input spec for raw TPC data | +| `--crus` | `0-359` | CRU range to process, comma-separated ranges | +| `--write-debug` | false | Write a debug output tree every TF | +| `--write-debug-on-error` | false | Write a debug output tree only when decoding errors occur | +| `--debug-file-name` | `/tmp/cmv_vector_debug.{run}.root` | Name of the debug output ROOT file | +| `--write-raw-data-on-error` | false | Dump raw data to file when decoding errors occur | +| `--raw-file-name` | `/tmp/cmv_debug.{run}.{raw_type}` | Name of the raw debug output file | +| `--raw-data-type` | 0 | Raw data format to dump on error: 0 = full TPC with DPL header, 1 = full TPC with DPL header (skip empty), 2 = full TPC no DPL header, 3 = full TPC no DPL header (skip empty), 4 = IDC raw only, 5 = CMV raw only | +| `--check-incomplete-hbf` | false | Check and report incomplete HBFs in the raw parser | + +#### `o2-tpc-cmv-flp` + +| Option | Default | Description | +|---|---|---| +| `--crus` | `0-359` | CRU range handled by this FLP | +| `--lanes` | hw_concurrency/2 | Parallel processing lanes (CRUs split per lane) | +| `--time-lanes` | 1 | Parallel lanes for time-frame splitting | +| `--n-TFs-buffer` | 1 | Number of TFs to buffer before forwarding | +| `--dump-cmvs-flp` | false | Dump raw CMV vectors per CRU to a ROOT file each TF (for debugging) | + +#### `o2-tpc-cmv-distribute` + +| Option | Default | Description | +|---|---|---| +| `--crus` | `0-359` | CRU range expected from upstream | +| `--timeframes` | 2000 | Number of TFs aggregated per calibration interval | +| `--firstTF` | -1 | First time frame index; -1 = auto-detect from first incoming TF; values < -1 set an offset of `\|firstTF\|+1` TFs before the first interval begins | +| `--lanes` | 1 | Number of parallel lanes (CRUs are split evenly across lanes) | +| `--n-TFs-buffer` | 1 | Number of TFs buffered per group in the upstream `o2-tpc-cmv-flp` (must match that workflow's setting) | +| `--enable-CCDB-output` | false | Forward the CMVContainer TTree as a CCDB object to `o2-calibration-ccdb-populator-workflow` | +| `--use-precise-timestamp` | false | Fetch orbit-reset and GRPECS from CCDB to compute a precise CCDB validity timestamp | +| `--dump-cmvs` | false | Write the CMVContainer TTree to a local ROOT file on disk | +| `--use-sparse` | false | Sparse encoding: skip zero time bins (raw uint16 values; combine with `--use-compression-varint` or `--use-compression-huffman` for compressed sparse output) | +| `--use-compression-varint` | false | Delta + zigzag + varint compression over all values; combined with `--use-sparse`: varint-encoded exact values at non-zero positions | +| `--use-compression-huffman` | false | Huffman encoding over all values; combined with `--use-sparse`: Huffman-encoded exact values at non-zero positions | +| `--cmv-zero-threshold` | 0 | Zero out CMV values whose magnitude is below this threshold (ADC) after optional rounding and before compression; 0 disables | +| `--cmv-round-integers-threshold` | 0 | Round values to nearest integer ADC for \|v\| ≤ N ADC before compression; 0 disables | +| `--cmv-dynamic-precision-mean` | 1.0 | Gaussian centre in \|CMV\| (ADC) where the strongest fractional-bit trimming is applied | +| `--cmv-dynamic-precision-sigma` | 0 | Gaussian width (ADC) for smooth CMV fractional-bit trimming; 0 disables | +| `--drop-data-after-nTFs` | 0 | Drop data for a relative TF slot after this many TFs have passed without receiving all CRUs; 0 uses the default derived from `--check-data-every-n` | +| `--check-data-every-n` | 0 | Check for missing CRU data every N invocations of the run function; -1 disables checking, 0 uses the default (timeframes/2) | +| `--nFactorTFs` | 1000 | Number of TFs to skip before flushing the oldest incomplete aggregation interval | + +### Example 1 — Simple usage for testing + +```bash +#!/bin/bash + +hash="test" +MAX_TFS=1 +CRUS="0-359" + +ARGS_ALL="-b --session ${USER}.${hash} --shm-segment-size $((8<<30))" + +o2-raw-tf-reader-workflow $ARGS_ALL \ + --input-data tf.subset.list \ + --max-tf ${MAX_TFS} | +o2-tfidinfo-writer-workflow $ARGS_ALL \ + --early-forward-policy noraw \ + --fairmq-rate-logging 0 \ + --timeframes-rate-limit ${MAX_TFS} \ + --timeframes-rate-limit-ipcid 583693664 | +o2-tpc-cmv-to-vector $ARGS_ALL \ + --input-spec "A:TPC/RAWDATA" \ + --write-debug-on-error \ + --crus ${CRUS} | +o2-tpc-cmv-flp $ARGS_ALL \ + --crus ${CRUS} | +o2-tpc-cmv-distribute $ARGS_ALL \ + --crus ${CRUS} \ + --dump-cmvs \ + --enable-CCDB-output \ + --cmv-zero-threshold 1.0 \ + --cmv-dynamic-precision-mean 1.0 \ + --cmv-dynamic-precision-sigma 8.0 \ + --use-sparse \ + --use-compression-huffman | +o2-calibration-ccdb-populator-workflow $ARGS_ALL \ + --ccdb-path ccdb-test.cern.ch:8080 +``` + +### Example 2 — Bash scripts for more realistic testing + +In a real online setup, multiple FLPs each process their own CRU subset and forward compressed CMV groups to a central aggregator node via ZeroMQ. + +**FLP side (`Send.sh`)** — run one instance per FLP (pass `N_FLPs` as first argument): + +```bash +#!/bin/bash + +# Number of FLPs (passed as first argument, default 1) +N_FLPs=${1:-1} + +hash="test" +MAX_TFS=1 + +minCRU=0 +maxCRU=360 + +ARGS_ALL="-b --shm-segment-size $((8<<30))" + +for ((i = 0; i < ${N_FLPs}; i++)); do + xpos_start=100 + xpos=$((xpos_start + 1000 * $i)) + + let diff=${maxCRU}-${minCRU} + let Start=${minCRU}+$i*${diff}/${N_FLPs} + let End=$Start+${diff}/${N_FLPs}-1 + + crus="$Start-$End" + echo "FLP $i: crus $crus" + + xterm -hold -geometry 150x41+$xpos+300 -e bash -c "unset PYTHONHOME PYTHONPATH; echo FLP $i; + o2-raw-tf-reader-workflow $ARGS_ALL \ + --session ${USER}.${hash}.send.$i \ + --input-data tf.subset.list \ + --max-tf ${MAX_TFS} | + o2-tfidinfo-writer-workflow $ARGS_ALL \ + --session ${USER}.${hash}.send.$i \ + --early-forward-policy noraw \ + --fairmq-rate-logging 0 \ + --timeframes-rate-limit ${MAX_TFS} \ + --timeframes-rate-limit-ipcid $((583693664 + $i)) | + o2-tpc-cmv-to-vector $ARGS_ALL \ + --session ${USER}.${hash}.send.$i \ + --input-spec 'A:TPC/RAWDATA' \ + --write-debug-on-error \ + --crus ${crus} | + o2-tpc-cmv-flp $ARGS_ALL \ + --session ${USER}.${hash}.send.$i \ + --crus ${crus} | + o2-dpl-output-proxy $ARGS_ALL \ + --session ${USER}.${hash}.send.$i \ + --sporadic-inputs \ + --channel-config 'name=downstream,method=connect,address=tcp://localhost:30453,type=push,transport=zeromq' \ + --dataspec 'downstream:TPC/CMVGROUP;downstream:TPC/CMVORBITINFO'; exec bash" & +done +``` + +Each FLP connects to the aggregator's pull socket on port `30453` and pushes `TPC/CMVGROUP` and `TPC/CMVORBITINFO` messages. The CRU range is automatically split evenly across `N_FLPs`. + +**Aggregator side (`Receive.sh`)**: + +```bash +#!/bin/bash + +hash="test" +CRUS="0-359" + +ARGS_ALL="-b --session ${USER}.${hash}.receive --shm-segment-size $((8<<30))" + +# ZeroMQ proxy: pull from all FLPs connecting on port 30453 +configProxy="name=readout-proxy,type=pull,method=bind,address=tcp://localhost:30453,rateLogging=1,transport=zeromq" + +o2-dpl-raw-proxy $ARGS_ALL \ + --channel-config "${configProxy}" \ + --dataspec "A:TPC/CMVGROUP;A:TPC/CMVORBITINFO" | +o2-tpc-cmv-distribute $ARGS_ALL \ + --crus ${CRUS} \ + --dump-cmvs \ + --enable-CCDB-output \ + --cmv-zero-threshold 1.0 \ + --cmv-dynamic-precision-mean 1.0 \ + --cmv-dynamic-precision-sigma 8.0 \ + --use-sparse \ + --use-compression-huffman | +o2-calibration-ccdb-populator-workflow $ARGS_ALL \ + --ccdb-path ccdb-test.cern.ch:8080 +``` + +The aggregator binds the ZeroMQ pull socket and waits for all FLPs to connect. Once `TPC/CMVGROUP` and `TPC/CMVORBITINFO` data arrive, `o2-tpc-cmv-distribute` merges them, applies compression, writes the object to the disk and uploads to the CCDB. diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h new file mode 100644 index 0000000000000..add37af5706e5 --- /dev/null +++ b/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h @@ -0,0 +1,30 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file CMVToVectorSpec.h +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief Processor to convert CMVs to a vector in a CRU + +#ifndef TPC_CMVToVectorSpec_H_ +#define TPC_CMVToVectorSpec_H_ + +#include "Framework/DataProcessorSpec.h" + +namespace o2::tpc +{ + +/// create a processor spec +/// convert CMV raw values to a vector in a CRU +o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, std::vector const& crus); + +} // end namespace o2::tpc + +#endif // TPC_CMVToVectorSpec_H_ diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h new file mode 100644 index 0000000000000..c1744ce86d3ac --- /dev/null +++ b/Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h @@ -0,0 +1,621 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file TPCDistributeCMVSpec.h +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief TPC aggregation of grouped CMVs + +#ifndef O2_TPCDISTRIBUTECMVSPEC_H +#define O2_TPCDISTRIBUTECMVSPEC_H + +#include +#include +#include +#include "TParameter.h" +#include "Framework/Task.h" +#include "Framework/ControlService.h" +#include "Framework/Logger.h" +#include "Framework/DataProcessorSpec.h" +#include "Framework/InputRecordWalker.h" +#include "Framework/DataTakingContext.h" +#include "Headers/DataHeader.h" +#include "Framework/ConfigParamRegistry.h" +#include "TPCWorkflow/TPCFLPCMVSpec.h" +#include "MemoryResources/MemoryResources.h" +#include "TPCWorkflow/ProcessingHelpers.h" +#include "DetectorsBase/GRPGeomHelper.h" +#include "CommonDataFormat/Pair.h" +#include "TMemFile.h" +#include "CCDB/CcdbApi.h" +#include "CCDB/CcdbObjectInfo.h" +#include "DetectorsCalibration/Utils.h" +#include "TPCCalibration/CMVContainer.h" +#include "DataFormatsTPC/CMV.h" + +using namespace o2::framework; +using o2::header::gDataOriginTPC; +using namespace o2::tpc; + +namespace o2::tpc +{ + +class TPCDistributeCMVSpec : public o2::framework::Task +{ + public: + TPCDistributeCMVSpec(const std::vector& crus, const unsigned int timeframes, const int nTFsBuffer, const int firstTF, const bool sendCCDB, const bool usePreciseTimestamp, std::shared_ptr req) + : mCRUs{crus}, + mTimeFrames{timeframes}, + mNTFsBuffer{nTFsBuffer}, + mProcessedCRU{{std::vector(timeframes), std::vector(timeframes)}}, + mTFStart{{firstTF, firstTF + timeframes}}, + mTFEnd{{firstTF + timeframes - 1, mTFStart[1] + timeframes - 1}}, + mCCDBRequest(req), + mSendCCDB{sendCCDB}, + mUsePreciseTimestamp{usePreciseTimestamp}, + mSendCCDBOutputOrbitReset(1), + mSendCCDBOutputGRPECS(1), + mOrbitInfoForwarded{{std::vector(timeframes, false), std::vector(timeframes, false)}} + { + // sort vector for binary_search + std::sort(mCRUs.begin(), mCRUs.end()); + + for (auto& processedCRUbuffer : mProcessedCRUs) { + processedCRUbuffer.resize(mTimeFrames); + for (auto& crusMap : processedCRUbuffer) { + crusMap.reserve(mCRUs.size()); + for (const auto cruID : mCRUs) { + crusMap.emplace(cruID, false); + } + } + } + + mFilter.emplace_back(InputSpec{"cmvsgroup", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup()}, Lifetime::Sporadic}); + mOrbitFilter.emplace_back(InputSpec{"cmvorbit", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo()}, Lifetime::Sporadic}); + + // pre-allocate the accumulator TTree for the current aggregation interval + initIntervalTree(); + }; + + void init(o2::framework::InitContext& ic) final + { + o2::base::GRPGeomHelper::instance().setRequest(mCCDBRequest); + mNFactorTFs = ic.options().get("nFactorTFs"); + mNTFsDataDrop = ic.options().get("drop-data-after-nTFs"); + mCheckEveryNData = ic.options().get("check-data-every-n"); + if (mCheckEveryNData == 0) { + mCheckEveryNData = mTimeFrames / 2; + if (mCheckEveryNData == 0) { + mCheckEveryNData = 1; + } + mNTFsDataDrop = mCheckEveryNData; + } + mDumpCMVs = ic.options().get("dump-cmvs"); + mUseCompressionVarint = ic.options().get("use-compression-varint"); + mUseSparse = ic.options().get("use-sparse"); + mUseCompressionHuffman = ic.options().get("use-compression-huffman"); + mRoundIntegersThreshold = static_cast(ic.options().get("cmv-round-integers-threshold")); + mZeroThreshold = ic.options().get("cmv-zero-threshold"); + mDynamicPrecisionMean = ic.options().get("cmv-dynamic-precision-mean"); + mDynamicPrecisionSigma = ic.options().get("cmv-dynamic-precision-sigma"); + LOGP(info, "CMV compression settings: use-compression-varint={}, use-sparse={}, use-compression-huffman={}, cmv-round-integers-threshold={}, cmv-zero-threshold={}, cmv-dynamic-precision-mean={}, cmv-dynamic-precision-sigma={}", + mUseCompressionVarint, mUseSparse, mUseCompressionHuffman, mRoundIntegersThreshold, mZeroThreshold, mDynamicPrecisionMean, mDynamicPrecisionSigma); + // re-initialise the interval tree now that compression options are known (constructor used the defaults) + initIntervalTree(); + } + + void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) final + { + o2::base::GRPGeomHelper::instance().finaliseCCDB(matcher, obj); + if (matcher == ConcreteDataMatcher("CTP", "ORBITRESET", 0)) { + LOGP(info, "Updating ORBITRESET"); + std::fill(mSendCCDBOutputOrbitReset.begin(), mSendCCDBOutputOrbitReset.end(), true); + } else if (matcher == ConcreteDataMatcher("GLO", "GRPECS", 0)) { + // check if received object is valid + if (o2::base::GRPGeomHelper::instance().getGRPECS()->getRun() != 0) { + LOGP(info, "Updating GRPECS"); + std::fill(mSendCCDBOutputGRPECS.begin(), mSendCCDBOutputGRPECS.end(), true); + } else { + LOGP(info, "Detected default GRPECS object"); + } + } + } + + void run(o2::framework::ProcessingContext& pc) final + { + // capture orbit-reset info once for precise CCDB timestamp calculation + if (mCCDBRequest->askTime) { + const bool grpecsValid = pc.inputs().isValid("grpecs"); + const bool orbitResetValid = pc.inputs().isValid("orbitReset"); + if (grpecsValid) { + pc.inputs().get("grpecs"); + } + if (orbitResetValid) { + pc.inputs().get*>("orbitReset"); + } + if (pc.inputs().countValidInputs() == (grpecsValid + orbitResetValid)) { + return; + } + // update mTFInfo from GRPGeomHelper whenever orbit-reset or GRPECS objects are fresh + if (mSendCCDBOutputOrbitReset[0] && mSendCCDBOutputGRPECS[0]) { + mSendCCDBOutputOrbitReset[0] = false; + mSendCCDBOutputGRPECS[0] = false; + mTFInfo = dataformats::Pair{o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS(), o2::base::GRPGeomHelper::instance().getNHBFPerTF()}; + } + } + + const auto tf = processing_helpers::getCurrentTF(pc); + mLastSeenTF = tf; // track for endOfStream flush + + // automatically detect firstTF in case firstTF was not specified + if (mTFStart.front() <= -1) { + const auto firstTF = tf; + const long offsetTF = std::abs(mTFStart.front() + 1); + const auto nTotTFs = getNRealTFs(); + mTFStart = {firstTF + offsetTF, firstTF + offsetTF + nTotTFs}; + mTFEnd = {mTFStart[1] - 1, mTFStart[1] - 1 + nTotTFs}; + LOGP(info, "Setting {} as first TF", mTFStart[0]); + LOGP(info, "Using offset of {} TFs for setting the first TF", offsetTF); + } + + // check which buffer to use for current incoming data + const bool currentBuffer = (tf > mTFEnd[mBuffer]) ? !mBuffer : mBuffer; + if (mTFStart[currentBuffer] > tf) { + LOGP(info, "All CRUs for current TF {} already received. Skipping this TF", tf); + return; + } + + const unsigned int relTF = (tf - mTFStart[currentBuffer]) / mNTFsBuffer; + LOGP(info, "Current TF: {}, relative TF: {}, current buffer: {}, mTFStart: {}", tf, relTF, currentBuffer, mTFStart[currentBuffer]); + + if (relTF >= mProcessedCRU[currentBuffer].size()) { + LOGP(warning, "Skipping tf {}: relative tf {} is larger than size of buffer: {}", tf, relTF, mProcessedCRU[currentBuffer].size()); + + // check number of processed CRUs for previous TFs. If CRUs are missing for them, they are probably lost/not received + mProcessedTotalData = mCheckEveryNData; + checkIntervalsForMissingData(pc, currentBuffer, relTF, tf); + return; + } + + if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) { + return; + } + + // record the absolute first TF of this aggregation interval + if (mIntervalTFCount == 0) { + mIntervalFirstTF = tf; + } + + // set CCDB start timestamp once at the start of each aggregation interval + if (mTimestampStart == 0) { + setTimestampCCDB(relTF, pc); + } + + // capture orbit/BC info into the interval once per relTF. + // all CRUs within a TF carry identical timing, so the first one is sufficient. + if (!mOrbitInfoForwarded[currentBuffer][relTF]) { + for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) { + auto const* hdr = o2::framework::DataRefUtils::getHeader(ref); + const unsigned int cru = hdr->subSpecification >> 7; + if (std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) { + const auto orbitBC = pc.inputs().get(ref); + if (mCurrentTF.firstOrbit == 0 && mCurrentTF.firstBC == 0) { + mCurrentTF.firstOrbit = static_cast(orbitBC >> 32); + mCurrentTF.firstBC = static_cast(orbitBC & 0xFFFFu); + } + mOrbitInfoForwarded[currentBuffer][relTF] = true; + break; // one per relTF is enough + } + } + } + + for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { + auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader(ref); + const unsigned int cru = tpcCRUHeader->subSpecification >> 7; + + // check if cru is specified in input cru list + if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) { + LOGP(info, "Received data from CRU: {} which was not specified as input. Skipping", cru); + continue; + } + + if (mProcessedCRUs[currentBuffer][relTF][cru]) { + continue; + } else { + // count total number of processed CRUs for given TF + ++mProcessedCRU[currentBuffer][relTF]; + + // to keep track of processed CRUs + mProcessedCRUs[currentBuffer][relTF][cru] = true; + } + + // accumulate raw 16-bit CMVs into the flat array for the current TF + auto cmvVec = pc.inputs().get>(ref); + const uint32_t nTimeBins = std::min(static_cast(cmvVec.size()), cmv::NTimeBinsPerTF); + for (uint32_t tb = 0; tb < nTimeBins; ++tb) { + mCurrentTF.mDataPerTF[cru * cmv::NTimeBinsPerTF + tb] = cmvVec[tb]; + } + } + + LOGP(info, "Number of received CRUs for current TF: {} Needed a total number of processed CRUs of: {} Current TF: {}", mProcessedCRU[currentBuffer][relTF], mCRUs.size(), tf); + + // check for missing data if specified + if (mNTFsDataDrop > 0) { + checkIntervalsForMissingData(pc, currentBuffer, relTF, tf); + } + + if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) { + ++mProcessedTFs[currentBuffer]; + + // Pre-processing: quantisation / rounding / zeroing (applied before compression) + mCurrentTF.roundToIntegers(mRoundIntegersThreshold); + if (mZeroThreshold > 0.f) { + mCurrentTF.zeroSmallValues(mZeroThreshold); + } + if (mDynamicPrecisionSigma > 0.f) { + mCurrentTF.trimGaussianPrecision(mDynamicPrecisionMean, mDynamicPrecisionSigma); + } + + // Compress; the raw CMVPerTF branch is used when all flags are zero + const uint8_t flags = buildCompressionFlags(); + if (flags != CMVEncoding::kNone) { + mCurrentCompressedTF = mCurrentTF.compress(flags); + } + + mIntervalTree->Fill(); + ++mIntervalTFCount; + mCurrentTF = CMVPerTF{}; + } + + if (mProcessedTFs[currentBuffer] == mTimeFrames) { + sendOutput(pc.outputs(), tf); + finishInterval(pc, currentBuffer, tf); + } + } + + void endOfStream(o2::framework::EndOfStreamContext& ec) final + { + LOGP(info, "End of stream, flushing CMV interval ({} TFs)", mIntervalTFCount); + // correct mTFEnd for the partial last interval so the CCDB validity end timestamp reflects the actual last TF, not the expected interval end + mTFEnd[mBuffer] = mLastSeenTF; + sendOutput(ec.outputs(), mLastSeenTF); + ec.services().get().readyToQuit(QuitRequest::Me); + } + + static constexpr header::DataDescription getDataDescriptionCCDBCMV() { return header::DataDescription{"TPC_CMV"}; } + + /// Return data description for aggregated CMVs for a given lane + static header::DataDescription getDataDescriptionCMV(const unsigned int lane) + { + const std::string name = fmt::format("CMVAGG{}", lane).data(); + header::DataDescription description; + description.runtimeInit(name.substr(0, 16).c_str()); + return description; + } + + /// Return data description for orbit/BC info for a given output lane + static header::DataDescription getDataDescriptionCMVOrbitInfo(const unsigned int lane) + { + const std::string name = fmt::format("CMVORB{}", lane); + header::DataDescription description; + description.runtimeInit(name.substr(0, 16).c_str()); + return description; + } + + static constexpr header::DataDescription getDataDescriptionCMVFirstTF() { return header::DataDescription{"CMVFIRSTTF"}; } + static constexpr header::DataDescription getDataDescriptionCMVOrbitReset() { return header::DataDescription{"CMVORBITRESET"}; } + + private: + std::vector mCRUs{}; ///< CRUs to process in this instance + const unsigned int mTimeFrames{}; ///< number of TFs per aggregation interval + const int mNTFsBuffer{1}; ///< number of TFs for which the CMVs will be buffered + std::array mProcessedTFs{{0, 0}}; ///< number of processed time frames to keep track of when the writing to CCDB will be done + std::array, 2> mProcessedCRU{}; ///< counter of received data from CRUs per TF to merge incoming data from FLPs. Buffer used in case one FLP delivers the TF after the last TF for the current aggregation interval faster then the other FLPs the last TF. + std::array>, 2> mProcessedCRUs{}; ///< to keep track of the already processed CRUs ([buffer][relTF][CRU]) + std::array mTFStart{}; ///< storing of first TF for buffer interval + std::array mTFEnd{}; ///< storing of last TF for buffer interval + std::shared_ptr mCCDBRequest; ///< info for CCDB request + std::vector mSendCCDBOutputOrbitReset{}; ///< flag for received orbit reset time from CCDB + std::vector mSendCCDBOutputGRPECS{}; ///< flag for received orbit GRPECS from CCDB + bool mBuffer{false}; ///< buffer index + bool mSendCCDB{false}; ///< send output to CCDB populator + bool mUsePreciseTimestamp{false}; ///< use precise timestamp from orbit-reset info + bool mDumpCMVs{false}; ///< write a local ROOT debug file + bool mUseCompressionVarint{false}; ///< use delta+zigzag+varint compression (all values, no sparse skip); combined with mUseSparse → SparseV2 mode 1 + bool mUseSparse{false}; ///< sparse encoding; alone = raw uint16 values; combined with varint/Huffman flag → SparseV2 + bool mUseCompressionHuffman{false}; ///< Huffman encoding; combined with mUseSparse → SparseV2 mode 2 + uint16_t mRoundIntegersThreshold{0}; ///< round values to nearest integer ADC for |v| <= N ADC; 0 = disabled + float mZeroThreshold{0.f}; ///< zero out CMV values whose float magnitude is below this threshold; 0 = disabled + float mDynamicPrecisionMean{1.f}; ///< Gaussian centre in |CMV| ADC where the strongest fractional-bit trimming is applied + float mDynamicPrecisionSigma{0.f}; ///< Gaussian width in ADC for the fractional-bit trimming; 0 disables + long mTimestampStart{0}; ///< CCDB validity start timestamp + dataformats::Pair mTFInfo{}; ///< orbit-reset time and NHBFPerTF for precise timestamp + std::unique_ptr mIntervalTree{}; ///< TTree accumulating one entry per completed TF in the current interval + CMVPerTF mCurrentTF{}; ///< staging object filled per CRU before compression + CMVPerTFCompressed mCurrentCompressedTF{}; ///< compressed output for the current TF (used when flags != kNone) + long mIntervalFirstTF{0}; ///< absolute TF counter of the first TF in the current aggregation interval + unsigned int mIntervalTFCount{0}; ///< number of TTree entries filled for the current aggregation interval + int mNFactorTFs{0}; ///< Number of TFs to skip for sending oldest TF + int mNTFsDataDrop{0}; ///< delay for the check if TFs are missing in TF units + std::array mStartNTFsDataDrop{0}; ///< first relative TF to check + long mProcessedTotalData{0}; ///< used to check for dropeed TF data + int mCheckEveryNData{1}; ///< factor after which to check for missing data (in case data missing -> send dummy data) + std::vector mFilter{}; ///< filter for looping over input data + std::vector mOrbitFilter{}; ///< filter for CMVORBITINFO from FLP + std::array, 2> mOrbitInfoForwarded{}; ///< tracks whether orbit/BC has been captured per (buffer, relTF) + uint32_t mLastSeenTF{0}; ///< last TF counter seen in run(), used to set lastTF in endOfStream flush + + /// Returns real number of TFs taking buffer size into account + unsigned int getNRealTFs() const { return mNTFsBuffer * mTimeFrames; } + + /// Build the CMVEncoding bitmask from the current option flags. + uint8_t buildCompressionFlags() const + { + uint8_t flags = CMVEncoding::kNone; + if (mUseSparse) { + flags |= CMVEncoding::kSparse; + } + if (mUseCompressionHuffman) { + flags |= CMVEncoding::kZigzag | CMVEncoding::kHuffman; + } else if (mUseCompressionVarint) { + flags |= CMVEncoding::kZigzag | CMVEncoding::kVarint; + } + // Delta coding is only applied for the dense (non-sparse) path with a value compressor + if (!(flags & CMVEncoding::kSparse) && (flags & (CMVEncoding::kVarint | CMVEncoding::kHuffman))) { + flags |= CMVEncoding::kDelta; + } + return flags; + } + + /// Create a fresh in-memory TTree for the next aggregation interval. + /// Uses a single CMVPerTFCompressed branch whenever any compression is active, + /// or a raw CMVPerTF branch when no compression flags are set. + void initIntervalTree() + { + mIntervalTree = std::make_unique("ccdb_object", "ccdb_object"); + mIntervalTree->SetAutoSave(0); + mIntervalTree->SetDirectory(nullptr); + if (buildCompressionFlags() != CMVEncoding::kNone) { + mIntervalTree->Branch("CMVPerTFCompressed", &mCurrentCompressedTF); + } else { + mIntervalTree->Branch("CMVPerTF", &mCurrentTF); + } + } + + void clearBuffer(const bool currentBuffer) + { + // resetting received CRUs + for (auto& crusMap : mProcessedCRUs[currentBuffer]) { + for (auto& it : crusMap) { + it.second = false; + } + } + + mProcessedTFs[currentBuffer] = 0; // reset processed TFs for next aggregation interval + std::fill(mProcessedCRU[currentBuffer].begin(), mProcessedCRU[currentBuffer].end(), 0); + std::fill(mOrbitInfoForwarded[currentBuffer].begin(), mOrbitInfoForwarded[currentBuffer].end(), false); + + // set integration range for next integration interval + mTFStart[mBuffer] = mTFEnd[!mBuffer] + 1; + mTFEnd[mBuffer] = mTFStart[mBuffer] + getNRealTFs() - 1; + + // switch buffer + mBuffer = !mBuffer; + } + + void checkIntervalsForMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const long relTF, const uint32_t tf) + { + if (!(mProcessedTotalData++ % mCheckEveryNData)) { + LOGP(info, "Checking for dropped packages..."); + + // if last buffer has smaller time range check the whole last buffer + if ((mTFStart[currentBuffer] > mTFStart[!currentBuffer]) && (relTF > mNTFsDataDrop)) { + LOGP(warning, "Checking last buffer from {} to {}", mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size()); + checkMissingData(pc, !currentBuffer, mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size()); + LOGP(info, "All empty TFs for TF {} for current buffer filled with dummy and sent. Clearing buffer", tf); + sendOutput(pc.outputs(), tf); + finishInterval(pc, !currentBuffer, tf); + } + + const int tfEndCheck = std::clamp(static_cast(relTF) - mNTFsDataDrop, 0, static_cast(mProcessedCRU[currentBuffer].size())); + LOGP(info, "Checking current buffer from {} to {}", mStartNTFsDataDrop[currentBuffer], tfEndCheck); + checkMissingData(pc, currentBuffer, mStartNTFsDataDrop[currentBuffer], tfEndCheck); + mStartNTFsDataDrop[currentBuffer] = tfEndCheck; + } + } + + void checkMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const int startTF, const int endTF) + { + for (int iTF = startTF; iTF < endTF; ++iTF) { + if (mProcessedCRU[currentBuffer][iTF] != mCRUs.size()) { + LOGP(warning, "CRUs for rel. TF: {} curr TF {} are missing! Processed {} CRUs out of {}", iTF, mTFStart[currentBuffer] + iTF, mProcessedCRU[currentBuffer][iTF], mCRUs.size()); + ++mProcessedTFs[currentBuffer]; + mProcessedCRU[currentBuffer][iTF] = mCRUs.size(); + + // find missing CRUs and leave their interval slots empty (zero-filled) + for (auto& it : mProcessedCRUs[currentBuffer][iTF]) { + if (!it.second) { + it.second = true; + } + } + + // leave orbit/BC as zero placeholder for missing TFs + mOrbitInfoForwarded[currentBuffer][iTF] = true; + } + } + } + + void finishInterval(o2::framework::ProcessingContext& pc, const bool buffer, const uint32_t tf) + { + if (mNFactorTFs > 0) { + mNFactorTFs = 0; + // ToDo: Find better fix + auto& deviceProxy = pc.services().get(); + if (deviceProxy.getNumOutputChannels() > 0) { + auto& state = deviceProxy.getOutputChannelState({0}); + size_t oldest = std::numeric_limits::max() - 1; // just set to really large value + state.oldestForChannel = {oldest}; + } + } + + LOGP(info, "All TFs {} for current buffer received. Clearing buffer", tf); + clearBuffer(buffer); + mStartNTFsDataDrop[buffer] = 0; + + // reset per-interval state for the next aggregation interval + initIntervalTree(); + mIntervalFirstTF = 0; + mIntervalTFCount = 0; + mCurrentTF = CMVPerTF{}; + mCurrentCompressedTF = CMVPerTFCompressed{}; + mTimestampStart = 0; + LOGP(info, "Everything cleared. Waiting for new data to arrive."); + } + + void setTimestampCCDB(const long relTF, o2::framework::ProcessingContext& pc) + { + if (mUsePreciseTimestamp && !mTFInfo.second) { + return; + } + const auto& tinfo = pc.services().get(); + const auto nOrbitsOffset = (relTF * mNTFsBuffer + (mNTFsBuffer - 1)) * mTFInfo.second; + mTimestampStart = mUsePreciseTimestamp + ? (mTFInfo.first + (tinfo.firstTForbit - nOrbitsOffset) * o2::constants::lhc::LHCOrbitMUS * 0.001) + : tinfo.creation; + LOGP(info, "Setting timestamp reset reference to: {}, at tfCounter: {}, firstTForbit: {}, NHBFPerTF: {}, relTF: {}, nOrbitsOffset: {}", + mTFInfo.first, tinfo.tfCounter, tinfo.firstTForbit, mTFInfo.second, relTF, nOrbitsOffset); + } + + void sendOutput(DataAllocator& output, const uint32_t tf) + { + using timer = std::chrono::high_resolution_clock; + + if (mIntervalTFCount == 0) { + LOGP(warning, "CMV interval is empty at sendOutput, skipping"); + return; + } + + // attach interval metadata to the TTree (stored once per tree) + mIntervalTree->GetUserInfo()->Clear(); + mIntervalTree->GetUserInfo()->Add(new TParameter("firstTF", mIntervalFirstTF)); + mIntervalTree->GetUserInfo()->Add(new TParameter("lastTF", mLastSeenTF)); + + LOGP(info, "CMVPerTF TTree: {} entries, firstTF={}, lastTF={}", mIntervalTFCount, mIntervalFirstTF, mLastSeenTF); + auto start = timer::now(); + + // write local ROOT file for debugging + if (mDumpCMVs) { + const std::string fname = fmt::format("CMV_timestamp{}.root", mTimestampStart); + try { + mCurrentTF.writeToFile(fname, mIntervalTree); + LOGP(info, "CMV debug file written to {}", fname); + } catch (const std::exception& e) { + LOGP(error, "Failed to write CMV debug file: {}", e.what()); + } + } + + if (!mSendCCDB) { + LOGP(warning, "CCDB output disabled, skipping upload!"); + return; + } + + const int nHBFPerTF = o2::base::GRPGeomHelper::instance().getNHBFPerTF(); + // use the actual number of TFs in this interval (mIntervalTFCount) rather than mTimeFrames, so the CCDB validity end is correct for partial last intervals + const long timeStampEnd = mTimestampStart + static_cast(mIntervalTFCount * mNTFsBuffer * nHBFPerTF * o2::constants::lhc::LHCOrbitMUS * 1e-3); + + if (timeStampEnd <= mTimestampStart) { + LOGP(warning, "Invalid CCDB timestamp range start:{} end:{}, skipping upload!", + mTimestampStart, timeStampEnd); + return; + } + + LOGP(info, "CCDB timestamp range start:{} end:{}", mTimestampStart, timeStampEnd); + + o2::ccdb::CcdbObjectInfo ccdbInfoCMV( + "TPC/Calib/CMV", + "TTree", + "CMV.root", + {}, + mTimestampStart, + timeStampEnd); + + auto image = o2::ccdb::CcdbApi::createObjectImage((mIntervalTree.get()), &ccdbInfoCMV); + // trim TMemFile zero-padding: GetSize() is block-rounded, GetEND() is the actual file end + { + TMemFile mf("trim", image->data(), static_cast(image->size()), "READ"); + image->resize(static_cast(mf.GetEND())); + mf.Close(); + } + LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {}", + ccdbInfoCMV.getPath(), ccdbInfoCMV.getFileName(), image->size(), + ccdbInfoCMV.getStartValidityTimestamp(), ccdbInfoCMV.getEndValidityTimestamp()); + + output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBCMV(), 0}, *image); + output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBCMV(), 0}, ccdbInfoCMV); + + auto stop = timer::now(); + std::chrono::duration elapsed = stop - start; + LOGP(info, "CMV CCDB serialisation time: {:.3f} s", elapsed.count()); + } +}; + +DataProcessorSpec getTPCDistributeCMVSpec(const int ilane, const std::vector& crus, const unsigned int timeframes, const int firstTF, const bool sendCCDB = false, const bool usePreciseTimestamp = false, const int nTFsBuffer = 1) +{ + std::vector inputSpecs; + inputSpecs.emplace_back(InputSpec{"cmvsgroup", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup()}, Lifetime::Sporadic}); + inputSpecs.emplace_back(InputSpec{"cmvorbit", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo()}, Lifetime::Sporadic}); + + std::vector outputSpecs; + if (sendCCDB) { + outputSpecs.emplace_back( + ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, + TPCDistributeCMVSpec::getDataDescriptionCCDBCMV()}, + Lifetime::Sporadic); + outputSpecs.emplace_back( + ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, + TPCDistributeCMVSpec::getDataDescriptionCCDBCMV()}, + Lifetime::Sporadic); + } + + const bool fetchCCDB = usePreciseTimestamp; + auto ccdbRequest = std::make_shared(fetchCCDB, // orbitResetTime + fetchCCDB, // GRPECS=true + false, // GRPLHCIF + false, // GRPMagField + false, // askMatLUT + o2::base::GRPGeomRequest::None, // geometry + inputSpecs); + + const std::string type = "cmv"; + const auto id = fmt::format("tpc-distribute-{}-{:02}", type, ilane); + DataProcessorSpec spec{ + id.data(), + inputSpecs, + outputSpecs, + AlgorithmSpec{adaptFromTask(crus, timeframes, nTFsBuffer, firstTF, sendCCDB, usePreciseTimestamp, ccdbRequest)}, + Options{{"drop-data-after-nTFs", VariantType::Int, 0, {"Number of TFs after which to drop the data"}}, + {"check-data-every-n", VariantType::Int, 0, {"Number of run function called after which to check for missing data (-1 for no checking, 0 for default checking)"}}, + {"nFactorTFs", VariantType::Int, 1000, {"Number of TFs to skip for sending oldest TF"}}, + {"dump-cmvs", VariantType::Bool, false, {"Dump CMVs to a local ROOT file for debugging"}}, + {"use-sparse", VariantType::Bool, false, {"Sparse encoding (skip zero time bins). Alone: raw uint16 values. With --use-compression-varint: varint exact values. With --use-compression-huffman: Huffman exact values"}}, + {"use-compression-varint", VariantType::Bool, false, {"Delta+zigzag+varint compression (all values). Combined with --use-sparse: sparse positions + varint encoded exact CMV values"}}, + {"use-compression-huffman", VariantType::Bool, false, {"Huffman encoding. Combined with --use-sparse: sparse positions + Huffman-encoded exact CMV values"}}, + {"cmv-zero-threshold", VariantType::Float, 0.f, {"Zero out CMV values whose float magnitude is below this threshold after optional integer rounding and before compression; 0 disables"}}, + {"cmv-round-integers-threshold", VariantType::Int, 0, {"Round values to nearest integer ADC for |v| <= N ADC before compression; 0 disables"}}, + {"cmv-dynamic-precision-mean", VariantType::Float, 1.f, {"Gaussian centre in |CMV| ADC where the strongest fractional bit trimming is applied"}}, + {"cmv-dynamic-precision-sigma", VariantType::Float, 0.f, {"Gaussian width in ADC for smooth CMV fractional bit trimming; 0 disables"}}}}; // end DataProcessorSpec + + spec.rank = ilane; + return spec; +} + +} // namespace o2::tpc + +#endif diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/TPCFLPCMVSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/TPCFLPCMVSpec.h new file mode 100644 index 0000000000000..9931c27c9d3fa --- /dev/null +++ b/Detectors/TPC/workflow/include/TPCWorkflow/TPCFLPCMVSpec.h @@ -0,0 +1,172 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file TPCFLPCMVSpec.h +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief TPC device for processing CMVs on FLPs + +#ifndef O2_TPCFLPIDCSPEC_H +#define O2_TPCFLPIDCSPEC_H + +#include +#include +#include +#include "Framework/Task.h" +#include "Framework/ControlService.h" +#include "Framework/Logger.h" +#include "Framework/DataProcessorSpec.h" +#include "Framework/InputRecordWalker.h" +#include "Framework/ConfigParamRegistry.h" +#include "Headers/DataHeader.h" +#include "TPCWorkflow/ProcessingHelpers.h" +#include "TPCBase/CRU.h" +#include "TFile.h" + +using namespace o2::framework; +using o2::header::gDataOriginTPC; +using namespace o2::tpc; + +namespace o2::tpc +{ + +class TPCFLPCMVDevice : public o2::framework::Task +{ + public: + TPCFLPCMVDevice(const int lane, const std::vector& crus, const int nTFsBuffer) + : mLane{lane}, mCRUs{crus}, mNTFsBuffer{nTFsBuffer} {} + + void init(o2::framework::InitContext& ic) final + { + mDumpCMVs = ic.options().get("dump-cmvs-flp"); + } + + void run(o2::framework::ProcessingContext& pc) final + { + LOGP(debug, "Processing CMVs for TF {} for CRUs {} to {}", processing_helpers::getCurrentTF(pc), mCRUs.front(), mCRUs.back()); + + ++mCountTFsForBuffer; + + // Capture heartbeatOrbit / heartbeatBC from the first TF in the buffer + if (mCountTFsForBuffer == 1) { + for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) { + auto const* hdr = o2::framework::DataRefUtils::getHeader(ref); + const uint32_t cru = hdr->subSpecification >> 7; + if (mFirstOrbitBC.find(cru) == mFirstOrbitBC.end()) { + auto orbitVec = pc.inputs().get>(ref); + if (!orbitVec.empty()) { + mFirstOrbitBC[cru] = orbitVec[0]; // packed: orbit<<32 | bc + } + } + } + } + + for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { + auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader(ref); + const int cru = tpcCRUHeader->subSpecification >> 7; + auto vecCMVs = pc.inputs().get>(ref); + mCMVs[cru].insert(mCMVs[cru].end(), vecCMVs.begin(), vecCMVs.end()); + } + + if (mCountTFsForBuffer >= mNTFsBuffer) { + mCountTFsForBuffer = 0; + for (const auto cru : mCRUs) { + LOGP(debug, "Sending CMVs of size {} for TF {}", mCMVs[cru].size(), processing_helpers::getCurrentTF(pc)); + sendOutput(pc.outputs(), cru); + } + mFirstOrbitBC.clear(); + } + + if (mDumpCMVs) { + TFile fOut(fmt::format("CMVs_{}_tf_{}.root", mLane, processing_helpers::getCurrentTF(pc)).data(), "RECREATE"); + for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { + auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader(ref); + const int cru = tpcCRUHeader->subSpecification >> 7; + auto vec = pc.inputs().get>(ref); + fOut.WriteObject(&vec, fmt::format("CRU_{}", cru).data()); + } + } + } + + void endOfStream(o2::framework::EndOfStreamContext& ec) final + { + if (mCountTFsForBuffer > 0) { + LOGP(info, "Flushing remaining {} buffered TFs at end of stream", mCountTFsForBuffer); + for (const auto cru : mCRUs) { + sendOutput(ec.outputs(), cru); + } + } + ec.services().get().readyToQuit(QuitRequest::Me); + } + + static constexpr header::DataDescription getDataDescriptionCMVGroup() { return header::DataDescription{"CMVGROUP"}; } + + /// Data description for the packed (orbit<<32|bc) scalar forwarded alongside each CRU's CMVGROUP. + static constexpr header::DataDescription getDataDescriptionCMVOrbitInfo() { return header::DataDescription{"CMVORBITINFO"}; } + + private: + const int mLane{}; ///< lane number of processor + const std::vector mCRUs{}; ///< CRUs to process in this instance + int mNTFsBuffer{1}; ///< number of TFs to buffer before sending + bool mDumpCMVs{}; ///< dump CMVs to file for debugging + int mCountTFsForBuffer{0}; ///< counts TFs to track when to send output + std::unordered_map> mCMVs{}; ///< buffered raw 16-bit CMV values per CRU + std::unordered_map mFirstOrbitBC{}; ///< first packed orbit/BC per CRU for the current buffer window + + /// Filter for CMV float vectors (one CMVVECTOR message per CRU per TF) + const std::vector mFilter = {{"cmvs", ConcreteDataTypeMatcher{gDataOriginTPC, "CMVVECTOR"}, Lifetime::Timeframe}}; + /// Filter for CMV packet timing info (one CMVORBITS message per CRU per TF, sent by CMVToVectorSpec) + const std::vector mOrbitFilter = {{"cmvorbits", ConcreteDataTypeMatcher{gDataOriginTPC, "CMVORBITS"}, Lifetime::Timeframe}}; + + void sendOutput(DataAllocator& output, const uint32_t cru) + { + const header::DataHeader::SubSpecificationType subSpec{cru << 7}; + + // Forward the first-TF orbit/BC for this CRU (0 if unavailable for any reason) + uint64_t orbitBC = 0; + if (auto it = mFirstOrbitBC.find(cru); it != mFirstOrbitBC.end()) { + orbitBC = it->second; + } + output.snapshot(Output{gDataOriginTPC, getDataDescriptionCMVOrbitInfo(), subSpec}, orbitBC); + + output.adoptContainer(Output{gDataOriginTPC, getDataDescriptionCMVGroup(), subSpec}, std::move(mCMVs[cru])); + } +}; + +DataProcessorSpec getTPCFLPCMVSpec(const int ilane, const std::vector& crus, const int nTFsBuffer = 1) +{ + std::vector outputSpecs; + std::vector inputSpecs; + outputSpecs.reserve(crus.size()); + inputSpecs.reserve(crus.size()); + + for (const auto& cru : crus) { + const header::DataHeader::SubSpecificationType subSpec{cru << 7}; + + // Inputs from CMVToVectorSpec + inputSpecs.emplace_back(InputSpec{"cmvs", gDataOriginTPC, "CMVVECTOR", subSpec, Lifetime::Timeframe}); + inputSpecs.emplace_back(InputSpec{"cmvorbits", gDataOriginTPC, "CMVORBITS", subSpec, Lifetime::Timeframe}); + + // Outputs to TPCDistributeCMVSpec + outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup(), subSpec}, Lifetime::Sporadic); + outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo(), subSpec}, Lifetime::Sporadic); + } + + const auto id = fmt::format("tpc-flp-cmv-{:02}", ilane); + return DataProcessorSpec{ + id.data(), + inputSpecs, + outputSpecs, + AlgorithmSpec{adaptFromTask(ilane, crus, nTFsBuffer)}, + Options{{"dump-cmvs-flp", VariantType::Bool, false, {"Dump CMVs to file"}}}}; +} + +} // namespace o2::tpc +#endif \ No newline at end of file diff --git a/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx b/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx new file mode 100644 index 0000000000000..81ce358d1a809 --- /dev/null +++ b/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx @@ -0,0 +1,434 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file CMVToVectorSpec.cxx +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief Processor to convert CMVs to a vector in a CRU + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "TFile.h" +#include "DetectorsRaw/RDHUtils.h" +#include "Framework/Task.h" +#include "Framework/ConfigParamRegistry.h" +#include "Framework/Logger.h" +#include "Framework/DataProcessorSpec.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/InputRecordWalker.h" +#include "Framework/DataRefUtils.h" +#include "DPLUtils/RawParser.h" +#include "Headers/DataHeader.h" +#include "Headers/DataHeaderHelpers.h" +#include "CommonUtils/TreeStreamRedirector.h" + +#include "DataFormatsTPC/CMV.h" +#include "DataFormatsTPC/RawDataTypes.h" +#include "TPCBase/RDHUtils.h" +#include "TPCBase/Mapper.h" +#include "TPCWorkflow/ProcessingHelpers.h" + +using namespace o2::framework; +using o2::header::gDataOriginTPC; +using RDHUtils = o2::raw::RDHUtils; +using RawDataType = o2::tpc::raw_data_types::Type; + +namespace o2::tpc +{ + +class CMVToVectorDevice : public o2::framework::Task +{ + public: + using FEEIDType = rdh_utils::FEEIDType; + CMVToVectorDevice(const std::vector& crus) : mCRUs(crus) {} + + void init(o2::framework::InitContext& ic) final + { + // set up ADC value filling + mWriteDebug = ic.options().get("write-debug"); + mWriteDebugOnError = ic.options().get("write-debug-on-error"); + mWriteRawDataOnError = ic.options().get("write-raw-data-on-error"); + mRawDataType = ic.options().get("raw-data-type"); + o2::framework::RawParser<>::setCheckIncompleteHBF(ic.options().get("check-incomplete-hbf")); + + mDebugStreamFileName = ic.options().get("debug-file-name").data(); + mRawOutputFileName = ic.options().get("raw-file-name").data(); + + initCMV(); + } + + void run(o2::framework::ProcessingContext& pc) final + { + const auto runNumber = processing_helpers::getRunNumber(pc); + std::vector filter = {{"check", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}}; + const auto& mapper = Mapper::instance(); + + // open files if necessary + if ((mWriteDebug || mWriteDebugOnError) && !mDebugStream) { + const auto debugFileName = fmt::format(fmt::runtime(mDebugStreamFileName), fmt::arg("run", runNumber)); + LOGP(info, "Creating debug stream {}", debugFileName); + mDebugStream = std::make_unique(debugFileName.data(), "recreate"); + } + + if (mWriteRawDataOnError && !mRawOutputFile.is_open()) { + std::string_view rawType = (mRawDataType < 2) ? "tf" : "raw"; + if (mRawDataType == 5) { + rawType = "cmv.raw"; + } + const auto rawFileName = fmt::format(fmt::runtime(mRawOutputFileName), fmt::arg("run", runNumber), fmt::arg("raw_type", rawType)); + LOGP(info, "Creating raw debug file {}", rawFileName); + mRawOutputFile.open(rawFileName, std::ios::binary); + } + + uint32_t heartbeatOrbit = 0; + uint16_t heartbeatBC = 0; + uint32_t tfCounter = 0; + bool first = true; + bool hasErrors = false; + + for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) { + const auto* dh = DataRefUtils::getHeader(ref); + tfCounter = dh->tfCounter; + const auto subSpecification = dh->subSpecification; + auto payloadSize = DataRefUtils::getPayloadSize(ref); + LOGP(debug, "Processing TF {}, subSpecification {}, payloadSize {}", tfCounter, subSpecification, payloadSize); + + // ---| data loop |--- + const gsl::span raw = pc.inputs().get>(ref); + try { + o2::framework::RawParser parser(raw.data(), raw.size()); + size_t lastErrorCount = 0; + + for (auto it = parser.begin(), end = parser.end(); it != end; ++it) { + const auto size = it.size(); + + if (parser.getNErrors() > lastErrorCount) { + lastErrorCount = parser.getNErrors(); + hasErrors = true; + } + + // skip empty packages (HBF open) + if (size == 0) { + continue; + } + + auto rdhPtr = reinterpret_cast(it.raw()); + const auto rdhVersion = RDHUtils::getVersion(rdhPtr); + if (!rdhPtr || rdhVersion < 6) { + throw std::runtime_error(fmt::format("could not get RDH from packet, or version {} < 6", rdhVersion).data()); + } + + // ---| extract hardware information to do the processing |--- + const auto feeId = (FEEIDType)RDHUtils::getFEEID(*rdhPtr); + const auto link = rdh_utils::getLink(feeId); + const uint32_t cruID = rdh_utils::getCRU(feeId); + const auto detField = RDHUtils::getDetectorField(*rdhPtr); + + LOGP(debug, "Detected CMV packet: CRU {}, link {}, feeId {}", cruID, link, feeId); + + if ((detField != (decltype(detField))RawDataType::CMV) || (link != rdh_utils::CMVLinkID)) { + LOGP(debug, "Skipping packet: detField {}, (expected RawDataType {}), link {}, (expected CMVLinkID {})", detField, (decltype(detField))RawDataType::CMV, link, rdh_utils::CMVLinkID); + continue; + } + + LOGP(debug, "Processing firstTForbit {:9}, tfCounter {:5}, run {:6}, feeId {:6}, cruID {:3}, link {:2}", dh->firstTForbit, dh->tfCounter, dh->runNumber, feeId, cruID, link); + + if (std::find(mCRUs.begin(), mCRUs.end(), cruID) == mCRUs.end()) { + LOGP(warning, "CMV CRU {:3} not configured in CRUs, skipping", cruID); + continue; + } + + auto& cmvVec = mCMVvectors[cruID]; + auto& infoVec = mCMVInfos[cruID]; + + if (size != sizeof(cmv::Container)) { + LOGP(warning, "CMV packet size mismatch: got {} bytes, expected {} bytes (sizeof cmv::Container). Skipping package.", size, sizeof(cmv::Container)); + hasErrors = true; + continue; + } + auto data = it.data(); + auto& cmvs = *((cmv::Container*)(data)); + const uint32_t orbit = cmvs.header.heartbeatOrbit; + const uint16_t bc = cmvs.header.heartbeatBC; + + // record packet meta and append its CMV vector (3564 TB) + infoVec.emplace_back(orbit, bc); + cmvVec.reserve(cmvVec.size() + cmv::NTimeBinsPerPacket); + for (uint32_t tb = 0; tb < cmv::NTimeBinsPerPacket; ++tb) { + cmvVec.push_back(cmvs.getCMV(tb)); + // LOGP(debug, "Appended CMV {} for timebin {}, CRU {}, orbit {}, bc {}", cmvs.getCMV(tb), tb, cruID, orbit, bc); + } + } + } catch (const std::exception& e) { + // error message throtteling + using namespace std::literals::chrono_literals; + static std::unordered_map nErrorPerSubspec; + static std::chrono::time_point lastReport = std::chrono::steady_clock::now(); + const auto now = std::chrono::steady_clock::now(); + static size_t reportedErrors = 0; + const size_t MAXERRORS = 10; + const auto sleepTime = 10min; + ++nErrorPerSubspec[subSpecification]; + + if ((now - lastReport) < sleepTime) { + if (reportedErrors < MAXERRORS) { + ++reportedErrors; + std::string sleepInfo; + if (reportedErrors == MAXERRORS) { + sleepInfo = fmt::format(", maximum error count ({}) reached, not reporting for the next {}", MAXERRORS, sleepTime); + } + LOGP(alarm, "EXCEPTION in processRawData: {} -> skipping part:{}/{} of spec:{}/{}/{}, size:{}, error count for subspec: {}{}", e.what(), dh->splitPayloadIndex, dh->splitPayloadParts, + dh->dataOrigin, dh->dataDescription, subSpecification, payloadSize, nErrorPerSubspec.at(subSpecification), sleepInfo); + lastReport = now; + } + } else { + lastReport = now; + reportedErrors = 0; + } + continue; + } + } + + hasErrors |= snapshotCMVs(pc.outputs(), tfCounter); + + if (mWriteDebug || (mWriteDebugOnError && hasErrors)) { + writeDebugOutput(tfCounter); + } + + if (mWriteRawDataOnError && hasErrors) { + writeRawData(pc.inputs()); + } + + // clear output + initCMV(); + } + + void closeFiles() + { + LOGP(info, "closeFiles"); + + if (mDebugStream) { + // set some default aliases + auto& stream = (*mDebugStream) << "cmvs"; + auto& tree = stream.getTree(); + tree.SetAlias("sector", "int(cru/10)"); + mDebugStream->Close(); + mDebugStream.reset(nullptr); + mRawOutputFile.close(); + } + } + + void stop() final + { + LOGP(info, "stop"); + closeFiles(); + } + + void endOfStream(o2::framework::EndOfStreamContext& ec) final + { + LOGP(info, "endOfStream"); + // ec.services().get().readyToQuit(QuitRequest::Me); + closeFiles(); + } + + private: + /// CMV information for each cru + struct CMVInfo { + CMVInfo() = default; + CMVInfo(const CMVInfo&) = default; + CMVInfo(uint32_t orbit, uint16_t bc) : heartbeatOrbit(orbit), heartbeatBC(bc) {} + + uint32_t heartbeatOrbit{0}; + uint16_t heartbeatBC{0}; + + bool operator==(const uint32_t orbit) const { return (heartbeatOrbit == orbit); } + bool operator==(const CMVInfo& inf) const { return (inf.heartbeatOrbit == heartbeatOrbit) && (inf.heartbeatBC == heartbeatBC); } + bool matches(uint32_t orbit, int16_t bc) const { return ((heartbeatOrbit == orbit) && (heartbeatBC == bc)); } + }; + + int mRawDataType{0}; ///< type of raw data to dump in case of errors + bool mWriteDebug{false}; ///< write a debug output + bool mWriteDebugOnError{false}; ///< write a debug output in case of errors + bool mWriteRawDataOnError{false}; ///< write raw data in case of errors + std::vector mCRUs; ///< CRUs expected for this device + std::unordered_map> mCMVvectors; ///< raw 16-bit CMV values per cru over all CMV packets in the TF + std::unordered_map> mCMVInfos; ///< CMV packet information within the TF + std::string mDebugStreamFileName; ///< name of the debug stream output file + std::unique_ptr mDebugStream; ///< debug output streamer + std::ofstream mRawOutputFile; ///< raw output file + std::string mRawOutputFileName; ///< name of the raw output file + + //____________________________________________________________________________ + bool snapshotCMVs(DataAllocator& output, uint32_t tfCounter) + { + bool hasErrors = false; + + // send data per CRU with its own orbit/BC vector + for (auto& [cru, cmvVec] : mCMVvectors) { + const header::DataHeader::SubSpecificationType subSpec{cru << 7}; + const auto& infVec = mCMVInfos[cru]; + + if (infVec.size() != 4) { + // LOGP(error, "CRU {:3}: expected 4 packets per TF, got {}", cru, infVec.size()); + hasErrors = true; + } + if (cmvVec.size() != cmv::NTimeBinsPerPacket * infVec.size()) { + // LOGP(error, "CRU {:3}: vector size {} does not match expected {}", cru, cmvVec.size(), cmv::NTimeBinsPerPacket * infVec.size()); + hasErrors = true; + } + + std::vector orbitBCInfo; + orbitBCInfo.reserve(infVec.size()); + for (const auto& inf : infVec) { + orbitBCInfo.emplace_back((uint64_t(inf.heartbeatOrbit) << 32) + uint64_t(inf.heartbeatBC)); + } + + LOGP(debug, "Sending CMVs for CRU {} of size {} ({} packets)", cru, cmvVec.size(), infVec.size()); + output.snapshot(Output{gDataOriginTPC, "CMVVECTOR", subSpec}, cmvVec); + output.snapshot(Output{gDataOriginTPC, "CMVORBITS", subSpec}, orbitBCInfo); + } + + return hasErrors; + } + + //____________________________________________________________________________ + void initCMV() + { + for (const auto cruID : mCRUs) { + auto& cmvVec = mCMVvectors[cruID]; + cmvVec.clear(); + + auto& infosCRU = mCMVInfos[cruID]; + infosCRU.clear(); + } + } + + //____________________________________________________________________________ + void writeDebugOutput(uint32_t tfCounter) + { + const auto& mapper = Mapper::instance(); + + mDebugStream->GetFile()->cd(); + auto& stream = (*mDebugStream) << "cmvs"; + uint32_t seen = 0; + static uint32_t firstOrbit = std::numeric_limits::max(); + + for (auto cru : mCRUs) { + if (mCMVInfos.find(cru) == mCMVInfos.end()) { + continue; + } + + auto& infos = mCMVInfos[cru]; + auto& cmvVec = mCMVvectors[cru]; + + stream << "cru=" << cru + << "tfCounter=" << tfCounter + << "nCMVs=" << cmvVec.size() + << "cmvs=" << cmvVec + << "\n"; + } + } + + void writeRawData(InputRecord& inputs) + { + if (!mRawOutputFile.is_open()) { + return; + } + + using DataHeader = o2::header::DataHeader; + + std::vector filter = {{"check", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}}; + for (auto const& ref : InputRecordWalker(inputs, filter)) { + auto dh = DataRefUtils::getHeader(ref); + // LOGP(info, "write header: {}/{}/{}, payload size: {} / {}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dh->payloadSize, ref.payloadSize); + if (((mRawDataType == 1) || (mRawDataType == 3)) && (dh->payloadSize == 2 * sizeof(o2::header::RAWDataHeader))) { + continue; + } + + if (mRawDataType < 2) { + mRawOutputFile.write(ref.header, sizeof(DataHeader)); + } + if (mRawDataType < 5) { + mRawOutputFile.write(ref.payload, ref.payloadSize); + } + + if (mRawDataType == 5) { + const gsl::span raw = inputs.get>(ref); + try { + o2::framework::RawParser parser(raw.data(), raw.size()); + for (auto it = parser.begin(), end = parser.end(); it != end; ++it) { + const auto size = it.size(); + // skip empty packages (HBF open) + if (size == 0) { + continue; + } + + auto rdhPtr = reinterpret_cast(it.raw()); + const auto rdhVersion = RDHUtils::getVersion(rdhPtr); + if (!rdhPtr || rdhVersion < 6) { + throw std::runtime_error(fmt::format("could not get RDH from packet, or version {} < 6", rdhVersion).data()); + } + + // ---| extract hardware information to do the processing |--- + const auto feeId = (FEEIDType)RDHUtils::getFEEID(*rdhPtr); + const auto link = rdh_utils::getLink(feeId); + const auto detField = RDHUtils::getDetectorField(*rdhPtr); + + // only select CMVs + if ((detField != (decltype(detField))RawDataType::CMV) || (link != rdh_utils::CMVLinkID)) { + continue; + } + + // write out raw data + mRawOutputFile.write((const char*)it.raw(), RDHUtils::getMemorySize(rdhPtr)); + } + } catch (...) { + } + } + } + } +}; + +o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, std::vector const& crus) +{ + using device = o2::tpc::CMVToVectorDevice; + + std::vector outputs; + for (const uint32_t cru : crus) { + const header::DataHeader::SubSpecificationType subSpec{cru << 7}; + outputs.emplace_back(gDataOriginTPC, "CMVVECTOR", subSpec, Lifetime::Timeframe); + outputs.emplace_back(gDataOriginTPC, "CMVORBITS", subSpec, Lifetime::Timeframe); + } + + return DataProcessorSpec{ + fmt::format("tpc-cmv-to-vector"), + select(inputSpec.data()), + outputs, + AlgorithmSpec{adaptFromTask(crus)}, + Options{ + {"write-debug", VariantType::Bool, false, {"write a debug output tree"}}, + {"write-debug-on-error", VariantType::Bool, false, {"write a debug output tree in case errors occurred"}}, + {"debug-file-name", VariantType::String, "/tmp/cmv_vector_debug.{run}.root", {"name of the debug output file"}}, + {"write-raw-data-on-error", VariantType::Bool, false, {"dump raw data in case errors occurred"}}, + {"raw-file-name", VariantType::String, "/tmp/cmv_debug.{run}.{raw_type}", {"name of the raw output file"}}, + {"raw-data-type", VariantType::Int, 0, {"Which raw data to dump: 0-full TPC with DH, 1-full TPC with DH skip empty, 2-full TPC no DH, 3-full TPC no DH skip empty, 4-IDC raw only 5-CMV raw only"}}, + {"check-incomplete-hbf", VariantType::Bool, false, {"false: don't check; true: check and report"}}, + } // end Options + }; // end DataProcessorSpec +} +} // namespace o2::tpc \ No newline at end of file diff --git a/Detectors/TPC/workflow/src/tpc-cmv-to-vector.cxx b/Detectors/TPC/workflow/src/tpc-cmv-to-vector.cxx new file mode 100644 index 0000000000000..1040b64f98d04 --- /dev/null +++ b/Detectors/TPC/workflow/src/tpc-cmv-to-vector.cxx @@ -0,0 +1,71 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include +#include +#include + +#include "Algorithm/RangeTokenizer.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/ConfigParamSpec.h" +#include "Framework/CompletionPolicy.h" +#include "Framework/CompletionPolicyHelpers.h" +#include "CommonUtils/ConfigurableParam.h" +#include "TPCBase/CRU.h" +#include "TPCWorkflow/CMVToVectorSpec.h" + +using namespace o2::framework; +using namespace o2::tpc; + +// customize the completion policy +void customize(std::vector& policies) +{ + using o2::framework::CompletionPolicy; + policies.push_back(CompletionPolicyHelpers::defineByName("tpc-cmv-to-vector", CompletionPolicy::CompletionOp::Consume)); +} + +// we need to add workflow options before including Framework/runDataProcessing +void customize(std::vector& workflowOptions) +{ + std::string crusDefault = "0-" + std::to_string(CRU::MaxCRU - 1); + + std::vector options{ + {"input-spec", VariantType::String, "A:TPC/RAWDATA", {"selection string input specs"}}, + {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings (e.g.: 'TPCCalibPedestal.FirstTimeBin=10;...')"}}, + {"configFile", VariantType::String, "", {"configuration file for configurable parameters"}}, + {"crus", VariantType::String, crusDefault.c_str(), {"List of TPC crus, comma separated ranges, e.g. 0-3,7,9-15"}}, + }; + + std::swap(workflowOptions, options); +} + +#include "Framework/runDataProcessing.h" + +WorkflowSpec defineDataProcessing(ConfigContext const& config) +{ + + using namespace o2::tpc; + + // set up configuration + o2::conf::ConfigurableParam::updateFromFile(config.options().get("configFile")); + o2::conf::ConfigurableParam::updateFromString(config.options().get("configKeyValues")); + o2::conf::ConfigurableParam::writeINI("o2tpccmv_configuration.ini"); + + const std::string inputSpec = config.options().get("input-spec"); + + const auto crus = o2::RangeTokenizer::tokenize(config.options().get("crus")); + + WorkflowSpec workflow; + + workflow.emplace_back(getCMVToVectorSpec(inputSpec, crus)); + + return workflow; +} diff --git a/Detectors/TPC/workflow/src/tpc-distribute-cmv.cxx b/Detectors/TPC/workflow/src/tpc-distribute-cmv.cxx new file mode 100644 index 0000000000000..b6aaaa0a109ad --- /dev/null +++ b/Detectors/TPC/workflow/src/tpc-distribute-cmv.cxx @@ -0,0 +1,84 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include +#include +#include "Algorithm/RangeTokenizer.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/ConfigParamSpec.h" +#include "CommonUtils/ConfigurableParam.h" +#include "TPCWorkflow/TPCDistributeCMVSpec.h" +#include "Framework/CompletionPolicyHelpers.h" + +using namespace o2::framework; + +// customize the completion policy +void customize(std::vector& policies) +{ + using o2::framework::CompletionPolicy; + policies.push_back(CompletionPolicyHelpers::defineByName("tpc-distribute-*.*", CompletionPolicy::CompletionOp::Consume)); +} + +// we need to add workflow options before including Framework/runDataProcessing +void customize(std::vector& workflowOptions) +{ + const std::string cruDefault = "0-" + std::to_string(o2::tpc::CRU::MaxCRU - 1); + + std::vector options{ + {"crus", VariantType::String, cruDefault.c_str(), {"List of CRUs, comma separated ranges, e.g. 0-3,7,9-15"}}, + {"timeframes", VariantType::Int, 2000, {"Number of TFs which will be aggregated per aggregation interval."}}, + {"firstTF", VariantType::Int, -1, {"First time frame index. (if set to -1 the first TF will be automatically detected. Values < -1 are setting an offset for skipping the first TFs)"}}, + {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}}, + {"lanes", VariantType::Int, 1, {"Number of lanes of this device (CRUs are split per lane)"}}, + {"use-precise-timestamp", VariantType::Bool, false, {"Use precise timestamp which can be used for writing to CCDB"}}, + {"enable-CCDB-output", VariantType::Bool, false, {"Send output to the CCDB populator"}}, + {"n-TFs-buffer", VariantType::Int, 1, {"Buffer which was defined in the TPCFLPCMVSpec."}}}; + std::swap(workflowOptions, options); +} + +#include "Framework/runDataProcessing.h" + +WorkflowSpec defineDataProcessing(ConfigContext const& config) +{ + using namespace o2::tpc; + + // set up configuration + o2::conf::ConfigurableParam::updateFromString(config.options().get("configKeyValues")); + o2::conf::ConfigurableParam::writeINI("o2tpcdistributecmv_configuration.ini"); + + const auto tpcCRUs = o2::RangeTokenizer::tokenize(config.options().get("crus")); + const auto nCRUs = tpcCRUs.size(); + auto timeframes = static_cast(config.options().get("timeframes")); + const auto nLanes = static_cast(config.options().get("lanes")); + const auto firstTF = static_cast(config.options().get("firstTF")); + const bool usePreciseTimestamp = config.options().get("use-precise-timestamp"); + const bool sendCCDB = config.options().get("enable-CCDB-output"); + int nTFsBuffer = config.options().get("n-TFs-buffer"); + if (nTFsBuffer <= 0) { + nTFsBuffer = 1; + } + assert(timeframes >= nTFsBuffer); + timeframes /= nTFsBuffer; + LOGP(info, "Using {} timeframes as each TF contains {} CMVs", timeframes, nTFsBuffer); + const auto crusPerLane = nCRUs / nLanes + ((nCRUs % nLanes) != 0); + WorkflowSpec workflow; + for (int ilane = 0; ilane < nLanes; ++ilane) { + const auto first = tpcCRUs.begin() + ilane * crusPerLane; + if (first >= tpcCRUs.end()) { + break; + } + const auto last = std::min(tpcCRUs.end(), first + crusPerLane); + const std::vector rangeCRUs(first, last); + workflow.emplace_back(getTPCDistributeCMVSpec(ilane, rangeCRUs, timeframes, firstTF, sendCCDB, usePreciseTimestamp, nTFsBuffer)); + } + + return workflow; +} \ No newline at end of file diff --git a/Detectors/TPC/workflow/src/tpc-flp-cmv.cxx b/Detectors/TPC/workflow/src/tpc-flp-cmv.cxx new file mode 100644 index 0000000000000..f41fe5b8fbd15 --- /dev/null +++ b/Detectors/TPC/workflow/src/tpc-flp-cmv.cxx @@ -0,0 +1,72 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include +#include +#include +#include "CommonUtils/ConfigurableParam.h" +#include "Algorithm/RangeTokenizer.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/ConfigParamSpec.h" +#include "TPCWorkflow/TPCFLPCMVSpec.h" +#include "TPCBase/CRU.h" + +using namespace o2::framework; + +void customize(std::vector& workflowOptions) +{ + const std::string cruDefault = "0-" + std::to_string(o2::tpc::CRU::MaxCRU - 1); + const int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2); + + std::vector options{ + {"configFile", VariantType::String, "", {"configuration file for configurable parameters"}}, + {"lanes", VariantType::Int, defaultlanes, {"Number of parallel processing lanes (crus are split per device)"}}, + {"time-lanes", VariantType::Int, 1, {"Number of parallel processing lanes (timeframes are split per device)"}}, + {"crus", VariantType::String, cruDefault.c_str(), {"List of CRUs, comma separated ranges, e.g. 0-3,7,9-15"}}, + {"n-TFs-buffer", VariantType::Int, 1, {"Buffer n-TFs before sending output"}}, + {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}}}; + + std::swap(workflowOptions, options); +} + +#include "Framework/runDataProcessing.h" + +WorkflowSpec defineDataProcessing(ConfigContext const& config) +{ + using namespace o2::tpc; + o2::conf::ConfigurableParam::updateFromString(config.options().get("configKeyValues")); + const auto tpcCRUs = o2::RangeTokenizer::tokenize(config.options().get("crus")); + const auto nCRUs = tpcCRUs.size(); + const auto nLanes = std::min(static_cast(config.options().get("lanes")), nCRUs); + const auto time_lanes = static_cast(config.options().get("time-lanes")); + const auto crusPerLane = nCRUs / nLanes + ((nCRUs % nLanes) != 0); + const int nTFsBuffer = config.options().get("n-TFs-buffer"); + + o2::conf::ConfigurableParam::updateFromFile(config.options().get("configFile")); + o2::conf::ConfigurableParam::writeINI("o2tpcflp_configuration.ini"); + + WorkflowSpec workflow; + if (nLanes <= 0) { + return workflow; + } + + for (int ilane = 0; ilane < nLanes; ++ilane) { + const auto first = tpcCRUs.begin() + ilane * crusPerLane; + if (first >= tpcCRUs.end()) { + break; + } + const auto last = std::min(tpcCRUs.end(), first + crusPerLane); + const std::vector rangeCRUs(first, last); + workflow.emplace_back(timePipeline(getTPCFLPCMVSpec(ilane, rangeCRUs, nTFsBuffer), time_lanes)); + } + + return workflow; +} \ No newline at end of file