From 63ebf7f9628569d0e90f2e707e531c116cea05a3 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 20 May 2026 14:34:52 +0200 Subject: [PATCH 01/13] feat(sync-service): add Electric.PollWait per-process bounded polling primitive --- .../sync-service/lib/electric/poll_wait.ex | 72 +++++++++++++ .../test/electric/poll_wait_test.exs | 100 ++++++++++++++++++ 2 files changed, 172 insertions(+) create mode 100644 packages/sync-service/lib/electric/poll_wait.ex create mode 100644 packages/sync-service/test/electric/poll_wait_test.exs diff --git a/packages/sync-service/lib/electric/poll_wait.ex b/packages/sync-service/lib/electric/poll_wait.ex new file mode 100644 index 0000000000..2f4fa72b36 --- /dev/null +++ b/packages/sync-service/lib/electric/poll_wait.ex @@ -0,0 +1,72 @@ +defmodule Electric.PollWait do + @moduledoc """ + Per-process bounded polling of a cheap (ETS-backed) condition. + + `until/3` sleeps between checks with exponential backoff (doubling, capped) + and bounded jitter so concurrent waiters land on distinct ETS reads + instead of stampeding the same millisecond window. + + All defaults can be overridden per-call so the primitive can be shared + between consumers with very different latency profiles (e.g. StatusMonitor + readiness on a seconds-to-minutes timescale vs ShapeCache creation on a + tens-of-milliseconds timescale). 
+ """ + + @default_initial_interval 25 + @default_max_interval 500 + @default_backoff 2.0 + @default_jitter 0.25 + + @type ready :: :ready | {:ready, term()} + @type check :: (-> ready | :not_ready) + + @spec until(check, timeout(), keyword()) :: ready | :timeout + def until(check_fun, timeout, opts \\ []) when is_function(check_fun, 0) do + initial = Keyword.get(opts, :initial_interval, @default_initial_interval) + max = Keyword.get(opts, :max_interval, @default_max_interval) + factor = Keyword.get(opts, :backoff, @default_backoff) + jitter = Keyword.get(opts, :jitter, @default_jitter) + + do_until(check_fun, deadline(timeout), initial, max, factor, jitter) + end + + defp deadline(:infinity), do: :infinity + + defp deadline(t) when is_integer(t) and t >= 0, + do: System.monotonic_time(:millisecond) + t + + defp do_until(check_fun, deadline, interval, max, factor, jitter) do + case check_fun.() do + :not_ready -> + case remaining(deadline) do + 0 -> + :timeout + + rem -> + sleep_for = min(jittered(interval, jitter), rem) + Process.sleep(sleep_for) + + next_interval = min(round(interval * factor), max) + do_until(check_fun, deadline, next_interval, max, factor, jitter) + end + + ready -> + ready + end + end + + # Returns interval ± jitter*interval, clamped to >= 1ms so we never busy-loop. + defp jittered(interval, jitter) when jitter <= 0.0, do: max(1, interval) + + defp jittered(interval, jitter) do + spread = max(1, round(interval * jitter)) + # :rand.uniform(N) returns 1..N. Centre around 0 by subtracting spread+1. 
+ offset = :rand.uniform(2 * spread + 1) - spread - 1 + max(1, interval + offset) + end + + defp remaining(:infinity), do: :infinity + + defp remaining(deadline), + do: max(0, deadline - System.monotonic_time(:millisecond)) +end diff --git a/packages/sync-service/test/electric/poll_wait_test.exs b/packages/sync-service/test/electric/poll_wait_test.exs new file mode 100644 index 0000000000..275725e657 --- /dev/null +++ b/packages/sync-service/test/electric/poll_wait_test.exs @@ -0,0 +1,100 @@ +defmodule Electric.PollWaitTest do + use ExUnit.Case, async: true + + alias Electric.PollWait + + describe "until/3" do + test "returns :ready immediately when the predicate is ready on first check" do + assert PollWait.until(fn -> :ready end, 1_000) == :ready + end + + test "returns {:ready, value} when the predicate yields a tagged ready" do + assert PollWait.until(fn -> {:ready, :foo} end, 1_000) == {:ready, :foo} + end + + test "returns :timeout when the predicate never becomes ready" do + # Use a tiny timeout so the test is cheap. Initial interval defaults + # to 25ms so 50ms is enough to guarantee at least one sleep. + assert PollWait.until(fn -> :not_ready end, 50) == :timeout + end + + test "stops polling once the deadline elapses, even mid-sleep" do + counter = :counters.new(1, [:atomics]) + + check = fn -> + :counters.add(counter, 1, 1) + :not_ready + end + + assert PollWait.until(check, 75, initial_interval: 25, max_interval: 25, jitter: 0.0) == + :timeout + + # 0ms check, sleep 25, 25ms check, sleep 25, 50ms check, sleep 25, 75ms deadline. + # Allow either 3 or 4 calls depending on scheduling jitter. 
+ count = :counters.get(counter, 1) + assert count in 3..4, "expected 3 or 4 checks, got #{count}" + end + + test "respects per-call backoff opts (no global defaults baked in)" do + timestamps = :ets.new(:ts, [:public, :ordered_set]) + + check = fn -> + :ets.insert(timestamps, {System.monotonic_time(:millisecond), :tick}) + :not_ready + end + + _ = + PollWait.until(check, 300, + initial_interval: 5, + max_interval: 20, + backoff: 2.0, + jitter: 0.0 + ) + + ts = timestamps |> :ets.tab2list() |> Enum.map(&elem(&1, 0)) |> Enum.sort() + diffs = ts |> Enum.chunk_every(2, 1, :discard) |> Enum.map(fn [a, b] -> b - a end) + + # With 0 jitter and a 2.0 factor: 5, 10, 20, 20, 20, ... ms between checks. + # Schedulers add ±a-few-ms slop, so allow loose bounds. + [d1, d2, d3 | _] = diffs + assert d1 in 4..15 + assert d2 in 8..25 + assert d3 in 15..30 + :ets.delete(timestamps) + end + + test "jitter never produces negative or zero sleeps" do + # Drive 200 jittered intervals at the minimum interval (1) with max jitter + # and ensure none of them blow up or return ready spuriously. 
+ check = fn -> :not_ready end + assert PollWait.until(check, 5, initial_interval: 1, max_interval: 1, jitter: 1.0) == + :timeout + end + + test ":infinity timeout never returns :timeout but yields when ready" do + parent = self() + + task = + Task.async(fn -> + PollWait.until( + fn -> + receive do + :go -> :ready + after + 0 -> :not_ready + end + end, + :infinity, + initial_interval: 5, + max_interval: 5, + jitter: 0.0 + ) + |> tap(fn r -> send(parent, {:result, r}) end) + end) + + Process.sleep(20) + send(task.pid, :go) + assert_receive {:result, :ready}, 200 + end + end +end From 16e1eaa0e2f454056ea5336c30e3941082f3f582 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 20 May 2026 14:40:22 +0200 Subject: [PATCH 02/13] fix(sync-service): loosen PollWait timing assertions and reformat test --- .../test/electric/poll_wait_test.exs | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/packages/sync-service/test/electric/poll_wait_test.exs b/packages/sync-service/test/electric/poll_wait_test.exs index 275725e657..93cb83a806 100644 --- a/packages/sync-service/test/electric/poll_wait_test.exs +++ b/packages/sync-service/test/electric/poll_wait_test.exs @@ -30,9 +30,11 @@ defmodule Electric.PollWaitTest do :timeout # 0ms check, sleep 25, 25ms check, sleep 25, 50ms check, sleep 25, 75ms deadline. - # Allow either 3 or 4 calls depending on scheduling jitter. + # The load-bearing assertion is the upper bound (<=4): the loop must be + # bounded by the deadline. The lower bound is loosened to 2 because a + # slow CI runner can overrun a single 25ms sleep and collapse the trace. 
count = :counters.get(counter, 1) - assert count in 3..4, "expected 3 or 4 checks, got #{count}" + assert count in 2..4, "expected 2..4 checks, got #{count}" end test "respects per-call backoff opts (no global defaults baked in)" do @@ -55,11 +57,19 @@ defmodule Electric.PollWaitTest do diffs = ts |> Enum.chunk_every(2, 1, :discard) |> Enum.map(fn [a, b] -> b - a end) # With 0 jitter and a 2.0 factor: 5, 10, 20, 20, 20, ... ms between checks. - # Schedulers add ±a-few-ms slop, so allow loose bounds. [d1, d2, d3 | _] = diffs - assert d1 in 4..15 - assert d2 in 8..25 - assert d3 in 15..30 + + # Each measured delta must be at least the configured sleep (minus 1ms slop + # for monotonic-time resolution). + assert d1 >= 4, "expected d1 >= 4ms (configured 5), got #{d1}" + assert d2 >= 8, "expected d2 >= 8ms (configured 10), got #{d2}" + assert d3 >= 15, "expected d3 >= 15ms (configured 20), got #{d3}" + + # And growth must be monotonic: each subsequent interval is at least the + # previous one (within scheduler slop). This proves the backoff factor is + # being applied, regardless of how slow the CI runner is. + assert d2 >= d1, "expected d2 >= d1, got d1=#{d1}, d2=#{d2}" + assert d3 >= d2, "expected d3 >= d2, got d2=#{d2}, d3=#{d3}" :ets.delete(timestamps) end @@ -67,6 +77,7 @@ defmodule Electric.PollWaitTest do # Drive 200 jittered intervals at the minimum interval (1) with max jitter # and ensure none of them blow up or return ready spuriously. 
check = fn -> :not_ready end + assert PollWait.until(check, 5, initial_interval: 1, max_interval: 1, jitter: 1.0) == :timeout end From 8b036887cf6915771ec26bd15988c635329e468d Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 20 May 2026 14:45:08 +0200 Subject: [PATCH 03/13] feat(sync-service): expose StatusMonitor waiter-set congestion flag --- .../lib/electric/status_monitor.ex | 45 +++++++++++- .../test/electric/status_monitor_test.exs | 71 +++++++++++++++++++ 2 files changed, 114 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/lib/electric/status_monitor.ex b/packages/sync-service/lib/electric/status_monitor.ex index 1189a413da..8ec362963f 100644 --- a/packages/sync-service/lib/electric/status_monitor.ex +++ b/packages/sync-service/lib/electric/status_monitor.ex @@ -23,6 +23,8 @@ defmodule Electric.StatusMonitor do @default_results for condition <- @conditions, into: %{}, do: {condition, {false, %{}}} @db_state_key :db_state + @congested_key :waiters_congested + @congested_threshold 100 @spin_prevention_delay 10 def start_link(opts) do @@ -57,6 +59,25 @@ defmodule Electric.StatusMonitor do end end + @doc """ + Returns true once the StatusMonitor's waiter set has crossed + `congested_threshold/0`. The flag is cleared once the set drains back to 0. + + Used by callers to decide between a `GenServer.call` wait (low latency + when uncontended) and a `PollWait.until/3` wait (bounded mailbox growth + under burst). Cheap: one ETS read against a `read_concurrency: true` + table. + """ + @spec congested?(String.t()) :: boolean() + def congested?(stack_id) do + :ets.lookup_element(ets_table(stack_id), @congested_key, 2, false) + rescue + ArgumentError -> false + end + + @doc "Threshold at which the StatusMonitor flips its congestion flag. Exposed for tests." 
+ def congested_threshold, do: @congested_threshold + @spec status(String.t()) :: status() def status(stack_id) do table = ets_table(stack_id) @@ -323,7 +344,10 @@ defmodule Electric.StatusMonitor do Process.send_after(self(), {:timeout_waiter, {from, level}}, timeout) end - {:noreply, %{state | waiters: MapSet.put(waiters, {from, level})}} + new_waiters = MapSet.put(waiters, {from, level}) + maybe_set_congested(state.stack_id, MapSet.size(new_waiters)) + + {:noreply, %{state | waiters: new_waiters}} end end @@ -352,7 +376,9 @@ defmodule Electric.StatusMonitor do def handle_info({:timeout_waiter, {from, _level} = waiter}, state) do if MapSet.member?(state.waiters, waiter) do GenServer.reply(from, {:error, timeout_message(state.stack_id)}) - {:noreply, %{state | waiters: MapSet.delete(state.waiters, waiter)}} + new_waiters = MapSet.delete(state.waiters, waiter) + maybe_clear_congested(state.stack_id, MapSet.size(new_waiters)) + {:noreply, %{state | waiters: new_waiters}} else {:noreply, state} end @@ -375,9 +401,24 @@ defmodule Electric.StatusMonitor do end end) + maybe_clear_congested(state.stack_id, MapSet.size(waiters)) %{state | waiters: waiters} end + defp maybe_set_congested(stack_id, size) when size >= @congested_threshold do + :ets.insert(ets_table(stack_id), {@congested_key, true}) + :ok + end + + defp maybe_set_congested(_stack_id, _size), do: :ok + + defp maybe_clear_congested(stack_id, 0) do + :ets.insert(ets_table(stack_id), {@congested_key, false}) + :ok + end + + defp maybe_clear_congested(_stack_id, _size), do: :ok + defp db_state(table) do :ets.lookup_element(table, @db_state_key, 2, :up) rescue diff --git a/packages/sync-service/test/electric/status_monitor_test.exs b/packages/sync-service/test/electric/status_monitor_test.exs index d4c94a7b17..ca496858f3 100644 --- a/packages/sync-service/test/electric/status_monitor_test.exs +++ b/packages/sync-service/test/electric/status_monitor_test.exs @@ -573,4 +573,75 @@ defmodule Electric.StatusMonitorTest 
do assert_receive {{StatusMonitor, ^ref_active}, {:ok, :active}}, 100 end end + + describe "congested?/1" do + test "returns false before any waiters are enqueued", %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + assert StatusMonitor.congested?(stack_id) == false + end + + test "returns false when the table does not exist (status monitor not started)", + %{stack_id: stack_id} do + assert StatusMonitor.congested?(stack_id) == false + end + + test "flips to true once the waiter set reaches the threshold and back to false on drain", + %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + + threshold = StatusMonitor.congested_threshold() + pid = GenServer.whereis(StatusMonitor.name(stack_id)) + + # Spawn `threshold` waiters that block on :active. + waiters = + for _ <- 1..threshold do + Task.async(fn -> StatusMonitor.wait_until(stack_id, :active, timeout: 5_000) end) + end + + # Wait deterministically until all `threshold` calls have landed in the + # StatusMonitor's state — `Task.async` doesn't guarantee the spawned task + # has executed its `GenServer.call`, so a simple `wait_for_messages_to_be_processed` + # could race ahead of the tasks. + wait_until_waiters_count(pid, threshold) + + assert StatusMonitor.congested?(stack_id) == true + + # Drive readiness to drain all waiters. 
+ Support.TestUtils.set_status_to_active(%{stack_id: stack_id}) + Enum.each(waiters, &Task.await/1) + + StatusMonitor.wait_for_messages_to_be_processed(stack_id) + assert StatusMonitor.congested?(stack_id) == false + end + + test "does not set the flag below the threshold", %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + + _ = + Task.async(fn -> StatusMonitor.wait_until(stack_id, :active, timeout: 50) end) + + StatusMonitor.wait_for_messages_to_be_processed(stack_id) + assert StatusMonitor.congested?(stack_id) == false + end + + defp wait_until_waiters_count( + pid, + expected, + deadline \\ System.monotonic_time(:millisecond) + 2_000 + ) do + size = MapSet.size(:sys.get_state(pid).waiters) + + cond do + size >= expected -> + :ok + + System.monotonic_time(:millisecond) > deadline -> + flunk("Timed out waiting for #{expected} waiters; saw #{size}") + + true -> + Process.sleep(5) + wait_until_waiters_count(pid, expected, deadline) + end + end + end end From c55618106628b91d3bd992693924b39cb62ee1c8 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 20 May 2026 14:49:24 +0200 Subject: [PATCH 04/13] fix(sync-service): tighten congested? doc and cover timeout-drain path --- .../lib/electric/status_monitor.ex | 3 +-- .../test/electric/status_monitor_test.exs | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/lib/electric/status_monitor.ex b/packages/sync-service/lib/electric/status_monitor.ex index 8ec362963f..03c21ac66b 100644 --- a/packages/sync-service/lib/electric/status_monitor.ex +++ b/packages/sync-service/lib/electric/status_monitor.ex @@ -65,8 +65,7 @@ defmodule Electric.StatusMonitor do Used by callers to decide between a `GenServer.call` wait (low latency when uncontended) and a `PollWait.until/3` wait (bounded mailbox growth - under burst). Cheap: one ETS read against a `read_concurrency: true` - table. + under burst). 
Cheap: one ETS read against the per-stack status table. """ @spec congested?(String.t()) :: boolean() def congested?(stack_id) do diff --git a/packages/sync-service/test/electric/status_monitor_test.exs b/packages/sync-service/test/electric/status_monitor_test.exs index ca496858f3..36a0d17487 100644 --- a/packages/sync-service/test/electric/status_monitor_test.exs +++ b/packages/sync-service/test/electric/status_monitor_test.exs @@ -614,6 +614,29 @@ defmodule Electric.StatusMonitorTest do assert StatusMonitor.congested?(stack_id) == false end + test "flips back to false when waiters drain via :timeout_waiter rather than readiness", + %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + + threshold = StatusMonitor.congested_threshold() + pid = GenServer.whereis(StatusMonitor.name(stack_id)) + + waiters = + for _ <- 1..threshold do + Task.async(fn -> StatusMonitor.wait_until(stack_id, :active, timeout: 50) end) + end + + wait_until_waiters_count(pid, threshold) + assert StatusMonitor.congested?(stack_id) == true + + # All waiters time out — :timeout_waiter is the drain path. + results = Enum.map(waiters, &Task.await(&1, 1_000)) + assert Enum.all?(results, &match?({:error, _}, &1)) + + StatusMonitor.wait_for_messages_to_be_processed(stack_id) + assert StatusMonitor.congested?(stack_id) == false + end + test "does not set the flag below the threshold", %{stack_id: stack_id} do start_link_supervised!({StatusMonitor, stack_id: stack_id}) From 1d0999c17ec3331bedacd757d2cd92fc98d616f1 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 20 May 2026 16:03:17 +0200 Subject: [PATCH 05/13] feat(sync-service): adaptive poll-based wait_until under StatusMonitor congestion When the StatusMonitor's waiter set crosses the congestion threshold, new wait_until/3 callers now poll on PollWait.until/3 against service_status/1 instead of enqueuing into the StatusMonitor mailbox. 
This bounds StatusMonitor mailbox growth under burst load while keeping the low-latency GenServer.call path for the uncongested common case. The fast path (:active short-circuit, :waiting+:read_only short-circuit, :sleeping returning :conn_sleeping when not blocking) is unchanged. Congestion is consulted only on the not-ready and sleeping+blocking branches via the shared dispatch_wait/3 chokepoint. The ETS status table is switched from :protected to :public so tests can force the congestion flag directly. Reads (congested?/1) already went through ETS outside the GenServer process. --- .../lib/electric/status_monitor.ex | 37 ++++++++- .../test/electric/status_monitor_test.exs | 77 +++++++++++++++++++ 2 files changed, 111 insertions(+), 3 deletions(-) diff --git a/packages/sync-service/lib/electric/status_monitor.ex b/packages/sync-service/lib/electric/status_monitor.ex index 03c21ac66b..380c3360c7 100644 --- a/packages/sync-service/lib/electric/status_monitor.ex +++ b/packages/sync-service/lib/electric/status_monitor.ex @@ -4,6 +4,8 @@ defmodule Electric.StatusMonitor do require Logger + alias Electric.PollWait + @type status() :: %{ conn: :waiting_on_lock | :starting | :up | :sleeping, shape: :starting | :read_only | :up @@ -36,7 +38,7 @@ defmodule Electric.StatusMonitor do Process.set_label({:status_monitor, stack_id}) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) - :ets.new(ets_table(stack_id), [:named_table, :protected]) + :ets.new(ets_table(stack_id), [:named_table, :public]) {:ok, %{stack_id: stack_id, waiters: MapSet.new()}} end @@ -205,13 +207,21 @@ defmodule Electric.StatusMonitor do :sleeping -> if Keyword.get(opts, :block_on_conn_sleeping, false) do - do_wait_until(stack_id, level, opts) + dispatch_wait(stack_id, level, opts) else :conn_sleeping end _ -> - do_wait_until(stack_id, level, opts) + dispatch_wait(stack_id, level, opts) + end + end + + defp dispatch_wait(stack_id, level, opts) do + if congested?(stack_id) do + 
poll_wait(stack_id, level, opts) + else + do_wait_until(stack_id, level, opts) end end @@ -253,6 +263,27 @@ defmodule Electric.StatusMonitor do end end + defp poll_wait(stack_id, level, opts) do + timeout = Keyword.fetch!(opts, :timeout) + + # Mirror check_level/2: only :active and :waiting-for-:read_only count + # as ready. Sleeping is handled by the outer case in wait_until/3 before + # dispatch; once we're polling we treat it as not-ready so the loop + # behaves identically to a parked GenServer waiter. + check = fn -> + case service_status(stack_id) do + :active -> {:ready, {:ok, :active}} + :waiting when level == :read_only -> {:ready, {:ok, :read_only}} + _ -> :not_ready + end + end + + case PollWait.until(check, timeout) do + {:ready, value} -> value + :timeout -> {:error, timeout_message(stack_id)} + end + end + defp maybe_retry_wait_until(_stack_id, _level, _opts, timeout, last_error) when timeout <= @spin_prevention_delay do {:error, last_error} diff --git a/packages/sync-service/test/electric/status_monitor_test.exs b/packages/sync-service/test/electric/status_monitor_test.exs index 36a0d17487..6f344d7720 100644 --- a/packages/sync-service/test/electric/status_monitor_test.exs +++ b/packages/sync-service/test/electric/status_monitor_test.exs @@ -667,4 +667,81 @@ defmodule Electric.StatusMonitorTest do end end end + + describe "wait_until/3 under congestion" do + test "polling path returns {:ok, :active} when readiness flips", %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + + # Force the flag on so the next caller takes the polling path. 
+ force_congested(stack_id) + + test_process = self() + + Task.async(fn -> + result = StatusMonitor.wait_until(stack_id, :active, timeout: 1_000) + send(test_process, {:result, result}) + end) + + refute_receive {:result, _}, 50 + Support.TestUtils.set_status_to_active(%{stack_id: stack_id}) + assert_receive {:result, {:ok, :active}}, 1_000 + end + + test "polling path returns {:ok, :read_only} when only metadata becomes ready", + %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + force_congested(stack_id) + + test_process = self() + + Task.async(fn -> + result = StatusMonitor.wait_until(stack_id, :read_only, timeout: 1_000) + send(test_process, {:result, result}) + end) + + refute_receive {:result, _}, 50 + StatusMonitor.mark_shape_metadata_ready(stack_id, self()) + assert_receive {:result, {:ok, :read_only}}, 1_000 + end + + test "polling path returns {:error, _} on timeout", %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + force_congested(stack_id) + + assert {:error, "Timeout waiting for Postgres lock acquisition"} = + StatusMonitor.wait_until(stack_id, :active, timeout: 50) + end + + test "sleeping branch short-circuits before the polling check (flag set + sleeping, not blocking)", + %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + force_congested(stack_id) + + StatusMonitor.database_connections_going_to_sleep(stack_id) + StatusMonitor.wait_for_messages_to_be_processed(stack_id) + + # The outer case in wait_until/3 handles sleeping before consulting + # congested?/1, so :conn_sleeping is returned regardless of the flag. + assert StatusMonitor.wait_until(stack_id, :active, timeout: 50) == :conn_sleeping + end + + test "uncongested callers continue to use the GenServer.call path", %{stack_id: stack_id} do + # Indirect check: the GenServer.call path enqueues into state.waiters. 
We + # confirm a single uncongested caller doesn't flip the flag — proving it + # took the call path, not the polling path (polling never touches + # state.waiters). + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + + task = Task.async(fn -> StatusMonitor.wait_until(stack_id, :active, timeout: 100) end) + StatusMonitor.wait_for_messages_to_be_processed(stack_id) + + assert StatusMonitor.congested?(stack_id) == false + # Drain the timeout waiter so the test exits cleanly. + assert {:error, _} = Task.await(task, 200) + end + + defp force_congested(stack_id) do + :ets.insert(:"#{inspect(StatusMonitor)}:#{stack_id}", {:waiters_congested, true}) + end + end end From ccb38ef91b3b6c545b1c69c6904fd14695ed2d57 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 20 May 2026 16:06:06 +0200 Subject: [PATCH 06/13] fix(sync-service): keep StatusMonitor ETS table protected, route test flag via cast --- packages/sync-service/lib/electric/status_monitor.ex | 11 ++++++++++- .../test/electric/status_monitor_test.exs | 5 ++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/lib/electric/status_monitor.ex b/packages/sync-service/lib/electric/status_monitor.ex index 380c3360c7..737bd3c900 100644 --- a/packages/sync-service/lib/electric/status_monitor.ex +++ b/packages/sync-service/lib/electric/status_monitor.ex @@ -38,7 +38,7 @@ defmodule Electric.StatusMonitor do Process.set_label({:status_monitor, stack_id}) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) - :ets.new(ets_table(stack_id), [:named_table, :public]) + :ets.new(ets_table(stack_id), [:named_table, :protected]) {:ok, %{stack_id: stack_id, waiters: MapSet.new()}} end @@ -364,6 +364,15 @@ defmodule Electric.StatusMonitor do {:noreply, state} end + # Test-only: writes the congestion flag directly. 
The flag is normally + # set/cleared by the GenServer in response to waiter-set transitions; this + # cast lets tests force the polling branch without first enqueuing the + # threshold's worth of real waiters. + def handle_cast({:set_congested_flag_for_test, value}, state) when is_boolean(value) do + :ets.insert(ets_table(state.stack_id), {@congested_key, value}) + {:noreply, state} + end + def handle_call({:wait_until, level, timeout}, from, %{waiters: waiters} = state) do case check_level(level, state.stack_id) do {:ok, _} = reply -> diff --git a/packages/sync-service/test/electric/status_monitor_test.exs b/packages/sync-service/test/electric/status_monitor_test.exs index 6f344d7720..75a4e8bfe5 100644 --- a/packages/sync-service/test/electric/status_monitor_test.exs +++ b/packages/sync-service/test/electric/status_monitor_test.exs @@ -741,7 +741,10 @@ defmodule Electric.StatusMonitorTest do end defp force_congested(stack_id) do - :ets.insert(:"#{inspect(StatusMonitor)}:#{stack_id}", {:waiters_congested, true}) + GenServer.cast(StatusMonitor.name(stack_id), {:set_congested_flag_for_test, true}) + # Round-trip a call to ensure the cast has been processed before the + # caller reads the flag. 
+ StatusMonitor.wait_for_messages_to_be_processed(stack_id) end end end From 0ad77e7221895568558d93725db1ae1057d2c377 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 20 May 2026 16:51:47 +0200 Subject: [PATCH 07/13] test(sync-service): loosen brittle timing and string assertions in poll-wait tests --- .../sync-service/test/electric/status_monitor_test.exs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/sync-service/test/electric/status_monitor_test.exs b/packages/sync-service/test/electric/status_monitor_test.exs index 75a4e8bfe5..3651d5d5d6 100644 --- a/packages/sync-service/test/electric/status_monitor_test.exs +++ b/packages/sync-service/test/electric/status_monitor_test.exs @@ -708,8 +708,10 @@ defmodule Electric.StatusMonitorTest do start_link_supervised!({StatusMonitor, stack_id: stack_id}) force_congested(stack_id) - assert {:error, "Timeout waiting for Postgres lock acquisition"} = + assert {:error, message} = StatusMonitor.wait_until(stack_id, :active, timeout: 50) + + assert message =~ "Postgres lock" end test "sleeping branch short-circuits before the polling check (flag set + sleeping, not blocking)", @@ -732,12 +734,12 @@ defmodule Electric.StatusMonitorTest do # state.waiters). start_link_supervised!({StatusMonitor, stack_id: stack_id}) - task = Task.async(fn -> StatusMonitor.wait_until(stack_id, :active, timeout: 100) end) + task = Task.async(fn -> StatusMonitor.wait_until(stack_id, :active, timeout: 50) end) StatusMonitor.wait_for_messages_to_be_processed(stack_id) assert StatusMonitor.congested?(stack_id) == false # Drain the timeout waiter so the test exits cleanly. 
- assert {:error, _} = Task.await(task, 200) + assert {:error, _} = Task.await(task, 1_000) end defp force_congested(stack_id) do From 0a7f2992cdbfe70c8731acda27c6bbac8ed34942 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 20 May 2026 16:52:41 +0200 Subject: [PATCH 08/13] chore: changeset for adaptive StatusMonitor wait --- .changeset/poll-wait-status-monitor.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/poll-wait-status-monitor.md diff --git a/.changeset/poll-wait-status-monitor.md b/.changeset/poll-wait-status-monitor.md new file mode 100644 index 0000000000..631570fd54 --- /dev/null +++ b/.changeset/poll-wait-status-monitor.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Replace the mailbox-based `StatusMonitor.wait_until/3` not-ready path with adaptive per-process polling once the StatusMonitor's waiter set crosses a congestion threshold (bottleneck 2 of #4266). The fast path (`:active`, `:waiting` + `:read_only`, `:sleeping`) is unchanged. Uncongested callers continue to use the existing `GenServer.call` for low-latency wakeup; congested callers switch to `Electric.PollWait.until/3` against `service_status/1`, bounding StatusMonitor mailbox growth to the threshold during cold-start bursts. No HTTP protocol change. 
From e344565dba36f9d6cad16d2d685cac7a3175dad7 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 20 May 2026 17:23:45 +0200 Subject: [PATCH 09/13] chore(sync-service): mark test-only handle_cast with @doc false --- packages/sync-service/lib/electric/status_monitor.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/sync-service/lib/electric/status_monitor.ex b/packages/sync-service/lib/electric/status_monitor.ex index 737bd3c900..43373334ec 100644 --- a/packages/sync-service/lib/electric/status_monitor.ex +++ b/packages/sync-service/lib/electric/status_monitor.ex @@ -368,6 +368,7 @@ defmodule Electric.StatusMonitor do # set/cleared by the GenServer in response to waiter-set transitions; this # cast lets tests force the polling branch without first enqueuing the # threshold's worth of real waiters. + @doc false def handle_cast({:set_congested_flag_for_test, value}, state) when is_boolean(value) do :ets.insert(ets_table(state.stack_id), {@congested_key, value}) {:noreply, state} From 1a677e2687adf5fb926f5bf8adb55fa9540eafc2 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 21 May 2026 12:58:33 +0200 Subject: [PATCH 10/13] refactor(sync-service): dedupe poll_wait predicate via check_level + add congested_threshold spec --- .../sync-service/lib/electric/status_monitor.ex | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/sync-service/lib/electric/status_monitor.ex b/packages/sync-service/lib/electric/status_monitor.ex index 43373334ec..f9ef3ca274 100644 --- a/packages/sync-service/lib/electric/status_monitor.ex +++ b/packages/sync-service/lib/electric/status_monitor.ex @@ -77,6 +77,7 @@ defmodule Electric.StatusMonitor do end @doc "Threshold at which the StatusMonitor flips its congestion flag. Exposed for tests." 
+ @spec congested_threshold() :: pos_integer() def congested_threshold, do: @congested_threshold @spec status(String.t()) :: status() @@ -266,15 +267,14 @@ defmodule Electric.StatusMonitor do defp poll_wait(stack_id, level, opts) do timeout = Keyword.fetch!(opts, :timeout) - # Mirror check_level/2: only :active and :waiting-for-:read_only count - # as ready. Sleeping is handled by the outer case in wait_until/3 before - # dispatch; once we're polling we treat it as not-ready so the loop - # behaves identically to a parked GenServer waiter. + # Reuse check_level/2 so the polling path's readiness contract is by + # construction identical to a parked GenServer waiter's. Sleeping is + # handled by the outer case in wait_until/3 before dispatch; once we're + # polling, check_level/2's catch-all returns :not_ready and we keep going. check = fn -> - case service_status(stack_id) do - :active -> {:ready, {:ok, :active}} - :waiting when level == :read_only -> {:ready, {:ok, :read_only}} - _ -> :not_ready + case check_level(level, stack_id) do + {:ok, _} = ready -> {:ready, ready} + :not_ready -> :not_ready end end From 3ceeed934036286706f0a0870831138dd5df8d72 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 21 May 2026 13:53:09 +0200 Subject: [PATCH 11/13] Implementation cleanups --- .changeset/poll-wait-status-monitor.md | 2 +- .../sync-service/lib/electric/poll_wait.ex | 8 +++----- .../lib/electric/status_monitor.ex | 18 +++++++----------- 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/.changeset/poll-wait-status-monitor.md b/.changeset/poll-wait-status-monitor.md index 631570fd54..77fd39d75f 100644 --- a/.changeset/poll-wait-status-monitor.md +++ b/.changeset/poll-wait-status-monitor.md @@ -2,4 +2,4 @@ '@core/sync-service': patch --- -Replace the mailbox-based `StatusMonitor.wait_until/3` not-ready path with adaptive per-process polling once the StatusMonitor's waiter set crosses a congestion threshold (bottleneck 2 of #4266). 
The fast path (`:active`, `:waiting` + `:read_only`, `:sleeping`) is unchanged. Uncongested callers continue to use the existing `GenServer.call` for low-latency wakeup; congested callers switch to `Electric.PollWait.until/3` against `service_status/1`, bounding StatusMonitor mailbox growth to the threshold during cold-start bursts. No HTTP protocol change. +Replace the mailbox-based `StatusMonitor.wait_until/3` not-ready path with adaptive per-process polling once the StatusMonitor's waiter set crosses a congestion threshold. Uncongested callers continue to use the existing `GenServer.call` for low-latency wakeup; congested callers switch to `Electric.PollWait.until/3` against `service_status/1`, bounding StatusMonitor mailbox growth to the threshold during cold-start bursts. diff --git a/packages/sync-service/lib/electric/poll_wait.ex b/packages/sync-service/lib/electric/poll_wait.ex index 2f4fa72b36..bc6bc72539 100644 --- a/packages/sync-service/lib/electric/poll_wait.ex +++ b/packages/sync-service/lib/electric/poll_wait.ex @@ -7,9 +7,7 @@ defmodule Electric.PollWait do instead of stampeding the same millisecond window. All defaults can be overridden per-call so the primitive can be shared - between consumers with very different latency profiles (e.g. StatusMonitor - readiness on a seconds-to-minutes timescale vs ShapeCache creation on a - tens-of-milliseconds timescale). + between consumers with very different latency profiles. """ @default_initial_interval 25 @@ -55,8 +53,8 @@ defmodule Electric.PollWait do end end - # Returns interval ± jitter*interval, clamped to >= 1ms so we never busy-loop. - defp jittered(interval, jitter) when jitter <= 0.0, do: max(1, interval) + # Returns interval ± jitter*interval, clamped to >= 10ms so we never busy-loop. 
+ defp jittered(interval, jitter) when jitter <= 0.0, do: max(10, interval) defp jittered(interval, jitter) do spread = max(1, round(interval * jitter)) diff --git a/packages/sync-service/lib/electric/status_monitor.ex b/packages/sync-service/lib/electric/status_monitor.ex index f9ef3ca274..e7a8e050cd 100644 --- a/packages/sync-service/lib/electric/status_monitor.ex +++ b/packages/sync-service/lib/electric/status_monitor.ex @@ -26,7 +26,7 @@ defmodule Electric.StatusMonitor do @db_state_key :db_state @congested_key :waiters_congested - @congested_threshold 100 + @congested_threshold 1000 @spin_prevention_delay 10 def start_link(opts) do @@ -62,12 +62,12 @@ defmodule Electric.StatusMonitor do end @doc """ - Returns true once the StatusMonitor's waiter set has crossed - `congested_threshold/0`. The flag is cleared once the set drains back to 0. + Returns true once the StatusMonitor process has accumulated `congested_threshold/0` waiters. + The flag is cleared once the set of waiters drains back to 0. Used by callers to decide between a `GenServer.call` wait (low latency when uncontended) and a `PollWait.until/3` wait (bounded mailbox growth - under burst). Cheap: one ETS read against the per-stack status table. + under burst). """ @spec congested?(String.t()) :: boolean() def congested?(stack_id) do @@ -76,7 +76,7 @@ defmodule Electric.StatusMonitor do ArgumentError -> false end - @doc "Threshold at which the StatusMonitor flips its congestion flag. Exposed for tests." + @doc false @spec congested_threshold() :: pos_integer() def congested_threshold, do: @congested_threshold @@ -267,18 +267,14 @@ defmodule Electric.StatusMonitor do defp poll_wait(stack_id, level, opts) do timeout = Keyword.fetch!(opts, :timeout) - # Reuse check_level/2 so the polling path's readiness contract is by - # construction identical to a parked GenServer waiter's. 
Sleeping is - # handled by the outer case in wait_until/3 before dispatch; once we're - # polling, check_level/2's catch-all returns :not_ready and we keep going. - check = fn -> + check_fn = fn -> case check_level(level, stack_id) do {:ok, _} = ready -> {:ready, ready} :not_ready -> :not_ready end end - case PollWait.until(check, timeout) do + case PollWait.until(check_fn, timeout) do {:ready, value} -> value :timeout -> {:error, timeout_message(stack_id)} end From 9de45c0b9fad01e6ccbe2638075f73963514284f Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 21 May 2026 13:59:49 +0200 Subject: [PATCH 12/13] refactor(sync-service): replace do_wait_until sleep loop with PollWait --- .../lib/electric/status_monitor.ex | 77 ++++++++----------- 1 file changed, 33 insertions(+), 44 deletions(-) diff --git a/packages/sync-service/lib/electric/status_monitor.ex b/packages/sync-service/lib/electric/status_monitor.ex index e7a8e050cd..cf726eb34f 100644 --- a/packages/sync-service/lib/electric/status_monitor.ex +++ b/packages/sync-service/lib/electric/status_monitor.ex @@ -229,38 +229,27 @@ defmodule Electric.StatusMonitor do defp do_wait_until(stack_id, level, opts) do timeout = Keyword.fetch!(opts, :timeout) - try do - case stack_id |> name() |> GenServer.whereis() do - nil -> - maybe_retry_wait_until( - stack_id, - level, - opts, - timeout, - "Status monitor not found for stack ID: #{stack_id}" - ) - - pid when is_pid(pid) -> - GenServer.call(pid, {:wait_until, level, timeout}, :infinity) + check_fn = fn -> + case monitor_lookup(stack_id) do + {:ready, pid} -> + try do + {:ready, GenServer.call(pid, {:wait_until, level, timeout}, :infinity)} + catch + :exit, _reason -> :not_ready + end + + _ -> + :not_ready end - rescue - ArgumentError -> - maybe_retry_wait_until( - stack_id, - level, - opts, - timeout, - "Stack ID not recognised: #{stack_id}" - ) - catch - :exit, _reason -> - maybe_retry_wait_until( - stack_id, - level, - opts, - timeout, - "Stack #{inspect(stack_id)} 
has terminated" - ) + end + + case PollWait.until(check_fn, timeout, + initial_interval: @spin_prevention_delay, + max_interval: @spin_prevention_delay, + jitter: 0.0 + ) do + {:ready, result} -> result + :timeout -> {:error, monitor_unavailable_reason(stack_id)} end end @@ -280,21 +269,21 @@ defmodule Electric.StatusMonitor do end end - defp maybe_retry_wait_until(_stack_id, _level, _opts, timeout, last_error) - when timeout <= @spin_prevention_delay do - {:error, last_error} + defp monitor_lookup(stack_id) do + case stack_id |> name() |> GenServer.whereis() do + nil -> :monitor_not_found + pid when is_pid(pid) -> {:ready, pid} + end + rescue + ArgumentError -> :registry_not_found end - defp maybe_retry_wait_until(stack_id, level, opts, timeout, _) do - Process.sleep(@spin_prevention_delay) - - remaining_timeout = - case timeout do - :infinity -> :infinity - _ -> timeout - @spin_prevention_delay - end - - wait_until(stack_id, level, Keyword.put(opts, :timeout, remaining_timeout)) + defp monitor_unavailable_reason(stack_id) do + case monitor_lookup(stack_id) do + :monitor_not_found -> "Status monitor not found for stack ID: #{stack_id}" + :registry_not_found -> "Stack ID not recognised: #{stack_id}" + {:ready, _} -> "Stack #{inspect(stack_id)} has terminated" + end end @doc "Convenience wrapper: wait until fully active. Returns `:ok` on success." 
From 7dd7cc3f245df99e1550dfb068207482d68db93a Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 21 May 2026 14:24:43 +0200 Subject: [PATCH 13/13] test(sync-service): simplify :timeout_waiter drain test to use one waiter + forced flag --- .../test/electric/status_monitor_test.exs | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/packages/sync-service/test/electric/status_monitor_test.exs b/packages/sync-service/test/electric/status_monitor_test.exs index 3651d5d5d6..3eaf6bd217 100644 --- a/packages/sync-service/test/electric/status_monitor_test.exs +++ b/packages/sync-service/test/electric/status_monitor_test.exs @@ -617,22 +617,20 @@ defmodule Electric.StatusMonitorTest do test "flips back to false when waiters drain via :timeout_waiter rather than readiness", %{stack_id: stack_id} do start_link_supervised!({StatusMonitor, stack_id: stack_id}) - - threshold = StatusMonitor.congested_threshold() pid = GenServer.whereis(StatusMonitor.name(stack_id)) - waiters = - for _ <- 1..threshold do - Task.async(fn -> StatusMonitor.wait_until(stack_id, :active, timeout: 50) end) - end + # Enqueue a real waiter on the GenServer.call path (flag is still false + # here, so the call path is taken and the waiter lands in state.waiters). + task = Task.async(fn -> StatusMonitor.wait_until(stack_id, :active, timeout: 50) end) + wait_until_waiters_count(pid, 1) - wait_until_waiters_count(pid, threshold) + # Now force the flag on. When the waiter's deadline fires, the + # :timeout_waiter handler removes it from state.waiters (size → 0) and + # calls maybe_clear_congested/2, which is what we're testing. + force_congested(stack_id) assert StatusMonitor.congested?(stack_id) == true - # All waiters time out — :timeout_waiter is the drain path. 
- results = Enum.map(waiters, &Task.await(&1, 1_000)) - assert Enum.all?(results, &match?({:error, _}, &1)) - + assert {:error, _} = Task.await(task, 1_000) StatusMonitor.wait_for_messages_to_be_processed(stack_id) assert StatusMonitor.congested?(stack_id) == false end