tool sync from kafka to mysql (#441)
july2993 committed Feb 27, 2019
1 parent b4c33a7 commit 1aed4c1
Showing 30 changed files with 1,011 additions and 70 deletions.
9 changes: 6 additions & 3 deletions Makefile
@@ -1,5 +1,5 @@
 ### Makefile for tidb-binlog
-.PHONY: build test check update clean pump drainer fmt diff reparo integration_test
+.PHONY: build test check update clean pump drainer fmt diff reparo integration_test arbiter
 
 PROJECT=tidb-binlog
 
@@ -34,16 +34,19 @@ buildsucc:
 
 all: dev install
 
-dev: check test build
+dev: check test
 
-build: pump drainer reparo
+build: pump drainer reparo arbiter
 
 pump:
 	$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/pump cmd/pump/main.go
 
 drainer:
 	$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/drainer cmd/drainer/main.go
 
+arbiter:
+	$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/arbiter cmd/arbiter/main.go
+
 diff:
 	$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/diff cmd/diff/main.go
 
67 changes: 67 additions & 0 deletions arbiter/README.md
@@ -0,0 +1,67 @@
Arbiter
==========

**Arbiter** is a tool for incrementally syncing data from Kafka to TiDB.

![](./arbiter.png)

The complete import process is as follows:

1. Read Binlog from Kafka in the [Protobuf](https://github.com/pingcap/tidb-tools/blob/master/tidb-binlog/slave_binlog_proto/proto/binlog.proto) format (see the sketch after this list).
2. Once the accumulated data reaches a size limit, construct SQL statements according to the Binlog and write them to the downstream concurrently (note: Arbiter splits upstream transactions).
3. Save the checkpoint.
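
Below is a minimal, hedged sketch of step 1: it consumes one Kafka partition with [sarama](https://github.com/Shopify/sarama) and decodes each message with the Go bindings generated from binlog.proto. The import path of the generated package, the `Binlog.CommitTs` field name, and the broker/topic values are illustrative assumptions, not code from this repository.

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
	"github.com/golang/protobuf/proto"
	// Assumed import path for the Go bindings generated from binlog.proto.
	pb "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Consume a single partition from the oldest offset; a real reader would
	// resume from the offset that corresponds to the saved checkpoint ts.
	pc, err := consumer.ConsumePartition("test_kafka4", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		binlog := new(pb.Binlog)
		if err := proto.Unmarshal(msg.Value, binlog); err != nil {
			log.Fatal(err)
		}
		fmt.Println("received binlog, commit ts:", binlog.CommitTs)
	}
}
```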


## Checkpoint
`arbiter` writes a record to the `tidb_binlog.arbiter_checkpoint` table in the downstream TiDB.
```
mysql> select * from tidb_binlog.arbiter_checkpoint;
+-------------+--------------------+--------+
| topic_name | ts | status |
+-------------+--------------------+--------+
| test_kafka4 | 405809779094585347 | 1 |
+-------------+--------------------+--------+
```
- topic_name: the name of the Kafka topic to consume.
- ts: the checkpoint timestamp.
- status:
    * 0
      All Binlog data with timestamp <= ts has been synced to the downstream.
    * 1
      `Arbiter` is running or quit unexpectedly; Binlog with a timestamp greater than ts may be partially synced to the downstream.
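
As a usage note, a downstream application can poll this table to decide how far it may safely read. A minimal sketch, assuming a reachable TiDB on the default port and the go-sql-driver/mysql driver:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var ts int64
	var status int
	err = db.QueryRow(
		"SELECT ts, status FROM tidb_binlog.arbiter_checkpoint WHERE topic_name = ?",
		"test_kafka4",
	).Scan(&ts, &status)
	if err != nil {
		log.Fatal(err)
	}

	if status == 0 {
		fmt.Printf("all binlog with timestamp <= %d is synced\n", ts)
	} else {
		fmt.Printf("sync in progress; binlog after %d may be partially applied\n", ts)
	}
}
```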



## Monitor

Arbiter supports metrics collection via [Prometheus](https://prometheus.io/).

### Metrics

* **`binlog_arbiter_checkpoint_tso`** (Gauge)

    Corresponds to ts in the `tidb_binlog.arbiter_checkpoint` table.

* **`binlog_arbiter_query_duration_time`** (Histogram)

    Bucketed histogram of the time needed to write to the downstream. Labels:

    * **type**: `exec`, `commit`. The time taken to execute and to commit SQL, respectively.

* **`binlog_arbiter_event`** (Counter)

    Counter of event occurrences. Labels:

* **type**: e.g. `DDL` `Insert` `Update` `Delete` `Txn`

* **`binlog_arbiter_queue_size`** (Gauge)

Queue size. Labels:

* **name**: e.g. `kafka_reader` `loader_input`

* **`binlog_arbiter_txn_latency_seconds`** (Histogram)

    Bucketed histogram of the duration between the write to the downstream and the commit time of the upstream transaction (the physical part of commitTS).


67 changes: 67 additions & 0 deletions arbiter/README_CN.md
@@ -0,0 +1,67 @@
Arbiter
==========

**Arbiter** is a tool that reads Binlog from Kafka and incrementally syncs the data to TiDB.

![](./arbiter.png)

The overall workflow is as follows:

1. Read Binlog in the [Protobuf](https://github.com/pingcap/tidb-tools/blob/master/tidb-binlog/slave_binlog_proto/proto/binlog.proto) format from Kafka.
2. Once a certain amount of data has accumulated, construct the corresponding SQL statements according to the Binlog and write them to the downstream concurrently (note: Arbiter splits upstream transactions).
3. Save the checkpoint.


## Checkpoint
`arbiter` saves a checkpoint record in the `tidb_binlog.arbiter_checkpoint` table in the downstream TiDB.
```
mysql> select * from tidb_binlog.arbiter_checkpoint;
+-------------+--------------------+--------+
| topic_name | ts | status |
+-------------+--------------------+--------+
| test_kafka4 | 405809779094585347 | 1 |
+-------------+--------------------+--------+
```
- topic_name: the name of the Kafka topic being consumed.
- ts: the timestamp up to which data has been synced.
- status:
    * 0
      All data with timestamp <= ts has been synced to the downstream.
    * 1
      Arbiter is running or quit abnormally; some Binlog after ts may have been synced to the downstream.



## Monitor

Arbiter exposes metrics for collection by [Prometheus](https://prometheus.io/).

### Metrics

* **`binlog_arbiter_checkpoint_tso`** (Gauge)

    Corresponds to ts in the `tidb_binlog.arbiter_checkpoint` table.

* **`binlog_arbiter_query_duration_time`** (Histogram)

    Bucketed histogram of the time needed to write to the downstream. Labels:

    * **type**: `exec`, `commit`. The time taken to execute and to commit SQL, respectively.

* **`binlog_arbiter_event`** (Counter)

    Counter of event occurrences. Labels:

    * **type**: `DDL` `Insert` `Update` `Delete` `Txn`

* **`binlog_arbiter_queue_size`** (Gauge)

    Size of the data backlog in internal queues. Labels:

    * **name**: `kafka_reader` `loader_input`

* **`binlog_arbiter_txn_latency_seconds`** (Histogram)

    Bucketed histogram of the duration from the upstream transaction commit time (the physical part of commitTS) to when the transaction is written to the downstream.


Binary file added arbiter/arbiter.png
90 changes: 90 additions & 0 deletions arbiter/checkpoint.go
@@ -0,0 +1,90 @@
package arbiter

import (
gosql "database/sql"
"fmt"

pkgsql "github.com/pingcap/tidb-binlog/pkg/sql"

"github.com/pingcap/errors"
)

const (
// StatusNormal means the server quit normally and all data <= ts is synced to the downstream
StatusNormal int = 0
// StatusRunning means the server is running or quit abnormally; data > ts may or may not have been synced to the downstream
StatusRunning int = 1
)

// Checkpoint to save the checkpoint
type Checkpoint struct {
database string
table string
db *gosql.DB
topicName string
}

// NewCheckpoint creates a Checkpoint
func NewCheckpoint(db *gosql.DB, topicName string) (cp *Checkpoint, err error) {
cp = &Checkpoint{
db: db,
database: "tidb_binlog",
table: "arbiter_checkpoint",
topicName: topicName,
}

err = cp.createSchemaIfNeed()
if err != nil {
return nil, errors.Trace(err)
}

return cp, nil
}

func (c *Checkpoint) createSchemaIfNeed() error {
sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", pkgsql.QuoteName(c.database))
_, err := c.db.Exec(sql)
if err != nil {
return errors.Trace(err)
}

sql = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s(
topic_name VARCHAR(255) PRIMARY KEY, ts BIGINT NOT NULL, status INT NOT NULL)`,
pkgsql.QuoteSchema(c.database, c.table))
_, err = c.db.Exec(sql)
if err != nil {
return errors.Trace(err)
}

return nil
}

// Save saves the ts and status
func (c *Checkpoint) Save(ts int64, status int) error {
sql := fmt.Sprintf("REPLACE INTO %s(topic_name, ts, status) VALUES(?,?,?)",
pkgsql.QuoteSchema(c.database, c.table))
_, err := c.db.Exec(sql, c.topicName, ts, status)
if err != nil {
return errors.Annotatef(err, "exec fail: '%s', args: %s %d, %d", sql, c.topicName, ts, status)
}

return nil
}

// Load returns ts and status; if there is no checkpoint record, it returns an errors.NotFoundf error
func (c *Checkpoint) Load() (ts int64, status int, err error) {
sql := fmt.Sprintf("SELECT ts, status FROM %s WHERE topic_name = ?",
pkgsql.QuoteSchema(c.database, c.table))

row := c.db.QueryRow(sql, c.topicName)

err = row.Scan(&ts, &status)
if err != nil {
if errors.Cause(err) == gosql.ErrNoRows {
return 0, 0, errors.NotFoundf("no checkpoint for: %s", c.topicName)
}
return 0, 0, errors.Trace(err)
}

return
}
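
A hedged sketch (not part of the commit) of how a caller might drive this API; the DSN and topic name are placeholders. It opens a connection, loads the previous checkpoint if one exists, and marks the sync as running before applying binlog downstream.

```go
package main

import (
	gosql "database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql"
	"github.com/pingcap/errors"
	"github.com/pingcap/tidb-binlog/arbiter"
)

func main() {
	db, err := gosql.Open("mysql", "root@tcp(127.0.0.1:4000)/")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// NewCheckpoint creates tidb_binlog.arbiter_checkpoint if needed.
	cp, err := arbiter.NewCheckpoint(db, "test_kafka4")
	if err != nil {
		log.Fatal(err)
	}

	// A missing row is a normal fresh start, reported via errors.NotFoundf.
	ts, _, err := cp.Load()
	if err != nil && !errors.IsNotFound(err) {
		log.Fatal(err)
	}

	// Mark the sync as running; a clean shutdown saves StatusNormal instead.
	if err := cp.Save(ts, arbiter.StatusRunning); err != nil {
		log.Fatal(err)
	}
}
```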
68 changes: 68 additions & 0 deletions arbiter/checkpoint_test.go
@@ -0,0 +1,68 @@
package arbiter

import (
"fmt"
"testing"

sqlmock "github.com/DATA-DOG/go-sqlmock"
check "github.com/pingcap/check"
"github.com/pingcap/errors"
pkgsql "github.com/pingcap/tidb-binlog/pkg/sql"
)

func Test(t *testing.T) { check.TestingT(t) }

type CheckpointSuite struct {
}

var _ = check.Suite(&CheckpointSuite{})

func setNewExpect(mock sqlmock.Sqlmock) {
mock.ExpectExec("CREATE DATABASE IF NOT EXISTS").WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("CREATE TABLE IF NOT EXISTS").WillReturnResult(sqlmock.NewResult(0, 1))
}

func (cs *CheckpointSuite) TestNewCheckpoint(c *check.C) {
db, mock, err := sqlmock.New()
c.Assert(err, check.IsNil)

setNewExpect(mock)

_, err = NewCheckpoint(db, "topic_name")
c.Assert(err, check.IsNil)

c.Assert(mock.ExpectationsWereMet(), check.IsNil)
}

func (cs *CheckpointSuite) TestSaveAndLoad(c *check.C) {
db, mock, err := sqlmock.New()
c.Assert(err, check.IsNil)

setNewExpect(mock)
cp, err := NewCheckpoint(db, "topic_name")
c.Assert(err, check.IsNil)
sql := fmt.Sprintf("SELECT (.+) FROM %s WHERE topic_name = ?",
pkgsql.QuoteSchema(cp.database, cp.table))
mock.ExpectQuery(sql).WithArgs(cp.topicName).
WillReturnError(errors.NotFoundf("no checkpoint for: %s", cp.topicName))

_, _, err = cp.Load()
c.Log(err)
c.Assert(errors.IsNotFound(err), check.IsTrue)

var saveTS int64 = 10
saveStatus := 1
mock.ExpectExec("REPLACE INTO").
WithArgs(cp.topicName, saveTS, saveStatus).
WillReturnResult(sqlmock.NewResult(0, 1))
err = cp.Save(saveTS, saveStatus)
c.Assert(err, check.IsNil)

rows := sqlmock.NewRows([]string{"ts", "status"}).
AddRow(saveTS, saveStatus)
mock.ExpectQuery("SELECT ts, status FROM").WillReturnRows(rows)
ts, status, err := cp.Load()
c.Assert(err, check.IsNil)
c.Assert(ts, check.Equals, saveTS)
c.Assert(status, check.Equals, saveStatus)
}