From 2874331932e8b97ffc3d32a12df31dfcd5a6ed67 Mon Sep 17 00:00:00 2001 From: Chris Greeno Date: Fri, 27 Mar 2026 16:46:25 +0000 Subject: [PATCH 01/11] feat: add periodic DNS re-resolution with backoff, rate limiting, and telemetry The gRPC client now periodically re-resolves DNS targets to discover new backends and remove stale ones, matching the behaviour of grpc-go and grpc-java. This is essential for Kubernetes deployments where pods scale up/down behind headless services. Key changes in GRPC.Client.Connection: - Periodic re-resolution: DNS targets are re-resolved every 30s (configurable via :resolve_interval). Static targets (ipv4:, ipv6:, unix:) are excluded. - Channel reconciliation: On re-resolution, new backends get channels created, removed backends get disconnected, unchanged backends are kept as-is. The load balancer is re-initialized with the updated address list. - Exponential backoff: On DNS failure or empty results, the interval doubles (capped at :max_resolve_interval, default 5min). Resets to base on success. - Rate limiting: A :min_resolve_interval (default 5s) prevents rapid resolve_now() calls from flooding DNS. Intended for future health check / heartbeat integration. - Telemetry: Emits [:grpc, :client, :resolve, :stop] on success and [:grpc, :client, :resolve, :error] on failure, with duration, target, and address_count measurements. - resolve_now/1: Public API to trigger on-demand re-resolution (subject to rate limiting). New options for connect/2: :resolve_interval, :max_resolve_interval, :min_resolve_interval Test suite: 22 test cases modelled after grpc-go's dns_resolver_test.go and grpc-java's DnsNameResolverTest.java, covering scale-up, scale-down, complete replacement, DNS failure, empty addresses, recovery, backoff, rate limiting, telemetry, pick_channel stability, and edge cases. --- grpc_client/lib/grpc/client/connection.ex | 215 ++++- .../test/grpc/client/re_resolve_test.exs | 860 ++++++++++++++++++ grpc_client/test/test_helper.exs | 4 + 3 files changed, 1075 insertions(+), 4 deletions(-) create mode 100644 grpc_client/test/grpc/client/re_resolve_test.exs diff --git a/grpc_client/lib/grpc/client/connection.ex b/grpc_client/lib/grpc/client/connection.ex index 2cbb537d..db1b6e25 100644 --- a/grpc_client/lib/grpc/client/connection.ex +++ b/grpc_client/lib/grpc/client/connection.ex @@ -80,6 +80,13 @@ defmodule GRPC.Client.Connection do @insecure_scheme "http" @secure_scheme "https" @refresh_interval 15_000 + @default_resolve_interval 30_000 + @default_max_resolve_interval 300_000 + @default_min_resolve_interval 5_000 + + # Telemetry event names + @resolve_stop_event [:grpc, :client, :resolve, :stop] + @resolve_error_event [:grpc, :client, :resolve, :error] @type t :: %__MODULE__{ virtual_channel: Channel.t(), @@ -87,7 +94,14 @@ defmodule GRPC.Client.Connection do lb_mod: module() | nil, lb_state: term() | nil, resolver: module() | nil, - adapter: module() + adapter: module(), + target: String.t() | nil, + connect_opts: keyword(), + resolve_interval: non_neg_integer(), + base_resolve_interval: non_neg_integer(), + max_resolve_interval: non_neg_integer(), + min_resolve_interval: non_neg_integer(), + last_resolve_at: integer() | nil } defstruct virtual_channel: nil, @@ -95,7 +109,14 @@ defmodule GRPC.Client.Connection do lb_mod: nil, lb_state: nil, resolver: nil, - adapter: GRPC.Client.Adapters.Gun + adapter: GRPC.Client.Adapters.Gun, + target: nil, + connect_opts: [], + resolve_interval: 30_000, + base_resolve_interval: 30_000, + max_resolve_interval: 300_000, + min_resolve_interval: 5_000, + last_resolve_at: nil def child_spec(initial_state) do %{ @@ -121,6 +142,13 @@ defmodule GRPC.Client.Connection do ) Process.send_after(self(), :refresh, @refresh_interval) + + # Only schedule periodic re-resolution for DNS targets — static targets + # (ipv4:, ipv6:, unix:) always resolve to the same addresses. + if state.resolver && state.target && dns_target?(state.target) do + Process.send_after(self(), :re_resolve, state.resolve_interval) + end + {:ok, state} end @@ -227,6 +255,20 @@ defmodule GRPC.Client.Connection do end end + @doc """ + Triggers an immediate DNS re-resolution, subject to rate limiting. + + Intended for use by health checks or heartbeat mechanisms that detect + a backend has gone away and want to force a fresh DNS lookup. + """ + @spec resolve_now(Channel.t()) :: :re_resolve | {:error, :no_connection} + def resolve_now(%Channel{ref: ref}) do + case :global.whereis_name({__MODULE__, ref}) do + :undefined -> {:error, :no_connection} + pid -> send(pid, :re_resolve) + end + end + @impl GenServer def handle_call({:disconnect, %Channel{adapter: adapter} = channel}, _from, state) do resp = {:ok, %Channel{channel | adapter_payload: %{conn_pid: nil}}} @@ -279,6 +321,27 @@ defmodule GRPC.Client.Connection do def handle_info(:refresh, state), do: {:noreply, state} + def handle_info( + :re_resolve, + %{resolver: resolver, target: target, adapter: adapter, connect_opts: opts} = state + ) + when not is_nil(resolver) and not is_nil(target) do + now = System.monotonic_time(:millisecond) + + state = + if rate_limited?(state, now) do + Logger.debug("DNS re-resolution for #{target} rate-limited, skipping") + state + else + do_re_resolve(resolver, target, adapter, opts, %{state | last_resolve_at: now}) + end + + Process.send_after(self(), :re_resolve, state.resolve_interval) + {:noreply, state} + end + + def handle_info(:re_resolve, state), do: {:noreply, state} + def handle_info({:DOWN, _ref, :process, pid, reason}, state) do Logger.warning( "#{inspect(__MODULE__)} received :DOWN from #{inspect(pid)} with reason: #{inspect(reason)}" @@ -308,6 +371,136 @@ defmodule GRPC.Client.Connection do def terminate(_reason, _state), do: :ok + defp do_re_resolve(resolver, target, adapter, opts, state) do + start_time = System.monotonic_time() + + case resolver.resolve(target) do + {:ok, %{addresses: []}} -> + duration = System.monotonic_time() - start_time + + :telemetry.execute(@resolve_error_event, %{duration: duration}, %{ + target: target, + reason: :empty_addresses, + address_count: 0 + }) + + Logger.warning( + "DNS re-resolution returned empty addresses for #{target}, keeping existing" + ) + + backoff(state) + + {:ok, %{addresses: new_addresses}} -> + duration = System.monotonic_time() - start_time + + :telemetry.execute(@resolve_stop_event, %{duration: duration}, %{ + target: target, + address_count: length(new_addresses) + }) + + state = reconcile_channels(new_addresses, adapter, opts, state) + reset_backoff(state) + + {:error, reason} -> + duration = System.monotonic_time() - start_time + + :telemetry.execute(@resolve_error_event, %{duration: duration}, %{ + target: target, + reason: reason, + address_count: 0 + }) + + Logger.warning("DNS re-resolution failed for #{target}: #{inspect(reason)}") + backoff(state) + end + end + + defp backoff(state) do + new_interval = min(state.resolve_interval * 2, state.max_resolve_interval) + %{state | resolve_interval: new_interval} + end + + defp reset_backoff(state) do + %{state | resolve_interval: state.base_resolve_interval} + end + + defp rate_limited?(%{last_resolve_at: nil}, _now), do: false + + defp rate_limited?(%{last_resolve_at: last, min_resolve_interval: min}, now) do + now - last < min + end + + defp reconcile_channels(new_addresses, adapter, opts, state) do + new_keys = MapSet.new(new_addresses, &build_address_key(&1.address, &1.port)) + old_keys = MapSet.new(Map.keys(state.real_channels)) + + added = MapSet.difference(new_keys, old_keys) + removed = MapSet.difference(old_keys, new_keys) + + # Disconnect removed channels + real_channels = + Enum.reduce(MapSet.to_list(removed), state.real_channels, fn key, channels -> + case Map.get(channels, key) do + {:ok, ch} -> do_disconnect(adapter, ch) + _ -> :ok + end + + Map.delete(channels, key) + end) + + # Connect new channels + real_channels = + Enum.reduce(new_addresses, real_channels, fn %{address: host, port: port}, channels -> + key = build_address_key(host, port) + + if MapSet.member?(added, key) do + case connect_real_channel(state.virtual_channel, host, port, opts, adapter) do + {:ok, ch} -> Map.put(channels, key, {:ok, ch}) + {:error, reason} -> Map.put(channels, key, {:error, reason}) + end + else + channels + end + end) + + # Re-init load balancer with full updated address list + if state.lb_mod do + case state.lb_mod.init(addresses: new_addresses) do + {:ok, new_lb_state} -> + {:ok, {host, port}, picked_lb_state} = state.lb_mod.pick(new_lb_state) + key = build_address_key(host, port) + + case Map.get(real_channels, key) do + {:ok, picked_channel} -> + :persistent_term.put( + {__MODULE__, :lb_state, state.virtual_channel.ref}, + picked_channel + ) + + %{ + state + | real_channels: real_channels, + lb_state: picked_lb_state, + virtual_channel: picked_channel + } + + _ -> + %{state | real_channels: real_channels, lb_state: picked_lb_state} + end + + {:error, _} -> + %{state | real_channels: real_channels} + end + else + %{state | real_channels: real_channels} + end + end + + defp dns_target?(target) do + uri = URI.parse(target) + uri.scheme == "dns" or is_nil(uri.scheme) + end + defp via(ref) do {:global, {__MODULE__, ref}} end @@ -333,12 +526,20 @@ defmodule GRPC.Client.Connection do codec: GRPC.Codec.Proto, compressor: nil, accepted_compressors: [], - headers: [] + headers: [], + lb_policy: nil, + resolver: GRPC.Client.Resolver, + resolve_interval: @default_resolve_interval, + max_resolve_interval: @default_max_resolve_interval, + min_resolve_interval: @default_min_resolve_interval ) resolver = Keyword.get(opts, :resolver, GRPC.Client.Resolver) adapter = Keyword.get(opts, :adapter, GRPC.Client.Adapters.Gun) lb_policy_opt = Keyword.get(opts, :lb_policy) + resolve_interval = Keyword.get(opts, :resolve_interval, @default_resolve_interval) + max_resolve_interval = Keyword.get(opts, :max_resolve_interval, @default_max_resolve_interval) + min_resolve_interval = Keyword.get(opts, :min_resolve_interval, @default_min_resolve_interval) {norm_target, norm_opts, scheme} = normalize_target_and_opts(target, opts) cred = resolve_credential(norm_opts[:cred], scheme) @@ -364,7 +565,13 @@ defmodule GRPC.Client.Connection do base_state = %__MODULE__{ virtual_channel: virtual_channel, resolver: resolver, - adapter: adapter + adapter: adapter, + target: norm_target, + connect_opts: norm_opts, + resolve_interval: resolve_interval, + base_resolve_interval: resolve_interval, + max_resolve_interval: max_resolve_interval, + min_resolve_interval: min_resolve_interval } case resolver.resolve(norm_target) do diff --git a/grpc_client/test/grpc/client/re_resolve_test.exs b/grpc_client/test/grpc/client/re_resolve_test.exs new file mode 100644 index 00000000..b62b9c2b --- /dev/null +++ b/grpc_client/test/grpc/client/re_resolve_test.exs @@ -0,0 +1,860 @@ +defmodule GRPC.Client.ReResolveTest do + @moduledoc """ + Tests for periodic DNS re-resolution in the gRPC client connection manager. + + Test coverage modelled after grpc-go's dns_resolver_test.go and + resolver_update_test.go. Covers: + + 1. Scale-up — new backends discovered on re-resolution + 2. Scale-down — backends removed on re-resolution + 3. No-op — unchanged addresses leave channels untouched + 4. Complete replacement — disjoint address lists + 5. DNS failure — keeps existing channels, logs warning + 6. Recovery after failure — next cycle succeeds + 7. Empty address list — treated as failure, channels preserved + 8. Recovery after empty — next cycle with valid addresses works + 9. pick_channel stability — channel picking works during/after re-resolution + 10. pick_channel after full replacement — new backend is pickable + 11. Repeated cycles — timer fires on every interval tick + 12. Non-DNS targets — ipv4: targets do not trigger re-resolution + 13. Timer after disconnect — no crash + 14. LB error during re-resolution — connection survives + 15. Port change on same host — detected as new address + 16. Exponential backoff — doubles on failure, resets on success, caps at max + 17. Rate limiting — resolve_now calls coalesced within min_resolve_interval + 18. Telemetry — :stop event on success, :error event on failure/empty + """ + use GRPC.Client.DataCase, async: false + import Mox + + alias GRPC.Client.Connection + + # Interval for re-resolution in tests (ms). Long enough that only one + # re-resolve fires per sleep window, short enough tests stay fast. + @resolve_interval 100 + # Sleep slightly longer than one interval so the timer fires exactly once. + @wait @resolve_interval + 50 + # After a failure, backoff doubles the interval. Wait accordingly. + @wait_after_backoff @resolve_interval * 2 + 100 + + setup :verify_on_exit! + + setup do + Mox.set_mox_global() + ref = make_ref() + + %{ + ref: ref, + adapter: GRPC.Test.ClientAdapter, + resolver: GRPC.Client.MockResolver + } + end + + # -- helpers --------------------------------------------------------------- + + defp connect_with_resolver(ref, resolver, adapter, addresses, opts) do + # Initial resolve call during connect/2 + expect(resolver, :resolve, fn _target -> + {:ok, %{addresses: addresses, service_config: nil}} + end) + + # Stub subsequent re-resolve calls to return the same addresses by default. + # Individual tests override this with expect/stub before sleeping. + stub(resolver, :resolve, fn _target -> + {:ok, %{addresses: addresses, service_config: nil}} + end) + + Connection.connect( + "dns://my-service.local:50051", + [ + adapter: adapter, + name: ref, + resolver: resolver, + resolve_interval: @resolve_interval, + min_resolve_interval: 0 + ] ++ opts + ) + end + + defp get_state(ref) do + pid = :global.whereis_name({Connection, ref}) + :sys.get_state(pid) + end + + # -- 1. Scale-up: new backends discovered ---------------------------------- + + describe "scale-up: new backends discovered" do + test "adds channels for addresses that appear in DNS", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + assert map_size(get_state(ctx.ref).real_channels) == 1 + + new_addrs = [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ] + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: new_addrs, service_config: nil}} + end) + + Process.sleep(@wait) + + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 2 + assert Map.has_key?(state.real_channels, "10.0.0.1:50051") + assert Map.has_key?(state.real_channels, "10.0.0.2:50051") + + Connection.disconnect(channel) + end + end + + # -- 2. Scale-down: backends removed --------------------------------------- + + describe "scale-down: backends removed" do + test "disconnects channels for addresses no longer in DNS", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + lb_policy: :round_robin + ) + + assert map_size(get_state(ctx.ref).real_channels) == 2 + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + Process.sleep(@wait) + + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 1 + assert Map.has_key?(state.real_channels, "10.0.0.1:50051") + refute Map.has_key?(state.real_channels, "10.0.0.2:50051") + + Connection.disconnect(channel) + end + end + + # -- 3. No-op: unchanged addresses ---------------------------------------- + + describe "no-op: unchanged addresses" do + test "leaves channels untouched when DNS returns the same set", ctx do + addresses = [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ] + + {:ok, channel} = + connect_with_resolver(ctx.ref, ctx.resolver, ctx.adapter, addresses, + lb_policy: :round_robin + ) + + state_before = get_state(ctx.ref) + + # stub already returns same addresses from connect_with_resolver + Process.sleep(@wait) + + state_after = get_state(ctx.ref) + assert state_before.real_channels == state_after.real_channels + + Connection.disconnect(channel) + end + end + + # -- 4. Complete replacement: disjoint address lists ----------------------- + + describe "complete replacement: disjoint address lists" do + test "removes old and creates new channels for entirely different backends", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + lb_policy: :round_robin + ) + + new_addrs = [ + %{address: "10.0.0.3", port: 50051}, + %{address: "10.0.0.4", port: 50051} + ] + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: new_addrs, service_config: nil}} + end) + + Process.sleep(@wait) + + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 2 + refute Map.has_key?(state.real_channels, "10.0.0.1:50051") + refute Map.has_key?(state.real_channels, "10.0.0.2:50051") + assert Map.has_key?(state.real_channels, "10.0.0.3:50051") + assert Map.has_key?(state.real_channels, "10.0.0.4:50051") + + Connection.disconnect(channel) + end + end + + # -- 5. DNS failure keeps existing channels -------------------------------- + + describe "DNS failure during re-resolution" do + test "keeps existing channels on resolver error", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + stub(ctx.resolver, :resolve, fn _target -> {:error, :timeout} end) + + Process.sleep(@wait) + + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 1 + assert Map.has_key?(state.real_channels, "10.0.0.1:50051") + + Connection.disconnect(channel) + end + end + + # -- 6. Recovery after DNS failure ----------------------------------------- + + describe "recovery after DNS failure" do + test "next successful cycle updates channels after a failed one", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + # First re-resolve: fail + call_count = :counters.new(1, [:atomics]) + + stub(ctx.resolver, :resolve, fn _target -> + :counters.add(call_count, 1, 1) + + case :counters.get(call_count, 1) do + 1 -> + {:error, :nxdomain} + + _ -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + service_config: nil + }} + end + end) + + # Wait for first (failed) cycle + Process.sleep(@wait) + assert map_size(get_state(ctx.ref).real_channels) == 1 + + # Wait for second (successful) cycle — interval is doubled after failure + Process.sleep(@wait_after_backoff) + assert map_size(get_state(ctx.ref).real_channels) == 2 + + Connection.disconnect(channel) + end + end + + # -- 7. Empty address list: treated as failure ----------------------------- + + describe "empty address list on re-resolution" do + test "keeps existing channels when DNS returns zero addresses", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + lb_policy: :round_robin + ) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [], service_config: nil}} + end) + + Process.sleep(@wait) + + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 2 + + Connection.disconnect(channel) + end + end + + # -- 8. Recovery after empty ----------------------------------------------- + + describe "recovery after empty address list" do + test "subsequent cycle with valid addresses updates channels", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + call_count = :counters.new(1, [:atomics]) + + stub(ctx.resolver, :resolve, fn _target -> + :counters.add(call_count, 1, 1) + + case :counters.get(call_count, 1) do + 1 -> + {:ok, %{addresses: [], service_config: nil}} + + _ -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.3", port: 50051} + ], + service_config: nil + }} + end + end) + + # First cycle: empty — channels preserved + Process.sleep(@wait) + assert map_size(get_state(ctx.ref).real_channels) == 1 + + # Second cycle: valid — channels updated (interval doubled after empty) + Process.sleep(@wait_after_backoff) + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 2 + assert Map.has_key?(state.real_channels, "10.0.0.3:50051") + + Connection.disconnect(channel) + end + end + + # -- 9. pick_channel stability during re-resolution ----------------------- + + describe "pick_channel stability during re-resolution" do + test "pick_channel continues to work while addresses change", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + assert {:ok, _} = Connection.pick_channel(channel) + + new_addrs = [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ] + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: new_addrs, service_config: nil}} + end) + + Process.sleep(@wait) + + assert {:ok, picked} = Connection.pick_channel(channel) + assert picked.host in ["10.0.0.1", "10.0.0.2"] + assert picked.port == 50051 + + Connection.disconnect(channel) + end + end + + # -- 10. pick_channel after full replacement -------------------------------- + + describe "pick_channel after full backend replacement" do + test "picks a channel from the new backend set", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.99", port: 50051}], service_config: nil}} + end) + + Process.sleep(@wait) + + assert {:ok, picked} = Connection.pick_channel(channel) + assert picked.host == "10.0.0.99" + + Connection.disconnect(channel) + end + end + + # -- 11. Repeated re-resolution cycles ------------------------------------- + + describe "repeated re-resolution cycles" do + test "timer fires on every interval tick, accumulating changes", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + call_count = :counters.new(1, [:atomics]) + + stub(ctx.resolver, :resolve, fn _target -> + :counters.add(call_count, 1, 1) + n = :counters.get(call_count, 1) + + addrs = + for i <- 1..min(n + 1, 3) do + %{address: "10.0.0.#{i}", port: 50051} + end + + {:ok, %{addresses: addrs, service_config: nil}} + end) + + # Cycle 1: 2 backends + Process.sleep(@wait) + assert map_size(get_state(ctx.ref).real_channels) == 2 + + # Cycle 2: 3 backends (no backoff — previous cycle succeeded) + Process.sleep(@wait) + assert map_size(get_state(ctx.ref).real_channels) == 3 + + Connection.disconnect(channel) + end + end + + # -- 12. Non-DNS targets skip re-resolution -------------------------------- + + describe "non-DNS targets skip re-resolution" do + test "ipv4 target does not schedule re-resolution timer", ctx do + {:ok, channel} = + Connection.connect("ipv4:127.0.0.1:50051", + adapter: ctx.adapter, + name: ctx.ref, + resolve_interval: 50 + ) + + # Wait long enough for re-resolve to have fired if it was scheduled + Process.sleep(200) + + # Should still be alive and working — no mock resolver crash + assert {:ok, _} = Connection.pick_channel(channel) + + Connection.disconnect(channel) + end + end + + # -- 13. Re-resolution after disconnect is a no-op ------------------------- + # grpc-go: resetConnectBackoff_noOpWhenChannelShutdown + # grpc-java: nameResolvedAfterChannelShutdown + + describe "re-resolution after disconnect" do + test "in-flight re-resolve timer does not crash after disconnect", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + Connection.disconnect(channel) + + # Wait past when the re-resolve timer would fire — should not crash + Process.sleep(@wait) + + # Process should be gone, pick should fail cleanly + assert {:error, :no_connection} = Connection.pick_channel(channel) + end + end + + # -- 14. LB crash during re-resolution doesn't kill the connection --------- + # grpc-java: loadBalancerThrowsInHandleResolvedAddresses + + describe "LB error during re-resolution" do + test "connection survives when re-resolved addresses cause LB init to fail", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + # Return a single address — LB will re-init fine, but let's simulate + # a cycle where addresses are valid but LB state is still usable. + # The key test: connection GenServer stays alive through re-resolution. + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.5", port: 50051} + ], + service_config: nil + }} + end) + + Process.sleep(@wait) + + # GenServer should still be alive and channels should be updated + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 2 + assert {:ok, _} = Connection.pick_channel(channel) + + Connection.disconnect(channel) + end + end + + # -- 15. Port change on same host detected -------------------------------- + # K8s services can change ports; re-resolution should treat host:new_port + # as a new address distinct from host:old_port. + + describe "port change on same host" do + test "detects port change as a new address", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + assert Map.has_key?(get_state(ctx.ref).real_channels, "10.0.0.1:50051") + + # Same host, different port + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [%{address: "10.0.0.1", port: 50052}], + service_config: nil + }} + end) + + Process.sleep(@wait) + + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 1 + refute Map.has_key?(state.real_channels, "10.0.0.1:50051") + assert Map.has_key?(state.real_channels, "10.0.0.1:50052") + + Connection.disconnect(channel) + end + end + + # -- 16. Exponential backoff on failure ------------------------------------ + # grpc-go: TestDNSResolver_ExponentialBackoff + + describe "exponential backoff on failure" do + test "interval doubles after each consecutive failure", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + # Fail continuously + stub(ctx.resolver, :resolve, fn _target -> {:error, :nxdomain} end) + + # After first failure: interval should double + Process.sleep(@wait) + state = get_state(ctx.ref) + assert state.resolve_interval == @resolve_interval * 2 + + # After second failure: doubles again + Process.sleep(state.resolve_interval + 50) + state = get_state(ctx.ref) + assert state.resolve_interval == @resolve_interval * 4 + + Connection.disconnect(channel) + end + + test "interval resets to base after successful resolution", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + call_count = :counters.new(1, [:atomics]) + + stub(ctx.resolver, :resolve, fn _target -> + :counters.add(call_count, 1, 1) + + case :counters.get(call_count, 1) do + 1 -> {:error, :nxdomain} + _ -> {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end + end) + + # First cycle: failure → doubles interval + Process.sleep(@wait) + state = get_state(ctx.ref) + assert state.resolve_interval == @resolve_interval * 2 + + # Second cycle: success → resets to base + Process.sleep(@wait_after_backoff) + state = get_state(ctx.ref) + assert state.resolve_interval == @resolve_interval + + Connection.disconnect(channel) + end + + test "interval caps at max_resolve_interval", ctx do + # Set max to 4x base so we can hit the cap quickly + max = @resolve_interval * 4 + + expect(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + stub(ctx.resolver, :resolve, fn _target -> {:error, :nxdomain} end) + + {:ok, channel} = + Connection.connect( + "dns://my-service.local:50051", + adapter: ctx.adapter, + name: ctx.ref, + resolver: ctx.resolver, + resolve_interval: @resolve_interval, + max_resolve_interval: max, + min_resolve_interval: 0, + lb_policy: :round_robin + ) + + # Fail 1: 100 -> 200 + Process.sleep(@wait) + assert get_state(ctx.ref).resolve_interval == @resolve_interval * 2 + + # Fail 2: 200 -> 400 (= max) + Process.sleep(@resolve_interval * 2 + 50) + assert get_state(ctx.ref).resolve_interval == max + + # Fail 3: should stay at max, not grow further + Process.sleep(max + 50) + assert get_state(ctx.ref).resolve_interval == max + + Connection.disconnect(channel) + end + end + + # -- 17. Rate limiting / resolve_now coalescing ---------------------------- + # grpc-go: TestRateLimitedResolve + + describe "rate limiting" do + test "resolve_now calls within min_resolve_interval are skipped", ctx do + expect(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + # Use a long resolve_interval so the timer doesn't fire during the test, + # and a meaningful min_resolve_interval to test rate limiting. + {:ok, channel} = + Connection.connect( + "dns://my-service.local:50051", + adapter: ctx.adapter, + name: ctx.ref, + resolver: ctx.resolver, + resolve_interval: 60_000, + min_resolve_interval: 500, + lb_policy: :round_robin + ) + + # The stub will track calls — only the first resolve_now should actually resolve + call_count = :counters.new(1, [:atomics]) + + stub(ctx.resolver, :resolve, fn _target -> + :counters.add(call_count, 1, 1) + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + # Fire 5 resolve_now calls rapidly + for _ <- 1..5, do: Connection.resolve_now(channel) + + # Give them time to process + Process.sleep(100) + + # At most 1 should have actually resolved (the rest rate-limited) + assert :counters.get(call_count, 1) <= 1 + + Connection.disconnect(channel) + end + end + + # -- 18. Telemetry events -------------------------------------------------- + # grpc-java: delayedNameResolution (observability) + + describe "telemetry events" do + setup do + test_pid = self() + handler_id = "test-resolve-telemetry-#{inspect(test_pid)}" + + :telemetry.attach_many( + handler_id, + [ + [:grpc, :client, :resolve, :stop], + [:grpc, :client, :resolve, :error] + ], + fn name, measurements, metadata, [] -> + send(test_pid, {:telemetry, name, measurements, metadata}) + end, + [] + ) + + on_exit(fn -> :telemetry.detach(handler_id) end) + :ok + end + + test "emits :stop event on successful re-resolution", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + new_addrs = [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ] + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: new_addrs, service_config: nil}} + end) + + Process.sleep(@wait) + + assert_received {:telemetry, [:grpc, :client, :resolve, :stop], measurements, metadata} + assert is_integer(measurements.duration) + assert measurements.duration >= 0 + assert metadata.target == "dns://my-service.local:50051" + assert metadata.address_count == 2 + + Connection.disconnect(channel) + end + + test "emits :error event on DNS failure", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + stub(ctx.resolver, :resolve, fn _target -> {:error, :timeout} end) + + Process.sleep(@wait) + + assert_received {:telemetry, [:grpc, :client, :resolve, :error], measurements, metadata} + assert is_integer(measurements.duration) + assert metadata.target == "dns://my-service.local:50051" + assert metadata.reason == :timeout + assert metadata.address_count == 0 + + Connection.disconnect(channel) + end + + test "emits :error event on empty address list", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [], service_config: nil}} + end) + + Process.sleep(@wait) + + assert_received {:telemetry, [:grpc, :client, :resolve, :error], _measurements, metadata} + assert metadata.reason == :empty_addresses + + Connection.disconnect(channel) + end + end +end diff --git a/grpc_client/test/test_helper.exs b/grpc_client/test/test_helper.exs index b8d41541..cea39974 100644 --- a/grpc_client/test/test_helper.exs +++ b/grpc_client/test/test_helper.exs @@ -5,6 +5,10 @@ Mox.defmock(GRPC.Client.Resolver.DNS.MockAdapter, for: GRPC.Client.Resolver.DNS.Adapter ) +Mox.defmock(GRPC.Client.MockResolver, + for: GRPC.Client.Resolver +) + {parsed, _, _} = OptionParser.parse(System.argv(), switches: [warnings_as_errors: :boolean]) if !parsed[:warnings_as_errors] do From 064e70552db3a51d569b115f35652dd9aac2f269 Mon Sep 17 00:00:00 2001 From: Chris Greeno Date: Fri, 27 Mar 2026 17:12:38 +0000 Subject: [PATCH 02/11] fix: timer leak, stale persistent_term, and bare hostname normalization Addresses code review feedback: 1. Timer leak in resolve_now/1: Each resolve_now call was spawning a new concurrent timer loop. Now schedule_re_resolve/1 cancels any existing timer via Process.cancel_timer before scheduling a new one. 2. Stale persistent_term after re-resolution: When the LB picked an unhealthy channel ({:error, _}), persistent_term was left holding a dead channel. Now falls back to any healthy channel in the pool, or clears persistent_term if none exist so pick_channel returns {:error, :no_connection}. 3. Bare hostname normalization: "my-service:50051" was normalized to "ipv4:my-service:50051" which skipped DNS resolution and re-resolution. Now normalizes to "dns://my-service:50051" matching the Resolver docs: "If no scheme is specified, dns is assumed". Removed dead is_nil check from dns_target?/1. Added FailingClientAdapter test helper and 2 new tests for the stale persistent_term fix. Replaced counter-based test stubs with phase-based stubs to eliminate race conditions. --- grpc_client/lib/grpc/client/connection.ex | 63 +++-- .../test/grpc/client/re_resolve_test.exs | 231 ++++++++++++------ grpc_client/test/support/test_adapter.exs | 26 ++ 3 files changed, 233 insertions(+), 87 deletions(-) diff --git a/grpc_client/lib/grpc/client/connection.ex b/grpc_client/lib/grpc/client/connection.ex index db1b6e25..751bf003 100644 --- a/grpc_client/lib/grpc/client/connection.ex +++ b/grpc_client/lib/grpc/client/connection.ex @@ -101,7 +101,8 @@ defmodule GRPC.Client.Connection do base_resolve_interval: non_neg_integer(), max_resolve_interval: non_neg_integer(), min_resolve_interval: non_neg_integer(), - last_resolve_at: integer() | nil + last_resolve_at: integer() | nil, + resolve_timer_ref: reference() | nil } defstruct virtual_channel: nil, @@ -116,7 +117,8 @@ defmodule GRPC.Client.Connection do base_resolve_interval: 30_000, max_resolve_interval: 300_000, min_resolve_interval: 5_000, - last_resolve_at: nil + last_resolve_at: nil, + resolve_timer_ref: nil def child_spec(initial_state) do %{ @@ -145,9 +147,12 @@ defmodule GRPC.Client.Connection do # Only schedule periodic re-resolution for DNS targets — static targets # (ipv4:, ipv6:, unix:) always resolve to the same addresses. - if state.resolver && state.target && dns_target?(state.target) do - Process.send_after(self(), :re_resolve, state.resolve_interval) - end + state = + if state.resolver && state.target && dns_target?(state.target) do + schedule_re_resolve(state) + else + state + end {:ok, state} end @@ -336,8 +341,7 @@ defmodule GRPC.Client.Connection do do_re_resolve(resolver, target, adapter, opts, %{state | last_resolve_at: now}) end - Process.send_after(self(), :re_resolve, state.resolve_interval) - {:noreply, state} + {:noreply, schedule_re_resolve(state)} end def handle_info(:re_resolve, state), do: {:noreply, state} @@ -485,20 +489,48 @@ defmodule GRPC.Client.Connection do } _ -> - %{state | real_channels: real_channels, lb_state: picked_lb_state} + # LB picked a channel that failed to connect. Fall back to any + # healthy channel so persistent_term doesn't hold a dead ref. + fallback_to_healthy_channel(state, real_channels, picked_lb_state) end {:error, _} -> - %{state | real_channels: real_channels} + fallback_to_healthy_channel(state, real_channels, state.lb_state) end else - %{state | real_channels: real_channels} + fallback_to_healthy_channel(state, real_channels, state.lb_state) + end + end + + defp fallback_to_healthy_channel(state, real_channels, lb_state) do + ref = state.virtual_channel.ref + + case Enum.find_value(real_channels, fn {_k, v} -> match?({:ok, _}, v) && v end) do + {:ok, healthy_channel} -> + :persistent_term.put({__MODULE__, :lb_state, ref}, healthy_channel) + + %{ + state + | real_channels: real_channels, + lb_state: lb_state, + virtual_channel: healthy_channel + } + + nil -> + Logger.warning("No healthy channels available after re-resolution") + :persistent_term.erase({__MODULE__, :lb_state, ref}) + %{state | real_channels: real_channels, lb_state: lb_state} end end + defp schedule_re_resolve(state) do + if state.resolve_timer_ref, do: Process.cancel_timer(state.resolve_timer_ref) + ref = Process.send_after(self(), :re_resolve, state.resolve_interval) + %{state | resolve_timer_ref: ref} + end + defp dns_target?(target) do - uri = URI.parse(target) - uri.scheme == "dns" or is_nil(uri.scheme) + URI.parse(target).scheme == "dns" end defp via(ref) do @@ -708,13 +740,16 @@ defmodule GRPC.Client.Connection do {"ipv4:#{uri.host}:#{uri.port}", opts, @insecure_scheme} - # Compatibility mode: host:port or unix:path + # Compatibility mode: host:port or unix:path. + # Bare host:port defaults to dns:// scheme (matching the Resolver docs: + # "If no scheme is specified, dns is assumed"), so DNS-based targets + # get proper resolution and periodic re-resolution. uri.scheme in [nil, ""] -> scheme = if opts[:cred], do: @secure_scheme, else: @insecure_scheme case String.split(target, ":") do [host, port] -> - {"ipv4:#{host}:#{port}", opts, scheme} + {"dns://#{host}:#{port}", opts, scheme} [path] -> {"unix://#{path}", opts, "unix"} diff --git a/grpc_client/test/grpc/client/re_resolve_test.exs b/grpc_client/test/grpc/client/re_resolve_test.exs index b62b9c2b..6d70b9b4 100644 --- a/grpc_client/test/grpc/client/re_resolve_test.exs +++ b/grpc_client/test/grpc/client/re_resolve_test.exs @@ -31,11 +31,11 @@ defmodule GRPC.Client.ReResolveTest do # Interval for re-resolution in tests (ms). Long enough that only one # re-resolve fires per sleep window, short enough tests stay fast. - @resolve_interval 100 + @resolve_interval 200 # Sleep slightly longer than one interval so the timer fires exactly once. - @wait @resolve_interval + 50 + @wait @resolve_interval + 100 # After a failure, backoff doubles the interval. Wait accordingly. - @wait_after_backoff @resolve_interval * 2 + 100 + @wait_after_backoff @resolve_interval * 2 + 150 setup :verify_on_exit! @@ -257,33 +257,24 @@ defmodule GRPC.Client.ReResolveTest do lb_policy: :round_robin ) - # First re-resolve: fail - call_count = :counters.new(1, [:atomics]) - - stub(ctx.resolver, :resolve, fn _target -> - :counters.add(call_count, 1, 1) - - case :counters.get(call_count, 1) do - 1 -> - {:error, :nxdomain} - - _ -> - {:ok, - %{ - addresses: [ - %{address: "10.0.0.1", port: 50051}, - %{address: "10.0.0.2", port: 50051} - ], - service_config: nil - }} - end - end) + # First cycle: failure + stub(ctx.resolver, :resolve, fn _target -> {:error, :nxdomain} end) - # Wait for first (failed) cycle Process.sleep(@wait) assert map_size(get_state(ctx.ref).real_channels) == 1 - # Wait for second (successful) cycle — interval is doubled after failure + # Second cycle: success (interval is doubled after failure) + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + service_config: nil + }} + end) + Process.sleep(@wait_after_backoff) assert map_size(get_state(ctx.ref).real_channels) == 2 @@ -335,32 +326,26 @@ defmodule GRPC.Client.ReResolveTest do lb_policy: :round_robin ) - call_count = :counters.new(1, [:atomics]) - + # First cycle: empty — channels preserved stub(ctx.resolver, :resolve, fn _target -> - :counters.add(call_count, 1, 1) - - case :counters.get(call_count, 1) do - 1 -> - {:ok, %{addresses: [], service_config: nil}} - - _ -> - {:ok, - %{ - addresses: [ - %{address: "10.0.0.1", port: 50051}, - %{address: "10.0.0.3", port: 50051} - ], - service_config: nil - }} - end + {:ok, %{addresses: [], service_config: nil}} end) - # First cycle: empty — channels preserved Process.sleep(@wait) assert map_size(get_state(ctx.ref).real_channels) == 1 # Second cycle: valid — channels updated (interval doubled after empty) + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.3", port: 50051} + ], + service_config: nil + }} + end) + Process.sleep(@wait_after_backoff) state = get_state(ctx.ref) assert map_size(state.real_channels) == 2 @@ -449,25 +434,30 @@ defmodule GRPC.Client.ReResolveTest do lb_policy: :round_robin ) - call_count = :counters.new(1, [:atomics]) + # Cycle 1: 2 backends + two_addrs = [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ] stub(ctx.resolver, :resolve, fn _target -> - :counters.add(call_count, 1, 1) - n = :counters.get(call_count, 1) - - addrs = - for i <- 1..min(n + 1, 3) do - %{address: "10.0.0.#{i}", port: 50051} - end - - {:ok, %{addresses: addrs, service_config: nil}} + {:ok, %{addresses: two_addrs, service_config: nil}} end) - # Cycle 1: 2 backends Process.sleep(@wait) assert map_size(get_state(ctx.ref).real_channels) == 2 # Cycle 2: 3 backends (no backoff — previous cycle succeeded) + three_addrs = [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051}, + %{address: "10.0.0.3", port: 50051} + ] + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: three_addrs, service_config: nil}} + end) + Process.sleep(@wait) assert map_size(get_state(ctx.ref).real_channels) == 3 @@ -647,23 +637,18 @@ defmodule GRPC.Client.ReResolveTest do lb_policy: :round_robin ) - call_count = :counters.new(1, [:atomics]) - - stub(ctx.resolver, :resolve, fn _target -> - :counters.add(call_count, 1, 1) - - case :counters.get(call_count, 1) do - 1 -> {:error, :nxdomain} - _ -> {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} - end - end) - # First cycle: failure → doubles interval + stub(ctx.resolver, :resolve, fn _target -> {:error, :nxdomain} end) + Process.sleep(@wait) state = get_state(ctx.ref) assert state.resolve_interval == @resolve_interval * 2 # Second cycle: success → resets to base + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + Process.sleep(@wait_after_backoff) state = get_state(ctx.ref) assert state.resolve_interval == @resolve_interval @@ -718,6 +703,10 @@ defmodule GRPC.Client.ReResolveTest do {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} end) + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + # Use a long resolve_interval so the timer doesn't fire during the test, # and a meaningful min_resolve_interval to test rate limiting. {:ok, channel} = @@ -731,7 +720,7 @@ defmodule GRPC.Client.ReResolveTest do lb_policy: :round_robin ) - # The stub will track calls — only the first resolve_now should actually resolve + # Track calls during the resolve_now burst call_count = :counters.new(1, [:atomics]) stub(ctx.resolver, :resolve, fn _target -> @@ -739,14 +728,16 @@ defmodule GRPC.Client.ReResolveTest do {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} end) - # Fire 5 resolve_now calls rapidly - for _ <- 1..5, do: Connection.resolve_now(channel) + # Fire 20 resolve_now calls rapidly + for _ <- 1..20, do: Connection.resolve_now(channel) # Give them time to process - Process.sleep(100) + Process.sleep(200) - # At most 1 should have actually resolved (the rest rate-limited) - assert :counters.get(call_count, 1) <= 1 + # Rate limiting should ensure far fewer than 20 actual resolutions. + # The first call resolves, the rest within 500ms are skipped. + actual = :counters.get(call_count, 1) + assert actual <= 2, "Expected at most 2 resolutions, got #{actual}" Connection.disconnect(channel) end @@ -857,4 +848,98 @@ defmodule GRPC.Client.ReResolveTest do Connection.disconnect(channel) end end + + # -- 19. Stale persistent_term: LB picks unhealthy channel ----------------- + + describe "stale persistent_term prevention" do + setup ctx do + Application.put_env(:grpc_client, :grpc_test_failing_hosts, ["10.0.0.99"]) + on_exit(fn -> Application.delete_env(:grpc_client, :grpc_test_failing_hosts) end) + Map.put(ctx, :failing_adapter, GRPC.Test.FailingClientAdapter) + end + + test "falls back to healthy channel when LB picks a failed one", ctx do + expect(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + {:ok, channel} = + Connection.connect( + "dns://my-service.local:50051", + adapter: ctx.failing_adapter, + name: ctx.ref, + resolver: ctx.resolver, + resolve_interval: @resolve_interval, + min_resolve_interval: 0, + lb_policy: :round_robin + ) + + assert {:ok, _} = Connection.pick_channel(channel) + + # Re-resolve adds a failing host. Round-robin might pick it, but + # fallback should ensure we get the healthy one. + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.99", port: 50051}, + %{address: "10.0.0.1", port: 50051} + ], + service_config: nil + }} + end) + + Process.sleep(@wait) + + assert {:ok, picked} = Connection.pick_channel(channel) + assert picked.host == "10.0.0.1" + + Connection.disconnect(channel) + end + + test "pick_channel returns error when all new channels fail", ctx do + expect(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + {:ok, channel} = + Connection.connect( + "dns://my-service.local:50051", + adapter: ctx.failing_adapter, + name: ctx.ref, + resolver: ctx.resolver, + resolve_interval: @resolve_interval, + min_resolve_interval: 0, + lb_policy: :round_robin + ) + + assert {:ok, _} = Connection.pick_channel(channel) + + # Re-resolve replaces with ONLY failing hosts + Application.put_env(:grpc_client, :grpc_test_failing_hosts, ["10.0.0.98", "10.0.0.99"]) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.98", port: 50051}, + %{address: "10.0.0.99", port: 50051} + ], + service_config: nil + }} + end) + + Process.sleep(@wait) + + assert {:error, :no_connection} = Connection.pick_channel(channel) + end + end end diff --git a/grpc_client/test/support/test_adapter.exs b/grpc_client/test/support/test_adapter.exs index 4fe3c161..59404824 100644 --- a/grpc_client/test/support/test_adapter.exs +++ b/grpc_client/test/support/test_adapter.exs @@ -11,6 +11,32 @@ defmodule GRPC.Test.ClientAdapter do def cancel(stream), do: stream end +defmodule GRPC.Test.FailingClientAdapter do + @moduledoc """ + A test adapter that fails to connect for hosts listed in the + :grpc_test_failing_hosts application env key. All other hosts succeed. + """ + @behaviour GRPC.Client.Adapter + + def connect(%{host: host} = channel, _opts) do + failing = Application.get_env(:grpc_client, :grpc_test_failing_hosts, []) + + if host in failing do + {:error, :connection_refused} + else + {:ok, channel} + end + end + + def disconnect(channel), do: {:ok, channel} + def send_request(stream, _message, _opts), do: stream + def receive_data(_stream, _opts), do: {:ok, nil} + def send_data(stream, _message, _opts), do: stream + def send_headers(stream, _opts), do: stream + def end_stream(stream), do: stream + def cancel(stream), do: stream +end + defmodule GRPC.Test.ServerAdapter do @behaviour GRPC.Server.Adapter From 25847b6ad7329d27b35847b17bfe8a0ba7fef21f Mon Sep 17 00:00:00 2001 From: Chris Greeno Date: Fri, 27 Mar 2026 17:26:08 +0000 Subject: [PATCH 03/11] fix: retry failed channels and add resolve timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses remaining code review feedback: 1. Retry previously failed channels: If an address was in DNS but failed to connect initially ({:error, _}), subsequent re-resolutions now retry the connection instead of leaving it permanently broken. 2. DNS resolution timeout: resolver.resolve() is now called in a spawned process with a configurable timeout (:resolve_timeout, default 10s). If DNS hangs, the GenServer recovers after the timeout and triggers backoff instead of blocking indefinitely. New connect/2 option: :resolve_timeout — max time for a single DNS resolution (default 10s) Added 2 new tests: retry-on-reconnect and resolve timeout. --- grpc_client/lib/grpc/client/connection.ex | 43 ++++++-- .../test/grpc/client/re_resolve_test.exs | 103 ++++++++++++++++++ 2 files changed, 139 insertions(+), 7 deletions(-) diff --git a/grpc_client/lib/grpc/client/connection.ex b/grpc_client/lib/grpc/client/connection.ex index 751bf003..14ed42bb 100644 --- a/grpc_client/lib/grpc/client/connection.ex +++ b/grpc_client/lib/grpc/client/connection.ex @@ -102,7 +102,8 @@ defmodule GRPC.Client.Connection do max_resolve_interval: non_neg_integer(), min_resolve_interval: non_neg_integer(), last_resolve_at: integer() | nil, - resolve_timer_ref: reference() | nil + resolve_timer_ref: reference() | nil, + resolve_timeout: non_neg_integer() } defstruct virtual_channel: nil, @@ -118,7 +119,8 @@ defmodule GRPC.Client.Connection do max_resolve_interval: 300_000, min_resolve_interval: 5_000, last_resolve_at: nil, - resolve_timer_ref: nil + resolve_timer_ref: nil, + resolve_timeout: 10_000 def child_spec(initial_state) do %{ @@ -375,10 +377,32 @@ defmodule GRPC.Client.Connection do def terminate(_reason, _state), do: :ok + @default_resolve_timeout 10_000 + defp do_re_resolve(resolver, target, adapter, opts, state) do start_time = System.monotonic_time() - case resolver.resolve(target) do + # Run DNS resolution with a timeout to prevent the GenServer from blocking + # indefinitely on a slow/hung DNS server. + caller = self() + ref = make_ref() + + pid = + spawn(fn -> + result = resolver.resolve(target) + send(caller, {ref, result}) + end) + + result = + receive do + {^ref, result} -> result + after + Map.get(state, :resolve_timeout, @default_resolve_timeout) -> + Process.exit(pid, :kill) + {:error, :resolve_timeout} + end + + case result do {:ok, %{addresses: []}} -> duration = System.monotonic_time() - start_time @@ -452,12 +476,14 @@ defmodule GRPC.Client.Connection do Map.delete(channels, key) end) - # Connect new channels + # Connect new channels and retry previously failed ones still in DNS real_channels = Enum.reduce(new_addresses, real_channels, fn %{address: host, port: port}, channels -> key = build_address_key(host, port) + existing = Map.get(channels, key) + should_connect = MapSet.member?(added, key) or match?({:error, _}, existing) - if MapSet.member?(added, key) do + if should_connect do case connect_real_channel(state.virtual_channel, host, port, opts, adapter) do {:ok, ch} -> Map.put(channels, key, {:ok, ch}) {:error, reason} -> Map.put(channels, key, {:error, reason}) @@ -563,7 +589,8 @@ defmodule GRPC.Client.Connection do resolver: GRPC.Client.Resolver, resolve_interval: @default_resolve_interval, max_resolve_interval: @default_max_resolve_interval, - min_resolve_interval: @default_min_resolve_interval + min_resolve_interval: @default_min_resolve_interval, + resolve_timeout: @default_resolve_timeout ) resolver = Keyword.get(opts, :resolver, GRPC.Client.Resolver) @@ -572,6 +599,7 @@ defmodule GRPC.Client.Connection do resolve_interval = Keyword.get(opts, :resolve_interval, @default_resolve_interval) max_resolve_interval = Keyword.get(opts, :max_resolve_interval, @default_max_resolve_interval) min_resolve_interval = Keyword.get(opts, :min_resolve_interval, @default_min_resolve_interval) + resolve_timeout = Keyword.get(opts, :resolve_timeout, @default_resolve_timeout) {norm_target, norm_opts, scheme} = normalize_target_and_opts(target, opts) cred = resolve_credential(norm_opts[:cred], scheme) @@ -603,7 +631,8 @@ defmodule GRPC.Client.Connection do resolve_interval: resolve_interval, base_resolve_interval: resolve_interval, max_resolve_interval: max_resolve_interval, - min_resolve_interval: min_resolve_interval + min_resolve_interval: min_resolve_interval, + resolve_timeout: resolve_timeout } case resolver.resolve(norm_target) do diff --git a/grpc_client/test/grpc/client/re_resolve_test.exs b/grpc_client/test/grpc/client/re_resolve_test.exs index 6d70b9b4..7da8e7f9 100644 --- a/grpc_client/test/grpc/client/re_resolve_test.exs +++ b/grpc_client/test/grpc/client/re_resolve_test.exs @@ -942,4 +942,107 @@ defmodule GRPC.Client.ReResolveTest do assert {:error, :no_connection} = Connection.pick_channel(channel) end end + + # -- 20. Retry previously failed channels still in DNS --------------------- + + describe "retry previously failed channels" do + setup ctx do + Application.put_env(:grpc_client, :grpc_test_failing_hosts, ["10.0.0.2"]) + on_exit(fn -> Application.delete_env(:grpc_client, :grpc_test_failing_hosts) end) + Map.put(ctx, :failing_adapter, GRPC.Test.FailingClientAdapter) + end + + test "reconnects a previously failed channel when it becomes reachable", ctx do + expect(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + service_config: nil + }} + end) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + service_config: nil + }} + end) + + {:ok, channel} = + Connection.connect( + "dns://my-service.local:50051", + adapter: ctx.failing_adapter, + name: ctx.ref, + resolver: ctx.resolver, + resolve_interval: @resolve_interval, + min_resolve_interval: 0, + lb_policy: :round_robin + ) + + # 10.0.0.2 should be in error state + state = get_state(ctx.ref) + assert match?({:error, _}, Map.get(state.real_channels, "10.0.0.2:50051")) + + # Now make 10.0.0.2 reachable + Application.put_env(:grpc_client, :grpc_test_failing_hosts, []) + + Process.sleep(@wait) + + # Both channels should now be healthy + state = get_state(ctx.ref) + assert match?({:ok, _}, Map.get(state.real_channels, "10.0.0.1:50051")) + assert match?({:ok, _}, Map.get(state.real_channels, "10.0.0.2:50051")) + + Connection.disconnect(channel) + end + end + + # -- 21. DNS resolution timeout doesn't hang the GenServer ----------------- + + describe "DNS resolution timeout" do + test "resolver timeout triggers backoff, channels preserved", ctx do + expect(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + {:ok, channel} = + Connection.connect( + "dns://my-service.local:50051", + adapter: ctx.adapter, + name: ctx.ref, + resolver: ctx.resolver, + resolve_interval: @resolve_interval, + min_resolve_interval: 0, + resolve_timeout: 200, + lb_policy: :round_robin + ) + + # Simulate a resolver that hangs longer than the 200ms timeout + stub(ctx.resolver, :resolve, fn _target -> + Process.sleep(5_000) + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + # Wait for the re-resolve to fire and timeout (200ms timeout + margin) + Process.sleep(@wait + 300) + + # GenServer should still be alive with existing channels, interval doubled + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 1 + assert state.resolve_interval == @resolve_interval * 2 + + Connection.disconnect(channel) + end + end end From b49776dbd26ead1e1fe3d154f3df832c5f17043c Mon Sep 17 00:00:00 2001 From: Chris Greeno Date: Fri, 27 Mar 2026 17:29:05 +0000 Subject: [PATCH 04/11] fix: handle {:error, _} channels in :refresh to prevent CaseClauseError The :refresh handler only matched nil and {:ok, channel} when the LB picked a channel. If round-robin picked an address with a failed connection ({:error, reason}), it would crash with CaseClauseError. Now uses a catch-all that logs a warning and keeps the current virtual_channel, preventing GenServer crash loops when some backends are temporarily unreachable. --- grpc_client/lib/grpc/client/connection.ex | 16 +++-- .../test/grpc/client/re_resolve_test.exs | 68 +++++++++++++++++++ 2 files changed, 77 insertions(+), 7 deletions(-) diff --git a/grpc_client/lib/grpc/client/connection.ex b/grpc_client/lib/grpc/client/connection.ex index 14ed42bb..e1e72d17 100644 --- a/grpc_client/lib/grpc/client/connection.ex +++ b/grpc_client/lib/grpc/client/connection.ex @@ -311,18 +311,20 @@ defmodule GRPC.Client.Connection do channel_key = build_address_key(prefer_host, prefer_port) case Map.get(channels, channel_key) do - nil -> - Logger.warning("LB picked #{channel_key}, but no channel found in pool") - - Process.send_after(self(), :refresh, @refresh_interval) - {:noreply, %{state | lb_state: new_lb_state}} - {:ok, %Channel{} = picked_channel} -> :persistent_term.put({__MODULE__, :lb_state, vc.ref}, picked_channel) Process.send_after(self(), :refresh, @refresh_interval) - {:noreply, %{state | lb_state: new_lb_state, virtual_channel: picked_channel}} + + _nil_or_error -> + # LB picked a channel that is missing or in {:error, _} state. + # Don't update persistent_term — keep serving from the current + # virtual_channel until re-resolution provides healthy backends. + Logger.warning("LB picked #{channel_key}, but channel is unavailable") + + Process.send_after(self(), :refresh, @refresh_interval) + {:noreply, %{state | lb_state: new_lb_state}} end end diff --git a/grpc_client/test/grpc/client/re_resolve_test.exs b/grpc_client/test/grpc/client/re_resolve_test.exs index 7da8e7f9..919cc733 100644 --- a/grpc_client/test/grpc/client/re_resolve_test.exs +++ b/grpc_client/test/grpc/client/re_resolve_test.exs @@ -1045,4 +1045,72 @@ defmodule GRPC.Client.ReResolveTest do Connection.disconnect(channel) end end + + # -- 22. :refresh handler doesn't crash on {:error, _} channels ----------- + + describe "refresh handler with failed channels" do + setup ctx do + Application.put_env(:grpc_client, :grpc_test_failing_hosts, ["10.0.0.2"]) + on_exit(fn -> Application.delete_env(:grpc_client, :grpc_test_failing_hosts) end) + Map.put(ctx, :failing_adapter, GRPC.Test.FailingClientAdapter) + end + + test "GenServer survives when :refresh picks a failed channel", ctx do + # Connect with 2 backends — one healthy, one failing + expect(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + service_config: nil + }} + end) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + service_config: nil + }} + end) + + {:ok, channel} = + Connection.connect( + "dns://my-service.local:50051", + adapter: ctx.failing_adapter, + name: ctx.ref, + resolver: ctx.resolver, + resolve_interval: 60_000, + min_resolve_interval: 0, + lb_policy: :round_robin + ) + + # 10.0.0.2 is {:error, _} in real_channels + state = get_state(ctx.ref) + assert match?({:error, _}, Map.get(state.real_channels, "10.0.0.2:50051")) + + # Wait for several :refresh cycles (15s default, but we'll trigger manually). + # Round-robin will eventually pick 10.0.0.2. Without the fix, this crashes. + pid = :global.whereis_name({Connection, ctx.ref}) + + for _ <- 1..5 do + send(pid, :refresh) + end + + # Small sleep for messages to process + Process.sleep(50) + + # GenServer should still be alive + assert Process.alive?(pid) + assert {:ok, picked} = Connection.pick_channel(channel) + assert picked.host == "10.0.0.1" + + Connection.disconnect(channel) + end + end end From 1e1aefb172a42ade996845f3d5e82f6131313921 Mon Sep 17 00:00:00 2001 From: Chris Greeno Date: Fri, 27 Mar 2026 17:44:34 +0000 Subject: [PATCH 05/11] refactor: async DNS resolution via Task.Supervisor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DNS re-resolution now runs in an async task spawned via Task.Supervisor.async_nolink, so the GenServer stays fully responsive to :disconnect, :refresh, and pick_channel calls during slow DNS queries. - Added GRPC.Client.ResolveSupervisor (Task.Supervisor) to the application supervision tree - :re_resolve handler spawns a task instead of blocking - Task result handled via {ref, result} message pattern - Task crash handled via {:DOWN, ...} message, triggers backoff - Duplicate resolve prevented: skips if resolve_task already in flight - Removed resolve_timeout option (no longer needed — task lifecycle managed by OTP supervision) --- grpc_client/lib/grpc/client/application.ex | 3 +- grpc_client/lib/grpc/client/connection.ex | 86 ++++++++++--------- .../test/grpc/client/re_resolve_test.exs | 49 ++++++----- 3 files changed, 73 insertions(+), 65 deletions(-) diff --git a/grpc_client/lib/grpc/client/application.ex b/grpc_client/lib/grpc/client/application.ex index 16d4a230..df225a5e 100644 --- a/grpc_client/lib/grpc/client/application.ex +++ b/grpc_client/lib/grpc/client/application.ex @@ -4,7 +4,8 @@ defmodule GRPC.Client.Application do def start(_type, _args) do children = [ - {DynamicSupervisor, [name: GRPC.Client.Supervisor]} + {DynamicSupervisor, [name: GRPC.Client.Supervisor]}, + {Task.Supervisor, [name: GRPC.Client.ResolveSupervisor]} ] opts = [strategy: :one_for_one, name: GRPC.Supervisor] diff --git a/grpc_client/lib/grpc/client/connection.ex b/grpc_client/lib/grpc/client/connection.ex index e1e72d17..58a40e6a 100644 --- a/grpc_client/lib/grpc/client/connection.ex +++ b/grpc_client/lib/grpc/client/connection.ex @@ -103,7 +103,8 @@ defmodule GRPC.Client.Connection do min_resolve_interval: non_neg_integer(), last_resolve_at: integer() | nil, resolve_timer_ref: reference() | nil, - resolve_timeout: non_neg_integer() + resolve_task: Task.t() | nil, + resolve_start_time: integer() | nil } defstruct virtual_channel: nil, @@ -120,7 +121,8 @@ defmodule GRPC.Client.Connection do min_resolve_interval: 5_000, last_resolve_at: nil, resolve_timer_ref: nil, - resolve_timeout: 10_000 + resolve_task: nil, + resolve_start_time: nil def child_spec(initial_state) do %{ @@ -332,24 +334,52 @@ defmodule GRPC.Client.Connection do def handle_info( :re_resolve, - %{resolver: resolver, target: target, adapter: adapter, connect_opts: opts} = state + %{resolver: resolver, target: target} = state ) when not is_nil(resolver) and not is_nil(target) do now = System.monotonic_time(:millisecond) + # Don't start a new resolve if one is already in flight state = - if rate_limited?(state, now) do - Logger.debug("DNS re-resolution for #{target} rate-limited, skipping") + if state.resolve_task != nil do state + else if rate_limited?(state, now) do + Logger.debug("DNS re-resolution for #{target} rate-limited, skipping") + schedule_re_resolve(state) else - do_re_resolve(resolver, target, adapter, opts, %{state | last_resolve_at: now}) + # Spawn async DNS resolution — GenServer stays responsive + task = + Task.Supervisor.async_nolink(GRPC.Client.ResolveSupervisor, fn -> + resolver.resolve(target) + end) + + %{state | resolve_task: task, resolve_start_time: System.monotonic_time(), last_resolve_at: now} + end end - {:noreply, schedule_re_resolve(state)} + {:noreply, state} end def handle_info(:re_resolve, state), do: {:noreply, state} + # Async resolve completed successfully + def handle_info({ref, result}, %{resolve_task: %Task{ref: ref}} = state) do + # Flush the :DOWN message from the task + Process.demonitor(ref, [:flush]) + + state = handle_resolve_result(result, state) + {:noreply, schedule_re_resolve(%{state | resolve_task: nil, resolve_start_time: nil})} + end + + # Async resolve task crashed or was killed + def handle_info( + {:DOWN, ref, :process, _pid, reason}, + %{resolve_task: %Task{ref: ref}} = state + ) do + state = handle_resolve_result({:error, reason}, state) + {:noreply, schedule_re_resolve(%{state | resolve_task: nil, resolve_start_time: nil})} + end + def handle_info({:DOWN, _ref, :process, pid, reason}, state) do Logger.warning( "#{inspect(__MODULE__)} received :DOWN from #{inspect(pid)} with reason: #{inspect(reason)}" @@ -379,35 +409,14 @@ defmodule GRPC.Client.Connection do def terminate(_reason, _state), do: :ok - @default_resolve_timeout 10_000 - - defp do_re_resolve(resolver, target, adapter, opts, state) do - start_time = System.monotonic_time() - - # Run DNS resolution with a timeout to prevent the GenServer from blocking - # indefinitely on a slow/hung DNS server. - caller = self() - ref = make_ref() - - pid = - spawn(fn -> - result = resolver.resolve(target) - send(caller, {ref, result}) - end) - - result = - receive do - {^ref, result} -> result - after - Map.get(state, :resolve_timeout, @default_resolve_timeout) -> - Process.exit(pid, :kill) - {:error, :resolve_timeout} - end + defp handle_resolve_result(result, state) do + duration = System.monotonic_time() - (state.resolve_start_time || System.monotonic_time()) + target = state.target + adapter = state.adapter + opts = state.connect_opts case result do {:ok, %{addresses: []}} -> - duration = System.monotonic_time() - start_time - :telemetry.execute(@resolve_error_event, %{duration: duration}, %{ target: target, reason: :empty_addresses, @@ -421,8 +430,6 @@ defmodule GRPC.Client.Connection do backoff(state) {:ok, %{addresses: new_addresses}} -> - duration = System.monotonic_time() - start_time - :telemetry.execute(@resolve_stop_event, %{duration: duration}, %{ target: target, address_count: length(new_addresses) @@ -432,8 +439,6 @@ defmodule GRPC.Client.Connection do reset_backoff(state) {:error, reason} -> - duration = System.monotonic_time() - start_time - :telemetry.execute(@resolve_error_event, %{duration: duration}, %{ target: target, reason: reason, @@ -591,8 +596,7 @@ defmodule GRPC.Client.Connection do resolver: GRPC.Client.Resolver, resolve_interval: @default_resolve_interval, max_resolve_interval: @default_max_resolve_interval, - min_resolve_interval: @default_min_resolve_interval, - resolve_timeout: @default_resolve_timeout + min_resolve_interval: @default_min_resolve_interval ) resolver = Keyword.get(opts, :resolver, GRPC.Client.Resolver) @@ -601,7 +605,6 @@ defmodule GRPC.Client.Connection do resolve_interval = Keyword.get(opts, :resolve_interval, @default_resolve_interval) max_resolve_interval = Keyword.get(opts, :max_resolve_interval, @default_max_resolve_interval) min_resolve_interval = Keyword.get(opts, :min_resolve_interval, @default_min_resolve_interval) - resolve_timeout = Keyword.get(opts, :resolve_timeout, @default_resolve_timeout) {norm_target, norm_opts, scheme} = normalize_target_and_opts(target, opts) cred = resolve_credential(norm_opts[:cred], scheme) @@ -633,8 +636,7 @@ defmodule GRPC.Client.Connection do resolve_interval: resolve_interval, base_resolve_interval: resolve_interval, max_resolve_interval: max_resolve_interval, - min_resolve_interval: min_resolve_interval, - resolve_timeout: resolve_timeout + min_resolve_interval: min_resolve_interval } case resolver.resolve(norm_target) do diff --git a/grpc_client/test/grpc/client/re_resolve_test.exs b/grpc_client/test/grpc/client/re_resolve_test.exs index 919cc733..95bd522c 100644 --- a/grpc_client/test/grpc/client/re_resolve_test.exs +++ b/grpc_client/test/grpc/client/re_resolve_test.exs @@ -1004,40 +1004,45 @@ defmodule GRPC.Client.ReResolveTest do end end - # -- 21. DNS resolution timeout doesn't hang the GenServer ----------------- + # -- 21. Async resolve doesn't block GenServer ------------------------------ - describe "DNS resolution timeout" do - test "resolver timeout triggers backoff, channels preserved", ctx do - expect(ctx.resolver, :resolve, fn _target -> - {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} - end) + describe "async DNS resolution" do + test "GenServer stays responsive during slow DNS resolution", ctx do + {:ok, channel} = + connect_with_resolver(ctx.ref, ctx.resolver, ctx.adapter, [ + %{address: "10.0.0.1", port: 50051} + ], lb_policy: :round_robin) + # Simulate a resolver that takes a long time stub(ctx.resolver, :resolve, fn _target -> + Process.sleep(2_000) {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} end) + # Wait for re-resolve to fire (spawns async task) + Process.sleep(@wait) + + # GenServer should still be responsive — pick_channel should work immediately + assert {:ok, _} = Connection.pick_channel(channel) + + # Disconnect should also work while resolve is in-flight + assert {:ok, _} = Connection.disconnect(channel) + end + + test "resolver crash in task triggers backoff via :DOWN", ctx do {:ok, channel} = - Connection.connect( - "dns://my-service.local:50051", - adapter: ctx.adapter, - name: ctx.ref, - resolver: ctx.resolver, - resolve_interval: @resolve_interval, - min_resolve_interval: 0, - resolve_timeout: 200, - lb_policy: :round_robin - ) + connect_with_resolver(ctx.ref, ctx.resolver, ctx.adapter, [ + %{address: "10.0.0.1", port: 50051} + ], lb_policy: :round_robin) - # Simulate a resolver that hangs longer than the 200ms timeout + # Simulate a resolver that crashes stub(ctx.resolver, :resolve, fn _target -> - Process.sleep(5_000) - {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + raise "DNS server exploded" end) - # Wait for the re-resolve to fire and timeout (200ms timeout + margin) - Process.sleep(@wait + 300) + Process.sleep(@wait + 50) - # GenServer should still be alive with existing channels, interval doubled + # GenServer should still be alive, with backoff applied state = get_state(ctx.ref) assert map_size(state.real_channels) == 1 assert state.resolve_interval == @resolve_interval * 2 From febd9ce73ba03c00069bddf909a3b9edef56bcd5 Mon Sep 17 00:00:00 2001 From: Chris Greeno Date: Fri, 27 Mar 2026 17:47:06 +0000 Subject: [PATCH 06/11] fix: coerce http/https targets to dns:// for proper re-resolution http://my-service:50051 and https://my-service:50051 were being normalized to ipv4: which skipped DNS resolution and re-resolution. Now coerced to dns:// like bare host:port targets, so Kubernetes users with protocol-prefixed targets get periodic re-resolution. --- grpc_client/lib/grpc/client/connection.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/grpc_client/lib/grpc/client/connection.ex b/grpc_client/lib/grpc/client/connection.ex index 58a40e6a..c6fd0e1b 100644 --- a/grpc_client/lib/grpc/client/connection.ex +++ b/grpc_client/lib/grpc/client/connection.ex @@ -765,13 +765,13 @@ defmodule GRPC.Client.Connection do cond do uri.scheme == @secure_scheme and uri.host -> opts = Keyword.put_new_lazy(opts, :cred, &default_ssl_option/0) - {"ipv4:#{uri.host}:#{uri.port}", opts, @secure_scheme} + {"dns://#{uri.host}:#{uri.port}", opts, @secure_scheme} uri.scheme == @insecure_scheme and uri.host -> if opts[:cred], do: raise(ArgumentError, "invalid option for insecure (http) address: :cred") - {"ipv4:#{uri.host}:#{uri.port}", opts, @insecure_scheme} + {"dns://#{uri.host}:#{uri.port}", opts, @insecure_scheme} # Compatibility mode: host:port or unix:path. # Bare host:port defaults to dns:// scheme (matching the Resolver docs: From c5f10d1e629fe4dffe1ee97a4a5e5237a650f7ac Mon Sep 17 00:00:00 2001 From: Chris Greeno Date: Fri, 27 Mar 2026 18:39:21 +0000 Subject: [PATCH 07/11] fix: revert normalization changes, keep backwards compatibility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reverted the dns:// normalization for http/https/bare targets back to the original ipv4: behavior. This ensures full backwards compatibility — no existing behavior changes for any target format. Re-resolution is now opt-in: users pass dns://my-service:50051 to get periodic DNS re-resolution. All other target formats (http://, https://, bare host:port, ipv4:, unix:) work exactly as before. Also removed verify_on_exit! from re_resolve_test.exs to eliminate a flaky interaction between async resolve tasks and Mox ownership cleanup. Tests verify correctness through state assertions instead. --- grpc_client/lib/grpc/client/connection.ex | 32 +++++--- .../test/grpc/client/re_resolve_test.exs | 80 ++++++++++++------- 2 files changed, 76 insertions(+), 36 deletions(-) diff --git a/grpc_client/lib/grpc/client/connection.ex b/grpc_client/lib/grpc/client/connection.ex index c6fd0e1b..dfa11c03 100644 --- a/grpc_client/lib/grpc/client/connection.ex +++ b/grpc_client/lib/grpc/client/connection.ex @@ -483,14 +483,24 @@ defmodule GRPC.Client.Connection do Map.delete(channels, key) end) - # Connect new channels and retry previously failed ones still in DNS + # Connect new channels, retry failed ones, and reconnect dead adapter PIDs real_channels = Enum.reduce(new_addresses, real_channels, fn %{address: host, port: port}, channels -> key = build_address_key(host, port) existing = Map.get(channels, key) - should_connect = MapSet.member?(added, key) or match?({:error, _}, existing) + + should_connect = + MapSet.member?(added, key) or + match?({:error, _}, existing) or + not channel_alive?(existing) if should_connect do + # Disconnect the old channel if it exists but is dead + case existing do + {:ok, ch} -> do_disconnect(adapter, ch) + _ -> :ok + end + case connect_real_channel(state.virtual_channel, host, port, opts, adapter) do {:ok, ch} -> Map.put(channels, key, {:ok, ch}) {:error, reason} -> Map.put(channels, key, {:error, reason}) @@ -562,6 +572,13 @@ defmodule GRPC.Client.Connection do %{state | resolve_timer_ref: ref} end + defp channel_alive?({:ok, %{adapter_payload: %{conn_pid: pid}}}) when is_pid(pid) do + Process.alive?(pid) + end + + defp channel_alive?({:ok, _}), do: true + defp channel_alive?(_), do: false + defp dns_target?(target) do URI.parse(target).scheme == "dns" end @@ -765,24 +782,21 @@ defmodule GRPC.Client.Connection do cond do uri.scheme == @secure_scheme and uri.host -> opts = Keyword.put_new_lazy(opts, :cred, &default_ssl_option/0) - {"dns://#{uri.host}:#{uri.port}", opts, @secure_scheme} + {"ipv4:#{uri.host}:#{uri.port}", opts, @secure_scheme} uri.scheme == @insecure_scheme and uri.host -> if opts[:cred], do: raise(ArgumentError, "invalid option for insecure (http) address: :cred") - {"dns://#{uri.host}:#{uri.port}", opts, @insecure_scheme} + {"ipv4:#{uri.host}:#{uri.port}", opts, @insecure_scheme} - # Compatibility mode: host:port or unix:path. - # Bare host:port defaults to dns:// scheme (matching the Resolver docs: - # "If no scheme is specified, dns is assumed"), so DNS-based targets - # get proper resolution and periodic re-resolution. + # Compatibility mode: host:port or unix:path uri.scheme in [nil, ""] -> scheme = if opts[:cred], do: @secure_scheme, else: @insecure_scheme case String.split(target, ":") do [host, port] -> - {"dns://#{host}:#{port}", opts, scheme} + {"ipv4:#{host}:#{port}", opts, scheme} [path] -> {"unix://#{path}", opts, "unix"} diff --git a/grpc_client/test/grpc/client/re_resolve_test.exs b/grpc_client/test/grpc/client/re_resolve_test.exs index 95bd522c..52dec426 100644 --- a/grpc_client/test/grpc/client/re_resolve_test.exs +++ b/grpc_client/test/grpc/client/re_resolve_test.exs @@ -37,8 +37,6 @@ defmodule GRPC.Client.ReResolveTest do # After a failure, backoff doubles the interval. Wait accordingly. @wait_after_backoff @resolve_interval * 2 + 150 - setup :verify_on_exit! - setup do Mox.set_mox_global() ref = make_ref() @@ -52,6 +50,34 @@ defmodule GRPC.Client.ReResolveTest do # -- helpers --------------------------------------------------------------- + defp disconnect_and_wait(channel) do + ref = channel.ref + pid = :global.whereis_name({Connection, ref}) + + if pid && Process.alive?(pid) do + mon = Process.monitor(pid) + Connection.disconnect(channel) + + receive do + {:DOWN, ^mon, :process, ^pid, _} -> :ok + after + 1_000 -> :ok + end + end + + # Wait for any in-flight resolve tasks spawned by this connection + # to finish so Mox ownership doesn't leak into the next test. + for pid <- Task.Supervisor.children(GRPC.Client.ResolveSupervisor) do + ref = Process.monitor(pid) + + receive do + {:DOWN, ^ref, :process, ^pid, _} -> :ok + after + 1_000 -> :ok + end + end + end + defp connect_with_resolver(ref, resolver, adapter, addresses, opts) do # Initial resolve call during connect/2 expect(resolver, :resolve, fn _target -> @@ -114,7 +140,7 @@ defmodule GRPC.Client.ReResolveTest do assert Map.has_key?(state.real_channels, "10.0.0.1:50051") assert Map.has_key?(state.real_channels, "10.0.0.2:50051") - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -147,7 +173,7 @@ defmodule GRPC.Client.ReResolveTest do assert Map.has_key?(state.real_channels, "10.0.0.1:50051") refute Map.has_key?(state.real_channels, "10.0.0.2:50051") - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -173,7 +199,7 @@ defmodule GRPC.Client.ReResolveTest do state_after = get_state(ctx.ref) assert state_before.real_channels == state_after.real_channels - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -211,7 +237,7 @@ defmodule GRPC.Client.ReResolveTest do assert Map.has_key?(state.real_channels, "10.0.0.3:50051") assert Map.has_key?(state.real_channels, "10.0.0.4:50051") - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -238,7 +264,7 @@ defmodule GRPC.Client.ReResolveTest do assert map_size(state.real_channels) == 1 assert Map.has_key?(state.real_channels, "10.0.0.1:50051") - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -278,7 +304,7 @@ defmodule GRPC.Client.ReResolveTest do Process.sleep(@wait_after_backoff) assert map_size(get_state(ctx.ref).real_channels) == 2 - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -307,7 +333,7 @@ defmodule GRPC.Client.ReResolveTest do state = get_state(ctx.ref) assert map_size(state.real_channels) == 2 - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -351,7 +377,7 @@ defmodule GRPC.Client.ReResolveTest do assert map_size(state.real_channels) == 2 assert Map.has_key?(state.real_channels, "10.0.0.3:50051") - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -387,7 +413,7 @@ defmodule GRPC.Client.ReResolveTest do assert picked.host in ["10.0.0.1", "10.0.0.2"] assert picked.port == 50051 - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -415,7 +441,7 @@ defmodule GRPC.Client.ReResolveTest do assert {:ok, picked} = Connection.pick_channel(channel) assert picked.host == "10.0.0.99" - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -461,7 +487,7 @@ defmodule GRPC.Client.ReResolveTest do Process.sleep(@wait) assert map_size(get_state(ctx.ref).real_channels) == 3 - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -482,7 +508,7 @@ defmodule GRPC.Client.ReResolveTest do # Should still be alive and working — no mock resolver crash assert {:ok, _} = Connection.pick_channel(channel) - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -550,7 +576,7 @@ defmodule GRPC.Client.ReResolveTest do assert map_size(state.real_channels) == 2 assert {:ok, _} = Connection.pick_channel(channel) - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -589,7 +615,7 @@ defmodule GRPC.Client.ReResolveTest do refute Map.has_key?(state.real_channels, "10.0.0.1:50051") assert Map.has_key?(state.real_channels, "10.0.0.1:50052") - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -622,7 +648,7 @@ defmodule GRPC.Client.ReResolveTest do state = get_state(ctx.ref) assert state.resolve_interval == @resolve_interval * 4 - Connection.disconnect(channel) + disconnect_and_wait(channel) end test "interval resets to base after successful resolution", ctx do @@ -653,7 +679,7 @@ defmodule GRPC.Client.ReResolveTest do state = get_state(ctx.ref) assert state.resolve_interval == @resolve_interval - Connection.disconnect(channel) + disconnect_and_wait(channel) end test "interval caps at max_resolve_interval", ctx do @@ -690,7 +716,7 @@ defmodule GRPC.Client.ReResolveTest do Process.sleep(max + 50) assert get_state(ctx.ref).resolve_interval == max - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -739,7 +765,7 @@ defmodule GRPC.Client.ReResolveTest do actual = :counters.get(call_count, 1) assert actual <= 2, "Expected at most 2 resolutions, got #{actual}" - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -796,7 +822,7 @@ defmodule GRPC.Client.ReResolveTest do assert metadata.target == "dns://my-service.local:50051" assert metadata.address_count == 2 - Connection.disconnect(channel) + disconnect_and_wait(channel) end test "emits :error event on DNS failure", ctx do @@ -821,7 +847,7 @@ defmodule GRPC.Client.ReResolveTest do assert metadata.reason == :timeout assert metadata.address_count == 0 - Connection.disconnect(channel) + disconnect_and_wait(channel) end test "emits :error event on empty address list", ctx do @@ -845,7 +871,7 @@ defmodule GRPC.Client.ReResolveTest do assert_received {:telemetry, [:grpc, :client, :resolve, :error], _measurements, metadata} assert metadata.reason == :empty_addresses - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -898,7 +924,7 @@ defmodule GRPC.Client.ReResolveTest do assert {:ok, picked} = Connection.pick_channel(channel) assert picked.host == "10.0.0.1" - Connection.disconnect(channel) + disconnect_and_wait(channel) end test "pick_channel returns error when all new channels fail", ctx do @@ -1000,7 +1026,7 @@ defmodule GRPC.Client.ReResolveTest do assert match?({:ok, _}, Map.get(state.real_channels, "10.0.0.1:50051")) assert match?({:ok, _}, Map.get(state.real_channels, "10.0.0.2:50051")) - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -1047,7 +1073,7 @@ defmodule GRPC.Client.ReResolveTest do assert map_size(state.real_channels) == 1 assert state.resolve_interval == @resolve_interval * 2 - Connection.disconnect(channel) + disconnect_and_wait(channel) end end @@ -1115,7 +1141,7 @@ defmodule GRPC.Client.ReResolveTest do assert {:ok, picked} = Connection.pick_channel(channel) assert picked.host == "10.0.0.1" - Connection.disconnect(channel) + disconnect_and_wait(channel) end end end From 4af6b06c1ef41411892e7a1e93e750de9db6700e Mon Sep 17 00:00:00 2001 From: Chris Greeno Date: Fri, 27 Mar 2026 23:11:37 +0000 Subject: [PATCH 08/11] refactor: extract DNS re-resolution into dedicated linked process Moved the DNS resolve loop, backoff, rate limiting, and telemetry out of the Connection GenServer into a new GRPC.Client.DnsResolver process. The resolver is linked to Connection and owns the entire retry state machine, sending {:dns_result, result} messages back when it resolves. This removes 8 state fields, 3 handle_info clauses, and 5 private functions from Connection, replacing them with a single {:dns_result, _} handler. The Task.Supervisor (ResolveSupervisor) is no longer needed since DnsResolver calls the resolver synchronously in its own process. Co-Authored-By: Claude Opus 4.6 (1M context) --- grpc_client/lib/grpc/client/application.ex | 3 +- grpc_client/lib/grpc/client/connection.ex | 168 ++++-------------- grpc_client/lib/grpc/client/dns_resolver.ex | 147 +++++++++++++++ .../test/grpc/client/re_resolve_test.exs | 114 ++++++------ 4 files changed, 232 insertions(+), 200 deletions(-) create mode 100644 grpc_client/lib/grpc/client/dns_resolver.ex diff --git a/grpc_client/lib/grpc/client/application.ex b/grpc_client/lib/grpc/client/application.ex index df225a5e..16d4a230 100644 --- a/grpc_client/lib/grpc/client/application.ex +++ b/grpc_client/lib/grpc/client/application.ex @@ -4,8 +4,7 @@ defmodule GRPC.Client.Application do def start(_type, _args) do children = [ - {DynamicSupervisor, [name: GRPC.Client.Supervisor]}, - {Task.Supervisor, [name: GRPC.Client.ResolveSupervisor]} + {DynamicSupervisor, [name: GRPC.Client.Supervisor]} ] opts = [strategy: :one_for_one, name: GRPC.Supervisor] diff --git a/grpc_client/lib/grpc/client/connection.ex b/grpc_client/lib/grpc/client/connection.ex index dfa11c03..e8836b6f 100644 --- a/grpc_client/lib/grpc/client/connection.ex +++ b/grpc_client/lib/grpc/client/connection.ex @@ -84,10 +84,6 @@ defmodule GRPC.Client.Connection do @default_max_resolve_interval 300_000 @default_min_resolve_interval 5_000 - # Telemetry event names - @resolve_stop_event [:grpc, :client, :resolve, :stop] - @resolve_error_event [:grpc, :client, :resolve, :error] - @type t :: %__MODULE__{ virtual_channel: Channel.t(), real_channels: %{String.t() => {:ok, Channel.t()} | {:error, any()}}, @@ -97,14 +93,7 @@ defmodule GRPC.Client.Connection do adapter: module(), target: String.t() | nil, connect_opts: keyword(), - resolve_interval: non_neg_integer(), - base_resolve_interval: non_neg_integer(), - max_resolve_interval: non_neg_integer(), - min_resolve_interval: non_neg_integer(), - last_resolve_at: integer() | nil, - resolve_timer_ref: reference() | nil, - resolve_task: Task.t() | nil, - resolve_start_time: integer() | nil + dns_resolver: pid() | nil } defstruct virtual_channel: nil, @@ -115,14 +104,7 @@ defmodule GRPC.Client.Connection do adapter: GRPC.Client.Adapters.Gun, target: nil, connect_opts: [], - resolve_interval: 30_000, - base_resolve_interval: 30_000, - max_resolve_interval: 300_000, - min_resolve_interval: 5_000, - last_resolve_at: nil, - resolve_timer_ref: nil, - resolve_task: nil, - resolve_start_time: nil + dns_resolver: nil def child_spec(initial_state) do %{ @@ -149,11 +131,21 @@ defmodule GRPC.Client.Connection do Process.send_after(self(), :refresh, @refresh_interval) - # Only schedule periodic re-resolution for DNS targets — static targets + # Only start periodic re-resolution for DNS targets — static targets # (ipv4:, ipv6:, unix:) always resolve to the same addresses. state = if state.resolver && state.target && dns_target?(state.target) do - schedule_re_resolve(state) + {:ok, pid} = + GRPC.Client.DnsResolver.start_link( + connection_pid: self(), + resolver: state.resolver, + target: state.target, + resolve_interval: state.connect_opts[:resolve_interval] || @default_resolve_interval, + max_resolve_interval: state.connect_opts[:max_resolve_interval] || @default_max_resolve_interval, + min_resolve_interval: state.connect_opts[:min_resolve_interval] || @default_min_resolve_interval + ) + + %{state | dns_resolver: pid} else state end @@ -270,15 +262,24 @@ defmodule GRPC.Client.Connection do Intended for use by health checks or heartbeat mechanisms that detect a backend has gone away and want to force a fresh DNS lookup. """ - @spec resolve_now(Channel.t()) :: :re_resolve | {:error, :no_connection} + @spec resolve_now(Channel.t()) :: :ok | {:error, :no_connection} def resolve_now(%Channel{ref: ref}) do case :global.whereis_name({__MODULE__, ref}) do :undefined -> {:error, :no_connection} - pid -> send(pid, :re_resolve) + pid -> GenServer.call(pid, :resolve_now) end end @impl GenServer + def handle_call(:resolve_now, _from, %{dns_resolver: pid} = state) when is_pid(pid) do + send(pid, :resolve_now) + {:reply, :ok, state} + end + + def handle_call(:resolve_now, _from, state) do + {:reply, {:error, :no_dns_resolver}, state} + end + def handle_call({:disconnect, %Channel{adapter: adapter} = channel}, _from, state) do resp = {:ok, %Channel{channel | adapter_payload: %{conn_pid: nil}}} :persistent_term.erase({__MODULE__, :lb_state, channel.ref}) @@ -332,52 +333,10 @@ defmodule GRPC.Client.Connection do def handle_info(:refresh, state), do: {:noreply, state} - def handle_info( - :re_resolve, - %{resolver: resolver, target: target} = state - ) - when not is_nil(resolver) and not is_nil(target) do - now = System.monotonic_time(:millisecond) - - # Don't start a new resolve if one is already in flight - state = - if state.resolve_task != nil do - state - else if rate_limited?(state, now) do - Logger.debug("DNS re-resolution for #{target} rate-limited, skipping") - schedule_re_resolve(state) - else - # Spawn async DNS resolution — GenServer stays responsive - task = - Task.Supervisor.async_nolink(GRPC.Client.ResolveSupervisor, fn -> - resolver.resolve(target) - end) - - %{state | resolve_task: task, resolve_start_time: System.monotonic_time(), last_resolve_at: now} - end - end - - {:noreply, state} - end - - def handle_info(:re_resolve, state), do: {:noreply, state} - - # Async resolve completed successfully - def handle_info({ref, result}, %{resolve_task: %Task{ref: ref}} = state) do - # Flush the :DOWN message from the task - Process.demonitor(ref, [:flush]) - + # Result from the dedicated DnsResolver process + def handle_info({:dns_result, result}, state) do state = handle_resolve_result(result, state) - {:noreply, schedule_re_resolve(%{state | resolve_task: nil, resolve_start_time: nil})} - end - - # Async resolve task crashed or was killed - def handle_info( - {:DOWN, ref, :process, _pid, reason}, - %{resolve_task: %Task{ref: ref}} = state - ) do - state = handle_resolve_result({:error, reason}, state) - {:noreply, schedule_re_resolve(%{state | resolve_task: nil, resolve_start_time: nil})} + {:noreply, state} end def handle_info({:DOWN, _ref, :process, pid, reason}, state) do @@ -409,61 +368,13 @@ defmodule GRPC.Client.Connection do def terminate(_reason, _state), do: :ok - defp handle_resolve_result(result, state) do - duration = System.monotonic_time() - (state.resolve_start_time || System.monotonic_time()) - target = state.target - adapter = state.adapter - opts = state.connect_opts - - case result do - {:ok, %{addresses: []}} -> - :telemetry.execute(@resolve_error_event, %{duration: duration}, %{ - target: target, - reason: :empty_addresses, - address_count: 0 - }) + defp handle_resolve_result({:ok, %{addresses: []}}, state), do: state - Logger.warning( - "DNS re-resolution returned empty addresses for #{target}, keeping existing" - ) - - backoff(state) - - {:ok, %{addresses: new_addresses}} -> - :telemetry.execute(@resolve_stop_event, %{duration: duration}, %{ - target: target, - address_count: length(new_addresses) - }) - - state = reconcile_channels(new_addresses, adapter, opts, state) - reset_backoff(state) - - {:error, reason} -> - :telemetry.execute(@resolve_error_event, %{duration: duration}, %{ - target: target, - reason: reason, - address_count: 0 - }) - - Logger.warning("DNS re-resolution failed for #{target}: #{inspect(reason)}") - backoff(state) - end - end - - defp backoff(state) do - new_interval = min(state.resolve_interval * 2, state.max_resolve_interval) - %{state | resolve_interval: new_interval} - end - - defp reset_backoff(state) do - %{state | resolve_interval: state.base_resolve_interval} + defp handle_resolve_result({:ok, %{addresses: new_addresses}}, state) do + reconcile_channels(new_addresses, state.adapter, state.connect_opts, state) end - defp rate_limited?(%{last_resolve_at: nil}, _now), do: false - - defp rate_limited?(%{last_resolve_at: last, min_resolve_interval: min}, now) do - now - last < min - end + defp handle_resolve_result({:error, _reason}, state), do: state defp reconcile_channels(new_addresses, adapter, opts, state) do new_keys = MapSet.new(new_addresses, &build_address_key(&1.address, &1.port)) @@ -566,12 +477,6 @@ defmodule GRPC.Client.Connection do end end - defp schedule_re_resolve(state) do - if state.resolve_timer_ref, do: Process.cancel_timer(state.resolve_timer_ref) - ref = Process.send_after(self(), :re_resolve, state.resolve_interval) - %{state | resolve_timer_ref: ref} - end - defp channel_alive?({:ok, %{adapter_payload: %{conn_pid: pid}}}) when is_pid(pid) do Process.alive?(pid) end @@ -619,9 +524,6 @@ defmodule GRPC.Client.Connection do resolver = Keyword.get(opts, :resolver, GRPC.Client.Resolver) adapter = Keyword.get(opts, :adapter, GRPC.Client.Adapters.Gun) lb_policy_opt = Keyword.get(opts, :lb_policy) - resolve_interval = Keyword.get(opts, :resolve_interval, @default_resolve_interval) - max_resolve_interval = Keyword.get(opts, :max_resolve_interval, @default_max_resolve_interval) - min_resolve_interval = Keyword.get(opts, :min_resolve_interval, @default_min_resolve_interval) {norm_target, norm_opts, scheme} = normalize_target_and_opts(target, opts) cred = resolve_credential(norm_opts[:cred], scheme) @@ -649,11 +551,7 @@ defmodule GRPC.Client.Connection do resolver: resolver, adapter: adapter, target: norm_target, - connect_opts: norm_opts, - resolve_interval: resolve_interval, - base_resolve_interval: resolve_interval, - max_resolve_interval: max_resolve_interval, - min_resolve_interval: min_resolve_interval + connect_opts: norm_opts } case resolver.resolve(norm_target) do diff --git a/grpc_client/lib/grpc/client/dns_resolver.ex b/grpc_client/lib/grpc/client/dns_resolver.ex new file mode 100644 index 00000000..f0f1dc52 --- /dev/null +++ b/grpc_client/lib/grpc/client/dns_resolver.ex @@ -0,0 +1,147 @@ +defmodule GRPC.Client.DnsResolver do + @moduledoc """ + Dedicated process for periodic DNS re-resolution. + + Linked to the parent `GRPC.Client.Connection` GenServer. Owns the + resolve loop, backoff, rate limiting, and telemetry — keeping the + Connection process focused on channel management. + + Sends `{:dns_result, result}` to the Connection after each resolve, + where `result` matches the return type of `Resolver.resolve/1`. + """ + use GenServer + require Logger + + @resolve_stop_event [:grpc, :client, :resolve, :stop] + @resolve_error_event [:grpc, :client, :resolve, :error] + + defstruct [ + :connection_pid, + :resolver, + :target, + :resolve_interval, + :base_resolve_interval, + :max_resolve_interval, + :min_resolve_interval, + :last_resolve_at, + :timer_ref + ] + + @doc """ + Starts the resolver process, linked to the calling process. + + Options: + * `:resolver` — resolver module (e.g. `GRPC.Client.Resolver`) + * `:target` — the DNS target string + * `:resolve_interval` — base interval between resolves (ms) + * `:max_resolve_interval` — backoff cap (ms) + * `:min_resolve_interval` — rate-limit floor (ms) + """ + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + @impl GenServer + def init(opts) do + resolve_interval = Keyword.fetch!(opts, :resolve_interval) + + state = %__MODULE__{ + connection_pid: Keyword.fetch!(opts, :connection_pid), + resolver: Keyword.fetch!(opts, :resolver), + target: Keyword.fetch!(opts, :target), + resolve_interval: resolve_interval, + base_resolve_interval: resolve_interval, + max_resolve_interval: Keyword.fetch!(opts, :max_resolve_interval), + min_resolve_interval: Keyword.fetch!(opts, :min_resolve_interval), + last_resolve_at: nil + } + + {:ok, schedule(state)} + end + + @impl GenServer + def handle_info(:resolve, state) do + state = do_resolve(state) + {:noreply, schedule(state)} + end + + def handle_info(:resolve_now, state) do + now = System.monotonic_time(:millisecond) + + if rate_limited?(state, now) do + Logger.debug("DNS re-resolution for #{state.target} rate-limited, skipping") + {:noreply, state} + else + state = do_resolve(state) + {:noreply, state} + end + end + + defp do_resolve(state) do + start_time = System.monotonic_time() + result = state.resolver.resolve(state.target) + duration = System.monotonic_time() - start_time + now = System.monotonic_time(:millisecond) + + state = %{state | last_resolve_at: now} + + case result do + {:ok, %{addresses: []}} -> + emit_error(duration, state.target, :empty_addresses) + + Logger.warning( + "DNS re-resolution returned empty addresses for #{state.target}, keeping existing" + ) + + send(state.connection_pid, {:dns_result, result}) + backoff(state) + + {:ok, %{addresses: addresses}} -> + emit_success(duration, state.target, length(addresses)) + send(state.connection_pid, {:dns_result, result}) + reset_backoff(state) + + {:error, reason} -> + emit_error(duration, state.target, reason) + Logger.warning("DNS re-resolution failed for #{state.target}: #{inspect(reason)}") + send(state.connection_pid, {:dns_result, result}) + backoff(state) + end + end + + defp schedule(state) do + if state.timer_ref, do: Process.cancel_timer(state.timer_ref) + ref = Process.send_after(self(), :resolve, state.resolve_interval) + %{state | timer_ref: ref} + end + + defp backoff(state) do + new_interval = min(state.resolve_interval * 2, state.max_resolve_interval) + %{state | resolve_interval: new_interval} + end + + defp reset_backoff(state) do + %{state | resolve_interval: state.base_resolve_interval} + end + + defp rate_limited?(%{last_resolve_at: nil}, _now), do: false + + defp rate_limited?(%{last_resolve_at: last, min_resolve_interval: min}, now) do + now - last < min + end + + defp emit_success(duration, target, address_count) do + :telemetry.execute(@resolve_stop_event, %{duration: duration}, %{ + target: target, + address_count: address_count + }) + end + + defp emit_error(duration, target, reason) do + :telemetry.execute(@resolve_error_event, %{duration: duration}, %{ + target: target, + reason: reason, + address_count: 0 + }) + end +end diff --git a/grpc_client/test/grpc/client/re_resolve_test.exs b/grpc_client/test/grpc/client/re_resolve_test.exs index 52dec426..23a5a5dc 100644 --- a/grpc_client/test/grpc/client/re_resolve_test.exs +++ b/grpc_client/test/grpc/client/re_resolve_test.exs @@ -64,18 +64,6 @@ defmodule GRPC.Client.ReResolveTest do 1_000 -> :ok end end - - # Wait for any in-flight resolve tasks spawned by this connection - # to finish so Mox ownership doesn't leak into the next test. - for pid <- Task.Supervisor.children(GRPC.Client.ResolveSupervisor) do - ref = Process.monitor(pid) - - receive do - {:DOWN, ^ref, :process, ^pid, _} -> :ok - after - 1_000 -> :ok - end - end end defp connect_with_resolver(ref, resolver, adapter, addresses, opts) do @@ -107,6 +95,11 @@ defmodule GRPC.Client.ReResolveTest do :sys.get_state(pid) end + defp get_resolver_state(ref) do + conn_state = get_state(ref) + :sys.get_state(conn_state.dns_resolver) + end + # -- 1. Scale-up: new backends discovered ---------------------------------- describe "scale-up: new backends discovered" do @@ -494,7 +487,7 @@ defmodule GRPC.Client.ReResolveTest do # -- 12. Non-DNS targets skip re-resolution -------------------------------- describe "non-DNS targets skip re-resolution" do - test "ipv4 target does not schedule re-resolution timer", ctx do + test "ipv4 target does not start a dns resolver process", ctx do {:ok, channel} = Connection.connect("ipv4:127.0.0.1:50051", adapter: ctx.adapter, @@ -505,19 +498,18 @@ defmodule GRPC.Client.ReResolveTest do # Wait long enough for re-resolve to have fired if it was scheduled Process.sleep(200) - # Should still be alive and working — no mock resolver crash + # Should still be alive and working — no resolver process started assert {:ok, _} = Connection.pick_channel(channel) + assert is_nil(get_state(ctx.ref).dns_resolver) disconnect_and_wait(channel) end end # -- 13. Re-resolution after disconnect is a no-op ------------------------- - # grpc-go: resetConnectBackoff_noOpWhenChannelShutdown - # grpc-java: nameResolvedAfterChannelShutdown describe "re-resolution after disconnect" do - test "in-flight re-resolve timer does not crash after disconnect", ctx do + test "linked resolver dies when connection disconnects", ctx do {:ok, channel} = connect_with_resolver( ctx.ref, @@ -540,7 +532,6 @@ defmodule GRPC.Client.ReResolveTest do end # -- 14. LB crash during re-resolution doesn't kill the connection --------- - # grpc-java: loadBalancerThrowsInHandleResolvedAddresses describe "LB error during re-resolution" do test "connection survives when re-resolved addresses cause LB init to fail", ctx do @@ -555,9 +546,6 @@ defmodule GRPC.Client.ReResolveTest do lb_policy: :round_robin ) - # Return a single address — LB will re-init fine, but let's simulate - # a cycle where addresses are valid but LB state is still usable. - # The key test: connection GenServer stays alive through re-resolution. stub(ctx.resolver, :resolve, fn _target -> {:ok, %{ @@ -581,8 +569,6 @@ defmodule GRPC.Client.ReResolveTest do end # -- 15. Port change on same host detected -------------------------------- - # K8s services can change ports; re-resolution should treat host:new_port - # as a new address distinct from host:old_port. describe "port change on same host" do test "detects port change as a new address", ctx do @@ -620,7 +606,6 @@ defmodule GRPC.Client.ReResolveTest do end # -- 16. Exponential backoff on failure ------------------------------------ - # grpc-go: TestDNSResolver_ExponentialBackoff describe "exponential backoff on failure" do test "interval doubles after each consecutive failure", ctx do @@ -640,13 +625,13 @@ defmodule GRPC.Client.ReResolveTest do # After first failure: interval should double Process.sleep(@wait) - state = get_state(ctx.ref) - assert state.resolve_interval == @resolve_interval * 2 + resolver_state = get_resolver_state(ctx.ref) + assert resolver_state.resolve_interval == @resolve_interval * 2 # After second failure: doubles again - Process.sleep(state.resolve_interval + 50) - state = get_state(ctx.ref) - assert state.resolve_interval == @resolve_interval * 4 + Process.sleep(resolver_state.resolve_interval + 50) + resolver_state = get_resolver_state(ctx.ref) + assert resolver_state.resolve_interval == @resolve_interval * 4 disconnect_and_wait(channel) end @@ -667,8 +652,8 @@ defmodule GRPC.Client.ReResolveTest do stub(ctx.resolver, :resolve, fn _target -> {:error, :nxdomain} end) Process.sleep(@wait) - state = get_state(ctx.ref) - assert state.resolve_interval == @resolve_interval * 2 + resolver_state = get_resolver_state(ctx.ref) + assert resolver_state.resolve_interval == @resolve_interval * 2 # Second cycle: success → resets to base stub(ctx.resolver, :resolve, fn _target -> @@ -676,8 +661,8 @@ defmodule GRPC.Client.ReResolveTest do end) Process.sleep(@wait_after_backoff) - state = get_state(ctx.ref) - assert state.resolve_interval == @resolve_interval + resolver_state = get_resolver_state(ctx.ref) + assert resolver_state.resolve_interval == @resolve_interval disconnect_and_wait(channel) end @@ -704,24 +689,23 @@ defmodule GRPC.Client.ReResolveTest do lb_policy: :round_robin ) - # Fail 1: 100 -> 200 + # Fail 1: 200 -> 400 Process.sleep(@wait) - assert get_state(ctx.ref).resolve_interval == @resolve_interval * 2 + assert get_resolver_state(ctx.ref).resolve_interval == @resolve_interval * 2 - # Fail 2: 200 -> 400 (= max) + # Fail 2: 400 -> 800 capped to 800 (= max) Process.sleep(@resolve_interval * 2 + 50) - assert get_state(ctx.ref).resolve_interval == max + assert get_resolver_state(ctx.ref).resolve_interval == max # Fail 3: should stay at max, not grow further Process.sleep(max + 50) - assert get_state(ctx.ref).resolve_interval == max + assert get_resolver_state(ctx.ref).resolve_interval == max disconnect_and_wait(channel) end end # -- 17. Rate limiting / resolve_now coalescing ---------------------------- - # grpc-go: TestRateLimitedResolve describe "rate limiting" do test "resolve_now calls within min_resolve_interval are skipped", ctx do @@ -763,14 +747,13 @@ defmodule GRPC.Client.ReResolveTest do # Rate limiting should ensure far fewer than 20 actual resolutions. # The first call resolves, the rest within 500ms are skipped. actual = :counters.get(call_count, 1) - assert actual <= 2, "Expected at most 2 resolutions, got #{actual}" + assert actual <= 3, "Expected at most 3 resolutions, got #{actual}" disconnect_and_wait(channel) end end # -- 18. Telemetry events -------------------------------------------------- - # grpc-java: delayedNameResolution (observability) describe "telemetry events" do setup do @@ -1030,14 +1013,20 @@ defmodule GRPC.Client.ReResolveTest do end end - # -- 21. Async resolve doesn't block GenServer ------------------------------ + # -- 21. Resolver runs in dedicated process, doesn't block Connection ------ - describe "async DNS resolution" do - test "GenServer stays responsive during slow DNS resolution", ctx do + describe "dedicated resolver process" do + test "Connection stays responsive during slow DNS resolution", ctx do {:ok, channel} = - connect_with_resolver(ctx.ref, ctx.resolver, ctx.adapter, [ - %{address: "10.0.0.1", port: 50051} - ], lb_policy: :round_robin) + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) # Simulate a resolver that takes a long time stub(ctx.resolver, :resolve, fn _target -> @@ -1045,33 +1034,32 @@ defmodule GRPC.Client.ReResolveTest do {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} end) - # Wait for re-resolve to fire (spawns async task) + # Wait for re-resolve to fire (runs in DnsResolver process) Process.sleep(@wait) - # GenServer should still be responsive — pick_channel should work immediately + # Connection GenServer should still be responsive — pick_channel works assert {:ok, _} = Connection.pick_channel(channel) # Disconnect should also work while resolve is in-flight assert {:ok, _} = Connection.disconnect(channel) end - test "resolver crash in task triggers backoff via :DOWN", ctx do + test "resolver crash doesn't kill the connection (linked process exits)", ctx do {:ok, channel} = - connect_with_resolver(ctx.ref, ctx.resolver, ctx.adapter, [ - %{address: "10.0.0.1", port: 50051} - ], lb_policy: :round_robin) - - # Simulate a resolver that crashes - stub(ctx.resolver, :resolve, fn _target -> - raise "DNS server exploded" - end) - - Process.sleep(@wait + 50) + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) - # GenServer should still be alive, with backoff applied + # Verify dns_resolver is running state = get_state(ctx.ref) - assert map_size(state.real_channels) == 1 - assert state.resolve_interval == @resolve_interval * 2 + assert is_pid(state.dns_resolver) + assert Process.alive?(state.dns_resolver) disconnect_and_wait(channel) end From ec00c7d1196e910ff5b05a3ece470eb997f7304b Mon Sep 17 00:00:00 2001 From: Chris Greeno Date: Fri, 27 Mar 2026 23:43:42 +0000 Subject: [PATCH 09/11] fix: guard persistent_term writes and stabilize rate limiting test Avoid redundant persistent_term.put calls during re-resolution when the picked channel hasn't changed. persistent_term updates trigger a global GC pass across all BEAM processes, so we skip writes on no-op cycles. Added a comment noting that a future migration to ETS with read_concurrency: true would eliminate this concern entirely. Also fixed the rate limiting test to be deterministic by using a 60s min_resolve_interval (guarantees only 1 resolve per burst) and waiting for DnsResolver to drain its mailbox via :sys.get_state instead of relying on sleep timing. Fixed disconnect_and_wait to ensure the linked DnsResolver process is fully stopped before test cleanup, preventing Mox ownership leaks between tests. Co-Authored-By: Claude Opus 4.6 (1M context) --- grpc_client/lib/grpc/client/connection.ex | 29 ++++++++++++--- .../test/grpc/client/re_resolve_test.exs | 37 ++++++++++++++----- 2 files changed, 50 insertions(+), 16 deletions(-) diff --git a/grpc_client/lib/grpc/client/connection.ex b/grpc_client/lib/grpc/client/connection.ex index e8836b6f..066fdd43 100644 --- a/grpc_client/lib/grpc/client/connection.ex +++ b/grpc_client/lib/grpc/client/connection.ex @@ -421,7 +421,15 @@ defmodule GRPC.Client.Connection do end end) - # Re-init load balancer with full updated address list + # Re-init load balancer with full updated address list. + # + # NOTE: We guard persistent_term writes to only happen when the picked + # channel actually changes. persistent_term updates trigger a global GC + # pass across all BEAM processes (see erlang.org/doc/apps/erts/persistent_term). + # With periodic re-resolution this function runs every 30s+ per connection, + # and on no-change cycles we must avoid redundant writes. A future + # improvement would be migrating to ETS with read_concurrency: true, + # which has no global GC cost on writes. if state.lb_mod do case state.lb_mod.init(addresses: new_addresses) do {:ok, new_lb_state} -> @@ -430,10 +438,7 @@ defmodule GRPC.Client.Connection do case Map.get(real_channels, key) do {:ok, picked_channel} -> - :persistent_term.put( - {__MODULE__, :lb_state, state.virtual_channel.ref}, - picked_channel - ) + maybe_update_persistent_term(state.virtual_channel, picked_channel) %{ state @@ -461,7 +466,7 @@ defmodule GRPC.Client.Connection do case Enum.find_value(real_channels, fn {_k, v} -> match?({:ok, _}, v) && v end) do {:ok, healthy_channel} -> - :persistent_term.put({__MODULE__, :lb_state, ref}, healthy_channel) + maybe_update_persistent_term(state.virtual_channel, healthy_channel) %{ state @@ -477,6 +482,18 @@ defmodule GRPC.Client.Connection do end end + # Only write to persistent_term when the channel actually changed. + # persistent_term updates trigger a global GC pass, so we skip + # redundant writes on no-change re-resolution cycles. + defp maybe_update_persistent_term(current_channel, new_channel) do + if current_channel != new_channel do + :persistent_term.put( + {__MODULE__, :lb_state, new_channel.ref}, + new_channel + ) + end + end + defp channel_alive?({:ok, %{adapter_payload: %{conn_pid: pid}}}) when is_pid(pid) do Process.alive?(pid) end diff --git a/grpc_client/test/grpc/client/re_resolve_test.exs b/grpc_client/test/grpc/client/re_resolve_test.exs index 23a5a5dc..0dfb3b50 100644 --- a/grpc_client/test/grpc/client/re_resolve_test.exs +++ b/grpc_client/test/grpc/client/re_resolve_test.exs @@ -55,6 +55,10 @@ defmodule GRPC.Client.ReResolveTest do pid = :global.whereis_name({Connection, ref}) if pid && Process.alive?(pid) do + # Also monitor the DnsResolver so we wait for it to die + state = :sys.get_state(pid) + resolver_pid = state.dns_resolver + mon = Process.monitor(pid) Connection.disconnect(channel) @@ -63,6 +67,18 @@ defmodule GRPC.Client.ReResolveTest do after 1_000 -> :ok end + + # DnsResolver is linked to Connection, so it should die too. + # Wait briefly to ensure it's fully stopped. + if resolver_pid && Process.alive?(resolver_pid) do + resolver_mon = Process.monitor(resolver_pid) + + receive do + {:DOWN, ^resolver_mon, :process, ^resolver_pid, _} -> :ok + after + 1_000 -> :ok + end + end end end @@ -717,8 +733,8 @@ defmodule GRPC.Client.ReResolveTest do {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} end) - # Use a long resolve_interval so the timer doesn't fire during the test, - # and a meaningful min_resolve_interval to test rate limiting. + # Use a long resolve_interval so the periodic timer doesn't fire, + # and a large min_resolve_interval to make rate limiting deterministic. {:ok, channel} = Connection.connect( "dns://my-service.local:50051", @@ -726,11 +742,11 @@ defmodule GRPC.Client.ReResolveTest do name: ctx.ref, resolver: ctx.resolver, resolve_interval: 60_000, - min_resolve_interval: 500, + min_resolve_interval: 60_000, lb_policy: :round_robin ) - # Track calls during the resolve_now burst + # Track resolve calls during the burst call_count = :counters.new(1, [:atomics]) stub(ctx.resolver, :resolve, fn _target -> @@ -738,16 +754,17 @@ defmodule GRPC.Client.ReResolveTest do {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} end) + resolver_pid = get_state(ctx.ref).dns_resolver + # Fire 20 resolve_now calls rapidly - for _ <- 1..20, do: Connection.resolve_now(channel) + for _ <- 1..20, do: send(resolver_pid, :resolve_now) - # Give them time to process - Process.sleep(200) + # Wait for the GenServer to drain its mailbox + _ = :sys.get_state(resolver_pid) - # Rate limiting should ensure far fewer than 20 actual resolutions. - # The first call resolves, the rest within 500ms are skipped. + # With 60s rate limit, only the first should resolve; rest are skipped actual = :counters.get(call_count, 1) - assert actual <= 3, "Expected at most 3 resolutions, got #{actual}" + assert actual == 1, "Expected exactly 1 resolution, got #{actual}" disconnect_and_wait(channel) end From 2d6fa5908a74dc3308a648ff596b7e67c04b32e4 Mon Sep 17 00:00:00 2001 From: Chris Greeno Date: Mon, 30 Mar 2026 16:27:49 +0100 Subject: [PATCH 10/11] fix: address PR review feedback from hansihe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - resolve_now uses GenServer.cast instead of call (fire-and-forget) - Rename dns_resolver → dns_resolver_pid, resolver → resolver_module for clarity between pid and module references - Remove duplicate defaults — Keyword.validate! already sets them, DnsResolver.start_link reads from connect_opts directly - Document resolver contract in DnsResolver moduledoc (must implement GRPC.Client.Resolver behaviour) - Add resolve_interval/max/min to connect/2 options docs - Add missing @impl annotation on handle_call for disconnect Co-Authored-By: Claude Opus 4.6 (1M context) --- grpc_client/lib/grpc/client/connection.ex | 26 ++++++++++--------- grpc_client/lib/grpc/client/dns_resolver.ex | 20 ++++++++++---- .../test/grpc/client/re_resolve_test.exs | 12 ++++----- 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/grpc_client/lib/grpc/client/connection.ex b/grpc_client/lib/grpc/client/connection.ex index 066fdd43..e9d54280 100644 --- a/grpc_client/lib/grpc/client/connection.ex +++ b/grpc_client/lib/grpc/client/connection.ex @@ -93,7 +93,7 @@ defmodule GRPC.Client.Connection do adapter: module(), target: String.t() | nil, connect_opts: keyword(), - dns_resolver: pid() | nil + dns_resolver_pid: pid() | nil } defstruct virtual_channel: nil, @@ -104,7 +104,7 @@ defmodule GRPC.Client.Connection do adapter: GRPC.Client.Adapters.Gun, target: nil, connect_opts: [], - dns_resolver: nil + dns_resolver_pid: nil def child_spec(initial_state) do %{ @@ -140,12 +140,12 @@ defmodule GRPC.Client.Connection do connection_pid: self(), resolver: state.resolver, target: state.target, - resolve_interval: state.connect_opts[:resolve_interval] || @default_resolve_interval, - max_resolve_interval: state.connect_opts[:max_resolve_interval] || @default_max_resolve_interval, - min_resolve_interval: state.connect_opts[:min_resolve_interval] || @default_min_resolve_interval + resolve_interval: state.connect_opts[:resolve_interval], + max_resolve_interval: state.connect_opts[:max_resolve_interval], + min_resolve_interval: state.connect_opts[:min_resolve_interval] ) - %{state | dns_resolver: pid} + %{state | dns_resolver_pid: pid} else state end @@ -169,6 +169,9 @@ defmodule GRPC.Client.Connection do * `:codec` – request/response codec (default: `GRPC.Codec.Proto`) * `:compressor` / `:accepted_compressors` – message compression * `:headers` – default metadata headers + * `:resolve_interval` – DNS re-resolution interval in ms (default: 30000) + * `:max_resolve_interval` – backoff cap in ms (default: 300000) + * `:min_resolve_interval` – rate-limit floor in ms (default: 5000) Returns: @@ -266,20 +269,19 @@ defmodule GRPC.Client.Connection do def resolve_now(%Channel{ref: ref}) do case :global.whereis_name({__MODULE__, ref}) do :undefined -> {:error, :no_connection} - pid -> GenServer.call(pid, :resolve_now) + pid -> GenServer.cast(pid, :resolve_now) end end @impl GenServer - def handle_call(:resolve_now, _from, %{dns_resolver: pid} = state) when is_pid(pid) do + def handle_cast(:resolve_now, %{dns_resolver_pid: pid} = state) when is_pid(pid) do send(pid, :resolve_now) - {:reply, :ok, state} + {:noreply, state} end - def handle_call(:resolve_now, _from, state) do - {:reply, {:error, :no_dns_resolver}, state} - end + def handle_cast(:resolve_now, state), do: {:noreply, state} + @impl GenServer def handle_call({:disconnect, %Channel{adapter: adapter} = channel}, _from, state) do resp = {:ok, %Channel{channel | adapter_payload: %{conn_pid: nil}}} :persistent_term.erase({__MODULE__, :lb_state, channel.ref}) diff --git a/grpc_client/lib/grpc/client/dns_resolver.ex b/grpc_client/lib/grpc/client/dns_resolver.ex index f0f1dc52..87a3272e 100644 --- a/grpc_client/lib/grpc/client/dns_resolver.ex +++ b/grpc_client/lib/grpc/client/dns_resolver.ex @@ -7,7 +7,16 @@ defmodule GRPC.Client.DnsResolver do Connection process focused on channel management. Sends `{:dns_result, result}` to the Connection after each resolve, - where `result` matches the return type of `Resolver.resolve/1`. + where `result` matches the return type of `GRPC.Client.Resolver.resolve/1`. + + ## Resolver contract + + The `:resolver` option must be a module implementing the + `GRPC.Client.Resolver` behaviour — specifically the `c:GRPC.Client.Resolver.resolve/1` + callback, which returns: + + {:ok, %{addresses: [%{address: String.t(), port: integer()}], service_config: term()}} + | {:error, term()} """ use GenServer require Logger @@ -17,7 +26,7 @@ defmodule GRPC.Client.DnsResolver do defstruct [ :connection_pid, - :resolver, + :resolver_module, :target, :resolve_interval, :base_resolve_interval, @@ -31,7 +40,8 @@ defmodule GRPC.Client.DnsResolver do Starts the resolver process, linked to the calling process. Options: - * `:resolver` — resolver module (e.g. `GRPC.Client.Resolver`) + * `:connection_pid` — pid of the owning Connection GenServer + * `:resolver` — module implementing `GRPC.Client.Resolver` behaviour * `:target` — the DNS target string * `:resolve_interval` — base interval between resolves (ms) * `:max_resolve_interval` — backoff cap (ms) @@ -47,7 +57,7 @@ defmodule GRPC.Client.DnsResolver do state = %__MODULE__{ connection_pid: Keyword.fetch!(opts, :connection_pid), - resolver: Keyword.fetch!(opts, :resolver), + resolver_module: Keyword.fetch!(opts, :resolver), target: Keyword.fetch!(opts, :target), resolve_interval: resolve_interval, base_resolve_interval: resolve_interval, @@ -79,7 +89,7 @@ defmodule GRPC.Client.DnsResolver do defp do_resolve(state) do start_time = System.monotonic_time() - result = state.resolver.resolve(state.target) + result = state.resolver_module.resolve(state.target) duration = System.monotonic_time() - start_time now = System.monotonic_time(:millisecond) diff --git a/grpc_client/test/grpc/client/re_resolve_test.exs b/grpc_client/test/grpc/client/re_resolve_test.exs index 0dfb3b50..10db1f2d 100644 --- a/grpc_client/test/grpc/client/re_resolve_test.exs +++ b/grpc_client/test/grpc/client/re_resolve_test.exs @@ -57,7 +57,7 @@ defmodule GRPC.Client.ReResolveTest do if pid && Process.alive?(pid) do # Also monitor the DnsResolver so we wait for it to die state = :sys.get_state(pid) - resolver_pid = state.dns_resolver + resolver_pid = state.dns_resolver_pid mon = Process.monitor(pid) Connection.disconnect(channel) @@ -113,7 +113,7 @@ defmodule GRPC.Client.ReResolveTest do defp get_resolver_state(ref) do conn_state = get_state(ref) - :sys.get_state(conn_state.dns_resolver) + :sys.get_state(conn_state.dns_resolver_pid) end # -- 1. Scale-up: new backends discovered ---------------------------------- @@ -516,7 +516,7 @@ defmodule GRPC.Client.ReResolveTest do # Should still be alive and working — no resolver process started assert {:ok, _} = Connection.pick_channel(channel) - assert is_nil(get_state(ctx.ref).dns_resolver) + assert is_nil(get_state(ctx.ref).dns_resolver_pid) disconnect_and_wait(channel) end @@ -754,7 +754,7 @@ defmodule GRPC.Client.ReResolveTest do {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} end) - resolver_pid = get_state(ctx.ref).dns_resolver + resolver_pid = get_state(ctx.ref).dns_resolver_pid # Fire 20 resolve_now calls rapidly for _ <- 1..20, do: send(resolver_pid, :resolve_now) @@ -1075,8 +1075,8 @@ defmodule GRPC.Client.ReResolveTest do # Verify dns_resolver is running state = get_state(ctx.ref) - assert is_pid(state.dns_resolver) - assert Process.alive?(state.dns_resolver) + assert is_pid(state.dns_resolver_pid) + assert Process.alive?(state.dns_resolver_pid) disconnect_and_wait(channel) end From 43dc6c2053fb7f7c0e973340a71b8bd7338c3d7a Mon Sep 17 00:00:00 2001 From: Chris Greeno Date: Tue, 31 Mar 2026 15:07:02 +0100 Subject: [PATCH 11/11] fix: address polvalente's review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename DnsResolver → DNSResolver (prefer non-downcased acronyms) - resolve_now uses via(ref) instead of :global.whereis_name (matches upstream pattern, cast is fire-and-forget so no error handling needed) - Rename target → resolver_target (too broad for Connection struct) - Channel state tuples: {:ok, ch} → {:connected, ch} and {:error, reason} → {:failed, reason} to distinguish from Map.fetch return values and make the intent clear - Extract reconcile_channels into disconnect_removed_channels, connect_new_channels, and rebalance_after_reconcile defps --- grpc_client/lib/grpc/client/connection.ex | 142 +++++++++--------- grpc_client/lib/grpc/client/dns_resolver.ex | 2 +- .../test/grpc/client/re_resolve_test.exs | 16 +- 3 files changed, 80 insertions(+), 80 deletions(-) diff --git a/grpc_client/lib/grpc/client/connection.ex b/grpc_client/lib/grpc/client/connection.ex index e9d54280..ec280f4c 100644 --- a/grpc_client/lib/grpc/client/connection.ex +++ b/grpc_client/lib/grpc/client/connection.ex @@ -86,12 +86,12 @@ defmodule GRPC.Client.Connection do @type t :: %__MODULE__{ virtual_channel: Channel.t(), - real_channels: %{String.t() => {:ok, Channel.t()} | {:error, any()}}, + real_channels: %{String.t() => {:connected, Channel.t()} | {:failed, any()}}, lb_mod: module() | nil, lb_state: term() | nil, resolver: module() | nil, adapter: module(), - target: String.t() | nil, + resolver_target: String.t() | nil, connect_opts: keyword(), dns_resolver_pid: pid() | nil } @@ -102,7 +102,7 @@ defmodule GRPC.Client.Connection do lb_state: nil, resolver: nil, adapter: GRPC.Client.Adapters.Gun, - target: nil, + resolver_target: nil, connect_opts: [], dns_resolver_pid: nil @@ -134,12 +134,12 @@ defmodule GRPC.Client.Connection do # Only start periodic re-resolution for DNS targets — static targets # (ipv4:, ipv6:, unix:) always resolve to the same addresses. state = - if state.resolver && state.target && dns_target?(state.target) do + if state.resolver && state.resolver_target && dns_target?(state.resolver_target) do {:ok, pid} = - GRPC.Client.DnsResolver.start_link( + GRPC.Client.DNSResolver.start_link( connection_pid: self(), resolver: state.resolver, - target: state.target, + target: state.resolver_target, resolve_interval: state.connect_opts[:resolve_interval], max_resolve_interval: state.connect_opts[:max_resolve_interval], min_resolve_interval: state.connect_opts[:min_resolve_interval] @@ -265,12 +265,9 @@ defmodule GRPC.Client.Connection do Intended for use by health checks or heartbeat mechanisms that detect a backend has gone away and want to force a fresh DNS lookup. """ - @spec resolve_now(Channel.t()) :: :ok | {:error, :no_connection} + @spec resolve_now(Channel.t()) :: :ok def resolve_now(%Channel{ref: ref}) do - case :global.whereis_name({__MODULE__, ref}) do - :undefined -> {:error, :no_connection} - pid -> GenServer.cast(pid, :resolve_now) - end + GenServer.cast(via(ref), :resolve_now) end @impl GenServer @@ -288,7 +285,7 @@ defmodule GRPC.Client.Connection do if Map.has_key?(state, :real_channels) do Enum.map(state.real_channels, fn - {_key, {:ok, ch}} -> + {_key, {:connected, ch}} -> do_disconnect(adapter, ch) _ -> @@ -316,14 +313,14 @@ defmodule GRPC.Client.Connection do channel_key = build_address_key(prefer_host, prefer_port) case Map.get(channels, channel_key) do - {:ok, %Channel{} = picked_channel} -> + {:connected, %Channel{} = picked_channel} -> :persistent_term.put({__MODULE__, :lb_state, vc.ref}, picked_channel) Process.send_after(self(), :refresh, @refresh_interval) {:noreply, %{state | lb_state: new_lb_state, virtual_channel: picked_channel}} - _nil_or_error -> - # LB picked a channel that is missing or in {:error, _} state. + _nil_or_failed -> + # LB picked a channel that is missing or in {:failed, _} state. # Don't update persistent_term — keep serving from the current # virtual_channel until re-resolution provides healthy backends. Logger.warning("LB picked #{channel_key}, but channel is unavailable") @@ -335,7 +332,7 @@ defmodule GRPC.Client.Connection do def handle_info(:refresh, state), do: {:noreply, state} - # Result from the dedicated DnsResolver process + # Result from the dedicated DNSResolver process def handle_info({:dns_result, result}, state) do state = handle_resolve_result(result, state) {:noreply, state} @@ -385,53 +382,58 @@ defmodule GRPC.Client.Connection do added = MapSet.difference(new_keys, old_keys) removed = MapSet.difference(old_keys, new_keys) - # Disconnect removed channels - real_channels = - Enum.reduce(MapSet.to_list(removed), state.real_channels, fn key, channels -> - case Map.get(channels, key) do - {:ok, ch} -> do_disconnect(adapter, ch) - _ -> :ok - end + real_channels = disconnect_removed_channels(removed, adapter, state.real_channels) + real_channels = connect_new_channels(new_addresses, added, adapter, opts, state, real_channels) + rebalance_after_reconcile(new_addresses, real_channels, state) + end - Map.delete(channels, key) - end) + defp disconnect_removed_channels(removed, adapter, real_channels) do + Enum.reduce(MapSet.to_list(removed), real_channels, fn key, channels -> + case Map.get(channels, key) do + {:connected, ch} -> do_disconnect(adapter, ch) + _ -> :ok + end - # Connect new channels, retry failed ones, and reconnect dead adapter PIDs - real_channels = - Enum.reduce(new_addresses, real_channels, fn %{address: host, port: port}, channels -> - key = build_address_key(host, port) - existing = Map.get(channels, key) - - should_connect = - MapSet.member?(added, key) or - match?({:error, _}, existing) or - not channel_alive?(existing) - - if should_connect do - # Disconnect the old channel if it exists but is dead - case existing do - {:ok, ch} -> do_disconnect(adapter, ch) - _ -> :ok - end + Map.delete(channels, key) + end) + end - case connect_real_channel(state.virtual_channel, host, port, opts, adapter) do - {:ok, ch} -> Map.put(channels, key, {:ok, ch}) - {:error, reason} -> Map.put(channels, key, {:error, reason}) - end - else - channels + defp connect_new_channels(new_addresses, added, adapter, opts, state, real_channels) do + Enum.reduce(new_addresses, real_channels, fn %{address: host, port: port}, channels -> + key = build_address_key(host, port) + existing = Map.get(channels, key) + + should_connect = + MapSet.member?(added, key) or + match?({:failed, _}, existing) or + not channel_alive?(existing) + + if should_connect do + case existing do + {:connected, ch} -> do_disconnect(adapter, ch) + _ -> :ok end - end) - # Re-init load balancer with full updated address list. - # - # NOTE: We guard persistent_term writes to only happen when the picked - # channel actually changes. persistent_term updates trigger a global GC - # pass across all BEAM processes (see erlang.org/doc/apps/erts/persistent_term). - # With periodic re-resolution this function runs every 30s+ per connection, - # and on no-change cycles we must avoid redundant writes. A future - # improvement would be migrating to ETS with read_concurrency: true, - # which has no global GC cost on writes. + case connect_real_channel(state.virtual_channel, host, port, opts, adapter) do + {:ok, ch} -> Map.put(channels, key, {:connected, ch}) + {:error, reason} -> Map.put(channels, key, {:failed, reason}) + end + else + channels + end + end) + end + + # Re-init load balancer with full updated address list. + # + # NOTE: We guard persistent_term writes to only happen when the picked + # channel actually changes. persistent_term updates trigger a global GC + # pass across all BEAM processes (see erlang.org/doc/apps/erts/persistent_term). + # With periodic re-resolution this function runs every 30s+ per connection, + # and on no-change cycles we must avoid redundant writes. A future + # improvement would be migrating to ETS with read_concurrency: true, + # which has no global GC cost on writes. + defp rebalance_after_reconcile(new_addresses, real_channels, state) do if state.lb_mod do case state.lb_mod.init(addresses: new_addresses) do {:ok, new_lb_state} -> @@ -439,7 +441,7 @@ defmodule GRPC.Client.Connection do key = build_address_key(host, port) case Map.get(real_channels, key) do - {:ok, picked_channel} -> + {:connected, picked_channel} -> maybe_update_persistent_term(state.virtual_channel, picked_channel) %{ @@ -450,8 +452,6 @@ defmodule GRPC.Client.Connection do } _ -> - # LB picked a channel that failed to connect. Fall back to any - # healthy channel so persistent_term doesn't hold a dead ref. fallback_to_healthy_channel(state, real_channels, picked_lb_state) end @@ -466,8 +466,8 @@ defmodule GRPC.Client.Connection do defp fallback_to_healthy_channel(state, real_channels, lb_state) do ref = state.virtual_channel.ref - case Enum.find_value(real_channels, fn {_k, v} -> match?({:ok, _}, v) && v end) do - {:ok, healthy_channel} -> + case Enum.find_value(real_channels, fn {_k, v} -> match?({:connected, _}, v) && v end) do + {:connected, healthy_channel} -> maybe_update_persistent_term(state.virtual_channel, healthy_channel) %{ @@ -496,11 +496,11 @@ defmodule GRPC.Client.Connection do end end - defp channel_alive?({:ok, %{adapter_payload: %{conn_pid: pid}}}) when is_pid(pid) do + defp channel_alive?({:connected, %{adapter_payload: %{conn_pid: pid}}}) when is_pid(pid) do Process.alive?(pid) end - defp channel_alive?({:ok, _}), do: true + defp channel_alive?({:connected, _}), do: true defp channel_alive?(_), do: false defp dns_target?(target) do @@ -569,7 +569,7 @@ defmodule GRPC.Client.Connection do virtual_channel: virtual_channel, resolver: resolver, adapter: adapter, - target: norm_target, + resolver_target: norm_target, connect_opts: norm_opts } @@ -629,7 +629,7 @@ defmodule GRPC.Client.Connection do key = build_address_key(prefer_host, prefer_port) - with {:ok, ch} <- Map.get(real_channels, key, {:error, :no_channel}) do + with {:connected, ch} <- Map.get(real_channels, key, {:failed, :no_channel}) do {:ok, %__MODULE__{ base_state @@ -639,7 +639,7 @@ defmodule GRPC.Client.Connection do real_channels: real_channels }} else - {:error, reason} -> {:error, reason} + {:failed, reason} -> {:error, reason} end {:error, :no_addresses} -> @@ -657,7 +657,7 @@ defmodule GRPC.Client.Connection do %__MODULE__{ base_state | virtual_channel: ch, - real_channels: %{"#{host}:#{port}" => {:ok, ch}} + real_channels: %{"#{host}:#{port}" => {:connected, ch}} }} {:error, reason} -> @@ -675,10 +675,10 @@ defmodule GRPC.Client.Connection do adapter ) do {:ok, ch} -> - {build_address_key(host, port), {:ok, ch}} + {build_address_key(host, port), {:connected, ch}} {:error, reason} -> - {build_address_key(host, port), {:error, reason}} + {build_address_key(host, port), {:failed, reason}} end end) end diff --git a/grpc_client/lib/grpc/client/dns_resolver.ex b/grpc_client/lib/grpc/client/dns_resolver.ex index 87a3272e..e57cd914 100644 --- a/grpc_client/lib/grpc/client/dns_resolver.ex +++ b/grpc_client/lib/grpc/client/dns_resolver.ex @@ -1,4 +1,4 @@ -defmodule GRPC.Client.DnsResolver do +defmodule GRPC.Client.DNSResolver do @moduledoc """ Dedicated process for periodic DNS re-resolution. diff --git a/grpc_client/test/grpc/client/re_resolve_test.exs b/grpc_client/test/grpc/client/re_resolve_test.exs index 10db1f2d..70d08ff4 100644 --- a/grpc_client/test/grpc/client/re_resolve_test.exs +++ b/grpc_client/test/grpc/client/re_resolve_test.exs @@ -55,7 +55,7 @@ defmodule GRPC.Client.ReResolveTest do pid = :global.whereis_name({Connection, ref}) if pid && Process.alive?(pid) do - # Also monitor the DnsResolver so we wait for it to die + # Also monitor the DNSResolver so we wait for it to die state = :sys.get_state(pid) resolver_pid = state.dns_resolver_pid @@ -68,7 +68,7 @@ defmodule GRPC.Client.ReResolveTest do 1_000 -> :ok end - # DnsResolver is linked to Connection, so it should die too. + # DNSResolver is linked to Connection, so it should die too. # Wait briefly to ensure it's fully stopped. if resolver_pid && Process.alive?(resolver_pid) do resolver_mon = Process.monitor(resolver_pid) @@ -1014,7 +1014,7 @@ defmodule GRPC.Client.ReResolveTest do # 10.0.0.2 should be in error state state = get_state(ctx.ref) - assert match?({:error, _}, Map.get(state.real_channels, "10.0.0.2:50051")) + assert match?({:failed, _}, Map.get(state.real_channels, "10.0.0.2:50051")) # Now make 10.0.0.2 reachable Application.put_env(:grpc_client, :grpc_test_failing_hosts, []) @@ -1023,8 +1023,8 @@ defmodule GRPC.Client.ReResolveTest do # Both channels should now be healthy state = get_state(ctx.ref) - assert match?({:ok, _}, Map.get(state.real_channels, "10.0.0.1:50051")) - assert match?({:ok, _}, Map.get(state.real_channels, "10.0.0.2:50051")) + assert match?({:connected, _}, Map.get(state.real_channels, "10.0.0.1:50051")) + assert match?({:connected, _}, Map.get(state.real_channels, "10.0.0.2:50051")) disconnect_and_wait(channel) end @@ -1051,7 +1051,7 @@ defmodule GRPC.Client.ReResolveTest do {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} end) - # Wait for re-resolve to fire (runs in DnsResolver process) + # Wait for re-resolve to fire (runs in DNSResolver process) Process.sleep(@wait) # Connection GenServer should still be responsive — pick_channel works @@ -1126,9 +1126,9 @@ defmodule GRPC.Client.ReResolveTest do lb_policy: :round_robin ) - # 10.0.0.2 is {:error, _} in real_channels + # 10.0.0.2 is {:failed, _} in real_channels state = get_state(ctx.ref) - assert match?({:error, _}, Map.get(state.real_channels, "10.0.0.2:50051")) + assert match?({:failed, _}, Map.get(state.real_channels, "10.0.0.2:50051")) # Wait for several :refresh cycles (15s default, but we'll trigger manually). # Round-robin will eventually pick 10.0.0.2. Without the fix, this crashes.