Skip to content

Commit

Permalink
Fixed udp segment fault
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Sep 5, 2014
1 parent 4da7cfb commit 1934c31
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 50 deletions.
2 changes: 1 addition & 1 deletion examples/udp_server.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php
$serv = new swoole_server("127.0.0.1", 9502, SWOOLE_PROCESS, SWOOLE_SOCK_UDP);
$serv->set(array(
//'worker_num' => 4, //worker process num
'worker_num' => 4, //worker process num
//'log_file' => '/tmp/swoole.log',
//'daemonize' => true,
));
Expand Down
45 changes: 1 addition & 44 deletions include/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -517,50 +517,7 @@ static sw_inline swWorker* swServer_get_worker(swServer *serv, uint16_t worker_i
}
}

static sw_inline int swServer_worker_schedule(swServer *serv, int schedule_key)
{
int target_worker_id = 0;

//polling mode
if (serv->dispatch_mode == SW_DISPATCH_ROUND)
{
target_worker_id = (serv->worker_round_id++) % serv->worker_num;
}
//Using the FD touch access to hash
else if (serv->dispatch_mode == SW_DISPATCH_FDMOD)
{
target_worker_id = schedule_key % serv->worker_num;
}
//Preemptive distribution
else
{
if (serv->ipc_mode == SW_IPC_MSGQUEUE)
{
//msgsnd参数必须>0
//worker进程中正确的mtype应该是pti + 1
target_worker_id = serv->worker_num;
}
else
{
int i;
sw_atomic_t *round = &SwooleTG.worker_round_i;
for (i = 0; i < serv->worker_num; i++)
{
sw_atomic_fetch_add(round, 1);
target_worker_id = (*round) % serv->worker_num;

if (serv->workers[target_worker_id].status == SW_WORKER_IDLE)
{
break;
}
}
swTrace("schedule=%d|round=%d\n", target_worker_id, *round);
}
}
return target_worker_id;
}

static sw_inline int swServer_send2worker_blocking(swServer *serv, void *data, int len, int target_worker_id)
static sw_inline int swServer_send2worker_blocking(swServer *serv, void *data, int len, uint16_t target_worker_id)
{
int ret = -1;
swWorker *worker = &(serv->workers[target_worker_id]);
Expand Down
11 changes: 11 additions & 0 deletions include/swoole.h
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,7 @@ enum SW_CHANNEL_FLAGS
SW_CHAN_SHM = 1u << 3,
#define SW_CHAN_SHM SW_CHAN_SHM
};

typedef struct _swChannel
{
int head; //头部,出队列方向
Expand All @@ -920,6 +921,15 @@ int swChannel_notify(swChannel *object);
void swChannel_free(swChannel *object);

/*----------------------------Thread Pool-------------------------------*/
enum swThread_type
{
SW_THREAD_REACTOR,
SW_THREAD_WRITER,
SW_THREAD_UDP,
SW_THREAD_UNIX_DGRAM,
SW_THREAD_HEARTBEAT,
};

typedef struct _swThreadPool
{
pthread_mutex_t mutex;
Expand Down Expand Up @@ -1032,6 +1042,7 @@ typedef struct
typedef struct
{
uint16_t id;
uint8_t type;
uint8_t factory_lock_target;
int16_t factory_target_worker;
sw_atomic_t worker_round_i;
Expand Down
67 changes: 62 additions & 5 deletions src/factory/FactoryProcess.c
Original file line number Diff line number Diff line change
Expand Up @@ -873,17 +873,61 @@ int swFactoryProcess_notify(swFactory *factory, swDataHead *ev)
return factory->dispatch(factory, (swDispatchData *) &sw_notify_data);
}


static sw_inline uint32_t swServer_worker_schedule(swServer *serv, uint32_t schedule_key)
{
uint32_t target_worker_id = 0;

//polling mode
if (serv->dispatch_mode == SW_DISPATCH_ROUND)
{
target_worker_id = (serv->worker_round_id++) % serv->worker_num;
}
//Using the FD touch access to hash
else if (serv->dispatch_mode == SW_DISPATCH_FDMOD)
{
target_worker_id = schedule_key % serv->worker_num;
}
//Preemptive distribution
else
{
if (serv->ipc_mode == SW_IPC_MSGQUEUE)
{
//msgsnd参数必须>0
//worker进程中正确的mtype应该是pti + 1
target_worker_id = serv->worker_num;
}
else
{
int i;
sw_atomic_t *round = &SwooleTG.worker_round_i;
for (i = 0; i < serv->worker_num; i++)
{
sw_atomic_fetch_add(round, 1);
target_worker_id = (*round) % serv->worker_num;

if (serv->workers[target_worker_id].status == SW_WORKER_IDLE)
{
break;
}
}
swTrace("schedule=%d|round=%d\n", target_worker_id, *round);
}
}
return target_worker_id;
}

/**
* [ReactorThread] dispatch request to worker
*/
int swFactoryProcess_dispatch(swFactory *factory, swDispatchData *task)
{
int schedule_key;
int send_len = sizeof(task->data.info) + task->data.info.len;
int target_worker_id = task->target_worker_id;
uint32_t schedule_key;
uint32_t send_len = sizeof(task->data.info) + task->data.info.len;
uint16_t target_worker_id;
swServer *serv = SwooleG.serv;

if (target_worker_id < 0)
if (task->target_worker_id < 0)
{
//udp use remote port
if (task->data.info.type == SW_EVENT_UDP || task->data.info.type == SW_EVENT_UDP6
Expand Down Expand Up @@ -915,7 +959,20 @@ int swFactoryProcess_dispatch(swFactory *factory, swDispatchData *task)
target_worker_id = swServer_worker_schedule(serv, schedule_key);
}
}
return swReactorThread_send2worker((void *) &(task->data), send_len, target_worker_id);
else
{
target_worker_id = task->target_worker_id;
}

if (SwooleTG.type == SW_THREAD_REACTOR)
{
return swReactorThread_send2worker((void *) &(task->data), send_len, target_worker_id);
}
else
{
swTrace("dispatch to worker#%d", target_worker_id);
return swServer_send2worker_blocking(serv, (void *) &(task->data), send_len, target_worker_id);
}
}

static int swFactoryProcess_writer_start(swFactory *factory)
Expand Down
11 changes: 11 additions & 0 deletions src/network/ReactorThread.c
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,7 @@ static int swReactorThread_loop_tcp(swThreadParam *param)
SwooleTG.factory_lock_target = 0;
SwooleTG.factory_target_worker = -1;
SwooleTG.id = reactor_id;
SwooleTG.type = SW_THREAD_REACTOR;

swReactorThread *thread = swServer_get_thread(serv, reactor_id);
swReactor *reactor = &thread->reactor;
Expand Down Expand Up @@ -1178,6 +1179,11 @@ static int swReactorThread_loop_udp(swThreadParam *param)

int sock = param->pti;

SwooleTG.factory_lock_target = 0;
SwooleTG.factory_target_worker = -1;
SwooleTG.id = sock;
SwooleTG.type = SW_THREAD_UDP;

swSignal_none();

//blocking
Expand Down Expand Up @@ -1374,6 +1380,11 @@ static int swReactorThread_loop_unix_dgram(swThreadParam *param)
uint16_t sun_path_offset;
uint8_t sun_path_len;

SwooleTG.factory_lock_target = 0;
SwooleTG.factory_target_worker = -1;
SwooleTG.id = param->pti;
SwooleTG.type = SW_THREAD_UNIX_DGRAM;

swSignal_none();

//blocking
Expand Down

0 comments on commit 1934c31

Please sign in to comment.