Skip to content

Commit

Permalink
StatusFunc improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
mwear committed Sep 10, 2023
1 parent b7684b1 commit 239c635
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 60 deletions.
14 changes: 8 additions & 6 deletions component/componenttest/nop_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ import (
// NewNopTelemetrySettings returns a new nop telemetry settings for Create* functions.
func NewNopTelemetrySettings() component.TelemetrySettings {
return component.TelemetrySettings{
Logger: zap.NewNop(),
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: noop.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
ReportComponentStatus: func(component.Status, ...component.StatusEventOption) {},
Logger: zap.NewNop(),
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: noop.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
ReportComponentStatus: func(component.Status, ...component.StatusEventOption) error {
return nil
},
}
}
2 changes: 1 addition & 1 deletion component/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,4 @@ type StatusWatcher interface {
ComponentStatusChanged(source *InstanceID, event *StatusEvent)
}

type StatusFunc func(Status, ...StatusEventOption)
type StatusFunc func(Status, ...StatusEventOption) error
11 changes: 11 additions & 0 deletions component/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ type TelemetrySettingsBase[T any] struct {
// Resource contains the resource attributes for the collector's telemetry.
Resource pcommon.Resource

// ReportComponentStatus allows a component to report runtime changes in status. The service
// will automatically report status for a component during startup and shutdown. Components can
// use this method to report status after start and before shutdown. ReportComponentStatus
// will only return errors if the API is used incorrectly. The three scenarios where an error will
// be returned are:
//
// - An illegal state transition
// - Using the WithError() option with a non-error status
// - Calling this method before component startup
//
// If the API is being used properly, these errors are safe to ignore.
ReportComponentStatus T
}

Expand Down
3 changes: 1 addition & 2 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ func TestComponentStatusWatcher(t *testing.T) {
changedComponents := map[*component.InstanceID]component.Status{}
var mux sync.Mutex
onStatusChanged := func(source *component.InstanceID, event *component.StatusEvent) {
// skip the startup notifications
if event.Status() == component.StatusStarting {
if event.Status() != component.StatusRecoverableError {
return
}
mux.Lock()
Expand Down
3 changes: 1 addition & 2 deletions processor/processortest/unhealthy_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ type unhealthyProcessor struct {
}

func (p unhealthyProcessor) Start(_ context.Context, host component.Host) error {
p.telemetry.ReportComponentStatus(component.StatusStarting)
go func() {
p.telemetry.ReportComponentStatus(component.StatusRecoverableError)
_ = p.telemetry.ReportComponentStatus(component.StatusRecoverableError)
}()
return nil
}
12 changes: 6 additions & 6 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,14 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error {
}

instanceID := g.instanceIDs[node.ID()]
g.telemetry.ReportComponentStatus(instanceID, component.StatusStarting)
_ = g.telemetry.ReportComponentStatus(instanceID, component.StatusStarting)

if compErr := comp.Start(ctx, host); compErr != nil {
g.telemetry.ReportComponentStatus(instanceID, component.StatusPermanentError, component.WithError(compErr))
_ = g.telemetry.ReportComponentStatus(instanceID, component.StatusPermanentError, component.WithError(compErr))
return compErr
}

g.telemetry.ReportComponentStatus(instanceID, component.StatusOK)
_ = g.telemetry.ReportComponentStatus(instanceID, component.StatusOK)
}
return nil
}
Expand All @@ -411,15 +411,15 @@ func (g *Graph) ShutdownAll(ctx context.Context) error {
}

instanceID := g.instanceIDs[node.ID()]
g.telemetry.ReportComponentStatus(instanceID, component.StatusStopping)
_ = g.telemetry.ReportComponentStatus(instanceID, component.StatusStopping)

if compErr := comp.Shutdown(ctx); compErr != nil {
errs = multierr.Append(errs, compErr)
g.telemetry.ReportComponentStatus(instanceID, component.StatusPermanentError, component.WithError(compErr))
_ = g.telemetry.ReportComponentStatus(instanceID, component.StatusPermanentError, component.WithError(compErr))
continue
}

g.telemetry.ReportComponentStatus(instanceID, component.StatusStopped)
_ = g.telemetry.ReportComponentStatus(instanceID, component.StatusStopped)
}
return errs
}
Expand Down
21 changes: 12 additions & 9 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,10 @@ func TestGraphStartStopCycle(t *testing.T) {
e1 := &testNode{id: component.NewIDWithName("e", "1")}

pg.instanceIDs = map[int64]*component.InstanceID{
r1.ID(): &component.InstanceID{},
p1.ID(): &component.InstanceID{},
c1.ID(): &component.InstanceID{},
e1.ID(): &component.InstanceID{},
r1.ID(): {},
p1.ID(): {},
c1.ID(): {},
e1.ID(): {},
}

pg.componentGraph.SetEdge(simple.Edge{F: r1, T: p1})
Expand Down Expand Up @@ -209,8 +209,8 @@ func TestGraphStartStopComponentError(t *testing.T) {
shutdownErr: errors.New("bar"),
}
pg.instanceIDs = map[int64]*component.InstanceID{
r1.ID(): &component.InstanceID{},
e1.ID(): &component.InstanceID{},
r1.ID(): {},
e1.ID(): {},
}
pg.componentGraph.SetEdge(simple.Edge{
F: r1,
Expand Down Expand Up @@ -2138,7 +2138,7 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
shutdownErr error
}{
{
name: "succesful startup/shutdown",
name: "successful startup/shutdown",
edge: [2]*testNode{rNoErr, eNoErr},
expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{
instanceIDs[rNoErr]: {
Expand Down Expand Up @@ -2227,8 +2227,8 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
pg.telemetry = servicetelemetry.NewNopSettings()

actualStatuses := make(map[*component.InstanceID][]*component.StatusEvent)
pg.telemetry.ReportComponentStatus = status.NewServiceStatusFunc(func(id *component.InstanceID, ev *component.StatusEvent) {
//copy event to normalize timestamp
init, statusFunc := status.NewServiceStatusFunc(func(id *component.InstanceID, ev *component.StatusEvent) {
// copy event to normalize timestamp
opts := []component.StatusEventOption{component.WithTimestamp(now)}
if ev.Err() != nil {
opts = append(opts, component.WithError(ev.Err()))
Expand All @@ -2237,6 +2237,9 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
actualStatuses[id] = append(actualStatuses[id], evCopy)
})

pg.telemetry.ReportComponentStatus = statusFunc
init()

e0, e1 := tc.edge[0], tc.edge[1]
pg.instanceIDs = map[int64]*component.InstanceID{
e0.ID(): instanceIDs[e0],
Expand Down
14 changes: 8 additions & 6 deletions service/internal/servicetelemetry/nop_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ import (
// NewNopSettings returns a new nop settings for Create* functions.
func NewNopSettings() Settings {
return Settings{
Logger: zap.NewNop(),
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: noop.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
ReportComponentStatus: func(*component.InstanceID, component.Status, ...component.StatusEventOption) {},
Logger: zap.NewNop(),
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: noop.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
ReportComponentStatus: func(*component.InstanceID, component.Status, ...component.StatusEventOption) error {
return nil
},
}
}
3 changes: 1 addition & 2 deletions service/internal/servicetelemetry/nop_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,5 @@ func TestNewNopSettings(t *testing.T) {
require.Equal(t, noop.NewMeterProvider(), set.MeterProvider)
require.Equal(t, configtelemetry.LevelNone, set.MetricsLevel)
require.Equal(t, pcommon.NewResource(), set.Resource)

set.ReportComponentStatus(&component.InstanceID{}, component.StatusStarting)
require.NoError(t, set.ReportComponentStatus(&component.InstanceID{}, component.StatusStarting))
}
65 changes: 48 additions & 17 deletions service/internal/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ type fsm struct {
onTransition onTransitionFunc
}

// Event will attempt to execute a state transition. If successful, it calls the onTransitionFunc
// Transition will attempt to execute a state transition. If successful, it calls the onTransitionFunc
// with a StatusEvent representing the new state. Returns an error if the arguments result in an
// invalid status, or if the state transition is not valid.
func (m *fsm) Event(status component.Status, options ...component.StatusEventOption) error {
func (m *fsm) Transition(status component.Status, options ...component.StatusEventOption) error {
if _, ok := m.transitions[m.current.Status()][status]; !ok {
return fmt.Errorf(
"cannot transition from %s to %s: %w",
Expand Down Expand Up @@ -93,34 +93,65 @@ func newFSM(onTransition onTransitionFunc) *fsm {
}
}

// InitFunc flips the associated readiness flag to true. It is safe to call
// more than once and from multiple goroutines.
type InitFunc func()

// readyFunc reports whether the paired InitFunc has been invoked.
type readyFunc func() bool

// initAndReadyFuncs returns a pair of closures sharing a single boolean flag
// guarded by a read/write mutex. The first closure sets the flag; the second
// reads it. The flag starts out false and is never reset.
func initAndReadyFuncs() (InitFunc, readyFunc) {
	var (
		lock    sync.RWMutex
		started bool
	)

	markReady := func() {
		lock.Lock()
		started = true
		lock.Unlock()
	}

	checkReady := func() bool {
		lock.RLock()
		defer lock.RUnlock()
		return started
	}

	return markReady, checkReady
}

type NotifyStatusFunc func(*component.InstanceID, *component.StatusEvent)
type ServiceStatusFunc func(id *component.InstanceID, status component.Status, opts ...component.StatusEventOption)
type ServiceStatusFunc func(id *component.InstanceID, status component.Status, opts ...component.StatusEventOption) error

var errStatusNotReady = errors.New("report component status is not ready until service start")

// NewServiceStatusFunc returns a function to be used as ReportComponentStatus for
// servicetelemetry.Settings, which differs from component.TelemetrySettings in that
// the service version does not correspond to a specific component, and thus needs
// a component.InstanceID as a parameter.
func NewServiceStatusFunc(notifyStatusChange NotifyStatusFunc) ServiceStatusFunc {
var fsmMap sync.Map
return func(id *component.InstanceID, status component.Status, opts ...component.StatusEventOption) {
f, ok := fsmMap.Load(id)
if !ok {
f = newFSM(func(ev *component.StatusEvent) {
notifyStatusChange(id, ev)
})
if val, loaded := fsmMap.LoadOrStore(id, f); loaded {
f = val
func NewServiceStatusFunc(notifyStatusChange NotifyStatusFunc) (InitFunc, ServiceStatusFunc) {
init, isReady := initAndReadyFuncs()
mu := sync.Mutex{}
fsmMap := make(map[*component.InstanceID]*fsm)
return init,
func(id *component.InstanceID, status component.Status, opts ...component.StatusEventOption) error {
if !isReady() {
return errStatusNotReady
}
mu.Lock()
defer mu.Unlock()
fsm, ok := fsmMap[id]
if !ok {
fsm = newFSM(func(ev *component.StatusEvent) {
notifyStatusChange(id, ev)
})
fsmMap[id] = fsm
}
return fsm.Transition(status, opts...)
}
_ = f.(*fsm).Event(status, opts...)
}

}

// NewComponentStatusFunc returns a function to be used as ReportComponentStatus for
// component.TelemetrySettings, which differs from servicetelemetry.Settings in that
// the component version is tied to a specific component instance.
func NewComponentStatusFunc(id *component.InstanceID, srvStatus ServiceStatusFunc) component.StatusFunc {
return func(status component.Status, opts ...component.StatusEventOption) {
srvStatus(id, status, opts...)
return func(status component.Status, opts ...component.StatusEventOption) error {
return srvStatus(id, status, opts...)
}
}
61 changes: 54 additions & 7 deletions service/internal/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package status

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -131,7 +132,7 @@ func TestStatusFSM(t *testing.T) {

errorCount := 0
for _, status := range tc.reportedStatuses {
if err := fsm.Event(status); err != nil {
if err := fsm.Transition(status); err != nil {
errorCount++
require.ErrorIs(t, err, errInvalidStateTransition)
}
Expand All @@ -145,9 +146,11 @@ func TestStatusFSM(t *testing.T) {

func TestStatusEventError(t *testing.T) {
fsm := newFSM(func(*component.StatusEvent) {})
fsm.Event(component.StatusStarting)
err := fsm.Transition(component.StatusStarting)
require.NoError(t, err)

// the combination of StatusOK with an error is invalid
err := fsm.Event(component.StatusOK, component.WithError(assert.AnError))
err = fsm.Transition(component.StatusOK, component.WithError(assert.AnError))

require.Error(t, err)
require.ErrorIs(t, err, component.ErrStatusEventInvalidArgument)
Expand Down Expand Up @@ -183,18 +186,62 @@ func TestStatusFuncs(t *testing.T) {
id2: statuses2,
}

serviceStatusFn := NewServiceStatusFunc(statusFunc)

init, serviceStatusFn := NewServiceStatusFunc(statusFunc)
comp1Func := NewComponentStatusFunc(id1, serviceStatusFn)
comp2Func := NewComponentStatusFunc(id2, serviceStatusFn)
init()

for _, st := range statuses1 {
comp1Func(st)
require.NoError(t, comp1Func(st))
}

for _, st := range statuses2 {
comp2Func(st)
require.NoError(t, comp2Func(st))
}

require.Equal(t, expectedStatuses, actualStatuses)
}

// TestStatusFuncsConcurrent reports statuses for four distinct instance IDs from
// four goroutines at once and checks the total number of notifications received.
func TestStatusFuncsConcurrent(t *testing.T) {
ids := []*component.InstanceID{{}, {}, {}, {}}
count := 0
// count is not guarded by a lock here; the test relies on the service status
// func serializing calls to this callback.
statusFunc := func(id *component.InstanceID, ev *component.StatusEvent) {
count++
}
init, serviceStatusFn := NewServiceStatusFunc(statusFunc)
// Status reports are rejected until init has been called.
init()

wg := sync.WaitGroup{}
wg.Add(len(ids))

for _, id := range ids {
// Shadow the loop variable so each goroutine captures its own id.
id := id
go func() {
compFn := NewComponentStatusFunc(id, serviceStatusFn)
_ = compFn(component.StatusStarting)
for i := 0; i < 1000; i++ {
_ = compFn(component.StatusRecoverableError)
_ = compFn(component.StatusOK)
}
wg.Done()
}()
}

wg.Wait()
// Every report is expected to produce one notification:
// 4 components x (1 Starting + 1000 x (RecoverableError + OK)) = 8004.
require.Equal(t, 8004, count)
}

// TestStatusFuncReady checks that the service status func returns
// errStatusNotReady before the init func has been called and succeeds afterwards.
func TestStatusFuncReady(t *testing.T) {
statusFunc := func(*component.InstanceID, *component.StatusEvent) {}
init, serviceStatusFn := NewServiceStatusFunc(statusFunc)
id := &component.InstanceID{}

// Before init: the report must be rejected with errStatusNotReady.
err := serviceStatusFn(id, component.StatusStarting)
require.Error(t, err)
require.ErrorIs(t, err, errStatusNotReady)

init()

// After init: the same report is accepted.
err = serviceStatusFn(id, component.StatusStarting)
require.NoError(t, err)
}
Loading

0 comments on commit 239c635

Please sign in to comment.