Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NNG_OPT_RECVFD and NNG_OPT_SENDFD converted to functions. #1905

Merged
merged 1 commit into from
Nov 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions docs/ref/migrate/nanomsg.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,24 @@ NNG approach to messages. Likewise there is no `struct nn_cmsghdr` equivalent.

The following options are changed.

| Nanomsg Option | NNG Eqvaivalent | Notes |
| ---------------------- | -------------------- | ------------------------------------------------------- |
| `NN_LINGER` | None | NNG does not support tuning this. |
| `NN_SNDBUF` | `NNG_OPT_SENDBUF` | NNG value is given in messages, not bytes. |
| `NN_RCVBUF` | `NNG_OPT_RECVBUF` | NNG value is given in messages, not bytes. |
| `NN_SNDTIMEO` | `NNG_OPT_SENDTIMEO` |
| `NN_RCVTIMEO` | `NNG_OPT_RECVTIMEO` |
| `NN_RECONNECT_IVL` | `NNG_OPT_RECONNMINT` |
| `NN_RECONNECT_IVL_MAX` | `NNG_OPT_RECONNMAXT` |
| `NN_SNDPRIO` | None | Not supported in NNG yet. |
| `NN_RCVPRIO` | None | Not supported in NNG yet. |
| `NN_RCVFD` | `NNG_OPT_RECVFD` |
| `NN_SNDFD` | `NNG_OPT_SENDFD` |
| `NN_DOMAIN` | None | NNG options are not divided by domain or protocol. |
| `NN_PROTOCOL` | `NNG_OPT_PROTO` | See also `NNG_OPT_PROTONAME`. |
| `NN_IPV4ONLY` | None | Use URL such as `tcp4://` to obtain this functionality. |
| `NN_SOCKET_NAME` | `NNG_OPT_SOCKNAME` |
| `NN_MAXTTL` | `NNG_OPT_MAXTTL` |
| Nanomsg Option | NNG Eqvaivalent | Notes |
| ---------------------- | ------------------------------ | ------------------------------------------------------- |
| `NN_LINGER` | None | NNG does not support tuning this. |
| `NN_SNDBUF` | `NNG_OPT_SENDBUF` | NNG value is given in messages, not bytes. |
| `NN_RCVBUF` | `NNG_OPT_RECVBUF` | NNG value is given in messages, not bytes. |
| `NN_SNDTIMEO` | `NNG_OPT_SENDTIMEO` |
| `NN_RCVTIMEO` | `NNG_OPT_RECVTIMEO` |
| `NN_RECONNECT_IVL` | `NNG_OPT_RECONNMINT` |
| `NN_RECONNECT_IVL_MAX` | `NNG_OPT_RECONNMAXT` |
| `NN_SNDPRIO` | None | Not supported in NNG yet. |
| `NN_RCVPRIO` | None | Not supported in NNG yet. |
| `NN_RCVFD` | `+nng_socket_get_recv_poll_fd` | No longer an option, use a function call. |
| `NN_SNDFD` | `+nng_socket_get_send_poll_fd` | No longer an option, use a function call. |
| `NN_DOMAIN` | None | NNG options are not divided by domain or protocol. |
| `NN_PROTOCOL` | `NNG_OPT_PROTO` | See also `NNG_OPT_PROTONAME`. |
| `NN_IPV4ONLY` | None | Use URL such as `tcp4://` to obtain this functionality. |
| `NN_SOCKET_NAME` | `NNG_OPT_SOCKNAME` |
| `NN_MAXTTL` | `NNG_OPT_MAXTTL` |

## Error Codes

Expand Down
14 changes: 12 additions & 2 deletions include/nng/nng.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,18 @@ NNG_DECL int nng_socket_get_ptr(nng_socket, const char *, void **);
NNG_DECL int nng_socket_get_ms(nng_socket, const char *, nng_duration *);
NNG_DECL int nng_socket_get_addr(nng_socket, const char *, nng_sockaddr *);

// These functions are used to obtain a file descriptor that will poll
// as readable if the socket can receive or send. Applications must never
// read or write to the file descriptor directly, but simply check it
// with poll, epoll, kqueue, or similar functions. This is intended to
// aid in integration NNG with external event loops based on polling I/O.
// Note that using these functions will force NNG to make extra system calls,
// and thus impact performance. The file descriptor pollability is
// level-triggered. These file descriptors will be closed when the socket
// is closed.
NNG_DECL int nng_socket_get_recv_poll_fd(nng_socket id, int *fdp);
NNG_DECL int nng_socket_get_send_poll_fd(nng_socket id, int *fdp);

// Utility function for getting a printable form of the socket address
// for display in logs, etc. It is not intended to be parsed, and the
// display format may change without notice. Generally you should alow
Expand Down Expand Up @@ -722,8 +734,6 @@ NNG_DECL nng_listener nng_pipe_listener(nng_pipe);
#define NNG_OPT_PEERNAME "peer-name"
#define NNG_OPT_RECVBUF "recv-buffer"
#define NNG_OPT_SENDBUF "send-buffer"
#define NNG_OPT_RECVFD "recv-fd"
#define NNG_OPT_SENDFD "send-fd"
#define NNG_OPT_RECVTIMEO "recv-timeout"
#define NNG_OPT_SENDTIMEO "send-timeout"
#define NNG_OPT_LOCADDR "local-address"
Expand Down
8 changes: 7 additions & 1 deletion src/core/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ struct nni_proto_sock_ops {
// Receive a message.
void (*sock_recv)(void *, nni_aio *);

// Return the receive poll FD.
int (*sock_recv_poll_fd)(void *, int *);

// Return the send poll FD.
int (*sock_send_poll_fd)(void *, int *);

// Options. Must not be NULL. Final entry should have NULL name.
nni_option *sock_options;
};
Expand All @@ -124,7 +130,7 @@ struct nni_proto {
uint32_t proto_flags; // Protocol flags
const nni_proto_sock_ops *proto_sock_ops; // Per-socket operations
const nni_proto_pipe_ops *proto_pipe_ops; // Per-pipe operations
const nni_proto_ctx_ops * proto_ctx_ops; // Context operations
const nni_proto_ctx_ops *proto_ctx_ops; // Context operations
};

// We quite intentionally use a signature where the upper word is nonzero,
Expand Down
11 changes: 0 additions & 11 deletions src/core/sock_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,6 @@ test_send_nonblock(void)
NUTS_CLOSE(s1);
}

void
test_readonly_options(void)
{
nng_socket s1;
NUTS_OPEN(s1);
NUTS_FAIL(nng_socket_set_int(s1, NNG_OPT_RECVFD, 0), NNG_EREADONLY);
NUTS_FAIL(nng_socket_set_int(s1, NNG_OPT_SENDFD, 0), NNG_EREADONLY);
NUTS_CLOSE(s1);
}

void
test_socket_base(void)
{
Expand Down Expand Up @@ -596,7 +586,6 @@ NUTS_TESTS = {
{ "recv non-block", test_recv_nonblock },
{ "send timeout", test_send_timeout },
{ "send non-block", test_send_nonblock },
{ "read only options", test_readonly_options },
{ "socket base", test_socket_base },
{ "socket name", test_socket_name },
{ "socket name oversize", test_socket_name_oversize },
Expand Down
58 changes: 22 additions & 36 deletions src/core/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,19 @@ static void nni_ctx_destroy(nni_ctx *);
#define SOCK(s) ((nni_sock *) (s))

static int
sock_get_fd(void *s, unsigned flag, int *fdp)
sock_get_fd(nni_sock *s, unsigned flag, int *fdp)
{
int rv;
nni_pollable *p;

if ((flag & nni_sock_flags(SOCK(s))) == 0) {
if ((flag & nni_sock_flags(s)) == 0) {
return (NNG_ENOTSUP);
}

if (flag == NNI_PROTO_FLAG_SND) {
rv = nni_msgq_get_sendable(SOCK(s)->s_uwq, &p);
rv = nni_msgq_get_sendable(s->s_uwq, &p);
} else {
rv = nni_msgq_get_recvable(SOCK(s)->s_urq, &p);
rv = nni_msgq_get_recvable(s->s_urq, &p);
}

if (rv == 0) {
Expand All @@ -136,30 +136,6 @@ sock_get_fd(void *s, unsigned flag, int *fdp)
return (rv);
}

static int
sock_get_sendfd(void *s, void *buf, size_t *szp, nni_type t)
{
int fd;
int rv;

if ((rv = sock_get_fd(SOCK(s), NNI_PROTO_FLAG_SND, &fd)) != 0) {
return (rv);
}
return (nni_copyout_int(fd, buf, szp, t));
}

static int
sock_get_recvfd(void *s, void *buf, size_t *szp, nni_type t)
{
int fd;
int rv;

if ((rv = sock_get_fd(SOCK(s), NNI_PROTO_FLAG_RCV, &fd)) != 0) {
return (rv);
}
return (nni_copyout_int(fd, buf, szp, t));
}

static int
sock_get_raw(void *s, void *buf, size_t *szp, nni_type t)
{
Expand Down Expand Up @@ -286,14 +262,6 @@ static const nni_option sock_options[] = {
.o_get = sock_get_sendtimeo,
.o_set = sock_set_sendtimeo,
},
{
.o_name = NNG_OPT_RECVFD,
.o_get = sock_get_recvfd,
},
{
.o_name = NNG_OPT_SENDFD,
.o_get = sock_get_sendfd,
},
{
.o_name = NNG_OPT_RECVBUF,
.o_get = sock_get_recvbuf,
Expand Down Expand Up @@ -353,6 +321,24 @@ nni_sock_id(nni_sock *s)
return (s->s_id);
}

int
nni_sock_get_send_fd(nni_sock *s, int *fdp)
{
if (s->s_sock_ops.sock_send_poll_fd != NULL) {
return (s->s_sock_ops.sock_send_poll_fd(s->s_data, fdp));
}
return (sock_get_fd(s, NNI_PROTO_FLAG_SND, fdp));
}

int
nni_sock_get_recv_fd(nni_sock *s, int *fdp)
{
if (s->s_sock_ops.sock_recv_poll_fd != NULL) {
return (s->s_sock_ops.sock_recv_poll_fd(s->s_data, fdp));
}
return (sock_get_fd(s, NNI_PROTO_FLAG_RCV, fdp));
}

// nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain
// the upper read and write queues.
nni_msgq *
Expand Down
2 changes: 2 additions & 0 deletions src/core/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ extern int nni_sock_getopt(
extern void nni_sock_send(nni_sock *, nni_aio *);
extern void nni_sock_recv(nni_sock *, nni_aio *);
extern uint32_t nni_sock_id(nni_sock *);
extern int nni_sock_get_send_fd(nni_sock *s, int *fdp);
extern int nni_sock_get_recv_fd(nni_sock *s, int *fdp);

// These are socket methods that protocol operations can expect to call.
// Note that each of these should be called without any locks held, since
Expand Down
33 changes: 33 additions & 0 deletions src/nng.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "nng/nng.h"
#include "core/nng_impl.h"
#include "core/platform.h"
#include "core/socket.h"

// This file provides the "public" API. This is a thin wrapper around
// internal API functions. We use the public prefix instead of internal,
Expand Down Expand Up @@ -1141,6 +1142,38 @@
return (socket_get(id, n, v, NULL, NNI_TYPE_SOCKADDR));
}

int
nng_socket_get_recv_poll_fd(nng_socket id, int *fdp)
{
int rv;
nni_sock *sock;

if (((rv = nni_init()) != 0) ||
((rv = nni_sock_find(&sock, id.id)) != 0)) {
return (rv);

Check warning on line 1153 in src/nng.c

View check run for this annotation

Codecov / codecov/patch

src/nng.c#L1153

Added line #L1153 was not covered by tests
}

rv = nni_sock_get_recv_fd(sock, fdp);
nni_sock_rele(sock);
return (rv);
}

int
nng_socket_get_send_poll_fd(nng_socket id, int *fdp)
{
int rv;
nni_sock *sock;

if (((rv = nni_init()) != 0) ||
((rv = nni_sock_find(&sock, id.id)) != 0)) {
return (rv);

Check warning on line 1169 in src/nng.c

View check run for this annotation

Codecov / codecov/patch

src/nng.c#L1169

Added line #L1169 was not covered by tests
}

rv = nni_sock_get_send_fd(sock, fdp);
nni_sock_rele(sock);
return (rv);
}

int
nng_pipe_notify(nng_socket s, nng_pipe_ev ev, nng_pipe_cb cb, void *arg)
{
Expand Down
66 changes: 24 additions & 42 deletions src/sp/protocol/bus0/bus.c
Original file line number Diff line number Diff line change
Expand Up @@ -363,34 +363,20 @@ bus0_sock_recv(void *arg, nni_aio *aio)
}

static int
bus0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_type t)
bus0_sock_get_send_fd(void *arg, int *fdp)
{
bus0_sock *sock = arg;
int fd;
int rv;
nni_mtx_lock(&sock->mtx);
// BUS sockets are *always* writable (best effort)
nni_pollable_raise(&sock->can_send);
rv = nni_pollable_getfd(&sock->can_send, &fd);
nni_mtx_unlock(&sock->mtx);

if (rv == 0) {
rv = nni_copyout_int(fd, buf, szp, t);
}
return (rv);
return (nni_pollable_getfd(&sock->can_send, fdp));
}

static int
bus0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
bus0_sock_get_recv_fd(void *arg, int *fdp)
{
bus0_sock *s = arg;
int rv;
int fd;

if ((rv = nni_pollable_getfd(&s->can_recv, &fd)) == 0) {
rv = nni_copyout_int(fd, buf, szp, t);
}
return (rv);
return (nni_pollable_getfd(&s->can_recv, fdp));
}

static int
Expand Down Expand Up @@ -474,14 +460,6 @@ static nni_proto_pipe_ops bus0_pipe_ops = {
};

static nni_option bus0_sock_options[] = {
{
.o_name = NNG_OPT_SENDFD,
.o_get = bus0_sock_get_send_fd,
},
{
.o_name = NNG_OPT_RECVFD,
.o_get = bus0_sock_get_recv_fd,
},
{
.o_name = NNG_OPT_RECVBUF,
.o_get = bus0_sock_get_recv_buf_len,
Expand All @@ -499,25 +477,29 @@ static nni_option bus0_sock_options[] = {
};

static nni_proto_sock_ops bus0_sock_ops = {
.sock_size = sizeof(bus0_sock),
.sock_init = bus0_sock_init,
.sock_fini = bus0_sock_fini,
.sock_open = bus0_sock_open,
.sock_close = bus0_sock_close,
.sock_send = bus0_sock_send,
.sock_recv = bus0_sock_recv,
.sock_options = bus0_sock_options,
.sock_size = sizeof(bus0_sock),
.sock_init = bus0_sock_init,
.sock_fini = bus0_sock_fini,
.sock_open = bus0_sock_open,
.sock_close = bus0_sock_close,
.sock_send = bus0_sock_send,
.sock_recv = bus0_sock_recv,
.sock_send_poll_fd = bus0_sock_get_send_fd,
.sock_recv_poll_fd = bus0_sock_get_recv_fd,
.sock_options = bus0_sock_options,
};

static nni_proto_sock_ops bus0_sock_ops_raw = {
.sock_size = sizeof(bus0_sock),
.sock_init = bus0_sock_init_raw,
.sock_fini = bus0_sock_fini,
.sock_open = bus0_sock_open,
.sock_close = bus0_sock_close,
.sock_send = bus0_sock_send,
.sock_recv = bus0_sock_recv,
.sock_options = bus0_sock_options,
.sock_size = sizeof(bus0_sock),
.sock_init = bus0_sock_init_raw,
.sock_fini = bus0_sock_fini,
.sock_open = bus0_sock_open,
.sock_close = bus0_sock_close,
.sock_send = bus0_sock_send,
.sock_recv = bus0_sock_recv,
.sock_send_poll_fd = bus0_sock_get_send_fd,
.sock_recv_poll_fd = bus0_sock_get_recv_fd,
.sock_options = bus0_sock_options,
};

static nni_proto bus0_proto = {
Expand Down
7 changes: 4 additions & 3 deletions src/sp/protocol/bus0/bus_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// found online at https://opensource.org/licenses/MIT.
//

#include "nng/nng.h"
#include <nuts.h>

#include <nng/protocol/bus0/bus.h>
Expand Down Expand Up @@ -244,7 +245,7 @@ test_bus_poll_readable(void)
NUTS_PASS(nng_bus0_open(&s2));
NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000));
NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000));
NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_RECVFD, &fd));
NUTS_PASS(nng_socket_get_recv_poll_fd(s1, &fd));
NUTS_TRUE(fd >= 0);

// Not readable if not connected!
Expand Down Expand Up @@ -277,10 +278,10 @@ test_bus_poll_writeable(void)
NUTS_PASS(nng_bus0_open(&s1));
NUTS_PASS(nng_bus0_open(&s2));
NUTS_PASS(nng_socket_set_int(s2, NNG_OPT_SENDBUF, 1));
NUTS_PASS(nng_socket_get_int(s2, NNG_OPT_SENDFD, &fd));
NUTS_PASS(nng_socket_get_send_poll_fd(s2, &fd));
NUTS_TRUE(fd >= 0);

// Pub is *always* writeable
// Bus is *always* writeable
NUTS_TRUE(nuts_poll_fd(fd));

// Even after connect (no message yet)
Expand Down
Loading
Loading