Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] PushConsumer implementation in jetstream package #1785

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 86 additions & 31 deletions jetstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,30 @@ This doc covers the basic usage of the `jetstream` package in `nats.go` client.
- [Overview](#overview)
- [Basic usage](#basic-usage)
- [Streams](#streams)
- [Stream management (CRUD)](#stream-management-crud)
- [Listing streams and stream names](#listing-streams-and-stream-names)
- [Stream-specific operations](#stream-specific-operations)
- [Stream management (CRUD)](#stream-management-crud)
- [Listing streams and stream names](#listing-streams-and-stream-names)
- [Stream-specific operations](#stream-specific-operations)
- [Consumers](#consumers)
- [Consumers management](#consumers-management)
- [Listing consumers and consumer
names](#listing-consumers-and-consumer-names)
- [Ordered consumers](#ordered-consumers)
- [Receiving messages from the
consumer](#receiving-messages-from-the-consumer)
- [Single fetch](#single-fetch)
- [Continuous polling](#continuous-polling)
- [Using `Consume()` receive messages in a
callback](#using-consume-receive-messages-in-a-callback)
- [Using `Messages()` to iterate over incoming
messages](#using-messages-to-iterate-over-incoming-messages)
- [Consumers management](#consumers-management)
- [Listing consumers and consumer names](#listing-consumers-and-consumer-names)
- [Ordered consumers](#ordered-consumers)
- [Receiving messages from pull consumers](#receiving-messages-from-pull-consumers)
- [Single fetch](#single-fetch)
- [Continuous polling](#continuous-polling)
- [Using `Consume()` receive messages in a callback](#using-consume-receive-messages-in-a-callback)
- [Using `Messages()` to iterate over incoming messages](#using-messages-to-iterate-over-incoming-messages)
- [Receiving messages from push consumers](#receiving-messages-from-push-consumers)
- [Publishing on stream](#publishing-on-stream)
- [Synchronous publish](#synchronous-publish)
- [Async publish](#async-publish)
- [Synchronous publish](#synchronous-publish)
- [Async publish](#async-publish)
- [KeyValue Store](#keyvalue-store)
- [Basic usage of KV bucket](#basic-usage-of-kv-bucket)
- [Watching for changes on a bucket](#watching-for-changes-on-a-bucket)
- [Additional operations on a bucket](#additional-operations-on-a-bucket)
- [Basic usage of KV bucket](#basic-usage-of-kv-bucket)
- [Watching for changes on a bucket](#watching-for-changes-on-a-bucket)
- [Additional operations on a bucket](#additional-operations-on-a-bucket)
- [Object Store](#object-store)
- [Basic usage of Object Store](#basic-usage-of-object-store)
- [Watching for changes on a store](#watching-for-changes-on-a-store)
- [Additional operations on a store](#additional-operations-on-a-store)
- [Basic usage of Object Store](#basic-usage-of-object-store)
- [Watching for changes on a store](#watching-for-changes-on-a-store)
- [Additional operations on a store](#additional-operations-on-a-store)
- [Examples](#examples)

## Overview
Expand Down Expand Up @@ -254,14 +251,34 @@ fmt.Println(cachedInfo.Config.Name)

## Consumers

Only pull consumers are supported in `jetstream` package. However, unlike the
JetStream API in `nats` package, pull consumers allow for continuous message
retrieval (similarly to how `nats.Subscribe()` works). Because of that, push
consumers can be easily replaced by pull consumers for most of the use cases.
Both pull and push consumers are supported in `jetstream` package. For most use
cases, we recommend using pull consumers as they allow for more fine-grained
control over the message processing and can often prevent issues such as e.g.
slow consumers. However, unlike the JetStream API in `nats` package, pull
consumers allow for continuous message retrieval (similarly to how
`nats.Subscribe()` works). Because of that, push consumers can be easily
replaced by pull consumers for most of the use cases. Push consumers are
supported mainly for the purpose of ease of migration from `nats` package. The
interfaces for consuming messages via push and pull consumers are similar, with
the main difference being that push consumers do not support fetching individual
batches of messages.

### Consumers management

CRUD operations on consumers can be achieved on 2 levels:
Both pull and push consumers can be managed using `jetstream` package. The
following example demonstrates how to create, update, fetch and delete a pull
consumer. Push consumers can be managed in a similar way, with method names
containing `Push` (e.g. `CreatePushConsumer`, `UpdatePushConsumer`,
`DeletePushConsumer`).

> __NOTE__: It is important to use `CreateConsumer` and `CreatePushConsumer`
methods to create the respective consumer types as they return the correct
interface (different for push and pull consumers). `DeliverSubject` is mandatory
when creating a push consumer and cannot be provided when creating a pull
consumer. Similarly, an attempt to get a push consumer using `Consumer` method
will result in an error (and vice versa).

CRUD operations on pull consumers can be achieved on 2 levels:

- on `JetStream` interface

Expand Down Expand Up @@ -370,6 +387,8 @@ message ordering. It is also resilient to consumer deletion.
Ordered consumers present the same set of message consumption methods as
standard pull consumers.

> __NOTE__: Ordered consumers are not supported for push consumers.

```go
js, _ := jetstream.New(nc)

Expand All @@ -380,7 +399,7 @@ cons, _ := js.OrderedConsumer(ctx, "ORDERS", jetstream.OrderedConsumerConfig{
})
```

### Receiving messages from the consumer
### Receiving messages from pull consumers

The `Consumer` interface covers allows fetching messages on demand, with
pre-defined batch size on bytes limit, or continuous push-like receiving of
Expand Down Expand Up @@ -469,10 +488,12 @@ cons, _ := js.CreateOrUpdateConsumer("ORDERS", jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
// receive messages from ORDERS.A subject only
FilterSubject: "ORDERS.A"
}))
})

consContext, _ := c.Consume(func(msg jetstream.Msg) {
fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
// messages are not acknowledged automatically
msg.Ack()
})
defer consContext.Stop()
```
Expand All @@ -491,7 +512,7 @@ type PullThresholdMessages int
buffer
- `PullHeartbeat(time.Duration)` - idle heartbeat duration for a single pull
request. An error will be triggered if at least 2 heartbeats are missed
- `WithConsumeErrHandler(func (ConsumeContext, error))` - when used, sets a
- `ConsumeErrHandler(func (ConsumeContext, error))` - when used, sets a
custom error handler on `Consume()`, allowing e.g. tracking missing
heartbeats.

Expand Down Expand Up @@ -565,6 +586,40 @@ for {
}
```

#### Receiving messages from push consumers

The `PushConsumer` interface currently only allows message processing in a
callback using `Consume()`.

As heartbeat for push consumers is not managed when using `Consume()`, it is
important to set `IdleHeartbeat` on the consumer level. Similarly, `FlowControl`
can be set to prevent the consumer from receiving more messages than it can
handle.

```go
cons, _ := js.CreateOrUpdatePushConsumer("ORDERS", jetstream.ConsumerConfig{
DeliverSubject: nats.NewInbox()
AckPolicy: jetstream.AckExplicitPolicy,
// receive messages from ORDERS.A subject only
FilterSubject: "ORDERS.A",
// unlike pull consumers, idle heartbeat is configured on the consumer level
IdleHeartbeat: 30 * time.Second
})

consContext, _ := c.Consume(func(msg jetstream.Msg) {
fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
// messages are not acknowledged automatically
msg.Ack()
})
defer consContext.Stop()
```

`Consume()` on `PushConsumer` can be supplied with `ConsumeErrHandler` option
to set a custom error handler allowing e.g. tracking missing heartbeats.

> __NOTE__: `Stop()` should always be called on `ConsumeContext` to avoid
> leaking goroutines.

## Publishing on stream

`JetStream` interface allows publishing messages on stream in 2 ways:
Expand Down
146 changes: 127 additions & 19 deletions jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,21 @@ type (
CachedInfo() *ConsumerInfo
}

PushConsumer interface {
// Consume will continuously receive messages and handle them
// with the provided callback function. Consume can be configured using
// PushConsumeOpt options:
//
// - Error handling and monitoring can be configured using ConsumeErrHandler.
Consume(handler MessageHandler, opts ...PushConsumeOpt) (ConsumeContext, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to provide the Messages variant with Next? Was it available in old JS? Can't recall.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It kind of was, via ChanSubscribe and yes, I think we should add that API, but I would do it in a separate PR.


// Info fetches current ConsumerInfo from the server.
Info(context.Context) (*ConsumerInfo, error)

// CachedInfo returns ConsumerInfo currently cached on this consumer.
CachedInfo() *ConsumerInfo
}

createConsumerRequest struct {
Stream string `json:"stream_name"`
Config *ConsumerConfig `json:"config"`
Expand Down Expand Up @@ -186,7 +201,74 @@ func (p *pullConsumer) CachedInfo() *ConsumerInfo {
return p.info
}

func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (Consumer, error) {
// Info fetches current ConsumerInfo from the server.
func (p *pushConsumer) Info(ctx context.Context) (*ConsumerInfo, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
infoSubject := apiSubj(p.jetStream.apiPrefix, fmt.Sprintf(apiConsumerInfoT, p.stream, p.name))
var resp consumerInfoResponse

if _, err := p.jetStream.apiRequestJSON(ctx, infoSubject, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound {
return nil, ErrConsumerNotFound
}
return nil, resp.Error
}
if resp.Error == nil && resp.ConsumerInfo == nil {
return nil, ErrConsumerNotFound
}

p.info = resp.ConsumerInfo
return resp.ConsumerInfo, nil
}

// CachedInfo returns ConsumerInfo currently cached on this consumer.
// This method does not perform any network requests. The cached
// ConsumerInfo is updated on every call to Info and Update.
func (p *pushConsumer) CachedInfo() *ConsumerInfo {
return p.info
}

func upsertPullConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (Consumer, error) {
resp, err := upsertConsumer(ctx, js, stream, cfg, action)
if err != nil {
return nil, err
}

return &pullConsumer{
jetStream: js,
stream: stream,
name: resp.Name,
durable: cfg.Durable != "",
info: resp.ConsumerInfo,
subs: syncx.Map[string, *pullSubscription]{},
}, nil
}

func upsertPushConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (PushConsumer, error) {
if cfg.DeliverSubject == "" {
return nil, ErrNotPushConsumer
}

resp, err := upsertConsumer(ctx, js, stream, cfg, action)
if err != nil {
return nil, err
}

return &pushConsumer{
jetStream: js,
stream: stream,
name: resp.Name,
info: resp.ConsumerInfo,
}, nil
}

func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (*consumerInfoResponse, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
Expand Down Expand Up @@ -239,14 +321,7 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
return nil, ErrConsumerMultipleFilterSubjectsNotSupported
}

return &pullConsumer{
jetStream: js,
stream: stream,
name: resp.Name,
durable: cfg.Durable != "",
info: resp.ConsumerInfo,
subs: syncx.Map[string, *pullSubscription]{},
}, nil
return &resp, nil
}

const (
Expand All @@ -267,6 +342,48 @@ func generateConsName() string {
}

func getConsumer(ctx context.Context, js *jetStream, stream, name string) (Consumer, error) {
info, err := fetchConsumerInfo(ctx, js, stream, name)
if err != nil {
return nil, err
}

if info.Config.DeliverSubject != "" {
return nil, ErrNotPullConsumer
}

cons := &pullConsumer{
jetStream: js,
stream: stream,
name: name,
durable: info.Config.Durable != "",
info: info,
subs: syncx.Map[string, *pullSubscription]{},
}

return cons, nil
}

func getPushConsumer(ctx context.Context, js *jetStream, stream, name string) (PushConsumer, error) {
info, err := fetchConsumerInfo(ctx, js, stream, name)
if err != nil {
return nil, err
}

if info.Config.DeliverSubject == "" {
return nil, ErrNotPushConsumer
}

cons := &pushConsumer{
jetStream: js,
stream: stream,
name: name,
info: info,
}

return cons, nil
}

func fetchConsumerInfo(ctx context.Context, js *jetStream, stream, name string) (*ConsumerInfo, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
Expand All @@ -291,16 +408,7 @@ func getConsumer(ctx context.Context, js *jetStream, stream, name string) (Consu
return nil, ErrConsumerNotFound
}

cons := &pullConsumer{
jetStream: js,
stream: stream,
name: name,
durable: resp.Config.Durable != "",
info: resp.ConsumerInfo,
subs: syncx.Map[string, *pullSubscription]{},
}

return cons, nil
return resp.ConsumerInfo, nil
}

func deleteConsumer(ctx context.Context, js *jetStream, stream, consumer string) error {
Expand Down
23 changes: 22 additions & 1 deletion jetstream/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ type (
TimeStamp time.Time `json:"ts"`
}

// ConsumerConfig is the configuration of a JetStream consumer.
// ConsumerConfig represents the configuration of a JetStream consumer,
// encompassing both push and pull consumer settings
ConsumerConfig struct {
// Name is an optional name for the consumer. If not set, one is
// generated automatically.
Expand Down Expand Up @@ -217,6 +218,26 @@ type (
// associating metadata on the consumer. This feature requires
// nats-server v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`

// Fields specific for push consumers:

// DeliverSubject is the subject to deliver messages to for push consumers
DeliverSubject string `json:"deliver_subject,omitempty"`

// DeliverGroup is the group name for push consumers
DeliverGroup string `json:"deliver_group,omitempty"`

// FlowControl is a flag to enable flow control for the consumer.
// When set, server will regularly send an empty message with Status
// header 100 and a reply subject, consumers must reply to these
// messages to control the rate of message delivery
FlowControl bool `json:"flow_control,omitempty"`

// IdleHeartbeat enables push consumer idle heartbeat messages.
// If the Consumer is idle for more than the set value, an empty message
// with Status header 100 will be sent indicating the consumer is still
// alive.
IdleHeartbeat time.Duration `json:"idle_heartbeat,omitempty"`
}

// OrderedConsumerConfig is the configuration of an ordered JetStream
Expand Down
Loading