
fix(lake): use watermark for offset in lake sink path#546

Merged
shortishly merged 1 commit into tansu-io:main from rawkode:fix/lake-sink-offset on Jan 8, 2026

Conversation

Contributor

@rawkode rawkode commented Jan 8, 2026

Summary

The lake sink code path was hardcoding offset to 0, causing all parquet files to have the same name (based on partition-offset). This caused Iceberg to reject subsequent writes as duplicate files after the first batch.

Problem

In tansu-storage/src/dynostore.rs, the lake sink branch had:

let offset = 0;  // Always 0!

This offset is used in the parquet filename, so every batch tried to write to the same file.
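The collision mechanism can be sketched as follows. Note that `parquet_file_name` and its format string are illustrative assumptions for demonstration, not the actual Tansu naming code:

```rust
// Illustrative sketch: parquet file names derived from (partition, offset).
// The exact naming scheme is an assumption, not Tansu's real code.
fn parquet_file_name(partition: i32, offset: i64) -> String {
    format!("{partition:05}-{offset:020}.parquet")
}

fn main() {
    // With the offset hardcoded to 0, every batch for a partition
    // produces the same file name, so Iceberg sees a duplicate file.
    assert_eq!(parquet_file_name(0, 0), parquet_file_name(0, 0));

    // With watermark-derived offsets, each batch gets a unique name.
    assert_ne!(parquet_file_name(0, 0), parquet_file_name(0, 500));
}
```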

Fix

Reuse the same watermark logic from the normal storage path to calculate proper incrementing offsets:

let watermark = self.watermarks.lock()...;
let offset = watermark.with_mut(&self.object_store, |watermark| {
    let offset = watermark.high.unwrap_or_default();
    // update watermark...
    Ok(offset)
}).await?;

Test Plan

  • Existing tests pass (cargo test -p tansu-storage)
  • Deploy with lake sink enabled, verify multiple batches write to different parquet files

Copilot AI review requested due to automatic review settings January 8, 2026 16:29

Copilot AI left a comment


Pull request overview

This PR fixes a critical bug in the lake sink code path where the offset was hardcoded to 0, causing all parquet files to have identical names. This resulted in Iceberg rejecting subsequent writes as duplicates after the first batch. The fix reuses the same watermark-based offset calculation logic from the normal storage path to generate proper incrementing offsets.

  • Replaces hardcoded offset = 0 with watermark-based offset calculation
  • Ensures each batch writes to a uniquely named parquet file based on incrementing offset
  • Reuses existing watermark management pattern from the normal storage path


Comment on lines +729 to +754
let watermark = self.watermarks.lock().map(|mut locked| {
    locked
        .entry(topition.to_owned())
        .or_insert_with(|| OptiCon::<Watermark>::new(self.cluster.as_str(), topition))
        .to_owned()
})?;

let offset = watermark
    .with_mut(&self.object_store, |watermark| {
        debug!(?watermark);

        let offset = watermark.high.unwrap_or_default();
        watermark.high = watermark.high.map_or_else(
            || Some(deflated.last_offset_delta as i64 + 1i64),
            |high| Some(high + deflated.last_offset_delta as i64 + 1i64),
        );

        watermark.timestamps = None;

        debug!(?watermark);

        Ok(offset)
    })
    .await
    .inspect(|offset| debug!(offset, transaction_id, ?topition))
    .inspect_err(|err| error!(?err, transaction_id, ?topition))?;

Copilot AI Jan 8, 2026


The watermark calculation logic here (lines 729-754) is duplicated from the normal storage path (lines 856-881). Consider extracting this into a helper method to avoid code duplication and ensure both paths remain consistent if the watermark logic needs to be updated in the future.
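Copilot's deduplication suggestion could be applied along these lines. The sketch below is a deliberately self-contained model: the `Watermark` struct is simplified (the real one lives behind `OptiCon` and the object store), and `next_offset` is a hypothetical helper name, not an existing Tansu API:

```rust
// Simplified, self-contained model of the duplicated watermark logic.
// The real code wraps this in OptiCon::with_mut against the object store.
#[derive(Debug, Default)]
struct Watermark {
    high: Option<i64>,
}

// Hypothetical extracted helper: return the base offset for this batch
// and advance the high watermark past it, matching the diff's arithmetic
// (high.unwrap_or_default() + last_offset_delta + 1).
fn next_offset(watermark: &mut Watermark, last_offset_delta: i32) -> i64 {
    let offset = watermark.high.unwrap_or_default();
    watermark.high = Some(offset + last_offset_delta as i64 + 1);
    offset
}

fn main() {
    let mut wm = Watermark::default();
    // First batch of 100 records (last_offset_delta = 99) starts at offset 0.
    assert_eq!(next_offset(&mut wm, 99), 0);
    // Second batch starts where the first ended.
    assert_eq!(next_offset(&mut wm, 99), 100);
    assert_eq!(wm.high, Some(200));
}
```

Calling one such helper from both the lake sink branch and the normal storage path would keep the two offset calculations consistent if the watermark logic changes later.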

@shortishly shortishly merged commit b902b17 into tansu-io:main Jan 8, 2026
31 checks passed