Skip to content

Commit

Permalink
Add more unit tests (#506)
Browse files Browse the repository at this point in the history
* Add more unit tests

* Finetune tests according to review comments
  • Loading branch information
suzaku authored Apr 1, 2019
1 parent 87a8d8c commit 29be3cc
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 3 deletions.
27 changes: 27 additions & 0 deletions pkg/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,30 @@ func testSetup(t *testing.T) (context.Context, *Client, *integration.ClusterV3)
etcd := NewClient(cluster.RandClient(), "binlog")
return context.Background(), etcd, cluster
}

type parseToDirTreeSuite struct{}

var _ = Suite(&parseToDirTreeSuite{})

func (s *parseToDirTreeSuite) TestReturnFound(c *C) {
root := &Node{
Childs: map[string]*Node{
"drainer": &Node{
Childs: map[string]*Node{
"1": &Node{Value: []byte("alive")},
},
},
},
}
target := parseToDirTree(root, "drainer/1")
c.Assert(target.Value, BytesEquals, []byte("alive"))
}

func (s *parseToDirTreeSuite) TestCreateNew(c *C) {
root := new(Node)
target := parseToDirTree(root, "drainer/mysql/42")
target.Value = []byte("hello")
c.Assert(len(root.Childs), Equals, 1)
stored := root.Childs["drainer"].Childs["mysql"].Childs["42"]
c.Assert(string(stored.Value), Equals, "hello")
}
25 changes: 25 additions & 0 deletions pkg/node/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,28 @@ func mustEqualStatus(c *C, r RegisrerTestClient, nodeID string, status *Status)
c.Assert(err, IsNil)
c.Assert(ns, DeepEquals, status)
}

type checkNodeExistsSuite struct{}
var _ = Suite(&checkNodeExistsSuite{})

func (s *checkNodeExistsSuite) TestNotExist(c *C) {
etcdclient := etcd.NewClient(testEtcdCluster.RandClient(), DefaultRootPath)
r := NewEtcdRegistry(etcdclient, time.Duration(5)*time.Second)

exist, err := r.checkNodeExists(context.Background(), "drainer", "404")
c.Assert(err, IsNil)
c.Assert(exist, IsFalse)
}

func (s *checkNodeExistsSuite) TestExist(c *C) {
etcdclient := etcd.NewClient(testEtcdCluster.RandClient(), DefaultRootPath)
r := NewEtcdRegistry(etcdclient, time.Duration(5)*time.Second)

ctx := context.Background()
if err := r.client.Create(ctx, "/tidb-binlog/v1", "pump", nil); err != nil {
c.Fatal("Can't create node for testing")
}
exist, err := r.checkNodeExists(ctx, "/tidb-binlog", "v1")
c.Assert(err, IsNil)
c.Assert(exist, IsTrue)
}
2 changes: 1 addition & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func NewServer(cfg *Config) (*Server, error) {

// WriteBinlog implements the gRPC interface of pump server
func (s *Server) WriteBinlog(ctx context.Context, in *binlog.WriteBinlogReq) (*binlog.WriteBinlogResp, error) {
// pump client will write some empty Payload to detect weather pump is working, should avoid this
// pump client will write some empty Payload to detect whether pump is working, should avoid this
if in.Payload == nil {
ret := new(binlog.WriteBinlogResp)
return ret, nil
Expand Down
134 changes: 132 additions & 2 deletions pump/server_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package pump

import (
"fmt"
"testing"

"github.com/coreos/etcd/integration"
. "github.com/pingcap/check"
"github.com/pingcap/tidb-binlog/pkg/node"
"github.com/pingcap/tipb/go-binlog"
pb "github.com/pingcap/tipb/go-binlog"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

var testEtcdCluster *integration.ClusterV3
Expand All @@ -16,6 +22,130 @@ func TestPump(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testPumpServerSuite{})
type writeBinlogSuite struct{}

type testPumpServerSuite struct{}
var _ = Suite(&writeBinlogSuite{})

func (s *writeBinlogSuite) TestIgnoreEmptyRequest(c *C) {
server := &Server{}
resp, err := server.WriteBinlog(context.Background(), &binlog.WriteBinlogReq{})
c.Assert(resp, NotNil)
c.Assert(err, IsNil)
c.Assert(server.writeBinlogCount, Equals, int64(0))
}

func (s *writeBinlogSuite) TestReturnErrIfClusterIDMismatched(c *C) {
server := &Server{clusterID: 42}
req := &binlog.WriteBinlogReq{}
req.ClusterID = 53
resp, err := server.writeBinlog(context.Background(), req, false)
c.Assert(resp, IsNil)
c.Assert(err, ErrorMatches, ".*mismatch.*")
}

func (s *writeBinlogSuite) TestIgnoreReqWithInvalidPayload(c *C) {
server := &Server{clusterID: 42}
req := &binlog.WriteBinlogReq{ClusterID: 42, Payload: []byte("invalid")}
resp, err := server.writeBinlog(context.Background(), req, false)
c.Assert(resp.Errmsg, Equals, "unexpected EOF")
c.Assert(err, NotNil)
}

type fakeNode struct{}

func (n *fakeNode) ID() string { return "fakenode-long" }
func (n *fakeNode) ShortID() string { return "fakenode" }
func (n *fakeNode) RefreshStatus(ctx context.Context, status *node.Status) error { return nil }
func (n *fakeNode) Heartbeat(ctx context.Context) <-chan error { return make(chan error) }
func (n *fakeNode) Notify(ctx context.Context) error { return nil }
func (n *fakeNode) NodeStatus() *node.Status { return &node.Status{State: node.Paused} }
func (n *fakeNode) NodesStatus(ctx context.Context) ([]*node.Status, error) {
return []*node.Status{}, nil
}
func (n *fakeNode) Quit() error { return nil }

func (s *writeBinlogSuite) TestDetectNoOnline(c *C) {
server := &Server{clusterID: 42, node: &fakeNode{}}

log := new(binlog.Binlog)
data, err := log.Marshal()
if err != nil {
c.Fatal("Fail to marshal binlog")
}
req := &binlog.WriteBinlogReq{ClusterID: 42, Payload: data}
_, err = server.writeBinlog(context.Background(), req, false)
c.Assert(err, ErrorMatches, ".*no online.*")
}

type pullBinlogsSuite struct{}

var _ = Suite(&pullBinlogsSuite{})

type fakePullBinlogsServer struct {
grpc.ServerStream
ctx context.Context
sent []*binlog.PullBinlogResp
}

func newFakePullBinlogsServer() *fakePullBinlogsServer {
return &fakePullBinlogsServer{
ctx: context.Background(),
sent: []*binlog.PullBinlogResp{},
}
}

func (x *fakePullBinlogsServer) Context() context.Context { return x.ctx }
func (x *fakePullBinlogsServer) Send(m *binlog.PullBinlogResp) error {
x.sent = append(x.sent, m)
return nil
}

func (s *pullBinlogsSuite) TestReturnErrIfClusterIDMismatched(c *C) {
server := &Server{clusterID: 42}
req := &binlog.PullBinlogReq{ClusterID: 43}
err := server.PullBinlogs(req, newFakePullBinlogsServer())
c.Assert(err, ErrorMatches, ".*mismatch.*")
}

type fakeStorage struct{}

func (s *fakeStorage) WriteBinlog(binlog *pb.Binlog) error { return nil }
func (s *fakeStorage) GCTS(ts int64) {}
func (s *fakeStorage) MaxCommitTS() int64 { return 0 }
func (s *fakeStorage) GetBinlog(ts int64) (*binlog.Binlog, error) { return nil, nil }
func (s *fakeStorage) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte {
return make(chan []byte)
}
func (s *fakeStorage) Close() error { return nil }

type fakePullable struct{ fakeStorage }

func (s *fakePullable) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte {
chl := make(chan []byte)
go func() {
for i := 0; i < 3; i++ {
chl <- []byte(fmt.Sprintf("payload_%d", i))
}
close(chl)
}()
return chl
}

func (s *pullBinlogsSuite) TestPullBinlogFromStorage(c *C) {
ctx := context.Background()
server := &Server{clusterID: 42, storage: &fakePullable{}, ctx: ctx}
req := &binlog.PullBinlogReq{
ClusterID: 42,
StartFrom: binlog.Pos{
Suffix: 1,
Offset: 97,
},
}
stream := newFakePullBinlogsServer()
err := server.PullBinlogs(req, stream)
c.Assert(err, IsNil)
c.Assert(stream.sent, HasLen, 3)
for i, resp := range stream.sent {
c.Assert(string(resp.Entity.Payload), Equals, fmt.Sprintf("payload_%d", i))
}
}

0 comments on commit 29be3cc

Please sign in to comment.