Skip to content

Fix replication connection drops from wal_sender_timeout during backpressure#4105

Open
msfstef wants to merge 7 commits intomainfrom
msfstef/repl-connection-timeout
Open

Fix replication connection drops from wal_sender_timeout during backpressure#4105
msfstef wants to merge 7 commits intomainfrom
msfstef/repl-connection-timeout

Conversation

@msfstef
Copy link
Copy Markdown
Contributor

@msfstef msfstef commented Apr 9, 2026

Problem

When the BEAM is busy or downstream event processing is slow/failing, the replication client's apply_with_retries blocks inside handle_data, preventing the gen_statem from responding to PostgreSQL's keepalive requests. After wal_sender_timeout (default 60s), PostgreSQL terminates the replication connection with errors like:

%DBConnection.ConnectionError{message: "ssl send: closed", severity: :error, reason: :closed}

The root cause is that PostgreSQL's replication protocol requires the client to periodically send StandbyStatusUpdate messages. These are application-level heartbeats — TCP keepalives (handled by the kernel) don't help. When the gen_statem is stuck in a blocking retry loop inside handle_data, it cannot send these messages, and PostgreSQL kills the connection.

Investigation

We confirmed this is a well-known problem across PostgreSQL logical replication consumers (Debezium, pgjdbc, pg_recvlogical). PostgreSQL's protocol has no flow control mechanism — keepalive handling is coupled with data processing on a single connection.

Key findings:

  • TCP keepalives don't help — they're kernel-level and work regardless of process scheduling, but wal_sender_timeout checks for application-level StandbyStatusUpdate messages
  • SSL has no heartbeat — TLS is just a record layer on top of TCP
  • wal_sender_timeout is the culprit — it fires when the client fails to send any StandbyStatusUpdate within the timeout window
  • Sending StandbyStatusUpdate without advancing the LSN is safe — it resets last_reply_timestamp without affecting the replication slot's confirmed_flush_lsn

Solution

The fix addresses two requirements that are in tension:

  1. Send keepalives while event processing is blocked, to prevent wal_sender_timeout
  2. Stop the replication stream from pushing more data over the socket, to provide backpressure

Vendored Postgrex.ReplicationConnection with socket pause/resume

We vendor Postgrex.ReplicationConnection as Electric.Postgres.ReplicationConnection, adding two new callback return types:

  • {:noreply_and_pause, ack, state} — send ack messages, then pause socket reads. The gen_statem stops receiving new WAL data but remains responsive to handle_info messages (timers, notifications).
  • {:noreply_and_resume, ack, state} — send acks, process any buffered data, resume socket reads.

The pause mechanism leverages {active, :once} — by not re-arming the socket after processing a batch, no new data enters the process. At most one Erlang message is buffered. TCP flow control naturally backpressures PostgreSQL's walsender when the kernel receive buffer fills (~128KB-6MB depending on OS).

Non-blocking event dispatch in ReplicationClient

Instead of blocking in handle_data via apply_with_retries:

handle_data(XLogData) → dispatch_event(event) → {:noreply_and_pause, [], state}

The event is dispatched as a {:process_event, ...} message to self. The gen_statem returns immediately and is free to process other messages.

Async event handler via $gen_call + Process.monitor

apply_event dispatches the event handler as a non-blocking $gen_call (the same protocol GenServer.call uses internally, and the same pattern as StatusMonitor.wait_until_async). The MFA returns a monitor ref; the gen_statem stores it and returns {:noreply, state} immediately. The reply ({ref, :ok} or {ref, {:error, reason}}) and crash detection ({:DOWN, ref, ...}) are handled in handle_info.

Zero per-event process spawning — just Process.monitor(pid) + send(pid, msg). The existing handle_call({:handle_event, ...}) in ShapeLogCollector handles the request unchanged.

On success, {:noreply_and_resume, acks, state} resumes the socket. On failure, retries are scheduled via Process.send_after or StatusMonitor.wait_until_async.

Periodic keepalive timer

A StandbyStatusUpdate is sent every min(wal_sender_timeout/3, 15s). The interval is derived from PostgreSQL's wal_sender_timeout (queried from pg_settings during connection setup). The 15s cap ensures responsiveness even if the timeout is set very high or changes after connection.

Event-driven retry with StatusMonitor.wait_until_async

Replaces the blocking StatusMonitor.wait_until_active(timeout: :infinity) with a new generic wait_until_async/2 that subscribes to status transitions and notifies the caller via a tagged message. This also replaces the previous wait_until_conn_up_async with a single generic mechanism.

Changes

File Change
lib/electric/postgres/replication_connection.ex New — vendored Postgrex.ReplicationConnection with socket pause/resume
lib/electric/postgres/replication_client.ex Non-blocking event dispatch via $gen_call + Process.monitor, keepalive timer, async retry
lib/electric/postgres/replication_client/connection_setup.ex Query wal_sender_timeout from pg_settings during setup
lib/electric/replication/shape_log_collector.ex handle_event_async/2 (non-blocking $gen_call wrapper), set_event_processing_mode/2 for integration testing
lib/electric/stack_supervisor.ex Use handle_event_async MFA
lib/electric/status_monitor.ex Generic wait_until_async/2 (replaces wait_until_conn_up_async)
lib/electric/connection/restarter.ex Use wait_until_async
test/electric/status_monitor_test.exs Tests for wait_until_async
test/electric/postgres/replication_client_test.exs Tests: connection survives wal_sender_timeout during backpressure (unavailable handler AND slow handler)
test/electric/replication/shape_log_collector_test.exs Tests for set_event_processing_mode (:suspend, :crash, :hang)
integration-tests/tests/replication-keepalive-during-backpressure.lux Lux integration test

Test plan

  • All 1551 existing unit tests pass + 3 new mode tests
  • 6 new wait_until_async tests pass
  • New Elixir test: connection survives wal_sender_timeout=3s with handler unavailable (6s window)
  • New Elixir test: connection survives wal_sender_timeout=3s with handler slow (6s delay) — fails without async $gen_call, passes with fix
  • Lux integration test: connection survives wal_sender_timeout=5s with 10s of suspended processing
  • 21/22 existing lux integration tests pass (1 pre-existing flaky failure unrelated to this change)

🤖 Generated with Claude Code

@msfstef msfstef added the claude label Apr 9, 2026
@netlify
Copy link
Copy Markdown

netlify bot commented Apr 9, 2026

Deploy Preview for electric-next ready!

Name Link
🔨 Latest commit f464d7f
🔍 Latest deploy log https://app.netlify.com/projects/electric-next/deploys/69de70976ade9600088180fe
😎 Deploy Preview https://deploy-preview-4105--electric-next.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@codecov
Copy link
Copy Markdown

codecov bot commented Apr 9, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 89.20%. Comparing base (2c54c16) to head (5aa4a2b).
⚠️ Report is 3 commits behind head on main.
✅ All tests successful. No failed tests found.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #4105   +/-   ##
=======================================
  Coverage   89.20%   89.20%           
=======================================
  Files          25       25           
  Lines        2520     2520           
  Branches      636      636           
=======================================
  Hits         2248     2248           
  Misses        270      270           
  Partials        2        2           
Flag Coverage Δ
packages/experimental 87.73% <ø> (ø)
packages/react-hooks 86.48% <ø> (ø)
packages/start 82.83% <ø> (ø)
packages/typescript-client 94.30% <ø> (ø)
packages/y-electric 56.05% <ø> (ø)
typescript 89.20% <ø> (ø)
unit-tests 89.20% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@claude
Copy link
Copy Markdown

claude bot commented Apr 9, 2026

Claude Code Review

Summary

One new commit since iteration 6 (5aa4a2b40, 2026-04-15): removes the set_event_processing_mode test-only hook from ShapeLogCollector and switches the Lux integration test to use :sys.suspend/:sys.resume. This is the right call — zero production code pollution from tests. The PR remains ready to merge.

What's Working Well

  • set_event_processing_mode removed (shape_log_collector.ex): The four test-only handle_call clauses (:suspend, :crash, :hang, :normal modes) and the public API function are gone. Production code no longer carries test scaffolding.
  • Lux test uses :sys.suspend/:sys.resume (replication-keepalive-during-backpressure.lux): Using OTP's built-in process suspension is more realistic than a bespoke mock mode — it exercises the actual $gen_call backpressure path rather than a simulated one.
  • MockTransactionProcessor in tests (replication_client_test.exs): toggle_crash and set_delay are now isolated to the test module, which is the correct place for test-only behavior.

Issues Found

Critical: None

Important: None

Suggestions (Nice to Have)

1. Commit message claims catch-all for {ref, _} was added — it was not

File: packages/sync-service/lib/electric/postgres/replication_client.ex

The commit message for 5aa4a2b40 says "Add catch-all handle_info for stale {ref, _} replies", but the diff only removes inline struct comments and adds @type specs. No new handle_info clause was added.

Currently, a stale {ref, :ok} or {ref, {:error, reason}} where ref does not match state.pending_event would fall through all clauses and raise FunctionClauseError, crashing the gen_statem.

In practice this is safe: Erlang's message ordering guarantees that the reply ({ref, :ok}) from a process that then exits arrives before the :DOWN monitor message, so the success handler always fires first. Still, the commit message created an expectation. If the intent was to add it, the missing clause is:

# Stale or unexpected reply from a previous async call (ref no longer matches
# pending_event). Discard silently — see StatusMonitor catch-all above.
def handle_info({ref, _result}, state) when is_reference(ref) do
  {:noreply, state}
end

2. Restarter still relies on GenServer default for stale StatusMonitor replies (from iteration 6)

File: packages/sync-service/lib/electric/connection/restarter.ex:115-117

Unchanged from last iteration — still only the one clause matching the exact ref. A stale {{StatusMonitor, _ref}, _result} falls to GenServer default (logs a warning). Adding the same catch-all as ReplicationClient line 353 would be consistent:

def handle_info({{StatusMonitor, _ref}, _result}, state) do
  {:noreply, state}
end

Issue Conformance

No linked issue. PR description is detailed and self-contained.

Previous Review Status

  • Iteration 6 suggestion (Restarter catch-all): Still open — not addressed.
  • Test-only code in SLC: Fully resolved — set_event_processing_mode removed, :sys.suspend used in Lux test.
  • All prior critical/important items: Remain resolved.

The PR is ready to merge. Both open items are suggestions that can be addressed in a follow-up.


Review iteration: 7 | 2026-04-15

@msfstef
Copy link
Copy Markdown
Contributor Author

msfstef commented Apr 9, 2026

Thanks for the thorough review! Addressed the actionable items in 0393830:

Fixed:

  • 1 (catch-all for stale StatusMonitor messages) — Added a catch-all handle_info clause that silently discards {{Electric.StatusMonitor, _ref}, _result} messages when wait_for_active_ref doesn't match. Prevents FunctionClauseError on stale notifications.
  • 4 (Logger level) — Downgraded keepalive interval log from info to debug.
  • 6 (_remaining parameter) — Added comment explaining the intentional budget reset (matches old apply_with_retries behavior).
  • 3 (changeset) — Added .changeset/fix-replication-keepalive-timeout.md.

Not addressed (by design):

  • 2 (StatusMonitor restart) — Not an issue. StatusMonitor is a child of MonitoredCoreSupervisor which uses :rest_for_one strategy, with StatusMonitor listed before CoreSupervisor. If StatusMonitor crashes, everything downstream (including ReplicationClient) is restarted. The supervision tree comment in monitored_core_supervisor.ex documents this explicitly.
  • 5 (dead code clause) — Already addressed in an earlier commit (42ca443) — turned it into a live defensive Logger.warning that fires if a second socket message arrives while paused (shouldn't happen with {active, :once} but guards against future OTP changes).

@msfstef
Copy link
Copy Markdown
Contributor Author

msfstef commented Apr 9, 2026

benchmark this

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 9, 2026

Benchmark results, triggered for 03938

  • write fanout completed

write fanout results

  • concurrent shape creation completed

concurrent shape creation results

  • diverse shape fanout completed

diverse shape fanout results

  • many shapes one client latency completed

many shapes one client latency results

  • unrelated shapes one client latency completed

unrelated shapes one client latency results

@msfstef msfstef requested review from alco and icehaunter April 9, 2026 12:56
@msfstef msfstef marked this pull request as ready for review April 9, 2026 12:56
@alco
Copy link
Copy Markdown
Member

alco commented Apr 9, 2026

Wow, these benchmark results are music!

@msfstef
Copy link
Copy Markdown
Contributor Author

msfstef commented Apr 9, 2026

benchmark this

@msfstef msfstef self-assigned this Apr 9, 2026
@msfstef
Copy link
Copy Markdown
Contributor Author

msfstef commented Apr 9, 2026

@alco turns out it was against 1.2.8 so I updated the reference image and ran them again! there's no regression which is good (and mostly what I expected, the previous results were unexpectedly better)

Copy link
Copy Markdown
Member

@alco alco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Brilliant!

Copy link
Copy Markdown
Contributor

@icehaunter icehaunter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This covers the case of SLC crashing & retries, but if SLC is taking too long on a transaction for any reason (e.g. writign to all 200k consumers) then we're still blocked on a GenServer.call

@msfstef
Copy link
Copy Markdown
Contributor Author

msfstef commented Apr 14, 2026

benchmark this

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 14, 2026

Benchmark results, triggered for 7a695

  • write fanout completed

write fanout results

  • diverse shape fanout completed

diverse shape fanout results

  • concurrent shape creation completed

concurrent shape creation results

  • unrelated shapes one client latency completed

unrelated shapes one client latency results

  • many shapes one client latency completed

many shapes one client latency results

Copy link
Copy Markdown
Contributor Author

@msfstef msfstef left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@icehaunter Good catch — you're right, the previous commit only made the dispatch async but apply_event still blocked on the GenServer.call(:infinity) to SLC.

Fixed in 7a695d1 + 3ad53af:

handle_event_async/2 replaces the blocking GenServer.call with a manual $gen_call + Process.monitor — the same pattern as StatusMonitor.wait_until_async. The gen_statem sends the message and returns immediately; the reply (or :DOWN on crash) is handled in handle_info. Zero per-event process spawning.

New test ("connection survives wal_sender_timeout when event handler is slow") validates a 6s handler delay against a 3s wal_sender_timeout — this specifically covers the case you flagged where SLC is taking a long time (e.g. writing to 200k consumers via ConsumerRegistry.broadcast).

Also consolidated suspend/resume_event_processing into set_event_processing_mode/2 with :suspend, :hang, :crash, and :normal modes for integration testing.

msfstef and others added 6 commits April 14, 2026 19:51
Replace the single-purpose wait_until_conn_up_async with a generic
wait_until_async/2 that supports any status level (:active, :read_only).
Uses a tagged reply format {{Electric.StatusMonitor, ref}, {:ok, level}}
to avoid message collisions in the caller's mailbox.

This removes the dedicated :wait_until_conn_up handler and conn_waiters
state, consolidating all async waiting through the existing level-based
waiter infrastructure.

The Restarter is updated to use wait_until_async(stack_id, :active).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ressure

When downstream event processing is slow or failing, the replication client
retries inside handle_data via apply_with_retries. This blocks the gen_statem
process, preventing it from responding to PostgreSQL's keepalive requests.
After wal_sender_timeout (default 60s), PostgreSQL terminates the connection.

The fix makes event processing non-blocking by:

1. Vendoring Postgrex.ReplicationConnection with socket pause/resume support.
   When handle_data receives WAL data, it dispatches the event asynchronously
   and pauses the socket — stopping PostgreSQL from pushing more data while
   the gen_statem remains responsive to timer and info messages.

2. Adding a periodic keepalive timer that sends StandbyStatusUpdate messages
   at wal_sender_timeout/3 intervals (queried from pg_settings during setup,
   capped at 15s). These fire between async retries, keeping the connection
   alive even during prolonged processing failures.

3. Using StatusMonitor.wait_until_async for event-driven retry notification
   instead of the blocking wait_until_active(timeout: :infinity) call.

The socket pause mechanism leverages {active, :once} — by not re-arming
the socket after processing a batch, no new data enters the process. TCP
flow control naturally backpressures PostgreSQL's walsender when the kernel
receive buffer fills. On resume, buffered copies and socket messages are
drained in order.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ndored ReplicationConnection

- Document the Postgrex version (0.22.0) and Protocol internals we depend on
- Turn the unreachable dead-code clause for duplicate socket messages into
  a live defensive warning log
- Vendor LSN encode/decode tests from upstream Postgrex to validate the
  vendored functions remain correct

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add catch-all handle_info for stale StatusMonitor notifications to
  prevent FunctionClauseError on unmatched refs
- Downgrade keepalive interval log from info to debug (fires on every
  reconnect)
- Add comment explaining intentional retry budget reset in
  wait_for_active_and_retry
- Add changeset file
- Fix field name to wait_for_active_ref consistently

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The synchronous GenServer.call(:infinity) in apply_event blocked the
gen_statem, preventing keepalive processing when the handler was slow
(e.g. ConsumerRegistry.broadcast waiting on many shape consumers).

Replace the blocking call with a manual $gen_call + Process.monitor
pattern (same as StatusMonitor.wait_until_async). The MFA now returns
a monitor ref, and the gen_statem handles the reply/DOWN in handle_info.
Zero per-event process spawning — just make_ref + monitor + send.

Also consolidate suspend/resume_event_processing into a single
set_event_processing_mode/2 API supporting :suspend, :hang, :crash,
and :normal modes for integration testing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@msfstef msfstef force-pushed the msfstef/repl-connection-timeout branch from 3ad53af to f464d7f Compare April 14, 2026 16:51
Copy link
Copy Markdown
Member

@alco alco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤘

- Add catch-all handle_info for stale {ref, _} replies (defensive, matches
  StatusMonitor convention)
- Remove set_event_processing_mode and all test helpers from SLC — use
  :sys.suspend/:sys.resume in Lux integration test instead (zero test
  pollution in production code)
- MockTransactionProcessor handles all test scenarios: set_delay for slow,
  toggle_crash for crash, not-started for :noproc

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@msfstef msfstef force-pushed the msfstef/repl-connection-timeout branch from f464d7f to 5aa4a2b Compare April 15, 2026 10:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants