Skip to content

Commit

Permalink
release 3.0 rebase master (#587)
Browse files Browse the repository at this point in the history
* drainer/: Fix when pk is handle and value overflow int64 (#573)

* arbiter: Refactor and add more unit tests (#570)

* More tests for arbiter

* Log os.Hostname error

* drainer: Refactor, eliminate duplicate code by introducing a taskGroup (#572)

* Refactor, eliminate duplicate code by introducing a taskGroup

* Use zap.Stack directly

* binlogctl: refine log output when use help command (#580)

* Minor edits to README (#583)

* drainer: Refactor and add unit tests (#571)

* Add tests for drainer

* Use DeepEquals to simplify map testing

* Just import context

* Use Equals
  • Loading branch information
WangXiangUSTC authored and IANTHEREAL committed May 7, 2019
1 parent 3e68c99 commit 8ff1c75
Show file tree
Hide file tree
Showing 15 changed files with 983 additions and 203 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

## TiDB-Binlog introduction

TiDB-Binlog is a commercial tool used to collect [TiDB's](https://github.com/pingcap/tidb) binary logs with the following features:
TiDB-Binlog is a tool used to collect [TiDB's](https://github.com/pingcap/tidb) binary logs with the following features:

- Data replication

Expand Down Expand Up @@ -73,6 +73,9 @@ The recommended startup sequence: PD -> TiKV -> [Pump](./cmd/pump) -> TiDB -> [D

The best way to install TiDB-Binlog is via [TiDB-Binlog-Ansible](https://www.pingcap.com/docs-cn/tools/tidb-binlog-cluster/)

## Tutorial

Here's a [tutorial](https://www.pingcap.com/docs/dev/how-to/get-started/tidb-binlog/) to experiment with TiDB-Binlog (not for production use).

## Config File
* Pump config file: [pump.toml](./cmd/pump/pump.toml)
Expand All @@ -83,4 +86,4 @@ Contributions are welcomed and greatly appreciated. See [CONTRIBUTING.md](./CONT
for details on submitting patches and the contribution workflow.

## License
TiDB-Binlog is under the Apache 2.0 license. See the [LICENSE](./LICENSE) file for details.
TiDB-Binlog is under the Apache 2.0 license. See the [LICENSE](./LICENSE) file for details.
5 changes: 4 additions & 1 deletion arbiter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ func init() {
Registry.MustRegister(txnLatencySecondsHistogram)
}

var getHostname = os.Hostname

func instanceName(port int) string {
hostname, err := os.Hostname()
hostname, err := getHostname()
if err != nil {
log.Error("Failed to get hostname", zap.Error(err))
return "unknown"
}
return fmt.Sprintf("%s_%d", hostname, port)
Expand Down
31 changes: 31 additions & 0 deletions arbiter/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
)

type testStartSuite struct{}
Expand All @@ -41,3 +42,33 @@ func (s *testStartSuite) TestCanBeStoppedFromOutside(c *C) {
mc.Start(ctx, 1234)
close(signal)
}

type instanceNameSuite struct{}

var _ = Suite(&instanceNameSuite{})

func (s *instanceNameSuite) TestShouldRetUnknown(c *C) {
orig := getHostname
defer func() {
getHostname = orig
}()
getHostname = func() (string, error) {
return "", errors.New("host")
}

n := instanceName(9090)
c.Assert(n, Equals, "unknown")
}

func (s *instanceNameSuite) TestShouldUseHostname(c *C) {
orig := getHostname
defer func() {
getHostname = orig
}()
getHostname = func() (string, error) {
return "kendoka", nil
}

n := instanceName(9090)
c.Assert(n, Equals, "kendoka_9090")
}
44 changes: 27 additions & 17 deletions arbiter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ import (
"go.uber.org/zap"
)

var createDB = loader.CreateDB
var (
initSafeModeDuration = time.Minute * 5

// Make it possible to mock the following functions
createDB = loader.CreateDB
newReader = reader.NewReader
newLoader = loader.NewLoader
)

// Server is the server to load data to mysql
type Server struct {
Expand Down Expand Up @@ -97,13 +104,13 @@ func NewServer(cfg *Config) (srv *Server, err error) {

log.Info("use kafka binlog reader", zap.Reflect("cfg", readerCfg))

srv.kafkaReader, err = reader.NewReader(readerCfg)
srv.kafkaReader, err = newReader(readerCfg)
if err != nil {
return nil, errors.Trace(err)
}

// set loader
srv.load, err = loader.NewLoader(srv.downDB,
srv.load, err = newLoader(srv.downDB,
loader.WorkerCount(cfg.Down.WorkerCount),
loader.BatchSize(cfg.Down.BatchSize),
loader.Metrics(&loader.MetricsGroup{
Expand All @@ -119,7 +126,7 @@ func NewServer(cfg *Config) (srv *Server, err error) {
log.Info("set safe mode to be true")
srv.load.SetSafeMode(true)
go func() {
time.Sleep(time.Minute * 5)
time.Sleep(initSafeModeDuration)
srv.load.SetSafeMode(false)
log.Info("set safe mode to be false")
}()
Expand Down Expand Up @@ -173,19 +180,8 @@ func (s *Server) Run() error {

wg.Add(1)
go func() {
defer wg.Done()

for msg := range s.kafkaReader.Messages() {
log.Debug("recv msg from kafka reader", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset))
txn := loader.SlaveBinlogToTxn(msg.Binlog)
txn.Metadata = msg
s.load.Input() <- txn

queueSizeGauge.WithLabelValues("kafka_reader").Set(float64(len(s.kafkaReader.Messages())))
queueSizeGauge.WithLabelValues("loader_input").Set(float64(len(s.load.Input())))
}

s.load.Close()
syncBinlogs(s.kafkaReader.Messages(), s.load)
wg.Done()
}()

err := s.load.Run()
Expand Down Expand Up @@ -263,3 +259,17 @@ func (s *Server) loadStatus() (int, error) {
}
return status, errors.Trace(err)
}

func syncBinlogs(source <-chan *reader.Message, ld loader.Loader) {
dest := ld.Input()
for msg := range source {
log.Debug("recv msg from kafka reader", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset))
txn := loader.SlaveBinlogToTxn(msg.Binlog)
txn.Metadata = msg
dest <- txn

queueSizeGauge.WithLabelValues("kafka_reader").Set(float64(len(source)))
queueSizeGauge.WithLabelValues("loader_input").Set(float64(len(dest)))
}
ld.Close()
}
Loading

0 comments on commit 8ff1c75

Please sign in to comment.