From 29be3ccba48e78c860523557f1937a00b2ce3e88 Mon Sep 17 00:00:00 2001 From: satoru Date: Mon, 1 Apr 2019 11:24:45 +0800 Subject: [PATCH] Add more unit tests (#506) * Add more unit tests * Finetune tests according to review comments --- pkg/etcd/etcd_test.go | 27 ++++++++ pkg/node/registry_test.go | 25 +++++++ pump/server.go | 2 +- pump/server_test.go | 134 +++++++++++++++++++++++++++++++++++++- 4 files changed, 185 insertions(+), 3 deletions(-) diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index 048e1f728..d7191a3e4 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -169,3 +169,30 @@ func testSetup(t *testing.T) (context.Context, *Client, *integration.ClusterV3) etcd := NewClient(cluster.RandClient(), "binlog") return context.Background(), etcd, cluster } + +type parseToDirTreeSuite struct{} + +var _ = Suite(&parseToDirTreeSuite{}) + +func (s *parseToDirTreeSuite) TestReturnFound(c *C) { + root := &Node{ + Childs: map[string]*Node{ + "drainer": &Node{ + Childs: map[string]*Node{ + "1": &Node{Value: []byte("alive")}, + }, + }, + }, + } + target := parseToDirTree(root, "drainer/1") + c.Assert(target.Value, BytesEquals, []byte("alive")) +} + +func (s *parseToDirTreeSuite) TestCreateNew(c *C) { + root := new(Node) + target := parseToDirTree(root, "drainer/mysql/42") + target.Value = []byte("hello") + c.Assert(len(root.Childs), Equals, 1) + stored := root.Childs["drainer"].Childs["mysql"].Childs["42"] + c.Assert(string(stored.Value), Equals, "hello") +} diff --git a/pkg/node/registry_test.go b/pkg/node/registry_test.go index 159f3ca0e..828f3a706 100644 --- a/pkg/node/registry_test.go +++ b/pkg/node/registry_test.go @@ -107,3 +107,28 @@ func mustEqualStatus(c *C, r RegisrerTestClient, nodeID string, status *Status) c.Assert(err, IsNil) c.Assert(ns, DeepEquals, status) } + +type checkNodeExistsSuite struct{} +var _ = Suite(&checkNodeExistsSuite{}) + +func (s *checkNodeExistsSuite) TestNotExist(c *C) { + etcdclient := etcd.NewClient(testEtcdCluster.RandClient(), DefaultRootPath) + r := NewEtcdRegistry(etcdclient, time.Duration(5)*time.Second) + + exist, err := r.checkNodeExists(context.Background(), "drainer", "404") + c.Assert(err, IsNil) + c.Assert(exist, IsFalse) +} + +func (s *checkNodeExistsSuite) TestExist(c *C) { + etcdclient := etcd.NewClient(testEtcdCluster.RandClient(), DefaultRootPath) + r := NewEtcdRegistry(etcdclient, time.Duration(5)*time.Second) + + ctx := context.Background() + if err := r.client.Create(ctx, "/tidb-binlog/v1", "pump", nil); err != nil { + c.Fatal("Can't create node for testing") + } + exist, err := r.checkNodeExists(ctx, "/tidb-binlog", "v1") + c.Assert(err, IsNil) + c.Assert(exist, IsTrue) +} diff --git a/pump/server.go b/pump/server.go index 670c1d078..518d2e405 100644 --- a/pump/server.go +++ b/pump/server.go @@ -155,7 +155,7 @@ func NewServer(cfg *Config) (*Server, error) { // WriteBinlog implements the gRPC interface of pump server func (s *Server) WriteBinlog(ctx context.Context, in *binlog.WriteBinlogReq) (*binlog.WriteBinlogResp, error) { - // pump client will write some empty Payload to detect weather pump is working, should avoid this + // pump client will write some empty Payload to detect whether pump is working, should avoid this if in.Payload == nil { ret := new(binlog.WriteBinlogResp) return ret, nil diff --git a/pump/server_test.go b/pump/server_test.go index d342eb58c..44c4c4095 100644 --- a/pump/server_test.go +++ b/pump/server_test.go @@ -1,10 +1,16 @@ package pump import ( + "fmt" "testing" "github.com/coreos/etcd/integration" . "github.com/pingcap/check" + "github.com/pingcap/tidb-binlog/pkg/node" + "github.com/pingcap/tipb/go-binlog" + pb "github.com/pingcap/tipb/go-binlog" + "golang.org/x/net/context" + "google.golang.org/grpc" ) var testEtcdCluster *integration.ClusterV3 @@ -16,6 +22,130 @@ func TestPump(t *testing.T) { TestingT(t) } -var _ = Suite(&testPumpServerSuite{}) +type writeBinlogSuite struct{} -type testPumpServerSuite struct{} +var _ = Suite(&writeBinlogSuite{}) + +func (s *writeBinlogSuite) TestIgnoreEmptyRequest(c *C) { + server := &Server{} + resp, err := server.WriteBinlog(context.Background(), &binlog.WriteBinlogReq{}) + c.Assert(resp, NotNil) + c.Assert(err, IsNil) + c.Assert(server.writeBinlogCount, Equals, int64(0)) +} + +func (s *writeBinlogSuite) TestReturnErrIfClusterIDMismatched(c *C) { + server := &Server{clusterID: 42} + req := &binlog.WriteBinlogReq{} + req.ClusterID = 53 + resp, err := server.writeBinlog(context.Background(), req, false) + c.Assert(resp, IsNil) + c.Assert(err, ErrorMatches, ".*mismatch.*") +} + +func (s *writeBinlogSuite) TestIgnoreReqWithInvalidPayload(c *C) { + server := &Server{clusterID: 42} + req := &binlog.WriteBinlogReq{ClusterID: 42, Payload: []byte("invalid")} + resp, err := server.writeBinlog(context.Background(), req, false) + c.Assert(resp.Errmsg, Equals, "unexpected EOF") + c.Assert(err, NotNil) +} + +type fakeNode struct{} + +func (n *fakeNode) ID() string { return "fakenode-long" } +func (n *fakeNode) ShortID() string { return "fakenode" } +func (n *fakeNode) RefreshStatus(ctx context.Context, status *node.Status) error { return nil } +func (n *fakeNode) Heartbeat(ctx context.Context) <-chan error { return make(chan error) } +func (n *fakeNode) Notify(ctx context.Context) error { return nil } +func (n *fakeNode) NodeStatus() *node.Status { return &node.Status{State: node.Paused} } +func (n *fakeNode) NodesStatus(ctx context.Context) ([]*node.Status, error) { + return []*node.Status{}, nil +} +func (n *fakeNode) Quit() error { return nil } + +func (s *writeBinlogSuite) TestDetectNoOnline(c *C) { + server := &Server{clusterID: 42, node: &fakeNode{}} + + log := new(binlog.Binlog) + data, err := log.Marshal() + if err != nil { + c.Fatal("Fail to marshal binlog") + } + req := &binlog.WriteBinlogReq{ClusterID: 42, Payload: data} + _, err = server.writeBinlog(context.Background(), req, false) + c.Assert(err, ErrorMatches, ".*no online.*") +} + +type pullBinlogsSuite struct{} + +var _ = Suite(&pullBinlogsSuite{}) + +type fakePullBinlogsServer struct { + grpc.ServerStream + ctx context.Context + sent []*binlog.PullBinlogResp +} + +func newFakePullBinlogsServer() *fakePullBinlogsServer { + return &fakePullBinlogsServer{ + ctx: context.Background(), + sent: []*binlog.PullBinlogResp{}, + } +} + +func (x *fakePullBinlogsServer) Context() context.Context { return x.ctx } +func (x *fakePullBinlogsServer) Send(m *binlog.PullBinlogResp) error { + x.sent = append(x.sent, m) + return nil +} + +func (s *pullBinlogsSuite) TestReturnErrIfClusterIDMismatched(c *C) { + server := &Server{clusterID: 42} + req := &binlog.PullBinlogReq{ClusterID: 43} + err := server.PullBinlogs(req, newFakePullBinlogsServer()) + c.Assert(err, ErrorMatches, ".*mismatch.*") +} + +type fakeStorage struct{} + +func (s *fakeStorage) WriteBinlog(binlog *pb.Binlog) error { return nil } +func (s *fakeStorage) GCTS(ts int64) {} +func (s *fakeStorage) MaxCommitTS() int64 { return 0 } +func (s *fakeStorage) GetBinlog(ts int64) (*binlog.Binlog, error) { return nil, nil } +func (s *fakeStorage) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte { + return make(chan []byte) +} +func (s *fakeStorage) Close() error { return nil } + +type fakePullable struct{ fakeStorage } + +func (s *fakePullable) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte { + chl := make(chan []byte) + go func() { + for i := 0; i < 3; i++ { + chl <- []byte(fmt.Sprintf("payload_%d", i)) + } + close(chl) + }() + return chl +} + +func (s *pullBinlogsSuite) TestPullBinlogFromStorage(c *C) { + ctx := context.Background() + server := &Server{clusterID: 42, storage: &fakePullable{}, ctx: ctx} + req := &binlog.PullBinlogReq{ + ClusterID: 42, + StartFrom: binlog.Pos{ + Suffix: 1, + Offset: 97, + }, + } + stream := newFakePullBinlogsServer() + err := server.PullBinlogs(req, stream) + c.Assert(err, IsNil) + c.Assert(stream.sent, HasLen, 3) + for i, resp := range stream.sent { + c.Assert(string(resp.Entity.Payload), Equals, fmt.Sprintf("payload_%d", i)) + } +}