Skip to content

Commit

Permalink
debug log
Browse files Browse the repository at this point in the history
Signed-off-by: 5kbpers <[email protected]>
  • Loading branch information
5kbpers committed Apr 1, 2020
1 parent 857d7d0 commit 2cc6079
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 1 deletion.
6 changes: 6 additions & 0 deletions cdc/model/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package model
import (
"bytes"
"encoding/json"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -128,15 +129,20 @@ func (batch *BatchMsg) HasNext() bool {
}

func (batch *BatchMsg) Next() ([]byte, []byte, error) {
fmt.Printf("before get key len: %d\n", batch.keyBuf.Len())
_, keyLen, err := codec.DecodeInt(batch.keyBuf.Next(8))
if err != nil {
return nil, nil, err
}
fmt.Printf("before get key: %d\n", keyLen)
key := batch.keyBuf.Next(int(keyLen))

fmt.Printf("before get value len: %d\n", batch.valueBuf.Len())
_, valueLen, err := codec.DecodeInt(batch.valueBuf.Next(8))
if err != nil {
return nil, nil, err
}
fmt.Printf("before get value: %d\n", valueLen)
value := batch.valueBuf.Next(int(valueLen))
return key, value, nil
}
Expand Down
12 changes: 12 additions & 0 deletions cdc/model/mq_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package model

import (
"github.com/pingcap/check"
)

type batchMsgSuite struct{}

var _ = check.Suite(&batchMsgSuite{})

func (s *batchMsgSuite) TestBatchMsg(c *check.C) {
}
1 change: 1 addition & 0 deletions cdc/sink/mqProducer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (k *kafkaSaramaProducer) runWorker(ctx context.Context) error {
Value: sarama.ByteEncoder(value),
Partition: int32(partition),
}
fmt.Printf("flush to downstream key %x value %x\n", key, value)
if resolved {
msg.Metadata = resolvedTs
}
Expand Down
6 changes: 5 additions & 1 deletion kafka_consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,9 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
}
batchMsg := model.NewBatchMsg()
for message := range claim.Messages() {
log.Debug("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
log.Info("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
batchMsg.SetRaw(message.Key, message.Value)
fmt.Printf("msg key %x value %x\n", message.Key, message.Value)
for batchMsg.HasNext() {
keyBytes, valueBytes, err := batchMsg.Next()
if err != nil {
Expand All @@ -330,6 +331,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
ddl := new(model.DDLEvent)
ddl.FromMqMessage(key, value)
c.appendDDL(ddl)
fmt.Printf("get ddl event %v\n", ddl)
case model.MqMessageTypeRow:
globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs)
if key.Ts <= globalResolvedTs || key.Ts <= sink.resolvedTs {
Expand All @@ -349,6 +351,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
if err != nil {
log.Fatal("emit row changed event failed", zap.Error(err))
}
fmt.Printf("get row event %v\n", row)
case model.MqMessageTypeResolved:
err := sink.EmitRowChangedEvent(ctx, &model.RowChangedEvent{Ts: key.Ts, Resolved: true})
if err != nil {
Expand All @@ -358,6 +361,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
if resolvedTs < key.Ts {
atomic.StoreUint64(&sink.resolvedTs, key.Ts)
}
fmt.Printf("get resolved event %v\n", resolvedTs)
}
session.MarkMessage(message, "")
}
Expand Down

0 comments on commit 2cc6079

Please sign in to comment.