Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Model/Entity/SchedulerRow.php
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ protected function _getJobTask(): ?string {
protected function _getJobData(): array {
$param = [];
if ($this->param) {
$decoded = json_decode($this->param, true, JSON_THROW_ON_ERROR);
$decoded = json_decode($this->param, true, 512, JSON_THROW_ON_ERROR);
// Validation rejects scalar/null payloads at save time, but a row
// inserted directly via SQL or through a marshalling path that
// bypasses validation could still reach this method. Falling
Expand Down
51 changes: 49 additions & 2 deletions src/Model/Table/SchedulerRowsTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ public function run(SchedulerRow $row): bool {
return $this->getConnection()->transactional(function () use ($row, $config, $oldLastRun): bool {
/** @var \Queue\Model\Table\QueuedJobsTable $queuedJobsTable */
$queuedJobsTable = TableRegistry::getTableLocator()->get('Queue.QueuedJobs');
if (!$row->allow_concurrent && $queuedJobsTable->isQueued($row->job_reference, $row->job_task)) {
if (!$row->allow_concurrent && $this->isBlockedByActiveJob($queuedJobsTable, $row)) {
return false;
}

Expand Down Expand Up @@ -600,7 +600,7 @@ public function runOnce(SchedulerRow $row, array $overrides = []): bool {
return $this->getConnection()->transactional(function () use ($row, $jobData, $config, $overrides): bool {
/** @var \Queue\Model\Table\QueuedJobsTable $queuedJobsTable */
$queuedJobsTable = TableRegistry::getTableLocator()->get('Queue.QueuedJobs');
if (!$row->allow_concurrent && $queuedJobsTable->isQueued($row->job_reference, $row->job_task)) {
if (!$row->allow_concurrent && $this->isBlockedByActiveJob($queuedJobsTable, $row)) {
return false;
}

Expand Down Expand Up @@ -632,6 +632,53 @@ public function runOnce(SchedulerRow $row, array $overrides = []): bool {
});
}

/**
* Whether a non-concurrent row is blocked by a genuinely in-flight job.
*
* Plain `QueuedJobsTable::isQueued()` treats every `completed IS NULL`
* row as blocking — including a job a worker fetched and then died on
* (OOM, timeout, kill) without ever completing or failing it. Such a row
* stays `completed IS NULL` forever, so a non-concurrent schedule would
* wedge permanently behind it: both the cron tick and the admin "Run"
* action keep returning false ("could not be added to the queue").
*
* A job fetched longer ago than the queue's own requeue timeout is one
* the queue itself would re-offer to a worker, so it is presumed dead and
* no longer counts as blocking here. When `Queue.defaultRequeueTimeout`
* is not configured we fall back to the original strict semantics so
* behaviour is unchanged for installs that have not opted into a timeout.
*
* (Mirrors the `$staleTimeout` parameter added to
* `QueuedJobsTable::isQueued()` in dereuromark/cakephp-queue#503; once a
* release carrying that parameter is required, this can collapse to a
* single `isQueued($ref, $task, $staleTimeout)` call.)
*
* @param \Queue\Model\Table\QueuedJobsTable $queuedJobsTable
* @param \QueueScheduler\Model\Entity\SchedulerRow $row
*
* @return bool
*/
protected function isBlockedByActiveJob(object $queuedJobsTable, SchedulerRow $row): bool {
$staleTimeout = Configure::read('Queue.defaultRequeueTimeout');
if (!is_numeric($staleTimeout) || (int)$staleTimeout <= 0) {
return $queuedJobsTable->isQueued($row->job_reference, $row->job_task);
}

$conditions = [
'reference' => $row->job_reference,
'completed IS' => null,
'OR' => [
'fetched IS' => null,
'fetched >=' => (new DateTime())->subSeconds((int)$staleTimeout),
],
];
if ($row->job_task !== null) {
$conditions['job_task'] = $row->job_task;
}

return (bool)$queuedJobsTable->find()->where($conditions)->select(['id'])->first();
}

/**
* @param mixed $value
* @param array $context
Expand Down
82 changes: 82 additions & 0 deletions tests/TestCase/Model/Table/SchedulerRowsTableTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,88 @@ public function testWhitespacePaddedEmptyJsonParamIsNormalized(): void {
*
* @return void
*/
/**
* A non-concurrent row must NOT stay wedged behind a job that a worker
* fetched and then died on. Once that fetch is older than the queue's
* requeue timeout the job is presumed dead, so the next tick dispatches
* again instead of holding back forever.
*
* @return void
*/
public function testRunIgnoresStaleQueuedJob(): void {
$this->loadPlugins(['Tools', 'Queue', 'QueueScheduler']);
Configure::write('Queue.defaultRequeueTimeout', 60);

$row = $this->SchedulerRows->newEntity([
'name' => 'stale-target',
'type' => SchedulerRow::TYPE_QUEUE_TASK,
'content' => ExampleTask::class,
'job_config' => null,
'job_data' => '',
'frequency' => '+1 hour',
'allow_concurrent' => false,
]);
$this->SchedulerRows->saveOrFail($row);
$row = $this->SchedulerRows->get($row->id);

$this->assertTrue($this->SchedulerRows->run($row));
$row = $this->SchedulerRows->get($row->id);

$queuedJobsTable = $this->getTableLocator()->get('Queue.QueuedJobs');
$this->assertSame(1, $queuedJobsTable->find()->count());

// Mark the existing job as fetched 5 minutes ago and never completed:
// a worker grabbed it and vanished.
$queuedJob = $queuedJobsTable->get($row->last_queued_job_id);
$queuedJob->fetched = (new DateTime())->subSeconds(300);
$queuedJob->completed = null;
$queuedJobsTable->saveOrFail($queuedJob);

// Next tick must dispatch despite the dead job (300s > 60s timeout).
$this->assertTrue($this->SchedulerRows->run($row));
$this->assertSame(2, $queuedJobsTable->find()->count());

Configure::delete('Queue.defaultRequeueTimeout');
}

/**
* A genuinely in-flight job (fetched within the requeue window, or not
* yet fetched at all) must still block a non-concurrent row.
*
* @return void
*/
public function testRunBlocksOnFreshQueuedJob(): void {
$this->loadPlugins(['Tools', 'Queue', 'QueueScheduler']);
Configure::write('Queue.defaultRequeueTimeout', 60);

$row = $this->SchedulerRows->newEntity([
'name' => 'fresh-target',
'type' => SchedulerRow::TYPE_QUEUE_TASK,
'content' => ExampleTask::class,
'job_config' => null,
'job_data' => '',
'frequency' => '+1 hour',
'allow_concurrent' => false,
]);
$this->SchedulerRows->saveOrFail($row);
$row = $this->SchedulerRows->get($row->id);

$this->assertTrue($this->SchedulerRows->run($row));
$row = $this->SchedulerRows->get($row->id);

$queuedJobsTable = $this->getTableLocator()->get('Queue.QueuedJobs');
$queuedJob = $queuedJobsTable->get($row->last_queued_job_id);
$queuedJob->fetched = (new DateTime())->subSeconds(10);
$queuedJob->completed = null;
$queuedJobsTable->saveOrFail($queuedJob);

// Fetched 10s ago, within the 60s window => still in flight => blocked.
$this->assertFalse($this->SchedulerRows->run($row));
$this->assertSame(1, $queuedJobsTable->find()->count());

Configure::delete('Queue.defaultRequeueTimeout');
}

public function testRunIsIdempotentAcrossOverlappingTicks(): void {
// Load alongside Tools + QueueScheduler so the plugin registry matches
// what Application::bootstrap() loads in the test app. Loading only
Expand Down
Loading