Skip to content

Commit

Permalink
pump client: compatible with kafka version tidb-binlog && add unit te…
Browse files Browse the repository at this point in the history
…st (pingcap#139)
  • Loading branch information
WangXiangUSTC committed Mar 25, 2019
1 parent 419f308 commit 426fef0
Show file tree
Hide file tree
Showing 10 changed files with 305 additions and 39 deletions.
2 changes: 1 addition & 1 deletion dump_region/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package main

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
Expand All @@ -25,7 +26,6 @@ import (
"github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"golang.org/x/net/context"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
package etcd

import (
"context"
"crypto/tls"
"path"
"strings"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/pingcap/errors"
"golang.org/x/net/context"
)

// Node organizes the ectd query result as a Trie tree
Expand Down
2 changes: 1 addition & 1 deletion pkg/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
package etcd

import (
"context"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"golang.org/x/net/context"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion tidb-binlog/binlogctl/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package main

import (
"bytes"
"context"
"os"
"path"
"time"
Expand All @@ -25,7 +26,6 @@ import (
"github.com/pingcap/tidb-tools/pkg/utils"
"github.com/siddontang/go/ioutil2"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

const physicalShiftBits = 18
Expand Down
2 changes: 1 addition & 1 deletion tidb-binlog/binlogctl/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"fmt"
"net/http"
"time"
Expand All @@ -23,7 +24,6 @@ import (
"github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb-tools/tidb-binlog/node"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion tidb-binlog/node/registry.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package node

import (
"context"
"encoding/json"
"path"
"strings"
Expand All @@ -10,7 +11,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb-tools/pkg/etcd"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

// EtcdRegistry wraps the reactions with etcd
Expand Down
90 changes: 72 additions & 18 deletions tidb-binlog/pump_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package client

import (
"context"
"crypto/tls"
"encoding/json"
"path"
Expand All @@ -29,15 +30,17 @@ import (
"github.com/pingcap/tidb-tools/tidb-binlog/node"
pb "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

const (
// DefaultEtcdTimeout is the default timeout config for etcd.
DefaultEtcdTimeout = 5 * time.Second

// DefaultRetryTime is the default time of retry.
DefaultRetryTime = 20
// DefaultAllRetryTime is the default retry time for all pumps, should greter than RetryTime.
DefaultAllRetryTime = 20

// RetryTime is the retry time for each pump.
RetryTime = 5

// DefaultBinlogWriteTimeout is the default max time binlog can use to write to pump.
DefaultBinlogWriteTimeout = 15 * time.Second
Expand Down Expand Up @@ -71,6 +74,15 @@ type PumpInfos struct {
UnAvaliablePumps map[string]*PumpStatus
}

// NewPumpInfos returns a PumpInfos.
func NewPumpInfos() *PumpInfos {
return &PumpInfos{
Pumps: make(map[string]*PumpStatus),
AvaliablePumps: make(map[string]*PumpStatus),
UnAvaliablePumps: make(map[string]*PumpStatus),
}
}

// PumpsClient is the client of pumps.
type PumpsClient struct {
ctx context.Context
Expand Down Expand Up @@ -99,14 +111,14 @@ type PumpsClient struct {

// Security is the security config
Security *tls.Config

// binlog socket file path, for compatible with kafka version pump.
binlogSocket string
}

// NewPumpsClient returns a PumpsClient.
// TODO: get strategy from etcd, and can update strategy in real-time. Use Range as default now.
func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error) {
// TODO: get strategy from etcd, and can update strategy in real-time. now use Range as default.
strategy := Range
selector := NewSelector(strategy)

ectdEndpoints, err := utils.ParseHostPortAddr(etcdURLs)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -130,21 +142,15 @@ func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.Secur
return nil, errors.Trace(err)
}

pumpInfos := &PumpInfos{
Pumps: make(map[string]*PumpStatus),
AvaliablePumps: make(map[string]*PumpStatus),
UnAvaliablePumps: make(map[string]*PumpStatus),
}

ctx, cancel := context.WithCancel(context.Background())
newPumpsClient := &PumpsClient{
ctx: ctx,
cancel: cancel,
ClusterID: clusterID,
EtcdRegistry: node.NewEtcdRegistry(cli, DefaultEtcdTimeout),
Pumps: pumpInfos,
Selector: selector,
RetryTime: DefaultRetryTime,
Pumps: NewPumpInfos(),
Selector: NewSelector(Range),
RetryTime: DefaultAllRetryTime,
BinlogWriteTimeout: timeout,
Security: security,
}
Expand All @@ -162,7 +168,55 @@ func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.Secur
return newPumpsClient, nil
}

// getPumpStatus retruns all the pumps status in the etcd.
// NewLocalPumpsClient returns a PumpsClient, this PumpsClient will write binlog by socket file. For compatible with kafka version pump.
func NewLocalPumpsClient(etcdURLs, binlogSocket string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error) {
ectdEndpoints, err := utils.ParseHostPortAddr(etcdURLs)
if err != nil {
return nil, errors.Trace(err)
}

// get clusterid
pdCli, err := pd.NewClient(ectdEndpoints, securityOpt)
if err != nil {
return nil, errors.Trace(err)
}
clusterID := pdCli.GetClusterID(context.Background())
pdCli.Close()

security, err := utils.ToTLSConfig(securityOpt.CAPath, securityOpt.CertPath, securityOpt.KeyPath)
if err != nil {
return nil, errors.Trace(err)
}

ctx, cancel := context.WithCancel(context.Background())
newPumpsClient := &PumpsClient{
ctx: ctx,
cancel: cancel,
ClusterID: clusterID,
Pumps: NewPumpInfos(),
Selector: NewSelector(LocalUnix),
RetryTime: DefaultAllRetryTime,
BinlogWriteTimeout: timeout,
Security: security,
binlogSocket: binlogSocket,
}
newPumpsClient.getLocalPumpStatus(ctx)

return newPumpsClient, nil
}

// getLocalPumpStatus gets the local pump. For compatible with kafka version tidb-binlog.
func (c *PumpsClient) getLocalPumpStatus(pctx context.Context) {
nodeStatus := &node.Status{
NodeID: localPump,
Addr: c.binlogSocket,
IsAlive: true,
State: node.Online,
}
c.addPump(NewPumpStatus(nodeStatus, c.Security), true)
}

// getPumpStatus gets all the pumps status in the etcd.
func (c *PumpsClient) getPumpStatus(pctx context.Context) error {
nodesStatus, err := c.EtcdRegistry.Nodes(pctx, node.NodePrefix[node.PumpNode])
if err != nil {
Expand Down Expand Up @@ -221,7 +275,7 @@ func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error {
}

// every pump can retry 5 times, if retry 5 times and still failed, set this pump unavaliable, and choose a new pump.
if (retryTime+1)%5 == 0 {
if (retryTime+1)%RetryTime == 0 {
c.setPumpAvaliable(pump, false)
pump = c.Selector.Next(binlog, retryTime/5+1)
Logger.Debugf("[pumps client] avaliable pumps: %v, write binlog choose pump %v", c.Pumps.AvaliablePumps, pump)
Expand Down
Loading

0 comments on commit 426fef0

Please sign in to comment.