Skip to content

Commit 4f56d81

Browse files
authored
Merge pull request #11983 from hashicorp/b-select-after
cleanup: prevent leaks from time.After
2 parents 5a32783 + c1e033c commit 4f56d81

File tree

16 files changed

+144
-21
lines changed

16 files changed

+144
-21
lines changed

.changelog/11983.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
cleanup: prevent leaks from time.After
3+
```

api/allocations_exec.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ import (
1313
"github.com/gorilla/websocket"
1414
)
1515

16+
const (
17+
// heartbeatInterval is the amount of time to wait between sending heartbeats
18+
// during an exec streaming operation
19+
heartbeatInterval = 10 * time.Second
20+
)
21+
1622
type execSession struct {
1723
client *Client
1824
alloc *Allocation
@@ -177,15 +183,19 @@ func (s *execSession) startTransmit(ctx context.Context, conn *websocket.Conn) <
177183

178184
// send a heartbeat every 10 seconds
179185
go func() {
186+
t := time.NewTimer(heartbeatInterval)
187+
defer t.Stop()
188+
180189
for {
190+
t.Reset(heartbeatInterval)
191+
181192
select {
182193
case <-ctx.Done():
183194
return
184-
// heartbeat message
185-
case <-time.After(10 * time.Second):
195+
case <-t.C:
196+
// heartbeat message
186197
send(&execStreamingInputHeartbeat)
187198
}
188-
189199
}
190200
}()
191201

client/allocrunner/taskrunner/task_runner.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,9 @@ func (tr *TaskRunner) Run() {
527527
return
528528
}
529529

530+
timer, stop := helper.NewSafeTimer(0) // timer duration calculated JIT
531+
defer stop()
532+
530533
MAIN:
531534
for !tr.shouldShutdown() {
532535
select {
@@ -612,9 +615,11 @@ MAIN:
612615
break MAIN
613616
}
614617

618+
timer.Reset(restartDelay)
619+
615620
// Actually restart by sleeping and also watching for destroy events
616621
select {
617-
case <-time.After(restartDelay):
622+
case <-timer.C:
618623
case <-tr.killCtx.Done():
619624
tr.logger.Trace("task killed between restarts", "delay", restartDelay)
620625
break MAIN

command/agent/consul/version_checker.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
log "github.com/hashicorp/go-hclog"
99
version "github.com/hashicorp/go-version"
10+
"github.com/hashicorp/nomad/helper"
1011
)
1112

1213
// checkConsulTLSSkipVerify logs if Consul does not support TLSSkipVerify on
@@ -20,6 +21,10 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age
2021
defer close(done)
2122

2223
i := uint64(0)
24+
25+
timer, stop := helper.NewSafeTimer(limit)
26+
defer stop()
27+
2328
for {
2429
self, err := client.Self()
2530
if err == nil {
@@ -39,10 +44,12 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age
3944
i++
4045
}
4146

47+
timer.Reset(backoff)
48+
4249
select {
4350
case <-ctx.Done():
4451
return
45-
case <-time.After(backoff):
52+
case <-timer.C:
4653
}
4754
}
4855
}

command/agent/monitor/monitor.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
log "github.com/hashicorp/go-hclog"
9+
"github.com/hashicorp/nomad/helper"
910
)
1011

1112
// Monitor provides a mechanism to stream logs using go-hclog
@@ -107,12 +108,17 @@ func (d *monitor) Start() <-chan []byte {
107108
// dropped messages and makes room on the logCh
108109
// to add a dropped message count warning
109110
go func() {
111+
timer, stop := helper.NewSafeTimer(d.droppedDuration)
112+
defer stop()
113+
110114
// loop and check for dropped messages
111115
for {
116+
timer.Reset(d.droppedDuration)
117+
112118
select {
113119
case <-d.doneCh:
114120
return
115-
case <-time.After(d.droppedDuration):
121+
case <-timer.C:
116122
d.Lock()
117123

118124
// Check if there have been any dropped messages.

drivers/docker/stats.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
docker "github.com/fsouza/go-dockerclient"
1111
cstructs "github.com/hashicorp/nomad/client/structs"
1212
"github.com/hashicorp/nomad/drivers/docker/util"
13+
"github.com/hashicorp/nomad/helper"
1314
nstructs "github.com/hashicorp/nomad/nomad/structs"
1415
)
1516

@@ -91,19 +92,27 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
9192
defer destCh.close()
9293

9394
// backoff and retry used if the docker stats API returns an error
94-
var backoff time.Duration
95+
var backoff time.Duration = 0
9596
var retry int
97+
98+
// create an interval timer
99+
timer, stop := helper.NewSafeTimer(backoff)
100+
defer stop()
101+
96102
// loops until doneCh is closed
97103
for {
104+
timer.Reset(backoff)
105+
98106
if backoff > 0 {
99107
select {
100-
case <-time.After(backoff):
108+
case <-timer.C:
101109
case <-ctx.Done():
102110
return
103111
case <-h.doneCh:
104112
return
105113
}
106114
}
115+
107116
// make a channel for docker stats structs and start a collector to
108117
// receive stats from docker and emit nomad stats
109118
// statsCh will always be closed by docker client.

drivers/shared/eventer/eventer.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
hclog "github.com/hashicorp/go-hclog"
9+
"github.com/hashicorp/nomad/helper"
910
"github.com/hashicorp/nomad/plugins/drivers"
1011
)
1112

@@ -62,14 +63,19 @@ func NewEventer(ctx context.Context, logger hclog.Logger) *Eventer {
6263
// eventLoop is the main logic which pulls events from the channel and broadcasts
6364
// them to all consumers
6465
func (e *Eventer) eventLoop() {
66+
timer, stop := helper.NewSafeTimer(ConsumerGCInterval)
67+
defer stop()
68+
6569
for {
70+
timer.Reset(ConsumerGCInterval)
71+
6672
select {
6773
case <-e.ctx.Done():
6874
e.logger.Trace("task event loop shutdown")
6975
return
7076
case event := <-e.events:
7177
e.iterateConsumers(event)
72-
case <-time.After(ConsumerGCInterval):
78+
case <-timer.C:
7379
e.gcConsumers()
7480
}
7581
}

helper/funcs.go

+27
Original file line numberDiff line numberDiff line change
@@ -577,3 +577,30 @@ func PathEscapesSandbox(sandboxDir, path string) bool {
577577
}
578578
return false
579579
}
580+
581+
// StopFunc halts the time.Timer that NewSafeTimer returned alongside it.
type StopFunc func()

// NewSafeTimer returns a time.Timer set to fire after duration, together with
// a StopFunc that stops that same timer.
//
// A duration of zero or less is raised to 1ns, the smallest positive value, so
// the timer fires almost immediately. This approximates time.After(0), which
// the helper is meant to replace (https://go.dev/play/p/EIkm9MsPbHY).
//
// Reusing a single timer is preferable to calling time.After in a select
// inside a loop, where each iteration would create a fresh timer. Returning
// the StopFunc makes the caller responsible for stopping the timer, typically
// with defer.
//
// NOTE(review): the StopFunc only calls Stop and never drains C. On Go
// versions before 1.23 a timer that has already fired may still hold a value
// in C when the caller calls Reset — confirm against each call site.
func NewSafeTimer(duration time.Duration) (*time.Timer, StopFunc) {
	// Clamp non-positive durations to the minimum positive duration.
	if duration < 1 {
		duration = 1
	}

	timer := time.NewTimer(duration)
	return timer, func() { timer.Stop() }
}

helper/funcs_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -431,3 +431,17 @@ func TestPathEscapesSandbox(t *testing.T) {
431431
})
432432
}
433433
}
434+
435+
func Test_NewSafeTimer(t *testing.T) {
436+
t.Run("zero", func(t *testing.T) {
437+
timer, stop := NewSafeTimer(0)
438+
defer stop()
439+
<-timer.C
440+
})
441+
442+
t.Run("positive", func(t *testing.T) {
443+
timer, stop := NewSafeTimer(1)
444+
defer stop()
445+
<-timer.C
446+
})
447+
}

nomad/blocked_evals.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -706,9 +706,14 @@ func (b *BlockedEvals) Stats() *BlockedStats {
706706

707707
// EmitStats is used to export metrics about the blocked eval tracker while enabled
708708
func (b *BlockedEvals) EmitStats(period time.Duration, stopCh <-chan struct{}) {
709+
timer, stop := helper.NewSafeTimer(period)
710+
defer stop()
711+
709712
for {
713+
timer.Reset(period)
714+
710715
select {
711-
case <-time.After(period):
716+
case <-timer.C:
712717
stats := b.Stats()
713718
metrics.SetGauge([]string{"nomad", "blocked_evals", "total_quota_limit"}, float32(stats.TotalQuotaLimit))
714719
metrics.SetGauge([]string{"nomad", "blocked_evals", "total_blocked"}, float32(stats.TotalBlocked))

nomad/drainer/watch_jobs.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66
"sync"
7-
"time"
87

98
log "github.com/hashicorp/go-hclog"
109
memdb "github.com/hashicorp/go-memdb"
@@ -140,10 +139,17 @@ func (w *drainingJobWatcher) deregisterJob(jobID, namespace string) {
140139

141140
// watch is the long lived watching routine that detects job drain changes.
142141
func (w *drainingJobWatcher) watch() {
142+
timer, stop := helper.NewSafeTimer(stateReadErrorDelay)
143+
defer stop()
144+
143145
waitIndex := uint64(1)
146+
144147
for {
148+
timer.Reset(stateReadErrorDelay)
149+
145150
w.logger.Trace("getting job allocs at index", "index", waitIndex)
146151
jobAllocs, index, err := w.getJobAllocs(w.getQueryCtx(), waitIndex)
152+
147153
if err != nil {
148154
if err == context.Canceled {
149155
// Determine if it is a cancel or a shutdown
@@ -164,7 +170,7 @@ func (w *drainingJobWatcher) watch() {
164170
case <-w.ctx.Done():
165171
w.logger.Trace("shutting down")
166172
return
167-
case <-time.After(stateReadErrorDelay):
173+
case <-timer.C:
168174
continue
169175
}
170176
}

nomad/drainer/watch_nodes.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ package drainer
22

33
import (
44
"context"
5-
"time"
65

76
log "github.com/hashicorp/go-hclog"
87
memdb "github.com/hashicorp/go-memdb"
8+
"github.com/hashicorp/nomad/helper"
99

1010
"github.com/hashicorp/nomad/nomad/state"
1111
"github.com/hashicorp/nomad/nomad/structs"
@@ -148,8 +148,13 @@ func NewNodeDrainWatcher(ctx context.Context, limiter *rate.Limiter, state *stat
148148

149149
// watch is the long lived watching routine that detects node changes.
150150
func (w *nodeDrainWatcher) watch() {
151+
timer, stop := helper.NewSafeTimer(stateReadErrorDelay)
152+
defer stop()
153+
151154
nindex := uint64(1)
155+
152156
for {
157+
timer.Reset(stateReadErrorDelay)
153158
nodes, index, err := w.getNodes(nindex)
154159
if err != nil {
155160
if err == context.Canceled {
@@ -160,7 +165,7 @@ func (w *nodeDrainWatcher) watch() {
160165
select {
161166
case <-w.ctx.Done():
162167
return
163-
case <-time.After(stateReadErrorDelay):
168+
case <-timer.C:
164169
continue
165170
}
166171
}

nomad/eval_broker.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@ package nomad
22

33
import (
44
"container/heap"
5+
"context"
56
"errors"
67
"fmt"
78
"math/rand"
89
"sync"
910
"time"
1011

11-
"context"
12-
1312
metrics "github.com/armon/go-metrics"
13+
"github.com/hashicorp/nomad/helper"
1414
"github.com/hashicorp/nomad/helper/uuid"
1515
"github.com/hashicorp/nomad/lib/delayheap"
1616
"github.com/hashicorp/nomad/nomad/structs"
@@ -835,9 +835,14 @@ func (b *EvalBroker) Stats() *BrokerStats {
835835

836836
// EmitStats is used to export metrics about the broker while enabled
837837
func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
838+
timer, stop := helper.NewSafeTimer(period)
839+
defer stop()
840+
838841
for {
842+
timer.Reset(period)
843+
839844
select {
840-
case <-time.After(period):
845+
case <-timer.C:
841846
stats := b.Stats()
842847
metrics.SetGauge([]string{"nomad", "broker", "total_ready"}, float32(stats.TotalReady))
843848
metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked))

nomad/plan_queue.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
metrics "github.com/armon/go-metrics"
10+
"github.com/hashicorp/nomad/helper"
1011
"github.com/hashicorp/nomad/nomad/structs"
1112
)
1213

@@ -196,12 +197,14 @@ func (q *PlanQueue) Stats() *QueueStats {
196197

197198
// EmitStats is used to export metrics about the broker while enabled
198199
func (q *PlanQueue) EmitStats(period time.Duration, stopCh <-chan struct{}) {
200+
timer, stop := helper.NewSafeTimer(period)
201+
defer stop()
202+
199203
for {
200204
select {
201-
case <-time.After(period):
205+
case <-timer.C:
202206
stats := q.Stats()
203207
metrics.SetGauge([]string{"nomad", "plan", "queue_depth"}, float32(stats.Depth))
204-
205208
case <-stopCh:
206209
return
207210
}

0 commit comments

Comments
 (0)