Skip to content

Commit

Permalink
POSIX poller: add support for select, and for choosing the poller
Browse files Browse the repository at this point in the history
Some platforms or configurations may not have more modern options
like kqueue or epoll, or may be constrained by policy.
  • Loading branch information
gdamore committed Dec 18, 2024
1 parent d83f5ae commit 6c949de
Show file tree
Hide file tree
Showing 8 changed files with 399 additions and 23 deletions.
44 changes: 40 additions & 4 deletions src/platform/posix/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ if (NNG_PLATFORM_POSIX)
nng_check_sym(port_create port.h NNG_HAVE_PORT_CREATE)
nng_check_sym(epoll_create sys/epoll.h NNG_HAVE_EPOLL)
nng_check_sym(epoll_create1 sys/epoll.h NNG_HAVE_EPOLL_CREATE1)
nng_check_sym(poll poll.h NNG_HAVE_POLL)
nng_check_sym(select sys/select.h NNG_HAVE_SELECT)
nng_check_sym(getpeereid unistd.h NNG_HAVE_GETPEEREID)
nng_check_sym(SO_PEERCRED sys/socket.h NNG_HAVE_SOPEERCRED)
nng_check_struct_member(sockpeercred uid sys/socket.h NNG_HAVE_SOCKPEERCRED)
Expand Down Expand Up @@ -101,14 +103,48 @@ if (NNG_PLATFORM_POSIX)
posix_udp.c
)

if (NNG_HAVE_PORT_CREATE)
nng_sources(posix_pollq_port.c)
set(NNG_POLLQ_POLLER "auto" CACHE STRING "Poller used for multiplexing I/O")
set_property(CACHE NNG_POLLQ_POLLER PROPERTY STRINGS auto ports kqueue epoll poll select)
mark_as_advanced(NNG_POLLQ_POLLER)
if (NNG_POLLQ_POLLER STREQUAL "ports")
nng_defines(NNG_POLLQ_PORTS)
elseif (NNG_POLLQ_POLLER STREQUAL "kqueue")
nng_defines(NNG_POLLQ_KQUEUE)
elseif (NNG_POLLQ_POLLER STREQUAL "epoll")
nng_defines(NNG_POLLQ_EPOLL)
elseif (NNG_POLLQ_POLLER STREQUAL "poll")
nng_defines(NNG_POLLQ_POLL)
elseif (NNG_POLLQ_POLLER STREQUAL "select")
set(NNG_POLLQ_SELECT ON)
elseif (NNG_HAVE_PORT_CREATE)
set(NNG_POLLQ_PORTS ON)
elseif (NNG_HAVE_KQUEUE)
nng_sources(posix_pollq_kqueue.c)
set(NNG_POLLQ_KQUEUE ON)
elseif (NNG_HAVE_EPOLL AND NNG_HAVE_EVENTFD)
set(NNG_POLLQ_EPOLL ON)
elseif (NNG_HAVE_POLL)
set(NNG_POLLQ_POLL ON)
elseif (NNG_HAVE_SELECT)
set(NNG_POLLQ_SELECT TRUE)
endif()

if (NNG_POLLQ_PORTS)
message(STATUS "Using port events for multiplexing I/O.")
nng_sources(posix_pollq_port.c)
elseif (NNG_POLLQ_KQUEUE)
message(STATUS "Using kqueue for multiplexing I/O.")
nng_sources(posix_pollq_kqueue.c)
elseif (NNG_POLLQ_EPOLL)
message(DEBUG "Using epoll for multiplexing I/O.")
nng_sources(posix_pollq_epoll.c)
else ()
elseif (NNG_POLLQ_POLL)
message(STATUS "Using poll for multiplexing I/O.")
nng_sources(posix_pollq_poll.c)
elseif (NNG_POLLQ_SELECT)
message(STATUS "Using select for multiplexing I/O.")
nng_sources(posix_pollq_select.c)
else()
message(FATAL_ERROR "No suitable poller found for multiplexing I/O.")
endif ()

if (NNG_HAVE_ARC4RANDOM)
Expand Down
4 changes: 2 additions & 2 deletions src/platform/posix/posix_ipcconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ ipc_send(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->writeq) == aio) {
nni_posix_pfd_arm(c->pfd, POLLOUT);
nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT);
}
}
nni_mtx_unlock(&c->mtx);
Expand Down Expand Up @@ -298,7 +298,7 @@ ipc_recv(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->readq) == aio) {
nni_posix_pfd_arm(c->pfd, POLLIN);
nni_posix_pfd_arm(c->pfd, NNI_POLL_IN);
}
}
nni_mtx_unlock(&c->mtx);
Expand Down
9 changes: 9 additions & 0 deletions src/platform/posix/posix_pollq.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,20 @@ extern int nni_posix_pfd_fd(nni_posix_pfd *);
extern void nni_posix_pfd_close(nni_posix_pfd *);
extern void nni_posix_pfd_set_cb(nni_posix_pfd *, nni_posix_pfd_cb, void *);

#ifdef POLLIN
#define NNI_POLL_IN ((unsigned) POLLIN)
#define NNI_POLL_OUT ((unsigned) POLLOUT)
#define NNI_POLL_HUP ((unsigned) POLLHUP)
#define NNI_POLL_ERR ((unsigned) POLLERR)
#define NNI_POLL_INVAL ((unsigned) POLLNVAL)
#else
// maybe using select
#define NNI_POLL_IN (0x0001)
#define NNI_POLL_OUT (0x0010)
#define NNI_POLL_HUP (0x0004)
#define NNI_POLL_ERR (0x0008)
#define NNI_POLL_INVAL (0x0020)
#endif // POLLIN

#endif // NNG_PLATFORM_POSIX

Expand Down
10 changes: 5 additions & 5 deletions src/platform/posix/posix_pollq_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,10 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events)
return (0);
}

if (events & POLLIN) {
if (events & NNI_POLL_IN) {
EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf);
}
if (events & POLLOUT) {
if (events & NNI_POLL_OUT) {
EV_SET(&ev[nev++], pf->fd, EVFILT_WRITE, flags, 0, 0, pf);
}
while (kevent(pq->kq, ev, nev, NULL, 0, NULL) != 0) {
Expand Down Expand Up @@ -254,10 +254,10 @@ nni_posix_poll_thr(void *arg)

switch (ev->filter) {
case EVFILT_READ:
revents = POLLIN;
revents = NNI_POLL_IN;
break;
case EVFILT_WRITE:
revents = POLLOUT;
revents = NNI_POLL_OUT;
break;
}
if (ev->udata == NULL) {
Expand All @@ -267,7 +267,7 @@ nni_posix_poll_thr(void *arg)
}
pf = (void *) ev->udata;
if (ev->flags & EV_ERROR) {
revents |= POLLHUP;
revents |= NNI_POLL_HUP;

Check warning on line 270 in src/platform/posix/posix_pollq_kqueue.c

View check run for this annotation

Codecov / codecov/patch

src/platform/posix/posix_pollq_kqueue.c#L270

Added line #L270 was not covered by tests
}

nni_mtx_lock(&pf->mtx);
Expand Down
Loading

0 comments on commit 6c949de

Please sign in to comment.