Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace NNG_OPT_SUB_SUBSCRIBE/UNSUBSCRIBE with functions. #1909

Merged
merged 2 commits into from
Nov 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/ref/migrate/nanomsg.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ The following options are changed.
| `NN_IPV4ONLY` | None | Use URL such as `tcp4://` to obtain this functionality. |
| `NN_SOCKET_NAME` | `NNG_OPT_SOCKNAME` |
| `NN_MAXTTL` | `NNG_OPT_MAXTTL` |
| `NN_SUB_SUBSCRIBE` | `nng_sub0_socket_subscribe` | No longer an option, use a function call. |
| `NN_SUB_UNSUBSCRIBE` | `nng_sub0_socket_unsubscribe` | No longer an option, use a function call. |

## Error Codes

Expand Down
7 changes: 7 additions & 0 deletions docs/ref/migrate/nng1.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ matching the actual wire protocol values, instead of `int`.

The `NNG_OPT_RAW` option has aso been replaced by a function, `nng_socket_raw`.

## Subscriptions

The `NNG_OPT_SUB_SUBSCRIBE` and `NNG_OPT_SUB_UNSUBCRIBE` options have been replaced by
the following functions: `nng_sub0_socket_subscribe`, `nng_sub0_socket_unsubscribe`,
`nng_sub0_ctx_subscribe` and `nng_sub0_ctx_unsubscribe`. These functions, like the options
they replace, are only applicable to SUB sockets.

## Statistics Use Constified Pointers

A number of the statistics functions take, or return, `const nng_stat *` instead
Expand Down
12 changes: 9 additions & 3 deletions include/nng/protocol/pubsub0/sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#ifndef NNG_PROTOCOL_PUBSUB0_SUB_H
#define NNG_PROTOCOL_PUBSUB0_SUB_H

#include <nng/nng.h>

#ifdef __cplusplus
extern "C" {
#endif
Expand All @@ -19,6 +21,13 @@ NNG_DECL int nng_sub0_open(nng_socket *);

NNG_DECL int nng_sub0_open_raw(nng_socket *);

NNG_DECL int nng_sub0_socket_subscribe(
nng_socket id, const void *buf, size_t sz);
NNG_DECL int nng_sub0_socket_unsubscribe(
nng_socket id, const void *buf, size_t sz);
NNG_DECL int nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz);
NNG_DECL int nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz);

#ifndef nng_sub_open
#define nng_sub_open nng_sub0_open
#endif
Expand All @@ -27,9 +36,6 @@ NNG_DECL int nng_sub0_open_raw(nng_socket *);
#define nng_sub_open_raw nng_sub0_open_raw
#endif

#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe"
#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe"

#define NNG_OPT_SUB_PREFNEW "sub:prefnew"

#ifdef __cplusplus
Expand Down
18 changes: 18 additions & 0 deletions src/core/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,18 @@ nni_sock_proto_pipe_ops(nni_sock *sock)
return (&sock->s_pipe_ops);
}

struct nni_proto_sock_ops *
nni_sock_proto_ops(nni_sock *sock)
{
return (&sock->s_sock_ops);
}

struct nni_proto_ctx_ops *
nni_ctx_proto_ops(nni_ctx *ctx)
{
return (&ctx->c_ops);
}

void *
nni_sock_proto_data(nni_sock *sock)
{
Expand Down Expand Up @@ -1142,6 +1154,12 @@ nni_ctx_find(nni_ctx **cp, uint32_t id, bool closing)
return (rv);
}

void *
nni_ctx_proto_data(nni_ctx *ctx)
{
return (ctx->c_data);
}

static void
nni_ctx_destroy(nni_ctx *ctx)
{
Expand Down
4 changes: 4 additions & 0 deletions src/core/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ extern bool nni_sock_raw(nni_sock *);
extern void *nni_sock_proto_data(nni_sock *);
extern void nni_sock_add_stat(nni_sock *, nni_stat_item *);

extern struct nni_proto_sock_ops *nni_sock_proto_ops(nni_sock *);
extern struct nni_proto_pipe_ops *nni_sock_proto_pipe_ops(nni_sock *);
extern struct nni_proto_ctx_ops *nni_ctx_proto_ops(nni_ctx *);

extern int nni_sock_setopt(
nni_sock *, const char *, const void *, size_t, nni_opt_type);
Expand Down Expand Up @@ -77,6 +79,8 @@ extern int nni_ctx_open(nni_ctx **, nni_sock *);
// NNG_ECLOSED unless the final argument is true.)
extern int nni_ctx_find(nni_ctx **, uint32_t, bool);

extern void *nni_ctx_proto_data(nni_ctx *);

// nni_ctx_rele is called to release a hold on the context. These holds
// are acquired by either nni_ctx_open or nni_ctx_find. If the context
// is being closed (nni_ctx_close was called), and this is the last reference,
Expand Down
2 changes: 1 addition & 1 deletion src/sp/protocol/pubsub0/pub_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ test_pub_send_queued(void)
// test to be really meaningful.
NUTS_PASS(nng_pub0_open(&pub));
NUTS_PASS(nng_sub0_open(&sub));
NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0));
NUTS_PASS(nng_sub0_socket_subscribe(sub, "", 0));
NUTS_PASS(nng_socket_set_int(pub, NNG_OPT_SENDBUF, 10));
NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 10));
NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000));
Expand Down
127 changes: 91 additions & 36 deletions src/sp/protocol/pubsub0/sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <string.h>

#include "core/nng_impl.h"
#include "core/socket.h"
#include "nng/protocol/pubsub0/sub.h"

// Subscriber protocol. The SUB protocol receives messages sent to
Expand Down Expand Up @@ -454,13 +455,11 @@ sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
// to replace this with a patricia trie, like old nanomsg had.

static int
sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
sub0_ctx_subscribe(sub0_ctx *ctx, const void *buf, size_t sz)
{
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
sub0_topic *topic;
sub0_topic *new_topic;
NNI_ARG_UNUSED(t);

nni_mtx_lock(&sock->lk);
NNI_LIST_FOREACH (&ctx->topics, topic) {
Expand Down Expand Up @@ -492,13 +491,11 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
}

static int
sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
sub0_ctx_unsubscribe(sub0_ctx *ctx, const void *buf, size_t sz)
{
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
sub0_topic *topic;
size_t len;
NNI_ARG_UNUSED(t);

nni_mtx_lock(&sock->lk);
NNI_LIST_FOREACH (&ctx->topics, topic) {
Expand Down Expand Up @@ -579,14 +576,6 @@ static nni_option sub0_ctx_options[] = {
.o_get = sub0_ctx_get_recv_buf_len,
.o_set = sub0_ctx_set_recv_buf_len,
},
{
.o_name = NNG_OPT_SUB_SUBSCRIBE,
.o_set = sub0_ctx_subscribe,
},
{
.o_name = NNG_OPT_SUB_UNSUBSCRIBE,
.o_set = sub0_ctx_unsubscribe,
},
{
.o_name = NNG_OPT_SUB_PREFNEW,
.o_get = sub0_ctx_get_prefer_new,
Expand Down Expand Up @@ -636,20 +625,6 @@ sub0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
return (sub0_ctx_set_recv_buf_len(&sock->master, buf, sz, t));
}

static int
sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_sock *sock = arg;
return (sub0_ctx_subscribe(&sock->master, buf, sz, t));
}

static int
sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_sock *sock = arg;
return (sub0_ctx_unsubscribe(&sock->master, buf, sz, t));
}

static int
sub0_sock_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
{
Expand Down Expand Up @@ -685,14 +660,6 @@ static nni_proto_ctx_ops sub0_ctx_ops = {
};

static nni_option sub0_sock_options[] = {
{
.o_name = NNG_OPT_SUB_SUBSCRIBE,
.o_set = sub0_sock_subscribe,
},
{
.o_name = NNG_OPT_SUB_UNSUBSCRIBE,
.o_set = sub0_sock_unsubscribe,
},
{
.o_name = NNG_OPT_RECVBUF,
.o_get = sub0_sock_get_recv_buf_len,
Expand Down Expand Up @@ -736,3 +703,91 @@ nng_sub0_open(nng_socket *sock)
{
return (nni_proto_open(sock, &sub0_proto));
}

int
nng_sub0_socket_subscribe(nng_socket id, const void *buf, size_t sz)
{
int rv;
nni_sock *s;
sub0_sock *sock;

if (((rv = nni_init()) != 0) ||
((rv = nni_sock_find(&s, id.id)) != 0)) {
return (rv);
}
// validate the socket type
if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) {
nni_sock_rele(s);
return (NNG_ENOTSUP);
}
sock = nni_sock_proto_data(s);
rv = sub0_ctx_subscribe(&sock->master, buf, sz);
nni_sock_rele(s);
return (rv);
}

int
nng_sub0_socket_unsubscribe(nng_socket id, const void *buf, size_t sz)
{
int rv;
nni_sock *s;
sub0_sock *sock;

if (((rv = nni_init()) != 0) ||
((rv = nni_sock_find(&s, id.id)) != 0)) {
return (rv);
}
// validate the socket type
if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) {
nni_sock_rele(s);
return (NNG_ENOTSUP);
}
sock = nni_sock_proto_data(s);
rv = sub0_ctx_unsubscribe(&sock->master, buf, sz);
nni_sock_rele(s);
return (rv);
}

int
nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz)
{
int rv;
nni_ctx *c;
sub0_ctx *ctx;

if (((rv = nni_init()) != 0) ||
((rv = nni_ctx_find(&c, id.id, false)) != 0)) {
return (rv);
}
// validate the socket type
if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) {
nni_ctx_rele(c);
return (NNG_ENOTSUP);
}
ctx = nni_ctx_proto_data(c);
rv = sub0_ctx_subscribe(ctx, buf, sz);
nni_ctx_rele(c);
return (rv);
}

int
nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz)
{
int rv;
nni_ctx *c;
sub0_ctx *ctx;

if (((rv = nni_init()) != 0) ||
((rv = nni_ctx_find(&c, id.id, false)) != 0)) {
return (rv);
}
// validate the socket type
if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) {
nni_ctx_rele(c);
return (NNG_ENOTSUP);
}
ctx = nni_ctx_proto_data(c);
rv = sub0_ctx_unsubscribe(ctx, buf, sz);
nni_ctx_rele(c);
return (rv);
}
Loading
Loading