From 6d7b76652b16f5e0524446c703eb2342cc8e75b6 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Wed, 2 Aug 2023 13:44:35 -0600 Subject: [PATCH 1/5] Fix issue where options weren't respected --- receiver/k8sobjectsreceiver/README.md | 7 +- receiver/k8sobjectsreceiver/config.go | 4 -- receiver/k8sobjectsreceiver/config_test.go | 4 +- .../mock_dynamic_client_test.go | 4 +- receiver/k8sobjectsreceiver/receiver.go | 68 +++++++++++++++---- receiver/k8sobjectsreceiver/receiver_test.go | 37 +++++----- .../unstructured_to_logdata.go | 11 ++- .../unstructured_to_logdata_test.go | 6 +- 8 files changed, 95 insertions(+), 46 deletions(-) diff --git a/receiver/k8sobjectsreceiver/README.md b/receiver/k8sobjectsreceiver/README.md index fdf00c2893f0..f4558f6f0337 100644 --- a/receiver/k8sobjectsreceiver/README.md +++ b/receiver/k8sobjectsreceiver/README.md @@ -46,11 +46,11 @@ the K8s API server. This can be one of `none` (for no auth), `serviceAccount` - `name`: Name of the resource object to collect - `mode`: define in which way it collects this type of object, either "poll" or "watch". - `pull` mode will read all objects of this type use the list API at an interval. - - `watch` mode will setup a long connection using the watch API to just get updates. + - `watch` mode will do setup a long connection using the watch API to just get updates. - `label_selector`: select objects by label(s) - `field_selector`: select objects by field(s) - `interval`: the interval at which object is pulled, default 60 minutes. Only useful for `pull` mode. -- `resource_version` allows watch resources starting from a specific version (default = `1`). Only available for `watch` mode. +- `resource_version` allows watch resources starting from a specific version (default = `1`). Only available for `watch` mode. If not specified, the receiver will do an initial list to get the resourceVersion before starting the watch. See [Efficient Detection of Change](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes) for details on why this is necessary. - `namespaces`: An array of `namespaces` to collect events from. (default = `all`) - `group`: API group name. It is an optional config. When given resource object is present in multiple groups, use this config to specify the group to select. By default, it will select the first group. @@ -121,6 +121,8 @@ Use the below commands to create a `ClusterRole` with required permissions and a Following config will work for collecting pods and events only. You need to add appropriate rule for collecting other objects. +When using watch mode without specifying a `resource_version` you must also specify `list` verb so that the receiver has permission to do its initial list. + ```bash < 0 { + logs := pullObjectsToLogData(objects, time.Now(), config) + obsCtx := kr.obsrecv.StartLogsOp(ctx) + err = kr.consumer.ConsumeLogs(obsCtx, logs) + kr.obsrecv.EndLogsOp(obsCtx, metadata.Type, logs.LogRecordCount(), err) + } + + return objects.GetResourceVersion(), nil + +} + // Start ticking immediately. // Ref: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately func NewTicker(repeat time.Duration) *time.Ticker { diff --git a/receiver/k8sobjectsreceiver/receiver_test.go b/receiver/k8sobjectsreceiver/receiver_test.go index 1db25fc96988..eb264b241061 100644 --- a/receiver/k8sobjectsreceiver/receiver_test.go +++ b/receiver/k8sobjectsreceiver/receiver_test.go @@ -39,13 +39,13 @@ func TestPullObject(t *testing.T) { mockClient.createPods( generatePod("pod1", "default", map[string]interface{}{ "environment": "production", - }), + }, "1"), generatePod("pod2", "default", map[string]interface{}{ "environment": "test", - }), + }, "2"), generatePod("pod3", "default_ignore", map[string]interface{}{ "environment": "production", - }), + }, "3"), ) rCfg := createDefaultConfig().(*Config) @@ -84,6 +84,18 @@ func TestWatchObject(t *testing.T) { mockClient := newMockDynamicClient() + mockClient.createPods( + generatePod("pod1", "default", map[string]interface{}{ + "environment": "production", + }, "1"), + generatePod("pod2", "default", map[string]interface{}{ + "environment": "test", + }, "2"), + generatePod("pod3", "default_ignore", map[string]interface{}{ + "environment": "production", + }, "3"), + ) + rCfg := createDefaultConfig().(*Config) rCfg.makeDynamicClient = mockClient.getMockDynamicClient rCfg.makeDiscoveryClient = getMockDiscoveryClient @@ -112,29 +124,16 @@ func TestWatchObject(t *testing.T) { require.NoError(t, r.Start(ctx, componenttest.NewNopHost())) time.Sleep(time.Millisecond * 100) - - mockClient.createPods( - generatePod("pod1", "default", map[string]interface{}{ - "environment": "production", - }), - generatePod("pod2", "default", map[string]interface{}{ - "environment": "test", - }), - generatePod("pod3", "default_ignore", map[string]interface{}{ - "environment": "production", - }), - ) - time.Sleep(time.Millisecond * 100) - assert.Len(t, consumer.Logs(), 2) + assert.Len(t, consumer.Logs(), 1) assert.Equal(t, 2, consumer.Count()) mockClient.createPods( generatePod("pod4", "default", map[string]interface{}{ "environment": "production", - }), + }, "4"), ) time.Sleep(time.Millisecond * 100) - assert.Len(t, consumer.Logs(), 3) + assert.Len(t, consumer.Logs(), 2) assert.Equal(t, 3, consumer.Count()) assert.NoError(t, r.Shutdown(ctx)) diff --git a/receiver/k8sobjectsreceiver/unstructured_to_logdata.go b/receiver/k8sobjectsreceiver/unstructured_to_logdata.go index ff7b9f9ffb84..589b0547b369 100644 --- a/receiver/k8sobjectsreceiver/unstructured_to_logdata.go +++ b/receiver/k8sobjectsreceiver/unstructured_to_logdata.go @@ -4,6 +4,7 @@ package k8sobjectsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver" import ( + "fmt" "time" "go.opentelemetry.io/collector/pdata/pcommon" @@ -15,8 +16,12 @@ import ( type attrUpdaterFunc func(pcommon.Map) -func watchObjectsToLogData(event *watch.Event, observedAt time.Time, config *K8sObjectsConfig) plog.Logs { - udata := event.Object.(*unstructured.Unstructured) +func watchObjectsToLogData(event *watch.Event, observedAt time.Time, config *K8sObjectsConfig) (plog.Logs, error) { + udata, ok := event.Object.(*unstructured.Unstructured) + if !ok { + return plog.Logs{}, fmt.Errorf("received data that wasnt unstructure, %v", event) + } + ul := unstructured.UnstructuredList{ Items: []unstructured.Unstructured{{ Object: map[string]interface{}{ @@ -33,7 +38,7 @@ func watchObjectsToLogData(event *watch.Event, observedAt time.Time, config *K8s attrs.PutStr("event.domain", "k8s") attrs.PutStr("event.name", name) } - }) + }), nil } func pullObjectsToLogData(event *unstructured.UnstructuredList, observedAt time.Time, config *K8sObjectsConfig) plog.Logs { diff --git a/receiver/k8sobjectsreceiver/unstructured_to_logdata_test.go b/receiver/k8sobjectsreceiver/unstructured_to_logdata_test.go index 4f0c16db460e..d25c8116bcd8 100644 --- a/receiver/k8sobjectsreceiver/unstructured_to_logdata_test.go +++ b/receiver/k8sobjectsreceiver/unstructured_to_logdata_test.go @@ -113,7 +113,8 @@ func TestUnstructuredListToLogData(t *testing.T) { }, } - logs := watchObjectsToLogData(event, time.Now(), config) + logs, err := watchObjectsToLogData(event, time.Now(), config) + assert.NoError(t, err) assert.Equal(t, logs.LogRecordCount(), 1) @@ -153,7 +154,8 @@ func TestUnstructuredListToLogData(t *testing.T) { } observedAt := time.Now() - logs := watchObjectsToLogData(event, observedAt, config) + logs, err := watchObjectsToLogData(event, observedAt, config) + assert.NoError(t, err) assert.Equal(t, logs.LogRecordCount(), 1) From e1050b533f69149baae8c05d578b88bf98674c4a Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Wed, 2 Aug 2023 15:23:12 -0600 Subject: [PATCH 2/5] add comment --- receiver/k8sobjectsreceiver/receiver.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/receiver/k8sobjectsreceiver/receiver.go b/receiver/k8sobjectsreceiver/receiver.go index 08da8182b37f..203361369062 100644 --- a/receiver/k8sobjectsreceiver/receiver.go +++ b/receiver/k8sobjectsreceiver/receiver.go @@ -159,6 +159,9 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects kr.setting.Logger.Error("could not perform initial list for watch", zap.String("resource", config.gvr.String()), zap.Error(err)) return } + // If we still don't have a resourceVersion we can try 1 as a last ditch effort. + // This also helps our unit tests since the fake client can't handle returning resource versions + // as part of a list of objects. if resourceVersion == "" { resourceVersion = defaultResourceVersion } From d7ff668cddade89e0d6efb29a8a2fdab558fd7df Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Wed, 2 Aug 2023 15:23:37 -0600 Subject: [PATCH 3/5] remove log --- receiver/k8sobjectsreceiver/receiver.go | 1 - 1 file changed, 1 deletion(-) diff --git a/receiver/k8sobjectsreceiver/receiver.go b/receiver/k8sobjectsreceiver/receiver.go index 203361369062..6242c89770cb 100644 --- a/receiver/k8sobjectsreceiver/receiver.go +++ b/receiver/k8sobjectsreceiver/receiver.go @@ -154,7 +154,6 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects // to get the initial state and a useable resourceVersion. // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes for details. resourceVersion, err = kr.doInitialList(ctx, config, resource) - kr.setting.Logger.Info("starting resourceVersion", zap.String("resourceVersion", resourceVersion)) if err != nil { kr.setting.Logger.Error("could not perform initial list for watch", zap.String("resource", config.gvr.String()), zap.Error(err)) return From 94f1cfd644feda71a5a1dd0261077705f5a71e4a Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Wed, 2 Aug 2023 15:25:20 -0600 Subject: [PATCH 4/5] changelog --- .../k8sobjects-fix-resourceversion-bug.yaml | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100755 .chloggen/k8sobjects-fix-resourceversion-bug.yaml diff --git a/.chloggen/k8sobjects-fix-resourceversion-bug.yaml b/.chloggen/k8sobjects-fix-resourceversion-bug.yaml new file mode 100755 index 000000000000..895a914bc7f2 --- /dev/null +++ b/.chloggen/k8sobjects-fix-resourceversion-bug.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/k8sobjects + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix bug where duplicate data would be ingested for watch mode if the client connection got reset. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [24806] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From 7c48a78aeb5e035e1f8d3ac35c252d132640935d Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Thu, 3 Aug 2023 13:16:07 -0600 Subject: [PATCH 5/5] Stop ingesting initial state --- receiver/k8sobjectsreceiver/receiver.go | 65 +++++++++----------- receiver/k8sobjectsreceiver/receiver_test.go | 18 +++--- 2 files changed, 37 insertions(+), 46 deletions(-) diff --git a/receiver/k8sobjectsreceiver/receiver.go b/receiver/k8sobjectsreceiver/receiver.go index 6242c89770cb..2c226d8e8626 100644 --- a/receiver/k8sobjectsreceiver/receiver.go +++ b/receiver/k8sobjectsreceiver/receiver.go @@ -147,25 +147,11 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects kr.stopperChanList = append(kr.stopperChanList, stopperChan) kr.mu.Unlock() - resourceVersion := config.ResourceVersion - var err error - if resourceVersion == "" { - // Proper use of the Kubernetes API Watch capability when no resourceVersion is supplied is to do a list first - // to get the initial state and a useable resourceVersion. - // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes for details. - resourceVersion, err = kr.doInitialList(ctx, config, resource) - if err != nil { - kr.setting.Logger.Error("could not perform initial list for watch", zap.String("resource", config.gvr.String()), zap.Error(err)) - return - } - // If we still don't have a resourceVersion we can try 1 as a last ditch effort. - // This also helps our unit tests since the fake client can't handle returning resource versions - // as part of a list of objects. - if resourceVersion == "" { - resourceVersion = defaultResourceVersion - } + resourceVersion, err := getResourceVersion(ctx, config, resource) + if err != nil { + kr.setting.Logger.Error("could not retrieve an initial resourceVersion", zap.String("resource", config.gvr.String()), zap.Error(err)) + return } - watchFunc := func(options metav1.ListOptions) (apiWatch.Interface, error) { options.FieldSelector = config.FieldSelector options.LabelSelector = config.LabelSelector @@ -202,28 +188,33 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects } -func (kr *k8sobjectsreceiver) doInitialList(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) (string, error) { - objects, err := resource.List(ctx, metav1.ListOptions{ - FieldSelector: config.FieldSelector, - LabelSelector: config.LabelSelector, - }) - if err != nil { - return "", err - } +func getResourceVersion(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) (string, error) { + resourceVersion := config.ResourceVersion + if resourceVersion == "" || resourceVersion == "0" { + // Proper use of the Kubernetes API Watch capability when no resourceVersion is supplied is to do a list first + // to get the initial state and a useable resourceVersion. + // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes for details. + objects, err := resource.List(ctx, metav1.ListOptions{ + FieldSelector: config.FieldSelector, + LabelSelector: config.LabelSelector, + }) + if err != nil { + return "", fmt.Errorf("could not perform initial list for watch on %v, %w", config.gvr.String(), err) + } + if objects == nil { + return "", fmt.Errorf("nil objects returned, this is an error in the k8sobjectsreceiver") + } - if objects == nil { - return "", fmt.Errorf("nil objects returned, this is an error in the k8sobjectsreceiver") - } + resourceVersion = objects.GetResourceVersion() - if len(objects.Items) > 0 { - logs := pullObjectsToLogData(objects, time.Now(), config) - obsCtx := kr.obsrecv.StartLogsOp(ctx) - err = kr.consumer.ConsumeLogs(obsCtx, logs) - kr.obsrecv.EndLogsOp(obsCtx, metadata.Type, logs.LogRecordCount(), err) + // If we still don't have a resourceVersion we can try 1 as a last ditch effort. + // This also helps our unit tests since the fake client can't handle returning resource versions + // as part of a list of objects. + if resourceVersion == "" || resourceVersion == "0" { + resourceVersion = defaultResourceVersion + } } - - return objects.GetResourceVersion(), nil - + return resourceVersion, nil } // Start ticking immediately. diff --git a/receiver/k8sobjectsreceiver/receiver_test.go b/receiver/k8sobjectsreceiver/receiver_test.go index eb264b241061..2105fab1ce61 100644 --- a/receiver/k8sobjectsreceiver/receiver_test.go +++ b/receiver/k8sobjectsreceiver/receiver_test.go @@ -88,12 +88,6 @@ func TestWatchObject(t *testing.T) { generatePod("pod1", "default", map[string]interface{}{ "environment": "production", }, "1"), - generatePod("pod2", "default", map[string]interface{}{ - "environment": "test", - }, "2"), - generatePod("pod3", "default_ignore", map[string]interface{}{ - "environment": "production", - }, "3"), ) rCfg := createDefaultConfig().(*Config) @@ -124,17 +118,23 @@ func TestWatchObject(t *testing.T) { require.NoError(t, r.Start(ctx, componenttest.NewNopHost())) time.Sleep(time.Millisecond * 100) - assert.Len(t, consumer.Logs(), 1) - assert.Equal(t, 2, consumer.Count()) + assert.Len(t, consumer.Logs(), 0) + assert.Equal(t, 0, consumer.Count()) mockClient.createPods( + generatePod("pod2", "default", map[string]interface{}{ + "environment": "test", + }, "2"), + generatePod("pod3", "default_ignore", map[string]interface{}{ + "environment": "production", + }, "3"), generatePod("pod4", "default", map[string]interface{}{ "environment": "production", }, "4"), ) time.Sleep(time.Millisecond * 100) assert.Len(t, consumer.Logs(), 2) - assert.Equal(t, 3, consumer.Count()) + assert.Equal(t, 2, consumer.Count()) assert.NoError(t, r.Shutdown(ctx)) }