Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions Classes/Command/SchedulerCommandController.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,11 @@ public function injectConnection(Connection $connection): void
* Reset stale jobs that have not changed for too long.
*
* @param string $groupName Free jobs in this group only
* @param int $minutes Count jobs as stale if their last activity was more than these many minutes ago
* @throws Exception
*/
public function resetStaleJobsCommand(
string $groupName,
int $minutes = 10
): void {
$freed = $this->scheduler->resetStaleJobs($groupName, $minutes);
$freed = $this->scheduler->resetStaleJobs($groupName);
if ($freed) {
$this->outputLine('Freed ' . $freed . ' stale jobs.');
}
Expand Down
12 changes: 9 additions & 3 deletions Classes/Domain/AbstractScheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ abstract class AbstractScheduler implements Scheduler
*/
protected TimeBaseForDueDateCalculation $timeBaseForDueDateCalculation;

#[Flow\InjectConfiguration(path: 'staleJobTimeout')]
protected int $staleJobTimeoutSecs;

protected const CLAIM_QUERY = "";
protected const SELECT_QUERY = "";
protected const RELEASE_QUERY = "";
Expand Down Expand Up @@ -280,16 +283,15 @@ public function activity(ScheduledJob $job): void
* Reset stale jobs that have not changed for too long.
*
* @param string $groupName Free jobs in this group only
* @param int $minutes Count jobs as stale if their last activity was more than these many minutes ago
* @throws Exception
* @return int Number of freed jobs
*/
public function resetStaleJobs(string $groupName, int $minutes): int {
public function resetStaleJobs(string $groupName): int {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this param is a breaking change. Is that fine or do we want to keep accepting a second parameter to override the configured timeout?

return $this->dbal->executeQuery(
sql: static::RESET_STALE_JOBS_QUERY,
params: [
'groupName' => $groupName,
'minutes' => max($minutes, 1),
'seconds' => max($this->staleJobTimeoutSecs, 1),
],
types: [
'groupName' => Types::STRING,
Expand Down Expand Up @@ -347,4 +349,8 @@ protected function validateGroupName(string $groupName): void
public function getConnection(): Connection {
return $this->dbal;
}

public function getStaleJobTimeoutSeconds(): int {
return $this->staleJobTimeoutSecs;
}
}
2 changes: 1 addition & 1 deletion Classes/Domain/MySQLScheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class MySQLScheduler extends AbstractScheduler {
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity < NOW() - INTERVAL :minutes MINUTE
AND activity < NOW() - INTERVAL :seconds SECOND
MySQL;

}
2 changes: 1 addition & 1 deletion Classes/Domain/PostgreSQLScheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class PostgreSQLScheduler extends AbstractScheduler {
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity < NOW() - make_interval(mins => :minutes)
AND activity < NOW() - make_interval(secs => :seconds)
PostgreSQL;

}
4 changes: 3 additions & 1 deletion Classes/Domain/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ public function fail(ScheduledJob $job, string $reason): void;

public function activity(ScheduledJob $job): void;

public function resetStaleJobs(string $groupName, int $minutes): int;
public function resetStaleJobs(string $groupName): int;

public function getConnection(): Connection;

public function getStaleJobTimeoutSeconds(): int;
}
13 changes: 10 additions & 3 deletions Classes/Service/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\Exception\ConnectionLost;
use Doctrine\DBAL\Exception\RetryableException;
use Doctrine\DBAL\TransactionIsolationLevel;
use Doctrine\ORM\EntityManagerInterface;

/**
Expand Down Expand Up @@ -49,12 +50,18 @@ public function fetchOne(string $query, array $params = [], array $types = [])
});
}

// requires dbal autocommit to be enabled
public function fetchOneReadUncommited(string $query, array $params = [], array $types = [])
{
return $this->withAutoReconnectAndRetry(function () use ($query, $params, $types) {
$this->dbal->executeQuery("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");
return $this->dbal->fetchOne($query, $params, $types);
$previous = $this->dbal->getTransactionIsolation();
try {
$this->dbal->setTransactionIsolation(TransactionIsolationLevel::READ_UNCOMMITTED);
return $this->dbal->transactional(function () use ($query, $params, $types) {
return $this->dbal->fetchOne($query, $params, $types);
});
} finally {
$this->dbal->setTransactionIsolation($previous);
}
});
}

Expand Down
63 changes: 18 additions & 45 deletions Classes/Service/JobStatusService.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,18 @@
#[Flow\Scope("singleton")]
abstract class JobStatusService {

protected const string TOTAL_COUNT_QUERY = "";
protected const string RUNNING_COUNT_QUERY = "";
protected const string PENDING_COUNT_QUERY = "";
protected const string STALE_COUNT_QUERY = "";
protected const string FAILED_COUNT_QUERY = "";

#[Flow\Inject]
protected Scheduler $scheduler;

public function getTotalJobCount(string $groupName): int {
$tableName = ScheduledJob::TABLE_NAME;
$query = <<<MySQL
SELECT COUNT(*) FROM {$tableName}
WHERE groupname = :groupName
MySQL;
return $this->fetchOne(
$query,
static::TOTAL_COUNT_QUERY,
[
'groupName' => $groupName
],
Expand All @@ -31,36 +32,22 @@ public function getTotalJobCount(string $groupName): int {
}

public function getRunningJobCount(string $groupName): int {
$tableName = ScheduledJob::TABLE_NAME;
$query = <<<MySQL
SELECT COUNT(*) FROM {$tableName}
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity > NOW() - INTERVAL 2 SECOND
MySQL;
return $this->fetchOne(
$query,
static::RUNNING_COUNT_QUERY,
[
'groupName' => $groupName
'groupName' => $groupName,
'seconds' => $this->scheduler->getStaleJobTimeoutSeconds()
],
[
'groupName' => Types::STRING
'groupName' => Types::STRING,
'seconds' => Types::INTEGER
]
);
}

public function getPendingJobCount(string $groupName): int {
$tableName = ScheduledJob::TABLE_NAME;
$query = <<<MySQL
SELECT COUNT(*) FROM {$tableName}
WHERE ((running = 0
AND claimed = '')
OR running = 2)
AND groupname = :groupName
MySQL;
return $this->fetchOne(
$query,
static::PENDING_COUNT_QUERY,
[
'groupName' => $groupName
],
Expand All @@ -70,37 +57,23 @@ public function getPendingJobCount(string $groupName): int {
);
}

public function getStaleJobCount(string $groupName, int $minutes): int {
$tableName = ScheduledJob::TABLE_NAME;
$query = <<<MySQL
SELECT COUNT(*) FROM {$tableName}
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity < NOW() - INTERVAL :minutes MINUTE
MySQL;
public function getStaleJobCount(string $groupName): int {
return $this->fetchOne(
$query,
static::STALE_COUNT_QUERY,
[
"groupName" => $groupName,
"minutes" => $minutes
"seconds" => $this->scheduler->getStaleJobTimeoutSeconds()
],
[
"groupName" => Types::STRING,
"minutes" => Types::INTEGER
"seconds" => Types::INTEGER
]
);
}

public function getFailedJobCount(string $groupName): int {
$tableName = ScheduledJob::TABLE_NAME;
$query = <<<MySQL
SELECT COUNT(*) FROM {$tableName}
WHERE claimed LIKE 'failed(%)'
AND groupname = :groupName
MySQL;
return $this->fetchOne(
$query,
static::FAILED_COUNT_QUERY,
[
'groupName' => $groupName
],
Expand Down
35 changes: 35 additions & 0 deletions Classes/Service/MySQLJobStatusService.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,41 @@

class MySQLJobStatusService extends JobStatusService {

protected const string TOTAL_COUNT_QUERY = <<<MySQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE groupname = :groupName
MySQL;

protected const string RUNNING_COUNT_QUERY = <<<MySQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity > NOW() - INTERVAL :seconds SECOND
MySQL;

protected const string PENDING_COUNT_QUERY = <<<MySQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE ((running = 0
AND claimed = '')
OR running = 2)
AND groupname = :groupName
MySQL;

protected const string STALE_COUNT_QUERY = <<<MySQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity <= NOW() - INTERVAL :seconds SECOND
MySQL;

protected const string FAILED_COUNT_QUERY = <<<MySQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE claimed LIKE 'failed(%)'
AND groupname = :groupName
MySQL;

protected function fetchOne(string $query, array $params = [], array $types = []) {
return $this->scheduler->getConnection()->fetchOneReadUncommited($query, $params, $types);
}
Expand Down
35 changes: 35 additions & 0 deletions Classes/Service/PostgreSQLJobStatusService.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,39 @@

class PostgreSQLJobStatusService extends JobStatusService {

protected const string TOTAL_COUNT_QUERY = <<<PostgreSQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE groupname = :groupName
PostgreSQL;

protected const string RUNNING_COUNT_QUERY = <<<PostgreSQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity > NOW() - make_interval(secs => :seconds)
PostgreSQL;

protected const string PENDING_COUNT_QUERY = <<<PostgreSQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE ((running = 0
AND claimed = '')
OR running = 2)
AND groupname = :groupName
PostgreSQL;

protected const string STALE_COUNT_QUERY = <<<PostgreSQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE running = 1
AND claimed NOT LIKE 'failed(%)'
AND groupname = :groupName
AND activity <= NOW() - make_interval(secs => :seconds)
PostgreSQL;

protected const string FAILED_COUNT_QUERY = <<<PostgreSQL
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
WHERE claimed LIKE 'failed(%)'
AND groupname = :groupName
PostgreSQL;

}
8 changes: 8 additions & 0 deletions Configuration/Settings.Timeout.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Netlogix:
JobQueue:
Scheduled:

# Time (in seconds) of inactivity (no updates to the jobs activity timestamp column) after which a job
# is considered stale and needs to be reset (see SchedulerCommandController::resetStaleJobsCommand)
# in order to be picked up again.
staleJobTimeout: 60
Loading