Skip to content

Commit

Permalink
Merge pull request #69 from yunnian/feature-3.5.1
Browse files Browse the repository at this point in the history
Feature 3.5.1
  • Loading branch information
yunnian authored Sep 17, 2021
2 parents a8c7489 + eb5c1cf commit 7ce8d23
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 64 deletions.
38 changes: 6 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
# php-nsq

NSQ client for php7 . QQ Group : 616063018<br/>

### Changes

**2020-05-07**
- fixed https://github.com/yunnian/php-nsq/issues/39
message are binary-safe now
NSQ client for php7&php8<br/>

### install :

Expand Down Expand Up @@ -189,6 +183,11 @@ $nsq->subscribe($nsq_lookupd, $config, function($msg){

Changes
-------
* **3.5.1**
* fix publish return error when get the heartbeat
* add exception
* for PHP8.0
* fix https://github.com/yunnian/php-nsq/issues/39 message are binary-safe now
* **3.3.0**
* add the process management
* When the child process exits abnormally, it will pull up a new child process
Expand All @@ -202,28 +201,3 @@ Changes
* **3.0**
* Fix libevent more than 4096 bytes are truncated
* add the identify command,can use be set or increase heartbeats time and msg-timeout
* **2.4.0**
* Fix pub bug
* Fix sub coredump
* Fix touch bug
* Add the waite, when topic has no message
* **2.3.1**
* Support the domain host of pub
* Fix pub coredump
* **2.3.0**
* Optimized memory usage, Guarantee stability of resident memory
* **2.2.0**
* Fix pub bug zend_mm_heap corrupted
* Fix pub block bug when received the 'heartbeats'
* Add the bufferevent resource
* Add the deferred publish
* Add the touch function
* Add the finish function
* **2.1.1**
* Fix core dump
* **2.0**
* retry
* message object
* fix c99 install error
* license

9 changes: 6 additions & 3 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
<email>[email protected]</email>
<active>yes</active>
</lead>
<date>2019-10-15</date>
<date>2021-09-17</date>
<version>
<release>3.5.0</release>
<release>3.5.1</release>
<api>2.0.0</api>
</version>
<stability>
Expand All @@ -26,7 +26,10 @@
</stability>
<license uri="http://www.php.net/license">PHP</license>
<notes>
* Fix the bug in MacOs
* for PHP8
* add pub connection timeout
* fix pub bug retrun false when revice heartbeat

</notes>
<contents>
<dir name="/">
Expand Down
2 changes: 1 addition & 1 deletion php_nsq.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
extern zend_module_entry nsq_module_entry;
#define phpext_nsq_ptr &nsq_module_entry

#define PHP_NSQ_VERSION "3.5.0" /* Replace with version number for your extension */
#define PHP_NSQ_VERSION "3.5.1" /* Replace with version number for your extension */

#ifdef PHP_WIN32
# define PHP_NSQ_API __declspec(dllexport)
Expand Down
64 changes: 36 additions & 28 deletions pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,29 +175,35 @@ int publish(int sock, char *topic, char *msg, size_t msg_len) {
memcpy(&buf[ofs], msg, msg_len);
ofs+=msg_len;

send(sock, buf, ofs, 0);
int send_len = send(sock, buf, ofs, 0);
if ( -1 == send_len) {
printf("%d, send error :%s\n",__LINE__,strerror(errno));
}
if( send_len == 0 ){
throw_exception(PHP_NSQ_ERROR_PUB_LOST_CONNECTION);
return -1;
}

int l = 0;
int l ;
int msg_size;
char *message;
char *msg_size_char = malloc(4);
memset(msg_size_char, 0x00, 4);
int size;

again_size:
again_read:
l = 0;
memset(msg_size_char, 0x00, 4);
size = read(sock, msg_size_char, 4);
if( size == 0 ){
throw_exception(PHP_NSQ_ERROR_PUB_LOST_CONNECTION);
free(msg_size_char);
return -1;
}
if(size == -1){
goto again_size;
goto again_read;
}
readI32((const unsigned char *) msg_size_char, &msg_size);

free(msg_size_char);

readI32((const unsigned char *) msg_size_char, &msg_size);
message = emalloc(msg_size + 1);
memset(message, 0x00, msg_size);
again:
Expand All @@ -208,9 +214,13 @@ int publish(int sock, char *topic, char *msg, size_t msg_len) {
}
if (strcmp(message + 4, "OK") == 0) {
efree(message);
free(msg_size_char);
return sock;
} else {
} else if ( strcmp(message + 4, "_heartbeat_") == 0 ) {
goto again_read;
}else{
efree(message);
free(msg_size_char);
return -1;
}

Expand Down Expand Up @@ -247,50 +257,48 @@ int deferredPublish(int sock, char *topic, char *msg, size_t msg_len, int defer_

send(sock, buf, ofs, 0);

int l = 0;
int current_l = 0;
int l ;
int msg_size;
char *message;
char *msg_size_char = malloc(4);
memset(msg_size_char, 0x00, 4);
int size;
/*
sighandler_t handler = respond_hearbeat ;
signal(SIGALRM, handler);
alarm(5);
*/

again_size:
size = read(sock, msg_size_char , 4);

again_read:
l = 0;
memset(msg_size_char, 0x00, 4);
size = read(sock, msg_size_char, 4);
if( size == 0 ){
throw_exception(PHP_NSQ_ERROR_PUB_LOST_CONNECTION);
free(msg_size_char);
return -1;
}
if(size == -1){
goto again_size;
goto again_read;
}
readI32((const unsigned char *) msg_size_char, &msg_size);

free(msg_size_char);

readI32((const unsigned char *) msg_size_char, &msg_size);
message = emalloc(msg_size + 1);
memset(message, 0x00, msg_size);
again:
again:
l += read(sock, message +l , msg_size);
if( l < msg_size && l>0){
goto again;

}

if (strcmp(message + 4, "OK") == 0) {
efree(message);
free(msg_size_char);
return sock;
} else {
} else if ( strcmp(message + 4, "_heartbeat_") == 0 ) {
goto again_read;
}else{
efree(message);
free(msg_size_char);
return -1;
}
}

//TODO: WHEN code execute long time after publish ,over 60s , No response heartbeat , will exit
// so should open a separate process to listen thegit heartbeat
/*
void respond_hearbeat(int sock){
//send(sock, "NOP",3 , 0);
Expand Down

0 comments on commit 7ce8d23

Please sign in to comment.