Skip to content

Commit

Permalink
windows fix PR nanomsg#1848
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Oct 1, 2024
1 parent 8c45983 commit 9ed68d5
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 10 deletions.
27 changes: 26 additions & 1 deletion src/platform/windows/win_ipcconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ typedef struct ipc_conn {
int send_rv;
int conn_rv;
bool closed;
bool sending;
bool recving;
nni_mtx mtx;
nni_cv cv;
nni_reap_node reap;
Expand Down Expand Up @@ -74,9 +76,12 @@ ipc_recv_start(ipc_conn *c)
if (len > 0x1000000) {
len = 0x1000000;
}

c->recving = true;
if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
((rv = GetLastError()) != ERROR_IO_PENDING)) {
// Synchronous failure.
c->recving = false;
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, nni_win_error(rv));
} else {
Expand All @@ -102,6 +107,7 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num)
// A zero byte receive is a remote close from the peer.
rv = NNG_ECONNSHUT;
}
c->recving = false;
nni_aio_list_remove(aio);
ipc_recv_start(c);
nni_mtx_unlock(&c->mtx);
Expand Down Expand Up @@ -191,9 +197,12 @@ ipc_send_start(ipc_conn *c)
if (len > 0x1000000) {
len = 0x1000000;
}

c->sending = true;
if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
((rv = GetLastError()) != ERROR_IO_PENDING)) {
// Synchronous failure.
c->sending = false;
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, nni_win_error(rv));
} else {
Expand All @@ -212,6 +221,7 @@ ipc_send_cb(nni_win_io *io, int rv, size_t num)
aio = nni_list_first(&c->send_aios);
NNI_ASSERT(aio != NULL);
nni_aio_list_remove(aio);
c->sending = false;
if (c->send_rv != 0) {
rv = c->send_rv;
c->send_rv = 0;
Expand Down Expand Up @@ -270,17 +280,32 @@ static void
ipc_close(void *arg)
{
ipc_conn *c = arg;
nni_time now;
nni_mtx_lock(&c->mtx);
if (!c->closed) {
HANDLE f = c->f;
c->closed = true;
c->f = INVALID_HANDLE_VALUE;

c->f = INVALID_HANDLE_VALUE;

if (f != INVALID_HANDLE_VALUE) {
CancelIoEx(f, &c->send_io.olpd);
CancelIoEx(f, &c->recv_io.olpd);
DisconnectNamedPipe(f);
CloseHandle(f);
}
}
now = nni_clock();
// wait up to a maximum of 10 seconds before assuming something is
// badly amiss. from what we can tell, this doesn't happen, and we do
// see the timer expire properly, but this safeguard can prevent a
// hang.
while ((c->recving || c->sending) &&
((nni_clock() - now) < (NNI_SECOND * 10))) {
nni_mtx_unlock(&c->mtx);
nni_msleep(1);
nni_mtx_lock(&c->mtx);
}
nni_mtx_unlock(&c->mtx);
}

Expand Down
8 changes: 5 additions & 3 deletions src/platform/windows/win_tcp.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2019 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
// Copyright 2019 Devolutions <[email protected]>
//
Expand All @@ -22,15 +22,17 @@ struct nni_tcp_conn {
nni_win_io conn_io;
nni_list recv_aios;
nni_list send_aios;
nni_aio * conn_aio;
nni_aio *conn_aio;
SOCKADDR_STORAGE sockname;
SOCKADDR_STORAGE peername;
nni_tcp_dialer * dialer;
nni_tcp_dialer *dialer;
nni_tcp_listener *listener;
int recv_rv;
int send_rv;
int conn_rv;
bool closed;
bool sending;
bool recving;
char buf[512]; // to hold acceptex results
nni_mtx mtx;
nni_cv cv;
Expand Down
35 changes: 29 additions & 6 deletions src/platform/windows/win_tcpconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ tcp_recv_start(nni_tcp_conn *c)
}

flags = 0;
rv = WSARecv(
c->recving = true;
flags = 0;
rv = WSARecv(
c->s, iov, niov, &nrecv, &flags, &c->recv_io.olpd, NULL);

if ((rv == SOCKET_ERROR) &&
((rv = GetLastError()) != ERROR_IO_PENDING)) {
// Synchronous error.
c->recving = false;
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, nni_win_error(rv));
} else {
Expand All @@ -76,7 +79,6 @@ tcp_recv_cb(nni_win_io *io, int rv, size_t num)
nni_mtx_lock(&c->mtx);
aio = nni_list_first(&c->recv_aios);
NNI_ASSERT(aio != NULL);
nni_aio_list_remove(aio);
if (c->recv_rv != 0) {
rv = c->recv_rv;
c->recv_rv = 0;
Expand All @@ -85,6 +87,8 @@ tcp_recv_cb(nni_win_io *io, int rv, size_t num)
// A zero byte receive is a remote close from the peer.
rv = NNG_ECONNSHUT;
}
c->recving = false;
nni_aio_list_remove(aio);
tcp_recv_start(c);
nni_mtx_unlock(&c->mtx);

Expand Down Expand Up @@ -163,11 +167,13 @@ tcp_send_start(nni_tcp_conn *c)
}
}

c->sending = true;
rv = WSASend(c->s, iov, niov, NULL, 0, &c->send_io.olpd, NULL);

if ((rv == SOCKET_ERROR) &&
((rv = GetLastError()) != ERROR_IO_PENDING)) {
// Synchronous failure.
c->sending = false;
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, nni_win_error(rv));
} else {
Expand Down Expand Up @@ -207,6 +213,7 @@ tcp_send_cb(nni_win_io *io, int rv, size_t num)
aio = nni_list_first(&c->send_aios);
NNI_ASSERT(aio != NULL);
nni_aio_list_remove(aio); // should always be at head
c->sending = false;

if (c->send_rv != 0) {
rv = c->send_rv;
Expand Down Expand Up @@ -245,14 +252,30 @@ tcp_close(void *arg)
{
nni_tcp_conn *c = arg;
nni_mtx_lock(&c->mtx);
nni_time now;
if (!c->closed) {
SOCKET s = c->s;

c->closed = true;
if (c->s != INVALID_SOCKET) {
shutdown(c->s, SD_BOTH);
closesocket(c->s);
c->s = INVALID_SOCKET;
c->s = INVALID_SOCKET;
if (s != INVALID_SOCKET) {
CancelIoEx(s, &c->send_io.olpd);
CancelIoEx(s, &c->recv_io.olpd);
shutdown(s, SD_BOTH);
closesocket(s);
}
}
now = nni_clock();
// wait up to a maximum of 10 seconds before assuming something is
// badly amiss. from what we can tell, this doesn't happen, and we do
// see the timer expire properly, but this safeguard can prevent a
// hang.
while ((c->recving || c->sending) &&
((nni_clock() - now) < (NNI_SECOND * 10))) {
nni_mtx_unlock(&c->mtx);
nni_msleep(1);
nni_mtx_lock(&c->mtx);
}
nni_mtx_unlock(&c->mtx);
}

Expand Down

0 comments on commit 9ed68d5

Please sign in to comment.