Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php declare(strict_types=1);

use Migrations\BaseMigration;

/**
 * Migration that adds the `consecutive_failures` counter column to the
 * `queue_scheduler_rows` table.
 *
 * The counter records how many times in a row the job dispatched for a
 * scheduler row has terminally failed (queue status = aborted) with no
 * success in between. The scheduler uses it for backoff: once the counter
 * has reached the configured `QueueScheduler.maxConsecutiveFailures` value,
 * the row is disabled rather than dispatched again, so a permanently broken
 * task stops producing failed jobs. A successful (or fresh, non-aborted)
 * dispatch sets the counter back to 0.
 *
 * The column is unsigned, not nullable and defaults to 0, so rows that
 * already exist start without any failure streak.
 */
class QueueSchedulerConsecutiveFailures extends BaseMigration {

	/**
	 * Declares the new counter column on the existing table.
	 *
	 * @return void
	 */
	public function change(): void {
		// Column definition: non-null unsigned integer starting at 0.
		$columnOptions = [
			'default' => 0,
			'null' => false,
			'signed' => false,
			'comment' => 'Number of consecutive terminally-failed (aborted) dispatches without '
				. 'an intervening success. Reset to 0 on a successful/fresh dispatch.',
		];

		$table = $this->table('queue_scheduler_rows');
		$table->addColumn('consecutive_failures', 'integer', $columnOptions);
		// update() (not create()) because the table already exists.
		$table->update();
	}

}
11 changes: 11 additions & 0 deletions config/app.example.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@
// NOT coordinate across hosts — running loop mode on multiple hosts can still
// produce overlapping schedules.
//'lockPath' => TMP . 'queue_scheduler.lock',

// Backoff for repeatedly-failing non-concurrent rows. When the row's
// previously dispatched job terminally failed (queue status "aborted"),
// the next tick reruns that same job in place instead of queuing a new
// one — so a broken task does not pile up a failed job every interval.
// After this many consecutive reruns (without an intervening success)
// the row is disabled and a `QueueScheduler.Row.disabled` event is fired
// so the host app can alert. Re-enabling a disabled row resets the count.
// 0 (default) = unlimited reruns, never auto-disable. Requires the queue
// to record terminal "aborted" state.
//'maxConsecutiveFailures' => 0,
],
// Icon configuration for the backend UI (optional, but recommended for better UX)
// Without this, the UI will use Font Awesome icons from CDN when using the standalone layout.
Expand Down
18 changes: 18 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,24 @@ This is independent of `QueueScheduler.standalone` (which controls whether the a

`QueueScheduler.dashboardAutoRefresh` (integer, seconds; default `0`) sets a meta-refresh interval on the admin dashboard so it polls itself for fresh state without manual reload. `0` disables auto-refresh; a typical value is `30` or `60`.

### Failure backoff for non-concurrent rows

When a non-concurrent row's previously dispatched job *terminally* fails — the queue marks it `aborted` after exhausting its retries — the next tick **reruns that same job in place** instead of queuing a brand-new one. Without this, a persistently-broken scheduled task would leave one failed job behind every interval (e.g. ~1440 rows/day for an every-minute task). Reusing the row keeps it to a single, recycled job.

Still-retrying jobs are untouched: while the queue has retries left the job is genuinely in flight and the next tick is held back as usual, so there is no early re-dispatch.

`QueueScheduler.maxConsecutiveFailures` (integer; default `0`) caps how many consecutive reruns (without an intervening success) are granted before the row is **disabled** and a `QueueScheduler.Row.disabled` event is dispatched so the host app can alert:

```php
$this->getEventManager()->on('QueueScheduler.Row.disabled', function ($event) {
$row = $event->getData('row');
$failures = $event->getData('consecutiveFailures');
// notify ops…
});
```

So `1` reruns the job once and disables the row on the next abort, `3` grants three reruns, and `0` (default) means unlimited reruns with no auto-disable. A successful (or fresh, non-aborted) dispatch resets the counter, and re-enabling a disabled row also resets it — so the row gets a fresh round of reruns rather than re-disabling immediately, regardless of the cap. This relies on the queue recording terminal `aborted` state (cakephp-queue 8.15+); on older queue releases no job is ever marked aborted, so the feature is simply dormant and behaviour is unchanged.

### Scheduler health indicator

The admin index page shows a small pill next to the page header indicating whether cron is actively invoking the scheduler:
Expand Down
3 changes: 2 additions & 1 deletion src/Model/Entity/SchedulerRow.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* @property \Cake\I18n\DateTime|null $created
* @property \Cake\I18n\DateTime|null $modified
* @property bool $enabled
* @property int $consecutive_failures
* @property int|null $last_queued_job_id
* @property \Queue\Model\Entity\QueuedJob|null $last_queued_job
* @property-read string|null $job_task
Expand Down Expand Up @@ -557,7 +558,7 @@ protected function _getJobTask(): ?string {
protected function _getJobData(): array {
$param = [];
if ($this->param) {
$decoded = json_decode($this->param, true, JSON_THROW_ON_ERROR);
$decoded = json_decode($this->param, true, 512, JSON_THROW_ON_ERROR);
// Validation rejects scalar/null payloads at save time, but a row
// inserted directly via SQL or through a marshalling path that
// bypasses validation could still reach this method. Falling
Expand Down
166 changes: 162 additions & 4 deletions src/Model/Table/SchedulerRowsTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use DateInterval;
use Exception;
use InvalidArgumentException;
use Queue\Model\Table\QueuedJobsTable;
use Queue\Queue\Task;
use QueueScheduler\Model\Entity\SchedulerRow;
use RuntimeException;
Expand Down Expand Up @@ -439,6 +440,14 @@ protected function adjustCakeCommand(ArrayObject $data): void {
* @return void
*/
public function beforeSave(EventInterface $event, EntityInterface $entity, ArrayObject $options): void {
// Re-enabling a row clears any backoff streak so it gets a fresh round of
// reruns. Without this, a row disabled at the cap would re-disable on the
// very next tick (its consecutive_failures is still at the cap) without
// ever retrying — acute when maxConsecutiveFailures is 1.
if ($entity->isDirty('enabled') && $entity->enabled) {
$entity->consecutive_failures = 0;
}

if (
$entity->next_run === null
|| $entity->isDirty('frequency')
Expand Down Expand Up @@ -502,20 +511,120 @@ public function run(SchedulerRow $row): bool {
$config['reference'] = $row->job_reference;
$oldLastRun = $row->last_run;

return $this->getConnection()->transactional(function () use ($row, $config, $oldLastRun): bool {
// $dispatched is what run() reports to the caller: true only when a job
// was actually queued or rerun. The transactional closure returns true
// whenever its writes should COMMIT (a dispatch OR a backoff-disable) and
// false only to roll back (hold-back / lost compare-and-swap). The
// disable case therefore commits its `enabled = false` write while still
// reporting "nothing dispatched" to the caller — returning false from the
// closure would roll the disable back.
$dispatched = false;
// Set inside the closure when the backoff cap disables the row. The
// disable write commits with the transaction; the alert event + log fire
// only AFTER commit (below) so a listener never runs mid-transaction and
// can rely on the disabled state being durable.
$disabled = false;

$this->getConnection()->transactional(function () use ($row, $config, $oldLastRun, &$dispatched, &$disabled): bool {
/** @var \Queue\Model\Table\QueuedJobsTable $queuedJobsTable */
$queuedJobsTable = TableRegistry::getTableLocator()->get('Queue.QueuedJobs');
if (!$row->allow_concurrent && $queuedJobsTable->isQueued($row->job_reference, $row->job_task)) {
return false;

$abortedJobId = null;
if (!$row->allow_concurrent) {
// If the previous dispatch terminally failed (queue status =
// aborted), rerun that same row in place rather than piling up a
// fresh failed job every tick (see #1). Checked BEFORE the
// in-flight hold-back so this works regardless of whether the
// installed queue release already excludes aborted rows from
// isQueued() (cakephp-queue#504) — an aborted job is terminal
// and never blocks the next dispatch.
$abortedJobId = $this->abortedJobId($queuedJobsTable, $row);

// A genuinely in-flight job (pending/running/retrying) still
// blocks the next dispatch.
if ($abortedJobId === null && $queuedJobsTable->isQueued($row->job_reference, $row->job_task)) {
return false;
}
}

$now = new DateTime();

if ($abortedJobId !== null) {
$max = (int)Configure::read('QueueScheduler.maxConsecutiveFailures', 0);

// #2 backoff: once we have already recycled this row's job $max
// times in a row without an intervening success and it is STILL
// aborting, stop retrying — disable the row and alert. The check is
// on the count BEFORE this tick, so $max is the number of reruns
// granted before giving up. A re-enable resets the counter (see
// beforeSave()), granting another $max reruns; recovery therefore
// works on every queue version because it flows through the rerun
// path below rather than depending on isQueued() excluding aborted.
if ($max > 0 && $row->consecutive_failures >= $max) {
$updated = $this->updateAll(
['enabled' => false],
['id' => $row->id, 'last_run IS' => $oldLastRun],
);
if ($updated === 0) {
return false;
}

$row->enabled = false;
$disabled = true;

// Commit the disable; the alert event + log fire after commit
// (below). Report "not dispatched" to the caller.
return true;
}

// #1 rerun in place: recycle the aborted job rather than piling up
// a fresh failed job every tick. Claim the tick first (compare-and-
// swap on last_run); only then mutate the job, so a lost race never
// resets a row another tick is about to reuse. last_queued_job_id
// already points at this job.
$failures = $row->consecutive_failures + 1;
$updated = $this->updateAll(
['last_run' => $now, 'consecutive_failures' => $failures],
['id' => $row->id, 'last_run IS' => $oldLastRun],
);
if ($updated === 0) {
return false;
}

// Recycle the aborted job in place. reset() clears attempts /
// fetched / etc. using the installed queue schema (so we stay
// compatible with older queue versions). Its guards always hold
// for a terminal aborted job: completed IS null, attempts > 0, and
// notbefore is in the past (the final failed attempt already ran).
$queuedJobsTable->reset($abortedJobId);
// reset() in the currently-supported queue versions does not clear
// the terminal `status`, so drop it here; otherwise abortedJobId()
// would still match the job next tick and the rerun would not take.
// cakephp-queue#505 makes reset() clear `status` itself — once the
// min queue requirement includes that release, this line can go.
$queuedJobsTable->updateAll(['status' => null], ['id' => $abortedJobId]);

$row->last_run = $now;
$row->consecutive_failures = $failures;
$row->next_run = $row->calculateNextRun();
$this->updateAll(
['next_run' => $row->next_run],
['id' => $row->id],
);

$dispatched = true;

return true;
}

$queuedJob = $queuedJobsTable->createJob($row->job_task, $row->job_data, $config);

$now = new DateTime();
$updated = $this->updateAll(
[
'last_run' => $now,
'last_queued_job_id' => $queuedJob->id,
// Fresh dispatch (prior succeeded or none): clear the streak.
'consecutive_failures' => 0,
],
[
'id' => $row->id,
Expand All @@ -533,6 +642,7 @@ public function run(SchedulerRow $row): bool {

$row->last_run = $now;
$row->last_queued_job_id = $queuedJob->id;
$row->consecutive_failures = 0;
// next_run is recomputed by beforeSave when last_run is dirty; the
// updateAll bypassed that, so do it explicitly to keep persistent
// state consistent without re-running the full save pipeline.
Expand All @@ -542,8 +652,56 @@ public function run(SchedulerRow $row): bool {
['id' => $row->id],
);

$dispatched = true;

return true;
});

if ($disabled) {
$this->dispatchEvent('QueueScheduler.Row.disabled', [
'row' => $row,
'consecutiveFailures' => $row->consecutive_failures,
]);
Log::warning(sprintf(
'QueueScheduler: row #%d (%s) disabled after %d consecutive failures.',
(int)$row->id,
(string)$row->name,
$row->consecutive_failures,
));
}

return $dispatched;
}

/**
 * Resolves the id of the job last dispatched for the given scheduler row,
 * but only when that job is terminally failed: its queue status is
 * "aborted" and it has no completion timestamp. In every other case
 * (no job dispatched yet, job not found, job not aborted) null is returned.
 *
 * Pairs with the terminal "aborted" state persisted by cakephp-queue#504.
 * The status is matched by its literal string, so on a queue release that
 * does not persist that state no row matches and the result is always null.
 *
 * @param \Queue\Model\Table\QueuedJobsTable $queuedJobsTable Table used to look the job up.
 * @param \QueueScheduler\Model\Entity\SchedulerRow $row Scheduler row whose last job is inspected.
 *
 * @return int|null Id of the aborted job, or null when there is none.
 */
protected function abortedJobId(QueuedJobsTable $queuedJobsTable, SchedulerRow $row): ?int {
	$lastJobId = $row->last_queued_job_id;
	if (!$lastJobId) {
		// Nothing has been dispatched for this row yet.
		return null;
	}

	$conditions = [
		'id' => $lastJobId,
		'completed IS' => null,
		'status' => 'aborted',
	];

	// Only the id is needed, so restrict the selected fields to it.
	/** @var \Queue\Model\Entity\QueuedJob|null $abortedJob */
	$abortedJob = $queuedJobsTable->find()
		->select(['id'])
		->where($conditions)
		->first();

	if ($abortedJob === null) {
		return null;
	}

	return $abortedJob->id;
}

/**
Expand Down
1 change: 1 addition & 0 deletions tests/Fixture/SchedulerRowsFixture.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class SchedulerRowsFixture extends TestFixture {
'last_queued_job_id' => ['type' => 'integer', 'length' => null, 'unsigned' => true, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null],
'allow_concurrent' => ['type' => 'boolean', 'length' => null, 'null' => false, 'default' => '0', 'comment' => '', 'precision' => null],
'enabled' => ['type' => 'boolean', 'length' => null, 'null' => false, 'default' => '0', 'comment' => '', 'precision' => null],
'consecutive_failures' => ['type' => 'integer', 'length' => null, 'unsigned' => true, 'null' => false, 'default' => '0', 'comment' => '', 'precision' => null],
'window_start_time' => ['type' => 'time', 'length' => null, 'precision' => null, 'null' => true, 'default' => null, 'comment' => ''],
'window_end_time' => ['type' => 'time', 'length' => null, 'precision' => null, 'null' => true, 'default' => null, 'comment' => ''],
'window_days_of_week' => ['type' => 'string', 'length' => 32, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null],
Expand Down
Loading
Loading