From 97db2a1a272164a1bc90aad81144d4c40a8325ed Mon Sep 17 00:00:00 2001 From: Neelay Upadhyaya Date: Mon, 21 Nov 2022 17:43:47 +0530 Subject: [PATCH] feat: support for 'create' event from the inotify watcher --- watch/filechanges.go | 11 ++- watch/inotify.go | 2 + watch/inotify_tracker.go | 25 +++++-- watch/watch.go | 3 +- watch/watch_test.go | 145 +++++++++++++++++++++++++-------------- 5 files changed, 127 insertions(+), 59 deletions(-) diff --git a/watch/filechanges.go b/watch/filechanges.go index f80aead9..9908d760 100644 --- a/watch/filechanges.go +++ b/watch/filechanges.go @@ -4,11 +4,16 @@ type FileChanges struct { Modified chan bool // Channel to get notified of modifications Truncated chan bool // Channel to get notified of truncations Deleted chan bool // Channel to get notified of deletions/renames + Created chan bool // Channel to get notified of creations } func NewFileChanges() *FileChanges { return &FileChanges{ - make(chan bool, 1), make(chan bool, 1), make(chan bool, 1)} + make(chan bool, 1), + make(chan bool, 1), + make(chan bool, 1), + make(chan bool, 1), + } } func (fc *FileChanges) NotifyModified() { @@ -23,6 +28,10 @@ func (fc *FileChanges) NotifyDeleted() { sendOnlyIfEmpty(fc.Deleted) } +func (fc *FileChanges) NotifyCreated() { + sendOnlyIfEmpty(fc.Created) +} + // sendOnlyIfEmpty sends on a bool channel only if the channel has no // backlog to be read by other goroutines. This concurrency pattern // can be used to notify other goroutines if and only if they are diff --git a/watch/inotify.go b/watch/inotify.go index 93e46737..24f2c713 100644 --- a/watch/inotify.go +++ b/watch/inotify.go @@ -133,6 +133,8 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChange } else { changes.NotifyModified() } + case evt.Op&fsnotify.Create == fsnotify.Create: + changes.NotifyCreated() } } }() diff --git a/watch/inotify_tracker.go b/watch/inotify_tracker.go index 333e261d..d88ad6e9 100644 --- a/watch/inotify_tracker.go +++ b/watch/inotify_tracker.go @@ -10,9 +10,9 @@ import ( "sync" "syscall" - "github.com/influxdata/tail/util" - "gopkg.in/fsnotify.v1" + + "github.com/influxdata/tail/util" ) type InotifyTracker struct { @@ -31,8 +31,8 @@ type watchInfo struct { fname string } -func (this *watchInfo) isCreate() bool { - return this.op == fsnotify.Create +func (winfo *watchInfo) isCreate() bool { + return winfo.op == fsnotify.Create } var ( @@ -199,8 +199,21 @@ func (shared *InotifyTracker) sendEvent(event fsnotify.Event) { name := filepath.Clean(event.Name) shared.mux.Lock() - ch := shared.chans[name] - done := shared.done[name] + + // since the watcher could be defined on a directory, we check if the directory is present in the channels map + dir := filepath.Dir(name) + var ch chan fsnotify.Event + var done chan bool + var ok bool + + if ch, ok = shared.chans[dir]; ok { + // watcher on directory present, only need to initialize "done" + done = shared.done[dir] + } else { + ch = shared.chans[name] + done = shared.done[name] + } + shared.mux.Unlock() if ch != nil && done != nil { diff --git a/watch/watch.go b/watch/watch.go index 2e1783ef..e0a96ecf 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -11,10 +11,11 @@ type FileWatcher interface { BlockUntilExists(*tomb.Tomb) error // ChangeEvents reports on changes to a file, be it modification, - // deletion, renames or truncations. Returned FileChanges group of + // deletion, creation, renames or truncations. Returned FileChanges group of // channels will be closed, thus become unusable, after a deletion // or truncation event. // In order to properly report truncations, ChangeEvents requires // the caller to pass their current offset in the file. + // File creations are reported when the watcher is initialized on the parent directory. ChangeEvents(*tomb.Tomb, int64) (*FileChanges, error) } diff --git a/watch/watch_test.go b/watch/watch_test.go index dd35828d..96871f6d 100644 --- a/watch/watch_test.go +++ b/watch/watch_test.go @@ -3,7 +3,6 @@ package watch import ( "errors" "fmt" - "io/ioutil" "os" "os/exec" "path/filepath" @@ -16,64 +15,105 @@ import ( ) func TestWatchNotify(t *testing.T) { + tmpDir := t.TempDir() testCases := []struct { - name string - poll bool + name string + poll bool + toWatch func() []string }{ - {"Test watch inotify", false}, - {"Test watch poll", true}, + { + name: "Test watch inotify with directory", + poll: false, + toWatch: func() []string { + dirPath := filepath.Join(tmpDir, "testDir") + err := os.Mkdir(dirPath, 0755) + if err != nil { + t.Fatal(err) + } + xyzFile := filepath.Join(tmpDir, "xyz") + f, err := os.Create(xyzFile) + if err != nil { + t.Fatal(err) + } + f.Close() + return []string{dirPath, xyzFile} + }, + }, + { + name: "Test watch poll", + poll: true, + toWatch: func() []string { + abcFile := filepath.Join(tmpDir, "abc") + f, err := os.Create(abcFile) + if err != nil { + t.Fatal(err) + } + f.Close() + xyzFile := filepath.Join(tmpDir, "xyz") + f, err = os.Create(xyzFile) + if err != nil { + t.Fatal(err) + } + f.Close() + return []string{abcFile, xyzFile} + }, + }, } for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - tmpDir, err := ioutil.TempDir("", "watch-") - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(tmpDir) - filePath := filepath.Join(tmpDir, "a") - // create file - file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0777) - if err != nil { - t.Fatal(err) - } - err = file.Close() - if err != nil { - t.Fatal(err) - } + filesToWatch := test.toWatch() - var wg sync.WaitGroup - var werr error - changes := 0 - chanClose := make(chan struct{}) - wg.Add(1) - go func() { - changes, werr = watchFile(filePath, test.poll, chanClose) - wg.Done() - }() - - writeToFile(t, filePath, "hello", true) - <-time.After(time.Second) - writeToFile(t, filePath, "world", true) - <-time.After(time.Second) - writeToFile(t, filePath, "end", false) - <-time.After(time.Second) - //err = os.Remove(filePath) - //if err != nil { - // t.Fatal(err) - //} - rmFile(t, filePath) - chanClose <- struct{}{} - wg.Wait() - close(chanClose) + // each file path test is done synchronously, but watcher works async + for _, filePath := range filesToWatch { + changes := 0 + var werr error + var wg sync.WaitGroup + chanClose := make(chan struct{}) + wg.Add(1) + go func(filePath string) { + changes, werr = watchFile(filePath, test.poll, chanClose) + wg.Done() + }(filePath) + wait := make(chan bool) - if werr != nil { - t.Fatal(werr) - } - // ideally, there should be 4 changes (2xmodified,1xtruncaed and 1xdeleted) - // but, notifications from fsnotify are usually 2 (2xmodify) and 3x from poll (2xmodify, 1xtruncated) - if changes < 1 || changes > 4 { - t.Errorf("Invalid changes count: %d\n", changes) + // check if file is a directory, if yes, create a file + if fi, err := os.Stat(filePath); err == nil && fi.IsDir() { + time.AfterFunc(time.Second, func() { + f, err := os.Create(filepath.Join(filePath, "a")) + if err != nil { + t.Fatal(err) + } + f.Close() + filePath, _ = filepath.Abs(f.Name()) + wait <- true + }) + <-wait + } + writeToFile(t, filePath, "hello", true) + <-time.After(time.Second) + writeToFile(t, filePath, "world", true) + <-time.After(time.Second) + writeToFile(t, filePath, "end", false) + <-time.After(time.Second) + //err = os.Remove(filePath) + //if err != nil { + // t.Fatal(err) + //} + rmFile(t, filePath) + chanClose <- struct{}{} + close(chanClose) + close(wait) + wg.Wait() + if werr != nil { + t.Fatal(werr) + } + // ideally, there should be 4 changes (2xmodified,1xtruncaed and 1xdeleted) + // but, notifications from fsnotify are usually 2 (2xmodify) and 3x from poll (2xmodify, 1xtruncated) + if changes < 1 || changes > 4 { + t.Errorf("Invalid changes count: %d\n", changes) + } } + }) } } @@ -151,6 +191,9 @@ func watchFile(path string, poll bool, close <-chan struct{}) (int, error) { case <-changes.Truncated: fmt.Println("Truncated") changesCount++ + case <-changes.Created: + fmt.Println("Created") + changesCount++ case <-mytomb.Dying(): return -1, errors.New("dying") case <-close: