diff --git a/CHANGELOG.md b/CHANGELOG.md index b71023b..8bbb68e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +3.2.0 +===== + +* (feature) Add `TaskScheduler`. +* (improvement) Update task ULID when redispatching in the scheduler. +* (improvement) Require PHP 8.5+ + + 3.1.1 ===== diff --git a/composer.json b/composer.json index 627262a..570cad4 100644 --- a/composer.json +++ b/composer.json @@ -11,7 +11,7 @@ ], "homepage": "https://github.com/21TORR/TaskManagerBundle", "require": { - "php": ">= 8.4", + "php": ">= 8.5", "21torr/bundle-helpers": "^2.3.1", "21torr/cli": "^1.2.3", "21torr/hosting": "^4.1.1", diff --git a/src/Entity/TaskLog.php b/src/Entity/TaskLog.php index 78b7310..e7bbfb4 100644 --- a/src/Entity/TaskLog.php +++ b/src/Entity/TaskLog.php @@ -156,7 +156,11 @@ public function createRun (?LoggerInterface $logger = null) : TaskRun { if ($this->isSuccess()) { - throw new InvalidLogActionException("Can't start a run for a task #{$this->id} that is already finished."); + throw new InvalidLogActionException(\sprintf( + "Can't start a run for a task #%s (task id: '%s') that is already finished.", + $this->id, + $this->taskId, + )); } $run = new TaskRun($this, $logger); diff --git a/src/Listener/MessengerEventListener.php b/src/Listener/MessengerEventListener.php index fbd479a..ab7f09f 100644 --- a/src/Listener/MessengerEventListener.php +++ b/src/Listener/MessengerEventListener.php @@ -10,6 +10,7 @@ use Torr\TaskManager\Entity\TaskLog; use Torr\TaskManager\Model\TaskLogModel; use Torr\TaskManager\Normalizer\TaskDetailsNormalizer; +use Torr\TaskManager\Task\DispatchAfterRunTask\DispatchAfterRunTask; use Torr\TaskManager\Task\Task; /** @@ -25,16 +26,17 @@ public function __construct ( /** * */ - #[AsEventListener(SendMessageToTransportsEvent::class)] + #[AsEventListener] public function onSendMessageToTransports (SendMessageToTransportsEvent $event) : void { - $taskLog = $this->getLogForEvent($event->getEnvelope()); + $envelope = $event->getEnvelope(); + $taskLog = $this->getLogForEvent($envelope); // make sure that the log entry is created and flushed if (null !== $taskLog) { // update envelope with current version - $taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($event->getEnvelope())); + $taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($envelope)); $this->logModel->flush(); } @@ -43,10 +45,11 @@ public function onSendMessageToTransports (SendMessageToTransportsEvent $event) /** * Automatically integrate */ - #[AsEventListener(WorkerMessageHandledEvent::class)] + #[AsEventListener] public function onWorkerMessageHandled (WorkerMessageHandledEvent $event) : void { - $taskLog = $this->getLogForEvent($event->getEnvelope()); + $envelope = $event->getEnvelope(); + $taskLog = $this->getLogForEvent($envelope); if (null === $taskLog) { @@ -54,7 +57,7 @@ public function onWorkerMessageHandled (WorkerMessageHandledEvent $event) : void } // update envelope with current version - $taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($event->getEnvelope())); + $taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($envelope)); // abort run as success. It wasn't marked as finished manually, but it succeeded nonetheless. $run = $taskLog->getLastUnfinishedRun(); @@ -63,10 +66,11 @@ public function onWorkerMessageHandled (WorkerMessageHandledEvent $event) : void $this->logModel->flush(); } - #[AsEventListener(WorkerMessageFailedEvent::class)] + #[AsEventListener] public function onWorkerMessageFailed (WorkerMessageFailedEvent $event) : void { - $taskLog = $this->getLogForEvent($event->getEnvelope()); + $envelope = $event->getEnvelope(); + $taskLog = $this->getLogForEvent($envelope); if (null === $taskLog) { @@ -74,7 +78,7 @@ public function onWorkerMessageFailed (WorkerMessageFailedEvent $event) : void } // update envelope with current version - $taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($event->getEnvelope())); + $taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($envelope)); // abort run as failure. It wasn't marked as finished manually and it failed. $run = $taskLog->getLastUnfinishedRun(); @@ -90,7 +94,7 @@ private function getLogForEvent (Envelope $envelope) : ?TaskLog { $message = $envelope->getMessage(); - return $message instanceof Task + return $message instanceof Task && !$message instanceof DispatchAfterRunTask ? $this->logModel->getLogForTask($message) : null; } diff --git a/src/Schedule/TaskScheduler.php b/src/Schedule/TaskScheduler.php new file mode 100644 index 0000000..38a546a --- /dev/null +++ b/src/Schedule/TaskScheduler.php @@ -0,0 +1,32 @@ +cache, + $this->lockFactory->createLock(self::LOCK_KEY), + ); + } +} diff --git a/src/Schedule/WrappedSchedule.php b/src/Schedule/WrappedSchedule.php new file mode 100644 index 0000000..0f0c368 --- /dev/null +++ b/src/Schedule/WrappedSchedule.php @@ -0,0 +1,86 @@ +schedule = new Schedule() + ->lock($lock) + ->stateful($cache) + ->processOnlyLastMissedRun(true); + } + + /** + * Adds a cron task + */ + public function cron ( + string $cronExpression, + Task $task, + \DateTimeZone|string|null $timezone = null, + ) : static + { + $this->schedule->add(RecurringMessage::cron( + $cronExpression, + new DispatchAfterRunTask($task), + $timezone, + )); + + return $this; + } + + /** + * Adds a cron task + */ + public function every ( + string|int|\DateInterval $frequency, + Task $task, + string|\DateTimeImmutable|null $from = null, + string|\DateTimeImmutable $until = new \DateTimeImmutable('3000-01-01'), + ) : static + { + $this->schedule->add(RecurringMessage::every( + $frequency, + new DispatchAfterRunTask($task), + $from, + $until, + )); + + return $this; + } + + /** + * + */ + #[\Override] + public function getSchedule () : Schedule + { + return $this->schedule->getSchedule(); + } +} diff --git a/src/Task/DispatchAfterRunTask/DispatchAfterRunTaskHandler.php b/src/Task/DispatchAfterRunTask/DispatchAfterRunTaskHandler.php index cc9df62..21611cd 100644 --- a/src/Task/DispatchAfterRunTask/DispatchAfterRunTaskHandler.php +++ b/src/Task/DispatchAfterRunTask/DispatchAfterRunTaskHandler.php @@ -2,9 +2,12 @@ namespace Torr\TaskManager\Task\DispatchAfterRunTask; +use Symfony\Component\Console\Input\ArrayInput; +use Symfony\Component\Console\Output\ConsoleOutput; +use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Messenger\Attribute\AsMessageHandler; use Symfony\Component\Messenger\Stamp\TransportNamesStamp; -use Torr\TaskManager\Director\TaskDirector; +use Torr\Cli\Console\Style\TorrStyle; use Torr\TaskManager\Manager\TaskManager; /** @@ -16,7 +19,6 @@ */ public function __construct ( private TaskManager $taskManager, - private TaskDirector $taskDirector, ) {} /** @@ -25,9 +27,13 @@ public function __construct ( #[AsMessageHandler] public function onDispatchAfterRunTask (DispatchAfterRunTask $task) : void { - $run = $this->taskDirector->startRun($task); + // DispatchAfterRun message should not use the task director, as we can't recreate the task ulid. + $io = new TorrStyle( + new ArrayInput([]), + new ConsoleOutput(OutputInterface::VERBOSITY_NORMAL, true), + ); - $run->io->writeln(\sprintf( + $io->writeln(\sprintf( "Redispatching task %s", $task->task::class, )); @@ -36,7 +42,7 @@ public function onDispatchAfterRunTask (DispatchAfterRunTask $task) : void ? [new TransportNamesStamp($task->transportNames)] : []; - $this->taskManager->enqueue($task->task, $stamps); - $run->finish(success: true); + $wrappedTask = $task->task->withNewTaskUlid(); + $this->taskManager->enqueue($wrappedTask, $stamps); } } diff --git a/src/Task/Task.php b/src/Task/Task.php index f3bd46f..46aa8d8 100644 --- a/src/Task/Task.php +++ b/src/Task/Task.php @@ -15,7 +15,7 @@ */ public function __construct () { - $this->ulid = (new Ulid())->toBase58(); + $this->ulid = new Ulid()->toBase58(); } /** @@ -25,4 +25,14 @@ public function __construct () * serialized messages as well. */ abstract public function getMetaData () : TaskMetaData; + + /** + * + */ + public function withNewTaskUlid () : static + { + return clone($this, [ + "ulid" => new Ulid()->toBase58(), + ]); + } } diff --git a/tests/Task/TaskTest.php b/tests/Task/TaskTest.php new file mode 100644 index 0000000..276dc0b --- /dev/null +++ b/tests/Task/TaskTest.php @@ -0,0 +1,35 @@ +ulid; + $newTask = $task->withNewTaskUlid(); + self::assertNotSame($initialUlid, $newTask->ulid, "Task ULID should change on PHP 8.4"); + } +}