diff --git a/io/src/main/java/software/amazon/smithy/java/io/ByteBufferOutputStream.java b/io/src/main/java/software/amazon/smithy/java/io/ByteBufferOutputStream.java
index d23de9911..187ff6869 100644
--- a/io/src/main/java/software/amazon/smithy/java/io/ByteBufferOutputStream.java
+++ b/io/src/main/java/software/amazon/smithy/java/io/ByteBufferOutputStream.java
@@ -68,6 +68,35 @@ public int size() {
return count;
}
+ /**
+ * Returns the internal buffer array.
+ *
+ *
This provides direct access to avoid copying. The valid data is from index 0 to {@link #size()} - 1.
+ * The buffer may be larger than the valid data. Do not hold onto a reference to this data.
+ *
+ * @return the internal buffer
+ */
+ public byte[] array() {
+ return buf;
+ }
+
+ /**
+ * Writes an ASCII string directly to the buffer.
+ * Each character is cast to a byte (assumes ASCII/Latin-1 input).
+ *
+ * @param s the string to write
+ */
+ @SuppressWarnings("deprecation")
+ public void writeAscii(String s) {
+ int len = s.length();
+ if (len == 0) {
+ return;
+ }
+ ensureCapacity(count + len);
+ s.getBytes(0, len, buf, count);
+ count += len;
+ }
+
/**
* Resets the stream to empty, allowing buffer reuse.
* The internal buffer is retained, avoiding reallocation.
diff --git a/io/src/main/java/software/amazon/smithy/java/io/ByteBufferUtils.java b/io/src/main/java/software/amazon/smithy/java/io/ByteBufferUtils.java
index 1e237e9be..9b0be82e7 100644
--- a/io/src/main/java/software/amazon/smithy/java/io/ByteBufferUtils.java
+++ b/io/src/main/java/software/amazon/smithy/java/io/ByteBufferUtils.java
@@ -5,7 +5,9 @@
package software.amazon.smithy.java.io;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
@@ -75,5 +77,16 @@ public int read(byte[] bytes, int off, int len) {
b.get(bytes, off, len);
return len;
}
+
+ @Override
+ public long transferTo(OutputStream out) throws IOException {
+ // Skip buffering used in the default implementation.
+ int remaining = b.remaining();
+ if (remaining > 0 && b.hasArray()) {
+ out.write(b.array(), b.arrayOffset() + b.position(), remaining);
+ b.position(b.limit());
+ }
+ return remaining;
+ }
}
}
diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStream.java
index 0bd2b9647..ddb621311 100644
--- a/io/src/main/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStream.java
+++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStream.java
@@ -5,7 +5,9 @@
package software.amazon.smithy.java.io.datastream;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;
@@ -43,7 +45,14 @@ public ByteBuffer asByteBuffer() {
@Override
public InputStream asInputStream() {
- return ByteBufferUtils.byteBufferInputStream(buffer);
+ // Use duplicate() to avoid mutating the original buffer's position,
+ // allowing the DataStream to be replayed (isReplayable() returns true)
+ return ByteBufferUtils.byteBufferInputStream(buffer.duplicate());
+ }
+
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
}
@Override
diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/DataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/DataStream.java
index 5e180d43c..e824c6d9b 100644
--- a/io/src/main/java/software/amazon/smithy/java/io/datastream/DataStream.java
+++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/DataStream.java
@@ -7,6 +7,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
@@ -84,6 +85,22 @@ default boolean hasKnownLength() {
*/
InputStream asInputStream();
+ /**
+ * Write the contents of this stream to the given output stream.
+ *
+ *
This is the preferred way to transfer data from a DataStream to an OutputStream.
+ * Implementations may override this to avoid intermediate InputStream allocation
+ * (e.g., writing directly from a byte array or ByteBuffer).
+ *
+ * @param out the output stream to write to
+ * @throws IOException if an I/O error occurs
+ */
+ default void writeTo(OutputStream out) throws IOException {
+ try (var is = asInputStream()) {
+ is.transferTo(out);
+ }
+ }
+
/**
* Read the contents of the stream into a ByteBuffer by reading all bytes from {@link #asInputStream()}.
*
diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/EmptyDataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/EmptyDataStream.java
index 47766564e..1e1b53fcc 100644
--- a/io/src/main/java/software/amazon/smithy/java/io/datastream/EmptyDataStream.java
+++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/EmptyDataStream.java
@@ -6,6 +6,7 @@
package software.amazon.smithy.java.io.datastream;
import java.io.InputStream;
+import java.io.OutputStream;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;
@@ -26,6 +27,11 @@ public InputStream asInputStream() {
return InputStream.nullInputStream();
}
+ @Override
+ public void writeTo(OutputStream out) {
+ // No-op
+ }
+
@Override
public boolean isReplayable() {
return true;
diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/InputStreamDataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/InputStreamDataStream.java
index d6619912a..971485b8b 100644
--- a/io/src/main/java/software/amazon/smithy/java/io/datastream/InputStreamDataStream.java
+++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/InputStreamDataStream.java
@@ -7,6 +7,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.UncheckedIOException;
final class InputStreamDataStream implements DataStream {
@@ -32,6 +33,11 @@ public InputStream asInputStream() {
return inputStream;
}
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ asInputStream().transferTo(out);
+ }
+
@Override
public boolean isReplayable() {
return false;
diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/PublisherDataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/PublisherDataStream.java
index 92a34f670..50264adcf 100644
--- a/io/src/main/java/software/amazon/smithy/java/io/datastream/PublisherDataStream.java
+++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/PublisherDataStream.java
@@ -5,10 +5,14 @@
package software.amazon.smithy.java.io.datastream;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicReference;
final class PublisherDataStream implements DataStream {
@@ -67,18 +71,89 @@ public InputStream asInputStream() {
consumed = true;
var subscriber = HttpResponse.BodySubscribers.ofInputStream();
- var delegate = new HttpBodySubscriberAdapter<>(subscriber);
- subscribe(delegate);
+ innerSubscribe(new HttpBodySubscriberAdapter<>(subscriber));
return subscriber.getBody().toCompletableFuture().join();
}
@Override
- public void subscribe(Flow.Subscriber super ByteBuffer> subscriber) {
+ public void writeTo(OutputStream out) throws IOException {
if (!isReplayable && consumed) {
throw new IllegalStateException("DataStream is not replayable and has already been consumed");
}
+ consumed = true;
+
+ var error = new AtomicReference();
+ var done = new CountDownLatch(1);
+
+ publisher.subscribe(new Flow.Subscriber<>() {
+ Flow.Subscription subscription;
+ boolean cancelled;
+
+ @Override
+ public void onSubscribe(Flow.Subscription s) {
+ this.subscription = s;
+ s.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(ByteBuffer buf) {
+ if (cancelled) {
+ return;
+ }
+ try {
+ if (buf.hasArray()) {
+ out.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+ } else {
+ byte[] tmp = new byte[buf.remaining()];
+ buf.get(tmp);
+ out.write(tmp);
+ }
+ } catch (IOException e) {
+ cancelled = true;
+ error.set(e);
+ subscription.cancel();
+ done.countDown();
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ error.set(t);
+ done.countDown();
+ }
+
+ @Override
+ public void onComplete() {
+ done.countDown();
+ }
+ });
+
+ try {
+ done.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while writing publisher data", e);
+ }
+ Throwable t = error.get();
+ if (t instanceof IOException ioe) {
+ throw ioe;
+ } else if (t != null) {
+ throw new IOException("Publisher error", t);
+ }
+ }
+
+ private void innerSubscribe(Flow.Subscriber super ByteBuffer> subscriber) {
consumed = true;
publisher.subscribe(subscriber);
}
+
+ @Override
+ public void subscribe(Flow.Subscriber super ByteBuffer> subscriber) {
+ if (!isReplayable && consumed) {
+ throw new IllegalStateException("DataStream is not replayable and has already been consumed");
+ }
+
+ innerSubscribe(subscriber);
+ }
}
diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/WrappedDataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/WrappedDataStream.java
index b2646d4b6..73ad10209 100644
--- a/io/src/main/java/software/amazon/smithy/java/io/datastream/WrappedDataStream.java
+++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/WrappedDataStream.java
@@ -5,7 +5,9 @@
package software.amazon.smithy.java.io.datastream;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;
@@ -33,6 +35,11 @@ public InputStream asInputStream() {
return delegate.asInputStream();
}
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ delegate.writeTo(out);
+ }
+
@Override
public long contentLength() {
return contentLength;
diff --git a/io/src/test/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStreamTest.java b/io/src/test/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStreamTest.java
index 9341c97e6..1f9a07732 100644
--- a/io/src/test/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStreamTest.java
+++ b/io/src/test/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStreamTest.java
@@ -8,7 +8,10 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;
@@ -31,4 +34,29 @@ public void isAlwaysAvailable() {
ds.asByteBuffer();
assertThat(ds.isAvailable(), is(true));
}
+
+ @Test
+ public void writeTo() throws IOException {
+ var data = "hello world".getBytes(StandardCharsets.UTF_8);
+ var ds = DataStream.ofBytes(data);
+ var out = new ByteArrayOutputStream();
+
+ ds.writeTo(out);
+
+ assertArrayEquals(data, out.toByteArray());
+ }
+
+ @Test
+ public void writeToIsReplayable() throws IOException {
+ var data = "replay".getBytes(StandardCharsets.UTF_8);
+ var ds = DataStream.ofBytes(data);
+
+ var out1 = new ByteArrayOutputStream();
+ ds.writeTo(out1);
+ var out2 = new ByteArrayOutputStream();
+ ds.writeTo(out2);
+
+ assertArrayEquals(data, out1.toByteArray());
+ assertArrayEquals(data, out2.toByteArray());
+ }
}
diff --git a/io/src/test/java/software/amazon/smithy/java/io/datastream/InputStreamDataStreamTest.java b/io/src/test/java/software/amazon/smithy/java/io/datastream/InputStreamDataStreamTest.java
index d509715d4..e37823e0f 100644
--- a/io/src/test/java/software/amazon/smithy/java/io/datastream/InputStreamDataStreamTest.java
+++ b/io/src/test/java/software/amazon/smithy/java/io/datastream/InputStreamDataStreamTest.java
@@ -9,7 +9,12 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -87,4 +92,23 @@ public void isNotAvailableAfterConsumption() throws Exception {
ds.asInputStream();
assertThat(ds.isAvailable(), is(false));
}
+
+ @Test
+ public void writeTo() throws IOException {
+ var data = "from input stream".getBytes(StandardCharsets.UTF_8);
+ var ds = DataStream.ofInputStream(new ByteArrayInputStream(data));
+ var out = new ByteArrayOutputStream();
+
+ ds.writeTo(out);
+
+ assertArrayEquals(data, out.toByteArray());
+ }
+
+ @Test
+ public void writeToNotReplayable() throws IOException {
+ var ds = DataStream.ofInputStream(new ByteArrayInputStream(new byte[] {1, 2, 3}));
+ ds.writeTo(new ByteArrayOutputStream());
+
+ assertThrows(IllegalStateException.class, () -> ds.writeTo(new ByteArrayOutputStream()));
+ }
}
diff --git a/io/src/test/java/software/amazon/smithy/java/io/datastream/PublisherDataStreamTest.java b/io/src/test/java/software/amazon/smithy/java/io/datastream/PublisherDataStreamTest.java
new file mode 100644
index 000000000..7e76db460
--- /dev/null
+++ b/io/src/test/java/software/amazon/smithy/java/io/datastream/PublisherDataStreamTest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package software.amazon.smithy.java.io.datastream;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.SubmissionPublisher;
+import org.junit.jupiter.api.Test;
+
+class PublisherDataStreamTest {
+
+ @Test
+ void writeTo() throws IOException {
+ var chunk1 = "hello ".getBytes(StandardCharsets.UTF_8);
+ var chunk2 = "world".getBytes(StandardCharsets.UTF_8);
+
+ var publisher = new SubmissionPublisher();
+ var ds = DataStream.ofPublisher(publisher, null, -1);
+ var out = new ByteArrayOutputStream();
+
+ // Run writeTo on a virtual thread so it subscribes to the publisher
+ // before items are submitted, avoiding a race where close() fires
+ // before the subscription is established.
+ var writeThread = Thread.startVirtualThread(() -> {
+ try {
+ ds.writeTo(out);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Wait for writeTo's subscriber to be registered.
+ while (publisher.getNumberOfSubscribers() < 1) {
+ Thread.onSpinWait();
+ }
+
+ publisher.submit(ByteBuffer.wrap(chunk1));
+ publisher.submit(ByteBuffer.wrap(chunk2));
+ publisher.close();
+
+ try {
+ writeThread.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ assertArrayEquals("hello world".getBytes(StandardCharsets.UTF_8), out.toByteArray());
+ }
+
+ @Test
+ void writeToNotReplayable() throws IOException {
+ var publisher = new SubmissionPublisher();
+ var ds = DataStream.ofPublisher(publisher, null, -1);
+
+ Thread.startVirtualThread(publisher::close);
+ ds.writeTo(new ByteArrayOutputStream());
+
+ assertThrows(IllegalStateException.class, () -> ds.writeTo(new ByteArrayOutputStream()));
+ }
+
+ @Test
+ void writeToEmpty() throws IOException {
+ var publisher = new SubmissionPublisher();
+ var ds = DataStream.ofPublisher(publisher, null, 0);
+ var out = new ByteArrayOutputStream();
+
+ Thread.startVirtualThread(publisher::close);
+ ds.writeTo(out);
+
+ assertEquals(0, out.size());
+ }
+}
diff --git a/io/src/test/java/software/amazon/smithy/java/io/datastream/WrappedDataStreamTest.java b/io/src/test/java/software/amazon/smithy/java/io/datastream/WrappedDataStreamTest.java
index c99e027e1..1d7044365 100644
--- a/io/src/test/java/software/amazon/smithy/java/io/datastream/WrappedDataStreamTest.java
+++ b/io/src/test/java/software/amazon/smithy/java/io/datastream/WrappedDataStreamTest.java
@@ -8,8 +8,11 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;
@@ -33,4 +36,16 @@ public void delegatesIsAvailableToUnderlyingStream() {
ds.asInputStream();
assertThat(wrapped.isAvailable(), is(false));
}
+
+ @Test
+ public void writeToDelegates() throws IOException {
+ var data = "wrapped".getBytes(StandardCharsets.UTF_8);
+ var inner = DataStream.ofBytes(data);
+ var ds = DataStream.withMetadata(inner, "text/plain", (long) data.length, true);
+ var out = new ByteArrayOutputStream();
+
+ ds.writeTo(out);
+
+ assertArrayEquals(data, out.toByteArray());
+ }
}