Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/k8sobjects] Fix issue where resourceVersion was not being remembered, #24806

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/k8sobjects-fix-resourceversion-bug.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/k8sobjects

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix bug where duplicate data would be ingested for watch mode if the client connection got reset.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24806]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
7 changes: 5 additions & 2 deletions receiver/k8sobjectsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ the K8s API server. This can be one of `none` (for no auth), `serviceAccount`
- `name`: Name of the resource object to collect
- `mode`: define in which way it collects this type of object, either "poll" or "watch".
- `pull` mode will read all objects of this type use the list API at an interval.
- `watch` mode will setup a long connection using the watch API to just get updates.
- `watch` mode will do setup a long connection using the watch API to just get updates.
- `label_selector`: select objects by label(s)
- `field_selector`: select objects by field(s)
- `interval`: the interval at which object is pulled, default 60 minutes. Only useful for `pull` mode.
- `resource_version` allows watch resources starting from a specific version (default = `1`). Only available for `watch` mode.
- `resource_version` allows watch resources starting from a specific version (default = `1`). Only available for `watch` mode. If not specified, the receiver will do an initial list to get the resourceVersion before starting the watch. See [Efficient Detection of Change](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes) for details on why this is necessary.
- `namespaces`: An array of `namespaces` to collect events from. (default = `all`)
- `group`: API group name. It is an optional config. When given resource object is present in multiple groups,
use this config to specify the group to select. By default, it will select the first group.
Expand Down Expand Up @@ -121,6 +121,8 @@ Use the below commands to create a `ClusterRole` with required permissions and a
Following config will work for collecting pods and events only. You need to add
appropriate rule for collecting other objects.

When using watch mode without specifying a `resource_version` you must also specify `list` verb so that the receiver has permission to do its initial list.

```bash
<<EOF | kubectl apply -f -
apiVersion: rbac.authorization.k8s.io/v1
Expand All @@ -145,6 +147,7 @@ rules:
- events
verbs:
- watch
- list
EOF
```

Expand Down
4 changes: 0 additions & 4 deletions receiver/k8sobjectsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ func (c *Config) Validate() error {
object.Interval = defaultPullInterval
}

if object.Mode == WatchMode && object.ResourceVersion == "" {
object.ResourceVersion = defaultResourceVersion
}

object.gvr = gvr
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions receiver/k8sobjectsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestLoadConfig(t *testing.T) {
Mode: WatchMode,
Namespaces: []string{"default"},
Group: "events.k8s.io",
ResourceVersion: "1",
ResourceVersion: "",
gvr: &schema.GroupVersionResource{
Group: "events.k8s.io",
Version: "v1",
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestWatchResourceVersion(t *testing.T) {
Mode: WatchMode,
Namespaces: []string{"default"},
Group: "events.k8s.io",
ResourceVersion: "1",
ResourceVersion: "",
gvr: &schema.GroupVersionResource{
Group: "events.k8s.io",
Version: "v1",
Expand Down
4 changes: 2 additions & 2 deletions receiver/k8sobjectsreceiver/mock_dynamic_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c mockDynamicClient) createPods(objects ...*unstructured.Unstructured) {
}
}

func generatePod(name, namespace string, labels map[string]interface{}) *unstructured.Unstructured {
func generatePod(name, namespace string, labels map[string]interface{}, resourceVersion string) *unstructured.Unstructured {
pod := unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
Expand All @@ -60,6 +60,6 @@ func generatePod(name, namespace string, labels map[string]interface{}) *unstruc
},
}

pod.SetResourceVersion("1")
pod.SetResourceVersion(resourceVersion)
return &pod
}
61 changes: 49 additions & 12 deletions receiver/k8sobjectsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package k8sobjectsreceiver // import "github.com/open-telemetry/opentelemetry-co

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -146,40 +147,76 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
kr.stopperChanList = append(kr.stopperChanList, stopperChan)
kr.mu.Unlock()

resourceVersion, err := getResourceVersion(ctx, config, resource)
if err != nil {
kr.setting.Logger.Error("could not retrieve an initial resourceVersion", zap.String("resource", config.gvr.String()), zap.Error(err))
return
}
watchFunc := func(options metav1.ListOptions) (apiWatch.Interface, error) {
return resource.Watch(ctx, metav1.ListOptions{
FieldSelector: config.FieldSelector,
LabelSelector: config.LabelSelector,
})
options.FieldSelector = config.FieldSelector
options.LabelSelector = config.LabelSelector
return resource.Watch(ctx, options)
}

watch, err := watch.NewRetryWatcher(config.ResourceVersion, &cache.ListWatch{WatchFunc: watchFunc})
watcher, err := watch.NewRetryWatcher(resourceVersion, &cache.ListWatch{WatchFunc: watchFunc})
if err != nil {
kr.setting.Logger.Error("error in watching object", zap.String("resource", config.gvr.String()), zap.Error(err))
return
}

res := watch.ResultChan()
res := watcher.ResultChan()
for {
select {
case data, ok := <-res:
if !ok {
kr.setting.Logger.Warn("Watch channel closed unexpectedly", zap.String("resource", config.gvr.String()))
return
}
logs := watchObjectsToLogData(&data, time.Now(), config)

obsCtx := kr.obsrecv.StartLogsOp(ctx)
err := kr.consumer.ConsumeLogs(obsCtx, logs)
kr.obsrecv.EndLogsOp(obsCtx, metadata.Type, 1, err)
logs, err := watchObjectsToLogData(&data, time.Now(), config)
if err != nil {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're passing around resourceVersion values properly there is now a chance that the client returns a 410 and an status object complaining that the resource doesn't exist. This is a rare situation that would normally only happen if the user self-supplied a resourceVersion that was too old.

At the moment I am handling that as an error and allowing the processor to keep running, but the group that had the error will stop. The proper way to handle that situation is to do another List to get a proper resourceVersion again. I think we should do that, but as a followup PR.

kr.setting.Logger.Error("error converting objects to log data", zap.Error(err))
} else {
obsCtx := kr.obsrecv.StartLogsOp(ctx)
err := kr.consumer.ConsumeLogs(obsCtx, logs)
kr.obsrecv.EndLogsOp(obsCtx, metadata.Type, 1, err)
}
case <-stopperChan:
watch.Stop()
watcher.Stop()
return
}
}

}

func getResourceVersion(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) (string, error) {
resourceVersion := config.ResourceVersion
if resourceVersion == "" || resourceVersion == "0" {
// Proper use of the Kubernetes API Watch capability when no resourceVersion is supplied is to do a list first
// to get the initial state and a useable resourceVersion.
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes for details.
objects, err := resource.List(ctx, metav1.ListOptions{
FieldSelector: config.FieldSelector,
LabelSelector: config.LabelSelector,
})
if err != nil {
return "", fmt.Errorf("could not perform initial list for watch on %v, %w", config.gvr.String(), err)
}
if objects == nil {
return "", fmt.Errorf("nil objects returned, this is an error in the k8sobjectsreceiver")
}

resourceVersion = objects.GetResourceVersion()

// If we still don't have a resourceVersion we can try 1 as a last ditch effort.
// This also helps our unit tests since the fake client can't handle returning resource versions
// as part of a list of objects.
if resourceVersion == "" || resourceVersion == "0" {
resourceVersion = defaultResourceVersion
}
}
return resourceVersion, nil
}

// Start ticking immediately.
// Ref: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately
func NewTicker(repeat time.Duration) *time.Ticker {
Expand Down
33 changes: 16 additions & 17 deletions receiver/k8sobjectsreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ func TestPullObject(t *testing.T) {
mockClient.createPods(
generatePod("pod1", "default", map[string]interface{}{
"environment": "production",
}),
}, "1"),
generatePod("pod2", "default", map[string]interface{}{
"environment": "test",
}),
}, "2"),
generatePod("pod3", "default_ignore", map[string]interface{}{
"environment": "production",
}),
}, "3"),
)

rCfg := createDefaultConfig().(*Config)
Expand Down Expand Up @@ -84,6 +84,12 @@ func TestWatchObject(t *testing.T) {

mockClient := newMockDynamicClient()

mockClient.createPods(
generatePod("pod1", "default", map[string]interface{}{
"environment": "production",
}, "1"),
)

rCfg := createDefaultConfig().(*Config)
rCfg.makeDynamicClient = mockClient.getMockDynamicClient
rCfg.makeDiscoveryClient = getMockDiscoveryClient
Expand Down Expand Up @@ -112,30 +118,23 @@ func TestWatchObject(t *testing.T) {
require.NoError(t, r.Start(ctx, componenttest.NewNopHost()))

time.Sleep(time.Millisecond * 100)
assert.Len(t, consumer.Logs(), 0)
assert.Equal(t, 0, consumer.Count())

mockClient.createPods(
generatePod("pod1", "default", map[string]interface{}{
"environment": "production",
}),
generatePod("pod2", "default", map[string]interface{}{
"environment": "test",
}),
}, "2"),
generatePod("pod3", "default_ignore", map[string]interface{}{
"environment": "production",
}),
)
time.Sleep(time.Millisecond * 100)
assert.Len(t, consumer.Logs(), 2)
assert.Equal(t, 2, consumer.Count())

mockClient.createPods(
}, "3"),
generatePod("pod4", "default", map[string]interface{}{
"environment": "production",
}),
}, "4"),
)
time.Sleep(time.Millisecond * 100)
assert.Len(t, consumer.Logs(), 3)
assert.Equal(t, 3, consumer.Count())
assert.Len(t, consumer.Logs(), 2)
assert.Equal(t, 2, consumer.Count())

assert.NoError(t, r.Shutdown(ctx))
}
11 changes: 8 additions & 3 deletions receiver/k8sobjectsreceiver/unstructured_to_logdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package k8sobjectsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver"

import (
"fmt"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -15,8 +16,12 @@ import (

type attrUpdaterFunc func(pcommon.Map)

func watchObjectsToLogData(event *watch.Event, observedAt time.Time, config *K8sObjectsConfig) plog.Logs {
udata := event.Object.(*unstructured.Unstructured)
func watchObjectsToLogData(event *watch.Event, observedAt time.Time, config *K8sObjectsConfig) (plog.Logs, error) {
udata, ok := event.Object.(*unstructured.Unstructured)
if !ok {
return plog.Logs{}, fmt.Errorf("received data that wasnt unstructure, %v", event)
}

ul := unstructured.UnstructuredList{
Items: []unstructured.Unstructured{{
Object: map[string]interface{}{
Expand All @@ -33,7 +38,7 @@ func watchObjectsToLogData(event *watch.Event, observedAt time.Time, config *K8s
attrs.PutStr("event.domain", "k8s")
attrs.PutStr("event.name", name)
}
})
}), nil
}

func pullObjectsToLogData(event *unstructured.UnstructuredList, observedAt time.Time, config *K8sObjectsConfig) plog.Logs {
Expand Down
6 changes: 4 additions & 2 deletions receiver/k8sobjectsreceiver/unstructured_to_logdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ func TestUnstructuredListToLogData(t *testing.T) {
},
}

logs := watchObjectsToLogData(event, time.Now(), config)
logs, err := watchObjectsToLogData(event, time.Now(), config)
assert.NoError(t, err)

assert.Equal(t, logs.LogRecordCount(), 1)

Expand Down Expand Up @@ -153,7 +154,8 @@ func TestUnstructuredListToLogData(t *testing.T) {
}

observedAt := time.Now()
logs := watchObjectsToLogData(event, observedAt, config)
logs, err := watchObjectsToLogData(event, observedAt, config)
assert.NoError(t, err)

assert.Equal(t, logs.LogRecordCount(), 1)

Expand Down