Skip to content

Commit

Permalink
use decoder for print (C-p)
Browse files Browse the repository at this point in the history
  • Loading branch information
cswank committed Oct 8, 2019
1 parent 9fca4ce commit bc7fa51
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Shopify/sarama v1.23.1 h1:XxJBCZEoWJtoWjf/xRbmGUpAmTZGnuuF0ON0EvxxBrs=
github.com/Shopify/sarama v1.23.1/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -48,11 +49,13 @@ github.com/pierrec/lz4 v1.0.2-0.20171218195038-2fcda4cb7018/go.mod h1:pdkljMzZIN
github.com/pierrec/xxHash v0.1.1 h1:KP4NrV9023xp3M4FkTYfcXqWigsOCImL1ANJ7sh5vg4=
github.com/pierrec/xxHash v0.1.1/go.mod h1:w2waW5Zoa/Wc4Yqe0wgrIYAGKqRMf7czn2HNKXmuL+I=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20171128170426-e181e095bae9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
Expand Down
16 changes: 10 additions & 6 deletions internal/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,9 @@ func (c *Client) SearchTopic(partitions []Partition, s string, firstResult bool,
func (c *Client) search(info Partition, s string, stop func() bool, cb func(int64, int64)) (int64, error) {
n := int64(-1)
var i int64
err := c.consume(info, info.End, func(msg string) bool {
err := c.consume(info, info.End, func(d []byte) bool {
cb(i, info.End)
if strings.Contains(msg, s) {
if strings.Contains(string(d), s) {
n = i + info.Offset
return true
}
Expand All @@ -296,13 +296,17 @@ func (c *Client) Search(info Partition, s string, cb func(i, j int64)) (int64, e

//Fetch gets all messages in a partition up intil the 'end' offset.
func (c *Client) Fetch(info Partition, end int64, cb func(string)) error {
return c.consume(info, end, func(s string) bool {
cb(s)
return c.consume(info, end, func(msg []byte) bool {
val, err := c.decoder.Decode(info.Topic, msg)
if err != nil {
return true
}
cb(string(val))
return false
})
}

func (c *Client) consume(info Partition, end int64, cb func(string) bool) error {
func (c *Client) consume(info Partition, end int64, cb func([]byte) bool) error {
consumer, err := sarama.NewConsumer(c.addrs, nil)
if err != nil {
return err
Expand All @@ -326,7 +330,7 @@ func (c *Client) consume(info Partition, end int64, cb func(string) bool) error
for i := int64(0); i < end; i++ {
select {
case msg := <-pc.Messages():
if stop := cb(string(msg.Value)); stop {
if stop := cb(msg.Value); stop {
return nil
}
case <-time.After(time.Second):
Expand Down

0 comments on commit bc7fa51

Please sign in to comment.