diff --git a/src/Model/Entity/SchedulerRow.php b/src/Model/Entity/SchedulerRow.php
index 0f87ace..9248f3f 100644
--- a/src/Model/Entity/SchedulerRow.php
+++ b/src/Model/Entity/SchedulerRow.php
@@ -557,7 +557,7 @@ protected function _getJobTask(): ?string {
 	protected function _getJobData(): array {
 		$param = [];
 		if ($this->param) {
-			$decoded = json_decode($this->param, true, JSON_THROW_ON_ERROR);
+			$decoded = json_decode($this->param, true, 512, JSON_THROW_ON_ERROR);
 			// Validation rejects scalar/null payloads at save time, but a row
 			// inserted directly via SQL or through a marshalling path that
 			// bypasses validation could still reach this method. Falling
diff --git a/src/Model/Table/SchedulerRowsTable.php b/src/Model/Table/SchedulerRowsTable.php
index 8512214..19d2046 100644
--- a/src/Model/Table/SchedulerRowsTable.php
+++ b/src/Model/Table/SchedulerRowsTable.php
@@ -505,7 +505,7 @@ public function run(SchedulerRow $row): bool {
 		return $this->getConnection()->transactional(function () use ($row, $config, $oldLastRun): bool {
 			/** @var \Queue\Model\Table\QueuedJobsTable $queuedJobsTable */
 			$queuedJobsTable = TableRegistry::getTableLocator()->get('Queue.QueuedJobs');
-			if (!$row->allow_concurrent && $queuedJobsTable->isQueued($row->job_reference, $row->job_task)) {
+			if (!$row->allow_concurrent && $this->isBlockedByActiveJob($queuedJobsTable, $row)) {
 				return false;
 			}
 
@@ -600,7 +600,7 @@ public function runOnce(SchedulerRow $row, array $overrides = []): bool {
 		return $this->getConnection()->transactional(function () use ($row, $jobData, $config, $overrides): bool {
 			/** @var \Queue\Model\Table\QueuedJobsTable $queuedJobsTable */
 			$queuedJobsTable = TableRegistry::getTableLocator()->get('Queue.QueuedJobs');
-			if (!$row->allow_concurrent && $queuedJobsTable->isQueued($row->job_reference, $row->job_task)) {
+			if (!$row->allow_concurrent && $this->isBlockedByActiveJob($queuedJobsTable, $row)) {
 				return false;
 			}
 
@@ -632,6 +632,53 @@ public function runOnce(SchedulerRow $row, array $overrides = []): bool {
 		});
 	}
 
+	/**
+	 * Whether a non-concurrent row is blocked by a genuinely in-flight job.
+	 *
+	 * Plain `QueuedJobsTable::isQueued()` treats every `completed IS NULL`
+	 * row as blocking — including a job a worker fetched and then died on
+	 * (OOM, timeout, kill) without ever completing or failing it. Such a row
+	 * stays `completed IS NULL` forever, so a non-concurrent schedule would
+	 * wedge permanently behind it: both the cron tick and the admin "Run"
+	 * action keep returning false ("could not be added to the queue").
+	 *
+	 * A job fetched longer ago than the queue's own requeue timeout is one
+	 * the queue itself would re-offer to a worker, so it is presumed dead and
+	 * no longer counts as blocking here. When `Queue.defaultRequeueTimeout`
+	 * is not configured we fall back to the original strict semantics so
+	 * behaviour is unchanged for installs that have not opted into a timeout.
+	 *
+	 * (Mirrors the `$staleTimeout` parameter added to
+	 * `QueuedJobsTable::isQueued()` in dereuromark/cakephp-queue#503; once a
+	 * release carrying that parameter is required, this can collapse to a
+	 * single `isQueued($ref, $task, $staleTimeout)` call.)
+	 *
+	 * @param \Queue\Model\Table\QueuedJobsTable $queuedJobsTable
+	 * @param \QueueScheduler\Model\Entity\SchedulerRow $row
+	 *
+	 * @return bool
+	 */
+	protected function isBlockedByActiveJob(object $queuedJobsTable, SchedulerRow $row): bool {
+		$staleTimeout = Configure::read('Queue.defaultRequeueTimeout');
+		if (!is_numeric($staleTimeout) || (int)$staleTimeout <= 0) {
+			return $queuedJobsTable->isQueued($row->job_reference, $row->job_task);
+		}
+
+		$conditions = [
+			'reference' => $row->job_reference,
+			'completed IS' => null,
+			'OR' => [
+				'fetched IS' => null,
+				'fetched >=' => (new DateTime())->subSeconds((int)$staleTimeout),
+			],
+		];
+		if ($row->job_task !== null) {
+			$conditions['job_task'] = $row->job_task;
+		}
+
+		return (bool)$queuedJobsTable->find()->where($conditions)->select(['id'])->first();
+	}
+
 	/**
 	 * @param mixed $value
 	 * @param array $context
diff --git a/tests/TestCase/Model/Table/SchedulerRowsTableTest.php b/tests/TestCase/Model/Table/SchedulerRowsTableTest.php
index 9165c30..3378af6 100644
--- a/tests/TestCase/Model/Table/SchedulerRowsTableTest.php
+++ b/tests/TestCase/Model/Table/SchedulerRowsTableTest.php
@@ -625,6 +625,106 @@ public function testWhitespacePaddedEmptyJsonParamIsNormalized(): void {
 	 *
 	 * @return void
 	 */
+	/**
+	 * A non-concurrent row must NOT stay wedged behind a job that a worker
+	 * fetched and then died on. Once that fetch is older than the queue's
+	 * requeue timeout the job is presumed dead, so the next tick dispatches
+	 * again instead of holding back forever.
+	 *
+	 * @return void
+	 */
+	public function testRunIgnoresStaleQueuedJob(): void {
+		$this->loadPlugins(['Tools', 'Queue', 'QueueScheduler']);
+		$previousTimeout = Configure::read('Queue.defaultRequeueTimeout');
+		Configure::write('Queue.defaultRequeueTimeout', 60);
+
+		try {
+			$row = $this->SchedulerRows->newEntity([
+				'name' => 'stale-target',
+				'type' => SchedulerRow::TYPE_QUEUE_TASK,
+				'content' => ExampleTask::class,
+				'job_config' => null,
+				'job_data' => '',
+				'frequency' => '+1 hour',
+				'allow_concurrent' => false,
+			]);
+			$this->SchedulerRows->saveOrFail($row);
+			$row = $this->SchedulerRows->get($row->id);
+
+			$this->assertTrue($this->SchedulerRows->run($row));
+			$row = $this->SchedulerRows->get($row->id);
+
+			$queuedJobsTable = $this->getTableLocator()->get('Queue.QueuedJobs');
+			$this->assertSame(1, $queuedJobsTable->find()->count());
+
+			// Mark the existing job as fetched 5 minutes ago and never completed:
+			// a worker grabbed it and vanished.
+			$queuedJob = $queuedJobsTable->get($row->last_queued_job_id);
+			$queuedJob->fetched = (new DateTime())->subSeconds(300);
+			$queuedJob->completed = null;
+			$queuedJobsTable->saveOrFail($queuedJob);
+
+			// Next tick must dispatch despite the dead job (300s > 60s timeout).
+			$this->assertTrue($this->SchedulerRows->run($row));
+			$this->assertSame(2, $queuedJobsTable->find()->count());
+		} finally {
+			// Put the timeout back even when an assertion above fails, so the
+			// override never leaks into tests that run afterwards.
+			if ($previousTimeout === null) {
+				Configure::delete('Queue.defaultRequeueTimeout');
+			} else {
+				Configure::write('Queue.defaultRequeueTimeout', $previousTimeout);
+			}
+		}
+	}
+
+	/**
+	 * A genuinely in-flight job (fetched within the requeue window, or not
+	 * yet fetched at all) must still block a non-concurrent row.
+	 *
+	 * @return void
+	 */
+	public function testRunBlocksOnFreshQueuedJob(): void {
+		$this->loadPlugins(['Tools', 'Queue', 'QueueScheduler']);
+		$previousTimeout = Configure::read('Queue.defaultRequeueTimeout');
+		Configure::write('Queue.defaultRequeueTimeout', 60);
+
+		try {
+			$row = $this->SchedulerRows->newEntity([
+				'name' => 'fresh-target',
+				'type' => SchedulerRow::TYPE_QUEUE_TASK,
+				'content' => ExampleTask::class,
+				'job_config' => null,
+				'job_data' => '',
+				'frequency' => '+1 hour',
+				'allow_concurrent' => false,
+			]);
+			$this->SchedulerRows->saveOrFail($row);
+			$row = $this->SchedulerRows->get($row->id);
+
+			$this->assertTrue($this->SchedulerRows->run($row));
+			$row = $this->SchedulerRows->get($row->id);
+
+			$queuedJobsTable = $this->getTableLocator()->get('Queue.QueuedJobs');
+			$queuedJob = $queuedJobsTable->get($row->last_queued_job_id);
+			$queuedJob->fetched = (new DateTime())->subSeconds(10);
+			$queuedJob->completed = null;
+			$queuedJobsTable->saveOrFail($queuedJob);
+
+			// Fetched 10s ago, within the 60s window => still in flight => blocked.
+			$this->assertFalse($this->SchedulerRows->run($row));
+			$this->assertSame(1, $queuedJobsTable->find()->count());
+		} finally {
+			// Put the timeout back even when an assertion above fails, so the
+			// override never leaks into tests that run afterwards.
+			if ($previousTimeout === null) {
+				Configure::delete('Queue.defaultRequeueTimeout');
+			} else {
+				Configure::write('Queue.defaultRequeueTimeout', $previousTimeout);
+			}
+		}
+	}
+
 	public function testRunIsIdempotentAcrossOverlappingTicks(): void {
 		// Load alongside Tools + QueueScheduler so the plugin registry matches
 		// what Application::bootstrap() loads in the test app. Loading only