Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import software.amazon.smithy.java.aws.events.AwsEventDecoderFactory;
import software.amazon.smithy.java.aws.events.AwsEventEncoderFactory;
import software.amazon.smithy.java.aws.events.AwsEventFrame;
import software.amazon.smithy.java.aws.events.RpcEventStreamsUtil;
import software.amazon.smithy.java.client.http.AmznErrorHeaderExtractor;
import software.amazon.smithy.java.client.http.HttpClientProtocol;
import software.amazon.smithy.java.client.http.HttpErrorDeserializer;
Expand All @@ -17,6 +21,9 @@
import software.amazon.smithy.java.core.schema.SerializableStruct;
import software.amazon.smithy.java.core.serde.Codec;
import software.amazon.smithy.java.core.serde.TypeRegistry;
import software.amazon.smithy.java.core.serde.event.EventDecoderFactory;
import software.amazon.smithy.java.core.serde.event.EventEncoderFactory;
import software.amazon.smithy.java.core.serde.event.EventStreamingException;
import software.amazon.smithy.java.http.api.HttpHeaders;
import software.amazon.smithy.java.http.api.HttpRequest;
import software.amazon.smithy.java.http.api.HttpResponse;
Expand Down Expand Up @@ -77,14 +84,21 @@ public <I extends SerializableStruct, O extends SerializableStruct> HttpRequest
var builder = HttpRequest.builder();
builder.method("POST");
builder.uri(endpoint);
builder.headers(
HttpHeaders.of(
Map.of(
"x-amz-target",
List.of(target),
"content-type",
List.of(contentType()))));

if (operation.inputEventBuilderSupplier() != null) {
// Event streaming
var encoderFactory = getEventEncoderFactory(operation);
var body = RpcEventStreamsUtil.bodyForEventStreaming(encoderFactory, input);
builder.headers(HttpHeaders.of(headersForEventStreaming(target)))
.body(body);
} else {
builder.headers(
HttpHeaders.of(
Map.of(
"x-amz-target",
List.of(target),
"content-type",
List.of(contentType()))));
}
return builder.body(DataStream.ofByteBuffer(codec.serialize(input), contentType())).build();
}

Expand All @@ -96,11 +110,16 @@ public <I extends SerializableStruct, O extends SerializableStruct> O deserializ
HttpRequest request,
HttpResponse response
) {
// Is it an error?
// TODO Is it an error?
if (response.statusCode() != 200) {
throw errorDeserializer.createError(context, operation, typeRegistry, response);
}

if (operation.outputEventBuilderSupplier() != null) {
var eventDecoderFactory = getEventDecoderFactory(operation);
return RpcEventStreamsUtil.deserializeResponse(eventDecoderFactory, bodyDataStream(response));
}

var builder = operation.outputBuilder();
var content = response.body();

Expand All @@ -112,4 +131,30 @@ public <I extends SerializableStruct, O extends SerializableStruct> O deserializ
var bytes = content.asByteBuffer();
return codec.deserializeShape(bytes, builder);
}

private EventEncoderFactory<AwsEventFrame> getEventEncoderFactory(ApiOperation<?, ?> operation) {
return AwsEventEncoderFactory.forInputStream(operation,
payloadCodec(),
contentType(),
(e) -> new EventStreamingException("InternalServerException", "Internal Server Error"));
}

private EventDecoderFactory<AwsEventFrame> getEventDecoderFactory(ApiOperation<?, ?> operation) {
return AwsEventDecoderFactory.forOutputStream(operation, payloadCodec(), f -> f);
}

private Map<String, List<String>> headersForEventStreaming(String target) {
return Map.of("x-amz-target",
List.of(target),
"content-type",
List.of("application/vnd.amazon.eventstream"),
"Accept",
List.of(contentType()));
}

private static DataStream bodyDataStream(HttpResponse response) {
var contentType = response.headers().contentType();
var contentLength = response.headers().contentLength();
return DataStream.withMetadata(response.body(), contentType, contentLength, null);
}
}