-
Notifications
You must be signed in to change notification settings - Fork 242
/
Copy pathfsnotify.go
208 lines (191 loc) · 7 KB
/
fsnotify.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
package fsnotify
import (
	"context"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/Azure/azure-container-networking/cns"
	"github.com/fsnotify/fsnotify"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)
// releaseIPsClient is the minimal CNS client surface this package needs:
// the ability to release the IP assigned to a Pod via an IPConfigRequest.
// Declared at the consumer so any CNS client implementation can be injected.
type releaseIPsClient interface {
ReleaseIPAddress(ctx context.Context, ipconfig cns.IPConfigRequest) error
}
// watcher tracks Pod deletes that CNS missed, recorded as files on disk,
// and asynchronously releases the corresponding IPs through a CNS client.
type watcher struct {
cli releaseIPsClient // client used to release IPs for missed deletes
path string // directory holding one file per missed-delete containerID
log *zap.Logger
pendingDelete map[string]struct{} // set of containerIDs awaiting release; guarded by lock
lock sync.Mutex // protects pendingDelete (written by watchFS, drained by releaseAll)
}
// New creates the AsyncDelete watcher rooted at path, creating the
// directory for intended deletes if it does not already exist.
func New(cli releaseIPsClient, path string, logger *zap.Logger) (*watcher, error) { //nolint
	// Ensure the directory where intended deletes are kept exists;
	// an already-existing directory is not an error.
	err := os.Mkdir(path, 0o755)
	if err != nil && !errors.Is(err, fs.ErrExist) { //nolint
		logger.Error("error making directory", zap.String("path", path), zap.Error(err))
		return nil, errors.Wrapf(err, "failed to create dir %s", path)
	}
	w := &watcher{
		cli:           cli,
		path:          path,
		log:           logger,
		pendingDelete: map[string]struct{}{},
	}
	return w, nil
}
// releaseAll locks and iterates the pendingDeletes map and calls CNS to
// release the IP for any Pod containerIDs present. When an IP is released
// that entry is removed from the map and the file is deleted. If the file
// fails to delete, we still remove it from the map so that we don't retry
// it during the life of this process, but we may retry it on a subsequent
// invocation of CNS. This is okay because calling releaseIP on an already
// processed containerID is a no-op, and we may be able to delete the file
// during that future retry.
func (w *watcher) releaseAll(ctx context.Context) {
w.lock.Lock()
defer w.lock.Unlock()
for containerID := range w.pendingDelete {
// Read the Pod interface ID that AddFile recorded for this containerID.
filepath := w.path + "/" + containerID
file, err := os.Open(filepath)
if err != nil {
w.log.Error("failed to open file", zap.Error(err))
// Without the file we cannot build the release request; leave the
// entry in the map so a later pass retries it. (The original code
// fell through here and released with an empty podInterfaceID.)
continue
}
data, errReadingFile := io.ReadAll(file)
file.Close()
if errReadingFile != nil {
w.log.Error("failed to read file content", zap.Error(errReadingFile))
// Same as above: retry this entry on a later pass.
continue
}
podInterfaceID := string(data)
w.log.Info("releasing IP for missed delete", zap.String("podInterfaceID", podInterfaceID), zap.String("containerID", containerID))
if err := w.releaseIP(ctx, podInterfaceID, containerID); err != nil {
w.log.Error("failed to release IP for missed delete", zap.String("containerID", containerID), zap.Error(err))
continue
}
w.log.Info("successfully released IP for missed delete", zap.String("containerID", containerID))
delete(w.pendingDelete, containerID)
if err := removeFile(containerID, w.path); err != nil {
w.log.Error("failed to remove file for missed delete", zap.Error(err))
}
}
}
// watchPendingDelete periodically checks the map for pending release IPs
// and calls releaseAll to process the contents when present.
func (w *watcher) watchPendingDelete(ctx context.Context) error {
ticker := time.NewTicker(15 * time.Second) //nolint
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "exiting watchPendingDelete")
case <-ticker.C:
// Snapshot the map size under the lock: watchFS mutates
// pendingDelete concurrently, and an unsynchronized len() on a
// map being written is a data race.
w.lock.Lock()
n := len(w.pendingDelete)
w.lock.Unlock()
if n == 0 {
continue
}
w.log.Info("processing pending missed deletes", zap.Int("count", n))
w.releaseAll(ctx)
}
}
}
// watchFS starts the fsnotify watcher and handles events for file creation
// or deletion in the missed pending delete directory. A file creation event
// indicates that CNS missed the delete call for a containerID and needs
// to process the release IP asynchronously.
func (w *watcher) watchFS(ctx context.Context) error {
// Create new fs watcher.
watcher, err := fsnotify.NewWatcher()
if err != nil {
return errors.Wrap(err, "error creating fsnotify watcher")
}
defer watcher.Close()
// Start watching the directory before listing it, so that we don't miss any events.
err = watcher.Add(w.path)
if err != nil {
w.log.Error("failed to add path to fsnotify watcher", zap.String("path", w.path), zap.Error(err))
return errors.Wrap(err, "failed to add path to fsnotify watcher")
}
// List the directory and create synthetic events for any existing items.
w.log.Info("listing directory", zap.String("path", w.path))
dirContents, err := os.ReadDir(w.path)
if err != nil {
w.log.Error("error reading deleteID directory", zap.String("path", w.path), zap.Error(err))
return errors.Wrapf(err, "failed to read %s", w.path)
}
if len(dirContents) == 0 {
w.log.Info("no missed deletes found")
}
w.lock.Lock()
for _, file := range dirContents {
w.log.Info("adding missed delete from file", zap.String("name", file.Name()))
w.pendingDelete[file.Name()] = struct{}{}
}
w.lock.Unlock()
// Start listening for events.
w.log.Info("listening for events from fsnotify watcher")
for {
select {
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "exiting watchFS")
case event, ok := <-watcher.Events:
if !ok {
return errors.New("fsnotify watcher closed")
}
if !event.Has(fsnotify.Create) {
// discard any event that is not a file Create
continue
}
w.log.Info("received create event", zap.String("event", event.Name))
// event.Name is the file's full path; key the map by base name so it
// matches the ReadDir entries above and the path construction in
// releaseAll/removeFile (which join w.path with the map key).
w.lock.Lock()
w.pendingDelete[filepath.Base(event.Name)] = struct{}{}
w.lock.Unlock()
case watcherErr := <-watcher.Errors:
w.log.Error("fsnotify watcher error", zap.Error(watcherErr))
}
}
}
// Start runs the filesystem watcher to handle async Pod deletes.
// It blocks until the context is closed and returns underlying fsnotify
// errors if something goes fatally wrong.
func (w *watcher) Start(ctx context.Context) error {
	group, groupCtx := errgroup.WithContext(ctx)
	// Drain enqueued missed deletes so they are processed as soon as they arrive.
	group.Go(func() error {
		return w.watchPendingDelete(groupCtx)
	})
	// Track filesystem changes so that no async delete is missed.
	group.Go(func() error {
		return w.watchFS(groupCtx)
	})
	// The first failure cancels the group context for the other goroutine;
	// Wait blocks until both return and yields that first error.
	return group.Wait() //nolint:wrapcheck // ignore
}
// AddFile creates a new file under path, named by containerID, whose
// content is podInterfaceID. releaseAll later reads it back to build the
// IP release request for a missed delete.
func AddFile(podInterfaceID, containerID, path string) error {
	filepath := path + "/" + containerID
	f, err := os.Create(filepath)
	if err != nil {
		return errors.Wrap(err, "error creating file")
	}
	if _, err := f.WriteString(podInterfaceID); err != nil {
		// Close on the error path too; the original leaked the handle here.
		_ = f.Close()
		return errors.Wrap(err, "error writing to file")
	}
	// Close errors matter for durability of the write, so surface them.
	return errors.Wrap(f.Close(), "error adding file to directory")
}
// removeFile deletes the file named for containerID from path.
func removeFile(containerID, path string) error {
	target := path + "/" + containerID
	// errors.Wrap returns nil when os.Remove succeeds, so success needs no branch.
	return errors.Wrap(os.Remove(target), "error deleting file")
}
// releaseIP asks CNS to release the IP bound to the given Pod interface
// and infra container.
func (w *watcher) releaseIP(ctx context.Context, podInterfaceID, containerID string) error {
	req := cns.IPConfigRequest{
		PodInterfaceID:   podInterfaceID,
		InfraContainerID: containerID,
	}
	err := w.cli.ReleaseIPAddress(ctx, req)
	return errors.Wrap(err, "failed to release IP from CNS")
}