Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.s7comm): Implement startup-error behavior settings #15655

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions plugins/inputs/s7comm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Startup error behavior options <!-- @/docs/includes/startup_error_behavior.md -->

In addition to the plugin-specific and global configuration settings the plugin
supports options for specifying the behavior when experiencing startup errors
using the `startup_error_behavior` setting. Available values are:

- `error`: Telegraf will stop and exit in case of startup errors. This is the
default behavior.
- `ignore`: Telegraf will ignore startup errors for this plugin and disable it
but continues processing for all other plugins.
- `retry`: Telegraf will try to start up the plugin in every gather or write
cycle in case of startup errors. The plugin is disabled until
the startup succeeds.

## Configuration

```toml @sample.conf
Expand Down
8 changes: 6 additions & 2 deletions plugins/inputs/s7comm/s7comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/inputs"
)
Expand Down Expand Up @@ -143,7 +144,7 @@ func (s *S7comm) Init() error {
s.handler = gos7.NewTCPClientHandlerWithConnectType(s.Server, s.Rack, s.Slot, connectionTypeMap[s.ConnectionType])
s.handler.Timeout = time.Duration(s.Timeout)
if s.DebugConnection {
s.handler.Logger = log.New(os.Stderr, "D! [inputs.s7comm]", log.LstdFlags)
s.handler.Logger = log.New(os.Stderr, "D! [inputs.s7comm] ", log.LstdFlags)
}

// Create the requests
Expand All @@ -154,7 +155,10 @@ func (s *S7comm) Init() error {
func (s *S7comm) Start(_ telegraf.Accumulator) error {
s.Log.Debugf("Connecting to %q...", s.Server)
if err := s.handler.Connect(); err != nil {
return fmt.Errorf("connecting to %q failed: %w", s.Server, err)
return &internal.StartupError{
Err: fmt.Errorf("connecting to %q failed: %w", s.Server, err),
Retry: true,
}
}
s.client = gos7.NewClient(s.handler)

Expand Down
241 changes: 210 additions & 31 deletions plugins/inputs/s7comm/s7comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ import (
"net"
"sync/atomic"
"testing"
"time"

"github.com/robinson/gos7"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -707,20 +711,220 @@ func TestMetricCollisions(t *testing.T) {

// TestConnectionLoss starts the plugin against the mock server, gathers twice,
// stops the plugin and then expects the server to have counted exactly three
// connections.
// NOTE(review): presumably the mock server drops the connection after every
// exchange so that Start and each Gather have to (re)connect — confirm
// against MockServer.Start.
func TestConnectionLoss(t *testing.T) {
// Create fake S7 comm server that can accept connects
// NOTE(review): the `listener` statements and `connectionAttempts` below are
// not used by the plugin setup or the final assertion, which both go through
// `server`; they look like leftovers of an earlier revision — confirm.
listener, err := net.Listen("tcp", "127.0.0.1:0")
server, err := NewMockServer("127.0.0.1:0")
require.NoError(t, err)
defer listener.Close()
defer server.Close()
// Unlike the constructor, Start spawns the accept loop of the mock server.
require.NoError(t, server.Start())

var connectionAttempts uint32
// Create the plugin and attempt a connection
plugin := &S7comm{
Server: server.Addr(),
Rack: 0,
Slot: 2,
DebugConnection: true,
Timeout: config.Duration(100 * time.Millisecond),
Configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.W2",
},
},
},
},
Log: &testutil.Logger{},
}
require.NoError(t, plugin.Init())

// One Start plus two Gather calls; none of them may report an error.
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
require.NoError(t, plugin.Gather(&acc))
require.NoError(t, plugin.Gather(&acc))
plugin.Stop()
// Shut the server down before reading the counter.
server.Close()

require.Equal(t, uint32(3), server.ConnectionAttempts.Load())
}

// TestStartupErrorBehaviorError verifies that a connection failure during
// startup is returned by the running input's Start when no startup-error
// behavior is configured for the plugin.
func TestStartupErrorBehaviorError(t *testing.T) {
	// Bind the mock endpoint but never call its Start method, so nothing
	// accepts or answers the connection made by the plugin.
	mock, err := NewMockServer("127.0.0.1:0")
	require.NoError(t, err)
	defer mock.Close()
	addr := mock.Addr()

	// Plugin under test, pointed at the unresponsive endpoint with a short
	// timeout to keep the test fast.
	fields := []metricFieldDefinition{{Name: "foo", Address: "DB1.W2"}}
	input := &S7comm{
		Server:          addr,
		Rack:            0,
		Slot:            2,
		DebugConnection: true,
		Timeout:         config.Duration(100 * time.Millisecond),
		Configs:         []metricDefinition{{Fields: fields}},
		Log:             &testutil.Logger{},
	}

	// Wrap the plugin in a running-input model; StartupErrorBehavior is
	// deliberately left unset here.
	cfg := &models.InputConfig{
		Name:  "s7comm",
		Alias: "error-test", // required to get a unique error stats instance
	}
	running := models.NewRunningInput(input, cfg)
	running.StartupErrors.Set(0)
	require.NoError(t, running.Init())

	// Start has to surface the connection failure to the caller.
	acc := &testutil.Accumulator{}
	startErr := running.Start(acc)
	require.ErrorContains(t, startErr, "connecting to \""+addr+"\" failed")
}

// TestStartupErrorBehaviorIgnore verifies that, with the "ignore" startup-error
// behavior, a connection failure during Start is reported as an
// internal.FatalError that still carries the plugin's own error message.
func TestStartupErrorBehaviorIgnore(t *testing.T) {
	// Bind the mock endpoint but never call its Start method, so nothing
	// accepts or answers the connection made by the plugin.
	mock, err := NewMockServer("127.0.0.1:0")
	require.NoError(t, err)
	defer mock.Close()
	addr := mock.Addr()

	// Plugin under test, pointed at the unresponsive endpoint with a short
	// timeout to keep the test fast.
	fields := []metricFieldDefinition{{Name: "foo", Address: "DB1.W2"}}
	input := &S7comm{
		Server:          addr,
		Rack:            0,
		Slot:            2,
		DebugConnection: true,
		Timeout:         config.Duration(100 * time.Millisecond),
		Configs:         []metricDefinition{{Fields: fields}},
		Log:             &testutil.Logger{},
	}

	// Wrap the plugin in a running-input model using the "ignore" strategy.
	cfg := &models.InputConfig{
		Name:                 "s7comm",
		Alias:                "ignore-test", // required to get a unique error stats instance
		StartupErrorBehavior: "ignore",
	}
	running := models.NewRunningInput(input, cfg)
	running.StartupErrors.Set(0)
	require.NoError(t, running.Init())

	// Starting the plugin fails because nothing serves the connection. The
	// model code should convert the failure into a fatal error so that the
	// agent can remove the plugin.
	acc := &testutil.Accumulator{}
	err = running.Start(acc)
	require.ErrorContains(t, err, "connecting to \""+addr+"\" failed")

	var fatalErr *internal.FatalError
	require.ErrorAs(t, err, &fatalErr)
}

// TestStartupErrorBehaviorRetry verifies the "retry" startup-error behavior:
// Start succeeds although the endpoint is unresponsive, Gather reports
// internal.ErrNotConnected until the mock server is started, and the next
// Gather after that succeeds.
func TestStartupErrorBehaviorRetry(t *testing.T) {
	// Bind the mock endpoint; its accept loop is only started further down.
	mock, err := NewMockServer("127.0.0.1:0")
	require.NoError(t, err)
	defer mock.Close()

	// Plugin under test, pointed at the (for now) unresponsive endpoint with
	// a short timeout to keep the test fast.
	fields := []metricFieldDefinition{{Name: "foo", Address: "DB1.W2"}}
	input := &S7comm{
		Server:          mock.Addr(),
		Rack:            0,
		Slot:            2,
		DebugConnection: true,
		Timeout:         config.Duration(100 * time.Millisecond),
		Configs:         []metricDefinition{{Fields: fields}},
		Log:             &testutil.Logger{},
	}

	// Wrap the plugin in a running-input model using the "retry" strategy.
	cfg := &models.InputConfig{
		Name:                 "s7comm",
		Alias:                "retry-test", // required to get a unique error stats instance
		StartupErrorBehavior: "retry",
	}
	running := models.NewRunningInput(input, cfg)
	running.StartupErrors.Set(0)
	require.NoError(t, running.Init())

	// Starting must not return an error; the connection is attempted again
	// in every gather cycle instead.
	acc := &testutil.Accumulator{}
	require.NoError(t, running.Start(acc))

	// Nothing serves the connection yet, so no metrics exist and the gather
	// has to fail.
	require.Empty(t, acc.GetTelegrafMetrics())
	gatherErr := running.Gather(acc)
	require.ErrorIs(t, gatherErr, internal.ErrNotConnected)
	// NOTE(review): presumably one startup error from Start and one from the
	// retry inside Gather; the counting lives in the models package — confirm.
	require.Equal(t, int64(2), running.StartupErrors.Get())

	// Let the server accept connections; the next gather should now succeed.
	require.NoError(t, mock.Start())
	defer running.Stop()
	require.NoError(t, running.Gather(acc))
}

// MockServer is a small TCP endpoint the tests use in place of a real S7
// device. It owns the listening socket and exposes a connection counter.
type MockServer struct {
	// ConnectionAttempts counts connections seen by the server; it is safe
	// to read concurrently via Load.
	ConnectionAttempts atomic.Uint32

	// listener is the socket opened by NewMockServer.
	listener net.Listener
}

// NewMockServer opens a TCP listener on addr and wraps it in a MockServer.
// The socket is bound right away, but this function does not accept any
// connection. An error from net.Listen is returned unchanged together with a
// nil server.
func NewMockServer(addr string) (*MockServer, error) {
	listener, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, err
	}

	srv := &MockServer{}
	srv.listener = listener
	return srv, nil
}

// Addr reports the address the listener is bound to as a string. This is the
// way to learn the actual port when the server was created with port 0.
func (s *MockServer) Addr() string {
	bound := s.listener.Addr()
	return bound.String()
}

// Close shuts down the listener and returns the error of that operation.
// A server without a listener is left untouched and nil is returned.
func (s *MockServer) Close() error {
	if s.listener == nil {
		return nil
	}
	return s.listener.Close()
}

func (s *MockServer) Start() error {
go func() {
defer s.listener.Close()
for {
conn, err := listener.Accept()
conn, err := s.listener.Accept()
if err != nil {
return
}
if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil {
conn.Close()
return
}

// Count the number of connection attempts
atomic.AddUint32(&connectionAttempts, 1)
s.ConnectionAttempts.Add(1)

buf := make([]byte, 4096)

Expand Down Expand Up @@ -757,31 +961,6 @@ func TestConnectionLoss(t *testing.T) {
conn.Close()
}
}()
plugin := &S7comm{
Server: listener.Addr().String(),
Rack: 0,
Slot: 2,
DebugConnection: true,
Configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.W2",
},
},
},
},
Log: &testutil.Logger{},
}
require.NoError(t, plugin.Init())

var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
require.NoError(t, plugin.Gather(&acc))
require.NoError(t, plugin.Gather(&acc))
plugin.Stop()
listener.Close()

require.Equal(t, 3, int(atomic.LoadUint32(&connectionAttempts)))
return nil
}
Loading