Skip to content

Commit

Permalink
fix pump pull pop bug while closing pump (#745)
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored and july2993 committed Sep 5, 2019
1 parent 640c1ff commit 07ba123
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
7 changes: 6 additions & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Server struct {
wg sync.WaitGroup
gcDuration time.Duration
triggerGC chan time.Time
pullClose chan struct{}
metrics *util.MetricClient
// save the last time we write binlog to Storage
// if long time not write, we can write a fake binlog
Expand Down Expand Up @@ -181,6 +182,7 @@ func NewServer(cfg *Config) (*Server, error) {
pdCli: pdCli,
cfg: cfg,
triggerGC: make(chan time.Time),
pullClose: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -278,12 +280,14 @@ func (s *Server) PullBinlogs(in *binlog.PullBinlogReq, stream binlog.Pump_PullBi
log.Error("drainer request a purged binlog TS, some binlog events may be loss", zap.Int64("gc TS", gcTS), zap.Reflect("request", in))
}

ctx, cancel := context.WithCancel(s.ctx)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
binlogs := s.storage.PullCommitBinlog(ctx, last)

for {
select {
case <-s.pullClose:
return nil
case data, ok := <-binlogs:
if !ok {
return nil
Expand Down Expand Up @@ -857,6 +861,7 @@ func (s *Server) Close() {
s.commitStatus()
log.Info("commit status done")

close(s.pullClose)
// stop the gRPC server
s.gs.GracefulStop()
log.Info("grpc is stopped")
Expand Down
3 changes: 2 additions & 1 deletion pump/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,8 @@ func (s *startServerSuite) TestStartPumpServer(c *C) {
gcDuration: time.Duration(cfg.GC) * 24 * time.Hour,
pdCli: nil,
cfg: cfg,
triggerGC: make(chan time.Time)}
triggerGC: make(chan time.Time),
pullClose: make(chan struct{})}
defer func() {
close(sig)
p.Close()
Expand Down

0 comments on commit 07ba123

Please sign in to comment.