From 6960b1e5c032bacb0995be722cdbda4a83729d8b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 10:33:30 +0000 Subject: [PATCH 1/7] BroadcastHub: serialize unregister before re-register Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko/sessions/3b8db3b8-fb93-4089-a882-327922405f8f Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com> --- .../pekko/stream/scaladsl/HubSpec.scala | 20 ++++++-- .../apache/pekko/stream/scaladsl/Hub.scala | 51 +++++++++++-------- 2 files changed, 48 insertions(+), 23 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala index 1a396d823ef..79e3dabb024 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala @@ -401,9 +401,23 @@ class HubSpec extends StreamSpec { } "ensure that subsequent consumers see subsequent elements without gap" in { - val source = Source(1 to 20).runWith(BroadcastHub.sink(8)) - source.take(10).runWith(Sink.seq).futureValue should ===(1 to 10) - source.take(10).runWith(Sink.seq).futureValue should ===(11 to 20) + var firstConsumer: TestSubscriber.Probe[Int] = null + + def registerConsumerCallback(id: Long): Unit = { + if (id == 1) firstConsumer.cancel() + } + + val source = + Source(1 to 20).runWith(Sink.fromGraph(new BroadcastHub[Int](0, 8, registerConsumerCallback))) + + firstConsumer = source.runWith(TestSink[Int]()) + firstConsumer.request(10) + firstConsumer.expectNextN((1 to 10).toVector) + + val secondConsumer = source.runWith(TestSink[Int]()) + secondConsumer.request(10) + secondConsumer.expectNextN((11 to 20).toVector) + secondConsumer.expectComplete() } "send the same elements to consumers of different speed attaching around the same time" in { diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala index 7a8adf7c53e..42df35213bd 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala @@ -516,7 +516,8 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I extends HubState private case class Closed(failure: Option[Throwable]) extends HubState - private class BroadcastSinkLogic(_shape: Shape) extends GraphStageLogic(_shape) with InHandler { + private class BroadcastSinkLogic(_shape: Shape, pendingUnregistrations: AtomicInteger) + extends GraphStageLogic(_shape) with InHandler { private[this] val callbackPromise: Promise[AsyncCallback[HubEvent]] = Promise() private[this] val noRegistrationsState = Open(callbackPromise.future, Nil) @@ -589,25 +590,30 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I checkUnblock(previousOffset) case RegistrationPending => - state.getAndSet(noRegistrationsState).asInstanceOf[Open].registrations.foreach { consumer => - val startFrom = head - activeConsumers += 1 - addConsumer(consumer, startFrom) - // add a callback hook so that we can control the interleaving in tests - registrationPendingCallback(consumer.id) - // in case the consumer is already stopped we need to undo registration - implicit val ec = materializer.executionContext - consumer.callback.invokeWithFeedback(Initialize(startFrom)).failed.foreach { - case _: StreamDetachedException => - callbackPromise.future.foreach(callback => - callback.invoke(UnRegister(consumer.id, startFrom, startFrom))) - case _ => () + if (pendingUnregistrations.get() == 0) { + state.getAndSet(noRegistrationsState).asInstanceOf[Open].registrations.foreach { consumer => + val startFrom = head + activeConsumers += 1 + addConsumer(consumer, startFrom) + // add a callback hook so that we can control the interleaving in tests + registrationPendingCallback(consumer.id) + // in case the consumer is already stopped we need to undo registration + implicit val ec = materializer.executionContext + consumer.callback.invokeWithFeedback(Initialize(startFrom)).failed.foreach { + case _: StreamDetachedException => + pendingUnregistrations.incrementAndGet() + callbackPromise.future.foreach(callback => + callback.invoke(UnRegister(consumer.id, startFrom, startFrom))) + case _ => () + } } + if (activeConsumers >= startAfterNrOfConsumers) { + initialized = true + } + tryPull() + } else { + callbackPromise.future.foreach(_.invoke(RegistrationPending))(materializer.executionContext) } - if (activeConsumers >= startAfterNrOfConsumers) { - initialized = true - } - tryPull() case UnRegister(id, previousOffset, finalOffset) => if (findAndRemoveConsumer(id, previousOffset) ne null) @@ -622,6 +628,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I tryPull() } } else checkUnblock(previousOffset) + pendingUnregistrations.decrementAndGet() } } @@ -781,7 +788,9 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I inheritedAttributes: Attributes): (GraphStageLogic, Source[T, NotUsed]) = { val idCounter = new AtomicLong() - val logic = new BroadcastSinkLogic(shape) + val pendingUnregistrations = new AtomicInteger(0) + + val logic = new BroadcastSinkLogic(shape, pendingUnregistrations) val source = new GraphStage[SourceShape[T]] { val out: Outlet[T] = Outlet("BroadcastHub.out") @@ -869,8 +878,10 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I // upon which the `RegistrationPending` logic itself unregisters this consumer. // In particular, this client must not send the `Unregister` event itself because the values in // `previousPublishedOffset` and `offset` are wrong. - if ((hubCallback ne null) && offsetInitialized) + if ((hubCallback ne null) && offsetInitialized) { + pendingUnregistrations.incrementAndGet() hubCallback.invoke(UnRegister(id, previousPublishedOffset, offset)) + } } private def onCommand(cmd: ConsumerEvent): Unit = cmd match { From 0b9df141c50eebc01e7d4931ac08d31a7e1cb062 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 10:36:13 +0000 Subject: [PATCH 2/7] Tighten BroadcastHub handoff race handling Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko/sessions/3b8db3b8-fb93-4089-a882-327922405f8f Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com> --- .../pekko/stream/scaladsl/HubSpec.scala | 16 +++-- .../apache/pekko/stream/scaladsl/Hub.scala | 63 +++++++++++-------- 2 files changed, 48 insertions(+), 31 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala index 79e3dabb024..578ce2eff43 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala @@ -401,18 +401,22 @@ class HubSpec extends StreamSpec { } "ensure that subsequent consumers see subsequent elements without gap" in { - var firstConsumer: TestSubscriber.Probe[Int] = null + var firstConsumer = Option.empty[TestSubscriber.Probe[Int]] + var registrations = 0 - def registerConsumerCallback(id: Long): Unit = { - if (id == 1) firstConsumer.cancel() + def registerConsumerCallback(_id: Long): Unit = { + registrations += 1 + if (registrations == 2) firstConsumer.foreach(_.cancel()) } val source = Source(1 to 20).runWith(Sink.fromGraph(new BroadcastHub[Int](0, 8, registerConsumerCallback))) - firstConsumer = source.runWith(TestSink[Int]()) - firstConsumer.request(10) - firstConsumer.expectNextN((1 to 10).toVector) + firstConsumer = Some(source.runWith(TestSink[Int]())) + firstConsumer.foreach { consumer => + consumer.request(10) + consumer.expectNextN((1 to 10).toVector) + } val secondConsumer = source.runWith(TestSink[Int]()) secondConsumer.request(10) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala index 42df35213bd..3750faea65a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala @@ -516,7 +516,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I extends HubState private case class Closed(failure: Option[Throwable]) extends HubState - private class BroadcastSinkLogic(_shape: Shape, pendingUnregistrations: AtomicInteger) + private class BroadcastSinkLogic(_shape: Shape, registrationLock: AnyRef, pendingUnregistrations: AtomicInteger) extends GraphStageLogic(_shape) with InHandler { private[this] val callbackPromise: Promise[AsyncCallback[HubEvent]] = Promise() @@ -590,28 +590,36 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I checkUnblock(previousOffset) case RegistrationPending => - if (pendingUnregistrations.get() == 0) { - state.getAndSet(noRegistrationsState).asInstanceOf[Open].registrations.foreach { consumer => - val startFrom = head - activeConsumers += 1 - addConsumer(consumer, startFrom) - // add a callback hook so that we can control the interleaving in tests - registrationPendingCallback(consumer.id) - // in case the consumer is already stopped we need to undo registration - implicit val ec = materializer.executionContext - consumer.callback.invokeWithFeedback(Initialize(startFrom)).failed.foreach { - case _: StreamDetachedException => - pendingUnregistrations.incrementAndGet() - callbackPromise.future.foreach(callback => - callback.invoke(UnRegister(consumer.id, startFrom, startFrom))) - case _ => () + var shouldRetry = false + registrationLock.synchronized { + if (pendingUnregistrations.get() == 0) { + state.getAndSet(noRegistrationsState).asInstanceOf[Open].registrations.foreach { consumer => + val startFrom = head + activeConsumers += 1 + addConsumer(consumer, startFrom) + // add a callback hook so that we can control the interleaving in tests + registrationPendingCallback(consumer.id) + // in case the consumer is already stopped we need to undo registration + implicit val ec = materializer.executionContext + consumer.callback.invokeWithFeedback(Initialize(startFrom)).failed.foreach { + case _: StreamDetachedException => + registrationLock.synchronized { + pendingUnregistrations.incrementAndGet() + callbackPromise.future.foreach(callback => + callback.invoke(UnRegister(consumer.id, startFrom, startFrom))) + } + case _ => () + } } + if (activeConsumers >= startAfterNrOfConsumers) { + initialized = true + } + tryPull() + } else { + shouldRetry = true } - if (activeConsumers >= startAfterNrOfConsumers) { - initialized = true - } - tryPull() - } else { + } + if (shouldRetry) { callbackPromise.future.foreach(_.invoke(RegistrationPending))(materializer.executionContext) } @@ -628,7 +636,9 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I tryPull() } } else checkUnblock(previousOffset) - pendingUnregistrations.decrementAndGet() + registrationLock.synchronized { + pendingUnregistrations.decrementAndGet() + } } } @@ -788,9 +798,10 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I inheritedAttributes: Attributes): (GraphStageLogic, Source[T, NotUsed]) = { val idCounter = new AtomicLong() + val registrationLock = new AnyRef val pendingUnregistrations = new AtomicInteger(0) - val logic = new BroadcastSinkLogic(shape, pendingUnregistrations) + val logic = new BroadcastSinkLogic(shape, registrationLock, pendingUnregistrations) val source = new GraphStage[SourceShape[T]] { val out: Outlet[T] = Outlet("BroadcastHub.out") @@ -879,8 +890,10 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I // In particular, this client must not send the `Unregister` event itself because the values in // `previousPublishedOffset` and `offset` are wrong. if ((hubCallback ne null) && offsetInitialized) { - pendingUnregistrations.incrementAndGet() - hubCallback.invoke(UnRegister(id, previousPublishedOffset, offset)) + registrationLock.synchronized { + pendingUnregistrations.incrementAndGet() + hubCallback.invoke(UnRegister(id, previousPublishedOffset, offset)) + } } } From af7b1e0cb1d5179606168838266db7d2e774f2a1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 10:37:07 +0000 Subject: [PATCH 3/7] Use deterministic BroadcastHub handoff regression Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko/sessions/3b8db3b8-fb93-4089-a882-327922405f8f Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com> --- .../scala/org/apache/pekko/stream/scaladsl/HubSpec.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala index 578ce2eff43..c711f49e948 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala @@ -13,6 +13,8 @@ package org.apache.pekko.stream.scaladsl +import java.util.concurrent.atomic.AtomicInteger + import scala.collection.immutable import scala.concurrent.{ Await, ExecutionContext, Future, Promise } import scala.concurrent.duration._ @@ -402,11 +404,10 @@ class HubSpec extends StreamSpec { "ensure that subsequent consumers see subsequent elements without gap" in { var firstConsumer = Option.empty[TestSubscriber.Probe[Int]] - var registrations = 0 + val registrations = new AtomicInteger(0) def registerConsumerCallback(_id: Long): Unit = { - registrations += 1 - if (registrations == 2) firstConsumer.foreach(_.cancel()) + if (registrations.incrementAndGet() == 2) firstConsumer.foreach(_.cancel()) } val source = From 8f8f49ccbd855ca560113c2fc6b38a308d36e492 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 10:53:28 +0000 Subject: [PATCH 4/7] Revert BroadcastHub production changes Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko/sessions/25fead82-6b9d-4995-9cdd-83a627a9c572 Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com> --- .../pekko/stream/scaladsl/HubSpec.scala | 19 +++--- .../apache/pekko/stream/scaladsl/Hub.scala | 64 ++++++------------- 2 files changed, 29 insertions(+), 54 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala index c711f49e948..d7d40a8e24e 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala @@ -13,7 +13,7 @@ package org.apache.pekko.stream.scaladsl -import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference } import scala.collection.immutable import scala.concurrent.{ Await, ExecutionContext, Future, Promise } @@ -403,21 +403,20 @@ class HubSpec extends StreamSpec { } "ensure that subsequent consumers see subsequent elements without gap" in { - var firstConsumer = Option.empty[TestSubscriber.Probe[Int]] + val firstConsumer = new AtomicReference[TestSubscriber.Probe[Int]]() val registrations = new AtomicInteger(0) - def registerConsumerCallback(_id: Long): Unit = { - if (registrations.incrementAndGet() == 2) firstConsumer.foreach(_.cancel()) + def cancelFirstConsumerOnSecondRegistration(_id: Long): Unit = { + if (registrations.incrementAndGet() == 2) Option(firstConsumer.get()).foreach(_.cancel()) } val source = - Source(1 to 20).runWith(Sink.fromGraph(new BroadcastHub[Int](0, 8, registerConsumerCallback))) + Source(1 to 20).runWith(Sink.fromGraph(new BroadcastHub[Int](0, 8, cancelFirstConsumerOnSecondRegistration))) - firstConsumer = Some(source.runWith(TestSink[Int]())) - firstConsumer.foreach { consumer => - consumer.request(10) - consumer.expectNextN((1 to 10).toVector) - } + val initialConsumer = source.runWith(TestSink[Int]()) + firstConsumer.set(initialConsumer) + initialConsumer.request(10) + initialConsumer.expectNextN((1 to 10).toVector) val secondConsumer = source.runWith(TestSink[Int]()) secondConsumer.request(10) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala index 3750faea65a..7a8adf7c53e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala @@ -516,8 +516,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I extends HubState private case class Closed(failure: Option[Throwable]) extends HubState - private class BroadcastSinkLogic(_shape: Shape, registrationLock: AnyRef, pendingUnregistrations: AtomicInteger) - extends GraphStageLogic(_shape) with InHandler { + private class BroadcastSinkLogic(_shape: Shape) extends GraphStageLogic(_shape) with InHandler { private[this] val callbackPromise: Promise[AsyncCallback[HubEvent]] = Promise() private[this] val noRegistrationsState = Open(callbackPromise.future, Nil) @@ -590,38 +589,25 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I checkUnblock(previousOffset) case RegistrationPending => - var shouldRetry = false - registrationLock.synchronized { - if (pendingUnregistrations.get() == 0) { - state.getAndSet(noRegistrationsState).asInstanceOf[Open].registrations.foreach { consumer => - val startFrom = head - activeConsumers += 1 - addConsumer(consumer, startFrom) - // add a callback hook so that we can control the interleaving in tests - registrationPendingCallback(consumer.id) - // in case the consumer is already stopped we need to undo registration - implicit val ec = materializer.executionContext - consumer.callback.invokeWithFeedback(Initialize(startFrom)).failed.foreach { - case _: StreamDetachedException => - registrationLock.synchronized { - pendingUnregistrations.incrementAndGet() - callbackPromise.future.foreach(callback => - callback.invoke(UnRegister(consumer.id, startFrom, startFrom))) - } - case _ => () - } - } - if (activeConsumers >= startAfterNrOfConsumers) { - initialized = true - } - tryPull() - } else { - shouldRetry = true + state.getAndSet(noRegistrationsState).asInstanceOf[Open].registrations.foreach { consumer => + val startFrom = head + activeConsumers += 1 + addConsumer(consumer, startFrom) + // add a callback hook so that we can control the interleaving in tests + registrationPendingCallback(consumer.id) + // in case the consumer is already stopped we need to undo registration + implicit val ec = materializer.executionContext + consumer.callback.invokeWithFeedback(Initialize(startFrom)).failed.foreach { + case _: StreamDetachedException => + callbackPromise.future.foreach(callback => + callback.invoke(UnRegister(consumer.id, startFrom, startFrom))) + case _ => () } } - if (shouldRetry) { - callbackPromise.future.foreach(_.invoke(RegistrationPending))(materializer.executionContext) + if (activeConsumers >= startAfterNrOfConsumers) { + initialized = true } + tryPull() case UnRegister(id, previousOffset, finalOffset) => if (findAndRemoveConsumer(id, previousOffset) ne null) @@ -636,9 +622,6 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I tryPull() } } else checkUnblock(previousOffset) - registrationLock.synchronized { - pendingUnregistrations.decrementAndGet() - } } } @@ -798,10 +781,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I inheritedAttributes: Attributes): (GraphStageLogic, Source[T, NotUsed]) = { val idCounter = new AtomicLong() - val registrationLock = new AnyRef - val pendingUnregistrations = new AtomicInteger(0) - - val logic = new BroadcastSinkLogic(shape, registrationLock, pendingUnregistrations) + val logic = new BroadcastSinkLogic(shape) val source = new GraphStage[SourceShape[T]] { val out: Outlet[T] = Outlet("BroadcastHub.out") @@ -889,12 +869,8 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I // upon which the `RegistrationPending` logic itself unregisters this consumer. // In particular, this client must not send the `Unregister` event itself because the values in // `previousPublishedOffset` and `offset` are wrong. - if ((hubCallback ne null) && offsetInitialized) { - registrationLock.synchronized { - pendingUnregistrations.incrementAndGet() - hubCallback.invoke(UnRegister(id, previousPublishedOffset, offset)) - } - } + if ((hubCallback ne null) && offsetInitialized) + hubCallback.invoke(UnRegister(id, previousPublishedOffset, offset)) } private def onCommand(cmd: ConsumerEvent): Unit = cmd match { From 5e473f61d9d03d5cf2407c3d4049c0e10d675a97 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 10:54:24 +0000 Subject: [PATCH 5/7] Keep BroadcastHub fix test-only Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko/sessions/25fead82-6b9d-4995-9cdd-83a627a9c572 Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com> --- .../test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala index d7d40a8e24e..1a2bd4fc1bc 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala @@ -406,12 +406,12 @@ class HubSpec extends StreamSpec { val firstConsumer = new AtomicReference[TestSubscriber.Probe[Int]]() val registrations = new AtomicInteger(0) - def cancelFirstConsumerOnSecondRegistration(_id: Long): Unit = { + def registerConsumerCallback(_id: Long): Unit = { if (registrations.incrementAndGet() == 2) Option(firstConsumer.get()).foreach(_.cancel()) } val source = - Source(1 to 20).runWith(Sink.fromGraph(new BroadcastHub[Int](0, 8, cancelFirstConsumerOnSecondRegistration))) + Source(1 to 20).runWith(Sink.fromGraph(new BroadcastHub[Int](0, 8, registerConsumerCallback))) val initialConsumer = source.runWith(TestSink[Int]()) firstConsumer.set(initialConsumer) From a9102c6a17176a97cff028578c9607b3eb218b56 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 7 May 2026 13:33:23 +0100 Subject: [PATCH 6/7] Update HubSpec.scala --- .../test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala index 1a2bd4fc1bc..464547c191b 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala @@ -15,6 +15,7 @@ package org.apache.pekko.stream.scaladsl import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference } +import scala.annotation.nowarn import scala.collection.immutable import scala.concurrent.{ Await, ExecutionContext, Future, Promise } import scala.concurrent.duration._ @@ -406,7 +407,8 @@ class HubSpec extends StreamSpec { val firstConsumer = new AtomicReference[TestSubscriber.Probe[Int]]() val registrations = new AtomicInteger(0) - def registerConsumerCallback(_id: Long): Unit = { + @nowarn("cat=unused") + def registerConsumerCallback(id: Long): Unit = { if (registrations.incrementAndGet() == 2) Option(firstConsumer.get()).foreach(_.cancel()) } From f11feeed12e4c1724e974cf91348206009c58266 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 13:31:23 +0000 Subject: [PATCH 7/7] Rewrite HubSpec sequential consumer test to eliminate race condition MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test 'ensure that subsequent consumers see subsequent elements without gap' was flaky because it cancelled the first consumer via a callback that fires INSIDE RegistrationPending processing. The cancel is asynchronous, so UnRegister arrives at the hub AFTER RegistrationPending has already read a stale head value, causing consumer 2 to start from the wrong offset. Rewrite the test to let consumer 1 complete naturally via take(10). Pekko's GraphStageLogic.internalCompleteStage processes inlets (cancel) before outlets (complete), so the hub's UnRegister is enqueued in the hub actor's mailbox before Sink.seq's postStop resolves the future. Consumer 2's RegistrationPending is only sent after futureValue returns, guaranteeing the hub processes UnRegister → head advanced before RegistrationPending. Remove now-unused imports: AtomicInteger, AtomicReference, @nowarn. No production code changes. Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko/sessions/0ed4c2ab-7f82-4e08-85af-f4b061725c67 Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com> --- .../pekko/stream/scaladsl/HubSpec.scala | 26 ++++++------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala index 464547c191b..21fe1514b9a 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala @@ -13,9 +13,6 @@ package org.apache.pekko.stream.scaladsl -import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference } - -import scala.annotation.nowarn import scala.collection.immutable import scala.concurrent.{ Await, ExecutionContext, Future, Promise } import scala.concurrent.duration._ @@ -404,21 +401,14 @@ class HubSpec extends StreamSpec { } "ensure that subsequent consumers see subsequent elements without gap" in { - val firstConsumer = new AtomicReference[TestSubscriber.Probe[Int]]() - val registrations = new AtomicInteger(0) - - @nowarn("cat=unused") - def registerConsumerCallback(id: Long): Unit = { - if (registrations.incrementAndGet() == 2) Option(firstConsumer.get()).foreach(_.cancel()) - } - - val source = - Source(1 to 20).runWith(Sink.fromGraph(new BroadcastHub[Int](0, 8, registerConsumerCallback))) - - val initialConsumer = source.runWith(TestSink[Int]()) - firstConsumer.set(initialConsumer) - initialConsumer.request(10) - initialConsumer.expectNextN((1 to 10).toVector) + val source = Source(1 to 20).runWith(BroadcastHub.sink(8)) + + // Let the first consumer take exactly 10 elements and complete naturally. + // GraphStageLogic.internalCompleteStage cancels inlets before completing outlets, + // so the hub's UnRegister event is enqueued in the hub actor's mailbox before + // Sink.seq's postStop resolves the future. The second consumer's RegistrationPending + // is only sent after futureValue returns, guaranteeing UnRegister is processed first. + source.take(10).runWith(Sink.seq).futureValue should ===(1 to 10) val secondConsumer = source.runWith(TestSink[Int]()) secondConsumer.request(10)