Commit 5820c2b

feat(mint): add automatic reconnection with exponential backoff
Implements retry logic in GRPC.Client.Adapters.Mint.ConnectionProcess so that dropped HTTP/2 connections are transparently re-established without requiring a new channel.

Changes:
- ConnectionProcess.State: add :scheme, :host, :port, :connect_opts, :retry, and :retry_attempt fields so the process can reconnect autonomously.
- ConnectionProcess.init/1: persists connection params in state; pops :retry from opts before forwarding to Mint.HTTP.connect/4.
- ConnectionProcess: add attempt_reconnect/1, handle_info(:reconnect), and retry_timeout/1 (exponential backoff, base 1.6, capped at 120s, with jitter). finish_all_pending_requests/1 triggers reconnection when retry > 0 instead of immediately notifying the parent.
- Mint.connect/2: extracts :retry from adapter opts and passes it through to ConnectionProcess; documents the new option.
- Remove Stub.retry_timeout/1: dead code that was never called and had a broken guard making it fail for curr >= 11. The correct implementation now lives in ConnectionProcess.

Tests:
- connection_process_test.exs: unit tests for retry_timeout/1, immediate reconnect on drop, exhaustion notification, scheduled retry on failure, and successful reconnect resetting the attempt counter.
- mint_test.exs: integration tests verifying :retry propagation to state and correct default of 0.

Docs:
- README.md / grpc_client/README.md: document the :retry option under the Mint adapter section with usage example and behaviour notes.

Made-with: Cursor
1 parent abc5e1e commit 5820c2b

8 files changed

Lines changed: 280 additions & 26 deletions

README.md

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -399,6 +399,23 @@ config :grpc, GRPC.Client.Adapters.Mint,
399399

400400
The accepted options are the same as [`Mint.HTTP.connect/4`](https://hexdocs.pm/mint/Mint.HTTP.html#connect/4-options).
401401

402+
#### Automatic Reconnection
403+
404+
The Mint adapter supports automatic reconnection when the underlying HTTP/2 connection drops (e.g. server restart, network interruption). To enable it, pass the `:retry` option via `adapter_opts`:
405+
406+
```elixir
407+
iex> {:ok, channel} = GRPC.Stub.connect("localhost:50051",
408+
...> adapter: GRPC.Client.Adapters.Mint,
409+
...> adapter_opts: [retry: 5]
410+
...> )
411+
```
412+
413+
When the connection drops, the adapter tries to reconnect immediately and, if that fails, retries using **exponential backoff with jitter**, making up to `retry` attempts in total. The delay after the first failed attempt is about 1 second and grows by a factor of 1.6 per attempt, up to a base of 120 seconds (each delay is jittered by roughly ±20%). If all attempts are exhausted, the parent process receives a `{:elixir_grpc, :connection_down, pid}` message.
414+
415+
By default, `:retry` is `0` (no reconnection attempts).
416+
417+
> **Note:** Any in-flight requests at the time of the drop will fail immediately. Reconnection only re-establishes the transport connection — it does not replay requests.
418+
402419
---
403420

404421
### **HTTP Transcoding**
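
A minimal sketch of how a process might react to the exhaustion notification described in the README text above. It assumes the receiving process is the one the adapter treats as its parent; `ConnectionDownSketch` and `await_down/1` are hypothetical names used only for illustration and are not part of the library.

```elixir
defmodule ConnectionDownSketch do
  # Hypothetical helper (not part of grpc): waits for the message that the
  # README says is sent once all reconnection attempts are exhausted.
  def await_down(timeout_ms \\ 1_000) do
    receive do
      {:elixir_grpc, :connection_down, conn_pid} -> {:down, conn_pid}
    after
      timeout_ms -> :no_notification
    end
  end
end

# Simulate the notification locally so the sketch runs without a server.
send(self(), {:elixir_grpc, :connection_down, self()})
result = ConnectionDownSketch.await_down(100)
IO.inspect(result)
```

In a long-lived process the same pattern would more likely live in a `handle_info/2` clause, where the application could decide whether to open a new channel.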

grpc_client/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,23 @@ config :grpc, GRPC.Client.Adapters.Mint,
193193

194194
The accepted options are the same as [`Mint.HTTP.connect/4`](https://hexdocs.pm/mint/Mint.HTTP.html#connect/4-options).
195195

196+
#### Automatic Reconnection
197+
198+
The Mint adapter supports automatic reconnection when the underlying HTTP/2 connection drops (e.g. server restart, network interruption). To enable it, pass the `:retry` option via `adapter_opts`:
199+
200+
```elixir
201+
iex> {:ok, channel} = GRPC.Stub.connect("localhost:50051",
202+
...> adapter: GRPC.Client.Adapters.Mint,
203+
...> adapter_opts: [retry: 5]
204+
...> )
205+
```
206+
207+
When the connection drops, the adapter tries to reconnect immediately and, if that fails, retries using **exponential backoff with jitter**, making up to `retry` attempts in total. The delay after the first failed attempt is about 1 second and grows by a factor of 1.6 per attempt, up to a base of 120 seconds (each delay is jittered by roughly ±20%). If all attempts are exhausted, the parent process receives a `{:elixir_grpc, :connection_down, pid}` message.
208+
209+
By default, `:retry` is `0` (no reconnection attempts).
210+
211+
> **Note:** Any in-flight requests at the time of the drop will fail immediately. Reconnection only re-establishes the transport connection — it does not replay requests.
212+
196213
---
197214

198215
## Contributing

grpc_client/lib/grpc/client/adapters/mint.ex

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,20 @@ defmodule GRPC.Client.Adapters.Mint do
2828
window size ensures that the number of packet exchanges is smaller, thus speeding up requests by reducing the
2929
number of network round trips, at the cost of larger packets reaching the server per connection.
3030
Check [Mint.HTTP2.setting() type](https://hexdocs.pm/mint/Mint.HTTP2.html#t:setting/0) for additional configs.
31+
* `:retry`: Number of reconnection attempts when the connection drops. Defaults to `0` (no retries).
32+
Uses exponential backoff with jitter between attempts.
3133
"""
3234
@impl true
3335
def connect(%{host: host, port: port} = channel, opts \\ []) do
34-
# Added :config_options to facilitate testing.
3536
{config_opts, opts} = Keyword.pop(opts, :config_options, [])
37+
{retry, opts} = Keyword.pop(opts, :retry, 0)
3638
module_opts = Application.get_env(:grpc, __MODULE__, config_opts)
3739

38-
opts = connect_opts(channel, opts) |> merge_opts(module_opts)
40+
opts =
41+
channel
42+
|> connect_opts(opts)
43+
|> merge_opts(module_opts)
44+
|> Keyword.put(:retry, retry)
3945

4046
Process.flag(:trap_exit, true)
4147
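
The hunk above separates `:retry` from the remaining adapter options with `Keyword.pop/3`. A small standalone sketch of that call follows; the option values are made up for illustration and do not come from the commit.

```elixir
# Illustrative adapter options; the values are assumptions for this sketch.
adapter_opts = [retry: 5, transport_opts: [verify: :verify_none]]

# Keyword.pop/3 returns the value (or the given default) together with
# the keyword list without that key.
{retry, connect_opts} = Keyword.pop(adapter_opts, :retry, 0)
IO.inspect(retry)
IO.inspect(connect_opts)

# Without a :retry key the default applies and the options are unchanged.
{default_retry, untouched} = Keyword.pop([protocols: [:http2]], :retry, 0)
IO.inspect({default_retry, untouched})
```

Popping the key keeps the retry setting out of the options that are later forwarded to `Mint.HTTP.connect/4`, which matches the intent stated in the commit message.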

grpc_client/lib/grpc/client/adapters/mint/connection_process/connection_process.ex

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,20 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do
7676

7777
@impl true
7878
def init({scheme, host, port, opts}) do
79+
{retry, opts} = Keyword.pop(opts, :retry, 0)
80+
7981
case Mint.HTTP.connect(scheme, host, port, opts) do
8082
{:ok, conn} ->
81-
{:ok, State.new(conn, opts[:parent])}
83+
state_opts = [
84+
parent: opts[:parent],
85+
scheme: scheme,
86+
host: host,
87+
port: port,
88+
connect_opts: opts,
89+
retry: retry
90+
]
91+
92+
{:ok, State.new(conn, state_opts)}
8293

8394
{:error, reason} ->
8495
Logger.error(
@@ -178,6 +189,10 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do
178189
end
179190

180191
@impl true
192+
def handle_info(:reconnect, state) do
193+
attempt_reconnect(state)
194+
end
195+
181196
def handle_info(message, state) do
182197
case Mint.HTTP.stream(state.conn, message) do
183198
:unknown ->
@@ -378,24 +393,76 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do
378393
new_state
379394
end)
380395

381-
# Inform the parent that the connection is down
382-
send(new_state.parent, {:elixir_grpc, :connection_down, self()})
383-
384396
new_state.requests
385397
|> Enum.each(fn {ref, _} ->
386398
new_state
387399
|> State.stream_response_pid(ref)
388400
|> send_connection_close_and_end_stream_response()
389401
end)
390402

391-
{:noreply, State.update_request_stream_queue(%{new_state | requests: %{}}, :queue.new())}
403+
clean_state = State.update_request_stream_queue(%{new_state | requests: %{}}, :queue.new())
404+
405+
if clean_state.retry > 0 do
406+
attempt_reconnect(clean_state)
407+
else
408+
send(clean_state.parent, {:elixir_grpc, :connection_down, self()})
409+
{:noreply, clean_state}
410+
end
392411
end
393412

394413
defp send_connection_close_and_end_stream_response(pid) do
395414
:ok = StreamResponseProcess.consume(pid, :error, @connection_closed_error)
396415
:ok = StreamResponseProcess.done(pid)
397416
end
398417

418+
defp attempt_reconnect(%{retry: max, retry_attempt: attempt} = state)
419+
when attempt >= max do
420+
Logger.warning(
421+
"Connection retry exhausted (#{attempt}/#{max}) for #{state.scheme}://#{state.host}:#{state.port}"
422+
)
423+
424+
send(state.parent, {:elixir_grpc, :connection_down, self()})
425+
{:noreply, state}
426+
end
427+
428+
defp attempt_reconnect(state) do
429+
next_attempt = state.retry_attempt + 1
430+
431+
Logger.info(
432+
"Attempting reconnection #{next_attempt}/#{state.retry} to #{state.scheme}://#{state.host}:#{state.port}"
433+
)
434+
435+
case Mint.HTTP.connect(state.scheme, state.host, state.port, state.connect_opts) do
436+
{:ok, conn} ->
437+
Logger.info("Reconnected successfully to #{state.scheme}://#{state.host}:#{state.port}")
438+
439+
new_state = %{state | conn: conn, retry_attempt: 0}
440+
{:noreply, new_state}
441+
442+
{:error, reason} ->
443+
Logger.warning(
444+
"Reconnection attempt #{next_attempt}/#{state.retry} failed: #{inspect(reason)}"
445+
)
446+
447+
timeout = retry_timeout(next_attempt)
448+
Process.send_after(self(), :reconnect, timeout)
449+
{:noreply, %{state | retry_attempt: next_attempt}}
450+
end
451+
end
452+
453+
@doc false
454+
def retry_timeout(attempt) do
455+
timeout =
456+
if attempt < 11 do
457+
:math.pow(1.6, attempt - 1) * 1000
458+
else
459+
120_000
460+
end
461+
462+
jitter = (:rand.uniform_real() - 0.5) / 2.5
463+
round(timeout + jitter * timeout)
464+
end
465+
399466
defp check_connection_status(state) do
400467
if Mint.HTTP.open?(state.conn) do
401468
check_request_stream_queue(state)
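
To make the arithmetic of `retry_timeout/1` and the attempt sequence easier to follow, here is a standalone sketch. `BackoffSketch` and its functions are hypothetical and mirror the diff only as I read it: the jitter term `(:rand.uniform_real() - 0.5) / 2.5` is replaced by its ±20% bounds so the output is deterministic, and `timeline/1` assumes every connection attempt fails.

```elixir
defmodule BackoffSketch do
  @cap_ms 120_000

  # Delay before jitter: 1.6^(attempt - 1) seconds for attempts below 11,
  # then the fixed cap.
  def nominal(attempt) when attempt < 11, do: :math.pow(1.6, attempt - 1) * 1000
  def nominal(_attempt), do: @cap_ms

  # The jitter factor lies strictly between -0.2 and 0.2, so the jittered
  # delay stays within these rounded bounds.
  def bounds(attempt) do
    base = nominal(attempt)
    {round(base * 0.8), round(base * 1.2)}
  end

  # Worst case when every connect fails: the first attempt is immediate,
  # each failure schedules a wait, and the parent is notified once the
  # attempt counter reaches `retry`.
  def timeline(0), do: [:notify_parent]

  def timeline(retry) when retry > 0 do
    steps = Enum.flat_map(1..retry, fn n -> [{:attempt, n}, {:wait_ms, bounds(n)}] end)
    steps ++ [:notify_parent]
  end
end

for attempt <- [1, 2, 5, 10, 11] do
  {low, high} = BackoffSketch.bounds(attempt)
  IO.puts("attempt #{attempt}: #{low}..#{high} ms")
end

IO.inspect(BackoffSketch.timeline(2))
```

Because the jitter is applied after the cap, delays for attempt 11 and later can reach about 144 seconds, which is consistent with the 96_000..144_000 range asserted in the tests below.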

grpc_client/lib/grpc/client/adapters/mint/connection_process/state.ex

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,42 @@
11
defmodule GRPC.Client.Adapters.Mint.ConnectionProcess.State do
22
@moduledoc false
33

4-
defstruct [:conn, :parent, requests: %{}, request_stream_queue: :queue.new()]
4+
defstruct [
5+
:conn,
6+
:parent,
7+
:scheme,
8+
:host,
9+
:port,
10+
:connect_opts,
11+
requests: %{},
12+
request_stream_queue: :queue.new(),
13+
retry: 0,
14+
retry_attempt: 0
15+
]
516

617
@type t :: %__MODULE__{
718
conn: Mint.HTTP.t(),
819
requests: map(),
9-
parent: pid()
20+
parent: pid(),
21+
scheme: Mint.Types.scheme() | nil,
22+
host: Mint.Types.address() | nil,
23+
port: :inet.port_number() | nil,
24+
connect_opts: keyword(),
25+
retry: non_neg_integer(),
26+
retry_attempt: non_neg_integer()
1027
}
1128

12-
def new(conn, parent) do
13-
%__MODULE__{conn: conn, request_stream_queue: :queue.new(), parent: parent}
29+
def new(conn, opts) do
30+
%__MODULE__{
31+
conn: conn,
32+
request_stream_queue: :queue.new(),
33+
parent: opts[:parent],
34+
scheme: opts[:scheme],
35+
host: opts[:host],
36+
port: opts[:port],
37+
connect_opts: opts[:connect_opts] || [],
38+
retry: opts[:retry] || 0
39+
}
1440
end
1541

1642
def update_conn(state, conn) do

grpc_client/lib/grpc/stub.ex

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -272,19 +272,6 @@ defmodule GRPC.Stub do
272272
connect("#{ip_type}:#{host}:#{port}", opts)
273273
end
274274

275-
def retry_timeout(curr) when curr < 11 do
276-
timeout =
277-
if curr < 11 do
278-
:math.pow(1.6, curr - 1) * 1000
279-
else
280-
120_000
281-
end
282-
283-
jitter = (:rand.uniform_real() - 0.5) / 2.5
284-
285-
round(timeout + jitter * timeout)
286-
end
287-
288275
@doc """
289276
Disconnects the adapter and frees any resources the adapter is consuming
290277
"""

grpc_client/test/grpc/adapters/mint/connection_process_test.exs

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,53 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do
363363
assert_receive {:elixir_grpc, :connection_down, pid}, 500
364364
assert pid == self()
365365
end
366+
367+
test "does not attempt reconnect when retry is 0", %{
368+
state: state
369+
} do
370+
socket = state.conn.socket
371+
tcp_message = {:tcp_closed, socket}
372+
373+
assert {:noreply, new_state} = ConnectionProcess.handle_info(tcp_message, state)
374+
assert new_state.conn.state == :closed
375+
assert new_state.retry == 0
376+
assert_receive {:elixir_grpc, :connection_down, _pid}, 500
377+
end
378+
end
379+
380+
describe "handle_info - connection_closed - with retry" do
381+
setup :valid_connection_with_retry
382+
383+
test "attempts reconnect when retry > 0 and connection drops", %{
384+
state: state
385+
} do
386+
socket = state.conn.socket
387+
tcp_message = {:tcp_closed, socket}
388+
389+
assert {:noreply, new_state} = ConnectionProcess.handle_info(tcp_message, state)
390+
assert new_state.conn.state != :closed
391+
assert new_state.retry_attempt == 0
392+
refute_receive {:elixir_grpc, :connection_down, _pid}, 200
393+
end
394+
395+
test "notifies parent when all retry attempts are exhausted", %{
396+
state: state,
397+
port: port
398+
} do
399+
:ok = GRPC.Server.stop(FeatureServer)
400+
401+
logs =
402+
capture_log(fn ->
403+
exhausted_state = %{state | retry: 1, retry_attempt: 1}
404+
result = ConnectionProcess.handle_info(:reconnect, exhausted_state)
405+
assert {:noreply, _} = result
406+
assert_receive {:elixir_grpc, :connection_down, _pid}, 500
407+
end)
408+
409+
assert logs =~ "Connection retry exhausted"
410+
411+
{:ok, _, _} = GRPC.Server.start(FeatureServer, port)
412+
end
366413
end
367414

368415
describe "handle_info - connection_closed - with request" do
@@ -417,8 +464,68 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do
417464
end
418465
end
419466

420-
defp valid_connection(%{port: port}) do
421-
{:ok, pid} = ConnectionProcess.start_link(:http, "localhost", port, protocols: [:http2])
467+
describe "retry_timeout/1" do
468+
test "returns exponentially increasing timeouts" do
469+
t1 = ConnectionProcess.retry_timeout(1)
470+
t2 = ConnectionProcess.retry_timeout(2)
471+
t5 = ConnectionProcess.retry_timeout(5)
472+
473+
assert t1 >= 800 and t1 <= 1200
474+
assert t2 > t1
475+
assert t5 > t2
476+
end
477+
478+
test "caps at 120 seconds for attempt >= 11" do
479+
t11 = ConnectionProcess.retry_timeout(11)
480+
t15 = ConnectionProcess.retry_timeout(15)
481+
482+
assert t11 >= 96_000 and t11 <= 144_000
483+
assert t15 >= 96_000 and t15 <= 144_000
484+
end
485+
end
486+
487+
describe "handle_info :reconnect" do
488+
setup :valid_connection_with_retry
489+
490+
test "successfully reconnects when server is available", %{
491+
state: state
492+
} do
493+
failed_state = %{state | retry_attempt: 1}
494+
495+
logs =
496+
capture_log(fn ->
497+
assert {:noreply, new_state} = ConnectionProcess.handle_info(:reconnect, failed_state)
498+
assert Mint.HTTP.open?(new_state.conn)
499+
assert new_state.retry_attempt == 0
500+
end)
501+
502+
assert logs =~ "Reconnected successfully"
503+
end
504+
505+
test "schedules another reconnect when server is unavailable", %{
506+
state: state,
507+
port: port
508+
} do
509+
:ok = GRPC.Server.stop(FeatureServer)
510+
511+
logs =
512+
capture_log(fn ->
513+
failed_state = %{state | retry_attempt: 0}
514+
assert {:noreply, new_state} = ConnectionProcess.handle_info(:reconnect, failed_state)
515+
assert new_state.retry_attempt == 1
516+
assert_receive :reconnect, 5_000
517+
end)
518+
519+
assert logs =~ "Reconnection attempt 1/"
520+
521+
{:ok, _, _} = GRPC.Server.start(FeatureServer, port)
522+
end
523+
end
524+
525+
defp valid_connection(%{port: port}, opts \\ []) do
526+
{:ok, pid} =
527+
ConnectionProcess.start_link(:http, "localhost", port, Keyword.merge([protocols: [:http2]], opts))
528+
422529
state = :sys.get_state(pid)
423530
version = Application.spec(:grpc_client) |> Keyword.get(:vsn)
424531

@@ -431,6 +538,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do
431538
%{
432539
process_pid: pid,
433540
state: state,
541+
port: port,
434542
request: {"POST", "/routeguide.RouteGuide/RecordRoute", headers}
435543
}
436544
end
@@ -455,4 +563,6 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do
455563

456564
%{state | requests: %{request_ref => %{request_ref_state | stream_response_pid: test_pid}}}
457565
end
566+
567+
defp valid_connection_with_retry(ctx), do: valid_connection(ctx, retry: 3)
458568
end
