5 changes: 5 additions & 0 deletions .changeset/fix-replication-keepalive-timeout.md
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Fix replication connection drops caused by PostgreSQL's wal_sender_timeout during backpressure. The replication client now sends periodic keepalive messages while event processing is paused, preventing the connection from being killed during slow downstream processing.
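For context, the keepalive interval that the test below relies on is described there as `min(wal_sender_timeout / 3, 15s)`. A minimal Elixir sketch of that derivation, where the module and function names are invented for illustration and only the formula itself comes from this PR's test comments:

```elixir
# Hypothetical sketch; the real module/function names in sync-service may differ.
defmodule KeepaliveSketch do
  @max_interval_ms 15_000

  # Derive the keepalive send interval from Postgres' wal_sender_timeout,
  # capped at 15s, as described in the test comments below.
  def interval_ms(wal_sender_timeout_ms) do
    min(div(wal_sender_timeout_ms, 3), @max_interval_ms)
  end
end
```

With `wal_sender_timeout = 5000`, this yields `min(1666, 15000) = 1666`, i.e. the "~1.6s" interval the test comments mention.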
@@ -0,0 +1,93 @@
[doc Verify that the replication connection survives when the ShapeLogCollector is suspended (via :sys.suspend) for longer than wal_sender_timeout. The replication client dispatches events via non-blocking $gen_call and sends periodic keepalives that prevent PostgreSQL from killing the connection.]

[include _macros.luxinc]

[global pg_container_name=repl-keepalive__pg]

###

## Start Postgres with a low wal_sender_timeout.
## Electric derives its keepalive interval as min(wal_sender_timeout/3, 15s).
## With wal_sender_timeout=5s, keepalives fire roughly every 1.6s.
## With the old blocking code, the replication client would be stuck inside
## handle_data for the entire retry period, sending no keepalives, and PG
## would kill the connection after 5s. With the fix, the keepalive timer
## keeps firing throughout.
[invoke setup_pg_with_shell_name "pg" "" "-c wal_sender_timeout=5s" ""]

## Add a test table
[invoke start_psql]
[shell psql]
"""!
CREATE TABLE items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
val TEXT
);
"""
??CREATE TABLE

## Start Electric and wait for streaming
[invoke setup_electric]

[shell electric]
??[notice] Starting replication from postgres

## Insert initial data and verify it flows through a shape — proves replication works
[shell psql]
!INSERT INTO items (val) VALUES ('before-block');
??INSERT 0 1

[shell client]
[invoke shape_get_snapshot items]
??before-block

## Suspend the SLC process using :sys.suspend so it stops processing messages.
## The $gen_call from the ReplicationClient will sit in SLC's mailbox, and
## the gen_statem remains responsive to keepalive timers while waiting.
[shell electric]
# Disable the fail pattern — we expect error/warning logs during the retry period
-

!:sys.suspend(GenServer.whereis(Electric.Replication.ShapeLogCollector.name("single_stack")))
??:ok

## Insert data to trigger event processing — this starts the :not_ready retry loop
[shell psql]
!INSERT INTO items (val) VALUES ('during-block');
??INSERT 0 1

## Wait ~2x wal_sender_timeout (5s). If keepalives are not being sent,
## PostgreSQL kills the connection during this window.
[sleep 10]

## Resume event processing and check if data still flows.
## If the connection survived (fix works): data flows, no restart needed.
## If the connection was killed (old code): Electric has to reconnect,
## which triggers error logs about connection failure.
[shell electric]
# Set fail pattern to catch connection errors — this is the actual assertion.
# If the connection was killed, we'd see logs like:
# "Electric.Connection.Manager is restarting after it has encountered an error
# in replication mode: connection closed while talking to the database"
# or the gen_statem terminating with "tcp send: closed".
-connection closed|tcp send: closed|ssl send: closed|$fail_pattern

!:sys.resume(GenServer.whereis(Electric.Replication.ShapeLogCollector.name("single_stack")))
??:ok

## Insert data that should flow through if the connection survived
[shell psql]
!INSERT INTO items (val) VALUES ('after-restore');
??INSERT 0 1

## Give Electric time to process the queued events and the new insert.
## If the connection died, the fail pattern above will catch the error logs
## as they appear when the gen_statem discovers the dead socket.
[sleep 5]

## If we got here without the fail pattern triggering, the connection survived.
## Verify by checking health endpoint — it should still be active.
[shell client]
[invoke wait_health "3000" "active"]

[cleanup]
[invoke teardown]
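The non-blocking `$gen_call` dispatch that this test exercises can be sketched as follows. This is an illustrative reconstruction, not the actual ReplicationClient code; only the standard OTP call-protocol message shapes are assumed:

```elixir
# Illustrative only: dispatch a call without blocking in a receive, so a
# gen_statem can keep servicing its keepalive timer while the reply is pending.
defmodule NonBlockingCall do
  # Instead of GenServer.call/3 (which blocks awaiting the reply), send the
  # standard OTP `$gen_call` message manually and handle the reply later.
  def dispatch(server_pid, request) do
    ref = Process.monitor(server_pid)
    send(server_pid, {:"$gen_call", {self(), ref}, request})
    # The reply arrives asynchronously as {ref, reply} in the caller's
    # mailbox and can be matched in handle_info/handle_event, leaving the
    # process free to handle timer messages in the meantime.
    ref
  end
end
```

This is why suspending the ShapeLogCollector leaves the replication client responsive: the call message sits in the suspended process's mailbox while the caller's event loop keeps running.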
4 changes: 2 additions & 2 deletions packages/sync-service/lib/electric/connection/restarter.ex
@@ -96,7 +96,7 @@ defmodule Electric.Connection.Restarter do
StatusMonitor.database_connections_waking_up(state.stack_id)
Electric.Connection.Manager.Supervisor.restart(stack_id: state.stack_id)

ref = StatusMonitor.wait_until_conn_up_async(state.stack_id)
ref = StatusMonitor.wait_until_async(state.stack_id, :active)

{:noreply, %{state | wait_until_conn_up_ref: ref}}
end
@@ -112,7 +112,7 @@
{:reply, :ok, state}
end

def handle_info({ref, :ok}, %{wait_until_conn_up_ref: ref} = state) do
def handle_info({{StatusMonitor, ref}, {:ok, :active}}, %{wait_until_conn_up_ref: ref} = state) do
{:noreply, %{state | wait_until_conn_up_ref: nil}}
end
end
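Tagging the reply with the sending module, as the new `{{StatusMonitor, ref}, {:ok, :active}}` shape does, keeps these messages from colliding with other `{ref, result}` messages in the same mailbox (for example `Task.async` replies). A minimal sketch of the idea, with all names other than that message shape invented for illustration:

```elixir
# Sketch (illustrative names): a process awaiting both a Task reply and a
# tagged StatusMonitor reply can tell them apart unambiguously.
def handle_info({{StatusMonitor, ref}, {:ok, status}}, state) do
  # Reply from the async status wait; `status` is e.g. :active.
  {:noreply, note_status(state, ref, status)}
end

def handle_info({ref, result}, state) when is_reference(ref) do
  # A bare {ref, result} now unambiguously belongs to something else,
  # such as a Task started with Task.async/1.
  {:noreply, note_task_result(state, ref, result)}
end
```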