Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow RoadRunner jobs to optionally forward a requestId as part of the task payload #61

Merged
merged 2 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions functions/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@
use Psr\Container\ContainerInterface;
use Shlinkio\Shlink\EventDispatcher\Listener\DummyEnabledListenerChecker;
use Shlinkio\Shlink\EventDispatcher\Listener\EnabledListenerCheckerInterface;
use Shlinkio\Shlink\EventDispatcher\Util\RequestIdProviderInterface;
use Spiral\RoadRunner\Jobs\JobsInterface;

function lazyListener(ContainerInterface $container, string $listenerServiceName): callable
{
return new Listener\LazyEventListener($container, $listenerServiceName);
}

function roadRunnerTaskListener(JobsInterface $jobs, string $listenerServiceName): callable
{
return new RoadRunner\RoadRunnerTaskListener($jobs, $listenerServiceName);
function roadRunnerTaskListener(
JobsInterface $jobs,
string $listenerServiceName,
RequestIdProviderInterface $requestIdProvider,
): callable {
return new RoadRunner\RoadRunnerTaskListener($jobs, $listenerServiceName, $requestIdProvider);
}

function resolveEnabledListenerChecker(ContainerInterface $container): EnabledListenerCheckerInterface
Expand Down
8 changes: 3 additions & 5 deletions src/Listener/LazyEventListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@

use Psr\Container\ContainerInterface;

class LazyEventListener
readonly class LazyEventListener
{
public function __construct(
private readonly ContainerInterface $container,
private readonly string $listenerServiceName,
) {
public function __construct(private ContainerInterface $container, private string $listenerServiceName)
{
}

public function __invoke(object $event): void
Expand Down
14 changes: 12 additions & 2 deletions src/RoadRunner/RoadRunnerEventDispatcherFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use League\Event\EventDispatcher;
use League\Event\PrioritizedListenerRegistry;
use Psr\Container\ContainerInterface;
use Shlinkio\Shlink\EventDispatcher\Util\RequestIdProviderInterface;
use Spiral\RoadRunner\Jobs\Jobs;

use function Shlinkio\Shlink\Config\env;
Expand All @@ -21,15 +22,24 @@ public function __invoke(ContainerInterface $container): EventDispatcher
{
$provider = new PrioritizedListenerRegistry();
$eventsConfig = $container->get('config')['events'] ?? [];
$requestIdProvider = $container->has(RequestIdProviderInterface::class)
? $container->get(RequestIdProviderInterface::class)
: new class implements RequestIdProviderInterface {
public function currentRequestId(): string
{
return '-';
}
};

$this->registerEvents($provider, $container, $eventsConfig['async'] ?? []);
$this->registerEvents($provider, $container, $requestIdProvider, $eventsConfig['async'] ?? []);

return new EventDispatcher($provider);
}

private function registerEvents(
PrioritizedListenerRegistry $provider,
ContainerInterface $container,
RequestIdProviderInterface $requestIdProvider,
array $events,
): void {
if (env('RR_MODE') === null) {
Expand All @@ -45,7 +55,7 @@ private function registerEvents(
continue;
}

$provider->subscribeTo($eventName, roadRunnerTaskListener($jobs, $listener));
$provider->subscribeTo($eventName, roadRunnerTaskListener($jobs, $listener, $requestIdProvider));
}
}
}
Expand Down
25 changes: 18 additions & 7 deletions src/RoadRunner/RoadRunnerTaskConsumerToListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@
use function is_subclass_of;
use function Shlinkio\Shlink\Json\json_decode;

class RoadRunnerTaskConsumerToListener
readonly class RoadRunnerTaskConsumerToListener
{
public function __construct(
private readonly ConsumerInterface $consumer,
private readonly ContainerInterface $container,
private readonly LoggerInterface $logger,
private ConsumerInterface $consumer,
private ContainerInterface $container,
private LoggerInterface $logger,
) {
}

public function listenForTasks(): void
/**
* @param (callable(string): void)|null $setCurrentRequestId
*/
public function listenForTasks(?callable $setCurrentRequestId = null): void
{
while ($task = $this->consumer->waitTask()) {
try {
Expand All @@ -37,8 +40,16 @@ public function listenForTasks(): void
continue;
}

['listenerServiceName' => $listener, 'eventPayload' => $payload] = json_decode($task->getPayload());
$this->container->get($listener)($event::fromPayload($payload));
[
'listenerServiceName' => $listenerService,
'eventPayload' => $payload,
'requestId' => $requestId,
] = json_decode($task->getPayload());
if ($setCurrentRequestId !== null) {
$setCurrentRequestId($requestId);
}

$this->container->get($listenerService)($event::fromPayload($payload));
$task->complete();
} catch (Throwable $e) {
$task->fail($e);
Expand Down
11 changes: 8 additions & 3 deletions src/RoadRunner/RoadRunnerTaskListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
namespace Shlinkio\Shlink\EventDispatcher\RoadRunner;

use JsonSerializable;
use Shlinkio\Shlink\EventDispatcher\Util\RequestIdProviderInterface;
use Spiral\RoadRunner\Jobs\JobsInterface;

use function Shlinkio\Shlink\Json\json_encode;

class RoadRunnerTaskListener
readonly class RoadRunnerTaskListener
{
private const SHLINK_QUEUE = 'shlink';

public function __construct(private readonly JobsInterface $jobs, private readonly string $listenerServiceName)
{
public function __construct(
private JobsInterface $jobs,
private string $listenerServiceName,
private RequestIdProviderInterface $requestIdProvider,
) {
}

public function __invoke(object $event): void
Expand All @@ -23,6 +27,7 @@ public function __invoke(object $event): void
$task = $queue->create($event::class, json_encode([
'listenerServiceName' => $this->listenerServiceName,
'eventPayload' => $event instanceof JsonSerializable ? $event : [],
'requestId' => $this->requestIdProvider->currentRequestId(),
]));
$queue->dispatch($task);
}
Expand Down
10 changes: 10 additions & 0 deletions src/Util/RequestIdProviderInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Shlinkio\Shlink\EventDispatcher\Util;

interface RequestIdProviderInterface
{
public function currentRequestId(): string;
}
29 changes: 23 additions & 6 deletions test/RoadRunner/RoadRunnerEventDispatcherFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
use PHPUnit\Framework\Assert;
use PHPUnit\Framework\Attributes\DataProvider;
use PHPUnit\Framework\Attributes\Test;
use PHPUnit\Framework\Attributes\TestWith;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use ReflectionObject;
use Shlinkio\Shlink\EventDispatcher\Listener\EnabledListenerCheckerInterface;
use Shlinkio\Shlink\EventDispatcher\RoadRunner\RoadRunnerEventDispatcherFactory;
use Shlinkio\Shlink\EventDispatcher\Util\RequestIdProviderInterface;
use Spiral\RoadRunner\Jobs\Jobs;
use Spiral\RoadRunner\Jobs\JobsInterface;
use stdClass;
Expand Down Expand Up @@ -53,7 +55,9 @@ public static function provideEnv(): iterable
}

#[Test]
public function skipsListenersWhenEnabledListenerCheckerIsRegistered(): void
#[TestWith([true])]
#[TestWith([false])]
public function skipsListenersWhenEnabledListenerCheckerIsRegistered(bool $hasRequestIdProvider): void
{
putenv('RR_MODE=http');

Expand All @@ -62,7 +66,7 @@ public function shouldRegisterListener(string $event, string $listener, bool $is
{
return $isAsync && $listener === 'foo';
}
});
}, hasRequestIdProvider: $hasRequestIdProvider);

$dispatcher = ($this->factory)($container);
$listenerProvider = $this->getPrivateProp($dispatcher, 'listenerProvider');
Expand All @@ -80,10 +84,11 @@ private function getPrivateProp(object $object, string $propName): mixed
return $prop->getValue($object);
}

private function container(?EnabledListenerCheckerInterface $listenerChecker = null): ContainerInterface
{
private function container(
?EnabledListenerCheckerInterface $listenerChecker = null,
bool $hasRequestIdProvider = false,
): ContainerInterface {
$container = $this->createMock(ContainerInterface::class);

$getServiceReturnMap = [
['config', [
'events' => [
Expand All @@ -95,12 +100,24 @@ private function container(?EnabledListenerCheckerInterface $listenerChecker = n
]],
[Jobs::class, $this->createMock(JobsInterface::class)],
];
$hasServiceReturnMap = [
[RequestIdProviderInterface::class, $hasRequestIdProvider],
];

if ($listenerChecker !== null) {
$container->method('has')->with(EnabledListenerCheckerInterface::class)->willReturn(true);
$hasServiceReturnMap[] = [EnabledListenerCheckerInterface::class, true];
$getServiceReturnMap[] = [EnabledListenerCheckerInterface::class, $listenerChecker];
}

if ($hasRequestIdProvider) {
$getServiceReturnMap[] = [
RequestIdProviderInterface::class,
$this->createMock(RequestIdProviderInterface::class),
];
}

$container->method('get')->willReturnMap($getServiceReturnMap);
$container->method('has')->willReturnMap($hasServiceReturnMap);

return $container;
}
Expand Down
15 changes: 13 additions & 2 deletions test/RoadRunner/RoadRunnerTaskConsumerToListenerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace ShlinkioTest\Shlink\EventDispatcher\RoadRunner;

use PHPUnit\Framework\Attributes\Test;
use PHPUnit\Framework\Attributes\TestWith;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
Expand Down Expand Up @@ -58,14 +59,18 @@ function () use (&$callCount, $task) {
}

#[Test]
public function listenerIsLoadedAndInvoked(): void
#[TestWith(['123'])]
#[TestWith(['456'])]
#[TestWith(['abc'])]
public function listenerIsLoadedAndInvoked(string $requestId): void
{
$callCount = 0;
$task = $this->createMock(ReceivedTaskInterface::class);
$task->method('getName')->willReturn(DummyJsonDeserializable::class);
$task->method('getPayload')->willReturn(json_encode([
'listenerServiceName' => 'my_listener',
'eventPayload' => [],
'requestId' => $requestId,
]));
$task->expects($this->once())->method('complete');
$task->expects($this->never())->method('fail');
Expand All @@ -79,7 +84,12 @@ function () use (&$callCount, $task) {
});
$this->logger->expects($this->never())->method('warning');

$this->taskConsumer->listenForTasks();
$providedRequestId = null;
$this->taskConsumer->listenForTasks(function (string $id) use (&$providedRequestId): void {
$providedRequestId = $id;
});

self::assertEquals($requestId, $providedRequestId);
}

#[Test]
Expand All @@ -91,6 +101,7 @@ public function taskIsFailedInCaseOfError(): void
$task->method('getPayload')->willReturn(json_encode([
'listenerServiceName' => 'my_listener',
'eventPayload' => [],
'requestId' => '123',
]));
$task->expects($this->never())->method('complete');
$task->expects($this->once())->method('fail')->with($this->isInstanceOf(RuntimeException::class));
Expand Down
13 changes: 12 additions & 1 deletion test/RoadRunner/RoadRunnerTaskListenerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Shlinkio\Shlink\EventDispatcher\RoadRunner\RoadRunnerTaskListener;
use Shlinkio\Shlink\EventDispatcher\Util\RequestIdProviderInterface;
use Spiral\RoadRunner\Jobs\JobsInterface;
use Spiral\RoadRunner\Jobs\QueueInterface;
use Spiral\RoadRunner\Jobs\Task\PreparedTaskInterface;
Expand All @@ -27,7 +28,16 @@ class RoadRunnerTaskListenerTest extends TestCase
public function setUp(): void
{
$this->jobs = $this->createMock(JobsInterface::class);
$this->listener = new RoadRunnerTaskListener($this->jobs, $this->listenerServiceName);
$this->listener = new RoadRunnerTaskListener(
$this->jobs,
$this->listenerServiceName,
new class implements RequestIdProviderInterface {
public function currentRequestId(): string
{
return '-';
}
},
);
}

#[Test, DataProvider('provideEvents')]
Expand All @@ -40,6 +50,7 @@ public function expectedTaskIsDispatchedBasedOnProvidedEvent(object $event, arra
$queue->expects($this->once())->method('create')->with($event::class, json_encode([
'listenerServiceName' => $this->listenerServiceName,
'eventPayload' => $expectedPayload,
'requestId' => '-',
]))->willReturn($task);
$queue->expects($this->once())->method('dispatch')->with($task)->willReturn(
$this->createMock(QueuedTaskInterface::class),
Expand Down
Loading