From 36f4ebf2161137fe54b383649c575a3abad01035 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Fri, 20 Mar 2026 15:42:20 +0000 Subject: [PATCH 1/2] [BUG-447] Idempotency bug: OOO exception when response is lost but subsequent batches succeed --- crates/fluss/src/client/write/idempotence.rs | 43 ++++++++++++++++++++ crates/fluss/src/client/write/sender.rs | 39 +++++++++++++----- 2 files changed, 72 insertions(+), 10 deletions(-) diff --git a/crates/fluss/src/client/write/idempotence.rs b/crates/fluss/src/client/write/idempotence.rs index 3c55f6ac..eeec8761 100644 --- a/crates/fluss/src/client/write/idempotence.rs +++ b/crates/fluss/src/client/write/idempotence.rs @@ -314,6 +314,19 @@ impl IdempotenceManager { } } + /// Returns true if the batch has already been committed on the server. + /// + /// If the batch's sequence is less than or equal to `last_acked_sequence`, then this batch + /// or a higher-sequence one has already been acknowledged. Either way the current batch + /// was successfully written on the server (a higher-sequence batch could not have been + /// committed otherwise). 
+ pub fn is_already_committed(&self, bucket: &TableBucket, batch_sequence: i32) -> bool { + let entries = self.bucket_entries.lock(); + entries + .get(bucket) + .is_some_and(|e| e.last_acked_sequence >= 0 && batch_sequence <= e.last_acked_sequence) + } + pub fn can_retry_for_error( &self, bucket: &TableBucket, @@ -530,6 +543,36 @@ mod tests { assert!(mgr.can_send_more_requests(&b0)); // under limit again } + #[test] + fn test_is_already_committed() { + let mgr = IdempotenceManager::new(true, 5); + let b0 = test_bucket(0); + mgr.set_writer_id(42); + + // No entry yet → not committed + assert!(!mgr.is_already_committed(&b0, 0)); + + // Initialize bucket and ack seq=0 + let _ = mgr.next_sequence_and_increment(&b0); // 0 + mgr.add_in_flight_batch(&b0, 0, 100); + mgr.handle_completed_batch(&b0, 100, 42); // last_acked=0 + + // seq=0 <= last_acked(0) → committed + assert!(mgr.is_already_committed(&b0, 0)); + // seq=1 > last_acked(0) → not committed + assert!(!mgr.is_already_committed(&b0, 1)); + + // Ack up to seq=4, then check seq=0 still committed + for seq in 1..=4 { + let _ = mgr.next_sequence_and_increment(&b0); + mgr.add_in_flight_batch(&b0, seq, 100 + seq as i64); + mgr.handle_completed_batch(&b0, 100 + seq as i64, 42); + } + assert!(mgr.is_already_committed(&b0, 0)); // seq=0 <= last_acked(4) + assert!(mgr.is_already_committed(&b0, 4)); // seq=4 <= last_acked(4) + assert!(!mgr.is_already_committed(&b0, 5)); // seq=5 > last_acked(4) + } + #[test] fn test_reset_batch_ids_cleaned_on_complete() { let (mgr, b0) = setup_three_in_flight(); diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs index b526e1a9..438095c1 100644 --- a/crates/fluss/src/client/write/sender.rs +++ b/crates/fluss/src/client/write/sender.rs @@ -637,6 +637,35 @@ impl Sender { message: String, ) -> Result<Option<Arc<PhysicalTablePath>>> { let physical_table_path = Arc::clone(ready_write_batch.write_batch.physical_table_path()); + + if error == FlussError::DuplicateSequenceException { 
warn!( + "Duplicate sequence for {} on bucket {}: {message}", + physical_table_path.as_ref(), + ready_write_batch.table_bucket.bucket_id() + ); + self.complete_batch(ready_write_batch); + return Ok(None); + } + + if error == FlussError::OutOfOrderSequenceException + && self.idempotence_manager.is_enabled() + && self.idempotence_manager.is_already_committed( + &ready_write_batch.table_bucket, + ready_write_batch.write_batch.batch_sequence(), + ) + { + warn!( + "Batch for {} on bucket {} with sequence {} received OutOfOrderSequenceException \ + but has already been committed. Treating as success due to lost response.", + physical_table_path.as_ref(), + ready_write_batch.table_bucket.bucket_id(), + ready_write_batch.write_batch.batch_sequence(), + ); + self.complete_batch(ready_write_batch); + return Ok(None); + } + if self.can_retry(&ready_write_batch, error) { warn!( "Retrying write batch for {} on bucket {} after error {error:?}: {message}", @@ -680,16 +709,6 @@ impl Sender { return Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path)); } - if error == FlussError::DuplicateSequenceException { - warn!( - "Duplicate sequence for {} on bucket {}: {message}", - physical_table_path.as_ref(), - ready_write_batch.table_bucket.bucket_id() - ); - self.complete_batch(ready_write_batch); - return Ok(None); - } - // Generic error path. handle_failed_batch will detect OutOfOrderSequence / // UnknownWriterId and reset all writer state internally (matching Java). // For other errors, only adjust sequences if the batch didn't exhaust its retries. 
From 2c7cdec94a998e7459293573e79a9070a284b7a6 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Fri, 20 Mar 2026 21:35:45 +0000 Subject: [PATCH 2/2] Clarify generic error path comment: already-committed OutOfOrderSequence is handled earlier --- crates/fluss/src/client/write/sender.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs index 438095c1..dd5370d8 100644 --- a/crates/fluss/src/client/write/sender.rs +++ b/crates/fluss/src/client/write/sender.rs @@ -709,8 +709,9 @@ impl Sender { return Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path)); } - // Generic error path. handle_failed_batch will detect OutOfOrderSequence / - // UnknownWriterId and reset all writer state internally (matching Java). + // Generic error path. handle_failed_batch will detect remaining + // OutOfOrderSequence (not already committed) / UnknownWriterId cases and + // reset all writer state internally (matching Java). // For other errors, only adjust sequences if the batch didn't exhaust its retries. let can_adjust = ready_write_batch.write_batch.attempts() < self.retries; self.fail_batch(