Skip to content

Commit

Permalink
add arbiter server
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 committed Jan 10, 2019
1 parent d4f5cf7 commit dbe3d66
Show file tree
Hide file tree
Showing 21 changed files with 668 additions and 61 deletions.
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
### Makefile for tidb-binlog
.PHONY: build test check update clean pump drainer fmt diff reparo integration_test
.PHONY: build test check update clean pump drainer fmt diff reparo integration_test arbiter

PROJECT=tidb-binlog

Expand Down Expand Up @@ -34,16 +34,19 @@ buildsucc:

all: dev install

dev: check test build
dev: check test

build: pump drainer reparo
build: pump drainer reparo arbiter

pump:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/pump cmd/pump/main.go

drainer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/drainer cmd/drainer/main.go

arbiter:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/arbiter cmd/arbiter/main.go

diff:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/diff cmd/diff/main.go

Expand Down
94 changes: 94 additions & 0 deletions arbiter/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package arbiter

import (
"database/sql"
"fmt"

"github.com/juju/errors"
)

const (
// StatusNormal is server quit normally, data <= ts is synced to downstream
StatusNormal int = 0
// StatusRunning is server running or quit abnormally, part of data may or may not been synced to downstream
StatusRunning int = 1
)

// Checkpoint to save the checkpoint
type Checkpoint struct {
database string
table string
db *sql.DB
topicName string
}

// NewCheckpoint creates a Checkpoint
func NewCheckpoint(db *sql.DB, topicName string) (cp *Checkpoint, err error) {
cp = &Checkpoint{
db: db,
database: "tidb_binlog",
table: "arbiter_checkpoint",
topicName: topicName,
}

err = cp.createSchemaIfNeed()
if err != nil {
return nil, errors.Trace(err)
}

return cp, nil
}

func (c *Checkpoint) createSchemaIfNeed() error {
sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", c.database)
_, err := c.db.Exec(sql)
if err != nil {
return errors.Trace(err)
}

sql = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s(
topic_name VARCHAR(255) PRIMARY KEY, ts BIGINT, status INT)`,
c.database, c.table)
_, err = c.db.Exec(sql)
if err != nil {
return errors.Trace(err)
}

return nil
}

// Save saves the ts and status
func (c *Checkpoint) Save(ts int64, status int) error {
sql := fmt.Sprintf(`INSERT INTO %s.%s(topic_name, ts, status) VALUES(?,?,?)
ON DUPLICATE KEY UPDATE
ts = VALUES(ts), status = VALUES(status)`,
c.database, c.table)
_, err := c.db.Exec(sql, c.topicName, ts, status)
if err != nil {
return errors.Annotatef(err, "exec fail: '%s', args: %s %d, %d", sql, c.topicName, ts, status)
}

return nil
}

// Load return ts and status, if no record in checkpoint, return err = errors.NotFoundf
func (c *Checkpoint) Load() (ts int64, status int, err error) {
sql := fmt.Sprintf("SELECT ts, status FROM %s.%s WHERE topic_name = '%s'",
c.database, c.table, c.topicName)

rows, err := c.db.Query(sql)
if err != nil {
return 0, 0, errors.Annotatef(err, "exec fail: '%s'", sql)
}

if rows.Next() {
err := rows.Scan(&ts, &status)
if err != nil {
return 0, 0, errors.Trace(err)
}
} else {
return 0, 0, errors.NotFoundf("no checkpoint for: %s", c.topicName)
}

return
}
65 changes: 65 additions & 0 deletions arbiter/checkpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package arbiter

import (
"fmt"
"testing"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/juju/errors"
check "github.com/pingcap/check"
)

func Test(t *testing.T) { check.TestingT(t) }

type CheckpointSuite struct {
}

var _ = check.Suite(&CheckpointSuite{})

func setNewExpect(mock sqlmock.Sqlmock) {
mock.ExpectExec("CREATE DATABASE IF NOT EXISTS").WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("CREATE TABLE IF NOT EXISTS").WillReturnResult(sqlmock.NewResult(0, 1))
}

func (cs *CheckpointSuite) TestNewCheckpoint(c *check.C) {
db, mock, err := sqlmock.New()
c.Assert(err, check.IsNil)

setNewExpect(mock)

_, err = NewCheckpoint(db, "topic_name")
c.Assert(err, check.IsNil)

c.Assert(mock.ExpectationsWereMet(), check.IsNil)
}

func (cs *CheckpointSuite) TestSaveAndLoad(c *check.C) {
db, mock, err := sqlmock.New()
c.Assert(err, check.IsNil)

setNewExpect(mock)
cp, err := NewCheckpoint(db, "topic_name")
c.Assert(err, check.IsNil)

mock.ExpectQuery(fmt.Sprintf("SELECT (.+) FROM %s.%s WHERE topic_name = '%s'",
cp.database, cp.table, cp.topicName)).
WillReturnError(errors.NotFoundf("no checkpoint for: %s", cp.topicName))

_, _, err = cp.Load()
c.Log(err)
c.Assert(errors.IsNotFound(err), check.IsTrue)

mock.ExpectExec("INSERT INTO").WillReturnResult(sqlmock.NewResult(0, 1))
var saveTS int64 = 10
saveStatus := 1
err = cp.Save(saveTS, saveStatus)
c.Assert(err, check.IsNil)

rows := sqlmock.NewRows([]string{"ts", "status"}).
AddRow(saveTS, saveStatus)
mock.ExpectQuery("SELECT ts, status FROM").WillReturnRows(rows)
ts, status, err := cp.Load()
c.Assert(err, check.IsNil)
c.Assert(ts, check.Equals, saveTS)
c.Assert(status, check.Equals, saveStatus)
}
178 changes: 178 additions & 0 deletions arbiter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package arbiter

import (
"crypto/tls"
"encoding/json"
"flag"
"fmt"
"os"

"github.com/BurntSushi/toml"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb-binlog/pkg/flags"
"github.com/pingcap/tidb-binlog/pkg/security"
"github.com/pingcap/tidb-binlog/pkg/version"
)

const (
defaultKafkaAddrs = "127.0.0.1:9092"
defaultKafkaVersion = "0.8.2.0"
)

// Config is the configuration of Server
type Config struct {
*flag.FlagSet `json:"-"`
LogLevel string `toml:"log-level" json:"log-level"`
ListenAddr string `toml:"addr" json:"addr"`
LogFile string `toml:"log-file" json:"log-file"`
LogRotate string `toml:"log-rotate" json:"log-rotate"`
Security security.Config `toml:"security" json:"security"`

Up UpConfig `toml:"up" json:"up"`
Down DownConfig `toml:"down" json:"down"`

MetricsAddr string
MetricsInterval int
configFile string
printVersion bool
tls *tls.Config
}

// UpConfig is configuration of upstream
type UpConfig struct {
KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
KafkaVersion string `toml:"kafka-version" json:"kafka-version"`

InitialCommitTS int64 `toml:"initial-commit-ts" json:"initial-commit-ts"`
Topic string `toml:"topic" json:"topic"`
Offset int64 `toml:"offset" json:"offset"`
}

// DownConfig is configuration of downstream
type DownConfig struct {
Host string `toml:"host" json:"host"`
Port int `toml:"port" json:"port"`
User string `toml:"User" json:"User"`
Password string `toml:"password" json:"password"`

WorkerCount int `toml:"worker-count" json:"worker-count"`
BatchSize int `toml:"batch-size" json:"batch-size"`
}

// NewConfig return an instance of configuration
func NewConfig() *Config {
cfg := &Config{}
cfg.FlagSet = flag.NewFlagSet("arbiter", flag.ContinueOnError)
fs := cfg.FlagSet
fs.Usage = func() {
fmt.Fprintln(os.Stderr, "Usage of arbiter:")
fs.PrintDefaults()
}
fs.StringVar(&cfg.ListenAddr, "addr", "127.0.0.1:8251", "addr (i.e. 'host:port') to listen on for arbiter connections")
fs.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal")
fs.StringVar(&cfg.configFile, "config", "", "path to the configuration file")
fs.BoolVar(&cfg.printVersion, "V", false, "print version info")
fs.StringVar(&cfg.MetricsAddr, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push")
fs.IntVar(&cfg.MetricsInterval, "metrics-interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push")
fs.StringVar(&cfg.LogFile, "log-file", "", "log file path")
fs.StringVar(&cfg.LogRotate, "log-rotate", "", "log file rotate type, hour/day")
fs.Int64Var(&cfg.Up.InitialCommitTS, "up.initial-commit-ts", 0, "if arbiter donesn't have checkpoint, use initial commitTS to initial checkpoint")
fs.StringVar(&cfg.Up.Topic, "up.topic", "test_kafka", "topic name of kafka")

fs.IntVar(&cfg.Down.WorkerCount, "down.worker-count", 16, "concurrency write to downstream")
fs.IntVar(&cfg.Down.BatchSize, "down.batch-size", 64, "batch size write to downstream")

return cfg
}

func (cfg *Config) String() string {
data, err := json.MarshalIndent(cfg, "\t", "\t")
if err != nil {
log.Error(err)
}

return string(data)
}

// Parse parses all config from command-line flags, environment vars or the configuration file
func (cfg *Config) Parse(args []string) error {
// parse first to get config file
perr := cfg.FlagSet.Parse(args)
switch perr {
case nil:
case flag.ErrHelp:
os.Exit(0)
default:
os.Exit(2)
}
if cfg.printVersion {
version.PrintVersionInfo()
os.Exit(0)
}

// load config file if specified
if cfg.configFile != "" {
if err := cfg.configFromFile(cfg.configFile); err != nil {
return errors.Trace(err)
}
}
// parse again to replace with command line options
cfg.FlagSet.Parse(args)
if len(cfg.FlagSet.Args()) > 0 {
return errors.Errorf("'%s' is not a valid flag", cfg.FlagSet.Arg(0))
}
// replace with environment vars
err := flags.SetFlagsFromEnv("BINLOG_SERVER", cfg.FlagSet)
if err != nil {
return errors.Trace(err)
}

cfg.tls, err = cfg.Security.ToTLSConfig()
if err != nil {
return errors.Errorf("tls config %+v error %v", cfg.Security, err)
}

if err = cfg.adjustConfig(); err != nil {
return errors.Trace(err)
}

return cfg.validate()
}

// validate checks whether the configuration is valid
func (cfg *Config) validate() error {
if len(cfg.Up.Topic) == 0 {
return errors.Errorf("up.topic not config")
}

return nil
}

func (cfg *Config) adjustConfig() error {
// cfg.Up
if len(cfg.Up.KafkaAddrs) == 0 {
cfg.Up.KafkaAddrs = defaultKafkaAddrs
}
if len(cfg.Up.KafkaVersion) == 0 {
cfg.Up.KafkaVersion = defaultKafkaVersion
}

// cfg.Down
if len(cfg.Down.Host) == 0 {
cfg.Down.Host = "localhost"
}
if cfg.Down.Port == 0 {
cfg.Down.Port = 3306
}
if len(cfg.Down.User) == 0 {
cfg.Down.User = "root"
}

return nil
}

func (cfg *Config) configFromFile(path string) error {
_, err := toml.DecodeFile(path, cfg)
return errors.Trace(err)
}
Loading

0 comments on commit dbe3d66

Please sign in to comment.