Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/fluss/src/client/table/log_fetch_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,8 @@ mod tests {
use super::*;
use crate::client::WriteRecord;
use crate::compression::{
ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
ArrowCompressionInfo, ArrowCompressionRatioEstimator, ArrowCompressionType,
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
};
use crate::metadata::{DataField, DataTypes, PhysicalTablePath, RowType, TablePath};
use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, to_arrow_schema};
Expand Down Expand Up @@ -908,6 +909,8 @@ mod tests {
compression_type: ArrowCompressionType::None,
compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
},
usize::MAX,
Arc::new(ArrowCompressionRatioEstimator::default()),
)?;

let mut row = GenericRow::new(2);
Expand Down
5 changes: 4 additions & 1 deletion crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1700,7 +1700,8 @@ mod tests {
use crate::client::WriteRecord;
use crate::client::metadata::Metadata;
use crate::compression::{
ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
ArrowCompressionInfo, ArrowCompressionRatioEstimator, ArrowCompressionType,
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
};
use crate::metadata::{DataTypes, PhysicalTablePath, Schema, TableInfo, TablePath};
use crate::record::MemoryLogRecordsArrowBuilder;
Expand All @@ -1717,6 +1718,8 @@ mod tests {
compression_type: ArrowCompressionType::None,
compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
},
usize::MAX,
Arc::new(ArrowCompressionRatioEstimator::default()),
)?;
let physical_table_path = Arc::new(PhysicalTablePath::of(table_path));
let row = GenericRow {
Expand Down
52 changes: 39 additions & 13 deletions crates/fluss/src/client/write/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::client::write::batch::WriteBatch::{ArrowLog, Kv};
use crate::client::write::batch::{ArrowLogWriteBatch, KvWriteBatch, WriteBatch};
use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord};
use crate::cluster::{BucketLocation, Cluster, ServerNode};
use crate::compression::ArrowCompressionRatioEstimator;
use crate::config::Config;
use crate::error::{Error, Result};
use crate::metadata::{PhysicalTablePath, TableBucket};
Expand Down Expand Up @@ -235,6 +236,7 @@ impl RecordAccumulator {
dq: &mut VecDeque<WriteBatch>,
permit: MemoryPermit,
alloc_size: usize,
compression_ratio_estimator: Arc<ArrowCompressionRatioEstimator>,
) -> Result<RecordAppendResult> {
let physical_table_path = &record.physical_table_path;
let table_path = physical_table_path.get_table_path();
Expand All @@ -253,6 +255,8 @@ impl RecordAccumulator {
row_type,
current_time_ms(),
matches!(&record.record, Record::Log(LogWriteRecord::RecordBatch(_))),
alloc_size,
compression_ratio_estimator,
)?),
Record::Kv(kv_record) => Kv(KvWriteBatch::new(
self.batch_id.fetch_add(1, Ordering::Relaxed),
Expand Down Expand Up @@ -303,22 +307,29 @@ impl RecordAccumulator {
None
};

let dq = {
let mut binding = self
.write_batches
.entry(Arc::clone(physical_table_path))
.or_insert_with(|| BucketAndWriteBatches {
table_id: table_info.table_id,
is_partitioned_table,
partition_id,
batches: Default::default(),
});
let (dq, compression_ratio_estimator) = {
let mut binding =
self.write_batches
.entry(Arc::clone(physical_table_path))
.or_insert_with(|| BucketAndWriteBatches {
table_id: table_info.table_id,
is_partitioned_table,
partition_id,
batches: Default::default(),
compression_ratio_estimator: Arc::new(
ArrowCompressionRatioEstimator::default(),
),
});
let bucket_and_batches = binding.value_mut();
bucket_and_batches
let dq = bucket_and_batches
.batches
.entry(bucket_id)
.or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
.clone()
.clone();
(
dq,
Arc::clone(&bucket_and_batches.compression_ratio_estimator),
)
};

let mut dq_guard = dq.lock();
Expand All @@ -336,6 +347,11 @@ impl RecordAccumulator {
// producer holds dq + blocks on memory, while sender needs dq to drain.
drop(dq_guard);

// TODO: Implement DynamicWriteBatchSizeEstimator matching Java's
// client.writer.dynamic-batch-size-enabled. Adjusts the batch size target
// per table based on observed actual batch sizes (grow 10% when >80% full,
// shrink 5% when <50% full, clamped to [2*pageSize, maxBatchSize]).
// This would improve memory limiter utilization for tables with small rows.
let batch_size = self.config.writer_batch_size as usize;
let record_size = record.estimated_record_size();
let alloc_size = batch_size.max(record_size);
Expand All @@ -348,7 +364,14 @@ impl RecordAccumulator {
return Ok(append_result); // permit drops here, memory released
}

self.append_new_batch(cluster, record, &mut dq_guard, permit, alloc_size)
self.append_new_batch(
cluster,
record,
&mut dq_guard,
permit,
alloc_size,
compression_ratio_estimator,
)
}

pub fn ready(&self, cluster: &Arc<Cluster>) -> Result<ReadyCheckResult> {
Expand Down Expand Up @@ -767,6 +790,7 @@ impl RecordAccumulator {
is_partitioned_table,
partition_id,
batches: Default::default(),
compression_ratio_estimator: Arc::new(ArrowCompressionRatioEstimator::default()),
});
let bucket_and_batches = binding.value_mut();
bucket_and_batches
Expand Down Expand Up @@ -912,6 +936,8 @@ struct BucketAndWriteBatches {
is_partitioned_table: bool,
partition_id: Option<PartitionId>,
batches: HashMap<BucketId, Arc<Mutex<VecDeque<WriteBatch>>>>,
/// Compression ratio estimator shared across Arrow log batches for this table.
compression_ratio_estimator: Arc<ArrowCompressionRatioEstimator>,
}

pub struct RecordAppendResult {
Expand Down
Loading
Loading