From 69d871cda3ec0f449666c503ad8bf0161add6d58 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Sun, 7 Apr 2024 10:22:47 +0200 Subject: [PATCH 1/2] Allow RoadRunner jobs to optionally forward a requestId as part of the task payload --- functions/functions.php | 10 +++++-- src/Listener/LazyEventListener.php | 8 ++--- .../RoadRunnerEventDispatcherFactory.php | 14 +++++++-- .../RoadRunnerTaskConsumerToListener.php | 23 +++++++++++---- src/RoadRunner/RoadRunnerTaskListener.php | 11 +++++-- src/Util/RequestIdProviderInterface.php | 10 +++++++ .../RoadRunnerEventDispatcherFactoryTest.php | 29 +++++++++++++++---- .../RoadRunnerTaskConsumerToListenerTest.php | 2 ++ .../RoadRunner/RoadRunnerTaskListenerTest.php | 13 ++++++++- 9 files changed, 94 insertions(+), 26 deletions(-) create mode 100644 src/Util/RequestIdProviderInterface.php diff --git a/functions/functions.php b/functions/functions.php index ef56656..0d94dc4 100644 --- a/functions/functions.php +++ b/functions/functions.php @@ -7,6 +7,7 @@ use Psr\Container\ContainerInterface; use Shlinkio\Shlink\EventDispatcher\Listener\DummyEnabledListenerChecker; use Shlinkio\Shlink\EventDispatcher\Listener\EnabledListenerCheckerInterface; +use Shlinkio\Shlink\EventDispatcher\Util\RequestIdProviderInterface; use Spiral\RoadRunner\Jobs\JobsInterface; function lazyListener(ContainerInterface $container, string $listenerServiceName): callable @@ -14,9 +15,12 @@ function lazyListener(ContainerInterface $container, string $listenerServiceName return new Listener\LazyEventListener($container, $listenerServiceName); } -function roadRunnerTaskListener(JobsInterface $jobs, string $listenerServiceName): callable -{ - return new RoadRunner\RoadRunnerTaskListener($jobs, $listenerServiceName); +function roadRunnerTaskListener( + JobsInterface $jobs, + string $listenerServiceName, + RequestIdProviderInterface $requestIdProvider, +): callable { + return new RoadRunner\RoadRunnerTaskListener($jobs, $listenerServiceName, $requestIdProvider); } function resolveEnabledListenerChecker(ContainerInterface $container): EnabledListenerCheckerInterface diff --git a/src/Listener/LazyEventListener.php b/src/Listener/LazyEventListener.php index 011c418..e1f4454 100644 --- a/src/Listener/LazyEventListener.php +++ b/src/Listener/LazyEventListener.php @@ -6,12 +6,10 @@ use Psr\Container\ContainerInterface; -class LazyEventListener +readonly class LazyEventListener { - public function __construct( - private readonly ContainerInterface $container, - private readonly string $listenerServiceName, - ) { + public function __construct(private ContainerInterface $container, private string $listenerServiceName) + { } public function __invoke(object $event): void diff --git a/src/RoadRunner/RoadRunnerEventDispatcherFactory.php b/src/RoadRunner/RoadRunnerEventDispatcherFactory.php index bd585b6..d954a18 100644 --- a/src/RoadRunner/RoadRunnerEventDispatcherFactory.php +++ b/src/RoadRunner/RoadRunnerEventDispatcherFactory.php @@ -7,6 +7,7 @@ use League\Event\EventDispatcher; use League\Event\PrioritizedListenerRegistry; use Psr\Container\ContainerInterface; +use Shlinkio\Shlink\EventDispatcher\Util\RequestIdProviderInterface; use Spiral\RoadRunner\Jobs\Jobs; use function Shlinkio\Shlink\Config\env; @@ -21,8 +22,16 @@ public function __invoke(ContainerInterface $container): EventDispatcher { $provider = new PrioritizedListenerRegistry(); $eventsConfig = $container->get('config')['events'] ?? []; + $requestIdProvider = $container->has(RequestIdProviderInterface::class) + ? $container->get(RequestIdProviderInterface::class) + : new class implements RequestIdProviderInterface { + public function currentRequestId(): string + { + return '-'; + } + }; - $this->registerEvents($provider, $container, $eventsConfig['async'] ?? []); + $this->registerEvents($provider, $container, $requestIdProvider, $eventsConfig['async'] ?? []); return new EventDispatcher($provider); } @@ -30,6 +39,7 @@ public function __invoke(ContainerInterface $container): EventDispatcher private function registerEvents( PrioritizedListenerRegistry $provider, ContainerInterface $container, + RequestIdProviderInterface $requestIdProvider, array $events, ): void { if (env('RR_MODE') === null) { @@ -45,7 +55,7 @@ private function registerEvents( continue; } - $provider->subscribeTo($eventName, roadRunnerTaskListener($jobs, $listener)); + $provider->subscribeTo($eventName, roadRunnerTaskListener($jobs, $listener, $requestIdProvider)); } } } diff --git a/src/RoadRunner/RoadRunnerTaskConsumerToListener.php b/src/RoadRunner/RoadRunnerTaskConsumerToListener.php index 6671026..c1d5161 100644 --- a/src/RoadRunner/RoadRunnerTaskConsumerToListener.php +++ b/src/RoadRunner/RoadRunnerTaskConsumerToListener.php @@ -13,16 +13,19 @@ use function is_subclass_of; use function Shlinkio\Shlink\Json\json_decode; -class RoadRunnerTaskConsumerToListener +readonly class RoadRunnerTaskConsumerToListener { public function __construct( - private readonly ConsumerInterface $consumer, - private readonly ContainerInterface $container, - private readonly LoggerInterface $logger, + private ConsumerInterface $consumer, + private ContainerInterface $container, + private LoggerInterface $logger, ) { } - public function listenForTasks(): void + /** + * @param callable(string): void|null $setCurrentRequestId + */ + public function listenForTasks(?callable $setCurrentRequestId = null): void { while ($task = $this->consumer->waitTask()) { try { @@ -37,7 +40,15 @@ public function listenForTasks(): void continue; } - ['listenerServiceName' => $listener, 'eventPayload' => $payload] = json_decode($task->getPayload()); + [ + 'listenerServiceName' => $listener, + 'eventPayload' => $payload, + 'requestId' => $requestId, + ] = json_decode($task->getPayload()); + if ($setCurrentRequestId !== null) { + $setCurrentRequestId($requestId); + } + $this->container->get($listener)($event::fromPayload($payload)); $task->complete(); } catch (Throwable $e) { diff --git a/src/RoadRunner/RoadRunnerTaskListener.php b/src/RoadRunner/RoadRunnerTaskListener.php index c65621a..3790649 100644 --- a/src/RoadRunner/RoadRunnerTaskListener.php +++ b/src/RoadRunner/RoadRunnerTaskListener.php @@ -5,16 +5,20 @@ namespace Shlinkio\Shlink\EventDispatcher\RoadRunner; use JsonSerializable; +use Shlinkio\Shlink\EventDispatcher\Util\RequestIdProviderInterface; use Spiral\RoadRunner\Jobs\JobsInterface; use function Shlinkio\Shlink\Json\json_encode; -class RoadRunnerTaskListener +readonly class RoadRunnerTaskListener { private const SHLINK_QUEUE = 'shlink'; - public function __construct(private readonly JobsInterface $jobs, private readonly string $listenerServiceName) - { + public function __construct( + private JobsInterface $jobs, + private string $listenerServiceName, + private RequestIdProviderInterface $requestIdProvider, + ) { } public function __invoke(object $event): void @@ -23,6 +27,7 @@ public function __invoke(object $event): void $task = $queue->create($event::class, json_encode([ 'listenerServiceName' => $this->listenerServiceName, 'eventPayload' => $event instanceof JsonSerializable ? $event : [], + 'requestId' => $this->requestIdProvider->currentRequestId(), ])); $queue->dispatch($task); } diff --git a/src/Util/RequestIdProviderInterface.php b/src/Util/RequestIdProviderInterface.php new file mode 100644 index 0000000..e938e2b --- /dev/null +++ b/src/Util/RequestIdProviderInterface.php @@ -0,0 +1,10 @@ +factory)($container); $listenerProvider = $this->getPrivateProp($dispatcher, 'listenerProvider'); @@ -80,10 +84,11 @@ private function getPrivateProp(object $object, string $propName): mixed return $prop->getValue($object); } - private function container(?EnabledListenerCheckerInterface $listenerChecker = null): ContainerInterface - { + private function container( + ?EnabledListenerCheckerInterface $listenerChecker = null, + bool $hasRequestIdProvider = false, + ): ContainerInterface { $container = $this->createMock(ContainerInterface::class); - $getServiceReturnMap = [ ['config', [ 'events' => [ @@ -95,12 +100,24 @@ private function container(?EnabledListenerCheckerInterface $listenerChecker = n ]], [Jobs::class, $this->createMock(JobsInterface::class)], ]; + $hasServiceReturnMap = [ + [RequestIdProviderInterface::class, $hasRequestIdProvider], + ]; + if ($listenerChecker !== null) { - $container->method('has')->with(EnabledListenerCheckerInterface::class)->willReturn(true); + $hasServiceReturnMap[] = [EnabledListenerCheckerInterface::class, true]; $getServiceReturnMap[] = [EnabledListenerCheckerInterface::class, $listenerChecker]; } + if ($hasRequestIdProvider) { + $getServiceReturnMap[] = [ + RequestIdProviderInterface::class, + $this->createMock(RequestIdProviderInterface::class), + ]; + } + $container->method('get')->willReturnMap($getServiceReturnMap); + $container->method('has')->willReturnMap($hasServiceReturnMap); return $container; } diff --git a/test/RoadRunner/RoadRunnerTaskConsumerToListenerTest.php b/test/RoadRunner/RoadRunnerTaskConsumerToListenerTest.php index bd680e8..c1d66e0 100644 --- a/test/RoadRunner/RoadRunnerTaskConsumerToListenerTest.php +++ b/test/RoadRunner/RoadRunnerTaskConsumerToListenerTest.php @@ -66,6 +66,7 @@ public function listenerIsLoadedAndInvoked(): void $task->method('getPayload')->willReturn(json_encode([ 'listenerServiceName' => 'my_listener', 'eventPayload' => [], + 'requestId' => '123', ])); $task->expects($this->once())->method('complete'); $task->expects($this->never())->method('fail'); @@ -91,6 +92,7 @@ public function taskIsFailedInCaseOfError(): void $task->method('getPayload')->willReturn(json_encode([ 'listenerServiceName' => 'my_listener', 'eventPayload' => [], + 'requestId' => '123', ])); $task->expects($this->never())->method('complete'); $task->expects($this->once())->method('fail')->with($this->isInstanceOf(RuntimeException::class)); diff --git a/test/RoadRunner/RoadRunnerTaskListenerTest.php b/test/RoadRunner/RoadRunnerTaskListenerTest.php index 728bdca..25a42a9 100644 --- a/test/RoadRunner/RoadRunnerTaskListenerTest.php +++ b/test/RoadRunner/RoadRunnerTaskListenerTest.php @@ -10,6 +10,7 @@ use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; use Shlinkio\Shlink\EventDispatcher\RoadRunner\RoadRunnerTaskListener; +use Shlinkio\Shlink\EventDispatcher\Util\RequestIdProviderInterface; use Spiral\RoadRunner\Jobs\JobsInterface; use Spiral\RoadRunner\Jobs\QueueInterface; use Spiral\RoadRunner\Jobs\Task\PreparedTaskInterface; @@ -27,7 +28,16 @@ class RoadRunnerTaskListenerTest extends TestCase public function setUp(): void { $this->jobs = $this->createMock(JobsInterface::class); - $this->listener = new RoadRunnerTaskListener($this->jobs, $this->listenerServiceName); + $this->listener = new RoadRunnerTaskListener( + $this->jobs, + $this->listenerServiceName, + new class implements RequestIdProviderInterface { + public function currentRequestId(): string + { + return '-'; + } + }, + ); } #[Test, DataProvider('provideEvents')] @@ -40,6 +50,7 @@ public function expectedTaskIsDispatchedBasedOnProvidedEvent(object $event, arra $queue->expects($this->once())->method('create')->with($event::class, json_encode([ 'listenerServiceName' => $this->listenerServiceName, 'eventPayload' => $expectedPayload, + 'requestId' => '-', ]))->willReturn($task); $queue->expects($this->once())->method('dispatch')->with($task)->willReturn( $this->createMock(QueuedTaskInterface::class), From c276f37bc5392f37428a4b6409ab45558cc5f871 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Sun, 7 Apr 2024 10:43:03 +0200 Subject: [PATCH 2/2] Test requestId is properly set in RoadRunnerTaskListenerTest --- .../RoadRunnerTaskConsumerToListener.php | 6 +++--- .../RoadRunnerTaskConsumerToListenerTest.php | 15 ++++++++++++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/RoadRunner/RoadRunnerTaskConsumerToListener.php b/src/RoadRunner/RoadRunnerTaskConsumerToListener.php index c1d5161..0e2fe17 100644 --- a/src/RoadRunner/RoadRunnerTaskConsumerToListener.php +++ b/src/RoadRunner/RoadRunnerTaskConsumerToListener.php @@ -23,7 +23,7 @@ public function __construct( } /** - * @param callable(string): void|null $setCurrentRequestId + * @param (callable(string): void)|null $setCurrentRequestId */ public function listenForTasks(?callable $setCurrentRequestId = null): void { @@ -41,7 +41,7 @@ public function listenForTasks(?callable $setCurrentRequestId = null): void } [ - 'listenerServiceName' => $listener, + 'listenerServiceName' => $listenerService, 'eventPayload' => $payload, 'requestId' => $requestId, ] = json_decode($task->getPayload()); @@ -49,7 +49,7 @@ public function listenForTasks(?callable $setCurrentRequestId = null): void $setCurrentRequestId($requestId); } - $this->container->get($listener)($event::fromPayload($payload)); + $this->container->get($listenerService)($event::fromPayload($payload)); $task->complete(); } catch (Throwable $e) { $task->fail($e); diff --git a/test/RoadRunner/RoadRunnerTaskConsumerToListenerTest.php b/test/RoadRunner/RoadRunnerTaskConsumerToListenerTest.php index c1d66e0..e0057d8 100644 --- a/test/RoadRunner/RoadRunnerTaskConsumerToListenerTest.php +++ b/test/RoadRunner/RoadRunnerTaskConsumerToListenerTest.php @@ -5,6 +5,7 @@ namespace ShlinkioTest\Shlink\EventDispatcher\RoadRunner; use PHPUnit\Framework\Attributes\Test; +use PHPUnit\Framework\Attributes\TestWith; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; use Psr\Container\ContainerInterface; @@ -58,7 +59,10 @@ function () use (&$callCount, $task) { } #[Test] - public function listenerIsLoadedAndInvoked(): void + #[TestWith(['123'])] + #[TestWith(['456'])] + #[TestWith(['abc'])] + public function listenerIsLoadedAndInvoked(string $requestId): void { $callCount = 0; $task = $this->createMock(ReceivedTaskInterface::class); @@ -66,7 +70,7 @@ public function listenerIsLoadedAndInvoked(): void $task->method('getPayload')->willReturn(json_encode([ 'listenerServiceName' => 'my_listener', 'eventPayload' => [], - 'requestId' => '123', + 'requestId' => $requestId, ])); $task->expects($this->once())->method('complete'); $task->expects($this->never())->method('fail'); @@ -80,7 +84,12 @@ function () use (&$callCount, $task) { }); $this->logger->expects($this->never())->method('warning'); - $this->taskConsumer->listenForTasks(); + $providedRequestId = null; + $this->taskConsumer->listenForTasks(function (string $id) use (&$providedRequestId): void { + $providedRequestId = $id; + }); + + self::assertEquals($requestId, $providedRequestId); } #[Test]