Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
3.2.0
=====

* (feature) Add `TaskScheduler`.
* (improvement) Update task ULID when redispatching in the scheduler.
* (improvement) Require PHP 8.5+


3.1.1
=====

Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
],
"homepage": "https://github.com/21TORR/TaskManagerBundle",
"require": {
"php": ">= 8.4",
"php": ">= 8.5",
"21torr/bundle-helpers": "^2.3.1",
"21torr/cli": "^1.2.3",
"21torr/hosting": "^4.1.1",
Expand Down
6 changes: 5 additions & 1 deletion src/Entity/TaskLog.php
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ public function createRun (?LoggerInterface $logger = null) : TaskRun
{
if ($this->isSuccess())
{
throw new InvalidLogActionException("Can't start a run for a task #{$this->id} that is already finished.");
throw new InvalidLogActionException(\sprintf(
"Can't start a run for a task #%s (task id: '%s') that is already finished.",
$this->id,
$this->taskId,
));
}

$run = new TaskRun($this, $logger);
Expand Down
24 changes: 14 additions & 10 deletions src/Listener/MessengerEventListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Torr\TaskManager\Entity\TaskLog;
use Torr\TaskManager\Model\TaskLogModel;
use Torr\TaskManager\Normalizer\TaskDetailsNormalizer;
use Torr\TaskManager\Task\DispatchAfterRunTask\DispatchAfterRunTask;
use Torr\TaskManager\Task\Task;

/**
Expand All @@ -25,16 +26,17 @@ public function __construct (
/**
*
*/
#[AsEventListener(SendMessageToTransportsEvent::class)]
#[AsEventListener]
public function onSendMessageToTransports (SendMessageToTransportsEvent $event) : void
{
$taskLog = $this->getLogForEvent($event->getEnvelope());
$envelope = $event->getEnvelope();
$taskLog = $this->getLogForEvent($envelope);

// make sure that the log entry is created and flushed
if (null !== $taskLog)
{
// update envelope with current version
$taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($event->getEnvelope()));
$taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($envelope));

$this->logModel->flush();
}
Expand All @@ -43,18 +45,19 @@ public function onSendMessageToTransports (SendMessageToTransportsEvent $event)
/**
* Automatically integrate
*/
#[AsEventListener(WorkerMessageHandledEvent::class)]
#[AsEventListener]
public function onWorkerMessageHandled (WorkerMessageHandledEvent $event) : void
{
$taskLog = $this->getLogForEvent($event->getEnvelope());
$envelope = $event->getEnvelope();
$taskLog = $this->getLogForEvent($envelope);

if (null === $taskLog)
{
return;
}

// update envelope with current version
$taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($event->getEnvelope()));
$taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($envelope));

// abort run as success. It wasn't marked as finished manually, but it succeeded nonetheless.
$run = $taskLog->getLastUnfinishedRun();
Expand All @@ -63,18 +66,19 @@ public function onWorkerMessageHandled (WorkerMessageHandledEvent $event) : void
$this->logModel->flush();
}

#[AsEventListener(WorkerMessageFailedEvent::class)]
#[AsEventListener]
public function onWorkerMessageFailed (WorkerMessageFailedEvent $event) : void
{
$taskLog = $this->getLogForEvent($event->getEnvelope());
$envelope = $event->getEnvelope();
$taskLog = $this->getLogForEvent($envelope);

if (null === $taskLog)
{
return;
}

// update envelope with current version
$taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($event->getEnvelope()));
$taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($envelope));

// abort run as failure. It wasn't marked as finished manually and it failed.
$run = $taskLog->getLastUnfinishedRun();
Expand All @@ -90,7 +94,7 @@ private function getLogForEvent (Envelope $envelope) : ?TaskLog
{
$message = $envelope->getMessage();

return $message instanceof Task
return $message instanceof Task && !$message instanceof DispatchAfterRunTask
? $this->logModel->getLogForTask($message)
: null;
}
Expand Down
32 changes: 32 additions & 0 deletions src/Schedule/TaskScheduler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php declare(strict_types=1);

namespace Torr\TaskManager\Schedule;

use Symfony\Component\Lock\LockFactory;
use Symfony\Contracts\Cache\CacheInterface;

/**
* @final
*
* @api
*/
readonly class TaskScheduler
{
private const string LOCK_KEY = "task-manager.scheduler.lock";

public function __construct (
private CacheInterface $cache,
private LockFactory $lockFactory,
) {}

/**
*
*/
public function createSchedule () : WrappedSchedule
{
return new WrappedSchedule(
$this->cache,
$this->lockFactory->createLock(self::LOCK_KEY),
);
}
}
86 changes: 86 additions & 0 deletions src/Schedule/WrappedSchedule.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?php declare(strict_types=1);

namespace Torr\TaskManager\Schedule;

use Symfony\Component\Lock\LockInterface;
use Symfony\Component\Scheduler\RecurringMessage;
use Symfony\Component\Scheduler\Schedule;
use Symfony\Component\Scheduler\ScheduleProviderInterface;
use Symfony\Contracts\Cache\CacheInterface;
use Torr\TaskManager\Task\DispatchAfterRunTask\DispatchAfterRunTask;
use Torr\TaskManager\Task\Task;

/**
* Wrapper around Symfony's Schedule with sensible defaults
*
* @final
*
* @internal
*/
readonly class WrappedSchedule implements ScheduleProviderInterface
{
/**
*
*/
private Schedule $schedule;

/**
*/
public function __construct (
CacheInterface $cache,
LockInterface $lock,
)
{
$this->schedule = new Schedule()
->lock($lock)
->stateful($cache)
->processOnlyLastMissedRun(true);
}

/**
* Adds a cron task
*/
public function cron (
string $cronExpression,
Task $task,
\DateTimeZone|string|null $timezone = null,
) : static
{
$this->schedule->add(RecurringMessage::cron(
$cronExpression,
new DispatchAfterRunTask($task),
$timezone,
));

return $this;
}

/**
* Adds a cron task
*/
public function every (
string|int|\DateInterval $frequency,
Task $task,
string|\DateTimeImmutable|null $from = null,
string|\DateTimeImmutable $until = new \DateTimeImmutable('3000-01-01'),
) : static
{
$this->schedule->add(RecurringMessage::every(
$frequency,
new DispatchAfterRunTask($task),
$from,
$until,
));

return $this;
}

/**
*
*/
#[\Override]
public function getSchedule () : Schedule
{
return $this->schedule->getSchedule();
}
}
18 changes: 12 additions & 6 deletions src/Task/DispatchAfterRunTask/DispatchAfterRunTaskHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

namespace Torr\TaskManager\Task\DispatchAfterRunTask;

use Symfony\Component\Console\Input\ArrayInput;
use Symfony\Component\Console\Output\ConsoleOutput;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;
use Torr\TaskManager\Director\TaskDirector;
use Torr\Cli\Console\Style\TorrStyle;
use Torr\TaskManager\Manager\TaskManager;

/**
Expand All @@ -16,7 +19,6 @@
*/
public function __construct (
private TaskManager $taskManager,
private TaskDirector $taskDirector,
) {}

/**
Expand All @@ -25,9 +27,13 @@ public function __construct (
#[AsMessageHandler]
public function onDispatchAfterRunTask (DispatchAfterRunTask $task) : void
{
$run = $this->taskDirector->startRun($task);
// DispatchAfterRun message should not use the task director, as we can't recreate the task ulid.
$io = new TorrStyle(
new ArrayInput([]),
new ConsoleOutput(OutputInterface::VERBOSITY_NORMAL, true),
);

$run->io->writeln(\sprintf(
$io->writeln(\sprintf(
"Redispatching task <fg=yellow>%s</>",
$task->task::class,
));
Expand All @@ -36,7 +42,7 @@ public function onDispatchAfterRunTask (DispatchAfterRunTask $task) : void
? [new TransportNamesStamp($task->transportNames)]
: [];

$this->taskManager->enqueue($task->task, $stamps);
$run->finish(success: true);
$wrappedTask = $task->task->withNewTaskUlid();
$this->taskManager->enqueue($wrappedTask, $stamps);
}
}
12 changes: 11 additions & 1 deletion src/Task/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
public function __construct ()
{
$this->ulid = (new Ulid())->toBase58();
$this->ulid = new Ulid()->toBase58();
}

/**
Expand All @@ -25,4 +25,14 @@ public function __construct ()
* serialized messages as well.
*/
abstract public function getMetaData () : TaskMetaData;

/**
*
*/
public function withNewTaskUlid () : static
{
return clone($this, [
"ulid" => new Ulid()->toBase58(),
]);
}
}
35 changes: 35 additions & 0 deletions tests/Task/TaskTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php declare(strict_types=1);

namespace Tests\Torr\TaskManager\Task;

use PHPUnit\Framework\TestCase;
use Torr\TaskManager\Task\Task;
use Torr\TaskManager\Task\TaskMetaData;

/**
* @internal
*/
final class TaskTest extends TestCase
{
/**
*
*/
public function testNewTaskUlid () : void
{
// @phpstan-ignore-next-line 21torr.custom.task.suffix
$task = new readonly class() extends Task {
/**
*
*/
#[\Override]
public function getMetaData () : TaskMetaData
{
return new TaskMetaData("Test");
}
};

$initialUlid = $task->ulid;
$newTask = $task->withNewTaskUlid();
self::assertNotSame($initialUlid, $newTask->ulid, "Task ULID should change on PHP 8.4");
}
}