Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Windows by using temporary network socket for process I/O #13

Merged
merged 3 commits into from
Apr 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,18 @@ matrix:
include:
- php: hhvm
install: composer require phpunit/phpunit:^5 --dev --no-interaction
- name: "Windows"
os: windows
language: shell # no built-in php support
before_install:
- choco install php
- choco install composer
- export PATH="$(powershell -Command '("Process", "Machine" | % { [Environment]::GetEnvironmentVariable("PATH", $_) -Split ";" -Replace "\\$", "" } | Select -Unique | % { cygpath $_ }) -Join ":"')"
- php -r "file_put_contents(php_ini_loaded_file(),'extension_dir=ext'.PHP_EOL,FILE_APPEND);"
- php -r "file_put_contents(php_ini_loaded_file(),'extension=sqlite3'.PHP_EOL,FILE_APPEND);"
allow_failures:
- php: hhvm
- os: windows

install:
- composer install --no-interaction
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ This method returns a promise that will resolve with a `DatabaseInterface` on
success or will reject with an `Exception` on error. The SQLite extension
is inherently blocking, so this method will spawn an SQLite worker process
to run all SQLite commands and queries in a separate process without
blocking the main process.
blocking the main process. On Windows, it uses a temporary network socket
for this communication, on all other platforms it communicates over
standard process I/O pipes.

```php
$factory->open('users.db')->then(function (DatabaseInterface $db) {
Expand Down
40 changes: 36 additions & 4 deletions res/sqlite-worker.php
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
<?php

// This child worker process will be started by the main process to start communication over process pipe I/O
//
// Communication happens via newline-delimited JSON-RPC messages, see:
// $ php res/sqlite-worker.php
// < {"id":0,"method":"open","params":["test.db"]}
// > {"id":0,"result":true}
//
// Or via socket connection (used for Windows, which does not support non-blocking process pipe I/O)
// $ nc localhost 8080
// $ php res/sqlite-worker.php localhost:8080

use Clue\React\NDJson\Decoder;
use Clue\React\NDJson\Encoder;
use React\EventLoop\Factory;
use React\Stream\DuplexResourceStream;
use React\Stream\ReadableResourceStream;
use React\Stream\ThroughStream;
use React\Stream\WritableResourceStream;
use Clue\React\NDJson\Decoder;
use Clue\React\NDJson\Encoder;

if (file_exists(__DIR__ . '/../vendor/autoload.php')) {
// local project development, go from /res to /vendor
Expand All @@ -15,8 +28,27 @@
}

$loop = Factory::create();
$in = new Decoder(new ReadableResourceStream(\STDIN, $loop));
$out = new Encoder(new WritableResourceStream(\STDOUT, $loop));

if (isset($_SERVER['argv'][1])) {
// socket address given, so try to connect through socket (Windows)
$socket = stream_socket_client($_SERVER['argv'][1]);
$stream = new DuplexResourceStream($socket, $loop);

// pipe input through a wrapper stream so that an error on the input stream
// will not immediately close the output stream without a chance to report
// this error through the output stream.
$through = new ThroughStream();
$stream->on('data', function ($data) use ($through) {
$through->write($data);
});

$in = new Decoder($through);
$out = new Encoder($stream);
} else {
// no socket address given, use process I/O pipes
$in = new Decoder(new ReadableResourceStream(\STDIN, $loop));
$out = new Encoder(new WritableResourceStream(\STDOUT, $loop));
}

// report error when input is invalid NDJSON
$in->on('error', function (Exception $e) use ($out) {
Expand Down
95 changes: 94 additions & 1 deletion src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@
use React\ChildProcess\Process;
use React\EventLoop\LoopInterface;
use Clue\React\SQLite\Io\ProcessIoDatabase;
use React\Stream\DuplexResourceStream;
use React\Promise\Deferred;
use React\Stream\ThroughStream;

class Factory
{
private $loop;

private $useSocket;

/**
* The `Factory` is responsible for opening your [`DatabaseInterface`](#databaseinterface) instance.
* It also registers everything with the main [`EventLoop`](https://github.com/reactphp/event-loop#usage).
Expand All @@ -24,6 +29,9 @@ class Factory
public function __construct(LoopInterface $loop)
{
$this->loop = $loop;

// use socket I/O for Windows only, use faster process pipes everywhere else
$this->useSocket = DIRECTORY_SEPARATOR === '\\';
}

/**
Expand All @@ -33,7 +41,9 @@ public function __construct(LoopInterface $loop)
* success or will reject with an `Exception` on error. The SQLite extension
* is inherently blocking, so this method will spawn an SQLite worker process
* to run all SQLite commands and queries in a separate process without
* blocking the main process.
* blocking the main process. On Windows, it uses a temporary network socket
* for this communication, on all other platforms it communicates over
* standard process I/O pipes.
*
* ```php
* $factory->open('users.db')->then(function (DatabaseInterface $db) {
Expand Down Expand Up @@ -62,6 +72,11 @@ public function __construct(LoopInterface $loop)
* @return PromiseInterface<DatabaseInterface> Resolves with DatabaseInterface instance or rejects with Exception
*/
public function open($filename, $flags = null)
{
return $this->useSocket ? $this->openSocketIo($filename, $flags) : $this->openProcessIo($filename, $flags);
}

private function openProcessIo($filename, $flags = null)
{
$command = 'exec ' . \escapeshellarg(\PHP_BINARY) . ' ' . \escapeshellarg(__DIR__ . '/../res/sqlite-worker.php');

Expand Down Expand Up @@ -121,4 +136,82 @@ public function open($filename, $flags = null)
throw $e;
});
}

private function openSocketIo($filename, $flags = null)
{
$command = \escapeshellarg(\PHP_BINARY) . ' ' . \escapeshellarg(__DIR__ . '/../res/sqlite-worker.php');

// launch process without default STDIO pipes
$null = \DIRECTORY_SEPARATOR === '\\' ? 'nul' : '/dev/null';
$pipes = array(
array('file', $null, 'r'),
array('file', $null, 'w'),
STDERR // array('file', $null, 'w'),
);

// start temporary socket on random address
$server = @stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr);
if ($server === false) {
return \React\Promise\reject(
new \RuntimeException('Unable to start temporary socket I/O server: ' . $errstr, $errno)
);
}

// pass random server address to child process to connect back to parent process
stream_set_blocking($server, false);
$command .= ' ' . stream_socket_get_name($server, false);

$process = new Process($command, null, null, $pipes);
$process->start($this->loop);

$deferred = new Deferred(function () use ($process, $server) {
$this->loop->removeReadStream($server);
fclose($server);
$process->terminate();

throw new \RuntimeException('Opening database cancelled');
});

// time out after a few seconds if we don't receive a connection
$timeout = $this->loop->addTimer(5.0, function () use ($server, $deferred, $process) {
$this->loop->removeReadStream($server);
fclose($server);
$process->terminate();

$deferred->reject(new \RuntimeException('No connection detected'));
});

$this->loop->addReadStream($server, function () use ($server, $timeout, $filename, $flags, $deferred, $process) {
// accept once connection on server socket and stop server socket
$this->loop->cancelTimer($timeout);
$peer = stream_socket_accept($server, 0);
$this->loop->removeReadStream($server);
fclose($server);

// use this one connection as fake process I/O streams
$connection = new DuplexResourceStream($peer, $this->loop, -1);
$process->stdin = $process->stdout = $connection;
$connection->on('close', function () use ($process) {
$process->terminate();
});
$process->on('exit', function () use ($connection) {
$connection->close();
});

$db = new ProcessIoDatabase($process);
$args = array($filename);
if ($flags !== null) {
$args[] = $flags;
}

$db->send('open', $args)->then(function () use ($deferred, $db) {
$deferred->resolve($db);
}, function ($e) use ($deferred, $db) {
$db->close();
$deferred->reject($e);
});
});

return $deferred->promise();
}
}
8 changes: 6 additions & 2 deletions src/Io/ProcessIoDatabase.php
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ public function quit()
{
$promise = $this->send('close', array());

$this->process->stdin->end();
if ($this->process->stdin === $this->process->stdout) {
$promise->then(function () { $this->process->stdin->close(); });
} else {
$this->process->stdin->end();
}

return $promise;
}
Expand Down Expand Up @@ -120,7 +124,7 @@ public function close()
/** @internal */
public function send($method, array $params)
{
if (!$this->process->stdin->isWritable()) {
if ($this->closed || !$this->process->stdin->isWritable()) {
return \React\Promise\reject(new \RuntimeException('Database closed'));
}

Expand Down
Loading