diff --git a/CHANGELOG.md b/CHANGELOG.md index d3f3a6e..e3c154d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +2.0.0 +===== + +* (bc) **This bundle now requires that all your messages extend from the `Task` base class.** +* (bc) Change signature of `TaskManager::enqueue()` to only accept tasks. +* (bc) Remove `RegisterTasksEvent::registerTask()` and replace it with `RegisterTasksEvent::register(Task $task)`. +* (feature) Add `TaskDirector` and `RunDirector` to better integrate and log your task runs. +* (feature) Add native `Task` event that encapsulates commonly used logic. + + 1.3.1 ===== diff --git a/UPGRADE.md b/UPGRADE.md new file mode 100644 index 0000000..fff9988 --- /dev/null +++ b/UPGRADE.md @@ -0,0 +1,6 @@ +1.x to 2.0 +========== + +* All your messages need to extend `Task` now. +* `TaskManager::enqueue()` now only accepts `Task`s. +* `RegisterTasksEvent::registerTask()` was removed, you should use `RegisterTasksEvent::register(Task $task)`. diff --git a/composer.json b/composer.json index 0f2506a..7d6d7d2 100644 --- a/composer.json +++ b/composer.json @@ -11,22 +11,25 @@ ], "homepage": "https://github.com/21TORR/TaskManagerBundle", "require": { - "php": ">= 8.1", + "php": ">= 8.3", "21torr/bundle-helpers": "^2.1", "21torr/cli": "^1.0", - "symfony/config": "^6.4 || ^7.0", - "symfony/console": "^6.4 || ^7.0", - "symfony/dependency-injection": "^6.4 || ^7.0", + "doctrine/orm": "^2.19", + "symfony/clock": "^7.0", + "symfony/config": "^7.0", + "symfony/console": "^7.0", + "symfony/dependency-injection": "^7.0", "symfony/event-dispatcher-contracts": "^3.4", - "symfony/http-kernel": "^6.4 || ^7.0", - "symfony/messenger": "^6.4 || ^7.0", - "symfony/string": "^6.4 || ^7.0" + "symfony/http-kernel": "^7.0", + "symfony/messenger": "^7.0", + "symfony/string": "^7.0", + "symfony/uid": "^7.0" }, "require-dev": { - "21torr/janus": "^1.2", + "21torr/janus": "^1.3.3", "bamarni/composer-bin-plugin": "^1.8", "roave/security-advisories": "dev-latest", - "symfony/phpunit-bridge": "^6.4 || ^7.0" + "symfony/phpunit-bridge": "^7.0" }, "autoload": { "psr-4": { @@ -52,16 +55,16 @@ }, "scripts": { "fix-lint": [ - "vendor-bin/cs-fixer/vendor/bin/php-cs-fixer fix --diff --config vendor-bin/cs-fixer/vendor/21torr/php-cs-fixer/.php-cs-fixer.dist.php --no-interaction --ansi", - "@composer bin c-norm normalize \"$(pwd)/composer.json\" --indent-style tab --indent-size 1 --ansi" + "@composer bin c-norm normalize \"$(pwd)/composer.json\" --indent-style tab --indent-size 1 --ansi", + "vendor-bin/cs-fixer/vendor/bin/php-cs-fixer fix --diff --config vendor-bin/cs-fixer/vendor/21torr/php-cs-fixer/.php-cs-fixer.dist.php --no-interaction --ansi" ], "lint": [ "@composer bin c-norm normalize \"$(pwd)/composer.json\" --indent-style tab --indent-size 1 --dry-run --ansi", - "vendor-bin/cs-fixer/vendor/bin/php-cs-fixer fix --diff --config vendor-bin/cs-fixer/vendor/21torr/php-cs-fixer/.php-cs-fixer.dist.php --dry-run --no-interaction --ansi" + "vendor-bin/cs-fixer/vendor/bin/php-cs-fixer check --diff --config vendor-bin/cs-fixer/vendor/21torr/php-cs-fixer/.php-cs-fixer.dist.php --no-interaction --ansi" ], "test": [ "vendor/bin/simple-phpunit", - "vendor-bin/phpstan/vendor/bin/phpstan analyze -c phpstan.neon . --ansi" + "vendor-bin/phpstan/vendor/bin/phpstan analyze -c phpstan.neon . --ansi -v" ] } -} +} \ No newline at end of file diff --git a/phpstan.neon b/phpstan.neon index 8ca1552..cd0577a 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -1,2 +1,8 @@ includes: - - vendor/21torr/janus/phpstan/lib.neon + - vendor/21torr/janus/phpstan/lib.neon + +# If you use simple-phpunit, you need to uncomment the following line. +# Always make sure to first run simple-phpunit and then PHPStan. +# parameters: +# bootstrapFiles: +# - vendor/bin/.phpunit/phpunit/vendor/autoload.php diff --git a/src/Command/ListTasksCommand.php b/src/Command/ListTasksCommand.php index 486b144..178ea43 100644 --- a/src/Command/ListTasksCommand.php +++ b/src/Command/ListTasksCommand.php @@ -55,6 +55,7 @@ protected function execute (InputInterface $input, OutputInterface $output) : in foreach ($tasks as $task) { $row = []; + $metaData = $task->getMetaData(); if ($first) { @@ -67,17 +68,16 @@ protected function execute (InputInterface $input, OutputInterface $output) : in $first = false; } - $row[] = \sprintf( + $row[] = sprintf( "%s", - $task->key, + $metaData->getKey(), ); - $row[] = $task->label; - $row[] = \get_class($task->task); + $row[] = $metaData->label; + $row[] = $task::class; $rows[] = $row; } } - $io->table( headers: [ "Group", @@ -85,7 +85,6 @@ protected function execute (InputInterface $input, OutputInterface $output) : in "Name", "Task Class", ], - // @phpstan-ignore-next-line This is fine and the type was fixed in newer versions of the CLI bundle rows: $rows, ); diff --git a/src/Command/QueueTasksCommand.php b/src/Command/QueueTasksCommand.php index 48386b2..6f4c9f3 100644 --- a/src/Command/QueueTasksCommand.php +++ b/src/Command/QueueTasksCommand.php @@ -11,8 +11,8 @@ use Torr\TaskManager\Exception\Registry\UnknownTaskKeyException; use Torr\TaskManager\Manager\TaskManager; use Torr\TaskManager\Receiver\ReceiverHelper; -use Torr\TaskManager\Registry\Data\Task; use Torr\TaskManager\Registry\TaskRegistry; +use Torr\TaskManager\Task\Task; #[AsCommand("task-manager:queue")] final class QueueTasksCommand extends Command @@ -58,11 +58,11 @@ protected function execute (InputInterface $input, OutputInterface $output) : in catch (UnknownTaskKeyException $exception) { $io->error($exception->getMessage()); + return self::FAILURE; } - - $io->comment(\sprintf( + $io->comment(sprintf( "Queuing %d task%s", \count($tasksToQueue), 1 !== \count($tasksToQueue) ? "s" : "", @@ -70,14 +70,15 @@ protected function execute (InputInterface $input, OutputInterface $output) : in foreach ($tasksToQueue as $task) { - $io->writeln(\sprintf( + $io->writeln(sprintf( "• Queuing task %s", $this->formatTaskLabel($task), )); - $this->taskManager->enqueue($task->task); + $this->taskManager->enqueue($task); } $io->success("All done."); + return self::SUCCESS; } @@ -119,7 +120,7 @@ private function getTasksToQueue (InputInterface $input, TorrStyle $io) : array foreach ($selectedOptions as $option) { - $index = \array_search($option, $choices, true); + $index = array_search($option, $choices, true); \assert(\is_int($index)); $result[] = $flatTasks[$index]; @@ -140,7 +141,7 @@ private function fetchTasksByKey (array $keys) : array foreach ($keys as $taskKey) { $result[] = $this->taskRegistry->getTaskByKey($taskKey) - ?? throw new UnknownTaskKeyException(\sprintf( + ?? throw new UnknownTaskKeyException(sprintf( "Unknown task key '%s'", $taskKey, )); @@ -154,16 +155,18 @@ private function fetchTasksByKey (array $keys) : array */ private function formatTaskLabel (Task $task) : string { - if (null !== $task->group) + $metaData = $task->getMetaData(); + + if (null !== $metaData->group) { - return \sprintf( + return sprintf( "%s: %s (%s)", - $task->group, - $task->label, - $task->key, + $metaData->group, + $metaData->label, + $metaData->getKey(), ); } - return $task->label; + return $metaData->label; } } diff --git a/src/Config/BundleConfig.php b/src/Config/BundleConfig.php index 7df9371..b8201c9 100644 --- a/src/Config/BundleConfig.php +++ b/src/Config/BundleConfig.php @@ -2,14 +2,14 @@ namespace Torr\TaskManager\Config; -final class BundleConfig +final readonly class BundleConfig { /** */ public function __construct ( /** @var string[] */ - public readonly array $sortedQueues, + public array $sortedQueues, /** @var string[] */ - public readonly array $failureTransports = [], + public array $failureTransports = [], ) {} } diff --git a/src/Console/ChainOutput.php b/src/Console/ChainOutput.php new file mode 100644 index 0000000..7a3b5da --- /dev/null +++ b/src/Console/ChainOutput.php @@ -0,0 +1,132 @@ +bufferedOutput = new BufferedOutput($verbosity, $decorated, $formatter); + $this->consoleOutput = new ConsoleOutput($verbosity, $decorated, $formatter); + } + + /** + * @inheritDoc + */ + public function write (iterable|string $messages, bool $newline = false, int $options = 0) : void + { + $this->bufferedOutput->write($messages, $newline, $options); + $this->consoleOutput->write($messages, $newline, $options); + } + + /** + * @inheritDoc + */ + public function writeln (iterable|string $messages, int $options = 0) : void + { + $this->bufferedOutput->writeln($messages, $options); + $this->consoleOutput->writeln($messages, $options); + } + + /** + * @inheritDoc + */ + public function setVerbosity (int $level) : void + { + $this->bufferedOutput->setVerbosity($level); + $this->consoleOutput->setVerbosity($level); + } + + /** + * @inheritDoc + */ + public function getVerbosity () : int + { + return $this->bufferedOutput->getVerbosity(); + } + + /** + * @inheritDoc + */ + public function isQuiet () : bool + { + return $this->bufferedOutput->isQuiet(); + } + + /** + * @inheritDoc + */ + public function isVerbose () : bool + { + return $this->bufferedOutput->isVerbose(); + } + + /** + * @inheritDoc + */ + public function isVeryVerbose () : bool + { + return $this->bufferedOutput->isVeryVerbose(); + } + + /** + * @inheritDoc + */ + public function isDebug () : bool + { + return $this->bufferedOutput->isDebug(); + } + + /** + * @inheritDoc + */ + public function setDecorated (bool $decorated) : void + { + $this->bufferedOutput->setDecorated(true); + $this->consoleOutput->setDecorated(true); + } + + /** + * @inheritDoc + */ + public function isDecorated () : bool + { + return $this->bufferedOutput->isDecorated(); + } + + public function setFormatter (OutputFormatterInterface $formatter) : void + { + $this->bufferedOutput->setFormatter($formatter); + $this->consoleOutput->setFormatter($formatter); + } + + /** + * @inheritDoc + */ + public function getFormatter () : OutputFormatterInterface + { + return $this->bufferedOutput->getFormatter(); + } + + /** + * + */ + public function getBufferedOutput () : string + { + return $this->bufferedOutput->fetch(); + } +} diff --git a/src/Console/MessageHandlerIo.php b/src/Console/MessageHandlerIo.php new file mode 100644 index 0000000..592ddcd --- /dev/null +++ b/src/Console/MessageHandlerIo.php @@ -0,0 +1,31 @@ +output = new ChainOutput(); + + parent::__construct( + new ArrayInput([]), + $this->output, + ); + } + + /** + * + */ + public function getBufferedOutput () : string + { + return $this->output->getBufferedOutput(); + } +} diff --git a/src/DependencyInjection/TaskManagerBundleConfiguration.php b/src/DependencyInjection/TaskManagerBundleConfiguration.php index 55eedef..eb810ae 100644 --- a/src/DependencyInjection/TaskManagerBundleConfiguration.php +++ b/src/DependencyInjection/TaskManagerBundleConfiguration.php @@ -27,5 +27,4 @@ public function getConfigTreeBuilder () : TreeBuilder return $treeBuilder; } - } diff --git a/src/Director/RunDirector.php b/src/Director/RunDirector.php new file mode 100644 index 0000000..748da23 --- /dev/null +++ b/src/Director/RunDirector.php @@ -0,0 +1,46 @@ +output = new ChainOutput(); + $this->io = new TorrStyle( + new ArrayInput([]), + $this->output, + ); + } + + /** + * + */ + public function getIo () : TorrStyle + { + return $this->io; + } + + /** + * Marks the task as finished + */ + public function finish (bool $success) : void + { + $this->run->finish($success, $this->output->getBufferedOutput()); + $this->logModel->flush(); + } +} diff --git a/src/Director/TaskDirector.php b/src/Director/TaskDirector.php new file mode 100644 index 0000000..2d21622 --- /dev/null +++ b/src/Director/TaskDirector.php @@ -0,0 +1,29 @@ +logModel->getLogForTask($task); + + // create run + $run = $this->logModel->createRunForTask($log); + $this->logModel->flush(); + + return new RunDirector($this->logModel, $run); + } +} diff --git a/src/Entity/TaskLog.php b/src/Entity/TaskLog.php new file mode 100644 index 0000000..b067207 --- /dev/null +++ b/src/Entity/TaskLog.php @@ -0,0 +1,127 @@ + */ + #[ORM\OneToMany(mappedBy: "taskLog", targetEntity: TaskRun::class)] + #[ORM\OrderBy(["timeStarted" => "asc"])] + private Collection $runs; + + public function __construct ( + string $taskId, + ) + { + $this->taskId = $taskId; + $this->runs = new ArrayCollection(); + $this->timeQueued = now(); + } + + /** + */ + public function getId () : ?int + { + return $this->id; + } + + /** + */ + public function getTaskId () : string + { + return $this->taskId; + } + + /** + */ + public function getTimeQueued () : \DateTimeImmutable + { + return $this->timeQueued; + } + + /** + * @return Collection + */ + public function getRuns () : Collection + { + return $this->runs; + } + + /** + */ + public function isFinishedSuccessfully () : ?bool + { + foreach ($this->runs as $run) + { + if ($run->isFinishedSuccessfully()) + { + return true; + } + } + + return false; + } + + /** + */ + public function getLastUnfinishedRun () : ?TaskRun + { + foreach ($this->runs as $run) + { + if (!$run->isFinished()) + { + return $run; + } + } + + return null; + } + + /** + * + */ + public function startRun () : TaskRun + { + if ($this->isFinishedSuccessfully()) + { + throw new InvalidLogActionException("Can't start a run for a task #{$this->id} that is already finished."); + } + + $run = new TaskRun($this); + $this->runs->add($run); + + return $run; + } +} diff --git a/src/Entity/TaskRun.php b/src/Entity/TaskRun.php new file mode 100644 index 0000000..b2a1f4e --- /dev/null +++ b/src/Entity/TaskRun.php @@ -0,0 +1,150 @@ +taskLog = $taskLog; + $this->timeStarted = now(); + } + + // region Accessors + /** + */ + public function getTaskLog () : TaskLog + { + return $this->taskLog; + } + + /** + */ + public function getTimeStarted () : \DateTimeImmutable + { + return $this->timeStarted; + } + + /** + */ + public function getTimeFinished () : ?\DateTimeImmutable + { + return $this->timeFinished; + } + + /** + * Whether the task was finished successfully. + */ + public function isFinishedSuccessfully () : bool + { + return true === $this->successful; + } + + /** + */ + public function getOutput () : ?string + { + return $this->output; + } + + /** + */ + public function isFinished () : bool + { + return null !== $this->timeFinished; + } + + /** + * Whether the task was finished properly or was automatically finished. + */ + public function hasFinishedProperly () : bool + { + return true === $this->finishedProperly; + } + // endregion + + /** + */ + public function finish (bool $successful, ?string $output) : void + { + if ($this->isFinished()) + { + throw new InvalidLogActionException("Can't finish task run #{$this->id} as it is already finished."); + } + + $this->finishedProperly = true; + $this->successful = $successful; + $this->output = $output; + $this->timeFinished = now(); + } + + /** + * Aborts the task, without finishing it properly + */ + public function abort (bool $successful, ?string $output = null) : void + { + if ($this->isFinished()) + { + throw new InvalidLogActionException("Can't abort task run #{$this->id} as it is already finished."); + } + + $this->finishedProperly = false; + $this->successful = $successful; + $this->output = $output; + $this->timeFinished = now(); + } +} diff --git a/src/Event/RegisterTasksEvent.php b/src/Event/RegisterTasksEvent.php index c71d539..cad31bd 100644 --- a/src/Event/RegisterTasksEvent.php +++ b/src/Event/RegisterTasksEvent.php @@ -2,10 +2,8 @@ namespace Torr\TaskManager\Event; -use Symfony\Component\String\Slugger\AsciiSlugger; -use function Symfony\Component\String\u; use Torr\TaskManager\Exception\Registry\DuplicateTaskRegisteredException; -use Torr\TaskManager\Registry\Data\Task; +use Torr\TaskManager\Task\Task; /** * This event lets you register your tasks, so that the UI can make them selectable @@ -18,54 +16,40 @@ final class RegisterTasksEvent { /** @var array */ public array $tasks = []; - private readonly AsciiSlugger $slugger; - - /** - */ - public function __construct () - { - $this->slugger = new AsciiSlugger("en"); - } - /** * Registers a task * * @throws DuplicateTaskRegisteredException */ - public function registerTask ( - string $label, - object $message, - ?string $group = null, - ) : self + public function register (Task $task) : self { - $key = u($label)->lower()->toString(); - $key = $this->slugger->slug($key)->toString(); + $definition = $task->getMetaData(); + $key = $definition->getKey(); if (\array_key_exists($key, $this->tasks)) { - throw new DuplicateTaskRegisteredException(\sprintf( + throw new DuplicateTaskRegisteredException(sprintf( "Duplicate task registered with key '%s'", $key, )); } - $this->tasks[$key] = new Task($key, $label, $message, $group); + $this->tasks[$key] = $task; return $this; } - /** - * @return Task[] + * @return list */ public function getTasks () : array { - $entries = \array_values($this->tasks); + $entries = array_values($this->tasks); - \usort( + usort( $entries, - static fn (Task $left, Task $right) => \strnatcasecmp($left->label, $right->label), + static fn (Task $left, Task $right) => strnatcasecmp($left->getMetaData()->label, $right->getMetaData()->label), ); return $entries; diff --git a/src/Exception/Log/InvalidLogActionException.php b/src/Exception/Log/InvalidLogActionException.php new file mode 100644 index 0000000..6b9c8f4 --- /dev/null +++ b/src/Exception/Log/InvalidLogActionException.php @@ -0,0 +1,9 @@ +getLogForEvent($event->getEnvelope()); + + // make sure that the log entry is created and flushed + if (null !== $taskLog) + { + $this->logModel->flush(); + } + } + + /** + * Automatically integrate + */ + #[AsEventListener(WorkerMessageHandledEvent::class)] + public function onWorkerMessageHandled (WorkerMessageHandledEvent $event) : void + { + $taskLog = $this->getLogForEvent($event->getEnvelope()); + $run = $taskLog?->getLastUnfinishedRun(); + + if (null === $run) + { + return; + } + + $run->abort(true, null); + $this->logModel->flush(); + } + + #[AsEventListener(WorkerMessageFailedEvent::class)] + public function onWorkerMessageFailed (WorkerMessageFailedEvent $event) : void + { + $taskLog = $this->getLogForEvent($event->getEnvelope()); + $run = $taskLog?->getLastUnfinishedRun(); + + if (null === $run) + { + return; + } + + $run->abort(false, $event->getThrowable()->getMessage()); + $this->logModel->flush(); + } + + /** + * + */ + private function getLogForEvent (Envelope $envelope) : ?TaskLog + { + $message = $envelope->getMessage(); + + return $message instanceof Task + ? $this->logModel->getLogForTask($message) + : null; + } +} diff --git a/src/Manager/TaskManager.php b/src/Manager/TaskManager.php index 89086d6..ca01951 100644 --- a/src/Manager/TaskManager.php +++ b/src/Manager/TaskManager.php @@ -11,78 +11,64 @@ use Symfony\Component\Messenger\Transport\TransportInterface; use Torr\TaskManager\Config\BundleConfig; use Torr\TaskManager\Exception\Manager\InvalidMessageTransportException; -use Torr\TaskManager\Message\UniqueMessageInterface; -use Torr\TaskManager\Stamp\UniqueJobStamp; +use Torr\TaskManager\Task\Task; -final class TaskManager +final readonly class TaskManager { + /** + */ public function __construct ( /** @var ServiceLocator */ - private readonly ServiceLocator $receivers, - private readonly MessageBusInterface $messageBus, - private readonly BundleConfig $bundleConfig, + private ServiceLocator $receivers, + private MessageBusInterface $messageBus, + private BundleConfig $bundleConfig, ) {} - /** * Enqueues a task. You can give the job a unique id, so that only a single task with this id can be enqueued at the same time. * * @return bool whether the message was added. If this is false, an identical job is already queued. */ - public function enqueue ( - object $message, - ?string $jobId = null, - ) : bool + public function enqueue (Task $task) : bool { - if (null === $jobId) - { - $jobId = $this->fetchJobIdFromMessage($message); - } - - if (null === $jobId) - { - $this->messageBus->dispatch($message); - return true; - } - - if (null !== $this->findQueuedMessageByUniqueJobId($jobId)) + // if we find a message with the same unique task id, we don't queue it again + if ($this->isTaskWithSameTaskIdAlreadyQueued($task->getMetaData()->uniqueTaskId)) { return false; } - $envelope = $message instanceof Envelope - ? $message - : new Envelope($message); + $this->messageBus->dispatch($task); - $envelope = $envelope->with(new UniqueJobStamp($jobId)); - $this->messageBus->dispatch($envelope); return true; - } - /** * Finds a queued message with the given job id */ - private function findQueuedMessageByUniqueJobId (string $jobId) : ?Envelope + private function isTaskWithSameTaskIdAlreadyQueued (?string $uniqueTaskId) : bool { + // no task id, so this task is not deduplicated. No need to check anything, just enqueue it. + if (null === $uniqueTaskId) + { + return false; + } + foreach ($this->getAllQueues() as $queueName) { foreach ($this->fetchTasksInQueue($queueName) as $envelope) { - $stamp = $envelope->last(UniqueJobStamp::class); + $message = $envelope->getMessage(); - if (null !== $stamp && $stamp->jobId === $jobId) + if ($message instanceof Task && $message->getMetaData()->uniqueTaskId === $uniqueTaskId) { - return $envelope; + return true; } } } - return null; + return false; } - /** * Fetches all tasks for the given priority * @@ -102,7 +88,7 @@ public function fetchTasksInQueue (string $queueName) : iterable if (!$receiver instanceof ListableReceiverInterface) { - throw new InvalidMessageTransportException(\sprintf( + throw new InvalidMessageTransportException(sprintf( "Transport for queue '%s' must implement ListableReceiverInterface", $queueName, )); @@ -112,14 +98,13 @@ public function fetchTasksInQueue (string $queueName) : iterable } catch (ContainerExceptionInterface $exception) { - throw new InvalidMessageTransportException(\sprintf( + throw new InvalidMessageTransportException(sprintf( "Could not fetch transport: %s", $exception->getMessage(), ), previous: $exception); } } - /** * Returns all queues * @@ -132,28 +117,9 @@ public function getAllQueues () : array return $this->bundleConfig->sortedQueues; } - return \array_filter( - \array_keys($this->receivers->getProvidedServices()), - fn (string $serviceId) => !\str_starts_with($serviceId, "messenger.transport.") && !\in_array($serviceId, $this->bundleConfig->failureTransports, true), + return array_filter( + array_keys($this->receivers->getProvidedServices()), + fn (string $serviceId) => !str_starts_with($serviceId, "messenger.transport.") && !\in_array($serviceId, $this->bundleConfig->failureTransports, true), ); } - - /** - * - */ - private function fetchJobIdFromMessage (object $message) : ?string - { - // unwrap the envelope, as the job id is attached to the message - if ($message instanceof Envelope) - { - $message = $message->getMessage(); - } - - if ($message instanceof UniqueMessageInterface) - { - return $message->getJobId(); - } - - return null; - } } diff --git a/src/Message/UniqueMessageInterface.php b/src/Message/UniqueMessageInterface.php deleted file mode 100644 index 6a36174..0000000 --- a/src/Message/UniqueMessageInterface.php +++ /dev/null @@ -1,15 +0,0 @@ - */ + private EntityRepository $repository; + + /** + */ + public function __construct ( + private readonly EntityManagerInterface $entityManager, + ) + { + $repository = $this->entityManager->getRepository(TaskLog::class); + \assert($repository instanceof EntityRepository); + $this->repository = $repository; + } + + /** + * Gets or creates the log entry for the given task + */ + public function getLogForTask (Task $task) : TaskLog + { + $log = $this->repository->findOneBy([ + "taskId" => $task->ulid, + ]); + + if (null !== $log) + { + return $log; + } + + // if it isn't created yet, create a new one + $log = new TaskLog($task->ulid); + $this->entityManager->persist($log); + + return $log; + } + + /** + * Creates a new run for the given task (lok) and marks it as persisted. + */ + public function createRunForTask (TaskLog $log) : TaskRun + { + $run = new TaskRun($log); + $this->entityManager->persist($run); + + return $run; + } + + /** + */ + public function flush () : void + { + $this->entityManager->flush(); + } +} diff --git a/src/Receiver/ReceiverHelper.php b/src/Receiver/ReceiverHelper.php index 815be66..b6819c0 100644 --- a/src/Receiver/ReceiverHelper.php +++ b/src/Receiver/ReceiverHelper.php @@ -9,14 +9,14 @@ /** * @internal */ -final class ReceiverHelper +final readonly class ReceiverHelper { /** */ public function __construct ( /** @var iterable */ #[AutowireIterator(tag: "messenger.receiver")] - private readonly iterable $transports, + private iterable $transports, ) {} /** diff --git a/src/Registry/Data/Task.php b/src/Registry/Data/Task.php deleted file mode 100644 index 8777381..0000000 --- a/src/Registry/Data/Task.php +++ /dev/null @@ -1,19 +0,0 @@ -|null */ private ?array $tasks = null; + /** @var array|null */ private ?array $keyMap = null; @@ -20,7 +21,6 @@ public function __construct ( private readonly EventDispatcherInterface $dispatcher, ) {} - /** * @return array */ @@ -29,7 +29,6 @@ public function getGroupedTasks () : array return $this->tasks ??= $this->fetchGroupedTasks(); } - /** * Returns a task by its key */ @@ -41,7 +40,6 @@ public function getTaskByKey (string $key) : ?Task return $this->keyMap[$key] ?? null; } - /** * Returns a list of all tasks, grouped by group label. * @@ -59,20 +57,22 @@ private function fetchGroupedTasks () : array foreach ($tasks as $task) { - if (null !== $task->group) + $definition = $task->getMetaData(); + + if (null !== $definition->group) { - $grouped[$task->group][] = $task; + $grouped[$definition->group][] = $task; } else { $ungrouped[] = $task; } - $this->keyMap[$task->key] = $task; + $this->keyMap[$definition->getKey()] = $task; } // sort groups by group label - \uksort($grouped, "strnatcasecmp"); + uksort($grouped, "strnatcasecmp"); if (!empty($ungrouped)) { @@ -83,9 +83,9 @@ private function fetchGroupedTasks () : array // sort every group foreach ($grouped as &$entries) { - \usort( + usort( $entries, - static fn (Task $left, Task $right) => \strnatcasecmp($left->label, $right->label), + static fn (Task $left, Task $right) => strnatcasecmp($left->getMetaData()->label, $right->getMetaData()->label), ); } diff --git a/src/Task/Task.php b/src/Task/Task.php index 5021ed1..f3bd46f 100644 --- a/src/Task/Task.php +++ b/src/Task/Task.php @@ -2,14 +2,27 @@ namespace Torr\TaskManager\Task; -use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Uid\Ulid; -final class Task +/** + * A runnable task + */ +abstract readonly class Task { + public string $ulid; + + /** + */ + public function __construct () + { + $this->ulid = (new Ulid())->toBase58(); + } + /** + * Defines the metadata for this task. + * + * It is important that this data is generated on the fly, so that we can change the label for already + * serialized messages as well. */ - public function __construct ( - public readonly string $queueName, - public readonly Envelope $envelope, - ) {} + abstract public function getMetaData () : TaskMetaData; } diff --git a/src/Task/TaskMetaData.php b/src/Task/TaskMetaData.php new file mode 100644 index 0000000..5c0868a --- /dev/null +++ b/src/Task/TaskMetaData.php @@ -0,0 +1,32 @@ +label)->lower()->toString(); + + return $slugger->slug($key)->toString(); + } +} diff --git a/src/TaskManagerBundle.php b/src/TaskManagerBundle.php index 5c33207..f806e08 100644 --- a/src/TaskManagerBundle.php +++ b/src/TaskManagerBundle.php @@ -43,5 +43,4 @@ public function getPath () : string { return \dirname(__DIR__); } - } diff --git a/vendor-bin/cs-fixer/composer.json b/vendor-bin/cs-fixer/composer.json index c8efe94..ceadfce 100644 --- a/vendor-bin/cs-fixer/composer.json +++ b/vendor-bin/cs-fixer/composer.json @@ -1,6 +1,6 @@ { "require-dev": { - "21torr/php-cs-fixer": "^1.0.2", + "21torr/php-cs-fixer": "^1.1.1", "roave/security-advisories": "dev-latest" } }