diff --git a/README.md b/README.md index d246dfa..80ac832 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,23 @@ $client->executeStatement('SELECT * FROM channel WHERE id = $1', ['5']) ``` +## Example - LISTEN/NOTIFY +```php +$client = new PgAsync\Client([ + "host" => "127.0.0.1", + "port" => "5432", + "user" => "matt", + "database" => "matt" +]); + +$client->listen('some_channel') + ->subscribe(function (\PgAsync\Message\NotificationResponse $message) { + echo $message->getChannelName() . ': ' . $message->getPayload() . "\n"; + }); + +$client->query("NOTIFY some_channel, 'Hello World'")->subscribe(); +``` + ## Install With [composer](https://getcomposer.org/) install into you project with: diff --git a/example/ListenNotify.php b/example/ListenNotify.php new file mode 100644 index 0000000..ec6c3a2 --- /dev/null +++ b/example/ListenNotify.php @@ -0,0 +1,20 @@ + '127.0.0.1', + 'port' => '5432', + 'user' => 'matt', + 'database' => 'matt', +]); + +$client->listen('some_channel') + ->subscribe(function (\PgAsync\Message\NotificationResponse $message) { + echo $message->getChannelName() . ': ' . $message->getPayload() . "\n"; + }); + +\Rx\Observable::timer(1000) + ->flatMapTo($client->query("NOTIFY some_channel, 'Hello World'")) + ->subscribe(); + diff --git a/src/PgAsync/Client.php b/src/PgAsync/Client.php index 639f511..08012f8 100644 --- a/src/PgAsync/Client.php +++ b/src/PgAsync/Client.php @@ -2,9 +2,11 @@ namespace PgAsync; +use PgAsync\Message\NotificationResponse; use React\EventLoop\LoopInterface; use React\Socket\ConnectorInterface; use Rx\Observable; +use Rx\Subject\Subject; class Client { @@ -30,10 +32,16 @@ class Client /** @var int */ private $maxConnections = 5; + /** @var Subject[] */ + private $listeners = []; + + /** @var Connection */ + private $listenConnection; + public function __construct(array $parameters, LoopInterface $loop = null, ConnectorInterface $connector = null) { - $this->loop = $loop ?: \EventLoop\getLoop(); - $this->connector = $connector; + $this->loop = $loop ?: \EventLoop\getLoop(); + $this->connector = $connector; if (isset($parameters['auto_disconnect'])) { $this->autoDisconnect = $parameters['auto_disconnect']; @@ -71,7 +79,7 @@ public function executeStatement(string $queryString, array $parameters = []) }); } - private function getLeastBusyConnection() : Connection + private function getLeastBusyConnection(): Connection { if (count($this->connections) === 0) { // try to spin up another connection to return @@ -158,4 +166,42 @@ public function closeNow() $connection->disconnect(); } } + + public function listen(string $channel): Observable + { + if (isset($this->listeners[$channel])) { + return $this->listeners[$channel]; + } + + $unlisten = function () use ($channel) { + $this->listenConnection->query('UNLISTEN ' . $channel)->subscribe(); + + unset($this->listeners[$channel]); + + if (empty($this->listeners)) { + $this->listenConnection->disconnect(); + $this->listenConnection = null; + } + }; + + $this->listeners[$channel] = Observable::defer(function () use ($channel) { + if ($this->listenConnection === null) { + $this->listenConnection = $this->createNewConnection(); + } + + if ($this->listenConnection === null) { + throw new \Exception('Could not get new connection to listen on.'); + } + + return $this->listenConnection->query('LISTEN ' . $channel) + ->merge($this->listenConnection->notifications()) + ->filter(function (NotificationResponse $message) use ($channel) { + return $message->getChannelName() === $channel; + }); + }) + ->finally($unlisten) + ->share(); + + return $this->listeners[$channel]; + } } diff --git a/src/PgAsync/Connection.php b/src/PgAsync/Connection.php index 5a25f3a..1d69b96 100644 --- a/src/PgAsync/Connection.php +++ b/src/PgAsync/Connection.php @@ -23,6 +23,7 @@ use PgAsync\Message\ErrorResponse; use PgAsync\Message\Message; use PgAsync\Message\NoticeResponse; +use PgAsync\Message\NotificationResponse; use PgAsync\Message\ParameterStatus; use PgAsync\Message\ParseComplete; use PgAsync\Command\Query; @@ -39,6 +40,7 @@ use Rx\Observable\AnonymousObservable; use Rx\ObserverInterface; use Rx\SchedulerInterface; +use Rx\Subject\Subject; class Connection extends EventEmitter { @@ -108,6 +110,9 @@ class Connection extends EventEmitter /** @var string */ private $uri; + /** @var Subject */ + private $notificationSubject; + /** * Can be 'I' for Idle, 'T' if in transactions block * or 'E' if in failed transaction block (queries will fail until end of trans) @@ -147,14 +152,15 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt unset($parameters['auto_disconnect']); } - $this->parameters = $parameters; - $this->loop = $loop; - $this->commandQueue = []; - $this->queryState = static::STATE_BUSY; - $this->queryType = static::QUERY_SIMPLE; - $this->connStatus = static::CONNECTION_NEEDED; - $this->socket = $connector ?: new Connector($loop); - $this->uri = 'tcp://' . $this->parameters['host'] . ':' . $this->parameters['port']; + $this->parameters = $parameters; + $this->loop = $loop; + $this->commandQueue = []; + $this->queryState = static::STATE_BUSY; + $this->queryType = static::QUERY_SIMPLE; + $this->connStatus = static::CONNECTION_NEEDED; + $this->socket = $connector ?: new Connector($loop); + $this->uri = 'tcp://' . $this->parameters['host'] . ':' . $this->parameters['port']; + $this->notificationSubject = new Subject(); } private function start() @@ -297,9 +303,16 @@ public function handleMessage($message) $this->handleReadyForQuery($message); } elseif ($message instanceof RowDescription) { $this->handleRowDescription($message); + } elseif ($message instanceof NotificationResponse) { + $this->handleNotificationResponse($message); } } + private function handleNotificationResponse(NotificationResponse $message) + { + $this->notificationSubject->onNext($message); + } + private function handleDataRow(DataRow $dataRow) { if ($this->queryState === $this::STATE_BUSY && $this->currentCommand instanceof CommandInterface) { @@ -448,6 +461,8 @@ private function failAllCommandsWith(\Throwable $e = null) { $e = $e ?: new \Exception('unknown error'); + $this->notificationSubject->onError($e); + while (count($this->commandQueue) > 0) { $c = array_shift($this->commandQueue); if ($c instanceof CommandInterface) { @@ -636,4 +651,8 @@ private function cancelRequest() }); } } + + public function notifications() { + return $this->notificationSubject->asObservable(); + } } diff --git a/src/PgAsync/Message/Message.php b/src/PgAsync/Message/Message.php index 5550da8..8f08588 100644 --- a/src/PgAsync/Message/Message.php +++ b/src/PgAsync/Message/Message.php @@ -52,6 +52,8 @@ public static function createMessageFromIdentifier(string $identifier): ParserIn return new ReadyForQuery(); case 'T': return new RowDescription(); + case NotificationResponse::getMessageIdentifier(): + return new NotificationResponse(); } return new Discard(); diff --git a/src/PgAsync/Message/NotificationResponse.php b/src/PgAsync/Message/NotificationResponse.php new file mode 100644 index 0000000..4a27e0a --- /dev/null +++ b/src/PgAsync/Message/NotificationResponse.php @@ -0,0 +1,81 @@ +notifyingProcessId = unpack('N', substr($rawMessage, $currentPos, 4))[1]; + $currentPos += 4; + + $rawPayload = substr($rawMessage, $currentPos); + $parts = explode("\0", $rawPayload); + + if (count($parts) !== 3) { + throw new \UnderflowException('Wrong number of notification parts in payload'); + } + + $this->channelName = $parts[0]; + $this->payload = $parts[1]; + } + + /** + * @return string + */ + public function getPayload(): string + { + return $this->payload; + } + + /** + * @return int + */ + public function getNotifyingProcessId(): int + { + return $this->notifyingProcessId; + } + + /** + * @return string + */ + public function getChannelName(): string + { + return $this->channelName; + } + + /** + * @inheritDoc + */ + public static function getMessageIdentifier(): string + { + return 'A'; + } + + public function getNoticeMessages(): array + { + return $this->noticeMessages; + } +} diff --git a/tests/Integration/ClientTest.php b/tests/Integration/ClientTest.php index 4f99b17..caf803b 100644 --- a/tests/Integration/ClientTest.php +++ b/tests/Integration/ClientTest.php @@ -3,7 +3,7 @@ namespace PgAsync\Tests\Integration; use PgAsync\Client; -use React\EventLoop\Timer\Timer; +use PgAsync\Message\NotificationResponse; use Rx\Observable; use Rx\Observer\CallbackObserver; @@ -191,4 +191,46 @@ function () { $client->closeNow(); $this->getLoop()->run(); } + + public function testListen() + { + $client = new Client([ + "user" => $this->getDbUser(), + "database" => $this::getDbName(), + ], $this->getLoop()); + + $testQuery = $client->listen('some_channel') + ->merge($client->listen('some_channel')->take(1)) + ->take(3) + ->concat($client->listen('some_channel')->take(1)); + + $values = []; + + $testQuery->subscribe( + function (NotificationResponse $results) use (&$values) { + $values[] = $results->getPayload(); + }, + function (\Throwable $e) use (&$error) { + $this->fail('Error while testing: ' . $e->getMessage()); + $this->stopLoop(); + }, + function () { + $this->stopLoop(); + } + ); + + Observable::interval(300) + ->take(3) + ->flatMap(function ($x) use ($client) { + return $client->executeStatement("NOTIFY some_channel, 'Hello" . $x . "'"); + }) + ->subscribe(); + + $this->runLoopWithTimeout(4); + + $this->assertEquals(['Hello0', 'Hello0', 'Hello1', 'Hello2'], $values); + + $client->closeNow(); + $this->getLoop()->run(); + } } \ No newline at end of file diff --git a/tests/Unit/Message/MessageTest.php b/tests/Unit/Message/MessageTest.php index b46d2ec..446a94a 100644 --- a/tests/Unit/Message/MessageTest.php +++ b/tests/Unit/Message/MessageTest.php @@ -7,4 +7,20 @@ public function testInt32() $this->assertEquals("\x04\xd2\x16\x2f", \PgAsync\Message\Message::int32(80877103)); $this->assertEquals("\x00\x00\x00\x00", \PgAsync\Message\Message::int32(0)); } + + public function testNotificationResponse() + { + $rawNotificationMessage = hex2bin('41000000190000040c686572650048656c6c6f20746865726500'); + + + $notificationResponse = \PgAsync\Message\Message::createMessageFromIdentifier($rawNotificationMessage[0]); + $this->assertInstanceOf(\PgAsync\Message\NotificationResponse::class, $notificationResponse); + /** @var \PgAsync\Message\NotificationResponse */ + $notificationResponse->parseData($rawNotificationMessage); + + $this->assertEquals('Hello there', $notificationResponse->getPayload()); + $this->assertEquals('here', $notificationResponse->getChannelName()); + $this->assertEquals(1036, $notificationResponse->getNotifyingProcessId()); + + } }