diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index c94c87906..1fe6175e5 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -13,6 +13,9 @@ data-dir = "data.drainer" # a comma separated list of PD endpoints pd-urls = "http://127.0.0.1:2379" +# Use the specified compressor to compress payload between pump and drainer +compressor = "" + #[security] # Path of file that contains list of trusted SSL CAs for connection with cluster components. # ssl-ca = "/path/to/ca.pem" diff --git a/drainer/config.go b/drainer/config.go index 9f963354e..6504a3b58 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -40,6 +40,7 @@ const ( var ( maxBinlogItemCount int defaultBinlogItemCount = 16 << 12 + supportedCompressors = [...]string{"gzip"} ) // SyncerConfig is the Syncer's configuration. @@ -73,6 +74,7 @@ type Config struct { SyncerCfg *SyncerConfig `toml:"syncer" json:"sycner"` Security security.Config `toml:"security" json:"security"` SyncedCheckTime int `toml:"synced-check-time" json:"synced-check-time"` + Compressor string `toml:"compressor" json:"compressor"` EtcdTimeout time.Duration MetricsAddr string MetricsInterval int @@ -106,6 +108,7 @@ func NewConfig() *Config { fs.StringVar(&cfg.LogFile, "log-file", "", "log file path") fs.StringVar(&cfg.LogRotate, "log-rotate", "", "log file rotate type, hour/day") fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", 0, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint") + fs.StringVar(&cfg.Compressor, "compressor", "", "use the specified compressor to compress payload between pump and drainer, only 'gzip' is supported now (default \"\", ie. compression disabled.)") fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch") fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas") fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count") @@ -245,6 +248,20 @@ func (cfg *Config) validate() error { } } + if cfg.Compressor != "" { + found := false + for _, c := range supportedCompressors { + if cfg.Compressor == c { + found = true + break + } + } + if !found { + return errors.Errorf( + "Invalid compressor: %v, must be one of these: %v", cfg.Compressor, supportedCompressors) + } + } + return nil } diff --git a/drainer/pump.go b/drainer/pump.go index 45a925998..c353c462e 100644 --- a/drainer/pump.go +++ b/drainer/pump.go @@ -13,6 +13,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" + _ "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/status" ) @@ -177,7 +178,14 @@ func (p *Pump) createPullBinlogsClient(ctx context.Context, last int64) error { p.grpcConn.Close() } - conn, err := grpc.Dial(p.addr, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize))) + callOpts := []grpc.CallOption{grpc.MaxCallRecvMsgSize(maxMsgSize)} + + if compressor, ok := ctx.Value(drainerKeyType("compressor")).(string); ok { + log.Infof("[pump %s] grpc compression enabled", p.nodeID) + callOpts = append(callOpts, grpc.UseCompressor(compressor)) + } + + conn, err := grpc.Dial(p.addr, grpc.WithInsecure(), grpc.WithDefaultCallOptions(callOpts...)) if err != nil { log.Errorf("[pump %s] create grpc dial error %v", p.nodeID, err) p.pullCli = nil diff --git a/drainer/server.go b/drainer/server.go index 7b6169e4f..2a6a8f0aa 100644 --- a/drainer/server.go +++ b/drainer/server.go @@ -38,6 +38,8 @@ var ( pdReconnTimes = 30 ) +type drainerKeyType string + // Server implements the gRPC interface, // and maintains the runtime status type Server struct { @@ -86,6 +88,8 @@ func NewServer(cfg *Config) (*Server, error) { } ctx, cancel := context.WithCancel(context.Background()) + ctx = context.WithValue(ctx, drainerKeyType("compressor"), cfg.Compressor) + clusterID = pdCli.GetClusterID(ctx) // update latestTS and latestTime latestTS, err := util.GetTSO(pdCli) diff --git a/pump/server.go b/pump/server.go index 347fa488f..496835fae 100644 --- a/pump/server.go +++ b/pump/server.go @@ -16,7 +16,7 @@ import ( "github.com/gorilla/mux" "github.com/ngaut/log" "github.com/pingcap/errors" - "github.com/pingcap/pd/client" + pd "github.com/pingcap/pd/client" "github.com/pingcap/tidb-binlog/pkg/flags" "github.com/pingcap/tidb-binlog/pkg/node" "github.com/pingcap/tidb-binlog/pkg/util" @@ -33,6 +33,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + _ "google.golang.org/grpc/encoding/gzip" ) var notifyDrainerTimeout = time.Second * 10