From f236e2227d104c2a1b2f6305cf1452d390b6b4d8 Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Wed, 27 May 2026 19:11:29 +0530 Subject: [PATCH 1/2] [FLINK-39778][s3] Recoverable writer silently loses the in-flight tail on resume --- ...NativeS3RecoverableFsDataOutputStream.java | 51 +++- .../writer/NativeS3RecoverableWriter.java | 90 +++++- .../writer/InMemoryNativeS3Operations.java | 165 ++++++++++ ...NativeS3RecoverableWriterRecoveryTest.java | 288 ++++++++++++++++++ 4 files changed, 585 insertions(+), 9 deletions(-) create mode 100644 flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/InMemoryNativeS3Operations.java create mode 100644 flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriterRecoveryTest.java diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java index 56e7729112607..fb39f861a81f0 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import java.io.BufferedOutputStream; @@ -95,6 +96,29 @@ public NativeS3RecoverableFsDataOutputStream( List existingParts, long numBytesInParts) throws IOException { + this( + s3AccessHelper, + key, + uploadId, + localTmpDir, + minPartSize, + existingParts, + numBytesInParts, + null, + 0L); + } + + public NativeS3RecoverableFsDataOutputStream( + NativeS3ObjectOperations s3AccessHelper, + String key, + String uploadId, + String localTmpDir, + long minPartSize, + List existingParts, + long numBytesInParts, + @Nullable File seedTailFile, + long seedTailLength) + throws IOException { this.s3AccessHelper = s3AccessHelper; this.key = key; this.uploadId = uploadId; @@ -106,7 +130,32 @@ public NativeS3RecoverableFsDataOutputStream( this.currentPartSize = 0; this.closed = false; - createNewTempFile(); + if (seedTailFile != null) { + adoptSeedTailFile(seedTailFile, seedTailLength); + } else { + createNewTempFile(); + } + } + + private void adoptSeedTailFile(File seedFile, long expectedLength) throws IOException { + if (!seedFile.exists()) { + throw new IOException("Seed tail file does not exist: " + seedFile); + } + long actualLength = seedFile.length(); + if (actualLength != expectedLength) { + throw new IOException( + "Seed tail file " + + seedFile + + " has unexpected length: expected " + + expectedLength + + " got " + + actualLength); + } + currentTempFile = seedFile; + // Append mode so subsequent writes land after the seeded bytes. + currentFileStream = new FileOutputStream(currentTempFile, true); + currentOutputStream = new BufferedOutputStream(currentFileStream, BUFFER_SIZE); + currentPartSize = expectedLength; } private void createNewTempFile() throws IOException { diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java index 942b1d3e80ba0..37faf1bd1b417 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java @@ -27,7 +27,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -85,14 +88,85 @@ public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverabl public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException { checkNotClosed(); NativeS3Recoverable s3recoverable = castToNativeS3Recoverable(recoverable); - return new NativeS3RecoverableFsDataOutputStream( - s3AccessHelper, - s3recoverable.getObjectName(), - s3recoverable.uploadId(), - localTmpDir, - userDefinedMinPartSize, - s3recoverable.parts(), - s3recoverable.numBytesInParts()); + + File seedTail = null; + long seedTailLength = 0L; + if (s3recoverable.incompleteObjectName() != null) { + seedTail = downloadIncompleteTail(s3recoverable); + seedTailLength = s3recoverable.incompleteObjectLength(); + } + + try { + LOG.debug( + "Resuming stream - key: {}, uploadId: {}, parts: {}, bytesInParts: {}, seedTail: {} ({} bytes)", + s3recoverable.getObjectName(), + s3recoverable.uploadId(), + s3recoverable.parts().size(), + s3recoverable.numBytesInParts(), + s3recoverable.incompleteObjectName(), + seedTailLength); + return new NativeS3RecoverableFsDataOutputStream( + s3AccessHelper, + s3recoverable.getObjectName(), + s3recoverable.uploadId(), + localTmpDir, + userDefinedMinPartSize, + s3recoverable.parts(), + s3recoverable.numBytesInParts(), + seedTail, + seedTailLength); + } catch (Throwable t) { + // The downloaded seed file is owned by recover() until the constructor takes + // ownership. If construction fails, drop the local file so we don't leak it. + if (seedTail != null) { + try { + Files.deleteIfExists(seedTail.toPath()); + } catch (IOException cleanup) { + t.addSuppressed(cleanup); + } + } + throw t; + } + } + + /** + * Downloads the side object holding the previously-persisted sub-part-size tail into a fresh + * file under {@link #localTmpDir}. The side object itself is left in place so that a repeated + * recovery from the same checkpoint remains correct; cleanup is the responsibility of {@link + * #cleanupRecoverableState(ResumeRecoverable)} which Flink invokes when the checkpoint is + * retired. + */ + private File downloadIncompleteTail(NativeS3Recoverable s3recoverable) throws IOException { + File tmpDir = new File(localTmpDir); + if (!tmpDir.exists() && !tmpDir.mkdirs()) { + throw new IOException("Cannot create local tmp dir: " + localTmpDir); + } + File target = new File(tmpDir, "s3-resume-" + UUID.randomUUID()); + try { + long downloaded = + s3AccessHelper.getObject(s3recoverable.incompleteObjectName(), target); + if (downloaded != s3recoverable.incompleteObjectLength()) { + throw new IOException( + "Incomplete-tail object " + + s3recoverable.incompleteObjectName() + + " has unexpected length: expected " + + s3recoverable.incompleteObjectLength() + + " got " + + downloaded); + } + return target; + } catch (IOException e) { + try { + Files.deleteIfExists(target.toPath()); + } catch (IOException cleanup) { + LOG.warn( + "Failed to delete partial download {} after error: {}", + target, + cleanup.getMessage()); + e.addSuppressed(cleanup); + } + throw e; + } } @Override diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/InMemoryNativeS3Operations.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/InMemoryNativeS3Operations.java new file mode 100644 index 0000000000000..8e70b13d3f56f --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/InMemoryNativeS3Operations.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native.writer; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * In-memory implementation for {@link NativeS3ObjectOperations}. + * + *

Backs every reachable S3 operation with hash maps so writer/committer logic can be exercised + * without an S3 endpoint (no MinIO/Testcontainers required). The parent's {@code S3Client} / {@code + * S3TransferManager} constructor arguments are passed as {@code null} because no overridden method + * dereferences them. + * + *

State exposure: the storage maps are exposed as public final fields so tests can + * inspect them, corrupt them, or simulate object loss directly: + * + *

    + *
  • {@link #storedObjects} — keys written via {@link #putObject(String, File)} (e.g. the + * incomplete-tail side objects persisted by {@link NativeS3RecoverableFsDataOutputStream}). + *
  • {@link #committedObjects} — keys finalized via {@link #commitMultiPartUpload}. + *
  • {@link #openMultipartUploads} — uploadId → partNumber → bytes for in-flight MPUs; entries + * are removed on commit or abort. + *
+ * + *

{@link #getObject} reads from both {@link #storedObjects} and {@link + * #committedObjects} so tests can fetch a committed object the same way real S3 would serve it. + * + *

Thread safety: not thread-safe. Use a single thread per instance, matching the + * single-thread invariant of the production {@link NativeS3RecoverableFsDataOutputStream}. + */ +public final class InMemoryNativeS3Operations extends NativeS3ObjectOperations { + + public static final String DEFAULT_BUCKET = "test-bucket"; + + /** Keys written via {@link #putObject(String, File)}. */ + public final Map storedObjects = new HashMap<>(); + + /** Keys finalized via {@link #commitMultiPartUpload}. */ + public final Map committedObjects = new HashMap<>(); + + /** uploadId → partNumber → uploaded bytes for in-flight MPUs. */ + public final Map> openMultipartUploads = new HashMap<>(); + + private final String bucketName; + private final AtomicInteger uploadIdSeq = new AtomicInteger(); + + public InMemoryNativeS3Operations() { + this(DEFAULT_BUCKET); + } + + public InMemoryNativeS3Operations(String bucketName) { + super(/* s3Client */ null, /* transferManager */ null, bucketName, /* useAsync */ false); + this.bucketName = bucketName; + } + + @Override + public String startMultiPartUpload(String key) { + String uploadId = "U" + uploadIdSeq.incrementAndGet(); + openMultipartUploads.put(uploadId, new HashMap<>()); + return uploadId; + } + + @Override + public UploadPartResult uploadPart( + String key, String uploadId, int partNumber, File file, long length) + throws IOException { + Map parts = openMultipartUploads.get(uploadId); + if (parts == null) { + throw new IOException("unknown uploadId: " + uploadId); + } + byte[] data = Files.readAllBytes(file.toPath()); + if (data.length != length) { + throw new IOException( + "part length mismatch: expected " + length + ", got " + data.length); + } + parts.put(partNumber, data); + return new UploadPartResult(partNumber, "etag-" + uploadId + "-" + partNumber); + } + + @Override + public PutObjectResult putObject(String key, File file) throws IOException { + storedObjects.put(key, Files.readAllBytes(file.toPath())); + return new PutObjectResult("etag-" + UUID.randomUUID()); + } + + @Override + public long getObject(String key, File targetLocation) throws IOException { + byte[] data = storedObjects.get(key); + if (data == null) { + data = committedObjects.get(key); + } + if (data == null) { + throw new IOException("not found: " + key); + } + Files.write(targetLocation.toPath(), data); + return data.length; + } + + @Override + public CompleteMultipartUploadResult commitMultiPartUpload( + String key, String uploadId, List parts, long length) + throws IOException { + Map uploaded = openMultipartUploads.remove(uploadId); + if (uploaded == null) { + throw new IOException("unknown uploadId: " + uploadId); + } + List ordered = new ArrayList<>(parts.size()); + for (UploadPartResult p : parts) { + ordered.add(p.getPartNumber()); + } + Collections.sort(ordered); + ByteArrayOutputStream merged = new ByteArrayOutputStream(); + for (int n : ordered) { + byte[] partData = uploaded.get(n); + if (partData == null) { + throw new IOException("missing part " + n + " for uploadId " + uploadId); + } + merged.write(partData); + } + byte[] finalBytes = merged.toByteArray(); + if (finalBytes.length != length) { + throw new IOException( + "committed length mismatch: expected " + length + ", got " + finalBytes.length); + } + committedObjects.put(key, finalBytes); + return new CompleteMultipartUploadResult(bucketName, key, "final-etag-" + uploadId, null); + } + + @Override + public void abortMultiPartUpload(String key, String uploadId) { + openMultipartUploads.remove(uploadId); + } + + @Override + public boolean deleteObject(String key) { + return storedObjects.remove(key) != null || committedObjects.remove(key) != null; + } +} diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriterRecoveryTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriterRecoveryTest.java new file mode 100644 index 0000000000000..3748719efc9dc --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriterRecoveryTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native.writer; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link NativeS3RecoverableWriter#recover}. */ +class NativeS3RecoverableWriterRecoveryTest { + + private static final String BUCKET = InMemoryNativeS3Operations.DEFAULT_BUCKET; + private static final String KEY = "out.txt"; + private static final long MIN_PART_SIZE = 10L; + + @TempDir java.nio.file.Path tmp; + + @Test + void persistThenRecoverPreservesTailBytes() throws Exception { + InMemoryNativeS3Operations s3 = new InMemoryNativeS3Operations(); + NativeS3RecoverableWriter writer1 = + NativeS3RecoverableWriter.writer( + s3, tmp.toString(), MIN_PART_SIZE, /* maxConcurrent */ 1); + + // --- Phase 1: write 15 bytes; minPartSize=10 ⇒ 10B uploaded as part #1, 5B tail in memory. + RecoverableFsDataOutputStream out = writer1.open(new Path("s3://" + BUCKET + "/" + KEY)); + byte[] firstChunk = bytes('A', 10); // becomes part #1 + byte[] tail = bytes('E', 5); // becomes the persisted side object + out.write(firstChunk, 0, firstChunk.length); + out.write(tail, 0, tail.length); + + // --- Phase 2: checkpoint barrier: persist() and round-trip metadata through the + // serializer (this is what Flink does when storing checkpoint state). + RecoverableWriter.ResumeRecoverable r = out.persist(); + byte[] checkpointed = + NativeS3RecoverableSerializer.INSTANCE.serialize((NativeS3Recoverable) r); + + // Sanity: the 5-byte tail is sitting in S3 as a side object right now. + assertThat(s3.storedObjects).hasSize(1); + String sideObjectKey = s3.storedObjects.keySet().iterator().next(); + assertThat(sideObjectKey).startsWith(KEY + "/.incomplete/"); + assertThat(s3.storedObjects.get(sideObjectKey)).containsExactly(tail); + + // --- Phase 3: simulate task crash + restore from checkpoint. We deliberately do NOT + // close `out` because in a real crash the JVM dies; close() would also abort + // the MPU and invalidate the recoverable. + NativeS3Recoverable restored = + NativeS3RecoverableSerializer.INSTANCE.deserialize( + NativeS3RecoverableSerializer.INSTANCE.getVersion(), checkpointed); + assertThat(restored.incompleteObjectName()) + .as("metadata survives the checkpoint round-trip") + .isEqualTo(sideObjectKey); + assertThat(restored.incompleteObjectLength()).isEqualTo(tail.length); + + NativeS3RecoverableWriter writer2 = + NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1); + RecoverableFsDataOutputStream resumed = writer2.recover(restored); + + // --- Phase 4: continue writing 10 more bytes, then commit. + byte[] afterResume = bytes('F', 10); + resumed.write(afterResume, 0, afterResume.length); + RecoverableFsDataOutputStream.Committer committer = resumed.closeForCommit(); + committer.commit(); + + // --- Phase 5: read what landed in S3. + byte[] finalObject = s3.committedObjects.get(KEY); + assertThat(finalObject).isNotNull(); + + // EXPECTED (correct exactly-once): 25 bytes — "AAAAAAAAAA" + "EEEEE" + "FFFFFFFFFF" + ByteArrayOutputStream expected = new ByteArrayOutputStream(); + expected.write(firstChunk); + expected.write(tail); + expected.write(afterResume); + + // ON UN-FIXED CODE this assertion FAILS: + // expected length 25 ("AAAAAAAAAAEEEEEFFFFFFFFFF") + // actual length 20 ("AAAAAAAAAAFFFFFFFFFF") + // The 5 'E' bytes — the tail durably persisted to S3 at checkpoint time — would be gone. + assertThat(finalObject) + .as("recover() must replay the persisted tail before continuing") + .containsExactly(expected.toByteArray()); + + // The side object is intentionally NOT deleted by recover() so that re-recovery from the + // same checkpoint stays correct. Cleanup is owned by cleanupRecoverableState(), which + // Flink invokes once the checkpoint is retired. + assertThat(s3.storedObjects) + .as("side object outlives recover() to support re-recovery from same checkpoint") + .containsKey(sideObjectKey); + assertThat(writer2.cleanupRecoverableState(restored)).isTrue(); + assertThat(s3.storedObjects) + .as("cleanupRecoverableState must delete the side object") + .doesNotContainKey(sideObjectKey); + } + + /** + * Pre-existing happy path: a recoverable with NO incomplete tail (parts only) must still work. + */ + @Test + void recoverWithoutIncompleteTailStillWorks() throws Exception { + InMemoryNativeS3Operations s3 = new InMemoryNativeS3Operations(); + NativeS3RecoverableWriter writer1 = + NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1); + + // Write exactly 20 bytes => two full parts, currentPartSize=0, no side object on persist. + RecoverableFsDataOutputStream out = writer1.open(new Path("s3://" + BUCKET + "/" + KEY)); + out.write(bytes('A', 10), 0, 10); + out.write(bytes('B', 10), 0, 10); + RecoverableWriter.ResumeRecoverable r = out.persist(); + assertThat(((NativeS3Recoverable) r).incompleteObjectName()) + .as("no tail => no side object") + .isNull(); + assertThat(s3.storedObjects).isEmpty(); + + // Resume and append another 10 bytes, commit. + NativeS3RecoverableWriter writer2 = + NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1); + RecoverableFsDataOutputStream resumed = writer2.recover(r); + resumed.write(bytes('C', 10), 0, 10); + resumed.closeForCommit().commit(); + + assertThat(s3.committedObjects.get(KEY)) + .containsExactly(concat(bytes('A', 10), bytes('B', 10), bytes('C', 10))); + } + + /** If the side object is gone from S3, recover() must fail cleanly and clean up local state. */ + @Test + void recoverFailsCleanlyWhenSideObjectMissing() throws Exception { + InMemoryNativeS3Operations s3 = new InMemoryNativeS3Operations(); + NativeS3RecoverableWriter writer1 = + NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1); + RecoverableFsDataOutputStream out = writer1.open(new Path("s3://" + BUCKET + "/" + KEY)); + out.write(bytes('A', 10), 0, 10); + out.write(bytes('E', 5), 0, 5); + NativeS3Recoverable r = (NativeS3Recoverable) out.persist(); + String sideObjectKey = r.incompleteObjectName(); + assertThat(sideObjectKey).isNotNull(); + + s3.storedObjects.remove(sideObjectKey); + long localFilesBefore = countLocalFilesIn(tmp); + NativeS3RecoverableWriter writer2 = + NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1); + + assertThatThrownBy(() -> writer2.recover(r)) + .isInstanceOf(IOException.class) + .hasMessageContaining("not found"); + + assertThat(countLocalFilesIn(tmp)) + .as("partial download must be cleaned up on failure") + .isEqualTo(localFilesBefore); + } + + /** + * If the side object is the wrong length, recover() must fail and clean up the partial file. + */ + @Test + void recoverFailsCleanlyOnLengthMismatch() throws Exception { + InMemoryNativeS3Operations s3 = new InMemoryNativeS3Operations(); + NativeS3RecoverableWriter writer1 = + NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1); + RecoverableFsDataOutputStream out = writer1.open(new Path("s3://" + BUCKET + "/" + KEY)); + out.write(bytes('A', 10), 0, 10); + out.write(bytes('E', 5), 0, 5); + NativeS3Recoverable r = (NativeS3Recoverable) out.persist(); + String sideObjectKey = r.incompleteObjectName(); + + // Corrupt the side object so its actual length disagrees with the metadata. + s3.storedObjects.put(sideObjectKey, bytes('X', 99)); + + long localFilesBefore = countLocalFilesIn(tmp); + NativeS3RecoverableWriter writer2 = + NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1); + + assertThatThrownBy(() -> writer2.recover(r)) + .isInstanceOf(IOException.class) + .hasMessageContaining("unexpected length"); + + assertThat(countLocalFilesIn(tmp)) + .as("partial download must be cleaned up on failure") + .isEqualTo(localFilesBefore); + } + + /** + * persist → recover → persist → recover chain. Models multiple checkpoint cycles, each one + * recovering from the previous. Every persisted byte must end up in the committed object. + */ + @Test + void recoverThenPersistThenRecoverPreservesAllBytes() throws Exception { + InMemoryNativeS3Operations s3 = new InMemoryNativeS3Operations(); + + // Cycle 1: write 12 bytes (10 -> part, 2 -> tail), persist. + NativeS3RecoverableWriter w1 = + NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1); + RecoverableFsDataOutputStream s1 = w1.open(new Path("s3://" + BUCKET + "/" + KEY)); + s1.write(bytes('A', 10), 0, 10); + s1.write(bytes('B', 2), 0, 2); + NativeS3Recoverable r1 = (NativeS3Recoverable) s1.persist(); + String firstSideObject = r1.incompleteObjectName(); + assertThat(firstSideObject).isNotNull(); + + // Cycle 2: recover from r1, write 3 bytes => total tail now 2+3=5 (still < minPartSize), + // persist again => new side object containing the combined 5-byte tail. + NativeS3RecoverableWriter w2 = + NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1); + RecoverableFsDataOutputStream s2 = w2.recover(r1); + s2.write(bytes('C', 3), 0, 3); + NativeS3Recoverable r2 = (NativeS3Recoverable) s2.persist(); + String secondSideObject = r2.incompleteObjectName(); + assertThat(secondSideObject).isNotEqualTo(firstSideObject); + assertThat(s3.storedObjects.get(secondSideObject)) + .as("second side object must contain old tail + new bytes") + .containsExactly(concat(bytes('B', 2), bytes('C', 3))); + + // Cycle 3: recover from r2, write 7 more bytes => tail becomes 5+7=12 >= minPartSize => + // upload. + // Then commit. + NativeS3RecoverableWriter w3 = + NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1); + RecoverableFsDataOutputStream s3stream = w3.recover(r2); + s3stream.write(bytes('D', 7), 0, 7); + s3stream.closeForCommit().commit(); + + // Final object must be A*10 + B*2 + C*3 + D*7 = 22 bytes. + assertThat(s3.committedObjects.get(KEY)) + .containsExactly( + concat(bytes('A', 10), bytes('B', 2), bytes('C', 3), bytes('D', 7))); + + // Both old side objects survive until cleanupRecoverableState is called per checkpoint. + assertThat(s3.storedObjects).containsKeys(firstSideObject, secondSideObject); + assertThat(w3.cleanupRecoverableState(r1)).isTrue(); + assertThat(w3.cleanupRecoverableState(r2)).isTrue(); + assertThat(s3.storedObjects).doesNotContainKeys(firstSideObject, secondSideObject); + } + + private static long countLocalFilesIn(java.nio.file.Path dir) throws IOException { + if (!java.nio.file.Files.isDirectory(dir)) { + return 0; + } + try (java.util.stream.Stream s = java.nio.file.Files.list(dir)) { + return s.count(); + } + } + + private static byte[] bytes(char c, int n) { + byte[] b = new byte[n]; + Arrays.fill(b, (byte) c); + return b; + } + + private static byte[] concat(byte[]... chunks) { + int total = 0; + for (byte[] c : chunks) { + total += c.length; + } + byte[] out = new byte[total]; + int off = 0; + for (byte[] c : chunks) { + System.arraycopy(c, 0, out, off, c.length); + off += c.length; + } + return out; + } +} From 606ffa40bef9b9c71602e08ede9da0b7b1b252e2 Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Wed, 3 Jun 2026 23:48:11 +0530 Subject: [PATCH 2/2] Address to review comments --- ...NativeS3RecoverableFsDataOutputStream.java | 24 +++++------ .../writer/NativeS3RecoverableWriter.java | 43 +++++++++++-------- .../writer/InMemoryNativeS3Operations.java | 4 +- 3 files changed, 39 insertions(+), 32 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java index fb39f861a81f0..b23baec578877 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java @@ -116,8 +116,8 @@ public NativeS3RecoverableFsDataOutputStream( long minPartSize, List existingParts, long numBytesInParts, - @Nullable File seedTailFile, - long seedTailLength) + @Nullable File incompleteTailFile, + long incompleteTailLength) throws IOException { this.s3AccessHelper = s3AccessHelper; this.key = key; @@ -130,29 +130,29 @@ public NativeS3RecoverableFsDataOutputStream( this.currentPartSize = 0; this.closed = false; - if (seedTailFile != null) { - adoptSeedTailFile(seedTailFile, seedTailLength); + if (incompleteTailFile != null) { + resumeFromIncompleteTail(incompleteTailFile, incompleteTailLength); } else { createNewTempFile(); } } - private void adoptSeedTailFile(File seedFile, long expectedLength) throws IOException { - if (!seedFile.exists()) { - throw new IOException("Seed tail file does not exist: " + seedFile); + private void resumeFromIncompleteTail(File tailFile, long expectedLength) throws IOException { + if (!tailFile.exists()) { + throw new IOException("Incomplete-tail file does not exist: " + tailFile); } - long actualLength = seedFile.length(); + final long actualLength = tailFile.length(); if (actualLength != expectedLength) { throw new IOException( - "Seed tail file " - + seedFile + "Incomplete-tail file " + + tailFile + " has unexpected length: expected " + expectedLength + " got " + actualLength); } - currentTempFile = seedFile; - // Append mode so subsequent writes land after the seeded bytes. + currentTempFile = tailFile; + // Append mode so subsequent writes land after the recovered bytes. currentFileStream = new FileOutputStream(currentTempFile, true); currentOutputStream = new BufferedOutputStream(currentFileStream, BUFFER_SIZE); currentPartSize = expectedLength; diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java index 37faf1bd1b417..cfd67f2e5d5f2 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java @@ -87,24 +87,24 @@ public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverabl @Override public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException { checkNotClosed(); - NativeS3Recoverable s3recoverable = castToNativeS3Recoverable(recoverable); + final NativeS3Recoverable s3recoverable = castToNativeS3Recoverable(recoverable); - File seedTail = null; - long seedTailLength = 0L; + File incompleteTail = null; + long incompleteTailLength = 0L; if (s3recoverable.incompleteObjectName() != null) { - seedTail = downloadIncompleteTail(s3recoverable); - seedTailLength = s3recoverable.incompleteObjectLength(); + incompleteTail = downloadIncompleteTail(s3recoverable); + incompleteTailLength = s3recoverable.incompleteObjectLength(); } try { LOG.debug( - "Resuming stream - key: {}, uploadId: {}, parts: {}, bytesInParts: {}, seedTail: {} ({} bytes)", + "Resuming stream - key: {}, uploadId: {}, parts: {}, bytesInParts: {}, incompleteTail: {} ({} bytes)", s3recoverable.getObjectName(), s3recoverable.uploadId(), s3recoverable.parts().size(), s3recoverable.numBytesInParts(), s3recoverable.incompleteObjectName(), - seedTailLength); + incompleteTailLength); return new NativeS3RecoverableFsDataOutputStream( s3AccessHelper, s3recoverable.getObjectName(), @@ -113,14 +113,14 @@ public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) thro userDefinedMinPartSize, s3recoverable.parts(), s3recoverable.numBytesInParts(), - seedTail, - seedTailLength); + incompleteTail, + incompleteTailLength); } catch (Throwable t) { - // The downloaded seed file is owned by recover() until the constructor takes + // The downloaded tail file is owned by recover() until the constructor takes // ownership. If construction fails, drop the local file so we don't leak it. - if (seedTail != null) { + if (incompleteTail != null) { try { - Files.deleteIfExists(seedTail.toPath()); + Files.deleteIfExists(incompleteTail.toPath()); } catch (IOException cleanup) { t.addSuppressed(cleanup); } @@ -137,22 +137,29 @@ public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) thro * retired. */ private File downloadIncompleteTail(NativeS3Recoverable s3recoverable) throws IOException { - File tmpDir = new File(localTmpDir); + final File tmpDir = new File(localTmpDir); if (!tmpDir.exists() && !tmpDir.mkdirs()) { throw new IOException("Cannot create local tmp dir: " + localTmpDir); } - File target = new File(tmpDir, "s3-resume-" + UUID.randomUUID()); + final File target = new File(tmpDir, "s3-resume-" + UUID.randomUUID()); try { - long downloaded = + final long downloaded = s3AccessHelper.getObject(s3recoverable.incompleteObjectName(), target); if (downloaded != s3recoverable.incompleteObjectLength()) { throw new IOException( "Incomplete-tail object " + s3recoverable.incompleteObjectName() - + " has unexpected length: expected " + + " has unexpected length (expected " + s3recoverable.incompleteObjectLength() - + " got " - + downloaded); + + " bytes, got " + + downloaded + + " bytes). The side object holding the in-flight tail " + + "has been truncated, overwritten, or replaced out-of-band " + + "since the checkpoint was taken. Recovery cannot proceed: " + + "the writer state is inconsistent with the checkpoint and " + + "this failure is NOT retriable from the same checkpoint. " + + "Either restore the side object to its original length or " + + "roll back to an earlier checkpoint that does not reference it."); } return target; } catch (IOException e) { diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/InMemoryNativeS3Operations.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/InMemoryNativeS3Operations.java index 8e70b13d3f56f..39bfe6a4d5cd5 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/InMemoryNativeS3Operations.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/InMemoryNativeS3Operations.java @@ -27,7 +27,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; /** @@ -70,6 +69,7 @@ public final class InMemoryNativeS3Operations extends NativeS3ObjectOperations { private final String bucketName; private final AtomicInteger uploadIdSeq = new AtomicInteger(); + private final AtomicInteger putObjectSeq = new AtomicInteger(); public InMemoryNativeS3Operations() { this(DEFAULT_BUCKET); @@ -107,7 +107,7 @@ public UploadPartResult uploadPart( @Override public PutObjectResult putObject(String key, File file) throws IOException { storedObjects.put(key, Files.readAllBytes(file.toPath())); - return new PutObjectResult("etag-" + UUID.randomUUID()); + return new PutObjectResult("etag-put-" + putObjectSeq.incrementAndGet()); } @Override