Skip to content

Commit ea02d1c

Browse files
authored
Merge pull request #32 from driftphp/feature/added-event-exchanges-to-consumer
Added event exchanges to the consumer
2 parents 92b694a + 7fd6431 commit ea02d1c

2 files changed

Lines changed: 61 additions & 6 deletions

File tree

Console/CommandConsumerCommand.php

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
use Drift\CommandBus\Async\AsyncAdapter;
1919
use Drift\CommandBus\Bus\InlineCommandBus;
2020
use Drift\Console\OutputPrinter;
21-
use React\EventLoop\LoopInterface;
21+
use Drift\EventBus\Bus\EventBus;
22+
use Drift\EventBus\Subscriber\EventBusSubscriber;
2223
use Symfony\Component\Console\Command\Command;
2324
use Symfony\Component\Console\Input\InputInterface;
2425
use Symfony\Component\Console\Input\InputOption;
@@ -31,22 +32,25 @@ class CommandConsumerCommand extends Command
3132
{
3233
private AsyncAdapter $asyncAdapter;
3334
private InlineCommandBus $commandBus;
35+
private ?EventBusSubscriber $eventBusSubscriber;
3436

3537
/**
3638
* ConsumeCommand constructor.
3739
*
38-
* @param AsyncAdapter $asyncAdapter
39-
* @param InlineCommandBus $commandBus
40-
* @param LoopInterface $loop
40+
* @param AsyncAdapter $asyncAdapter
41+
* @param InlineCommandBus $commandBus
42+
* @param EventBusSubscriber|null $eventBusSubscriber
4143
*/
4244
public function __construct(
4345
AsyncAdapter $asyncAdapter,
44-
InlineCommandBus $commandBus
46+
InlineCommandBus $commandBus,
47+
?EventBusSubscriber $eventBusSubscriber
4548
) {
4649
parent::__construct();
4750

4851
$this->asyncAdapter = $asyncAdapter;
4952
$this->commandBus = $commandBus;
53+
$this->eventBusSubscriber = $eventBusSubscriber;
5054
}
5155

5256
/**
@@ -62,6 +66,18 @@ protected function configure()
6266
'Number of jobs to handle before dying',
6367
0
6468
);
69+
70+
/*
71+
* If we have the EventBus loaded, we can add listeners as well
72+
*/
73+
if (class_exists(EventBus::class)) {
74+
$this->addOption(
75+
'exchange',
76+
null,
77+
InputOption::VALUE_IS_ARRAY | InputOption::VALUE_REQUIRED,
78+
'Exchanges to listen'
79+
);
80+
}
6581
}
6682

6783
/**
@@ -82,6 +98,21 @@ protected function execute(InputInterface $input, OutputInterface $output)
8298
(new CommandBusHeaderMessage('', 'Using adapter '.$adapterName))->print($outputPrinter);
8399
(new CommandBusHeaderMessage('', 'Started listening...'))->print($outputPrinter);
84100

101+
$exchanges = self::buildQueueArray($input);
102+
if (
103+
class_exists(EventBusSubscriber::class) &&
104+
!empty($exchanges) &&
105+
!is_null($this->eventBusSubscriber)
106+
) {
107+
(new CommandBusHeaderMessage('', 'Kernel connected to exchanges.'))->print($outputPrinter);
108+
$this
109+
->eventBusSubscriber
110+
->subscribeToExchanges(
111+
$exchanges,
112+
$outputPrinter
113+
);
114+
}
115+
85116
$this
86117
->asyncAdapter
87118
->consume(
@@ -92,4 +123,26 @@ protected function execute(InputInterface $input, OutputInterface $output)
92123

93124
return 0;
94125
}
126+
127+
/**
128+
* Build queue architecture from array of strings.
129+
*
130+
* @param InputInterface $input
131+
*
132+
* @return array
133+
*/
134+
private static function buildQueueArray(InputInterface $input): array
135+
{
136+
if (!$input->hasOption('exchange')) {
137+
return [];
138+
}
139+
140+
$exchanges = [];
141+
foreach ($input->getOption('exchange') as $exchange) {
142+
$exchangeParts = explode(':', $exchange, 2);
143+
$exchanges[$exchangeParts[0]] = $exchangeParts[1] ?? '';
144+
}
145+
146+
return $exchanges;
147+
}
95148
}

DependencyInjection/CompilerPass/BusCompilerPass.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
use Drift\CommandBus\Middleware\AsyncMiddleware;
3535
use Drift\CommandBus\Middleware\HandlerMiddleware;
3636
use Drift\CommandBus\Middleware\Middleware;
37+
use Drift\EventBus\Subscriber\EventBusSubscriber;
3738
use Drift\Postgresql\DependencyInjection\CompilerPass\PostgresqlCompilerPass;
3839
use Drift\Redis\DependencyInjection\CompilerPass\RedisCompilerPass;
3940
use Exception;
@@ -42,6 +43,7 @@
4243
use ReflectionException;
4344
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
4445
use Symfony\Component\DependencyInjection\ContainerBuilder;
46+
use Symfony\Component\DependencyInjection\ContainerInterface;
4547
use Symfony\Component\DependencyInjection\Definition;
4648
use Symfony\Component\DependencyInjection\Reference;
4749

@@ -424,7 +426,7 @@ private static function createCommandConsumer(ContainerBuilder $container)
424426
$consumer = new Definition(CommandConsumerCommand::class, [
425427
new Reference(AsyncAdapter::class),
426428
new Reference('drift.inline_command_bus'),
427-
new Reference('reactphp.event_loop'),
429+
new Reference(EventBusSubscriber::class, ContainerInterface::NULL_ON_INVALID_REFERENCE),
428430
]);
429431

430432
$consumer->addTag('console.command', [

0 commit comments

Comments
 (0)