diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index edab91d5..0dff4e87 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -831,7 +831,8 @@ mod tests { use super::*; use crate::client::WriteRecord; use crate::compression::{ - ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + ArrowCompressionInfo, ArrowCompressionRatioEstimator, ArrowCompressionType, + DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, }; use crate::metadata::{DataField, DataTypes, PhysicalTablePath, RowType, TablePath}; use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, to_arrow_schema}; @@ -908,6 +909,8 @@ mod tests { compression_type: ArrowCompressionType::None, compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, }, + usize::MAX, + Arc::new(ArrowCompressionRatioEstimator::default()), )?; let mut row = GenericRow::new(2); diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 43025393..15046af0 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -1700,7 +1700,8 @@ mod tests { use crate::client::WriteRecord; use crate::client::metadata::Metadata; use crate::compression::{ - ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + ArrowCompressionInfo, ArrowCompressionRatioEstimator, ArrowCompressionType, + DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, }; use crate::metadata::{DataTypes, PhysicalTablePath, Schema, TableInfo, TablePath}; use crate::record::MemoryLogRecordsArrowBuilder; @@ -1717,6 +1718,8 @@ mod tests { compression_type: ArrowCompressionType::None, compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, }, + usize::MAX, + Arc::new(ArrowCompressionRatioEstimator::default()), )?; let physical_table_path = Arc::new(PhysicalTablePath::of(table_path)); let row = GenericRow { diff --git 
a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs index 53d00401..019d3b05 100644 --- a/crates/fluss/src/client/write/accumulator.rs +++ b/crates/fluss/src/client/write/accumulator.rs @@ -21,6 +21,7 @@ use crate::client::write::batch::WriteBatch::{ArrowLog, Kv}; use crate::client::write::batch::{ArrowLogWriteBatch, KvWriteBatch, WriteBatch}; use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord}; use crate::cluster::{BucketLocation, Cluster, ServerNode}; +use crate::compression::ArrowCompressionRatioEstimator; use crate::config::Config; use crate::error::{Error, Result}; use crate::metadata::{PhysicalTablePath, TableBucket}; @@ -235,6 +236,7 @@ impl RecordAccumulator { dq: &mut VecDeque, permit: MemoryPermit, alloc_size: usize, + compression_ratio_estimator: Arc, ) -> Result { let physical_table_path = &record.physical_table_path; let table_path = physical_table_path.get_table_path(); @@ -253,6 +255,8 @@ impl RecordAccumulator { row_type, current_time_ms(), matches!(&record.record, Record::Log(LogWriteRecord::RecordBatch(_))), + alloc_size, + compression_ratio_estimator, )?), Record::Kv(kv_record) => Kv(KvWriteBatch::new( self.batch_id.fetch_add(1, Ordering::Relaxed), @@ -303,22 +307,29 @@ impl RecordAccumulator { None }; - let dq = { - let mut binding = self - .write_batches - .entry(Arc::clone(physical_table_path)) - .or_insert_with(|| BucketAndWriteBatches { - table_id: table_info.table_id, - is_partitioned_table, - partition_id, - batches: Default::default(), - }); + let (dq, compression_ratio_estimator) = { + let mut binding = + self.write_batches + .entry(Arc::clone(physical_table_path)) + .or_insert_with(|| BucketAndWriteBatches { + table_id: table_info.table_id, + is_partitioned_table, + partition_id, + batches: Default::default(), + compression_ratio_estimator: Arc::new( + ArrowCompressionRatioEstimator::default(), + ), + }); let bucket_and_batches = binding.value_mut(); - bucket_and_batches + let 
dq = bucket_and_batches .batches .entry(bucket_id) .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) - .clone() + .clone(); + ( + dq, + Arc::clone(&bucket_and_batches.compression_ratio_estimator), + ) }; let mut dq_guard = dq.lock(); @@ -336,6 +347,11 @@ impl RecordAccumulator { // producer holds dq + blocks on memory, while sender needs dq to drain. drop(dq_guard); + // TODO: Implement DynamicWriteBatchSizeEstimator matching Java's + // client.writer.dynamic-batch-size-enabled. Adjusts the batch size target + // per table based on observed actual batch sizes (grow 10% when >80% full, + // shrink 5% when <50% full, clamped to [2*pageSize, maxBatchSize]). + // This would improve memory limiter utilization for tables with small rows. let batch_size = self.config.writer_batch_size as usize; let record_size = record.estimated_record_size(); let alloc_size = batch_size.max(record_size); @@ -348,7 +364,14 @@ impl RecordAccumulator { return Ok(append_result); // permit drops here, memory released } - self.append_new_batch(cluster, record, &mut dq_guard, permit, alloc_size) + self.append_new_batch( + cluster, + record, + &mut dq_guard, + permit, + alloc_size, + compression_ratio_estimator, + ) } pub fn ready(&self, cluster: &Arc) -> Result { @@ -767,6 +790,7 @@ impl RecordAccumulator { is_partitioned_table, partition_id, batches: Default::default(), + compression_ratio_estimator: Arc::new(ArrowCompressionRatioEstimator::default()), }); let bucket_and_batches = binding.value_mut(); bucket_and_batches @@ -912,6 +936,8 @@ struct BucketAndWriteBatches { is_partitioned_table: bool, partition_id: Option, batches: HashMap>>>, + /// Compression ratio estimator shared across Arrow log batches for this table. 
+ compression_ratio_estimator: Arc, } pub struct RecordAppendResult { diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs index e3cd2ca4..fd70cb97 100644 --- a/crates/fluss/src/client/write/batch.rs +++ b/crates/fluss/src/client/write/batch.rs @@ -17,7 +17,7 @@ use crate::client::broadcast::{BatchWriteResult, BroadcastOnce}; use crate::client::{Record, ResultHandle, WriteRecord}; -use crate::compression::ArrowCompressionInfo; +use crate::compression::{ArrowCompressionInfo, ArrowCompressionRatioEstimator}; use crate::error::{Error, Result}; use crate::metadata::{KvFormat, PhysicalTablePath, RowType}; use crate::record::MemoryLogRecordsArrowBuilder; @@ -230,6 +230,8 @@ impl ArrowLogWriteBatch { row_type: &RowType, create_ms: i64, to_append_record_batch: bool, + write_limit: usize, + compression_ratio_estimator: Arc, ) -> Result { let base = InnerWriteBatch::new(batch_id, physical_table_path, create_ms); Ok(Self { @@ -239,6 +241,8 @@ impl ArrowLogWriteBatch { row_type, to_append_record_batch, arrow_compression_info, + write_limit, + compression_ratio_estimator, )?, built_records: None, }) @@ -464,6 +468,8 @@ mod tests { &row_type, 0, false, + 2 * 1024 * 1024, + Arc::new(ArrowCompressionRatioEstimator::default()), ) .unwrap(); @@ -487,7 +493,7 @@ mod tests { let built_data = batch.build().unwrap(); let actual_size = built_data.len(); - let diff = actual_size - estimated_size; + let diff = actual_size.abs_diff(estimated_size); let threshold = actual_size / 10; // 10% tolerance assert!( diff <= threshold, @@ -508,6 +514,8 @@ mod tests { &row_type, 0, true, + 2 * 1024 * 1024, + Arc::new(ArrowCompressionRatioEstimator::default()), ) .unwrap(); @@ -538,7 +546,7 @@ mod tests { let built_data = batch.build().unwrap(); let actual_size = built_data.len(); - let diff = actual_size - estimated_size; + let diff = actual_size.abs_diff(estimated_size); let threshold = actual_size / 10; // 10% tolerance assert!( diff <= threshold, @@ -587,4 
+595,196 @@ mod tests { "estimated size {estimated_size} is not equal to actual size" ); } + + /// Verifies byte-size-based fullness: + /// 1. Actual built size stays within 120% of the configured limit (no compression). + /// 2. Old 256-record cap is gone — large batches accept >256 small rows. + /// 3. Compression feedback loop: shared estimator updates after build(), + /// second batch with same estimator accepts more records. + #[test] + fn test_arrow_batch_byte_size_fullness() { + use crate::client::WriteRecord; + use crate::compression::{ + ArrowCompressionInfo, ArrowCompressionRatioEstimator, ArrowCompressionType, + DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }; + use crate::metadata::{DataField, DataTypes, RowType}; + use crate::row::GenericRow; + use std::sync::Arc; + + let row_type = RowType::new(vec![ + DataField::new("id".to_string(), DataTypes::int(), None), + DataField::new("name".to_string(), DataTypes::string(), None), + ]); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path))); + + // --- Part 1: actual built size stays within 120% of limit (uncompressed) --- + let write_limit: usize = 16 * 1024; + let mut batch = ArrowLogWriteBatch::new( + 1, + Arc::clone(&physical_table_path), + 1, + ArrowCompressionInfo { + compression_type: ArrowCompressionType::None, + compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }, + &row_type, + 0, + false, + write_limit, + Arc::new(ArrowCompressionRatioEstimator::default()), + ) + .unwrap(); + + let mut appended = 0; + for i in 0..100_000 { + let mut row = GenericRow::new(2); + row.set_field(0, i); + row.set_field(1, "hello_world"); + let record = WriteRecord::for_append( + Arc::clone(&table_info), + Arc::clone(&physical_table_path), + 1, + &row, + ); + match batch.try_append(&record).unwrap() { + Some(_) => appended += 1, + None => break, + } + } + + assert!( +
appended > 0 && appended < 100_000, + "batch should have filled, appended: {appended}" + ); + let built = batch.build().unwrap(); + assert!( + built.len() <= write_limit * 120 / 100, + "actual size {} exceeds write_limit {write_limit} by more than 20%", + built.len() + ); + + // --- Part 2: old 256-record cap is gone --- + let row_type_small = RowType::new(vec![DataField::new( + "id".to_string(), + DataTypes::int(), + None, + )]); + let mut batch = ArrowLogWriteBatch::new( + 2, + Arc::clone(&physical_table_path), + 1, + ArrowCompressionInfo { + compression_type: ArrowCompressionType::None, + compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }, + &row_type_small, + 0, + false, + 2 * 1024 * 1024, + Arc::new(ArrowCompressionRatioEstimator::default()), + ) + .unwrap(); + + let mut appended = 0; + for i in 0..1000 { + let mut row = GenericRow::new(1); + row.set_field(0, i); + let record = WriteRecord::for_append( + Arc::clone(&table_info), + Arc::clone(&physical_table_path), + 1, + &row, + ); + match batch.try_append(&record).unwrap() { + Some(_) => appended += 1, + None => break, + } + } + assert_eq!(appended, 1000, "2MB batch should fit 1000 tiny rows"); + + // --- Part 3: compression feedback loop --- + let estimator = Arc::new(ArrowCompressionRatioEstimator::default()); + assert_eq!(estimator.estimation(), 1.0); + + let write_limit = 64 * 1024; + let compression = ArrowCompressionInfo { + compression_type: ArrowCompressionType::Zstd, + compression_level: 3, + }; + + // First batch: fill and build with ZSTD. 
+ let mut batch1 = ArrowLogWriteBatch::new( + 3, + Arc::clone(&physical_table_path), + 1, + compression.clone(), + &row_type, + 0, + false, + write_limit, + Arc::clone(&estimator), + ) + .unwrap(); + + for i in 0..500 { + let mut row = GenericRow::new(2); + row.set_field(0, i); + row.set_field(1, "aaaaaaaaaaaaaaaa"); + let record = WriteRecord::for_append( + Arc::clone(&table_info), + Arc::clone(&physical_table_path), + 1, + &row, + ); + if batch1.try_append(&record).unwrap().is_none() { + break; + } + } + batch1.build().unwrap(); + + // Estimator should have decreased (ZSTD compresses repeated data well). + assert!( + estimator.estimation() < 1.0, + "ratio should decrease after compressed build, got: {}", + estimator.estimation() + ); + + // Second batch: same estimator → knows data compresses well → accepts more rows. + let mut batch2 = ArrowLogWriteBatch::new( + 4, + Arc::clone(&physical_table_path), + 1, + compression, + &row_type, + 0, + false, + write_limit, + Arc::clone(&estimator), + ) + .unwrap(); + + let mut appended2 = 0; + for i in 0..10_000 { + let mut row = GenericRow::new(2); + row.set_field(0, i); + row.set_field(1, "aaaaaaaaaaaaaaaa"); + let record = WriteRecord::for_append( + Arc::clone(&table_info), + Arc::clone(&physical_table_path), + 1, + &row, + ); + match batch2.try_append(&record).unwrap() { + Some(_) => appended2 += 1, + None => break, + } + } + assert!( + appended2 > 500, + "second batch should accept more records with updated ratio, got: {appended2}" + ); + } } diff --git a/crates/fluss/src/compression/arrow_compression_ratio_estimator.rs b/crates/fluss/src/compression/arrow_compression_ratio_estimator.rs new file mode 100644 index 00000000..08b8048a --- /dev/null +++ b/crates/fluss/src/compression/arrow_compression_ratio_estimator.rs @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::atomic::{AtomicU32, Ordering}; + +/// Adaptive estimator for Arrow compression ratios. +/// +/// Tracks the ratio between compressed and uncompressed Arrow body sizes. +/// The estimate adjusts asymmetrically: it increases quickly when compression +/// worsens (to avoid underestimating batch sizes) and decreases slowly when +/// compression improves (conservative). +/// +/// Thread-safe: uses atomic f32 (stored as u32 bits) matching Java's `volatile float`. +/// +/// Matching Java's `ArrowCompressionRatioEstimator`. +pub struct ArrowCompressionRatioEstimator { + /// Stored as `f32::to_bits()` for atomic access. 
+ ratio_bits: AtomicU32, +} + +const COMPRESSION_RATIO_IMPROVING_STEP: f32 = 0.005; +const COMPRESSION_RATIO_DETERIORATE_STEP: f32 = 0.05; +const DEFAULT_COMPRESSION_RATIO: f32 = 1.0; + +impl ArrowCompressionRatioEstimator { + pub fn new() -> Self { + Self { + ratio_bits: AtomicU32::new(DEFAULT_COMPRESSION_RATIO.to_bits()), + } + } + + pub fn estimation(&self) -> f32 { + f32::from_bits(self.ratio_bits.load(Ordering::Relaxed)) + } + + pub fn update_estimation(&self, observed_ratio: f32) { + let current = self.estimation(); + let new_ratio = if observed_ratio > current { + (current + COMPRESSION_RATIO_DETERIORATE_STEP).max(observed_ratio) + } else if observed_ratio < current { + (current - COMPRESSION_RATIO_IMPROVING_STEP).max(observed_ratio) + } else { + return; + }; + self.ratio_bits + .store(new_ratio.to_bits(), Ordering::Relaxed); + } +} + +impl Default for ArrowCompressionRatioEstimator { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_ratio_is_one() { + let e = ArrowCompressionRatioEstimator::new(); + assert_eq!(e.estimation(), 1.0); + } + + #[test] + fn test_deterioration_jumps_quickly() { + let e = ArrowCompressionRatioEstimator::new(); + // Observed ratio worse than estimate: jump by at least DETERIORATE_STEP + e.update_estimation(1.1); + assert!(e.estimation() >= 1.05); + } + + #[test] + fn test_improvement_moves_slowly() { + let e = ArrowCompressionRatioEstimator::new(); + // Observed ratio better than estimate: move down by at most IMPROVING_STEP + e.update_estimation(0.5); + assert!((e.estimation() - 0.995).abs() < 0.001); + } + + #[test] + fn test_converges_to_observed() { + let e = ArrowCompressionRatioEstimator::new(); + // After many updates with same ratio, should converge + for _ in 0..1000 { + e.update_estimation(0.7); + } + assert!((e.estimation() - 0.7).abs() < 0.01); + } +} diff --git a/crates/fluss/src/compression/mod.rs b/crates/fluss/src/compression/mod.rs index 
2b86dba7..29923c0a 100644 --- a/crates/fluss/src/compression/mod.rs +++ b/crates/fluss/src/compression/mod.rs @@ -16,5 +16,7 @@ // under the License. mod arrow_compression; +mod arrow_compression_ratio_estimator; pub use arrow_compression::*; +pub use arrow_compression_ratio_estimator::*; diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index d8ba6d95..bda942ee 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -16,17 +16,20 @@ // under the License. use crate::client::{LogWriteRecord, Record, WriteRecord}; -use crate::compression::ArrowCompressionInfo; +use crate::compression::{ + ArrowCompressionInfo, ArrowCompressionRatioEstimator, ArrowCompressionType, +}; use crate::error::{Error, Result}; use crate::metadata::{DataType, RowType}; use crate::record::{ChangeType, ScanRecord}; -use crate::row::column_writer::ColumnWriter; +use crate::row::column_writer::{ColumnWriter, round_up_to_8}; use crate::row::{ColumnarRow, InternalRow}; use arrow::array::{ArrayBuilder, ArrayRef}; use arrow::{ array::RecordBatch, buffer::Buffer, ipc::{ + CompressionType, reader::{StreamReader, read_record_batch}, root_as_message, writer::StreamWriter, @@ -40,6 +43,7 @@ use byteorder::{ByteOrder, LittleEndian}; use bytes::Bytes; use crc32c::crc32c; use std::{ + cell::Cell, collections::HashMap, fs::File, io::{Cursor, Read, Seek, SeekFrom, Write}, @@ -144,8 +148,13 @@ pub const NO_BATCH_SEQUENCE: i32 = -1; pub const BUILDER_DEFAULT_OFFSET: i64 = 0; -// TODO: Switch to byte-size-based is_full() like Java's ArrowWriter instead of a hard record cap. -pub const DEFAULT_MAX_RECORD: i32 = 256; +/// Initial capacity for Arrow column vectors (pre-allocation hint, not a record cap). +/// Matching Java's `ArrowWriter.INITIAL_CAPACITY`. +const INITIAL_ROW_CAPACITY: usize = 1024; + +/// Fraction of the allocated buffer used as the effective write limit. +/// Matching Java's `ArrowWriter.BUFFER_USAGE_RATIO`. 
+const BUFFER_USAGE_RATIO: f32 = 0.95; pub struct MemoryLogRecordsArrowBuilder { base_log_offset: i64, @@ -156,9 +165,25 @@ pub struct MemoryLogRecordsArrowBuilder { arrow_record_batch_builder: Box, is_closed: bool, arrow_compression_info: ArrowCompressionInfo, + /// Effective write limit in bytes (after applying BUFFER_USAGE_RATIO). + write_limit: usize, + /// Pre-computed Arrow IPC overhead (metadata + body framing) for this schema. + /// Constant per schema+compression combination. + ipc_overhead: usize, + /// Estimated record count at which the next byte-size check should occur. + /// -1 means "unknown — check on the next append". Updated dynamically to + /// skip expensive `estimated_size_in_bytes()` calls on every append. + /// Matching Java's `ArrowWriter.estimatedMaxRecordsCount`. + estimated_max_records_count: Cell, + /// Compression ratio estimator shared across batches for the same table. + compression_ratio_estimator: Arc, + /// Snapshot of the compression ratio at batch creation time. + /// Matching Java's `ArrowWriter.estimatedCompressionRatio` which is + /// cached per batch and only refreshed on `reset()`. + estimated_compression_ratio: f32, } -pub trait ArrowRecordBatchInnerBuilder: Send + Sync { +pub trait ArrowRecordBatchInnerBuilder: Send { fn build_arrow_record_batch(&mut self) -> Result>; fn append(&mut self, row: &dyn InternalRow) -> Result; @@ -229,7 +254,7 @@ pub struct RowAppendRecordBatchBuilder { impl RowAppendRecordBatchBuilder { pub fn new(row_type: &RowType) -> Result { - let capacity = DEFAULT_MAX_RECORD as usize; + let capacity = INITIAL_ROW_CAPACITY; let schema_ref = to_arrow_schema(row_type)?; let writers: Result> = row_type .fields() @@ -310,26 +335,34 @@ impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder { } fn is_full(&self) -> bool { - self.records_count() >= DEFAULT_MAX_RECORD + // Size-based fullness is handled by MemoryLogRecordsArrowBuilder, + // which accounts for metadata length and compression ratio. 
+ false } fn estimated_size_in_bytes(&self) -> usize { - // Returns the uncompressed Arrow array memory size (same as Java's arrowWriter.estimatedSizeInBytes()). - // Note: This is the size before compression. After build(), the actual size may be smaller - // if compression is enabled. - self.column_writers - .iter() - .map(|writer| writer.finish_cloned().get_array_memory_size()) - .sum() + // Returns the uncompressed Arrow IPC body size by reading buffer lengths + // directly from the builders — O(num_columns), zero allocation. + // Analogous to Java's `ArrowUtils.estimateArrowBodyLength()`. + // Java reads exact IPC buffer sizes from vectors; we read builder + // buffer lengths. The IPC framing overhead is accounted for + // separately by `ipc_overhead`. + self.column_writers.iter().map(|w| w.buffer_size()).sum() } } +// TODO: Pool and reuse MemoryLogRecordsArrowBuilder instances per table/schema like +// Java's ArrowWriterPool. Reused writers can seed `estimated_max_records_count` from +// the previous batch (recordsCount / 2) for a warm start, avoiding the first-record +// size check on every new batch. impl MemoryLogRecordsArrowBuilder { pub fn new( schema_id: i32, row_type: &RowType, to_append_record_batch: bool, arrow_compression_info: ArrowCompressionInfo, + write_limit: usize, + compression_ratio_estimator: Arc, ) -> Result { let arrow_batch_builder: Box = { if to_append_record_batch { @@ -338,6 +371,11 @@ impl MemoryLogRecordsArrowBuilder { Box::new(RowAppendRecordBatchBuilder::new(row_type)?) 
} }; + let schema = to_arrow_schema(row_type)?; + let ipc_overhead = + estimate_arrow_ipc_overhead(&schema, arrow_compression_info.get_compression_type())?; + let effective_limit = (write_limit as f32 * BUFFER_USAGE_RATIO) as usize; + let estimated_compression_ratio = compression_ratio_estimator.estimation(); Ok(MemoryLogRecordsArrowBuilder { base_log_offset: BUILDER_DEFAULT_OFFSET, schema_id, @@ -347,6 +385,11 @@ impl MemoryLogRecordsArrowBuilder { is_closed: false, arrow_record_batch_builder: arrow_batch_builder, arrow_compression_info, + write_limit: effective_limit, + ipc_overhead, + estimated_max_records_count: Cell::new(-1), + compression_ratio_estimator, + estimated_compression_ratio, }) } @@ -367,8 +410,50 @@ impl MemoryLogRecordsArrowBuilder { // todo: consider write other change type } + /// Check if the builder is full based on estimated serialized size. + /// + /// Uses a threshold-based optimization to skip expensive size checks: + /// only computes the actual estimated size when the record count reaches + /// the predicted threshold. Matching Java's `ArrowWriter.isFull()`. pub fn is_full(&self) -> bool { - self.arrow_record_batch_builder.records_count() >= DEFAULT_MAX_RECORD + // Delegate to inner builder first (e.g. PrebuiltRecordBatchBuilder + // is always full after one batch, regardless of size). 
+ if self.arrow_record_batch_builder.is_full() { + return true; + } + let records_count = self.arrow_record_batch_builder.records_count(); + let threshold = self.estimated_max_records_count.get(); + if records_count > 0 && records_count >= threshold { + let body_size = self.arrow_record_batch_builder.estimated_size_in_bytes(); + let estimated_body = self.estimated_compressed_size(body_size); + let current_size = self.ipc_overhead + estimated_body; + if current_size >= self.write_limit { + return true; + } + if estimated_body == 0 { + self.estimated_max_records_count.set(records_count + 1); + return false; + } + // Matching Java: subtract fixed metadata overhead from the limit, + // divide remaining body budget by per-record body cost. + let body_per_record = estimated_body as f64 / records_count as f64; + let next = ((self.write_limit.saturating_sub(self.ipc_overhead) as f64 + / body_per_record) + .ceil() as i32) + .max(records_count + 1); + self.estimated_max_records_count.set(next); + } + false + } + + /// Estimate the compressed body size using the ratio snapshot taken at batch creation. + /// Matching Java's `ArrowWriter.estimatedBytesWritten()`. + fn estimated_compressed_size(&self, uncompressed_body: usize) -> usize { + if self.arrow_compression_info.compression_type == ArrowCompressionType::None { + uncompressed_body + } else { + (uncompressed_body as f64 * self.estimated_compression_ratio as f64) as usize + } } pub fn is_closed(&self) -> bool { @@ -380,6 +465,9 @@ impl MemoryLogRecordsArrowBuilder { } pub fn build(&mut self) -> Result> { + // Capture uncompressed body size before serialization for compression ratio update. 
+ let uncompressed_body_size = self.arrow_record_batch_builder.estimated_size_in_bytes(); + // serialize arrow batch let mut arrow_batch_bytes = vec![]; let table_schema = self.arrow_record_batch_builder.schema(); @@ -396,9 +484,23 @@ impl MemoryLogRecordsArrowBuilder { let header = writer.get_ref().len(); let record_batch = self.arrow_record_batch_builder.build_arrow_record_batch()?; writer.write(record_batch.as_ref())?; - // get real arrow batch bytes + // get real arrow batch bytes (metadata + body, potentially compressed) let real_arrow_batch_bytes = &arrow_batch_bytes[header..]; + // Update compression ratio estimator with actual ratio. + // The serialized bytes include metadata + compressed body. Subtract + // metadata to isolate the compressed body for an accurate ratio. + if uncompressed_body_size > 0 + && self.arrow_compression_info.compression_type != ArrowCompressionType::None + { + let compressed_body_size = real_arrow_batch_bytes + .len() + .saturating_sub(self.ipc_overhead); + let actual_ratio = compressed_body_size as f32 / uncompressed_body_size as f32; + self.compression_ratio_estimator + .update_estimation(actual_ratio); + } + // now, write batch header and arrow batch let mut batch_bytes = vec![0u8; RECORD_BATCH_HEADER_SIZE + real_arrow_batch_bytes.len()]; // write batch header @@ -451,12 +553,72 @@ impl MemoryLogRecordsArrowBuilder { } /// Get an estimate of the number of bytes written to the underlying buffer. - /// This includes the batch header size plus the estimated arrow data size. + /// Includes Fluss record batch header + Arrow IPC metadata + estimated + /// compressed body size. 
+ pub fn estimated_size_in_bytes(&self) -> usize { - RECORD_BATCH_HEADER_SIZE + self.arrow_record_batch_builder.estimated_size_in_bytes() + let body = self.arrow_record_batch_builder.estimated_size_in_bytes(); + let estimated_body = self.estimated_compressed_size(body); + RECORD_BATCH_HEADER_SIZE + self.ipc_overhead + estimated_body } } +/// Estimate the Arrow IPC overhead (metadata + body framing) for a given schema. +/// +/// Serializes a 1-row RecordBatch with known data sizes, then subtracts the +/// raw data contribution to isolate the fixed overhead: IPC message header, +/// RecordBatch flatbuffer, and per-buffer alignment padding within the body. +/// This overhead is constant for a given schema+compression combination. +/// +/// Note: called once per batch creation. With writer pooling (see TODO above), +/// this would be computed once per pooled writer and reused across batches. +/// Analogous to Java's `ArrowUtils.estimateArrowMetadataLength()`. +fn estimate_arrow_ipc_overhead( + schema: &SchemaRef, + compression: Option, +) -> Result { + use arrow::array::new_null_array; + + // Create a 1-row batch of nulls. Null arrays have minimal, predictable + // data: an all-null validity bitmap, no variable-length data, and + // zero-filled fixed-width buffers. This lets us compute raw data size exactly. + let null_arrays: Vec = schema + .fields() + .iter() + .map(|field| new_null_array(field.data_type(), 1)) + .collect(); + let batch = RecordBatch::try_new(schema.clone(), null_arrays)?; + + // Sum the raw buffer sizes — this is what buffer_size() would report. + let raw_data: usize = batch + .columns() + .iter() + .map(|col| { + col.to_data() + .buffers() + .iter() + .map(|buf| round_up_to_8(buf.len())) + .sum::() + // Validity buffer (null bitmap) + + col + .nulls() + .map_or(0, |n| round_up_to_8(n.buffer().len())) + }) + .sum(); + + // Serialize the batch via IPC and measure total output.
+ let mut buf = vec![]; + let write_option = + IpcWriteOptions::try_with_compression(IpcWriteOptions::default(), compression); + let mut writer = StreamWriter::try_new_with_options(&mut buf, schema, write_option?)?; + let header_len = writer.get_ref().len(); + writer.write(&batch)?; + let total_len = writer.get_ref().len(); + + // IPC overhead = total message size - raw data we put in. + let ipc_message_len = total_len - header_len; + Ok(ipc_message_len.saturating_sub(raw_data)) +} + pub trait ToArrow { fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>; } @@ -1910,6 +2072,8 @@ mod tests { compression_type: ArrowCompressionType::None, compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, }, + usize::MAX, + Arc::new(ArrowCompressionRatioEstimator::default()), )?; let mut row = GenericRow::new(2); diff --git a/crates/fluss/src/row/column_writer.rs b/crates/fluss/src/row/column_writer.rs index 34dd0f5c..f82ad158 100644 --- a/crates/fluss/src/row/column_writer.rs +++ b/crates/fluss/src/row/column_writer.rs @@ -36,9 +36,16 @@ use arrow::array::{ }; use arrow_schema::DataType as ArrowDataType; +/// Round up to the next multiple of 8 (Arrow IPC buffer alignment). +#[inline] +pub(crate) fn round_up_to_8(n: usize) -> usize { + (n + 7) & !7 +} + /// Estimated average byte size for variable-width columns (Utf8, Binary). /// Used to pre-allocate data buffers and avoid reallocations during batch building. -const VARIABLE_WIDTH_AVG_BYTES: usize = 64; +/// Matches Java Arrow's `BaseVariableWidthVector.DEFAULT_RECORD_BYTE_COUNT`. +const VARIABLE_WIDTH_AVG_BYTES: usize = 8; /// A typed column writer that reads one column from an [`InternalRow`] and /// appends directly to a concrete Arrow builder — no intermediate [`Datum`], @@ -351,9 +358,70 @@ impl ColumnWriter { self.as_builder_mut().finish() } - /// Clone-finish the builder for size estimation (does not reset the builder). 
- pub fn finish_cloned(&self) -> ArrayRef { - self.as_builder_ref().finish_cloned() + /// Returns the total buffer size in bytes, rounded up to 8-byte alignment + /// per buffer. Reads buffer lengths directly from the builders — O(1), no + /// allocation. Analogous to Java's `ArrowUtils.estimateArrowBodyLength()` + /// which sums `buf.readableBytes()` with 8-byte rounding per buffer. + /// The IPC framing overhead not captured here is accounted for separately + /// by `estimate_arrow_ipc_overhead()`. + pub fn buffer_size(&self) -> usize { + /// Validity bitmap size, rounded to 8-byte alignment. + /// When no nulls have been appended, the builder does not materialize + /// the bitmap and the IPC body contributes 0 bytes for this buffer. + #[inline] + fn validity_size(slice: Option<&[u8]>) -> usize { + round_up_to_8(slice.map_or(0, |s| s.len())) + } + + /// Primitive builder: validity + values (values_slice returns &[T::Native]). + macro_rules! primitive_size { + ($b:expr) => { + validity_size($b.validity_slice()) + + round_up_to_8(std::mem::size_of_val($b.values_slice())) + }; + } + + /// Variable-width builder: validity + offsets + values. + macro_rules! var_width_size { + ($b:expr) => { + validity_size($b.validity_slice()) + + round_up_to_8(std::mem::size_of_val($b.offsets_slice())) + + round_up_to_8($b.values_slice().len()) + }; + } + + match &self.inner { + TypedWriter::Bool(b) => { + validity_size(b.validity_slice()) + round_up_to_8(b.values_slice().len()) + } + TypedWriter::Int8(b) => primitive_size!(b), + TypedWriter::Int16(b) => primitive_size!(b), + TypedWriter::Int32(b) => primitive_size!(b), + TypedWriter::Int64(b) => primitive_size!(b), + TypedWriter::Float32(b) => primitive_size!(b), + TypedWriter::Float64(b) => primitive_size!(b), + TypedWriter::Decimal128 { builder: b, .. 
} => primitive_size!(b), + TypedWriter::Date32(b) => primitive_size!(b), + TypedWriter::Time32Second(b) => primitive_size!(b), + TypedWriter::Time32Millisecond(b) => primitive_size!(b), + TypedWriter::Time64Microsecond(b) => primitive_size!(b), + TypedWriter::Time64Nanosecond(b) => primitive_size!(b), + TypedWriter::TimestampNtzSecond { builder: b, .. } => primitive_size!(b), + TypedWriter::TimestampNtzMillisecond { builder: b, .. } => primitive_size!(b), + TypedWriter::TimestampNtzMicrosecond { builder: b, .. } => primitive_size!(b), + TypedWriter::TimestampNtzNanosecond { builder: b, .. } => primitive_size!(b), + TypedWriter::TimestampLtzSecond { builder: b, .. } => primitive_size!(b), + TypedWriter::TimestampLtzMillisecond { builder: b, .. } => primitive_size!(b), + TypedWriter::TimestampLtzMicrosecond { builder: b, .. } => primitive_size!(b), + TypedWriter::TimestampLtzNanosecond { builder: b, .. } => primitive_size!(b), + // Variable-width types: validity + offsets + values + TypedWriter::Char { builder: b, .. } => var_width_size!(b), + TypedWriter::String(b) => var_width_size!(b), + TypedWriter::Bytes(b) => var_width_size!(b), + TypedWriter::Binary { builder: b, .. } => { + validity_size(b.validity_slice()) + round_up_to_8(b.values_slice().len()) + } + } } fn append_null(&mut self) { @@ -361,15 +429,11 @@ impl ColumnWriter { } /// Returns a trait-object reference to the inner builder. - /// Used for type-agnostic operations (`finish`, `finish_cloned`). + /// Used for type-agnostic operations (`finish`). 
fn as_builder_mut(&mut self) -> &mut dyn ArrayBuilder { with_builder!(&mut self.inner, b => b) } - fn as_builder_ref(&self) -> &dyn ArrayBuilder { - with_builder!(&self.inner, b => b) - } - #[inline] fn write_non_null(&mut self, row: &dyn InternalRow) -> Result<()> { let pos = self.pos; @@ -746,10 +810,10 @@ mod tests { assert_eq!(int_arr.value(1), 20); assert_eq!(int_arr.value(2), 30); - // finish_cloned does not reset + // buffer_size grows with appended data and does not reset the builder let mut w = writer_for(&DataTypes::int(), 4); w.write_field(&GenericRow::from_data(vec![42_i32])).unwrap(); - assert_eq!(w.finish_cloned().len(), 1); + assert!(w.buffer_size() > 0); w.write_field(&GenericRow::from_data(vec![99_i32])).unwrap(); let int_arr = w .finish()