consumer(ticdc): kafka consumer support specify the consumer group id
3AceShowHand authored Feb 28, 2024
1 parent 03fde5e commit 285353c
Showing 2 changed files with 9 additions and 172 deletions.
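
Context for the change: the consumer previously always generated a random group id of the form ticdc_kafka_consumer_<uuid> inside newConsumerOption, and the only override was a consumer-group-id query parameter on the upstream URI. This commit moves the generated default into main() and exposes it as a --consumer-group-id command-line flag, so a caller can pin the group id (for example, to resume committed offsets across restarts). Below is a minimal, hypothetical standalone sketch of the flag-with-generated-default pattern the commit uses; it is illustrative, not the consumer itself.

package main

import (
	"flag"
	"fmt"

	"github.com/google/uuid"
)

func main() {
	// Default mirrors the old behavior: a fresh random group id on every run.
	defaultGroupID := fmt.Sprintf("ticdc_kafka_consumer_%s", uuid.New().String())

	var groupID string
	flag.StringVar(&groupID, "consumer-group-id", defaultGroupID, "consumer group id")
	flag.Parse()

	// With a fixed id, Kafka tracks committed offsets per group, so a
	// restarted consumer resumes where it left off instead of starting fresh.
	fmt.Println("using consumer group:", groupID)
}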
cdc/sink/ddlsink/mysql/mysql_ddl_sink.go (2 additions, 2 deletions)
@@ -189,8 +189,8 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
 	})

 	start := time.Now()
-	log.Info("Start exec DDL", zap.Any("DDL", ddl), zap.String("namespace", m.id.Namespace),
-		zap.String("changefeed", m.id.ID))
+	log.Info("Start exec DDL", zap.String("DDL", ddl.Query), zap.Uint64("commitTs", ddl.CommitTs),
+		zap.String("namespace", m.id.Namespace), zap.String("changefeed", m.id.ID))
 	tx, err := m.db.BeginTx(ctx, nil)
 	if err != nil {
 		return err
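
The only change in this file replaces zap.Any on the whole DDL event with typed fields for just the query text and its commit timestamp. A minimal sketch of the difference, with hypothetical stand-in values for ddl.Query and ddl.CommitTs:

package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()

	// Hypothetical stand-ins for ddl.Query and ddl.CommitTs.
	query := "ALTER TABLE t ADD COLUMN c INT"
	commitTs := uint64(447544805659377665)

	// zap.Any falls back to reflection for arbitrary structs and would dump
	// the entire event; typed fields log only the useful parts, cheaply.
	logger.Info("Start exec DDL",
		zap.String("DDL", query),
		zap.Uint64("commitTs", commitTs))
}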
cmd/kafka-consumer/main.go (7 additions, 170 deletions)
@@ -25,6 +25,7 @@ import (
"net/url"
"os"
"os/signal"
"runtime/debug"
"sort"
"strconv"
"strings"
@@ -34,7 +35,6 @@ import (
"time"

"github.com/IBM/sarama"
"github.com/edwingeng/deque"
"github.com/google/uuid"
cerror "github.com/pingcap/errors"
"github.com/pingcap/log"
@@ -65,7 +65,6 @@

 func newConsumerOption() *consumerOption {
 	return &consumerOption{
-		groupID: fmt.Sprintf("ticdc_kafka_consumer_%s", uuid.New().String()),
 		version: "2.4.0",

 		maxMessageBytes: math.MaxInt64,
@@ -112,11 +111,6 @@ func (o *consumerOption) Adjust(upstreamURI *url.URL, configFile string) error {
 		o.version = s
 	}

-	s = upstreamURI.Query().Get("consumer-group-id")
-	if s != "" {
-		o.groupID = s
-	}
-
 	o.topic = strings.TrimFunc(upstreamURI.Path, func(r rune) bool {
 		return r == '/'
 	})
@@ -202,20 +196,24 @@ func (o *consumerOption) Adjust(upstreamURI *url.URL, configFile string) error {
 }

 func main() {
+	debug.SetMemoryLimit(14 * 1024 * 1024 * 1024)
+
 	consumerOption := newConsumerOption()

 	var (
 		upstreamURIStr string
 		configFile     string
 	)

+	groupID := fmt.Sprintf("ticdc_kafka_consumer_%s", uuid.New().String())
+
 	flag.StringVar(&configFile, "config", "", "config file for changefeed")

 	flag.StringVar(&upstreamURIStr, "upstream-uri", "", "Kafka uri")
 	flag.StringVar(&consumerOption.downstreamURI, "downstream-uri", "", "downstream sink uri")
 	flag.StringVar(&consumerOption.schemaRegistryURI, "schema-registry-uri", "", "schema registry uri")
 	flag.StringVar(&consumerOption.upstreamTiDBDSN, "upstream-tidb-dsn", "", "upstream TiDB DSN")

+	flag.StringVar(&consumerOption.groupID, "consumer-group-id", groupID, "consumer group id")
 	flag.StringVar(&consumerOption.logPath, "log-file", "cdc_kafka_consumer.log", "log file path")
 	flag.StringVar(&consumerOption.logLevel, "log-level", "info", "log file path")
 	flag.StringVar(&consumerOption.timezone, "tz", "System", "Specify time zone of Kafka consumer")
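
debug.SetMemoryLimit (Go 1.19+) sets a soft limit on the runtime's total memory; the garbage collector works harder as usage approaches it. The 14 GiB limit set above is what replaces the hand-rolled per-partition flow controller deleted later in this diff. A small self-contained sketch of the API; the limit value matches the line above, everything else is illustrative:

package main

import (
	"fmt"
	"runtime/debug"
)

func main() {
	// Soft limit: the GC runs more aggressively as the heap nears 14 GiB,
	// instead of user code doing manual byte accounting against a quota.
	prev := debug.SetMemoryLimit(14 * 1024 * 1024 * 1024)
	fmt.Printf("previous limit: %d bytes\n", prev)

	// A negative argument queries the current limit without changing it.
	fmt.Printf("current limit: %d bytes\n", debug.SetMemoryLimit(-1))
}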
@@ -414,8 +412,6 @@ type partitionSinks struct {
 	tableSinksMap sync.Map
 	// resolvedTs record the maximum timestamp of the received event
 	resolvedTs uint64
-
-	flowController *flowController
 }

 // Consumer represents a Sarama consumer group consumer
@@ -445,10 +441,6 @@ type Consumer struct {
 	upstreamTiDB *sql.DB
 }

-const (
-	defaultMemoryQuotaInBytes = 2 * 1024 * 1024 * 1024 // 2GB
-)
-
 // NewConsumer creates a new cdc kafka consumer
 func NewConsumer(ctx context.Context, o *consumerOption) (*Consumer, error) {
 	c := new(Consumer)
@@ -483,15 +475,9 @@ func NewConsumer(ctx context.Context, o *consumerOption) (*Consumer, error) {
 	ctx, cancel := context.WithCancel(ctx)
 	errChan := make(chan error, 1)

-	memoryQuotaPerPartition := defaultMemoryQuotaInBytes / int(o.partitionNum)
 	for i := 0; i < int(o.partitionNum); i++ {
-		c.sinks[i] = &partitionSinks{
-			flowController: newFlowController(uint64(memoryQuotaPerPartition)),
-		}
+		c.sinks[i] = &partitionSinks{}
 	}
-	log.Info("flow controller created for each partition",
-		zap.Int32("partitionNum", o.partitionNum),
-		zap.Int("quota", memoryQuotaPerPartition))

 	changefeedID := model.DefaultChangeFeedID("kafka-consumer")
 	f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig(), errChan, nil)
@@ -701,16 +687,6 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 			group.Append(row)
 			// todo: mark the offset after the DDL is fully synced to the downstream mysql.
 			session.MarkMessage(message, "")
-
-			size := uint64(row.ApproximateBytes())
-			err = sink.flowController.consume(row.CommitTs, size)
-			if err != nil {
-				if errors.Is(err, errFlowControllerAborted) {
-					log.Info("flow control aborted")
-					return nil
-				}
-				return cerror.Trace(err)
-			}
 		case model.MessageTypeResolved:
 			ts, err := decoder.NextResolvedEvent()
 			if err != nil {
@@ -922,7 +898,6 @@ func syncFlushRowChangedEvents(ctx context.Context, sink *partitionSinks, resolvedTs
 		if !checkpoint.EqualOrGreater(resolvedTs) {
 			flushedResolvedTs = false
 		}
-		sink.flowController.release(checkpoint.Ts)
 		return true
 	})
 	if flushedResolvedTs {
@@ -972,141 +947,3 @@ func openDB(ctx context.Context, dsn string) (*sql.DB, error) {
log.Info("open db success", zap.String("dsn", dsn))
return db, nil
}

var (
errFlowControllerLargerThanQuota = errors.New("flow controller request memory larger than quota")
errFlowControllerAborted = errors.New("flow controller aborted")
)

type memoryQuota struct {
quota uint64 // should not be changed once initialized

isAborted atomic.Bool

consumed struct {
sync.Mutex
bytes uint64
}

consumedCond *sync.Cond
}

// newMemoryQuota creates a new memoryQuota
// quota: max advised memory consumption in bytes.
func newMemoryQuota(quota uint64) *memoryQuota {
ret := &memoryQuota{
quota: quota,
}

ret.consumedCond = sync.NewCond(&ret.consumed)
return ret
}

// consumeWithBlocking is called when a hard-limit is needed. The method will
// block until enough memory has been freed up by release.
// blockCallBack will be called if the function will block.
// Should be used with care to prevent deadlock.
func (c *memoryQuota) consumeWithBlocking(nBytes uint64) error {
if nBytes >= c.quota {
return errFlowControllerLargerThanQuota
}

c.consumed.Lock()
defer c.consumed.Unlock()

for {
if c.isAborted.Load() {
return errFlowControllerAborted
}

newConsumed := c.consumed.bytes + nBytes
if newConsumed < c.quota {
break
}
c.consumedCond.Wait()
}

c.consumed.bytes += nBytes
return nil
}

// release is called when a chuck of memory is done being used.
func (c *memoryQuota) release(nBytes uint64) {
c.consumed.Lock()

if c.consumed.bytes < nBytes {
c.consumed.Unlock()
log.Panic("memoryQuota: releasing more than consumed, report a bug",
zap.Uint64("consumed", c.consumed.bytes),
zap.Uint64("released", nBytes))
}

c.consumed.bytes -= nBytes
if c.consumed.bytes < c.quota {
c.consumed.Unlock()
c.consumedCond.Signal()
return
}

c.consumed.Unlock()
}

type flowController struct {
memoryQuota *memoryQuota

queueMu struct {
sync.Mutex
queue deque.Deque
}
}

type entry struct {
commitTs uint64
size uint64
}

func newFlowController(quota uint64) *flowController {
return &flowController{
memoryQuota: newMemoryQuota(quota),
queueMu: struct {
sync.Mutex
queue deque.Deque
}{
queue: deque.NewDeque(),
},
}
}

func (c *flowController) consume(commitTs uint64, size uint64) error {
err := c.memoryQuota.consumeWithBlocking(size)
if err != nil {
return cerror.Trace(err)
}

c.queueMu.Lock()
defer c.queueMu.Unlock()

c.queueMu.queue.PushBack(&entry{
commitTs: commitTs,
size: size,
})

return nil
}

func (c *flowController) release(resolvedTs uint64) {
var nBytesToRelease uint64

c.queueMu.Lock()
for c.queueMu.queue.Len() > 0 {
if peeked := c.queueMu.queue.Front().(*entry); peeked.commitTs <= resolvedTs {
nBytesToRelease += peeked.size
c.queueMu.queue.PopFront()
} else {
break
}
}
c.queueMu.Unlock()

c.memoryQuota.release(nBytesToRelease)
}
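
All of the deleted code above implemented per-partition backpressure: consume blocked each row on a sync.Cond-guarded byte quota and queued (commitTs, size) entries, and release returned the bytes of every entry at or below a resolved timestamp once the sink had flushed. The runtime-wide memory limit set in main() now covers this. For reference, a sketch of how the removed controller was wired into the consume path, assuming the deleted definitions above are in scope and with row and resolvedTs as hypothetical inputs:

// Per partition: a controller with the old 2 GiB quota.
fc := newFlowController(2 * 1024 * 1024 * 1024)

// On every row event: block until the quota admits the row's bytes.
if err := fc.consume(row.CommitTs, uint64(row.ApproximateBytes())); err != nil {
	// errFlowControllerAborted signals shutdown rather than a real failure.
	return err
}

// After flushing up to a resolved ts: release every queued entry whose
// commitTs <= resolvedTs back to the quota.
fc.release(resolvedTs)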
