1 change: 1 addition & 0 deletions config/bench.exs
@@ -1,6 +1,7 @@
import Config

config :sequin, Oban, testing: :manual, prefix: "sequin_config"
config :sequin, Sequin.Benchmark.Stats, checksum_sample_rate: 0.1

config :sequin, Sequin.Repo,
username: "postgres",
1 change: 1 addition & 0 deletions config/test.exs
@@ -28,6 +28,7 @@ config :sequin, Sequin,
process_mod: Sequin.TestSupport.ProcessMock,
application_mod: Sequin.TestSupport.ApplicationMock

config :sequin, Sequin.Benchmark.Stats, checksum_sample_rate: 0.1
config :sequin, Sequin.Mailer, adapter: Swoosh.Adapters.Test

config :sequin, Sequin.Pagerduty,
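Both config files above set checksum_sample_rate: 0.1 under Sequin.Benchmark.Stats. For context only (not part of this diff), here is a minimal sketch of how a rate like this could gate per-group checksum tracking. The sample_group?/1 helper and the hashing scheme are assumptions for illustration; the actual sampling logic lives inside Sequin.Benchmark.Stats and may differ.

# Hypothetical sketch: deterministically sample ~rate of all group ids, so the
# source and the pipeline decide to track the same groups without coordinating.
defmodule GroupSamplingSketch do
  @buckets 1_000_000

  def sample_group?(group_id) do
    rate =
      :sequin
      |> Application.get_env(Sequin.Benchmark.Stats, [])
      |> Keyword.get(:checksum_sample_rate, 1.0)

    # :erlang.phash2/2 is stable for the same term, so every process that sees
    # this group id makes the same keep/skip decision.
    :erlang.phash2(group_id, @buckets) < trunc(rate * @buckets)
  end
end

Deterministic sampling of this kind would explain why verify_group_checksums/2 further down can compare "Sampled groups" from the source against the pipeline at a 0.1 rate, though the diff does not show the sampling implementation itself.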
177 changes: 156 additions & 21 deletions lib/mix/tasks/bench.ex
@@ -10,13 +10,13 @@ defmodule Mix.Tasks.Bench do

## Options

* `--duration` - Duration to run benchmark in seconds (default: 60)
* `--duration` - Duration to run benchmark in seconds (default: 60 when --max-messages not set)
* `--row-sizes` - Row size distribution as "fraction:bytes,..." (default: "1.0:200")
* `--transaction-sizes` - Transaction size distribution as "fraction:count,..." (default: "1.0:10")
* `--pk-collision-rate` - PK collision rate 0.0-1.0 (default: 0.005)
* `--partition-count` - Number of partitions (default: schedulers_online)
* `--max-messages` - Maximum messages to generate (default: unlimited)
* `--through` - Pipeline stage to run through: "full" or "reorder_buffer" (default: "full")
* `--max-messages` - Maximum messages to generate (default: unlimited, mutually exclusive with --duration)
* `--through` - Pipeline stage to run through: "full", "reorder_buffer", or "sps" (default: "full")

## Examples

@@ -34,10 +34,14 @@ defmodule Mix.Tasks.Bench do

# Run through reorder_buffer only (isolated pipeline test)
mix benchmark --through reorder_buffer

# Run through SlotProcessorServer (includes message handler, stops before Broadway)
mix benchmark --through sps
"""
use Mix.Task

alias Sequin.Accounts
alias Sequin.Benchmark.MessageHandler, as: BenchmarkMessageHandler
alias Sequin.Benchmark.Stats
alias Sequin.Consumers
alias Sequin.Databases
@@ -80,22 +84,33 @@
]
)

duration = Keyword.get(opts, :duration, @default_duration)
duration_opt = Keyword.get(opts, :duration)
row_sizes = parse_distribution(Keyword.get(opts, :row_sizes), @default_row_sizes)
transaction_sizes = parse_distribution(Keyword.get(opts, :transaction_sizes), @default_transaction_sizes)
pk_collision_rate = Keyword.get(opts, :pk_collision_rate, @default_pk_collision_rate)
partition_count = Keyword.get(opts, :partition_count, @default_partition_count)
max_messages = Keyword.get(opts, :max_messages)
through = opts |> Keyword.get(:through, "full") |> String.to_existing_atom()

if max_messages && duration_opt do
Mix.raise("--duration and --max-messages are mutually exclusive")
end

duration =
cond do
duration_opt -> duration_opt
max_messages -> nil
true -> @default_duration
end

# Start the application
Mix.Task.run("app.start")

announce("#{@bold}=== Sequin Pipeline Benchmark ===#{@reset}", @cyan)
IO.puts("")

announce("Configuration:", @yellow)
IO.puts(" Duration: #{duration}s")
IO.puts(" Duration: #{format_duration(duration, max_messages)}")
IO.puts(" Row sizes: #{inspect(row_sizes)}")
IO.puts(" Transaction sizes: #{inspect(transaction_sizes)}")
IO.puts(" PK collision rate: #{pk_collision_rate}")
@@ -121,7 +136,7 @@
IO.puts(" Replication slot: #{replication.id}")

# For :full mode, we need a consumer for Broadway
# For :reorder_buffer mode, we use the replication.id as the checksum owner
# For :reorder_buffer and :sps modes, we use the replication.id as the checksum owner
{consumer, checksum_owner_id} =
case through do
:full ->
@@ -133,6 +148,11 @@
# Initialize checksums with replication.id as owner
Stats.init_for_owner(replication.id, partition_count)
{nil, replication.id}

:sps ->
# Initialize checksums with replication.id as owner
Stats.init_for_owner(replication.id, partition_count)
{nil, replication.id}
end

IO.puts("")
@@ -191,6 +211,31 @@
skip_start?: true
]
)

:sps ->
replication = Repo.preload(replication, :postgres_database)

# Initialize stats tracking for this owner
Stats.init_for_owner(checksum_owner_id, partition_count)

# Create context for the benchmark message handler
message_handler_ctx = %BenchmarkMessageHandler.Context{
partition_count: partition_count,
checksum_owner_id: checksum_owner_id
}

{:ok, _slot_sup} =
SlotProducerSupervisor.start_link(
replication_slot: replication,
slot_producer_opts: [
backend_mod: VirtualBackend,
connect_opts: [id: source_id, source_mod: BenchmarkSource]
],
slot_processor_opts: [
message_handler_module: BenchmarkMessageHandler,
message_handler_ctx_fn: fn _replication -> message_handler_ctx end
]
)
end

IO.puts("")
@@ -206,8 +251,13 @@
capture_metrics_from_module(checksum_owner_id)
end

announce("Running benchmark for #{duration}s...", @green)
Process.sleep(duration * 1000)
announce("Running benchmark for #{format_duration(duration, max_messages)}...", @green)

if duration do
Process.sleep(duration * 1000)
else
wait_for_max_messages(source_id, max_messages)
end

# Pause the source and wait for pipeline to drain
announce("Pausing source and waiting for pipeline to drain...", @yellow)
@@ -239,6 +289,20 @@
source_checksums = BenchmarkSource.checksums(source_id)
pipeline_checksums = Stats.checksums(checksum_owner_id)

source_group_checksums =
if through == :full do
Stats.group_checksums(source_id, scope: :source)
else
%{}
end

pipeline_group_checksums =
if through == :full do
Stats.group_checksums(checksum_owner_id)
else
%{}
end

# Get tracked messages for comparison
source_tracked = BenchmarkSource.tracked_messages(source_id)
pipeline_tracked = Stats.tracked_messages(checksum_owner_id)
@@ -250,6 +314,9 @@
actual_duration_s,
source_checksums,
pipeline_checksums,
source_group_checksums,
pipeline_group_checksums,
through,
source_tracked,
pipeline_tracked
)
@@ -281,13 +348,23 @@
end

defp pipeline_total_count(consumer_id) do
consumer_id
|> Stats.checksums()
|> Map.values()
|> Enum.map(&elem(&1, 1))
|> Enum.sum()
Stats.message_count(consumer_id)
end

defp wait_for_max_messages(source_id, max_messages, poll_ms \\ 200) do
%{total_messages: total_messages} = BenchmarkSource.stats(source_id)

if total_messages >= max_messages do
:ok
else
Process.sleep(poll_ms)
wait_for_max_messages(source_id, max_messages, poll_ms)
end
end

defp format_duration(nil, max_messages), do: "until #{max_messages} messages"
defp format_duration(duration, _max_messages), do: "#{duration}s"

defp setup_base_entities do
# Create account using domain function
{:ok, account} =
@@ -336,7 +413,10 @@
status: :active,
actions: [:insert, :update, :delete],
replication_slot_id: replication.id,
sink: %{type: :benchmark, partition_count: partition_count},
sink: %{
type: :benchmark,
partition_count: partition_count
},
source: %{include_table_oids: [16_384, 16_385, 16_386]}
})

@@ -360,14 +440,14 @@
partition_msgs
|> Enum.sort_by(&{&1.message.commit_lsn, &1.message.commit_idx})
|> Enum.each(fn msg ->
Stats.message_received(
checksum_owner_id,
partition,
msg.message.commit_lsn,
msg.message.commit_idx,
Stats.message_received(%Stats.Message{
owner_id: checksum_owner_id,
partition: partition,
commit_lsn: msg.message.commit_lsn,
commit_idx: msg.message.commit_idx,
byte_size: msg.byte_size,
created_at_us: extract_created_at(msg.message.fields)
)
})
end)
end)

@@ -446,6 +526,9 @@
duration_s,
source_checksums,
pipeline_checksums,
source_group_checksums,
pipeline_group_checksums,
through,
source_tracked,
pipeline_tracked
) do
Expand Down Expand Up @@ -477,7 +560,14 @@ defmodule Mix.Tasks.Bench do

# Verification
announce("Verification:", @yellow)
verify_checksums(source_checksums, pipeline_checksums)

case through do
:full ->
verify_group_checksums(source_group_checksums, pipeline_group_checksums)

_ ->
verify_checksums(source_checksums, pipeline_checksums)
end

# Message tracking comparison
IO.puts("")
@@ -539,6 +629,51 @@
end
end

defp verify_group_checksums(source_group_checksums, pipeline_group_checksums) do
source_groups = source_group_checksums |> Map.keys() |> Enum.sort()
pipeline_groups = pipeline_group_checksums |> Map.keys() |> Enum.sort()

IO.puts(" Group checksum sample rate: #{Stats.checksum_sample_rate()}")
IO.puts(" Sampled groups: #{length(pipeline_groups)}")

cond do
Enum.empty?(pipeline_groups) ->
IO.puts(" #{@yellow}No sampled groups to verify#{@reset}")

source_groups == pipeline_groups ->
mismatches =
Enum.filter(source_groups, fn group_id ->
source_group_checksums[group_id] != pipeline_group_checksums[group_id]
end)

if Enum.empty?(mismatches) do
IO.puts(" #{@green}Group checksums: PASS (all sampled groups match)#{@reset}")
else
IO.puts(" #{@yellow}Group checksums: PARTIAL (#{length(mismatches)} groups differ)#{@reset}")

mismatches
|> Enum.take(5)
|> Enum.each(fn group_id ->
{src_checksum, src_count} = source_group_checksums[group_id]
{pipe_checksum, pipe_count} = pipeline_group_checksums[group_id]

IO.puts(
" Group #{group_id}: source={#{src_checksum}, #{src_count}} pipeline={#{pipe_checksum}, #{pipe_count}}"
)
end)

if length(mismatches) > 5 do
IO.puts(" ... and #{length(mismatches) - 5} more")
end
end

true ->
IO.puts(" #{@yellow}WARNING: Sampled group mismatch#{@reset}")
IO.puts(" Source groups: #{length(source_groups)}")
IO.puts(" Pipeline groups: #{length(pipeline_groups)}")
end
end

defp compare_tracked_messages(source_tracked, pipeline_tracked) do
source_count = length(source_tracked)
pipeline_count = length(pipeline_tracked)
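For orientation (illustrative only, not part of this diff), the Stats calls used throughout lib/mix/tasks/bench.ex compose roughly like this. The owner id and message values below are arbitrary placeholders, and whether a single message_received/1 call also feeds group checksums depends on Stats internals not shown in the diff.

# Record one message for an owner, then read back aggregates, using only the
# call shapes that appear in the task above.
alias Sequin.Benchmark.Stats

owner_id = "replication-or-consumer-id"  # placeholder; the task uses replication.id for :reorder_buffer and :sps
partition_count = 4

Stats.init_for_owner(owner_id, partition_count)

Stats.message_received(%Stats.Message{
  owner_id: owner_id,
  partition: 0,
  commit_lsn: 1_000,
  commit_idx: 0,
  byte_size: 200,
  created_at_us: System.system_time(:microsecond)
})

total = Stats.message_count(owner_id)        # replaces the old sum over Stats.checksums/1 in pipeline_total_count/1
checksums = Stats.checksums(owner_id)        # map whose values are {checksum, count} tuples
groups = Stats.group_checksums(owner_id)     # sampled group_id => {checksum, count}, verified only in :full mode

IO.inspect({total, map_size(checksums), map_size(groups)})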