Skip to content

Commit

Permalink
tests(ticdc): Adding tests to the default dispatcher when old value i…
Browse files Browse the repository at this point in the history
…s enabled (#4380)

close #4379
  • Loading branch information
Rustin170506 authored Jan 18, 2022
1 parent 0538d37 commit 3ab3c7d
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 59 deletions.
40 changes: 32 additions & 8 deletions cdc/sink/dispatcher/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@
package dispatcher

import (
"github.com/pingcap/check"
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type DefaultDispatcherSuite struct{}

var _ = check.Suite(&DefaultDispatcherSuite{})
func TestDefaultDispatcher(t *testing.T) {
t.Parallel()

func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) {
defer testleak.AfterTest(c)()
testCases := []struct {
row *model.RowChangedEvent
exceptPartition int32
Expand Down Expand Up @@ -197,6 +195,32 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) {
}
p := newDefaultDispatcher(16, false)
for _, tc := range testCases {
c.Assert(p.Dispatch(tc.row), check.Equals, tc.exceptPartition)
require.Equal(t, tc.exceptPartition, p.Dispatch(tc.row))
}
}

func TestDefaultDispatcherWithOldValue(t *testing.T) {
t.Parallel()

row := &model.RowChangedEvent{
Table: &model.TableName{
Schema: "test",
Table: "t3",
},
Columns: []*model.Column{
{
Name: "id",
Value: 2,
Flag: model.HandleKeyFlag | model.PrimaryKeyFlag,
}, {
Name: "a",
Value: 3,
Flag: model.UniqueKeyFlag,
},
},
IndexColumns: [][]int{{0}, {1}},
}

p := newDefaultDispatcher(16, true)
require.Equal(t, int32(3), p.Dispatch(row))
}
14 changes: 6 additions & 8 deletions cdc/sink/dispatcher/index_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@
package dispatcher

import (
"github.com/pingcap/check"
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type IndexValueDispatcherSuite struct{}

var _ = check.Suite(&IndexValueDispatcherSuite{})
func TestIndexValueDispatcher(t *testing.T) {
t.Parallel()

func (s IndexValueDispatcherSuite) TestIndexValueDispatcher(c *check.C) {
defer testleak.AfterTest(c)()
testCases := []struct {
row *model.RowChangedEvent
exceptPartition int32
Expand Down Expand Up @@ -151,6 +149,6 @@ func (s IndexValueDispatcherSuite) TestIndexValueDispatcher(c *check.C) {
}
p := newIndexValueDispatcher(16)
for _, tc := range testCases {
c.Assert(p.Dispatch(tc.row), check.Equals, tc.exceptPartition)
require.Equal(t, tc.exceptPartition, p.Dispatch(tc.row))
}
}
54 changes: 29 additions & 25 deletions cdc/sink/dispatcher/switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,66 +14,70 @@
package dispatcher

import (
"github.com/pingcap/check"
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type SwitcherSuite struct{}

var _ = check.Suite(&SwitcherSuite{})
func TestSwitcher(t *testing.T) {
t.Parallel()

func (s SwitcherSuite) TestSwitcher(c *check.C) {
defer testleak.AfterTest(c)()
d, err := NewDispatcher(config.GetDefaultReplicaConfig(), 4)
c.Assert(err, check.IsNil)
c.Assert(d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
require.Nil(t, err)
require.IsType(t, &defaultDispatcher{}, d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
Table: &model.TableName{
Schema: "test", Table: "test",
},
}), check.FitsTypeOf, &defaultDispatcher{})
}))

d, err = NewDispatcher(&config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{Matcher: []string{"test_default.*"}, Dispatcher: "default"},
{Matcher: []string{"test_default1.*"}, Dispatcher: "default"},
{Matcher: []string{"test_default2.*"}, Dispatcher: "unknown-dispatcher"},
{Matcher: []string{"test_table.*"}, Dispatcher: "table"},
{Matcher: []string{"test_index_value.*"}, Dispatcher: "index-value"},
{Matcher: []string{"test.*"}, Dispatcher: "rowid"},
{Matcher: []string{"*.*", "!*.test"}, Dispatcher: "ts"},
},
},
}, 4)
c.Assert(err, check.IsNil)
c.Assert(d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
require.Nil(t, err)
require.IsType(t, &indexValueDispatcher{}, d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
Table: &model.TableName{
Schema: "test", Table: "table1",
},
}), check.FitsTypeOf, &indexValueDispatcher{})
c.Assert(d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
}))
require.IsType(t, &tsDispatcher{}, d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
Table: &model.TableName{
Schema: "sbs", Table: "table2",
},
}), check.FitsTypeOf, &tsDispatcher{})
c.Assert(d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
}))
require.IsType(t, &defaultDispatcher{}, d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
Table: &model.TableName{
Schema: "sbs", Table: "test",
},
}), check.FitsTypeOf, &defaultDispatcher{})
c.Assert(d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
}))
require.IsType(t, &defaultDispatcher{}, d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
Table: &model.TableName{
Schema: "test_default1", Table: "test",
},
}))
require.IsType(t, &defaultDispatcher{}, d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
Table: &model.TableName{
Schema: "test_default", Table: "test",
Schema: "test_default2", Table: "test",
},
}), check.FitsTypeOf, &defaultDispatcher{})
c.Assert(d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
}))
require.IsType(t, &tableDispatcher{}, d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
Table: &model.TableName{
Schema: "test_table", Table: "test",
},
}), check.FitsTypeOf, &tableDispatcher{})
c.Assert(d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
}))
require.IsType(t, &indexValueDispatcher{}, d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{
Table: &model.TableName{
Schema: "test_index_value", Table: "test",
},
}), check.FitsTypeOf, &indexValueDispatcher{})
}))
}
14 changes: 6 additions & 8 deletions cdc/sink/dispatcher/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@
package dispatcher

import (
"github.com/pingcap/check"
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type TableDispatcherSuite struct{}

var _ = check.Suite(&TableDispatcherSuite{})
func TestTableDispatcher(t *testing.T) {
t.Parallel()

func (s TableDispatcherSuite) TestTableDispatcher(c *check.C) {
defer testleak.AfterTest(c)()
testCases := []struct {
row *model.RowChangedEvent
exceptPartition int32
Expand Down Expand Up @@ -81,6 +79,6 @@ func (s TableDispatcherSuite) TestTableDispatcher(c *check.C) {
}
p := newTableDispatcher(16)
for _, tc := range testCases {
c.Assert(p.Dispatch(tc.row), check.Equals, tc.exceptPartition)
require.Equal(t, tc.exceptPartition, p.Dispatch(tc.row))
}
}
14 changes: 4 additions & 10 deletions cdc/sink/dispatcher/ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,13 @@ package dispatcher
import (
"testing"

"github.com/pingcap/check"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

func Test(t *testing.T) { check.TestingT(t) }
func TestTsDispatcher(t *testing.T) {
t.Parallel()

type TsDispatcherSuite struct{}

var _ = check.Suite(&TsDispatcherSuite{})

func (s TsDispatcherSuite) TestTsDispatcher(c *check.C) {
defer testleak.AfterTest(c)()
testCases := []struct {
row *model.RowChangedEvent
exceptPartition int32
Expand Down Expand Up @@ -78,6 +72,6 @@ func (s TsDispatcherSuite) TestTsDispatcher(c *check.C) {
}
p := &tsDispatcher{partitionNum: 16}
for _, tc := range testCases {
c.Assert(p.Dispatch(tc.row), check.Equals, tc.exceptPartition)
require.Equal(t, tc.exceptPartition, p.Dispatch(tc.row))
}
}

0 comments on commit 3ab3c7d

Please sign in to comment.