Skip to content

Commit

Permalink
Merge pull request #1 from walkor/master
Browse files Browse the repository at this point in the history
update
  • Loading branch information
forkeer authored Feb 7, 2017
2 parents e3b1eea + d131cc3 commit ebfe4d4
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 26 deletions.
52 changes: 49 additions & 3 deletions Connection/TcpConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ class TcpConnection extends ConnectionInterface
*/
public $protocol = null;

/**
* Transport (tcp/udp/unix/ssl).
*
* @var string
*/
public $transport = 'tcp';

/**
* Which worker belong to.
*
Expand Down Expand Up @@ -206,6 +213,13 @@ class TcpConnection extends ConnectionInterface
*/
protected $_isPaused = false;

/**
* SSL handshake completed or not
*
* @var bool
*/
protected $_sslHandshakeCompleted = false;

/**
* Construct.
*
Expand Down Expand Up @@ -236,6 +250,10 @@ public function __construct($socket, $remote_address = '')
*/
public function send($send_buffer, $raw = false)
{
if ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
return false;
}

// Try to call protocol::encode($send_buffer) before sending.
if (false === $raw && $this->protocol) {
$parser = $this->protocol;
Expand All @@ -245,7 +263,9 @@ public function send($send_buffer, $raw = false)
}
}

if ($this->_status === self::STATUS_INITIAL || $this->_status === self::STATUS_CONNECTING) {
if ($this->_status !== self::STATUS_ESTABLISH ||
($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true)
) {
if ($this->_sendBuffer) {
if ($this->bufferIsFull()) {
self::$statistics['send_fail']++;
Expand All @@ -255,10 +275,9 @@ public function send($send_buffer, $raw = false)
$this->_sendBuffer .= $send_buffer;
$this->checkBufferWillFull();
return null;
} elseif ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
return false;
}


// Attempt to send data directly.
if ($this->_sendBuffer === '') {
$len = @fwrite($this->_socket, $send_buffer);
Expand Down Expand Up @@ -366,6 +385,33 @@ public function resumeRecv()
*/
public function baseRead($socket, $check_eof = true)
{
// SSL handshake.
if ($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true) {
stream_set_blocking($socket, true);
stream_set_timeout($socket, 1);
$ret = stream_socket_enable_crypto($socket, true, STREAM_CRYPTO_METHOD_SSLv23_SERVER);
if(!$ret) {
echo new \Exception('ssl handshake fail, stream_socket_enable_crypto return ' . var_export($ret, true));
return $this->destroy();
}
if (isset($this->onSslHandshake)) {
try {
call_user_func($this->onSslHandshake, $this);
} catch (\Exception $e) {
self::log($e);
exit(250);
} catch (\Error $e) {
self::log($e);
exit(250);
}
}
$this->_sslHandshakeCompleted = true;
if ($this->_sendBuffer) {
Worker::$globalEvent->add($socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
}
return;
}

$buffer = fread($socket, self::READ_BUFFER_SIZE);

// Check connection closed.
Expand Down
4 changes: 3 additions & 1 deletion Events/React.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ public function del($fd, $flag)
return $this->_loop->removeSignal($fd);
case EventInterface::EV_TIMER:
case EventInterface::EV_TIMER_ONCE;
return $this->_loop->cancelTimer($fd);
if ($fd !== null){
return $this->_loop->cancelTimer($fd);
}
}
return false;
}
Expand Down
53 changes: 40 additions & 13 deletions Protocols/Http.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@
*/
class Http
{
/**
* The supported HTTP methods
* @var array
*/
public static $methods = array('GET', 'POST', 'PUT', 'DELETE', 'HEAD', 'OPTIONS');

/**
* Check the integrity of the package.
*
Expand All @@ -40,23 +46,36 @@ public static function input($recv_buffer, TcpConnection $connection)
}

list($header,) = explode("\r\n\r\n", $recv_buffer, 2);
if (0 === strpos($recv_buffer, "POST")) {
// find Content-Length
$match = array();
if (preg_match("/\r\nContent-Length: ?(\d+)/i", $header, $match)) {
$content_length = $match[1];
return $content_length + strlen($header) + 4;
} else {
return 0;
}
} elseif (0 === strpos($recv_buffer, "GET")) {
return strlen($header) + 4;
} else {
$method = substr($header, 0, strpos($header, ' '));

if(in_array($method, static::$methods)) {
return static::getRequestSize($header, $method);
}else{
$connection->send("HTTP/1.1 400 Bad Request\r\n\r\n", true);
return 0;
}
}

/**
* Get whole size of the request
* includes the request headers and request body.
* @param string $header The request headers
* @param string $method The request method
* @return integer
*/
protected static function getRequestSize($header, $method)
{
if($method=='GET') {
return strlen($header) + 4;
}
$match = array();
if (preg_match("/\r\nContent-Length: ?(\d+)/i", $header, $match)) {
$content_length = isset($match[1]) ? $match[1] : 0;
return $content_length + strlen($header) + 4;
}
return 0;
}

/**
* Parse $_POST、$_GET、$_COOKIE.
*
Expand Down Expand Up @@ -144,10 +163,18 @@ public static function decode($recv_buffer, TcpConnection $connection)
} else {
parse_str($http_body, $_POST);
// $GLOBALS['HTTP_RAW_POST_DATA']
$GLOBALS['HTTP_RAW_POST_DATA'] = $http_body;
$GLOBALS['HTTP_RAW_REQUEST_DATA'] = $GLOBALS['HTTP_RAW_POST_DATA'] = $http_body;
}
}

if ($_SERVER['REQUEST_METHOD'] === 'PUT') {
$GLOBALS['HTTP_RAW_REQUEST_DATA'] = $http_body;
}

if ($_SERVER['REQUEST_METHOD'] === 'DELETE') {
$GLOBALS['HTTP_RAW_REQUEST_DATA'] = $http_body;
}

// QUERY_STRING
$_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY);
if ($_SERVER['QUERY_STRING']) {
Expand Down
23 changes: 14 additions & 9 deletions Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Worker
*
* @var string
*/
const VERSION = '3.3.6';
const VERSION = '3.3.7';

/**
* Status starting.
Expand Down Expand Up @@ -411,10 +411,7 @@ class Worker
'tcp' => 'tcp',
'udp' => 'udp',
'unix' => 'unix',
'ssl' => 'tcp',
'sslv2' => 'tcp',
'sslv3' => 'tcp',
'tls' => 'tcp'
'ssl' => 'tcp'
);

/**
Expand Down Expand Up @@ -1408,7 +1405,6 @@ public function listen()
// Autoload.
Autoloader::setRootPath($this->_autoloadRootPath);

$local_socket = $this->_socketName;
// Get the application layer communication protocol and listening address.
list($scheme, $address) = explode(':', $this->_socketName, 2);
// Check application layer protocol class.
Expand All @@ -1425,11 +1421,15 @@ public function listen()
}
}
}
$local_socket = $this->transport . ":" . $address;
if (!isset(self::$_builtinTransports[$this->transport])) {
throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
}
} else {
$this->transport = self::$_builtinTransports[$scheme];
$this->transport = $scheme;
}

$local_socket = self::$_builtinTransports[$this->transport] . ":" . $address;

// Flag.
$flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
$errno = 0;
Expand All @@ -1445,8 +1445,12 @@ public function listen()
throw new Exception($errmsg);
}

if ($this->transport === 'ssl') {
stream_socket_enable_crypto($this->_mainSocket, false);
}

// Try to open keepalive for tcp and disable Nagle algorithm.
if (function_exists('socket_import_stream') && $this->transport === 'tcp') {
if (function_exists('socket_import_stream') && self::$_builtinTransports[$this->transport] === 'tcp') {
$socket = socket_import_stream($this->_mainSocket);
@socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
@socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
Expand Down Expand Up @@ -1575,6 +1579,7 @@ public function acceptConnection($socket)
$this->connections[$connection->id] = $connection;
$connection->worker = $this;
$connection->protocol = $this->protocol;
$connection->transport = $this->transport;
$connection->onMessage = $this->onMessage;
$connection->onClose = $this->onClose;
$connection->onError = $this->onError;
Expand Down

0 comments on commit ebfe4d4

Please sign in to comment.