From f2e04ae71532540609abe3589d0866af629b9043 Mon Sep 17 00:00:00 2001 From: "yunnian(zhenyu)" <936321732@qq.com> Date: Sun, 21 Oct 2018 16:04:33 +0800 Subject: [PATCH] add the process management --- README.md | 12 ++-- nsq.c | 178 +++++++++++++++++++++++++++++++--------------------- package.xml | 11 ++-- php_nsq.h | 2 +- 4 files changed, 120 insertions(+), 83 deletions(-) diff --git a/README.md b/README.md index ca9825d..7d937aa 100644 --- a/README.md +++ b/README.md @@ -163,17 +163,15 @@ $nsq->subscribe($nsq_lookupd, $config, function($msg){ 4. `If your have strong consuming ability ,you can add you rdy num and connect num`
-5. `You can use supervisor to supervise process,The following configuration needs to be added to the supervisor configuration file: `
-``` - stopasgroup=true - killasgroup=true -``` - -6. `If your execution time is more than 1 minute, you should use 'touch()' function `
+5. `If your execution time is more than 1 minute, you should use 'touch()' function `
Changes ------- +* **3.3.0** + * add the process management + * When the child process exits abnormally, it will pull up a new child process + * When the master process exits, all child processes also exit * **3.2.0** * Fix The error message was not reported * Fix pub error when ip or url too long diff --git a/nsq.c b/nsq.c index 1ab75a7..9e6600f 100644 --- a/nsq.c +++ b/nsq.c @@ -49,6 +49,7 @@ ZEND_DECLARE_MODULE_GLOBALS(nsq) /* True global resources - no need for thread safety here */ static int le_nsq; int le_bufferevent; +int le_arg; /* {{{ PHP_INI */ @@ -70,6 +71,9 @@ PHP_INI_END() zend_class_entry *nsq_ce/*, *nsq_message_exception*/; +static void signal_handle(int sig); +static void signal_handles(int sig); + PHP_METHOD(Nsq, __construct){ zval *self; zval *nsq_config = (zval *)malloc(sizeof(zval)); //use send IDENTIFY comand @@ -249,6 +253,47 @@ PHP_METHOD (Nsq, deferredPublish) HashTable *child_fd; +HashTable *child_pid_arg; +int is_init = 0; +int master = 0; + +void start_worker_process(NSQArg *arg) +{ + + zval zval_pid; + zval zval_arg; + zend_resource *arg_resource; + pid_t pid; + pid = fork(); + + if (pid == 0) { + subscribe(arg); + }else if(pid > 0 ){ + if(!is_init) { + master = getpid(); + signal(SIGCHLD, signal_handle); + signal(SIGTERM, signal_handle); + + // init hash table + ALLOC_HASHTABLE(child_fd); + zend_hash_init(child_fd, 0, NULL, ZVAL_PTR_DTOR, 1); + + ALLOC_HASHTABLE(child_pid_arg); + zend_hash_init(child_pid_arg, 0, NULL, ZVAL_PTR_DTOR, 1); + is_init = 1; + + } + + ZVAL_LONG(&zval_pid, pid); + zval *fd_res = zend_hash_next_index_insert(child_fd, &zval_pid); + + // value should build to zval + arg_resource = zend_register_resource(arg, le_arg); + ZVAL_RES(&zval_arg, arg_resource); + zval *arg_res = zend_hash_index_add(child_pid_arg, pid, &zval_arg); + } +} + static void signal_handle(int sig) { @@ -256,17 +301,20 @@ static void signal_handle(int sig) pid_t pid; zend_ulong index; zval *val; - int s; + int count; + pid_t current = getpid(); switch (sig) { case SIGTERM: - s = zend_array_count(child_fd); - // quit all child - ZEND_HASH_FOREACH_NUM_KEY_VAL(child_fd, index, val); - kill(Z_LVAL_P(val), SIGTERM); - ZEND_HASH_FOREACH_END(); - exit(0); + if(current == master){ + count = zend_array_count(child_fd); + // quit all child + ZEND_HASH_FOREACH_NUM_KEY_VAL(child_fd, index, val); + kill(Z_LVAL_P(val), SIGTERM); + ZEND_HASH_FOREACH_END(); + } + exit(0); break; /** * TODO reload all workers @@ -274,8 +322,12 @@ static void signal_handle(int sig) case SIGUSR1: break; case SIGCHLD: - while((pid=waitpid(-1, &status, WNOHANG)) > 0){ - printf("child %d terminated\n", pid); + while((pid = waitpid(-1, &status, WNOHANG)) > 0){ + printf("child %d terminated, will reload \n", pid); + zval *zval_arg = zend_hash_index_find(child_pid_arg, pid); + struct NSQArg *arg = (struct NSQArg*)zend_fetch_resource(Z_RES_P(zval_arg), "sub nsqd arg", le_arg); + start_worker_process(arg); + zend_hash_index_del(child_pid_arg, (zend_ulong)pid); }; break; case SIGALRM: @@ -285,6 +337,7 @@ static void signal_handle(int sig) } } + PHP_METHOD (Nsq, subscribe) { zend_fcall_info fci; @@ -292,7 +345,6 @@ PHP_METHOD (Nsq, subscribe) zval *config; zval *class_lookupd; zval *lookupd_addr, rv3, lookupd_re; - zval zval_pid; ZEND_PARSE_PARAMETERS_START(3, 3) Z_PARAM_OBJECT(class_lookupd) @@ -369,10 +421,6 @@ PHP_METHOD (Nsq, subscribe) } - signal(SIGCHLD, signal_handle); - signal(SIGTERM, signal_handle); - ALLOC_HASHTABLE(child_fd); - zend_hash_init(child_fd, 0, NULL, ZVAL_PTR_DTOR, 1); // foreach producers to get nsqd address zval *val; @@ -383,68 +431,50 @@ PHP_METHOD (Nsq, subscribe) ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(producers), val) { - //signal(SIGCHLD, ); - pid = fork(); - - if (pid == 0) { - zval *nsqd_host = zend_hash_str_find(Z_ARRVAL_P(val), "broadcast_address", - sizeof("broadcast_address") - 1); - zval *nsqd_port = zend_hash_str_find(Z_ARRVAL_P(val), "tcp_port", sizeof("tcp_port") - 1); - struct NSQMsg *msg; - msg = malloc(sizeof(NSQMsg)); - msg->topic = Z_STRVAL_P(topic); - msg->channel = Z_STRVAL_P(channel); - - if (rdy) { - msg->rdy = Z_LVAL_P(rdy); - } else { - msg->rdy = 1; - } - - if (delay_time) { - msg->delay_time = Z_LVAL_P(delay_time); - } else { - msg->delay_time = 0; - } - - if (auto_finish && Z_TYPE_P(auto_finish) == IS_TRUE) { - msg->auto_finish = 1; - } else { - msg->auto_finish = 0; - } - - convert_to_string(nsqd_port); - - NSQArg *arg; - arg = malloc(sizeof(NSQArg)); - arg->msg = msg; - arg->host = Z_STRVAL_P(nsqd_host); - arg->port = Z_STRVAL_P(nsqd_port); - arg->fci = &fci; - arg->fcc = &fcc; - arg->nsq_obj = getThis(); - - //child should quit when master quit - //prctl(PR_SET_PDEATHSIG,SIGHUP); - - subscribe(arg); - free(msg); - free(arg); + zval *nsqd_host = zend_hash_str_find(Z_ARRVAL_P(val), "broadcast_address", + sizeof("broadcast_address") - 1); + zval *nsqd_port = zend_hash_str_find(Z_ARRVAL_P(val), "tcp_port", sizeof("tcp_port") - 1); + struct NSQMsg msg; + msg.topic = Z_STRVAL_P(topic); + msg.channel = Z_STRVAL_P(channel); + + if (rdy) { + msg.rdy = Z_LVAL_P(rdy); + } else { + msg.rdy = 1; } - ZVAL_LONG(&zval_pid, pid); - zval *res = zend_hash_next_index_insert(child_fd, &zval_pid); - }ZEND_HASH_FOREACH_END(); + if (delay_time) { + msg.delay_time = Z_LVAL_P(delay_time); + } else { + msg.delay_time = 0; + } + if (auto_finish && Z_TYPE_P(auto_finish) == IS_TRUE) { + msg.auto_finish = 1; + } else { + msg.auto_finish = 0; + } + + convert_to_string(nsqd_port); + + NSQArg arg; + arg.msg = &msg; + arg.host = Z_STRVAL_P(nsqd_host); + arg.port = Z_STRVAL_P(nsqd_port); + arg.fci = &fci; + arg.fcc = &fcc; + arg.nsq_obj = getThis(); + + start_worker_process(&arg); + + }ZEND_HASH_FOREACH_END(); - zval_dtor(auto_finish); - zval_dtor(connect_num); - zval_dtor(config); } - int ret = 0; + int ret_pid = 0; while (1) { - ret = wait(NULL); - if (ret == -1) { + ret_pid = wait(NULL); + if (ret_pid == -1) { if (errno == EINTR) { continue; } @@ -452,6 +482,9 @@ PHP_METHOD (Nsq, subscribe) } } + zval_dtor(auto_finish); + zval_dtor(connect_num); + zval_dtor(config); zval_dtor(&lookupd_re); } @@ -461,6 +494,10 @@ static void _php_bufferevent_dtor(zend_resource *rsrc TSRMLS_DC) /* {{{ */ { efree(bevent); } +static void _php_sub_arg_dtor(zend_resource *rsrc TSRMLS_DC) /* {{{ */ { + //struct NSQArg *arg = (NSQArg *) rsrc; + //free(arg); +} /* }}} */ /* The previous line is meant for vim and emacs, so it can correctly fold and @@ -542,6 +579,7 @@ PHP_MINIT_FUNCTION (nsq) zend_declare_property_null(nsq_ce,ZEND_STRL("nsqConfig"),ZEND_ACC_PUBLIC TSRMLS_CC); zend_declare_property_null(nsq_ce, ZEND_STRL("nsqd_connection_fds"), ZEND_ACC_PUBLIC TSRMLS_CC); le_bufferevent = zend_register_list_destructors_ex(_php_bufferevent_dtor, NULL, "buffer event", module_number); + le_arg = zend_register_list_destructors_ex(_php_sub_arg_dtor, NULL, "nsqdr arg", module_number); lookupd_init(); message_init(); diff --git a/package.xml b/package.xml index cd1dd3b..d49b3ea 100644 --- a/package.xml +++ b/package.xml @@ -12,12 +12,12 @@ rick/ZhenYuwu rick - wuzhenyu7758@gmail.com + 936321732@qq.com yes - 2018-08-07 + 2018-10-21 - 3.2.0 + 3.3.0 2.0.0 @@ -26,8 +26,9 @@ PHP - *Fix The error message was not reported - *Fix pub error when ip or url too long + * add the process management + * When the child process exits abnormally, it will pull up a new child process + * When the master process exits, all child processes also exit diff --git a/php_nsq.h b/php_nsq.h index c0be20f..1feaa57 100644 --- a/php_nsq.h +++ b/php_nsq.h @@ -20,7 +20,7 @@ extern zend_module_entry nsq_module_entry; #define phpext_nsq_ptr &nsq_module_entry -#define PHP_NSQ_VERSION "3.2.0" /* Replace with version number for your extension */ +#define PHP_NSQ_VERSION "3.3.0" /* Replace with version number for your extension */ #ifdef PHP_WIN32 # define PHP_NSQ_API __declspec(dllexport)