From bc7fa519f8fc3794144c8a3717f3fc549474a26a Mon Sep 17 00:00:00 2001 From: Craig Swank Date: Tue, 8 Oct 2019 14:08:47 -0600 Subject: [PATCH] use decoder for print (C-p) --- go.sum | 3 +++ internal/kafka/kafka.go | 16 ++++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/go.sum b/go.sum index ba48a114..7e486e1f 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,7 @@ github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM= github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Shopify/sarama v1.23.1 h1:XxJBCZEoWJtoWjf/xRbmGUpAmTZGnuuF0ON0EvxxBrs= github.com/Shopify/sarama v1.23.1/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs= +github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -48,11 +49,13 @@ github.com/pierrec/lz4 v1.0.2-0.20171218195038-2fcda4cb7018/go.mod h1:pdkljMzZIN github.com/pierrec/xxHash v0.1.1 h1:KP4NrV9023xp3M4FkTYfcXqWigsOCImL1ANJ7sh5vg4= github.com/pierrec/xxHash v0.1.1/go.mod h1:w2waW5Zoa/Wc4Yqe0wgrIYAGKqRMf7czn2HNKXmuL+I= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rcrowley/go-metrics v0.0.0-20171128170426-e181e095bae9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= diff --git a/internal/kafka/kafka.go b/internal/kafka/kafka.go index 8c175ff5..f2c13abb 100644 --- a/internal/kafka/kafka.go +++ b/internal/kafka/kafka.go @@ -275,9 +275,9 @@ func (c *Client) SearchTopic(partitions []Partition, s string, firstResult bool, func (c *Client) search(info Partition, s string, stop func() bool, cb func(int64, int64)) (int64, error) { n := int64(-1) var i int64 - err := c.consume(info, info.End, func(msg string) bool { + err := c.consume(info, info.End, func(d []byte) bool { cb(i, info.End) - if strings.Contains(msg, s) { + if strings.Contains(string(d), s) { n = i + info.Offset return true } @@ -296,13 +296,17 @@ func (c *Client) Search(info Partition, s string, cb func(i, j int64)) (int64, e //Fetch gets all messages in a partition up intil the 'end' offset. func (c *Client) Fetch(info Partition, end int64, cb func(string)) error { - return c.consume(info, end, func(s string) bool { - cb(s) + return c.consume(info, end, func(msg []byte) bool { + val, err := c.decoder.Decode(info.Topic, msg) + if err != nil { + return true + } + cb(string(val)) return false }) } -func (c *Client) consume(info Partition, end int64, cb func(string) bool) error { +func (c *Client) consume(info Partition, end int64, cb func([]byte) bool) error { consumer, err := sarama.NewConsumer(c.addrs, nil) if err != nil { return err @@ -326,7 +330,7 @@ func (c *Client) consume(info Partition, end int64, cb func(string) bool) error for i := int64(0); i < end; i++ { select { case msg := <-pc.Messages(): - if stop := cb(string(msg.Value)); stop { + if stop := cb(msg.Value); stop { return nil } case <-time.After(time.Second):