diff --git a/.changeset/fix-suspend-mid-txn.md b/.changeset/fix-suspend-mid-txn.md new file mode 100644 index 0000000000..e32c338474 --- /dev/null +++ b/.changeset/fix-suspend-mid-txn.md @@ -0,0 +1,13 @@ +--- +"@core/sync-service": patch +--- + +Fix a `KeyError: key :consider_flushed? not found in: nil` crash in +`Electric.Shapes.Consumer`. A shape consumer could suspend (terminate to save +memory) on its idle timeout while still holding a `pending_txn` for an +in-flight multi-fragment transaction. When a later fragment of that transaction +arrived, a fresh consumer was started and received a `has_begin?: false` +fragment with no pending transaction, crashing in `process_txn_fragment/2`. +`consumer_can_suspend?/1` now refuses to suspend while a transaction is pending, +so the consumer hibernates instead and suspends only once the transaction +completes. diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 5e8cd6d6b3..3ab114d0f5 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -399,6 +399,7 @@ defmodule Electric.Shapes.Consumer do # 3. we are not part of a subquery dependency tree, that is either # a. we have no dependent shapes # b. we don't have a materializer subscribed + # 4. we're not in the middle of processing a multi-fragment transaction if consumer_suspend_enabled?(state) and consumer_can_suspend?(state) do Logger.debug(fn -> ["Suspending consumer ", to_string(state.shape_handle)] end) @@ -416,7 +417,7 @@ defmodule Electric.Shapes.Consumer do defp consumer_can_suspend?(state) do is_snapshot_started(state) and not Shape.has_dependencies(state.shape) and - not state.materializer_subscribed? + not state.materializer_subscribed? and is_nil(state.pending_txn) end @impl GenServer diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 40b27576c5..518780ce40 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -1461,6 +1461,76 @@ defmodule Electric.Shapes.ConsumerTest do assert is_pid(Consumer.whereis(ctx.stack_id, shape_handle)) end + @tag hibernate_after: 10, with_pure_file_storage_opts: [flush_period: 1] + @tag suspend: true + test "should hibernate not suspend while a multi-fragment transaction is pending", ctx do + register_as_replication_client(ctx.stack_id) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + lsn1 = Lsn.from_integer(300) + + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + assert is_pid(consumer_pid) + ref = Process.monitor(consumer_pid) + + # The begin fragment of a multi-fragment transaction leaves the consumer + # holding a pending_txn until the matching commit fragment arrives. + begin_fragment = + txn_fragment( + 2, + lsn1, + [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "21"}, + log_offset: LogOffset.new(lsn1, 0) + } + ], + has_begin?: true, + has_commit?: false + ) + + assert :ok = ShapeLogCollector.handle_event(begin_fragment, ctx.stack_id) + + # The idle timer (hibernate_after: 10ms) fires, but with a transaction still + # pending the consumer must hibernate rather than suspend, so it survives to + # receive the rest of the transaction. Suspending here would drop pending_txn + # and crash on the next fragment (issue #4501). + refute_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}}, 100 + + assert {:current_function, {:gen_server, :loop_hibernate, 4}} = + Process.info(consumer_pid, :current_function) + + assert is_pid(Consumer.whereis(ctx.stack_id, shape_handle)) + + # Completing the transaction clears pending_txn, so the consumer is free to + # suspend on the next idle timeout. + commit_fragment = + txn_fragment( + 2, + lsn1, + [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "22"}, + log_offset: LogOffset.new(lsn1, 1) + } + ], + has_begin?: false, + has_commit?: true + ) + + assert :ok = ShapeLogCollector.handle_event(commit_fragment, ctx.stack_id) + + assert_receive {:flush_boundary_updated, 300}, 1_000 + + assert_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}}, 1_000 + + refute Consumer.whereis(ctx.stack_id, shape_handle) + end + @tag with_pure_file_storage_opts: [flush_period: 1] @tag suspend: false test "ConsumerRegistry.enable_suspend should suspend hibernated consumers", ctx do