|
1 | 1 | package com.example.scalaspringexperiment.util |
2 | 2 |
|
3 | 3 | import io.circe.Json |
4 | | -import io.circe.parser.decode |
| 4 | +import io.circe.parser |
5 | 5 | import org.reactivestreams.Publisher |
6 | 6 | import org.springframework.core.ResolvableType |
7 | 7 | import org.springframework.core.codec.Decoder |
8 | | -import org.springframework.core.io.buffer.DataBuffer |
9 | | -import org.springframework.core.io.buffer.DataBufferUtils |
10 | | -import org.springframework.http.{MediaType, ResponseEntity} |
| 8 | +import org.springframework.core.io.buffer.{DataBuffer, DataBufferUtils} |
| 9 | +import org.springframework.http.MediaType |
11 | 10 | import org.springframework.util.MimeType |
12 | | -import reactor.core.publisher.Mono |
13 | | -import reactor.core.publisher.Flux |
| 11 | +import reactor.core.publisher.{Flux, Mono} |
14 | 12 |
|
15 | 13 | import java.nio.charset.StandardCharsets |
16 | 14 | import java.util |
17 | | -import java.lang.reflect.Type |
18 | 15 |
|
19 | 16 | class CirceJsonDecoder extends Decoder[io.circe.Json] { |
20 | 17 |
|
21 | | - override def canDecode(elementType: ResolvableType, mimeType: MimeType): Boolean = { |
| 18 | + override def canDecode( |
| 19 | + elementType: ResolvableType, |
| 20 | + mimeType: MimeType |
| 21 | + ): Boolean = { |
22 | 22 | mimeType == null || mimeType.isCompatibleWith(MediaType.APPLICATION_JSON) |
23 | 23 | } |
24 | 24 |
|
25 | | - override def decode(input: Publisher[DataBuffer], elementType: ResolvableType, mimeType: MimeType, hints: util.Map[String, AnyRef]): Flux[io.circe.Json] = { |
26 | | - Flux.from(input).flatMap { buffer => |
27 | | - val jsonStr = StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString |
28 | | - DataBufferUtils.release(buffer) |
29 | | - io.circe.parser.decode[io.circe.Json](jsonStr) match { |
30 | | - case Right(json) => Flux.just(json) |
31 | | - case Left(err) => Flux.error(new RuntimeException(s"JSON decoding error: ${err.getMessage}")) |
| 25 | + |
| 26 | + /** |
| 27 | + * TODO: there may be performance issues with how this method handles incoming JSON bodies; |
| 28 | + * particularly that it waits for the entire body to be read before decoding it. |
| 29 | + * need to investigate if this is the best approach to use with WebFlux |
| 30 | + * |
| 31 | + * @param input |
| 32 | + * @param elementType |
| 33 | + * @param mimeType |
| 34 | + * @param hints |
| 35 | + * @return |
| 36 | + */ |
| 37 | + override def decode( |
| 38 | + input: Publisher[DataBuffer], |
| 39 | + elementType: ResolvableType, |
| 40 | + mimeType: MimeType, |
| 41 | + hints: util.Map[String, AnyRef] |
| 42 | + ): Flux[Json] = { |
| 43 | + DataBufferUtils.join(Flux.from(input)).flatMap { joinedBuffer => |
| 44 | + val inputStream = joinedBuffer.asInputStream() |
| 45 | + try { |
| 46 | + val jsonStr = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8) |
| 47 | + parser.decode[Json](jsonStr) match { |
| 48 | + case Right(json) => Mono.just(json) |
| 49 | + case Left(err) => Mono.error(err) |
| 50 | + } |
| 51 | + } finally { |
| 52 | + DataBufferUtils.release(joinedBuffer) |
| 53 | + inputStream.close() |
32 | 54 | } |
33 | | - } |
| 55 | + }.flux() |
34 | 56 | } |
35 | 57 |
|
36 | | - override def decodeToMono(input: Publisher[DataBuffer], elementType: ResolvableType, mimeType: MimeType, hints: util.Map[String, AnyRef]): Mono[io.circe.Json] = { |
| 58 | + override def decodeToMono( |
| 59 | + input: Publisher[DataBuffer], |
| 60 | + elementType: ResolvableType, |
| 61 | + mimeType: MimeType, |
| 62 | + hints: util.Map[String, AnyRef] |
| 63 | + ): Mono[io.circe.Json] = { |
37 | 64 | decode(input, elementType, mimeType, hints).single() |
38 | 65 | } |
39 | 66 |
|
|
0 commit comments