diff --git a/cdc/model/mq.go b/cdc/model/mq.go
index 844ac21a672..3af4481bc18 100644
--- a/cdc/model/mq.go
+++ b/cdc/model/mq.go
@@ -3,6 +3,7 @@ package model
 import (
 	"bytes"
 	"encoding/json"
+	"fmt"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/model"
@@ -128,15 +129,20 @@ func (batch *BatchMsg) HasNext() bool {
 }
 
 func (batch *BatchMsg) Next() ([]byte, []byte, error) {
+	fmt.Printf("before get key len: %d\n", batch.keyBuf.Len())
 	_, keyLen, err := codec.DecodeInt(batch.keyBuf.Next(8))
 	if err != nil {
 		return nil, nil, err
 	}
+	fmt.Printf("before get key: %d\n", keyLen)
 	key := batch.keyBuf.Next(int(keyLen))
+
+	fmt.Printf("before get value len: %d\n", batch.valueBuf.Len())
 	_, valueLen, err := codec.DecodeInt(batch.valueBuf.Next(8))
 	if err != nil {
 		return nil, nil, err
 	}
+	fmt.Printf("before get value: %d\n", valueLen)
 	value := batch.valueBuf.Next(int(valueLen))
 	return key, value, nil
 }
diff --git a/cdc/model/mq_test.go b/cdc/model/mq_test.go
new file mode 100644
index 00000000000..b67a1f10b7f
--- /dev/null
+++ b/cdc/model/mq_test.go
@@ -0,0 +1,12 @@
+package model
+
+import (
+	"github.com/pingcap/check"
+)
+
+type batchMsgSuite struct{}
+
+var _ = check.Suite(&batchMsgSuite{})
+
+func (s *batchMsgSuite) TestBatchMsg(c *check.C) {
+}
diff --git a/cdc/sink/mqProducer/kafka.go b/cdc/sink/mqProducer/kafka.go
index fac6ed64c9a..7af5c34ad6b 100644
--- a/cdc/sink/mqProducer/kafka.go
+++ b/cdc/sink/mqProducer/kafka.go
@@ -185,6 +185,7 @@ func (k *kafkaSaramaProducer) runWorker(ctx context.Context) error {
 				Value:     sarama.ByteEncoder(value),
 				Partition: int32(partition),
 			}
+			fmt.Printf("flush to downstream key %x value %x\n", key, value)
 			if resolved {
 				msg.Metadata = resolvedTs
 			}
diff --git a/kafka_consumer/main.go b/kafka_consumer/main.go
index 64b778e9b41..eb40aba1368 100644
--- a/kafka_consumer/main.go
+++ b/kafka_consumer/main.go
@@ -306,8 +306,9 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
 	}
 	batchMsg := model.NewBatchMsg()
 	for message := range claim.Messages() {
-		log.Debug("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
+		log.Info("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
 		batchMsg.SetRaw(message.Key, message.Value)
+		fmt.Printf("msg key %x value %x\n", message.Key, message.Value)
 		for batchMsg.HasNext() {
 			keyBytes, valueBytes, err := batchMsg.Next()
 			if err != nil {
@@ -330,6 +331,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
 				ddl := new(model.DDLEvent)
 				ddl.FromMqMessage(key, value)
 				c.appendDDL(ddl)
+				fmt.Printf("get ddl event %v\n", ddl)
 			case model.MqMessageTypeRow:
 				globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs)
 				if key.Ts <= globalResolvedTs || key.Ts <= sink.resolvedTs {
@@ -349,6 +351,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
 				if err != nil {
 					log.Fatal("emit row changed event failed", zap.Error(err))
 				}
+				fmt.Printf("get row event %v\n", row)
 			case model.MqMessageTypeResolved:
 				err := sink.EmitRowChangedEvent(ctx, &model.RowChangedEvent{Ts: key.Ts, Resolved: true})
 				if err != nil {
@@ -358,6 +361,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
 				if resolvedTs < key.Ts {
 					atomic.StoreUint64(&sink.resolvedTs, key.Ts)
 				}
+				fmt.Printf("get resolved event %v\n", resolvedTs)
 			}
 			session.MarkMessage(message, "")
 		}
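For reference, a minimal, self-contained sketch of the length-prefixed framing that the debug prints above trace through BatchMsg.Next: each entry is an 8-byte length followed by the payload, appended pair by pair to parallel key and value buffers. This is a sketch only, not the ticdc implementation -- plain big-endian uint64 lengths stand in for codec.EncodeInt/codec.DecodeInt, and the names sketchBatch and Append are hypothetical.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// sketchBatch mirrors the shape of model.BatchMsg: keys and values live in two
// parallel buffers, each entry stored as an 8-byte length prefix plus payload.
type sketchBatch struct {
	keyBuf   bytes.Buffer
	valueBuf bytes.Buffer
}

// Append frames one key/value pair the way Next expects to read it back.
func (b *sketchBatch) Append(key, value []byte) {
	var lenBuf [8]byte
	binary.BigEndian.PutUint64(lenBuf[:], uint64(len(key)))
	b.keyBuf.Write(lenBuf[:])
	b.keyBuf.Write(key)
	binary.BigEndian.PutUint64(lenBuf[:], uint64(len(value)))
	b.valueBuf.Write(lenBuf[:])
	b.valueBuf.Write(value)
}

// HasNext reports whether another pair remains to be decoded.
func (b *sketchBatch) HasNext() bool {
	return b.keyBuf.Len() > 0
}

// Next pops one pair: read the 8-byte length, then that many payload bytes,
// first from the key buffer and then from the value buffer -- the same two-step
// read that the "before get key len" / "before get value len" prints trace.
func (b *sketchBatch) Next() ([]byte, []byte, error) {
	if b.keyBuf.Len() < 8 || b.valueBuf.Len() < 8 {
		return nil, nil, fmt.Errorf("malformed batch: truncated length prefix")
	}
	keyLen := binary.BigEndian.Uint64(b.keyBuf.Next(8))
	key := b.keyBuf.Next(int(keyLen))
	valueLen := binary.BigEndian.Uint64(b.valueBuf.Next(8))
	value := b.valueBuf.Next(int(valueLen))
	return key, value, nil
}

func main() {
	var batch sketchBatch
	batch.Append([]byte("k1"), []byte("row-1"))
	batch.Append([]byte("k2"), []byte("row-2"))
	for batch.HasNext() {
		key, value, err := batch.Next()
		if err != nil {
			panic(err)
		}
		fmt.Printf("key %s value %s\n", key, value)
	}
}

The point of the sketch is that the two buffers must stay in lockstep, one length prefix and one payload per pair; the buffer lengths printed by the added fmt.Printf calls are what make a mismatch between the producer's framing and the consumer's decoding visible.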