Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support for 'create' event from the inotify watcher #11

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion watch/filechanges.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ type FileChanges struct {
Modified chan bool // Channel to get notified of modifications
Truncated chan bool // Channel to get notified of truncations
Deleted chan bool // Channel to get notified of deletions/renames
Created chan bool // Channel to get notified of creations
}

func NewFileChanges() *FileChanges {
return &FileChanges{
make(chan bool, 1), make(chan bool, 1), make(chan bool, 1)}
make(chan bool, 1),
make(chan bool, 1),
make(chan bool, 1),
make(chan bool, 1),
}
}

func (fc *FileChanges) NotifyModified() {
Expand All @@ -23,6 +28,10 @@ func (fc *FileChanges) NotifyDeleted() {
sendOnlyIfEmpty(fc.Deleted)
}

func (fc *FileChanges) NotifyCreated() {
sendOnlyIfEmpty(fc.Created)
}

// sendOnlyIfEmpty sends on a bool channel only if the channel has no
// backlog to be read by other goroutines. This concurrency pattern
// can be used to notify other goroutines if and only if they are
Expand Down
2 changes: 2 additions & 0 deletions watch/inotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChange
} else {
changes.NotifyModified()
}
case evt.Op&fsnotify.Create == fsnotify.Create:
changes.NotifyCreated()
}
}
}()
Expand Down
25 changes: 19 additions & 6 deletions watch/inotify_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"sync"
"syscall"

"github.com/influxdata/tail/util"

"gopkg.in/fsnotify.v1"

"github.com/influxdata/tail/util"
)

type InotifyTracker struct {
Expand All @@ -31,8 +31,8 @@ type watchInfo struct {
fname string
}

func (this *watchInfo) isCreate() bool {
return this.op == fsnotify.Create
func (winfo *watchInfo) isCreate() bool {
return winfo.op == fsnotify.Create
}

var (
Expand Down Expand Up @@ -199,8 +199,21 @@ func (shared *InotifyTracker) sendEvent(event fsnotify.Event) {
name := filepath.Clean(event.Name)

shared.mux.Lock()
ch := shared.chans[name]
done := shared.done[name]

// since the watcher could be defined on a directory, we check if the directory is present in the channels map
dir := filepath.Dir(name)
var ch chan fsnotify.Event
var done chan bool
var ok bool

if ch, ok = shared.chans[dir]; ok {
// watcher on directory present, only need to initialize "done"
done = shared.done[dir]
} else {
ch = shared.chans[name]
done = shared.done[name]
}

shared.mux.Unlock()

if ch != nil && done != nil {
Expand Down
3 changes: 2 additions & 1 deletion watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ type FileWatcher interface {
BlockUntilExists(*tomb.Tomb) error

// ChangeEvents reports on changes to a file, be it modification,
// deletion, renames or truncations. Returned FileChanges group of
// deletion, creation, renames or truncations. Returned FileChanges group of
// channels will be closed, thus become unusable, after a deletion
// or truncation event.
// In order to properly report truncations, ChangeEvents requires
// the caller to pass their current offset in the file.
// File creations are reported when the watcher is initialized on the parent directory.
ChangeEvents(*tomb.Tomb, int64) (*FileChanges, error)
}
145 changes: 94 additions & 51 deletions watch/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package watch
import (
"errors"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
Expand All @@ -16,64 +15,105 @@ import (
)

func TestWatchNotify(t *testing.T) {
tmpDir := t.TempDir()
testCases := []struct {
name string
poll bool
name string
poll bool
toWatch func() []string
}{
{"Test watch inotify", false},
{"Test watch poll", true},
{
name: "Test watch inotify with directory",
poll: false,
toWatch: func() []string {
dirPath := filepath.Join(tmpDir, "testDir")
err := os.Mkdir(dirPath, 0755)
if err != nil {
t.Fatal(err)
}
xyzFile := filepath.Join(tmpDir, "xyz")
f, err := os.Create(xyzFile)
if err != nil {
t.Fatal(err)
}
f.Close()
return []string{dirPath, xyzFile}
},
},
{
name: "Test watch poll",
poll: true,
toWatch: func() []string {
abcFile := filepath.Join(tmpDir, "abc")
f, err := os.Create(abcFile)
if err != nil {
t.Fatal(err)
}
f.Close()
xyzFile := filepath.Join(tmpDir, "xyz")
f, err = os.Create(xyzFile)
if err != nil {
t.Fatal(err)
}
f.Close()
return []string{abcFile, xyzFile}
},
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "watch-")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
filePath := filepath.Join(tmpDir, "a")
// create file
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0777)
if err != nil {
t.Fatal(err)
}
err = file.Close()
if err != nil {
t.Fatal(err)
}
filesToWatch := test.toWatch()

var wg sync.WaitGroup
var werr error
changes := 0
chanClose := make(chan struct{})
wg.Add(1)
go func() {
changes, werr = watchFile(filePath, test.poll, chanClose)
wg.Done()
}()

writeToFile(t, filePath, "hello", true)
<-time.After(time.Second)
writeToFile(t, filePath, "world", true)
<-time.After(time.Second)
writeToFile(t, filePath, "end", false)
<-time.After(time.Second)
//err = os.Remove(filePath)
//if err != nil {
// t.Fatal(err)
//}
rmFile(t, filePath)
chanClose <- struct{}{}
wg.Wait()
close(chanClose)
// each file path test is done synchronously, but watcher works async
for _, filePath := range filesToWatch {
changes := 0
var werr error
var wg sync.WaitGroup
chanClose := make(chan struct{})
wg.Add(1)
go func(filePath string) {
changes, werr = watchFile(filePath, test.poll, chanClose)
wg.Done()
}(filePath)
wait := make(chan bool)

if werr != nil {
t.Fatal(werr)
}
// ideally, there should be 4 changes (2xmodified,1xtruncaed and 1xdeleted)
// but, notifications from fsnotify are usually 2 (2xmodify) and 3x from poll (2xmodify, 1xtruncated)
if changes < 1 || changes > 4 {
t.Errorf("Invalid changes count: %d\n", changes)
// check if file is a directory, if yes, create a file
if fi, err := os.Stat(filePath); err == nil && fi.IsDir() {
time.AfterFunc(time.Second, func() {
f, err := os.Create(filepath.Join(filePath, "a"))
if err != nil {
t.Fatal(err)
}
f.Close()
filePath, _ = filepath.Abs(f.Name())
wait <- true
})
<-wait
}
writeToFile(t, filePath, "hello", true)
<-time.After(time.Second)
writeToFile(t, filePath, "world", true)
<-time.After(time.Second)
writeToFile(t, filePath, "end", false)
<-time.After(time.Second)
//err = os.Remove(filePath)
//if err != nil {
// t.Fatal(err)
//}
rmFile(t, filePath)
chanClose <- struct{}{}
close(chanClose)
close(wait)
wg.Wait()
if werr != nil {
t.Fatal(werr)
}
// ideally, there should be 4 changes (2xmodified,1xtruncaed and 1xdeleted)
// but, notifications from fsnotify are usually 2 (2xmodify) and 3x from poll (2xmodify, 1xtruncated)
if changes < 1 || changes > 4 {
t.Errorf("Invalid changes count: %d\n", changes)
}
}

})
}
}
Expand Down Expand Up @@ -151,6 +191,9 @@ func watchFile(path string, poll bool, close <-chan struct{}) (int, error) {
case <-changes.Truncated:
fmt.Println("Truncated")
changesCount++
case <-changes.Created:
fmt.Println("Created")
changesCount++
case <-mytomb.Dying():
return -1, errors.New("dying")
case <-close:
Expand Down