Skip to content

Commit

Permalink
Add updating observations, fix typo
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Jan 25, 2023
1 parent c501f58 commit a92b548
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 72 deletions.
176 changes: 119 additions & 57 deletions surveyor/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (s *Surveyor) handleWatcherEvent(event fsnotify.Event, depth int) error {
}
// if a new directory was created, first start all observation already in it
// and then start watchong for changes in this directory (fsnotify.Watcher is not recursive)
if fs.IsDir() {
if fs.IsDir() && fs.Name() != "." {
depth--
err = filepath.WalkDir(path, s.startObservationsInDir())
if err != nil {
Expand Down Expand Up @@ -467,89 +467,62 @@ func removeObservation(observations []*ServiceObsListener, i int) []*ServiceObsL
return observations
}

type ManageObservationOpts func(*ObservationsManager) error
type ManageObservationsOpt func(*ObservationsManager) error

type ObservationErrHandler func(string, error)

type ObservationsManager struct {
surveyor *Surveyor
addObservation chan ObservationConfig
deleteObseravation chan string
errHandler ObservationErrHandler
surveyor *Surveyor
addObservations chan ObservationConfig
deleteObseravations chan string
updateObservations chan UpdateObservation
errHandler ObservationErrHandler
}

type UpdateObservation struct {
Name string
Config ObservationConfig
}

// ObservationsError is a configuration option for [ManageObservations].
// It sets error handler invoked when adding/deleting observations fails.
func ObservationsError(handler ObservationErrHandler) ManageObservationOpts {
func ObservationsError(handler ObservationErrHandler) ManageObservationsOpt {
return func(om *ObservationsManager) error {
om.errHandler = handler
return nil
}
}

// ManageObservastions creates an ObservationManager, allowing for adding/deleting service observations to the surveyor.
// ManageObservations creates an ObservationManager, allowing for adding/deleting service observations to the surveyor.
//
// ManageObservationOpts can be supplied to configure ObservationsManager.
func (s *Surveyor) ManageObservastions(opts ...ManageObservationOpts) (*ObservationsManager, error) {
func (s *Surveyor) ManageObservations(opts ...ManageObservationsOpt) (*ObservationsManager, error) {
obsManager := &ObservationsManager{
surveyor: s,
addObservation: make(chan ObservationConfig, 100),
deleteObseravation: make(chan string, 100),
surveyor: s,
addObservations: make(chan ObservationConfig, 100),
updateObservations: make(chan UpdateObservation, 100),
deleteObseravations: make(chan string, 100),
}
for _, opt := range opts {
if err := opt(obsManager); err != nil {
return nil, err
}
}
go func() {
Outer:
for {
select {
case obsConfig := <-obsManager.addObservation:
s.Mutex.Lock()
obs, err := newServiceObservation(obsConfig, s.opts, s.observationMetrics, s.reconnectCtr)
if err != nil {
s.logger.Warnf("could not create observation from config: %s: %s", obsConfig.ServiceName, err)
if obsManager.errHandler != nil {
obsManager.errHandler(obsConfig.ServiceName, err)
}
s.Mutex.Unlock()
continue
}
for _, existingObservation := range s.observations {
if obs.opts.ServiceName == existingObservation.opts.ServiceName {
s.logger.Warnf("adding service observation: observation with given service name already exists: %s", obsConfig.ServiceName)
s.Mutex.Unlock()
continue Outer
}
case obsConfig := <-obsManager.addObservations:
if err := obsManager.addObservation(obsConfig); err != nil {
s.logger.Warnf("adding service observation: %s", err)
}
err = obs.Start()
if err != nil {
s.logger.Warnf("could not start observation for service: %s: %s", obsConfig.ServiceName, err)
s.Mutex.Unlock()
continue
}

s.observations = append(s.observations, obs)
s.Mutex.Unlock()
case serviceName := <-obsManager.deleteObseravation:
s.Mutex.Lock()
var found bool
for i, existingObservation := range s.observations {
if serviceName == existingObservation.opts.ServiceName {
found = true
existingObservation.Stop()
if i < len(s.observations)-1 {
s.observations = append(s.observations[:i], s.observations[i+1:]...)
} else {
s.observations = s.observations[:i]
}
}
case updateObservation := <-obsManager.updateObservations:
if err := obsManager.updateObservation(updateObservation); err != nil {
s.logger.Warnf("updating service observation: %s", err)
}
if !found {
s.logger.Warnf("deleting service observation: observation with given service name does not exist: %s", serviceName)
case serviceName := <-obsManager.deleteObseravations:
if err := obsManager.deleteObservation(serviceName); err != nil {
s.logger.Warnf("deleting service observation: %s", err)
}
s.Mutex.Unlock()
case <-s.stop:
return
}
Expand All @@ -558,16 +531,105 @@ func (s *Surveyor) ManageObservastions(opts ...ManageObservationOpts) (*Observat
return obsManager, nil
}

func (om *ObservationsManager) addObservation(obsConfig ObservationConfig) error {
om.surveyor.Lock()
defer om.surveyor.Unlock()
for _, existingObservation := range om.surveyor.observations {
if obsConfig.ServiceName == existingObservation.opts.ServiceName {
return fmt.Errorf("observation with given service name already exists: %s", obsConfig.ServiceName)
}
}
obs, err := newServiceObservation(obsConfig, om.surveyor.opts, om.surveyor.observationMetrics, om.surveyor.reconnectCtr)
if err != nil {
if om.errHandler != nil {
om.errHandler(obsConfig.ServiceName, err)
}
return fmt.Errorf("could not create observation from config: %s: %s", obsConfig.ServiceName, err)
}

if err := obs.Start(); err != nil {
if om.errHandler != nil {
om.errHandler(obsConfig.ServiceName, err)
}
return fmt.Errorf("could not start observation for service: %s: %s", obsConfig.ServiceName, err)
}

om.surveyor.observations = append(om.surveyor.observations, obs)
return nil
}

func (om *ObservationsManager) updateObservation(updateObservation UpdateObservation) error {
om.surveyor.Lock()
defer om.surveyor.Unlock()
var found bool
var obsIndex int
for i, existingObservation := range om.surveyor.observations {
if updateObservation.Name == existingObservation.opts.ServiceName {
found = true
obsIndex = i
break
}
}
if !found {
return fmt.Errorf("observation with provided name does not exist: %s", updateObservation.Name)
}
obs, err := newServiceObservation(updateObservation.Config, om.surveyor.opts, om.surveyor.observationMetrics, om.surveyor.reconnectCtr)
if err != nil {
if om.errHandler != nil {
om.errHandler(updateObservation.Config.ServiceName, err)
}
return fmt.Errorf("could not create observation from config: %s: %s", updateObservation.Config.ServiceName, err)
}
if err := obs.Start(); err != nil {
if om.errHandler != nil {
om.errHandler(updateObservation.Config.ServiceName, err)
}
return fmt.Errorf("could not start observation for service: %s: %s", updateObservation.Config.ServiceName, err)
}

om.surveyor.observations[obsIndex].Stop()
om.surveyor.observations[obsIndex] = obs
return nil
}

func (om *ObservationsManager) deleteObservation(serviceName string) error {
om.surveyor.Lock()
defer om.surveyor.Unlock()
var found bool
for i, existingObservation := range om.surveyor.observations {
if serviceName == existingObservation.opts.ServiceName {
found = true
existingObservation.Stop()
if i < len(om.surveyor.observations)-1 {
om.surveyor.observations = append(om.surveyor.observations[:i], om.surveyor.observations[i+1:]...)
} else {
om.surveyor.observations = om.surveyor.observations[:i]
}
}
}
if !found {
return fmt.Errorf("observation with given service name does not exist: %s", serviceName)
}
return nil
}

// AddObservations creates and starts new service observations.
func (om *ObservationsManager) AddObservations(observations ...ObservationConfig) {
for _, obsConfig := range observations {
om.addObservation <- obsConfig
om.addObservations <- obsConfig
}
}

// DeleteObservations deletes exisiting observations with oprovided service names.
func (om *ObservationsManager) DeleteObservations(serviceNames ...string) {
for _, serviceName := range serviceNames {
om.deleteObseravation <- serviceName
om.deleteObseravations <- serviceName
}
}

// UpdateObservation deletes exisiting observations with oprovided service names.
func (om *ObservationsManager) UpdateObservations(observations ...UpdateObservation) {
for _, obsUpdate := range observations {
om.updateObservations <- obsUpdate
}
}
90 changes: 75 additions & 15 deletions surveyor/surveyor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,34 @@ func TestSurveyor_Observations(t *testing.T) {
if err != nil {
t.Fatalf("couldn't create surveyor: %v", err)
}

waitForMetricUpdate := func(t *testing.T, expectedObservationsNum int) {
t.Helper()
ticker := time.NewTicker(150 * time.Millisecond)
timeout := time.After(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
observationsNum := ptu.ToFloat64(s.observationMetrics.observationsGauge)
if observationsNum == float64(expectedObservationsNum) {
return
}
case <-timeout:
observationsNum := ptu.ToFloat64(s.observationMetrics.observationsGauge)
t.Fatalf("process error: invalid number of observations; want: %d; got: %f\n", expectedObservationsNum, observationsNum)
return
}
}
}
if err = s.Start(); err != nil {
t.Fatalf("start error: %v", err)
}
defer s.Stop()
obsManager, err := s.ManageObservastions()
errs := make(chan error, 10)
obsManager, err := s.ManageObservations(ObservationsError(func(s string, err error) {
errs <- err
}))
if err != nil {
t.Fatalf("Error creating observations manager: %s", err)
}
Expand All @@ -473,23 +496,42 @@ func TestSurveyor_Observations(t *testing.T) {
Credentials: "../test/myuser.creds",
},
)
time.Sleep(50 * time.Millisecond)

if ptu.ToFloat64(s.observationMetrics.observationsGauge) != 2 {
t.Fatalf("process error: observations not started")
waitForMetricUpdate(t, 2)
expectedServices := []string{"srv1", "srv2"}
for i, obs := range s.observations {
if obs.opts.ServiceName != expectedServices[i] {
t.Errorf("Unexpected service name: %s", obs.opts.ServiceName)
}
}

obsManager.DeleteObservations("srv1")
time.Sleep(50 * time.Millisecond)
if ptu.ToFloat64(s.observationMetrics.observationsGauge) != 1 {
t.Fatalf("process error: observations not started")
obsManager.UpdateObservations(
UpdateObservation{
Name: "srv1",
Config: ObservationConfig{
ServiceName: "srv3",
Topic: "testing_updated.topic",
Credentials: "../test/myuser.creds",
},
},
)

waitForMetricUpdate(t, 2)
expectedServices = []string{"srv3", "srv2"}
for i, obs := range s.observations {
if obs.opts.ServiceName != expectedServices[i] {
t.Errorf("Unexpected service name: %s", obs.opts.ServiceName)
}
}

obsManager.DeleteObservations("srv3")
waitForMetricUpdate(t, 1)

// observation no longer exists
obsManager.DeleteObservations("srv1")
time.Sleep(50 * time.Millisecond)
if ptu.ToFloat64(s.observationMetrics.observationsGauge) != 1 {
t.Fatalf("process error: observations not started")
obsManager.DeleteObservations("srv3")
waitForMetricUpdate(t, 1)

if len(errs) > 0 {
t.Errorf("Unexpected error when operating on observations: %s", err)
}
}

Expand All @@ -508,7 +550,7 @@ func TestSurveyor_ObservationsError(t *testing.T) {
}
errs := make(chan error)
defer s.Stop()
obsManager, err := s.ManageObservastions(ObservationsError(func(s string, err error) {
obsManager, err := s.ManageObservations(ObservationsError(func(s string, err error) {
errs <- err
}))
if err != nil {
Expand Down Expand Up @@ -544,6 +586,24 @@ func TestSurveyor_ObservationsError(t *testing.T) {
t.Errorf("Expected no error; got: %s", err)
case <-time.After(100 * time.Millisecond):
}

// update error, invalid config
obsManager.UpdateObservations(
UpdateObservation{
Name: "srv",
Config: ObservationConfig{
ServiceName: "srv",
Topic: "",
Credentials: "../test/myuser.creds",
},
},
)

select {
case <-errs:
case <-time.After(500 * time.Millisecond):
t.Errorf("Expected error; got timeout")
}
}

func TestSurveyor_ObservationsWatcher(t *testing.T) {
Expand Down Expand Up @@ -572,7 +632,7 @@ func TestSurveyor_ObservationsWatcher(t *testing.T) {

waitForMetricUpdate := func(t *testing.T, expectedObservationsNum int) {
t.Helper()
ticker := time.NewTicker(150 * time.Millisecond)
ticker := time.NewTicker(50 * time.Millisecond)
timeout := time.After(5 * time.Second)
defer ticker.Stop()
for {
Expand Down

0 comments on commit a92b548

Please sign in to comment.