Skip to content

Commit

Permalink
Drop JetStream API requests when request queue built up
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Sep 17, 2024
1 parent b163162 commit ee3cedb
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,11 +824,9 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub

// Copy the state. Note the JSAPI only uses the hdr index to piece apart the
// header from the msg body. No other references are needed.
// Check pending and warn if getting backed up.
const warnThresh = 128
pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
if pending >= warnThresh {
s.rateLimitFormatWarnf("JetStream request queue has high pending count: %d", pending)
_, err := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
if err == errIPQLenLimitReached {
s.RateLimitWarnf("Dropping JetStream API requests: %s", err)
}
}

Expand Down Expand Up @@ -871,12 +869,16 @@ func (s *Server) setJetStreamExportSubs() error {
// Start the go routine that will process API requests received by the
// subscription below when they are coming from routes, etc..
const maxProcs = 16
const maxRequests = 10_000
mp := runtime.GOMAXPROCS(0)
// Cap at 16 max for now on larger core setups.
if mp > maxProcs {
mp = maxProcs
}
s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "Routed JS API Requests")
s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](
s, "Routed JS API Requests",
ipqLimitByLen[*jsAPIRoutedReq](maxRequests),
)
for i := 0; i < mp; i++ {
s.startGoRoutine(s.processJSAPIRoutedRequests)
}
Expand Down

0 comments on commit ee3cedb

Please sign in to comment.