Skip to content

Commit 1de1d73

Browse files
committed
Add DataStream.writeTo() and IO utilities
Add writeTo(OutputStream) to DataStream interface with optimized overrides in all implementations: - ByteBufferDataStream: writes directly from backing array - InputStreamDataStream: delegates to transferTo - PublisherDataStream: subscribes and writes buffers directly - EmptyDataStream: no-op - WrappedDataStream: delegates to inner stream Also: - Fix ByteBufferDataStream.asInputStream() to use buffer.duplicate() so the stream remains replayable - Add ByteBufferOutputStream utility class - Add ByteBufferUtils.wrap(ByteBuffer) helper
1 parent e84a3a4 commit 1de1d73

12 files changed

Lines changed: 314 additions & 4 deletions

io/src/main/java/software/amazon/smithy/java/io/ByteBufferOutputStream.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,35 @@ public int size() {
6868
return count;
6969
}
7070

71+
/**
72+
* Returns the internal buffer array.
73+
*
74+
* <p>This provides direct access to avoid copying. The valid data is from index 0 to {@link #size()} - 1.
75+
* The buffer may be larger than the valid data. Do not hold onto a reference to this data.
76+
*
77+
* @return the internal buffer
78+
*/
79+
public byte[] array() {
80+
return buf;
81+
}
82+
83+
/**
84+
* Writes an ASCII string directly to the buffer.
85+
* Each character is cast to a byte (assumes ASCII/Latin-1 input).
86+
*
87+
* @param s the string to write
88+
*/
89+
@SuppressWarnings("deprecation")
90+
public void writeAscii(String s) {
91+
int len = s.length();
92+
if (len == 0) {
93+
return;
94+
}
95+
ensureCapacity(count + len);
96+
s.getBytes(0, len, buf, count);
97+
count += len;
98+
}
99+
71100
/**
72101
* Resets the stream to empty, allowing buffer reuse.
73102
* The internal buffer is retained, avoiding reallocation.

io/src/main/java/software/amazon/smithy/java/io/ByteBufferUtils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55

66
package software.amazon.smithy.java.io;
77

8+
import java.io.IOException;
89
import java.io.InputStream;
10+
import java.io.OutputStream;
911
import java.nio.ByteBuffer;
1012
import java.nio.charset.StandardCharsets;
1113
import java.util.Base64;
@@ -75,5 +77,16 @@ public int read(byte[] bytes, int off, int len) {
7577
b.get(bytes, off, len);
7678
return len;
7779
}
80+
81+
@Override
82+
public long transferTo(OutputStream out) throws IOException {
83+
// Skip buffering used in the default implementation.
84+
int remaining = b.remaining();
85+
if (remaining > 0 && b.hasArray()) {
86+
out.write(b.array(), b.arrayOffset() + b.position(), remaining);
87+
b.position(b.limit());
88+
}
89+
return remaining;
90+
}
7891
}
7992
}

io/src/main/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStream.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55

66
package software.amazon.smithy.java.io.datastream;
77

8+
import java.io.IOException;
89
import java.io.InputStream;
10+
import java.io.OutputStream;
911
import java.net.http.HttpRequest;
1012
import java.nio.ByteBuffer;
1113
import java.util.concurrent.Flow;
@@ -43,7 +45,14 @@ public ByteBuffer asByteBuffer() {
4345

4446
@Override
4547
public InputStream asInputStream() {
46-
return ByteBufferUtils.byteBufferInputStream(buffer);
48+
// Use duplicate() to avoid mutating the original buffer's position,
49+
// allowing the DataStream to be replayed (isReplayable() returns true)
50+
return ByteBufferUtils.byteBufferInputStream(buffer.duplicate());
51+
}
52+
53+
@Override
54+
public void writeTo(OutputStream out) throws IOException {
55+
out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
4756
}
4857

4958
@Override

io/src/main/java/software/amazon/smithy/java/io/datastream/DataStream.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.io.IOException;
99
import java.io.InputStream;
10+
import java.io.OutputStream;
1011
import java.io.UncheckedIOException;
1112
import java.net.http.HttpRequest;
1213
import java.nio.ByteBuffer;
@@ -84,6 +85,22 @@ default boolean hasKnownLength() {
8485
*/
8586
InputStream asInputStream();
8687

88+
/**
89+
* Write the contents of this stream to the given output stream.
90+
*
91+
* <p>This is the preferred way to transfer data from a DataStream to an OutputStream.
92+
* Implementations may override this to avoid intermediate InputStream allocation
93+
* (e.g., writing directly from a byte array or ByteBuffer).
94+
*
95+
* @param out the output stream to write to
96+
* @throws IOException if an I/O error occurs
97+
*/
98+
default void writeTo(OutputStream out) throws IOException {
99+
try (var is = asInputStream()) {
100+
is.transferTo(out);
101+
}
102+
}
103+
87104
/**
88105
* Read the contents of the stream into a ByteBuffer by reading all bytes from {@link #asInputStream()}.
89106
*

io/src/main/java/software/amazon/smithy/java/io/datastream/EmptyDataStream.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package software.amazon.smithy.java.io.datastream;
77

88
import java.io.InputStream;
9+
import java.io.OutputStream;
910
import java.net.http.HttpRequest;
1011
import java.nio.ByteBuffer;
1112
import java.util.concurrent.Flow;
@@ -26,6 +27,11 @@ public InputStream asInputStream() {
2627
return InputStream.nullInputStream();
2728
}
2829

30+
@Override
31+
public void writeTo(OutputStream out) {
32+
// No-op
33+
}
34+
2935
@Override
3036
public boolean isReplayable() {
3137
return true;

io/src/main/java/software/amazon/smithy/java/io/datastream/InputStreamDataStream.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.io.IOException;
99
import java.io.InputStream;
10+
import java.io.OutputStream;
1011
import java.io.UncheckedIOException;
1112

1213
final class InputStreamDataStream implements DataStream {
@@ -32,6 +33,11 @@ public InputStream asInputStream() {
3233
return inputStream;
3334
}
3435

36+
@Override
37+
public void writeTo(OutputStream out) throws IOException {
38+
asInputStream().transferTo(out);
39+
}
40+
3541
@Override
3642
public boolean isReplayable() {
3743
return false;

io/src/main/java/software/amazon/smithy/java/io/datastream/PublisherDataStream.java

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,14 @@
55

66
package software.amazon.smithy.java.io.datastream;
77

8+
import java.io.IOException;
89
import java.io.InputStream;
10+
import java.io.OutputStream;
911
import java.net.http.HttpResponse;
1012
import java.nio.ByteBuffer;
13+
import java.util.concurrent.CountDownLatch;
1114
import java.util.concurrent.Flow;
15+
import java.util.concurrent.atomic.AtomicReference;
1216

1317
final class PublisherDataStream implements DataStream {
1418

@@ -67,18 +71,89 @@ public InputStream asInputStream() {
6771

6872
consumed = true;
6973
var subscriber = HttpResponse.BodySubscribers.ofInputStream();
70-
var delegate = new HttpBodySubscriberAdapter<>(subscriber);
71-
subscribe(delegate);
74+
innerSubscribe(new HttpBodySubscriberAdapter<>(subscriber));
7275
return subscriber.getBody().toCompletableFuture().join();
7376
}
7477

7578
@Override
76-
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
79+
public void writeTo(OutputStream out) throws IOException {
7780
if (!isReplayable && consumed) {
7881
throw new IllegalStateException("DataStream is not replayable and has already been consumed");
7982
}
83+
consumed = true;
84+
85+
var error = new AtomicReference<Throwable>();
86+
var done = new CountDownLatch(1);
87+
88+
publisher.subscribe(new Flow.Subscriber<>() {
89+
Flow.Subscription subscription;
90+
boolean cancelled;
91+
92+
@Override
93+
public void onSubscribe(Flow.Subscription s) {
94+
this.subscription = s;
95+
s.request(Long.MAX_VALUE);
96+
}
97+
98+
@Override
99+
public void onNext(ByteBuffer buf) {
100+
if (cancelled) {
101+
return;
102+
}
103+
try {
104+
if (buf.hasArray()) {
105+
out.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
106+
} else {
107+
byte[] tmp = new byte[buf.remaining()];
108+
buf.get(tmp);
109+
out.write(tmp);
110+
}
111+
} catch (IOException e) {
112+
cancelled = true;
113+
error.set(e);
114+
subscription.cancel();
115+
done.countDown();
116+
}
117+
}
118+
119+
@Override
120+
public void onError(Throwable t) {
121+
error.set(t);
122+
done.countDown();
123+
}
124+
125+
@Override
126+
public void onComplete() {
127+
done.countDown();
128+
}
129+
});
130+
131+
try {
132+
done.await();
133+
} catch (InterruptedException e) {
134+
Thread.currentThread().interrupt();
135+
throw new IOException("Interrupted while writing publisher data", e);
136+
}
80137

138+
Throwable t = error.get();
139+
if (t instanceof IOException ioe) {
140+
throw ioe;
141+
} else if (t != null) {
142+
throw new IOException("Publisher error", t);
143+
}
144+
}
145+
146+
private void innerSubscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
81147
consumed = true;
82148
publisher.subscribe(subscriber);
83149
}
150+
151+
@Override
152+
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
153+
if (!isReplayable && consumed) {
154+
throw new IllegalStateException("DataStream is not replayable and has already been consumed");
155+
}
156+
157+
innerSubscribe(subscriber);
158+
}
84159
}

io/src/main/java/software/amazon/smithy/java/io/datastream/WrappedDataStream.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55

66
package software.amazon.smithy.java.io.datastream;
77

8+
import java.io.IOException;
89
import java.io.InputStream;
10+
import java.io.OutputStream;
911
import java.nio.ByteBuffer;
1012
import java.util.concurrent.Flow;
1113

@@ -33,6 +35,11 @@ public InputStream asInputStream() {
3335
return delegate.asInputStream();
3436
}
3537

38+
@Override
39+
public void writeTo(OutputStream out) throws IOException {
40+
delegate.writeTo(out);
41+
}
42+
3643
@Override
3744
public long contentLength() {
3845
return contentLength;

io/src/test/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStreamTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88
import static org.hamcrest.MatcherAssert.assertThat;
99
import static org.hamcrest.Matchers.equalTo;
1010
import static org.hamcrest.Matchers.is;
11+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
1112

13+
import java.io.ByteArrayOutputStream;
14+
import java.io.IOException;
1215
import java.nio.ByteBuffer;
1316
import java.nio.charset.StandardCharsets;
1417
import org.junit.jupiter.api.Test;
@@ -31,4 +34,29 @@ public void isAlwaysAvailable() {
3134
ds.asByteBuffer();
3235
assertThat(ds.isAvailable(), is(true));
3336
}
37+
38+
@Test
39+
public void writeTo() throws IOException {
40+
var data = "hello world".getBytes(StandardCharsets.UTF_8);
41+
var ds = DataStream.ofBytes(data);
42+
var out = new ByteArrayOutputStream();
43+
44+
ds.writeTo(out);
45+
46+
assertArrayEquals(data, out.toByteArray());
47+
}
48+
49+
@Test
50+
public void writeToIsReplayable() throws IOException {
51+
var data = "replay".getBytes(StandardCharsets.UTF_8);
52+
var ds = DataStream.ofBytes(data);
53+
54+
var out1 = new ByteArrayOutputStream();
55+
ds.writeTo(out1);
56+
var out2 = new ByteArrayOutputStream();
57+
ds.writeTo(out2);
58+
59+
assertArrayEquals(data, out1.toByteArray());
60+
assertArrayEquals(data, out2.toByteArray());
61+
}
3462
}

io/src/test/java/software/amazon/smithy/java/io/datastream/InputStreamDataStreamTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@
99
import static org.hamcrest.Matchers.equalTo;
1010
import static org.hamcrest.Matchers.is;
1111
import static org.hamcrest.Matchers.nullValue;
12+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
13+
import static org.junit.jupiter.api.Assertions.assertThrows;
1214

15+
import java.io.ByteArrayInputStream;
16+
import java.io.ByteArrayOutputStream;
17+
import java.io.IOException;
1318
import java.nio.ByteBuffer;
1419
import java.nio.charset.StandardCharsets;
1520
import java.nio.file.Files;
@@ -87,4 +92,23 @@ public void isNotAvailableAfterConsumption() throws Exception {
8792
ds.asInputStream();
8893
assertThat(ds.isAvailable(), is(false));
8994
}
95+
96+
@Test
97+
public void writeTo() throws IOException {
98+
var data = "from input stream".getBytes(StandardCharsets.UTF_8);
99+
var ds = DataStream.ofInputStream(new ByteArrayInputStream(data));
100+
var out = new ByteArrayOutputStream();
101+
102+
ds.writeTo(out);
103+
104+
assertArrayEquals(data, out.toByteArray());
105+
}
106+
107+
@Test
108+
public void writeToNotReplayable() throws IOException {
109+
var ds = DataStream.ofInputStream(new ByteArrayInputStream(new byte[] {1, 2, 3}));
110+
ds.writeTo(new ByteArrayOutputStream());
111+
112+
assertThrows(IllegalStateException.class, () -> ds.writeTo(new ByteArrayOutputStream()));
113+
}
90114
}

0 commit comments

Comments
 (0)