diff --git a/cns/fsnotify/fsnotify.go b/cns/fsnotify/fsnotify.go index 5f1beaf1b17..c7d8287b610 100644 --- a/cns/fsnotify/fsnotify.go +++ b/cns/fsnotify/fsnotify.go @@ -3,6 +3,7 @@ package fsnotify import ( "context" "io" + "io/fs" "os" "sync" "time" @@ -27,17 +28,18 @@ type watcher struct { } // Create the AsyncDelete watcher. -func New(cli releaseIPsClient, path string, logger *zap.Logger) *watcher { //nolint +func New(cli releaseIPsClient, path string, logger *zap.Logger) (*watcher, error) { //nolint // Add directory where intended deletes are kept - if err := os.Mkdir(path, 0o755); err != nil { //nolint + if err := os.Mkdir(path, 0o755); err != nil && !errors.Is(err, fs.ErrExist) { //nolint logger.Error("error making directory", zap.String("path", path), zap.Error(err)) + return nil, errors.Wrapf(err, "failed to create dir %s", path) } return &watcher{ cli: cli, path: path, log: logger, pendingDelete: make(map[string]struct{}), - } + }, nil } // releaseAll locks and iterates the pendingDeletes map and calls CNS to @@ -164,8 +166,12 @@ func (w *watcher) readFS() error { // WatchFS starts the filesystem watcher to handle async Pod deletes. // Blocks until the context is closed; returns underlying fsnotify errors // if something goes fatally wrong. -func (w *watcher) Start(ctx context.Context) error { +func (w *watcher) Start(c context.Context) error { + ctx, cancel := context.WithCancel(c) + defer cancel() + errs := make(chan error) + // Start watching for enqueued missed deletes so that we process them as soon as they arrive. go func(errs chan<- error) { errs <- w.watchPendingDelete(ctx) @@ -182,8 +188,12 @@ func (w *watcher) Start(ctx context.Context) error { } // block until one of the goroutines returns an error - err := <-errs - return err + select { + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "exiting Start") + case err := <-errs: + return err + } } // AddFile creates new file using the containerID as name diff --git a/cns/service/main.go b/cns/service/main.go index 3304541540e..da013ab23e6 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -57,7 +57,7 @@ import ( localtls "github.com/Azure/azure-container-networking/server/tls" "github.com/Azure/azure-container-networking/store" "github.com/Azure/azure-container-networking/telemetry" - "github.com/avast/retry-go/v3" + "github.com/avast/retry-go/v4" "github.com/pkg/errors" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -826,15 +826,19 @@ func main() { z.Error("failed to create cnsclient", zap.Error(err)) } go func() { - for { + _ = retry.Do(func() error { z.Info("starting fsnotify watcher to process missed Pod deletes") - w := fsnotify.New(cnsclient, cnsconfig.AsyncPodDeletePath, z) + w, err := fsnotify.New(cnsclient, cnsconfig.AsyncPodDeletePath, z) + if err != nil { + z.Error("failed to create fsnotify watcher", zap.Error(err)) + return errors.Wrap(err, "failed to create fsnotify watcher, will retry") + } if err := w.Start(rootCtx); err != nil { z.Error("failed to start fsnotify watcher, will retry", zap.Error(err)) - time.Sleep(time.Minute) - continue + return errors.Wrap(err, "failed to start fsnotify watcher, will retry") } - } + return nil + }, retry.DelayType(retry.BackOffDelay), retry.Attempts(0), retry.Context(rootCtx)) }() }