diff --git a/message.go b/message.go index c320006ba..0539e6038 100644 --- a/message.go +++ b/message.go @@ -49,6 +49,17 @@ func (msg *Message) size() int32 { return 4 + 1 + 1 + sizeofBytes(msg.Key) + sizeofBytes(msg.Value) + timestampSize } +func (msg *Message) headerSize() int { + return varArrayLen(len(msg.Headers), func(i int) int { + h := &msg.Headers[i] + return varStringLen(h.Key) + varBytesLen(h.Value) + }) +} + +func (msg *Message) totalSize() int32 { + return int32(msg.headerSize()) + msg.size() +} + type message struct { CRC int32 MagicByte int8 diff --git a/writer.go b/writer.go index f5d6fc2c5..1d4ccbab3 100644 --- a/writer.go +++ b/writer.go @@ -1216,7 +1216,7 @@ func newWriteBatch(now time.Time, timeout time.Duration) *writeBatch { } func (b *writeBatch) add(msg Message, maxSize int, maxBytes int64) bool { - bytes := int64(msg.size()) + bytes := int64(msg.totalSize()) if b.size > 0 && (b.bytes+bytes) > maxBytes { return false