Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ProcessMaker/Console/Kernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ protected function schedule(Schedule $schedule)
{
$schedule->command('bpmn:timer')
->everyMinute()
->onOneServer();
->onOneServer()
->withoutOverlapping(5);
Comment thread
marcoAntonioNina marked this conversation as resolved.
Outdated

$schedule->command('processmaker:sync-recommendations --queue')
->daily()
Expand Down
245 changes: 177 additions & 68 deletions ProcessMaker/Managers/TaskSchedulerManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Schema;
use Illuminate\Support\Str;
use PDOException;
use ProcessMaker\Facades\WorkflowManager;
use ProcessMaker\Jobs\StartEventConditional;
Expand Down Expand Up @@ -133,89 +134,197 @@ private function scheduleTask(
}

/**
* Checks the schedule_tasks table to execute jobs
* Timeout in minutes for stale claimed tasks.
* If a task has been claimed for longer than this, it will be released.
*/
const CLAIM_TIMEOUT_MINUTES = 5;
Comment thread
marcoAntonioNina marked this conversation as resolved.
Outdated

/**
* Checks the schedule_tasks table to execute jobs.
* Uses atomic claim per task to prevent duplicate executions while maintaining
* the original selection logic (nextDate calculation).
*/
public function scheduleTasks()
{
$today = $this->today();
$todayFormatted = $today->format('Y-m-d H:i:s');

try {
/**
* This validation is removed; the database schema should exist before
* any initiation of 'jobs' and 'schedule'.
*
* if (!Schema::hasTable('scheduled_tasks')) {
* return;
* }
*/
$this->removeExpiredLocks();

$tasks = ScheduledTask::cursor();
// 1. Release stale claims (tasks that were claimed but never completed)
$this->releaseStaleClaimedTasks();

// 2. Get candidate tasks using cursor() for memory efficiency
// We filter by unclaimed tasks only, but evaluate nextDate for each
$tasks = ScheduledTask::whereNull('claimed_by')->cursor();

foreach ($tasks as $task) {
Comment thread
caleeli marked this conversation as resolved.
try {
$config = json_decode($task->configuration);

$lastExecution = new DateTime($task->last_execution, new DateTimeZone('UTC'));

if ($lastExecution === null) {
continue;
}
$owner = $task->processRequestToken ?: $task->processRequest ?: $task->process;
$ownerDateTime = $owner?->created_at;
$nextDate = $this->nextDate($today, $config, $lastExecution, $ownerDateTime);

// if no execution date exists we go to the next task
if (empty($nextDate)) {
continue;
}

// Since the task scheduler has a presition of 1 minute (crontab)
// the times must be rounded or trucated to the nearest HH:MM:00 before compare
$method = config('app.timer_events_seconds') . 'DateTime';
$todayWithoutSeconds = $this->$method($today);
$nextDateWithoutSeconds = $this->$method($nextDate);
if ($nextDateWithoutSeconds <= $todayWithoutSeconds) {
switch ($task->type) {
case 'TIMER_START_EVENT':
$this->executeTimerStartEvent($task, $config);
$task->last_execution = $today->format('Y-m-d H:i:s');
$task->save();
break;
case 'INTERMEDIATE_TIMER_EVENT':
$executed = $this->executeIntermediateTimerEvent($task, $config);
$task->last_execution = $today->format('Y-m-d H:i:s');
if ($executed) {
$task->save();
}
break;
case 'BOUNDARY_TIMER_EVENT':
$executed = $this->executeBoundaryTimerEvent($task, $config);
$task->last_execution = $today->format('Y-m-d H:i:s');
if ($executed) {
$task->save();
}
break;
case 'SCHEDULED_JOB':
$this->executeScheduledJob($config);
$task->last_execution = $today->format('Y-m-d H:i:s');
$task->save();
break;
default:
throw new Exception('Unknown timer event: ' . $task->type);
}
}
} catch (\Throwable $ex) {
Log::Error('Failed Scheduled Task: ', [
'Task data' => print_r($task->getAttributes(), true),
'Exception' => $ex->__toString(),
]);
}
$this->processTaskWithAtomicClaim($task, $today, $todayFormatted);
}
} catch (PDOException $e) {
Log::error('The connection to the database had problems (scheduleTasks): ' . $e->getMessage());
}
}

/**
* Release tasks that have been claimed for too long (stale claims).
* This handles cases where a process crashed after claiming tasks.
*/
private function releaseStaleClaimedTasks(): void
{
$staleThreshold = Carbon::now()->subMinutes(self::CLAIM_TIMEOUT_MINUTES);

ScheduledTask::whereNotNull('claimed_by')
->where('claimed_at', '<', $staleThreshold)
->update([
'claimed_by' => null,
'claimed_at' => null,
]);
}

/**
* Process a task with atomic claim to prevent duplicate execution.
* This maintains the original selection logic (nextDate calculation) while
* adding protection against concurrent execution.
*
* @param ScheduledTask $task The task to evaluate and potentially execute
* @param DateTime $today Current datetime
* @param string $todayFormatted Formatted datetime string
*/
private function processTaskWithAtomicClaim(ScheduledTask $task, DateTime $today, string $todayFormatted): void
{
try {
$config = json_decode($task->configuration);
$lastExecution = new DateTime($task->last_execution, new DateTimeZone('UTC'));

if ($lastExecution === null) {
return;
}

$owner = $task->processRequestToken ?: $task->processRequest ?: $task->process;
$ownerDateTime = $owner?->created_at;
$nextDate = $this->nextDate($today, $config, $lastExecution, $ownerDateTime);

// If no execution date exists, skip this task
if (empty($nextDate)) {
return;
}

// Since the task scheduler has a precision of 1 minute (crontab)
// the times must be rounded or truncated to the nearest HH:MM:00 before compare
$method = config('app.timer_events_seconds') . 'DateTime';
$todayWithoutSeconds = $this->$method($today);
$nextDateWithoutSeconds = $this->$method($nextDate);

// Only proceed if the task should execute now
if ($nextDateWithoutSeconds > $todayWithoutSeconds) {
return;
}

// Try to atomically claim this specific task
$claimed = $this->claimTask($task->id, $todayFormatted);

if (!$claimed) {
// Another process already claimed this task, skip it
return;
}

// Re-fetch the task to get fresh data after claiming
$task = ScheduledTask::find($task->id);
if (!$task) {
return;
}

// Execute the task
$this->executeTask($task, $config, $todayFormatted);
} catch (\Throwable $ex) {
Log::error('Failed Scheduled Task: ', [
'Task data' => print_r($task->getAttributes(), true),
'Exception' => $ex->__toString(),
]);
// Release task on error so it can be retried
$this->releaseTask($task);
}
}

/**
* Atomically claim a single task for execution.
* Uses UPDATE with WHERE to ensure only one process can claim it.
*
* @param int $taskId The task ID to claim
* @param string $todayFormatted Current datetime formatted
* @return bool True if successfully claimed, false if already claimed by another process
*/
private function claimTask(int $taskId, string $todayFormatted): bool
{
$claimId = Str::uuid()->toString();

$affected = DB::table('scheduled_tasks')
->where('id', $taskId)
->whereNull('claimed_by')
->update([
'claimed_by' => $claimId,
'claimed_at' => $todayFormatted,
]);

return $affected > 0;
}

/**
* Execute a task based on its type.
*
* @param ScheduledTask $task The task to execute
* @param object $config Task configuration
* @param string $todayFormatted Formatted datetime for last_execution
*/
private function executeTask(ScheduledTask $task, object $config, string $todayFormatted): void
{
$executed = false;

switch ($task->type) {
case 'TIMER_START_EVENT':
$this->executeTimerStartEvent($task, $config);
$executed = true;
break;
case 'INTERMEDIATE_TIMER_EVENT':
$executed = $this->executeIntermediateTimerEvent($task, $config);
break;
case 'BOUNDARY_TIMER_EVENT':
$executed = $this->executeBoundaryTimerEvent($task, $config);
break;
case 'SCHEDULED_JOB':
$this->executeScheduledJob($config);
$executed = true;
break;
default:
throw new Exception('Unknown timer event: ' . $task->type);
}

if ($executed) {
// Update last_execution and release claim
$task->last_execution = $todayFormatted;
$task->claimed_by = null;
$task->claimed_at = null;
$task->save();
} else {
// Release claim without updating last_execution
$this->releaseTask($task);
}
}

/**
* Release a task claim without updating last_execution.
*
* @param ScheduledTask $task The task to release
*/
private function releaseTask(ScheduledTask $task): void
{
$task->claimed_by = null;
$task->claimed_at = null;
$task->save();
}

/**
* Create a scheduled job
*
Expand Down
5 changes: 5 additions & 0 deletions ProcessMaker/Models/ScheduledTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ class ScheduledTask extends ProcessMakerModel

protected $fillable = [
'process_id', 'process_request_id', 'process_request_token_id', 'configuration',
'type', 'last_execution', 'claimed_by', 'claimed_at',
];

protected $casts = [
'claimed_at' => 'datetime',
];

public static function rules()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

return new class extends Migration
{
/**
* Run the migrations.
*
* @return void
*/
public function up()
{
Schema::table('scheduled_tasks', function (Blueprint $table) {
$table->string('claimed_by', 36)->nullable()->after('configuration');
$table->dateTime('claimed_at')->nullable()->after('claimed_by');

// Index for faster queries when claiming tasks
$table->index(['claimed_by', 'claimed_at']);
});
}

/**
* Reverse the migrations.
*
* @return void
*/
public function down()
{
Schema::table('scheduled_tasks', function (Blueprint $table) {
$table->dropIndex(['claimed_by', 'claimed_at']);
$table->dropColumn(['claimed_by', 'claimed_at']);
});
}
};
Loading
Loading