Skip to content

Commit

Permalink
Merge branch 'release-3.0' into release-3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 committed Feb 11, 2020
2 parents 88c47fb + 9ed320f commit 4603c5d
Show file tree
Hide file tree
Showing 34 changed files with 3,226 additions and 116 deletions.
7 changes: 6 additions & 1 deletion binlogctl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ const (

// OfflineDrainer is comamnd used for offlien drainer.
OfflineDrainer = "offline-drainer"

// Encrypt is command used for encrypt password.
Encrypt = "encrypt"
)

// Config holds the configuration of drainer
Expand All @@ -74,6 +77,7 @@ type Config struct {
SSLKey string `toml:"ssl-key" json:"ssl-key"`
State string `toml:"state" json:"state"`
ShowOfflineNodes bool `toml:"state" json:"show-offline-nodes"`
Text string `toml:"text" json:"text"`
tls *tls.Config
printVersion bool
}
Expand All @@ -83,7 +87,7 @@ func NewConfig() *Config {
cfg := &Config{}
cfg.FlagSet = flag.NewFlagSet("binlogctl", flag.ContinueOnError)

cfg.FlagSet.StringVar(&cfg.Command, "cmd", "pumps", "operator: \"generate_meta\", \"pumps\", \"drainers\", \"update-pump\", \"update-drainer\", \"pause-pump\", \"pause-drainer\", \"offline-pump\", \"offline-drainer\"")
cfg.FlagSet.StringVar(&cfg.Command, "cmd", "pumps", "operator: \"generate_meta\", \"pumps\", \"drainers\", \"update-pump\", \"update-drainer\", \"pause-pump\", \"pause-drainer\", \"offline-pump\", \"offline-drainer\", \"encrypt\"")
cfg.FlagSet.StringVar(&cfg.NodeID, "node-id", "", "id of node, use to update some node with operation update-pump, update-drainer, pause-pump, pause-drainer, offline-pump and offline-drainer")
cfg.FlagSet.StringVar(&cfg.DataDir, "data-dir", defaultDataDir, "meta directory path")
cfg.FlagSet.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of PD endpoints")
Expand All @@ -93,6 +97,7 @@ func NewConfig() *Config {
cfg.FlagSet.StringVar(&cfg.TimeZone, "time-zone", "", "set time zone if you want save time info in savepoint file, for example `Asia/Shanghai` for CST time, `Local` for local time")
cfg.FlagSet.StringVar(&cfg.State, "state", "", "set node's state, can set to online, pausing, paused, closing or offline.")
cfg.FlagSet.BoolVar(&cfg.ShowOfflineNodes, "show-offline-nodes", false, "include offline nodes when querying pumps/drainers")
cfg.FlagSet.StringVar(&cfg.Text, "text", "", "text to be encrypt when using encrypt command")
cfg.FlagSet.BoolVar(&cfg.printVersion, "V", false, "prints version and exit")

return cfg
Expand Down
18 changes: 18 additions & 0 deletions binlogctl/encrypt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package binlogctl

import (
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/pkg/encrypt"
"go.uber.org/zap"
)

// EncryptHandler log the encrypted text if success or return error.
func EncryptHandler(text string) error {
enc, err := encrypt.Encrypt(text)
if err != nil {
return err
}

log.Info("encrypt text", zap.String("encrypted", string(enc)))
return nil
}
6 changes: 6 additions & 0 deletions cmd/binlogctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func main() {
err = ctl.ApplyAction(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, close)
case ctl.OfflineDrainer:
err = ctl.ApplyAction(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, close)
case ctl.Encrypt:
if len(cfg.Text) == 0 {
err = errors.New("need to specify the text to be encrypt")
} else {
err = ctl.EncryptHandler(cfg.Text)
}
default:
err = errors.NotSupportedf("cmd %s", cfg.Command)
}
Expand Down
21 changes: 21 additions & 0 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,19 @@ compressor = ""
# number of binlog events in a transaction batch
txn-batch = 20

# sync ddl to downstream db or not
sync-ddl = true

# This variable works in dual-a. if it is false, the upstream data will all be synchronized to the downstream, except for the filtered table.
# If it is true, the channel value is set at the same time, and the upstream starts with the mark table ID updated, and the channel ID is the same as its channel ID.
# this part of data will not be synchronized to the downstream. Therefore, in dual-a scenario,both sides Channel id also needs to be set to the same value
loopback-control = false

# When loopback control is turned on, the channel ID will work.
# In the dual-a scenario, the channel ID synchronized from the downstream to the upstream and the channel ID synchronized from
# the upstream to the downstream need to be set to the same value to avoid loopback synchronization
channel-id = 1

# work count to execute binlogs
# if the latency between drainer and downstream(mysql or tidb) are too high, you might want to increase this
# to get higher throughput by higher concurrent write to the downstream
Expand Down Expand Up @@ -81,7 +94,13 @@ ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql"
host = "127.0.0.1"
user = "root"
password = ""
# if encrypted_password is not empty, password will be ignored.
encrypted_password = ""
port = 3306
# 1: SyncFullColumn, 2: SyncPartialColumn
# when setting SyncPartialColumn drainer will allow the downstream schema
# having more or less column numbers and relax sql mode by removing STRICT_TRANS_TABLES.
# sync-mode = 1

[syncer.to.checkpoint]
# only support mysql or tidb now, you can uncomment this to control where the checkpoint is saved.
Expand All @@ -93,6 +112,8 @@ port = 3306
# schema = "tidb_binlog"
# host = "127.0.0.1"
# user = "root"
# if encrypted_password is not empty, password will be ignored.
# encrypted_password = ""
# password = ""
# port = 3306

Expand Down
26 changes: 25 additions & 1 deletion drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.uber.org/zap"

dsync "github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/pkg/encrypt"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/flags"
"github.com/pingcap/tidb-binlog/pkg/security"
Expand Down Expand Up @@ -65,6 +66,9 @@ type SyncerConfig struct {
IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"`
IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"`
TxnBatch int `toml:"txn-batch" json:"txn-batch"`
LoopbackControl bool `toml:"loopback-control" json:"loopback-control"`
SyncDDL bool `toml:"sync-ddl" json:"sync-ddl"`
ChannelID int64 `toml:"channel-id" json:"channel-id"`
WorkerCount int `toml:"worker-count" json:"worker-count"`
To *dsync.DBConfig `toml:"to" json:"to"`
DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"`
Expand Down Expand Up @@ -126,6 +130,9 @@ func NewConfig() *Config {
fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", -1, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint, will get a latest timestamp from pd if setting to be -1")
fs.StringVar(&cfg.Compressor, "compressor", "", "use the specified compressor to compress payload between pump and drainer, only 'gzip' is supported now (default \"\", ie. compression disabled.)")
fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
fs.BoolVar(&cfg.SyncerCfg.LoopbackControl, "loopback-control", false, "set mark or not ")
fs.BoolVar(&cfg.SyncerCfg.SyncDDL, "sync-ddl", true, "sync ddl or not")
fs.Int64Var(&cfg.SyncerCfg.ChannelID, "channel-id", 0, "sync channel id ")
fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas")
fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count")
fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml")
Expand Down Expand Up @@ -374,11 +381,28 @@ func (cfg *Config) adjustConfig() error {
}
cfg.SyncerCfg.To.User = user
}
if len(cfg.SyncerCfg.To.Password) == 0 {

if len(cfg.SyncerCfg.To.EncryptedPassword) > 0 {
decrypt, err := encrypt.Decrypt(cfg.SyncerCfg.To.EncryptedPassword)
if err != nil {
return errors.Annotate(err, "failed to decrypt password in `to.encrypted_password`")
}

cfg.SyncerCfg.To.Password = decrypt
} else if len(cfg.SyncerCfg.To.Password) == 0 {
cfg.SyncerCfg.To.Password = os.Getenv("MYSQL_PSWD")
}
}

if len(cfg.SyncerCfg.To.Checkpoint.EncryptedPassword) > 0 {
decrypt, err := encrypt.Decrypt(cfg.SyncerCfg.To.EncryptedPassword)
if err != nil {
return errors.Annotate(err, "failed to decrypt password in `checkpoint.encrypted_password`")
}

cfg.SyncerCfg.To.Checkpoint.Password = decrypt
}

cfg.SyncerCfg.adjustWorkCount()
cfg.SyncerCfg.adjustDoDBAndTable()

Expand Down
30 changes: 30 additions & 0 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/check"
. "github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
dsync "github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/pkg/encrypt"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/util"
pkgzk "github.com/pingcap/tidb-binlog/pkg/zk"
Expand Down Expand Up @@ -151,13 +153,41 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) {
c.Assert(cfg.ListenAddr, Equals, "http://"+util.DefaultListenAddr(8249))
c.Assert(cfg.AdvertiseAddr, Equals, cfg.ListenAddr)

// test EncryptedPassword
cfg = NewConfig()
cfg.ListenAddr = "0.0.0.0:8257"
cfg.AdvertiseAddr = "192.168.15.12:8257"
err = cfg.adjustConfig()
c.Assert(err, IsNil)
c.Assert(cfg.ListenAddr, Equals, "http://0.0.0.0:8257")
c.Assert(cfg.AdvertiseAddr, Equals, "http://192.168.15.12:8257")

cfg = NewConfig()
encrypted, err := encrypt.Encrypt("origin")
c.Assert(err, IsNil)

cfg.SyncerCfg.To = &dsync.DBConfig{
EncryptedPassword: string(encrypted),
Checkpoint: dsync.CheckpointConfig{
EncryptedPassword: string(encrypted),
},
}
err = cfg.adjustConfig()
c.Assert(err, IsNil)
c.Assert(cfg.SyncerCfg.To.Password, check.Equals, "origin")
c.Assert(cfg.SyncerCfg.To.Checkpoint.Password, check.Equals, "origin")

// test false positive
cfg.SyncerCfg.To = &dsync.DBConfig{
EncryptedPassword: "what ever" + string(encrypted),
Checkpoint: dsync.CheckpointConfig{
EncryptedPassword: "what ever" + string(encrypted),
},
}

c.Logf("to.password: %v", cfg.SyncerCfg.To.Password)
err = cfg.adjustConfig()
c.Assert(err, NotNil)
}

func (t *testDrainerSuite) TestConfigParsingFileWithInvalidOptions(c *C) {
Expand Down
42 changes: 42 additions & 0 deletions drainer/loopbacksync/loopbacksync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package loopbacksync

const (
//MarkTableName mark table name
MarkTableName = "retl._drainer_repl_mark"
//ChannelID channel id
ChannelID = "channel_id"
//Val val
Val = "val"
//ChannelInfo channel info
ChannelInfo = "channel_info"
)

//LoopBackSync loopback sync info
type LoopBackSync struct {
ChannelID int64
LoopbackControl bool
SyncDDL bool
}

//NewLoopBackSyncInfo return LoopBackSyncInfo objec
func NewLoopBackSyncInfo(ChannelID int64, LoopbackControl, SyncDDL bool) *LoopBackSync {
l := &LoopBackSync{
ChannelID: ChannelID,
LoopbackControl: LoopbackControl,
SyncDDL: SyncDDL,
}
return l
}
27 changes: 27 additions & 0 deletions drainer/loopbacksync/loopbacksync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package loopbacksync

import "testing"

//TestNewLoopBackSyncInfo test loopBackSyncInfo alloc
func TestNewLoopBackSyncInfo(t *testing.T) {
var ChannelID int64 = 1
var LoopbackControl = true
var SyncDDL = false
l := NewLoopBackSyncInfo(ChannelID, LoopbackControl, SyncDDL)
if l == nil {
t.Error("alloc loopBackSyncInfo objec failed ")
}
}
54 changes: 50 additions & 4 deletions drainer/sync/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ package sync

import (
"database/sql"
"strings"
"sync"

"github.com/pingcap/tidb-binlog/drainer/loopbacksync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/drainer/translator"
Expand All @@ -38,21 +41,42 @@ type MysqlSyncer struct {
var createDB = loader.CreateDBWithSQLMode

// NewMysqlSyncer returns a instance of MysqlSyncer
func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string) (*MysqlSyncer, error) {
func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string, info *loopbacksync.LoopBackSync) (*MysqlSyncer, error) {
db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, sqlMode)
if err != nil {
return nil, errors.Trace(err)
}

var opts []loader.Option
opts = append(opts, loader.WorkerCount(worker), loader.BatchSize(batchSize), loader.SaveAppliedTS(destDBType == "tidb"))
opts = append(opts, loader.WorkerCount(worker), loader.BatchSize(batchSize), loader.SaveAppliedTS(destDBType == "tidb"), loader.SetloopBackSyncInfo(info))
if queryHistogramVec != nil {
opts = append(opts, loader.Metrics(&loader.MetricsGroup{
QueryHistogramVec: queryHistogramVec,
EventCounterVec: nil,
}))
}

if cfg.SyncMode != 0 {
mode := loader.SyncMode(cfg.SyncMode)
opts = append(opts, loader.SyncModeOption(mode))

if mode == loader.SyncPartialColumn {
var oldMode, newMode string
oldMode, newMode, err = relaxSQLMode(db)
if err != nil {
return nil, errors.Trace(err)
}

if newMode != oldMode {
db.Close()
db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, &newMode)
if err != nil {
return nil, errors.Trace(err)
}
}
}
}

loader, err := loader.NewLoader(db, opts...)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -69,18 +93,40 @@ func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, w
return s, nil
}

// set newMode as the oldMode query from db by removing "STRICT_TRANS_TABLES".
func relaxSQLMode(db *sql.DB) (oldMode string, newMode string, err error) {
row := db.QueryRow("SELECT @@SESSION.sql_mode;")
err = row.Scan(&oldMode)
if err != nil {
return "", "", errors.Trace(err)
}

toRemove := "STRICT_TRANS_TABLES"
newMode = oldMode

if !strings.Contains(oldMode, toRemove) {
return
}

// concatenated by "," like: mode1,mode2
newMode = strings.Replace(newMode, toRemove+",", "", -1)
newMode = strings.Replace(newMode, ","+toRemove, "", -1)
newMode = strings.Replace(newMode, toRemove, "", -1)

return
}

// SetSafeMode make the MysqlSyncer to use safe mode or not
func (m *MysqlSyncer) SetSafeMode(mode bool) {
m.loader.SetSafeMode(mode)
}

// Sync implements Syncer interface
func (m *MysqlSyncer) Sync(item *Item) error {
txn, err := translator.TiBinlogToTxn(m.tableInfoGetter, item.Schema, item.Table, item.Binlog, item.PrewriteValue)
txn, err := translator.TiBinlogToTxn(m.tableInfoGetter, item.Schema, item.Table, item.Binlog, item.PrewriteValue, item.ShouldSkip)
if err != nil {
return errors.Trace(err)
}

txn.Metadata = item

select {
Expand Down
Loading

0 comments on commit 4603c5d

Please sign in to comment.