diff --git a/Classes/Command/SchedulerCommandController.php b/Classes/Command/SchedulerCommandController.php index dcc9ea4..e497fc3 100644 --- a/Classes/Command/SchedulerCommandController.php +++ b/Classes/Command/SchedulerCommandController.php @@ -54,14 +54,11 @@ public function injectConnection(Connection $connection): void * Reset stale jobs that have not changed for too long. * * @param string $groupName Free jobs in this group only - * @param int $minutes Count jobs as stale if their last activity was more than these many minutes ago - * @throws Exception */ public function resetStaleJobsCommand( string $groupName, - int $minutes = 10 ): void { - $freed = $this->scheduler->resetStaleJobs($groupName, $minutes); + $freed = $this->scheduler->resetStaleJobs($groupName); if ($freed) { $this->outputLine('Freed ' . $freed . ' stale jobs.'); } diff --git a/Classes/Domain/AbstractScheduler.php b/Classes/Domain/AbstractScheduler.php index c62a46c..5c4c556 100644 --- a/Classes/Domain/AbstractScheduler.php +++ b/Classes/Domain/AbstractScheduler.php @@ -37,6 +37,9 @@ abstract class AbstractScheduler implements Scheduler */ protected TimeBaseForDueDateCalculation $timeBaseForDueDateCalculation; + #[Flow\InjectConfiguration(path: 'staleJobTimeout')] + protected int $staleJobTimeoutSecs; + protected const CLAIM_QUERY = ""; protected const SELECT_QUERY = ""; protected const RELEASE_QUERY = ""; @@ -280,16 +283,15 @@ public function activity(ScheduledJob $job): void * Reset stale jobs that have not changed for too long. * * @param string $groupName Free jobs in this group only - * @param int $minutes Count jobs as stale if their last activity was more than these many minutes ago * @throws Exception * @return int Number of freed jobs */ - public function resetStaleJobs(string $groupName, int $minutes): int { + public function resetStaleJobs(string $groupName): int { return $this->dbal->executeQuery( sql: static::RESET_STALE_JOBS_QUERY, params: [ 'groupName' => $groupName, - 'minutes' => max($minutes, 1), + 'seconds' => max($this->staleJobTimeoutSecs, 1), ], types: [ 'groupName' => Types::STRING, @@ -347,4 +349,8 @@ protected function validateGroupName(string $groupName): void public function getConnection(): Connection { return $this->dbal; } + + public function getStaleJobTimeoutSeconds(): int { + return $this->staleJobTimeoutSecs; + } } diff --git a/Classes/Domain/MySQLScheduler.php b/Classes/Domain/MySQLScheduler.php index 3bf92bd..b2ab18b 100644 --- a/Classes/Domain/MySQLScheduler.php +++ b/Classes/Domain/MySQLScheduler.php @@ -118,7 +118,7 @@ class MySQLScheduler extends AbstractScheduler { WHERE running = 1 AND claimed NOT LIKE 'failed(%)' AND groupname = :groupName - AND activity < NOW() - INTERVAL :minutes MINUTE + AND activity < NOW() - INTERVAL :seconds SECOND MySQL; } diff --git a/Classes/Domain/PostgreSQLScheduler.php b/Classes/Domain/PostgreSQLScheduler.php index 811677a..8afe497 100644 --- a/Classes/Domain/PostgreSQLScheduler.php +++ b/Classes/Domain/PostgreSQLScheduler.php @@ -83,7 +83,7 @@ class PostgreSQLScheduler extends AbstractScheduler { WHERE running = 1 AND claimed NOT LIKE 'failed(%)' AND groupname = :groupName - AND activity < NOW() - make_interval(mins => :minutes) + AND activity < NOW() - make_interval(secs => :seconds) PostgreSQL; } diff --git a/Classes/Domain/Scheduler.php b/Classes/Domain/Scheduler.php index 63607af..4145e86 100644 --- a/Classes/Domain/Scheduler.php +++ b/Classes/Domain/Scheduler.php @@ -23,7 +23,9 @@ public function fail(ScheduledJob $job, string $reason): void; public function activity(ScheduledJob $job): void; - public function resetStaleJobs(string $groupName, int $minutes): int; + public function resetStaleJobs(string $groupName): int; public function getConnection(): Connection; + + public function getStaleJobTimeoutSeconds(): int; } diff --git a/Classes/Service/Connection.php b/Classes/Service/Connection.php index ee0dcde..07757f5 100644 --- a/Classes/Service/Connection.php +++ b/Classes/Service/Connection.php @@ -8,6 +8,7 @@ use Doctrine\DBAL\Connection as DBALConnection; use Doctrine\DBAL\Exception\ConnectionLost; use Doctrine\DBAL\Exception\RetryableException; +use Doctrine\DBAL\TransactionIsolationLevel; use Doctrine\ORM\EntityManagerInterface; /** @@ -49,12 +50,18 @@ public function fetchOne(string $query, array $params = [], array $types = []) }); } - // requires dbal autocommit to be enabled public function fetchOneReadUncommited(string $query, array $params = [], array $types = []) { return $this->withAutoReconnectAndRetry(function () use ($query, $params, $types) { - $this->dbal->executeQuery("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED"); - return $this->dbal->fetchOne($query, $params, $types); + $previous = $this->dbal->getTransactionIsolation(); + try { + $this->dbal->setTransactionIsolation(TransactionIsolationLevel::READ_UNCOMMITTED); + return $this->dbal->transactional(function () use ($query, $params, $types) { + return $this->dbal->fetchOne($query, $params, $types); + }); + } finally { + $this->dbal->setTransactionIsolation($previous); + } }); } diff --git a/Classes/Service/JobStatusService.php b/Classes/Service/JobStatusService.php index 66372d2..df84d54 100644 --- a/Classes/Service/JobStatusService.php +++ b/Classes/Service/JobStatusService.php @@ -10,17 +10,18 @@ #[Flow\Scope("singleton")] abstract class JobStatusService { + protected const string TOTAL_COUNT_QUERY = ""; + protected const string RUNNING_COUNT_QUERY = ""; + protected const string PENDING_COUNT_QUERY = ""; + protected const string STALE_COUNT_QUERY = ""; + protected const string FAILED_COUNT_QUERY = ""; + #[Flow\Inject] protected Scheduler $scheduler; public function getTotalJobCount(string $groupName): int { - $tableName = ScheduledJob::TABLE_NAME; - $query = <<fetchOne( - $query, + static::TOTAL_COUNT_QUERY, [ 'groupName' => $groupName ], @@ -31,36 +32,22 @@ public function getTotalJobCount(string $groupName): int { } public function getRunningJobCount(string $groupName): int { - $tableName = ScheduledJob::TABLE_NAME; - $query = << NOW() - INTERVAL 2 SECOND - MySQL; return $this->fetchOne( - $query, + static::RUNNING_COUNT_QUERY, [ - 'groupName' => $groupName + 'groupName' => $groupName, + 'seconds' => $this->scheduler->getStaleJobTimeoutSeconds() ], [ - 'groupName' => Types::STRING + 'groupName' => Types::STRING, + 'seconds' => Types::INTEGER ] ); } public function getPendingJobCount(string $groupName): int { - $tableName = ScheduledJob::TABLE_NAME; - $query = <<fetchOne( - $query, + static::PENDING_COUNT_QUERY, [ 'groupName' => $groupName ], @@ -70,37 +57,23 @@ public function getPendingJobCount(string $groupName): int { ); } - public function getStaleJobCount(string $groupName, int $minutes): int { - $tableName = ScheduledJob::TABLE_NAME; - $query = <<fetchOne( - $query, + static::STALE_COUNT_QUERY, [ "groupName" => $groupName, - "minutes" => $minutes + "seconds" => $this->scheduler->getStaleJobTimeoutSeconds() ], [ "groupName" => Types::STRING, - "minutes" => Types::INTEGER + "seconds" => Types::INTEGER ] ); } public function getFailedJobCount(string $groupName): int { - $tableName = ScheduledJob::TABLE_NAME; - $query = <<fetchOne( - $query, + static::FAILED_COUNT_QUERY, [ 'groupName' => $groupName ], diff --git a/Classes/Service/MySQLJobStatusService.php b/Classes/Service/MySQLJobStatusService.php index 1a61c74..200b0d7 100644 --- a/Classes/Service/MySQLJobStatusService.php +++ b/Classes/Service/MySQLJobStatusService.php @@ -4,6 +4,41 @@ class MySQLJobStatusService extends JobStatusService { + protected const string TOTAL_COUNT_QUERY = << NOW() - INTERVAL :seconds SECOND + MySQL; + + protected const string PENDING_COUNT_QUERY = <<scheduler->getConnection()->fetchOneReadUncommited($query, $params, $types); } diff --git a/Classes/Service/PostgreSQLJobStatusService.php b/Classes/Service/PostgreSQLJobStatusService.php index 0957bc2..b5643ee 100644 --- a/Classes/Service/PostgreSQLJobStatusService.php +++ b/Classes/Service/PostgreSQLJobStatusService.php @@ -4,4 +4,39 @@ class PostgreSQLJobStatusService extends JobStatusService { + protected const string TOTAL_COUNT_QUERY = << NOW() - make_interval(secs => :seconds) + PostgreSQL; + + protected const string PENDING_COUNT_QUERY = << :seconds) + PostgreSQL; + + protected const string FAILED_COUNT_QUERY = <<