Skip to content

Commit 3ad53af

Browse files
msfstefclaude
andcommitted
Address review: catch-all for stale replies, mode tests, :hang doc note
- Add catch-all handle_info for stale {ref, _} replies (defensive, matches StatusMonitor convention) - Add tests for set_event_processing_mode :suspend, :crash, and :hang - Add doc note that :hang is only recoverable by killing the process Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7a695d1 commit 3ad53af

3 files changed

Lines changed: 37 additions & 1 deletion

File tree

packages/sync-service/lib/electric/postgres/replication_client.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,11 @@ defmodule Electric.Postgres.ReplicationClient do
409409
end
410410
end
411411

412+
# Stale $gen_call reply from a previous event processing cycle — ignore.
413+
def handle_info({ref, _result}, state) when is_reference(ref) do
414+
{:noreply, state}
415+
end
416+
412417
# This callback is invoked when the connection process receives a shutdown signal.
413418
def handle_info({:EXIT, _pid, :shutdown}, _state) do
414419
Logger.debug("Replication client #{inspect(self())} received shutdown signal, stopping")

packages/sync-service/lib/electric/replication/shape_log_collector.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ defmodule Electric.Replication.ShapeLogCollector do
130130
131131
Modes:
132132
- `:suspend` — return `{:error, :not_ready}` without processing
133-
- `:hang` — accept the event but never reply (simulates slow handler)
133+
- `:hang` — accept the event but never reply (simulates slow handler; only recoverable by killing the process)
134134
- `:crash` — raise an error on the next event
135135
- `:normal` — process events normally (clears any test mode)
136136
"""

packages/sync-service/test/electric/replication/shape_log_collector_test.exs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,37 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
101101
end
102102
end
103103

104+
describe "set_event_processing_mode" do
105+
setup :setup_log_collector
106+
107+
test ":suspend causes handle_event to return {:error, :not_ready}", ctx do
108+
ShapeLogCollector.set_event_processing_mode(ctx.stack_id, :suspend)
109+
txn = transaction(100, Lsn.from_string("0/10"), [])
110+
assert {:error, :not_ready} = ShapeLogCollector.handle_event(txn, ctx.stack_id)
111+
112+
ShapeLogCollector.set_event_processing_mode(ctx.stack_id, :normal)
113+
assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id)
114+
end
115+
116+
test ":crash causes handle_event to exit the caller", ctx do
117+
ShapeLogCollector.set_event_processing_mode(ctx.stack_id, :crash)
118+
txn = transaction(100, Lsn.from_string("0/10"), [])
119+
120+
assert catch_exit(ShapeLogCollector.handle_event(txn, ctx.stack_id))
121+
end
122+
123+
test ":hang causes handle_event to block indefinitely", ctx do
124+
ShapeLogCollector.set_event_processing_mode(ctx.stack_id, :hang)
125+
txn = transaction(100, Lsn.from_string("0/10"), [])
126+
127+
task = Task.async(fn -> ShapeLogCollector.handle_event(txn, ctx.stack_id) end)
128+
refute_receive _, 100
129+
# The task is stuck — verify it hasn't completed
130+
assert nil == Task.yield(task, 0)
131+
Task.shutdown(task, :brutal_kill)
132+
end
133+
end
134+
104135
describe "shape restoration" do
105136
setup :setup_log_collector
106137

0 commit comments

Comments
 (0)