From 1a19bddadc2ab91a4e4d916f05f856daaedd2535 Mon Sep 17 00:00:00 2001 From: Jannik Zschiesche Date: Mon, 23 Feb 2026 12:29:55 +0100 Subject: [PATCH 1/5] Add TaskScheduler --- src/Schedule/InternalTaskManagerSchedule.php | 86 ++++++++++++++++++++ src/Schedule/TaskScheduler.php | 32 ++++++++ 2 files changed, 118 insertions(+) create mode 100644 src/Schedule/InternalTaskManagerSchedule.php create mode 100644 src/Schedule/TaskScheduler.php diff --git a/src/Schedule/InternalTaskManagerSchedule.php b/src/Schedule/InternalTaskManagerSchedule.php new file mode 100644 index 0000000..35df0cc --- /dev/null +++ b/src/Schedule/InternalTaskManagerSchedule.php @@ -0,0 +1,86 @@ +schedule = new Schedule() + ->lock($lock) + ->stateful($cache) + ->processOnlyLastMissedRun(true); + } + + /** + * Adds a cron task + */ + public function cron ( + string $cronExpression, + Task $task, + \DateTimeZone|string|null $timezone = null, + ) : static + { + $this->schedule->add(RecurringMessage::cron( + $cronExpression, + new DispatchAfterRunTask($task), + $timezone, + )); + + return $this; + } + + /** + * Adds a cron task + */ + public function every ( + string|int|\DateInterval $frequency, + Task $task, + string|\DateTimeImmutable|null $from = null, + string|\DateTimeImmutable $until = new \DateTimeImmutable('3000-01-01'), + ) : static + { + $this->schedule->add(RecurringMessage::every( + $frequency, + new DispatchAfterRunTask($task), + $from, + $until, + )); + + return $this; + } + + /** + * + */ + #[\Override] + public function getSchedule () : Schedule + { + return $this->schedule->getSchedule(); + } +} diff --git a/src/Schedule/TaskScheduler.php b/src/Schedule/TaskScheduler.php new file mode 100644 index 0000000..b8d94e5 --- /dev/null +++ b/src/Schedule/TaskScheduler.php @@ -0,0 +1,32 @@ +cache, + $this->lockFactory->createLock(self::LOCK_KEY), + ); + } +} From 3d5d0143a1041fb9a7e7fea0406d61e847a4cb5e Mon Sep 17 00:00:00 2001 From: Jannik Zschiesche Date: Mon, 23 Feb 2026 13:43:20 +0100 Subject: [PATCH 2/5] Add withNewTaskUlid() to task --- src/Task/Task.php | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/Task/Task.php b/src/Task/Task.php index f3bd46f..720377b 100644 --- a/src/Task/Task.php +++ b/src/Task/Task.php @@ -25,4 +25,20 @@ public function __construct () * serialized messages as well. */ abstract public function getMetaData () : TaskMetaData; + + /** + * + */ + public function withNewTaskUlid () : static + { + if (\function_exists("clone") && \PHP_VERSION_ID >= 80500) + { + return clone($this, [ + "ulid" => (new Ulid())->toBase58() + ]); + } + + // @todo remove fallback + if above, when PHP 8.5 is required + return $this; + } } From aaa2a64f646a48e3eff03976681c34f49d4d7d95 Mon Sep 17 00:00:00 2001 From: Jannik Zschiesche Date: Mon, 23 Feb 2026 13:43:36 +0100 Subject: [PATCH 3/5] Add task scheduler --- src/Schedule/TaskScheduler.php | 4 ++-- .../{InternalTaskManagerSchedule.php => WrappedSchedule.php} | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) rename src/Schedule/{InternalTaskManagerSchedule.php => WrappedSchedule.php} (93%) diff --git a/src/Schedule/TaskScheduler.php b/src/Schedule/TaskScheduler.php index b8d94e5..38a546a 100644 --- a/src/Schedule/TaskScheduler.php +++ b/src/Schedule/TaskScheduler.php @@ -22,9 +22,9 @@ public function __construct ( /** * */ - public function createSchedule () : InternalTaskManagerSchedule + public function createSchedule () : WrappedSchedule { - return new InternalTaskManagerSchedule( + return new WrappedSchedule( $this->cache, $this->lockFactory->createLock(self::LOCK_KEY), ); diff --git a/src/Schedule/InternalTaskManagerSchedule.php b/src/Schedule/WrappedSchedule.php similarity index 93% rename from src/Schedule/InternalTaskManagerSchedule.php rename to src/Schedule/WrappedSchedule.php index 35df0cc..606df31 100644 --- a/src/Schedule/InternalTaskManagerSchedule.php +++ b/src/Schedule/WrappedSchedule.php @@ -17,7 +17,7 @@ * * @internal */ -readonly class InternalTaskManagerSchedule implements ScheduleProviderInterface +readonly class WrappedSchedule implements ScheduleProviderInterface { /** * @@ -81,6 +81,7 @@ public function every ( #[\Override] public function getSchedule () : Schedule { + dump("get internal schedule"); return $this->schedule->getSchedule(); } } From 231622ba683afc1607d557dc1bf5da16ec8cf188 Mon Sep 17 00:00:00 2001 From: Jannik Zschiesche Date: Mon, 23 Feb 2026 13:43:44 +0100 Subject: [PATCH 4/5] Add more context in error message --- src/Entity/TaskLog.php | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Entity/TaskLog.php b/src/Entity/TaskLog.php index 78b7310..e7bbfb4 100644 --- a/src/Entity/TaskLog.php +++ b/src/Entity/TaskLog.php @@ -156,7 +156,11 @@ public function createRun (?LoggerInterface $logger = null) : TaskRun { if ($this->isSuccess()) { - throw new InvalidLogActionException("Can't start a run for a task #{$this->id} that is already finished."); + throw new InvalidLogActionException(\sprintf( + "Can't start a run for a task #%s (task id: '%s') that is already finished.", + $this->id, + $this->taskId, + )); } $run = new TaskRun($this, $logger); From a4beab31966b3595c62ee60eda08e55600e2b00b Mon Sep 17 00:00:00 2001 From: Jannik Zschiesche Date: Mon, 23 Feb 2026 13:44:28 +0100 Subject: [PATCH 5/5] Recreate task ulid when redispatching --- CHANGELOG.md | 8 +++++ composer.json | 2 +- src/Listener/MessengerEventListener.php | 24 +++++++------ src/Schedule/WrappedSchedule.php | 1 - .../DispatchAfterRunTaskHandler.php | 18 ++++++---- src/Task/Task.php | 14 +++----- tests/Task/TaskTest.php | 35 +++++++++++++++++++ 7 files changed, 74 insertions(+), 28 deletions(-) create mode 100644 tests/Task/TaskTest.php diff --git a/CHANGELOG.md b/CHANGELOG.md index b71023b..8bbb68e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +3.2.0 +===== + +* (feature) Add `TaskScheduler`. +* (improvement) Update task ULID when redispatching in the scheduler. +* (improvement) Require PHP 8.5+ + + 3.1.1 ===== diff --git a/composer.json b/composer.json index 627262a..570cad4 100644 --- a/composer.json +++ b/composer.json @@ -11,7 +11,7 @@ ], "homepage": "https://github.com/21TORR/TaskManagerBundle", "require": { - "php": ">= 8.4", + "php": ">= 8.5", "21torr/bundle-helpers": "^2.3.1", "21torr/cli": "^1.2.3", "21torr/hosting": "^4.1.1", diff --git a/src/Listener/MessengerEventListener.php b/src/Listener/MessengerEventListener.php index fbd479a..ab7f09f 100644 --- a/src/Listener/MessengerEventListener.php +++ b/src/Listener/MessengerEventListener.php @@ -10,6 +10,7 @@ use Torr\TaskManager\Entity\TaskLog; use Torr\TaskManager\Model\TaskLogModel; use Torr\TaskManager\Normalizer\TaskDetailsNormalizer; +use Torr\TaskManager\Task\DispatchAfterRunTask\DispatchAfterRunTask; use Torr\TaskManager\Task\Task; /** @@ -25,16 +26,17 @@ public function __construct ( /** * */ - #[AsEventListener(SendMessageToTransportsEvent::class)] + #[AsEventListener] public function onSendMessageToTransports (SendMessageToTransportsEvent $event) : void { - $taskLog = $this->getLogForEvent($event->getEnvelope()); + $envelope = $event->getEnvelope(); + $taskLog = $this->getLogForEvent($envelope); // make sure that the log entry is created and flushed if (null !== $taskLog) { // update envelope with current version - $taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($event->getEnvelope())); + $taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($envelope)); $this->logModel->flush(); } @@ -43,10 +45,11 @@ public function onSendMessageToTransports (SendMessageToTransportsEvent $event) /** * Automatically integrate */ - #[AsEventListener(WorkerMessageHandledEvent::class)] + #[AsEventListener] public function onWorkerMessageHandled (WorkerMessageHandledEvent $event) : void { - $taskLog = $this->getLogForEvent($event->getEnvelope()); + $envelope = $event->getEnvelope(); + $taskLog = $this->getLogForEvent($envelope); if (null === $taskLog) { @@ -54,7 +57,7 @@ public function onWorkerMessageHandled (WorkerMessageHandledEvent $event) : void } // update envelope with current version - $taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($event->getEnvelope())); + $taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($envelope)); // abort run as success. It wasn't marked as finished manually, but it succeeded nonetheless. $run = $taskLog->getLastUnfinishedRun(); @@ -63,10 +66,11 @@ public function onWorkerMessageHandled (WorkerMessageHandledEvent $event) : void $this->logModel->flush(); } - #[AsEventListener(WorkerMessageFailedEvent::class)] + #[AsEventListener] public function onWorkerMessageFailed (WorkerMessageFailedEvent $event) : void { - $taskLog = $this->getLogForEvent($event->getEnvelope()); + $envelope = $event->getEnvelope(); + $taskLog = $this->getLogForEvent($envelope); if (null === $taskLog) { @@ -74,7 +78,7 @@ public function onWorkerMessageFailed (WorkerMessageFailedEvent $event) : void } // update envelope with current version - $taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($event->getEnvelope())); + $taskLog->setTaskDetails($this->detailsNormalizer->normalizeTaskDetails($envelope)); // abort run as failure. It wasn't marked as finished manually and it failed. $run = $taskLog->getLastUnfinishedRun(); @@ -90,7 +94,7 @@ private function getLogForEvent (Envelope $envelope) : ?TaskLog { $message = $envelope->getMessage(); - return $message instanceof Task + return $message instanceof Task && !$message instanceof DispatchAfterRunTask ? $this->logModel->getLogForTask($message) : null; } diff --git a/src/Schedule/WrappedSchedule.php b/src/Schedule/WrappedSchedule.php index 606df31..0f0c368 100644 --- a/src/Schedule/WrappedSchedule.php +++ b/src/Schedule/WrappedSchedule.php @@ -81,7 +81,6 @@ public function every ( #[\Override] public function getSchedule () : Schedule { - dump("get internal schedule"); return $this->schedule->getSchedule(); } } diff --git a/src/Task/DispatchAfterRunTask/DispatchAfterRunTaskHandler.php b/src/Task/DispatchAfterRunTask/DispatchAfterRunTaskHandler.php index cc9df62..21611cd 100644 --- a/src/Task/DispatchAfterRunTask/DispatchAfterRunTaskHandler.php +++ b/src/Task/DispatchAfterRunTask/DispatchAfterRunTaskHandler.php @@ -2,9 +2,12 @@ namespace Torr\TaskManager\Task\DispatchAfterRunTask; +use Symfony\Component\Console\Input\ArrayInput; +use Symfony\Component\Console\Output\ConsoleOutput; +use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Messenger\Attribute\AsMessageHandler; use Symfony\Component\Messenger\Stamp\TransportNamesStamp; -use Torr\TaskManager\Director\TaskDirector; +use Torr\Cli\Console\Style\TorrStyle; use Torr\TaskManager\Manager\TaskManager; /** @@ -16,7 +19,6 @@ */ public function __construct ( private TaskManager $taskManager, - private TaskDirector $taskDirector, ) {} /** @@ -25,9 +27,13 @@ public function __construct ( #[AsMessageHandler] public function onDispatchAfterRunTask (DispatchAfterRunTask $task) : void { - $run = $this->taskDirector->startRun($task); + // DispatchAfterRun message should not use the task director, as we can't recreate the task ulid. + $io = new TorrStyle( + new ArrayInput([]), + new ConsoleOutput(OutputInterface::VERBOSITY_NORMAL, true), + ); - $run->io->writeln(\sprintf( + $io->writeln(\sprintf( "Redispatching task %s", $task->task::class, )); @@ -36,7 +42,7 @@ public function onDispatchAfterRunTask (DispatchAfterRunTask $task) : void ? [new TransportNamesStamp($task->transportNames)] : []; - $this->taskManager->enqueue($task->task, $stamps); - $run->finish(success: true); + $wrappedTask = $task->task->withNewTaskUlid(); + $this->taskManager->enqueue($wrappedTask, $stamps); } } diff --git a/src/Task/Task.php b/src/Task/Task.php index 720377b..46aa8d8 100644 --- a/src/Task/Task.php +++ b/src/Task/Task.php @@ -15,7 +15,7 @@ */ public function __construct () { - $this->ulid = (new Ulid())->toBase58(); + $this->ulid = new Ulid()->toBase58(); } /** @@ -31,14 +31,8 @@ abstract public function getMetaData () : TaskMetaData; */ public function withNewTaskUlid () : static { - if (\function_exists("clone") && \PHP_VERSION_ID >= 80500) - { - return clone($this, [ - "ulid" => (new Ulid())->toBase58() - ]); - } - - // @todo remove fallback + if above, when PHP 8.5 is required - return $this; + return clone($this, [ + "ulid" => new Ulid()->toBase58(), + ]); } } diff --git a/tests/Task/TaskTest.php b/tests/Task/TaskTest.php new file mode 100644 index 0000000..276dc0b --- /dev/null +++ b/tests/Task/TaskTest.php @@ -0,0 +1,35 @@ +ulid; + $newTask = $task->withNewTaskUlid(); + self::assertNotSame($initialUlid, $newTask->ulid, "Task ULID should change on PHP 8.4"); + } +}