diff --git a/aws/client/aws-client-awsjson/src/main/java/software/amazon/smithy/java/aws/client/awsjson/AwsJsonProtocol.java b/aws/client/aws-client-awsjson/src/main/java/software/amazon/smithy/java/aws/client/awsjson/AwsJsonProtocol.java index 9aa980061..fd09e81d3 100644 --- a/aws/client/aws-client-awsjson/src/main/java/software/amazon/smithy/java/aws/client/awsjson/AwsJsonProtocol.java +++ b/aws/client/aws-client-awsjson/src/main/java/software/amazon/smithy/java/aws/client/awsjson/AwsJsonProtocol.java @@ -9,6 +9,10 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import software.amazon.smithy.java.aws.events.AwsEventDecoderFactory; +import software.amazon.smithy.java.aws.events.AwsEventEncoderFactory; +import software.amazon.smithy.java.aws.events.AwsEventFrame; +import software.amazon.smithy.java.aws.events.RpcEventStreamsUtil; import software.amazon.smithy.java.client.http.AmznErrorHeaderExtractor; import software.amazon.smithy.java.client.http.HttpClientProtocol; import software.amazon.smithy.java.client.http.HttpErrorDeserializer; @@ -17,6 +21,9 @@ import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.serde.Codec; import software.amazon.smithy.java.core.serde.TypeRegistry; +import software.amazon.smithy.java.core.serde.event.EventDecoderFactory; +import software.amazon.smithy.java.core.serde.event.EventEncoderFactory; +import software.amazon.smithy.java.core.serde.event.EventStreamingException; import software.amazon.smithy.java.http.api.HttpHeaders; import software.amazon.smithy.java.http.api.HttpRequest; import software.amazon.smithy.java.http.api.HttpResponse; @@ -77,14 +84,21 @@ public HttpRequest var builder = HttpRequest.builder(); builder.method("POST"); builder.uri(endpoint); - builder.headers( - HttpHeaders.of( - Map.of( - "x-amz-target", - List.of(target), - "content-type", - List.of(contentType())))); - + if (operation.inputEventBuilderSupplier() != null) { + // Event streaming + var encoderFactory = getEventEncoderFactory(operation); + var body = RpcEventStreamsUtil.bodyForEventStreaming(encoderFactory, input); + builder.headers(HttpHeaders.of(headersForEventStreaming(target))) + .body(body); + } else { + builder.headers( + HttpHeaders.of( + Map.of( + "x-amz-target", + List.of(target), + "content-type", + List.of(contentType())))); + } return builder.body(DataStream.ofByteBuffer(codec.serialize(input), contentType())).build(); } @@ -96,11 +110,16 @@ public O deserializ HttpRequest request, HttpResponse response ) { - // Is it an error? + // TODO Is it an error? if (response.statusCode() != 200) { throw errorDeserializer.createError(context, operation, typeRegistry, response); } + if (operation.outputEventBuilderSupplier() != null) { + var eventDecoderFactory = getEventDecoderFactory(operation); + return RpcEventStreamsUtil.deserializeResponse(eventDecoderFactory, bodyDataStream(response)); + } + var builder = operation.outputBuilder(); var content = response.body(); @@ -112,4 +131,30 @@ public O deserializ var bytes = content.asByteBuffer(); return codec.deserializeShape(bytes, builder); } + + private EventEncoderFactory getEventEncoderFactory(ApiOperation operation) { + return AwsEventEncoderFactory.forInputStream(operation, + payloadCodec(), + contentType(), + (e) -> new EventStreamingException("InternalServerException", "Internal Server Error")); + } + + private EventDecoderFactory getEventDecoderFactory(ApiOperation operation) { + return AwsEventDecoderFactory.forOutputStream(operation, payloadCodec(), f -> f); + } + + private Map> headersForEventStreaming(String target) { + return Map.of("x-amz-target", + List.of(target), + "content-type", + List.of("application/vnd.amazon.eventstream"), + "Accept", + List.of(contentType())); + } + + private static DataStream bodyDataStream(HttpResponse response) { + var contentType = response.headers().contentType(); + var contentLength = response.headers().contentLength(); + return DataStream.withMetadata(response.body(), contentType, contentLength, null); + } }