@@ -10,13 +10,13 @@ defmodule Mix.Tasks.Bench do
1010
1111 ## Options
1212
13- * `--duration` - Duration to run benchmark in seconds (default: 60)
13+ * `--duration` - Duration to run benchmark in seconds (default: 60 when --max-messages is not set)
1414 * `--row-sizes` - Row size distribution as "fraction:bytes,..." (default: "1.0:200")
1515 * `--transaction-sizes` - Transaction size distribution as "fraction:count,..." (default: "1.0:10")
1616 * `--pk-collision-rate` - PK collision rate 0.0-1.0 (default: 0.005)
1717 * `--partition-count` - Number of partitions (default: schedulers_online)
18- * `--max-messages` - Maximum messages to generate (default: unlimited)
19- * `--through` - Pipeline stage to run through: "full" or "reorder_buffer " (default: "full")
18+ * `--max-messages` - Maximum messages to generate (default: unlimited; mutually exclusive with --duration)
19+ * `--through` - Pipeline stage to run through: "full", "reorder_buffer", or "sps" (default: "full")
2020
2121 ## Examples
2222
@@ -34,10 +34,14 @@ defmodule Mix.Tasks.Bench do
3434
3535 # Run through reorder_buffer only (isolated pipeline test)
3636 mix benchmark --through reorder_buffer
37+
38+ # Run through SlotProcessorServer (includes message handler, stops before Broadway)
39+ mix benchmark --through sps
3740 """
3841 use Mix.Task
3942
4043 alias Sequin.Accounts
44+ alias Sequin.Benchmark.MessageHandler , as: BenchmarkMessageHandler
4145 alias Sequin.Benchmark.Stats
4246 alias Sequin.Consumers
4347 alias Sequin.Databases
@@ -80,22 +84,33 @@ defmodule Mix.Tasks.Bench do
8084 ]
8185 )
8286
83- duration = Keyword . get ( opts , :duration , @ default_duration )
87+ duration_opt = Keyword . get ( opts , :duration )
8488 row_sizes = parse_distribution ( Keyword . get ( opts , :row_sizes ) , @ default_row_sizes )
8589 transaction_sizes = parse_distribution ( Keyword . get ( opts , :transaction_sizes ) , @ default_transaction_sizes )
8690 pk_collision_rate = Keyword . get ( opts , :pk_collision_rate , @ default_pk_collision_rate )
8791 partition_count = Keyword . get ( opts , :partition_count , @ default_partition_count )
8892 max_messages = Keyword . get ( opts , :max_messages )
8993 through = opts |> Keyword . get ( :through , "full" ) |> String . to_existing_atom ( )
9094
95+ if max_messages && duration_opt do
96+ Mix . raise ( "--duration and --max-messages are mutually exclusive" )
97+ end
98+
99+ duration =
100+ cond do
101+ duration_opt -> duration_opt
102+ max_messages -> nil
103+ true -> @ default_duration
104+ end
105+
91106 # Start the application
92107 Mix.Task . run ( "app.start" )
93108
94109 announce ( "#{ @ bold } === Sequin Pipeline Benchmark ===#{ @ reset } " , @ cyan )
95110 IO . puts ( "" )
96111
97112 announce ( "Configuration:" , @ yellow )
98- IO . puts ( " Duration: #{ duration } s " )
113+ IO . puts ( " Duration: #{ format_duration ( duration , max_messages ) } " )
99114 IO . puts ( " Row sizes: #{ inspect ( row_sizes ) } " )
100115 IO . puts ( " Transaction sizes: #{ inspect ( transaction_sizes ) } " )
101116 IO . puts ( " PK collision rate: #{ pk_collision_rate } " )
@@ -121,7 +136,7 @@ defmodule Mix.Tasks.Bench do
121136 IO . puts ( " Replication slot: #{ replication . id } " )
122137
123138 # For :full mode, we need a consumer for Broadway
124- # For :reorder_buffer mode , we use the replication.id as the checksum owner
139+ # For :reorder_buffer and :sps modes , we use the replication.id as the checksum owner
125140 { consumer , checksum_owner_id } =
126141 case through do
127142 :full ->
@@ -133,6 +148,11 @@ defmodule Mix.Tasks.Bench do
133148 # Initialize checksums with replication.id as owner
134149 Stats . init_for_owner ( replication . id , partition_count )
135150 { nil , replication . id }
151+
152+ :sps ->
153+ # Initialize checksums with replication.id as owner
154+ Stats . init_for_owner ( replication . id , partition_count )
155+ { nil , replication . id }
136156 end
137157
138158 IO . puts ( "" )
@@ -191,6 +211,31 @@ defmodule Mix.Tasks.Bench do
191211 skip_start?: true
192212 ]
193213 )
214+
215+ :sps ->
216+ replication = Repo . preload ( replication , :postgres_database )
217+
218+ # Initialize stats tracking for this owner
219+ Stats . init_for_owner ( checksum_owner_id , partition_count )
220+
221+ # Create context for the benchmark message handler
222+ message_handler_ctx = % BenchmarkMessageHandler.Context {
223+ partition_count: partition_count ,
224+ checksum_owner_id: checksum_owner_id
225+ }
226+
227+ { :ok , _slot_sup } =
228+ SlotProducerSupervisor . start_link (
229+ replication_slot: replication ,
230+ slot_producer_opts: [
231+ backend_mod: VirtualBackend ,
232+ connect_opts: [ id: source_id , source_mod: BenchmarkSource ]
233+ ] ,
234+ slot_processor_opts: [
235+ message_handler_module: BenchmarkMessageHandler ,
236+ message_handler_ctx_fn: fn _replication -> message_handler_ctx end
237+ ]
238+ )
194239 end
195240
196241 IO . puts ( "" )
@@ -206,8 +251,13 @@ defmodule Mix.Tasks.Bench do
206251 capture_metrics_from_module ( checksum_owner_id )
207252 end
208253
209- announce ( "Running benchmark for #{ duration } s..." , @ green )
210- Process . sleep ( duration * 1000 )
254+ announce ( "Running benchmark for #{ format_duration ( duration , max_messages ) } ..." , @ green )
255+
256+ if duration do
257+ Process . sleep ( duration * 1000 )
258+ else
259+ wait_for_max_messages ( source_id , max_messages )
260+ end
211261
212262 # Pause the source and wait for pipeline to drain
213263 announce ( "Pausing source and waiting for pipeline to drain..." , @ yellow )
@@ -239,6 +289,20 @@ defmodule Mix.Tasks.Bench do
239289 source_checksums = BenchmarkSource . checksums ( source_id )
240290 pipeline_checksums = Stats . checksums ( checksum_owner_id )
241291
292+ source_group_checksums =
293+ if through == :full do
294+ Stats . group_checksums ( source_id , scope: :source )
295+ else
296+ % { }
297+ end
298+
299+ pipeline_group_checksums =
300+ if through == :full do
301+ Stats . group_checksums ( checksum_owner_id )
302+ else
303+ % { }
304+ end
305+
242306 # Get tracked messages for comparison
243307 source_tracked = BenchmarkSource . tracked_messages ( source_id )
244308 pipeline_tracked = Stats . tracked_messages ( checksum_owner_id )
@@ -250,6 +314,9 @@ defmodule Mix.Tasks.Bench do
250314 actual_duration_s ,
251315 source_checksums ,
252316 pipeline_checksums ,
317+ source_group_checksums ,
318+ pipeline_group_checksums ,
319+ through ,
253320 source_tracked ,
254321 pipeline_tracked
255322 )
@@ -281,13 +348,23 @@ defmodule Mix.Tasks.Bench do
281348 end
282349
# Total message count recorded by the pipeline for `consumer_id`.
# After this change the function delegates to Stats.message_count/1; the removed
# lines summed the second tuple element of every value in Stats.checksums/1.
283350 defp pipeline_total_count ( consumer_id ) do
284- consumer_id
285- |> Stats . checksums ( )
286- |> Map . values ( )
287- |> Enum . map ( & elem ( & 1 , 1 ) )
288- |> Enum . sum ( )
351+ Stats . message_count ( consumer_id )
289352 end
290353
# Blocks the calling process until `BenchmarkSource.stats/1` for `source_id`
# reports a `total_messages` value of at least `max_messages`.
#
# The stats are re-read every `poll_ms` milliseconds (200 by default).
# Returns `:ok` once the threshold is reached. There is no timeout, so the
# call only returns when the reported total reaches the threshold.
defp wait_for_max_messages(source_id, max_messages, poll_ms \\ 200) do
  %{total_messages: seen_so_far} = BenchmarkSource.stats(source_id)

  if seen_so_far < max_messages do
    # Not there yet: back off for one poll interval, then check again.
    Process.sleep(poll_ms)
    wait_for_max_messages(source_id, max_messages, poll_ms)
  else
    :ok
  end
end
364+
# Renders the run length for display.
#
# A `nil` duration means the run is bounded by message count instead of time,
# so the label names `max_messages`; otherwise the duration is shown in seconds.
defp format_duration(duration, max_messages) do
  case duration do
    nil -> "until #{max_messages} messages"
    seconds -> "#{seconds}s"
  end
end
367+
291368 defp setup_base_entities do
292369 # Create account using domain function
293370 { :ok , account } =
@@ -336,7 +413,10 @@ defmodule Mix.Tasks.Bench do
336413 status: :active ,
337414 actions: [ :insert , :update , :delete ] ,
338415 replication_slot_id: replication . id ,
339- sink: % { type: :benchmark , partition_count: partition_count } ,
416+ sink: % {
417+ type: :benchmark ,
418+ partition_count: partition_count
419+ } ,
340420 source: % { include_table_oids: [ 16_384 , 16_385 , 16_386 ] }
341421 } )
342422
@@ -360,14 +440,14 @@ defmodule Mix.Tasks.Bench do
360440 partition_msgs
361441 |> Enum . sort_by ( & { & 1 . message . commit_lsn , & 1 . message . commit_idx } )
362442 |> Enum . each ( fn msg ->
363- Stats . message_received (
364- checksum_owner_id ,
365- partition ,
366- msg . message . commit_lsn ,
367- msg . message . commit_idx ,
443+ Stats . message_received ( % Stats.Message {
444+ owner_id: checksum_owner_id ,
445+ partition: partition ,
446+ commit_lsn: msg . message . commit_lsn ,
447+ commit_idx: msg . message . commit_idx ,
368448 byte_size: msg . byte_size ,
369449 created_at_us: extract_created_at ( msg . message . fields )
370- )
450+ } )
371451 end )
372452 end )
373453
@@ -446,6 +526,9 @@ defmodule Mix.Tasks.Bench do
446526 duration_s ,
447527 source_checksums ,
448528 pipeline_checksums ,
529+ source_group_checksums ,
530+ pipeline_group_checksums ,
531+ through ,
449532 source_tracked ,
450533 pipeline_tracked
451534 ) do
@@ -477,7 +560,14 @@ defmodule Mix.Tasks.Bench do
477560
478561 # Verification
479562 announce ( "Verification:" , @ yellow )
480- verify_checksums ( source_checksums , pipeline_checksums )
563+
564+ case through do
565+ :full ->
566+ verify_group_checksums ( source_group_checksums , pipeline_group_checksums )
567+
568+ _ ->
569+ verify_checksums ( source_checksums , pipeline_checksums )
570+ end
481571
482572 # Message tracking comparison
483573 IO . puts ( "" )
@@ -539,6 +629,51 @@ defmodule Mix.Tasks.Bench do
539629 end
540630 end
541631
# Compares the per-group checksums sampled at the source with the ones recorded
# by the pipeline and prints a verification summary.
#
# Both arguments are maps keyed by group id. Their values are
# `{checksum, count}` tuples, destructured below when a mismatch is reported.
#
# Outcomes:
#   * neither side sampled any group  -> "No sampled groups to verify"
#   * same group ids, same values     -> PASS
#   * same group ids, some values differ -> PARTIAL, listing up to 5 groups
#   * different sets of group ids     -> WARNING with both group counts
defp verify_group_checksums(source_group_checksums, pipeline_group_checksums) do
  source_groups = source_group_checksums |> Map.keys() |> Enum.sort()
  pipeline_groups = pipeline_group_checksums |> Map.keys() |> Enum.sort()

  IO.puts("  Group checksum sample rate: #{Stats.checksum_sample_rate()}")
  IO.puts("  Sampled groups: #{length(pipeline_groups)}")

  cond do
    # Only claim "nothing to verify" when BOTH sides are empty. If the source
    # sampled groups but the pipeline recorded none, that is a mismatch and
    # must fall through to the warning branch instead of being hidden here.
    source_groups == [] and pipeline_groups == [] ->
      IO.puts("  #{@yellow}No sampled groups to verify#{@reset}")

    source_groups == pipeline_groups ->
      mismatches =
        Enum.filter(source_groups, fn group_id ->
          source_group_checksums[group_id] != pipeline_group_checksums[group_id]
        end)

      mismatch_count = length(mismatches)

      if mismatch_count == 0 do
        IO.puts("  #{@green}Group checksums: PASS (all sampled groups match)#{@reset}")
      else
        IO.puts("  #{@yellow}Group checksums: PARTIAL (#{mismatch_count} groups differ)#{@reset}")

        # Show only the first few offenders to keep the report readable.
        mismatches
        |> Enum.take(5)
        |> Enum.each(fn group_id ->
          {src_checksum, src_count} = source_group_checksums[group_id]
          {pipe_checksum, pipe_count} = pipeline_group_checksums[group_id]

          IO.puts(
            "    Group #{group_id}: source={#{src_checksum}, #{src_count}} pipeline={#{pipe_checksum}, #{pipe_count}}"
          )
        end)

        if mismatch_count > 5 do
          IO.puts("    ... and #{mismatch_count - 5} more")
        end
      end

    true ->
      IO.puts("  #{@yellow}WARNING: Sampled group mismatch#{@reset}")
      IO.puts("    Source groups: #{length(source_groups)}")
      IO.puts("    Pipeline groups: #{length(pipeline_groups)}")
  end
end
676+
542677 defp compare_tracked_messages ( source_tracked , pipeline_tracked ) do
543678 source_count = length ( source_tracked )
544679 pipeline_count = length ( pipeline_tracked )
0 commit comments