diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java
index 56e7729112607..b23baec578877 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java
@@ -25,6 +25,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.BufferedOutputStream;
@@ -95,6 +96,39 @@ public NativeS3RecoverableFsDataOutputStream(
             List existingParts,
             long numBytesInParts)
             throws IOException {
+        this(
+                s3AccessHelper,
+                key,
+                uploadId,
+                localTmpDir,
+                minPartSize,
+                existingParts,
+                numBytesInParts,
+                null,
+                0L);
+    }
+
+    /**
+     * Creates a stream that optionally resumes the current part from a local tail file.
+     *
+     * @param incompleteTailFile local file with the bytes of the unfinished part, or {@code null}
+     *     to start the current part from a fresh temp file
+     * @param incompleteTailLength expected length of {@code incompleteTailFile} in bytes; ignored
+     *     when no tail file is given
+     * @throws IOException if the tail file is missing, has a different length, or the local
+     *     file for the current part cannot be opened
+     */
+    public NativeS3RecoverableFsDataOutputStream(
+            NativeS3ObjectOperations s3AccessHelper,
+            String key,
+            String uploadId,
+            String localTmpDir,
+            long minPartSize,
+            List existingParts,
+            long numBytesInParts,
+            @Nullable File incompleteTailFile,
+            long incompleteTailLength)
+            throws IOException {
         this.s3AccessHelper = s3AccessHelper;
         this.key = key;
         this.uploadId = uploadId;
@@ -106,7 +140,40 @@ public NativeS3RecoverableFsDataOutputStream(
         this.currentPartSize = 0;
         this.closed = false;
 
-        createNewTempFile();
+        if (incompleteTailFile != null) {
+            resumeFromIncompleteTail(incompleteTailFile, incompleteTailLength);
+        } else {
+            createNewTempFile();
+        }
+    }
+
+    /**
+     * Adopts {@code tailFile} as the current part's temp file and reopens it in append mode.
+     *
+     * @param tailFile local file holding the recovered tail bytes
+     * @param expectedLength length the file must have; becomes the initial current part size
+     * @throws IOException if the file does not exist, its length differs from {@code
+     *     expectedLength}, or it cannot be opened
+     */
+    private void resumeFromIncompleteTail(File tailFile, long expectedLength) throws IOException {
+        if (!tailFile.exists()) {
+            throw new IOException("Incomplete-tail file does not exist: " + tailFile);
+        }
+        final long actualLength = tailFile.length();
+        if (actualLength != expectedLength) {
+            throw new IOException(
+                    "Incomplete-tail file "
+                            + tailFile
+                            + " has unexpected length: expected "
+                            + expectedLength
+                            + " got "
+                            + actualLength);
+        }
+        currentTempFile = tailFile;
+        // Append mode so subsequent writes land after the recovered bytes.
+        currentFileStream = new FileOutputStream(currentTempFile, true);
+        currentOutputStream = new BufferedOutputStream(currentFileStream, BUFFER_SIZE);
+        currentPartSize = expectedLength;
+    }
 
     private void createNewTempFile() throws IOException {
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java
index 942b1d3e80ba0..cfd67f2e5d5f2 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java
@@ -27,7 +27,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -84,15 +87,99 @@ public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverabl
     @Override
     public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
         checkNotClosed();
-        NativeS3Recoverable s3recoverable = castToNativeS3Recoverable(recoverable);
-        return new NativeS3RecoverableFsDataOutputStream(
-                s3AccessHelper,
-                s3recoverable.getObjectName(),
-                s3recoverable.uploadId(),
-                localTmpDir,
-                userDefinedMinPartSize,
-                s3recoverable.parts(),
-                s3recoverable.numBytesInParts());
+        final NativeS3Recoverable s3recoverable = castToNativeS3Recoverable(recoverable);
+
+        File incompleteTail = null;
+        long incompleteTailLength = 0L;
+        if (s3recoverable.incompleteObjectName() != null) {
+            incompleteTail = downloadIncompleteTail(s3recoverable);
+            incompleteTailLength = s3recoverable.incompleteObjectLength();
+        }
+
+        try {
+            LOG.debug(
+                    "Resuming stream - key: {}, uploadId: {}, parts: {}, bytesInParts: {}, incompleteTail: {} ({} bytes)",
+                    s3recoverable.getObjectName(),
+                    s3recoverable.uploadId(),
+                    s3recoverable.parts().size(),
+                    s3recoverable.numBytesInParts(),
+                    s3recoverable.incompleteObjectName(),
+                    incompleteTailLength);
+            return new NativeS3RecoverableFsDataOutputStream(
+                    s3AccessHelper,
+                    s3recoverable.getObjectName(),
+                    s3recoverable.uploadId(),
+                    localTmpDir,
+                    userDefinedMinPartSize,
+                    s3recoverable.parts(),
+                    s3recoverable.numBytesInParts(),
+                    incompleteTail,
+                    incompleteTailLength);
+        } catch (Throwable t) {
+            // The downloaded tail file is owned by recover() until the constructor takes
+            // ownership. If construction fails, drop the local file so we don't leak it.
+            if (incompleteTail != null) {
+                try {
+                    Files.deleteIfExists(incompleteTail.toPath());
+                } catch (IOException cleanup) {
+                    t.addSuppressed(cleanup);
+                }
+            }
+            throw t;
+        }
+    }
+
+    /**
+     * Downloads the side object holding the previously-persisted sub-part-size tail into a fresh
+     * file under {@link #localTmpDir}. The side object itself is left in place so that a repeated
+     * recovery from the same checkpoint remains correct; cleanup is the responsibility of {@link
+     * #cleanupRecoverableState(ResumeRecoverable)} which Flink invokes when the checkpoint is
+     * retired.
+     *
+     * @param s3recoverable recoverable naming the side object and its expected length
+     * @return the local file containing the downloaded tail
+     * @throws IOException if the tmp dir cannot be created, the download fails, or the number of
+     *     downloaded bytes differs from the expected length
+     */
+    private File downloadIncompleteTail(NativeS3Recoverable s3recoverable) throws IOException {
+        final File tmpDir = new File(localTmpDir);
+        if (!tmpDir.exists() && !tmpDir.mkdirs()) {
+            throw new IOException("Cannot create local tmp dir: " + localTmpDir);
+        }
+        final File target = new File(tmpDir, "s3-resume-" + UUID.randomUUID());
+        try {
+            final long downloaded =
+                    s3AccessHelper.getObject(s3recoverable.incompleteObjectName(), target);
+            if (downloaded != s3recoverable.incompleteObjectLength()) {
+                throw new IOException(
+                        "Incomplete-tail object "
+                                + s3recoverable.incompleteObjectName()
+                                + " has unexpected length (expected "
+                                + s3recoverable.incompleteObjectLength()
+                                + " bytes, got "
+                                + downloaded
+                                + " bytes). The side object holding the in-flight tail "
+                                + "has been truncated, overwritten, or replaced out-of-band "
+                                + "since the checkpoint was taken. Recovery cannot proceed: "
+                                + "the writer state is inconsistent with the checkpoint and "
+                                + "this failure is NOT retriable from the same checkpoint. "
+                                + "Either restore the side object to its original length or "
+                                + "roll back to an earlier checkpoint that does not reference it.");
+            }
+            return target;
+        } catch (Throwable e) {
+            // Also covers unchecked failures so a partial download is never left behind.
+            try {
+                Files.deleteIfExists(target.toPath());
+            } catch (IOException cleanup) {
+                LOG.warn(
+                        "Failed to delete partial download {} after error: {}",
+                        target,
+                        cleanup.getMessage());
+                e.addSuppressed(cleanup);
+            }
+            throw e;
+        }
+    }
 
     @Override
diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/InMemoryNativeS3Operations.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/InMemoryNativeS3Operations.java
new file mode 100644
index 0000000000000..39bfe6a4d5cd5
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/InMemoryNativeS3Operations.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.writer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * In-memory implementation for {@link NativeS3ObjectOperations}.
+ *
+ * <p>Backs every reachable S3 operation with hash maps so writer/committer logic can be exercised
+ * without an S3 endpoint (no MinIO/Testcontainers required). The parent's {@code S3Client} / {@code
+ * S3TransferManager} constructor arguments are passed as {@code null} because no overridden method
+ * dereferences them.
+ *
+ * <p><b>State exposure:</b> the storage maps are exposed as public final fields so tests can
+ * inspect them, corrupt them, or simulate object loss directly:
+ *
+ * <ul>
+ *   <li>{@link #storedObjects}: objects written via {@link #putObject(String, File)}
+ *   <li>{@link #committedObjects}: objects finalized via {@link #commitMultiPartUpload}; these
+ *       are the merged results of completed multipart uploads
+ *   <li>{@link #openMultipartUploads}: uploaded part bytes of in-flight multipart uploads,
+ *       keyed by upload id and then by part number
+ * </ul>
+ *
+ * <p>{@link #getObject} reads from both {@link #storedObjects} and {@link
+ * #committedObjects} so tests can fetch a committed object the same way real S3 would serve it.
+ *
+ * <p><b>Thread safety:</b> not thread-safe. Use a single thread per instance, matching the
+ * single-thread invariant of the production {@link NativeS3RecoverableFsDataOutputStream}.
+ */
+public final class InMemoryNativeS3Operations extends NativeS3ObjectOperations {
+
+    public static final String DEFAULT_BUCKET = "test-bucket";
+
+    /** Keys written via {@link #putObject(String, File)}. */
+    public final Map<String, byte[]> storedObjects = new HashMap<>();
+
+    /** Keys finalized via {@link #commitMultiPartUpload}. */
+    public final Map<String, byte[]> committedObjects = new HashMap<>();
+
+    /** uploadId → partNumber → uploaded bytes for in-flight MPUs. */
+    public final Map<String, Map<Integer, byte[]>> openMultipartUploads = new HashMap<>();
+
+    private final String bucketName;
+    private final AtomicInteger uploadIdSeq = new AtomicInteger();
+    private final AtomicInteger putObjectSeq = new AtomicInteger();
+
+    public InMemoryNativeS3Operations() {
+        this(DEFAULT_BUCKET);
+    }
+
+    public InMemoryNativeS3Operations(String bucketName) {
+        super(/* s3Client */ null, /* transferManager */ null, bucketName, /* useAsync */ false);
+        this.bucketName = bucketName;
+    }
+
+    @Override
+    public String startMultiPartUpload(String key) {
+        String uploadId = "U" + uploadIdSeq.incrementAndGet();
+        openMultipartUploads.put(uploadId, new HashMap<>());
+        return uploadId;
+    }
+
+    @Override
+    public UploadPartResult uploadPart(
+            String key, String uploadId, int partNumber, File file, long length)
+            throws IOException {
+        Map<Integer, byte[]> parts = openMultipartUploads.get(uploadId);
+        if (parts == null) {
+            throw new IOException("unknown uploadId: " + uploadId);
+        }
+        byte[] data = Files.readAllBytes(file.toPath());
+        if (data.length != length) {
+            throw new IOException(
+                    "part length mismatch: expected " + length + ", got " + data.length);
+        }
+        parts.put(partNumber, data);
+        return new UploadPartResult(partNumber, "etag-" + uploadId + "-" + partNumber);
+    }
+
+    @Override
+    public PutObjectResult putObject(String key, File file) throws IOException {
+        storedObjects.put(key, Files.readAllBytes(file.toPath()));
+        return new PutObjectResult("etag-put-" + putObjectSeq.incrementAndGet());
+    }
+
+    @Override
+    public long getObject(String key, File targetLocation) throws IOException {
+        byte[] data = storedObjects.get(key);
+        if (data == null) {
+            data = committedObjects.get(key);
+        }
+        if (data == null) {
+            throw new IOException("not found: " + key);
+        }
+        Files.write(targetLocation.toPath(), data);
+        return data.length;
+    }
+
+    @Override
+    public CompleteMultipartUploadResult commitMultiPartUpload(
+            String key, String uploadId, List<UploadPartResult> parts, long length)
+            throws IOException {
+        Map<Integer, byte[]> uploaded = openMultipartUploads.remove(uploadId);
+        if (uploaded == null) {
+            throw new IOException("unknown uploadId: " + uploadId);
+        }
+        List<Integer> ordered = new ArrayList<>(parts.size());
+        for (UploadPartResult p : parts) {
+            ordered.add(p.getPartNumber());
+        }
+        Collections.sort(ordered);
+        ByteArrayOutputStream merged = new ByteArrayOutputStream();
+        for (int n : ordered) {
+            byte[] partData = uploaded.get(n);
+            if (partData == null) {
+                throw new IOException("missing part " + n + " for uploadId " + uploadId);
+            }
+            merged.write(partData);
+        }
+        byte[] finalBytes = merged.toByteArray();
+        if (finalBytes.length != length) {
+            throw new IOException(
+                    "committed length mismatch: expected " + length + ", got " + finalBytes.length);
+        }
+        committedObjects.put(key, finalBytes);
+        return new CompleteMultipartUploadResult(bucketName, key, "final-etag-" + uploadId, null);
+    }
+
+    @Override
+    public void abortMultiPartUpload(String key, String uploadId) {
+        openMultipartUploads.remove(uploadId);
+    }
+
+    @Override
+    public boolean deleteObject(String key) {
+        return storedObjects.remove(key) != null || committedObjects.remove(key) != null;
+    }
+}
diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriterRecoveryTest.java
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriterRecoveryTest.java
new file mode 100644
index 0000000000000..3748719efc9dc
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriterRecoveryTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.writer;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link NativeS3RecoverableWriter#recover}. */
+class NativeS3RecoverableWriterRecoveryTest {
+
+    private static final String BUCKET = InMemoryNativeS3Operations.DEFAULT_BUCKET;
+    private static final String KEY = "out.txt";
+    private static final long MIN_PART_SIZE = 10L;
+
+    @TempDir java.nio.file.Path tmp;
+
+    /**
+     * Persists a sub-part-size tail, recovers from the serialized recoverable, and verifies that
+     * the committed object contains the uploaded part, the tail, and the bytes written afterwards.
+     */
+    @Test
+    void persistThenRecoverPreservesTailBytes() throws Exception {
+        InMemoryNativeS3Operations s3 = new InMemoryNativeS3Operations();
+        NativeS3RecoverableWriter writer1 =
+                NativeS3RecoverableWriter.writer(
+                        s3, tmp.toString(), MIN_PART_SIZE, /* maxConcurrent */ 1);
+
+        // --- Phase 1: write 15 bytes; minPartSize=10 ⇒ 10B uploaded as part #1, 5B tail in memory.
+        RecoverableFsDataOutputStream out = writer1.open(new Path("s3://" + BUCKET + "/" + KEY));
+        byte[] firstChunk = bytes('A', 10); // becomes part #1
+        byte[] tail = bytes('E', 5); // becomes the persisted side object
+        out.write(firstChunk, 0, firstChunk.length);
+        out.write(tail, 0, tail.length);
+
+        // --- Phase 2: checkpoint barrier: persist() and round-trip metadata through the
+        //     serializer (this is what Flink does when storing checkpoint state).
+        RecoverableWriter.ResumeRecoverable r = out.persist();
+        byte[] checkpointed =
+                NativeS3RecoverableSerializer.INSTANCE.serialize((NativeS3Recoverable) r);
+
+        // Sanity: the 5-byte tail is sitting in S3 as a side object right now.
+        assertThat(s3.storedObjects).hasSize(1);
+        String sideObjectKey = s3.storedObjects.keySet().iterator().next();
+        assertThat(sideObjectKey).startsWith(KEY + "/.incomplete/");
+        assertThat(s3.storedObjects.get(sideObjectKey)).containsExactly(tail);
+
+        // --- Phase 3: simulate task crash + restore from checkpoint. We deliberately do NOT
+        //     close `out` because in a real crash the JVM dies; close() would also abort
+        //     the MPU and invalidate the recoverable.
+        NativeS3Recoverable restored =
+                NativeS3RecoverableSerializer.INSTANCE.deserialize(
+                        NativeS3RecoverableSerializer.INSTANCE.getVersion(), checkpointed);
+        assertThat(restored.incompleteObjectName())
+                .as("metadata survives the checkpoint round-trip")
+                .isEqualTo(sideObjectKey);
+        assertThat(restored.incompleteObjectLength()).isEqualTo(tail.length);
+
+        NativeS3RecoverableWriter writer2 =
+                NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1);
+        RecoverableFsDataOutputStream resumed = writer2.recover(restored);
+
+        // --- Phase 4: continue writing 10 more bytes, then commit.
+        byte[] afterResume = bytes('F', 10);
+        resumed.write(afterResume, 0, afterResume.length);
+        RecoverableFsDataOutputStream.Committer committer = resumed.closeForCommit();
+        committer.commit();
+
+        // --- Phase 5: read what landed in S3.
+        byte[] finalObject = s3.committedObjects.get(KEY);
+        assertThat(finalObject).isNotNull();
+
+        // EXPECTED (correct exactly-once): 25 bytes — "AAAAAAAAAA" + "EEEEE" + "FFFFFFFFFF"
+        ByteArrayOutputStream expected = new ByteArrayOutputStream();
+        expected.write(firstChunk);
+        expected.write(tail);
+        expected.write(afterResume);
+
+        // ON UN-FIXED CODE this assertion FAILS:
+        //   expected length 25 ("AAAAAAAAAAEEEEEFFFFFFFFFF")
+        //   actual length 20 ("AAAAAAAAAAFFFFFFFFFF")
+        // The 5 'E' bytes — the tail durably persisted to S3 at checkpoint time — would be gone.
+        assertThat(finalObject)
+                .as("recover() must replay the persisted tail before continuing")
+                .containsExactly(expected.toByteArray());
+
+        // The side object is intentionally NOT deleted by recover() so that re-recovery from the
+        // same checkpoint stays correct. Cleanup is owned by cleanupRecoverableState(), which
+        // Flink invokes once the checkpoint is retired.
+        assertThat(s3.storedObjects)
+                .as("side object outlives recover() to support re-recovery from same checkpoint")
+                .containsKey(sideObjectKey);
+        assertThat(writer2.cleanupRecoverableState(restored)).isTrue();
+        assertThat(s3.storedObjects)
+                .as("cleanupRecoverableState must delete the side object")
+                .doesNotContainKey(sideObjectKey);
+    }
+
+    /**
+     * Pre-existing happy path: a recoverable with NO incomplete tail (parts only) must still work.
+     */
+    @Test
+    void recoverWithoutIncompleteTailStillWorks() throws Exception {
+        InMemoryNativeS3Operations s3 = new InMemoryNativeS3Operations();
+        NativeS3RecoverableWriter writer1 =
+                NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1);
+
+        // Write exactly 20 bytes => two full parts, currentPartSize=0, no side object on persist.
+        RecoverableFsDataOutputStream out = writer1.open(new Path("s3://" + BUCKET + "/" + KEY));
+        out.write(bytes('A', 10), 0, 10);
+        out.write(bytes('B', 10), 0, 10);
+        RecoverableWriter.ResumeRecoverable r = out.persist();
+        assertThat(((NativeS3Recoverable) r).incompleteObjectName())
+                .as("no tail => no side object")
+                .isNull();
+        assertThat(s3.storedObjects).isEmpty();
+
+        // Resume and append another 10 bytes, commit.
+        NativeS3RecoverableWriter writer2 =
+                NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1);
+        RecoverableFsDataOutputStream resumed = writer2.recover(r);
+        resumed.write(bytes('C', 10), 0, 10);
+        resumed.closeForCommit().commit();
+
+        assertThat(s3.committedObjects.get(KEY))
+                .containsExactly(concat(bytes('A', 10), bytes('B', 10), bytes('C', 10)));
+    }
+
+    /** If the side object is gone from S3, recover() must fail cleanly and clean up local state. */
+    @Test
+    void recoverFailsCleanlyWhenSideObjectMissing() throws Exception {
+        InMemoryNativeS3Operations s3 = new InMemoryNativeS3Operations();
+        NativeS3RecoverableWriter writer1 =
+                NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1);
+        RecoverableFsDataOutputStream out = writer1.open(new Path("s3://" + BUCKET + "/" + KEY));
+        out.write(bytes('A', 10), 0, 10);
+        out.write(bytes('E', 5), 0, 5);
+        NativeS3Recoverable r = (NativeS3Recoverable) out.persist();
+        String sideObjectKey = r.incompleteObjectName();
+        assertThat(sideObjectKey).isNotNull();
+
+        s3.storedObjects.remove(sideObjectKey);
+        long localFilesBefore = countLocalFilesIn(tmp);
+        NativeS3RecoverableWriter writer2 =
+                NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1);
+
+        assertThatThrownBy(() -> writer2.recover(r))
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("not found");
+
+        assertThat(countLocalFilesIn(tmp))
+                .as("partial download must be cleaned up on failure")
+                .isEqualTo(localFilesBefore);
+    }
+
+    /**
+     * If the side object is the wrong length, recover() must fail and clean up the partial file.
+     */
+    @Test
+    void recoverFailsCleanlyOnLengthMismatch() throws Exception {
+        InMemoryNativeS3Operations s3 = new InMemoryNativeS3Operations();
+        NativeS3RecoverableWriter writer1 =
+                NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1);
+        RecoverableFsDataOutputStream out = writer1.open(new Path("s3://" + BUCKET + "/" + KEY));
+        out.write(bytes('A', 10), 0, 10);
+        out.write(bytes('E', 5), 0, 5);
+        NativeS3Recoverable r = (NativeS3Recoverable) out.persist();
+        String sideObjectKey = r.incompleteObjectName();
+
+        // Corrupt the side object so its actual length disagrees with the metadata.
+        s3.storedObjects.put(sideObjectKey, bytes('X', 99));
+
+        long localFilesBefore = countLocalFilesIn(tmp);
+        NativeS3RecoverableWriter writer2 =
+                NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1);
+
+        assertThatThrownBy(() -> writer2.recover(r))
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("unexpected length");
+
+        assertThat(countLocalFilesIn(tmp))
+                .as("partial download must be cleaned up on failure")
+                .isEqualTo(localFilesBefore);
+    }
+
+    /**
+     * persist → recover → persist → recover chain. Models multiple checkpoint cycles, each one
+     * recovering from the previous. Every persisted byte must end up in the committed object.
+     */
+    @Test
+    void recoverThenPersistThenRecoverPreservesAllBytes() throws Exception {
+        InMemoryNativeS3Operations s3 = new InMemoryNativeS3Operations();
+
+        // Cycle 1: write 12 bytes (10 -> part, 2 -> tail), persist.
+        NativeS3RecoverableWriter w1 =
+                NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1);
+        RecoverableFsDataOutputStream s1 = w1.open(new Path("s3://" + BUCKET + "/" + KEY));
+        s1.write(bytes('A', 10), 0, 10);
+        s1.write(bytes('B', 2), 0, 2);
+        NativeS3Recoverable r1 = (NativeS3Recoverable) s1.persist();
+        String firstSideObject = r1.incompleteObjectName();
+        assertThat(firstSideObject).isNotNull();
+
+        // Cycle 2: recover from r1, write 3 bytes => total tail now 2+3=5 (still < minPartSize),
+        // persist again => new side object containing the combined 5-byte tail.
+        NativeS3RecoverableWriter w2 =
+                NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1);
+        RecoverableFsDataOutputStream s2 = w2.recover(r1);
+        s2.write(bytes('C', 3), 0, 3);
+        NativeS3Recoverable r2 = (NativeS3Recoverable) s2.persist();
+        String secondSideObject = r2.incompleteObjectName();
+        assertThat(secondSideObject).isNotEqualTo(firstSideObject);
+        assertThat(s3.storedObjects.get(secondSideObject))
+                .as("second side object must contain old tail + new bytes")
+                .containsExactly(concat(bytes('B', 2), bytes('C', 3)));
+
+        // Cycle 3: recover from r2, write 7 more bytes => tail becomes 5+7=12 >= minPartSize =>
+        // upload.
+        // Then commit.
+        NativeS3RecoverableWriter w3 =
+                NativeS3RecoverableWriter.writer(s3, tmp.toString(), MIN_PART_SIZE, 1);
+        RecoverableFsDataOutputStream s3stream = w3.recover(r2);
+        s3stream.write(bytes('D', 7), 0, 7);
+        s3stream.closeForCommit().commit();
+
+        // Final object must be A*10 + B*2 + C*3 + D*7 = 22 bytes.
+        assertThat(s3.committedObjects.get(KEY))
+                .containsExactly(
+                        concat(bytes('A', 10), bytes('B', 2), bytes('C', 3), bytes('D', 7)));
+
+        // Both old side objects survive until cleanupRecoverableState is called per checkpoint.
+        assertThat(s3.storedObjects).containsKeys(firstSideObject, secondSideObject);
+        assertThat(w3.cleanupRecoverableState(r1)).isTrue();
+        assertThat(w3.cleanupRecoverableState(r2)).isTrue();
+        assertThat(s3.storedObjects).doesNotContainKeys(firstSideObject, secondSideObject);
+    }
+
+    /** Counts the direct entries of {@code dir}; returns 0 if it is not a directory. */
+    private static long countLocalFilesIn(java.nio.file.Path dir) throws IOException {
+        if (!java.nio.file.Files.isDirectory(dir)) {
+            return 0;
+        }
+        try (java.util.stream.Stream<java.nio.file.Path> s = java.nio.file.Files.list(dir)) {
+            return s.count();
+        }
+    }
+
+    /** Returns an array of {@code n} bytes, each set to {@code (byte) c}. */
+    private static byte[] bytes(char c, int n) {
+        byte[] b = new byte[n];
+        Arrays.fill(b, (byte) c);
+        return b;
+    }
+
+    /** Concatenates the given arrays in argument order. */
+    private static byte[] concat(byte[]... chunks) {
+        int total = 0;
+        for (byte[] c : chunks) {
+            total += c.length;
+        }
+        byte[] out = new byte[total];
+        int off = 0;
+        for (byte[] c : chunks) {
+            System.arraycopy(c, 0, out, off, c.length);
+            off += c.length;
+        }
+        return out;
+    }
+}