diff --git a/config/config.go b/config/config.go index 258c9a5fcaa64..7f19f634397a4 100644 --- a/config/config.go +++ b/config/config.go @@ -190,7 +190,7 @@ type PreparedPlanCache struct { // OpenTracing is the opentracing section of the config. type OpenTracing struct { - Enable bool `toml:"enable" json:"enbale"` + Enable bool `toml:"enable" json:"enable"` Sampler OpenTracingSampler `toml:"sampler" json:"sampler"` Reporter OpenTracingReporter `toml:"reporter" json:"reporter"` RPCMetrics bool `toml:"rpc-metrics" json:"rpc-metrics"` @@ -242,7 +242,7 @@ type TiKVClient struct { // Binlog is the config for binlog. type Binlog struct { - BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"` + Enable bool `toml:"enable" json:"enable"` WriteTimeout string `toml:"write-timeout" json:"write-timeout"` // If IgnoreError is true, when writing binlog meets error, TiDB would // ignore the error. diff --git a/config/config.toml.example b/config/config.toml.example index d67efc5f1b9a0..9cf4682f80282 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -236,9 +236,8 @@ enabled = true capacity = 2048000 [binlog] - -# Socket file to write binlog. -binlog-socket = "" +# enable to write binlog. +enable = false # WriteTimeout specifies how long it will wait for writing binlog to pump. write-timeout = "15s" diff --git a/config/config_test.go b/config/config_test.go index 5631254a52028..88f60fec4b920 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -33,7 +33,7 @@ func TestT(t *testing.T) { func (s *testConfigSuite) TestConfig(c *C) { conf := new(Config) - conf.Binlog.BinlogSocket = "/tmp/socket" + conf.Binlog.Enable = true conf.Binlog.IgnoreError = true conf.TiKVClient.CommitTimeout = "10s" @@ -52,7 +52,7 @@ commit-timeout="41s"`) c.Assert(conf.Load(configFile), IsNil) // Test that the original value will not be clear by load the config file that does not contain the option. - c.Assert(conf.Binlog.BinlogSocket, Equals, "/tmp/socket") + c.Assert(conf.Binlog.Enable, Equals, true) c.Assert(conf.TiKVClient.CommitTimeout, Equals, "41s") c.Assert(f.Close(), IsNil) diff --git a/ddl/ddl.go b/ddl/ddl.go index fb88f2e457408..b54b4e976cfa9 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client" "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -227,7 +228,7 @@ type DDL interface { // GetTableMaxRowID gets the max row ID of a normal table or a partition. GetTableMaxRowID(startTS uint64, tbl table.PhysicalTable) (int64, bool, error) // SetBinlogClient sets the binlog client for DDL worker. It's exported for testing. - SetBinlogClient(interface{}) + SetBinlogClient(*pumpcli.PumpsClient) } // ddl is used to handle the statements that define the structure or schema of the database. @@ -248,8 +249,8 @@ type ddlCtx struct { schemaSyncer SchemaSyncer ddlJobDoneCh chan struct{} ddlEventCh chan<- *util.Event - lease time.Duration // lease is schema lease. - binlogCli interface{} // binlogCli is used for Binlog. + lease time.Duration // lease is schema lease. + binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog. // hook may be modified. mu struct { @@ -329,7 +330,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage, ddlJobDoneCh: make(chan struct{}, 1), ownerManager: manager, schemaSyncer: syncer, - binlogCli: binloginfo.GetPumpClient(), + binlogCli: binloginfo.GetPumpsClient(), } ddlCtx.mu.hook = hook ddlCtx.mu.interceptor = &BaseInterceptor{} @@ -539,7 +540,7 @@ func (d *ddl) callHookOnChanged(err error) error { } // SetBinlogClient implements DDL.SetBinlogClient interface. -func (d *ddl) SetBinlogClient(binlogCli interface{}) { +func (d *ddl) SetBinlogClient(binlogCli *pumpcli.PumpsClient) { d.binlogCli = binlogCli } diff --git a/session/session.go b/session/session.go index 1f72f78c6cffe..0fed9b1f910de 100644 --- a/session/session.go +++ b/session/session.go @@ -299,7 +299,7 @@ func (s *session) doCommit(ctx context.Context) error { Tp: binlog.BinlogType_Prewrite, PrewriteValue: prewriteData, }, - Client: s.sessionVars.BinlogClient.(binlog.PumpClient), + Client: s.sessionVars.BinlogClient, } s.txn.SetOption(kv.BinlogInfo, info) } @@ -1193,7 +1193,7 @@ func createSession(store kv.Storage) (*session, error) { domain.BindDomain(s, dom) // session implements variable.GlobalVarAccessor. Bind it to ctx. s.sessionVars.GlobalVarsAccessor = s - s.sessionVars.BinlogClient = binloginfo.GetPumpClient() + s.sessionVars.BinlogClient = binloginfo.GetPumpsClient() s.txn.init() return s, nil } diff --git a/session/session_test.go b/session/session_test.go index 3d87505793f23..57aa93e0477c4 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/mockstore/mocktikv" @@ -122,7 +123,7 @@ func (s *testSessionSuite) TestForCoverage(c *C) { tk.MustExec("admin check table t") // Cover dirty table operations in StateTxn. - tk.Se.GetSessionVars().BinlogClient = &mockBinlogPump{} + tk.Se.GetSessionVars().BinlogClient = binloginfo.MockPumpsClient(&mockBinlogPump{}) tk.MustExec("begin") tk.MustExec("truncate table t") tk.MustExec("insert t values ()") diff --git a/session/tidb.go b/session/tidb.go index 5595f52d82bc7..4f5fdd76ce428 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -38,8 +38,6 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" ) type domainMap struct { @@ -267,33 +265,6 @@ func newStoreWithRetry(path string, maxRetries int) (kv.Storage, error) { return s, errors.Trace(err) } -// DialPumpClientWithRetry tries to dial to binlogSocket, -// if any error happens, it will try to re-dial, -// or return this error when timeout. -func DialPumpClientWithRetry(binlogSocket string, maxRetries int, dialerOpt grpc.DialOption) (*grpc.ClientConn, error) { - var clientCon *grpc.ClientConn - err := util.RunWithRetry(maxRetries, util.RetryInterval, func() (bool, error) { - log.Infof("setup binlog client") - var err error - tlsConfig, err := config.GetGlobalConfig().Security.ToTLSConfig() - if err != nil { - log.Infof("error happen when setting binlog client: %s", errors.ErrorStack(err)) - } - - if tlsConfig != nil { - clientCon, err = grpc.Dial(binlogSocket, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), dialerOpt) - } else { - clientCon, err = grpc.Dial(binlogSocket, grpc.WithInsecure(), dialerOpt) - } - - if err != nil { - log.Infof("error happen when setting binlog client: %s", errors.ErrorStack(err)) - } - return true, errors.Trace(err) - }) - return clientCon, errors.Trace(err) -} - var queryStmtTable = []string{"explain", "select", "show", "execute", "describe", "desc", "admin"} func trimSQL(sql string) string { diff --git a/session/tidb_test.go b/session/tidb_test.go index 2a631c554baaf..38b3f693f1e80 100644 --- a/session/tidb_test.go +++ b/session/tidb_test.go @@ -27,13 +27,11 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/testleak" "github.com/pkg/errors" "golang.org/x/net/context" - "google.golang.org/grpc" ) func TestT(t *testing.T) { @@ -125,21 +123,6 @@ func (s *testMainSuite) TestRetryOpenStore(c *C) { c.Assert(uint64(elapse), GreaterEqual, uint64(3*time.Second)) } -func (s *testMainSuite) TestRetryDialPumpClient(c *C) { - retryDialPumpClientMustFail := func(binlogSocket string, clientCon *grpc.ClientConn, maxRetries int, dialerOpt grpc.DialOption) (err error) { - return util.RunWithRetry(maxRetries, 10, func() (bool, error) { - // Assume that it'll always return an error. - return true, errors.New("must fail") - }) - } - begin := time.Now() - err := retryDialPumpClientMustFail("", nil, 3, nil) - c.Assert(err, NotNil) - c.Assert(err.Error(), Equals, "must fail") - elapse := time.Since(begin) - c.Assert(uint64(elapse), GreaterEqual, uint64(6*10*time.Millisecond)) -} - func (s *testMainSuite) TestSysSessionPoolGoroutineLeak(c *C) { store, dom := newStoreWithBootstrap(c, s.dbName+"goroutine_leak") defer dom.Close() diff --git a/sessionctx/binloginfo/binloginfo.go b/sessionctx/binloginfo/binloginfo.go index 4084d4b56354c..1a5c100695796 100644 --- a/sessionctx/binloginfo/binloginfo.go +++ b/sessionctx/binloginfo/binloginfo.go @@ -14,6 +14,7 @@ package binloginfo import ( + "io/ioutil" "regexp" "strings" "sync" @@ -21,55 +22,47 @@ import ( "time" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb-tools/tidb-binlog/node" + pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" binlog "github.com/pingcap/tipb/go-binlog" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "golang.org/x/net/context" "google.golang.org/grpc" ) func init() { grpc.EnableTracing = false + // don't need output pumps client's log + pumpcli.Logger.Out = ioutil.Discard } -var binlogWriteTimeout = 15 * time.Second - -// pumpClient is the gRPC client to write binlog, it is opened on server start and never close, +// pumpsClient is the client to write binlog, it is opened on server start and never close, // shared by all sessions. -var pumpClient binlog.PumpClient -var pumpClientLock sync.RWMutex +var pumpsClient *pumpcli.PumpsClient +var pumpsClientLock sync.RWMutex // BinlogInfo contains binlog data and binlog client. type BinlogInfo struct { Data *binlog.Binlog - Client binlog.PumpClient + Client *pumpcli.PumpsClient } -// GetPumpClient gets the pump client instance. -func GetPumpClient() binlog.PumpClient { - pumpClientLock.RLock() - client := pumpClient - pumpClientLock.RUnlock() +// GetPumpsClient gets the pumps client instance. +func GetPumpsClient() *pumpcli.PumpsClient { + pumpsClientLock.RLock() + client := pumpsClient + pumpsClientLock.RUnlock() return client } -// SetPumpClient sets the pump client instance. -func SetPumpClient(client binlog.PumpClient) { - pumpClientLock.Lock() - pumpClient = client - pumpClientLock.Unlock() -} - -// SetGRPCTimeout sets grpc timeout for writing binlog. -func SetGRPCTimeout(timeout time.Duration) { - if timeout < 300*time.Millisecond { - log.Warnf("set binlog grpc timeout %s ignored, use default value %s", timeout, binlogWriteTimeout) - return // Avoid invalid value - } - binlogWriteTimeout = timeout +// SetPumpsClient sets the pumps client instance. +func SetPumpsClient(client *pumpcli.PumpsClient) { + pumpsClientLock.Lock() + pumpsClient = client + pumpsClientLock.Unlock() } // GetPrewriteValue gets binlog prewrite value in the context. @@ -111,50 +104,39 @@ func (info *BinlogInfo) WriteBinlog(clusterID uint64) error { return nil } - commitData, err := info.Data.Marshal() - if err != nil { - return errors.Trace(err) - } - req := &binlog.WriteBinlogReq{ClusterID: clusterID, Payload: commitData} - - // Retry many times because we may raise CRITICAL error here. - for i := 0; i < 20; i++ { - var resp *binlog.WriteBinlogResp - ctx, cancel := context.WithTimeout(context.Background(), binlogWriteTimeout) - resp, err = info.Client.WriteBinlog(ctx, req) - cancel() - if err == nil && resp.Errmsg != "" { - err = errors.New(resp.Errmsg) - } - if err == nil { - return nil - } - if strings.Contains(err.Error(), "received message larger than max") { - // This kind of error is not critical and not retryable, return directly. - return errors.Errorf("binlog data is too large (%s)", err.Error()) - } - log.Errorf("write binlog error %v", err) - time.Sleep(time.Second) + if info.Client == nil { + return errors.New("pumps client is nil") } + // it will retry in PumpsClient if write binlog fail. + err := info.Client.WriteBinlog(info.Data) if err != nil { + log.Errorf("write binlog fail %v", errors.ErrorStack(err)) if atomic.LoadUint32(&ignoreError) == 1 { - log.Errorf("critical error, write binlog fail but error ignored: %s", errors.ErrorStack(err)) + log.Error("write binlog fail but error ignored") metrics.CriticalErrorCounter.Add(1) // If error happens once, we'll stop writing binlog. atomic.CompareAndSwapUint32(&skipBinlog, skip, skip+1) return nil } + + if strings.Contains(err.Error(), "received message larger than max") { + // This kind of error is not critical, return directly. + return errors.Errorf("binlog data is too large (%s)", err.Error()) + } + + return terror.ErrCritical.GenWithStackByArgs(err) } - return terror.ErrCritical.GenWithStackByArgs(err) + return nil } // SetDDLBinlog sets DDL binlog in the kv.Transaction. -func SetDDLBinlog(client interface{}, txn kv.Transaction, jobID int64, ddlQuery string) { +func SetDDLBinlog(client *pumpcli.PumpsClient, txn kv.Transaction, jobID int64, ddlQuery string) { if client == nil { return } + ddlQuery = addSpecialComment(ddlQuery) info := &BinlogInfo{ Data: &binlog.Binlog{ @@ -162,7 +144,7 @@ func SetDDLBinlog(client interface{}, txn kv.Transaction, jobID int64, ddlQuery DdlJobId: jobID, DdlQuery: []byte(ddlQuery), }, - Client: client.(binlog.PumpClient), + Client: client, } txn.SetOption(kv.BinlogInfo, info) } @@ -182,3 +164,35 @@ func addSpecialComment(ddlQuery string) string { } return ddlQuery[:loc[0]] + specialPrefix + ddlQuery[loc[0]:loc[1]] + ` */` + ddlQuery[loc[1]:] } + +// MockPumpsClient creates a PumpsClient, used for test. +func MockPumpsClient(client binlog.PumpClient) *pumpcli.PumpsClient { + nodeID := "pump-1" + pump := &pumpcli.PumpStatus{ + Status: node.Status{ + NodeID: nodeID, + State: node.Online, + }, + IsAvaliable: true, + Client: client, + } + + pumpInfos := &pumpcli.PumpInfos{ + Pumps: make(map[string]*pumpcli.PumpStatus), + AvaliablePumps: make(map[string]*pumpcli.PumpStatus), + UnAvaliablePumps: make(map[string]*pumpcli.PumpStatus), + } + pumpInfos.Pumps[nodeID] = pump + pumpInfos.AvaliablePumps[nodeID] = pump + + pCli := &pumpcli.PumpsClient{ + ClusterID: 1, + Pumps: pumpInfos, + Selector: pumpcli.NewSelector(pumpcli.Range), + RetryTime: 1, + BinlogWriteTimeout: 15 * time.Second, + } + pCli.Selector.SetPumps([]*pumpcli.PumpStatus{pump}) + + return pCli +} diff --git a/sessionctx/binloginfo/binloginfo_test.go b/sessionctx/binloginfo/binloginfo_test.go index e6d87fc46b9c6..ca974637b4750 100644 --- a/sessionctx/binloginfo/binloginfo_test.go +++ b/sessionctx/binloginfo/binloginfo_test.go @@ -23,6 +23,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/parser/terror" + pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" @@ -81,7 +82,7 @@ type testBinlogSuite struct { unixFile string serv *grpc.Server pump *mockBinlogPump - client binlog.PumpClient + client *pumpcli.PumpsClient ddl ddl.DDL } @@ -112,7 +113,7 @@ func (s *testBinlogSuite) SetUpSuite(c *C) { sessionDomain := domain.GetDomain(tk.Se.(sessionctx.Context)) s.ddl = sessionDomain.DDL() - s.client = binlog.NewPumpClient(clientCon) + s.client = binloginfo.MockPumpsClient(binlog.NewPumpClient(clientCon)) s.ddl.SetBinlogClient(s.client) } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index f08bb7ba40868..4df9277511a69 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/parser/auth" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" @@ -238,7 +239,7 @@ type SessionVars struct { SnapshotInfoschema interface{} // BinlogClient is used to write binlog. - BinlogClient interface{} + BinlogClient *pumpcli.PumpsClient // GlobalVarsAccessor is used to set and get global variables. GlobalVarsAccessor GlobalVarAccessor diff --git a/table/tables/tables_test.go b/table/tables/tables_test.go index d4a1f640bb3ad..d90d4beb5ba0f 100644 --- a/table/tables/tables_test.go +++ b/table/tables/tables_test.go @@ -84,7 +84,7 @@ func (ts *testSuite) TestBasic(c *C) { c.Assert(autoid, Greater, int64(0)) ctx := ts.se - ctx.GetSessionVars().BinlogClient = binloginfo.GetPumpClient() + ctx.GetSessionVars().BinlogClient = binloginfo.GetPumpsClient() ctx.GetSessionVars().InRestrictedSQL = false rid, err := tb.AddRecord(ctx, types.MakeDatums(1, "abc"), false) c.Assert(err, IsNil) diff --git a/tidb-server/main.go b/tidb-server/main.go index 57300d60b9436..82fcd0120efa8 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -16,7 +16,6 @@ package main import ( "flag" "fmt" - "net" "os" "runtime" "strconv" @@ -26,6 +25,8 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/pd/client" + pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" @@ -41,18 +42,15 @@ import ( "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/gcworker" - "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/printer" "github.com/pingcap/tidb/util/signal" "github.com/pingcap/tidb/util/systimemon" "github.com/pingcap/tidb/x-server" - "github.com/pingcap/tipb/go-binlog" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/push" log "github.com/sirupsen/logrus" - "google.golang.org/grpc" ) // Flag Names @@ -66,7 +64,7 @@ const ( nmPort = "P" nmCors = "cors" nmSocket = "socket" - nmBinlogSocket = "binlog-socket" + nmEnableBinlog = "enable-binlog" nmRunDDL = "run-ddl" nmLogLevel = "L" nmLogFile = "log-file" @@ -94,7 +92,7 @@ var ( port = flag.String(nmPort, "4000", "tidb server port") cors = flag.String(nmCors, "", "tidb server allow cors origin") socket = flag.String(nmSocket, "", "The socket file to use for connection.") - binlogSocket = flag.String(nmBinlogSocket, "", "socket file to write binlog") + enableBinlog = flagBoolean(nmEnableBinlog, false, "enable generate binlog") runDDL = flagBoolean(nmRunDDL, true, "run ddl worker on this tidb-server") ddlLease = flag.String(nmDdlLease, "45s", "schema lease duration, very dangerous to change only if you know what you do") tokenLimit = flag.Int(nmTokenLimit, 1000, "the limit of concurrent executed sessions") @@ -172,20 +170,23 @@ func createStoreAndDomain() { } func setupBinlogClient() { - if cfg.Binlog.BinlogSocket == "" { + if !cfg.Binlog.Enable { return } - dialerOpt := grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { - return net.DialTimeout("unix", addr, timeout) - }) - clientConn, err := session.DialPumpClientWithRetry(cfg.Binlog.BinlogSocket, util.DefaultMaxRetries, dialerOpt) - terror.MustNil(err) + if cfg.Binlog.IgnoreError { binloginfo.SetIgnoreError(true) } - binloginfo.SetGRPCTimeout(parseDuration(cfg.Binlog.WriteTimeout)) - binloginfo.SetPumpClient(binlog.NewPumpClient(clientConn)) - log.Infof("created binlog client at %s, ignore error %v", cfg.Binlog.BinlogSocket, cfg.Binlog.IgnoreError) + + client, err := pumpcli.NewPumpsClient(cfg.Path, parseDuration(cfg.Binlog.WriteTimeout), pd.SecurityOption{ + CAPath: cfg.Security.ClusterSSLCA, + CertPath: cfg.Security.ClusterSSLCert, + KeyPath: cfg.Security.ClusterSSLKey, + }) + terror.MustNil(err) + + binloginfo.SetPumpsClient(client) + log.Infof("create pumps client success, ignore binlog error %v", cfg.Binlog.IgnoreError) } // Prometheus push. @@ -293,8 +294,8 @@ func overrideConfig() { if actualFlags[nmSocket] { cfg.Socket = *socket } - if actualFlags[nmBinlogSocket] { - cfg.Binlog.BinlogSocket = *binlogSocket + if actualFlags[nmEnableBinlog] { + cfg.Binlog.Enable = *enableBinlog } if actualFlags[nmRunDDL] { cfg.RunDDL = *runDDL