Skip to content

Commit

Permalink
Add sliceHeader for zero-copy parsing of message headers, use for c…
Browse files Browse the repository at this point in the history
…lient info

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Feb 4, 2025
1 parent f7c32d0 commit 99528d9
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 10 deletions.
26 changes: 18 additions & 8 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4248,9 +4248,20 @@ func (c *client) setHeader(key, value string, msg []byte) []byte {
return bb.Bytes()
}

// Will return the value for the header denoted by key or nil if it does not exists.
// This function ignores errors and tries to achieve speed and no additional allocations.
// Will return a copy of the value for the header denoted by key or nil if it does not exist.
// If you know that it is safe to refer to the underlying hdr slice for the period that the
// return value is used, then sliceHeader() will be faster.
func getHeader(key string, hdr []byte) []byte {
v := sliceHeader(key, hdr)
if v == nil {
return nil
}
return append(make([]byte, 0, len(v)), v...)
}

// Will return the sliced value for the header denoted by key or nil if it does not exists.
// This function ignores errors and tries to achieve speed and no additional allocations.
func sliceHeader(key string, hdr []byte) []byte {
if len(hdr) == 0 {
return nil
}
Expand All @@ -4275,15 +4286,14 @@ func getHeader(key string, hdr []byte) []byte {
index++
}
// Collect together the rest of the value until we hit a CRLF.
var value []byte
start := index
for index < hdrLen {
if hdr[index] == '\r' && index < hdrLen-1 && hdr[index+1] == '\n' {
break
}
value = append(value, hdr[index])
index++
}
return value
return hdr[start:index:index]
}

// For bytes.HasPrefix below.
Expand Down Expand Up @@ -4400,7 +4410,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
var ci *ClientInfo
if hadPrevSi && c.pa.hdr >= 0 {
var cis ClientInfo
if err := json.Unmarshal(getHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil {
if err := json.Unmarshal(sliceHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil {
ci = &cis
ci.Service = acc.Name
// Check if we are moving into a share details account from a non-shared
Expand All @@ -4409,7 +4419,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
c.addServerAndClusterInfo(ci)
}
}
} else if c.kind != LEAF || c.pa.hdr < 0 || len(getHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 {
} else if c.kind != LEAF || c.pa.hdr < 0 || len(sliceHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 {
ci = c.getClientInfo(share)
// If we did not share but the imports destination is the system account add in the server and cluster info.
if !share && isSysImport {
Expand Down Expand Up @@ -5076,7 +5086,7 @@ func (c *client) checkLeafClientInfoHeader(msg []byte) (dmsg []byte, setHdr bool
if c.pa.hdr < 0 || len(msg) < c.pa.hdr {
return msg, false
}
cir := getHeader(ClientInfoHdr, msg[:c.pa.hdr])
cir := sliceHeader(ClientInfoHdr, msg[:c.pa.hdr])
if len(cir) == 0 {
return msg, false
}
Expand Down
24 changes: 24 additions & 0 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2963,6 +2963,30 @@ func TestRemoveHeaderIfPrefixPresent(t *testing.T) {
}
}

func TestSliceHeader(t *testing.T) {
hdr := []byte("NATS/1.0\r\n\r\n")

hdr = genHeader(hdr, "a", "1")
hdr = genHeader(hdr, JSExpectedStream, "my-stream")
hdr = genHeader(hdr, JSExpectedLastSeq, "22")
hdr = genHeader(hdr, "b", "2")
hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24")
hdr = genHeader(hdr, JSExpectedLastMsgId, "1")
hdr = genHeader(hdr, "c", "3")

sliced := sliceHeader(JSExpectedLastSubjSeq, hdr)
copied := getHeader(JSExpectedLastSubjSeq, hdr)

require_NotNil(t, sliced)
require_True(t, cap(sliced) < cap(hdr))
require_True(t, cap(sliced) > len(sliced))

require_NotNil(t, copied)
require_Equal(t, cap(copied), len(copied))

require_True(t, bytes.Equal(sliced, copied))
}

func TestInProcessAllowedConnectionType(t *testing.T) {
tmpl := `
listen: "127.0.0.1:-1"
Expand Down
4 changes: 2 additions & 2 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
s, rr := js.srv, js.apiSubs.Match(subject)

hdr, msg := c.msgParts(rmsg)
if len(getHeader(ClientInfoHdr, hdr)) == 0 {
if len(sliceHeader(ClientInfoHdr, hdr)) == 0 {
// Check if this is the system account. We will let these through for the account info only.
sacc := s.SystemAccount()
if sacc != acc {
Expand Down Expand Up @@ -1166,7 +1166,7 @@ func (s *Server) getRequestInfo(c *client, raw []byte) (pci *ClientInfo, acc *Ac
var ci ClientInfo

if len(hdr) > 0 {
if err := json.Unmarshal(getHeader(ClientInfoHdr, hdr), &ci); err != nil {
if err := json.Unmarshal(sliceHeader(ClientInfoHdr, hdr), &ci); err != nil {
return nil, nil, nil, nil, err
}
}
Expand Down

0 comments on commit 99528d9

Please sign in to comment.