Skip to content

Commit

Permalink
Code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Jan 24, 2023
1 parent e8ea7a0 commit 0d98d69
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 128 deletions.
257 changes: 131 additions & 126 deletions surveyor/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func newServiceObservation(observationConfig ObservationConfig, sopts Options, m
}, nil
}

func (s *Surveyor) startObservationsInDir(depth int) fs.WalkDirFunc {
func (s *Surveyor) startObservationsInDir() fs.WalkDirFunc {
return func(path string, info fs.DirEntry, err error) error {
if err != nil {
return err
Expand Down Expand Up @@ -333,148 +333,153 @@ func (s *Surveyor) watchObservations(dir string, depth int) error {
s.observationWatchers[dir] = struct{}{}
s.Mutex.Unlock()
s.logger.Debugf("starting listener goroutine for %s", dir)
Outer:
for {
select {
case event, ok := <-watcher.Events:
path := event.Name
if !ok {
return
}
switch {
case event.Has(fsnotify.Create):
fs, err := os.Stat(path)
if err != nil {
s.logger.Warnf("could not read observation file %s: %s", path, err)
}
if filepath.Ext(fs.Name()) == ".json" {
obs, err := NewServiceObservationFromFile(path, s.opts, s.observationMetrics, s.reconnectCtr)
if err != nil {
s.logger.Warnf("could not create observation from %s: %s", path, err)
continue
}
for _, existingObservation := range s.observations {
if obs.opts.ServiceName == existingObservation.opts.ServiceName {
continue Outer
}
}
if err := s.handleWatcherEvent(event, depth); err != nil {
s.logger.Warn(err)
}
case <-s.stop:
return
}
}
}()
return nil
}

err = obs.Start()
if err != nil {
s.logger.Warnf("could not start observation from %s: %s", path, err)
continue
}
func (s *Surveyor) handleWatcherEvent(event fsnotify.Event, depth int) error {
path := event.Name
s.Lock()
defer s.Unlock()

s.Mutex.Lock()
s.observations = append(s.observations, obs)
s.Mutex.Unlock()
}
if fs.IsDir() {
depth--
err = filepath.WalkDir(path, s.startObservationsInDir(depth))
if err != nil {
s.logger.Warnf("could not start observation from %s: %s", path, err)
continue
}
if err := s.watchObservations(path, depth); err != nil {
s.logger.Errorf("could not start watcher in directory %s: %s", path, err)
continue
}
}
case event.Has(fsnotify.Write) && !event.Has(fsnotify.Remove):
fs, err := os.Stat(path)
if err != nil {
s.logger.Warnf("could not read observation file %s: %s", path, err)
continue
}
if filepath.Ext(fs.Name()) != ".json" {
continue
}
s.Mutex.Lock()
obs, err := NewServiceObservationFromFile(path, s.opts, s.observationMetrics, s.reconnectCtr)
for i, existingObservation := range s.observations {
// overwrite service observation in a file
if path == existingObservation.fromFile {
existingObservation.Stop()
if i < len(s.observations)-1 {
s.observations = append(s.observations[:i], s.observations[i+1:]...)
} else {
s.observations = s.observations[:i]
}
}
switch {
case event.Has(fsnotify.Create):
s.logger.Warnf("Creating observation: %s", path)
fs, err := os.Stat(path)
if err != nil {
return fmt.Errorf("could not read observation file %s: %s", path, err)
}
// if a new directory was created, first start all observation already in it
// and then start watchong for changes in this directory (fsnotify.Watcher is not recursive)
if fs.IsDir() {
depth--
err = filepath.WalkDir(path, s.startObservationsInDir())
if err != nil {
return fmt.Errorf("could not start observation from %s: %s", path, err)
}
if err := s.watchObservations(path, depth); err != nil {
return fmt.Errorf("could not start watcher in directory %s: %s", path, err)
}
}
// if not a directory and not a JSON, ignore
if filepath.Ext(fs.Name()) != ".json" {
return nil
}

// ignore service if it already exists
if obs.opts.ServiceName == existingObservation.opts.ServiceName {
s.Mutex.Unlock()
continue Outer
}
}
if err != nil {
s.logger.Warnf("could not create observation from %s: %s", path, err)
s.Mutex.Unlock()
continue
}
// create new observation from json
obs, err := NewServiceObservationFromFile(path, s.opts, s.observationMetrics, s.reconnectCtr)
if err != nil {
return fmt.Errorf("could not create observation from %s: %s", path, err)
}
for _, existingObservation := range s.observations {
if obs.opts.ServiceName == existingObservation.opts.ServiceName {
return nil
}
}

err = obs.Start()
if err != nil {
s.logger.Warnf("could not start observation from %s: %s", path, err)
s.Mutex.Unlock()
continue
}
err = obs.Start()
if err != nil {
return fmt.Errorf("could not start observation from %s: %s", path, err)
}

s.observations = append(s.observations, obs)
s.Mutex.Unlock()
s.observations = append(s.observations, obs)
case event.Has(fsnotify.Write) && !event.Has(fsnotify.Remove):
s.logger.Warnf("Writing observation: %s", path)
fs, err := os.Stat(path)
if err != nil {
return fmt.Errorf("could not read observation file %s: %s", path, err)
}
// if not a JSON, ignore
if filepath.Ext(fs.Name()) != ".json" {
return nil
}
obs, err := NewServiceObservationFromFile(path, s.opts, s.observationMetrics, s.reconnectCtr)
for i, existingObservation := range s.observations {
// overwrite service observation in a file
if path == existingObservation.fromFile {
existingObservation.Stop()
if i < len(s.observations)-1 {
s.observations = append(s.observations[:i], s.observations[i+1:]...)
} else {
s.observations = s.observations[:i]
}
}

case event.Has(fsnotify.Remove):
if _, ok := s.observationWatchers[path]; ok {
s.Mutex.Lock()
for i := 0; ; i++ {
if i > len(s.observations)-1 {
break
}
if strings.HasPrefix(s.observations[i].fromFile, path) {
s.observations[i].Stop()
if i < len(s.observations)-1 {
s.observations = append(s.observations[:i], s.observations[i+1:]...)
} else {
s.observations = s.observations[:i]
}
i--
}
}
delete(s.observationWatchers, path)
s.Mutex.Unlock()
watcher.Remove(path)
continue Outer
}
if filepath.Ext(path) != ".json" {
continue
}
s.Mutex.Lock()
for i := 0; ; i++ {
if i > len(s.observations)-1 {
break
}
if s.observations[i].fromFile == path {
s.observations[i].Stop()
if i < len(s.observations)-1 {
s.observations = append(s.observations[:i], s.observations[i+1:]...)
} else {
s.observations = s.observations[:i]
}
i--
}
}
s.Mutex.Unlock()
// ignore service if it already exists
if obs.opts.ServiceName == existingObservation.opts.ServiceName {
return fmt.Errorf("service observation with provided service name already exists: %s", obs.opts.ServiceName)
}
}
if err != nil {
return fmt.Errorf("could not create observation from %s: %s", path, err)
}

err = obs.Start()
if err != nil {
return fmt.Errorf("could not start observation from %s: %s", path, err)
}

s.observations = append(s.observations, obs)

case event.Has(fsnotify.Remove):
s.logger.Warnf("Removing observation: %s", path)
// directory removed, delete all observations inside and cancel watching this dir
if _, ok := s.observationWatchers[path]; ok {
for i := 0; ; i++ {
if i > len(s.observations)-1 {
break
}
if strings.HasPrefix(s.observations[i].fromFile, path) {
s.observations = removeObservation(s.observations, i)
i--
}
case <-s.stop:
return
}
delete(s.observationWatchers, path)
return nil
}
}()
// if not a directory and not a JSON, ignore
if filepath.Ext(path) != ".json" {
return nil
}
for i := 0; ; i++ {
if i > len(s.observations)-1 {
break
}
if s.observations[i].fromFile == path {
s.observations = removeObservation(s.observations, i)
i--
}
}
}
return nil
}

func removeObservation(observations []*ServiceObsListener, i int) []*ServiceObsListener {
if i >= len(observations) {
return observations
}
observations[i].Stop()
if i < len(observations)-1 {
observations = append(observations[:i], observations[i+1:]...)
} else {
observations = observations[:i]
}
return observations
}

type ManageObservationOpts func(*ObservationsManager) error

type ObservationErrHandler func(string, error)
Expand All @@ -495,7 +500,7 @@ func ObservationsError(handler ObservationErrHandler) ManageObservationOpts {
}
}

// ManageObservations creates an ObservationManager, allowing for adding/deleting service observations to the surveyor.
// ManageObservastions creates an ObservationManager, allowing for adding/deleting service observations to the surveyor.
//
// ManageObservationOpts can be supplied to configure ObservationsManager.
func (s *Surveyor) ManageObservastions(opts ...ManageObservationOpts) (*ObservationsManager, error) {
Expand Down
2 changes: 1 addition & 1 deletion surveyor/surveyor.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (s *Surveyor) startObservations() error {
return fmt.Errorf("observations dir %s is not a directory", dir)
}

err = filepath.WalkDir(dir, s.startObservationsInDir(5))
err = filepath.WalkDir(dir, s.startObservationsInDir())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion surveyor/surveyor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func TestSurveyor_ObservationsWatcher(t *testing.T) {
waitForMetricUpdate := func(t *testing.T, expectedObservationsNum int) {
t.Helper()
ticker := time.NewTicker(150 * time.Millisecond)
timeout := time.After(5 * time.Second)
timeout := time.After(20 * time.Second)
defer ticker.Stop()
for {
select {
Expand Down

0 comments on commit 0d98d69

Please sign in to comment.