Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/fix-suspend-mid-txn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
"@core/sync-service": patch
---

Fix a `KeyError: key :consider_flushed? not found in: nil` crash in
`Electric.Shapes.Consumer`. A shape consumer could suspend (terminate to save
memory) on its idle timeout while still holding a `pending_txn` for an
in-flight multi-fragment transaction. When a later fragment of that transaction
arrived, a fresh consumer was started and received a `has_begin?: false`
fragment with no pending transaction, crashing in `process_txn_fragment/2`.
`consumer_can_suspend?/1` now refuses to suspend while a transaction is pending,
so the consumer hibernates instead and suspends only once the transaction
completes.
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ defmodule Electric.Shapes.Consumer do
# 3. we are not part of a subquery dependency tree, that is either
# a. we have no dependent shapes
# b. we don't have a materializer subscribed
# 4. we're not in the middle of processing a multi-fragment transaction

if consumer_suspend_enabled?(state) and consumer_can_suspend?(state) do
Logger.debug(fn -> ["Suspending consumer ", to_string(state.shape_handle)] end)
Expand All @@ -416,7 +417,7 @@ defmodule Electric.Shapes.Consumer do

# Decides whether this consumer is allowed to suspend (terminate) on its idle
# timeout. Every condition must hold:
#
#   * the snapshot has started,
#   * the shape has no dependencies,
#   * no materializer is subscribed,
#   * no transaction is pending. A consumer that still holds a `pending_txn`
#     for an in-flight multi-fragment transaction must stay alive, otherwise
#     the remaining fragments would reach a fresh consumer that has no pending
#     transaction to attach them to.
#
# All four checks form one boolean expression; the superseded
# three-condition line is removed so that none of the earlier checks is
# evaluated and then discarded.
defp consumer_can_suspend?(state) do
  is_snapshot_started(state) and
    not Shape.has_dependencies(state.shape) and
    not state.materializer_subscribed? and
    is_nil(state.pending_txn)
end

@impl GenServer
Expand Down
70 changes: 70 additions & 0 deletions packages/sync-service/test/electric/shapes/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,76 @@ defmodule Electric.Shapes.ConsumerTest do
assert is_pid(Consumer.whereis(ctx.stack_id, shape_handle))
end

@tag hibernate_after: 10, with_pure_file_storage_opts: [flush_period: 1]
@tag suspend: true
test "should hibernate not suspend while a multi-fragment transaction is pending", ctx do
  stack_id = ctx.stack_id
  register_as_replication_client(stack_id)

  {handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, stack_id)
  txn_lsn = Lsn.from_integer(300)

  :started = ShapeCache.await_snapshot_start(handle, stack_id)

  pid = Consumer.whereis(stack_id, handle)
  assert is_pid(pid)
  monitor = Process.monitor(pid)

  # Both fragments carry a single insert into the same table; only the row id
  # and the operation index within the transaction differ.
  insert = fn id, op_index ->
    %Changes.NewRecord{
      relation: {"public", "test_table"},
      record: %{"id" => id},
      log_offset: LogOffset.new(txn_lsn, op_index)
    }
  end

  # Step 1: deliver only the opening fragment. Until the fragment carrying the
  # commit shows up, the consumer keeps a pending_txn around.
  opening = txn_fragment(2, txn_lsn, [insert.("21", 0)], has_begin?: true, has_commit?: false)

  assert ShapeLogCollector.handle_event(opening, stack_id) == :ok

  # Step 2: let the 10ms idle timer (hibernate_after) elapse. With a
  # transaction still open the consumer has to hibernate and stay alive for
  # the remaining fragments; suspending would throw away pending_txn and crash
  # on the next fragment (issue #4501).
  refute_receive {:DOWN, ^monitor, :process, ^pid, {:shutdown, :suspend}}, 100

  assert Process.info(pid, :current_function) ==
           {:current_function, {:gen_server, :loop_hibernate, 4}}

  assert is_pid(Consumer.whereis(stack_id, handle))

  # Step 3: deliver the closing fragment. Once the transaction is complete
  # pending_txn is cleared and the next idle timeout may suspend the consumer.
  closing = txn_fragment(2, txn_lsn, [insert.("22", 1)], has_begin?: false, has_commit?: true)

  assert ShapeLogCollector.handle_event(closing, stack_id) == :ok

  assert_receive {:flush_boundary_updated, 300}, 1_000

  assert_receive {:DOWN, ^monitor, :process, ^pid, {:shutdown, :suspend}}, 1_000

  refute Consumer.whereis(stack_id, handle)
end

@tag with_pure_file_storage_opts: [flush_period: 1]
@tag suspend: false
test "ConsumerRegistry.enable_suspend should suspend hibernated consumers", ctx do
Expand Down
Loading