From 07ba12378a5ff7b6e23a7ecd729595d417f45f7e Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 5 Sep 2019 20:45:46 +0800 Subject: [PATCH] fix pump pull pop bug while closing pump (#745) --- pump/server.go | 7 ++++++- pump/server_test.go | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pump/server.go b/pump/server.go index 8e628c34e..464b3fd7a 100644 --- a/pump/server.go +++ b/pump/server.go @@ -83,6 +83,7 @@ type Server struct { wg sync.WaitGroup gcDuration time.Duration triggerGC chan time.Time + pullClose chan struct{} metrics *util.MetricClient // save the last time we write binlog to Storage // if long time not write, we can write a fake binlog @@ -181,6 +182,7 @@ func NewServer(cfg *Config) (*Server, error) { pdCli: pdCli, cfg: cfg, triggerGC: make(chan time.Time), + pullClose: make(chan struct{}), }, nil } @@ -278,12 +280,14 @@ func (s *Server) PullBinlogs(in *binlog.PullBinlogReq, stream binlog.Pump_PullBi log.Error("drainer request a purged binlog TS, some binlog events may be loss", zap.Int64("gc TS", gcTS), zap.Reflect("request", in)) } - ctx, cancel := context.WithCancel(s.ctx) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() binlogs := s.storage.PullCommitBinlog(ctx, last) for { select { + case <-s.pullClose: + return nil case data, ok := <-binlogs: if !ok { return nil @@ -857,6 +861,7 @@ func (s *Server) Close() { s.commitStatus() log.Info("commit status done") + close(s.pullClose) // stop the gRPC server s.gs.GracefulStop() log.Info("grpc is stopped") diff --git a/pump/server_test.go b/pump/server_test.go index 82012a4a6..54257ac8e 100644 --- a/pump/server_test.go +++ b/pump/server_test.go @@ -733,7 +733,8 @@ func (s *startServerSuite) TestStartPumpServer(c *C) { gcDuration: time.Duration(cfg.GC) * 24 * time.Hour, pdCli: nil, cfg: cfg, - triggerGC: make(chan time.Time)} + triggerGC: make(chan time.Time), + pullClose: make(chan struct{})} defer func() { close(sig) p.Close()