Skip to content

Commit

Permalink
Merge branch 'walkor:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
lengqiuqiuqiu authored Jan 27, 2025
2 parents 6a98c9f + ea829e3 commit f41eca7
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 60 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
},
"require": {
"php": ">=8.1",
"ext-json": "*"
"ext-json": "*",
"workerman/coroutine": "^1.0 || dev-main"
},
"suggest": {
"ext-event": "For better performance. "
Expand Down
26 changes: 8 additions & 18 deletions phpstan.neon.dist
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,16 @@ parameters:
paths:
- src
- tests
excludePaths:
- src/Events/Swow.php
ignoreErrors:
-
path: src/Events/Revolt.php
path: src/Events/Fiber.php
messages:
- '#Property Workerman\\Events\\Revolt::\$driver has unknown class Revolt\\EventLoop\\Driver as its type.#'
- '#Property Workerman\\Events\\Fiber::\$driver has unknown class Revolt\\EventLoop\\Driver as its type.#'
- '#Call to static method getDriver\(\) on an unknown class Revolt\\EventLoop.#'
- '#Method Workerman\\Events\\Revolt::driver\(\) has invalid return type Revolt\\EventLoop\\Driver.#'
- '#Method Workerman\\Events\\Fiber::driver\(\) has invalid return type Revolt\\EventLoop\\Driver.#'
- '#Call to method .* on an unknown class Revolt\\EventLoop\\Driver.#'
-
path: src/Events/Swow.php
messages:
- '#Used function Swow\\Sync\\waitAll not found.#'
- '#Call to static method .* on an unknown class Swow\\.*.#'
- '#Function msleep not found.#'
- '#Function stream_poll_one not found.#'
- '#Caught class Swow\\SignalException not found.#'
- '#Function Swow\\Sync\\waitAll not found.#'
- '#Constant STREAM_POLLHUP not found.#'
- '#Constant STREAM_POLLIN not found.#'
- '#Constant STREAM_POLLNONE not found.#'
- '#Constant STREAM_POLLOUT not found.#'
- '#Property Workerman\\Events\\Swow::.* has unknown class Swow\\Coroutine as its type.#'
-
path: src/Events/Event.php
reportUnmatched: false
Expand All @@ -33,4 +21,6 @@ parameters:
- path: src/Timer.php
message: '#Call to static method getSuspension\(\) on an unknown class Revolt\\EventLoop.#'
- path: src/Worker.php
message: '#Constant LINE_VERSION_LENGTH not found.#'
messages:
- '#Constant LINE_VERSION_LENGTH not found.#'
- '#Call to static method run\(\) on an unknown class Swow\\Coroutine.#'
25 changes: 18 additions & 7 deletions src/Events/Revolt.php → src/Events/Fiber.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

namespace Workerman\Events;

use Fiber as BaseFiber;
use Revolt\EventLoop;
use Revolt\EventLoop\Driver;
use function count;
Expand All @@ -25,7 +26,7 @@
/**
* Revolt eventloop
*/
final class Revolt implements EventInterface
final class Fiber implements EventInterface
{
/**
* @var Driver
Expand Down Expand Up @@ -115,7 +116,7 @@ public function delay(float $delay, callable $func, array $args = []): int
$timerId = $this->timerId++;
$closure = function () use ($func, $args, $timerId) {
unset($this->eventTimer[$timerId]);
$func(...$args);
$this->safeCall($func, ...$args);
};
$cbId = $this->driver->delay($delay, $closure);
$this->eventTimer[$timerId] = $cbId;
Expand All @@ -128,7 +129,7 @@ public function delay(float $delay, callable $func, array $args = []): int
public function repeat(float $interval, callable $func, array $args = []): int
{
$timerId = $this->timerId++;
$cbId = $this->driver->repeat($interval, static fn () => $func(...$args));
$cbId = $this->driver->repeat($interval, fn() => $this->safeCall($func, ...$args));
$this->eventTimer[$timerId] = $cbId;
return $timerId;
}
Expand All @@ -141,10 +142,9 @@ public function onReadable($stream, callable $func): void
$fdKey = (int)$stream;
if (isset($this->readEvents[$fdKey])) {
$this->driver->cancel($this->readEvents[$fdKey]);
unset($this->readEvents[$fdKey]);
}

$this->readEvents[$fdKey] = $this->driver->onReadable($stream, static fn () => $func($stream));
$this->readEvents[$fdKey] = $this->driver->onReadable($stream, fn() => $this->safeCall($func, $stream));
}

/**
Expand All @@ -171,7 +171,7 @@ public function onWritable($stream, callable $func): void
$this->driver->cancel($this->writeEvents[$fdKey]);
unset($this->writeEvents[$fdKey]);
}
$this->writeEvents[$fdKey] = $this->driver->onWritable($stream, static fn () => $func($stream));
$this->writeEvents[$fdKey] = $this->driver->onWritable($stream, fn() => $this->safeCall($func, $stream));
}

/**
Expand All @@ -198,7 +198,7 @@ public function onSignal(int $signal, callable $func): void
$this->driver->cancel($this->eventSignal[$fdKey]);
unset($this->eventSignal[$fdKey]);
}
$this->eventSignal[$fdKey] = $this->driver->onSignal($signal, static fn () => $func($signal));
$this->eventSignal[$fdKey] = $this->driver->onSignal($signal, fn() => $this->safeCall($func, $signal));
}

/**
Expand Down Expand Up @@ -262,4 +262,15 @@ public function setErrorHandler(callable $errorHandler): void
{
$this->driver->setErrorHandler($errorHandler);
}

/**
* @param callable $func
* @param ...$args
* @return void
* @throws \Throwable
*/
protected function safeCall(callable $func, ...$args): void
{
(new BaseFiber(fn() => $func(...$args)))->start();
}
}
10 changes: 4 additions & 6 deletions src/Events/Select.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ final class Select implements EventInterface
*
* @var int
*/
private int $selectTimeout = 100000000;
private int $selectTimeout = 800000;

/**
* Next run time of the timer.
Expand Down Expand Up @@ -357,7 +357,7 @@ protected function setNextTickTime(float $nextTickTime): void
{
$this->nextTickTime = $nextTickTime;
if ($nextTickTime == 0) {
$this->selectTimeout = 10000000;
$this->selectTimeout = 800000;
return;
}
$timeNow = microtime(true);
Expand Down Expand Up @@ -419,10 +419,8 @@ public function run(): void
$this->tick();
}

if ($this->signalEvents) {
// Calls signal handlers for pending signals
pcntl_signal_dispatch();
}
// Calls signal handlers for pending signals
pcntl_signal_dispatch();
}
}

Expand Down
19 changes: 11 additions & 8 deletions src/Events/Swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Swoole\Event;
use Swoole\Process;
use Swoole\Timer;
use Throwable;

final class Swoole implements EventInterface
{
Expand Down Expand Up @@ -282,14 +283,16 @@ private function callWrite($fd)
*/
private function safeCall(callable $func, array $args = []): void
{
try {
$func(...$args);
} catch (\Throwable $e) {
if ($this->errorHandler === null) {
echo $e;
} else {
($this->errorHandler)($e);
Coroutine::create(function() use ($func, $args) {
try {
$func(...$args);
} catch (Throwable $e) {
if ($this->errorHandler === null) {
echo $e;
} else {
($this->errorHandler)($e);
}
}
}
});
}
}
3 changes: 2 additions & 1 deletion src/Events/Swow.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Workerman\Events;

use Swow\Coroutine;
use Workerman\Coroutine\Coroutine\Swow as Coroutine;
use Swow\Signal;
use Swow\SignalException;
use function Swow\Sync\waitAll;
Expand Down Expand Up @@ -296,4 +296,5 @@ private function safeCall(callable $func, array $args = []): void
}
});
}

}
17 changes: 7 additions & 10 deletions src/Timer.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
use RuntimeException;
use Throwable;
use Workerman\Events\EventInterface;
use Workerman\Events\Revolt;
use Workerman\Events\Fiber;
use Workerman\Events\Swoole;
use Workerman\Events\Swow;
use Revolt\EventLoop;
use Swoole\Coroutine\System;
use function function_exists;
use function pcntl_alarm;
use function pcntl_signal;
Expand Down Expand Up @@ -182,23 +183,19 @@ public static function sleep(float $delay): void
{
switch (Worker::$eventLoopClass) {
// Fiber
case Revolt::class:
$suspension = \Revolt\EventLoop::getSuspension();
case Fiber::class:
$suspension = EventLoop::getSuspension();
static::add($delay, function () use ($suspension) {
$suspension->resume();
}, null, false);
$suspension->suspend();
return;
// Swoole
case Swoole::class:
\Swoole\Coroutine\System::sleep($delay);
return;
// Swow
case Swow::class:
usleep((int)($delay * 1000 * 1000));
System::sleep($delay);
return;
}
throw new RuntimeException('Timer::sleep() require revolt/event-loop. Please run command "composer require revolt/event-loop" and restart workerman');
usleep((int)($delay * 1000 * 1000));
}

/**
Expand Down
41 changes: 32 additions & 9 deletions src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

use AllowDynamicProperties;
use Exception;
use Revolt\EventLoop;
use RuntimeException;
use stdClass;
use Stringable;
Expand All @@ -28,9 +27,7 @@
use Workerman\Connection\UdpConnection;
use Workerman\Events\Event;
use Workerman\Events\EventInterface;
use Workerman\Events\Revolt;
use Workerman\Events\Select;
use Workerman\Protocols\ProtocolInterface;
use function defined;
use function function_exists;
use function is_resource;
Expand Down Expand Up @@ -59,7 +56,14 @@ class Worker
*
* @var string
*/
final public const VERSION = '5.0.1';
final public const VERSION = '5.1.0';

/**
* Status initial.
*
* @var int
*/
public const STATUS_INITIAL = 0;

/**
* Status starting.
Expand Down Expand Up @@ -344,9 +348,9 @@ class Worker
/**
* EventLoopClass
*
* @var class-string<EventInterface>
* @var ?class-string<EventInterface>
*/
public static string $eventLoopClass;
public static ?string $eventLoopClass = null;

/**
* After sending the stop command to the child process stopTimeout seconds,
Expand Down Expand Up @@ -432,7 +436,7 @@ class Worker
*
* @var int
*/
protected static int $status = self::STATUS_STARTING;
protected static int $status = self::STATUS_INITIAL;

/**
* UI data.
Expand Down Expand Up @@ -785,7 +789,6 @@ protected static function initGlobalEvent(): void
}

static::$eventLoopClass = match (true) {
class_exists(EventLoop::class) => Revolt::class,
extension_loaded('event') => Event::class,
default => Select::class,
};
Expand Down Expand Up @@ -2541,7 +2544,17 @@ public function run(): void
// Try to emit onWorkerStart callback.
if ($this->onWorkerStart) {
try {
($this->onWorkerStart)($this);
switch (Worker::$eventLoopClass) {
case Events\Swoole::class:
\Swoole\Coroutine::create(fn() => ($this->onWorkerStart)($this));
break;
case Events\Swow::class:
\Swow\Coroutine::run(fn() => ($this->onWorkerStart)($this));
break;
default:
(new \Fiber($this->onWorkerStart))->start($this);

}
} catch (Throwable $e) {
// Avoid rapid infinite loop exit.
sleep(1);
Expand Down Expand Up @@ -2705,4 +2718,14 @@ protected static function checkMasterIsAlive(int $masterPid): bool

return str_contains($content, 'WorkerMan') || str_contains($content, 'php');
}

/**
* If worker is running.
*
* @return bool
*/
public static function isRunning(): bool
{
return Worker::$status !== Worker::STATUS_INITIAL;
}
}

0 comments on commit f41eca7

Please sign in to comment.