Skip to content

Commit

Permalink
ensure message headers are including for batching decision
Browse files Browse the repository at this point in the history
  • Loading branch information
dominicbarnes committed May 19, 2023
1 parent b4221d5 commit e484d20
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
11 changes: 11 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ func (msg *Message) size() int32 {
return 4 + 1 + 1 + sizeofBytes(msg.Key) + sizeofBytes(msg.Value) + timestampSize
}

func (msg *Message) headerSize() int {
return varArrayLen(len(msg.Headers), func(i int) int {
h := &msg.Headers[i]
return varStringLen(h.Key) + varBytesLen(h.Value)
})
}

func (msg *Message) totalSize() int32 {
return int32(msg.headerSize()) + msg.size()
}

type message struct {
CRC int32
MagicByte int8
Expand Down
2 changes: 1 addition & 1 deletion writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1216,7 +1216,7 @@ func newWriteBatch(now time.Time, timeout time.Duration) *writeBatch {
}

func (b *writeBatch) add(msg Message, maxSize int, maxBytes int64) bool {
bytes := int64(msg.size())
bytes := int64(msg.totalSize())

if b.size > 0 && (b.bytes+bytes) > maxBytes {
return false
Expand Down

0 comments on commit e484d20

Please sign in to comment.