Skip to content

Commit e26702e

Browse files
chore: Idempotency bug: OOO loop when response is lost but subsequent batches succeed (#448)
1 parent 6740719 commit e26702e

2 files changed

Lines changed: 75 additions & 12 deletions

File tree

crates/fluss/src/client/write/idempotence.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,19 @@ impl IdempotenceManager {
314314
}
315315
}
316316

317+
/// Returns true if the batch has already been committed on the server.
318+
///
319+
/// If the batch's sequence is less than or equal to `last_acked_sequence`, it means
320+
/// a higher-sequence batch has already been acknowledged. This implies the current batch
321+
/// was also successfully written on the server (otherwise the higher-sequence batch could
322+
/// not have been committed).
323+
pub fn is_already_committed(&self, bucket: &TableBucket, batch_sequence: i32) -> bool {
324+
let entries = self.bucket_entries.lock();
325+
entries
326+
.get(bucket)
327+
.is_some_and(|e| e.last_acked_sequence >= 0 && batch_sequence <= e.last_acked_sequence)
328+
}
329+
317330
pub fn can_retry_for_error(
318331
&self,
319332
bucket: &TableBucket,
@@ -530,6 +543,36 @@ mod tests {
530543
assert!(mgr.can_send_more_requests(&b0)); // under limit again
531544
}
532545

546+
#[test]
547+
fn test_is_already_committed() {
548+
let mgr = IdempotenceManager::new(true, 5);
549+
let b0 = test_bucket(0);
550+
mgr.set_writer_id(42);
551+
552+
// No entry yet → not committed
553+
assert!(!mgr.is_already_committed(&b0, 0));
554+
555+
// Initialize bucket and ack seq=0
556+
let _ = mgr.next_sequence_and_increment(&b0); // 0
557+
mgr.add_in_flight_batch(&b0, 0, 100);
558+
mgr.handle_completed_batch(&b0, 100, 42); // last_acked=0
559+
560+
// seq=0 <= last_acked(0) → committed
561+
assert!(mgr.is_already_committed(&b0, 0));
562+
// seq=1 > last_acked(0) → not committed
563+
assert!(!mgr.is_already_committed(&b0, 1));
564+
565+
// Ack up to seq=4, then check seq=0 still committed
566+
for seq in 1..=4 {
567+
let _ = mgr.next_sequence_and_increment(&b0);
568+
mgr.add_in_flight_batch(&b0, seq, 100 + seq as i64);
569+
mgr.handle_completed_batch(&b0, 100 + seq as i64, 42);
570+
}
571+
assert!(mgr.is_already_committed(&b0, 0)); // seq=0 <= last_acked(4)
572+
assert!(mgr.is_already_committed(&b0, 4)); // seq=4 <= last_acked(4)
573+
assert!(!mgr.is_already_committed(&b0, 5)); // seq=5 > last_acked(4)
574+
}
575+
533576
#[test]
534577
fn test_reset_batch_ids_cleaned_on_complete() {
535578
let (mgr, b0) = setup_three_in_flight();

crates/fluss/src/client/write/sender.rs

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,35 @@ impl Sender {
637637
message: String,
638638
) -> Result<Option<Arc<PhysicalTablePath>>> {
639639
let physical_table_path = Arc::clone(ready_write_batch.write_batch.physical_table_path());
640+
641+
if error == FlussError::DuplicateSequenceException {
642+
warn!(
643+
"Duplicate sequence for {} on bucket {}: {message}",
644+
physical_table_path.as_ref(),
645+
ready_write_batch.table_bucket.bucket_id()
646+
);
647+
self.complete_batch(ready_write_batch);
648+
return Ok(None);
649+
}
650+
651+
if error == FlussError::OutOfOrderSequenceException
652+
&& self.idempotence_manager.is_enabled()
653+
&& self.idempotence_manager.is_already_committed(
654+
&ready_write_batch.table_bucket,
655+
ready_write_batch.write_batch.batch_sequence(),
656+
)
657+
{
658+
warn!(
659+
"Batch for {} on bucket {} with sequence {} received OutOfOrderSequenceException \
660+
but has already been committed. Treating as success due to lost response.",
661+
physical_table_path.as_ref(),
662+
ready_write_batch.table_bucket.bucket_id(),
663+
ready_write_batch.write_batch.batch_sequence(),
664+
);
665+
self.complete_batch(ready_write_batch);
666+
return Ok(None);
667+
}
668+
640669
if self.can_retry(&ready_write_batch, error) {
641670
warn!(
642671
"Retrying write batch for {} on bucket {} after error {error:?}: {message}",
@@ -680,18 +709,9 @@ impl Sender {
680709
return Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path));
681710
}
682711

683-
if error == FlussError::DuplicateSequenceException {
684-
warn!(
685-
"Duplicate sequence for {} on bucket {}: {message}",
686-
physical_table_path.as_ref(),
687-
ready_write_batch.table_bucket.bucket_id()
688-
);
689-
self.complete_batch(ready_write_batch);
690-
return Ok(None);
691-
}
692-
693-
// Generic error path. handle_failed_batch will detect OutOfOrderSequence /
694-
// UnknownWriterId and reset all writer state internally (matching Java).
712+
// Generic error path. handle_failed_batch will detect remaining
713+
// OutOfOrderSequence (not already committed) / UnknownWriterId cases and
714+
// reset all writer state internally (matching Java).
695715
// For other errors, only adjust sequences if the batch didn't exhaust its retries.
696716
let can_adjust = ready_write_batch.write_batch.attempts() < self.retries;
697717
self.fail_batch(

0 commit comments

Comments
 (0)