Skip to content

Commit 78dca5b

Browse files
aseigoAaron Seigosleipnir
authored
Support grpcweb trailers encoded in the message (#481)
* Support grpcweb's in-message-encoded trailers As the Fetch API does not expose HTTP trailers to the Javascript runtime, grpcweb mandates that tailers are included in the message payload with the most-significant bit of the leading byte (flags) set to 1. What follows is a run-length-encoded block of text that follows the same formatting as normal headers. Most extant grpcweb libraries written in JS/TS are lenient about this and will happily forego receiving trailers. However, some are more picky about this and REQUIRE trailers (the buf.read connect libraries are an example of this). GRPC.Server follows the spec when sending protos over grpcweb, allowing errors and other custom trailers to be sent in a way that is visible to the client. GRPC.Message also now recognizes trailers and parses them appropriately: it extracts partial-buffer messages using the run-length encoding bytes (which it was previously quietly ignoring, which would also allow malformed buffers due to e.g. network problems sneak through anwyays), it respects the trailers flag, and returns appropriate data in each of these cases. The GRPC client now also works with embedded trailers. Overhead for non-grpcweb should be nominal as new code paths are hidden behind grpcweb checks, while the additional binary checks are placed in front the error paths (so errors may be nominally slower to be reached, but the happy paths should be untouched). * mix format a few files * more formatting juggling to make the formatter running CI happy * specify only two parts, even if the more colons appear in the header line * Refactor: grpcweb trailer sending in its own function Reads cleaner, perhaps a bit more idiomatic as well. * call `stream_grpcweb_trailers` from `send_error_trailers` This *may* require that the status was already sent, as well as the state passed to `send_error_trailers` to decided whether or not to send grpcweb trailers. And so: * `send_error_trailers` now uses `check_sent_resp` instead of doing that check itself * `check_sent_resp` now takes an optional `status`, with 200 as the default * state is passed to `send_error_trailers` * `send_error_trailers` calls `stream_grpcweb_trailers` before `cowboy_req.stream_trailers` (which closes the connection as trailers implies fin) * return the req from send_error_trailers * the next action taken can depend on the request being grpcweb when the request isn't a grpcweb request, then it's business as usual. otherwise, the grpcweb trailers must be sent first, as they may cause a body to be sent. only after checking for grpcweb can the regular trailers be sent once the state of the req is confirmed, namely whether or not a reply has been started already or if a full reply must be initiated. * ignore .expert dirs * fixup formating * remove tooling ignores. put them in your global gitignore instead, if desired. --------- Co-authored-by: Aaron Seigo <aseigo@yaybr.com> Co-authored-by: Adriano Santos <solid.sistemas@gmail.com>
1 parent db0948a commit 78dca5b

6 files changed

Lines changed: 166 additions & 59 deletions

File tree

.gitignore

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,4 @@ erl_crash.dump
2222

2323
/log
2424

25-
.elixir_ls
26-
27-
2825
grpc-*.tar

grpc_client/lib/grpc/client/adapters/gun.ex

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,15 @@ defmodule GRPC.Client.Adapters.Gun do
206206
defp handle_nofin_response(adapter_payload, payload, stream, headers, opts) do
207207
# Regular response: fetch body and trailers
208208
with {:ok, body, trailers} <- recv_body(adapter_payload, payload, opts),
209-
{:ok, response} <- parse_response(stream, headers, body, trailers) do
209+
{:ok, response, embedded_trailers} <- parse_response(stream, headers, body, trailers) do
210210
if opts[:return_headers] do
211-
{:ok, response, %{headers: headers, trailers: trailers}}
211+
all_trailers = Map.merge(trailers, embedded_trailers)
212+
213+
{
214+
:ok,
215+
response,
216+
%{headers: headers, trailers: all_trailers}
217+
}
212218
else
213219
{:ok, response}
214220
end
@@ -410,15 +416,26 @@ defmodule GRPC.Client.Adapters.Gun do
410416
end
411417
end
412418

413-
defp read_stream(%{buffer: buffer, need_more: false, response_mod: res_mod, codec: codec} = s) do
419+
defp read_stream(
420+
%{buffer: buffer, need_more: false, response_mod: res_mod, codec: codec, opts: opts} =
421+
stream
422+
) do
414423
case GRPC.Message.get_message(buffer) do
424+
{{:trailers, trailers}, rest} ->
425+
new_stream =
426+
stream
427+
|> update_stream_with_trailers(trailers, opts[:return_headers])
428+
|> Map.put(:buffer, rest)
429+
430+
{{:ok, trailers}, new_stream}
431+
415432
{{_, message}, rest} ->
416433
reply = codec.decode(message, res_mod)
417-
new_s = Map.put(s, :buffer, rest)
418-
{{:ok, reply}, new_s}
434+
new_stream = Map.put(stream, :buffer, rest)
435+
{{:ok, reply}, new_stream}
419436

420437
_ ->
421-
read_stream(Map.put(s, :need_more, true))
438+
read_stream(Map.put(stream, :need_more, true))
422439
end
423440
end
424441

@@ -431,8 +448,17 @@ defmodule GRPC.Client.Adapters.Gun do
431448
with :ok <- parse_trailers(trailers),
432449
compressor <- get_compressor(headers, accepted_compressors),
433450
body <- get_body(codec, body),
434-
{:ok, msg} <- GRPC.Message.from_data(%{compressor: compressor}, body) do
435-
{:ok, codec.decode(msg, res_mod)}
451+
{:ok, msg, remaining} <- GRPC.Message.from_data(%{compressor: compressor}, body) do
452+
{:ok, codec.decode(msg, res_mod), check_for_trailers(remaining, compressor)}
453+
end
454+
end
455+
456+
defp check_for_trailers(<<>>, _compressor), do: %{}
457+
458+
defp check_for_trailers(body, compressor) do
459+
case GRPC.Message.from_data(%{compressor: compressor}, body) do
460+
{:trailers, trailers, <<>>} -> trailers
461+
_ -> %{}
436462
end
437463
end
438464

grpc_core/lib/grpc/message.ex

Lines changed: 52 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ defmodule GRPC.Message do
1111
Message -> *{binary octet}
1212
"""
1313

14+
import Bitwise
1415
alias GRPC.RPCError
1516

16-
@max_message_length Bitwise.bsl(1, 32 - 1)
17+
@max_message_length bsl(1, 32 - 1)
18+
@trailers_flag 0b1000_0000
1719

1820
@doc """
1921
Transforms Protobuf data into a gRPC body binary.
@@ -46,20 +48,21 @@ defmodule GRPC.Message do
4648
iolist = opts[:iolist]
4749
codec = opts[:codec]
4850
max_length = opts[:max_message_length] || @max_message_length
51+
additional_flags = opts[:message_flag] || 0
4952

50-
{compress_flag, message} =
53+
{flag, message} =
5154
if compressor do
52-
{1, compressor.compress(message)}
55+
{1 ||| additional_flags, compressor.compress(message)}
5356
else
54-
{0, message}
57+
{0 ||| additional_flags, message}
5558
end
5659

5760
length = IO.iodata_length(message)
5861

5962
if length > max_length do
6063
{:error, "Encoded message is too large (#{length} bytes)"}
6164
else
62-
result = [compress_flag, <<length::size(4)-unit(8)>>, message]
65+
result = [flag, <<length::size(4)-unit(8)>>, message]
6366

6467
result =
6568
if function_exported?(codec, :pack_for_channel, 1),
@@ -78,12 +81,14 @@ defmodule GRPC.Message do
7881
## Examples
7982
8083
iex> GRPC.Message.from_data(<<0, 0, 0, 0, 8, 1, 2, 3, 4, 5, 6, 7, 8>>)
81-
<<1, 2, 3, 4, 5, 6, 7, 8>>
84+
{<<1, 2, 3, 4, 5, 6, 7, 8>>, <<>>}
8285
"""
83-
@spec from_data(binary) :: binary
86+
@spec from_data(binary) :: {message :: binary, rest :: binary}
8487
def from_data(data) do
85-
<<_flag::unsigned-integer-size(8), _length::bytes-size(4), message::binary>> = data
86-
message
88+
<<_flag::unsigned-integer-size(8), length::big-32, message::bytes-size(length), rest::binary>> =
89+
data
90+
91+
{message, rest}
8792
end
8893

8994
@doc """
@@ -92,13 +97,16 @@ defmodule GRPC.Message do
9297
## Examples
9398
9499
iex> GRPC.Message.from_data(%{compressor: nil}, <<0, 0, 0, 0, 8, 1, 2, 3, 4, 5, 6, 7, 8>>)
95-
{:ok, <<1, 2, 3, 4, 5, 6, 7, 8>>}
100+
{:ok, <<1, 2, 3, 4, 5, 6, 7, 8>>, <<>>}
96101
"""
97-
@spec from_data(map, binary) :: {:ok, binary} | {:error, GRPC.RPCError.t()}
102+
@spec from_data(map, binary) ::
103+
{:ok, message :: binary, rest :: binary}
104+
| {:trailers, map, rest :: binary}
105+
| {:error, GRPC.RPCError.t()}
98106
def from_data(%{compressor: nil}, data) do
99107
case data do
100-
<<0, _length::bytes-size(4), message::binary>> ->
101-
{:ok, message}
108+
<<0, length::big-32, message::bytes-size(length), rest::binary>> ->
109+
{:ok, message, rest}
102110

103111
<<1, _length::bytes-size(4), _::binary>> ->
104112
{:error,
@@ -107,24 +115,39 @@ defmodule GRPC.Message do
107115
message: "Compressed flag is set, but not specified in headers."
108116
)}
109117

118+
<<@trailers_flag, length::big-32, message::bytes-size(length), rest::binary>> ->
119+
{:trailers, parse_trailers(message), rest}
120+
110121
_ ->
111122
{:error, RPCError.exception(status: :invalid_argument, message: "Message is malformed.")}
112123
end
113124
end
114125

115126
def from_data(%{compressor: compressor}, data) do
116127
case data do
117-
<<1, _length::bytes-size(4), message::binary>> ->
118-
{:ok, compressor.decompress(message)}
128+
<<1, length::big-32, message::bytes-size(length), rest::binary>> ->
129+
{:ok, compressor.decompress(message), rest}
119130

120-
<<0, _length::bytes-size(4), message::binary>> ->
121-
{:ok, message}
131+
<<0, length::big-32, message::bytes-size(length), rest::binary>> ->
132+
{:ok, message, rest}
133+
134+
<<@trailers_flag, length::big-32, message::bytes-size(length), rest::binary>> ->
135+
{:trailers, parse_trailers(message), rest}
122136

123137
_ ->
124138
{:error, RPCError.exception(status: :invalid_argument, message: "Message is malformed.")}
125139
end
126140
end
127141

142+
defp parse_trailers(data) do
143+
data
144+
|> String.split("\r\n")
145+
|> Enum.reduce(%{}, fn line, acc ->
146+
[k, v] = String.split(line, ":", parts: 2)
147+
Map.put(acc, k, String.trim(v))
148+
end)
149+
end
150+
128151
def from_frame(bin), do: from_frame(bin, [])
129152
def from_frame(<<>>, acc), do: Enum.reverse(acc)
130153

@@ -166,7 +189,10 @@ defmodule GRPC.Message do
166189
<<flag::unsigned-integer-size(8), length::unsigned-integer-size(32),
167190
message::bytes-size(length), rest::binary>>
168191
) do
169-
{{flag, message}, rest}
192+
case flag do
193+
@trailers_flag -> {{:trailers, message}, rest}
194+
_ -> {{flag, message}, rest}
195+
end
170196
end
171197

172198
def get_message(_) do
@@ -175,6 +201,10 @@ defmodule GRPC.Message do
175201

176202
def get_message(data, nil = _compressor) do
177203
case data do
204+
<<@trailers_flag::8, length::unsigned-integer-size(32), message::bytes-size(length),
205+
rest::binary>> ->
206+
{{:trailers, message}, rest}
207+
178208
<<flag::unsigned-integer-size(8), length::unsigned-integer-size(32),
179209
message::bytes-size(length), rest::binary>> ->
180210
{{flag, message}, rest}
@@ -192,6 +222,10 @@ defmodule GRPC.Message do
192222
<<0::8, length::unsigned-integer-32, message::bytes-size(length), rest::binary>> ->
193223
{{0, message}, rest}
194224

225+
<<@trailers_flag::8, length::unsigned-integer-32, message::bytes-size(length),
226+
rest::binary>> ->
227+
{{:trailers, message}, rest}
228+
195229
_other ->
196230
data
197231
end

grpc_core/test/grpc/message_test.exs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@ defmodule GRPC.MessageTest do
77
message = String.duplicate("foo", 100)
88

99
# 10th byte is the operating system ID
10-
assert {:ok,
11-
data =
12-
<<1, 0, 0, 0, 27, 31, 139, 8, 0, 0, 0, 0, 0, 0, _, 75, 203, 207, 79, 27, 69, 196,
13-
33, 0, 41, 249, 122, 62, 44, 1, 0, 0>>,
14-
32} = GRPC.Message.to_data(message, %{compressor: GRPC.Compressor.Gzip})
15-
16-
assert {:ok, message} == GRPC.Message.from_data(%{compressor: GRPC.Compressor.Gzip}, data)
10+
assert {
11+
:ok,
12+
data =
13+
<<1, 0, 0, 0, 27, 31, 139, 8, 0, 0, 0, 0, 0, 0, _, 75, 203, 207, 79, 27, 69, 196,
14+
33, 0, 41, 249, 122, 62, 44, 1, 0, 0>>,
15+
32
16+
} =
17+
GRPC.Message.to_data(message, %{compressor: GRPC.Compressor.Gzip})
18+
19+
assert {:ok, message, <<>>} ==
20+
GRPC.Message.from_data(%{compressor: GRPC.Compressor.Gzip}, data)
1721
end
1822

1923
test "iodata can be passed to and returned from `to_data/2`" do
@@ -25,13 +29,13 @@ defmodule GRPC.MessageTest do
2529
assert is_list(data)
2630
binary = IO.iodata_to_binary(data)
2731

28-
assert {:ok, IO.iodata_to_binary(message)} ==
32+
assert {:ok, IO.iodata_to_binary(message), <<>>} ==
2933
GRPC.Message.from_data(%{compressor: GRPC.Compressor.Gzip}, binary)
3034
end
3135

3236
test "to_data/2 invokes codec.pack_for_channel on the gRPC body if codec implements it" do
3337
message = "web-text"
3438
assert {:ok, base64_payload, _} = GRPC.Message.to_data(message, %{codec: GRPC.Codec.WebText})
35-
assert message == GRPC.Message.from_data(Base.decode64!(base64_payload))
39+
assert {message, ""} == GRPC.Message.from_data(Base.decode64!(base64_payload))
3640
end
3741
end

grpc_server/lib/grpc/server.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ defmodule GRPC.Server do
300300
end
301301

302302
case GRPC.Message.from_data(stream, body) do
303-
{:ok, message} ->
303+
{:ok, message, <<>>} ->
304304
request = codec.decode(message, req_mod)
305305

306306
call_with_interceptors(res_stream, func_name, stream, request)

0 commit comments

Comments
 (0)