Skip to content

Commit

Permalink
add the process management
Browse files Browse the repository at this point in the history
  • Loading branch information
yunnian committed Oct 21, 2018
1 parent afdaada commit f2e04ae
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 83 deletions.
12 changes: 5 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,15 @@ $nsq->subscribe($nsq_lookupd, $config, function($msg){
4. `If your have strong consuming ability ,you can add you rdy num and connect num` <br/>


5. `You can use supervisor to supervise process,The following configuration needs to be added to the supervisor configuration file: ` <br/>
```
stopasgroup=true
killasgroup=true
```

6. `If your execution time is more than 1 minute, you should use 'touch()' function ` <br/>
5. `If your execution time is more than 1 minute, you should use 'touch()' function ` <br/>


Changes
-------
* **3.3.0**
* add the process management
* When the child process exits abnormally, it will pull up a new child process
* When the master process exits, all child processes also exit
* **3.2.0**
* Fix The error message was not reported
* Fix pub error when ip or url too long
Expand Down
178 changes: 108 additions & 70 deletions nsq.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ ZEND_DECLARE_MODULE_GLOBALS(nsq)
/* True global resources - no need for thread safety here */
static int le_nsq;
int le_bufferevent;
int le_arg;

/* {{{ PHP_INI
*/
Expand All @@ -70,6 +71,9 @@ PHP_INI_END()

zend_class_entry *nsq_ce/*, *nsq_message_exception*/;

static void signal_handle(int sig);
static void signal_handles(int sig);

PHP_METHOD(Nsq, __construct){
zval *self;
zval *nsq_config = (zval *)malloc(sizeof(zval)); //use send IDENTIFY comand
Expand Down Expand Up @@ -249,33 +253,81 @@ PHP_METHOD (Nsq, deferredPublish)


HashTable *child_fd;
HashTable *child_pid_arg;
int is_init = 0;
int master = 0;

void start_worker_process(NSQArg *arg)
{

zval zval_pid;
zval zval_arg;
zend_resource *arg_resource;
pid_t pid;
pid = fork();

if (pid == 0) {
subscribe(arg);
}else if(pid > 0 ){
if(!is_init) {
master = getpid();
signal(SIGCHLD, signal_handle);
signal(SIGTERM, signal_handle);

// init hash table
ALLOC_HASHTABLE(child_fd);
zend_hash_init(child_fd, 0, NULL, ZVAL_PTR_DTOR, 1);

ALLOC_HASHTABLE(child_pid_arg);
zend_hash_init(child_pid_arg, 0, NULL, ZVAL_PTR_DTOR, 1);
is_init = 1;

}

ZVAL_LONG(&zval_pid, pid);
zval *fd_res = zend_hash_next_index_insert(child_fd, &zval_pid);

// value should build to zval
arg_resource = zend_register_resource(arg, le_arg);
ZVAL_RES(&zval_arg, arg_resource);
zval *arg_res = zend_hash_index_add(child_pid_arg, pid, &zval_arg);
}
}


static void signal_handle(int sig)
{
int status;
pid_t pid;
zend_ulong index;
zval *val;
int s;
int count;
pid_t current = getpid();
switch (sig)
{
case SIGTERM:
s = zend_array_count(child_fd);
// quit all child
ZEND_HASH_FOREACH_NUM_KEY_VAL(child_fd, index, val);
kill(Z_LVAL_P(val), SIGTERM);
ZEND_HASH_FOREACH_END();
exit(0);
if(current == master){
count = zend_array_count(child_fd);
// quit all child
ZEND_HASH_FOREACH_NUM_KEY_VAL(child_fd, index, val);
kill(Z_LVAL_P(val), SIGTERM);
ZEND_HASH_FOREACH_END();
}

exit(0);
break;
/**
* TODO reload all workers
*/
case SIGUSR1:
break;
case SIGCHLD:
while((pid=waitpid(-1, &status, WNOHANG)) > 0){
printf("child %d terminated\n", pid);
while((pid = waitpid(-1, &status, WNOHANG)) > 0){
printf("child %d terminated, will reload \n", pid);
zval *zval_arg = zend_hash_index_find(child_pid_arg, pid);
struct NSQArg *arg = (struct NSQArg*)zend_fetch_resource(Z_RES_P(zval_arg), "sub nsqd arg", le_arg);
start_worker_process(arg);
zend_hash_index_del(child_pid_arg, (zend_ulong)pid);
};
break;
case SIGALRM:
Expand All @@ -285,14 +337,14 @@ static void signal_handle(int sig)
}
}


PHP_METHOD (Nsq, subscribe)
{
zend_fcall_info fci;
zend_fcall_info_cache fcc;
zval *config;
zval *class_lookupd;
zval *lookupd_addr, rv3, lookupd_re;
zval zval_pid;

ZEND_PARSE_PARAMETERS_START(3, 3)
Z_PARAM_OBJECT(class_lookupd)
Expand Down Expand Up @@ -369,10 +421,6 @@ PHP_METHOD (Nsq, subscribe)

}

signal(SIGCHLD, signal_handle);
signal(SIGTERM, signal_handle);
ALLOC_HASHTABLE(child_fd);
zend_hash_init(child_fd, 0, NULL, ZVAL_PTR_DTOR, 1);

// foreach producers to get nsqd address
zval *val;
Expand All @@ -383,75 +431,60 @@ PHP_METHOD (Nsq, subscribe)

ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(producers), val) {

//signal(SIGCHLD, );
pid = fork();

if (pid == 0) {
zval *nsqd_host = zend_hash_str_find(Z_ARRVAL_P(val), "broadcast_address",
sizeof("broadcast_address") - 1);
zval *nsqd_port = zend_hash_str_find(Z_ARRVAL_P(val), "tcp_port", sizeof("tcp_port") - 1);
struct NSQMsg *msg;
msg = malloc(sizeof(NSQMsg));
msg->topic = Z_STRVAL_P(topic);
msg->channel = Z_STRVAL_P(channel);

if (rdy) {
msg->rdy = Z_LVAL_P(rdy);
} else {
msg->rdy = 1;
}

if (delay_time) {
msg->delay_time = Z_LVAL_P(delay_time);
} else {
msg->delay_time = 0;
}

if (auto_finish && Z_TYPE_P(auto_finish) == IS_TRUE) {
msg->auto_finish = 1;
} else {
msg->auto_finish = 0;
}

convert_to_string(nsqd_port);

NSQArg *arg;
arg = malloc(sizeof(NSQArg));
arg->msg = msg;
arg->host = Z_STRVAL_P(nsqd_host);
arg->port = Z_STRVAL_P(nsqd_port);
arg->fci = &fci;
arg->fcc = &fcc;
arg->nsq_obj = getThis();

//child should quit when master quit
//prctl(PR_SET_PDEATHSIG,SIGHUP);

subscribe(arg);
free(msg);
free(arg);
zval *nsqd_host = zend_hash_str_find(Z_ARRVAL_P(val), "broadcast_address",
sizeof("broadcast_address") - 1);
zval *nsqd_port = zend_hash_str_find(Z_ARRVAL_P(val), "tcp_port", sizeof("tcp_port") - 1);
struct NSQMsg msg;
msg.topic = Z_STRVAL_P(topic);
msg.channel = Z_STRVAL_P(channel);

if (rdy) {
msg.rdy = Z_LVAL_P(rdy);
} else {
msg.rdy = 1;
}
ZVAL_LONG(&zval_pid, pid);
zval *res = zend_hash_next_index_insert(child_fd, &zval_pid);

}ZEND_HASH_FOREACH_END();
if (delay_time) {
msg.delay_time = Z_LVAL_P(delay_time);
} else {
msg.delay_time = 0;
}

if (auto_finish && Z_TYPE_P(auto_finish) == IS_TRUE) {
msg.auto_finish = 1;
} else {
msg.auto_finish = 0;
}

convert_to_string(nsqd_port);

NSQArg arg;
arg.msg = &msg;
arg.host = Z_STRVAL_P(nsqd_host);
arg.port = Z_STRVAL_P(nsqd_port);
arg.fci = &fci;
arg.fcc = &fcc;
arg.nsq_obj = getThis();

start_worker_process(&arg);

}ZEND_HASH_FOREACH_END();

zval_dtor(auto_finish);
zval_dtor(connect_num);
zval_dtor(config);
}
int ret = 0;
int ret_pid = 0;
while (1) {
ret = wait(NULL);
if (ret == -1) {
ret_pid = wait(NULL);
if (ret_pid == -1) {
if (errno == EINTR) {
continue;
}
break;
}
}

zval_dtor(auto_finish);
zval_dtor(connect_num);
zval_dtor(config);
zval_dtor(&lookupd_re);
}

Expand All @@ -461,6 +494,10 @@ static void _php_bufferevent_dtor(zend_resource *rsrc TSRMLS_DC) /* {{{ */ {
efree(bevent);
}

static void _php_sub_arg_dtor(zend_resource *rsrc TSRMLS_DC) /* {{{ */ {
//struct NSQArg *arg = (NSQArg *) rsrc;
//free(arg);
}

/* }}} */
/* The previous line is meant for vim and emacs, so it can correctly fold and
Expand Down Expand Up @@ -542,6 +579,7 @@ PHP_MINIT_FUNCTION (nsq)
zend_declare_property_null(nsq_ce,ZEND_STRL("nsqConfig"),ZEND_ACC_PUBLIC TSRMLS_CC);
zend_declare_property_null(nsq_ce, ZEND_STRL("nsqd_connection_fds"), ZEND_ACC_PUBLIC TSRMLS_CC);
le_bufferevent = zend_register_list_destructors_ex(_php_bufferevent_dtor, NULL, "buffer event", module_number);
le_arg = zend_register_list_destructors_ex(_php_sub_arg_dtor, NULL, "nsqdr arg", module_number);

lookupd_init();
message_init();
Expand Down
11 changes: 6 additions & 5 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
<lead>
<name>rick/ZhenYuwu</name>
<user>rick</user>
<email>wuzhenyu7758@gmail.com</email>
<email>936321732@qq.com</email>
<active>yes</active>
</lead>
<date>2018-08-07</date>
<date>2018-10-21</date>
<version>
<release>3.2.0</release>
<release>3.3.0</release>
<api>2.0.0</api>
</version>
<stability>
Expand All @@ -26,8 +26,9 @@
</stability>
<license uri="http://www.php.net/license">PHP</license>
<notes>
*Fix The error message was not reported
*Fix pub error when ip or url too long
* add the process management
* When the child process exits abnormally, it will pull up a new child process
* When the master process exits, all child processes also exit
</notes>
<contents>
<dir name="/">
Expand Down
2 changes: 1 addition & 1 deletion php_nsq.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
extern zend_module_entry nsq_module_entry;
#define phpext_nsq_ptr &nsq_module_entry

#define PHP_NSQ_VERSION "3.2.0" /* Replace with version number for your extension */
#define PHP_NSQ_VERSION "3.3.0" /* Replace with version number for your extension */

#ifdef PHP_WIN32
# define PHP_NSQ_API __declspec(dllexport)
Expand Down

0 comments on commit f2e04ae

Please sign in to comment.