From f82161d88f5b8c97fe6c9b4e66e4624606bb568f Mon Sep 17 00:00:00 2001 From: Henrik Vogel Date: Wed, 11 Mar 2026 09:20:28 +0100 Subject: [PATCH 1/3] fix: use doctrine transaction for isolation level read uncommitted --- Classes/Service/Connection.php | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/Classes/Service/Connection.php b/Classes/Service/Connection.php index ee0dcde..07757f5 100644 --- a/Classes/Service/Connection.php +++ b/Classes/Service/Connection.php @@ -8,6 +8,7 @@ use Doctrine\DBAL\Connection as DBALConnection; use Doctrine\DBAL\Exception\ConnectionLost; use Doctrine\DBAL\Exception\RetryableException; +use Doctrine\DBAL\TransactionIsolationLevel; use Doctrine\ORM\EntityManagerInterface; /** @@ -49,12 +50,18 @@ public function fetchOne(string $query, array $params = [], array $types = []) }); } - // requires dbal autocommit to be enabled public function fetchOneReadUncommited(string $query, array $params = [], array $types = []) { return $this->withAutoReconnectAndRetry(function () use ($query, $params, $types) { - $this->dbal->executeQuery("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED"); - return $this->dbal->fetchOne($query, $params, $types); + $previous = $this->dbal->getTransactionIsolation(); + try { + $this->dbal->setTransactionIsolation(TransactionIsolationLevel::READ_UNCOMMITTED); + return $this->dbal->transactional(function () use ($query, $params, $types) { + return $this->dbal->fetchOne($query, $params, $types); + }); + } finally { + $this->dbal->setTransactionIsolation($previous); + } }); } From b02ab50627deff88db0b58ff072608c7a2b8214b Mon Sep 17 00:00:00 2001 From: Henrik Vogel Date: Wed, 11 Mar 2026 09:22:48 +0100 Subject: [PATCH 2/3] fix: unify stale job timeout and expose as config setting --- Classes/Command/SchedulerCommandController.php | 5 +---- Classes/Domain/AbstractScheduler.php | 8 +++++--- Classes/Domain/MySQLScheduler.php | 2 +- Classes/Domain/PostgreSQLScheduler.php | 2 +- Classes/Domain/Scheduler.php | 2 +- Configuration/Settings.Timeout.yaml | 8 ++++++++ 6 files changed, 17 insertions(+), 10 deletions(-) create mode 100644 Configuration/Settings.Timeout.yaml diff --git a/Classes/Command/SchedulerCommandController.php b/Classes/Command/SchedulerCommandController.php index dcc9ea4..e497fc3 100644 --- a/Classes/Command/SchedulerCommandController.php +++ b/Classes/Command/SchedulerCommandController.php @@ -54,14 +54,11 @@ public function injectConnection(Connection $connection): void * Reset stale jobs that have not changed for too long. * * @param string $groupName Free jobs in this group only - * @param int $minutes Count jobs as stale if their last activity was more than these many minutes ago - * @throws Exception */ public function resetStaleJobsCommand( string $groupName, - int $minutes = 10 ): void { - $freed = $this->scheduler->resetStaleJobs($groupName, $minutes); + $freed = $this->scheduler->resetStaleJobs($groupName); if ($freed) { $this->outputLine('Freed ' . $freed . ' stale jobs.'); } diff --git a/Classes/Domain/AbstractScheduler.php b/Classes/Domain/AbstractScheduler.php index c62a46c..35af37f 100644 --- a/Classes/Domain/AbstractScheduler.php +++ b/Classes/Domain/AbstractScheduler.php @@ -37,6 +37,9 @@ abstract class AbstractScheduler implements Scheduler */ protected TimeBaseForDueDateCalculation $timeBaseForDueDateCalculation; + #[Flow\InjectConfiguration(path: 'staleJobTimeout')] + protected int $staleJobTimeoutSecs; + protected const CLAIM_QUERY = ""; protected const SELECT_QUERY = ""; protected const RELEASE_QUERY = ""; @@ -280,16 +283,15 @@ public function activity(ScheduledJob $job): void * Reset stale jobs that have not changed for too long. * * @param string $groupName Free jobs in this group only - * @param int $minutes Count jobs as stale if their last activity was more than these many minutes ago * @throws Exception * @return int Number of freed jobs */ - public function resetStaleJobs(string $groupName, int $minutes): int { + public function resetStaleJobs(string $groupName): int { return $this->dbal->executeQuery( sql: static::RESET_STALE_JOBS_QUERY, params: [ 'groupName' => $groupName, - 'minutes' => max($minutes, 1), + 'seconds' => max($this->staleJobTimeoutSecs, 1), ], types: [ 'groupName' => Types::STRING, diff --git a/Classes/Domain/MySQLScheduler.php b/Classes/Domain/MySQLScheduler.php index 3bf92bd..b2ab18b 100644 --- a/Classes/Domain/MySQLScheduler.php +++ b/Classes/Domain/MySQLScheduler.php @@ -118,7 +118,7 @@ class MySQLScheduler extends AbstractScheduler { WHERE running = 1 AND claimed NOT LIKE 'failed(%)' AND groupname = :groupName - AND activity < NOW() - INTERVAL :minutes MINUTE + AND activity < NOW() - INTERVAL :seconds SECOND MySQL; } diff --git a/Classes/Domain/PostgreSQLScheduler.php b/Classes/Domain/PostgreSQLScheduler.php index 811677a..8afe497 100644 --- a/Classes/Domain/PostgreSQLScheduler.php +++ b/Classes/Domain/PostgreSQLScheduler.php @@ -83,7 +83,7 @@ class PostgreSQLScheduler extends AbstractScheduler { WHERE running = 1 AND claimed NOT LIKE 'failed(%)' AND groupname = :groupName - AND activity < NOW() - make_interval(mins => :minutes) + AND activity < NOW() - make_interval(secs => :seconds) PostgreSQL; } diff --git a/Classes/Domain/Scheduler.php b/Classes/Domain/Scheduler.php index 63607af..1261586 100644 --- a/Classes/Domain/Scheduler.php +++ b/Classes/Domain/Scheduler.php @@ -23,7 +23,7 @@ public function fail(ScheduledJob $job, string $reason): void; public function activity(ScheduledJob $job): void; - public function resetStaleJobs(string $groupName, int $minutes): int; + public function resetStaleJobs(string $groupName): int; public function getConnection(): Connection; } diff --git a/Configuration/Settings.Timeout.yaml b/Configuration/Settings.Timeout.yaml new file mode 100644 index 0000000..44d62d1 --- /dev/null +++ b/Configuration/Settings.Timeout.yaml @@ -0,0 +1,8 @@ +Netlogix: + JobQueue: + Scheduled: + + # Time (in seconds) of inactivity (no updates to the jobs activity timestamp column) after which a job + # is considered stale and needs to be reset (see SchedulerCommandController::resetStaleJobsCommand) + # in order to be picked up again. + staleJobTimeout: 60 From 7e40f0459c14ba156b3003589a67d3e59affbcc9 Mon Sep 17 00:00:00 2001 From: Henrik Vogel Date: Wed, 11 Mar 2026 11:47:49 +0100 Subject: [PATCH 3/3] fix: separate mysql and postgres job status queries --- Classes/Domain/AbstractScheduler.php | 4 ++ Classes/Domain/Scheduler.php | 2 + Classes/Service/JobStatusService.php | 63 ++++++------------- Classes/Service/MySQLJobStatusService.php | 35 +++++++++++ .../Service/PostgreSQLJobStatusService.php | 35 +++++++++++ 5 files changed, 94 insertions(+), 45 deletions(-) diff --git a/Classes/Domain/AbstractScheduler.php b/Classes/Domain/AbstractScheduler.php index 35af37f..5c4c556 100644 --- a/Classes/Domain/AbstractScheduler.php +++ b/Classes/Domain/AbstractScheduler.php @@ -349,4 +349,8 @@ protected function validateGroupName(string $groupName): void public function getConnection(): Connection { return $this->dbal; } + + public function getStaleJobTimeoutSeconds(): int { + return $this->staleJobTimeoutSecs; + } } diff --git a/Classes/Domain/Scheduler.php b/Classes/Domain/Scheduler.php index 1261586..4145e86 100644 --- a/Classes/Domain/Scheduler.php +++ b/Classes/Domain/Scheduler.php @@ -26,4 +26,6 @@ public function activity(ScheduledJob $job): void; public function resetStaleJobs(string $groupName): int; public function getConnection(): Connection; + + public function getStaleJobTimeoutSeconds(): int; } diff --git a/Classes/Service/JobStatusService.php b/Classes/Service/JobStatusService.php index 66372d2..df84d54 100644 --- a/Classes/Service/JobStatusService.php +++ b/Classes/Service/JobStatusService.php @@ -10,17 +10,18 @@ #[Flow\Scope("singleton")] abstract class JobStatusService { + protected const string TOTAL_COUNT_QUERY = ""; + protected const string RUNNING_COUNT_QUERY = ""; + protected const string PENDING_COUNT_QUERY = ""; + protected const string STALE_COUNT_QUERY = ""; + protected const string FAILED_COUNT_QUERY = ""; + #[Flow\Inject] protected Scheduler $scheduler; public function getTotalJobCount(string $groupName): int { - $tableName = ScheduledJob::TABLE_NAME; - $query = <<fetchOne( - $query, + static::TOTAL_COUNT_QUERY, [ 'groupName' => $groupName ], @@ -31,36 +32,22 @@ public function getTotalJobCount(string $groupName): int { } public function getRunningJobCount(string $groupName): int { - $tableName = ScheduledJob::TABLE_NAME; - $query = << NOW() - INTERVAL 2 SECOND - MySQL; return $this->fetchOne( - $query, + static::RUNNING_COUNT_QUERY, [ - 'groupName' => $groupName + 'groupName' => $groupName, + 'seconds' => $this->scheduler->getStaleJobTimeoutSeconds() ], [ - 'groupName' => Types::STRING + 'groupName' => Types::STRING, + 'seconds' => Types::INTEGER ] ); } public function getPendingJobCount(string $groupName): int { - $tableName = ScheduledJob::TABLE_NAME; - $query = <<fetchOne( - $query, + static::PENDING_COUNT_QUERY, [ 'groupName' => $groupName ], @@ -70,37 +57,23 @@ public function getPendingJobCount(string $groupName): int { ); } - public function getStaleJobCount(string $groupName, int $minutes): int { - $tableName = ScheduledJob::TABLE_NAME; - $query = <<fetchOne( - $query, + static::STALE_COUNT_QUERY, [ "groupName" => $groupName, - "minutes" => $minutes + "seconds" => $this->scheduler->getStaleJobTimeoutSeconds() ], [ "groupName" => Types::STRING, - "minutes" => Types::INTEGER + "seconds" => Types::INTEGER ] ); } public function getFailedJobCount(string $groupName): int { - $tableName = ScheduledJob::TABLE_NAME; - $query = <<fetchOne( - $query, + static::FAILED_COUNT_QUERY, [ 'groupName' => $groupName ], diff --git a/Classes/Service/MySQLJobStatusService.php b/Classes/Service/MySQLJobStatusService.php index 1a61c74..200b0d7 100644 --- a/Classes/Service/MySQLJobStatusService.php +++ b/Classes/Service/MySQLJobStatusService.php @@ -4,6 +4,41 @@ class MySQLJobStatusService extends JobStatusService { + protected const string TOTAL_COUNT_QUERY = << NOW() - INTERVAL :seconds SECOND + MySQL; + + protected const string PENDING_COUNT_QUERY = <<scheduler->getConnection()->fetchOneReadUncommited($query, $params, $types); } diff --git a/Classes/Service/PostgreSQLJobStatusService.php b/Classes/Service/PostgreSQLJobStatusService.php index 0957bc2..b5643ee 100644 --- a/Classes/Service/PostgreSQLJobStatusService.php +++ b/Classes/Service/PostgreSQLJobStatusService.php @@ -4,4 +4,39 @@ class PostgreSQLJobStatusService extends JobStatusService { + protected const string TOTAL_COUNT_QUERY = << NOW() - make_interval(secs => :seconds) + PostgreSQL; + + protected const string PENDING_COUNT_QUERY = << :seconds) + PostgreSQL; + + protected const string FAILED_COUNT_QUERY = <<