1515import java .util .concurrent .atomic .AtomicReference ;
1616
1717import com .fasterxml .jackson .core .JsonProcessingException ;
18+ import io .a2a .server .tasks .InMemoryTaskStore ;
19+ import io .a2a .server .tasks .PushNotificationSender ;
1820import io .a2a .spec .A2AError ;
1921import io .a2a .spec .A2AServerException ;
2022import io .a2a .spec .Artifact ;
2830import io .a2a .spec .TaskStatusUpdateEvent ;
2931import io .a2a .spec .TextPart ;
3032import io .a2a .util .Utils ;
33+ import org .junit .jupiter .api .AfterEach ;
3134import org .junit .jupiter .api .BeforeEach ;
3235import org .junit .jupiter .api .Test ;
3336
3437public class EventConsumerTest {
3538
39+ private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {};
40+
3641 private EventQueue eventQueue ;
3742 private EventConsumer eventConsumer ;
38-
43+ private MainEventBus mainEventBus ;
44+ private MainEventBusProcessor mainEventBusProcessor ;
3945
4046 private static final String MINIMAL_TASK = """
4147 {
@@ -57,10 +63,25 @@ public class EventConsumerTest {
5763
5864 @ BeforeEach
5965 public void init () {
60- eventQueue = EventQueueUtil .getEventQueueBuilder ().build ().tap ();
66+ // Set up MainEventBus and processor for production-like test environment
67+ InMemoryTaskStore taskStore = new InMemoryTaskStore ();
68+ mainEventBus = new MainEventBus ();
69+ mainEventBusProcessor = new MainEventBusProcessor (mainEventBus , taskStore , NOOP_PUSHNOTIFICATION_SENDER );
70+ EventQueueUtil .start (mainEventBusProcessor );
71+
72+ eventQueue = EventQueueUtil .getEventQueueBuilder ()
73+ .mainEventBus (mainEventBus )
74+ .build ().tap ();
6175 eventConsumer = new EventConsumer (eventQueue );
6276 }
6377
78+ @ AfterEach
79+ public void cleanup () {
80+ if (mainEventBusProcessor != null ) {
81+ EventQueueUtil .stop (mainEventBusProcessor );
82+ }
83+ }
84+
6485 @ Test
6586 public void testConsumeOneTaskEvent () throws Exception {
6687 Task event = Utils .unmarshalFrom (MINIMAL_TASK , Task .TYPE_REFERENCE );
@@ -343,7 +364,9 @@ public void onComplete() {
343364
344365 @ Test
345366 public void testConsumeAllStopsOnQueueClosed () throws Exception {
346- EventQueue queue = EventQueueUtil .getEventQueueBuilder ().build ().tap ();
367+ EventQueue queue = EventQueueUtil .getEventQueueBuilder ()
368+ .mainEventBus (mainEventBus )
369+ .build ().tap ();
347370 EventConsumer consumer = new EventConsumer (queue );
348371
349372 // Close the queue immediately
@@ -389,7 +412,9 @@ public void onComplete() {
389412
390413 @ Test
391414 public void testConsumeAllHandlesQueueClosedException () throws Exception {
392- EventQueue queue = EventQueueUtil .getEventQueueBuilder ().build ().tap ();
415+ EventQueue queue = EventQueueUtil .getEventQueueBuilder ()
416+ .mainEventBus (mainEventBus )
417+ .build ().tap ();
393418 EventConsumer consumer = new EventConsumer (queue );
394419
395420 // Add a message event (which will complete the stream)
@@ -447,7 +472,9 @@ public void onComplete() {
447472
448473 @ Test
449474 public void testConsumeAllTerminatesOnQueueClosedEvent () throws Exception {
450- EventQueue queue = EventQueueUtil .getEventQueueBuilder ().build ().tap ();
475+ EventQueue queue = EventQueueUtil .getEventQueueBuilder ()
476+ .mainEventBus (mainEventBus )
477+ .build ().tap ();
451478 EventConsumer consumer = new EventConsumer (queue );
452479
453480 // Enqueue a QueueClosedEvent (poison pill)
0 commit comments