From 44722e845c920c0bde760fca728417352656128c Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Wed, 21 Nov 2018 17:53:36 -0500 Subject: [PATCH 1/5] Add LISTEN/NOTIFY support --- README.md | 16 ++++ src/PgAsync/Client.php | 62 +++++++++++++++ src/PgAsync/Connection.php | 35 +++++++-- src/PgAsync/Message/Message.php | 2 + src/PgAsync/Message/NotificationResponse.php | 81 ++++++++++++++++++++ tests/Unit/Message/MessageTest.php | 16 ++++ 6 files changed, 204 insertions(+), 8 deletions(-) create mode 100644 src/PgAsync/Message/NotificationResponse.php diff --git a/README.md b/README.md index d246dfa..27a4716 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,22 @@ $client->executeStatement('SELECT * FROM channel WHERE id = $1', ['5']) ``` +## Example - LISTEN/NOTIFY +```php +$client = new PgAsync\Client([ + "host" => "127.0.0.1", + "port" => "5432", + "user" => "matt", + "database" => "matt", + "auto_disconnect" => true //This option will force the client to disconnect as soon as it completes. The connection will not be returned to the connection pool. +]); + +$client->listen('some_channel') + ->subscriber(function (\PgAsync\Message\NotificationResponse $message) { + echo $message->getChannelName() . ': ' . $message->getPayload() . "\n"; + }); +``` + ## Install With [composer](https://getcomposer.org/) install into you project with: diff --git a/src/PgAsync/Client.php b/src/PgAsync/Client.php index 639f511..3f0ad10 100644 --- a/src/PgAsync/Client.php +++ b/src/PgAsync/Client.php @@ -2,9 +2,11 @@ namespace PgAsync; +use PgAsync\Message\NotificationResponse; use React\EventLoop\LoopInterface; use React\Socket\ConnectorInterface; use Rx\Observable; +use Rx\Subject\Subject; class Client { @@ -30,6 +32,12 @@ class Client /** @var int */ private $maxConnections = 5; + /** @var Subject[] */ + private $listeners = []; + + /** @var Connection */ + private $listenConnection; + public function __construct(array $parameters, LoopInterface $loop = null, ConnectorInterface $connector = null) { $this->loop = $loop ?: \EventLoop\getLoop(); @@ -158,4 +166,58 @@ public function closeNow() $connection->disconnect(); } } + + public function listen($channel) + { + if (!isset($this->listeners[$channel])) { + $subscriberCount = 0; + $listenerDisposable = null; + $channelSubject = new Subject(); + $this->listeners[$channel] = Observable::defer(function () use ($channel, &$subscriberCount, &$listenerDisposable, $channelSubject) { + $unlisten = function () use ($channel, &$subscriberCount, &$listenerDisposable, $channelSubject) { + $subscriberCount--; + if ($subscriberCount !== 0) { + return; + } + $this->listenConnection->query("UNLISTEN " . $channel) + ->subscribe(); + + $listenerDisposable->dispose(); + $listenerDisposable = null; + unset($this->listeners[$channel]); + + if (empty($this->listeners)) { + $this->listenConnection->disconnect(); + $this->listenConnection = null; + } + }; + + return Observable::start(function () use ($channel, &$listenerDisposable, &$subscriberCount, $channelSubject) { + $subscriberCount++; + if ($this->listenConnection === null) { + $this->listenConnection = $this->createNewConnection(); + } + + if ($this->listenConnection === null) { + throw new \Exception('Could not get new connection to listen on.'); + } + + if ($listenerDisposable !== null) { + return; + } + $listenerDisposable = + $this->listenConnection->query("LISTEN " . $channel) + ->merge($this->listenConnection->notifications()) + ->filter(function (NotificationResponse $message) use ($channel) { + return $message->getChannelName() === $channel; + }) + ->subscribe($channelSubject); + })->skip(1) + ->merge($channelSubject) + ->finally($unlisten); + }); + } + + return $this->listeners[$channel]; + } } diff --git a/src/PgAsync/Connection.php b/src/PgAsync/Connection.php index 5a25f3a..1d69b96 100644 --- a/src/PgAsync/Connection.php +++ b/src/PgAsync/Connection.php @@ -23,6 +23,7 @@ use PgAsync\Message\ErrorResponse; use PgAsync\Message\Message; use PgAsync\Message\NoticeResponse; +use PgAsync\Message\NotificationResponse; use PgAsync\Message\ParameterStatus; use PgAsync\Message\ParseComplete; use PgAsync\Command\Query; @@ -39,6 +40,7 @@ use Rx\Observable\AnonymousObservable; use Rx\ObserverInterface; use Rx\SchedulerInterface; +use Rx\Subject\Subject; class Connection extends EventEmitter { @@ -108,6 +110,9 @@ class Connection extends EventEmitter /** @var string */ private $uri; + /** @var Subject */ + private $notificationSubject; + /** * Can be 'I' for Idle, 'T' if in transactions block * or 'E' if in failed transaction block (queries will fail until end of trans) @@ -147,14 +152,15 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt unset($parameters['auto_disconnect']); } - $this->parameters = $parameters; - $this->loop = $loop; - $this->commandQueue = []; - $this->queryState = static::STATE_BUSY; - $this->queryType = static::QUERY_SIMPLE; - $this->connStatus = static::CONNECTION_NEEDED; - $this->socket = $connector ?: new Connector($loop); - $this->uri = 'tcp://' . $this->parameters['host'] . ':' . $this->parameters['port']; + $this->parameters = $parameters; + $this->loop = $loop; + $this->commandQueue = []; + $this->queryState = static::STATE_BUSY; + $this->queryType = static::QUERY_SIMPLE; + $this->connStatus = static::CONNECTION_NEEDED; + $this->socket = $connector ?: new Connector($loop); + $this->uri = 'tcp://' . $this->parameters['host'] . ':' . $this->parameters['port']; + $this->notificationSubject = new Subject(); } private function start() @@ -297,9 +303,16 @@ public function handleMessage($message) $this->handleReadyForQuery($message); } elseif ($message instanceof RowDescription) { $this->handleRowDescription($message); + } elseif ($message instanceof NotificationResponse) { + $this->handleNotificationResponse($message); } } + private function handleNotificationResponse(NotificationResponse $message) + { + $this->notificationSubject->onNext($message); + } + private function handleDataRow(DataRow $dataRow) { if ($this->queryState === $this::STATE_BUSY && $this->currentCommand instanceof CommandInterface) { @@ -448,6 +461,8 @@ private function failAllCommandsWith(\Throwable $e = null) { $e = $e ?: new \Exception('unknown error'); + $this->notificationSubject->onError($e); + while (count($this->commandQueue) > 0) { $c = array_shift($this->commandQueue); if ($c instanceof CommandInterface) { @@ -636,4 +651,8 @@ private function cancelRequest() }); } } + + public function notifications() { + return $this->notificationSubject->asObservable(); + } } diff --git a/src/PgAsync/Message/Message.php b/src/PgAsync/Message/Message.php index 5550da8..8f08588 100644 --- a/src/PgAsync/Message/Message.php +++ b/src/PgAsync/Message/Message.php @@ -52,6 +52,8 @@ public static function createMessageFromIdentifier(string $identifier): ParserIn return new ReadyForQuery(); case 'T': return new RowDescription(); + case NotificationResponse::getMessageIdentifier(): + return new NotificationResponse(); } return new Discard(); diff --git a/src/PgAsync/Message/NotificationResponse.php b/src/PgAsync/Message/NotificationResponse.php new file mode 100644 index 0000000..4a27e0a --- /dev/null +++ b/src/PgAsync/Message/NotificationResponse.php @@ -0,0 +1,81 @@ +notifyingProcessId = unpack('N', substr($rawMessage, $currentPos, 4))[1]; + $currentPos += 4; + + $rawPayload = substr($rawMessage, $currentPos); + $parts = explode("\0", $rawPayload); + + if (count($parts) !== 3) { + throw new \UnderflowException('Wrong number of notification parts in payload'); + } + + $this->channelName = $parts[0]; + $this->payload = $parts[1]; + } + + /** + * @return string + */ + public function getPayload(): string + { + return $this->payload; + } + + /** + * @return int + */ + public function getNotifyingProcessId(): int + { + return $this->notifyingProcessId; + } + + /** + * @return string + */ + public function getChannelName(): string + { + return $this->channelName; + } + + /** + * @inheritDoc + */ + public static function getMessageIdentifier(): string + { + return 'A'; + } + + public function getNoticeMessages(): array + { + return $this->noticeMessages; + } +} diff --git a/tests/Unit/Message/MessageTest.php b/tests/Unit/Message/MessageTest.php index b46d2ec..446a94a 100644 --- a/tests/Unit/Message/MessageTest.php +++ b/tests/Unit/Message/MessageTest.php @@ -7,4 +7,20 @@ public function testInt32() $this->assertEquals("\x04\xd2\x16\x2f", \PgAsync\Message\Message::int32(80877103)); $this->assertEquals("\x00\x00\x00\x00", \PgAsync\Message\Message::int32(0)); } + + public function testNotificationResponse() + { + $rawNotificationMessage = hex2bin('41000000190000040c686572650048656c6c6f20746865726500'); + + + $notificationResponse = \PgAsync\Message\Message::createMessageFromIdentifier($rawNotificationMessage[0]); + $this->assertInstanceOf(\PgAsync\Message\NotificationResponse::class, $notificationResponse); + /** @var \PgAsync\Message\NotificationResponse */ + $notificationResponse->parseData($rawNotificationMessage); + + $this->assertEquals('Hello there', $notificationResponse->getPayload()); + $this->assertEquals('here', $notificationResponse->getChannelName()); + $this->assertEquals(1036, $notificationResponse->getNotifyingProcessId()); + + } } From b8e945121e39a441c7d6d273682b6a7fc45bff82 Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Wed, 21 Nov 2018 17:59:35 -0500 Subject: [PATCH 2/5] Typo in README --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 27a4716..75d23d5 100644 --- a/README.md +++ b/README.md @@ -73,9 +73,11 @@ $client = new PgAsync\Client([ ]); $client->listen('some_channel') - ->subscriber(function (\PgAsync\Message\NotificationResponse $message) { + ->subscribe(function (\PgAsync\Message\NotificationResponse $message) { echo $message->getChannelName() . ': ' . $message->getPayload() . "\n"; }); + +$client->query("NOTIFY some_channel, 'Hello World'")->subscribe(); ``` ## Install From 0d143f94f7df341e58874f59005032a13c73b674 Mon Sep 17 00:00:00 2001 From: David Dan Date: Wed, 21 Nov 2018 23:02:36 -0500 Subject: [PATCH 3/5] Refactored `listen()` --- README.md | 3 +- example/ListenNotify.php | 20 ++++++++++ src/PgAsync/Client.php | 86 ++++++++++++++++------------------------ 3 files changed, 56 insertions(+), 53 deletions(-) create mode 100644 example/ListenNotify.php diff --git a/README.md b/README.md index 75d23d5..80ac832 100644 --- a/README.md +++ b/README.md @@ -68,8 +68,7 @@ $client = new PgAsync\Client([ "host" => "127.0.0.1", "port" => "5432", "user" => "matt", - "database" => "matt", - "auto_disconnect" => true //This option will force the client to disconnect as soon as it completes. The connection will not be returned to the connection pool. + "database" => "matt" ]); $client->listen('some_channel') diff --git a/example/ListenNotify.php b/example/ListenNotify.php new file mode 100644 index 0000000..7d8d6c1 --- /dev/null +++ b/example/ListenNotify.php @@ -0,0 +1,20 @@ + '127.0.0.1', + 'port' => '5432', + 'user' => 'daviddan', + 'database' => 'postgres', +]); + +$client->listen('some_channel') + ->subscribe(function (\PgAsync\Message\NotificationResponse $message) { + echo $message->getChannelName() . ': ' . $message->getPayload() . "\n"; + }); + +\Rx\Observable::timer(1000) + ->flatMapTo($client->query("NOTIFY some_channel, 'Hello World'")) + ->subscribe(); + diff --git a/src/PgAsync/Client.php b/src/PgAsync/Client.php index 3f0ad10..08012f8 100644 --- a/src/PgAsync/Client.php +++ b/src/PgAsync/Client.php @@ -40,8 +40,8 @@ class Client public function __construct(array $parameters, LoopInterface $loop = null, ConnectorInterface $connector = null) { - $this->loop = $loop ?: \EventLoop\getLoop(); - $this->connector = $connector; + $this->loop = $loop ?: \EventLoop\getLoop(); + $this->connector = $connector; if (isset($parameters['auto_disconnect'])) { $this->autoDisconnect = $parameters['auto_disconnect']; @@ -79,7 +79,7 @@ public function executeStatement(string $queryString, array $parameters = []) }); } - private function getLeastBusyConnection() : Connection + private function getLeastBusyConnection(): Connection { if (count($this->connections) === 0) { // try to spin up another connection to return @@ -167,57 +167,41 @@ public function closeNow() } } - public function listen($channel) + public function listen(string $channel): Observable { - if (!isset($this->listeners[$channel])) { - $subscriberCount = 0; - $listenerDisposable = null; - $channelSubject = new Subject(); - $this->listeners[$channel] = Observable::defer(function () use ($channel, &$subscriberCount, &$listenerDisposable, $channelSubject) { - $unlisten = function () use ($channel, &$subscriberCount, &$listenerDisposable, $channelSubject) { - $subscriberCount--; - if ($subscriberCount !== 0) { - return; - } - $this->listenConnection->query("UNLISTEN " . $channel) - ->subscribe(); - - $listenerDisposable->dispose(); - $listenerDisposable = null; - unset($this->listeners[$channel]); - - if (empty($this->listeners)) { - $this->listenConnection->disconnect(); - $this->listenConnection = null; - } - }; - - return Observable::start(function () use ($channel, &$listenerDisposable, &$subscriberCount, $channelSubject) { - $subscriberCount++; - if ($this->listenConnection === null) { - $this->listenConnection = $this->createNewConnection(); - } - - if ($this->listenConnection === null) { - throw new \Exception('Could not get new connection to listen on.'); - } - - if ($listenerDisposable !== null) { - return; - } - $listenerDisposable = - $this->listenConnection->query("LISTEN " . $channel) - ->merge($this->listenConnection->notifications()) - ->filter(function (NotificationResponse $message) use ($channel) { - return $message->getChannelName() === $channel; - }) - ->subscribe($channelSubject); - })->skip(1) - ->merge($channelSubject) - ->finally($unlisten); - }); + if (isset($this->listeners[$channel])) { + return $this->listeners[$channel]; } + $unlisten = function () use ($channel) { + $this->listenConnection->query('UNLISTEN ' . $channel)->subscribe(); + + unset($this->listeners[$channel]); + + if (empty($this->listeners)) { + $this->listenConnection->disconnect(); + $this->listenConnection = null; + } + }; + + $this->listeners[$channel] = Observable::defer(function () use ($channel) { + if ($this->listenConnection === null) { + $this->listenConnection = $this->createNewConnection(); + } + + if ($this->listenConnection === null) { + throw new \Exception('Could not get new connection to listen on.'); + } + + return $this->listenConnection->query('LISTEN ' . $channel) + ->merge($this->listenConnection->notifications()) + ->filter(function (NotificationResponse $message) use ($channel) { + return $message->getChannelName() === $channel; + }); + }) + ->finally($unlisten) + ->share(); + return $this->listeners[$channel]; } } From 790d310d3cd457891427f3144f85c40b0485a136 Mon Sep 17 00:00:00 2001 From: David Dan Date: Thu, 22 Nov 2018 14:10:18 -0500 Subject: [PATCH 4/5] Update ListenNotify.php --- example/ListenNotify.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/example/ListenNotify.php b/example/ListenNotify.php index 7d8d6c1..ec6c3a2 100644 --- a/example/ListenNotify.php +++ b/example/ListenNotify.php @@ -5,8 +5,8 @@ $client = new PgAsync\Client([ 'host' => '127.0.0.1', 'port' => '5432', - 'user' => 'daviddan', - 'database' => 'postgres', + 'user' => 'matt', + 'database' => 'matt', ]); $client->listen('some_channel') From cabf7040f74842d7a3a93113937147f32bca09cf Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Thu, 22 Nov 2018 23:55:16 -0500 Subject: [PATCH 5/5] Added LISTEN tests --- tests/Integration/ClientTest.php | 44 +++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/tests/Integration/ClientTest.php b/tests/Integration/ClientTest.php index 4f99b17..caf803b 100644 --- a/tests/Integration/ClientTest.php +++ b/tests/Integration/ClientTest.php @@ -3,7 +3,7 @@ namespace PgAsync\Tests\Integration; use PgAsync\Client; -use React\EventLoop\Timer\Timer; +use PgAsync\Message\NotificationResponse; use Rx\Observable; use Rx\Observer\CallbackObserver; @@ -191,4 +191,46 @@ function () { $client->closeNow(); $this->getLoop()->run(); } + + public function testListen() + { + $client = new Client([ + "user" => $this->getDbUser(), + "database" => $this::getDbName(), + ], $this->getLoop()); + + $testQuery = $client->listen('some_channel') + ->merge($client->listen('some_channel')->take(1)) + ->take(3) + ->concat($client->listen('some_channel')->take(1)); + + $values = []; + + $testQuery->subscribe( + function (NotificationResponse $results) use (&$values) { + $values[] = $results->getPayload(); + }, + function (\Throwable $e) use (&$error) { + $this->fail('Error while testing: ' . $e->getMessage()); + $this->stopLoop(); + }, + function () { + $this->stopLoop(); + } + ); + + Observable::interval(300) + ->take(3) + ->flatMap(function ($x) use ($client) { + return $client->executeStatement("NOTIFY some_channel, 'Hello" . $x . "'"); + }) + ->subscribe(); + + $this->runLoopWithTimeout(4); + + $this->assertEquals(['Hello0', 'Hello0', 'Hello1', 'Hello2'], $values); + + $client->closeNow(); + $this->getLoop()->run(); + } } \ No newline at end of file