fix(lake): use watermark for offset in lake sink path#546
fix(lake): use watermark for offset in lake sink path#546shortishly merged 1 commit intotansu-io:mainfrom
Conversation
The lake sink code path was hardcoding offset to 0, causing all parquet files to have the same name (based on partition-offset). This caused Iceberg to reject subsequent writes as duplicate files. This fix reuses the same watermark logic from the normal storage path to calculate proper incrementing offsets for lake sink writes.
There was a problem hiding this comment.
Pull request overview
This PR fixes a critical bug in the lake sink code path where the offset was hardcoded to 0, causing all parquet files to have identical names. This resulted in Iceberg rejecting subsequent writes as duplicates after the first batch. The fix reuses the same watermark-based offset calculation logic from the normal storage path to generate proper incrementing offsets.
- Replaces hardcoded
offset = 0with watermark-based offset calculation - Ensures each batch writes to a uniquely named parquet file based on incrementing offset
- Reuses existing watermark management pattern from the normal storage path
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| let watermark = self.watermarks.lock().map(|mut locked| { | ||
| locked | ||
| .entry(topition.to_owned()) | ||
| .or_insert_with(|| OptiCon::<Watermark>::new(self.cluster.as_str(), topition)) | ||
| .to_owned() | ||
| })?; | ||
|
|
||
| let offset = watermark | ||
| .with_mut(&self.object_store, |watermark| { | ||
| debug!(?watermark); | ||
|
|
||
| let offset = watermark.high.unwrap_or_default(); | ||
| watermark.high = watermark.high.map_or_else( | ||
| || Some(deflated.last_offset_delta as i64 + 1i64), | ||
| |high| Some(high + deflated.last_offset_delta as i64 + 1i64), | ||
| ); | ||
|
|
||
| watermark.timestamps = None; | ||
|
|
||
| debug!(?watermark); | ||
|
|
||
| Ok(offset) | ||
| }) | ||
| .await | ||
| .inspect(|offset| debug!(offset, transaction_id, ?topition)) | ||
| .inspect_err(|err| error!(?err, transaction_id, ?topition))?; |
There was a problem hiding this comment.
The watermark calculation logic here (lines 729-754) is duplicated from the normal storage path (lines 856-881). Consider extracting this into a helper method to avoid code duplication and ensure both paths remain consistent if the watermark logic needs to be updated in the future.
Summary
The lake sink code path was hardcoding offset to 0, causing all parquet files to have the same name (based on partition-offset). This caused Iceberg to reject subsequent writes as duplicate files after the first batch.
Problem
In
tansu-storage/src/dynostore.rs, the lake sink branch had:This offset is used in the parquet filename, so every batch tried to write to the same file.
Fix
Reuse the same watermark logic from the normal storage path to calculate proper incrementing offsets:
Test Plan
cargo test -p tansu-storage)