Skip to content

Commit

Permalink
fix: add contexts to clean up fsnotify goroutines
Browse files Browse the repository at this point in the history
Signed-off-by: Evan Baker <[email protected]>
  • Loading branch information
rbtr authored Dec 6, 2023
1 parent 77db22a commit 2d8b331
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 12 deletions.
22 changes: 16 additions & 6 deletions cns/fsnotify/fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fsnotify
import (
"context"
"io"
"io/fs"
"os"
"sync"
"time"
Expand All @@ -27,17 +28,18 @@ type watcher struct {
}

// New creates the AsyncDelete watcher.
func New(cli releaseIPsClient, path string, logger *zap.Logger) *watcher { //nolint
func New(cli releaseIPsClient, path string, logger *zap.Logger) (*watcher, error) { //nolint
// Add directory where intended deletes are kept
if err := os.Mkdir(path, 0o755); err != nil { //nolint
if err := os.Mkdir(path, 0o755); err != nil && !errors.Is(err, fs.ErrExist) { //nolint
logger.Error("error making directory", zap.String("path", path), zap.Error(err))
return nil, errors.Wrapf(err, "failed to create dir %s", path)
}
return &watcher{
cli: cli,
path: path,
log: logger,
pendingDelete: make(map[string]struct{}),
}
}, nil
}

// releaseAll locks and iterates the pendingDeletes map and calls CNS to
Expand Down Expand Up @@ -164,8 +166,12 @@ func (w *watcher) readFS() error {
// Start starts the filesystem watcher to handle async Pod deletes.
// Blocks until the context is closed; returns underlying fsnotify errors
// if something goes fatally wrong.
func (w *watcher) Start(ctx context.Context) error {
func (w *watcher) Start(c context.Context) error {
ctx, cancel := context.WithCancel(c)
defer cancel()

errs := make(chan error)

// Start watching for enqueued missed deletes so that we process them as soon as they arrive.
go func(errs chan<- error) {
errs <- w.watchPendingDelete(ctx)
Expand All @@ -182,8 +188,12 @@ func (w *watcher) Start(ctx context.Context) error {
}

// block until one of the goroutines returns an error
err := <-errs
return err
select {
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "exiting Start")
case err := <-errs:
return err
}
}

// AddFile creates new file using the containerID as name
Expand Down
16 changes: 10 additions & 6 deletions cns/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import (
localtls "github.com/Azure/azure-container-networking/server/tls"
"github.com/Azure/azure-container-networking/store"
"github.com/Azure/azure-container-networking/telemetry"
"github.com/avast/retry-go/v3"
"github.com/avast/retry-go/v4"
"github.com/pkg/errors"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -826,15 +826,19 @@ func main() {
z.Error("failed to create cnsclient", zap.Error(err))
}
go func() {
for {
_ = retry.Do(func() error {
z.Info("starting fsnotify watcher to process missed Pod deletes")
w := fsnotify.New(cnsclient, cnsconfig.AsyncPodDeletePath, z)
w, err := fsnotify.New(cnsclient, cnsconfig.AsyncPodDeletePath, z)
if err != nil {
z.Error("failed to create fsnotify watcher", zap.Error(err))
return errors.Wrap(err, "failed to create fsnotify watcher, will retry")
}
if err := w.Start(rootCtx); err != nil {
z.Error("failed to start fsnotify watcher, will retry", zap.Error(err))
time.Sleep(time.Minute)
continue
return errors.Wrap(err, "failed to start fsnotify watcher, will retry")
}
}
return nil
}, retry.DelayType(retry.BackOffDelay), retry.Attempts(0), retry.Context(rootCtx))
}()
}

Expand Down

0 comments on commit 2d8b331

Please sign in to comment.