Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pump client: compatible with kafka version tidb-binlog && add unit test #139

Merged
merged 15 commits into from
Dec 18, 2018
88 changes: 71 additions & 17 deletions tidb-binlog/pump_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@ const (
// DefaultEtcdTimeout is the default timeout config for etcd.
DefaultEtcdTimeout = 5 * time.Second

// DefaultRetryTime is the default time of retry.
DefaultRetryTime = 20
// DefaultAllRetryTime is the default retry time for all pumps, should greter than RetryTime.
DefaultAllRetryTime = 20

// RetryTime is the retry time for each pump.
RetryTime = 5

// DefaultBinlogWriteTimeout is the default max time binlog can use to write to pump.
DefaultBinlogWriteTimeout = 15 * time.Second
Expand Down Expand Up @@ -71,6 +74,15 @@ type PumpInfos struct {
UnAvaliablePumps map[string]*PumpStatus
}

// NewPumpInfos returns a PumpInfos.
func NewPumpInfos() *PumpInfos {
return &PumpInfos{
Pumps: make(map[string]*PumpStatus),
AvaliablePumps: make(map[string]*PumpStatus),
UnAvaliablePumps: make(map[string]*PumpStatus),
}
}

// PumpsClient is the client of pumps.
type PumpsClient struct {
ctx context.Context
Expand Down Expand Up @@ -99,14 +111,14 @@ type PumpsClient struct {

// Security is the security config
Security *tls.Config

// binlog socket file path, for compatible with kafka version pump.
binlogSocket string
}

// NewPumpsClient returns a PumpsClient.
// TODO: get strategy from etcd, and can update strategy in real-time. Use Range as default now.
func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error) {
// TODO: get strategy from etcd, and can update strategy in real-time. now use Range as default.
strategy := Range
selector := NewSelector(strategy)

ectdEndpoints, err := utils.ParseHostPortAddr(etcdURLs)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -130,21 +142,15 @@ func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.Secur
return nil, errors.Trace(err)
}

pumpInfos := &PumpInfos{
Pumps: make(map[string]*PumpStatus),
AvaliablePumps: make(map[string]*PumpStatus),
UnAvaliablePumps: make(map[string]*PumpStatus),
}

ctx, cancel := context.WithCancel(context.Background())
newPumpsClient := &PumpsClient{
ctx: ctx,
cancel: cancel,
ClusterID: clusterID,
EtcdRegistry: node.NewEtcdRegistry(cli, DefaultEtcdTimeout),
Pumps: pumpInfos,
Selector: selector,
RetryTime: DefaultRetryTime,
Pumps: NewPumpInfos(),
Selector: NewSelector(Range),
RetryTime: DefaultAllRetryTime,
BinlogWriteTimeout: timeout,
Security: security,
}
Expand All @@ -162,7 +168,55 @@ func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.Secur
return newPumpsClient, nil
}

// getPumpStatus retruns all the pumps status in the etcd.
// NewLocalPumpsClient returns a PumpsClient, this PumpsClient will write binlog by socket file. For compatible with kafka version pump.
func NewLocalPumpsClient(etcdURLs, binlogSocket string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error) {
ectdEndpoints, err := utils.ParseHostPortAddr(etcdURLs)
if err != nil {
return nil, errors.Trace(err)
}

// get clusterid
pdCli, err := pd.NewClient(ectdEndpoints, securityOpt)
if err != nil {
return nil, errors.Trace(err)
}
clusterID := pdCli.GetClusterID(context.Background())
pdCli.Close()

security, err := utils.ToTLSConfig(securityOpt.CAPath, securityOpt.CertPath, securityOpt.KeyPath)
if err != nil {
return nil, errors.Trace(err)
}

ctx, cancel := context.WithCancel(context.Background())
newPumpsClient := &PumpsClient{
ctx: ctx,
cancel: cancel,
ClusterID: clusterID,
Pumps: NewPumpInfos(),
Selector: NewSelector(LocalUnix),
RetryTime: DefaultAllRetryTime,
BinlogWriteTimeout: timeout,
Security: security,
binlogSocket: binlogSocket,
}
newPumpsClient.getLocalPumpStatus(ctx)

return newPumpsClient, nil
}

// getLocalPumpStatus gets the local pump. For compatible with kafka version tidb-binlog.
func (c *PumpsClient) getLocalPumpStatus(pctx context.Context) {
nodeStatus := &node.Status{
NodeID: localPump,
Addr: c.binlogSocket,
IsAlive: true,
State: node.Online,
}
c.addPump(NewPumpStatus(nodeStatus, c.Security), true)
}

// getPumpStatus gets all the pumps status in the etcd.
func (c *PumpsClient) getPumpStatus(pctx context.Context) error {
nodesStatus, err := c.EtcdRegistry.Nodes(pctx, node.NodePrefix[node.PumpNode])
if err != nil {
Expand Down Expand Up @@ -221,7 +275,7 @@ func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error {
}

// every pump can retry 5 times, if retry 5 times and still failed, set this pump unavaliable, and choose a new pump.
if (retryTime+1)%5 == 0 {
if (retryTime+1)%RetryTime == 0 {
c.setPumpAvaliable(pump, false)
pump = c.Selector.Next(binlog, retryTime/5+1)
Logger.Debugf("[pumps client] avaliable pumps: %v, write binlog choose pump %v", c.Pumps.AvaliablePumps, pump)
Expand Down
177 changes: 166 additions & 11 deletions tidb-binlog/pump_client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,24 @@
package client

import (
"context"
"fmt"
"net"
"os"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-tools/tidb-binlog/node"
binlog "github.com/pingcap/tipb/go-binlog"
pb "github.com/pingcap/tipb/go-binlog"
"google.golang.org/grpc"
)

var (
testMaxRecvMsgSize = 1024
testRetryTime = 5
)

func TestClient(t *testing.T) {
Expand All @@ -37,24 +49,18 @@ var _ = Suite(&testClientSuite{})

type testClientSuite struct{}

func (t *testClientSuite) TestPumpsClient(c *C) {
func (t *testClientSuite) TestSelector(c *C) {
algorithms := []string{Hash, Range}
for _, algorithm := range algorithms {
t.testPumpsClient(c, algorithm)
t.testSelector(c, algorithm)
}
}

func (*testClientSuite) testPumpsClient(c *C, algorithm string) {
pumpInfos := &PumpInfos{
Pumps: make(map[string]*PumpStatus),
AvaliablePumps: make(map[string]*PumpStatus),
UnAvaliablePumps: make(map[string]*PumpStatus),
}

func (*testClientSuite) testSelector(c *C, algorithm string) {
pumpsClient := &PumpsClient{
Pumps: pumpInfos,
Pumps: NewPumpInfos(),
Selector: NewSelector(algorithm),
RetryTime: DefaultRetryTime,
RetryTime: DefaultAllRetryTime,
BinlogWriteTimeout: DefaultBinlogWriteTimeout,
}

Expand Down Expand Up @@ -136,3 +142,152 @@ func (*testClientSuite) testPumpsClient(c *C, algorithm string) {
c.Assert(pump2.IsAvaliable, Equals, true)
}
}

func (t *testClientSuite) TestWriteBinlog(c *C) {
pumpServerConfig := []struct {
addr string
serverMode string
}{
{
"/tmp/mock-pump.sock",
"unix",
}, {
"127.0.0.1:15049",
"tcp",
},
}

for _, cfg := range pumpServerConfig {
pumpServer, err := createMockPumpServer(cfg.addr, cfg.serverMode)
c.Assert(err, IsNil)

opt := grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout(cfg.serverMode, addr, timeout)
})
clientCon, err := grpc.Dial(cfg.addr, opt, grpc.WithInsecure())
c.Assert(err, IsNil)
c.Assert(clientCon, NotNil)
pumpClient := mockPumpsClient(pb.NewPumpClient(clientCon))

// test binlog size bigger than grpc's MaxRecvMsgSize
blog := &pb.Binlog{
Tp: pb.BinlogType_Prewrite,
PrewriteValue: make([]byte, testMaxRecvMsgSize+1),
}
err = pumpClient.WriteBinlog(blog)
c.Assert(err, NotNil)

for i := 0; i < 10; i++ {
// test binlog size small than grpc's MaxRecvMsgSize
blog = &pb.Binlog{
Tp: pb.BinlogType_Prewrite,
PrewriteValue: make([]byte, 1),
}
err = pumpClient.WriteBinlog(blog)
c.Assert(err, IsNil)
}

// after write some binlog, the pump without grpc client will move to unavaliable list in pump client.
c.Assert(len(pumpClient.Pumps.UnAvaliablePumps), Equals, 1)

// test when pump is down
pumpServer.Close()
err = pumpClient.WriteBinlog(blog)
c.Assert(err, NotNil)
}
}

type mockPumpServer struct {
mode string
addr string
server *grpc.Server

retryTime int
}

// WriteBinlog implements PumpServer interface.
func (p *mockPumpServer) WriteBinlog(ctx context.Context, req *binlog.WriteBinlogReq) (*binlog.WriteBinlogResp, error) {
p.retryTime++
if p.retryTime < testRetryTime {
return &binlog.WriteBinlogResp{}, errors.New("fake error")
}

// only the last retry will return succuess
p.retryTime = 0
return &binlog.WriteBinlogResp{}, nil
}

// PullBinlogs implements PumpServer interface.
func (p *mockPumpServer) PullBinlogs(req *binlog.PullBinlogReq, srv binlog.Pump_PullBinlogsServer) error {
return nil
}

func (p *mockPumpServer) Close() {
p.server.Stop()
if p.mode == "unix" {
os.Remove(p.addr)
}
}

func createMockPumpServer(addr string, mode string) (*mockPumpServer, error) {
if mode == "unix" {
os.Remove(addr)
}

l, err := net.Listen(mode, addr)
if err != nil {
return nil, err
}
serv := grpc.NewServer(grpc.MaxRecvMsgSize(testMaxRecvMsgSize))
pump := &mockPumpServer{
mode: mode,
addr: addr,
server: serv,
}
pb.RegisterPumpServer(serv, pump)
go serv.Serve(l)

return pump, nil
}

// mockPumpsClient creates a PumpsClient, used for test.
func mockPumpsClient(client pb.PumpClient) *PumpsClient {
// add a avaliable pump
nodeID1 := "pump-1"
pump1 := &PumpStatus{
Status: node.Status{
NodeID: nodeID1,
State: node.Online,
},
IsAvaliable: true,
Client: client,
}

// add a pump without grpc client
nodeID2 := "pump-2"
pump2 := &PumpStatus{
Status: node.Status{
NodeID: nodeID2,
State: node.Online,
},
IsAvaliable: true,
}

pumpInfos := NewPumpInfos()
pumpInfos.Pumps[nodeID1] = pump1
pumpInfos.AvaliablePumps[nodeID1] = pump1
pumpInfos.Pumps[nodeID2] = pump2
pumpInfos.AvaliablePumps[nodeID2] = pump2

pCli := &PumpsClient{
ClusterID: 1,
Pumps: pumpInfos,
Selector: NewSelector(Range),
// have two pump, so use 2 * testRetryTime
RetryTime: 2 * testRetryTime,
BinlogWriteTimeout: 15 * time.Second,
}
pCli.Selector.SetPumps([]*PumpStatus{pump1, pump2})

return pCli
}
18 changes: 15 additions & 3 deletions tidb-binlog/pump_client/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
"google.golang.org/grpc/credentials"
)

var (
// localPump is used to write local pump through unix socket connection.
localPump = "localPump"
)

// PumpStatus saves pump's status.
type PumpStatus struct {
/*
Expand Down Expand Up @@ -78,9 +83,16 @@ func (p *PumpStatus) createGrpcClient(security *tls.Config) error {
p.grpcConn.Close()
}

dialerOpt := grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("tcp", addr, timeout)
})
var dialerOpt grpc.DialOption
if p.NodeID == localPump {
dialerOpt = grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
})
} else {
dialerOpt = grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("tcp", addr, timeout)
})
}
Logger.Debugf("[pumps client] create gcpc client at %s", p.Addr)
var clientConn *grpc.ClientConn
var err error
Expand Down
Loading