-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Jetstream Client API General Design #51
Comments
For discussion I have this in go, funtion names etc all up for discussion. This is looking a lot like the entire jsm consumer admin and would be a ton of copy/paste Message Acknowledgement and MetadataBasic Ackserr := m.Ack()
err := m.Nak()
err := m.AckProgress()
err := m.AckNext("_INBOX.xxx")
err := m.AckTerm() Acks with options for reliable acksTimeout based on time.Duration() err := m.Ack(nats.AckWaitDuration(5*time.Second)) Timeout based on a context - timeout or deadline context err := m.Ack(nats.AckWaitContext(ctx)) JS Originating Message MetadataThese will parse the message metadata on demand - first access will parse and cache it - so in the non JS case or when not used will not impact performance We considered lazy-adding these as headers but with some of them being int and time.Time it was just annoying - users would have to do additional parsing. func (m *Msg) JSStream() string
func (m *Msg) JSConsumer() string
func (m *Msg) JSDelivered() int
func (m *Msg) JSStreamSequence() int
func (m *Msg) JSConsumerSequence() int
func (m *Msg) JSTimeStamp() time.Time PublishingThis publishes to a stream, which must already exist, waiting for 2 seconds for that specific stream to respond. Does not yet support multiple ack respnses err := nc.Publish("js.str", []byte("hello"), nats.ExpectAckFromStream("STREAM", 2*time.Second)) RequestsRequest is unchanged - we don want to intercept the responses I think, but want to help parse them msg, _ := nc.Request("js.str", []byte("hello"), 2 * time.Second)
ack, _ := nats.ParsePublishAck(msg) // will error with the message if -ERR or not +OK a generic error is return
fmt.Printf("Ack from %s with sequence %d\n", ack.Stream, ack.Sequence) SubscribesFor subscribe I tried BasicThis wil ensure that if cfg is durable that it will be updated with ib, if ephemeral it will be made with that ib sub, err := nc.Subscribe(ib, cb, nats.WithConsumer(cfg)) Template styleHere I have a typical consumer config and do just some slight modes to it ordersConsumer := nats.NewConsumer(.....) // perhaps with sampling etc, can be passed to nats.WithConsumer() if complete
// later
newOrders, _ := nc.Subscribe(ib, cb, nats.NewConsumerFromDefault(stream, ordersConsumer, nats.FilterStreamBySubject("ORDERS.new"))
dispatchedOrders, _ := nc.Subscribe(ib, cb, nats.NewConsumerFromDefault(stream, ordersConsumer, nats.FilterStreamBySubject("ORDERS.dispatched")) General Helpers and AdminThese are probably the absolute minimum we need to help people a bit: sub := nats.NextSubject("ORDERS", "new")
err := nc.NextMessage("ORDERS", "new", 1, "_INBOX.xxx")
os, _ := nc.ConsumerStatus("ORDERS", "new")
err := nc.DeleteConsumer("ORDERS", "new")
consumers, _ := nc.ListConsumers("ORDERS")
err := nc.NewConsumer(stream, cfg) Pull HelperI think inevitably we will need a helper for Pull subscribers that maintain a local buffer, does health checks etc, not sure how this will look exactly so this is just spit balling. // 100 message buffer
newOrders, err := nats.PullSubscribe("ORDERS", "new", 100, nats.WithConsumer(cfg)) // last is optional
// allows graceful close and shutdown
go func() {
<-ctx.Done()
newOrders.Close()
}()
for msg := range newOrders.NextMsg() {
// do stuff
// also other ack types that makes sense has to be ack here so the subscriber can intelligently
// handle things using +NXT or a bigger pull if we are falling behind etc, can also be a pull specific
// msg that has the same methods i guess
newOrders.Ack(msg)
} |
This is a great writeup, thanks @ripienaar. Throwing out a few ideas... I propose we make sure the JetStream specific APIs clear to avoid confusion. Documentation would make it clear that they only work when jetstream is enabled. sub, err := nc.StreamConsumerNextSubject("ORDERS", "new")
err := nc.StreamConsumerNextMessage("ORDERS", "new", 1, "_INBOX.xxx")
sts, err := nc.StreamConsumerStatus("ORDERS", "new")
scl, err := nc.StreamConsumerList("ORDERS")
err := nc.StreamConsumerDelete("ORDERS", "new") One could consider creating a stream consumer object and placing the APIs there rather than on the connection itself, but this creates separation between requesting a message and receiving one on the connection. sc, err := nc.GetStreamConsumer("ORDERS", "new")
err := sc.NextMessage(1, "_INBOX.xxx")
sts, err := sc.Status()
err = sc.Delete()
var scl []*StreamConsumer
scl, err := sc.StreamConsumerList("ORDERS") I'm wondering if we could use the no-responder API in these, or ping for presence of a jetstream enabled server. One of the most frequently encountered issues with NATS streaming was a problem identifying whether a NATS streaming connect call timed out, the wrong cluster ID was used, or streaming wasn't running/enabled. I'm concerned users will get into trouble with the I suggest something like: // sync publish
ack, err := nc.PublishAndVerifyStorage("subj", payload, 2 * time.Second)
// (or PublishWithAck, PublishAndVerifyStream,, PublishWithStreamAck, etc.)
// In the future could add expected # of acks in case a KV store, object store, multiple streams, etc is behind that subject. (Note that API could work for KV store, ObjectStore, etc). The async publish could be the
About packaging, I strongly advise these APIs being in the same package as the core NATS APIs because these are tightly coupled with core NATS. By virtue of being the same package, assembly, jar, etc, that association will be made by users and jetstream won't be considered a bolt on/add on as perceived by some users with NATS streaming. Versioning is in line between client functionality and server functionality. We could also someday expect to see JS related flags in the handshake info returned by the server. |
I like this direction, did you take a look at JSM @ColinSullivan1? much can be pillaged from there. @derekcollison what do you think of the direction Colin proposes, specifically not extending the existing Publish() etc with JS specific stuff, this is from a recent chat we had and this was the consensus among a bunch of us. |
I think JetStream Publish is just normal publish waiting on the proper response from JetStream. Lots of ways to do this, not a huge fan of new APIs with super long names though.. ;) |
From yesterdays call, I was not able to do the wrapper in nats.go I promised as that would trigger go cycling imports. Here is a test of what such an integration would look like, note I am also adding a JSMsg object and moving all the ack and stuff there, thus msg.JSMsg() will create that from an existing *Msg and from there you can access all the JS message stuff There's a branch here nats-io/nats.go@master...ripienaar:jsm_wrap that would not compile but would show what I am on about Other things we said, I'd make the validator a injectable dependency so that jsm.go only gets all the JSON schema stuff and nats.go would not have validation or the dependencies as it would have a null validator. So ignore those here func TestJetStream(t *testing.T) {
srv := startJetStream(t)
defer os.RemoveAll(srv.JetStreamConfig().StoreDir)
defer srv.Shutdown()
nc, err := Connect(srv.ClientURL())
if err != nil {
t.Fatalf("connect failed: %v", err)
}
if !nc.IsJetStreamEnabled() {
t.Fatalf("JetStream is not enabled")
}
str, err := nc.NewStreamFromDefault("ORDERS", DefaultStream, StreamSubjects("ORDERS.*"))
if err != nil {
t.Fatalf("stream create failed: %s", err)
}
received := 0
_, err = nc.Subscribe(NewInbox(), func(m *Msg) {
jsm, err := m.JSMsg()
if err != nil {
t.Fatalf("failed to create JSMsg: %s", err)
}
err = jsm.Ack(AckWaitDuration(2 * time.Second))
if err != nil {
t.Fatalf("Ack was not acknowledged: %s", err)
}
received++
})
// ephemeral
_, err = str.NewConsumer(ConsumerFilterStreamBySubject("ORDERS.new"))
if err != nil {
t.Fatalf("consumer create failed: %s", err)
}
for i := 0; i < 10; i++ {
// also str.Publish("ORDERS.new", []byte(...), 2*time.Second) will wait 2 seconds and verify str.Name() received it
state, err := nc.JetStreamPublish("ORDERS.new", []byte(fmt.Sprintf("new order %d", i)), PublishRequiresStream("ORDERS"))
if err != nil {
t.Fatalf("publish failed: %s", err)
}
}
si, err := str.State()
if err != nil {
t.Fatalf("failed to get stream state: %s", err)
}
if si.Msgs != 10 {
t.Fatalf("expected 10 messages got %d", si.Msgs)
}
time.Sleep(10 * time.Millisecond)
if received != 10 {
t.Fatalf("expected 10 messages got %d", received)
}
} |
Design the general Jetstream client API for publishing, acknowledging, etc.
The text was updated successfully, but these errors were encountered: