From 8e099127a5f6437346959a7438a51dd5298a3481 Mon Sep 17 00:00:00 2001 From: Luis Carvalho Date: Fri, 31 Mar 2023 13:54:30 +0100 Subject: [PATCH 1/2] Fix readUint8Array The binary.Read method falls back to using the reflect package on *[]uint8 data types, which is much slower. This seems to fix the consumer performance on big messages (> 100B), but not on small ones. --- pkg/stream/buffer_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stream/buffer_reader.go b/pkg/stream/buffer_reader.go index 1a11e05b..612bae4e 100644 --- a/pkg/stream/buffer_reader.go +++ b/pkg/stream/buffer_reader.go @@ -59,6 +59,6 @@ func readString(readerStream io.Reader) string { func readUint8Array(readerStream io.Reader, size uint32) []uint8 { var res = make([]uint8, size) - _ = binary.Read(readerStream, binary.BigEndian, &res) + _ = binary.Read(readerStream, binary.BigEndian, res) return res } From 64ef36e1c4c8d2ff5b2db1324bc0a9329ffbc67d Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 3 Apr 2023 15:54:56 +0200 Subject: [PATCH 2/2] Add message bytes test Signed-off-by: Gabriele Santomaggio --- pkg/stream/consumer_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index 4eb1a46b..381c3ae6 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -437,7 +437,7 @@ var _ = Describe("Streaming Consumers", func() { } }(chConfirm, producer) Expect(err).NotTo(HaveOccurred()) - msg := amqp.NewMessage([]byte("message")) + msg := amqp.NewMessage([]byte{0x00, 0x0e, 0x01, 0x0f, 0x05, 0x08, 0x04, 0x03}) msg.Properties = &amqp.MessageProperties{ MessageID: nil, UserID: nil, @@ -466,6 +466,7 @@ var _ = Describe("Streaming Consumers", func() { Expect(message.Properties.To).To(Equal("ToTest")) Expect(message.Properties.ContentType).To(Equal("ContentTypeTest")) Expect(message.Properties.ContentEncoding).To(Equal("ContentEncodingTest")) + Expect(message.Data[0]).To(Equal([]byte{0x00, 0x0e, 0x01, 0x0f, 0x05, 0x08, 0x04, 0x03})) }, NewConsumerOptions().SetOffset(OffsetSpecification{}.First()).SetConsumerName("consumer_test")) Expect(err).NotTo(HaveOccurred())