Skip to content

Commit

Permalink
tso: fix memory leak introduced by timer.After tikv#6730
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Dec 17, 2024
1 parent b14f742 commit 60e81ba
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 15 deletions.
43 changes: 29 additions & 14 deletions pkg/utils/tsoutil/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/timerutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -68,7 +69,7 @@ func (s *TSODispatcher) DispatchRequest(
val, loaded := s.dispatchChs.LoadOrStore(req.getForwardedHost(), make(chan Request, maxMergeRequests))
reqCh := val.(chan Request)
if !loaded {
tsDeadlineCh := make(chan deadline, 1)
tsDeadlineCh := make(chan *TSDeadline, 1)
go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh, updateServicePrimaryAddrChs...)
go watchTSDeadline(ctx, tsDeadlineCh)
}
Expand All @@ -81,7 +82,7 @@ func (s *TSODispatcher) dispatch(
forwardedHost string,
clientConn *grpc.ClientConn,
tsoRequestCh <-chan Request,
tsDeadlineCh chan<- deadline,
tsDeadlineCh chan<- *TSDeadline,
doneCh <-chan struct{},
errCh chan<- error,
updateServicePrimaryAddrChs ...chan<- struct{}) {
Expand Down Expand Up @@ -121,11 +122,7 @@ func (s *TSODispatcher) dispatch(
requests[i] = <-tsoRequestCh
}
done := make(chan struct{})
dl := deadline{
timer: time.After(DefaultTSOProxyTimeout),
done: done,
cancel: cancel,
}
dl := NewTSDeadline(DefaultTSOProxyTimeout, done, cancel)
select {
case tsDeadlineCh <- dl:
case <-dispatcherCtx.Done():
Expand Down Expand Up @@ -204,27 +201,44 @@ func (s *TSODispatcher) finishRequest(requests []Request, physical, firstLogical
return nil
}

type deadline struct {
timer <-chan time.Time
// TSDeadline is used to watch the deadline of each tso request.
type TSDeadline struct {
timer *time.Timer
done chan struct{}
cancel context.CancelFunc
}

func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) {
// NewTSDeadline creates a new TSDeadline.
func NewTSDeadline(
timeout time.Duration,
done chan struct{},
cancel context.CancelFunc,
) *TSDeadline {
timer := timerutil.GlobalTimerPool.Get(timeout)
return &TSDeadline{
timer: timer,
done: done,
cancel: cancel,
}
}

func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) {
defer logutil.LogPanic()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for {
select {
case d := <-tsDeadlineCh:
select {
case <-d.timer:
case <-d.timer.C:
log.Error("tso proxy request processing is canceled due to timeout",
errs.ZapError(errs.ErrProxyTSOTimeout))
d.cancel()
timerutil.GlobalTimerPool.Put(d.timer)
case <-d.done:
continue
timerutil.GlobalTimerPool.Put(d.timer)
case <-ctx.Done():
timerutil.GlobalTimerPool.Put(d.timer)
return
}
case <-ctx.Done():
Expand All @@ -235,11 +249,12 @@ func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) {

func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
defer logutil.LogPanic()

timer := time.NewTimer(3 * time.Second)
defer timer.Stop()
select {
case <-done:
return
case <-time.After(3 * time.Second):
case <-timer.C:
cancel()
case <-streamCtx.Done():
}
Expand Down
4 changes: 3 additions & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1761,10 +1761,12 @@ func forwardReportBucketClientToServer(forwardStream pdpb.PD_ReportBucketsClient
// TODO: If goroutine here timeout when tso stream created successfully, we need to handle it correctly.
func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
defer logutil.LogPanic()
timer := time.NewTimer(3 * time.Second)
defer timer.Stop()
select {
case <-done:
return
case <-time.After(3 * time.Second):
case <-timer.C:
cancel()
case <-streamCtx.Done():
}
Expand Down

0 comments on commit 60e81ba

Please sign in to comment.