Skip to content

Commit

Permalink
win: optimize udp receive performance (libuv#3807)
Browse files Browse the repository at this point in the history
Do at most 32 nonblocking udp receive in a row.

Fixes: libuv#3704
  • Loading branch information
ywave620 authored Nov 8, 2022
1 parent 8a1f378 commit dff3f8c
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 44 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ if(LIBUV_BUILD_TESTS)
test/test-udp-sendmmsg-error.c
test/test-udp-send-unreachable.c
test/test-udp-try-send.c
test/test-udp-recv-in-a-row.c
test/test-uname.c
test/test-walk-handles.c
test/test-watcher-cross-stop.c)
Expand Down
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
test/test-udp-sendmmsg-error.c \
test/test-udp-send-unreachable.c \
test/test-udp-try-send.c \
test/test-udp-recv-in-a-row.c \
test/test-uname.c \
test/test-walk-handles.c \
test/test-watcher-cross-stop.c
Expand Down
99 changes: 55 additions & 44 deletions src/win/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -441,57 +441,68 @@ void uv__process_udp_recv_req(uv_loop_t* loop, uv_udp_t* handle,
DWORD bytes, err, flags;
struct sockaddr_storage from;
int from_len;
int count;

/* Prevent loop starvation when the data comes in as fast as
* (or faster than) we can read it. */
count = 32;

do {
/* Do at most `count` nonblocking receive. */
buf = uv_buf_init(NULL, 0);
handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
if (buf.base == NULL || buf.len == 0) {
handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
goto done;
}

/* Do a nonblocking receive.
* TODO: try to read multiple datagrams at once. FIONREAD maybe? */
buf = uv_buf_init(NULL, 0);
handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
if (buf.base == NULL || buf.len == 0) {
handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
goto done;
}
assert(buf.base != NULL);

memset(&from, 0, sizeof from);
from_len = sizeof from;
memset(&from, 0, sizeof from);
from_len = sizeof from;

flags = 0;
flags = 0;

if (WSARecvFrom(handle->socket,
(WSABUF*)&buf,
1,
&bytes,
&flags,
(struct sockaddr*) &from,
&from_len,
NULL,
NULL) != SOCKET_ERROR) {
if (WSARecvFrom(handle->socket,
(WSABUF*)&buf,
1,
&bytes,
&flags,
(struct sockaddr*) &from,
&from_len,
NULL,
NULL) != SOCKET_ERROR) {

/* Message received */
handle->recv_cb(handle, bytes, &buf, (const struct sockaddr*) &from, 0);
} else {
err = WSAGetLastError();
if (err == WSAEMSGSIZE) {
/* Message truncated */
handle->recv_cb(handle,
bytes,
&buf,
(const struct sockaddr*) &from,
UV_UDP_PARTIAL);
} else if (err == WSAEWOULDBLOCK) {
/* Kernel buffer empty */
handle->recv_cb(handle, 0, &buf, NULL, 0);
} else if (err == WSAECONNRESET || err == WSAENETRESET) {
/* WSAECONNRESET/WSANETRESET is ignored because this just indicates
* that a previous sendto operation failed.
*/
handle->recv_cb(handle, 0, &buf, NULL, 0);
/* Message received */
err = ERROR_SUCCESS;
handle->recv_cb(handle, bytes, &buf, (const struct sockaddr*) &from, 0);
} else {
/* Any other error that we want to report back to the user. */
uv_udp_recv_stop(handle);
handle->recv_cb(handle, uv_translate_sys_error(err), &buf, NULL, 0);
err = WSAGetLastError();
if (err == WSAEMSGSIZE) {
/* Message truncated */
handle->recv_cb(handle,
bytes,
&buf,
(const struct sockaddr*) &from,
UV_UDP_PARTIAL);
} else if (err == WSAEWOULDBLOCK) {
/* Kernel buffer empty */
handle->recv_cb(handle, 0, &buf, NULL, 0);
} else if (err == WSAECONNRESET || err == WSAENETRESET) {
/* WSAECONNRESET/WSANETRESET is ignored because this just indicates
* that a previous sendto operation failed.
*/
handle->recv_cb(handle, 0, &buf, NULL, 0);
} else {
/* Any other error that we want to report back to the user. */
uv_udp_recv_stop(handle);
handle->recv_cb(handle, uv_translate_sys_error(err), &buf, NULL, 0);
}
}
}
while (err == ERROR_SUCCESS &&
count-- > 0 &&
/* The recv_cb callback may decide to pause or close the handle. */
(handle->flags & UV_HANDLE_READING) &&
!(handle->flags & UV_HANDLE_READ_PENDING));
}

done:
Expand Down
2 changes: 2 additions & 0 deletions test/test-list.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ TEST_DECLARE (udp_open)
TEST_DECLARE (udp_open_twice)
TEST_DECLARE (udp_open_bound)
TEST_DECLARE (udp_open_connect)
TEST_DECLARE (udp_recv_in_a_row)
#ifndef _WIN32
TEST_DECLARE (udp_send_unix)
#endif
Expand Down Expand Up @@ -770,6 +771,7 @@ TASK_LIST_START
TEST_ENTRY (udp_multicast_ttl)
TEST_ENTRY (udp_sendmmsg_error)
TEST_ENTRY (udp_try_send)
TEST_ENTRY (udp_recv_in_a_row)

TEST_ENTRY (udp_open)
TEST_ENTRY (udp_open_twice)
Expand Down
121 changes: 121 additions & 0 deletions test/test-udp-recv-in-a-row.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/* Copyright The libuv project and contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

#include "uv.h"
#include "task.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static uv_udp_t server;
static uv_udp_t client;
static uv_check_t check_handle;
static uv_buf_t buf;
static struct sockaddr_in addr;
static char send_data[10];
static int check_cb_called;

#define N 5
static int recv_cnt;

static void alloc_cb(uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf) {
static char slab[sizeof(send_data)];
buf->base = slab;
buf->len = sizeof(slab);
}

static void sv_recv_cb(uv_udp_t* handle,
ssize_t nread,
const uv_buf_t* rcvbuf,
const struct sockaddr* addr,
unsigned flags) {
if (++ recv_cnt < N) {
ASSERT_EQ(sizeof(send_data), nread);
} else {
ASSERT_EQ(0, nread);
}
}

static void check_cb(uv_check_t* handle) {
ASSERT_PTR_EQ(&check_handle, handle);

/**
* sv_recv_cb() is called with nread set to zero to indicate
* there is no more udp packet in the kernel, so the actual
* recv_cnt is one larger than N.
*/
ASSERT_EQ(N+1, recv_cnt);
check_cb_called = 1;

/* we are done */
ASSERT_EQ(0, uv_check_stop(handle));
uv_close((uv_handle_t*) &client, NULL);
uv_close((uv_handle_t*) &check_handle, NULL);
uv_close((uv_handle_t*) &server, NULL);
}


TEST_IMPL(udp_recv_in_a_row) {
int i, r;

ASSERT_EQ(0, uv_check_init(uv_default_loop(), &check_handle));
ASSERT_EQ(0, uv_check_start(&check_handle, check_cb));

ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));

ASSERT_EQ(0, uv_udp_init(uv_default_loop(), &server));
ASSERT_EQ(0, uv_udp_bind(&server, (const struct sockaddr*) &addr, 0));
ASSERT_EQ(0, uv_udp_recv_start(&server, alloc_cb, sv_recv_cb));

ASSERT_EQ(0, uv_udp_init(uv_default_loop(), &client));

/* send N-1 udp packets */
buf = uv_buf_init(send_data, sizeof(send_data));
for (i = 0; i < N - 1; i ++) {
r = uv_udp_try_send(&client,
&buf,
1,
(const struct sockaddr*) &addr);
ASSERT_EQ(sizeof(send_data), r);
}

/* send an empty udp packet */
buf = uv_buf_init(NULL, 0);
r = uv_udp_try_send(&client,
&buf,
1,
(const struct sockaddr*) &addr);
ASSERT_EQ(0, r);

/* check_cb() asserts that the N packets can be received
* before it gets called.
*/

ASSERT_EQ(0, uv_run(uv_default_loop(), UV_RUN_DEFAULT));

ASSERT(check_cb_called);

MAKE_VALGRIND_HAPPY();
return 0;
}

0 comments on commit dff3f8c

Please sign in to comment.