From 249b48a93f70597c9f393df3ff7b67f24691d3ac Mon Sep 17 00:00:00 2001 From: Cian Gallagher Date: Sat, 11 Dec 2021 16:22:13 +0000 Subject: [PATCH] BACKEND: Implement pub/sub pattern for file system events. --- cmd/main.go | 21 +++++++++++ watcher/watcher.go | 93 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+) diff --git a/cmd/main.go b/cmd/main.go index 06ab7d0..0ace5ed 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1 +1,22 @@ package main + +import ( + "github.com/cian911/switchboard/watcher" +) + +func main() { + path := "/Users/cian.gallagher/test_events" + destination := "/tmp" + + var pw watcher.Producer = &watcher.PathWatcher{ + Path: path, + } + + var pc watcher.Consumer = &watcher.PathConsumer{ + Path: path, + Destination: destination, + } + + pw.Register(&pc) + pw.Observe() +} diff --git a/watcher/watcher.go b/watcher/watcher.go index 8278790..1d47db3 100644 --- a/watcher/watcher.go +++ b/watcher/watcher.go @@ -1 +1,94 @@ package watcher + +import ( + "log" + "os" + "path/filepath" + + "github.com/fsnotify/fsnotify" +) + +type Producer interface { + Register(consumer *Consumer) + Unregister(consumer *Consumer) + notify(path, event string) + Observe() +} + +type Consumer interface { + Receive(path, event string) + Process(path, destination string) +} + +type PathWatcher struct { + Consumers []*Consumer + Watcher fsnotify.Watcher + Path string +} + +type PathConsumer struct { + Path string + Destination string +} + +func (pc *PathConsumer) Receive(path, event string) { + log.Printf("CONSUMER EVENT: path: %s, event: %s", path, event) +} + +func (pc *PathConsumer) Process(path, destination string) { + log.Printf("CONSUMER PROCESSING EVENT: path: %s, event: %s", path, destination) +} + +func (pw *PathWatcher) Register(consumer *Consumer) { + pw.Consumers = append(pw.Consumers, consumer) +} + +func (pw *PathWatcher) Unregister(consumer *Consumer) { + for i, cons := range pw.Consumers { + if cons == consumer { + pw.Consumers[i] = pw.Consumers[len(pw.Consumers)-1] + pw.Consumers = pw.Consumers[:len(pw.Consumers)-1] + } + } +} + +func (pw *PathWatcher) notify(path, event string) { + for _, cons := range pw.Consumers { + (*cons).Receive(path, event) + } +} + +func (pw *PathWatcher) Observe() { + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Fatalf("Could not create new watcher: %v", err) + } + + defer watcher.Close() + + // fsnotify doesnt support recursive folders, so we can here + if err := filepath.Walk(pw.Path, func(path string, info os.FileInfo, err error) error { + if info.Mode().IsDir() { + watcher.Add(path) + } + + return nil + }); err != nil { + log.Fatalf("Could not parse recursive path: %v", err) + } + + done := make(chan bool) + + go func() { + for { + select { + case event := <-watcher.Events: + pw.notify(event.Name, event.Op.String()) + case err := <-watcher.Errors: + log.Printf("Watcher encountered an error when observing %s: %v", pw.Path, err) + } + } + }() + + <-done +}