From 2d4198ca7149c98849cba948d9e221134a4a0958 Mon Sep 17 00:00:00 2001 From: Stephan Schuler Date: Mon, 6 Oct 2025 11:26:45 +0200 Subject: [PATCH 01/11] task: Proper type hint for Lock::run() --- Classes/Lock.php | 5 +++++ composer.json | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/Classes/Lock.php b/Classes/Lock.php index 16b5c42..75c31b0 100644 --- a/Classes/Lock.php +++ b/Classes/Lock.php @@ -30,6 +30,11 @@ public function __construct(int $numberOfWorkers, string $lockFileDirectory) } } + /** + * @template T + * @param callable(): T $run + * @return T + */ public function run(callable $run) { $this->findSlot(); diff --git a/composer.json b/composer.json index 4fce6c0..ad733b5 100644 --- a/composer.json +++ b/composer.json @@ -13,7 +13,8 @@ "flowpack/jobqueue-common": "^3.0.0", "netlogix/supervisor": "^1.0", "php-amqplib/php-amqplib": "^2.11", - "t3n/jobqueue-rabbitmq": "^2.3.0" + "t3n/jobqueue-rabbitmq": "^2.3.0", + "php": "~8.2 || ~8.3 || ~8.4" }, "extra": { "neos": { From 731f712b5f86484f28a8cca9a91a96e758af6df3 Mon Sep 17 00:00:00 2001 From: Stephan Schuler Date: Mon, 6 Oct 2025 11:27:59 +0200 Subject: [PATCH 02/11] feat: Allow sub process to boot without message The sub process "./flow job:execute" now starts with its "queue" argument but without its "messageCacheIdentifier" argument in an interactive process object. The messaeg identifier gets passed to this process when available, which might be at a later time. This allows the sub process to boot its fraemwork while still waiting for messaegs to come in. While this will increae memory consumption by a bit, it will decrease the wait time for executing a message. --- Classes/Worker.php | 156 ++++++++++++++++++++++++--------------------- composer.json | 3 +- 2 files changed, 87 insertions(+), 72 deletions(-) diff --git a/Classes/Worker.php b/Classes/Worker.php index 0709622..7ea1878 100644 --- a/Classes/Worker.php +++ b/Classes/Worker.php @@ -1,4 +1,5 @@ command = $command; - $this->queue = $queue; - $this->queueSettings = $queueSettings; - $this->messageCache = $messageCache; - $this->lock = $lock; } - public function prepare() + public function prepare(): void { + $this->cleanPool(); + $this->output = new ConsoleOutput(); - $this->outputLine('Watching queue "%s"', $this->queue->getName()); + $this->output->outputLine('Watching queue "%s"', [$this->queue->getName()]); } - public function executeMessage(Message $message) + public function executeMessage(Message $message): void { $messageCacheIdentifier = sha1(serialize($message)); $this->messageCache->set($messageCacheIdentifier, $message); - $this->lock->run(function() use (&$messageCacheIdentifier, &$commandOutput, &$result) { - exec( - $this->command . ' --messageCacheIdentifier=' . escapeshellarg($messageCacheIdentifier), - $commandOutput, - $result - ); - }); + $process = $this->lock->run( + fn () => $this->runFromPool($messageCacheIdentifier) + ); - if ($result === 0) { + if ($process->getExitCode() === 0) { $this->queue->finish($message->getIdentifier()); - $this->outputLine( - 'Successfully executed job "%s" (%s)', - $message->getIdentifier(), - join('', $commandOutput) + $this->output->outputLine( + 'Successfully executed job "%s"', + [$message->getIdentifier()] ); - + $this->output->outputLine('Output: %s', [$process->getOutput()]); } else { $maximumNumberOfReleases = isset($this->queueSettings['maximumNumberOfReleases']) - ? (int)$this->queueSettings['maximumNumberOfReleases'] + ? (int) $this->queueSettings['maximumNumberOfReleases'] : JobManager::DEFAULT_MAXIMUM_NUMBER_RELEASES; if ($message->getNumberOfReleases() < $maximumNumberOfReleases) { $releaseOptions = isset($this->queueSettings['releaseOptions']) ? $this->queueSettings['releaseOptions'] : []; $this->queue->release($message->getIdentifier(), $releaseOptions); $this->queue->reQueueMessage($message, $releaseOptions); - $this->outputLine( - 'Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - RELEASE', - $message->getIdentifier(), - $this->queue->getName(), - $message->getNumberOfReleases() + 1, - $maximumNumberOfReleases + 1 + $this->output->outputLine('Output: %s', [$process->getOutput()]); + $this->output->outputLine( + 'Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - RELEASE', + [ + $message->getIdentifier(), + $this->queue->getName(), + $message->getNumberOfReleases() + 1, + $maximumNumberOfReleases + 1, + ] ); - $this->outputLine('Message: %s', join('', $commandOutput)); - } else { $this->queue->abort($message->getIdentifier()); - $this->outputLine( - 'Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - ABORTING', - $message->getIdentifier(), - $this->queue->getName(), - $message->getNumberOfReleases() + 1, - $maximumNumberOfReleases + 1 + $this->output->outputLine('Output: %s', [$process->getOutput()]); + $this->output->outputLine( + 'Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - ABORTING', + [ + $message->getIdentifier(), + $this->queue->getName(), + $message->getNumberOfReleases() + 1, + $maximumNumberOfReleases + 1, + ] ); - $this->outputLine('Message: %s', join('', $commandOutput)); } } @@ -122,8 +100,44 @@ public function executeMessage(Message $message) } } - protected function outputLine(string $text, ...$arguments) + /** + * @return array{input: InputStream, process: Process} + */ + private function createProcess(): array + { + $input = new InputStream(); + $process = Process::fromShellCommandline( + command: $this->command, + input: $input, + timeout: 0 + ); + $process->start(); + return ['input' => $input, 'process' => $process]; + } + + private function runFromPool(string $messageCacheIdentifier): Process { - $this->output->outputLine($text, $arguments); + $this->cleanPool(); + ['input' => $input, 'process' => $process] = array_shift($this->pool); + $this->pool[] = $this->createProcess(); + + assert($input instanceof InputStream); + assert($process instanceof Process); + + $input->write($messageCacheIdentifier . PHP_EOL); + + $process->wait(); + return $process; + } + + private function cleanPool(): void + { + $this->pool = array_filter( + $this->pool, + fn (array $item) => $item['process']->isRunning() + ); + while (count($this->pool) < $this->poolSize) { + $this->pool[] = $this->createProcess(); + } } } diff --git a/composer.json b/composer.json index ad733b5..a1069e5 100644 --- a/composer.json +++ b/composer.json @@ -1,7 +1,7 @@ { "name": "netlogix/jobqueue-fast-rabbit", "type": "neos-package", - "description": "Low memory footprint worker for RabbitMQ jobs", + "description": "Worker for RabbitMQ jobs", "license": "MIT", "autoload": { "psr-4": { @@ -13,6 +13,7 @@ "flowpack/jobqueue-common": "^3.0.0", "netlogix/supervisor": "^1.0", "php-amqplib/php-amqplib": "^2.11", + "symfony/process": "^5.4 || ^6.0 || ^7.0", "t3n/jobqueue-rabbitmq": "^2.3.0", "php": "~8.2 || ~8.3 || ~8.4" }, From 37d9f64e8b8e954d0304452804ad74f4c7c77cee Mon Sep 17 00:00:00 2001 From: Stephan Schuler Date: Mon, 6 Oct 2025 12:33:04 +0200 Subject: [PATCH 03/11] task: Preload singletons in idle state of job::execute The worker object starst the JobCommandController without having a message to execute and idles while waiting for th emessage. This change loads all available singletons during the idle phase so that they are present once a message is ready for execution. --- .../AllSingletonsPreloader.php | 82 +++++++++++++++++++ .../JobCommandInitializationAspect.php | 58 +++++++++++++ .../SingletonsPreloader.php | 8 ++ .../Settings.SingletonsPreloader.yaml | 14 ++++ 4 files changed, 162 insertions(+) create mode 100644 Classes/SingletonPreloading/AllSingletonsPreloader.php create mode 100644 Classes/SingletonPreloading/JobCommandInitializationAspect.php create mode 100644 Classes/SingletonPreloading/SingletonsPreloader.php create mode 100644 Configuration/Settings.SingletonsPreloader.yaml diff --git a/Classes/SingletonPreloading/AllSingletonsPreloader.php b/Classes/SingletonPreloading/AllSingletonsPreloader.php new file mode 100644 index 0000000..9f3712b --- /dev/null +++ b/Classes/SingletonPreloading/AllSingletonsPreloader.php @@ -0,0 +1,82 @@ +getSingletonClassNames($objectManager) as $className) { + try { + $objectManager->get($className); + } catch (Throwable $e) { + // ignore + } + } + } + + /** + * @return Traversable + */ + public function getSingletonClassNames(ObjectManagerInterface $objectManager): Traversable + { + foreach (self::getSingletonClassNamesFromReflection($objectManager) as $className) { + foreach ($this->ignoreClassNames as $ignoredClassName) { + if (is_a($className, $ignoredClassName, true)) { + continue; + } + } + yield $className; + } + } + + #[Flow\CompileStatic] + public static function getSingletonClassNamesFromReflection(ObjectManagerInterface $objectManager): array + { + return array_filter( + array: $objectManager->get(ReflectionService::class)->getAllClassNames(), + callback: static function ($className) use ($objectManager): bool { + try { + return $objectManager->getScope($className) === Configuration::SCOPE_SINGLETON; + } catch (\Exception $e) { + return false; + } + } + ); + } +} diff --git a/Classes/SingletonPreloading/JobCommandInitializationAspect.php b/Classes/SingletonPreloading/JobCommandInitializationAspect.php new file mode 100644 index 0000000..d4a6acd --- /dev/null +++ b/Classes/SingletonPreloading/JobCommandInitializationAspect.php @@ -0,0 +1,58 @@ +initializeCommandMethodArguments())')] + public function preloadSingletonsWhenJobCommandControllerGetsInitialized(JoinPointInterface $joinPoint): void + { + $jobCommandController = $joinPoint->getProxy(); + assert($jobCommandController instanceof JobCommandController); + + $reflection = new ClassReflection($jobCommandController); + + $commandMethodName = $reflection + ->getProperty('commandMethodName') + ->getValue($jobCommandController); + + if ($commandMethodName !== 'executeCommand') { + return; + } + + $request = $reflection + ->getProperty('request') + ->getValue($jobCommandController); + assert($request instanceof Request); + + $arguments = $reflection + ->getProperty('arguments') + ->getValue($jobCommandController); + assert($arguments instanceof Controller\Arguments); + + foreach ($arguments as $argument) { + assert($argument instanceof Controller\Argument); + if ($argument->isRequired() && !$request->hasArgument($argument->getName())) { + // only preload if the request is blocked by fetching an argument via stdin + $this->objectManager + ->get(SingletonsPreloader::class) + ->collect(); + return; + } + } + } +} diff --git a/Classes/SingletonPreloading/SingletonsPreloader.php b/Classes/SingletonPreloading/SingletonsPreloader.php new file mode 100644 index 0000000..da1df7d --- /dev/null +++ b/Classes/SingletonPreloading/SingletonsPreloader.php @@ -0,0 +1,8 @@ + Date: Tue, 7 Oct 2025 16:48:19 +0200 Subject: [PATCH 04/11] feat: Use React/EventLoop for ticks during sub process execution The previous setting only reacted to data sent from the child process to the parent process via STDOUT and STDERR but had no timeout based callback, so during long-running child processees with no data interaction, the parent could not e.g. keep its database alive. --- Classes/Worker.php | 25 +++++++++++++++++++++---- composer.json | 1 + 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/Classes/Worker.php b/Classes/Worker.php index 7ea1878..64d9563 100644 --- a/Classes/Worker.php +++ b/Classes/Worker.php @@ -8,11 +8,16 @@ use Flowpack\JobQueue\Common\Queue\Message; use Neos\Cache\Frontend\FrontendInterface; use Neos\Flow\Cli\ConsoleOutput; +use React\EventLoop; use Symfony\Component\Process\InputStream; use Symfony\Component\Process\Process; use t3n\JobQueue\RabbitMQ\Queue\RabbitQueue; use function array_shift; +use function fputs; + +use const STDERR; +use const STDOUT; final class Worker { @@ -60,7 +65,6 @@ public function executeMessage(Message $message): void 'Successfully executed job "%s"', [$message->getIdentifier()] ); - $this->output->outputLine('Output: %s', [$process->getOutput()]); } else { $maximumNumberOfReleases = isset($this->queueSettings['maximumNumberOfReleases']) ? (int) $this->queueSettings['maximumNumberOfReleases'] @@ -70,7 +74,6 @@ public function executeMessage(Message $message): void $releaseOptions = isset($this->queueSettings['releaseOptions']) ? $this->queueSettings['releaseOptions'] : []; $this->queue->release($message->getIdentifier(), $releaseOptions); $this->queue->reQueueMessage($message, $releaseOptions); - $this->output->outputLine('Output: %s', [$process->getOutput()]); $this->output->outputLine( 'Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - RELEASE', [ @@ -82,7 +85,6 @@ public function executeMessage(Message $message): void ); } else { $this->queue->abort($message->getIdentifier()); - $this->output->outputLine('Output: %s', [$process->getOutput()]); $this->output->outputLine( 'Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - ABORTING', [ @@ -126,7 +128,22 @@ private function runFromPool(string $messageCacheIdentifier): Process $input->write($messageCacheIdentifier . PHP_EOL); - $process->wait(); + $loop = EventLoop\Loop::get(); + $loop->addPeriodicTimer(0.01, function (EventLoop\TimerInterface $timer) use ($process, $loop) { + try { + fputs(STDOUT, $process->getIncrementalOutput()); + fputs(STDERR, $process->getIncrementalErrorOutput()); + } catch (\Throwable $e) { + } + + if (!$process->isRunning()) { + $loop->cancelTimer($timer); + $loop->stop(); + } + }); + + $loop->run(); + return $process; } diff --git a/composer.json b/composer.json index a1069e5..1635f05 100644 --- a/composer.json +++ b/composer.json @@ -13,6 +13,7 @@ "flowpack/jobqueue-common": "^3.0.0", "netlogix/supervisor": "^1.0", "php-amqplib/php-amqplib": "^2.11", + "react/event-loop": "^1.5", "symfony/process": "^5.4 || ^6.0 || ^7.0", "t3n/jobqueue-rabbitmq": "^2.3.0", "php": "~8.2 || ~8.3 || ~8.4" From d6ed7b38202e100f4700813f7e27bfc8669f3ea5 Mon Sep 17 00:00:00 2001 From: Stephan Schuler Date: Thu, 9 Oct 2025 14:27:31 +0200 Subject: [PATCH 05/11] task: Switch from symfony/process to react/child-process The react/child-process Process implementation can work on an event loop, which also supports periodic timers. --- Classes/Worker.php | 57 ++++++++++++++++++---------------------------- composer.json | 2 +- 2 files changed, 23 insertions(+), 36 deletions(-) diff --git a/Classes/Worker.php b/Classes/Worker.php index 64d9563..3c097fb 100644 --- a/Classes/Worker.php +++ b/Classes/Worker.php @@ -8,9 +8,8 @@ use Flowpack\JobQueue\Common\Queue\Message; use Neos\Cache\Frontend\FrontendInterface; use Neos\Flow\Cli\ConsoleOutput; +use React\ChildProcess\Process; use React\EventLoop; -use Symfony\Component\Process\InputStream; -use Symfony\Component\Process\Process; use t3n\JobQueue\RabbitMQ\Queue\RabbitQueue; use function array_shift; @@ -23,8 +22,10 @@ final class Worker { protected readonly ConsoleOutput $output; + private readonly EventLoop\LoopInterface $loop; + /** - * @var array{input: InputStream, process: Process}[] + * @var Process[] */ private array $pool = []; @@ -40,6 +41,7 @@ public function __construct( protected readonly FrontendInterface $messageCache, protected readonly Lock $lock ) { + $this->loop = EventLoop\Loop::get(); } public function prepare(): void @@ -102,47 +104,32 @@ public function executeMessage(Message $message): void } } - /** - * @return array{input: InputStream, process: Process} - */ - private function createProcess(): array + private function createProcess(): Process { - $input = new InputStream(); - $process = Process::fromShellCommandline( - command: $this->command, - input: $input, - timeout: 0 - ); - $process->start(); - return ['input' => $input, 'process' => $process]; + $process = new Process($this->command); + $timer = $this->loop->addPeriodicTimer(0.01, function () { + // TODO: Add keepalive for database if necessary + }); + $process->on('exit', function () use ($timer) { + $this->loop->cancelTimer($timer); + $this->loop->stop(); + }); + $process->start(loop: $this->loop, interval: 0.01); + return $process; } private function runFromPool(string $messageCacheIdentifier): Process { $this->cleanPool(); - ['input' => $input, 'process' => $process] = array_shift($this->pool); - $this->pool[] = $this->createProcess(); - - assert($input instanceof InputStream); + $process = array_shift($this->pool); assert($process instanceof Process); - $input->write($messageCacheIdentifier . PHP_EOL); - - $loop = EventLoop\Loop::get(); - $loop->addPeriodicTimer(0.01, function (EventLoop\TimerInterface $timer) use ($process, $loop) { - try { - fputs(STDOUT, $process->getIncrementalOutput()); - fputs(STDERR, $process->getIncrementalErrorOutput()); - } catch (\Throwable $e) { - } + $process->stdout->on('data', fn ($chunk) => fputs(STDOUT, $chunk)); + $process->stderr->on('data', fn ($chunk) => fputs(STDERR, $chunk)); - if (!$process->isRunning()) { - $loop->cancelTimer($timer); - $loop->stop(); - } - }); + $process->stdin->write($messageCacheIdentifier . PHP_EOL); - $loop->run(); + $this->loop->run(); return $process; } @@ -151,7 +138,7 @@ private function cleanPool(): void { $this->pool = array_filter( $this->pool, - fn (array $item) => $item['process']->isRunning() + fn (Process $process) => $process->isRunning() ); while (count($this->pool) < $this->poolSize) { $this->pool[] = $this->createProcess(); diff --git a/composer.json b/composer.json index 1635f05..a9fcfdc 100644 --- a/composer.json +++ b/composer.json @@ -13,8 +13,8 @@ "flowpack/jobqueue-common": "^3.0.0", "netlogix/supervisor": "^1.0", "php-amqplib/php-amqplib": "^2.11", + "react/child-process": "^0.6", "react/event-loop": "^1.5", - "symfony/process": "^5.4 || ^6.0 || ^7.0", "t3n/jobqueue-rabbitmq": "^2.3.0", "php": "~8.2 || ~8.3 || ~8.4" }, From fcce7dc499c3c7deb540485b403b64a13ba5334f Mon Sep 17 00:00:00 2001 From: Stephan Schuler Date: Wed, 8 Oct 2025 21:27:11 +0200 Subject: [PATCH 06/11] feat: Overfill worker pool before taking out pooled worker This allows for a poolSize of 0, meaning there is no idling worker. --- Classes/Worker.php | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/Classes/Worker.php b/Classes/Worker.php index 3c097fb..b320411 100644 --- a/Classes/Worker.php +++ b/Classes/Worker.php @@ -46,7 +46,7 @@ public function __construct( public function prepare(): void { - $this->cleanPool(); + $this->fillPool($this->poolSize); $this->output = new ConsoleOutput(); $this->output->outputLine('Watching queue "%s"', [$this->queue->getName()]); @@ -120,7 +120,7 @@ private function createProcess(): Process private function runFromPool(string $messageCacheIdentifier): Process { - $this->cleanPool(); + $this->fillPool($this->poolSize + 1); // Overfill $process = array_shift($this->pool); assert($process instanceof Process); @@ -134,13 +134,14 @@ private function runFromPool(string $messageCacheIdentifier): Process return $process; } - private function cleanPool(): void + private function fillPool(int $poolSize): void { + $poolSize = max($poolSize, 0); $this->pool = array_filter( $this->pool, fn (Process $process) => $process->isRunning() ); - while (count($this->pool) < $this->poolSize) { + while (count($this->pool) < $poolSize) { $this->pool[] = $this->createProcess(); } } From 28f7db5d311fee990c0e8b691bcdf103f8c5ab3f Mon Sep 17 00:00:00 2001 From: Stephan Schuler Date: Thu, 9 Oct 2025 10:20:46 +0200 Subject: [PATCH 07/11] feat: Default to "none singletons preloader" Not loading dependencies with expiring connections is hard, so default to not loading any singletons at all in this package. --- Classes/Package.php | 44 ++++++ .../AllSingletonsPreloader.php | 136 +++++++++++++----- .../NoneSingletonsPreloader.php | 37 +++++ Configuration/Caches.yaml | 3 + Configuration/Objects.yaml | 28 ++++ 5 files changed, 216 insertions(+), 32 deletions(-) create mode 100644 Classes/Package.php create mode 100644 Classes/SingletonPreloading/NoneSingletonsPreloader.php create mode 100644 Configuration/Caches.yaml create mode 100644 Configuration/Objects.yaml diff --git a/Classes/Package.php b/Classes/Package.php new file mode 100644 index 0000000..e730447 --- /dev/null +++ b/Classes/Package.php @@ -0,0 +1,44 @@ +getSignalSlotDispatcher(); + assert($dispatcher instanceof Dispatcher); + + /** + * @see Compiler::compiledClasses() + * @see FileMonitor::emitFilesHaveChanged() + * @see AllSingletonsPreloader::flush() + */ + $dispatcher->connect( + signalClassName: FileMonitor::class, + signalName: 'filesHaveChanged', + slotClassNameOrObject: fn () => static::flushSingletonsPreloaderCache($bootstrap->getObjectManager()) + ); + } + + private function flushSingletonsPreloaderCache(ObjectManagerInterface $objectManager): void + { + if ($objectManager instanceof CompileTimeObjectManager) { + return; + } + $objectManager + ->get(AllSingletonsPreloader::CACHE) + ->flush(); + } +} diff --git a/Classes/SingletonPreloading/AllSingletonsPreloader.php b/Classes/SingletonPreloading/AllSingletonsPreloader.php index 9f3712b..b9db783 100644 --- a/Classes/SingletonPreloading/AllSingletonsPreloader.php +++ b/Classes/SingletonPreloading/AllSingletonsPreloader.php @@ -2,17 +2,18 @@ namespace Netlogix\JobQueue\FastRabbit\SingletonPreloading; -use Neos\Flow\Core\Bootstrap; +use Doctrine\DBAL\Connection; +use Doctrine\ORM\EntityManagerInterface; +use Neos\Cache\Frontend\VariableFrontend; use Neos\Flow\ObjectManagement\Configuration\Configuration; use Neos\Flow\ObjectManagement\ObjectManager; use Neos\Flow\ObjectManagement\ObjectManagerInterface; use Neos\Flow\Reflection\ReflectionService; use Neos\Flow\Annotations as Flow; use Throwable; - use Traversable; -use function array_filter; +use function class_exists; use function is_a; /** @@ -29,54 +30,125 @@ */ class AllSingletonsPreloader implements SingletonsPreloader { + public const string CACHE = 'Netlogix.JobQueue.FakeQueue:SingletonPreloaderCache'; + #[Flow\InjectConfiguration(path: 'AllSingletonsPreloader.ignoreClassNames', package: 'Netlogix.JobQueue.FastRabbit')] protected array $ignoreClassNames = []; - public function __construct( - protected readonly ObjectManager $objectManager, - protected readonly ReflectionService $reflectionService - ) { - } + #[Flow\Inject(name: AllSingletonsPreloader::CACHE, lazy: false)] + protected VariableFrontend $cache; + + #[Flow\Inject(lazy: false)] + protected ObjectManager $objectManager; + + #[Flow\Inject(lazy: false)] + protected ReflectionService $reflectionService; public function collect(): void { - $objectManager = Bootstrap::$staticObjectManager; - foreach ($this->getSingletonClassNames($objectManager) as $className) { - try { - $objectManager->get($className); - } catch (Throwable $e) { - // ignore - } + foreach ($this->getClassList() as $className => $buildInstance) { + $this->preload(className: $className, buildInstance: $buildInstance); } + $this->pauseExpiringObjects(); } /** - * @return Traversable + * @return array */ - public function getSingletonClassNames(ObjectManagerInterface $objectManager): Traversable + protected function getClassList(): array { - foreach (self::getSingletonClassNamesFromReflection($objectManager) as $className) { - foreach ($this->ignoreClassNames as $ignoredClassName) { - if (is_a($className, $ignoredClassName, true)) { - continue; - } + if ($this->cache->has('classList')) { + return $this->cache->get('classList'); + } else { + $list = [... $this->buildClassList()]; + $this->cache->set('classList', $list); + return $list; + } + } + + protected function preload(string $className, bool $buildInstance): void + { + try { + $buildInstance + ? $this->objectManager->get($className) + : class_exists(class: $className, autoload: true); + } catch (Throwable) { + // ignore + } + } + + protected function pauseExpiringObjects() + { + if ($this->objectManager->has(EntityManagerInterface::class)) { + $this->objectManager + ->get(EntityManagerInterface::class) + ->getConnection() + ->close(); + } + if ($this->objectManager->has(Connection::class)) { + $this->objectManager + ->get(Connection::class) + ->close(); + } + // TODO: There are other objects that might expire, for example ยด + } + + /** + * @return Traversable + */ + protected function buildClassList(): Traversable + { + foreach (self::getSingletonClassNamesFromReflection($this->objectManager) as $className => $buildInstance) { + yield $className => $buildInstance && !$this->ignoreClassName($className); + } + } + + protected function ignoreClassName(string $className): bool + { + foreach ($this->ignoreClassNames as $ignoredClassName) { + if (is_a($className, $ignoredClassName, true)) { + return true; } - yield $className; } + return false; } + /** + * @return array + */ #[Flow\CompileStatic] - public static function getSingletonClassNamesFromReflection(ObjectManagerInterface $objectManager): array + final public static function getSingletonClassNamesFromReflection(ObjectManagerInterface $objectManager): array { - return array_filter( - array: $objectManager->get(ReflectionService::class)->getAllClassNames(), - callback: static function ($className) use ($objectManager): bool { - try { - return $objectManager->getScope($className) === Configuration::SCOPE_SINGLETON; - } catch (\Exception $e) { - return false; + $reflection = $objectManager->get(ReflectionService::class); + assert($reflection instanceof ReflectionService); + $classNames = []; + foreach ($reflection->getAllClassNames() as $className) { + try { + if ($objectManager->getScope($className) !== Configuration::SCOPE_SINGLETON) { + /** + * Only preload singletons + */ + $classNames[$className] = false; + continue; } + } catch (\Exception $e) { + $classNames[$className] = false; + continue; } - ); + + $constructParameters = $reflection->getMethodParameters($className, '__construct'); + if (count($constructParameters)) { + /** + * Skip preloading for classes with constructor arguments because they are + * likely to depend on stateful objects that, in one way or other, expire, + * like database connections. + */ + $classNames[$className] = false; + } else { + $classNames[$className] = true; + } + } + + return $classNames; } } diff --git a/Classes/SingletonPreloading/NoneSingletonsPreloader.php b/Classes/SingletonPreloading/NoneSingletonsPreloader.php new file mode 100644 index 0000000..8dea0be --- /dev/null +++ b/Classes/SingletonPreloading/NoneSingletonsPreloader.php @@ -0,0 +1,37 @@ + Date: Fri, 10 Oct 2025 11:31:05 +0200 Subject: [PATCH 08/11] feat: Make pool size a setting per job queue When a child process is assigned a task, the pool is restocked to this amount. So this is the number of idle processes at any time. The total number of processes can be higher if there are busy ones. --- Classes/Worker.php | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/Classes/Worker.php b/Classes/Worker.php index b320411..dbae934 100644 --- a/Classes/Worker.php +++ b/Classes/Worker.php @@ -15,6 +15,8 @@ use function array_shift; use function fputs; +use function max; + use const STDERR; use const STDOUT; @@ -30,9 +32,11 @@ final class Worker private array $pool = []; /** - * A pool size of 1 means one standby while 1 is working. + * When a child process is assigned a task, the pool is restocked to this + * amount. So this is the number of idle processes at any time. The total + * number of processes can be higher if there are busy ones. */ - protected int $poolSize = 1; + private readonly int $poolSize; public function __construct( protected readonly string $command, @@ -42,6 +46,7 @@ public function __construct( protected readonly Lock $lock ) { $this->loop = EventLoop\Loop::get(); + $this->poolSize = max(0, (int) ($queueSettings['poolSize'] ?? 1)); } public function prepare(): void From 64a09dbb15ae878caba43f424eb23234d76eec3e Mon Sep 17 00:00:00 2001 From: Stephan Schuler Date: Thu, 16 Oct 2025 16:40:44 +0200 Subject: [PATCH 09/11] fix: Prevent MissingInputExceptions when loop restarts Our FastRabbit loop will retart every 6 hours. This results in child processes loosing their STDIN, so they will throw an exceptions. Since this exception is expected, we don't need to log it. --- Classes/Loop.php | 5 +- .../JobCommandInitializationAspect.php | 51 +++++++++++++++++++ Classes/Worker.php | 9 ++++ 3 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 Classes/PreventLoggingOfMissingInput/JobCommandInitializationAspect.php diff --git a/Classes/Loop.php b/Classes/Loop.php index 89fd1fe..b3f9eeb 100644 --- a/Classes/Loop.php +++ b/Classes/Loop.php @@ -45,11 +45,14 @@ public function runMessagesOnWorker(Worker $worker) do { try { $message = $this->queue->waitAndReserve($this->timeout); - $worker->executeMessage($message); + if ($message) { + $worker->executeMessage($message); + } } catch (AMQPTimeoutException $e) { } if ($this->exitAfterTimestamp !== null && time() >= $this->exitAfterTimestamp) { + $worker->shutdownObject(); break; } } while (true); diff --git a/Classes/PreventLoggingOfMissingInput/JobCommandInitializationAspect.php b/Classes/PreventLoggingOfMissingInput/JobCommandInitializationAspect.php new file mode 100644 index 0000000..c193d66 --- /dev/null +++ b/Classes/PreventLoggingOfMissingInput/JobCommandInitializationAspect.php @@ -0,0 +1,51 @@ +mapRequestArgumentsToControllerArguments())')] + public function preventLoggingOfMissingInputExceptions(JoinPointInterface $joinPoint): void + { + $jobCommandController = $joinPoint->getProxy(); + assert($jobCommandController instanceof JobCommandController); + + $reflection = new ClassReflection($jobCommandController); + + $commandMethodName = $reflection + ->getProperty('commandMethodName') + ->getValue($jobCommandController); + + if ($commandMethodName !== 'executeCommand') { + $joinPoint->getAdviceChain()->proceed($joinPoint); + return; + } + + try { + $joinPoint->getAdviceChain()->proceed($joinPoint); + } catch (MissingInputException $e) { + throw new StopCommandException(); + } + } +} diff --git a/Classes/Worker.php b/Classes/Worker.php index dbae934..76b0405 100644 --- a/Classes/Worker.php +++ b/Classes/Worker.php @@ -49,6 +49,15 @@ public function __construct( $this->poolSize = max(0, (int) ($queueSettings['poolSize'] ?? 1)); } + public function shutdownObject() + { + foreach ($this->pool as $process) { + $process->terminate(); + $process->stdin->close(); + } + $this->pool = []; + } + public function prepare(): void { $this->fillPool($this->poolSize); From 7c85ff8159be2b92c6b736173440f4226f84557e Mon Sep 17 00:00:00 2001 From: Stephan Schuler Date: Tue, 28 Oct 2025 17:01:30 +0100 Subject: [PATCH 10/11] feat: Use netlogix/jobqueue-loop library In a previous commit, this package used a pool of React/Process objects for executing jobs, which allowed the children to preload singletons, even if the parent has no Job payload to pass on, yet. This chnage moves the Pool object with its "prefork strategy" to a separate library package, which allows for som code to be reused. --- Classes/Job/ConfigurationFactory.php | 13 +--- Classes/Loop.php | 106 +++++++++++++++++---------- Classes/Worker.php | 93 +++-------------------- bin/fast-rabbit | 18 ++++- composer.json | 1 + 5 files changed, 95 insertions(+), 136 deletions(-) diff --git a/Classes/Job/ConfigurationFactory.php b/Classes/Job/ConfigurationFactory.php index 74187bb..6640b29 100644 --- a/Classes/Job/ConfigurationFactory.php +++ b/Classes/Job/ConfigurationFactory.php @@ -8,6 +8,8 @@ use Neos\Flow\Configuration\ConfigurationManager; use Neos\Flow\Core\Booting\Scripts; use Neos\Utility\ObjectAccess; +use Netlogix\JobQueue\Pool\ProcessFactory; + use function defined; use function preg_replace; use function rtrim; @@ -59,16 +61,7 @@ public function buildJobConfiguration(string $queueName): array ConfigurationManager::CONFIGURATION_TYPE_SETTINGS, 'Neos.Flow' ); - - $command = Scripts::buildPhpCommand( - $flowSettings - ); - $command .= sprintf( - ' %s %s --queue=%s', - escapeshellarg(\FLOW_PATH_FLOW . 'Scripts/flow.php'), - escapeshellarg('flowpack.jobqueue.common:job:execute'), - escapeshellarg($queueName) - ); + $command = (new ProcessFactory)->buildSubprocessCommand(); $workerPool = (array)$this->configurationManager->getConfiguration( ConfigurationManager::CONFIGURATION_TYPE_SETTINGS, diff --git a/Classes/Loop.php b/Classes/Loop.php index b3f9eeb..3509eab 100644 --- a/Classes/Loop.php +++ b/Classes/Loop.php @@ -1,60 +1,86 @@ queue = $queue; - $this->exitAfterTimestamp = $exitAfter > 0 ? time() + $exitAfter : null; - $this->timeout = $exitAfter > 0 ? $exitAfter : null; + public const int SIX_HOURS_IN_SECONDS = 21600; + + public function __construct( + /** + * The Queue to watch + */ + protected RabbitQueue $queue, + + protected readonly Pool $poolObject, + + /** + * Time in seconds after which the loop should exit + */ + protected readonly ?int $exitAfter + ) { } public function runMessagesOnWorker(Worker $worker) { - $worker->prepare(); - do { - try { - $message = $this->queue->waitAndReserve($this->timeout); - if ($message) { - $worker->executeMessage($message); + $this + ->poolObject + ->runLoop(function (Pool $pool) use ($worker) { + $worker->prepare(); + + $runDueJobs = $pool->eventLoop->addPeriodicTimer( + interval: 0.01, + callback: fn () => $this->runDueJob($pool, $worker) + ); + + if ($this->exitAfter) { + $pool->eventLoop->addTimer( + interval: max($this->exitAfter, 1), + callback: function () use ($pool, $runDueJobs) { + $pool->eventLoop->cancelTimer($runDueJobs); + $checkForPoolToClear = $pool->eventLoop->addPeriodicTimer( + interval: 1, + callback: function () use ($pool, &$checkForPoolToClear) { + if (count($pool) === 0) { + $pool->eventLoop->cancelTimer($checkForPoolToClear); + $pool->eventLoop->stop(); + } + } + ); + } + ); } - } catch (AMQPTimeoutException $e) { - } + }); + } - if ($this->exitAfterTimestamp !== null && time() >= $this->exitAfterTimestamp) { - $worker->shutdownObject(); - break; + private function runDueJob(Pool $pool, Worker $worker): void + { + /** + * No parallel execution of multiple messages here, create multiple + * fast rabbit instances connected instead. + * Counting the running instances in the pool only prevents the + * pool from spawning too many workers. + */ + if (count($pool)) { + return; + } + try { + $message = $this->queue->waitAndReserve(10); + if ($message) { + $pool->eventLoop->futureTick(fn () => $worker->executeMessage($message)); } - } while (true); + } catch (AMQPTimeoutException $e) { + } } } diff --git a/Classes/Worker.php b/Classes/Worker.php index 76b0405..cdcc7f9 100644 --- a/Classes/Worker.php +++ b/Classes/Worker.php @@ -8,60 +8,28 @@ use Flowpack\JobQueue\Common\Queue\Message; use Neos\Cache\Frontend\FrontendInterface; use Neos\Flow\Cli\ConsoleOutput; +use Netlogix\JobQueue\Pool\Pool; use React\ChildProcess\Process; -use React\EventLoop; use t3n\JobQueue\RabbitMQ\Queue\RabbitQueue; -use function array_shift; -use function fputs; - -use function max; - -use const STDERR; -use const STDOUT; +use function sha1; final class Worker { protected readonly ConsoleOutput $output; - private readonly EventLoop\LoopInterface $loop; - - /** - * @var Process[] - */ - private array $pool = []; - - /** - * When a child process is assigned a task, the pool is restocked to this - * amount. So this is the number of idle processes at any time. The total - * number of processes can be higher if there are busy ones. - */ - private readonly int $poolSize; - public function __construct( protected readonly string $command, + protected readonly Pool $poolObject, protected readonly RabbitQueue $queue, protected readonly array $queueSettings, protected readonly FrontendInterface $messageCache, protected readonly Lock $lock ) { - $this->loop = EventLoop\Loop::get(); - $this->poolSize = max(0, (int) ($queueSettings['poolSize'] ?? 1)); - } - - public function shutdownObject() - { - foreach ($this->pool as $process) { - $process->terminate(); - $process->stdin->close(); - } - $this->pool = []; } public function prepare(): void { - $this->fillPool($this->poolSize); - $this->output = new ConsoleOutput(); $this->output->outputLine('Watching queue "%s"', [$this->queue->getName()]); } @@ -72,16 +40,19 @@ public function executeMessage(Message $message): void $this->messageCache->set($messageCacheIdentifier, $message); $process = $this->lock->run( - fn () => $this->runFromPool($messageCacheIdentifier) + fn () => $this->poolObject->runPayload(payload: $message->getPayload(), queueName: $this->queue->getName()), ); + assert($process instanceof Process); - if ($process->getExitCode() === 0) { + $process->on(Pool::EVENT_SUCCESS, function () use ($message) { $this->queue->finish($message->getIdentifier()); $this->output->outputLine( 'Successfully executed job "%s"', [$message->getIdentifier()] ); - } else { + }); + + $process->on(Pool::EVENT_ERROR, function () use ($message) { $maximumNumberOfReleases = isset($this->queueSettings['maximumNumberOfReleases']) ? (int) $this->queueSettings['maximumNumberOfReleases'] : JobManager::DEFAULT_MAXIMUM_NUMBER_RELEASES; @@ -111,52 +82,6 @@ public function executeMessage(Message $message): void ] ); } - } - - if ($messageCacheIdentifier !== null) { - $this->messageCache->remove($messageCacheIdentifier); - } - } - - private function createProcess(): Process - { - $process = new Process($this->command); - $timer = $this->loop->addPeriodicTimer(0.01, function () { - // TODO: Add keepalive for database if necessary - }); - $process->on('exit', function () use ($timer) { - $this->loop->cancelTimer($timer); - $this->loop->stop(); }); - $process->start(loop: $this->loop, interval: 0.01); - return $process; - } - - private function runFromPool(string $messageCacheIdentifier): Process - { - $this->fillPool($this->poolSize + 1); // Overfill - $process = array_shift($this->pool); - assert($process instanceof Process); - - $process->stdout->on('data', fn ($chunk) => fputs(STDOUT, $chunk)); - $process->stderr->on('data', fn ($chunk) => fputs(STDERR, $chunk)); - - $process->stdin->write($messageCacheIdentifier . PHP_EOL); - - $this->loop->run(); - - return $process; - } - - private function fillPool(int $poolSize): void - { - $poolSize = max($poolSize, 0); - $this->pool = array_filter( - $this->pool, - fn (Process $process) => $process->isRunning() - ); - while (count($this->pool) < $poolSize) { - $this->pool[] = $this->createProcess(); - } } } diff --git a/bin/fast-rabbit b/bin/fast-rabbit index cf96dd7..eae82f7 100755 --- a/bin/fast-rabbit +++ b/bin/fast-rabbit @@ -1,10 +1,12 @@ #!/usr/bin/env php injectMessageCache($messageCache); + +$worker = new Worker($command, $pool, $queue, $queueSettings, $messageCache, $lock); +$loop = new Loop($queue, $pool, Loop::SIX_HOURS_IN_SECONDS); $loop->runMessagesOnWorker($worker); diff --git a/composer.json b/composer.json index a9fcfdc..49259c8 100644 --- a/composer.json +++ b/composer.json @@ -11,6 +11,7 @@ "require": { "ext-json": "*", "flowpack/jobqueue-common": "^3.0.0", + "netlogix/jobqueue-pool": "^1.0", "netlogix/supervisor": "^1.0", "php-amqplib/php-amqplib": "^2.11", "react/child-process": "^0.6", From 28b77ca424bbdd04240def0265ed54778939e7bb Mon Sep 17 00:00:00 2001 From: Bastian Brunsch <62799991+bastiBepunkt@users.noreply.github.com> Date: Thu, 11 Dec 2025 09:48:41 +0100 Subject: [PATCH 11/11] fix: remove final from method getSingletonClassNamesFromReflection --- Classes/SingletonPreloading/AllSingletonsPreloader.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Classes/SingletonPreloading/AllSingletonsPreloader.php b/Classes/SingletonPreloading/AllSingletonsPreloader.php index b9db783..7993000 100644 --- a/Classes/SingletonPreloading/AllSingletonsPreloader.php +++ b/Classes/SingletonPreloading/AllSingletonsPreloader.php @@ -117,7 +117,7 @@ protected function ignoreClassName(string $className): bool * @return array */ #[Flow\CompileStatic] - final public static function getSingletonClassNamesFromReflection(ObjectManagerInterface $objectManager): array + public static function getSingletonClassNamesFromReflection(ObjectManagerInterface $objectManager): array { $reflection = $objectManager->get(ReflectionService::class); assert($reflection instanceof ReflectionService);