From 426fef04cf8211e58f5fb528ed6e458564713a01 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 18 Dec 2018 15:25:13 +0800 Subject: [PATCH] pump client: compatible with kafka version tidb-binlog && add unit test (#139) --- dump_region/main.go | 2 +- pkg/etcd/etcd.go | 2 +- pkg/etcd/etcd_test.go | 2 +- tidb-binlog/binlogctl/meta.go | 2 +- tidb-binlog/binlogctl/nodes.go | 2 +- tidb-binlog/node/registry.go | 2 +- tidb-binlog/pump_client/client.go | 90 ++++++++++--- tidb-binlog/pump_client/client_test.go | 177 +++++++++++++++++++++++-- tidb-binlog/pump_client/pump.go | 20 ++- tidb-binlog/pump_client/selector.go | 45 +++++++ 10 files changed, 305 insertions(+), 39 deletions(-) diff --git a/dump_region/main.go b/dump_region/main.go index e3a6e1ef5..66816a3af 100644 --- a/dump_region/main.go +++ b/dump_region/main.go @@ -15,6 +15,7 @@ package main import ( "bytes" + "context" "encoding/json" "flag" "fmt" @@ -25,7 +26,6 @@ import ( "github.com/pingcap/tidb-tools/pkg/utils" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" - "golang.org/x/net/context" ) var ( diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 0beb844ac..beb866c2d 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -14,6 +14,7 @@ package etcd import ( + "context" "crypto/tls" "path" "strings" @@ -21,7 +22,6 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/pingcap/errors" - "golang.org/x/net/context" ) // Node organizes the ectd query result as a Trie tree diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index 34d0e12db..bae15a365 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -14,6 +14,7 @@ package etcd import ( + "context" "testing" "time" @@ -21,7 +22,6 @@ import ( "github.com/coreos/etcd/integration" . 
"github.com/pingcap/check" "github.com/pingcap/errors" - "golang.org/x/net/context" ) var ( diff --git a/tidb-binlog/binlogctl/meta.go b/tidb-binlog/binlogctl/meta.go index 488fa9584..9bfff5493 100644 --- a/tidb-binlog/binlogctl/meta.go +++ b/tidb-binlog/binlogctl/meta.go @@ -15,6 +15,7 @@ package main import ( "bytes" + "context" "os" "path" "time" @@ -25,7 +26,6 @@ import ( "github.com/pingcap/tidb-tools/pkg/utils" "github.com/siddontang/go/ioutil2" log "github.com/sirupsen/logrus" - "golang.org/x/net/context" ) const physicalShiftBits = 18 diff --git a/tidb-binlog/binlogctl/nodes.go b/tidb-binlog/binlogctl/nodes.go index fca4fe8fa..21f67347b 100644 --- a/tidb-binlog/binlogctl/nodes.go +++ b/tidb-binlog/binlogctl/nodes.go @@ -14,6 +14,7 @@ package main import ( + "context" "fmt" "net/http" "time" @@ -23,7 +24,6 @@ import ( "github.com/pingcap/tidb-tools/pkg/utils" "github.com/pingcap/tidb-tools/tidb-binlog/node" log "github.com/sirupsen/logrus" - "golang.org/x/net/context" ) var ( diff --git a/tidb-binlog/node/registry.go b/tidb-binlog/node/registry.go index 46e1e1697..9637e9c41 100644 --- a/tidb-binlog/node/registry.go +++ b/tidb-binlog/node/registry.go @@ -1,6 +1,7 @@ package node import ( + "context" "encoding/json" "path" "strings" @@ -10,7 +11,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb-tools/pkg/etcd" log "github.com/sirupsen/logrus" - "golang.org/x/net/context" ) // EtcdRegistry wraps the reactions with etcd diff --git a/tidb-binlog/pump_client/client.go b/tidb-binlog/pump_client/client.go index 48e4a5370..e8d092fec 100644 --- a/tidb-binlog/pump_client/client.go +++ b/tidb-binlog/pump_client/client.go @@ -14,6 +14,7 @@ package client import ( + "context" "crypto/tls" "encoding/json" "path" @@ -29,15 +30,17 @@ import ( "github.com/pingcap/tidb-tools/tidb-binlog/node" pb "github.com/pingcap/tipb/go-binlog" log "github.com/sirupsen/logrus" - "golang.org/x/net/context" ) const ( // DefaultEtcdTimeout is the default timeout config for 
etcd. DefaultEtcdTimeout = 5 * time.Second - // DefaultRetryTime is the default time of retry. - DefaultRetryTime = 20 + // DefaultAllRetryTime is the default retry time for all pumps, should be greater than RetryTime. + DefaultAllRetryTime = 20 + + // RetryTime is the retry time for each pump. + RetryTime = 5 // DefaultBinlogWriteTimeout is the default max time binlog can use to write to pump. DefaultBinlogWriteTimeout = 15 * time.Second @@ -71,6 +74,15 @@ type PumpInfos struct { UnAvaliablePumps map[string]*PumpStatus } +// NewPumpInfos returns a PumpInfos. +func NewPumpInfos() *PumpInfos { + return &PumpInfos{ + Pumps: make(map[string]*PumpStatus), + AvaliablePumps: make(map[string]*PumpStatus), + UnAvaliablePumps: make(map[string]*PumpStatus), + } +} + // PumpsClient is the client of pumps. type PumpsClient struct { ctx context.Context @@ -99,14 +111,14 @@ type PumpsClient struct { // Security is the security config Security *tls.Config + + // binlog socket file path, for compatibility with the kafka version pump. + binlogSocket string } // NewPumpsClient returns a PumpsClient. +// TODO: get strategy from etcd, and can update strategy in real-time. Use Range as default now. func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error) { - // TODO: get strategy from etcd, and can update strategy in real-time. now use Range as default. 
- strategy := Range - selector := NewSelector(strategy) - ectdEndpoints, err := utils.ParseHostPortAddr(etcdURLs) if err != nil { return nil, errors.Trace(err) @@ -130,21 +142,15 @@ func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.Secur return nil, errors.Trace(err) } - pumpInfos := &PumpInfos{ - Pumps: make(map[string]*PumpStatus), - AvaliablePumps: make(map[string]*PumpStatus), - UnAvaliablePumps: make(map[string]*PumpStatus), - } - ctx, cancel := context.WithCancel(context.Background()) newPumpsClient := &PumpsClient{ ctx: ctx, cancel: cancel, ClusterID: clusterID, EtcdRegistry: node.NewEtcdRegistry(cli, DefaultEtcdTimeout), - Pumps: pumpInfos, - Selector: selector, - RetryTime: DefaultRetryTime, + Pumps: NewPumpInfos(), + Selector: NewSelector(Range), + RetryTime: DefaultAllRetryTime, BinlogWriteTimeout: timeout, Security: security, } @@ -162,7 +168,55 @@ func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.Secur return newPumpsClient, nil } -// getPumpStatus retruns all the pumps status in the etcd. +// NewLocalPumpsClient returns a PumpsClient, this PumpsClient will write binlog by socket file. For compatible with kafka version pump. 
+func NewLocalPumpsClient(etcdURLs, binlogSocket string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error) { + ectdEndpoints, err := utils.ParseHostPortAddr(etcdURLs) + if err != nil { + return nil, errors.Trace(err) + } + + // get clusterid + pdCli, err := pd.NewClient(ectdEndpoints, securityOpt) + if err != nil { + return nil, errors.Trace(err) + } + clusterID := pdCli.GetClusterID(context.Background()) + pdCli.Close() + + security, err := utils.ToTLSConfig(securityOpt.CAPath, securityOpt.CertPath, securityOpt.KeyPath) + if err != nil { + return nil, errors.Trace(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + newPumpsClient := &PumpsClient{ + ctx: ctx, + cancel: cancel, + ClusterID: clusterID, + Pumps: NewPumpInfos(), + Selector: NewSelector(LocalUnix), + RetryTime: DefaultAllRetryTime, + BinlogWriteTimeout: timeout, + Security: security, + binlogSocket: binlogSocket, + } + newPumpsClient.getLocalPumpStatus(ctx) + + return newPumpsClient, nil +} + +// getLocalPumpStatus gets the local pump. For compatible with kafka version tidb-binlog. +func (c *PumpsClient) getLocalPumpStatus(pctx context.Context) { + nodeStatus := &node.Status{ + NodeID: localPump, + Addr: c.binlogSocket, + IsAlive: true, + State: node.Online, + } + c.addPump(NewPumpStatus(nodeStatus, c.Security), true) +} + +// getPumpStatus gets all the pumps status in the etcd. func (c *PumpsClient) getPumpStatus(pctx context.Context) error { nodesStatus, err := c.EtcdRegistry.Nodes(pctx, node.NodePrefix[node.PumpNode]) if err != nil { @@ -221,7 +275,7 @@ func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error { } // every pump can retry 5 times, if retry 5 times and still failed, set this pump unavaliable, and choose a new pump. 
- if (retryTime+1)%5 == 0 { + if (retryTime+1)%RetryTime == 0 { c.setPumpAvaliable(pump, false) pump = c.Selector.Next(binlog, retryTime/5+1) Logger.Debugf("[pumps client] avaliable pumps: %v, write binlog choose pump %v", c.Pumps.AvaliablePumps, pump) diff --git a/tidb-binlog/pump_client/client_test.go b/tidb-binlog/pump_client/client_test.go index 28f8859c7..7fa63ae74 100644 --- a/tidb-binlog/pump_client/client_test.go +++ b/tidb-binlog/pump_client/client_test.go @@ -14,12 +14,24 @@ package client import ( + "context" "fmt" + "net" + "os" "testing" + "time" . "github.com/pingcap/check" + "github.com/pingcap/errors" "github.com/pingcap/tidb-tools/tidb-binlog/node" + binlog "github.com/pingcap/tipb/go-binlog" pb "github.com/pingcap/tipb/go-binlog" + "google.golang.org/grpc" +) + +var ( + testMaxRecvMsgSize = 1024 + testRetryTime = 5 ) func TestClient(t *testing.T) { @@ -37,24 +49,18 @@ var _ = Suite(&testClientSuite{}) type testClientSuite struct{} -func (t *testClientSuite) TestPumpsClient(c *C) { +func (t *testClientSuite) TestSelector(c *C) { algorithms := []string{Hash, Range} for _, algorithm := range algorithms { - t.testPumpsClient(c, algorithm) + t.testSelector(c, algorithm) } } -func (*testClientSuite) testPumpsClient(c *C, algorithm string) { - pumpInfos := &PumpInfos{ - Pumps: make(map[string]*PumpStatus), - AvaliablePumps: make(map[string]*PumpStatus), - UnAvaliablePumps: make(map[string]*PumpStatus), - } - +func (*testClientSuite) testSelector(c *C, algorithm string) { pumpsClient := &PumpsClient{ - Pumps: pumpInfos, + Pumps: NewPumpInfos(), Selector: NewSelector(algorithm), - RetryTime: DefaultRetryTime, + RetryTime: DefaultAllRetryTime, BinlogWriteTimeout: DefaultBinlogWriteTimeout, } @@ -136,3 +142,152 @@ func (*testClientSuite) testPumpsClient(c *C, algorithm string) { c.Assert(pump2.IsAvaliable, Equals, true) } } + +func (t *testClientSuite) TestWriteBinlog(c *C) { + pumpServerConfig := []struct { + addr string + serverMode string + }{ + { + 
"/tmp/mock-pump.sock", + "unix", + }, { + "127.0.0.1:15049", + "tcp", + }, + } + + for _, cfg := range pumpServerConfig { + pumpServer, err := createMockPumpServer(cfg.addr, cfg.serverMode) + c.Assert(err, IsNil) + + opt := grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout(cfg.serverMode, addr, timeout) + }) + clientCon, err := grpc.Dial(cfg.addr, opt, grpc.WithInsecure()) + c.Assert(err, IsNil) + c.Assert(clientCon, NotNil) + pumpClient := mockPumpsClient(pb.NewPumpClient(clientCon)) + + // test binlog size bigger than grpc's MaxRecvMsgSize + blog := &pb.Binlog{ + Tp: pb.BinlogType_Prewrite, + PrewriteValue: make([]byte, testMaxRecvMsgSize+1), + } + err = pumpClient.WriteBinlog(blog) + c.Assert(err, NotNil) + + for i := 0; i < 10; i++ { + // test binlog size smaller than grpc's MaxRecvMsgSize + blog = &pb.Binlog{ + Tp: pb.BinlogType_Prewrite, + PrewriteValue: make([]byte, 1), + } + err = pumpClient.WriteBinlog(blog) + c.Assert(err, IsNil) + } + + // after write some binlog, the pump without grpc client will move to unavaliable list in pump client. + c.Assert(len(pumpClient.Pumps.UnAvaliablePumps), Equals, 1) + + // test when pump is down + pumpServer.Close() + err = pumpClient.WriteBinlog(blog) + c.Assert(err, NotNil) + } +} + +type mockPumpServer struct { + mode string + addr string + server *grpc.Server + + retryTime int +} + +// WriteBinlog implements PumpServer interface. +func (p *mockPumpServer) WriteBinlog(ctx context.Context, req *binlog.WriteBinlogReq) (*binlog.WriteBinlogResp, error) { + p.retryTime++ + if p.retryTime < testRetryTime { + return &binlog.WriteBinlogResp{}, errors.New("fake error") + } + + // only the last retry will return success + p.retryTime = 0 + return &binlog.WriteBinlogResp{}, nil +} + +// PullBinlogs implements PumpServer interface. 
+func (p *mockPumpServer) PullBinlogs(req *binlog.PullBinlogReq, srv binlog.Pump_PullBinlogsServer) error { + return nil +} + +func (p *mockPumpServer) Close() { + p.server.Stop() + if p.mode == "unix" { + os.Remove(p.addr) + } +} + +func createMockPumpServer(addr string, mode string) (*mockPumpServer, error) { + if mode == "unix" { + os.Remove(addr) + } + + l, err := net.Listen(mode, addr) + if err != nil { + return nil, err + } + serv := grpc.NewServer(grpc.MaxRecvMsgSize(testMaxRecvMsgSize)) + pump := &mockPumpServer{ + mode: mode, + addr: addr, + server: serv, + } + pb.RegisterPumpServer(serv, pump) + go serv.Serve(l) + + return pump, nil +} + +// mockPumpsClient creates a PumpsClient, used for test. +func mockPumpsClient(client pb.PumpClient) *PumpsClient { + // add a avaliable pump + nodeID1 := "pump-1" + pump1 := &PumpStatus{ + Status: node.Status{ + NodeID: nodeID1, + State: node.Online, + }, + IsAvaliable: true, + Client: client, + } + + // add a pump without grpc client + nodeID2 := "pump-2" + pump2 := &PumpStatus{ + Status: node.Status{ + NodeID: nodeID2, + State: node.Online, + }, + IsAvaliable: true, + } + + pumpInfos := NewPumpInfos() + pumpInfos.Pumps[nodeID1] = pump1 + pumpInfos.AvaliablePumps[nodeID1] = pump1 + pumpInfos.Pumps[nodeID2] = pump2 + pumpInfos.AvaliablePumps[nodeID2] = pump2 + + pCli := &PumpsClient{ + ClusterID: 1, + Pumps: pumpInfos, + Selector: NewSelector(Range), + // have two pump, so use 2 * testRetryTime + RetryTime: 2 * testRetryTime, + BinlogWriteTimeout: 15 * time.Second, + } + pCli.Selector.SetPumps([]*PumpStatus{pump1, pump2}) + + return pCli +} diff --git a/tidb-binlog/pump_client/pump.go b/tidb-binlog/pump_client/pump.go index 370ddba25..bd45b9c66 100644 --- a/tidb-binlog/pump_client/pump.go +++ b/tidb-binlog/pump_client/pump.go @@ -14,6 +14,7 @@ package client import ( + "context" "crypto/tls" "net" "time" @@ -21,11 +22,15 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb-tools/tidb-binlog/node" pb 
"github.com/pingcap/tipb/go-binlog" - "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) +var ( + // localPump is used to write local pump through unix socket connection. + localPump = "localPump" +) + // PumpStatus saves pump's status. type PumpStatus struct { /* @@ -78,9 +83,16 @@ func (p *PumpStatus) createGrpcClient(security *tls.Config) error { p.grpcConn.Close() } - dialerOpt := grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { - return net.DialTimeout("tcp", addr, timeout) - }) + var dialerOpt grpc.DialOption + if p.NodeID == localPump { + dialerOpt = grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", addr, timeout) + }) + } else { + dialerOpt = grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("tcp", addr, timeout) + }) + } Logger.Debugf("[pumps client] create gcpc client at %s", p.Addr) var clientConn *grpc.ClientConn var err error diff --git a/tidb-binlog/pump_client/selector.go b/tidb-binlog/pump_client/selector.go index 343c250e1..5a7f7257c 100644 --- a/tidb-binlog/pump_client/selector.go +++ b/tidb-binlog/pump_client/selector.go @@ -30,6 +30,9 @@ const ( // Score means choose pump by it's score. Score = "score" + + // LocalUnix means will only use the local pump by unix socket. + LocalUnix = "local unix" ) // PumpSelector selects pump for sending binlog. @@ -224,6 +227,46 @@ func (r *RangeSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus { return nextPump } +// LocalUnixSelector will always select the local pump, used for compatible with kafka version tidb-binlog. +type LocalUnixSelector struct { + sync.RWMutex + + // the pump to be selected. + Pump *PumpStatus +} + +// NewLocalUnixSelector returns a new LocalUnixSelector. +func NewLocalUnixSelector() PumpSelector { + return &LocalUnixSelector{} +} + +// SetPumps implement PumpSelector.SetPumps. 
+func (u *LocalUnixSelector) SetPumps(pumps []*PumpStatus) { + u.Lock() + if len(pumps) == 0 { + u.Pump = nil + } else { + u.Pump = pumps[0] + } + u.Unlock() +} + +// Select implement PumpSelector.Select. +func (u *LocalUnixSelector) Select(binlog *pb.Binlog) *PumpStatus { + u.RLock() + defer u.RUnlock() + + return u.Pump +} + +// Next implement PumpSelector.Next. Only for Prewrite binlog. +func (u *LocalUnixSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus { + u.RLock() + defer u.RUnlock() + + return u.Pump +} + // ScoreSelector select a pump by pump's score. type ScoreSelector struct{} @@ -259,6 +302,8 @@ func NewSelector(algorithm string) PumpSelector { selector = NewHashSelector() case Score: selector = NewScoreSelector() + case LocalUnix: + selector = NewLocalUnixSelector() default: Logger.Warnf("unknow algorithm %s, use range as default", algorithm) selector = NewRangeSelector()