From 3e1032efa96b4e463cd565612ab5b6c4ec98486b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Wed, 23 Jan 2019 20:17:06 +0100 Subject: [PATCH 1/3] Support Windows by using temporary network socket for process I/O --- README.md | 4 +- res/sqlite-worker.php | 40 ++++++- src/Factory.php | 95 ++++++++++++++- src/Io/ProcessIoDatabase.php | 8 +- tests/FunctionalDatabaseTest.php | 196 +++++++++++++++++++++++++++---- 5 files changed, 309 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 518865e..d73c548 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,9 @@ This method returns a promise that will resolve with a `DatabaseInterface` on success or will reject with an `Exception` on error. The SQLite extension is inherently blocking, so this method will spawn an SQLite worker process to run all SQLite commands and queries in a separate process without -blocking the main process. +blocking the main process. On Windows, it uses a temporary network socket +for this communication, on all other platforms it communicates over +standard process I/O pipes. ```php $factory->open('users.db')->then(function (DatabaseInterface $db) { diff --git a/res/sqlite-worker.php b/res/sqlite-worker.php index 7556f44..3fafe07 100644 --- a/res/sqlite-worker.php +++ b/res/sqlite-worker.php @@ -1,10 +1,23 @@ {"id":0,"result":true} +// +// Or via socket connection (used for Windows, which does not support non-blocking process pipe I/O) +// $ nc localhost 8080 +// $ php res/sqlite-worker.php localhost:8080 + +use Clue\React\NDJson\Decoder; +use Clue\React\NDJson\Encoder; use React\EventLoop\Factory; +use React\Stream\DuplexResourceStream; use React\Stream\ReadableResourceStream; +use React\Stream\ThroughStream; use React\Stream\WritableResourceStream; -use Clue\React\NDJson\Decoder; -use Clue\React\NDJson\Encoder; if (file_exists(__DIR__ . '/../vendor/autoload.php')) { // local project development, go from /res to /vendor @@ -15,8 +28,27 @@ } $loop = Factory::create(); -$in = new Decoder(new ReadableResourceStream(\STDIN, $loop)); -$out = new Encoder(new WritableResourceStream(\STDOUT, $loop)); + +if (isset($_SERVER['argv'][1])) { + // socket address given, so try to connect through socket (Windows) + $socket = stream_socket_client($_SERVER['argv'][1]); + $stream = new DuplexResourceStream($socket, $loop); + + // pipe input through a wrapper stream so that an error on the input stream + // will not immediately close the output stream without a chance to report + // this error through the output stream. + $through = new ThroughStream(); + $stream->on('data', function ($data) use ($through) { + $through->write($data); + }); + + $in = new Decoder($through); + $out = new Encoder($stream); +} else { + // no socket address given, use process I/O pipes + $in = new Decoder(new ReadableResourceStream(\STDIN, $loop)); + $out = new Encoder(new WritableResourceStream(\STDOUT, $loop)); +} // report error when input is invalid NDJSON $in->on('error', function (Exception $e) use ($out) { diff --git a/src/Factory.php b/src/Factory.php index 69aceef..2ac1b51 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -5,11 +5,16 @@ use React\ChildProcess\Process; use React\EventLoop\LoopInterface; use Clue\React\SQLite\Io\ProcessIoDatabase; +use React\Stream\DuplexResourceStream; +use React\Promise\Deferred; +use React\Stream\ThroughStream; class Factory { private $loop; + private $useSocket; + /** * The `Factory` is responsible for opening your [`DatabaseInterface`](#databaseinterface) instance. * It also registers everything with the main [`EventLoop`](https://github.com/reactphp/event-loop#usage). @@ -24,6 +29,9 @@ class Factory public function __construct(LoopInterface $loop) { $this->loop = $loop; + + // use socket I/O for Windows only, use faster process pipes everywhere else + $this->useSocket = DIRECTORY_SEPARATOR === '\\'; } /** @@ -33,7 +41,9 @@ public function __construct(LoopInterface $loop) * success or will reject with an `Exception` on error. The SQLite extension * is inherently blocking, so this method will spawn an SQLite worker process * to run all SQLite commands and queries in a separate process without - * blocking the main process. + * blocking the main process. On Windows, it uses a temporary network socket + * for this communication, on all other platforms it communicates over + * standard process I/O pipes. * * ```php * $factory->open('users.db')->then(function (DatabaseInterface $db) { @@ -62,6 +72,11 @@ public function __construct(LoopInterface $loop) * @return PromiseInterface Resolves with DatabaseInterface instance or rejects with Exception */ public function open($filename, $flags = null) + { + return $this->useSocket ? $this->openSocketIo($filename, $flags) : $this->openProcessIo($filename, $flags); + } + + private function openProcessIo($filename, $flags = null) { $command = 'exec ' . \escapeshellarg(\PHP_BINARY) . ' ' . \escapeshellarg(__DIR__ . '/../res/sqlite-worker.php'); @@ -121,4 +136,82 @@ public function open($filename, $flags = null) throw $e; }); } + + private function openSocketIo($filename, $flags = null) + { + $command = \escapeshellarg(\PHP_BINARY) . ' ' . \escapeshellarg(__DIR__ . '/../res/sqlite-worker.php'); + + // launch process without default STDIO pipes + $null = \DIRECTORY_SEPARATOR === '\\' ? 'nul' : '/dev/null'; + $pipes = array( + array('file', $null, 'r'), + array('file', $null, 'w'), + STDERR // array('file', $null, 'w'), + ); + + // start temporary socket on random address + $server = @stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr); + if ($server === false) { + return \React\Promise\reject( + new \RuntimeException('Unable to start temporary socket I/O server: ' . $errstr, $errno) + ); + } + + // pass random server address to child process to connect back to parent process + stream_set_blocking($server, false); + $command .= ' ' . stream_socket_get_name($server, false); + + $process = new Process($command, null, null, $pipes); + $process->start($this->loop); + + $deferred = new Deferred(function () use ($process, $server) { + $this->loop->removeReadStream($server); + fclose($server); + $process->terminate(); + + throw new \RuntimeException('Opening database cancelled'); + }); + + // time out after a few seconds if we don't receive a connection + $timeout = $this->loop->addTimer(5.0, function () use ($server, $deferred, $process) { + $this->loop->removeReadStream($server); + fclose($server); + $process->terminate(); + + $deferred->reject(new \RuntimeException('No connection detected')); + }); + + $this->loop->addReadStream($server, function () use ($server, $timeout, $filename, $flags, $deferred, $process) { + // accept once connection on server socket and stop server socket + $this->loop->cancelTimer($timeout); + $peer = stream_socket_accept($server, 0); + $this->loop->removeReadStream($server); + fclose($server); + + // use this one connection as fake process I/O streams + $connection = new DuplexResourceStream($peer, $this->loop, -1); + $process->stdin = $process->stdout = $connection; + $connection->on('close', function () use ($process) { + $process->terminate(); + }); + $process->on('exit', function () use ($connection) { + $connection->close(); + }); + + $db = new ProcessIoDatabase($process); + $args = array($filename); + if ($flags !== null) { + $args[] = $flags; + } + + $db->send('open', $args)->then(function () use ($deferred, $db) { + $deferred->resolve($db); + }, function ($e) use ($deferred, $db) { + $db->close(); + $deferred->reject($e); + }); + }); + + return $deferred->promise(); + } } diff --git a/src/Io/ProcessIoDatabase.php b/src/Io/ProcessIoDatabase.php index f3a4b27..70d2329 100644 --- a/src/Io/ProcessIoDatabase.php +++ b/src/Io/ProcessIoDatabase.php @@ -91,7 +91,11 @@ public function quit() { $promise = $this->send('close', array()); - $this->process->stdin->end(); + if ($this->process->stdin === $this->process->stdout) { + $promise->then(function () { $this->process->stdin->close(); }); + } else { + $this->process->stdin->end(); + } return $promise; } @@ -120,7 +124,7 @@ public function close() /** @internal */ public function send($method, array $params) { - if (!$this->process->stdin->isWritable()) { + if ($this->closed || !$this->process->stdin->isWritable()) { return \React\Promise\reject(new \RuntimeException('Database closed')); } diff --git a/tests/FunctionalDatabaseTest.php b/tests/FunctionalDatabaseTest.php index dcc7f28..04a7a4f 100644 --- a/tests/FunctionalDatabaseTest.php +++ b/tests/FunctionalDatabaseTest.php @@ -7,11 +7,28 @@ class FunctionalDatabaseTest extends TestCase { - public function testOpenMemoryDatabaseResolvesWithDatabaseAndRunsUntilClose() + public function provideSocketFlags() + { + if (DIRECTORY_SEPARATOR === '\\') { + return [[true]]; + } else { + return [[false], [true]]; + } + } + + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testOpenMemoryDatabaseResolvesWithDatabaseAndRunsUntilClose($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $promise->then( @@ -25,11 +42,19 @@ public function testOpenMemoryDatabaseResolvesWithDatabaseAndRunsUntilClose() $loop->run(); } - public function testOpenMemoryDatabaseResolvesWithDatabaseAndRunsUntilQuit() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testOpenMemoryDatabaseResolvesWithDatabaseAndRunsUntilQuit($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $promise->then( @@ -71,11 +96,19 @@ public function testOpenMemoryDatabaseShouldNotInheritActiveFileDescriptors() $loop->run(); } - public function testOpenInvalidPathRejects() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testOpenInvalidPathRejects($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open('/dev/foo/bar'); $promise->then( @@ -86,11 +119,19 @@ public function testOpenInvalidPathRejects() $loop->run(); } - public function testOpenInvalidFlagsRejects() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testOpenInvalidFlagsRejects($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open('::memory::', SQLITE3_OPEN_READONLY); $promise->then( @@ -101,11 +142,19 @@ public function testOpenInvalidFlagsRejects() $loop->run(); } - public function testQuitResolvesAndRunsUntilQuit() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testQuitResolvesAndRunsUntilQuit($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $once = $this->expectCallableOnce(); @@ -116,8 +165,11 @@ public function testQuitResolvesAndRunsUntilQuit() $loop->run(); } - - public function testQuitResolvesAndRunsUntilQuitWhenParentHasManyFileDescriptors() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testQuitResolvesAndRunsUntilQuitWhenParentHasManyFileDescriptors($flag) { $servers = array(); for ($i = 0; $i < 100; ++$i) { @@ -127,6 +179,10 @@ public function testQuitResolvesAndRunsUntilQuitWhenParentHasManyFileDescriptors $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $once = $this->expectCallableOnce(); @@ -141,11 +197,19 @@ public function testQuitResolvesAndRunsUntilQuitWhenParentHasManyFileDescriptors } } - public function testQuitTwiceWillRejectSecondCall() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testQuitTwiceWillRejectSecondCall($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $once = $this->expectCallableOnce(); @@ -157,11 +221,19 @@ public function testQuitTwiceWillRejectSecondCall() $loop->run(); } - public function testQueryIntegerResolvesWithResultWithTypeIntegerAndRunsUntilQuit() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testQueryIntegerResolvesWithResultWithTypeIntegerAndRunsUntilQuit($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $data = null; @@ -178,11 +250,19 @@ public function testQueryIntegerResolvesWithResultWithTypeIntegerAndRunsUntilQui $this->assertSame(array(array('value' => 1)), $data); } - public function testQueryStringResolvesWithResultWithTypeStringAndRunsUntilQuit() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testQueryStringResolvesWithResultWithTypeStringAndRunsUntilQuit($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $data = null; @@ -199,11 +279,19 @@ public function testQueryStringResolvesWithResultWithTypeStringAndRunsUntilQuit( $this->assertSame(array(array('value' => 'hellö')), $data); } - public function testQueryIntegerPlaceholderPositionalResolvesWithResultWithTypeIntegerAndRunsUntilQuit() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testQueryIntegerPlaceholderPositionalResolvesWithResultWithTypeIntegerAndRunsUntilQuit($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $data = null; @@ -220,11 +308,19 @@ public function testQueryIntegerPlaceholderPositionalResolvesWithResultWithTypeI $this->assertSame(array(array('value' => 1)), $data); } - public function testQueryIntegerPlaceholderNamedResolvesWithResultWithTypeIntegerAndRunsUntilQuit() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testQueryIntegerPlaceholderNamedResolvesWithResultWithTypeIntegerAndRunsUntilQuit($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $data = null; @@ -241,11 +337,19 @@ public function testQueryIntegerPlaceholderNamedResolvesWithResultWithTypeIntege $this->assertSame(array(array('value' => 1)), $data); } - public function testQueryNullPlaceholderPositionalResolvesWithResultWithTypeNullAndRunsUntilQuit() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testQueryNullPlaceholderPositionalResolvesWithResultWithTypeNullAndRunsUntilQuit($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $data = null; @@ -262,11 +366,19 @@ public function testQueryNullPlaceholderPositionalResolvesWithResultWithTypeNull $this->assertSame(array(array('value' => null)), $data); } - public function testQueryRejectsWhenQueryIsInvalid() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testQueryRejectsWhenQueryIsInvalid($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $once = $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')); @@ -279,11 +391,19 @@ public function testQueryRejectsWhenQueryIsInvalid() $loop->run(); } - public function testQueryRejectsWhenClosedImmediately() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testQueryRejectsWhenClosedImmediately($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $once = $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')); @@ -296,11 +416,19 @@ public function testQueryRejectsWhenClosedImmediately() $loop->run(); } - public function testExecCreateTableResolvesWithResultWithoutRows() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testExecCreateTableResolvesWithResultWithoutRows($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $data = 'n/a'; @@ -317,11 +445,19 @@ public function testExecCreateTableResolvesWithResultWithoutRows() $this->assertNull($data); } - public function testExecRejectsWhenClosedImmediately() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testExecRejectsWhenClosedImmediately($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $once = $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')); @@ -334,17 +470,25 @@ public function testExecRejectsWhenClosedImmediately() $loop->run(); } - public function testExecRejectsWhenAlreadyClosed() + /** + * @dataProvider provideSocketFlags + * @param bool $flag + */ + public function testExecRejectsWhenAlreadyClosed($flag) { $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); + $ref = new ReflectionProperty($factory, 'useSocket'); + $ref->setAccessible(true); + $ref->setValue($factory, $flag); + $promise = $factory->open(':memory:'); $once = $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')); $promise->then(function (DatabaseInterface $db) use ($once){ $db->close(); - $db->exec('USE a')->then(null, $once); + $db->exec('USE a')->then('var_dump', $once); }); $loop->run(); @@ -354,8 +498,8 @@ protected function expectCallableNever() { $mock = $this->createCallableMock(); $mock - ->expects($this->never()) - ->method('__invoke'); + ->expects($this->never()) + ->method('__invoke'); return $mock; } @@ -364,8 +508,8 @@ protected function expectCallableOnce() { $mock = $this->createCallableMock(); $mock - ->expects($this->once()) - ->method('__invoke'); + ->expects($this->once()) + ->method('__invoke'); return $mock; } @@ -374,9 +518,9 @@ protected function expectCallableOnceWith($value) { $mock = $this->createCallableMock(); $mock - ->expects($this->once()) - ->method('__invoke') - ->with($value); + ->expects($this->once()) + ->method('__invoke') + ->with($value); return $mock; } From b23c688a6a183f8bf2d3674c475f7ce260c9e18e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Mon, 29 Apr 2019 14:26:01 +0200 Subject: [PATCH 2/3] Run tests on Windows via Travis CI Refs https://github.com/reactphp/child-process/pull/71 --- .travis.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.travis.yml b/.travis.yml index 06d52eb..d0a9e40 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,8 +17,16 @@ matrix: include: - php: hhvm install: composer require phpunit/phpunit:^5 --dev --no-interaction + - name: "Windows" + os: windows + language: shell # no built-in php support + before_install: + - choco install php + - choco install composer + - export PATH="$(powershell -Command '("Process", "Machine" | % { [Environment]::GetEnvironmentVariable("PATH", $_) -Split ";" -Replace "\\$", "" } | Select -Unique | % { cygpath $_ }) -Join ":"')" allow_failures: - php: hhvm + - os: windows install: - composer install --no-interaction From b0dfef0b681ddc00cb95195dc7d27654b1d5eac6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Mon, 29 Apr 2019 15:00:34 +0200 Subject: [PATCH 3/3] Enable ext-sqlite3 on Travis CI before installing project --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index d0a9e40..46ce133 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,6 +24,8 @@ matrix: - choco install php - choco install composer - export PATH="$(powershell -Command '("Process", "Machine" | % { [Environment]::GetEnvironmentVariable("PATH", $_) -Split ";" -Replace "\\$", "" } | Select -Unique | % { cygpath $_ }) -Join ":"')" + - php -r "file_put_contents(php_ini_loaded_file(),'extension_dir=ext'.PHP_EOL,FILE_APPEND);" + - php -r "file_put_contents(php_ini_loaded_file(),'extension=sqlite3'.PHP_EOL,FILE_APPEND);" allow_failures: - php: hhvm - os: windows