Skip to content

Commit

Permalink
Merge pull request #36 from voryx/listen_notify
Browse files Browse the repository at this point in the history
Add LISTEN/NOTIFY support
  • Loading branch information
mbonneau authored Nov 23, 2018
2 parents 3cb99de + cabf704 commit 4afc0df
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 12 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,23 @@ $client->executeStatement('SELECT * FROM channel WHERE id = $1', ['5'])

```

## Example - LISTEN/NOTIFY
```php
$client = new PgAsync\Client([
"host" => "127.0.0.1",
"port" => "5432",
"user" => "matt",
"database" => "matt"
]);

$client->listen('some_channel')
->subscribe(function (\PgAsync\Message\NotificationResponse $message) {
echo $message->getChannelName() . ': ' . $message->getPayload() . "\n";
});

$client->query("NOTIFY some_channel, 'Hello World'")->subscribe();
```

## Install
With [composer](https://getcomposer.org/) install into you project with:

Expand Down
20 changes: 20 additions & 0 deletions example/ListenNotify.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

require_once __DIR__ . '/bootstrap.php';

$client = new PgAsync\Client([
'host' => '127.0.0.1',
'port' => '5432',
'user' => 'matt',
'database' => 'matt',
]);

$client->listen('some_channel')
->subscribe(function (\PgAsync\Message\NotificationResponse $message) {
echo $message->getChannelName() . ': ' . $message->getPayload() . "\n";
});

\Rx\Observable::timer(1000)
->flatMapTo($client->query("NOTIFY some_channel, 'Hello World'"))
->subscribe();

52 changes: 49 additions & 3 deletions src/PgAsync/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

namespace PgAsync;

use PgAsync\Message\NotificationResponse;
use React\EventLoop\LoopInterface;
use React\Socket\ConnectorInterface;
use Rx\Observable;
use Rx\Subject\Subject;

class Client
{
Expand All @@ -30,10 +32,16 @@ class Client
/** @var int */
private $maxConnections = 5;

/** @var Subject[] */
private $listeners = [];

/** @var Connection */
private $listenConnection;

public function __construct(array $parameters, LoopInterface $loop = null, ConnectorInterface $connector = null)
{
$this->loop = $loop ?: \EventLoop\getLoop();
$this->connector = $connector;
$this->loop = $loop ?: \EventLoop\getLoop();
$this->connector = $connector;

if (isset($parameters['auto_disconnect'])) {
$this->autoDisconnect = $parameters['auto_disconnect'];
Expand Down Expand Up @@ -71,7 +79,7 @@ public function executeStatement(string $queryString, array $parameters = [])
});
}

private function getLeastBusyConnection() : Connection
private function getLeastBusyConnection(): Connection
{
if (count($this->connections) === 0) {
// try to spin up another connection to return
Expand Down Expand Up @@ -158,4 +166,42 @@ public function closeNow()
$connection->disconnect();
}
}

public function listen(string $channel): Observable
{
if (isset($this->listeners[$channel])) {
return $this->listeners[$channel];
}

$unlisten = function () use ($channel) {
$this->listenConnection->query('UNLISTEN ' . $channel)->subscribe();

unset($this->listeners[$channel]);

if (empty($this->listeners)) {
$this->listenConnection->disconnect();
$this->listenConnection = null;
}
};

$this->listeners[$channel] = Observable::defer(function () use ($channel) {
if ($this->listenConnection === null) {
$this->listenConnection = $this->createNewConnection();
}

if ($this->listenConnection === null) {
throw new \Exception('Could not get new connection to listen on.');
}

return $this->listenConnection->query('LISTEN ' . $channel)
->merge($this->listenConnection->notifications())
->filter(function (NotificationResponse $message) use ($channel) {
return $message->getChannelName() === $channel;
});
})
->finally($unlisten)
->share();

return $this->listeners[$channel];
}
}
35 changes: 27 additions & 8 deletions src/PgAsync/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use PgAsync\Message\ErrorResponse;
use PgAsync\Message\Message;
use PgAsync\Message\NoticeResponse;
use PgAsync\Message\NotificationResponse;
use PgAsync\Message\ParameterStatus;
use PgAsync\Message\ParseComplete;
use PgAsync\Command\Query;
Expand All @@ -39,6 +40,7 @@
use Rx\Observable\AnonymousObservable;
use Rx\ObserverInterface;
use Rx\SchedulerInterface;
use Rx\Subject\Subject;

class Connection extends EventEmitter
{
Expand Down Expand Up @@ -108,6 +110,9 @@ class Connection extends EventEmitter
/** @var string */
private $uri;

/** @var Subject */
private $notificationSubject;

/**
* Can be 'I' for Idle, 'T' if in transactions block
* or 'E' if in failed transaction block (queries will fail until end of trans)
Expand Down Expand Up @@ -147,14 +152,15 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt
unset($parameters['auto_disconnect']);
}

$this->parameters = $parameters;
$this->loop = $loop;
$this->commandQueue = [];
$this->queryState = static::STATE_BUSY;
$this->queryType = static::QUERY_SIMPLE;
$this->connStatus = static::CONNECTION_NEEDED;
$this->socket = $connector ?: new Connector($loop);
$this->uri = 'tcp://' . $this->parameters['host'] . ':' . $this->parameters['port'];
$this->parameters = $parameters;
$this->loop = $loop;
$this->commandQueue = [];
$this->queryState = static::STATE_BUSY;
$this->queryType = static::QUERY_SIMPLE;
$this->connStatus = static::CONNECTION_NEEDED;
$this->socket = $connector ?: new Connector($loop);
$this->uri = 'tcp://' . $this->parameters['host'] . ':' . $this->parameters['port'];
$this->notificationSubject = new Subject();
}

private function start()
Expand Down Expand Up @@ -297,9 +303,16 @@ public function handleMessage($message)
$this->handleReadyForQuery($message);
} elseif ($message instanceof RowDescription) {
$this->handleRowDescription($message);
} elseif ($message instanceof NotificationResponse) {
$this->handleNotificationResponse($message);
}
}

private function handleNotificationResponse(NotificationResponse $message)
{
$this->notificationSubject->onNext($message);
}

private function handleDataRow(DataRow $dataRow)
{
if ($this->queryState === $this::STATE_BUSY && $this->currentCommand instanceof CommandInterface) {
Expand Down Expand Up @@ -448,6 +461,8 @@ private function failAllCommandsWith(\Throwable $e = null)
{
$e = $e ?: new \Exception('unknown error');

$this->notificationSubject->onError($e);

while (count($this->commandQueue) > 0) {
$c = array_shift($this->commandQueue);
if ($c instanceof CommandInterface) {
Expand Down Expand Up @@ -636,4 +651,8 @@ private function cancelRequest()
});
}
}

public function notifications() {
return $this->notificationSubject->asObservable();
}
}
2 changes: 2 additions & 0 deletions src/PgAsync/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public static function createMessageFromIdentifier(string $identifier): ParserIn
return new ReadyForQuery();
case 'T':
return new RowDescription();
case NotificationResponse::getMessageIdentifier():
return new NotificationResponse();
}

return new Discard();
Expand Down
81 changes: 81 additions & 0 deletions src/PgAsync/Message/NotificationResponse.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
<?php

namespace PgAsync\Message;

class NotificationResponse implements ParserInterface
{
use ParserTrait;

private $payload = '';

private $notifyingProcessId = 0;

private $channelName = '';

/**
* @inheritDoc
*/
public function parseMessage(string $rawMessage)
{
$len = strlen($rawMessage);
if ($len < 10) {
throw new \UnderflowException;
}

if ($rawMessage[0] !== static::getMessageIdentifier()) {
throw new \InvalidArgumentException('Incorrect message type');
}
$currentPos = 1;
$msgLen = unpack('N', substr($rawMessage, $currentPos, 4))[1];
$currentPos += 4;
$this->notifyingProcessId = unpack('N', substr($rawMessage, $currentPos, 4))[1];
$currentPos += 4;

$rawPayload = substr($rawMessage, $currentPos);
$parts = explode("\0", $rawPayload);

if (count($parts) !== 3) {
throw new \UnderflowException('Wrong number of notification parts in payload');
}

$this->channelName = $parts[0];
$this->payload = $parts[1];
}

/**
* @return string
*/
public function getPayload(): string
{
return $this->payload;
}

/**
* @return int
*/
public function getNotifyingProcessId(): int
{
return $this->notifyingProcessId;
}

/**
* @return string
*/
public function getChannelName(): string
{
return $this->channelName;
}

/**
* @inheritDoc
*/
public static function getMessageIdentifier(): string
{
return 'A';
}

public function getNoticeMessages(): array
{
return $this->noticeMessages;
}
}
44 changes: 43 additions & 1 deletion tests/Integration/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace PgAsync\Tests\Integration;

use PgAsync\Client;
use React\EventLoop\Timer\Timer;
use PgAsync\Message\NotificationResponse;
use Rx\Observable;
use Rx\Observer\CallbackObserver;

Expand Down Expand Up @@ -191,4 +191,46 @@ function () {
$client->closeNow();
$this->getLoop()->run();
}

public function testListen()
{
$client = new Client([
"user" => $this->getDbUser(),
"database" => $this::getDbName(),
], $this->getLoop());

$testQuery = $client->listen('some_channel')
->merge($client->listen('some_channel')->take(1))
->take(3)
->concat($client->listen('some_channel')->take(1));

$values = [];

$testQuery->subscribe(
function (NotificationResponse $results) use (&$values) {
$values[] = $results->getPayload();
},
function (\Throwable $e) use (&$error) {
$this->fail('Error while testing: ' . $e->getMessage());
$this->stopLoop();
},
function () {
$this->stopLoop();
}
);

Observable::interval(300)
->take(3)
->flatMap(function ($x) use ($client) {
return $client->executeStatement("NOTIFY some_channel, 'Hello" . $x . "'");
})
->subscribe();

$this->runLoopWithTimeout(4);

$this->assertEquals(['Hello0', 'Hello0', 'Hello1', 'Hello2'], $values);

$client->closeNow();
$this->getLoop()->run();
}
}
16 changes: 16 additions & 0 deletions tests/Unit/Message/MessageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,20 @@ public function testInt32()
$this->assertEquals("\x04\xd2\x16\x2f", \PgAsync\Message\Message::int32(80877103));
$this->assertEquals("\x00\x00\x00\x00", \PgAsync\Message\Message::int32(0));
}

public function testNotificationResponse()
{
$rawNotificationMessage = hex2bin('41000000190000040c686572650048656c6c6f20746865726500');


$notificationResponse = \PgAsync\Message\Message::createMessageFromIdentifier($rawNotificationMessage[0]);
$this->assertInstanceOf(\PgAsync\Message\NotificationResponse::class, $notificationResponse);
/** @var \PgAsync\Message\NotificationResponse */
$notificationResponse->parseData($rawNotificationMessage);

$this->assertEquals('Hello there', $notificationResponse->getPayload());
$this->assertEquals('here', $notificationResponse->getChannelName());
$this->assertEquals(1036, $notificationResponse->getNotifyingProcessId());

}
}

0 comments on commit 4afc0df

Please sign in to comment.