@@ -3,6 +3,7 @@ package fsnotify
3
3
import (
4
4
"context"
5
5
"io"
6
+ "io/fs"
6
7
"os"
7
8
"sync"
8
9
"time"
@@ -11,6 +12,7 @@ import (
11
12
"github.com/fsnotify/fsnotify"
12
13
"github.com/pkg/errors"
13
14
"go.uber.org/zap"
15
+ "golang.org/x/sync/errgroup"
14
16
)
15
17
16
18
type releaseIPsClient interface {
@@ -27,17 +29,18 @@ type watcher struct {
27
29
}
28
30
29
31
// Create the AsyncDelete watcher.
30
- func New (cli releaseIPsClient , path string , logger * zap.Logger ) * watcher { //nolint
32
+ func New (cli releaseIPsClient , path string , logger * zap.Logger ) ( * watcher , error ) { //nolint
31
33
// Add directory where intended deletes are kept
32
- if err := os .Mkdir (path , 0o755 ); err != nil { //nolint
34
+ if err := os .Mkdir (path , 0o755 ); err != nil && ! errors . Is ( err , fs . ErrExist ) { //nolint
33
35
logger .Error ("error making directory" , zap .String ("path" , path ), zap .Error (err ))
36
+ return nil , errors .Wrapf (err , "failed to create dir %s" , path )
34
37
}
35
38
return & watcher {
36
39
cli : cli ,
37
40
path : path ,
38
41
log : logger ,
39
42
pendingDelete : make (map [string ]struct {}),
40
- }
43
+ }, nil
41
44
}
42
45
43
46
// releaseAll locks and iterates the pendingDeletes map and calls CNS to
@@ -115,6 +118,23 @@ func (w *watcher) watchFS(ctx context.Context) error {
115
118
if err != nil {
116
119
w .log .Error ("failed to add path to fsnotify watcher" , zap .String ("path" , w .path ), zap .Error (err ))
117
120
}
121
+ // List the directory and creates synthetic events for any existing items.
122
+ w .log .Info ("listing directory" , zap .String ("path" , w .path ))
123
+ dirContents , err := os .ReadDir (w .path )
124
+ if err != nil {
125
+ w .log .Error ("error reading deleteID directory" , zap .String ("path" , w .path ), zap .Error (err ))
126
+ return errors .Wrapf (err , "failed to read %s" , w .path )
127
+ }
128
+ if len (dirContents ) == 0 {
129
+ w .log .Info ("no missed deletes found" )
130
+ }
131
+ w .lock .Lock ()
132
+ for _ , file := range dirContents {
133
+ w .log .Info ("adding missed delete from file" , zap .String ("name" , file .Name ()))
134
+ w .pendingDelete [file .Name ()] = struct {}{}
135
+ }
136
+ w .lock .Unlock ()
137
+
118
138
// Start listening for events.
119
139
w .log .Info ("listening for events from fsnotify watcher" )
120
140
for {
@@ -139,51 +159,20 @@ func (w *watcher) watchFS(ctx context.Context) error {
139
159
}
140
160
}
141
161
142
- // readFS lists the directory and enqueues any missed deletes that are already
143
- // present on-disk.
144
- func (w * watcher ) readFS () error {
145
- w .log .Info ("listing directory" , zap .String ("path" , w .path ))
146
- dirContents , err := os .ReadDir (w .path )
147
- if err != nil {
148
- w .log .Error ("error reading deleteID directory" , zap .String ("path" , w .path ), zap .Error (err ))
149
- return errors .Wrapf (err , "failed to read %s" , w .path )
150
- }
151
- if len (dirContents ) == 0 {
152
- w .log .Info ("no missed deletes found" )
153
- return nil
154
- }
155
- w .lock .Lock ()
156
- for _ , file := range dirContents {
157
- w .log .Info ("adding missed delete from file" , zap .String ("name" , file .Name ()))
158
- w .pendingDelete [file .Name ()] = struct {}{}
159
- }
160
- w .lock .Unlock ()
161
- return nil
162
- }
163
-
164
- // WatchFS starts the filesystem watcher to handle async Pod deletes.
162
+ // Start starts the filesystem watcher to handle async Pod deletes.
165
163
// Blocks until the context is closed; returns underlying fsnotify errors
166
164
// if something goes fatally wrong.
167
- func (w * watcher ) Start (ctx context.Context ) error {
168
- errs := make (chan error )
165
+ func (w * watcher ) Start (c context.Context ) error {
166
+ ctx , cancel := context .WithCancel (c )
167
+ defer cancel ()
168
+ g , groupCtx := errgroup .WithContext (ctx )
169
169
// Start watching for enqueued missed deletes so that we process them as soon as they arrive.
170
- go func (errs chan <- error ) {
171
- errs <- w .watchPendingDelete (ctx )
172
- }(errs )
173
-
170
+ g .Go (func () error { return w .watchPendingDelete (groupCtx ) })
174
171
// Start watching for changes to the filesystem so that we don't miss any async deletes.
175
- go func (errs chan <- error ) {
176
- errs <- w .watchFS (ctx )
177
- }(errs )
178
-
179
- // Read the directory to enqueue any missed deletes that are already present on-disk.
180
- if err := w .readFS (); err != nil {
181
- return err
182
- }
183
-
184
- // block until one of the goroutines returns an error
185
- err := <- errs
186
- return err
172
+ g .Go (func () error { return w .watchFS (groupCtx ) })
173
+ // the first error from the errgroup will trigger context cancellation for other goroutines in the group.
174
+ // this will block until all goroutines complete and return the first error.
175
+ return g .Wait () //nolint:wrapcheck // ignore
187
176
}
188
177
189
178
// AddFile creates new file using the containerID as name
0 commit comments