Skip to content

Commit

Permalink
Merge pull request #1 from walkor/master
Browse files Browse the repository at this point in the history
update
  • Loading branch information
ares333 authored Oct 13, 2017
2 parents b6737b0 + a968d46 commit 041ea7b
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 57 deletions.
3 changes: 3 additions & 0 deletions Connection/AsyncTcpConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ public function __construct($remote_address, $context_option = null)
}

$this->id = $this->_id = self::$_idRecorder++;
if(PHP_INT_MAX === self::$_idRecorder){
self::$_idRecorder = 0;
}
// Check application layer protocol class.
if (!isset(self::$_builtinTransports[$scheme])) {
$scheme = ucfirst($scheme);
Expand Down
142 changes: 85 additions & 57 deletions Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,13 @@ class Worker
* @var string
*/
protected $_autoloadRootPath = '';

/**
* Pause listening or not.
*
* @var string
*/
protected $_pauseListen = false;

/**
* Daemonize.
Expand Down Expand Up @@ -457,9 +464,12 @@ protected static function init()
$backtrace = debug_backtrace();
self::$_startFile = $backtrace[count($backtrace) - 1]['file'];


$unique_prefix = str_replace('/', '_', self::$_startFile);

// Pid file.
if (empty(self::$pidFile)) {
self::$pidFile = __DIR__ . "/../" . str_replace('/', '_', self::$_startFile) . ".pid";
self::$pidFile = __DIR__ . "/../$unique_prefix.pid";
}

// Log file.
Expand All @@ -477,7 +487,7 @@ protected static function init()

// For statistics.
self::$_globalStatistics['start_timestamp'] = time();
self::$_statisticsFile = sys_get_temp_dir() . '/workerman.status';
self::$_statisticsFile = sys_get_temp_dir() . "/$unique_prefix.status";

// Process title.
self::setProcessTitle('WorkerMan: master process start_file=' . self::$_startFile);
Expand Down Expand Up @@ -608,7 +618,7 @@ protected static function displayUI()
$start_file = $argv[0];
self::safeEcho("Input \"php $start_file stop\" to quit. Start success.\n\n");
} else {
self::safeEcho("Press Ctrl-C to quit. Start success.\n");
self::safeEcho("Press Ctrl+C to quit. Start success.\n");
}
}

Expand Down Expand Up @@ -682,9 +692,15 @@ protected static function parseCommand()
// Sleep 1 second.
sleep(1);
// Clear terminal.
echo chr(27).chr(91).chr(72).chr(27).chr(91).chr(50).chr(74);
if ($command2 === '-d') {
echo "\33[H\33[2J\33(B\33[m";
}
// Echo status data.
echo self::formatStatusData();
if ($command2 !== '-d') {
exit(0);
}
echo "\nPress Ctrl+C to quit.\n\n";
}
exit(0);
case 'connections':
Expand Down Expand Up @@ -1602,86 +1618,101 @@ public function __construct($socket_name = '', $context_option = array())
}

/**
* Listen port.
* Listen.
*
* @throws Exception
*/
public function listen()
{
if (!$this->_socketName || $this->_mainSocket) {
if (!$this->_socketName) {
return;
}

// Autoload.
Autoloader::setRootPath($this->_autoloadRootPath);

// Get the application layer communication protocol and listening address.
list($scheme, $address) = explode(':', $this->_socketName, 2);
// Check application layer protocol class.
if (!isset(self::$_builtinTransports[$scheme])) {
$scheme = ucfirst($scheme);
$this->protocol = '\\Protocols\\' . $scheme;
if (!class_exists($this->protocol)) {
$this->protocol = "\\Workerman\\Protocols\\$scheme";
if (!$this->_mainSocket) {
// Get the application layer communication protocol and listening address.
list($scheme, $address) = explode(':', $this->_socketName, 2);
// Check application layer protocol class.
if (!isset(self::$_builtinTransports[$scheme])) {
$scheme = ucfirst($scheme);
$this->protocol = '\\Protocols\\' . $scheme;
if (!class_exists($this->protocol)) {
throw new Exception("class \\Protocols\\$scheme not exist");
$this->protocol = "\\Workerman\\Protocols\\$scheme";
if (!class_exists($this->protocol)) {
throw new Exception("class \\Protocols\\$scheme not exist");
}
}
}

if (!isset(self::$_builtinTransports[$this->transport])) {
throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
if (!isset(self::$_builtinTransports[$this->transport])) {
throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
}
} else {
$this->transport = $scheme;
}
} else {
$this->transport = $scheme;
}

$local_socket = self::$_builtinTransports[$this->transport] . ":" . $address;
$local_socket = self::$_builtinTransports[$this->transport] . ":" . $address;

// Flag.
$flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
$errno = 0;
$errmsg = '';
// SO_REUSEPORT.
if ($this->reusePort) {
stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1);
}
// Flag.
$flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
$errno = 0;
$errmsg = '';
// SO_REUSEPORT.
if ($this->reusePort) {
stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1);
}

// Create an Internet or Unix domain server socket.
$this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
if (!$this->_mainSocket) {
throw new Exception($errmsg);
}
// Create an Internet or Unix domain server socket.
$this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
if (!$this->_mainSocket) {
throw new Exception($errmsg);
}

if ($this->transport === 'ssl') {
stream_socket_enable_crypto($this->_mainSocket, false);
} elseif ($this->transport === 'unix') {
$socketFile = substr($address, 2);
if ($this->user) {
chown($socketFile, $this->user);
if ($this->transport === 'ssl') {
stream_socket_enable_crypto($this->_mainSocket, false);
} elseif ($this->transport === 'unix') {
$socketFile = substr($address, 2);
if ($this->user) {
chown($socketFile, $this->user);
}
if ($this->group) {
chgrp($socketFile, $this->group);
}
}
if ($this->group) {
chgrp($socketFile, $this->group);

// Try to open keepalive for tcp and disable Nagle algorithm.
if (function_exists('socket_import_stream') && self::$_builtinTransports[$this->transport] === 'tcp') {
$socket = socket_import_stream($this->_mainSocket);
@socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
@socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
}
}

// Try to open keepalive for tcp and disable Nagle algorithm.
if (function_exists('socket_import_stream') && self::$_builtinTransports[$this->transport] === 'tcp') {
$socket = socket_import_stream($this->_mainSocket);
@socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
@socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
// Non blocking.
stream_set_blocking($this->_mainSocket, 0);
}

// Non blocking.
stream_set_blocking($this->_mainSocket, 0);

// Register a listener to be notified when server socket is ready to read.
if (self::$globalEvent) {
if (self::$globalEvent || $this->_pauseListen) {
if ($this->transport !== 'udp') {
self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
} else {
self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
array($this, 'acceptUdpConnection'));
}
$this->_pauseListen = false;
}
}

/**
* Unlisten.
*
* @return void
*/
public function unlisten() {
if (self::$globalEvent && $this->_mainSocket) {
self::$globalEvent->del($this->_mainSocket, EventInterface::EV_READ);
$this->_pauseListen = true;
}
}

Expand Down Expand Up @@ -1774,10 +1805,7 @@ public function stop()
}
}
// Remove listener for server socket.
if ($this->_mainSocket) {
self::$globalEvent->del($this->_mainSocket, EventInterface::EV_READ);
@fclose($this->_mainSocket);
}
$this->unlisten();
}

/**
Expand Down

0 comments on commit 041ea7b

Please sign in to comment.