C++: optimize batch read/write paths and aligned table null handling #823
ColinLeeo wants to merge 9 commits into
Codecov Report
Coverage diff (develop vs. #823):
| Metric | develop | #823 | +/- |
|---|---|---|---|
| Coverage | 61.60% | 60.51% | -1.09% |
| Files | 734 | 735 | +1 |
| Lines | 45968 | 48426 | +2458 |
| Branches | 6897 | 7672 | +775 |
| Hits | 28317 | 29307 | +990 |
| Misses | 16637 | 17726 | +1089 |
| Partials | 1014 | 1393 | +379 |
Pull request overview
This PR consolidates a large optimization branch into develop: it adds batch decode/write APIs across the C++ Decoder/Encoder hierarchy, multi-value aligned read paths with optional parallel decode, columnar tablet write helpers, SIMD fast paths, and a set of correctness fixes for aligned/table null handling (null TAG segments, null FIELD writes, all-null value pages, sparse aligned columns, repeated logical devices, ValuePageWriter::reset state). It also trims the C wrapper API (drops unused metadata export/tag-filter symbols, then re-adds tag-filter helpers in a different section) and removes several regression tests for behaviors it claims to fix.
Changes:
- Add batch decode/encode/write paths through Decoder/Encoder/page/chunk writers and a MultiAlignedTimeseriesIndex plus single-device aligned fast-path reader.
- Several aligned table fixes (null TAG/FIELD, all-null pages, single-device tablet flag, ValuePageWriter::reset, double-free of first-page buffers via release_cur_page_data).
- Build/infra: SIMD option, optional BUILD_EXAMPLES, mem-stat counters widened to 64-bit, new BlockingQueue, removal of several existing regression tests, license-header punctuation churn in multiple CMake files.
Reviewed changes
Copilot reviewed 118 out of 119 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| cpp/src/reader/filter/{and,or}_filter.h, filter.h, time_operator.h | Adds satisfy_batch_time (uses fixed 129-element stack buffer — flagged). |
| cpp/src/encoding/{plain,decoder,encoder,plain_decoder,dictionary_encoder}.h | Batch encode/decode API + dictionary index assignment change (flagged). |
| cpp/src/writer/{value_,time_,}{chunk,page}_writer.{h,cc} | Batch write paths, first-page ownership transfer, larger page buffers. |
| cpp/src/writer/tsfile_table_writer.{h,cc}, tsfile_writer.h | Memoized lowercasing, idempotent close, optional parallel write pool. |
| cpp/src/reader/* | Aligned multi-value batch path, bloom-filter contains, table result-set lifecycle. |
| cpp/src/file/tsfile_io_writer.{h,cc}, restorable_tsfile_io_writer.cc | Recovery cleanup simplified; conditional sync_on_close_ (flagged); chunk-group index for O(1) lookup. |
| cpp/src/file/tsfile_io_reader.h | Device-node cache + multi-SSI alloc. |
| cpp/src/common/allocator/byte_stream.h, alloc_base.h, mem_alloc.cc | Page-mask bitwise modulo + power-of-2 rounding for wrapped buffers (flagged), 64-bit stat counters. |
| cpp/src/common/{tablet,schema,path,global,thread_pool}.* | Single-device flag, string-column uint32_t offsets, Path inlined, config knobs reshuffled. |
| cpp/src/common/container/{bit_map,blocking_queue,byte_buffer}.* | New BlockingQueue, BitMap::may_have_set_bits, bounds asserts. |
| cpp/src/compress/{snappy,lz4,uncompressed}_compressor.* | Safer after_compress ownership handling; Uncompressed now copies. |
| cpp/src/cwrapper/{tsfile_cwrapper.h,arrow_c.cc} | Tag-filter API moved, sliced-Arrow handling reverted (loses prior bug-fix paths). |
| cpp/test/** | Deletes several regression tests (deep path, missing measurement, aligned NULL boundary, dictionary RLE run counts, Arrow slice-with-offset, etc.) and adds new batch/page-boundary tests. |
| python/tsfile/dataset/reader.py + tests | Switches row reads to read_arrow_batch(). |
| cpp/{CMakeLists.txt,examples/**,src/CMakeLists.txt,src/common/CMakeLists.txt,test/CMakeLists.txt} | Build flags, SIMD option, Arrow/Parquet-dependent examples, license-header punctuation regressions. |
| // Without the try/catch, a task that throws would: | ||
| // (1) skip the active_-- below → wait_all() blocks forever | ||
| // because active_ never drops to zero, and | ||
| // (2) propagate the exception out of the std::thread function | ||
| // → std::terminate() takes down the whole process. | ||
| // Swallowing the exception is unfortunate but it matches the | ||
| // contract of the public submit(std::function<void()>) overload | ||
| // which has no way to surface the failure back to the caller. | ||
| // submit<F>() callers receive their error via the std::future | ||
| // wrapper installed by std::packaged_task — that path never | ||
| // reaches here, so this catch only fires for fire-and-forget | ||
| // tasks where the alternative is termination. | ||
| try { | ||
| task(); | ||
| } catch (...) { | ||
| // Intentionally suppressed; see comment above. | ||
| } | ||
| { | ||
| std::lock_guard<std::mutex> lk(mu_); |
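The reasoning in the comment above can be sketched in isolation. This is a minimal sketch, not the pool's actual code: `TaskTracker`, `run`, `active`, and `throwing_task_leaves_pool_idle` are hypothetical names, and the task runs on the calling thread to keep the example short.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <stdexcept>

// Hypothetical stand-in for the pool's active-task bookkeeping.
class TaskTracker {
   public:
    // Runs one fire-and-forget task the way a worker loop would.
    void run(const std::function<void()>& task) {
        {
            std::lock_guard<std::mutex> lk(mu_);
            ++active_;
        }
        try {
            task();
        } catch (...) {
            // Suppressed: there is no channel to report the failure, and
            // skipping the decrement below would make wait_all() hang.
        }
        {
            std::lock_guard<std::mutex> lk(mu_);
            --active_;
        }
        cv_.notify_all();
    }

    // Blocks until no task is running.
    void wait_all() {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [this] { return active_ == 0; });
    }

    int active() {
        std::lock_guard<std::mutex> lk(mu_);
        return active_;
    }

   private:
    std::mutex mu_;
    std::condition_variable cv_;
    int active_ = 0;
};

// Returns true if a throwing task still leaves the counter at zero.
inline bool throwing_task_leaves_pool_idle() {
    TaskTracker tracker;
    tracker.run([] { throw std::runtime_error("task failed"); });
    tracker.wait_all();  // returns at once because active_ is back to 0
    return tracker.active() == 0;
}
```

Without the `try`/`catch`, the exception would leave `run` before the decrement, and `wait_all()` would never see `active_ == 0`.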
| // Round n up to the next power of two (>=1). Used to normalize ByteStream | ||
| // page sizes so that `& page_mask_` is equivalent to `% page_size_`. | ||
| // Values above the largest power-of-two that fits in uint32_t are clamped to | ||
| // 0x80000000 — the previous `while (ps < n) ps <<= 1` would shift past 2^31 | ||
| // and overflow to 0, looping forever. | ||
| FORCE_INLINE uint32_t round_up_pow2(uint32_t n) { | ||
| if (n <= 1) return 1; | ||
| if (n > 0x80000000u) return 0x80000000u; | ||
| uint32_t v = n - 1; | ||
| v |= v >> 1; | ||
| v |= v >> 2; | ||
| v |= v >> 4; | ||
| v |= v >> 8; | ||
| v |= v >> 16; | ||
| return v + 1; | ||
| } |
Is this faster than storing a precomputed array and indexing it by the number of leading zeros of n?
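To make the question concrete, here is a hedged sketch that puts the bit-smearing version from the diff next to a count-leading-zeros variant. The names `round_up_pow2_smear`, `round_up_pow2_clz`, and `mask_equals_modulo` are hypothetical, and `__builtin_clz` is a GCC/Clang builtin, so the second function falls back to the first on other compilers. The sketch only shows that both produce the same results; which one is faster would need a benchmark on the target platforms.

```cpp
#include <cassert>
#include <cstdint>

// Bit-smearing version, mirroring the function in the diff.
inline uint32_t round_up_pow2_smear(uint32_t n) {
    if (n <= 1) return 1;
    if (n > 0x80000000u) return 0x80000000u;  // clamp instead of overflowing
    uint32_t v = n - 1;
    v |= v >> 1;
    v |= v >> 2;
    v |= v >> 4;
    v |= v >> 8;
    v |= v >> 16;
    return v + 1;
}

// Alternative based on the number of leading zeros of n - 1.
inline uint32_t round_up_pow2_clz(uint32_t n) {
    if (n <= 1) return 1;
    if (n > 0x80000000u) return 0x80000000u;
#if defined(__GNUC__) || defined(__clang__)
    // n - 1 is in [1, 0x7FFFFFFF], so clz is in [1, 31] and the shift
    // amount is in [1, 31]; no shift by 32 can occur.
    return 1u << (32 - __builtin_clz(n - 1));
#else
    return round_up_pow2_smear(n);
#endif
}

// For a power-of-two page size, masking and modulo agree.
inline bool mask_equals_modulo(uint32_t page_size, uint32_t offset) {
    const uint32_t page_mask = page_size - 1;
    return (offset & page_mask) == (offset % page_size);
}
```

For example, both functions map 5 to 8 and leave 8 unchanged, and any input above 0x80000000 is clamped to 0x80000000.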
| g_config_value_.float_encoding_type_ = GORILLA; | ||
| g_config_value_.double_encoding_type_ = GORILLA; | ||
| g_config_value_.float_encoding_type_ = PLAIN; | ||
| g_config_value_.double_encoding_type_ = PLAIN; | ||
| g_config_value_.string_encoding_type_ = PLAIN; |
| // Pick the strongest compressor that was actually compiled in. Gating on | ||
| // ENABLE_LZ4 while setting SNAPPY (the original code) would request a | ||
| // compressor that the factory can't produce when the build disables | ||
| // Snappy, returning nullptr at write time. | ||
| #ifdef ENABLE_SNAPPY | ||
| g_config_value_.default_compression_type_ = SNAPPY; | ||
| #elif defined(ENABLE_LZ4) | ||
| g_config_value_.default_compression_type_ = LZ4; | ||
| #else |
Why is SNAPPY stronger than LZ4?
| #ifdef ENABLE_ANTLR4 | ||
| std::vector<std::string> nodes = | ||
| PathNodesGenerator::invokeParser(path_sc); | ||
| #else | ||
| std::vector<std::string> nodes = | ||
| IDeviceID::split_string(path_sc, '.'); | ||
| #endif | ||
| if (nodes.size() > 1) { | ||
| // Join nodes, then parse like write path / Java Path | ||
| // (route through the interpretive string ctor instead of | ||
| // the literal per-segment vector ctor, so a stored | ||
| // "root.sg.d1" device matches a query path | ||
| // "root.sg.d1.s1"). | ||
| std::string device_joined; | ||
| for (size_t i = 0; i + 1 < nodes.size(); ++i) { | ||
| if (i > 0) { | ||
| device_joined += PATH_SEPARATOR_CHAR; | ||
| } | ||
| device_joined += nodes[i]; | ||
| } | ||
| device_id_ = | ||
| std::make_shared<StringArrayDeviceID>(device_joined); | ||
| measurement_ = nodes[nodes.size() - 1]; | ||
| full_path_ = | ||
| device_id_->get_device_name() + "." + measurement_; | ||
| } else { | ||
| full_path_ = path_sc; | ||
| device_id_ = std::make_shared<StringArrayDeviceID>(); | ||
| measurement_ = path_sc; | ||
| } | ||
| } |
Is this join necessary?
Why is device_id empty in the else branch?
| for (uint32_t i = start; i < count; i++) { | ||
| if (timestamps[i] < start_time_) start_time_ = timestamps[i]; | ||
| if (timestamps[i] > end_time_) end_time_ = timestamps[i]; | ||
| if (values[i] < min_value_) min_value_ = values[i]; | ||
| if (values[i] > max_value_) max_value_ = values[i]; | ||
| sum_value_ += (int64_t)values[i]; | ||
| } | ||
| last_value_ = values[count - 1]; | ||
| count_ += (count - start); | ||
| } |
If the timestamps are sorted, there is no point in updating start_time_ and end_time_ each time.
If not, first_value and last_value should be associated with the timestamp that is start_time or end_time.
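The first half of this comment can be sketched as follows. This is an illustration under the assumption that timestamps within a batch are sorted in ascending order; `Int32Statistic` and `update_sorted_batch` are hypothetical names, not the statistic classes in the PR.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical, simplified statistic for an INT32 column.
struct Int32Statistic {
    int64_t start_time = std::numeric_limits<int64_t>::max();
    int64_t end_time = std::numeric_limits<int64_t>::min();
    int32_t first_value = 0;
    int32_t last_value = 0;
    int32_t min_value = std::numeric_limits<int32_t>::max();
    int32_t max_value = std::numeric_limits<int32_t>::min();
    int64_t sum_value = 0;
    uint64_t count = 0;
};

// Updates the statistic from rows [begin, end), assuming the timestamps in
// that range are sorted ascending.
inline void update_sorted_batch(Int32Statistic& s, const int64_t* timestamps,
                                const int32_t* values, uint32_t begin,
                                uint32_t end) {
    if (begin >= end) return;
    // Only the two ends of a sorted batch can move the time bounds, and the
    // first/last values stay tied to the timestamps that define them.
    if (s.count == 0 || timestamps[begin] < s.start_time) {
        s.start_time = timestamps[begin];
        s.first_value = values[begin];
    }
    if (s.count == 0 || timestamps[end - 1] >= s.end_time) {
        s.end_time = timestamps[end - 1];
        s.last_value = values[end - 1];
    }
    // Value statistics still need a pass over every row.
    for (uint32_t i = begin; i < end; ++i) {
        if (values[i] < s.min_value) s.min_value = values[i];
        if (values[i] > s.max_value) s.max_value = values[i];
        s.sum_value += static_cast<int64_t>(values[i]);
    }
    s.count += end - begin;
}
```

The per-row loop shrinks to the min/max/sum work, and `first_value`/`last_value` change only when the corresponding time bound changes.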
| } else { | ||
| int start_idx = 0; |
This method is too long; consider extracting some sub-methods.
| auto sentinel = std::make_shared<StringArrayDeviceID>("last_device_id"); | ||
| result.emplace_back(std::move(sentinel), 0); |
Is it possible to make the sentinel a static member?
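One way this suggestion could look is a function-local static, sketched below. `DeviceId`, `last_device_sentinel`, and `append_sentinel` are hypothetical stand-ins for illustration; the real code uses `StringArrayDeviceID`, whose interface is not reproduced here.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the device ID type.
struct DeviceId {
    explicit DeviceId(std::string n) : name(std::move(n)) {}
    std::string name;
};

// Returns one shared sentinel, created on first use. Initialization of a
// function-local static is thread-safe since C++11.
inline const std::shared_ptr<DeviceId>& last_device_sentinel() {
    static const std::shared_ptr<DeviceId> sentinel =
        std::make_shared<DeviceId>("last_device_id");
    return sentinel;
}

// Appends the sentinel entry; the shared_ptr is copied, not moved, because
// the static instance has to stay valid for later callers.
inline void append_sentinel(
    std::vector<std::pair<std::shared_ptr<DeviceId>, int>>& result) {
    result.emplace_back(last_device_sentinel(), 0);
}
```

Compared with the diff, this trades one allocation per call for a reference-count increment.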
| for (uint32_t r = start_idx; r < end_idx; r++) { | ||
| if (col_notnull_bitmap.test(r)) { | ||
| has_null = true; | ||
| break; | ||
| } |
Is it possible to add test_range_any and test_range_all for Bitmap?
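A hedged sketch of what such helpers might look like over a raw byte buffer is shown below. The free functions and the bit order (bit `i` stored in byte `i / 8` at position `i % 8`, least significant bit first) are assumptions made for illustration and may not match the layout of the project's `BitMap`.

```cpp
#include <cassert>
#include <cstdint>

// Assumed layout: bit i lives in byte i / 8 at position i % 8 (LSB first).
inline bool test_bit(const uint8_t* bits, uint32_t i) {
    return ((bits[i >> 3] >> (i & 7)) & 1u) != 0;
}

// True if any bit in [begin, end) is set.
inline bool test_range_any(const uint8_t* bits, uint32_t begin, uint32_t end) {
    uint32_t i = begin;
    // Leading bits up to the next byte boundary.
    while (i < end && (i & 7) != 0) {
        if (test_bit(bits, i)) return true;
        ++i;
    }
    // Whole bytes can be checked eight bits at a time.
    while (i + 8 <= end) {
        if (bits[i >> 3] != 0) return true;
        i += 8;
    }
    // Trailing bits of the last partial byte.
    while (i < end) {
        if (test_bit(bits, i)) return true;
        ++i;
    }
    return false;
}

// True if every bit in [begin, end) is set; an empty range yields true.
inline bool test_range_all(const uint8_t* bits, uint32_t begin, uint32_t end) {
    uint32_t i = begin;
    while (i < end && (i & 7) != 0) {
        if (!test_bit(bits, i)) return false;
        ++i;
    }
    while (i + 8 <= end) {
        if (bits[i >> 3] != 0xFF) return false;
        i += 8;
    }
    while (i < end) {
        if (!test_bit(bits, i)) return false;
        ++i;
    }
    return true;
}
```

With helpers like these, the per-row loop in the diff could become a single range query over `[start_idx, end_idx)`.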
| if (IS_SUCC(ret)) { | ||
| save_first_page_data(value_page_writer_); | ||
| value_page_writer_.clear_page_data(); | ||
| // value_page_writer_.destroy_page_data(); |
| common::ThreadPool pool(0); | ||
| EXPECT_GE(pool.num_threads(), static_cast<size_t>(1)); |
Or, maybe we can define that any submitted task will be executed in the submitter thread synchronously when size == 0?
Batch decode/encode APIs (PLAIN / TS2DIFF / Gorilla) with single-pass TsBlock decode, AVX2/NEON SIMD paths, a single process-wide worker pool for chunk-level parallel read and column-parallel write, and batched NEON statistics. On-disk format unchanged; interoperable with Java/Python.
MultiValueAlignedSkipsBatchPreservesValueAlignment and MultiValueAlignedWideChunkParallelDecode constructed ColIterator locals inside the read loop and called ssi->revert_tsblock() (which frees the TsBlock and its column vectors) while those iterators were still in scope. ~ColIterator() calls vec_->reset_offset(), writing back into the just-freed vector at the closing brace of the loop body — a real use-after-free that Linux Release+ASan flags and whose heap corruption then cascaded into spurious SprintzCodec/CRelease failures later in the same single-process run. Scope the ColIterators in a nested block so they are destroyed before revert_tsblock(). Verified on Linux x86_64 Release+ASan+UBSan: full suite 701/701 pass, 0 ASan/UBSan reports.
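The scoping fix described above can be illustrated with a small stand-alone sketch. `Column`, `ColIter`, `release_block`, and `read_one_block` are hypothetical stand-ins, not the project's `ColIterator` or `revert_tsblock()`; an event log records the order in which the destructor and the free happen.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical column vector that an iterator writes back into.
struct Column {
    int offset = 0;
    void reset_offset() { offset = 0; }
};

// Hypothetical iterator whose destructor touches the column, like the
// ColIterator described above.
class ColIter {
   public:
    ColIter(Column* vec, std::vector<std::string>* log)
        : vec_(vec), log_(log) {}
    ColIter(const ColIter&) = delete;
    ColIter& operator=(const ColIter&) = delete;
    ~ColIter() {
        vec_->reset_offset();  // would be a use-after-free if vec_ were gone
        log_->push_back("iterator destroyed");
    }
    void next() { ++vec_->offset; }

   private:
    Column* vec_;
    std::vector<std::string>* log_;
};

// Stand-in for the call that frees the block and its columns.
inline void release_block(Column* col, std::vector<std::string>* log) {
    delete col;
    log->push_back("block freed");
}

// Reads one block with the iterator confined to a nested scope.
inline std::vector<std::string> read_one_block() {
    std::vector<std::string> log;
    Column* col = new Column();
    {
        ColIter it(col, &log);
        it.next();
    }  // ~ColIter() runs here, while col is still alive
    release_block(col, &log);
    return log;
}
```

The returned log lists "iterator destroyed" before "block freed". Without the nested block, the destructor would run at the end of `read_one_block`, after the column had already been deleted.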
TsFileIOReader::get_cached_device_node keyed device_node_cache_ by
IDeviceID::get_device_name(), which renders a null tag segment as the
literal text "null". A device with a real null tag (e.g. tags
(null, b, c)) and a device whose tag value is the string "null"
(("null", b, c)) therefore produce the identical name "a.null.b.c" and
collide in the cache: whichever device is queried first populates the
entry, and every later query for the other device on the same reused
reader gets the first device's cached MetaIndexNode and silently reads
its chunks.
This surfaced through the Python dataset API, where one long-lived
TsFileReader answers many per-device queries: the pytest
test_dataset_null_tag_positions_and_string_null_are_distinct read
(null,b,c)'s data for the ("null",b,c) device. The device metadata
binary search (DeviceIDComparable, segment-based) was always correct;
only the string cache key was lossy.
Key the cache by a collision-free, length-prefixed encoding of the
segment vector that flags null segments explicitly, so a null tag can
never alias the string "null". Add a device_id unit test pinning the
invariant (names collide, segment equality distinguishes them).
Verified: C++ suite 707/707 (Release+ASan+UBSan), Python suite 150/150.
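The collision and the fix can be sketched as follows. This is a minimal illustration, assuming one marker byte per segment and a 4-byte little-endian length prefix; `Segment`, `render_name`, and `encode_cache_key` are hypothetical names, and the actual encoding in the commit may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical device ID segment; is_null marks a null tag.
struct Segment {
    bool is_null;
    std::string value;
};

// Lossy rendering: a null segment and the string "null" look the same.
inline std::string render_name(const std::vector<Segment>& segs) {
    std::string name;
    for (std::size_t i = 0; i < segs.size(); ++i) {
        if (i > 0) name += '.';
        if (segs[i].is_null) {
            name += "null";
        } else {
            name += segs[i].value;
        }
    }
    return name;
}

// Collision-free key: a marker byte per segment, then for non-null segments
// a fixed-width length followed by the raw bytes.
inline std::string encode_cache_key(const std::vector<Segment>& segs) {
    std::string key;
    for (const Segment& s : segs) {
        if (s.is_null) {
            key.push_back('\0');  // null marker, no payload
            continue;
        }
        key.push_back('\1');  // string marker
        const uint32_t len = static_cast<uint32_t>(s.value.size());
        for (int shift = 0; shift < 32; shift += 8) {
            key.push_back(static_cast<char>((len >> shift) & 0xFF));
        }
        key.append(s.value);
    }
    return key;
}
```

Because every segment starts with a marker and non-null segments carry an explicit length, segment boundaries are unambiguous: ("ab", "c") and ("a", "bc") also get different keys.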
Release builds unconditionally added -march=native on Linux/macOS. Two
problems for artifacts that leave the build host:
1. Portability: a wheel/binary built on a newer CPU can fault with an
illegal instruction on an older target machine.
2. On virtualized macOS arm64 CI runners, -march=native mis-detects the
feature set and drops +crc, while snappy's CRC32 feature probe (run
without -march) still reports crc available and defines
SNAPPY_HAVE_NEON_CRC32=1. snappy.cc then calls the always_inline
__crc32cw intrinsic in a TU compiled without crc support:
error: always_inline function '__crc32cw' requires target feature
'crc', but would be inlined into a function compiled without 'crc'
The default (portable) target for macOS arm64 keeps +crc, so snappy's
fast path stays available while the binary stays portable.
Default Release flags are now -O3 -flto (LTO stays off on MinGW/Windows
as before). -march=native is opt-in via -DTSFILE_ENABLE_NATIVE_ARCH=ON
for local-only builds that never ship. Applies uniformly to CI, wheels,
and manual local builds.
Verified: non-ASan Release (CI config) builds snappy cleanly and passes
705/705; -DTSFILE_ENABLE_NATIVE_ARCH=ON restores -march=native.
Summary
This PR optimizes the C++ TsFile read/write paths for batch and columnar workloads, and fixes several aligned table null-handling issues uncovered while validating the optimized path.
It consolidates the batch decode/write work from the long-lived optimization branch into a reviewable change for develop.
Supersedes #749, #754, and #774.
Main Changes
- Decoder hierarchy and implement batch paths for PLAIN, TS2DIFF, and Gorilla.
- ChunkReader and AlignedChunkReader, including shared timestamp decoding for aligned multi-value reads.
- ByteStream, compressor, and page/chunk writer internals used by the optimized paths.
Correctness Fixes
- ValuePageWriter::reset() so row count and null bitmap state are reset together.
Compatibility Notes
- cpp/third_party/ is left at develop state so existing platform compatibility fixes are preserved.
Verification
- cmake --build cpp/target/build --target TsFile_Test -j1
- ctest --test-dir cpp/target/build/test --output-on-failure -R '^TsFileTableReaderTest\.TestNullInTable4$'
- ctest --test-dir cpp/target/build/test --output-on-failure -j4
- cd cpp && mvn spotless:check
- cd cpp && mvn apache-rat:check
Current full C++ test result: 496/496 tests pass.