From 1934c31c0b6aa2612ca9b821d13172b9acacd7db Mon Sep 17 00:00:00 2001 From: matyhtf Date: Fri, 5 Sep 2014 11:21:54 +0800 Subject: [PATCH] Fixed udp segment fault --- examples/udp_server.php | 2 +- include/Server.h | 45 +----------------------- include/swoole.h | 11 ++++++ src/factory/FactoryProcess.c | 67 +++++++++++++++++++++++++++++++++--- src/network/ReactorThread.c | 11 ++++++ 5 files changed, 86 insertions(+), 50 deletions(-) diff --git a/examples/udp_server.php b/examples/udp_server.php index f743e6bb92e..76a2e411ed5 100644 --- a/examples/udp_server.php +++ b/examples/udp_server.php @@ -1,7 +1,7 @@ set(array( - //'worker_num' => 4, //worker process num + 'worker_num' => 4, //worker process num //'log_file' => '/tmp/swoole.log', //'daemonize' => true, )); diff --git a/include/Server.h b/include/Server.h index 2f168429f01..6fedb3fdd19 100644 --- a/include/Server.h +++ b/include/Server.h @@ -517,50 +517,7 @@ static sw_inline swWorker* swServer_get_worker(swServer *serv, uint16_t worker_i } } -static sw_inline int swServer_worker_schedule(swServer *serv, int schedule_key) -{ - int target_worker_id = 0; - - //polling mode - if (serv->dispatch_mode == SW_DISPATCH_ROUND) - { - target_worker_id = (serv->worker_round_id++) % serv->worker_num; - } - //Using the FD touch access to hash - else if (serv->dispatch_mode == SW_DISPATCH_FDMOD) - { - target_worker_id = schedule_key % serv->worker_num; - } - //Preemptive distribution - else - { - if (serv->ipc_mode == SW_IPC_MSGQUEUE) - { - //msgsnd参数必须>0 - //worker进程中正确的mtype应该是pti + 1 - target_worker_id = serv->worker_num; - } - else - { - int i; - sw_atomic_t *round = &SwooleTG.worker_round_i; - for (i = 0; i < serv->worker_num; i++) - { - sw_atomic_fetch_add(round, 1); - target_worker_id = (*round) % serv->worker_num; - - if (serv->workers[target_worker_id].status == SW_WORKER_IDLE) - { - break; - } - } - swTrace("schedule=%d|round=%d\n", target_worker_id, *round); - } - } - return target_worker_id; -} - -static sw_inline int swServer_send2worker_blocking(swServer *serv, void *data, int len, int target_worker_id) +static sw_inline int swServer_send2worker_blocking(swServer *serv, void *data, int len, uint16_t target_worker_id) { int ret = -1; swWorker *worker = &(serv->workers[target_worker_id]); diff --git a/include/swoole.h b/include/swoole.h index 0b6b086b2db..bd6c4b25a8e 100644 --- a/include/swoole.h +++ b/include/swoole.h @@ -895,6 +895,7 @@ enum SW_CHANNEL_FLAGS SW_CHAN_SHM = 1u << 3, #define SW_CHAN_SHM SW_CHAN_SHM }; + typedef struct _swChannel { int head; //头部,出队列方向 @@ -920,6 +921,15 @@ int swChannel_notify(swChannel *object); void swChannel_free(swChannel *object); /*----------------------------Thread Pool-------------------------------*/ +enum swThread_type +{ + SW_THREAD_REACTOR, + SW_THREAD_WRITER, + SW_THREAD_UDP, + SW_THREAD_UNIX_DGRAM, + SW_THREAD_HEARTBEAT, +}; + typedef struct _swThreadPool { pthread_mutex_t mutex; @@ -1032,6 +1042,7 @@ typedef struct typedef struct { uint16_t id; + uint8_t type; uint8_t factory_lock_target; int16_t factory_target_worker; sw_atomic_t worker_round_i; diff --git a/src/factory/FactoryProcess.c b/src/factory/FactoryProcess.c index edf23579949..f8516aa0e43 100644 --- a/src/factory/FactoryProcess.c +++ b/src/factory/FactoryProcess.c @@ -873,17 +873,61 @@ int swFactoryProcess_notify(swFactory *factory, swDataHead *ev) return factory->dispatch(factory, (swDispatchData *) &sw_notify_data); } + +static sw_inline uint32_t swServer_worker_schedule(swServer *serv, uint32_t schedule_key) +{ + uint32_t target_worker_id = 0; + + //polling mode + if (serv->dispatch_mode == SW_DISPATCH_ROUND) + { + target_worker_id = (serv->worker_round_id++) % serv->worker_num; + } + //Using the FD touch access to hash + else if (serv->dispatch_mode == SW_DISPATCH_FDMOD) + { + target_worker_id = schedule_key % serv->worker_num; + } + //Preemptive distribution + else + { + if (serv->ipc_mode == SW_IPC_MSGQUEUE) + { + //msgsnd参数必须>0 + //worker进程中正确的mtype应该是pti + 1 + target_worker_id = serv->worker_num; + } + else + { + int i; + sw_atomic_t *round = &SwooleTG.worker_round_i; + for (i = 0; i < serv->worker_num; i++) + { + sw_atomic_fetch_add(round, 1); + target_worker_id = (*round) % serv->worker_num; + + if (serv->workers[target_worker_id].status == SW_WORKER_IDLE) + { + break; + } + } + swTrace("schedule=%d|round=%d\n", target_worker_id, *round); + } + } + return target_worker_id; +} + /** * [ReactorThread] dispatch request to worker */ int swFactoryProcess_dispatch(swFactory *factory, swDispatchData *task) { - int schedule_key; - int send_len = sizeof(task->data.info) + task->data.info.len; - int target_worker_id = task->target_worker_id; + uint32_t schedule_key; + uint32_t send_len = sizeof(task->data.info) + task->data.info.len; + uint16_t target_worker_id; swServer *serv = SwooleG.serv; - if (target_worker_id < 0) + if (task->target_worker_id < 0) { //udp use remote port if (task->data.info.type == SW_EVENT_UDP || task->data.info.type == SW_EVENT_UDP6 @@ -915,7 +959,20 @@ int swFactoryProcess_dispatch(swFactory *factory, swDispatchData *task) target_worker_id = swServer_worker_schedule(serv, schedule_key); } } - return swReactorThread_send2worker((void *) &(task->data), send_len, target_worker_id); + else + { + target_worker_id = task->target_worker_id; + } + + if (SwooleTG.type == SW_THREAD_REACTOR) + { + return swReactorThread_send2worker((void *) &(task->data), send_len, target_worker_id); + } + else + { + swTrace("dispatch to worker#%d", target_worker_id); + return swServer_send2worker_blocking(serv, (void *) &(task->data), send_len, target_worker_id); + } } static int swFactoryProcess_writer_start(swFactory *factory) diff --git a/src/network/ReactorThread.c b/src/network/ReactorThread.c index bb3991d1e94..cddef731f08 100644 --- a/src/network/ReactorThread.c +++ b/src/network/ReactorThread.c @@ -1030,6 +1030,7 @@ static int swReactorThread_loop_tcp(swThreadParam *param) SwooleTG.factory_lock_target = 0; SwooleTG.factory_target_worker = -1; SwooleTG.id = reactor_id; + SwooleTG.type = SW_THREAD_REACTOR; swReactorThread *thread = swServer_get_thread(serv, reactor_id); swReactor *reactor = &thread->reactor; @@ -1178,6 +1179,11 @@ static int swReactorThread_loop_udp(swThreadParam *param) int sock = param->pti; + SwooleTG.factory_lock_target = 0; + SwooleTG.factory_target_worker = -1; + SwooleTG.id = sock; + SwooleTG.type = SW_THREAD_UDP; + swSignal_none(); //blocking @@ -1374,6 +1380,11 @@ static int swReactorThread_loop_unix_dgram(swThreadParam *param) uint16_t sun_path_offset; uint8_t sun_path_len; + SwooleTG.factory_lock_target = 0; + SwooleTG.factory_target_worker = -1; + SwooleTG.id = param->pti; + SwooleTG.type = SW_THREAD_UNIX_DGRAM; + swSignal_none(); //blocking