44
55namespace Webgriffe \Esb ;
66
7- use Amp \Beanstalk \BeanstalkClient ;
8- use function Amp \call ;
97use Amp \Loop ;
108use Amp \Promise ;
119use Psr \Log \LoggerInterface ;
1210use Webgriffe \Esb \Model \FlowConfig ;
1311use Webgriffe \Esb \Model \Job ;
1412use Webgriffe \Esb \Model \ProducedJobEvent ;
13+ use Webgriffe \Esb \Service \BatchManagerFactory ;
1514use Webgriffe \Esb \Service \CronProducersServer ;
16- use Webgriffe \Esb \Service \ElasticSearch ;
1715use Webgriffe \Esb \Service \HttpProducersServer ;
18- use Webgriffe \Esb \Service \ProducerQueueManagerInterface ;
19- use Webgriffe \Esb \Service \QueueManager ;
20-
21- final class ProducerInstance implements ProducerInstanceInterface
22- {
23- /**
24- * @var FlowConfig
25- */
26- private $ flowConfig ;
27-
28- /**
29- * @var ProducerInterface
30- */
31- private $ producer ;
32-
33- /**
34- * @var LoggerInterface
35- */
36- private $ logger ;
3716
38- /**
39- * @var HttpProducersServer
40- */
41- private $ httpProducersServer ;
17+ use Webgriffe \Esb \Service \QueueBackendInterface ;
4218
43- /**
44- * @var CronProducersServer
45- */
46- private $ cronProducersServer ;
19+ use function Amp \call ;
4720
48- /**
49- * @var ProducerQueueManagerInterface
50- */
51- private $ queueManager ;
21+ final class ProducerInstance implements ProducerInstanceInterface
22+ {
5223
5324 public function __construct (
54- FlowConfig $ flowConfig ,
55- ProducerInterface $ producer ,
56- ?BeanstalkClient $ beanstalkClient ,
57- LoggerInterface $ logger ,
58- HttpProducersServer $ httpProducersServer ,
59- CronProducersServer $ cronProducersServer ,
60- ?ElasticSearch $ elasticSearch ,
61- ?ProducerQueueManagerInterface $ queueManager = null
25+ private readonly FlowConfig $ flowConfig ,
26+ private readonly ProducerInterface $ producer ,
27+ private readonly LoggerInterface $ logger ,
28+ private readonly HttpProducersServer $ httpProducersServer ,
29+ private readonly CronProducersServer $ cronProducersServer ,
30+ private readonly QueueBackendInterface $ queueBackend ,
31+ private readonly BatchManagerFactory $ batchManagerFactory ,
6232 ) {
63- if ($ beanstalkClient !== null ) {
64- trigger_deprecation (
65- 'webgriffe/esb ' ,
66- '2.2 ' ,
67- 'Passing a "%s" to "%s" is deprecated and will be removed in 3.0. ' .
68- 'Please pass a "%s" instead. ' ,
69- BeanstalkClient::class,
70- __CLASS__ ,
71- ProducerQueueManagerInterface::class
72- );
73- }
74- if ($ elasticSearch !== null ) {
75- trigger_deprecation (
76- 'webgriffe/esb ' ,
77- '2.2 ' ,
78- 'Passing a "%s" to "%s" is deprecated and will be removed in 3.0. ' .
79- 'Please pass a "%s" instead. ' ,
80- ElasticSearch::class,
81- __CLASS__ ,
82- ProducerQueueManagerInterface::class
83- );
84- }
85- $ this ->flowConfig = $ flowConfig ;
86- $ this ->producer = $ producer ;
87- $ this ->logger = $ logger ;
88- $ this ->httpProducersServer = $ httpProducersServer ;
89- $ this ->cronProducersServer = $ cronProducersServer ;
90-
91- if ($ queueManager === null ) {
92- trigger_deprecation (
93- 'webgriffe/esb ' ,
94- '2.2 ' ,
95- 'Not passing a "%s" to "%s" is deprecated and will be required in 3.0. ' ,
96- ProducerQueueManagerInterface::class,
97- __CLASS__
98- );
99-
100- if (!$ beanstalkClient ) {
101- throw new \RuntimeException ('Cannot create a QueueManager without the Beanstalk client! ' );
102- }
103-
104- if (!$ elasticSearch ) {
105- throw new \RuntimeException ('Cannot create a QueueManager without the ElasticSearch client ' );
106- }
107-
108- $ queueManager = new QueueManager (
109- $ this ->flowConfig ,
110- $ beanstalkClient ,
111- $ elasticSearch ,
112- $ this ->logger ,
113- 1000
114- );
115- }
116- $ this ->queueManager = $ queueManager ;
11733 }
11834
11935 public function boot (): Promise
12036 {
12137 return call (function () {
12238 yield $ this ->producer ->init ();
123- yield $ this ->queueManager ->boot ();
39+ yield $ this ->queueBackend ->boot ();
12440
12541 $ this ->logger ->info (
12642 'A Producer has been successfully initialized ' ,
@@ -164,6 +80,7 @@ function ($watcherId) {
16480 public function produceAndQueueJobs ($ data = null ): Promise
16581 {
16682 return call (function () use ($ data ) {
83+ $ batchManager = $ this ->batchManagerFactory ->create ();
16784 $ jobsCount = 0 ;
16885 $ job = null ;
16986 try {
@@ -172,10 +89,10 @@ public function produceAndQueueJobs($data = null): Promise
17289 /** @var Job $job */
17390 $ job = $ jobs ->getCurrent ();
17491 $ job ->addEvent (new ProducedJobEvent (new \DateTime (), \get_class ($ this ->producer )));
175- $ jobsCount += yield $ this -> queueManager ->enqueue ($ job );
92+ $ jobsCount += yield $ batchManager ->enqueue ($ job );
17693 }
17794
178- $ jobsCount += yield $ this -> queueManager ->flush ();
95+ $ jobsCount += yield $ batchManager ->flush ();
17996 } catch (\Throwable $ error ) {
18097 $ this ->logger ->error (
18198 'An error occurred producing/queueing jobs. ' ,
0 commit comments