From bf0dce650ad892fee1f55b851723eedc393327fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?= Date: Wed, 11 May 2022 00:24:34 +0800 Subject: [PATCH] This is an automated cherry-pick of #5372 Signed-off-by: ti-chi-bot --- cdc/sink/buffer_sink.go | 231 ++++++++++++++++++ cdc/sink/mq/mq.go | 527 ++++++++++++++++++++++++++++++++++++++++ cdc/sink/mysql.go | 20 ++ 3 files changed, 778 insertions(+) create mode 100644 cdc/sink/buffer_sink.go create mode 100644 cdc/sink/mq/mq.go diff --git a/cdc/sink/buffer_sink.go b/cdc/sink/buffer_sink.go new file mode 100644 index 00000000000..8c0ff7ed874 --- /dev/null +++ b/cdc/sink/buffer_sink.go @@ -0,0 +1,231 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "context" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/contextutil" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/metrics" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +const maxFlushBatchSize = 512 + +// bufferSink buffers emitted events and checkpoints and flush asynchronously. +// Note that it is a thread-safe Sink implementation. +type bufferSink struct { + Sink + changeFeedCheckpointTs uint64 + tableCheckpointTsMap sync.Map + buffer map[model.TableID][]*model.RowChangedEvent + bufferMu sync.Mutex + flushTsChan chan flushMsg +} + +var _ Sink = (*bufferSink)(nil) + +func newBufferSink( + backendSink Sink, checkpointTs model.Ts, +) *bufferSink { + sink := &bufferSink{ + Sink: backendSink, + // buffer shares the same flow control with table sink + buffer: make(map[model.TableID][]*model.RowChangedEvent), + changeFeedCheckpointTs: checkpointTs, + flushTsChan: make(chan flushMsg, maxFlushBatchSize), + } + return sink +} + +type runState struct { + batch [maxFlushBatchSize]flushMsg + + metricTotalRows prometheus.Counter +} + +func (b *bufferSink) run(ctx context.Context, changefeedID model.ChangeFeedID, errCh chan error) { + state := runState{ + metricTotalRows: metrics.BufferSinkTotalRowsCountCounter. + WithLabelValues(changefeedID.Namespace, changefeedID.ID), + } + defer func() { + metrics.BufferSinkTotalRowsCountCounter. 
+ DeleteLabelValues(changefeedID.Namespace, changefeedID.ID) + }() + + for { + keepRun, err := b.runOnce(ctx, &state) + if err != nil && errors.Cause(err) != context.Canceled { + errCh <- err + return + } + if !keepRun { + return + } + } +} + +func (b *bufferSink) runOnce(ctx context.Context, state *runState) (bool, error) { + batchSize, batch := 0, state.batch + push := func(event flushMsg) { + batch[batchSize] = event + batchSize++ + } + select { + case <-ctx.Done(): + return false, ctx.Err() + case event := <-b.flushTsChan: + push(event) + RecvBatch: + for batchSize < maxFlushBatchSize { + select { + case event := <-b.flushTsChan: + push(event) + default: + break RecvBatch + } + } + } + + start := time.Now() + b.bufferMu.Lock() + // find all rows before resolvedTs and emit to backend sink + for i := 0; i < batchSize; i++ { + tableID, resolvedTs := batch[i].tableID, batch[i].resolvedTs + rows := b.buffer[tableID] + i := sort.Search(len(rows), func(i int) bool { + return rows[i].CommitTs > resolvedTs + }) + if i == 0 { + continue + } + state.metricTotalRows.Add(float64(i)) + + err := b.Sink.EmitRowChangedEvents(ctx, rows[:i]...) + if err != nil { + b.bufferMu.Unlock() + return false, errors.Trace(err) + } + + // put remaining rows back to buffer + // append to a new, fixed slice to avoid lazy GC + b.buffer[tableID] = append(make([]*model.RowChangedEvent, 0, len(rows[i:])), rows[i:]...) + } + b.bufferMu.Unlock() + + for i := 0; i < batchSize; i++ { + tableID, resolvedTs := batch[i].tableID, batch[i].resolvedTs + checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, tableID, resolvedTs) + if err != nil { + return false, errors.Trace(err) + } + b.tableCheckpointTsMap.Store(tableID, checkpointTs) + } + elapsed := time.Since(start) + if elapsed > time.Second { + log.Warn("flush row changed events too slow", + zap.Int("batchSize", batchSize), + zap.Duration("duration", elapsed), + contextutil.ZapFieldChangefeed(ctx)) + } + + return true, nil +} + +// Init table sink resources +func (b *bufferSink) Init(tableID model.TableID) error { + b.clearBufferedTableData(tableID) + return b.Sink.Init(tableID) +} + +// Barrier delete buffer +func (b *bufferSink) Barrier(ctx context.Context, tableID model.TableID) error { + b.clearBufferedTableData(tableID) + return b.Sink.Barrier(ctx, tableID) +} + +func (b *bufferSink) clearBufferedTableData(tableID model.TableID) { + b.bufferMu.Lock() + defer b.bufferMu.Unlock() + delete(b.buffer, tableID) + checkpointTs, loaded := b.tableCheckpointTsMap.LoadAndDelete(tableID) + if loaded { + log.Info("clean up table checkpoint ts in buffer sink", + zap.Int64("tableID", tableID), + zap.Uint64("checkpointTs", checkpointTs.(uint64))) + } +} + +func (b *bufferSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) { + err := b.EmitRowChangedEvents(ctx, rows...) + if err != nil { + return false, err + } + return true, nil +} + +func (b *bufferSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if len(rows) == 0 { + return nil + } + tableID := rows[0].Table.TableID + b.bufferMu.Lock() + b.buffer[tableID] = append(b.buffer[tableID], rows...) 
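+			// The appended rows stay in the per-table buffer until runOnce
+			// drains every row with CommitTs <= the table's resolvedTs.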
+ b.bufferMu.Unlock() + } + return nil +} + +func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { + select { + case <-ctx.Done(): + return b.getTableCheckpointTs(tableID), ctx.Err() + case b.flushTsChan <- flushMsg{ + tableID: tableID, + resolvedTs: resolvedTs, + }: + } + return b.getTableCheckpointTs(tableID), nil +} + +type flushMsg struct { + tableID model.TableID + resolvedTs uint64 +} + +func (b *bufferSink) getTableCheckpointTs(tableID model.TableID) uint64 { + checkPoints, ok := b.tableCheckpointTsMap.Load(tableID) + if ok { + return checkPoints.(uint64) + } + return atomic.LoadUint64(&b.changeFeedCheckpointTs) +} + +// UpdateChangeFeedCheckpointTs update the changeFeedCheckpointTs every processor tick +func (b *bufferSink) UpdateChangeFeedCheckpointTs(checkpointTs uint64) { + atomic.StoreUint64(&b.changeFeedCheckpointTs, checkpointTs) +} diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go new file mode 100644 index 00000000000..9426b6973c7 --- /dev/null +++ b/cdc/sink/mq/mq.go @@ -0,0 +1,527 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mq + +import ( + "context" + "net/url" + "strings" + "sync" + + "github.com/Shopify/sarama" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/contextutil" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/codec" + "github.com/pingcap/tiflow/cdc/sink/metrics" + "github.com/pingcap/tiflow/cdc/sink/mq/dispatcher" + "github.com/pingcap/tiflow/cdc/sink/mq/manager" + kafkamanager "github.com/pingcap/tiflow/cdc/sink/mq/manager/kafka" + pulsarmanager "github.com/pingcap/tiflow/cdc/sink/mq/manager/pulsar" + "github.com/pingcap/tiflow/cdc/sink/mq/producer" + "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" + "github.com/pingcap/tiflow/cdc/sink/mq/producer/pulsar" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/filter" + "github.com/pingcap/tiflow/pkg/security" + "github.com/pingcap/tiflow/pkg/util" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +type resolvedTsEvent struct { + tableID model.TableID + resolvedTs model.Ts +} + +const ( + // Depend on this size, `resolvedBuffer` will take + // approximately 2 KiB memory. 
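+	// (each resolvedTsEvent is a model.TableID plus a model.Ts, 8 bytes each,
+	// so 128 buffered events come to roughly 128 * 16 B = 2 KiB)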
+ defaultResolvedTsEventBufferSize = 128 +) + +type mqSink struct { + mqProducer producer.Producer + eventRouter *dispatcher.EventRouter + encoderBuilder codec.EncoderBuilder + filter *filter.Filter + protocol config.Protocol + + topicManager manager.TopicManager + flushWorker *flushWorker + tableCheckpointTsMap sync.Map + resolvedBuffer chan resolvedTsEvent + + statistics *metrics.Statistics + + role util.Role + id model.ChangeFeedID +} + +func newMqSink( + ctx context.Context, + credential *security.Credential, + topicManager manager.TopicManager, + mqProducer producer.Producer, + filter *filter.Filter, + defaultTopic string, + replicaConfig *config.ReplicaConfig, encoderConfig *codec.Config, + errCh chan error, +) (*mqSink, error) { + encoderBuilder, err := codec.NewEventBatchEncoderBuilder(encoderConfig, credential) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic) + if err != nil { + return nil, errors.Trace(err) + } + + changefeedID := contextutil.ChangefeedIDFromCtx(ctx) + role := contextutil.RoleFromCtx(ctx) + + encoder := encoderBuilder.Build() + statistics := metrics.NewStatistics(ctx, metrics.SinkTypeMQ) + flushWorker := newFlushWorker(encoder, mqProducer, statistics) + + s := &mqSink{ + mqProducer: mqProducer, + eventRouter: eventRouter, + encoderBuilder: encoderBuilder, + filter: filter, + protocol: encoderConfig.Protocol(), + topicManager: topicManager, + flushWorker: flushWorker, + resolvedBuffer: make(chan resolvedTsEvent, defaultResolvedTsEventBufferSize), + statistics: statistics, + role: role, + id: changefeedID, + } + + go func() { + if err := s.run(ctx); err != nil && errors.Cause(err) != context.Canceled { + select { + case <-ctx.Done(): + return + case errCh <- err: + default: + log.Error("error channel is full", zap.Error(err), + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID), + zap.Any("role", s.role)) + } + } + }() + return s, nil +} + +// TryEmitRowChangedEvents just calls EmitRowChangedEvents internally, +// it still blocking in current implementation. +// TODO(dongmen): We should make this method truly non-blocking after we remove buffer sink +func (k *mqSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) { + err := k.EmitRowChangedEvents(ctx, rows...) + if err != nil { + return false, err + } + return true, nil +} + +// Init table sink resources +func (k *mqSink) Init(tableID model.TableID) error { + // We need to clean up the old values of the table, + // otherwise when the table is dispatched back again, + // it may read the old values. + // See: https://github.com/pingcap/tiflow/issues/4464#issuecomment-1085385382. 
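+	// LoadAndDelete removes the stale entry atomically, so the next
+	// FlushRowChangedEvents for this table starts from a zero checkpoint.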
+ if checkpointTs, loaded := k.tableCheckpointTsMap.LoadAndDelete(tableID); loaded { + log.Info("clean up table checkpoint ts in MQ sink", + zap.Int64("tableID", tableID), + zap.Uint64("checkpointTs", checkpointTs.(uint64))) + } + + return nil +} + +func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { + rowsCount := 0 + for _, row := range rows { + if k.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) { + log.Info("Row changed event ignored", + zap.Uint64("startTs", row.StartTs), + zap.String("namespace", k.id.Namespace), + zap.String("changefeed", k.id.ID), + zap.Any("role", k.role)) + continue + } + topic := k.eventRouter.GetTopicForRowChange(row) + partitionNum, err := k.topicManager.GetPartitionNum(topic) + if err != nil { + return errors.Trace(err) + } + partition := k.eventRouter.GetPartitionForRowChange(row, partitionNum) + err = k.flushWorker.addEvent(ctx, mqEvent{ + row: row, + key: topicPartitionKey{ + topic: topic, partition: partition, + }, + }) + if err != nil { + return err + } + rowsCount++ + } + k.statistics.AddRowsCount(rowsCount) + return nil +} + +// FlushRowChangedEvents is thread-safety +func (k *mqSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { + var checkpointTs uint64 + v, ok := k.tableCheckpointTsMap.Load(tableID) + if ok { + checkpointTs = v.(uint64) + } + if resolvedTs <= checkpointTs { + return checkpointTs, nil + } + select { + case <-ctx.Done(): + return 0, ctx.Err() + case k.resolvedBuffer <- resolvedTsEvent{ + tableID: tableID, + resolvedTs: resolvedTs, + }: + } + k.statistics.PrintStatus(ctx) + return checkpointTs, nil +} + +// bgFlushTs flush resolvedTs to workers and flush the mqProducer +func (k *mqSink) bgFlushTs(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case msg := <-k.resolvedBuffer: + resolvedTs := msg.resolvedTs + err := k.flushTsToWorker(ctx, resolvedTs) + if err != nil { + return errors.Trace(err) + } + // Since CDC does not guarantee exactly once semantic, it won't cause any problem + // here even if the table was moved or removed. + // ref: https://github.com/pingcap/tiflow/pull/4356#discussion_r787405134 + k.tableCheckpointTsMap.Store(msg.tableID, resolvedTs) + } + } +} + +func (k *mqSink) flushTsToWorker(ctx context.Context, resolvedTs model.Ts) error { + if err := k.flushWorker.addEvent(ctx, mqEvent{resolvedTs: resolvedTs}); err != nil { + if errors.Cause(err) != context.Canceled { + log.Warn("failed to flush TS to worker", zap.Error(err)) + } else { + log.Debug("flushing TS to worker has been canceled", zap.Error(err)) + } + return err + } + return nil +} + +func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64, tables []model.TableName) error { + encoder := k.encoderBuilder.Build() + msg, err := encoder.EncodeCheckpointEvent(ts) + if err != nil { + return errors.Trace(err) + } + if msg == nil { + return nil + } + // NOTICE: When there is no table sync, + // we need to send checkpoint ts to the default topic. T + // This will be compatible with the old behavior. 
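+	// When there are active tables, the checkpoint is instead broadcast to
+	// every topic those tables currently map to (see GetActiveTopics below).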
+ if len(tables) == 0 { + topic := k.eventRouter.GetDefaultTopic() + partitionNum, err := k.topicManager.GetPartitionNum(topic) + if err != nil { + return errors.Trace(err) + } + log.Debug("emit checkpointTs to default topic", + zap.String("topic", topic), zap.Uint64("checkpointTs", ts)) + err = k.mqProducer.SyncBroadcastMessage(ctx, topic, partitionNum, msg) + return errors.Trace(err) + } + topics := k.eventRouter.GetActiveTopics(tables) + log.Debug("MQ sink current active topics", zap.Any("topics", topics)) + for _, topic := range topics { + partitionNum, err := k.topicManager.GetPartitionNum(topic) + if err != nil { + return errors.Trace(err) + } + log.Debug("emit checkpointTs to active topic", + zap.String("topic", topic), zap.Uint64("checkpointTs", ts)) + err = k.mqProducer.SyncBroadcastMessage(ctx, topic, partitionNum, msg) + if err != nil { + return errors.Trace(err) + } + } + return nil +} + +func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { + if k.filter.ShouldIgnoreDDLEvent(ddl.StartTs, ddl.Type, ddl.TableInfo.Schema, ddl.TableInfo.Table) { + log.Info( + "DDL event ignored", + zap.String("query", ddl.Query), + zap.Uint64("startTs", ddl.StartTs), + zap.Uint64("commitTs", ddl.CommitTs), + zap.String("namespace", k.id.Namespace), + zap.String("changefeed", k.id.ID), + zap.Any("role", k.role), + ) + return cerror.ErrDDLEventIgnored.GenWithStackByArgs() + } + + encoder := k.encoderBuilder.Build() + msg, err := encoder.EncodeDDLEvent(ddl) + if err != nil { + return errors.Trace(err) + } + if msg == nil { + return nil + } + + topic := k.eventRouter.GetTopicForDDL(ddl) + partitionRule := k.eventRouter.GetDLLDispatchRuleByProtocol(k.protocol) + k.statistics.AddDDLCount() + log.Debug("emit ddl event", + zap.Uint64("commitTs", ddl.CommitTs), + zap.String("query", ddl.Query), + zap.String("namespace", k.id.Namespace), + zap.String("changefeed", k.id.ID), + zap.Any("role", k.role)) + if partitionRule == dispatcher.PartitionAll { + partitionNum, err := k.topicManager.GetPartitionNum(topic) + if err != nil { + return errors.Trace(err) + } + err = k.mqProducer.SyncBroadcastMessage(ctx, topic, partitionNum, msg) + return errors.Trace(err) + } + // Notice: We must call GetPartitionNum here, + // which will be responsible for automatically creating topics when they don't exist. + // If it is not called here and kafka has `auto.create.topics.enable` turned on, + // then the auto-created topic will not be created as configured by ticdc. + _, err = k.topicManager.GetPartitionNum(topic) + if err != nil { + return errors.Trace(err) + } + err = k.asyncFlushToPartitionZero(ctx, topic, msg) + return errors.Trace(err) +} + +// Close the producer asynchronously, does not care closed successfully or not. +func (k *mqSink) Close(ctx context.Context) error { + go k.mqProducer.Close() + return nil +} + +func (k *mqSink) Barrier(cxt context.Context, tableID model.TableID) error { + // Barrier does nothing because FlushRowChangedEvents in mq sink has flushed + // all buffered events by force. + return nil +} + +func (k *mqSink) run(ctx context.Context) error { + wg, ctx := errgroup.WithContext(ctx) + wg.Go(func() error { + return k.bgFlushTs(ctx) + }) + wg.Go(func() error { + return k.flushWorker.run(ctx) + }) + return wg.Wait() +} + +// asyncFlushToPartitionZero writes message to +// partition zero asynchronously and flush it immediately. 
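+// EmitDDLEvent uses it for DDL messages whose dispatch rule is not
+// partition-all, so a single copy of the DDL lands on partition zero.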
+func (k *mqSink) asyncFlushToPartitionZero( + ctx context.Context, topic string, message *codec.MQMessage, +) error { + err := k.mqProducer.AsyncSendMessage(ctx, topic, dispatcher.PartitionZero, message) + if err != nil { + return err + } + return k.mqProducer.Flush(ctx) +} + +// NewKafkaSaramaSink creates a new Kafka mqSink. +func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, + filter *filter.Filter, replicaConfig *config.ReplicaConfig, + opts map[string]string, errCh chan error, +) (*mqSink, error) { + topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool { + return r == '/' + }) + if topic == "" { + return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri") + } + + baseConfig := kafka.NewConfig() + if err := baseConfig.Apply(sinkURI); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + if err := replicaConfig.ApplyProtocol(sinkURI).Validate(); err != nil { + return nil, errors.Trace(err) + } + + saramaConfig, err := kafka.NewSaramaConfig(ctx, baseConfig) + if err != nil { + return nil, errors.Trace(err) + } + + adminClient, err := kafka.NewAdminClientImpl(baseConfig.BrokerEndpoints, saramaConfig) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + // we must close adminClient when this func return cause by an error + // otherwise the adminClient will never be closed and lead to an goroutine leak + defer func() { + if err != nil { + adminClient.Close() + } + }() + + if err := kafka.AdjustConfig(adminClient, baseConfig, saramaConfig, topic); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + var protocol config.Protocol + if err := protocol.FromString(replicaConfig.Sink.Protocol); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + encoderConfig := codec.NewConfig(protocol, contextutil.TimezoneFromCtx(ctx)) + if err := encoderConfig.Apply(sinkURI, opts); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + // always set encoder's `MaxMessageBytes` equal to producer's `MaxMessageBytes` + // to prevent that the encoder generate batched message too large then cause producer meet `message too large` + encoderConfig = encoderConfig.WithMaxMessageBytes(saramaConfig.Producer.MaxMessageBytes) + + if err := encoderConfig.Validate(); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + client, err := sarama.NewClient(baseConfig.BrokerEndpoints, saramaConfig) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + topicManager := kafkamanager.NewTopicManager( + client, + adminClient, + baseConfig.DeriveTopicConfig(), + ) + if _, err := topicManager.CreateTopic(topic); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err) + } + + sProducer, err := kafka.NewKafkaSaramaProducer( + ctx, + client, + adminClient, + baseConfig, + saramaConfig, + errCh, + ) + if err != nil { + return nil, errors.Trace(err) + } + + sink, err := newMqSink( + ctx, + baseConfig.Credential, + topicManager, + sProducer, + filter, + topic, + replicaConfig, + encoderConfig, + errCh, + ) + if err != nil { + return nil, errors.Trace(err) + } + return sink, nil +} + +// NewPulsarSink creates a new Pulsar mqSink. 
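+// The protocol comes from the sink URI query parameter, and a placeholder
+// topic manager is derived from the Pulsar producer's partition count.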
+func NewPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, + replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error, +) (*mqSink, error) { + s := sinkURI.Query().Get(config.ProtocolKey) + if s != "" { + replicaConfig.Sink.Protocol = s + } + err := replicaConfig.Validate() + if err != nil { + return nil, err + } + + var protocol config.Protocol + if err := protocol.FromString(replicaConfig.Sink.Protocol); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + encoderConfig := codec.NewConfig(protocol, contextutil.TimezoneFromCtx(ctx)) + if err := encoderConfig.Apply(sinkURI, opts); err != nil { + return nil, errors.Trace(err) + } + // todo: set by pulsar producer's `max.message.bytes` + // encoderConfig = encoderConfig.WithMaxMessageBytes() + if err := encoderConfig.Validate(); err != nil { + return nil, errors.Trace(err) + } + + producer, err := pulsar.NewProducer(sinkURI, errCh) + if err != nil { + return nil, errors.Trace(err) + } + // For now, it's a placeholder. Avro format have to make connection to Schema Registry, + // and it may need credential. + credential := &security.Credential{} + fakeTopicManager := pulsarmanager.NewTopicManager( + producer.GetPartitionNum(), + ) + sink, err := newMqSink( + ctx, + credential, + fakeTopicManager, + producer, + filter, + "", + replicaConfig, + encoderConfig, + errCh, + ) + if err != nil { + return nil, errors.Trace(err) + } + return sink, nil +} diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index bd905ff7584..dbff2396560 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -826,6 +826,7 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) { } } +<<<<<<< HEAD:cdc/sink/mysql.go // cleanup waits for notification from closedCh and consumes all txns from txnCh. // The exit sequence is // 1. producer(sink.flushRowChangedEvents goroutine) of txnCh exits @@ -843,6 +844,25 @@ func (w *mysqlSinkWorker) cleanup() { return } } +======= +func (s *mysqlSink) cleanTableResource(tableID model.TableID) { + // We need to clean up the old values of the table, + // otherwise when the table is dispatched back again, + // it may read the old values. + // See: https://github.com/pingcap/tiflow/issues/4464#issuecomment-1085385382. + if resolvedTs, loaded := s.tableMaxResolvedTs.LoadAndDelete(tableID); loaded { + log.Info("clean up table max resolved ts in MySQL sink", + zap.Int64("tableID", tableID), + zap.Uint64("resolvedTs", resolvedTs.(uint64))) + } + if checkpointTs, loaded := s.tableCheckpointTs.LoadAndDelete(tableID); loaded { + log.Info("clean up table checkpoint ts in MySQL sink", + zap.Int64("tableID", tableID), + zap.Uint64("checkpointTs", checkpointTs.(uint64))) + } + // try to remove table txn cache + s.txnCache.RemoveTableTxn(tableID) +>>>>>>> 544aadb0f (sink(ticdc): clean up table checkpoint ts in buffer and MQ sink (#5372)):cdc/sink/mysql/mysql.go } func (s *mysqlSink) Close(ctx context.Context) error {
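The common thread across the three sinks touched by this patch is the per-table checkpoint bookkeeping: a sync.Map keyed by table ID that must be cleared with LoadAndDelete when a table is re-initialized, plus a changefeed-level fallback that is read atomically. Below is a minimal, self-contained sketch of that pattern; the type and field names are illustrative stand-ins, not the actual tiflow definitions.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// checkpointStore is an illustrative stand-in for the bookkeeping kept by
// bufferSink, mqSink, and mysqlSink: per-table checkpoints in a sync.Map,
// with an atomically updated changefeed-level fallback.
type checkpointStore struct {
	changefeedCheckpointTs uint64   // fallback when a table has no entry yet
	tableCheckpointTs      sync.Map // table ID (int64) -> checkpoint ts (uint64)
}

// advance records a flushed checkpoint for one table.
func (s *checkpointStore) advance(tableID int64, ts uint64) {
	s.tableCheckpointTs.Store(tableID, ts)
}

// get returns the table checkpoint, falling back to the changefeed checkpoint.
func (s *checkpointStore) get(tableID int64) uint64 {
	if v, ok := s.tableCheckpointTs.Load(tableID); ok {
		return v.(uint64)
	}
	return atomic.LoadUint64(&s.changefeedCheckpointTs)
}

// cleanup drops stale per-table state in one atomic step, mirroring what
// Init/clearBufferedTableData/cleanTableResource do in this patch.
func (s *checkpointStore) cleanup(tableID int64) {
	if ts, loaded := s.tableCheckpointTs.LoadAndDelete(tableID); loaded {
		fmt.Printf("cleaned up table %d, stale checkpointTs %d\n", tableID, ts.(uint64))
	}
}

func main() {
	s := &checkpointStore{}
	atomic.StoreUint64(&s.changefeedCheckpointTs, 100)

	s.advance(1, 120)
	fmt.Println(s.get(1)) // 120: the per-table entry wins
	s.cleanup(1)          // table moves to another node: drop its entry
	fmt.Println(s.get(1)) // 100: falls back to the changefeed checkpoint
}

sync.Map keeps the hot read path lock-free, while LoadAndDelete makes the cleanup a single atomic step, so a table dispatched back to the same node cannot observe its old checkpoint.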