Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drainer,pump: Add support for enabling gzip grpc compression #495

Merged
merged 1 commit into from
Mar 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ data-dir = "data.drainer"
# a comma separated list of PD endpoints
pd-urls = "http://127.0.0.1:2379"

# Use the specified compressor to compress payload between pump and drainer
compressor = ""

#[security]
# Path of file that contains list of trusted SSL CAs for connection with cluster components.
# ssl-ca = "/path/to/ca.pem"
Expand Down
17 changes: 17 additions & 0 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
var (
maxBinlogItemCount int
defaultBinlogItemCount = 16 << 12
supportedCompressors = [...]string{"gzip"}
)

// SyncerConfig is the Syncer's configuration.
Expand Down Expand Up @@ -68,6 +69,7 @@ type Config struct {
SyncerCfg *SyncerConfig `toml:"syncer" json:"sycner"`
Security security.Config `toml:"security" json:"security"`
SyncedCheckTime int `toml:"synced-check-time" json:"synced-check-time"`
Compressor string `toml:"compressor" json:"compressor"`
EtcdTimeout time.Duration
MetricsAddr string
MetricsInterval int
Expand Down Expand Up @@ -101,6 +103,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.LogFile, "log-file", "", "log file path")
fs.StringVar(&cfg.LogRotate, "log-rotate", "", "log file rotate type, hour/day")
fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", 0, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint")
fs.StringVar(&cfg.Compressor, "compressor", "", "use the specified compressor to compress payload between pump and drainer, only 'gzip' is supported now (default \"\", ie. compression disabled.)")
fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas")
fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count")
Expand Down Expand Up @@ -233,6 +236,20 @@ func (cfg *Config) validate() error {
}
}

if cfg.Compressor != "" {
found := false
for _, c := range supportedCompressors {
if cfg.Compressor == c {
found = true
break
}
}
if !found {
return errors.Errorf(
"Invalid compressor: %v, must be one of these: %v", cfg.Compressor, supportedCompressors)
}
}

return nil
}

Expand Down
10 changes: 9 additions & 1 deletion drainer/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -177,7 +178,14 @@ func (p *Pump) createPullBinlogsClient(ctx context.Context, last int64) error {
p.grpcConn.Close()
}

conn, err := grpc.Dial(p.addr, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
callOpts := []grpc.CallOption{grpc.MaxCallRecvMsgSize(maxMsgSize)}

if compressor, ok := ctx.Value(drainerKeyType("compressor")).(string); ok {
log.Infof("[pump %s] grpc compression enabled", p.nodeID)
callOpts = append(callOpts, grpc.UseCompressor(compressor))
}

conn, err := grpc.Dial(p.addr, grpc.WithInsecure(), grpc.WithDefaultCallOptions(callOpts...))
if err != nil {
log.Errorf("[pump %s] create grpc dial error %v", p.nodeID, err)
p.pullCli = nil
Expand Down
4 changes: 4 additions & 0 deletions drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ var (
clusterID uint64
)

type drainerKeyType string

// Server implements the gRPC interface,
// and maintains the runtime status
type Server struct {
Expand Down Expand Up @@ -83,6 +85,8 @@ func NewServer(cfg *Config) (*Server, error) {
}

ctx, cancel := context.WithCancel(context.Background())
ctx = context.WithValue(ctx, drainerKeyType("compressor"), cfg.Compressor)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be better just pass by add param fro NewPump?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried to get this piece of information from cfg in Server and pass it all the way down to Pump.
But it turned out I would also have to add a corresponding field in Collector and Pump because the part that may need to know compressor was not called when these structs were created.
So I guess using context.WithValue might be a little simpler, for example, when I changed enable_compression to compressor I only need to update two or three lines of code.


clusterID = pdCli.GetClusterID(ctx)
// update latestTS and latestTime
latestTS, err := util.GetTSO(pdCli)
Expand Down
3 changes: 2 additions & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/gorilla/mux"
"github.com/ngaut/log"
"github.com/pingcap/errors"
"github.com/pingcap/pd/client"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb-binlog/pkg/flags"
"github.com/pingcap/tidb-binlog/pkg/node"
"github.com/pingcap/tidb-binlog/pkg/util"
Expand All @@ -33,6 +33,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
_ "google.golang.org/grpc/encoding/gzip"
)

var notifyDrainerTimeout = time.Second * 10
Expand Down