From 6f8a066f9ea1257f80156c9c93333a20847b2989 Mon Sep 17 00:00:00 2001 From: Michael Dowling Date: Fri, 13 Mar 2026 14:41:30 -0500 Subject: [PATCH] Add DataStream.writeTo() and IO utilities Add writeTo(OutputStream) to DataStream interface with optimized overrides in all implementations: - ByteBufferDataStream: writes directly from backing array - InputStreamDataStream: delegates to transferTo - PublisherDataStream: subscribes and writes buffers directly - EmptyDataStream: no-op - WrappedDataStream: delegates to inner stream Also: - Fix ByteBufferDataStream.asInputStream() to use buffer.duplicate() so the stream remains replayable - Add array() and writeAscii(String) to ByteBufferOutputStream - Add a transferTo(OutputStream) override to the ByteBuffer-backed InputStream in ByteBufferUtils that skips intermediate buffering --- .../java/io/ByteBufferOutputStream.java | 29 +++++++ .../smithy/java/io/ByteBufferUtils.java | 13 +++ .../io/datastream/ByteBufferDataStream.java | 11 ++- .../smithy/java/io/datastream/DataStream.java | 17 ++++ .../java/io/datastream/EmptyDataStream.java | 6 ++ .../io/datastream/InputStreamDataStream.java | 6 ++ .../io/datastream/PublisherDataStream.java | 81 ++++++++++++++++++- .../java/io/datastream/WrappedDataStream.java | 7 ++ .../datastream/ByteBufferDataStreamTest.java | 28 +++++++ .../datastream/InputStreamDataStreamTest.java | 24 ++++++ .../datastream/PublisherDataStreamTest.java | 81 +++++++++++++++++++ .../io/datastream/WrappedDataStreamTest.java | 15 ++++ 12 files changed, 314 insertions(+), 4 deletions(-) create mode 100644 io/src/test/java/software/amazon/smithy/java/io/datastream/PublisherDataStreamTest.java diff --git a/io/src/main/java/software/amazon/smithy/java/io/ByteBufferOutputStream.java b/io/src/main/java/software/amazon/smithy/java/io/ByteBufferOutputStream.java index d23de9911..187ff6869 100644 --- a/io/src/main/java/software/amazon/smithy/java/io/ByteBufferOutputStream.java +++ b/io/src/main/java/software/amazon/smithy/java/io/ByteBufferOutputStream.java @@ -68,6 +68,35 @@ public int size() { return count; } + /** + * 
Returns the internal buffer array. + * + *

This provides direct access to avoid copying. The valid data is from index 0 to {@link #size()} - 1. + * The buffer may be larger than the valid data. Do not hold onto a reference to this data. + * + * @return the internal buffer + */ + public byte[] array() { + return buf; + } + + /** + * Writes an ASCII string directly to the buffer. + * Each character is cast to a byte (assumes ASCII/Latin-1 input). + * + * @param s the string to write + */ + @SuppressWarnings("deprecation") + public void writeAscii(String s) { + int len = s.length(); + if (len == 0) { + return; + } + ensureCapacity(count + len); + s.getBytes(0, len, buf, count); + count += len; + } + /** * Resets the stream to empty, allowing buffer reuse. * The internal buffer is retained, avoiding reallocation. diff --git a/io/src/main/java/software/amazon/smithy/java/io/ByteBufferUtils.java b/io/src/main/java/software/amazon/smithy/java/io/ByteBufferUtils.java index 1e237e9be..9b0be82e7 100644 --- a/io/src/main/java/software/amazon/smithy/java/io/ByteBufferUtils.java +++ b/io/src/main/java/software/amazon/smithy/java/io/ByteBufferUtils.java @@ -5,7 +5,9 @@ package software.amazon.smithy.java.io; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Base64; @@ -75,5 +77,16 @@ public int read(byte[] bytes, int off, int len) { b.get(bytes, off, len); return len; } + + @Override + public long transferTo(OutputStream out) throws IOException { + // Skip buffering used in the default implementation. 
+ int remaining = b.remaining(); + if (remaining > 0 && b.hasArray()) { + out.write(b.array(), b.arrayOffset() + b.position(), remaining); + b.position(b.limit()); + } + return remaining; + } } } diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStream.java index 0bd2b9647..ddb621311 100644 --- a/io/src/main/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStream.java +++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStream.java @@ -5,7 +5,9 @@ package software.amazon.smithy.java.io.datastream; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.http.HttpRequest; import java.nio.ByteBuffer; import java.util.concurrent.Flow; @@ -43,7 +45,14 @@ public ByteBuffer asByteBuffer() { @Override public InputStream asInputStream() { - return ByteBufferUtils.byteBufferInputStream(buffer); + // Use duplicate() to avoid mutating the original buffer's position, + // allowing the DataStream to be replayed (isReplayable() returns true) + return ByteBufferUtils.byteBufferInputStream(buffer.duplicate()); + } + + @Override + public void writeTo(OutputStream out) throws IOException { + out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); } @Override diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/DataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/DataStream.java index 5e180d43c..e824c6d9b 100644 --- a/io/src/main/java/software/amazon/smithy/java/io/datastream/DataStream.java +++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/DataStream.java @@ -7,6 +7,7 @@ import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.UncheckedIOException; import java.net.http.HttpRequest; import java.nio.ByteBuffer; @@ -84,6 +85,22 @@ default boolean 
hasKnownLength() { */ InputStream asInputStream(); + /** + * Write the contents of this stream to the given output stream. + * + *

This is the preferred way to transfer data from a DataStream to an OutputStream. + * Implementations may override this to avoid intermediate InputStream allocation + * (e.g., writing directly from a byte array or ByteBuffer). + * + * @param out the output stream to write to + * @throws IOException if an I/O error occurs + */ + default void writeTo(OutputStream out) throws IOException { + try (var is = asInputStream()) { + is.transferTo(out); + } + } + /** * Read the contents of the stream into a ByteBuffer by reading all bytes from {@link #asInputStream()}. * diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/EmptyDataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/EmptyDataStream.java index 47766564e..1e1b53fcc 100644 --- a/io/src/main/java/software/amazon/smithy/java/io/datastream/EmptyDataStream.java +++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/EmptyDataStream.java @@ -6,6 +6,7 @@ package software.amazon.smithy.java.io.datastream; import java.io.InputStream; +import java.io.OutputStream; import java.net.http.HttpRequest; import java.nio.ByteBuffer; import java.util.concurrent.Flow; @@ -26,6 +27,11 @@ public InputStream asInputStream() { return InputStream.nullInputStream(); } + @Override + public void writeTo(OutputStream out) { + // No-op + } + @Override public boolean isReplayable() { return true; diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/InputStreamDataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/InputStreamDataStream.java index d6619912a..971485b8b 100644 --- a/io/src/main/java/software/amazon/smithy/java/io/datastream/InputStreamDataStream.java +++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/InputStreamDataStream.java @@ -7,6 +7,7 @@ import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.UncheckedIOException; final class InputStreamDataStream implements DataStream { @@ 
-32,6 +33,11 @@ public InputStream asInputStream() { return inputStream; } + @Override + public void writeTo(OutputStream out) throws IOException { + asInputStream().transferTo(out); + } + @Override public boolean isReplayable() { return false; diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/PublisherDataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/PublisherDataStream.java index 92a34f670..50264adcf 100644 --- a/io/src/main/java/software/amazon/smithy/java/io/datastream/PublisherDataStream.java +++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/PublisherDataStream.java @@ -5,10 +5,14 @@ package software.amazon.smithy.java.io.datastream; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.http.HttpResponse; import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicReference; final class PublisherDataStream implements DataStream { @@ -67,18 +71,89 @@ public InputStream asInputStream() { consumed = true; var subscriber = HttpResponse.BodySubscribers.ofInputStream(); - var delegate = new HttpBodySubscriberAdapter<>(subscriber); - subscribe(delegate); + innerSubscribe(new HttpBodySubscriberAdapter<>(subscriber)); return subscriber.getBody().toCompletableFuture().join(); } @Override - public void subscribe(Flow.Subscriber subscriber) { + public void writeTo(OutputStream out) throws IOException { if (!isReplayable && consumed) { throw new IllegalStateException("DataStream is not replayable and has already been consumed"); } + consumed = true; + + var error = new AtomicReference(); + var done = new CountDownLatch(1); + + publisher.subscribe(new Flow.Subscriber<>() { + Flow.Subscription subscription; + boolean cancelled; + + @Override + public void onSubscribe(Flow.Subscription s) { + this.subscription = s; + s.request(Long.MAX_VALUE); + } + + @Override + public void 
onNext(ByteBuffer buf) { + if (cancelled) { + return; + } + try { + if (buf.hasArray()) { + out.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()); + } else { + byte[] tmp = new byte[buf.remaining()]; + buf.get(tmp); + out.write(tmp); + } + } catch (IOException e) { + cancelled = true; + error.set(e); + subscription.cancel(); + done.countDown(); + } + } + + @Override + public void onError(Throwable t) { + error.set(t); + done.countDown(); + } + + @Override + public void onComplete() { + done.countDown(); + } + }); + + try { + done.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while writing publisher data", e); + } + Throwable t = error.get(); + if (t instanceof IOException ioe) { + throw ioe; + } else if (t != null) { + throw new IOException("Publisher error", t); + } + } + + private void innerSubscribe(Flow.Subscriber subscriber) { consumed = true; publisher.subscribe(subscriber); } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + if (!isReplayable && consumed) { + throw new IllegalStateException("DataStream is not replayable and has already been consumed"); + } + + innerSubscribe(subscriber); + } } diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/WrappedDataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/WrappedDataStream.java index b2646d4b6..73ad10209 100644 --- a/io/src/main/java/software/amazon/smithy/java/io/datastream/WrappedDataStream.java +++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/WrappedDataStream.java @@ -5,7 +5,9 @@ package software.amazon.smithy.java.io.datastream; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.concurrent.Flow; @@ -33,6 +35,11 @@ public InputStream asInputStream() { return delegate.asInputStream(); } + @Override + public void writeTo(OutputStream out) throws 
IOException { + delegate.writeTo(out); + } + @Override public long contentLength() { return contentLength; diff --git a/io/src/test/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStreamTest.java b/io/src/test/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStreamTest.java index 9341c97e6..1f9a07732 100644 --- a/io/src/test/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStreamTest.java +++ b/io/src/test/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStreamTest.java @@ -8,7 +8,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import org.junit.jupiter.api.Test; @@ -31,4 +34,29 @@ public void isAlwaysAvailable() { ds.asByteBuffer(); assertThat(ds.isAvailable(), is(true)); } + + @Test + public void writeTo() throws IOException { + var data = "hello world".getBytes(StandardCharsets.UTF_8); + var ds = DataStream.ofBytes(data); + var out = new ByteArrayOutputStream(); + + ds.writeTo(out); + + assertArrayEquals(data, out.toByteArray()); + } + + @Test + public void writeToIsReplayable() throws IOException { + var data = "replay".getBytes(StandardCharsets.UTF_8); + var ds = DataStream.ofBytes(data); + + var out1 = new ByteArrayOutputStream(); + ds.writeTo(out1); + var out2 = new ByteArrayOutputStream(); + ds.writeTo(out2); + + assertArrayEquals(data, out1.toByteArray()); + assertArrayEquals(data, out2.toByteArray()); + } } diff --git a/io/src/test/java/software/amazon/smithy/java/io/datastream/InputStreamDataStreamTest.java b/io/src/test/java/software/amazon/smithy/java/io/datastream/InputStreamDataStreamTest.java index d509715d4..e37823e0f 100644 --- 
a/io/src/test/java/software/amazon/smithy/java/io/datastream/InputStreamDataStreamTest.java +++ b/io/src/test/java/software/amazon/smithy/java/io/datastream/InputStreamDataStreamTest.java @@ -9,7 +9,12 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -87,4 +92,23 @@ public void isNotAvailableAfterConsumption() throws Exception { ds.asInputStream(); assertThat(ds.isAvailable(), is(false)); } + + @Test + public void writeTo() throws IOException { + var data = "from input stream".getBytes(StandardCharsets.UTF_8); + var ds = DataStream.ofInputStream(new ByteArrayInputStream(data)); + var out = new ByteArrayOutputStream(); + + ds.writeTo(out); + + assertArrayEquals(data, out.toByteArray()); + } + + @Test + public void writeToNotReplayable() throws IOException { + var ds = DataStream.ofInputStream(new ByteArrayInputStream(new byte[] {1, 2, 3})); + ds.writeTo(new ByteArrayOutputStream()); + + assertThrows(IllegalStateException.class, () -> ds.writeTo(new ByteArrayOutputStream())); + } } diff --git a/io/src/test/java/software/amazon/smithy/java/io/datastream/PublisherDataStreamTest.java b/io/src/test/java/software/amazon/smithy/java/io/datastream/PublisherDataStreamTest.java new file mode 100644 index 000000000..7e76db460 --- /dev/null +++ b/io/src/test/java/software/amazon/smithy/java/io/datastream/PublisherDataStreamTest.java @@ -0,0 +1,81 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.io.datastream; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.SubmissionPublisher; +import org.junit.jupiter.api.Test; + +class PublisherDataStreamTest { + + @Test + void writeTo() throws IOException { + var chunk1 = "hello ".getBytes(StandardCharsets.UTF_8); + var chunk2 = "world".getBytes(StandardCharsets.UTF_8); + + var publisher = new SubmissionPublisher(); + var ds = DataStream.ofPublisher(publisher, null, -1); + var out = new ByteArrayOutputStream(); + + // Run writeTo on a virtual thread so it subscribes to the publisher + // before items are submitted, avoiding a race where close() fires + // before the subscription is established. + var writeThread = Thread.startVirtualThread(() -> { + try { + ds.writeTo(out); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + // Wait for writeTo's subscriber to be registered. 
+ while (publisher.getNumberOfSubscribers() < 1) { + Thread.onSpinWait(); + } + + publisher.submit(ByteBuffer.wrap(chunk1)); + publisher.submit(ByteBuffer.wrap(chunk2)); + publisher.close(); + + try { + writeThread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + assertArrayEquals("hello world".getBytes(StandardCharsets.UTF_8), out.toByteArray()); + } + + @Test + void writeToNotReplayable() throws IOException { + var publisher = new SubmissionPublisher(); + var ds = DataStream.ofPublisher(publisher, null, -1); + + Thread.startVirtualThread(publisher::close); + ds.writeTo(new ByteArrayOutputStream()); + + assertThrows(IllegalStateException.class, () -> ds.writeTo(new ByteArrayOutputStream())); + } + + @Test + void writeToEmpty() throws IOException { + var publisher = new SubmissionPublisher(); + var ds = DataStream.ofPublisher(publisher, null, 0); + var out = new ByteArrayOutputStream(); + + Thread.startVirtualThread(publisher::close); + ds.writeTo(out); + + assertEquals(0, out.size()); + } +} diff --git a/io/src/test/java/software/amazon/smithy/java/io/datastream/WrappedDataStreamTest.java b/io/src/test/java/software/amazon/smithy/java/io/datastream/WrappedDataStreamTest.java index c99e027e1..1d7044365 100644 --- a/io/src/test/java/software/amazon/smithy/java/io/datastream/WrappedDataStreamTest.java +++ b/io/src/test/java/software/amazon/smithy/java/io/datastream/WrappedDataStreamTest.java @@ -8,8 +8,11 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import org.junit.jupiter.api.Test; @@ -33,4 +36,16 @@ public void delegatesIsAvailableToUnderlyingStream() { ds.asInputStream(); 
assertThat(wrapped.isAvailable(), is(false)); } + + @Test + public void writeToDelegates() throws IOException { + var data = "wrapped".getBytes(StandardCharsets.UTF_8); + var inner = DataStream.ofBytes(data); + var ds = DataStream.withMetadata(inner, "text/plain", (long) data.length, true); + var out = new ByteArrayOutputStream(); + + ds.writeTo(out); + + assertArrayEquals(data, out.toByteArray()); + } }