Skip to content

Commit

Permalink
test: Add unit test for cloudstorage sink (#1063)
Browse files Browse the repository at this point in the history
ref #758
  • Loading branch information
wk989898 authored Mar 7, 2025
1 parent 56fa563 commit b6a9c66
Show file tree
Hide file tree
Showing 14 changed files with 1,337 additions and 402 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ P=3

# The following packages are used in unit tests.
# Add new packages here if you want to include them in unit tests.
UT_PACKAGES_DISPATCHER := ./pkg/sink/mysql/... ./pkg/sink/util/... ./downstreamadapter/sink/... ./downstreamadapter/dispatcher/... ./downstreamadapter/worker/... ./pkg/sink/codec/open/... ./pkg/sink/codec/canal/...
UT_PACKAGES_DISPATCHER := ./pkg/sink/cloudstorage/... ./pkg/sink/mysql/... ./pkg/sink/util/... ./downstreamadapter/sink/... ./downstreamadapter/dispatcher/... ./downstreamadapter/worker/... ./downstreamadapter/worker/writer/... ./pkg/sink/codec/open/... ./pkg/sink/codec/csv/... ./pkg/sink/codec/canal/...
UT_PACKAGES_MAINTAINER := ./maintainer/...
UT_PACKAGES_COORDINATOR := ./coordinator/...
UT_PACKAGES_LOGSERVICE := ./logservice/...
Expand Down
98 changes: 92 additions & 6 deletions downstreamadapter/sink/cloudstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,105 @@ package sink

import (
"context"
"fmt"
"net/url"
"testing"
"time"

"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/stretchr/testify/require"
)

func newCloudStorageSinkForTest() (*CloudStorageSink, error) {
func newCloudStorageSinkForTest(parentDir string) (*CloudStorageSink, error) {
ctx := context.Background()
mockPDClock := pdutil.NewClock4Test()
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)
changefeedID := common.NewChangefeedID4Test("test", "test")
// csvProtocol := "csv-protocol"
// sinkConfig := &config.SinkConfig{Protocol: &csvProtocol}

sink := &CloudStorageSink{
changefeedID: changefeedID,
csvProtocol := "csv"
sinkConfig := &config.SinkConfig{Protocol: &csvProtocol}
uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)
sinkURI, err := url.Parse(uri)
if err != nil {
return nil, err
}
sink, err := newCloudStorageSink(ctx, changefeedID, sinkURI, sinkConfig, nil)
if err != nil {
return nil, err
}
go sink.Run(ctx)
return sink, nil
}

// TestCloudStorageSinkBasicFunctionality verifies the sink's end-to-end flow:
// a blocking DDL event, a DML event, and a passed-through DDL event should each
// trigger their PostTxnFlushed callbacks, for a total count of 3.
func TestCloudStorageSinkBasicFunctionality(t *testing.T) {
	sink, err := newCloudStorageSinkForTest(t.TempDir())
	require.NoError(t, err)

	// count is a package-level atomic counter shared by sink tests; reset it
	// so earlier tests cannot skew the final assertion.
	count.Store(0)

	helper := commonEvent.NewEventTestHelper(t)
	defer helper.Close()

	helper.Tk().MustExec("use test")
	createTableSQL := "create table t (id int primary key, name varchar(32));"
	job := helper.DDL2Job(createTableSQL)
	require.NotNil(t, job)
	helper.ApplyJob(job)

	tableInfo := helper.GetTableInfo(job)

	ddlEvent := &commonEvent.DDLEvent{
		Query:      job.Query,
		SchemaName: job.SchemaName,
		TableName:  job.TableName,
		FinishedTs: 1,
		BlockedTables: &commonEvent.InfluencedTables{
			InfluenceType: commonEvent.InfluenceTypeNormal,
			TableIDs:      []int64{0},
		},
		TableInfo:       tableInfo,
		NeedAddedTables: []commonEvent.Table{{TableID: 1, SchemaID: 1}},
		PostTxnFlushed: []func(){
			func() { count.Add(1) },
		},
	}

	// Same DDL at a later commit ts; used below to exercise PassBlockEvent.
	ddlEvent2 := &commonEvent.DDLEvent{
		Query:      job.Query,
		SchemaName: job.SchemaName,
		TableName:  job.TableName,
		FinishedTs: 4,
		BlockedTables: &commonEvent.InfluencedTables{
			InfluenceType: commonEvent.InfluenceTypeNormal,
			TableIDs:      []int64{0},
		},
		TableInfo:       tableInfo,
		NeedAddedTables: []commonEvent.Table{{TableID: 1, SchemaID: 1}},
		PostTxnFlushed: []func(){
			func() { count.Add(1) },
		},
	}

	dmlEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test')", "insert into t values (2, 'test2');")
	dmlEvent.PostTxnFlushed = []func(){
		func() {
			count.Add(1)
		},
	}
	dmlEvent.TableInfoVersion = 1

	err = sink.WriteBlockEvent(ddlEvent)
	require.NoError(t, err)

	err = sink.AddDMLEvent(dmlEvent)
	require.NoError(t, err)

	// The DML path flushes asynchronously; give the background worker time
	// to flush before counting callbacks.
	time.Sleep(5 * time.Second)

	sink.PassBlockEvent(ddlEvent2)

	// testify convention: expected value first, actual second. One callback
	// each from WriteBlockEvent, AddDMLEvent, and PassBlockEvent.
	require.Equal(t, int64(3), count.Load())
}
175 changes: 175 additions & 0 deletions downstreamadapter/worker/cloudstorage_ddl_worker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package worker

import (
"context"
"fmt"
"net/url"
"os"
"path"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/ticdc/downstreamadapter/sink/helper"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
"github.com/pingcap/ticdc/pkg/util"
timodel "github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/types"
"github.com/stretchr/testify/require"
)

// newCloudStorageDDLWorkerForTest builds a CloudStorageDDLWorker backed by a
// file:// external storage rooted at parentDir and starts its Run loop in a
// background goroutine.
func newCloudStorageDDLWorkerForTest(parentDir string) (*CloudStorageDDLWorker, error) {
	// Use a plain background context. The previous version created a
	// cancellable context and deferred cancel() inside this constructor,
	// which cancelled the context as soon as the function returned — the
	// worker's Run goroutine started with an already-cancelled context.
	ctx := context.Background()
	uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)
	sinkURI, err := url.Parse(uri)
	if err != nil {
		return nil, err
	}
	replicaConfig := config.GetDefaultReplicaConfig()
	err = replicaConfig.ValidateAndAdjust(sinkURI)
	if err != nil {
		return nil, err
	}
	changefeedID := common.NewChangefeedID4Test("test", "test")

	csvProtocol := "csv"
	sinkConfig := &config.SinkConfig{Protocol: &csvProtocol}
	cfg := cloudstorage.NewConfig()
	err = cfg.Apply(ctx, sinkURI, sinkConfig)
	if err != nil {
		return nil, err
	}
	storage, err := helper.GetExternalStorageFromURI(ctx, sinkURI.String())
	if err != nil {
		return nil, err
	}
	// No cleanup jobs (nil) for the basic worker fixture.
	sink := NewCloudStorageDDLWorker(changefeedID, sinkURI, cfg, nil, storage, metrics.NewStatistics(changefeedID, "CloudStorageSink"))
	go sink.Run(ctx)
	return sink, nil
}

// TestCloudStorageWriteDDLEvent verifies that WriteBlockEvent persists the
// table schema as a JSON file under <schema>/<table>/meta/ with the expected
// content (column names, mapped SQL types, and precision).
func TestCloudStorageWriteDDLEvent(t *testing.T) {
	parentDir := t.TempDir()
	sink, err := newCloudStorageDDLWorkerForTest(parentDir)
	require.NoError(t, err)

	tableInfo := common.WrapTableInfo(100, "test", &timodel.TableInfo{
		ID:   20,
		Name: pmodel.NewCIStr("table1"),
		Columns: []*timodel.ColumnInfo{
			{
				Name:      pmodel.NewCIStr("col1"),
				FieldType: *types.NewFieldType(mysql.TypeLong),
			},
			{
				Name:      pmodel.NewCIStr("col2"),
				FieldType: *types.NewFieldType(mysql.TypeVarchar),
			},
		},
	})
	ddlEvent := &commonEvent.DDLEvent{
		Query:      "alter table test.table1 add col2 varchar(64)",
		Type:       byte(timodel.ActionAddColumn),
		SchemaName: "test",
		TableName:  "table1",
		FinishedTs: 100,
		TableInfo:  tableInfo,
	}

	tableDir := path.Join(parentDir, "test/table1/meta/")
	err = sink.WriteBlockEvent(ddlEvent)
	require.NoError(t, err)

	// The file name encodes the table version (100) and a checksum of the
	// schema content.
	tableSchema, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json"))
	require.NoError(t, err)
	require.JSONEq(t, `{
		"Table": "table1",
		"Schema": "test",
		"Version": 1,
		"TableVersion": 100,
		"Query": "alter table test.table1 add col2 varchar(64)",
		"Type": 5,
		"TableColumns": [
			{
				"ColumnName": "col1",
				"ColumnType": "INT",
				"ColumnPrecision": "11"
			},
			{
				"ColumnName": "col2",
				"ColumnType": "VARCHAR",
				"ColumnPrecision": "5"
			}
		],
		"TableColumnsTotal": 2
	}`, string(tableSchema))
}

// TestCloudStorageWriteCheckpointTs verifies that advancing the checkpoint ts
// writes the expected JSON to the "metadata" object in the storage root.
func TestCloudStorageWriteCheckpointTs(t *testing.T) {
	parentDir := t.TempDir()
	sink, err := newCloudStorageDDLWorkerForTest(parentDir)
	require.NoError(t, err)

	// NOTE(review): the sleep is before AddCheckpointTs and the file is read
	// immediately after — presumably the delay lets the worker's Run loop
	// start so the checkpoint is written synchronously on Add; confirm
	// against CloudStorageDDLWorker's implementation.
	time.Sleep(3 * time.Second)
	sink.AddCheckpointTs(100)
	metadata, err := os.ReadFile(path.Join(parentDir, "metadata"))
	require.NoError(t, err)
	require.JSONEq(t, `{"checkpoint-ts":100}`, string(metadata))
}

// TestCleanupExpiredFiles verifies that the DDL worker schedules the
// configured cleanup jobs on the cron spec ("* * * * * *" fires every second),
// so the counter must have been incremented at least once after a few seconds.
func TestCleanupExpiredFiles(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	parentDir := t.TempDir()
	uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)
	sinkURI, err := url.Parse(uri)
	require.NoError(t, err)
	replicaConfig := config.GetDefaultReplicaConfig()
	replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{
		FileExpirationDays:  util.AddressOf(1),
		FileCleanupCronSpec: util.AddressOf("* * * * * *"),
	}
	err = replicaConfig.ValidateAndAdjust(sinkURI)
	require.NoError(t, err)

	cnt := atomic.Int64{}
	cleanupJobs := []func(){
		func() {
			cnt.Add(1)
		},
	}
	changefeedID := common.NewChangefeedID4Test("test", "test")
	cfg := cloudstorage.NewConfig()
	err = cfg.Apply(ctx, sinkURI, replicaConfig.Sink)
	require.NoError(t, err)
	storage, err := helper.GetExternalStorageFromURI(ctx, sinkURI.String())
	require.NoError(t, err)

	sink := NewCloudStorageDDLWorker(changefeedID, sinkURI, cfg, cleanupJobs, storage, metrics.NewStatistics(changefeedID, "CloudStorageSink"))
	go sink.Run(ctx)

	// Removed the dead `_ = sink` statement and a redundant require.Nil(err)
	// that re-checked an error already asserted above.
	time.Sleep(5 * time.Second)
	require.LessOrEqual(t, int64(1), cnt.Load())
}
Loading

0 comments on commit b6a9c66

Please sign in to comment.