From b5e2b006404fc1da75f9f728bd86975e17a89422 Mon Sep 17 00:00:00 2001 From: Mark Scherer Date: Thu, 28 May 2026 15:07:46 +0200 Subject: [PATCH 1/2] Don't wedge a non-concurrent schedule behind a dead job MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A non-concurrent row (allow_concurrent = false) gates its next dispatch on QueuedJobsTable::isQueued(), which counts any row with `completed IS NULL`. If a worker fetches the scheduled job and then dies (OOM, timeout, kill) without completing or failing it, that row stays `completed IS NULL` forever — so every subsequent tick AND the admin "Run" action keep returning false ("could not be added to the queue"). The schedule is permanently stuck behind a job that will never finish. run() and runOnce() now go through isBlockedByActiveJob(): a job fetched longer ago than the queue's own requeue timeout (`Queue.defaultRequeueTimeout`) is one the queue would re-offer to a worker, so it is presumed dead and no longer blocks dispatch. Not-yet-fetched jobs and jobs fetched within the window still block, so genuine concurrency protection is unchanged. When the timeout is not configured the original strict isQueued() semantics are kept, so behaviour is unchanged for installs that have not opted in. Mirrors the optional stale-timeout added to QueuedJobsTable::isQueued() in dereuromark/cakephp-queue#503; once a release carrying that parameter is required this helper can collapse to a single isQueued($ref, $task, $staleTimeout) call. --- src/Model/Table/SchedulerRowsTable.php | 51 +++++++++++- .../Model/Table/SchedulerRowsTableTest.php | 82 +++++++++++++++++++ 2 files changed, 131 insertions(+), 2 deletions(-) diff --git a/src/Model/Table/SchedulerRowsTable.php b/src/Model/Table/SchedulerRowsTable.php index 8512214..19d2046 100644 --- a/src/Model/Table/SchedulerRowsTable.php +++ b/src/Model/Table/SchedulerRowsTable.php @@ -505,7 +505,7 @@ public function run(SchedulerRow $row): bool { return $this->getConnection()->transactional(function () use ($row, $config, $oldLastRun): bool { /** @var \Queue\Model\Table\QueuedJobsTable $queuedJobsTable */ $queuedJobsTable = TableRegistry::getTableLocator()->get('Queue.QueuedJobs'); - if (!$row->allow_concurrent && $queuedJobsTable->isQueued($row->job_reference, $row->job_task)) { + if (!$row->allow_concurrent && $this->isBlockedByActiveJob($queuedJobsTable, $row)) { return false; } @@ -600,7 +600,7 @@ public function runOnce(SchedulerRow $row, array $overrides = []): bool { return $this->getConnection()->transactional(function () use ($row, $jobData, $config, $overrides): bool { /** @var \Queue\Model\Table\QueuedJobsTable $queuedJobsTable */ $queuedJobsTable = TableRegistry::getTableLocator()->get('Queue.QueuedJobs'); - if (!$row->allow_concurrent && $queuedJobsTable->isQueued($row->job_reference, $row->job_task)) { + if (!$row->allow_concurrent && $this->isBlockedByActiveJob($queuedJobsTable, $row)) { return false; } @@ -632,6 +632,53 @@ public function runOnce(SchedulerRow $row, array $overrides = []): bool { }); } + /** + * Whether a non-concurrent row is blocked by a genuinely in-flight job. + * + * Plain `QueuedJobsTable::isQueued()` treats every `completed IS NULL` + * row as blocking — including a job a worker fetched and then died on + * (OOM, timeout, kill) without ever completing or failing it. Such a row + * stays `completed IS NULL` forever, so a non-concurrent schedule would + * wedge permanently behind it: both the cron tick and the admin "Run" + * action keep returning false ("could not be added to the queue"). + * + * A job fetched longer ago than the queue's own requeue timeout is one + * the queue itself would re-offer to a worker, so it is presumed dead and + * no longer counts as blocking here. When `Queue.defaultRequeueTimeout` + * is not configured we fall back to the original strict semantics so + * behaviour is unchanged for installs that have not opted into a timeout. + * + * (Mirrors the `$staleTimeout` parameter added to + * `QueuedJobsTable::isQueued()` in dereuromark/cakephp-queue#503; once a + * release carrying that parameter is required, this can collapse to a + * single `isQueued($ref, $task, $staleTimeout)` call.) + * + * @param \Queue\Model\Table\QueuedJobsTable $queuedJobsTable + * @param \QueueScheduler\Model\Entity\SchedulerRow $row + * + * @return bool + */ + protected function isBlockedByActiveJob(object $queuedJobsTable, SchedulerRow $row): bool { + $staleTimeout = Configure::read('Queue.defaultRequeueTimeout'); + if (!is_numeric($staleTimeout) || (int)$staleTimeout <= 0) { + return $queuedJobsTable->isQueued($row->job_reference, $row->job_task); + } + + $conditions = [ + 'reference' => $row->job_reference, + 'completed IS' => null, + 'OR' => [ + 'fetched IS' => null, + 'fetched >=' => (new DateTime())->subSeconds((int)$staleTimeout), + ], + ]; + if ($row->job_task !== null) { + $conditions['job_task'] = $row->job_task; + } + + return (bool)$queuedJobsTable->find()->where($conditions)->select(['id'])->first(); + } + /** * @param mixed $value * @param array $context diff --git a/tests/TestCase/Model/Table/SchedulerRowsTableTest.php b/tests/TestCase/Model/Table/SchedulerRowsTableTest.php index 9165c30..3378af6 100644 --- a/tests/TestCase/Model/Table/SchedulerRowsTableTest.php +++ b/tests/TestCase/Model/Table/SchedulerRowsTableTest.php @@ -625,6 +625,88 @@ public function testWhitespacePaddedEmptyJsonParamIsNormalized(): void { * * @return void */ + /** + * A non-concurrent row must NOT stay wedged behind a job that a worker + * fetched and then died on. Once that fetch is older than the queue's + * requeue timeout the job is presumed dead, so the next tick dispatches + * again instead of holding back forever. + * + * @return void + */ + public function testRunIgnoresStaleQueuedJob(): void { + $this->loadPlugins(['Tools', 'Queue', 'QueueScheduler']); + Configure::write('Queue.defaultRequeueTimeout', 60); + + $row = $this->SchedulerRows->newEntity([ + 'name' => 'stale-target', + 'type' => SchedulerRow::TYPE_QUEUE_TASK, + 'content' => ExampleTask::class, + 'job_config' => null, + 'job_data' => '', + 'frequency' => '+1 hour', + 'allow_concurrent' => false, + ]); + $this->SchedulerRows->saveOrFail($row); + $row = $this->SchedulerRows->get($row->id); + + $this->assertTrue($this->SchedulerRows->run($row)); + $row = $this->SchedulerRows->get($row->id); + + $queuedJobsTable = $this->getTableLocator()->get('Queue.QueuedJobs'); + $this->assertSame(1, $queuedJobsTable->find()->count()); + + // Mark the existing job as fetched 5 minutes ago and never completed: + // a worker grabbed it and vanished. + $queuedJob = $queuedJobsTable->get($row->last_queued_job_id); + $queuedJob->fetched = (new DateTime())->subSeconds(300); + $queuedJob->completed = null; + $queuedJobsTable->saveOrFail($queuedJob); + + // Next tick must dispatch despite the dead job (300s > 60s timeout). + $this->assertTrue($this->SchedulerRows->run($row)); + $this->assertSame(2, $queuedJobsTable->find()->count()); + + Configure::delete('Queue.defaultRequeueTimeout'); + } + + /** + * A genuinely in-flight job (fetched within the requeue window, or not + * yet fetched at all) must still block a non-concurrent row. + * + * @return void + */ + public function testRunBlocksOnFreshQueuedJob(): void { + $this->loadPlugins(['Tools', 'Queue', 'QueueScheduler']); + Configure::write('Queue.defaultRequeueTimeout', 60); + + $row = $this->SchedulerRows->newEntity([ + 'name' => 'fresh-target', + 'type' => SchedulerRow::TYPE_QUEUE_TASK, + 'content' => ExampleTask::class, + 'job_config' => null, + 'job_data' => '', + 'frequency' => '+1 hour', + 'allow_concurrent' => false, + ]); + $this->SchedulerRows->saveOrFail($row); + $row = $this->SchedulerRows->get($row->id); + + $this->assertTrue($this->SchedulerRows->run($row)); + $row = $this->SchedulerRows->get($row->id); + + $queuedJobsTable = $this->getTableLocator()->get('Queue.QueuedJobs'); + $queuedJob = $queuedJobsTable->get($row->last_queued_job_id); + $queuedJob->fetched = (new DateTime())->subSeconds(10); + $queuedJob->completed = null; + $queuedJobsTable->saveOrFail($queuedJob); + + // Fetched 10s ago, within the 60s window => still in flight => blocked. + $this->assertFalse($this->SchedulerRows->run($row)); + $this->assertSame(1, $queuedJobsTable->find()->count()); + + Configure::delete('Queue.defaultRequeueTimeout'); + } + public function testRunIsIdempotentAcrossOverlappingTicks(): void { // Load alongside Tools + QueueScheduler so the plugin registry matches // what Application::bootstrap() loads in the test app. Loading only From 34a928ee99eb1180dd2135bfd66d49b1944c84eb Mon Sep 17 00:00:00 2001 From: Mark Scherer Date: Thu, 28 May 2026 15:10:57 +0200 Subject: [PATCH 2/2] Fix json_decode flags argument position JSON_THROW_ON_ERROR was passed as the $depth parameter (#3) instead of $flags (#4). Add the default depth of 512 so the flag lands correctly. --- src/Model/Entity/SchedulerRow.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Model/Entity/SchedulerRow.php b/src/Model/Entity/SchedulerRow.php index 0f87ace..9248f3f 100644 --- a/src/Model/Entity/SchedulerRow.php +++ b/src/Model/Entity/SchedulerRow.php @@ -557,7 +557,7 @@ protected function _getJobTask(): ?string { protected function _getJobData(): array { $param = []; if ($this->param) { - $decoded = json_decode($this->param, true, JSON_THROW_ON_ERROR); + $decoded = json_decode($this->param, true, 512, JSON_THROW_ON_ERROR); // Validation rejects scalar/null payloads at save time, but a row // inserted directly via SQL or through a marshalling path that // bypasses validation could still reach this method. Falling