From 1d7498fe35312de91f2242b80a3f953c0b15ea19 Mon Sep 17 00:00:00 2001 From: Anthony Accomazzo Date: Sat, 24 Jan 2026 17:01:26 -0800 Subject: [PATCH] bench: add `--through sps` --- config/bench.exs | 1 + config/test.exs | 1 + lib/mix/tasks/bench.ex | 177 ++++++++++++-- lib/sequin/benchmark/message_handler.ex | 79 ++++++ lib/sequin/benchmark/stats.ex | 231 +++++++++++++++--- lib/sequin/postgres/benchmark_source.ex | 20 +- lib/sequin/runtime/benchmark_pipeline.ex | 32 ++- lib/sequin/runtime/message_handler.ex | 8 +- lib/sequin/runtime/sink_pipeline.ex | 27 +- .../runtime/benchmark_pipeline_test.exs | 67 +++-- test/sequin/sink_pipeline_test.exs | 3 + 11 files changed, 550 insertions(+), 96 deletions(-) create mode 100644 lib/sequin/benchmark/message_handler.ex diff --git a/config/bench.exs b/config/bench.exs index 15706f86d..776cc3c87 100644 --- a/config/bench.exs +++ b/config/bench.exs @@ -1,6 +1,7 @@ import Config config :sequin, Oban, testing: :manual, prefix: "sequin_config" +config :sequin, Sequin.Benchmark.Stats, checksum_sample_rate: 0.1 config :sequin, Sequin.Repo, username: "postgres", diff --git a/config/test.exs b/config/test.exs index 9ded40b1b..410da089e 100644 --- a/config/test.exs +++ b/config/test.exs @@ -28,6 +28,7 @@ config :sequin, Sequin, process_mod: Sequin.TestSupport.ProcessMock, application_mod: Sequin.TestSupport.ApplicationMock +config :sequin, Sequin.Benchmark.Stats, checksum_sample_rate: 0.1 config :sequin, Sequin.Mailer, adapter: Swoosh.Adapters.Test config :sequin, Sequin.Pagerduty, diff --git a/lib/mix/tasks/bench.ex b/lib/mix/tasks/bench.ex index 831d31c39..a0037f48a 100644 --- a/lib/mix/tasks/bench.ex +++ b/lib/mix/tasks/bench.ex @@ -10,13 +10,13 @@ defmodule Mix.Tasks.Bench do ## Options - * `--duration` - Duration to run benchmark in seconds (default: 60) + * `--duration` - Duration to run benchmark in seconds (default: 60 when --max-messages not set) * `--row-sizes` - Row size distribution as "fraction:bytes,..." 
(default: "1.0:200") * `--transaction-sizes` - Transaction size distribution as "fraction:count,..." (default: "1.0:10") * `--pk-collision-rate` - PK collision rate 0.0-1.0 (default: 0.005) * `--partition-count` - Number of partitions (default: schedulers_online) - * `--max-messages` - Maximum messages to generate (default: unlimited) - * `--through` - Pipeline stage to run through: "full" or "reorder_buffer" (default: "full") + * `--max-messages` - Maximum messages to generate (default: unlimited, mutually exclusive with --duration) + * `--through` - Pipeline stage to run through: "full", "reorder_buffer", or "sps" (default: "full") ## Examples @@ -34,10 +34,14 @@ defmodule Mix.Tasks.Bench do # Run through reorder_buffer only (isolated pipeline test) mix benchmark --through reorder_buffer + + # Run through SlotProcessorServer (includes message handler, stops before Broadway) + mix benchmark --through sps """ use Mix.Task alias Sequin.Accounts + alias Sequin.Benchmark.MessageHandler, as: BenchmarkMessageHandler alias Sequin.Benchmark.Stats alias Sequin.Consumers alias Sequin.Databases @@ -80,7 +84,7 @@ defmodule Mix.Tasks.Bench do ] ) - duration = Keyword.get(opts, :duration, @default_duration) + duration_opt = Keyword.get(opts, :duration) row_sizes = parse_distribution(Keyword.get(opts, :row_sizes), @default_row_sizes) transaction_sizes = parse_distribution(Keyword.get(opts, :transaction_sizes), @default_transaction_sizes) pk_collision_rate = Keyword.get(opts, :pk_collision_rate, @default_pk_collision_rate) @@ -88,6 +92,17 @@ defmodule Mix.Tasks.Bench do max_messages = Keyword.get(opts, :max_messages) through = opts |> Keyword.get(:through, "full") |> String.to_existing_atom() + if max_messages && duration_opt do + Mix.raise("--duration and --max-messages are mutually exclusive") + end + + duration = + cond do + duration_opt -> duration_opt + max_messages -> nil + true -> @default_duration + end + # Start the application Mix.Task.run("app.start") @@ -95,7 +110,7 
@@ defmodule Mix.Tasks.Bench do IO.puts("") announce("Configuration:", @yellow) - IO.puts(" Duration: #{duration}s") + IO.puts(" Duration: #{format_duration(duration, max_messages)}") IO.puts(" Row sizes: #{inspect(row_sizes)}") IO.puts(" Transaction sizes: #{inspect(transaction_sizes)}") IO.puts(" PK collision rate: #{pk_collision_rate}") @@ -121,7 +136,7 @@ defmodule Mix.Tasks.Bench do IO.puts(" Replication slot: #{replication.id}") # For :full mode, we need a consumer for Broadway - # For :reorder_buffer mode, we use the replication.id as the checksum owner + # For :reorder_buffer and :sps modes, we use the replication.id as the checksum owner {consumer, checksum_owner_id} = case through do :full -> @@ -133,6 +148,11 @@ defmodule Mix.Tasks.Bench do # Initialize checksums with replication.id as owner Stats.init_for_owner(replication.id, partition_count) {nil, replication.id} + + :sps -> + # Initialize checksums with replication.id as owner + Stats.init_for_owner(replication.id, partition_count) + {nil, replication.id} end IO.puts("") @@ -191,6 +211,31 @@ defmodule Mix.Tasks.Bench do skip_start?: true ] ) + + :sps -> + replication = Repo.preload(replication, :postgres_database) + + # Initialize stats tracking for this owner + Stats.init_for_owner(checksum_owner_id, partition_count) + + # Create context for the benchmark message handler + message_handler_ctx = %BenchmarkMessageHandler.Context{ + partition_count: partition_count, + checksum_owner_id: checksum_owner_id + } + + {:ok, _slot_sup} = + SlotProducerSupervisor.start_link( + replication_slot: replication, + slot_producer_opts: [ + backend_mod: VirtualBackend, + connect_opts: [id: source_id, source_mod: BenchmarkSource] + ], + slot_processor_opts: [ + message_handler_module: BenchmarkMessageHandler, + message_handler_ctx_fn: fn _replication -> message_handler_ctx end + ] + ) end IO.puts("") @@ -206,8 +251,13 @@ defmodule Mix.Tasks.Bench do capture_metrics_from_module(checksum_owner_id) end - announce("Running 
benchmark for #{duration}s...", @green) - Process.sleep(duration * 1000) + announce("Running benchmark for #{format_duration(duration, max_messages)}...", @green) + + if duration do + Process.sleep(duration * 1000) + else + wait_for_max_messages(source_id, max_messages) + end # Pause the source and wait for pipeline to drain announce("Pausing source and waiting for pipeline to drain...", @yellow) @@ -239,6 +289,20 @@ defmodule Mix.Tasks.Bench do source_checksums = BenchmarkSource.checksums(source_id) pipeline_checksums = Stats.checksums(checksum_owner_id) + source_group_checksums = + if through == :full do + Stats.group_checksums(source_id, scope: :source) + else + %{} + end + + pipeline_group_checksums = + if through == :full do + Stats.group_checksums(checksum_owner_id) + else + %{} + end + # Get tracked messages for comparison source_tracked = BenchmarkSource.tracked_messages(source_id) pipeline_tracked = Stats.tracked_messages(checksum_owner_id) @@ -250,6 +314,9 @@ defmodule Mix.Tasks.Bench do actual_duration_s, source_checksums, pipeline_checksums, + source_group_checksums, + pipeline_group_checksums, + through, source_tracked, pipeline_tracked ) @@ -281,13 +348,23 @@ defmodule Mix.Tasks.Bench do end defp pipeline_total_count(consumer_id) do - consumer_id - |> Stats.checksums() - |> Map.values() - |> Enum.map(&elem(&1, 1)) - |> Enum.sum() + Stats.message_count(consumer_id) end + defp wait_for_max_messages(source_id, max_messages, poll_ms \\ 200) do + %{total_messages: total_messages} = BenchmarkSource.stats(source_id) + + if total_messages >= max_messages do + :ok + else + Process.sleep(poll_ms) + wait_for_max_messages(source_id, max_messages, poll_ms) + end + end + + defp format_duration(nil, max_messages), do: "until #{max_messages} messages" + defp format_duration(duration, _max_messages), do: "#{duration}s" + defp setup_base_entities do # Create account using domain function {:ok, account} = @@ -336,7 +413,10 @@ defmodule Mix.Tasks.Bench do status: 
:active, actions: [:insert, :update, :delete], replication_slot_id: replication.id, - sink: %{type: :benchmark, partition_count: partition_count}, + sink: %{ + type: :benchmark, + partition_count: partition_count + }, source: %{include_table_oids: [16_384, 16_385, 16_386]} }) @@ -360,14 +440,14 @@ defmodule Mix.Tasks.Bench do partition_msgs |> Enum.sort_by(&{&1.message.commit_lsn, &1.message.commit_idx}) |> Enum.each(fn msg -> - Stats.message_received( - checksum_owner_id, - partition, - msg.message.commit_lsn, - msg.message.commit_idx, + Stats.message_received(%Stats.Message{ + owner_id: checksum_owner_id, + partition: partition, + commit_lsn: msg.message.commit_lsn, + commit_idx: msg.message.commit_idx, byte_size: msg.byte_size, created_at_us: extract_created_at(msg.message.fields) - ) + }) end) end) @@ -446,6 +526,9 @@ defmodule Mix.Tasks.Bench do duration_s, source_checksums, pipeline_checksums, + source_group_checksums, + pipeline_group_checksums, + through, source_tracked, pipeline_tracked ) do @@ -477,7 +560,14 @@ defmodule Mix.Tasks.Bench do # Verification announce("Verification:", @yellow) - verify_checksums(source_checksums, pipeline_checksums) + + case through do + :full -> + verify_group_checksums(source_group_checksums, pipeline_group_checksums) + + _ -> + verify_checksums(source_checksums, pipeline_checksums) + end # Message tracking comparison IO.puts("") @@ -539,6 +629,51 @@ defmodule Mix.Tasks.Bench do end end + defp verify_group_checksums(source_group_checksums, pipeline_group_checksums) do + source_groups = source_group_checksums |> Map.keys() |> Enum.sort() + pipeline_groups = pipeline_group_checksums |> Map.keys() |> Enum.sort() + + IO.puts(" Group checksum sample rate: #{Stats.checksum_sample_rate()}") + IO.puts(" Sampled groups: #{length(pipeline_groups)}") + + cond do + Enum.empty?(pipeline_groups) -> + IO.puts(" #{@yellow}No sampled groups to verify#{@reset}") + + source_groups == pipeline_groups -> + mismatches = + 
Enum.filter(source_groups, fn group_id -> + source_group_checksums[group_id] != pipeline_group_checksums[group_id] + end) + + if Enum.empty?(mismatches) do + IO.puts(" #{@green}Group checksums: PASS (all sampled groups match)#{@reset}") + else + IO.puts(" #{@yellow}Group checksums: PARTIAL (#{length(mismatches)} groups differ)#{@reset}") + + mismatches + |> Enum.take(5) + |> Enum.each(fn group_id -> + {src_checksum, src_count} = source_group_checksums[group_id] + {pipe_checksum, pipe_count} = pipeline_group_checksums[group_id] + + IO.puts( + " Group #{group_id}: source={#{src_checksum}, #{src_count}} pipeline={#{pipe_checksum}, #{pipe_count}}" + ) + end) + + if length(mismatches) > 5 do + IO.puts(" ... and #{length(mismatches) - 5} more") + end + end + + true -> + IO.puts(" #{@yellow}WARNING: Sampled group mismatch#{@reset}") + IO.puts(" Source groups: #{length(source_groups)}") + IO.puts(" Pipeline groups: #{length(pipeline_groups)}") + end + end + defp compare_tracked_messages(source_tracked, pipeline_tracked) do source_count = length(source_tracked) pipeline_count = length(pipeline_tracked) diff --git a/lib/sequin/benchmark/message_handler.ex b/lib/sequin/benchmark/message_handler.ex new file mode 100644 index 000000000..a3200d537 --- /dev/null +++ b/lib/sequin/benchmark/message_handler.ex @@ -0,0 +1,79 @@ +defmodule Sequin.Benchmark.MessageHandler do + @moduledoc """ + A message handler for benchmarking that records stats instead of routing to consumers. + + Used with `--through sps` mode to measure pipeline performance through SlotProcessorServer + without the overhead of SlotMessageStore and Broadway. 
+ """ + + @behaviour Sequin.Runtime.MessageHandler + + alias Sequin.Benchmark.Stats + alias Sequin.Runtime.SlotProcessor.Message + + defmodule Context do + @moduledoc false + use TypedStruct + + typedstruct do + field :partition_count, pos_integer(), enforce: true + field :checksum_owner_id, term(), enforce: true + end + end + + @impl true + def before_handle_messages(%Context{}, _messages), do: :ok + + @impl true + def handle_messages(%Context{} = ctx, messages) do + # Group by partition (using PK as group_id, same as BenchmarkSource) + messages + |> Enum.group_by(fn %Message{} = msg -> + # Extract PK from message.ids (first element for single-column PK) + group_id = msg.ids |> List.first() |> to_string() + Stats.partition(group_id, ctx.partition_count) + end) + |> Enum.each(fn {partition, partition_msgs} -> + # Sort within partition by (commit_lsn, commit_idx) + partition_msgs + |> Enum.sort_by(&{&1.commit_lsn, &1.commit_idx}) + |> Enum.each(fn %Message{} = msg -> + Stats.message_received(%Stats.Message{ + owner_id: ctx.checksum_owner_id, + partition: partition, + commit_lsn: msg.commit_lsn, + commit_idx: msg.commit_idx, + byte_size: msg.byte_size, + created_at_us: extract_created_at(msg.fields) + }) + end) + end) + + {:ok, length(messages)} + end + + @impl true + def put_high_watermark_wal_cursor(%Context{}, _cursor), do: :ok + + # Also handle logical messages (no-op for benchmarking) + def handle_logical_message(%Context{}, _commit_lsn, _msg), do: :ok + + # Extract created_at from message fields (stored as microseconds since epoch) + defp extract_created_at(fields) when is_list(fields) do + case Enum.find(fields, fn f -> f.column_name == "created_at" end) do + %{value: created_at} when is_integer(created_at) -> + created_at + + %{value: created_at} when is_binary(created_at) -> + case Integer.parse(created_at) do + {int, ""} -> int + _ -> nil + end + + _ -> + nil + end + end + + defp extract_created_at(_), do: nil +end diff --git a/lib/sequin/benchmark/stats.ex 
b/lib/sequin/benchmark/stats.ex index ffb3d1f7d..47a6b74d5 100644 --- a/lib/sequin/benchmark/stats.ex +++ b/lib/sequin/benchmark/stats.ex @@ -7,17 +7,28 @@ defmodule Sequin.Benchmark.Stats do ETS tables: - `:benchmark_checksums` (checksums per owner + partition) + - `:benchmark_group_checksums` (checksums per owner + group_id) - `:benchmark_metrics` (bytes delivered, latency sum/count per owner) + - `:benchmark_message_counts` (total messages per owner) - `:benchmark_tracked_messages` (optional message tracking) Tracking is controlled by: config :sequin, Sequin.Benchmark.Stats, track_messages: true + + Per-group checksum sampling is controlled by: + + config :sequin, Sequin.Benchmark.Stats, checksum_sample_rate: 0.1 """ + alias Sequin.Benchmark.Stats + @checksums_table :benchmark_checksums + @group_checksums_table :benchmark_group_checksums @metrics_table :benchmark_metrics + @counts_table :benchmark_message_counts @tracking_table :benchmark_tracked_messages + @default_checksum_sample_rate 0.1 @type scope :: :pipeline | :source @@ -25,6 +36,37 @@ defmodule Sequin.Benchmark.Stats do # Public API # ============================================================================ + defmodule Message do + @moduledoc false + defstruct [:owner_id, :partition, :commit_lsn, :commit_idx, :byte_size, :created_at_us, :scope] + + @type t :: %__MODULE__{ + owner_id: term(), + partition: non_neg_integer(), + commit_lsn: integer(), + commit_idx: integer(), + byte_size: non_neg_integer() | nil, + created_at_us: integer() | nil, + scope: Stats.scope() | nil + } + end + + defmodule GroupMessage do + @moduledoc false + defstruct [:owner_id, :group_id, :commit_lsn, :commit_idx, :partition, :byte_size, :created_at_us, :scope] + + @type t :: %__MODULE__{ + owner_id: term(), + group_id: term(), + commit_lsn: integer(), + commit_idx: integer(), + partition: non_neg_integer() | nil, + byte_size: non_neg_integer() | nil, + created_at_us: integer() | nil, + scope: Stats.scope() | nil + } + end + 
@doc """ Computes partition from a group_id using consistent hashing. """ @@ -48,6 +90,7 @@ defmodule Sequin.Benchmark.Stats do # Initialize metrics :ets.insert_new(@metrics_table, {owner_key, 0, 0, 0}) + :ets.insert_new(@counts_table, {owner_key, 0}) :ok end @@ -57,27 +100,48 @@ defmodule Sequin.Benchmark.Stats do Used by BenchmarkSource to track what was produced. """ - @spec message_emitted(term(), non_neg_integer(), integer(), integer(), keyword()) :: :ok - def message_emitted(owner_id, partition, commit_lsn, commit_idx, opts \\ []) do - update_checksum(owner_id, partition, commit_lsn, commit_idx, opts) + @spec message_emitted(Message.t()) :: :ok + def message_emitted(%Message{} = message) do + update_checksum(message) :ok end @doc """ - Records a message received by the pipeline. Updates checksum and metrics. + Records a message emitted by the source for a specific group_id. + Updates per-group checksum only (no metrics). Sampling applied internally. + """ + @spec message_emitted_for_group(GroupMessage.t()) :: :ok + def message_emitted_for_group(%GroupMessage{group_id: nil}), do: raise("group_id cannot be nil") - Used by the pipeline (BenchmarkPipeline or mock_flush_batch_fn) to track what was consumed. + def message_emitted_for_group(%GroupMessage{} = message) do + if sample_group?(message.group_id), do: update_group_checksum(message) + :ok + end - ## Options + @doc """ + Records a message received by the pipeline. Updates checksum and metrics. + """ + @spec message_received(Message.t()) :: :ok + def message_received(%Message{} = message) do + update_checksum(message) + update_count(message) + update_metrics(message) + :ok + end - * `:byte_size` - Size of the message in bytes (optional) - * `:created_at_us` - Creation timestamp in microseconds for latency calculation (optional) - * `:scope` - `:pipeline` (default) or `:source` + @doc """ + Records a message received by the pipeline for a specific group_id. + Updates per-group checksum (sampled) and metrics. 
Also tracks for debugging if enabled. """ - @spec message_received(term(), non_neg_integer(), integer(), integer(), keyword()) :: :ok - def message_received(owner_id, partition, commit_lsn, commit_idx, opts \\ []) do - update_checksum(owner_id, partition, commit_lsn, commit_idx, opts) - update_metrics(owner_id, opts) + @spec message_received_for_group(GroupMessage.t()) :: :ok + def message_received_for_group(%GroupMessage{group_id: nil}), do: raise("group_id cannot be nil") + + def message_received_for_group(%GroupMessage{} = message) do + if sample_group?(message.group_id), do: update_group_checksum(message) + update_count(message) + update_metrics(message) + owner_key = owner_key(message.owner_id, scope: message.scope) + maybe_track(owner_key, message) :ok end @@ -96,6 +160,21 @@ defmodule Sequin.Benchmark.Stats do end) end + @doc """ + Returns the current per-group checksums for an owner. + """ + @spec group_checksums(term(), keyword()) :: %{term() => {non_neg_integer(), non_neg_integer()}} + def group_checksums(owner_id, opts \\ []) do + ensure_tables() + owner_key = owner_key(owner_id, opts) + + @group_checksums_table + |> :ets.match({{owner_key, :"$1"}, :"$2", :"$3"}) + |> Map.new(fn [group_id, checksum, count] -> + {group_id, {checksum, count}} + end) + end + @doc """ Returns the current throughput and latency metrics for an owner. """ @@ -117,6 +196,30 @@ defmodule Sequin.Benchmark.Stats do end end + @doc """ + Returns the total messages processed for an owner. + """ + @spec message_count(term(), keyword()) :: non_neg_integer() + def message_count(owner_id, opts \\ []) do + ensure_tables() + owner_key = owner_key(owner_id, opts) + + case :ets.lookup(@counts_table, owner_key) do + [{^owner_key, count}] -> count + [] -> 0 + end + end + + @doc """ + Returns the configured checksum sample rate for per-group verification. 
+ """ + @spec checksum_sample_rate() :: float() + def checksum_sample_rate do + :sequin + |> Application.get_env(__MODULE__, []) + |> Keyword.get(:checksum_sample_rate, @default_checksum_sample_rate) + end + @doc """ Returns tracked messages as [{commit_lsn, commit_idx, partition}] in order. """ @@ -144,7 +247,9 @@ defmodule Sequin.Benchmark.Stats do owner_key = owner_key(owner_id, opts) :ets.match_delete(@checksums_table, {{owner_key, :_}, :_, :_}) + :ets.match_delete(@group_checksums_table, {{owner_key, :_}, :_, :_}) :ets.delete(@metrics_table, owner_key) + :ets.delete(@counts_table, owner_key) :ets.match_delete(@tracking_table, {{owner_key, :_}, :_, :_, :_}) :ok @@ -164,42 +269,57 @@ defmodule Sequin.Benchmark.Stats do # Private Functions # ============================================================================ - defp update_checksum(owner_id, partition, commit_lsn, commit_idx, opts) do + defp update_checksum(%Message{} = msg) do ensure_tables() - owner_key = owner_key(owner_id, opts) - key = {owner_key, partition} + owner_key = owner_key(msg.owner_id, scope: msg.scope) + key = {owner_key, msg.partition} + new_checksum_data = <> case :ets.lookup(@checksums_table, key) do [{^key, prev_checksum, count}] -> - new_checksum = :erlang.crc32(<>) + new_checksum = :erlang.crc32(<>) :ets.insert(@checksums_table, {key, new_checksum, count + 1}) - maybe_track(owner_key, commit_lsn, commit_idx, partition) [] -> - new_checksum = :erlang.crc32(<<0::32, commit_lsn::64, commit_idx::32>>) + new_checksum = :erlang.crc32(<<0::32, new_checksum_data::binary>>) :ets.insert(@checksums_table, {key, new_checksum, 1}) - maybe_track(owner_key, commit_lsn, commit_idx, partition) end + + maybe_track(owner_key, msg) end - defp update_metrics(owner_id, opts) do + defp update_group_checksum(%GroupMessage{} = msg) do ensure_tables() - owner_key = owner_key(owner_id, opts) + owner_key = owner_key(msg.owner_id, scope: msg.scope) + key = {owner_key, msg.group_id} + new_checksum_data = <> + + 
case :ets.lookup(@group_checksums_table, key) do + [{^key, prev_checksum, count}] -> + new_checksum = :erlang.crc32(<<prev_checksum::32, new_checksum_data::binary>>) + :ets.insert(@group_checksums_table, {key, new_checksum, count + 1}) + + [] -> + new_checksum = :erlang.crc32(<<0::32, new_checksum_data::binary>>) + :ets.insert(@group_checksums_table, {key, new_checksum, 1}) + end + end - # Record bytes if provided - if byte_size = Keyword.get(opts, :byte_size) do + defp update_metrics(msg) do ensure_tables() - owner_key = owner_key(owner_id, opts) + owner_key = owner_key(msg.owner_id, scope: msg.scope) + + if msg.byte_size do try do - :ets.update_counter(@metrics_table, owner_key, {2, byte_size}) + :ets.update_counter(@metrics_table, owner_key, {2, msg.byte_size}) rescue ArgumentError -> - :ets.insert(@metrics_table, {owner_key, byte_size, 0, 0}) + :ets.insert(@metrics_table, {owner_key, msg.byte_size, 0, 0}) end end - # Record latency if created_at provided - if created_at_us = Keyword.get(opts, :created_at_us) do - now = :os.system_time(:microsecond) - latency_us = now - created_at_us + if msg.created_at_us do + latency_us = :os.system_time(:microsecond) - msg.created_at_us try do :ets.update_counter(@metrics_table, owner_key, [{3, latency_us}, {4, 1}]) @@ -210,15 +330,35 @@ end end - defp maybe_track(owner_key, commit_lsn, commit_idx, partition) do + defp update_count(msg) do + ensure_tables() + owner_key = owner_key(msg.owner_id, scope: msg.scope) + + try do + :ets.update_counter(@counts_table, owner_key, {2, 1}) + rescue + ArgumentError -> + :ets.insert(@counts_table, {owner_key, 1}) + end + end + + defp maybe_track(_owner_key, %{partition: nil}), do: :ok + + defp maybe_track(owner_key, msg) do if track_messages?() do track_key = {owner_key, :erlang.unique_integer([:monotonic, :positive])} - :ets.insert(@tracking_table, {track_key, commit_lsn, commit_idx, partition}) + :ets.insert(@tracking_table, {track_key, msg.commit_lsn, msg.commit_idx, msg.partition}) end end + defp sample_group?(group_id) do + 
sample_rate = checksum_sample_rate() |> min(1.0) |> max(0.0) + threshold = trunc(sample_rate * 1_000_000) + :erlang.phash2(group_id, 1_000_000) < threshold + end + defp owner_key(owner_id, opts) do - scope = Keyword.get(opts, :scope, :pipeline) + scope = Keyword.get(opts, :scope) || :pipeline {scope, owner_id} end @@ -247,6 +387,31 @@ defmodule Sequin.Benchmark.Stats do :ok end + case :ets.whereis(@counts_table) do + :undefined -> + try do + :ets.new(@counts_table, [:set, :public, :named_table, {:write_concurrency, true}]) + rescue + ArgumentError -> + :ok + end + + _ -> + :ok + end + + case :ets.whereis(@group_checksums_table) do + :undefined -> + try do + :ets.new(@group_checksums_table, [:set, :public, :named_table, {:write_concurrency, true}]) + rescue + ArgumentError -> :ok + end + + _tid -> + :ok + end + case :ets.whereis(@tracking_table) do :undefined -> try do diff --git a/lib/sequin/postgres/benchmark_source.ex b/lib/sequin/postgres/benchmark_source.ex index 5e6c2d2f9..c70822a2b 100644 --- a/lib/sequin/postgres/benchmark_source.ex +++ b/lib/sequin/postgres/benchmark_source.ex @@ -439,13 +439,29 @@ defmodule Sequin.Postgres.BenchmarkSource do # Compute partition for checksum # Use pk (as string) to match pipeline's group_id partitioning - partition = Stats.partition(to_string(pk), state.config.partition_count) + group_id = to_string(pk) + partition = Stats.partition(group_id, state.config.partition_count) # Compute commit_idx (0-based index within transaction) commit_idx = state.transaction_size - state.transaction_messages_remaining # Update checksum for this partition - Stats.message_emitted(state.id, partition, state.transaction_commit_lsn, commit_idx, scope: :source) + Stats.message_emitted(%Stats.Message{ + owner_id: state.id, + partition: partition, + commit_lsn: state.transaction_commit_lsn, + commit_idx: commit_idx, + scope: :source + }) + + # Update per-group checksum (sampling handled by Stats) + 
Stats.message_emitted_for_group(%Stats.GroupMessage{ + owner_id: state.id, + group_id: group_id, + commit_lsn: state.transaction_commit_lsn, + commit_idx: commit_idx, + scope: :source + }) # Generate payload payload = :binary.copy("x", row_size) diff --git a/lib/sequin/runtime/benchmark_pipeline.ex b/lib/sequin/runtime/benchmark_pipeline.ex index 9b3356b3b..14b67d69a 100644 --- a/lib/sequin/runtime/benchmark_pipeline.ex +++ b/lib/sequin/runtime/benchmark_pipeline.ex @@ -45,6 +45,24 @@ defmodule Sequin.Runtime.BenchmarkPipeline do context end + @impl SinkPipeline + def batchers_config(consumer) do + %SinkConsumer{sink: %BenchmarkSink{partition_count: partition_count}} = consumer + + [ + default: [ + concurrency: partition_count, + batch_size: 10, + batch_timeout: 1 + ] + ] + end + + @impl SinkPipeline + def partition_by_fn(_consumer) do + fn msg -> :erlang.phash2(msg.data.group_id) end + end + @impl SinkPipeline def handle_message(broadway_message, context) do %{consumer: %SinkConsumer{sink: %BenchmarkSink{partition_count: partition_count}}} = context @@ -62,7 +80,6 @@ defmodule Sequin.Runtime.BenchmarkPipeline do @impl SinkPipeline def handle_batch(:default, messages, %{batch_key: partition}, context) do %{consumer: consumer} = context - # Also emit to Prometheus for observability total_bytes = messages |> Enum.map(& &1.data.payload_size_bytes) |> Enum.sum() @@ -77,14 +94,15 @@ defmodule Sequin.Runtime.BenchmarkPipeline do Enum.each(sorted_messages, fn msg -> created_at_us = extract_created_at(msg.data.data.record) - Stats.message_received( - consumer.id, - partition, - msg.data.commit_lsn, - msg.data.commit_idx, + Stats.message_received_for_group(%Stats.GroupMessage{ + owner_id: consumer.id, + group_id: msg.data.group_id, + commit_lsn: msg.data.commit_lsn, + commit_idx: msg.data.commit_idx, + partition: partition, byte_size: msg.data.payload_size_bytes, created_at_us: created_at_us - ) + }) # Also emit to Prometheus for observability if created_at_us do diff --git 
a/lib/sequin/runtime/message_handler.ex b/lib/sequin/runtime/message_handler.ex index 83cfc3f96..cb4e6cefd 100644 --- a/lib/sequin/runtime/message_handler.ex +++ b/lib/sequin/runtime/message_handler.ex @@ -188,11 +188,7 @@ defmodule Sequin.Runtime.MessageHandler do @max_backoff_ms 100 @max_attempts 15 @decorate track_metrics("put_messages") - defp put_messages(consumer, messages_to_ingest) do - do_put_messages(consumer, messages_to_ingest) - end - - defp do_put_messages(consumer, messages_to_ingest, attempt \\ 1) do + defp put_messages(consumer, messages_to_ingest, attempt \\ 1) do case SlotMessageStore.put_messages(consumer, messages_to_ingest) do :ok -> Health.put_event(:sink_consumer, consumer.id, %Event{slug: :messages_ingested, status: :success}) @@ -207,7 +203,7 @@ defmodule Sequin.Runtime.MessageHandler do ) Process.sleep(backoff) - do_put_messages(consumer, messages_to_ingest, attempt + 1) + put_messages(consumer, messages_to_ingest, attempt + 1) {:error, error} -> Health.put_event(:sink_consumer, consumer.id, %Event{slug: :messages_ingested, status: :fail, error: error}) diff --git a/lib/sequin/runtime/sink_pipeline.ex b/lib/sequin/runtime/sink_pipeline.ex index a55577a15..4cade4020 100644 --- a/lib/sequin/runtime/sink_pipeline.ex +++ b/lib/sequin/runtime/sink_pipeline.ex @@ -80,11 +80,21 @@ defmodule Sequin.Runtime.SinkPipeline do """ @callback apply_routing(consumer :: SinkConsumer.t(), rinfo :: map()) :: struct() + @doc """ + Returns a partition function for Broadway's :partition_by option. + + When implemented, ensures messages with the same partition key are processed + in order through the same processor and batcher stage. + Return `nil` to disable partitioning. 
+ """ + @callback partition_by_fn(consumer :: SinkConsumer.t()) :: (Message.t() -> term()) | nil + @optional_callbacks [ processors_config: 1, batchers_config: 1, handle_message: 2, - apply_routing: 2 + apply_routing: 2, + partition_by_fn: 1 ] @doc """ @@ -133,7 +143,7 @@ defmodule Sequin.Runtime.SinkPipeline do test_pid: test_pid } - Broadway.start_link(__MODULE__, + broadway_opts = [ name: via_tuple(consumer.id), producer: [ module: {producer, [consumer_id: consumer.id, test_pid: test_pid]} @@ -141,7 +151,18 @@ defmodule Sequin.Runtime.SinkPipeline do processors: processors_config(pipeline_mod, consumer), batchers: batchers_config(pipeline_mod, consumer), context: context - ) + ] + + # Add partition_by_fn if the pipeline module implements it + broadway_opts = + if function_exported?(pipeline_mod, :partition_by_fn, 1) do + partition_fn = pipeline_mod.partition_by_fn(consumer) + Keyword.put(broadway_opts, :partition_by, partition_fn) + else + broadway_opts + end + + Broadway.start_link(__MODULE__, broadway_opts) end def via_tuple(consumer_id) do diff --git a/test/sequin/runtime/benchmark_pipeline_test.exs b/test/sequin/runtime/benchmark_pipeline_test.exs index 7fff3ab85..a9639e384 100644 --- a/test/sequin/runtime/benchmark_pipeline_test.exs +++ b/test/sequin/runtime/benchmark_pipeline_test.exs @@ -26,8 +26,22 @@ defmodule Sequin.Runtime.BenchmarkPipelineTest do consumer_id = "test-consumer-#{System.unique_integer()}" Stats.init_for_owner(consumer_id, 2, scope: :pipeline) - Stats.message_emitted(consumer_id, 0, 100, 0, scope: :pipeline) - Stats.message_emitted(consumer_id, 1, 200, 0, scope: :pipeline) + + Stats.message_emitted(%Stats.Message{ + owner_id: consumer_id, + partition: 0, + commit_lsn: 100, + commit_idx: 0, + scope: :pipeline + }) + + Stats.message_emitted(%Stats.Message{ + owner_id: consumer_id, + partition: 1, + commit_lsn: 200, + commit_idx: 0, + scope: :pipeline + }) Stats.reset_for_owner(consumer_id) @@ -60,6 +74,10 @@ defmodule 
Sequin.Runtime.BenchmarkPipelineTest do end test "messages are processed and checksums are tracked", %{consumer: consumer} do + # Set 100% sample rate for testing + Application.put_env(:sequin, Sequin.Benchmark.Stats, checksum_sample_rate: 1.0) + on_exit(fn -> Application.delete_env(:sequin, Sequin.Benchmark.Stats) end) + test_pid = self() # Start the SlotMessageStoreSupervisor @@ -85,18 +103,22 @@ defmodule Sequin.Runtime.BenchmarkPipelineTest do # Wait for all messages to be processed await_acks(10) - # Verify checksums were tracked - checksums = Stats.checksums(consumer.id) - assert map_size(checksums) == 4 + # Verify group checksums were tracked (BenchmarkPipeline uses message_received_for_group) + group_checksums = Stats.group_checksums(consumer.id) + assert map_size(group_checksums) == 4 # Verify total count matches total_count = - Enum.reduce(checksums, 0, fn {_partition, {_checksum, count}}, acc -> acc + count end) + Enum.reduce(group_checksums, 0, fn {_group_id, {_checksum, count}}, acc -> acc + count end) assert total_count == 10 end test "checksum computation matches expected formula", %{consumer: consumer} do + # Set 100% sample rate for testing + Application.put_env(:sequin, Sequin.Benchmark.Stats, checksum_sample_rate: 1.0) + on_exit(fn -> Application.delete_env(:sequin, Sequin.Benchmark.Stats) end) + test_pid = self() # Start the SlotMessageStoreSupervisor @@ -124,26 +146,28 @@ defmodule Sequin.Runtime.BenchmarkPipelineTest do # Wait for message to be processed await_acks(1) - # Compute expected checksum manually - partition = :erlang.phash2(group_id, 4) + # Compute expected checksum manually (group checksums are keyed by group_id, not partition) expected_checksum = :erlang.crc32(<<0::32, commit_lsn::64, commit_idx::32>>) - checksums = Stats.checksums(consumer.id) - {actual_checksum, count} = checksums[partition] + group_checksums = Stats.group_checksums(consumer.id) + {actual_checksum, count} = group_checksums[group_id] assert count == 1 assert 
actual_checksum == expected_checksum end test "checksums are order-sensitive (rolling checksum)", %{consumer: consumer} do + # Set 100% sample rate for testing + Application.put_env(:sequin, Sequin.Benchmark.Stats, checksum_sample_rate: 1.0) + on_exit(fn -> Application.delete_env(:sequin, Sequin.Benchmark.Stats) end) + test_pid = self() # Start the SlotMessageStoreSupervisor start_supervised!({SlotMessageStoreSupervisor, [consumer_id: consumer.id, test_pid: test_pid]}) - # Create two events for the same partition - group_id = "same-partition" - partition = :erlang.phash2(group_id, 4) + # Create two events for the same group_id + group_id = "same-group" events = [ ConsumersFactory.consumer_event( @@ -169,23 +193,18 @@ defmodule Sequin.Runtime.BenchmarkPipelineTest do # Wait for messages to be processed await_acks(2) - checksums = Stats.checksums(consumer.id) - {actual_checksum, count} = checksums[partition] + group_checksums = Stats.group_checksums(consumer.id) + {actual_checksum, count} = group_checksums[group_id] assert count == 2 # Verify the checksum is a rolling checksum (not just a single value). - # The order of processing within a batch may vary, so we accept either order. - # Order A: 100 then 200 + # BenchmarkPipeline sorts by (commit_lsn, commit_idx) so order should be deterministic. 
+ # Order: 100 then 200 checksum_100_first = :erlang.crc32(<<0::32, 100::64, 0::32>>) - expected_order_a = :erlang.crc32(<<checksum_100_first::32, 200::64, 0::32>>) - - # Order B: 200 then 100 - checksum_200_first = :erlang.crc32(<<0::32, 200::64, 0::32>>) - expected_order_b = :erlang.crc32(<<checksum_200_first::32, 100::64, 0::32>>) + expected_checksum = :erlang.crc32(<<checksum_100_first::32, 200::64, 0::32>>) - assert actual_checksum in [expected_order_a, expected_order_b], - "Checksum #{actual_checksum} doesn't match expected order A (#{expected_order_a}) or B (#{expected_order_b})" + assert actual_checksum == expected_checksum end end diff --git a/test/sequin/sink_pipeline_test.exs b/test/sequin/sink_pipeline_test.exs index 5ce58099a..be4943b54 100644 --- a/test/sequin/sink_pipeline_test.exs +++ b/test/sequin/sink_pipeline_test.exs @@ -11,6 +11,9 @@ defmodule Sequin.SinkPipelineTest do stub(SinkPipelineMock, :init, fn context, _ -> context end) stub(SinkPipelineMock, :processors_config, fn _ -> [] end) stub(SinkPipelineMock, :batchers_config, fn _ -> [] end) + + stub(SinkPipelineMock, :partition_by_fn, fn _consumer -> fn message -> :erlang.phash2(message.data.group_id) end end) + stub(SinkPipelineMock, :handle_message, fn message, context -> {:ok, message, context} end) stub(SinkPipelineMock, :handle_batch, fn _batch_name, messages, _batch_info, context -> {:ok, messages, context} end)