Skip to content

Commit e4b246d

Browse files
committed
fix: add contexts to clean up fsnotify goroutines
Signed-off-by: Evan Baker <[email protected]>
1 parent 9eb2891 commit e4b246d

File tree

4 files changed

+85
-50
lines changed

4 files changed

+85
-50
lines changed

cns/fsnotify/fsnotify.go

+34-44
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package fsnotify
33
import (
44
"context"
55
"io"
6+
"io/fs"
67
"os"
78
"sync"
89
"time"
@@ -11,6 +12,7 @@ import (
1112
"github.com/fsnotify/fsnotify"
1213
"github.com/pkg/errors"
1314
"go.uber.org/zap"
15+
"golang.org/x/sync/errgroup"
1416
)
1517

1618
type releaseIPsClient interface {
@@ -27,17 +29,18 @@ type watcher struct {
2729
}
2830

2931
// Create the AsyncDelete watcher.
30-
func New(cli releaseIPsClient, path string, logger *zap.Logger) *watcher { //nolint
32+
func New(cli releaseIPsClient, path string, logger *zap.Logger) (*watcher, error) { //nolint
3133
// Add directory where intended deletes are kept
32-
if err := os.Mkdir(path, 0o755); err != nil { //nolint
34+
if err := os.Mkdir(path, 0o755); err != nil && !errors.Is(err, fs.ErrExist) { //nolint
3335
logger.Error("error making directory", zap.String("path", path), zap.Error(err))
36+
return nil, errors.Wrapf(err, "failed to create dir %s", path)
3437
}
3538
return &watcher{
3639
cli: cli,
3740
path: path,
3841
log: logger,
3942
pendingDelete: make(map[string]struct{}),
40-
}
43+
}, nil
4144
}
4245

4346
// releaseAll locks and iterates the pendingDeletes map and calls CNS to
@@ -114,7 +117,25 @@ func (w *watcher) watchFS(ctx context.Context) error {
114117
err = watcher.Add(w.path)
115118
if err != nil {
116119
w.log.Error("failed to add path to fsnotify watcher", zap.String("path", w.path), zap.Error(err))
120+
return errors.Wrap(err, "failed to add path to fsnotify watcher")
121+
}
122+
// List the directory and creates synthetic events for any existing items.
123+
w.log.Info("listing directory", zap.String("path", w.path))
124+
dirContents, err := os.ReadDir(w.path)
125+
if err != nil {
126+
w.log.Error("error reading deleteID directory", zap.String("path", w.path), zap.Error(err))
127+
return errors.Wrapf(err, "failed to read %s", w.path)
128+
}
129+
if len(dirContents) == 0 {
130+
w.log.Info("no missed deletes found")
117131
}
132+
w.lock.Lock()
133+
for _, file := range dirContents {
134+
w.log.Info("adding missed delete from file", zap.String("name", file.Name()))
135+
w.pendingDelete[file.Name()] = struct{}{}
136+
}
137+
w.lock.Unlock()
138+
118139
// Start listening for events.
119140
w.log.Info("listening for events from fsnotify watcher")
120141
for {
@@ -139,51 +160,20 @@ func (w *watcher) watchFS(ctx context.Context) error {
139160
}
140161
}
141162

142-
// readFS lists the directory and enqueues any missed deletes that are already
143-
// present on-disk.
144-
func (w *watcher) readFS() error {
145-
w.log.Info("listing directory", zap.String("path", w.path))
146-
dirContents, err := os.ReadDir(w.path)
147-
if err != nil {
148-
w.log.Error("error reading deleteID directory", zap.String("path", w.path), zap.Error(err))
149-
return errors.Wrapf(err, "failed to read %s", w.path)
150-
}
151-
if len(dirContents) == 0 {
152-
w.log.Info("no missed deletes found")
153-
return nil
154-
}
155-
w.lock.Lock()
156-
for _, file := range dirContents {
157-
w.log.Info("adding missed delete from file", zap.String("name", file.Name()))
158-
w.pendingDelete[file.Name()] = struct{}{}
159-
}
160-
w.lock.Unlock()
161-
return nil
162-
}
163-
164-
// WatchFS starts the filesystem watcher to handle async Pod deletes.
163+
// Start starts the filesystem watcher to handle async Pod deletes.
165164
// Blocks until the context is closed; returns underlying fsnotify errors
166165
// if something goes fatally wrong.
167-
func (w *watcher) Start(ctx context.Context) error {
168-
errs := make(chan error)
166+
func (w *watcher) Start(c context.Context) error {
167+
ctx, cancel := context.WithCancel(c)
168+
defer cancel()
169+
g, groupCtx := errgroup.WithContext(ctx)
169170
// Start watching for enqueued missed deletes so that we process them as soon as they arrive.
170-
go func(errs chan<- error) {
171-
errs <- w.watchPendingDelete(ctx)
172-
}(errs)
173-
171+
g.Go(func() error { return w.watchPendingDelete(groupCtx) })
174172
// Start watching for changes to the filesystem so that we don't miss any async deletes.
175-
go func(errs chan<- error) {
176-
errs <- w.watchFS(ctx)
177-
}(errs)
178-
179-
// Read the directory to enqueue any missed deletes that are already present on-disk.
180-
if err := w.readFS(); err != nil {
181-
return err
182-
}
183-
184-
// block until one of the goroutines returns an error
185-
err := <-errs
186-
return err
173+
g.Go(func() error { return w.watchFS(groupCtx) })
174+
// the first error from the errgroup will trigger context cancellation for other goroutines in the group.
175+
// this will block until all goroutines complete and return the first error.
176+
return g.Wait() //nolint:wrapcheck // ignore
187177
}
188178

189179
// AddFile creates new file using the containerID as name

cns/service/main.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ import (
5757
localtls "github.com/Azure/azure-container-networking/server/tls"
5858
"github.com/Azure/azure-container-networking/store"
5959
"github.com/Azure/azure-container-networking/telemetry"
60-
"github.com/avast/retry-go/v3"
60+
"github.com/avast/retry-go/v4"
6161
"github.com/pkg/errors"
6262
"go.uber.org/zap"
6363
corev1 "k8s.io/api/core/v1"
@@ -826,15 +826,19 @@ func main() {
826826
z.Error("failed to create cnsclient", zap.Error(err))
827827
}
828828
go func() {
829-
for {
829+
_ = retry.Do(func() error {
830830
z.Info("starting fsnotify watcher to process missed Pod deletes")
831-
w := fsnotify.New(cnsclient, cnsconfig.AsyncPodDeletePath, z)
831+
w, err := fsnotify.New(cnsclient, cnsconfig.AsyncPodDeletePath, z)
832+
if err != nil {
833+
z.Error("failed to create fsnotify watcher", zap.Error(err))
834+
return errors.Wrap(err, "failed to create fsnotify watcher, will retry")
835+
}
832836
if err := w.Start(rootCtx); err != nil {
833837
z.Error("failed to start fsnotify watcher, will retry", zap.Error(err))
834-
time.Sleep(time.Minute)
835-
continue
838+
return errors.Wrap(err, "failed to start fsnotify watcher, will retry")
836839
}
837-
}
840+
return nil
841+
}, retry.DelayType(retry.BackOffDelay), retry.Attempts(0), retry.Context(rootCtx))
838842
}()
839843
}
840844

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ require (
141141
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
142142
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
143143
github.com/rootless-containers/rootlesskit v1.1.1 // indirect
144+
golang.org/x/sync v0.5.0 // indirect
144145
)
145146

146147
replace (

0 commit comments

Comments
 (0)