Skip to content

Commit

Permalink
Finalize CI etc
Browse files Browse the repository at this point in the history
  • Loading branch information
apfelbox committed Jun 6, 2024
1 parent 096c332 commit 81b8096
Show file tree
Hide file tree
Showing 13 changed files with 42 additions and 174 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
1.4.0
=====


1.3.1
=====

Expand Down
7 changes: 1 addition & 6 deletions src/Command/ListTasksCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use Symfony\Component\Console\Output\OutputInterface;
use Torr\Cli\Console\Style\TorrStyle;
use Torr\TaskManager\Registry\TaskRegistry;
use Torr\TaskManager\Task\WrappedMessage;

#[AsCommand("task-manager:list-tasks")]
final class ListTasksCommand extends Command
Expand Down Expand Up @@ -69,16 +68,12 @@ protected function execute (InputInterface $input, OutputInterface $output) : in
$first = false;
}

$taskClass = $task instanceof WrappedMessage
? \get_class($task->message)
: $task::class;

$row[] = sprintf(
"<fg=yellow>%s</>",
$metaData->getKey(),
);
$row[] = $metaData->label;
$row[] = $taskClass;
$row[] = $task::class;
$rows[] = $row;
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/Command/QueueTasksCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use Torr\TaskManager\Manager\TaskManager;
use Torr\TaskManager\Receiver\ReceiverHelper;
use Torr\TaskManager\Registry\TaskRegistry;
use Torr\TaskManager\Task\TaskInterface;
use Torr\TaskManager\Task\Task;

#[AsCommand("task-manager:queue")]
final class QueueTasksCommand extends Command
Expand Down Expand Up @@ -85,7 +85,7 @@ protected function execute (InputInterface $input, OutputInterface $output) : in
/**
* Handles the interaction to get the tasks to queue
*
* @return TaskInterface[]
* @return Task[]
*/
private function getTasksToQueue (InputInterface $input, TorrStyle $io) : array
{
Expand Down Expand Up @@ -132,7 +132,7 @@ private function getTasksToQueue (InputInterface $input, TorrStyle $io) : array
/**
* @param string[] $keys
*
* @return TaskInterface[]
* @return Task[]
*/
private function fetchTasksByKey (array $keys) : array
{
Expand All @@ -153,7 +153,7 @@ private function fetchTasksByKey (array $keys) : array
/**
*
*/
private function formatTaskLabel (TaskInterface $task) : string
private function formatTaskLabel (Task $task) : string
{
$metaData = $task->getMetaData();

Expand Down
6 changes: 3 additions & 3 deletions src/Config/BundleConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

namespace Torr\TaskManager\Config;

final class BundleConfig
final readonly class BundleConfig
{
/**
*/
public function __construct (
/** @var string[] */
public readonly array $sortedQueues,
public array $sortedQueues,
/** @var string[] */
public readonly array $failureTransports = [],
public array $failureTransports = [],
) {}
}
40 changes: 3 additions & 37 deletions src/Event/RegisterTasksEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

use Torr\TaskManager\Exception\Registry\DuplicateTaskRegisteredException;
use Torr\TaskManager\Task\Task;
use Torr\TaskManager\Task\TaskInterface;
use Torr\TaskManager\Task\WrappedMessage;

/**
* This event lets you register your tasks, so that the UI can make them selectable
Expand All @@ -16,41 +14,9 @@
*/
final class RegisterTasksEvent
{
/** @var array<string, TaskInterface> */
/** @var array<string, Task> */
public array $tasks = [];

/**
* Registers a task
*
* @deprecated refactor your messages to use {@see Task}s
*
* @throws DuplicateTaskRegisteredException
*/
public function registerTask (
string $label,
object $message,
?string $group = null,
) : self
{
// @phpstan-ignore-next-line todoBy.sfDeprecation
trigger_deprecation("21torr/task-manager", "1.4.0", "Not using Tasks is deprecated. Please refactor your code to tasks and register them using the dedicated method in " . __CLASS__ . ".");

$wrappedTask = new WrappedMessage($label, $message, $group);
$key = $wrappedTask->getMetaData()->getKey();

if (\array_key_exists($key, $this->tasks))
{
throw new DuplicateTaskRegisteredException(sprintf(
"Duplicate task registered with key '%s'",
$key,
));
}

$this->tasks[$key] = $wrappedTask;

return $this;
}

/**
* Registers a task
*
Expand All @@ -75,15 +41,15 @@ public function register (Task $task) : self
}

/**
* @return list<TaskInterface>
* @return list<Task>
*/
public function getTasks () : array
{
$entries = array_values($this->tasks);

usort(
$entries,
static fn (TaskInterface $left, TaskInterface $right) => strnatcasecmp($left->getMetaData()->label, $right->getMetaData()->label),
static fn (Task $left, Task $right) => strnatcasecmp($left->getMetaData()->label, $right->getMetaData()->label),
);

return $entries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public function onSendMessageToTransports (SendMessageToTransportsEvent $event)
}
}


/**
* Automatically integrate
*/
Expand Down Expand Up @@ -66,7 +65,6 @@ public function onWorkerMessageFailed (WorkerMessageFailedEvent $event) : void
$this->logModel->flush();
}


/**
*
*/
Expand Down
67 changes: 18 additions & 49 deletions src/Manager/TaskManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
use Symfony\Component\Messenger\Transport\TransportInterface;
use Torr\TaskManager\Config\BundleConfig;
use Torr\TaskManager\Exception\Manager\InvalidMessageTransportException;
use Torr\TaskManager\Message\UniqueMessageInterface;
use Torr\TaskManager\Stamp\UniqueJobStamp;
use Torr\TaskManager\Task\Task;

final readonly class TaskManager
{
/**
*/
public function __construct (
/** @var ServiceLocator<TransportInterface> */
private ServiceLocator $receivers,
Expand All @@ -28,57 +29,44 @@ public function __construct (
*
* @return bool whether the message was added. If this is false, an identical job is already queued.
*/
public function enqueue (
object $message,
?string $jobId = null,
) : bool
public function enqueue (Task $task) : bool
{
if (null === $jobId)
{
$jobId = $this->fetchJobIdFromMessage($message);
}

if (null === $jobId)
{
$this->messageBus->dispatch($message);

return true;
}

if (null !== $this->findQueuedMessageByUniqueJobId($jobId))
// if we find a message with the same unique task id, we don't queue it again
if ($this->isTaskWithSameTaskIdAlreadyQueued($task->getMetaData()->uniqueTaskId))
{
return false;
}

$envelope = $message instanceof Envelope
? $message
: new Envelope($message);

$envelope = $envelope->with(new UniqueJobStamp($jobId));
$this->messageBus->dispatch($envelope);
$this->messageBus->dispatch($task);

return true;
}

/**
* Finds a queued message with the given job id
*/
private function findQueuedMessageByUniqueJobId (string $jobId) : ?Envelope
private function isTaskWithSameTaskIdAlreadyQueued (?string $uniqueTaskId) : bool
{
// no task id, so this task is not deduplicated. No need to check anything, just enqueue it.
if (null === $uniqueTaskId)
{
return false;
}

foreach ($this->getAllQueues() as $queueName)
{
foreach ($this->fetchTasksInQueue($queueName) as $envelope)
{
$stamp = $envelope->last(UniqueJobStamp::class);
$message = $envelope->getMessage();

if (null !== $stamp && $stamp->jobId === $jobId)
if ($message instanceof Task && $message->getMetaData()->uniqueTaskId === $uniqueTaskId)
{
return $envelope;
return true;
}
}
}

return null;
return false;
}

/**
Expand Down Expand Up @@ -134,23 +122,4 @@ public function getAllQueues () : array
fn (string $serviceId) => !str_starts_with($serviceId, "messenger.transport.") && !\in_array($serviceId, $this->bundleConfig->failureTransports, true),
);
}

/**
*
*/
private function fetchJobIdFromMessage (object $message) : ?string
{
// unwrap the envelope, as the job id is attached to the message
if ($message instanceof Envelope)
{
$message = $message->getMessage();
}

if ($message instanceof UniqueMessageInterface)
{
return $message->getJobId();
}

return null;
}
}
15 changes: 0 additions & 15 deletions src/Message/UniqueMessageInterface.php

This file was deleted.

3 changes: 0 additions & 3 deletions Model/TaskLogModel.php → src/Model/TaskLogModel.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public function __construct (
$this->repository = $repository;
}


/**
* Gets or creates the log entry for the given task
*/
Expand All @@ -46,7 +45,6 @@ public function getLogForTask (Task $task) : TaskLog
return $log;
}


/**
* Creates a new run for the given task (lok) and marks it as persisted.
*/
Expand All @@ -58,7 +56,6 @@ public function createRunForTask (TaskLog $log) : TaskRun
return $run;
}


/**
*/
public function flush () : void
Expand Down
14 changes: 7 additions & 7 deletions src/Registry/TaskRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,25 @@

use Psr\EventDispatcher\EventDispatcherInterface;
use Torr\TaskManager\Event\RegisterTasksEvent;
use Torr\TaskManager\Task\TaskInterface;
use Torr\TaskManager\Task\Task;

/**
* Contains all tasks that are automatically configured to be runnable.
*/
final class TaskRegistry
{
/** @var array<string, TaskInterface[]>|null */
/** @var array<string, Task[]>|null */
private ?array $tasks = null;

/** @var array<string, TaskInterface>|null */
/** @var array<string, Task>|null */
private ?array $keyMap = null;

public function __construct (
private readonly EventDispatcherInterface $dispatcher,
) {}

/**
* @return array<string, TaskInterface[]>
* @return array<string, Task[]>
*/
public function getGroupedTasks () : array
{
Expand All @@ -32,7 +32,7 @@ public function getGroupedTasks () : array
/**
* Returns a task by its key
*/
public function getTaskByKey (string $key) : ?TaskInterface
public function getTaskByKey (string $key) : ?Task
{
// be sure to fetch tasks
$this->getGroupedTasks();
Expand All @@ -43,7 +43,7 @@ public function getTaskByKey (string $key) : ?TaskInterface
/**
* Returns a list of all tasks, grouped by group label.
*
* @return array<string, TaskInterface[]>
* @return array<string, Task[]>
*/
private function fetchGroupedTasks () : array
{
Expand Down Expand Up @@ -85,7 +85,7 @@ private function fetchGroupedTasks () : array
{
usort(
$entries,
static fn (TaskInterface $left, TaskInterface $right) => strnatcasecmp($left->getMetaData()->label, $right->getMetaData()->label),
static fn (Task $left, Task $right) => strnatcasecmp($left->getMetaData()->label, $right->getMetaData()->label),
);
}

Expand Down
4 changes: 2 additions & 2 deletions src/Task/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
/**
* A runnable task
*/
abstract class Task implements TaskInterface
abstract readonly class Task
{
public readonly string $ulid;
public string $ulid;

/**
*/
Expand Down
Loading

0 comments on commit 81b8096

Please sign in to comment.