diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala index 1a396d823ef..21fe1514b9a 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala @@ -402,8 +402,18 @@ class HubSpec extends StreamSpec { "ensure that subsequent consumers see subsequent elements without gap" in { val source = Source(1 to 20).runWith(BroadcastHub.sink(8)) + + // Let the first consumer take exactly 10 elements and complete naturally. + // GraphStageLogic.internalCompleteStage cancels inlets before completing outlets, + // so the hub's UnRegister event is enqueued in the hub actor's mailbox before + // Sink.seq's postStop resolves the future. The second consumer's RegistrationPending + // is only sent after futureValue returns, guaranteeing UnRegister is processed first. source.take(10).runWith(Sink.seq).futureValue should ===(1 to 10) - source.take(10).runWith(Sink.seq).futureValue should ===(11 to 20) + + val secondConsumer = source.runWith(TestSink[Int]()) + secondConsumer.request(10) + secondConsumer.expectNextN((11 to 20).toVector) + secondConsumer.expectComplete() } "send the same elements to consumers of different speed attaching around the same time" in {