diff --git a/config/Migrations/20260528000000_QueueSchedulerConsecutiveFailures.php b/config/Migrations/20260528000000_QueueSchedulerConsecutiveFailures.php new file mode 100644 index 0000000..a4d234c --- /dev/null +++ b/config/Migrations/20260528000000_QueueSchedulerConsecutiveFailures.php @@ -0,0 +1,35 @@ +table('queue_scheduler_rows') + ->addColumn('consecutive_failures', 'integer', [ + 'default' => 0, + 'null' => false, + 'signed' => false, + 'comment' => 'Number of consecutive terminally-failed (aborted) dispatches without ' + . 'an intervening success. Reset to 0 on a successful/fresh dispatch.', + ]) + ->update(); + } + +} diff --git a/config/app.example.php b/config/app.example.php index 901e54c..5426c8a 100644 --- a/config/app.example.php +++ b/config/app.example.php @@ -72,6 +72,17 @@ // NOT coordinate across hosts — running loop mode on multiple hosts can still // produce overlapping schedules. //'lockPath' => TMP . 'queue_scheduler.lock', + + // Backoff for repeatedly-failing non-concurrent rows. When the row's + // previously dispatched job terminally failed (queue status "aborted"), + // the next tick reruns that same job in place instead of queuing a new + // one — so a broken task does not pile up a failed job every interval. + // After this many consecutive reruns (without an intervening success) + // the row is disabled and a `QueueScheduler.Row.disabled` event is fired + // so the host app can alert. Re-enabling a disabled row resets the count. + // 0 (default) = unlimited reruns, never auto-disable. Requires the queue + // to record terminal "aborted" state. + //'maxConsecutiveFailures' => 0, ], // Icon configuration for the backend UI (optional, but recommended for better UX) // Without this, the UI will use Font Awesome icons from CDN when using the standalone layout. 
diff --git a/docs/README.md b/docs/README.md index 46e60f5..cd165a5 100644 --- a/docs/README.md +++ b/docs/README.md @@ -365,6 +365,24 @@ This is independent of `QueueScheduler.standalone` (which controls whether the a `QueueScheduler.dashboardAutoRefresh` (integer, seconds; default `0`) sets a meta-refresh interval on the admin dashboard so it polls itself for fresh state without manual reload. `0` disables auto-refresh; a typical value is `30` or `60`. +### Failure backoff for non-concurrent rows + +When a non-concurrent row's previously dispatched job *terminally* fails — the queue marks it `aborted` after exhausting its retries — the next tick **reruns that same job in place** instead of queuing a brand-new one. Without this, a persistently-broken scheduled task would leave one failed job behind every interval (e.g. ~1440 failed queued-job records/day for an every-minute task). Reusing the existing queued job keeps it to a single, recycled record. + +Still-retrying jobs are untouched: while the queue has retries left the job is genuinely in flight and the next tick is held back as usual, so there is no early re-dispatch. + +`QueueScheduler.maxConsecutiveFailures` (integer; default `0`) caps how many consecutive reruns (without an intervening success) are granted before the row is **disabled** and a `QueueScheduler.Row.disabled` event is dispatched so the host app can alert: + +```php +$this->getEventManager()->on('QueueScheduler.Row.disabled', function ($event) { + $row = $event->getData('row'); + $failures = $event->getData('consecutiveFailures'); + // notify ops… +}); +``` + +So `1` reruns the job once and disables on the next abort, `3` grants three reruns, and `0` (default) means unlimited reruns with no automatic disabling. A successful (or fresh, non-aborted) dispatch resets the counter, and re-enabling a disabled row also resets it — so the row gets a fresh round of reruns rather than re-disabling immediately, regardless of the cap. 
This relies on the queue recording terminal `aborted` state (cakephp-queue 8.15+); on older queue releases no job is ever marked aborted, so the feature is simply dormant and behaviour is unchanged. + ### Scheduler health indicator The admin index page shows a small pill next to the page header indicating whether cron is actively invoking the scheduler: diff --git a/src/Model/Entity/SchedulerRow.php b/src/Model/Entity/SchedulerRow.php index 0f87ace..78a3f94 100644 --- a/src/Model/Entity/SchedulerRow.php +++ b/src/Model/Entity/SchedulerRow.php @@ -31,6 +31,7 @@ * @property \Cake\I18n\DateTime|null $created * @property \Cake\I18n\DateTime|null $modified * @property bool $enabled + * @property int $consecutive_failures * @property int|null $last_queued_job_id * @property \Queue\Model\Entity\QueuedJob|null $last_queued_job * @property-read string|null $job_task @@ -557,7 +558,7 @@ protected function _getJobTask(): ?string { protected function _getJobData(): array { $param = []; if ($this->param) { - $decoded = json_decode($this->param, true, JSON_THROW_ON_ERROR); + $decoded = json_decode($this->param, true, 512, JSON_THROW_ON_ERROR); // Validation rejects scalar/null payloads at save time, but a row // inserted directly via SQL or through a marshalling path that // bypasses validation could still reach this method. 
Falling diff --git a/src/Model/Table/SchedulerRowsTable.php b/src/Model/Table/SchedulerRowsTable.php index 8512214..13cd9bc 100644 --- a/src/Model/Table/SchedulerRowsTable.php +++ b/src/Model/Table/SchedulerRowsTable.php @@ -18,6 +18,7 @@ use DateInterval; use Exception; use InvalidArgumentException; +use Queue\Model\Table\QueuedJobsTable; use Queue\Queue\Task; use QueueScheduler\Model\Entity\SchedulerRow; use RuntimeException; @@ -439,6 +440,14 @@ protected function adjustCakeCommand(ArrayObject $data): void { * @return void */ public function beforeSave(EventInterface $event, EntityInterface $entity, ArrayObject $options): void { + // Re-enabling a row clears any backoff streak so it gets a fresh round of + // reruns. Without this, a row disabled at the cap would re-disable on the + // very next tick (its consecutive_failures is still at the cap) without + // ever retrying — acute when maxConsecutiveFailures is 1. + if ($entity->isDirty('enabled') && $entity->enabled) { + $entity->consecutive_failures = 0; + } + if ( $entity->next_run === null || $entity->isDirty('frequency') @@ -502,20 +511,120 @@ public function run(SchedulerRow $row): bool { $config['reference'] = $row->job_reference; $oldLastRun = $row->last_run; - return $this->getConnection()->transactional(function () use ($row, $config, $oldLastRun): bool { + // $dispatched is what run() reports to the caller: true only when a job + // was actually queued or rerun. The transactional closure returns true + // whenever its writes should COMMIT (a dispatch OR a backoff-disable) and + // false only to roll back (hold-back / lost compare-and-swap). The + // disable case therefore commits its `enabled = false` write while still + // reporting "nothing dispatched" to the caller — returning false from the + // closure would roll the disable back. + $dispatched = false; + // Set inside the closure when the backoff cap disables the row. 
The + // disable write commits with the transaction; the alert event + log fire + // only AFTER commit (below) so a listener never runs mid-transaction and + // can rely on the disabled state being durable. + $disabled = false; + + $this->getConnection()->transactional(function () use ($row, $config, $oldLastRun, &$dispatched, &$disabled): bool { /** @var \Queue\Model\Table\QueuedJobsTable $queuedJobsTable */ $queuedJobsTable = TableRegistry::getTableLocator()->get('Queue.QueuedJobs'); - if (!$row->allow_concurrent && $queuedJobsTable->isQueued($row->job_reference, $row->job_task)) { - return false; + + $abortedJobId = null; + if (!$row->allow_concurrent) { + // If the previous dispatch terminally failed (queue status = + // aborted), rerun that same row in place rather than piling up a + // fresh failed job every tick (see #1). Checked BEFORE the + // in-flight hold-back so this works regardless of whether the + // installed queue release already excludes aborted rows from + // isQueued() (cakephp-queue#504) — an aborted job is terminal + // and never blocks the next dispatch. + $abortedJobId = $this->abortedJobId($queuedJobsTable, $row); + + // A genuinely in-flight job (pending/running/retrying) still + // blocks the next dispatch. + if ($abortedJobId === null && $queuedJobsTable->isQueued($row->job_reference, $row->job_task)) { + return false; + } + } + + $now = new DateTime(); + + if ($abortedJobId !== null) { + $max = (int)Configure::read('QueueScheduler.maxConsecutiveFailures', 0); + + // #2 backoff: once we have already recycled this row's job $max + // times in a row without an intervening success and it is STILL + // aborting, stop retrying — disable the row and alert. The check is + // on the count BEFORE this tick, so $max is the number of reruns + // granted before giving up. 
A re-enable resets the counter (see + // beforeSave()), granting another $max reruns; recovery therefore + // works on every queue version because it flows through the rerun + // path below rather than depending on isQueued() excluding aborted. + if ($max > 0 && $row->consecutive_failures >= $max) { + $updated = $this->updateAll( + ['enabled' => false], + ['id' => $row->id, 'last_run IS' => $oldLastRun], + ); + if ($updated === 0) { + return false; + } + + $row->enabled = false; + $disabled = true; + + // Commit the disable; the alert event + log fire after commit + // (below). Report "not dispatched" to the caller. + return true; + } + + // #1 rerun in place: recycle the aborted job rather than piling up + // a fresh failed job every tick. Claim the tick first (compare-and- + // swap on last_run); only then mutate the job, so a lost race never + // resets a row another tick is about to reuse. last_queued_job_id + // already points at this job. + $failures = $row->consecutive_failures + 1; + $updated = $this->updateAll( + ['last_run' => $now, 'consecutive_failures' => $failures], + ['id' => $row->id, 'last_run IS' => $oldLastRun], + ); + if ($updated === 0) { + return false; + } + + // Recycle the aborted job in place. reset() clears attempts / + // fetched / etc. using the installed queue schema (so we stay + // compatible with older queue versions). Its guards always hold + // for a terminal aborted job: completed IS null, attempts > 0, and + // notbefore is in the past (the final failed attempt already ran). + $queuedJobsTable->reset($abortedJobId); + // reset() in the currently-supported queue versions does not clear + // the terminal `status`, so drop it here; otherwise abortedJobId() + // would still match the job next tick and the rerun would not take. + // cakephp-queue#505 makes reset() clear `status` itself — once the + // min queue requirement includes that release, this line can go. 
+ $queuedJobsTable->updateAll(['status' => null], ['id' => $abortedJobId]); + + $row->last_run = $now; + $row->consecutive_failures = $failures; + $row->next_run = $row->calculateNextRun(); + $this->updateAll( + ['next_run' => $row->next_run], + ['id' => $row->id], + ); + + $dispatched = true; + + return true; } $queuedJob = $queuedJobsTable->createJob($row->job_task, $row->job_data, $config); - $now = new DateTime(); $updated = $this->updateAll( [ 'last_run' => $now, 'last_queued_job_id' => $queuedJob->id, + // Fresh dispatch (prior succeeded or none): clear the streak. + 'consecutive_failures' => 0, ], [ 'id' => $row->id, @@ -533,6 +642,7 @@ public function run(SchedulerRow $row): bool { $row->last_run = $now; $row->last_queued_job_id = $queuedJob->id; + $row->consecutive_failures = 0; // next_run is recomputed by beforeSave when last_run is dirty; the // updateAll bypassed that, so do it explicitly to keep persistent // state consistent without re-running the full save pipeline. @@ -542,8 +652,56 @@ public function run(SchedulerRow $row): bool { ['id' => $row->id], ); + $dispatched = true; + return true; }); + + if ($disabled) { + $this->dispatchEvent('QueueScheduler.Row.disabled', [ + 'row' => $row, + 'consecutiveFailures' => $row->consecutive_failures, + ]); + Log::warning(sprintf( + 'QueueScheduler: row #%d (%s) disabled after %d consecutive failures.', + (int)$row->id, + (string)$row->name, + $row->consecutive_failures, + )); + } + + return $dispatched; + } + + /** + * Returns the id of the row's last dispatched job if it terminally failed + * (queue status = aborted, not completed), else null. + * + * Pairs with the terminal "aborted" state persisted by cakephp-queue#504. + * The literal status string is used so this stays a no-op on a queue + * release that does not yet persist it (no aborted rows ⇒ always null). 
+ * + * @param \Queue\Model\Table\QueuedJobsTable $queuedJobsTable + * @param \QueueScheduler\Model\Entity\SchedulerRow $row + * + * @return int|null + */ + protected function abortedJobId(QueuedJobsTable $queuedJobsTable, SchedulerRow $row): ?int { + if (!$row->last_queued_job_id) { + return null; + } + + /** @var \Queue\Model\Entity\QueuedJob|null $job */ + $job = $queuedJobsTable->find() + ->where([ + 'id' => $row->last_queued_job_id, + 'completed IS' => null, + 'status' => 'aborted', + ]) + ->select(['id']) + ->first(); + + return $job?->id; } /** diff --git a/tests/Fixture/SchedulerRowsFixture.php b/tests/Fixture/SchedulerRowsFixture.php index 2d734f5..2f2dade 100644 --- a/tests/Fixture/SchedulerRowsFixture.php +++ b/tests/Fixture/SchedulerRowsFixture.php @@ -27,6 +27,7 @@ class SchedulerRowsFixture extends TestFixture { 'last_queued_job_id' => ['type' => 'integer', 'length' => null, 'unsigned' => true, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null], 'allow_concurrent' => ['type' => 'boolean', 'length' => null, 'null' => false, 'default' => '0', 'comment' => '', 'precision' => null], 'enabled' => ['type' => 'boolean', 'length' => null, 'null' => false, 'default' => '0', 'comment' => '', 'precision' => null], + 'consecutive_failures' => ['type' => 'integer', 'length' => null, 'unsigned' => true, 'null' => false, 'default' => '0', 'comment' => '', 'precision' => null], 'window_start_time' => ['type' => 'time', 'length' => null, 'precision' => null, 'null' => true, 'default' => null, 'comment' => ''], 'window_end_time' => ['type' => 'time', 'length' => null, 'precision' => null, 'null' => true, 'default' => null, 'comment' => ''], 'window_days_of_week' => ['type' => 'string', 'length' => 32, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null], diff --git a/tests/TestCase/Model/Table/SchedulerRowsTableTest.php b/tests/TestCase/Model/Table/SchedulerRowsTableTest.php index 9165c30..0137a65 100644 --- 
a/tests/TestCase/Model/Table/SchedulerRowsTableTest.php +++ b/tests/TestCase/Model/Table/SchedulerRowsTableTest.php @@ -607,11 +607,213 @@ public function testWhitespacePaddedEmptyJsonParamIsNormalized(): void { } /** - * Real values pass through unchanged — the normalizer only fires on - * decoded-empty arrays. + * #1: when the previous dispatch terminally aborted, the next tick reruns + * that same job in place instead of creating a new one — so a broken task + * does not pile up a fresh failed row every interval. The failure counter + * increments. * * @return void */ + public function testRunRerunsAbortedJobInsteadOfCreatingNew(): void { + $this->loadPlugins(['Tools', 'Queue', 'QueueScheduler']); + $queuedJobsTable = $this->getTableLocator()->get('Queue.QueuedJobs'); + + $row = $this->SchedulerRows->newEntity([ + 'name' => 'aborted-rerun', + 'type' => SchedulerRow::TYPE_QUEUE_TASK, + 'content' => ExampleTask::class, + 'job_config' => null, + 'job_data' => '', + 'frequency' => '+1 hour', + 'allow_concurrent' => false, + ]); + $this->SchedulerRows->saveOrFail($row); + $row = $this->SchedulerRows->get($row->id); + + $this->assertTrue($this->SchedulerRows->run($row)); + $row = $this->SchedulerRows->get($row->id); + $this->assertSame(1, $queuedJobsTable->find()->count()); + $jobId = $row->last_queued_job_id; + + // Simulate the queue marking the dispatched job terminally failed. + $queuedJobsTable->updateAll( + ['status' => 'aborted', 'attempts' => 5, 'completed' => null], + ['id' => $jobId], + ); + + // Next tick reruns the same job: no new row, counter incremented. 
+ $this->assertTrue($this->SchedulerRows->run($row)); + + $this->assertSame(1, $queuedJobsTable->find()->count(), 'no new job row'); + $reloadedRow = $this->SchedulerRows->get($row->id); + $this->assertSame($jobId, $reloadedRow->last_queued_job_id, 'same job reused'); + $this->assertSame(1, $reloadedRow->consecutive_failures); + + $reloadedJob = $queuedJobsTable->get($jobId); + $this->assertSame(0, $reloadedJob->attempts, 'reset() cleared attempts'); + $this->assertNull($reloadedJob->status, 'reset() cleared the aborted status'); + } + + /** + * #2: maxConsecutiveFailures caps how many reruns are granted. With a cap of + * 1 the first abort is rerun in place, and the next abort (still no success) + * disables the row instead of being rerun again — and no extra job is queued. + * + * @return void + */ + public function testRunDisablesRowAfterMaxConsecutiveFailures(): void { + $this->loadPlugins(['Tools', 'Queue', 'QueueScheduler']); + Configure::write('QueueScheduler.maxConsecutiveFailures', 1); + $queuedJobsTable = $this->getTableLocator()->get('Queue.QueuedJobs'); + + $row = $this->SchedulerRows->newEntity([ + 'name' => 'aborted-backoff', + 'type' => SchedulerRow::TYPE_QUEUE_TASK, + 'content' => ExampleTask::class, + 'job_config' => null, + 'job_data' => '', + 'frequency' => '+1 hour', + 'allow_concurrent' => false, + ]); + $this->SchedulerRows->saveOrFail($row); + $row = $this->SchedulerRows->get($row->id); + + $this->assertTrue($this->SchedulerRows->run($row)); + $row = $this->SchedulerRows->get($row->id); + $jobId = $row->last_queued_job_id; + + $queuedJobsTable->updateAll( + ['status' => 'aborted', 'attempts' => 5, 'completed' => null], + ['id' => $jobId], + ); + + // First abort: streak 0 < cap 1 => rerun in place, streak -> 1. + $this->assertTrue($this->SchedulerRows->run($row)); + $row = $this->SchedulerRows->get($row->id); + $this->assertSame(1, $row->consecutive_failures); + + // The recycled job aborts again; streak 1 >= cap 1 => disable, no rerun. 
+ $queuedJobsTable->updateAll( + ['status' => 'aborted', 'attempts' => 5, 'completed' => null], + ['id' => $jobId], + ); + $this->assertFalse($this->SchedulerRows->run($row)); + + $reloadedRow = $this->SchedulerRows->get($row->id); + $this->assertFalse($reloadedRow->enabled, 'row disabled after the rerun budget'); + $this->assertSame(1, $reloadedRow->consecutive_failures); + $this->assertSame(1, $queuedJobsTable->find()->count(), 'no extra job queued'); + + Configure::delete('QueueScheduler.maxConsecutiveFailures'); + } + + /** + * A fresh dispatch (previous job succeeded, or none) clears the + * consecutive-failure streak. + * + * @return void + */ + public function testRunResetsFailureCounterOnFreshDispatch(): void { + $this->loadPlugins(['Tools', 'Queue', 'QueueScheduler']); + $queuedJobsTable = $this->getTableLocator()->get('Queue.QueuedJobs'); + + $row = $this->SchedulerRows->newEntity([ + 'name' => 'streak-reset', + 'type' => SchedulerRow::TYPE_QUEUE_TASK, + 'content' => ExampleTask::class, + 'job_config' => null, + 'job_data' => '', + 'frequency' => '+1 hour', + 'allow_concurrent' => false, + ]); + $this->SchedulerRows->saveOrFail($row); + $row = $this->SchedulerRows->get($row->id); + + $this->assertTrue($this->SchedulerRows->run($row)); + $row = $this->SchedulerRows->get($row->id); + $jobId = $row->last_queued_job_id; + + // Previous job completed successfully; simulate a stale non-zero streak. + $queuedJobsTable->updateAll(['completed' => new DateTime()], ['id' => $jobId]); + $this->SchedulerRows->updateAll(['consecutive_failures' => 3], ['id' => $row->id]); + $row = $this->SchedulerRows->get($row->id); + + // Fresh dispatch: new job, streak cleared. 
+ $this->assertTrue($this->SchedulerRows->run($row)); + + $this->assertSame(2, $queuedJobsTable->find()->count(), 'a fresh job was queued'); + $reloadedRow = $this->SchedulerRows->get($row->id); + $this->assertSame(0, $reloadedRow->consecutive_failures); + } + + /** + * Recovery for the acute cap: with maxConsecutiveFailures = 1 a row disables + * after its one rerun keeps aborting. Re-enabling it must grant a fresh round + * of reruns rather than re-disabling on the next tick: beforeSave() resets the + * streak, so the next tick reruns the job in place again. This works on every + * queue version because it flows through the rerun path, not a fresh dispatch + * gated by isQueued(). + * + * @return void + */ + public function testReEnablingDisabledRowGrantsFreshReruns(): void { + $this->loadPlugins(['Tools', 'Queue', 'QueueScheduler']); + Configure::write('QueueScheduler.maxConsecutiveFailures', 1); + $queuedJobsTable = $this->getTableLocator()->get('Queue.QueuedJobs'); + + $row = $this->SchedulerRows->newEntity([ + 'name' => 're-enable-reruns', + 'type' => SchedulerRow::TYPE_QUEUE_TASK, + 'content' => ExampleTask::class, + 'job_config' => null, + 'job_data' => '', + 'frequency' => '+1 hour', + 'allow_concurrent' => false, + ]); + $this->SchedulerRows->saveOrFail($row); + $row = $this->SchedulerRows->get($row->id); + + $this->assertTrue($this->SchedulerRows->run($row)); + $row = $this->SchedulerRows->get($row->id); + $jobId = $row->last_queued_job_id; + + $markAborted = function () use ($queuedJobsTable, $jobId): void { + $queuedJobsTable->updateAll( + ['status' => 'aborted', 'attempts' => 5, 'completed' => null], + ['id' => $jobId], + ); + }; + + // First abort: rerun in place (streak -> 1). + $markAborted(); + $this->assertTrue($this->SchedulerRows->run($row)); + $row = $this->SchedulerRows->get($row->id); + + // Second abort: streak 1 >= cap 1 => disable. 
+ $markAborted(); + $this->assertFalse($this->SchedulerRows->run($row)); + $row = $this->SchedulerRows->get($row->id); + $this->assertFalse($row->enabled); + $this->assertSame($jobId, $row->last_queued_job_id, 'job stays attached for visibility'); + + // Re-enable through the normal save path (as the admin edit action does): + // the streak resets, so the next tick reruns the same job in place again + // instead of re-disabling immediately. + $row = $this->SchedulerRows->patchEntity($row, ['enabled' => true]); + $this->SchedulerRows->saveOrFail($row); + $this->assertSame(0, $row->consecutive_failures, 're-enable clears the streak'); + + $markAborted(); + $this->assertTrue($this->SchedulerRows->run($row), 're-enabled row reruns again'); + $reloaded = $this->SchedulerRows->get($row->id); + $this->assertTrue($reloaded->enabled); + $this->assertSame(1, $reloaded->consecutive_failures); + $this->assertSame($jobId, $reloaded->last_queued_job_id, 'same job recycled, no pile-up'); + $this->assertSame(1, $queuedJobsTable->find()->count(), 'no extra job queued'); + + Configure::delete('QueueScheduler.maxConsecutiveFailures'); + } + /** * Regression: `run()` wraps the isQueued → createJob → save chain in a * transaction with a compare-and-swap on `last_run`. Two scheduler ticks