diff --git a/server/consumer.go b/server/consumer.go index 9af1c5b2b32..67492134394 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3126,6 +3126,14 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { return needAck } +// Used in nextReqFromMsg, since the json.Unmarshal causes the request +// struct to escape to the heap always. This should reduce GC pressure. +var jsGetNextPool = sync.Pool{ + New: func() any { + return &JSApiConsumerGetNextRequest{} + }, +} + // Helper for the next message requests. func nextReqFromMsg(msg []byte) (time.Time, int, int, bool, time.Duration, time.Time, error) { req := bytes.TrimSpace(msg) @@ -3135,7 +3143,11 @@ func nextReqFromMsg(msg []byte) (time.Time, int, int, bool, time.Duration, time. return time.Time{}, 1, 0, false, 0, time.Time{}, nil case req[0] == '{': - var cr JSApiConsumerGetNextRequest + cr := jsGetNextPool.Get().(*JSApiConsumerGetNextRequest) + defer func() { + *cr = JSApiConsumerGetNextRequest{} + jsGetNextPool.Put(cr) + }() if err := json.Unmarshal(req, &cr); err != nil { return time.Time{}, -1, 0, false, 0, time.Time{}, err } @@ -3535,6 +3547,7 @@ func (o *consumer) processNextMsgRequest(reply string, msg []byte) { if err := o.waiting.add(wr); err != nil { sendErr(409, "Exceeded MaxWaiting") + wr.recycle() return } o.signalNewMessages() diff --git a/server/stream.go b/server/stream.go index 5fca89e3d88..484c7a5bb87 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5199,6 +5199,9 @@ func newJSPubMsg(dsubj, subj, reply string, hdr, msg []byte, o *consumer, seq ui if pm != nil { m = pm.(*jsPubMsg) buf = m.buf[:0] + if hdr != nil { + hdr = append(m.hdr[:0], hdr...) + } } else { m = new(jsPubMsg) } @@ -5227,6 +5230,9 @@ func (pm *jsPubMsg) returnToPool() { if len(pm.buf) > 0 { pm.buf = pm.buf[:0] } + if len(pm.hdr) > 0 { + pm.hdr = pm.hdr[:0] + } jsPubMsgPool.Put(pm) }