Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,35 @@ public int size() {
return count;
}

/**
* Returns the internal buffer array.
*
* <p>This provides direct access to avoid copying. The valid data is from index 0 to {@link #size()} - 1.
* The buffer may be larger than the valid data. Do not hold onto a reference to this data.
*
* @return the internal buffer
*/
public byte[] array() {
return buf;
}

/**
* Writes an ASCII string directly to the buffer.
* Each character is cast to a byte (assumes ASCII/Latin-1 input).
*
* @param s the string to write
*/
@SuppressWarnings("deprecation")
public void writeAscii(String s) {
int len = s.length();
if (len == 0) {
return;
}
ensureCapacity(count + len);
s.getBytes(0, len, buf, count);
count += len;
}

/**
* Resets the stream to empty, allowing buffer reuse.
* The internal buffer is retained, avoiding reallocation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package software.amazon.smithy.java.io;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
Expand Down Expand Up @@ -75,5 +77,16 @@ public int read(byte[] bytes, int off, int len) {
b.get(bytes, off, len);
return len;
}

@Override
public long transferTo(OutputStream out) throws IOException {
// Skip buffering used in the default implementation.
int remaining = b.remaining();
if (remaining > 0 && b.hasArray()) {
out.write(b.array(), b.arrayOffset() + b.position(), remaining);
b.position(b.limit());
}
return remaining;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package software.amazon.smithy.java.io.datastream;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;
Expand Down Expand Up @@ -43,7 +45,14 @@ public ByteBuffer asByteBuffer() {

@Override
public InputStream asInputStream() {
return ByteBufferUtils.byteBufferInputStream(buffer);
// Use duplicate() to avoid mutating the original buffer's position,
// allowing the DataStream to be replayed (isReplayable() returns true)
return ByteBufferUtils.byteBufferInputStream(buffer.duplicate());
}

@Override
public void writeTo(OutputStream out) throws IOException {
out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -84,6 +85,22 @@ default boolean hasKnownLength() {
*/
InputStream asInputStream();

/**
* Write the contents of this stream to the given output stream.
*
* <p>This is the preferred way to transfer data from a DataStream to an OutputStream.
* Implementations may override this to avoid intermediate InputStream allocation
* (e.g., writing directly from a byte array or ByteBuffer).
*
* @param out the output stream to write to
* @throws IOException if an I/O error occurs
*/
default void writeTo(OutputStream out) throws IOException {
try (var is = asInputStream()) {
is.transferTo(out);
}
}

/**
* Read the contents of the stream into a ByteBuffer by reading all bytes from {@link #asInputStream()}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package software.amazon.smithy.java.io.datastream;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;
Expand All @@ -26,6 +27,11 @@ public InputStream asInputStream() {
return InputStream.nullInputStream();
}

@Override
public void writeTo(OutputStream out) {
// No-op
}

@Override
public boolean isReplayable() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

final class InputStreamDataStream implements DataStream {
Expand All @@ -32,6 +33,11 @@ public InputStream asInputStream() {
return inputStream;
}

@Override
public void writeTo(OutputStream out) throws IOException {
asInputStream().transferTo(out);
}

@Override
public boolean isReplayable() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@

package software.amazon.smithy.java.io.datastream;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

final class PublisherDataStream implements DataStream {

Expand Down Expand Up @@ -67,18 +71,89 @@ public InputStream asInputStream() {

consumed = true;
var subscriber = HttpResponse.BodySubscribers.ofInputStream();
var delegate = new HttpBodySubscriberAdapter<>(subscriber);
subscribe(delegate);
innerSubscribe(new HttpBodySubscriberAdapter<>(subscriber));
return subscriber.getBody().toCompletableFuture().join();
}

@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
public void writeTo(OutputStream out) throws IOException {
if (!isReplayable && consumed) {
throw new IllegalStateException("DataStream is not replayable and has already been consumed");
}
consumed = true;

var error = new AtomicReference<Throwable>();
var done = new CountDownLatch(1);

publisher.subscribe(new Flow.Subscriber<>() {
Flow.Subscription subscription;
boolean cancelled;

@Override
public void onSubscribe(Flow.Subscription s) {
this.subscription = s;
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(ByteBuffer buf) {
if (cancelled) {
return;
}
try {
if (buf.hasArray()) {
out.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
} else {
byte[] tmp = new byte[buf.remaining()];
buf.get(tmp);
out.write(tmp);
}
} catch (IOException e) {
cancelled = true;
error.set(e);
subscription.cancel();
done.countDown();
}
}

@Override
public void onError(Throwable t) {
error.set(t);
done.countDown();
}

@Override
public void onComplete() {
done.countDown();
}
});

try {
done.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while writing publisher data", e);
}

Throwable t = error.get();
if (t instanceof IOException ioe) {
throw ioe;
} else if (t != null) {
throw new IOException("Publisher error", t);
}
}

private void innerSubscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
consumed = true;
publisher.subscribe(subscriber);
}

@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
if (!isReplayable && consumed) {
throw new IllegalStateException("DataStream is not replayable and has already been consumed");
}

innerSubscribe(subscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package software.amazon.smithy.java.io.datastream;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;

Expand Down Expand Up @@ -33,6 +35,11 @@ public InputStream asInputStream() {
return delegate.asInputStream();
}

@Override
public void writeTo(OutputStream out) throws IOException {
delegate.writeTo(out);
}

@Override
public long contentLength() {
return contentLength;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;
Expand All @@ -31,4 +34,29 @@ public void isAlwaysAvailable() {
ds.asByteBuffer();
assertThat(ds.isAvailable(), is(true));
}

@Test
public void writeTo() throws IOException {
var data = "hello world".getBytes(StandardCharsets.UTF_8);
var ds = DataStream.ofBytes(data);
var out = new ByteArrayOutputStream();

ds.writeTo(out);

assertArrayEquals(data, out.toByteArray());
}

@Test
public void writeToIsReplayable() throws IOException {
var data = "replay".getBytes(StandardCharsets.UTF_8);
var ds = DataStream.ofBytes(data);

var out1 = new ByteArrayOutputStream();
ds.writeTo(out1);
var out2 = new ByteArrayOutputStream();
ds.writeTo(out2);

assertArrayEquals(data, out1.toByteArray());
assertArrayEquals(data, out2.toByteArray());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
Expand Down Expand Up @@ -87,4 +92,23 @@ public void isNotAvailableAfterConsumption() throws Exception {
ds.asInputStream();
assertThat(ds.isAvailable(), is(false));
}

@Test
public void writeTo() throws IOException {
var data = "from input stream".getBytes(StandardCharsets.UTF_8);
var ds = DataStream.ofInputStream(new ByteArrayInputStream(data));
var out = new ByteArrayOutputStream();

ds.writeTo(out);

assertArrayEquals(data, out.toByteArray());
}

@Test
public void writeToNotReplayable() throws IOException {
var ds = DataStream.ofInputStream(new ByteArrayInputStream(new byte[] {1, 2, 3}));
ds.writeTo(new ByteArrayOutputStream());

assertThrows(IllegalStateException.class, () -> ds.writeTo(new ByteArrayOutputStream()));
}
}
Loading
Loading