Skip to content

Commit

Permalink
cherry pick pkg/loader (#471)
Browse files Browse the repository at this point in the history
* pkg/loader: add pkg to load data to mysql (#436)

* tests/* add swap unique index value test (#437)

* fix json type and may lost data when restart (#463)

* translate.go use string type instead of []byte for json field
  • Loading branch information
july2993 authored Feb 27, 2019
1 parent 6293884 commit b4c33a7
Show file tree
Hide file tree
Showing 38 changed files with 4,172 additions and 260 deletions.
15 changes: 8 additions & 7 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pingcap/tidb-binlog/drainer/checkpoint"
"github.com/pingcap/tidb-binlog/drainer/executor"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/loader"
pkgsql "github.com/pingcap/tidb-binlog/pkg/sql"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -55,7 +56,7 @@ type Syncer struct {

filter *filter

causality *causality
causality *loader.Causality

lastSyncTime time.Time
}
Expand All @@ -70,7 +71,7 @@ func NewSyncer(ctx context.Context, cp checkpoint.CheckPoint, cfg *SyncerConfig)
syncer.ctx, syncer.cancel = context.WithCancel(ctx)
syncer.initCommitTS = cp.TS()
syncer.positions = make(map[string]int64)
syncer.causality = newCausality()
syncer.causality = loader.NewCausality()
syncer.lastSyncTime = time.Now()
syncer.filter = newFilter(formatIgnoreSchemas(cfg.IgnoreSchemas), cfg.DoDBs, cfg.DoTables)

Expand Down Expand Up @@ -243,7 +244,7 @@ func (s *Syncer) addJob(job *job) {
if wait {
eventCounter.WithLabelValues("savepoint").Add(1)
s.jobWg.Wait()
s.causality.reset()
s.causality.Reset()
s.savePoint(job.commitTS)
}
}
Expand All @@ -270,18 +271,18 @@ func (s *Syncer) resolveCasuality(keys []string) (string, error) {
return keys[0], nil
}

if s.causality.detectConflict(keys) {
if s.causality.DetectConflict(keys) {
if err := s.flushJobs(); err != nil {
return "", errors.Trace(err)
}
s.causality.reset()
s.causality.Reset()
}

if err := s.causality.add(keys); err != nil {
if err := s.causality.Add(keys); err != nil {
return "", errors.Trace(err)
}

return s.causality.get(keys[0]), nil
return s.causality.Get(keys[0]), nil
}

func (s *Syncer) flushJobs() error {
Expand Down
13 changes: 7 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module github.com/pingcap/tidb-binlog

require (
github.com/BurntSushi/toml v0.3.1
github.com/DATA-DOG/go-sqlmock v1.3.2
github.com/Shopify/sarama v1.18.0
github.com/Shopify/toxiproxy v2.1.3+incompatible // indirect
github.com/beorn7/perks v0.0.0-20160229213445-3ac7bf7a47d1 // indirect
Expand Down Expand Up @@ -62,15 +63,15 @@ require (
github.com/petar/GoLLRB v0.0.0-20130427215148-53be0d36a84c // indirect
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pingcap/check v0.0.0-20171206051426-1c287c953996
github.com/pingcap/errors v0.11.0 // indirect
github.com/pingcap/errors v0.11.0
github.com/pingcap/goleveldb v0.0.0-20161010101021-158edde5a354 // indirect
github.com/pingcap/kvproto v0.0.0-20181010074705-0ba3ca8a6e37 // indirect
github.com/pingcap/parser v0.0.0-20181210061630-27e9d3e251d4 // indirect
github.com/pingcap/pd v2.0.5+incompatible
github.com/pingcap/tidb v2.1.0-beta.0.20180823032518-ef6590e1899a+incompatible
github.com/pingcap/tidb-tools v2.1.1-0.20181130053235-0206fdab9ef8+incompatible
github.com/pingcap/tidb-tools v2.1.3-0.20190215110732-23405d82dbe6+incompatible
github.com/pingcap/tipb v0.0.0-20180711115030-4141907f6909
github.com/pkg/errors v0.8.0 // indirect
github.com/pkg/errors v0.8.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v0.8.0
github.com/prometheus/client_model v0.0.0-20150212101744-fa8ad6fec335 // indirect
Expand All @@ -79,7 +80,7 @@ require (
github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165
github.com/samuel/go-zookeeper v0.0.0-20170815201139-e6b59f6144be
github.com/siddontang/go v0.0.0-20161005110831-1e9ce2a5ac40
github.com/sirupsen/logrus v0.0.0-20180830201151-78fa2915c1fa // indirect
github.com/sirupsen/logrus v0.0.0-20180830201151-78fa2915c1fa
github.com/soheilhy/cmux v0.1.2
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 // indirect
github.com/stretchr/testify v1.2.2 // indirect
Expand All @@ -97,7 +98,7 @@ require (
golang.org/x/crypto v0.0.0-20150218234220-1351f936d976 // indirect
golang.org/x/lint v0.0.0-20181011164241-5906bd5c48cd // indirect
golang.org/x/net v0.0.0-20180724234803-3673e40ba225
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
golang.org/x/sys v0.0.0-20161006025142-8d1157a43547
golang.org/x/time v0.0.0-20170420181420-c06e80d9300e // indirect
golang.org/x/tools v0.0.0-20181012201414-c0eb142035b5 // indirect
Expand All @@ -109,7 +110,7 @@ require (
gopkg.in/fsnotify.v1 v1.4.7 // indirect
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.0.0-20170407172122-cd8b52f8269e // indirect
)
31 changes: 31 additions & 0 deletions pkg/loader/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
loader
======

A package to load data into MySQL in real time, designed to be shared by *reparo*, *drainer*, and other tools.


### Getting started
- Example is available via [example_loader_test.go](./example_loader_test.go)

You need to write a translator to use *Loader* like *SlaveBinlogToTxn* in [translate.go](./translate.go) to translate upstream data format (e.g. binlog) into `Txn` objects.


## Overview
Loader splits the upstream transaction DML events and loads them into MySQL concurrently (sharded by primary key or unique key), while respecting causality with [causality.go](./causality.go).


## Optimization
#### Large Operation
Instead of executing DML one by one, we can combine many small operations into a single large operation, like using INSERT statements with multiple VALUES lists to insert several rows at a time. This is [faster](https://medium.com/@benmorel/high-speed-inserts-with-mysql-9d3dcd76f723) than inserting one by one.

#### Merge by Primary Key
You may want to read [log-compaction](https://kafka.apache.org/documentation/#compaction) of Kafka.

We can treat a table with Primary Key like a KV-store. To reload the table with the change history of the table, we only need the last value of every key.

While synchronizing data downstream in real time, we can fetch DML events from upstream in batches and merge them by key. After merging, there is only one event per key, so downstream needs to execute far fewer events than upstream. Merging also lets us use batch insert operations.

We should also consider secondary unique keys here, see *execTableBatch* in [executor.go](./executor.go). Currently, we only merge by primary key and do batch operations if the table has a primary key and no unique key.



239 changes: 239 additions & 0 deletions pkg/loader/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package loader

import (
"database/sql"
"fmt"
"sync"
"testing"

_ "github.com/go-sql-driver/mysql"
"github.com/juju/errors"
"github.com/ngaut/log"
)

// getTestDB opens a handle to the local MySQL instance used by the benchmarks.
// Note that sql.Open does not dial the server; connection errors surface on
// first use of the returned handle.
func getTestDB() (*sql.DB, error) {
	const connStr = "root:@tcp(127.0.0.1:3306)/?charset=utf8&interpolateParams=true&readTimeout=1m&multiStatements=true"
	return sql.Open("mysql", connStr)
}

// BenchmarkInsertMerge measures insert throughput with key-based merging enabled.
func BenchmarkInsertMerge(b *testing.B) {
	benchmarkWrite(b, true)
}

// BenchmarkInsertNoMerge measures insert throughput with merging disabled.
func BenchmarkInsertNoMerge(b *testing.B) {
	benchmarkWrite(b, false)
}

// BenchmarkUpdateMerge measures update throughput with key-based merging enabled.
func BenchmarkUpdateMerge(b *testing.B) {
	benchmarkUpdate(b, true)
}

// BenchmarkUpdateNoMerge measures update throughput with merging disabled.
func BenchmarkUpdateNoMerge(b *testing.B) {
	benchmarkUpdate(b, false)
}

// BenchmarkDeleteMerge measures delete throughput with key-based merging enabled.
func BenchmarkDeleteMerge(b *testing.B) {
	benchmarkDelete(b, true)
}

// BenchmarkDeleteNoMerge measures delete throughput with merging disabled.
func BenchmarkDeleteNoMerge(b *testing.B) {
	benchmarkDelete(b, false)
}

// benchmarkUpdate times b.N single-row updates against a freshly created and
// pre-loaded table, with loader merging toggled by merge.
func benchmarkUpdate(b *testing.B, merge bool) {
	log.SetLevelByString("error")

	run, err := newRunner(merge)
	if err != nil {
		b.Fatal(err)
	}
	defer run.close()

	// Setup is excluded from the timed region below.
	dropTable(run.db, run.loader)
	createTable(run.db, run.loader)
	loadTable(run.db, run.loader, b.N)

	b.ResetTimer()
	updateTable(run.db, run.loader, b.N)
}

// benchmarkDelete times b.N single-row deletes against a freshly created and
// pre-loaded table, with loader merging toggled by merge.
func benchmarkDelete(b *testing.B, merge bool) {
	log.SetLevelByString("error")

	run, err := newRunner(merge)
	if err != nil {
		b.Fatal(err)
	}
	defer run.close()

	// Setup is excluded from the timed region below.
	dropTable(run.db, run.loader)
	createTable(run.db, run.loader)
	loadTable(run.db, run.loader, b.N)

	b.ResetTimer()
	deleteTable(run.db, run.loader, b.N)
}

// benchmarkWrite times b.N single-row inserts into a freshly created table,
// with loader merging toggled by merge.
func benchmarkWrite(b *testing.B, merge bool) {
	log.SetLevelByString("error")

	run, err := newRunner(merge)
	if err != nil {
		b.Fatal(err)
	}
	defer run.close()

	// Table (re)creation is excluded from the timed region below.
	dropTable(run.db, run.loader)
	createTable(run.db, run.loader)

	b.ResetTimer()
	loadTable(run.db, run.loader, b.N)
}

// runner bundles the test DB handle with a Loader whose Run loop executes in a
// background goroutine started by newRunner.
type runner struct {
	// db is the MySQL handle the loader writes to.
	db *sql.DB
	// loader is the Loader under benchmark.
	loader *Loader
	// wg tracks the goroutine running loader.Run so close can wait for it.
	wg sync.WaitGroup
}

// newRunner opens a test DB connection, builds a Loader on it with 16 workers
// and batch size 128, and starts the loader's Run loop in a goroutine tracked
// by r.wg. A second goroutine drains loader.Successes() so the loader never
// blocks on its success channel.
//
// NOTE(review): log.Fatal inside the Run goroutine aborts the whole process on
// error rather than failing the benchmark — acceptable for bench-only code.
func newRunner(merge bool) (r *runner, err error) {
	db, err := getTestDB()
	if err != nil {
		return nil, errors.Trace(err)
	}

	loader, err := NewLoader(db, WorkerCount(16), BatchSize(128))
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Force the merge behavior under test (unexported field; same package).
	loader.merge = merge

	r = new(runner)
	r.db = db
	r.loader = loader

	r.wg.Add(1)
	go func() {
		err := loader.Run()
		if err != nil {
			log.Fatal(err)
		}
		r.wg.Done()
	}()

	// Drain successes; this goroutine exits when Successes() is closed —
	// presumably on loader.Close(), TODO confirm in loader.go.
	go func() {
		for range loader.Successes() {

		}
	}()

	return
}

// close shuts down the loader and blocks until its Run goroutine has exited.
func (r *runner) close() {
	r.loader.Close()
	r.wg.Wait()
}

// createTable asks the loader to create the benchmark table via a DDL txn.
// The db argument is unused but kept for signature symmetry with the other
// table helpers. Always returns nil; the DDL executes asynchronously.
func createTable(db *sql.DB, loader *Loader) error {
	// Collapsed the original `var sql string` + assignment into one `:=`.
	// Alternative schema exercising a unique key instead of a primary key:
	// "create table test1(id int, a1 int, UNIQUE KEY `id` (`id`))"
	sql := "create table test1(id int primary key, a1 int)"
	loader.Input() <- NewDDLTxn("test", "test1", sql)

	return nil
}

// dropTable asks the loader to drop the benchmark table via a DDL txn.
// The db argument is unused but kept for signature symmetry with the other
// table helpers. Always returns nil; the DDL executes asynchronously.
func dropTable(db *sql.DB, loader *Loader) error {
	// The original passed a constant format with no arguments to Sprintf,
	// which go vet's printf check flags; use a %s verb instead (same result,
	// and the file-level fmt import stays in use).
	sql := fmt.Sprintf("drop table if exists %s", "test1")
	loader.Input() <- NewDDLTxn("test", "test1", sql)
	return nil
}

// loadTable feeds n single-insert transactions into the loader, one row per
// txn, with id == a1 == the row index. The db argument is unused but kept for
// signature symmetry. Always returns nil.
func loadTable(db *sql.DB, loader *Loader, n int) error {
	for row := 0; row < n; row++ {
		insert := &DML{
			Database: "test",
			Table:    "test1",
			Tp:       InsertDMLType,
			Values: map[string]interface{}{
				"id": row,
				"a1": row,
			},
		}

		t := new(Txn)
		t.AppendDML(insert)
		loader.Input() <- t
	}

	return nil
}

// updateTable feeds n single-update transactions into the loader, changing
// each row's a1 from its index to ten times its index. The db argument is
// unused but kept for signature symmetry. Always returns nil.
func updateTable(db *sql.DB, loader *Loader, n int) error {
	for row := 0; row < n; row++ {
		update := &DML{
			Database: "test",
			Table:    "test1",
			Tp:       UpdateDMLType,
			Values: map[string]interface{}{
				"id": row,
				"a1": row * 10,
			},
			OldValues: map[string]interface{}{
				"id": row,
				"a1": row,
			},
		}

		t := new(Txn)
		t.AppendDML(update)
		loader.Input() <- t
	}

	return nil
}

// deleteTable feeds n single-delete transactions into the loader, removing the
// rows previously written by loadTable. The db argument is unused but kept for
// signature symmetry. Always returns nil.
func deleteTable(db *sql.DB, loader *Loader, n int) error {
	for row := 0; row < n; row++ {
		del := &DML{
			Database: "test",
			Table:    "test1",
			Tp:       DeleteDMLType,
			Values: map[string]interface{}{
				"id": row,
				"a1": row,
			},
		}

		t := new(Txn)
		t.AppendDML(del)
		loader.Input() <- t
	}

	return nil
}
Loading

0 comments on commit b4c33a7

Please sign in to comment.