diff --git a/go.mod b/go.mod index 9041953fcb567..9ddbaffbdf6bd 100644 --- a/go.mod +++ b/go.mod @@ -95,7 +95,7 @@ require ( github.com/google/gopacket v1.1.19 github.com/google/licensecheck v0.3.1 github.com/google/uuid v1.5.0 - github.com/gopcua/opcua v0.4.0 + github.com/gopcua/opcua v0.5.3 github.com/gophercloud/gophercloud v1.7.0 github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.1 diff --git a/go.sum b/go.sum index 1665d5e09847d..8b7c47ad23add 100644 --- a/go.sum +++ b/go.sum @@ -1383,8 +1383,8 @@ github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56 github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= -github.com/gopcua/opcua v0.4.0 h1:Pr0PMFViNOzvkcvmzP3yTKqtLFVL1OUgav3tDj+hpqQ= -github.com/gopcua/opcua v0.4.0/go.mod h1:6BsaYGu33RhVRxnK+EqHWwSG+hYCSAMjyIjx3RGV1PQ= +github.com/gopcua/opcua v0.5.3 h1:K5QQhjK9KQxQW8doHL/Cd8oljUeXWnJJsNgP7mOGIhw= +github.com/gopcua/opcua v0.5.3/go.mod h1:nrVl4/Rs3SDQRhNQ50EbAiI5JSpDrTG6Frx3s4HLnw4= github.com/gophercloud/gophercloud v1.7.0 h1:fyJGKh0LBvIZKLvBWvQdIgkaV5yTM3Jh9EYUh+UNCAs= github.com/gophercloud/gophercloud v1.7.0/go.mod h1:aAVqcocTSXh2vYFZ1JTvx4EQmfgzxRcNupUfxZbBNDM= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= diff --git a/plugins/common/opcua/client.go b/plugins/common/opcua/client.go index 600c9276a1bd5..125056e065365 100644 --- a/plugins/common/opcua/client.go +++ b/plugins/common/opcua/client.go @@ -174,7 +174,7 @@ func (o *OpcUAClient) StatusCodeOK(code ua.StatusCode) bool { } // Connect to an OPC UA device -func (o *OpcUAClient) Connect() error { +func (o *OpcUAClient) Connect(ctx context.Context) error { o.Log.Debug("Connecting OPC UA Client to server") u, err := url.Parse(o.Config.Endpoint) if err != nil { @@ -189,14 +189,17 @@ func (o *OpcUAClient) Connect() error { if o.Client != nil { o.Log.Warnf("Closing connection to %q as already connected", u) - if err := o.Client.Close(); err != nil { + if err := o.Client.Close(ctx); err != nil { // Only log the error but to not bail-out here as this prevents // reconnections for multiple parties (see e.g. #9523). o.Log.Errorf("Closing connection failed: %v", err) } } - o.Client = opcua.NewClient(o.Config.Endpoint, o.opts...) + o.Client, err = opcua.NewClient(o.Config.Endpoint, o.opts...) + if err != nil { + return fmt.Errorf("error in new client: %w", err) + } ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Config.ConnectTimeout)) defer cancel() if err := o.Client.Connect(ctx); err != nil { @@ -220,7 +223,7 @@ func (o *OpcUAClient) Disconnect(ctx context.Context) error { switch u.Scheme { case "opc.tcp": // We can't do anything about failing to close a connection - err := o.Client.CloseWithContext(ctx) + err := o.Client.Close(ctx) o.Client = nil return err default: diff --git a/plugins/common/opcua/input/input_client_test.go b/plugins/common/opcua/input/input_client_test.go index a55735f616270..255661aee8cda 100644 --- a/plugins/common/opcua/input/input_client_test.go +++ b/plugins/common/opcua/input/input_client_test.go @@ -833,7 +833,7 @@ func TestMetricForNode(t *testing.T) { status: ua.StatusOK, expected: metric.New("testingmetric", map[string]string{"t1": "v1", "id": "ns=3;s=hi"}, - map[string]interface{}{"Quality": "OK (0x0)", "fn": 16}, + map[string]interface{}{"Quality": "The operation succeeded. StatusGood (0x0)", "fn": 16}, time.Date(2022, 03, 17, 8, 55, 00, 00, &time.Location{})), }, } diff --git a/plugins/inputs/opcua/opcua_test.go b/plugins/inputs/opcua/opcua_test.go index afa5d5ec4e836..1926e569802bb 100644 --- a/plugins/inputs/opcua/opcua_test.go +++ b/plugins/inputs/opcua/opcua_test.go @@ -185,12 +185,12 @@ func TestReadClientIntegrationAdditionalFields(t *testing.T) { "DateTime", } testopcquality := []string{ - "OK (0x0)", - "OK (0x0)", - "OK (0x0)", + "The operation succeeded. StatusGood (0x0)", + "The operation succeeded. StatusGood (0x0)", + "The operation succeeded. StatusGood (0x0)", "User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)", - "OK (0x0)", - "OK (0x0)", + "The operation succeeded. StatusGood (0x0)", + "The operation succeeded. StatusGood (0x0)", } expectedopcmetrics := []telegraf.Metric{} for i, x := range testopctags { diff --git a/plugins/inputs/opcua/read_client.go b/plugins/inputs/opcua/read_client.go index 29986e218040a..9d756ccf805ec 100644 --- a/plugins/inputs/opcua/read_client.go +++ b/plugins/inputs/opcua/read_client.go @@ -31,6 +31,7 @@ type ReadClient struct { // internal values req *ua.ReadRequest + ctx context.Context } func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient, error) { @@ -52,7 +53,9 @@ func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient, } func (o *ReadClient) Connect() error { - if err := o.OpcUAClient.Connect(); err != nil { + o.ctx = context.Background() + + if err := o.OpcUAClient.Connect(o.ctx); err != nil { return fmt.Errorf("connect failed: %w", err) } @@ -68,7 +71,7 @@ func (o *ReadClient) Connect() error { readValueIds = append(readValueIds, &ua.ReadValueID{NodeID: nid}) } } else { - regResp, err := o.Client.RegisterNodes(&ua.RegisterNodesRequest{ + regResp, err := o.Client.RegisterNodes(o.ctx, &ua.RegisterNodesRequest{ NodesToRegister: o.NodeIDs, }) if err != nil { @@ -133,7 +136,7 @@ func (o *ReadClient) CurrentValues() ([]telegraf.Metric, error) { } func (o *ReadClient) read() error { - resp, err := o.Client.Read(o.req) + resp, err := o.Client.Read(o.ctx, o.req) if err != nil { o.ReadError.Incr(1) return fmt.Errorf("RegisterNodes Read failed: %w", err) diff --git a/plugins/inputs/opcua_listener/opcua_listener_test.go b/plugins/inputs/opcua_listener/opcua_listener_test.go index 8bd7e5df71d32..9aaa44ddcc98c 100644 --- a/plugins/inputs/opcua_listener/opcua_listener_test.go +++ b/plugins/inputs/opcua_listener/opcua_listener_test.go @@ -274,12 +274,12 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) { "DateTime", } testopcquality := []string{ - "OK (0x0)", - "OK (0x0)", - "OK (0x0)", + "The operation succeeded. StatusGood (0x0)", + "The operation succeeded. StatusGood (0x0)", + "The operation succeeded. StatusGood (0x0)", "User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)", - "OK (0x0)", - "OK (0x0)", + "The operation succeeded. StatusGood (0x0)", + "The operation succeeded. StatusGood (0x0)", } expectedopcmetrics := []telegraf.Metric{} for i, x := range testopctags { diff --git a/plugins/inputs/opcua_listener/subscribe_client.go b/plugins/inputs/opcua_listener/subscribe_client.go index a6e914c20a011..bb2147c76e513 100644 --- a/plugins/inputs/opcua_listener/subscribe_client.go +++ b/plugins/inputs/opcua_listener/subscribe_client.go @@ -30,8 +30,8 @@ type SubscribeClient struct { dataNotifications chan *opcua.PublishNotificationData metrics chan telegraf.Metric - processingCtx context.Context - processingCancel context.CancelFunc + ctx context.Context + cancel context.CancelFunc } func checkDataChangeFilterParameters(params *input.DataChangeFilter) error { @@ -90,6 +90,7 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su return nil, err } + processingCtx, processingCancel := context.WithCancel(context.Background()) subClient := &SubscribeClient{ OpcUAInputClient: client, Config: *sc, @@ -99,6 +100,8 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su // the same time. It could be made dependent on the number of nodes subscribed to and the subscription interval. dataNotifications: make(chan *opcua.PublishNotificationData, 100), metrics: make(chan telegraf.Metric, 100), + ctx: processingCtx, + cancel: processingCancel, } log.Debugf("Creating monitored items") @@ -115,13 +118,13 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su } func (o *SubscribeClient) Connect() error { - err := o.OpcUAClient.Connect() + err := o.OpcUAClient.Connect(o.ctx) if err != nil { return err } o.Log.Debugf("Creating OPC UA subscription") - o.sub, err = o.Client.Subscribe(&opcua.SubscriptionParameters{ + o.sub, err = o.Client.Subscribe(o.ctx, &opcua.SubscriptionParameters{ Interval: time.Duration(o.Config.SubscriptionInterval), }, o.dataNotifications) if err != nil { @@ -144,7 +147,7 @@ func (o *SubscribeClient) Stop(ctx context.Context) <-chan struct{} { } } closing := o.OpcUAInputClient.Stop(ctx) - o.processingCancel() + o.cancel() return closing } @@ -166,7 +169,7 @@ func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegra return nil, err } - resp, err := o.sub.MonitorWithContext(ctx, ua.TimestampsToReturnBoth, o.monitoredItemsReqs...) + resp, err := o.sub.Monitor(ctx, ua.TimestampsToReturnBoth, o.monitoredItemsReqs...) if err != nil { return nil, fmt.Errorf("failed to start monitoring items: %w", err) } @@ -178,7 +181,6 @@ func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegra } } - o.processingCtx, o.processingCancel = context.WithCancel(context.Background()) go o.processReceivedNotifications() return o.metrics, nil @@ -187,7 +189,7 @@ func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegra func (o *SubscribeClient) processReceivedNotifications() { for { select { - case <-o.processingCtx.Done(): + case <-o.ctx.Done(): o.Log.Debug("Processing received notifications stopped") return