Skip to content

Commit 6989e87

Browse files
Merge branch 'main' into fluss-admin-inst-and-cache
2 parents 0493742 + 7d4bfd6 commit 6989e87

31 files changed

Lines changed: 3535 additions & 197 deletions

bindings/cpp/include/fluss.hpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,18 @@ struct ErrorCode {
180180
static constexpr int INVALID_ALTER_TABLE_EXCEPTION = 56;
181181
/// Deletion operations are disabled on this table.
182182
static constexpr int DELETION_DISABLED_EXCEPTION = 57;
183+
184+
/// Returns true if retrying the request may succeed. Mirrors Java's RetriableException hierarchy.
///
/// The check is a plain equality test of `code` against the fixed set of ErrorCode
/// constants listed in the body; any value that is not in that list yields false.
/// Being constexpr, it can also be evaluated at compile time.
185+
static constexpr bool IsRetriable(int32_t code) {
186+
return code == NETWORK_EXCEPTION || code == CORRUPT_MESSAGE ||
187+
code == SCHEMA_NOT_EXIST || code == LOG_STORAGE_EXCEPTION ||
188+
code == KV_STORAGE_EXCEPTION || code == NOT_LEADER_OR_FOLLOWER ||
189+
code == CORRUPT_RECORD_EXCEPTION ||
190+
code == UNKNOWN_TABLE_OR_BUCKET_EXCEPTION || code == REQUEST_TIME_OUT ||
191+
code == STORAGE_EXCEPTION ||
192+
code == NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION ||
193+
code == NOT_ENOUGH_REPLICAS_EXCEPTION || code == LEADER_NOT_AVAILABLE_EXCEPTION;
194+
}
183195
};
184196

185197
struct Date {
@@ -326,6 +338,9 @@ struct Result {
326338
std::string error_message;
327339

328340
bool Ok() const { return error_code == 0; }
341+
342+
/// Returns true if retrying the request may succeed. Client-side errors always return false.
/// Delegates to ErrorCode::IsRetriable using this result's `error_code`.
343+
bool IsRetriable() const { return ErrorCode::IsRetriable(error_code); }
329344
};
330345

331346
struct TablePath {
@@ -997,7 +1012,23 @@ struct Configuration {
9971012
size_t scanner_remote_log_read_concurrency{4};
9981013
// Maximum number of records returned in a single call to Poll() for LogScanner
9991014
size_t scanner_log_max_poll_records{500};
1015+
// Maximum bytes per fetch response for LogScanner (16 MB)
1016+
int32_t scanner_log_fetch_max_bytes{16 * 1024 * 1024};
1017+
// Minimum bytes to accumulate before server returns a fetch response
1018+
int32_t scanner_log_fetch_min_bytes{1};
1019+
// Maximum time (ms) the server may wait to satisfy min bytes
1020+
int32_t scanner_log_fetch_wait_max_time_ms{500};
1021+
// Maximum bytes per fetch response per bucket for LogScanner (1 MB)
1022+
int32_t scanner_log_fetch_max_bytes_for_bucket{1024 * 1024};
10001023
int64_t writer_batch_timeout_ms{100};
1024+
// Whether to enable idempotent writes
1025+
bool writer_enable_idempotence{true};
1026+
// Maximum number of in-flight requests per bucket for idempotent writes
1027+
size_t writer_max_inflight_requests_per_bucket{5};
1028+
// Total memory available for buffering write batches (default 64MB)
1029+
size_t writer_buffer_memory_size{64 * 1024 * 1024};
1030+
// Maximum time in milliseconds to block waiting for buffer memory
1031+
uint64_t writer_buffer_wait_timeout_ms{std::numeric_limits<uint64_t>::max()};
10011032
// Connect timeout in milliseconds for TCP transport connect
10021033
uint64_t connect_timeout_ms{120000};
10031034
// Security protocol: "PLAINTEXT" (default, no auth) or "sasl" (SASL auth)

bindings/cpp/src/ffi_converter.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,16 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
6464
ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num;
6565
ffi_config.scanner_remote_log_read_concurrency = config.scanner_remote_log_read_concurrency;
6666
ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records;
67+
ffi_config.scanner_log_fetch_max_bytes = config.scanner_log_fetch_max_bytes;
68+
ffi_config.scanner_log_fetch_min_bytes = config.scanner_log_fetch_min_bytes;
69+
ffi_config.scanner_log_fetch_wait_max_time_ms = config.scanner_log_fetch_wait_max_time_ms;
70+
ffi_config.scanner_log_fetch_max_bytes_for_bucket = config.scanner_log_fetch_max_bytes_for_bucket;
6771
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
72+
ffi_config.writer_enable_idempotence = config.writer_enable_idempotence;
73+
ffi_config.writer_max_inflight_requests_per_bucket =
74+
config.writer_max_inflight_requests_per_bucket;
75+
ffi_config.writer_buffer_memory_size = config.writer_buffer_memory_size;
76+
ffi_config.writer_buffer_wait_timeout_ms = config.writer_buffer_wait_timeout_ms;
6877
ffi_config.connect_timeout_ms = config.connect_timeout_ms;
6978
ffi_config.security_protocol = rust::String(config.security_protocol);
7079
ffi_config.security_sasl_mechanism = rust::String(config.security_sasl_mechanism);

bindings/cpp/src/lib.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,15 @@ mod ffi {
4848
remote_file_download_thread_num: usize,
4949
scanner_remote_log_read_concurrency: usize,
5050
scanner_log_max_poll_records: usize,
51+
scanner_log_fetch_max_bytes: i32,
52+
scanner_log_fetch_min_bytes: i32,
53+
scanner_log_fetch_wait_max_time_ms: i32,
54+
scanner_log_fetch_max_bytes_for_bucket: i32,
5155
writer_batch_timeout_ms: i64,
56+
writer_enable_idempotence: bool,
57+
writer_max_inflight_requests_per_bucket: usize,
58+
writer_buffer_memory_size: usize,
59+
writer_buffer_wait_timeout_ms: u64,
5260
connect_timeout_ms: u64,
5361
security_protocol: String,
5462
security_sasl_mechanism: String,
@@ -668,6 +676,14 @@ fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult {
668676
remote_file_download_thread_num: config.remote_file_download_thread_num,
669677
scanner_remote_log_read_concurrency: config.scanner_remote_log_read_concurrency,
670678
scanner_log_max_poll_records: config.scanner_log_max_poll_records,
679+
scanner_log_fetch_max_bytes: config.scanner_log_fetch_max_bytes,
680+
scanner_log_fetch_min_bytes: config.scanner_log_fetch_min_bytes,
681+
scanner_log_fetch_wait_max_time_ms: config.scanner_log_fetch_wait_max_time_ms,
682+
scanner_log_fetch_max_bytes_for_bucket: config.scanner_log_fetch_max_bytes_for_bucket,
683+
writer_enable_idempotence: config.writer_enable_idempotence,
684+
writer_max_inflight_requests_per_bucket: config.writer_max_inflight_requests_per_bucket,
685+
writer_buffer_memory_size: config.writer_buffer_memory_size,
686+
writer_buffer_wait_timeout_ms: config.writer_buffer_wait_timeout_ms,
671687
connect_timeout_ms: config.connect_timeout_ms,
672688
security_protocol: config.security_protocol.to_string(),
673689
security_sasl_mechanism: config.security_sasl_mechanism.to_string(),

bindings/python/fluss/__init__.pyi

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,42 @@ class Config:
174174
@scanner_log_max_poll_records.setter
175175
def scanner_log_max_poll_records(self, num: int) -> None: ...
176176
@property
177+
def scanner_log_fetch_max_bytes(self) -> int: ...
178+
@scanner_log_fetch_max_bytes.setter
179+
def scanner_log_fetch_max_bytes(self, bytes: int) -> None: ...
180+
@property
181+
def scanner_log_fetch_min_bytes(self) -> int: ...
182+
@scanner_log_fetch_min_bytes.setter
183+
def scanner_log_fetch_min_bytes(self, bytes: int) -> None: ...
184+
@property
185+
def scanner_log_fetch_wait_max_time_ms(self) -> int: ...
186+
@scanner_log_fetch_wait_max_time_ms.setter
187+
def scanner_log_fetch_wait_max_time_ms(self, ms: int) -> None: ...
188+
@property
189+
def scanner_log_fetch_max_bytes_for_bucket(self) -> int: ...
190+
@scanner_log_fetch_max_bytes_for_bucket.setter
191+
def scanner_log_fetch_max_bytes_for_bucket(self, bytes: int) -> None: ...
192+
@property
177193
def writer_batch_timeout_ms(self) -> int: ...
178194
@writer_batch_timeout_ms.setter
179195
def writer_batch_timeout_ms(self, timeout: int) -> None: ...
180196
@property
197+
def writer_enable_idempotence(self) -> bool: ...
198+
@writer_enable_idempotence.setter
199+
def writer_enable_idempotence(self, enabled: bool) -> None: ...
200+
@property
201+
def writer_max_inflight_requests_per_bucket(self) -> int: ...
202+
@writer_max_inflight_requests_per_bucket.setter
203+
def writer_max_inflight_requests_per_bucket(self, num: int) -> None: ...
204+
@property
205+
def writer_buffer_memory_size(self) -> int: ...
206+
@writer_buffer_memory_size.setter
207+
def writer_buffer_memory_size(self, size: int) -> None: ...
208+
@property
209+
def writer_buffer_wait_timeout_ms(self) -> int: ...
210+
@writer_buffer_wait_timeout_ms.setter
211+
def writer_buffer_wait_timeout_ms(self, timeout: int) -> None: ...
212+
@property
181213
def connect_timeout_ms(self) -> int: ...
182214
@connect_timeout_ms.setter
183215
def connect_timeout_ms(self, timeout: int) -> None: ...
@@ -837,6 +869,8 @@ class FlussError(Exception):
837869
error_code: int
838870
def __init__(self, message: str, error_code: int = -2) -> None: ...
839871
def __str__(self) -> str: ...
872+
@property
873+
def is_retriable(self) -> bool: ...
840874

841875
class LakeSnapshot:
842876
def __init__(self, snapshot_id: int) -> None: ...

bindings/python/src/config.rs

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,64 @@ impl Config {
9797
))
9898
})?;
9999
}
100+
"scanner.log.fetch.max-bytes" => {
101+
config.scanner_log_fetch_max_bytes = value.parse::<i32>().map_err(|e| {
102+
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
103+
})?;
104+
}
105+
"scanner.log.fetch.min-bytes" => {
106+
config.scanner_log_fetch_min_bytes = value.parse::<i32>().map_err(|e| {
107+
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
108+
})?;
109+
}
110+
"scanner.log.fetch.wait-max-time-ms" => {
111+
config.scanner_log_fetch_wait_max_time_ms =
112+
value.parse::<i32>().map_err(|e| {
113+
FlussError::new_err(format!(
114+
"Invalid value '{value}' for '{key}': {e}"
115+
))
116+
})?;
117+
}
118+
"scanner.log.fetch.max-bytes-for-bucket" => {
119+
config.scanner_log_fetch_max_bytes_for_bucket =
120+
value.parse::<i32>().map_err(|e| {
121+
FlussError::new_err(format!(
122+
"Invalid value '{value}' for '{key}': {e}"
123+
))
124+
})?;
125+
}
126+
"writer.enable-idempotence" => {
127+
config.writer_enable_idempotence = match value.as_str() {
128+
"true" => true,
129+
"false" => false,
130+
other => {
131+
return Err(FlussError::new_err(format!(
132+
"Invalid value '{other}' for '{key}', expected 'true' or 'false'"
133+
)));
134+
}
135+
};
136+
}
137+
"writer.max-inflight-requests-per-bucket" => {
138+
config.writer_max_inflight_requests_per_bucket =
139+
value.parse::<usize>().map_err(|e| {
140+
FlussError::new_err(format!(
141+
"Invalid value '{value}' for '{key}': {e}"
142+
))
143+
})?;
144+
}
145+
"writer.buffer.memory-size" => {
146+
config.writer_buffer_memory_size = value.parse::<usize>().map_err(|e| {
147+
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
148+
})?;
149+
}
150+
"writer.buffer.wait-timeout-ms" => {
151+
config.writer_buffer_wait_timeout_ms =
152+
value.parse::<u64>().map_err(|e| {
153+
FlussError::new_err(format!(
154+
"Invalid value '{value}' for '{key}': {e}"
155+
))
156+
})?;
157+
}
100158
"writer.bucket.no-key-assigner" => {
101159
config.writer_bucket_no_key_assigner =
102160
value.parse::<fcore::config::NoKeyAssigner>().map_err(|e| {
@@ -270,6 +328,54 @@ impl Config {
270328
Ok(())
271329
}
272330

331+
/// Get whether idempotent writes are enabled
///
/// Returns the `writer_enable_idempotence` flag currently stored in the wrapped `inner` config.
332+
#[getter]
333+
fn writer_enable_idempotence(&self) -> bool {
334+
self.inner.writer_enable_idempotence
335+
}
336+
337+
/// Set whether idempotent writes are enabled
///
/// Overwrites `inner.writer_enable_idempotence` with `enabled`; nothing else is touched.
338+
#[setter]
339+
fn set_writer_enable_idempotence(&mut self, enabled: bool) {
340+
self.inner.writer_enable_idempotence = enabled;
341+
}
342+
343+
/// Get the max in-flight requests per bucket
///
/// Returns the `writer_max_inflight_requests_per_bucket` value stored in the wrapped `inner` config.
344+
#[getter]
345+
fn writer_max_inflight_requests_per_bucket(&self) -> usize {
346+
self.inner.writer_max_inflight_requests_per_bucket
347+
}
348+
349+
/// Set the max in-flight requests per bucket
///
/// Stores `num` in `inner.writer_max_inflight_requests_per_bucket` as-is; this setter
/// performs no range check of its own (zero is accepted here).
350+
#[setter]
351+
fn set_writer_max_inflight_requests_per_bucket(&mut self, num: usize) {
352+
self.inner.writer_max_inflight_requests_per_bucket = num;
353+
}
354+
355+
/// Get the writer buffer memory size
///
/// Returns the `writer_buffer_memory_size` value stored in the wrapped `inner` config.
356+
#[getter]
357+
fn writer_buffer_memory_size(&self) -> usize {
358+
self.inner.writer_buffer_memory_size
359+
}
360+
361+
/// Set the writer buffer memory size
///
/// Stores `size` in `inner.writer_buffer_memory_size` as-is; this setter performs no
/// range check of its own.
362+
#[setter]
363+
fn set_writer_buffer_memory_size(&mut self, size: usize) {
364+
self.inner.writer_buffer_memory_size = size;
365+
}
366+
367+
/// Get the writer buffer wait timeout in milliseconds
///
/// Returns the `writer_buffer_wait_timeout_ms` value stored in the wrapped `inner` config.
368+
#[getter]
369+
fn writer_buffer_wait_timeout_ms(&self) -> u64 {
370+
self.inner.writer_buffer_wait_timeout_ms
371+
}
372+
373+
/// Set the writer buffer wait timeout in milliseconds
///
/// Stores `timeout` in `inner.writer_buffer_wait_timeout_ms` as-is; this setter performs
/// no range check of its own.
374+
#[setter]
375+
fn set_writer_buffer_wait_timeout_ms(&mut self, timeout: u64) {
376+
self.inner.writer_buffer_wait_timeout_ms = timeout;
377+
}
378+
273379
/// Get the connect timeout in milliseconds
274380
#[getter]
275381
fn connect_timeout_ms(&self) -> u64 {
@@ -329,6 +435,54 @@ impl Config {
329435
fn set_security_sasl_password(&mut self, password: String) {
330436
self.inner.security_sasl_password = password;
331437
}
438+
439+
/// Get the maximum bytes per fetch response for LogScanner
///
/// Returns the `scanner_log_fetch_max_bytes` value stored in the wrapped `inner` config.
440+
#[getter]
441+
fn scanner_log_fetch_max_bytes(&self) -> i32 {
442+
self.inner.scanner_log_fetch_max_bytes
443+
}
444+
445+
/// Set the maximum bytes per fetch response for LogScanner
///
/// Stores `bytes` in `inner.scanner_log_fetch_max_bytes` as-is. The field is a signed
/// `i32` and this setter does not reject zero or negative values itself.
446+
#[setter]
447+
fn set_scanner_log_fetch_max_bytes(&mut self, bytes: i32) {
448+
self.inner.scanner_log_fetch_max_bytes = bytes;
449+
}
450+
451+
/// Get the minimum bytes to accumulate before returning a fetch response
///
/// Returns the `scanner_log_fetch_min_bytes` value stored in the wrapped `inner` config.
452+
#[getter]
453+
fn scanner_log_fetch_min_bytes(&self) -> i32 {
454+
self.inner.scanner_log_fetch_min_bytes
455+
}
456+
457+
/// Set the minimum bytes to accumulate before returning a fetch response
///
/// Stores `bytes` in `inner.scanner_log_fetch_min_bytes` as-is. The field is a signed
/// `i32` and this setter does not reject zero or negative values itself.
458+
#[setter]
459+
fn set_scanner_log_fetch_min_bytes(&mut self, bytes: i32) {
460+
self.inner.scanner_log_fetch_min_bytes = bytes;
461+
}
462+
463+
/// Get the maximum time (ms) the server may wait to satisfy min-bytes
///
/// Returns the `scanner_log_fetch_wait_max_time_ms` value stored in the wrapped `inner` config.
464+
#[getter]
465+
fn scanner_log_fetch_wait_max_time_ms(&self) -> i32 {
466+
self.inner.scanner_log_fetch_wait_max_time_ms
467+
}
468+
469+
/// Set the maximum time (ms) the server may wait to satisfy min-bytes
///
/// Stores `ms` in `inner.scanner_log_fetch_wait_max_time_ms` as-is. The field is a signed
/// `i32` and this setter does not reject negative values itself.
470+
#[setter]
471+
fn set_scanner_log_fetch_wait_max_time_ms(&mut self, ms: i32) {
472+
self.inner.scanner_log_fetch_wait_max_time_ms = ms;
473+
}
474+
475+
/// Get the maximum bytes per fetch response per bucket for LogScanner
///
/// Returns the `scanner_log_fetch_max_bytes_for_bucket` value stored in the wrapped `inner` config.
476+
#[getter]
477+
fn scanner_log_fetch_max_bytes_for_bucket(&self) -> i32 {
478+
self.inner.scanner_log_fetch_max_bytes_for_bucket
479+
}
480+
481+
/// Set the maximum bytes per fetch response per bucket for LogScanner
///
/// Stores `bytes` in `inner.scanner_log_fetch_max_bytes_for_bucket` as-is. The field is a
/// signed `i32` and this setter does not reject zero or negative values itself.
482+
#[setter]
483+
fn set_scanner_log_fetch_max_bytes_for_bucket(&mut self, bytes: i32) {
484+
self.inner.scanner_log_fetch_max_bytes_for_bucket = bytes;
485+
}
332486
}
333487

334488
impl Config {

bindings/python/src/error.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@ impl FlussError {
5151
format!("FlussError: {}", self.message)
5252
}
5353
}
54+
55+
/// Returns ``True`` if retrying the request may succeed. Client-side errors always return ``False``.
///
/// An error whose `error_code` equals `CLIENT_ERROR_CODE` short-circuits to ``False``.
/// Any other code is mapped through the core `FlussError::for_code` and the answer is
/// whatever that value's `is_retriable()` reports.
56+
#[getter]
57+
fn is_retriable(&self) -> bool {
58+
use fluss::rpc::FlussError as CoreFlussError;
59+
// Client-originated errors are answered here and never reach the core lookup below.
if self.error_code == CLIENT_ERROR_CODE {
60+
return false;
61+
}
62+
CoreFlussError::for_code(self.error_code).is_retriable()
63+
}
5464
}
5565

5666
impl FlussError {

crates/fluss/src/client/connection.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ use parking_lot::RwLock;
2727
use std::sync::Arc;
2828
use std::time::Duration;
2929

30+
use crate::error::{Error, FlussError, Result};
31+
use crate::metadata::TablePath;
32+
33+
// TODO: implement `close(&self, timeout: Duration)` to gracefully shut down the
34+
// writer client (drain pending batches, then force-close on timeout).
35+
// Java's FlussConnection.close() calls writerClient.close(Long.MAX_VALUE).
36+
// WriterClient::close() already exists but is never called from the public API.
3037
pub struct FlussConnection {
3138
metadata: Arc<Metadata>,
3239
network_connects: Arc<RpcClient>,
@@ -39,6 +46,8 @@ impl FlussConnection {
3946
pub async fn new(arg: Config) -> Result<Self> {
4047
arg.validate_security()
4148
.map_err(|msg| Error::IllegalArgument { message: msg })?;
49+
arg.validate_scanner_fetch()
50+
.map_err(|msg| Error::IllegalArgument { message: msg })?;
4251

4352
let timeout = Duration::from_millis(arg.connect_timeout_ms);
4453
let connections = if arg.is_sasl_enabled() {

0 commit comments

Comments
 (0)