[close #395] Batch writes in Kafka consumer #396

Merged · merged 31 commits · Mar 5, 2024

Commits
33e77ca collect pprof heap (pingyu, Feb 11, 2024)
7ceeefb unlimit retry for pd connection (pingyu, Feb 13, 2024)
88bebcc reduce record size (pingyu, Feb 13, 2024)
9430c95 log level: info (pingyu, Feb 13, 2024)
25de855 reduce data size; add grafana panel (pingyu, Feb 13, 2024)
20b8fad batch (pingyu, Feb 25, 2024)
5eafecd fix (pingyu, Feb 25, 2024)
1ec6a2b try debug (pingyu, Feb 25, 2024)
7e890be fix encoder size (pingyu, Feb 25, 2024)
1db9282 fix (pingyu, Feb 25, 2024)
bf62381 Merge branch 'fix-flow-control' into kafka-consumer-batch (pingyu, Feb 25, 2024)
f0d1382 MQMessage pool (pingyu, Feb 26, 2024)
e3addb4 Merge branch 'fix-flow-control' into kafka-consumer-batch (pingyu, Feb 26, 2024)
b62252f fix release (pingyu, Mar 2, 2024)
671adf5 wip (pingyu, Mar 2, 2024)
d5af5a3 fix flaky ut (pingyu, Mar 2, 2024)
87e7534 logging (pingyu, Mar 2, 2024)
a6f8b1d fix ut (pingyu, Mar 2, 2024)
3d406b3 wip (pingyu, Mar 2, 2024)
84578d7 Merge remote-tracking branch 'origin/fix-flow-control' into kafka-con… (pingyu, Mar 2, 2024)
d0c9e75 adjust memory release parameter (pingyu, Mar 3, 2024)
6b64d8b polish (pingyu, Mar 3, 2024)
200c939 Merge remote-tracking branch 'origin/fix-flow-control' into kafka-con… (pingyu, Mar 3, 2024)
b54817c polish (pingyu, Mar 3, 2024)
f568dd3 Merge remote-tracking branch 'origin/fix-flow-control' into kafka-con… (pingyu, Mar 3, 2024)
20b67dd polish (pingyu, Mar 3, 2024)
772feed polish (pingyu, Mar 3, 2024)
e092598 Merge remote-tracking branch 'origin/fix-flow-control' into kafka-con… (pingyu, Mar 3, 2024)
acf0d83 fix ut (pingyu, Mar 3, 2024)
02b5646 Merge remote-tracking branch 'upstream/main' into kafka-consumer-batch (pingyu, Mar 4, 2024)
dd9620a polish (pingyu, Mar 4, 2024)
2 changes: 1 addition & 1 deletion cdc/cdc/sink/statistics.go
@@ -25,7 +25,7 @@ import (
 )
 
 const (
-	printStatusInterval  = 10 * time.Minute
+	printStatusInterval  = 30 * time.Second
 	flushMetricsInterval = 5 * time.Second
 )
34 changes: 22 additions & 12 deletions cdc/cdc/sink/tikv.go
@@ -257,7 +257,7 @@ type innerBatch struct {
 	TTLs []uint64
 }
 
-type tikvBatcher struct {
+type TikvBatcher struct {
 	Batches  []innerBatch
 	count    int
 	byteSize uint64
@@ -266,22 +266,26 @@ type tikvBatcher struct {
 	statistics *Statistics
 }
 
-func newTiKVBatcher(statistics *Statistics) *tikvBatcher {
-	b := &tikvBatcher{
+func NewTiKVBatcher(statistics *Statistics) *TikvBatcher {
+	b := &TikvBatcher{
 		statistics: statistics,
 	}
 	return b
 }
 
-func (b *tikvBatcher) Count() int {
+func (b *TikvBatcher) Count() int {
 	return b.count
 }
 
-func (b *tikvBatcher) ByteSize() uint64 {
+func (b *TikvBatcher) IsEmpty() bool {
+	return b.count == 0
+}
+
+func (b *TikvBatcher) ByteSize() uint64 {
 	return b.byteSize
 }
 
-func (b *tikvBatcher) getNow() uint64 {
+func (b *TikvBatcher) getNow() uint64 {
 	failpoint.Inject("tikvSinkGetNow", func(val failpoint.Value) {
 		now := uint64(val.(int))
 		failpoint.Return(now)
@@ -316,16 +320,18 @@ func ExtractRawKVEntry(entry *model.RawKVEntry, now uint64) (opType model.OpType
 	return
 }
 
-func (b *tikvBatcher) Append(entry *model.RawKVEntry) {
+func (b *TikvBatcher) Append(entry *model.RawKVEntry) error {
 	if len(b.Batches) == 0 {
 		b.now = b.getNow()
 	}
 
 	opType, key, value, ttl, err := ExtractRawKVEntry(entry, b.now)
 	if err != nil {
 		log.Error("failed to extract entry", zap.Any("event", entry), zap.Error(err))
-		b.statistics.AddInvalidKeyCount()
-		return
+		if b.statistics != nil {
+			b.statistics.AddInvalidKeyCount()
+		}
+		return errors.Trace(err)
 	}
 
 	// NOTE: do NOT separate PUT & DELETE operations into two batch.
@@ -353,9 +359,11 @@ func (b *tikvBatcher) Append(entry *model.RawKVEntry) {
 	if opType == model.OpTypePut {
 		b.byteSize += uint64(len(value)) + uint64(unsafe.Sizeof(ttl))
 	}
+
+	return nil
 }
 
-func (b *tikvBatcher) Reset() {
+func (b *TikvBatcher) Reset() {
 	b.Batches = b.Batches[:0]
 	b.count = 0
 	b.byteSize = 0
@@ -370,7 +378,7 @@ func (k *tikvSink) runWorker(ctx context.Context, workerIdx uint32) error {
 	tick := time.NewTicker(500 * time.Millisecond)
 	defer tick.Stop()
 
-	batcher := newTiKVBatcher(k.statistics)
+	batcher := NewTiKVBatcher(k.statistics)
 
 	flushToTiKV := func() error {
 		return k.statistics.RecordBatchExecution(func() (int, error) {
@@ -426,7 +434,9 @@ func (k *tikvSink) runWorker(ctx context.Context, workerIdx uint32) error {
 			}
 			continue
 		}
-		batcher.Append(e.rawKVEntry)
+		if err := batcher.Append(e.rawKVEntry); err != nil {
+			return errors.Trace(err)
+		}
 
 		if batcher.ByteSize() >= defaultTiKVBatchBytesLimit {
 			if err := flushToTiKV(); err != nil {
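The batcher API above is now exported (`TikvBatcher`, `NewTiKVBatcher`), `Append` returns the extraction error instead of swallowing it, and `IsEmpty` lets callers skip no-op flushes. A minimal sketch of the intended call pattern, assuming the tikv/migration module layout; the `drain` helper, the `main` function, and the `RawKVEntry` field names are illustrative, not code from this PR:

```go
package main

import (
	"fmt"

	"github.com/pingcap/errors"

	"github.com/tikv/migration/cdc/cdc/model" // assumed import path
	"github.com/tikv/migration/cdc/cdc/sink"  // assumed import path
)

const batchBytesLimit uint64 = 40 * 1024 * 1024 // mirrors defaultTiKVBatchBytesLimit

// drain shows the intended calling convention: Append until the byte-size
// threshold is crossed, flush, and rely on IsEmpty to skip empty flushes.
// flush is expected to write out b.Batches and then call b.Reset().
func drain(b *sink.TikvBatcher, entries []*model.RawKVEntry, flush func() error) error {
	for _, e := range entries {
		// Append now returns an error for invalid entries instead of
		// silently counting and dropping them.
		if err := b.Append(e); err != nil {
			return errors.Trace(err)
		}
		if b.ByteSize() >= batchBytesLimit {
			if err := flush(); err != nil {
				return errors.Trace(err)
			}
		}
	}
	if b.IsEmpty() {
		return nil
	}
	return errors.Trace(flush())
}

func main() {
	b := sink.NewTiKVBatcher(nil) // a nil *Statistics is tolerated since this PR
	entries := []*model.RawKVEntry{
		{OpType: model.OpTypePut, Key: []byte("k"), Value: []byte("v")}, // field names assumed
	}
	if err := drain(b, entries, func() error {
		defer b.Reset()
		fmt.Printf("flushing %d batches, %d bytes\n", len(b.Batches), b.ByteSize())
		return nil
	}); err != nil {
		panic(err)
	}
}
```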
6 changes: 3 additions & 3 deletions cdc/cdc/sink/tikv_test.go
@@ -154,7 +154,7 @@ func TestTiKVSinkBatcher(t *testing.T) {
 	}()
 
 	statistics := NewStatistics(context.Background(), "TiKV", map[string]string{})
-	batcher := newTiKVBatcher(statistics)
+	batcher := NewTiKVBatcher(statistics)
 	keys := []string{
 		"a", "b", "c", "d", "e", "f",
 	}
@@ -183,8 +183,8 @@ func TestTiKVSinkBatcher(t *testing.T) {
 			ExpiredTs: expires[i],
 			CRTs:      uint64(i),
 		}
-		batcher.Append(entry0)
-		batcher.Append(entry1)
+		require.NoError(batcher.Append(entry0))
+		require.Error(batcher.Append(entry1))
 	}
 	require.Len(batcher.Batches, 3)
 	require.Equal(6, batcher.Count())
70 changes: 48 additions & 22 deletions cdc/cmd/kafka-consumer/main.go
@@ -18,6 +18,7 @@ import (
 	"flag"
 	"fmt"
 	"math"
+	"math/rand"
 	"net/url"
 	"os"
 	"os/signal"
@@ -44,7 +45,7 @@ import (
 )
 
 const (
-	downstreamRetryInterval = 500 * time.Millisecond
+	downstreamRetryIntervalMs int = 200
 )
 
 // Sarama configuration options
@@ -379,15 +380,45 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
 	if sink == nil {
 		panic("sink should initialized")
 	}
-ClaimMessages:
+	kvs := make([]*model.RawKVEntry, 0)
 	for message := range claim.Messages() {
 		log.Debug("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
 		batchDecoder, err := codec.NewJSONEventBatchDecoder(message.Key, message.Value)
 		if err != nil {
 			return errors.Trace(err)
 		}
 
+		// Return error only when the session is closed
+		emitChangedEvents := func() error {
+			if len(kvs) == 0 {
+				return nil
+			}
+			for {
+				err = sink.EmitChangedEvents(ctx, kvs...)
+				if err == nil {
+					log.Debug("emit changed events", zap.Any("kvs", kvs))
+					lastCRTs := sink.lastCRTs.Load()
+					lastKv := kvs[len(kvs)-1]
+					if lastCRTs < lastKv.CRTs {
+						sink.lastCRTs.Store(lastKv.CRTs)
+					}
+					kvs = kvs[:0]
+					return nil
+				}
+
+				log.Warn("emit row changed event failed", zap.Error(err))
+				if session.Context().Err() != nil {
+					log.Warn("session closed", zap.Error(session.Context().Err()))
+					return session.Context().Err()
+				}
+
+				sleepMs := downstreamRetryIntervalMs + rand.Intn(downstreamRetryIntervalMs)
+				time.Sleep(time.Duration(sleepMs) * time.Millisecond)
+			}
+		}
+
 		counter := 0
+	KvLoop:
 		for {
 			tp, hasNext, err := batchDecoder.HasNext()
 			if err != nil {
@@ -416,32 +447,21 @@ ClaimMessages:
 					zap.Uint64("globalResolvedTs", globalResolvedTs),
 					zap.Uint64("sinkResolvedTs", sink.resolvedTs.Load()),
 					zap.Int32("partition", partition))
-					break ClaimMessages
+					continue KvLoop
 				}
 
-				for {
-					err = sink.EmitChangedEvents(ctx, kv)
-					if err == nil {
-						log.Debug("emit changed events", zap.Any("kv", kv))
-						lastCRTs := sink.lastCRTs.Load()
-						if lastCRTs < kv.CRTs {
-							sink.lastCRTs.Store(kv.CRTs)
-						}
-						break
-					}
-
-					log.Warn("emit row changed event failed", zap.Error(err))
-					if session.Context().Err() != nil {
-						log.Warn("session closed", zap.Error(session.Context().Err()))
-						return nil
-					}
-					time.Sleep(downstreamRetryInterval)
-				}
+				kvs = append(kvs, kv)
 			case model.MqMessageTypeResolved:
 				ts, err := batchDecoder.NextResolvedEvent()
 				if err != nil {
 					log.Fatal("decode message value failed", zap.ByteString("value", message.Value))
 				}
+
+				if err := emitChangedEvents(); err != nil {
+					log.Info("session closed", zap.Error(err))
+					return nil
+				}
+
 				resolvedTs := sink.resolvedTs.Load()
 				if resolvedTs < ts {
 					log.Debug("update sink resolved ts",
@@ -450,13 +470,19 @@ ClaimMessages:
 					sink.resolvedTs.Store(ts)
 				}
 			}
-			session.MarkMessage(message, "")
 		}
 
 		if counter > kafkaMaxBatchSize {
 			log.Fatal("Open Protocol max-batch-size exceeded", zap.Int("max-batch-size", kafkaMaxBatchSize),
 				zap.Int("actual-batch-size", counter))
 		}
 
+		if err := emitChangedEvents(); err != nil {
+			log.Info("session closed", zap.Error(err))
+			return nil
+		}
+
+		session.MarkMessage(message, "")
 	}
 
 	return nil
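Two behaviors change here: changed events now accumulate in `kvs` and are emitted in a single `EmitChangedEvents` call per resolved-ts boundary (with `MarkMessage` deferred until the batch has been emitted), and the fixed 500ms retry pause becomes a jittered sleep uniform in [200ms, 400ms). A standalone sketch of that retry shape; the function and names are illustrative only:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

const retryIntervalMs = 200 // mirrors downstreamRetryIntervalMs

// retryUntilDone retries op with a uniformly jittered pause in [200ms, 400ms)
// until op succeeds or ctx is cancelled. This is the same loop shape
// emitChangedEvents uses: retry forever on downstream errors, and bail out
// only when the consumer session is closed.
func retryUntilDone(ctx context.Context, op func() error) error {
	for {
		if err := op(); err == nil {
			return nil
		}
		if ctx.Err() != nil {
			return ctx.Err() // session closed: surface cancellation to the caller
		}
		sleep := time.Duration(retryIntervalMs+rand.Intn(retryIntervalMs)) * time.Millisecond
		time.Sleep(sleep)
	}
}

func main() {
	attempts := 0
	err := retryUntilDone(context.Background(), func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient downstream error")
		}
		return nil
	})
	fmt.Println(attempts, err) // 3 <nil>
}
```

The jitter spreads out retries from many consumer workers so a recovering downstream is not hit by a synchronized thundering herd.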
52 changes: 35 additions & 17 deletions cdc/cmd/kafka-consumer/tikv.go
@@ -17,7 +17,6 @@ import (
 	"context"
 	"math"
 	"net/url"
-	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
@@ -30,15 +29,17 @@ import (
 )
 
 const (
-	defaultPDErrorRetry int = math.MaxInt
+	defaultPDErrorRetry        int    = math.MaxInt
+	defaultTiKVBatchBytesLimit uint64 = 40 * 1024 * 1024 // 40MB
 )
 
 var _ sink.Sink = (*tikvSimpleSink)(nil)
 
 // tikvSimpleSink is a sink that sends events to downstream TiKV cluster.
 // The reason why we need this sink other than `cdc/sink/tikv.tikvSink` is that we need Kafka message offset to handle TiKV errors, which is not provided by `tikvSink`.
 type tikvSimpleSink struct {
-	client *rawkv.Client
+	client  *rawkv.Client
+	batcher *sink.TikvBatcher
 }
 
 func newSimpleTiKVSink(ctx context.Context, sinkURI *url.URL, _ *config.ReplicaConfig, opts map[string]string, _ chan error) (*tikvSimpleSink, error) {
@@ -56,34 +57,50 @@ func newSimpleTiKVSink(ctx context.Context, sinkURI *url.URL, _ *config.ReplicaC
 		return nil, errors.Trace(err)
 	}
 	return &tikvSimpleSink{
-		client: client,
+		client:  client,
+		batcher: sink.NewTiKVBatcher(nil),
 	}, nil
 }
 
 func (s *tikvSimpleSink) EmitChangedEvents(ctx context.Context, rawKVEntries ...*model.RawKVEntry) error {
-	now := uint64(time.Now().Unix())
+	s.batcher.Reset()
 
-	for _, entry := range rawKVEntries {
-		opType, key, value, ttl, err := sink.ExtractRawKVEntry(entry, now)
-		if err != nil {
-			return errors.Trace(err)
+	flushToTiKV := func() error {
+		if s.batcher.IsEmpty() {
+			return nil
 		}
-
-		if opType == model.OpTypePut {
-			err := s.client.PutWithTTL(ctx, key, value, ttl)
+		var err error
+		for _, batch := range s.batcher.Batches {
+			if batch.OpType == model.OpTypePut {
+				err = s.client.BatchPutWithTTL(ctx, batch.Keys, batch.Values, batch.TTLs)
+			} else if batch.OpType == model.OpTypeDelete {
+				err = s.client.BatchDelete(ctx, batch.Keys)
+			} else {
+				err = errors.Errorf("unexpected OpType: %v", batch.OpType)
+			}
 			if err != nil {
 				return errors.Trace(err)
 			}
-		} else if opType == model.OpTypeDelete {
-			err := s.client.Delete(ctx, key)
-			if err != nil {
+		}
+		s.batcher.Reset()
+		return nil
+	}
+
+	for _, entry := range rawKVEntries {
+		err := s.batcher.Append(entry)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		if s.batcher.ByteSize() >= defaultTiKVBatchBytesLimit {
+			if err := flushToTiKV(); err != nil {
 				return errors.Trace(err)
 			}
-		} else {
-			return errors.Errorf("unexpected opType %v", opType)
 		}
 	}
-	return nil
+
+	return errors.Trace(flushToTiKV())
 }
 
 func (s *tikvSimpleSink) FlushChangedEvents(ctx context.Context, _ model.KeySpanID, resolvedTs uint64) (uint64, error) {
@@ -95,6 +112,7 @@ func (s *tikvSimpleSink) EmitCheckpointTs(ctx context.Context, ts uint64) error
 }
 
 func (s *tikvSimpleSink) Close(ctx context.Context) error {
+	s.batcher.Reset()
 	return errors.Trace(s.client.Close())
 }
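Instead of one `PutWithTTL`/`Delete` round trip per key, the flush path now groups contiguous same-type operations (the batcher deliberately keeps PUT and DELETE ordering intact) and writes each group with one client-go rawkv batch call. A minimal standalone sketch of those two calls; the PD endpoint, keys, and TTL values are placeholders, and `NewClientWithOpts` is assumed from a recent client-go v2:

```go
package main

import (
	"context"

	"github.com/tikv/client-go/v2/rawkv"
)

func main() {
	ctx := context.Background()

	// Placeholder PD endpoint; point this at a real cluster.
	client, err := rawkv.NewClientWithOpts(ctx, []string{"127.0.0.1:2379"})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	keys := [][]byte{[]byte("k1"), []byte("k2")}
	vals := [][]byte{[]byte("v1"), []byte("v2")}
	ttls := []uint64{0, 0} // placeholder TTLs

	// One round trip per homogeneous batch, as flushToTiKV does above.
	if err := client.BatchPutWithTTL(ctx, keys, vals, ttls); err != nil {
		panic(err)
	}
	if err := client.BatchDelete(ctx, keys); err != nil {
		panic(err)
	}
}
```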
@@ -1,4 +1,4 @@
-version: '2.1'
+version: "3"
 
 services:
   zookeeper:
3 changes: 2 additions & 1 deletion cdc/tests/integration_tests/_utils/test_prepare
@@ -80,6 +80,7 @@ function on_exit() {
 		return 0
 	else
 		echo "Error $STATUS_CODE occurred on $LINE for sink $SINK_TYPE"
-		tail -n +1 "$WORK_DIR"/cdc*.log
+		# CI env already collect "*.log". Uncomment it for other envs.
+		# tail -n +1 "$WORK_DIR"/cdc*.log
 	fi
 }
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/changefeed_pause_resume/run.sh
@@ -28,7 +28,7 @@ function run() {
 		run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
 	fi
 
-	for i in $(seq 1 10); do
+	for _ in $(seq 1 10); do
 		tikv-cdc cli changefeed pause --changefeed-id=$changefeed_id --pd=$UP_PD
 		rawkv_op $UP_PD put 5000
 		tikv-cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$UP_PD