Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions Classes/Command/SchedulerCommandController.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,11 @@ public function injectConnection(Connection $connection): void
* Reset stale jobs that have not changed for too long.
*
* @param string $groupName Free jobs in this group only
* @param int $minutes Count jobs as stale if their last activity was more than these many minutes ago
* @throws Exception
*/
public function resetStaleJobsCommand(
string $groupName,
int $minutes = 10
): void {
$freed = $this->scheduler->resetStaleJobs($groupName, $minutes);
$freed = $this->scheduler->resetStaleJobs($groupName);
if ($freed) {
$this->outputLine('Freed ' . $freed . ' stale jobs.');
}
Expand Down
12 changes: 9 additions & 3 deletions Classes/Domain/AbstractScheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ abstract class AbstractScheduler implements Scheduler
*/
protected TimeBaseForDueDateCalculation $timeBaseForDueDateCalculation;

#[Flow\InjectConfiguration(path: 'staleJobTimeout')]
protected int $staleJobTimeoutSecs;

protected const CLAIM_QUERY = "";
protected const SELECT_QUERY = "";
protected const RELEASE_QUERY = "";
Expand Down Expand Up @@ -280,16 +283,15 @@ public function activity(ScheduledJob $job): void
* Reset stale jobs that have not changed for too long.
*
* @param string $groupName Free jobs in this group only
* @param int $minutes Count jobs as stale if their last activity was more than these many minutes ago
* @throws Exception
* @return int Number of freed jobs
*/
public function resetStaleJobs(string $groupName, int $minutes): int {
public function resetStaleJobs(string $groupName): int {
Comment thread
HenrikVogel marked this conversation as resolved.
Outdated
return $this->dbal->executeQuery(
sql: static::RESET_STALE_JOBS_QUERY,
params: [
'groupName' => $groupName,
'minutes' => max($minutes, 1),
'seconds' => max($this->staleJobTimeoutSecs, 1),
],
types: [
'groupName' => Types::STRING,
Expand Down Expand Up @@ -347,4 +349,8 @@ protected function validateGroupName(string $groupName): void
public function getConnection(): Connection {
return $this->dbal;
}

public function getStaleJobTimeoutSeconds(): int {
return $this->staleJobTimeoutSecs;
}
}
2 changes: 1 addition & 1 deletion Classes/Domain/MySQLScheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class MySQLScheduler extends AbstractScheduler {
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity < NOW() - INTERVAL :minutes MINUTE
AND activity < NOW() - INTERVAL :seconds SECOND
MySQL;

}
2 changes: 1 addition & 1 deletion Classes/Domain/PostgreSQLScheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class PostgreSQLScheduler extends AbstractScheduler {
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity < NOW() - make_interval(mins => :minutes)
AND activity < NOW() - make_interval(secs => :seconds)
PostgreSQL;

}
4 changes: 3 additions & 1 deletion Classes/Domain/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ public function fail(ScheduledJob $job, string $reason): void;

public function activity(ScheduledJob $job): void;

public function resetStaleJobs(string $groupName, int $minutes): int;
public function resetStaleJobs(string $groupName): int;

public function getConnection(): Connection;

public function getStaleJobTimeoutSeconds(): int;
}
13 changes: 10 additions & 3 deletions Classes/Service/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\Exception\ConnectionLost;
use Doctrine\DBAL\Exception\RetryableException;
use Doctrine\DBAL\TransactionIsolationLevel;
use Doctrine\ORM\EntityManagerInterface;

/**
Expand Down Expand Up @@ -49,12 +50,18 @@ public function fetchOne(string $query, array $params = [], array $types = [])
});
}

// requires dbal autocommit to be enabled
public function fetchOneReadUncommited(string $query, array $params = [], array $types = [])
{
return $this->withAutoReconnectAndRetry(function () use ($query, $params, $types) {
$this->dbal->executeQuery("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");
return $this->dbal->fetchOne($query, $params, $types);
$previous = $this->dbal->getTransactionIsolation();
try {
$this->dbal->setTransactionIsolation(TransactionIsolationLevel::READ_UNCOMMITTED);
return $this->dbal->transactional(function () use ($query, $params, $types) {
return $this->dbal->fetchOne($query, $params, $types);
});
} finally {
$this->dbal->setTransactionIsolation($previous);
}
});
}

Expand Down
63 changes: 18 additions & 45 deletions Classes/Service/JobStatusService.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,18 @@
#[Flow\Scope("singleton")]
abstract class JobStatusService {

protected const string TOTAL_COUNT_QUERY = "";
protected const string RUNNING_COUNT_QUERY = "";
protected const string PENDING_COUNT_QUERY = "";
protected const string STALE_COUNT_QUERY = "";
protected const string FAILED_COUNT_QUERY = "";

#[Flow\Inject]
protected Scheduler $scheduler;

public function getTotalJobCount(string $groupName): int {
$tableName = ScheduledJob::TABLE_NAME;
$query = <<<MySQL
SELECT COUNT(*) FROM {$tableName}
WHERE groupname = :groupName
MySQL;
return $this->fetchOne(
$query,
static::TOTAL_COUNT_QUERY,
[
'groupName' => $groupName
],
Expand All @@ -31,36 +32,22 @@ public function getTotalJobCount(string $groupName): int {
}

public function getRunningJobCount(string $groupName): int {
$tableName = ScheduledJob::TABLE_NAME;
$query = <<<MySQL
SELECT COUNT(*) FROM {$tableName}
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity > NOW() - INTERVAL 2 SECOND
MySQL;
return $this->fetchOne(
$query,
static::RUNNING_COUNT_QUERY,
[
'groupName' => $groupName
'groupName' => $groupName,
'seconds' => $this->scheduler->getStaleJobTimeoutSeconds()
],
[
'groupName' => Types::STRING
'groupName' => Types::STRING,
'seconds' => Types::INTEGER
]
);
}

public function getPendingJobCount(string $groupName): int {
$tableName = ScheduledJob::TABLE_NAME;
$query = <<<MySQL
SELECT COUNT(*) FROM {$tableName}
WHERE ((running = 0
AND claimed = '')
OR running = 2)
AND groupname = :groupName
MySQL;
return $this->fetchOne(
$query,
static::PENDING_COUNT_QUERY,
[
'groupName' => $groupName
],
Expand All @@ -70,37 +57,23 @@ public function getPendingJobCount(string $groupName): int {
);
}

public function getStaleJobCount(string $groupName, int $minutes): int {
$tableName = ScheduledJob::TABLE_NAME;
$query = <<<MySQL
SELECT COUNT(*) FROM {$tableName}
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity < NOW() - INTERVAL :minutes MINUTE
MySQL;
public function getStaleJobCount(string $groupName): int {
return $this->fetchOne(
$query,
static::STALE_COUNT_QUERY,
[
"groupName" => $groupName,
"minutes" => $minutes
"seconds" => $this->scheduler->getStaleJobTimeoutSeconds()
],
[
"groupName" => Types::STRING,
"minutes" => Types::INTEGER
"seconds" => Types::INTEGER
]
);
}

public function getFailedJobCount(string $groupName): int {
$tableName = ScheduledJob::TABLE_NAME;
$query = <<<MySQL
SELECT COUNT(*) FROM {$tableName}
WHERE claimed LIKE 'failed(%)'
AND groupname = :groupName
MySQL;
return $this->fetchOne(
$query,
static::FAILED_COUNT_QUERY,
[
'groupName' => $groupName
],
Expand Down
35 changes: 35 additions & 0 deletions Classes/Service/MySQLJobStatusService.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,41 @@

class MySQLJobStatusService extends JobStatusService {

protected const string TOTAL_COUNT_QUERY = <<<MySQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE groupname = :groupName
MySQL;

protected const string RUNNING_COUNT_QUERY = <<<MySQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity > NOW() - INTERVAL :seconds SECOND
MySQL;

protected const string PENDING_COUNT_QUERY = <<<MySQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE ((running = 0
AND claimed = '')
OR running = 2)
AND groupname = :groupName
MySQL;

protected const string STALE_COUNT_QUERY = <<<MySQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity <= NOW() - INTERVAL :seconds SECOND
MySQL;

protected const string FAILED_COUNT_QUERY = <<<MySQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE claimed LIKE 'failed(%)'
AND groupname = :groupName
MySQL;

protected function fetchOne(string $query, array $params = [], array $types = []) {
return $this->scheduler->getConnection()->fetchOneReadUncommited($query, $params, $types);
}
Expand Down
35 changes: 35 additions & 0 deletions Classes/Service/PostgreSQLJobStatusService.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,39 @@

class PostgreSQLJobStatusService extends JobStatusService {

protected const string TOTAL_COUNT_QUERY = <<<PostgreSQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE groupname = :groupName
PostgreSQL;

protected const string RUNNING_COUNT_QUERY = <<<PostgreSQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity > NOW() - make_interval(secs => :seconds)
PostgreSQL;

protected const string PENDING_COUNT_QUERY = <<<PostgreSQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE ((running = 0
AND claimed = '')
OR running = 2)
AND groupname = :groupName
PostgreSQL;

protected const string STALE_COUNT_QUERY = <<<PostgreSQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity <= NOW() - make_interval(secs => :seconds)
PostgreSQL;

protected const string FAILED_COUNT_QUERY = <<<PostgreSQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE claimed LIKE 'failed(%)'
AND groupname = :groupName
PostgreSQL;

}
8 changes: 8 additions & 0 deletions Configuration/Settings.Timeout.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Netlogix:
JobQueue:
Scheduled:

# Time (in seconds) of inactivity (no updates to the jobs activity timestamp column) after which a job
# is considered stale and needs to be reset (see SchedulerCommandController::resetStaleJobsCommand)
# in order to be picked up again.
staleJobTimeout: 60
Loading