diff --git a/grpc_client/lib/grpc/client/connection.ex b/grpc_client/lib/grpc/client/connection.ex index 2cbb537d..ec280f4c 100644 --- a/grpc_client/lib/grpc/client/connection.ex +++ b/grpc_client/lib/grpc/client/connection.ex @@ -80,14 +80,20 @@ defmodule GRPC.Client.Connection do @insecure_scheme "http" @secure_scheme "https" @refresh_interval 15_000 + @default_resolve_interval 30_000 + @default_max_resolve_interval 300_000 + @default_min_resolve_interval 5_000 @type t :: %__MODULE__{ virtual_channel: Channel.t(), - real_channels: %{String.t() => {:ok, Channel.t()} | {:error, any()}}, + real_channels: %{String.t() => {:connected, Channel.t()} | {:failed, any()}}, lb_mod: module() | nil, lb_state: term() | nil, resolver: module() | nil, - adapter: module() + adapter: module(), + resolver_target: String.t() | nil, + connect_opts: keyword(), + dns_resolver_pid: pid() | nil } defstruct virtual_channel: nil, @@ -95,7 +101,10 @@ defmodule GRPC.Client.Connection do lb_mod: nil, lb_state: nil, resolver: nil, - adapter: GRPC.Client.Adapters.Gun + adapter: GRPC.Client.Adapters.Gun, + resolver_target: nil, + connect_opts: [], + dns_resolver_pid: nil def child_spec(initial_state) do %{ @@ -121,6 +130,26 @@ defmodule GRPC.Client.Connection do ) Process.send_after(self(), :refresh, @refresh_interval) + + # Only start periodic re-resolution for DNS targets — static targets + # (ipv4:, ipv6:, unix:) always resolve to the same addresses. + state = + if state.resolver && state.resolver_target && dns_target?(state.resolver_target) do + {:ok, pid} = + GRPC.Client.DNSResolver.start_link( + connection_pid: self(), + resolver: state.resolver, + target: state.resolver_target, + resolve_interval: state.connect_opts[:resolve_interval], + max_resolve_interval: state.connect_opts[:max_resolve_interval], + min_resolve_interval: state.connect_opts[:min_resolve_interval] + ) + + %{state | dns_resolver_pid: pid} + else + state + end + {:ok, state} end @@ -140,6 +169,9 @@ defmodule GRPC.Client.Connection do * `:codec` – request/response codec (default: `GRPC.Codec.Proto`) * `:compressor` / `:accepted_compressors` – message compression * `:headers` – default metadata headers + * `:resolve_interval` – DNS re-resolution interval in ms (default: 30000) + * `:max_resolve_interval` – backoff cap in ms (default: 300000) + * `:min_resolve_interval` – rate-limit floor in ms (default: 5000) Returns: @@ -227,6 +259,25 @@ defmodule GRPC.Client.Connection do end end + @doc """ + Triggers an immediate DNS re-resolution, subject to rate limiting. + + Intended for use by health checks or heartbeat mechanisms that detect + a backend has gone away and want to force a fresh DNS lookup. + """ + @spec resolve_now(Channel.t()) :: :ok + def resolve_now(%Channel{ref: ref}) do + GenServer.cast(via(ref), :resolve_now) + end + + @impl GenServer + def handle_cast(:resolve_now, %{dns_resolver_pid: pid} = state) when is_pid(pid) do + send(pid, :resolve_now) + {:noreply, state} + end + + def handle_cast(:resolve_now, state), do: {:noreply, state} + @impl GenServer def handle_call({:disconnect, %Channel{adapter: adapter} = channel}, _from, state) do resp = {:ok, %Channel{channel | adapter_payload: %{conn_pid: nil}}} @@ -234,7 +285,7 @@ defmodule GRPC.Client.Connection do if Map.has_key?(state, :real_channels) do Enum.map(state.real_channels, fn - {_key, {:ok, ch}} -> + {_key, {:connected, ch}} -> do_disconnect(adapter, ch) _ -> @@ -262,23 +313,31 @@ defmodule GRPC.Client.Connection do channel_key = build_address_key(prefer_host, prefer_port) case Map.get(channels, channel_key) do - nil -> - Logger.warning("LB picked #{channel_key}, but no channel found in pool") + {:connected, %Channel{} = picked_channel} -> + :persistent_term.put({__MODULE__, :lb_state, vc.ref}, picked_channel) Process.send_after(self(), :refresh, @refresh_interval) - {:noreply, %{state | lb_state: new_lb_state}} + {:noreply, %{state | lb_state: new_lb_state, virtual_channel: picked_channel}} - {:ok, %Channel{} = picked_channel} -> - :persistent_term.put({__MODULE__, :lb_state, vc.ref}, picked_channel) + _nil_or_failed -> + # LB picked a channel that is missing or in {:failed, _} state. + # Don't update persistent_term — keep serving from the current + # virtual_channel until re-resolution provides healthy backends. + Logger.warning("LB picked #{channel_key}, but channel is unavailable") Process.send_after(self(), :refresh, @refresh_interval) - - {:noreply, %{state | lb_state: new_lb_state, virtual_channel: picked_channel}} + {:noreply, %{state | lb_state: new_lb_state}} end end def handle_info(:refresh, state), do: {:noreply, state} + # Result from the dedicated DNSResolver process + def handle_info({:dns_result, result}, state) do + state = handle_resolve_result(result, state) + {:noreply, state} + end + def handle_info({:DOWN, _ref, :process, pid, reason}, state) do Logger.warning( "#{inspect(__MODULE__)} received :DOWN from #{inspect(pid)} with reason: #{inspect(reason)}" @@ -308,6 +367,146 @@ defmodule GRPC.Client.Connection do def terminate(_reason, _state), do: :ok + defp handle_resolve_result({:ok, %{addresses: []}}, state), do: state + + defp handle_resolve_result({:ok, %{addresses: new_addresses}}, state) do + reconcile_channels(new_addresses, state.adapter, state.connect_opts, state) + end + + defp handle_resolve_result({:error, _reason}, state), do: state + + defp reconcile_channels(new_addresses, adapter, opts, state) do + new_keys = MapSet.new(new_addresses, &build_address_key(&1.address, &1.port)) + old_keys = MapSet.new(Map.keys(state.real_channels)) + + added = MapSet.difference(new_keys, old_keys) + removed = MapSet.difference(old_keys, new_keys) + + real_channels = disconnect_removed_channels(removed, adapter, state.real_channels) + real_channels = connect_new_channels(new_addresses, added, adapter, opts, state, real_channels) + rebalance_after_reconcile(new_addresses, real_channels, state) + end + + defp disconnect_removed_channels(removed, adapter, real_channels) do + Enum.reduce(MapSet.to_list(removed), real_channels, fn key, channels -> + case Map.get(channels, key) do + {:connected, ch} -> do_disconnect(adapter, ch) + _ -> :ok + end + + Map.delete(channels, key) + end) + end + + defp connect_new_channels(new_addresses, added, adapter, opts, state, real_channels) do + Enum.reduce(new_addresses, real_channels, fn %{address: host, port: port}, channels -> + key = build_address_key(host, port) + existing = Map.get(channels, key) + + should_connect = + MapSet.member?(added, key) or + match?({:failed, _}, existing) or + not channel_alive?(existing) + + if should_connect do + case existing do + {:connected, ch} -> do_disconnect(adapter, ch) + _ -> :ok + end + + case connect_real_channel(state.virtual_channel, host, port, opts, adapter) do + {:ok, ch} -> Map.put(channels, key, {:connected, ch}) + {:error, reason} -> Map.put(channels, key, {:failed, reason}) + end + else + channels + end + end) + end + + # Re-init load balancer with full updated address list. + # + # NOTE: We guard persistent_term writes to only happen when the picked + # channel actually changes. persistent_term updates trigger a global GC + # pass across all BEAM processes (see erlang.org/doc/apps/erts/persistent_term). + # With periodic re-resolution this function runs every 30s+ per connection, + # and on no-change cycles we must avoid redundant writes. A future + # improvement would be migrating to ETS with read_concurrency: true, + # which has no global GC cost on writes. + defp rebalance_after_reconcile(new_addresses, real_channels, state) do + if state.lb_mod do + case state.lb_mod.init(addresses: new_addresses) do + {:ok, new_lb_state} -> + {:ok, {host, port}, picked_lb_state} = state.lb_mod.pick(new_lb_state) + key = build_address_key(host, port) + + case Map.get(real_channels, key) do + {:connected, picked_channel} -> + maybe_update_persistent_term(state.virtual_channel, picked_channel) + + %{ + state + | real_channels: real_channels, + lb_state: picked_lb_state, + virtual_channel: picked_channel + } + + _ -> + fallback_to_healthy_channel(state, real_channels, picked_lb_state) + end + + {:error, _} -> + fallback_to_healthy_channel(state, real_channels, state.lb_state) + end + else + fallback_to_healthy_channel(state, real_channels, state.lb_state) + end + end + + defp fallback_to_healthy_channel(state, real_channels, lb_state) do + ref = state.virtual_channel.ref + + case Enum.find_value(real_channels, fn {_k, v} -> match?({:connected, _}, v) && v end) do + {:connected, healthy_channel} -> + maybe_update_persistent_term(state.virtual_channel, healthy_channel) + + %{ + state + | real_channels: real_channels, + lb_state: lb_state, + virtual_channel: healthy_channel + } + + nil -> + Logger.warning("No healthy channels available after re-resolution") + :persistent_term.erase({__MODULE__, :lb_state, ref}) + %{state | real_channels: real_channels, lb_state: lb_state} + end + end + + # Only write to persistent_term when the channel actually changed. + # persistent_term updates trigger a global GC pass, so we skip + # redundant writes on no-change re-resolution cycles. + defp maybe_update_persistent_term(current_channel, new_channel) do + if current_channel != new_channel do + :persistent_term.put( + {__MODULE__, :lb_state, new_channel.ref}, + new_channel + ) + end + end + + defp channel_alive?({:connected, %{adapter_payload: %{conn_pid: pid}}}) when is_pid(pid) do + Process.alive?(pid) + end + + defp channel_alive?({:connected, _}), do: true + defp channel_alive?(_), do: false + + defp dns_target?(target) do + URI.parse(target).scheme == "dns" + end + defp via(ref) do {:global, {__MODULE__, ref}} end @@ -333,7 +532,12 @@ defmodule GRPC.Client.Connection do codec: GRPC.Codec.Proto, compressor: nil, accepted_compressors: [], - headers: [] + headers: [], + lb_policy: nil, + resolver: GRPC.Client.Resolver, + resolve_interval: @default_resolve_interval, + max_resolve_interval: @default_max_resolve_interval, + min_resolve_interval: @default_min_resolve_interval ) resolver = Keyword.get(opts, :resolver, GRPC.Client.Resolver) @@ -364,7 +568,9 @@ defmodule GRPC.Client.Connection do base_state = %__MODULE__{ virtual_channel: virtual_channel, resolver: resolver, - adapter: adapter + adapter: adapter, + resolver_target: norm_target, + connect_opts: norm_opts } case resolver.resolve(norm_target) do @@ -423,7 +629,7 @@ defmodule GRPC.Client.Connection do key = build_address_key(prefer_host, prefer_port) - with {:ok, ch} <- Map.get(real_channels, key, {:error, :no_channel}) do + with {:connected, ch} <- Map.get(real_channels, key, {:failed, :no_channel}) do {:ok, %__MODULE__{ base_state @@ -433,7 +639,7 @@ defmodule GRPC.Client.Connection do real_channels: real_channels }} else - {:error, reason} -> {:error, reason} + {:failed, reason} -> {:error, reason} end {:error, :no_addresses} -> @@ -451,7 +657,7 @@ defmodule GRPC.Client.Connection do %__MODULE__{ base_state | virtual_channel: ch, - real_channels: %{"#{host}:#{port}" => {:ok, ch}} + real_channels: %{"#{host}:#{port}" => {:connected, ch}} }} {:error, reason} -> @@ -469,10 +675,10 @@ defmodule GRPC.Client.Connection do adapter ) do {:ok, ch} -> - {build_address_key(host, port), {:ok, ch}} + {build_address_key(host, port), {:connected, ch}} {:error, reason} -> - {build_address_key(host, port), {:error, reason}} + {build_address_key(host, port), {:failed, reason}} end end) end diff --git a/grpc_client/lib/grpc/client/dns_resolver.ex b/grpc_client/lib/grpc/client/dns_resolver.ex new file mode 100644 index 00000000..e57cd914 --- /dev/null +++ b/grpc_client/lib/grpc/client/dns_resolver.ex @@ -0,0 +1,157 @@ +defmodule GRPC.Client.DNSResolver do + @moduledoc """ + Dedicated process for periodic DNS re-resolution. + + Linked to the parent `GRPC.Client.Connection` GenServer. Owns the + resolve loop, backoff, rate limiting, and telemetry — keeping the + Connection process focused on channel management. + + Sends `{:dns_result, result}` to the Connection after each resolve, + where `result` matches the return type of `GRPC.Client.Resolver.resolve/1`. + + ## Resolver contract + + The `:resolver` option must be a module implementing the + `GRPC.Client.Resolver` behaviour — specifically the `c:GRPC.Client.Resolver.resolve/1` + callback, which returns: + + {:ok, %{addresses: [%{address: String.t(), port: integer()}], service_config: term()}} + | {:error, term()} + """ + use GenServer + require Logger + + @resolve_stop_event [:grpc, :client, :resolve, :stop] + @resolve_error_event [:grpc, :client, :resolve, :error] + + defstruct [ + :connection_pid, + :resolver_module, + :target, + :resolve_interval, + :base_resolve_interval, + :max_resolve_interval, + :min_resolve_interval, + :last_resolve_at, + :timer_ref + ] + + @doc """ + Starts the resolver process, linked to the calling process. + + Options: + * `:connection_pid` — pid of the owning Connection GenServer + * `:resolver` — module implementing `GRPC.Client.Resolver` behaviour + * `:target` — the DNS target string + * `:resolve_interval` — base interval between resolves (ms) + * `:max_resolve_interval` — backoff cap (ms) + * `:min_resolve_interval` — rate-limit floor (ms) + """ + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + @impl GenServer + def init(opts) do + resolve_interval = Keyword.fetch!(opts, :resolve_interval) + + state = %__MODULE__{ + connection_pid: Keyword.fetch!(opts, :connection_pid), + resolver_module: Keyword.fetch!(opts, :resolver), + target: Keyword.fetch!(opts, :target), + resolve_interval: resolve_interval, + base_resolve_interval: resolve_interval, + max_resolve_interval: Keyword.fetch!(opts, :max_resolve_interval), + min_resolve_interval: Keyword.fetch!(opts, :min_resolve_interval), + last_resolve_at: nil + } + + {:ok, schedule(state)} + end + + @impl GenServer + def handle_info(:resolve, state) do + state = do_resolve(state) + {:noreply, schedule(state)} + end + + def handle_info(:resolve_now, state) do + now = System.monotonic_time(:millisecond) + + if rate_limited?(state, now) do + Logger.debug("DNS re-resolution for #{state.target} rate-limited, skipping") + {:noreply, state} + else + state = do_resolve(state) + {:noreply, state} + end + end + + defp do_resolve(state) do + start_time = System.monotonic_time() + result = state.resolver_module.resolve(state.target) + duration = System.monotonic_time() - start_time + now = System.monotonic_time(:millisecond) + + state = %{state | last_resolve_at: now} + + case result do + {:ok, %{addresses: []}} -> + emit_error(duration, state.target, :empty_addresses) + + Logger.warning( + "DNS re-resolution returned empty addresses for #{state.target}, keeping existing" + ) + + send(state.connection_pid, {:dns_result, result}) + backoff(state) + + {:ok, %{addresses: addresses}} -> + emit_success(duration, state.target, length(addresses)) + send(state.connection_pid, {:dns_result, result}) + reset_backoff(state) + + {:error, reason} -> + emit_error(duration, state.target, reason) + Logger.warning("DNS re-resolution failed for #{state.target}: #{inspect(reason)}") + send(state.connection_pid, {:dns_result, result}) + backoff(state) + end + end + + defp schedule(state) do + if state.timer_ref, do: Process.cancel_timer(state.timer_ref) + ref = Process.send_after(self(), :resolve, state.resolve_interval) + %{state | timer_ref: ref} + end + + defp backoff(state) do + new_interval = min(state.resolve_interval * 2, state.max_resolve_interval) + %{state | resolve_interval: new_interval} + end + + defp reset_backoff(state) do + %{state | resolve_interval: state.base_resolve_interval} + end + + defp rate_limited?(%{last_resolve_at: nil}, _now), do: false + + defp rate_limited?(%{last_resolve_at: last, min_resolve_interval: min}, now) do + now - last < min + end + + defp emit_success(duration, target, address_count) do + :telemetry.execute(@resolve_stop_event, %{duration: duration}, %{ + target: target, + address_count: address_count + }) + end + + defp emit_error(duration, target, reason) do + :telemetry.execute(@resolve_error_event, %{duration: duration}, %{ + target: target, + reason: reason, + address_count: 0 + }) + end +end diff --git a/grpc_client/test/grpc/client/re_resolve_test.exs b/grpc_client/test/grpc/client/re_resolve_test.exs new file mode 100644 index 00000000..70d08ff4 --- /dev/null +++ b/grpc_client/test/grpc/client/re_resolve_test.exs @@ -0,0 +1,1152 @@ +defmodule GRPC.Client.ReResolveTest do + @moduledoc """ + Tests for periodic DNS re-resolution in the gRPC client connection manager. + + Test coverage modelled after grpc-go's dns_resolver_test.go and + resolver_update_test.go. Covers: + + 1. Scale-up — new backends discovered on re-resolution + 2. Scale-down — backends removed on re-resolution + 3. No-op — unchanged addresses leave channels untouched + 4. Complete replacement — disjoint address lists + 5. DNS failure — keeps existing channels, logs warning + 6. Recovery after failure — next cycle succeeds + 7. Empty address list — treated as failure, channels preserved + 8. Recovery after empty — next cycle with valid addresses works + 9. pick_channel stability — channel picking works during/after re-resolution + 10. pick_channel after full replacement — new backend is pickable + 11. Repeated cycles — timer fires on every interval tick + 12. Non-DNS targets — ipv4: targets do not trigger re-resolution + 13. Timer after disconnect — no crash + 14. LB error during re-resolution — connection survives + 15. Port change on same host — detected as new address + 16. Exponential backoff — doubles on failure, resets on success, caps at max + 17. Rate limiting — resolve_now calls coalesced within min_resolve_interval + 18. Telemetry — :stop event on success, :error event on failure/empty + """ + use GRPC.Client.DataCase, async: false + import Mox + + alias GRPC.Client.Connection + + # Interval for re-resolution in tests (ms). Long enough that only one + # re-resolve fires per sleep window, short enough tests stay fast. + @resolve_interval 200 + # Sleep slightly longer than one interval so the timer fires exactly once. + @wait @resolve_interval + 100 + # After a failure, backoff doubles the interval. Wait accordingly. + @wait_after_backoff @resolve_interval * 2 + 150 + + setup do + Mox.set_mox_global() + ref = make_ref() + + %{ + ref: ref, + adapter: GRPC.Test.ClientAdapter, + resolver: GRPC.Client.MockResolver + } + end + + # -- helpers --------------------------------------------------------------- + + defp disconnect_and_wait(channel) do + ref = channel.ref + pid = :global.whereis_name({Connection, ref}) + + if pid && Process.alive?(pid) do + # Also monitor the DNSResolver so we wait for it to die + state = :sys.get_state(pid) + resolver_pid = state.dns_resolver_pid + + mon = Process.monitor(pid) + Connection.disconnect(channel) + + receive do + {:DOWN, ^mon, :process, ^pid, _} -> :ok + after + 1_000 -> :ok + end + + # DNSResolver is linked to Connection, so it should die too. + # Wait briefly to ensure it's fully stopped. + if resolver_pid && Process.alive?(resolver_pid) do + resolver_mon = Process.monitor(resolver_pid) + + receive do + {:DOWN, ^resolver_mon, :process, ^resolver_pid, _} -> :ok + after + 1_000 -> :ok + end + end + end + end + + defp connect_with_resolver(ref, resolver, adapter, addresses, opts) do + # Initial resolve call during connect/2 + expect(resolver, :resolve, fn _target -> + {:ok, %{addresses: addresses, service_config: nil}} + end) + + # Stub subsequent re-resolve calls to return the same addresses by default. + # Individual tests override this with expect/stub before sleeping. + stub(resolver, :resolve, fn _target -> + {:ok, %{addresses: addresses, service_config: nil}} + end) + + Connection.connect( + "dns://my-service.local:50051", + [ + adapter: adapter, + name: ref, + resolver: resolver, + resolve_interval: @resolve_interval, + min_resolve_interval: 0 + ] ++ opts + ) + end + + defp get_state(ref) do + pid = :global.whereis_name({Connection, ref}) + :sys.get_state(pid) + end + + defp get_resolver_state(ref) do + conn_state = get_state(ref) + :sys.get_state(conn_state.dns_resolver_pid) + end + + # -- 1. Scale-up: new backends discovered ---------------------------------- + + describe "scale-up: new backends discovered" do + test "adds channels for addresses that appear in DNS", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + assert map_size(get_state(ctx.ref).real_channels) == 1 + + new_addrs = [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ] + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: new_addrs, service_config: nil}} + end) + + Process.sleep(@wait) + + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 2 + assert Map.has_key?(state.real_channels, "10.0.0.1:50051") + assert Map.has_key?(state.real_channels, "10.0.0.2:50051") + + disconnect_and_wait(channel) + end + end + + # -- 2. Scale-down: backends removed --------------------------------------- + + describe "scale-down: backends removed" do + test "disconnects channels for addresses no longer in DNS", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + lb_policy: :round_robin + ) + + assert map_size(get_state(ctx.ref).real_channels) == 2 + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + Process.sleep(@wait) + + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 1 + assert Map.has_key?(state.real_channels, "10.0.0.1:50051") + refute Map.has_key?(state.real_channels, "10.0.0.2:50051") + + disconnect_and_wait(channel) + end + end + + # -- 3. No-op: unchanged addresses ---------------------------------------- + + describe "no-op: unchanged addresses" do + test "leaves channels untouched when DNS returns the same set", ctx do + addresses = [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ] + + {:ok, channel} = + connect_with_resolver(ctx.ref, ctx.resolver, ctx.adapter, addresses, + lb_policy: :round_robin + ) + + state_before = get_state(ctx.ref) + + # stub already returns same addresses from connect_with_resolver + Process.sleep(@wait) + + state_after = get_state(ctx.ref) + assert state_before.real_channels == state_after.real_channels + + disconnect_and_wait(channel) + end + end + + # -- 4. Complete replacement: disjoint address lists ----------------------- + + describe "complete replacement: disjoint address lists" do + test "removes old and creates new channels for entirely different backends", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + lb_policy: :round_robin + ) + + new_addrs = [ + %{address: "10.0.0.3", port: 50051}, + %{address: "10.0.0.4", port: 50051} + ] + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: new_addrs, service_config: nil}} + end) + + Process.sleep(@wait) + + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 2 + refute Map.has_key?(state.real_channels, "10.0.0.1:50051") + refute Map.has_key?(state.real_channels, "10.0.0.2:50051") + assert Map.has_key?(state.real_channels, "10.0.0.3:50051") + assert Map.has_key?(state.real_channels, "10.0.0.4:50051") + + disconnect_and_wait(channel) + end + end + + # -- 5. DNS failure keeps existing channels -------------------------------- + + describe "DNS failure during re-resolution" do + test "keeps existing channels on resolver error", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + stub(ctx.resolver, :resolve, fn _target -> {:error, :timeout} end) + + Process.sleep(@wait) + + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 1 + assert Map.has_key?(state.real_channels, "10.0.0.1:50051") + + disconnect_and_wait(channel) + end + end + + # -- 6. Recovery after DNS failure ----------------------------------------- + + describe "recovery after DNS failure" do + test "next successful cycle updates channels after a failed one", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + # First cycle: failure + stub(ctx.resolver, :resolve, fn _target -> {:error, :nxdomain} end) + + Process.sleep(@wait) + assert map_size(get_state(ctx.ref).real_channels) == 1 + + # Second cycle: success (interval is doubled after failure) + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + service_config: nil + }} + end) + + Process.sleep(@wait_after_backoff) + assert map_size(get_state(ctx.ref).real_channels) == 2 + + disconnect_and_wait(channel) + end + end + + # -- 7. Empty address list: treated as failure ----------------------------- + + describe "empty address list on re-resolution" do + test "keeps existing channels when DNS returns zero addresses", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + lb_policy: :round_robin + ) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [], service_config: nil}} + end) + + Process.sleep(@wait) + + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 2 + + disconnect_and_wait(channel) + end + end + + # -- 8. Recovery after empty ----------------------------------------------- + + describe "recovery after empty address list" do + test "subsequent cycle with valid addresses updates channels", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + # First cycle: empty — channels preserved + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [], service_config: nil}} + end) + + Process.sleep(@wait) + assert map_size(get_state(ctx.ref).real_channels) == 1 + + # Second cycle: valid — channels updated (interval doubled after empty) + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.3", port: 50051} + ], + service_config: nil + }} + end) + + Process.sleep(@wait_after_backoff) + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 2 + assert Map.has_key?(state.real_channels, "10.0.0.3:50051") + + disconnect_and_wait(channel) + end + end + + # -- 9. pick_channel stability during re-resolution ----------------------- + + describe "pick_channel stability during re-resolution" do + test "pick_channel continues to work while addresses change", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + assert {:ok, _} = Connection.pick_channel(channel) + + new_addrs = [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ] + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: new_addrs, service_config: nil}} + end) + + Process.sleep(@wait) + + assert {:ok, picked} = Connection.pick_channel(channel) + assert picked.host in ["10.0.0.1", "10.0.0.2"] + assert picked.port == 50051 + + disconnect_and_wait(channel) + end + end + + # -- 10. pick_channel after full replacement -------------------------------- + + describe "pick_channel after full backend replacement" do + test "picks a channel from the new backend set", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.99", port: 50051}], service_config: nil}} + end) + + Process.sleep(@wait) + + assert {:ok, picked} = Connection.pick_channel(channel) + assert picked.host == "10.0.0.99" + + disconnect_and_wait(channel) + end + end + + # -- 11. Repeated re-resolution cycles ------------------------------------- + + describe "repeated re-resolution cycles" do + test "timer fires on every interval tick, accumulating changes", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + # Cycle 1: 2 backends + two_addrs = [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ] + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: two_addrs, service_config: nil}} + end) + + Process.sleep(@wait) + assert map_size(get_state(ctx.ref).real_channels) == 2 + + # Cycle 2: 3 backends (no backoff — previous cycle succeeded) + three_addrs = [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051}, + %{address: "10.0.0.3", port: 50051} + ] + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: three_addrs, service_config: nil}} + end) + + Process.sleep(@wait) + assert map_size(get_state(ctx.ref).real_channels) == 3 + + disconnect_and_wait(channel) + end + end + + # -- 12. Non-DNS targets skip re-resolution -------------------------------- + + describe "non-DNS targets skip re-resolution" do + test "ipv4 target does not start a dns resolver process", ctx do + {:ok, channel} = + Connection.connect("ipv4:127.0.0.1:50051", + adapter: ctx.adapter, + name: ctx.ref, + resolve_interval: 50 + ) + + # Wait long enough for re-resolve to have fired if it was scheduled + Process.sleep(200) + + # Should still be alive and working — no resolver process started + assert {:ok, _} = Connection.pick_channel(channel) + assert is_nil(get_state(ctx.ref).dns_resolver_pid) + + disconnect_and_wait(channel) + end + end + + # -- 13. Re-resolution after disconnect is a no-op ------------------------- + + describe "re-resolution after disconnect" do + test "linked resolver dies when connection disconnects", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + Connection.disconnect(channel) + + # Wait past when the re-resolve timer would fire — should not crash + Process.sleep(@wait) + + # Process should be gone, pick should fail cleanly + assert {:error, :no_connection} = Connection.pick_channel(channel) + end + end + + # -- 14. LB crash during re-resolution doesn't kill the connection --------- + + describe "LB error during re-resolution" do + test "connection survives when re-resolved addresses cause LB init to fail", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.5", port: 50051} + ], + service_config: nil + }} + end) + + Process.sleep(@wait) + + # GenServer should still be alive and channels should be updated + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 2 + assert {:ok, _} = Connection.pick_channel(channel) + + disconnect_and_wait(channel) + end + end + + # -- 15. Port change on same host detected -------------------------------- + + describe "port change on same host" do + test "detects port change as a new address", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + assert Map.has_key?(get_state(ctx.ref).real_channels, "10.0.0.1:50051") + + # Same host, different port + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [%{address: "10.0.0.1", port: 50052}], + service_config: nil + }} + end) + + Process.sleep(@wait) + + state = get_state(ctx.ref) + assert map_size(state.real_channels) == 1 + refute Map.has_key?(state.real_channels, "10.0.0.1:50051") + assert Map.has_key?(state.real_channels, "10.0.0.1:50052") + + disconnect_and_wait(channel) + end + end + + # -- 16. Exponential backoff on failure ------------------------------------ + + describe "exponential backoff on failure" do + test "interval doubles after each consecutive failure", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + # Fail continuously + stub(ctx.resolver, :resolve, fn _target -> {:error, :nxdomain} end) + + # After first failure: interval should double + Process.sleep(@wait) + resolver_state = get_resolver_state(ctx.ref) + assert resolver_state.resolve_interval == @resolve_interval * 2 + + # After second failure: doubles again + Process.sleep(resolver_state.resolve_interval + 50) + resolver_state = get_resolver_state(ctx.ref) + assert resolver_state.resolve_interval == @resolve_interval * 4 + + disconnect_and_wait(channel) + end + + test "interval resets to base after successful resolution", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + # First cycle: failure → doubles interval + stub(ctx.resolver, :resolve, fn _target -> {:error, :nxdomain} end) + + Process.sleep(@wait) + resolver_state = get_resolver_state(ctx.ref) + assert resolver_state.resolve_interval == @resolve_interval * 2 + + # Second cycle: success → resets to base + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + Process.sleep(@wait_after_backoff) + resolver_state = get_resolver_state(ctx.ref) + assert resolver_state.resolve_interval == @resolve_interval + + disconnect_and_wait(channel) + end + + test "interval caps at max_resolve_interval", ctx do + # Set max to 4x base so we can hit the cap quickly + max = @resolve_interval * 4 + + expect(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + stub(ctx.resolver, :resolve, fn _target -> {:error, :nxdomain} end) + + {:ok, channel} = + Connection.connect( + "dns://my-service.local:50051", + adapter: ctx.adapter, + name: ctx.ref, + resolver: ctx.resolver, + resolve_interval: @resolve_interval, + max_resolve_interval: max, + min_resolve_interval: 0, + lb_policy: :round_robin + ) + + # Fail 1: 200 -> 400 + Process.sleep(@wait) + assert get_resolver_state(ctx.ref).resolve_interval == @resolve_interval * 2 + + # Fail 2: 400 -> 800 capped to 800 (= max) + Process.sleep(@resolve_interval * 2 + 50) + assert get_resolver_state(ctx.ref).resolve_interval == max + + # Fail 3: should stay at max, not grow further + Process.sleep(max + 50) + assert get_resolver_state(ctx.ref).resolve_interval == max + + disconnect_and_wait(channel) + end + end + + # -- 17. Rate limiting / resolve_now coalescing ---------------------------- + + describe "rate limiting" do + test "resolve_now calls within min_resolve_interval are skipped", ctx do + expect(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + # Use a long resolve_interval so the periodic timer doesn't fire, + # and a large min_resolve_interval to make rate limiting deterministic. + {:ok, channel} = + Connection.connect( + "dns://my-service.local:50051", + adapter: ctx.adapter, + name: ctx.ref, + resolver: ctx.resolver, + resolve_interval: 60_000, + min_resolve_interval: 60_000, + lb_policy: :round_robin + ) + + # Track resolve calls during the burst + call_count = :counters.new(1, [:atomics]) + + stub(ctx.resolver, :resolve, fn _target -> + :counters.add(call_count, 1, 1) + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + resolver_pid = get_state(ctx.ref).dns_resolver_pid + + # Fire 20 resolve_now calls rapidly + for _ <- 1..20, do: send(resolver_pid, :resolve_now) + + # Wait for the GenServer to drain its mailbox + _ = :sys.get_state(resolver_pid) + + # With 60s rate limit, only the first should resolve; rest are skipped + actual = :counters.get(call_count, 1) + assert actual == 1, "Expected exactly 1 resolution, got #{actual}" + + disconnect_and_wait(channel) + end + end + + # -- 18. Telemetry events -------------------------------------------------- + + describe "telemetry events" do + setup do + test_pid = self() + handler_id = "test-resolve-telemetry-#{inspect(test_pid)}" + + :telemetry.attach_many( + handler_id, + [ + [:grpc, :client, :resolve, :stop], + [:grpc, :client, :resolve, :error] + ], + fn name, measurements, metadata, [] -> + send(test_pid, {:telemetry, name, measurements, metadata}) + end, + [] + ) + + on_exit(fn -> :telemetry.detach(handler_id) end) + :ok + end + + test "emits :stop event on successful re-resolution", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + new_addrs = [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ] + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: new_addrs, service_config: nil}} + end) + + Process.sleep(@wait) + + assert_received {:telemetry, [:grpc, :client, :resolve, :stop], measurements, metadata} + assert is_integer(measurements.duration) + assert measurements.duration >= 0 + assert metadata.target == "dns://my-service.local:50051" + assert metadata.address_count == 2 + + disconnect_and_wait(channel) + end + + test "emits :error event on DNS failure", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + stub(ctx.resolver, :resolve, fn _target -> {:error, :timeout} end) + + Process.sleep(@wait) + + assert_received {:telemetry, [:grpc, :client, :resolve, :error], measurements, metadata} + assert is_integer(measurements.duration) + assert metadata.target == "dns://my-service.local:50051" + assert metadata.reason == :timeout + assert metadata.address_count == 0 + + disconnect_and_wait(channel) + end + + test "emits :error event on empty address list", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [], service_config: nil}} + end) + + Process.sleep(@wait) + + assert_received {:telemetry, [:grpc, :client, :resolve, :error], _measurements, metadata} + assert metadata.reason == :empty_addresses + + disconnect_and_wait(channel) + end + end + + # -- 19. Stale persistent_term: LB picks unhealthy channel ----------------- + + describe "stale persistent_term prevention" do + setup ctx do + Application.put_env(:grpc_client, :grpc_test_failing_hosts, ["10.0.0.99"]) + on_exit(fn -> Application.delete_env(:grpc_client, :grpc_test_failing_hosts) end) + Map.put(ctx, :failing_adapter, GRPC.Test.FailingClientAdapter) + end + + test "falls back to healthy channel when LB picks a failed one", ctx do + expect(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + {:ok, channel} = + Connection.connect( + "dns://my-service.local:50051", + adapter: ctx.failing_adapter, + name: ctx.ref, + resolver: ctx.resolver, + resolve_interval: @resolve_interval, + min_resolve_interval: 0, + lb_policy: :round_robin + ) + + assert {:ok, _} = Connection.pick_channel(channel) + + # Re-resolve adds a failing host. Round-robin might pick it, but + # fallback should ensure we get the healthy one. + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.99", port: 50051}, + %{address: "10.0.0.1", port: 50051} + ], + service_config: nil + }} + end) + + Process.sleep(@wait) + + assert {:ok, picked} = Connection.pick_channel(channel) + assert picked.host == "10.0.0.1" + + disconnect_and_wait(channel) + end + + test "pick_channel returns error when all new channels fail", ctx do + expect(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + {:ok, channel} = + Connection.connect( + "dns://my-service.local:50051", + adapter: ctx.failing_adapter, + name: ctx.ref, + resolver: ctx.resolver, + resolve_interval: @resolve_interval, + min_resolve_interval: 0, + lb_policy: :round_robin + ) + + assert {:ok, _} = Connection.pick_channel(channel) + + # Re-resolve replaces with ONLY failing hosts + Application.put_env(:grpc_client, :grpc_test_failing_hosts, ["10.0.0.98", "10.0.0.99"]) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.98", port: 50051}, + %{address: "10.0.0.99", port: 50051} + ], + service_config: nil + }} + end) + + Process.sleep(@wait) + + assert {:error, :no_connection} = Connection.pick_channel(channel) + end + end + + # -- 20. Retry previously failed channels still in DNS --------------------- + + describe "retry previously failed channels" do + setup ctx do + Application.put_env(:grpc_client, :grpc_test_failing_hosts, ["10.0.0.2"]) + on_exit(fn -> Application.delete_env(:grpc_client, :grpc_test_failing_hosts) end) + Map.put(ctx, :failing_adapter, GRPC.Test.FailingClientAdapter) + end + + test "reconnects a previously failed channel when it becomes reachable", ctx do + expect(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + service_config: nil + }} + end) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + service_config: nil + }} + end) + + {:ok, channel} = + Connection.connect( + "dns://my-service.local:50051", + adapter: ctx.failing_adapter, + name: ctx.ref, + resolver: ctx.resolver, + resolve_interval: @resolve_interval, + min_resolve_interval: 0, + lb_policy: :round_robin + ) + + # 10.0.0.2 should be in error state + state = get_state(ctx.ref) + assert match?({:failed, _}, Map.get(state.real_channels, "10.0.0.2:50051")) + + # Now make 10.0.0.2 reachable + Application.put_env(:grpc_client, :grpc_test_failing_hosts, []) + + Process.sleep(@wait) + + # Both channels should now be healthy + state = get_state(ctx.ref) + assert match?({:connected, _}, Map.get(state.real_channels, "10.0.0.1:50051")) + assert match?({:connected, _}, Map.get(state.real_channels, "10.0.0.2:50051")) + + disconnect_and_wait(channel) + end + end + + # -- 21. Resolver runs in dedicated process, doesn't block Connection ------ + + describe "dedicated resolver process" do + test "Connection stays responsive during slow DNS resolution", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + # Simulate a resolver that takes a long time + stub(ctx.resolver, :resolve, fn _target -> + Process.sleep(2_000) + {:ok, %{addresses: [%{address: "10.0.0.1", port: 50051}], service_config: nil}} + end) + + # Wait for re-resolve to fire (runs in DNSResolver process) + Process.sleep(@wait) + + # Connection GenServer should still be responsive — pick_channel works + assert {:ok, _} = Connection.pick_channel(channel) + + # Disconnect should also work while resolve is in-flight + assert {:ok, _} = Connection.disconnect(channel) + end + + test "resolver crash doesn't kill the connection (linked process exits)", ctx do + {:ok, channel} = + connect_with_resolver( + ctx.ref, + ctx.resolver, + ctx.adapter, + [ + %{address: "10.0.0.1", port: 50051} + ], + lb_policy: :round_robin + ) + + # Verify dns_resolver is running + state = get_state(ctx.ref) + assert is_pid(state.dns_resolver_pid) + assert Process.alive?(state.dns_resolver_pid) + + disconnect_and_wait(channel) + end + end + + # -- 22. :refresh handler doesn't crash on {:error, _} channels ----------- + + describe "refresh handler with failed channels" do + setup ctx do + Application.put_env(:grpc_client, :grpc_test_failing_hosts, ["10.0.0.2"]) + on_exit(fn -> Application.delete_env(:grpc_client, :grpc_test_failing_hosts) end) + Map.put(ctx, :failing_adapter, GRPC.Test.FailingClientAdapter) + end + + test "GenServer survives when :refresh picks a failed channel", ctx do + # Connect with 2 backends — one healthy, one failing + expect(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + service_config: nil + }} + end) + + stub(ctx.resolver, :resolve, fn _target -> + {:ok, + %{ + addresses: [ + %{address: "10.0.0.1", port: 50051}, + %{address: "10.0.0.2", port: 50051} + ], + service_config: nil + }} + end) + + {:ok, channel} = + Connection.connect( + "dns://my-service.local:50051", + adapter: ctx.failing_adapter, + name: ctx.ref, + resolver: ctx.resolver, + resolve_interval: 60_000, + min_resolve_interval: 0, + lb_policy: :round_robin + ) + + # 10.0.0.2 is {:failed, _} in real_channels + state = get_state(ctx.ref) + assert match?({:failed, _}, Map.get(state.real_channels, "10.0.0.2:50051")) + + # Wait for several :refresh cycles (15s default, but we'll trigger manually). + # Round-robin will eventually pick 10.0.0.2. Without the fix, this crashes. + pid = :global.whereis_name({Connection, ctx.ref}) + + for _ <- 1..5 do + send(pid, :refresh) + end + + # Small sleep for messages to process + Process.sleep(50) + + # GenServer should still be alive + assert Process.alive?(pid) + assert {:ok, picked} = Connection.pick_channel(channel) + assert picked.host == "10.0.0.1" + + disconnect_and_wait(channel) + end + end +end diff --git a/grpc_client/test/support/test_adapter.exs b/grpc_client/test/support/test_adapter.exs index 4fe3c161..59404824 100644 --- a/grpc_client/test/support/test_adapter.exs +++ b/grpc_client/test/support/test_adapter.exs @@ -11,6 +11,32 @@ defmodule GRPC.Test.ClientAdapter do def cancel(stream), do: stream end +defmodule GRPC.Test.FailingClientAdapter do + @moduledoc """ + A test adapter that fails to connect for hosts listed in the + :grpc_test_failing_hosts application env key. All other hosts succeed. + """ + @behaviour GRPC.Client.Adapter + + def connect(%{host: host} = channel, _opts) do + failing = Application.get_env(:grpc_client, :grpc_test_failing_hosts, []) + + if host in failing do + {:error, :connection_refused} + else + {:ok, channel} + end + end + + def disconnect(channel), do: {:ok, channel} + def send_request(stream, _message, _opts), do: stream + def receive_data(_stream, _opts), do: {:ok, nil} + def send_data(stream, _message, _opts), do: stream + def send_headers(stream, _opts), do: stream + def end_stream(stream), do: stream + def cancel(stream), do: stream +end + defmodule GRPC.Test.ServerAdapter do @behaviour GRPC.Server.Adapter diff --git a/grpc_client/test/test_helper.exs b/grpc_client/test/test_helper.exs index b8d41541..cea39974 100644 --- a/grpc_client/test/test_helper.exs +++ b/grpc_client/test/test_helper.exs @@ -5,6 +5,10 @@ Mox.defmock(GRPC.Client.Resolver.DNS.MockAdapter, for: GRPC.Client.Resolver.DNS.Adapter ) +Mox.defmock(GRPC.Client.MockResolver, + for: GRPC.Client.Resolver +) + {parsed, _, _} = OptionParser.parse(System.argv(), switches: [warnings_as_errors: :boolean]) if !parsed[:warnings_as_errors] do