Skip to content

Commit

Permalink
Add nng_sub0_subscribe and friends.
Browse files Browse the repository at this point in the history
These are new functions that replace `NNG_OPT_SUBSCRIBE` and
`NNG_OPT_UNSUBSCRIBE`.  They are provided here as a transition
aid before those options are removed in NNG 2.0.
  • Loading branch information
gdamore committed Nov 9, 2024
1 parent f002c6a commit 144afd9
Show file tree
Hide file tree
Showing 8 changed files with 484 additions and 283 deletions.
527 changes: 265 additions & 262 deletions docs/man/libnng.3.adoc

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions docs/man/nng_sub.7.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ The following protocol-specific options are available.
message is accepted. If no topic matches, then the message is
discarded.
+
NOTE: This option is deprecated. New applications should use
xref:nng_sub_subscribe.3.adoc[`nng_sub0_socket_subscribe`] or `nng_sub0_ctx_subscribe`
functions.
+
NOTE: This option is a byte array. Thus if you use
xref:nng_setopt.3.adoc[`nng_setopt_string()`] the `NUL` terminator byte will
be included in the topic.
Expand All @@ -79,6 +83,10 @@ TIP: To receive all messages, an empty topic (zero length) can be used.
This option, also write-only, removes a topic from the subscription list.
Note that if the topic was not previously subscribed to with
`NNG_OPT_SUB_SUBSCRIBE` then an `NNG_ENOENT` error will result.
+
NOTE: This option is deprecated. New applications should use
xref:nng_sub_subscribe.3.adoc[`nng_sub0_socket_unsubscribe`] or `nng_sub0_ctx_unsubscribe`
functions.

((`NNG_OPT_SUB_PREFNEW`))::

Expand All @@ -95,5 +103,6 @@ The _sub_ protocol has no protocol-specific headers.

[.text-left]
xref:nng_sub_open.3.adoc[nng_sub_open(3)],
xref:nng_sub_subscribe.3.adoc[nng_sub_subscribe(3)],
xref:nng_pub.7.adoc[nng_pub(7)],
xref:nng.7.adoc[nng(7)]
67 changes: 67 additions & 0 deletions docs/man/nng_sub_subscribe.3.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
= nng_sub_subscribe(3)
//
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
//
// This document is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//

== NAME

nng_sub_subscribe - manage SUB subscriptions

== SYNOPSIS

[source,c]
----
#include <nng/nng.h>
#include <nng/protocol/pubsub0/sub.h>
int nng_sub0_socket_subscribe(nng_socket s, const void *buf, size_t sz);
int nng_sub0_socket_unsubscribe(nng_socket s, const void *buf, size_t sz);
int nng_sub0_ctx_subscribe(nng_ctx c, const void *buf, size_t sz);
int nng_sub0_ctx_unsubscribe(nng_ctx c, const void *buf, size_t sz);
----

== DESCRIPTION

These functions are used to subscribe, or unsubscribe, message topics
on either the xref:nng_sub.7.adoc[_sub_] version 0 socket _s_,
or the _sub_ version 0 context _c_.

Message topics are used to filter messages. The first _sz_ bytes of an
incoming message is compared against _buf_, and if equal the message
is accepted and will be available for receiving.

Multiple topics may be registered for the same socket or context, and
incoming messages will be forwarded to the application if any of the topics match.

TIP: To disable filtering altogether, the _buf_ may be `NULL` if _sz_ is zero.
In this case, all messages will be forwarded to the application.

TIP: These functions should be used instead of the `NNG_OPT_SUB_SUBSCRIBE` and
`NNG_OPT_SUB_UNSUBSCRIBE` options.

== RETURN VALUES

These functions return 0 on success, and non-zero otherwise.

== ERRORS

[horizontal]
`NNG_ENOMEM`:: Insufficient memory is available.
`NNG_ENOTSUP`:: The protocol is not supported.
`NNG_ENOENT`:: The topic is not subscribed.

== SEE ALSO

[.text-left]
xref:nng_socket.5.adoc[nng_socket(5)],
xref:nng_pub.7.adoc[nng_pub(7)],
xref:nng_sub.7.adoc[nng_sub(7)],
xref:nng.7.adoc[nng(7)]
7 changes: 7 additions & 0 deletions include/nng/protocol/pubsub0/sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ NNG_DECL int nng_sub0_open(nng_socket *);

NNG_DECL int nng_sub0_open_raw(nng_socket *);

NNG_DECL int nng_sub0_socket_subscribe(
nng_socket id, const void *buf, size_t sz);
NNG_DECL int nng_sub0_socket_unsubscribe(
nng_socket id, const void *buf, size_t sz);
NNG_DECL int nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz);
NNG_DECL int nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz);

#ifndef nng_sub_open
#define nng_sub_open nng_sub0_open
#endif
Expand Down
18 changes: 18 additions & 0 deletions src/core/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -889,12 +889,30 @@ nni_sock_proto_pipe_ops(nni_sock *sock)
return (&sock->s_pipe_ops);
}

struct nni_proto_sock_ops *
nni_sock_proto_ops(nni_sock *sock)
{
return (&sock->s_sock_ops);
}

struct nni_proto_ctx_ops *
nni_ctx_proto_ops(nni_ctx *ctx)
{
return (&ctx->c_ops);
}

void *
nni_sock_proto_data(nni_sock *sock)
{
return (sock->s_data);
}

void *
nni_ctx_proto_data(nni_ctx *ctx)
{
return (ctx->c_data);
}

int
nni_sock_add_listener(nni_sock *s, nni_listener *l)
{
Expand Down
6 changes: 5 additions & 1 deletion src/core/socket.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2021 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand All @@ -26,7 +26,9 @@ extern bool nni_sock_raw(nni_sock *);
extern void *nni_sock_proto_data(nni_sock *);
extern void nni_sock_add_stat(nni_sock *, nni_stat_item *);

extern struct nni_proto_sock_ops *nni_sock_proto_ops(nni_sock *);
extern struct nni_proto_pipe_ops *nni_sock_proto_pipe_ops(nni_sock *);
extern struct nni_proto_ctx_ops *nni_ctx_proto_ops(nni_ctx *);

extern int nni_sock_setopt(
nni_sock *, const char *, const void *, size_t, nni_opt_type);
Expand Down Expand Up @@ -89,6 +91,8 @@ extern void nni_ctx_close(nni_ctx *);
// nni_ctx_id returns the context ID, which can be used with nni_ctx_find.
extern uint32_t nni_ctx_id(nni_ctx *);

extern void *nni_ctx_proto_data(nni_ctx *);

// nni_ctx_recv receives asynchronously.
extern void nni_ctx_recv(nni_ctx *, nni_aio *);

Expand Down
88 changes: 88 additions & 0 deletions src/sp/protocol/pubsub0/sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,94 @@ static nni_option sub0_sock_options[] = {
},
};

int
nng_sub0_socket_subscribe(nng_socket id, const void *buf, size_t sz)
{
int rv;
nni_sock *s;
sub0_sock *sock;

if (((rv = nni_init()) != 0) ||
((rv = nni_sock_find(&s, id.id)) != 0)) {
return (rv);

Check warning on line 730 in src/sp/protocol/pubsub0/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sp/protocol/pubsub0/sub.c#L730

Added line #L730 was not covered by tests
}
// validate the socket type
if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) {
nni_sock_rele(s);
return (NNG_ENOTSUP);

Check warning on line 735 in src/sp/protocol/pubsub0/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sp/protocol/pubsub0/sub.c#L734-L735

Added lines #L734 - L735 were not covered by tests
}
sock = nni_sock_proto_data(s);
rv = sub0_ctx_subscribe(&sock->master, buf, sz, NNI_TYPE_OPAQUE);
nni_sock_rele(s);
return (rv);
}

int
nng_sub0_socket_unsubscribe(nng_socket id, const void *buf, size_t sz)
{
int rv;
nni_sock *s;
sub0_sock *sock;

if (((rv = nni_init()) != 0) ||
((rv = nni_sock_find(&s, id.id)) != 0)) {
return (rv);

Check warning on line 752 in src/sp/protocol/pubsub0/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sp/protocol/pubsub0/sub.c#L752

Added line #L752 was not covered by tests
}
// validate the socket type
if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) {
nni_sock_rele(s);
return (NNG_ENOTSUP);

Check warning on line 757 in src/sp/protocol/pubsub0/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sp/protocol/pubsub0/sub.c#L756-L757

Added lines #L756 - L757 were not covered by tests
}
sock = nni_sock_proto_data(s);
rv = sub0_ctx_unsubscribe(&sock->master, buf, sz, NNI_TYPE_OPAQUE);
nni_sock_rele(s);
return (rv);
}

int
nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz)
{
int rv;
nni_ctx *c;
sub0_ctx *ctx;

if (((rv = nni_init()) != 0) ||
((rv = nni_ctx_find(&c, id.id, false)) != 0)) {
return (rv);

Check warning on line 774 in src/sp/protocol/pubsub0/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sp/protocol/pubsub0/sub.c#L774

Added line #L774 was not covered by tests
}
// validate the socket type
if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) {
nni_ctx_rele(c);
return (NNG_ENOTSUP);

Check warning on line 779 in src/sp/protocol/pubsub0/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sp/protocol/pubsub0/sub.c#L778-L779

Added lines #L778 - L779 were not covered by tests
}
ctx = nni_ctx_proto_data(c);
rv = sub0_ctx_subscribe(ctx, buf, sz, NNI_TYPE_OPAQUE);
nni_ctx_rele(c);
return (rv);
}

int
nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz)
{
int rv;
nni_ctx *c;
sub0_ctx *ctx;

if (((rv = nni_init()) != 0) ||
((rv = nni_ctx_find(&c, id.id, false)) != 0)) {
return (rv);

Check warning on line 796 in src/sp/protocol/pubsub0/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sp/protocol/pubsub0/sub.c#L796

Added line #L796 was not covered by tests
}
// validate the socket type
if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) {
nni_ctx_rele(c);
return (NNG_ENOTSUP);

Check warning on line 801 in src/sp/protocol/pubsub0/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sp/protocol/pubsub0/sub.c#L800-L801

Added lines #L800 - L801 were not covered by tests
}
ctx = nni_ctx_proto_data(c);
rv = sub0_ctx_unsubscribe(ctx, buf, sz, NNI_TYPE_OPAQUE);
nni_ctx_rele(c);
return (rv);
}

static nni_proto_sock_ops sub0_sock_ops = {
.sock_size = sizeof(sub0_sock),
.sock_init = sub0_sock_init,
Expand Down
Loading

0 comments on commit 144afd9

Please sign in to comment.