Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions crates/fluss/src/client/write/idempotence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,19 @@ impl IdempotenceManager {
}
}

/// Reports whether `batch_sequence` for `bucket` is covered by an acknowledged sequence.
///
/// The result is `true` only when the bucket has a tracked entry whose
/// `last_acked_sequence` is non-negative and not smaller than `batch_sequence`.
/// An acknowledgement at or beyond this sequence implies the batch itself was
/// written on the server, because a later batch could not have been committed
/// otherwise. A bucket without an entry yields `false`.
pub fn is_already_committed(&self, bucket: &TableBucket, batch_sequence: i32) -> bool {
    let guard = self.bucket_entries.lock();
    match guard.get(bucket) {
        Some(entry) => {
            let last_acked = entry.last_acked_sequence;
            last_acked >= 0 && batch_sequence <= last_acked
        }
        // Nothing is tracked for this bucket, so nothing can have been acknowledged.
        None => false,
    }
}

pub fn can_retry_for_error(
&self,
bucket: &TableBucket,
Expand Down Expand Up @@ -530,6 +543,36 @@ mod tests {
assert!(mgr.can_send_more_requests(&b0)); // under limit again
}

#[test]
fn test_is_already_committed() {
    let manager = IdempotenceManager::new(true, 5);
    let bucket = test_bucket(0);
    manager.set_writer_id(42);

    // A bucket that has never been touched has no entry, so nothing is committed.
    assert!(!manager.is_already_committed(&bucket, 0));

    // Hand out sequence 0, register it as in flight, then complete it.
    let first_batch_id: i64 = 100;
    let _ = manager.next_sequence_and_increment(&bucket); // yields 0
    manager.add_in_flight_batch(&bucket, 0, first_batch_id);
    manager.handle_completed_batch(&bucket, first_batch_id, 42); // last_acked becomes 0

    // Sequence 0 is at the acked boundary; sequence 1 is beyond it.
    assert!(manager.is_already_committed(&bucket, 0));
    assert!(!manager.is_already_committed(&bucket, 1));

    // Send and complete sequences 1 through 4 one after another.
    for seq in 1..=4 {
        let batch_id = 100 + seq as i64;
        let _ = manager.next_sequence_and_increment(&bucket);
        manager.add_in_flight_batch(&bucket, seq, batch_id);
        manager.handle_completed_batch(&bucket, batch_id, 42);
    }

    // With last_acked at 4, everything up to 4 is committed and 5 is not.
    assert!(manager.is_already_committed(&bucket, 0));
    assert!(manager.is_already_committed(&bucket, 4));
    assert!(!manager.is_already_committed(&bucket, 5));
}

#[test]
fn test_reset_batch_ids_cleaned_on_complete() {
let (mgr, b0) = setup_three_in_flight();
Expand Down
44 changes: 32 additions & 12 deletions crates/fluss/src/client/write/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,35 @@ impl Sender {
message: String,
) -> Result<Option<Arc<PhysicalTablePath>>> {
let physical_table_path = Arc::clone(ready_write_batch.write_batch.physical_table_path());

if error == FlussError::DuplicateSequenceException {
warn!(
"Duplicate sequence for {} on bucket {}: {message}",
physical_table_path.as_ref(),
ready_write_batch.table_bucket.bucket_id()
);
self.complete_batch(ready_write_batch);
return Ok(None);
}

if error == FlussError::OutOfOrderSequenceException
&& self.idempotence_manager.is_enabled()
&& self.idempotence_manager.is_already_committed(
&ready_write_batch.table_bucket,
ready_write_batch.write_batch.batch_sequence(),
)
{
warn!(
"Batch for {} on bucket {} with sequence {} received OutOfOrderSequenceException \
but has already been committed. Treating as success due to lost response.",
physical_table_path.as_ref(),
ready_write_batch.table_bucket.bucket_id(),
ready_write_batch.write_batch.batch_sequence(),
);
self.complete_batch(ready_write_batch);
return Ok(None);
}

if self.can_retry(&ready_write_batch, error) {
warn!(
"Retrying write batch for {} on bucket {} after error {error:?}: {message}",
Expand Down Expand Up @@ -680,18 +709,9 @@ impl Sender {
return Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path));
}

if error == FlussError::DuplicateSequenceException {
warn!(
"Duplicate sequence for {} on bucket {}: {message}",
physical_table_path.as_ref(),
ready_write_batch.table_bucket.bucket_id()
);
self.complete_batch(ready_write_batch);
return Ok(None);
}

// Generic error path. handle_failed_batch will detect OutOfOrderSequence /
// UnknownWriterId and reset all writer state internally (matching Java).
// Generic error path. handle_failed_batch will detect remaining
// OutOfOrderSequence (not already committed) / UnknownWriterId cases and
// reset all writer state internally (matching Java).
// For other errors, only adjust sequences if the batch didn't exhaust its retries.
let can_adjust = ready_write_batch.write_batch.attempts() < self.retries;
self.fail_batch(
Expand Down
Loading