Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automate status reporting on start #8836

Merged
merged 12 commits into from
Nov 28, 2023
25 changes: 25 additions & 0 deletions .chloggen/automated-status-on-start.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern (e.g. otlpreceiver)
component: statusreporting

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Automates status reporting upon the completion of component.Start().

# One or more tracking issues or pull requests related to the change
issues: [7682]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
13 changes: 7 additions & 6 deletions component/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
)

type TelemetrySettingsBase[T any] struct {
mwear marked this conversation as resolved.
Show resolved Hide resolved
// TelemetrySettings provides components with APIs to report telemetry.
//
// Note: there is a service version of this struct, servicetelemetry.TelemetrySettings, that mirrors
// this struct with the exception of ReportComponentStatus. When adding or removing anything from
// this struct consider whether or not the same should be done for the service version.
type TelemetrySettings struct {
// Logger that the factory can use during creation and can pass to the created
// component to be used later as well.
Logger *zap.Logger
Expand Down Expand Up @@ -40,9 +45,5 @@ type TelemetrySettingsBase[T any] struct {
// - Calling this method before component startup
//
// If the API is being used properly, these errors are safe to ignore.
ReportComponentStatus T
ReportComponentStatus StatusFunc
}

// TelemetrySettings and servicetelemetry.Settings differ in the method signature for
// ReportComponentStatus
type TelemetrySettings TelemetrySettingsBase[StatusFunc]
7 changes: 4 additions & 3 deletions internal/sharedcomponent/sharedcomponent.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), tel
c.seenSettings[telemetrySettings] = struct{}{}
prev := c.telemetry.ReportComponentStatus
c.telemetry.ReportComponentStatus = func(ev *component.StatusEvent) error {
if err := telemetrySettings.ReportComponentStatus(ev); err != nil {
return err
err := telemetrySettings.ReportComponentStatus(ev)
if prevErr := prev(ev); prevErr != nil {
err = prevErr
}
return prev(ev)
return err
}
}
return c, nil
Expand Down
24 changes: 19 additions & 5 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,24 @@ func TestComponentStatusWatcher(t *testing.T) {
// Start the newly created collector.
wg := startCollector(context.Background(), t, col)

// An unhealthy processor asynchronously reports a recoverable error.
expectedStatuses := []component.Status{
// An unhealthy processor asynchronously reports a recoverable error. Depending on the Go
// scheduler, the statuses reported at startup will be one of the two valid sequences below.
startupStatuses1 := []component.Status{
component.StatusStarting,
component.StatusOK,
component.StatusRecoverableError,
}
startupStatuses2 := []component.Status{
component.StatusStarting,
component.StatusRecoverableError,
}
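// The parity of the number of actual statuses identifies the startup sequence: an odd count
// corresponds to startupStatuses1 (three statuses), an even count to startupStatuses2 (two
// statuses). Appending the two shutdown statuses later does not change the parity.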
startupStatuses := func(actualStatuses []component.Status) []component.Status {
if len(actualStatuses)%2 == 1 {
return startupStatuses1
}
return startupStatuses2
}

// The "unhealthy" processors will now begin to asynchronously report StatusRecoverableError.
// We expect to see these reports.
Expand All @@ -197,8 +210,8 @@ func TestComponentStatusWatcher(t *testing.T) {
for k, v := range changedComponents {
// All processors must report a status change with the same ID
assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID)
// And all must have the expected statuses
assert.Equal(t, expectedStatuses, v)
// And all must have a valid startup sequence
assert.Equal(t, startupStatuses(v), v)
}
// We have 3 processors with exactly the same ID in otelcol-statuswatcher.yaml
// We must have exactly 3 items in our map. This ensures that the "source" argument
Expand All @@ -212,8 +225,9 @@ func TestComponentStatusWatcher(t *testing.T) {
wg.Wait()

// Check for additional statuses after Shutdown.
expectedStatuses = append(expectedStatuses, component.StatusStopping, component.StatusStopped)
for _, v := range changedComponents {
expectedStatuses := append([]component.Status{}, startupStatuses(v)...)
expectedStatuses = append(expectedStatuses, component.StatusStopping, component.StatusStopped)
assert.Equal(t, expectedStatuses, v)
}

Expand Down
30 changes: 25 additions & 5 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,22 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
extLogger.Info("Extension is starting...")
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStarting))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStarting),
)
if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil {
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(err))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(err),
)
return err
}
_ = bes.telemetry.Status.ReportComponentStatusIf(
instanceID,
component.NewStatusEvent(component.StatusOK),
func(st component.Status) bool { return st == component.StatusStarting },
)
extLogger.Info("Extension started.")
}
return nil
Expand All @@ -55,13 +66,22 @@ func (bes *Extensions) Shutdown(ctx context.Context) error {
extID := bes.extensionIDs[i]
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopping))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopping),
)
if err := ext.Shutdown(ctx); err != nil {
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(err))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(err),
)
errs = multierr.Append(errs, err)
continue
}
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopped))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopped),
)
}

return errs
Expand Down
8 changes: 5 additions & 3 deletions service/extensions/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
name: "successful startup/shutdown",
expectedStatuses: []*component.StatusEvent{
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
Expand All @@ -400,6 +401,7 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
name: "shutdown error",
expectedStatuses: []*component.StatusEvent{
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewPermanentErrorEvent(assert.AnError),
},
Expand Down Expand Up @@ -430,11 +432,11 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
assert.NoError(t, err)

var actualStatuses []*component.StatusEvent
init, statusFunc := status.NewServiceStatusFunc(func(id *component.InstanceID, ev *component.StatusEvent) {
rep := status.NewReporter(func(id *component.InstanceID, ev *component.StatusEvent) {
actualStatuses = append(actualStatuses, ev)
})
extensions.telemetry.ReportComponentStatus = statusFunc
init()
extensions.telemetry.Status = rep
rep.Ready()

assert.Equal(t, tc.startErr, extensions.Start(context.Background(), componenttest.NewNopHost()))
if tc.startErr == nil {
Expand Down
31 changes: 26 additions & 5 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,24 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error {
}

instanceID := g.instanceIDs[node.ID()]
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStarting))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStarting),
)

if compErr := comp.Start(ctx, host); compErr != nil {
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(compErr))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(compErr),
)
return compErr
}

_ = g.telemetry.Status.ReportComponentStatusIf(
instanceID,
component.NewStatusEvent(component.StatusOK),
func(st component.Status) bool { return st == component.StatusStarting },
)
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
Expand All @@ -417,15 +429,24 @@ func (g *Graph) ShutdownAll(ctx context.Context) error {
}

instanceID := g.instanceIDs[node.ID()]
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopping))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopping),
)

if compErr := comp.Shutdown(ctx); compErr != nil {
errs = multierr.Append(errs, compErr)
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(compErr))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(compErr),
)
continue
}

_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopped))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopped),
)
}
return errs
}
Expand Down
13 changes: 10 additions & 3 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2163,11 +2163,13 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{
instanceIDs[rNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
instanceIDs[eNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
Expand All @@ -2194,6 +2196,7 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
},
instanceIDs[eNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
Expand All @@ -2206,11 +2209,13 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{
instanceIDs[rSdErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewPermanentErrorEvent(assert.AnError),
},
instanceIDs[eNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
Expand All @@ -2223,11 +2228,13 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{
instanceIDs[rNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
instanceIDs[eSdErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewPermanentErrorEvent(assert.AnError),
},
Expand All @@ -2240,12 +2247,12 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
pg.telemetry = servicetelemetry.NewNopTelemetrySettings()

actualStatuses := make(map[*component.InstanceID][]*component.StatusEvent)
init, statusFunc := status.NewServiceStatusFunc(func(id *component.InstanceID, ev *component.StatusEvent) {
rep := status.NewReporter(func(id *component.InstanceID, ev *component.StatusEvent) {
actualStatuses[id] = append(actualStatuses[id], ev)
})

pg.telemetry.ReportComponentStatus = statusFunc
init()
pg.telemetry.Status = rep
rep.Ready()

e0, e1 := tc.edge[0], tc.edge[1]
pg.instanceIDs = map[int64]*component.InstanceID{
Expand Down
5 changes: 2 additions & 3 deletions service/internal/servicetelemetry/nop_telemetry_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/service/internal/status"
)

// NewNopTelemetrySettings returns a new nop settings for Create* functions.
Expand All @@ -21,8 +22,6 @@ func NewNopTelemetrySettings() TelemetrySettings {
MeterProvider: noopmetric.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
ReportComponentStatus: func(*component.InstanceID, *component.StatusEvent) error {
return nil
},
Status: status.NewReporter(func(*component.InstanceID, *component.StatusEvent) {}),
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,25 @@ import (

func TestNewNopSettings(t *testing.T) {
set := NewNopTelemetrySettings()

set.Status.Ready()
require.NotNil(t, set)
require.IsType(t, TelemetrySettings{}, set)
require.Equal(t, zap.NewNop(), set.Logger)
require.Equal(t, nooptrace.NewTracerProvider(), set.TracerProvider)
require.Equal(t, noopmetric.NewMeterProvider(), set.MeterProvider)
require.Equal(t, configtelemetry.LevelNone, set.MetricsLevel)
require.Equal(t, pcommon.NewResource(), set.Resource)
require.NoError(t, set.ReportComponentStatus(&component.InstanceID{}, component.NewStatusEvent(component.StatusStarting)))
require.NoError(t,
set.Status.ReportComponentStatus(
&component.InstanceID{},
component.NewStatusEvent(component.StatusStarting),
),
)
require.NoError(t,
set.Status.ReportComponentStatusIf(
&component.InstanceID{},
component.NewStatusEvent(component.StatusStarting),
func(component.Status) bool { return true },
),
)
}
Loading