Skip to content

Commit

Permalink
Merge pull request #1 from walkor/master
Browse files Browse the repository at this point in the history
同步最新
  • Loading branch information
hkui authored Mar 3, 2020
2 parents c2e940a + b6bae50 commit 4284726
Show file tree
Hide file tree
Showing 17 changed files with 415 additions and 344 deletions.
6 changes: 3 additions & 3 deletions Autoloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ public static function setRootPath($root_path)
*/
public static function loadByNamespace($name)
{
$class_path = \str_replace('\\', DIRECTORY_SEPARATOR, $name);
$class_path = \str_replace('\\', \DIRECTORY_SEPARATOR, $name);
if (\strpos($name, 'Workerman\\') === 0) {
$class_file = __DIR__ . \substr($class_path, \strlen('Workerman')) . '.php';
} else {
if (self::$_autoloadRootPath) {
$class_file = self::$_autoloadRootPath . DIRECTORY_SEPARATOR . $class_path . '.php';
$class_file = self::$_autoloadRootPath . \DIRECTORY_SEPARATOR . $class_path . '.php';
}
if (empty($class_file) || !\is_file($class_file)) {
$class_file = __DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . "$class_path.php";
$class_file = __DIR__ . \DIRECTORY_SEPARATOR . '..' . \DIRECTORY_SEPARATOR . "$class_path.php";
}
}

Expand Down
31 changes: 18 additions & 13 deletions Connection/AsyncTcpConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use Workerman\Events\EventInterface;
use Workerman\Lib\Timer;
use Workerman\Worker;
use Exception;
use \Exception;

/**
* AsyncTcpConnection.
Expand Down Expand Up @@ -119,7 +119,7 @@ public function __construct($remote_address, array $context_option = array())
}
} else {
if (!isset($address_info['port'])) {
$address_info['port'] = 80;
$address_info['port'] = 0;
}
if (!isset($address_info['path'])) {
$address_info['path'] = '/';
Expand All @@ -137,7 +137,7 @@ public function __construct($remote_address, array $context_option = array())
}

$this->id = $this->_id = self::$_idRecorder++;
if(PHP_INT_MAX === self::$_idRecorder){
if(\PHP_INT_MAX === self::$_idRecorder){
self::$_idRecorder = 0;
}
// Check application layer protocol class.
Expand All @@ -155,8 +155,9 @@ public function __construct($remote_address, array $context_option = array())
}

// For statistics.
self::$statistics['connection_count']++;
++self::$statistics['connection_count'];
$this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
$this->maxPackageSize = self::$defaultMaxPackageSize;
$this->_contextOption = $context_option;
static::$connections[$this->_id] = $this;
}
Expand All @@ -175,22 +176,26 @@ public function connect()
$this->_status = self::STATUS_CONNECTING;
$this->_connectStartTime = \microtime(true);
if ($this->transport !== 'unix') {
if (!$this->_remotePort) {
$this->_remotePort = $this->transport === 'ssl' ? 443 : 80;
$this->_remoteAddress = $this->_remoteHost.':'.$this->_remotePort;
}
// Open socket connection asynchronously.
if ($this->_contextOption) {
$context = \stream_context_create($this->_contextOption);
$this->_socket = \stream_socket_client("tcp://{$this->_remoteHost}:{$this->_remotePort}",
$errno, $errstr, 0, STREAM_CLIENT_ASYNC_CONNECT, $context);
$errno, $errstr, 0, \STREAM_CLIENT_ASYNC_CONNECT, $context);
} else {
$this->_socket = \stream_socket_client("tcp://{$this->_remoteHost}:{$this->_remotePort}",
$errno, $errstr, 0, STREAM_CLIENT_ASYNC_CONNECT);
$errno, $errstr, 0, \STREAM_CLIENT_ASYNC_CONNECT);
}
} else {
$this->_socket = \stream_socket_client("{$this->transport}://{$this->_remoteAddress}", $errno, $errstr, 0,
STREAM_CLIENT_ASYNC_CONNECT);
\STREAM_CLIENT_ASYNC_CONNECT);
}
// If failed attempt to emit onError callback.
if (!$this->_socket || !\is_resource($this->_socket)) {
$this->emitError(WORKERMAN_CONNECT_FAIL, $errstr);
$this->emitError(\WORKERMAN_CONNECT_FAIL, $errstr);
if ($this->_status === self::STATUS_CLOSING) {
$this->destroy();
}
Expand All @@ -202,7 +207,7 @@ public function connect()
// Add socket to global event loop waiting connection is successfully established or faild.
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'checkConnection'));
// For windows.
if(DIRECTORY_SEPARATOR === '\\') {
if(\DIRECTORY_SEPARATOR === '\\') {
Worker::$globalEvent->add($this->_socket, EventInterface::EV_EXCEPT, array($this, 'checkConnection'));
}
}
Expand Down Expand Up @@ -289,7 +294,7 @@ protected function emitError($code, $msg)
public function checkConnection()
{
// Remove EV_EXPECT for windows.
if(DIRECTORY_SEPARATOR === '\\') {
if(\DIRECTORY_SEPARATOR === '\\') {
Worker::$globalEvent->del($this->_socket, EventInterface::EV_EXCEPT);
}

Expand All @@ -311,8 +316,8 @@ public function checkConnection()
// Try to open keepalive for tcp and disable Nagle algorithm.
if (\function_exists('socket_import_stream') && $this->transport === 'tcp') {
$raw_socket = \socket_import_stream($this->_socket);
\socket_set_option($raw_socket, SOL_SOCKET, SO_KEEPALIVE, 1);
\socket_set_option($raw_socket, SOL_TCP, TCP_NODELAY, 1);
\socket_set_option($raw_socket, \SOL_SOCKET, \SO_KEEPALIVE, 1);
\socket_set_option($raw_socket, \SOL_TCP, \TCP_NODELAY, 1);
}

// SSL handshake.
Expand Down Expand Up @@ -360,7 +365,7 @@ public function checkConnection()
}
} else {
// Connection failed.
$this->emitError(WORKERMAN_CONNECT_FAIL, 'connect ' . $this->_remoteAddress . ' fail after ' . round(\microtime(true) - $this->_connectStartTime, 4) . ' seconds');
$this->emitError(\WORKERMAN_CONNECT_FAIL, 'connect ' . $this->_remoteAddress . ' fail after ' . round(\microtime(true) - $this->_connectStartTime, 4) . ' seconds');
if ($this->_status === self::STATUS_CLOSING) {
$this->destroy();
}
Expand Down
6 changes: 3 additions & 3 deletions Connection/AsyncUdpConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

use Workerman\Events\EventInterface;
use Workerman\Worker;
use Exception;
use \Exception;

/**
* AsyncTcpConnection.
Expand Down Expand Up @@ -94,7 +94,7 @@ public function baseRead($socket)
$parser = $this->protocol;
$recv_buffer = $parser::decode($recv_buffer, $this);
}
ConnectionInterface::$statistics['total_request']++;
++ConnectionInterface::$statistics['total_request'];
try {
\call_user_func($this->onMessage, $this, $recv_buffer);
} catch (\Exception $e) {
Expand Down Expand Up @@ -176,7 +176,7 @@ public function connect()
if ($this->_contextOption) {
$context = \stream_context_create($this->_contextOption);
$this->_socket = \stream_socket_client("udp://{$this->_remoteAddress}", $errno, $errmsg,
30, STREAM_CLIENT_CONNECT, $context);
30, \STREAM_CLIENT_CONNECT, $context);
} else {
$this->_socket = \stream_socket_client("udp://{$this->_remoteAddress}", $errno, $errmsg);
}
Expand Down
70 changes: 37 additions & 33 deletions Connection/TcpConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

use Workerman\Events\EventInterface;
use Workerman\Worker;
use Exception;
use \Exception;

/**
* TcpConnection.
Expand Down Expand Up @@ -292,9 +292,9 @@ public function __call($name, array $arguments) {
*/
public function __construct($socket, $remote_address = '')
{
self::$statistics['connection_count']++;
++self::$statistics['connection_count'];
$this->id = $this->_id = self::$_idRecorder++;
if(self::$_idRecorder === PHP_INT_MAX){
if(self::$_idRecorder === \PHP_INT_MAX){
self::$_idRecorder = 0;
}
$this->_socket = $socket;
Expand All @@ -315,7 +315,7 @@ public function __construct($socket, $remote_address = '')
*
* @param bool $raw_output
*
* @return int
* @return int|string
*/
public function getStatus($raw_output = true)
{
Expand Down Expand Up @@ -351,7 +351,7 @@ public function send($send_buffer, $raw = false)
($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true)
) {
if ($this->_sendBuffer && $this->bufferIsFull()) {
self::$statistics['send_fail']++;
++self::$statistics['send_fail'];
return false;
}
$this->_sendBuffer .= $send_buffer;
Expand Down Expand Up @@ -382,10 +382,10 @@ public function send($send_buffer, $raw = false)
} else {
// Connection closed?
if (!\is_resource($this->_socket) || \feof($this->_socket)) {
self::$statistics['send_fail']++;
++self::$statistics['send_fail'];
if ($this->onError) {
try {
\call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'client closed');
\call_user_func($this->onError, $this, \WORKERMAN_SEND_FAIL, 'client closed');
} catch (\Exception $e) {
Worker::log($e);
exit(250);
Expand All @@ -403,16 +403,16 @@ public function send($send_buffer, $raw = false)
// Check if the send buffer will be full.
$this->checkBufferWillFull();
return;
} else {
if ($this->bufferIsFull()) {
self::$statistics['send_fail']++;
return false;
}
}

$this->_sendBuffer .= $send_buffer;
// Check if the send buffer is full.
$this->checkBufferWillFull();
if ($this->bufferIsFull()) {
++self::$statistics['send_fail'];
return false;
}

$this->_sendBuffer .= $send_buffer;
// Check if the send buffer is full.
$this->checkBufferWillFull();
}

/**
Expand All @@ -424,7 +424,7 @@ public function getRemoteIp()
{
$pos = \strrpos($this->_remoteAddress, ':');
if ($pos) {
return \substr($this->_remoteAddress, 0, $pos);
return (string) \substr($this->_remoteAddress, 0, $pos);
}
return '';
}
Expand All @@ -437,7 +437,7 @@ public function getRemoteIp()
public function getRemotePort()
{
if ($this->_remoteAddress) {
return (int)\substr(\strrchr($this->_remoteAddress, ':'), 1);
return (int) \substr(\strrchr($this->_remoteAddress, ':'), 1);
}
return 0;
}
Expand Down Expand Up @@ -601,14 +601,15 @@ public function baseRead($socket, $check_eof = true)
$this->_recvBuffer .= $buffer;
}

$recv_len = \strlen($this->_recvBuffer);
// If the application layer protocol has been set up.
if ($this->protocol !== null) {
$parser = $this->protocol;
while ($this->_recvBuffer !== '' && !$this->_isPaused) {
// The current packet length is known.
if ($this->_currentPackageLength) {
// Data is not enough for a package.
if ($this->_currentPackageLength > \strlen($this->_recvBuffer)) {
if ($this->_currentPackageLength > $recv_len) {
break;
}
} else {
Expand All @@ -623,21 +624,21 @@ public function baseRead($socket, $check_eof = true)
break;
} elseif ($this->_currentPackageLength > 0 && $this->_currentPackageLength <= $this->maxPackageSize) {
// Data is not enough for a package.
if ($this->_currentPackageLength > \strlen($this->_recvBuffer)) {
if ($this->_currentPackageLength > $recv_len) {
break;
}
} // Wrong package.
else {
Worker::safeEcho('error package. package_length=' . var_export($this->_currentPackageLength, true));
Worker::safeEcho('Error package. package_length=' . \var_export($this->_currentPackageLength, true));
$this->destroy();
return;
}
}

// The data is enough for a packet.
self::$statistics['total_request']++;
++self::$statistics['total_request'];
// The current packet length is equal to the length of the buffer.
if (\strlen($this->_recvBuffer) === $this->_currentPackageLength) {
if ($recv_len === $this->_currentPackageLength) {
$one_request_buffer = $this->_recvBuffer;
$this->_recvBuffer = '';
} else {
Expand Down Expand Up @@ -670,7 +671,7 @@ public function baseRead($socket, $check_eof = true)
}

// Applications protocol is not set.
self::$statistics['total_request']++;
++self::$statistics['total_request'];
if (!$this->onMessage) {
$this->_recvBuffer = '';
return;
Expand Down Expand Up @@ -727,7 +728,7 @@ public function baseWrite()
$this->bytesWritten += $len;
$this->_sendBuffer = \substr($this->_sendBuffer, $len);
} else {
self::$statistics['send_fail']++;
++self::$statistics['send_fail'];
$this->destroy();
}
}
Expand Down Expand Up @@ -756,9 +757,9 @@ public function doSslHandshake($socket){
}*/

if($async){
$type = STREAM_CRYPTO_METHOD_SSLv2_CLIENT | STREAM_CRYPTO_METHOD_SSLv23_CLIENT;
$type = \STREAM_CRYPTO_METHOD_SSLv2_CLIENT | \STREAM_CRYPTO_METHOD_SSLv23_CLIENT;
}else{
$type = STREAM_CRYPTO_METHOD_SSLv2_SERVER | STREAM_CRYPTO_METHOD_SSLv23_SERVER;
$type = \STREAM_CRYPTO_METHOD_SSLv2_SERVER | \STREAM_CRYPTO_METHOD_SSLv23_SERVER;
}

// Hidden error.
Expand Down Expand Up @@ -838,14 +839,17 @@ public function close($data = null, $raw = false)
$this->destroy();
return;
}

if ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
return;
} else {
if ($data !== null) {
$this->send($data, $raw);
}
$this->_status = self::STATUS_CLOSING;
}

if ($data !== null) {
$this->send($data, $raw);
}

$this->_status = self::STATUS_CLOSING;

if ($this->_sendBuffer === '') {
$this->destroy();
} else {
Expand Down Expand Up @@ -896,7 +900,7 @@ protected function bufferIsFull()
if ($this->maxSendBufferSize <= \strlen($this->_sendBuffer)) {
if ($this->onError) {
try {
\call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
\call_user_func($this->onError, $this, \WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
} catch (\Exception $e) {
Worker::log($e);
exit(250);
Expand Down Expand Up @@ -988,7 +992,7 @@ public function __destruct()
self::$statistics['connection_count']--;
if (Worker::getGracefulStop()) {
if (!isset($mod)) {
$mod = ceil((self::$statistics['connection_count'] + 1) / 3);
$mod = \ceil((self::$statistics['connection_count'] + 1) / 3);
}

if (0 === self::$statistics['connection_count'] % $mod) {
Expand Down
2 changes: 1 addition & 1 deletion Events/Ev.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
namespace Workerman\Events;

use Workerman\Worker;
use EvWatcher;
use \EvWatcher;

/**
* ev eventloop
Expand Down
Loading

0 comments on commit 4284726

Please sign in to comment.