Skip to content

Commit

Permalink
Merge branch 'master' into add-conf-collation_compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 30, 2021
2 parents 174e00e + 582ce85 commit 7d9909e
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 172 deletions.
67 changes: 61 additions & 6 deletions cdc/processor/pipeline/cyclic_mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package pipeline

import (
"container/list"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
Expand All @@ -36,6 +38,9 @@ type cyclicMarkNode struct {
// startTs -> replicaID
currentReplicaIDs map[model.Ts]uint64
currentCommitTs uint64

// todo : remove this flag after table actor is GA
isTableActorMode bool
}

func newCyclicMarkNode(markTableID model.TableID) pipeline.Node {
Expand All @@ -47,12 +52,16 @@ func newCyclicMarkNode(markTableID model.TableID) pipeline.Node {
}

func (n *cyclicMarkNode) Init(ctx pipeline.NodeContext) error {
n.localReplicaID = ctx.ChangefeedVars().Info.Config.Cyclic.ReplicaID
filterReplicaID := ctx.ChangefeedVars().Info.Config.Cyclic.FilterReplicaID
return n.InitTableActor(ctx.ChangefeedVars().Info.Config.Cyclic.ReplicaID, ctx.ChangefeedVars().Info.Config.Cyclic.FilterReplicaID, false)
}

func (n *cyclicMarkNode) InitTableActor(localReplicaID uint64, filterReplicaID []uint64, isTableActorMode bool) error {
n.localReplicaID = localReplicaID
n.filterReplicaID = make(map[uint64]struct{})
for _, rID := range filterReplicaID {
n.filterReplicaID[rID] = struct{}{}
}
n.isTableActorMode = isTableActorMode
// do nothing
return nil
}
Expand All @@ -70,27 +79,36 @@ func (n *cyclicMarkNode) Init(ctx pipeline.NodeContext) error {
// and adds the mark row event or normal row event into the cache.
func (n *cyclicMarkNode) Receive(ctx pipeline.NodeContext) error {
msg := ctx.Message()
_, err := n.TryHandleDataMessage(ctx, msg)
return err
}

func (n *cyclicMarkNode) TryHandleDataMessage(ctx pipeline.NodeContext, msg pipeline.Message) (bool, error) {
// limit the queue size when the table actor mode is enabled
if n.isTableActorMode && ctx.(*cyclicNodeContext).queue.Len() >= defaultSyncResolvedBatch {
return false, nil
}
switch msg.Tp {
case pipeline.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
n.flush(ctx, event.CRTs)
if event.RawKV.OpType == model.OpTypeResolved {
ctx.SendToNextNode(msg)
return nil
return true, nil
}
tableID, err := entry.DecodeTableID(event.RawKV.Key)
if err != nil {
return errors.Trace(err)
return false, errors.Trace(err)
}
if tableID == n.markTableID {
n.appendMarkRowEvent(ctx, event)
} else {
n.appendNormalRowEvent(ctx, event)
}
return nil
return true, nil
}
ctx.SendToNextNode(msg)
return nil
return true, nil
}

// appendNormalRowEvent adds the normal row into the cache.
Expand Down Expand Up @@ -174,3 +192,40 @@ func extractReplicaID(markRow *model.RowChangedEvent) uint64 {
log.Panic("bad mark table, " + mark.CyclicReplicaIDCol + " not found")
return 0
}

// cyclicNodeContext implements the NodeContext, cyclicMarkNode can be reused in table actor
// to buffer all messages with a queue, it will not block the actor system
type cyclicNodeContext struct {
*actorNodeContext
queue list.List
}

func NewCyclicNodeContext(ctx *actorNodeContext) *cyclicNodeContext {
return &cyclicNodeContext{
actorNodeContext: ctx,
}
}

// SendToNextNode implement the NodeContext interface, push the message to a queue
// the queue size is limited by TryHandleDataMessage,size is defaultSyncResolvedBatch
func (c *cyclicNodeContext) SendToNextNode(msg pipeline.Message) {
c.queue.PushBack(msg)
}

// Message implements the NodeContext
func (c *cyclicNodeContext) Message() pipeline.Message {
msg := c.tryGetProcessedMessage()
if msg != nil {
return *msg
}
return pipeline.Message{}
}

func (c *cyclicNodeContext) tryGetProcessedMessage() *pipeline.Message {
el := c.queue.Front()
if el == nil {
return nil
}
msg := c.queue.Remove(el).(pipeline.Message)
return &msg
}
92 changes: 77 additions & 15 deletions cdc/processor/pipeline/cyclic_mark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,19 @@ import (
"context"
"sort"
"sync"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/pingcap/check"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/cyclic/mark"
"github.com/pingcap/tiflow/pkg/pipeline"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type markSuite struct{}

var _ = check.Suite(&markSuite{})

func (s *markSuite) TestCyclicMarkNode(c *check.C) {
defer testleak.AfterTest(c)()
func TestCyclicMarkNode(t *testing.T) {
markTableID := model.TableID(161025)
testCases := []struct {
input []*model.RowChangedEvent
Expand Down Expand Up @@ -145,7 +140,7 @@ func (s *markSuite) TestCyclicMarkNode(c *check.C) {
})
n := newCyclicMarkNode(markTableID)
err := n.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil))
c.Assert(err, check.IsNil)
require.Nil(t, err)
outputCh := make(chan pipeline.Message)
var wg sync.WaitGroup
wg.Add(2)
Expand All @@ -162,11 +157,11 @@ func (s *markSuite) TestCyclicMarkNode(c *check.C) {
})
event.Row = row
err := n.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.PolymorphicEventMessage(event), outputCh))
c.Assert(err, check.IsNil)
require.Nil(t, err)
lastCommitTs = row.CommitTs
}
err := n.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, lastCommitTs+1)), outputCh))
c.Assert(err, check.IsNil)
require.Nil(t, err)
}()
output := []*model.RowChangedEvent{}
go func() {
Expand All @@ -182,9 +177,77 @@ func (s *markSuite) TestCyclicMarkNode(c *check.C) {
// check the commitTs is increasing
var lastCommitTs model.Ts
for _, row := range output {
c.Assert(row.CommitTs, check.GreaterEqual, lastCommitTs)
require.GreaterOrEqual(t, row.CommitTs, lastCommitTs)
// Ensure that the ReplicaID of the row is set correctly.
require.NotEqual(t, 0, row.ReplicaID)
lastCommitTs = row.CommitTs
}
sort.Slice(output, func(i, j int) bool {
if output[i].CommitTs == output[j].CommitTs {
return output[i].StartTs < output[j].StartTs
}
return output[i].CommitTs < output[j].CommitTs
})
require.Equal(t, tc.expected, output, cmp.Diff(output, tc.expected))
}

// table actor
for _, tc := range testCases {
ctx := NewCyclicNodeContext(NewContext(context.TODO(), nil, 1, &cdcContext.ChangefeedVars{
Info: &model.ChangeFeedInfo{
Config: &config.ReplicaConfig{
Cyclic: &config.CyclicConfig{
Enable: true,
ReplicaID: tc.replicaID,
FilterReplicaID: tc.filterID,
},
},
},
}, nil))
n := newCyclicMarkNode(markTableID).(*cyclicMarkNode)
err := n.Init(ctx)
require.Nil(t, err)
output := []*model.RowChangedEvent{}
putToOutput := func(row *pipeline.Message) {
if row == nil || row.PolymorphicEvent.RawKV.OpType == model.OpTypeResolved {
return
}
output = append(output, row.PolymorphicEvent.Row)
}

var lastCommitTs model.Ts
for _, row := range tc.input {
event := model.NewPolymorphicEvent(&model.RawKVEntry{
OpType: model.OpTypePut,
Key: tablecodec.GenTableRecordPrefix(row.Table.TableID),
StartTs: row.StartTs,
CRTs: row.CommitTs,
})
event.Row = row
ok, err := n.TryHandleDataMessage(ctx, pipeline.PolymorphicEventMessage(event))
require.Nil(t, err)
require.True(t, ok)
putToOutput(ctx.tryGetProcessedMessage())
lastCommitTs = row.CommitTs
}
ok, err := n.TryHandleDataMessage(ctx, pipeline.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, lastCommitTs+1)))
require.True(t, ok)
putToOutput(ctx.tryGetProcessedMessage())
require.Nil(t, err)
for {
msg := ctx.tryGetProcessedMessage()
if msg == nil {
break
}
putToOutput(msg)
}

// check the commitTs is increasing
lastCommitTs = 0
for _, row := range output {
require.GreaterOrEqual(t, row.CommitTs, lastCommitTs)
// Ensure that the ReplicaID of the row is set correctly.
c.Assert(row.ReplicaID, check.Not(check.Equals), 0)
require.NotEqual(t, 0, row.ReplicaID)
lastCommitTs = row.CommitTs
}
sort.Slice(output, func(i, j int) bool {
Expand All @@ -193,7 +256,6 @@ func (s *markSuite) TestCyclicMarkNode(c *check.C) {
}
return output[i].CommitTs < output[j].CommitTs
})
c.Assert(output, check.DeepEquals, tc.expected,
check.Commentf("%s", cmp.Diff(output, tc.expected)))
require.Equal(t, tc.expected, output, cmp.Diff(output, tc.expected))
}
}
28 changes: 28 additions & 0 deletions cdc/processor/pipeline/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(
m,
goleak.IgnoreTopFunction("github.com/pingcap/tiflow/cdc/sorter/unified.newBackEndPool.func1"),
)
}
Loading

0 comments on commit 7d9909e

Please sign in to comment.