Skip to content

Commit

Permalink
util+plugins: Fix memory leak with explicit timer cancellation.
Browse files Browse the repository at this point in the history
This commit adds a utility for explicitly creating cancelable timers,
to avoid a possible memory leak caused by time.After. This issue is
fixed in Go 1.23, but since we're still on Go 1.21, this will resolve
the possibility of leaks in the meantime.

Signed-off-by: Philip Conrad <[email protected]>
  • Loading branch information
philipaconrad committed Oct 2, 2024
1 parent 69cd388 commit 910f252
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 8 deletions.
4 changes: 3 additions & 1 deletion download/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,16 @@ func (d *Downloader) loop(ctx context.Context) {

d.logger.Debug("Waiting %v before next download/retry.", delay)

timer, timerCancel := util.TimerWithCancel(delay)
select {
case <-time.After(delay):
case <-timer.C:
if err != nil {
retry++
} else {
retry = 0
}
case <-ctx.Done():
timerCancel() // explicitly cancel the timer.
return
}
}
Expand Down
4 changes: 3 additions & 1 deletion download/oci_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,16 @@ func (d *OCIDownloader) loop(ctx context.Context) {

d.logger.Debug("OCI - Waiting %v before next download/retry.", delay)

timer, timerCancel := util.TimerWithCancel(delay)
select {
case <-time.After(delay):
case <-timer.C:
if err != nil {
retry++
} else {
retry = 0
}
case <-ctx.Done():
timerCancel() // explicitly cancel the timer.
return
}
}
Expand Down
5 changes: 4 additions & 1 deletion internal/wasm/sdk/opa/loader/file/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/internal/wasm/sdk/opa/errors"
"github.com/open-policy-agent/opa/util"

"github.com/open-policy-agent/opa/internal/wasm/sdk/opa"
)
Expand Down Expand Up @@ -156,9 +157,11 @@ func (l *Loader) poller() {
l.logError(err)
}

timer, timerCancel := util.TimerWithCancel(l.interval)
select {
case <-time.After(l.interval):
case <-timer.C:
case <-l.closing:
timerCancel() // explicitly cancel the timer.
return
}
}
Expand Down
11 changes: 9 additions & 2 deletions internal/wasm/sdk/opa/loader/http/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/internal/wasm/sdk/opa"
"github.com/open-policy-agent/opa/internal/wasm/sdk/opa/errors"
"github.com/open-policy-agent/opa/util"
)

const (
Expand Down Expand Up @@ -139,9 +140,12 @@ func (l *Loader) poller() {
break
}

delay := time.Duration(float64((l.maxDelay-l.minDelay))*rand.Float64()) + l.minDelay
timer, timerCancel := util.TimerWithCancel(delay)
select {
case <-time.After(time.Duration(float64((l.maxDelay-l.minDelay))*rand.Float64()) + l.minDelay):
case <-timer.C:
case <-ctx.Done():
timerCancel() // explicitly cancel the timer.
return
}
}
Expand All @@ -160,9 +164,12 @@ func (l *Loader) download(ctx context.Context) error {
break
}

delay := defaultBackoff(float64(MinRetryDelay), float64(l.maxDelay), retry)
timer, timerCancel := util.TimerWithCancel(delay)
select {
case <-time.After(defaultBackoff(float64(MinRetryDelay), float64(l.maxDelay), retry)):
case <-timer.C:
case <-ctx.Done():
timerCancel() // explicitly cancel the timer.
return context.Canceled
}
}
Expand Down
4 changes: 3 additions & 1 deletion plugins/logs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,15 +802,17 @@ func (p *Plugin) loop() {

waitC = make(chan struct{})
go func() {
timer, timerCancel := util.TimerWithCancel(delay)
select {
case <-time.After(delay):
case <-timer.C:
if err != nil {
retry++
} else {
retry = 0
}
close(waitC)
case <-ctx.Done():
timerCancel() // explicitly cancel the timer.
}
}()
}
Expand Down
7 changes: 5 additions & 2 deletions topdown/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func createHTTPRequest(bctx BuiltinContext, obj ast.Object) (*http.Request, *htt
var tlsConfig tls.Config
var customHeaders map[string]interface{}
var tlsInsecureSkipVerify bool
var timeout = defaultHTTPRequestTimeout
timeout := defaultHTTPRequestTimeout

for _, val := range obj.Keys() {
key, err := ast.JSON(val.Value)
Expand Down Expand Up @@ -736,9 +736,12 @@ func executeHTTPRequest(req *http.Request, client *http.Client, inputReqObj ast.
return nil, err
}

delay := util.DefaultBackoff(float64(minRetryDelay), float64(maxRetryDelay), i)
timer, timerCancel := util.TimerWithCancel(delay)
select {
case <-time.After(util.DefaultBackoff(float64(minRetryDelay), float64(maxRetryDelay), i)):
case <-timer.C:
case <-req.Context().Done():
timerCancel() // explicitly cancel the timer.
return nil, context.Canceled
}
}
Expand Down
48 changes: 48 additions & 0 deletions util/time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package util

import "time"

// TimerWithCancel returns a timer that fires after delay, together with a
// cancel function that stops the timer and discards any value already
// delivered on its channel.
//
// It exists because of memory leaks when using time.After in select
// statements. Instead, we manually create timers, wait on them, and
// manually free them when the wait is abandoned.
//
// See this for more details:
// https://www.arangodb.com/2020/09/a-story-of-a-memory-leak-in-go-how-to-properly-use-time-after/
//
// Note: This issue is fixed in Go 1.23, but this fix helps us until then.
//
// The cancel function never blocks and may be called more than once.
//
// Warning: cancel should be called from the goroutine that waits on
// timer.C. Stopping a timer does not close its channel, so another goroutine
// still blocked on timer.C after cancel will not be woken up.
//
// Example:
//
//	for retries := 0; true; retries++ {
//
//		...main logic...
//
//		timer, cancel := util.TimerWithCancel(util.DefaultBackoff(minDelay, maxDelay, retries))
//		select {
//		case <-ctx.Done():
//			cancel()
//			return ctx.Err()
//		case <-timer.C:
//			continue
//		}
//	}
func TimerWithCancel(delay time.Duration) (*time.Timer, func()) {
	timer := time.NewTimer(delay)

	cancel := func() {
		// Stop reports true when it stopped an active timer; nothing was
		// sent on the channel, so there is nothing to drain.
		if timer.Stop() {
			return
		}

		// The timer had already fired or been stopped. Drain a pending
		// value if there is one, but never wait for one: checking len()
		// and then doing a blocking receive would hang forever if the
		// value was consumed in between.
		select {
		case <-timer.C:
		default:
		}
	}

	return timer, cancel
}

0 comments on commit 910f252

Please sign in to comment.