Skip to content

Commit

Permalink
posix pollers: expose pfd structures (for sizes) and fix poller selec…
Browse files Browse the repository at this point in the history
…tion

The poller selection in the previous poller changes for select were
not quite functional.  Also, while testing poll() based poller, there
were problems where it simply did not work correctly, so this addresses
those, and it seems to work now.

The pfd structures are exposed as we intend to allow inlining them
to eliminate the separate allocation and potential for failure during
initialization.  We also want to have plans afoot to eliminate a
lot of the extra locking done done on each I/O iteration, and this
is setting the foundation for that.
  • Loading branch information
gdamore committed Dec 19, 2024
1 parent 0d39d4b commit 5e18eb4
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 117 deletions.
15 changes: 10 additions & 5 deletions src/platform/posix/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ if (NNG_PLATFORM_POSIX)
set_property(CACHE NNG_POLLQ_POLLER PROPERTY STRINGS auto ports kqueue epoll poll select)
mark_as_advanced(NNG_POLLQ_POLLER)
if (NNG_POLLQ_POLLER STREQUAL "ports")
nng_defines(NNG_POLLQ_PORTS)
set(NNG_POLLQ_PORTS ON)
elseif (NNG_POLLQ_POLLER STREQUAL "kqueue")
nng_defines(NNG_POLLQ_KQUEUE)
set(NNG_POLLQ_KQUEUE ON)
elseif (NNG_POLLQ_POLLER STREQUAL "epoll")
nng_defines(NNG_POLLQ_EPOLL)
set(NNG_POLLQ_EPOLL ON)
elseif (NNG_POLLQ_POLLER STREQUAL "poll")
nng_defines(NNG_POLLQ_POLL)
set(NNG_POLLQ_POLL ON)
elseif (NNG_POLLQ_POLLER STREQUAL "select")
set(NNG_POLLQ_SELECT ON)
elseif (NNG_HAVE_PORT_CREATE)
Expand All @@ -125,23 +125,28 @@ if (NNG_PLATFORM_POSIX)
elseif (NNG_HAVE_POLL)
set(NNG_POLLQ_POLL ON)
elseif (NNG_HAVE_SELECT)
set(NNG_POLLQ_SELECT TRUE)
set(NNG_POLLQ_SELECT ON)
endif()

if (NNG_POLLQ_PORTS)
message(STATUS "Using port events for multiplexing I/O.")
nng_defines(NNG_POLLQ_PORTS)
nng_sources(posix_pollq_port.c)
elseif (NNG_POLLQ_KQUEUE)
message(STATUS "Using kqueue for multiplexing I/O.")
nng_defines(NNG_POLLQ_KQUEUE)
nng_sources(posix_pollq_kqueue.c)
elseif (NNG_POLLQ_EPOLL)
message(DEBUG "Using epoll for multiplexing I/O.")
nng_defines(NNG_POLLQ_EPOLL)
nng_sources(posix_pollq_epoll.c)
elseif (NNG_POLLQ_POLL)
message(STATUS "Using poll for multiplexing I/O.")
nng_defines(NNG_POLLQ_POLL)
nng_sources(posix_pollq_poll.c)
elseif (NNG_POLLQ_SELECT)
message(STATUS "Using select for multiplexing I/O.")
nng_defines(NNG_POLLQ_SELECT)
nng_sources(posix_pollq_select.c)
else()
message(FATAL_ERROR "No suitable poller found for multiplexing I/O.")
Expand Down
32 changes: 15 additions & 17 deletions src/platform/posix/posix_pollq.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2019 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand All @@ -20,33 +20,31 @@
// one of several possible different backends.

#include "core/nng_impl.h"
#include <poll.h>

typedef struct nni_posix_pfd nni_posix_pfd;
typedef void (*nni_posix_pfd_cb)(nni_posix_pfd *, unsigned, void *);

#if defined(NNG_POLLQ_KQUEUE)
#include "posix_pollq_kqueue.h"
#elif defined(NNG_POLLQ_PORTS)
#include "posix_pollq_port.h"
#elif defined(NNG_POLLQ_EPOLL)
#include "posix_pollq_epoll.h"
#elif defined(NNG_POLLQ_POLL)
#include "posix_pollq_epoll.h"
#elif defined(NNG_POLLQ_SELECT)
#include "posix_pollq_epoll.h"
#else
#error "No suitable poller defined"
#endif

extern int nni_posix_pfd_init(nni_posix_pfd **, int);
extern void nni_posix_pfd_fini(nni_posix_pfd *);
extern int nni_posix_pfd_arm(nni_posix_pfd *, unsigned);
extern int nni_posix_pfd_fd(nni_posix_pfd *);
extern void nni_posix_pfd_close(nni_posix_pfd *);
extern void nni_posix_pfd_set_cb(nni_posix_pfd *, nni_posix_pfd_cb, void *);

#ifdef POLLIN
#define NNI_POLL_IN ((unsigned) POLLIN)
#define NNI_POLL_OUT ((unsigned) POLLOUT)
#define NNI_POLL_HUP ((unsigned) POLLHUP)
#define NNI_POLL_ERR ((unsigned) POLLERR)
#define NNI_POLL_INVAL ((unsigned) POLLNVAL)
#else
// maybe using select
#define NNI_POLL_IN (0x0001)
#define NNI_POLL_OUT (0x0010)
#define NNI_POLL_HUP (0x0004)
#define NNI_POLL_ERR (0x0008)
#define NNI_POLL_INVAL (0x0020)
#endif // POLLIN

#endif // NNG_PLATFORM_POSIX

#endif // PLATFORM_POSIX_POLLQ_H
20 changes: 2 additions & 18 deletions src/platform/posix/posix_pollq_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"

typedef struct nni_posix_pollq nni_posix_pollq;

#ifndef EFD_CLOEXEC
#define EFD_CLOEXEC 0
#endif
Expand All @@ -51,28 +49,14 @@ typedef struct nni_posix_pollq nni_posix_pollq;

// nni_posix_pollq is a work structure that manages state for the epoll-based
// pollq implementation
struct nni_posix_pollq {
typedef struct nni_posix_pollq {
nni_mtx mtx;
int epfd; // epoll handle
int evfd; // event fd (to wake us for other stuff)
bool close; // request for worker to exit
nni_thr thr; // worker thread
nni_list reapq;
};

struct nni_posix_pfd {
nni_list_node node;
nni_posix_pollq *pq;
int fd;
nni_posix_pfd_cb cb;
void *arg;
bool closed;
bool closing;
bool reap;
unsigned events;
nni_mtx mtx;
nni_cv cv;
};
} nni_posix_pollq;

// single global instance for now.
static nni_posix_pollq nni_posix_global_pollq;
Expand Down
38 changes: 38 additions & 0 deletions src/platform/posix/posix_pollq_epoll.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//

#ifndef PLATFORM_POSIX_POLLQ_EPOLL_H
#define PLATFORM_POSIX_POLLQ_EPOLL_H

#include <poll.h>

// nni_posix_pfd is the handle used by the poller. It's internals are private
// to the poller.
struct nni_posix_pfd {
nni_list_node node;
struct nni_posix_pollq *pq;
int fd;
nni_posix_pfd_cb cb;
void *arg;
bool closed;
bool closing;
bool reap;
unsigned events;
nni_mtx mtx;
nni_cv cv;
};

#define NNI_POLL_IN ((unsigned) POLLIN)
#define NNI_POLL_OUT ((unsigned) POLLOUT)
#define NNI_POLL_HUP ((unsigned) POLLHUP)
#define NNI_POLL_ERR ((unsigned) POLLERR)
#define NNI_POLL_INVAL ((unsigned) POLLNVAL)

#endif // PLATFORM_POSIX_POLLQ_EPOLL_H
19 changes: 2 additions & 17 deletions src/platform/posix/posix_pollq_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
// found online at https://opensource.org/licenses/MIT.
//

#include "core/defs.h"
#ifdef NNG_HAVE_KQUEUE

#include <errno.h>
Expand All @@ -24,31 +23,17 @@
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"

typedef struct nni_posix_pollq nni_posix_pollq;

// nni_posix_pollq is a work structure that manages state for the kqueue-based
// pollq implementation
struct nni_posix_pollq {
typedef struct nni_posix_pollq {
nni_mtx mtx;
int wake_wfd; // write side of wake pipe
int wake_rfd; // read side of wake pipe
bool closed; // request for worker to exit
int kq; // kqueue handle
nni_thr thr; // worker thread
nni_list reapq; // items to reap
};

struct nni_posix_pfd {
nni_list_node node; // linkage into the reap list
nni_posix_pollq *pq; // associated pollq
int fd; // file descriptor to poll
void *arg; // user data
nni_posix_pfd_cb cb; // user callback on event
bool closed;
unsigned events;
nni_cv cv; // signaled when poller has unregistered
nni_mtx mtx;
};
} nni_posix_pollq;

#define NNI_MAX_KQUEUE_EVENTS 64

Expand Down
34 changes: 34 additions & 0 deletions src/platform/posix/posix_pollq_kqueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//

#ifndef PLATFORM_POSIX_POLLQ_KQUEUE_H
#define PLATFORM_POSIX_POLLQ_KQUEUE_H

// nni_posix_pfd is the handle used by the poller. It's internals are private
// to the poller.
struct nni_posix_pfd {
nni_list_node node; // linkage into the reap list
struct nni_posix_pollq *pq; // associated pollq
int fd; // file descriptor to poll
void *arg; // user data
nni_posix_pfd_cb cb; // user callback on event
bool closed;
unsigned events;
nni_cv cv; // signaled when poller has unregistered
nni_mtx mtx;
};

#define NNI_POLL_IN (0x0001)
#define NNI_POLL_OUT (0x0010)
#define NNI_POLL_HUP (0x0004)
#define NNI_POLL_ERR (0x0008)
#define NNI_POLL_INVAL (0x0020)

#endif // PLATFORM_POSIX_POLLQ_KQUEUE_H
55 changes: 26 additions & 29 deletions src/platform/posix/posix_pollq_poll.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@
// (Btw, pfd->fd is not guarded, because it is set at pfd creation and
// persists until the pfd is destroyed.)

typedef struct nni_posix_pollq nni_posix_pollq;

struct nni_posix_pollq {
typedef struct nni_posix_pollq {
nni_mtx mtx;
int nfds;
int wakewfd; // write side of waker pipe
Expand All @@ -54,18 +52,7 @@ struct nni_posix_pollq {
nni_thr thr; // worker thread
nni_list pollq; // armed nodes
nni_list reapq;
};

struct nni_posix_pfd {
nni_posix_pollq *pq;
int fd;
nni_list_node node;
nni_cv cv;
nni_mtx mtx;
unsigned events;
nni_posix_pfd_cb cb;
void *arg;
};
} nni_posix_pollq;

static nni_posix_pollq nni_posix_global_pollq;

Expand Down Expand Up @@ -223,11 +210,11 @@ nni_posix_poll_thr(void *arg)

// The waker pipe is set up so that we will be woken
// when it is written (this allows us to be signaled).
fds[0].fd = pq->wakerfd;
fds[0].events = POLLIN;
fds[0].revents = 0;
pfds[0] = NULL;
nfds = 1;
fds[0].fd = pq->wakerfd;
fds[0].events = POLLIN;
fds[0].revents = 0;
pfds[pq->wakerfd] = NULL;
nfds = 1;

// Also lets reap anything that was in the reaplist!
while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
Expand Down Expand Up @@ -255,7 +242,7 @@ nni_posix_poll_thr(void *arg)
fds[nfds].fd = pfd->fd;
fds[nfds].events = events;
fds[nfds].revents = 0;
pfds[nfds] = pfd;
pfds[pfd->fd] = pfd;
nfds++;
}
}
Expand All @@ -270,18 +257,28 @@ nni_posix_poll_thr(void *arg)
(void) poll(fds, nfds, -1);

// If the waker pipe was signaled, read from it.
if (fds[0].revents & POLLIN) {
NNI_ASSERT(fds[0].fd == pq->wakerfd);
nni_plat_pipe_clear(pq->wakerfd);
}

for (int i = 1; i < nfds; i++) {
if ((events = fds[i].revents) != 0) {
for (int i = 0; i < nfds; i++) {
int fd = fds[i].fd;
events = fds[i].revents;
pfd = pfds[fd];
if (events == 0) {
continue;
}
if (pfd == NULL || fd == pq->wakerfd) {
nni_plat_pipe_clear(pq->wakerfd);
if (events & POLLHUP) {
return;
}
} else {
nni_posix_pfd_cb cb;
void *arg;

pfd = pfds[i];

if ((events & (POLLIN | POLLOUT)) != 0) {
// don't emit pollhup yet, we want
// to finish reading.
events &= ~POLLHUP;
}
nni_mtx_lock(&pfd->mtx);
cb = pfd->cb;
arg = pfd->arg;
Expand Down
37 changes: 37 additions & 0 deletions src/platform/posix/posix_pollq_poll.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//

#ifndef PLATFORM_POSIX_POLLQ_POLL_H
#define PLATFORM_POSIX_POLLQ_POLL_H

#include <poll.h>

typedef struct nni_posix_pollq nni_posix_pollq;

// nni_posix_pfd is the handle used by the poller. It's internals are private
// to the poller.
struct nni_posix_pfd {
nni_posix_pollq *pq;
int fd;
nni_list_node node;
nni_cv cv;
nni_mtx mtx;
unsigned events;
nni_posix_pfd_cb cb;
void *arg;
};

#define NNI_POLL_IN ((unsigned) POLLIN)
#define NNI_POLL_OUT ((unsigned) POLLOUT)
#define NNI_POLL_HUP ((unsigned) POLLHUP)
#define NNI_POLL_ERR ((unsigned) POLLERR)
#define NNI_POLL_INVAL ((unsigned) POLLNVAL)

#endif // PLATFORM_POSIX_POLLQ_POLL_H
Loading

0 comments on commit 5e18eb4

Please sign in to comment.