Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/gh 220 slow consumer error not handled #221

Merged
merged 2 commits into from
Nov 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ Configuration is a JSON encoded file. If no config file is found at the given pa
// Timeout in milliseconds for NATS requests.
"requestTimeout": 3000,

// Size of message buffer for incoming NATS requests.
"bufferSize": 8192,

// Header authentication resource method for web resources.
// Prior to accessing the resource, this resource method will be
// called, allowing an auth service to set a token using
Expand Down
5 changes: 5 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type Config struct {
NatsTLSKey string `json:"natsKey"`
NatsRootCAs []string `json:"natsRootCAs"`
RequestTimeout int `json:"requestTimeout"`
BufferSize int `json:"bufferSize"`
Debug bool `json:"debug"`
Trace bool `json:"trace"`
server.Config
Expand Down Expand Up @@ -110,6 +111,9 @@ func (c *Config) SetDefault() {
if c.NatsRootCAs == nil {
c.NatsRootCAs = []string{}
}
if c.BufferSize == 0 {
c.BufferSize = 8192
}
c.Config.SetDefault()
}

Expand Down Expand Up @@ -303,6 +307,7 @@ func main() {
ClientKey: cfg.NatsTLSKey,
RootCAs: cfg.NatsRootCAs,
RequestTimeout: time.Duration(cfg.RequestTimeout) * time.Millisecond,
BufferSize: cfg.BufferSize,
Logger: l,
}, cfg.Config)
if err != nil {
Expand Down
20 changes: 14 additions & 6 deletions nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ import (
"github.com/resgateio/resgate/server/mq"
)

const (
natsChannelSize = 256
)

// Client holds a client connection to a nats server.
type Client struct {
RequestTimeout time.Duration
Expand All @@ -26,6 +22,7 @@ type Client struct {
ClientKey string
RootCAs []string
Logger logger.Logger
BufferSize int

mq *nats.Conn
mqCh chan *nats.Msg
Expand Down Expand Up @@ -75,7 +72,11 @@ func (c *Client) Connect() error {
c.Logf("Connecting to NATS at %s", c.URL)

// Create connection options
opts := []nats.Option{nats.NoReconnect(), nats.ClosedHandler(c.onClose)}
opts := []nats.Option{
nats.NoReconnect(),
nats.ClosedHandler(c.onClose),
nats.ErrorHandler(c.onError),
}
if c.Creds != "" {
opts = append(opts, nats.UserCredentials(c.Creds))
}
Expand All @@ -98,7 +99,7 @@ func (c *Client) Connect() error {
}

c.mq = nc
c.mqCh = make(chan *nats.Msg, natsChannelSize)
c.mqCh = make(chan *nats.Msg, c.BufferSize)
c.mqReqs = make(map[*nats.Subscription]*responseCont)
c.tq = timerqueue.New(c.onTimeout, c.RequestTimeout)
c.stopped = make(chan struct{})
Expand Down Expand Up @@ -173,6 +174,13 @@ func (c *Client) onClose(conn *nats.Conn) {
}
}

func (c *Client) onError(conn *nats.Conn, sub *nats.Subscription, err error) {
c.Logger.Error(err.Error())
if err == nats.ErrSlowConsumer {
c.Close()
}
}

// SendRequest sends a request to the MQ.
func (c *Client) SendRequest(subj string, payload []byte, cb mq.Response) {
inbox := nats.NewInbox()
Expand Down