-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy pathunstructured_to_logdata.go
81 lines (68 loc) · 2.49 KB
/
unstructured_to_logdata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package k8sobjectsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver"
import (
"fmt"
"time"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
semconv "go.opentelemetry.io/collector/semconv/v1.9.0"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)
// attrUpdaterFunc is a callback that receives the attribute map of a log
// record so that the caller can add or modify attributes on it.
type attrUpdaterFunc func(pcommon.Map)
// watchObjectsToLogData converts a single watch event into plog.Logs.
//
// The event's object must be an *unstructured.Unstructured; otherwise an error
// is returned together with a zero-value plog.Logs. The object is wrapped in a
// one-item list whose item is a map with two keys: "type" (the watch event
// type as a string) and "object" (the raw object content). When the object has
// a non-empty metadata.name, the record additionally receives the attributes
// "event.domain" = "k8s" and "event.name" = <name>.
func watchObjectsToLogData(event *watch.Event, observedAt time.Time, config *K8sObjectsConfig) (plog.Logs, error) {
	udata, ok := event.Object.(*unstructured.Unstructured)
	if !ok {
		return plog.Logs{}, fmt.Errorf("received data that wasnt unstructure, %v", event)
	}

	// Wrap the watched object together with its event type so the shared list
	// conversion can be reused for watch events.
	ul := unstructured.UnstructuredList{
		Items: []unstructured.Unstructured{{
			Object: map[string]any{
				"type":   string(event.Type),
				"object": udata.Object,
			},
		}},
	}

	// GetName yields "" when metadata or metadata.name is absent or has an
	// unexpected type; unchecked type assertions on the raw map would panic in
	// those cases. The name is resolved once instead of on every callback call.
	name := udata.GetName()

	return unstructuredListToLogData(&ul, observedAt, config, func(attrs pcommon.Map) {
		if name != "" {
			attrs.PutStr("event.domain", "k8s")
			attrs.PutStr("event.name", name)
		}
	}), nil
}
// pullObjectsToLogData converts a list of unstructured objects into plog.Logs
// by delegating to unstructuredListToLogData without any attribute updaters.
func pullObjectsToLogData(event *unstructured.UnstructuredList, observedAt time.Time, config *K8sObjectsConfig) plog.Logs {
return unstructuredListToLogData(event, observedAt, config)
}
// unstructuredListToLogData converts every item of event into one log record.
//
// Records are grouped by namespace: each distinct namespace gets its own
// ResourceLogs entry (with a single ScopeLogs), and a non-empty namespace is
// recorded as the k8s namespace resource attribute. Every record carries
// observedAt as its observed timestamp, the attribute "k8s.resource.name" set
// to config.gvr.Resource, and the item's raw object as a map body. Each
// function in attrUpdaters is invoked, in order, with the record's attribute
// map after "k8s.resource.name" has been set.
func unstructuredListToLogData(event *unstructured.UnstructuredList, observedAt time.Time, config *K8sObjectsConfig, attrUpdaters ...attrUpdaterFunc) plog.Logs {
	out := plog.NewLogs()
	resourceLogs := out.ResourceLogs()
	recordsByNamespace := make(map[string]plog.LogRecordSlice)
	// The observed timestamp is identical for all records, so convert it once.
	observedTS := pcommon.NewTimestampFromTime(observedAt)

	// Iterate by index to avoid copying each item.
	for i := range event.Items {
		item := &event.Items[i]
		namespace := itemNamespace(item)

		records, ok := recordsByNamespace[namespace]
		if !ok {
			// First item seen for this namespace: open a new resource/scope pair.
			rl := resourceLogs.AppendEmpty()
			if namespace != "" {
				rl.Resource().Attributes().PutStr(semconv.AttributeK8SNamespaceName, namespace)
			}
			records = rl.ScopeLogs().AppendEmpty().LogRecords()
			recordsByNamespace[namespace] = records
		}

		record := records.AppendEmpty()
		record.SetObservedTimestamp(observedTS)

		attrs := record.Attributes()
		attrs.PutStr("k8s.resource.name", config.gvr.Resource)
		for _, update := range attrUpdaters {
			update(attrs)
		}

		// The conversion result is intentionally not checked: this function has
		// no error return, so the body is filled on a best-effort basis.
		//nolint:errcheck
		record.Body().SetEmptyMap().FromRaw(item.Object)
	}
	return out
}

// itemNamespace returns the namespace of item. It first consults the item's
// own metadata.namespace; if that is empty it falls back to
// object.metadata.namespace, which is where the namespace lives for items that
// wrap the real object under an "object" key (as watchObjectsToLogData does).
// It returns "" when neither location holds a string namespace.
func itemNamespace(item *unstructured.Unstructured) string {
	if ns := item.GetNamespace(); ns != "" {
		return ns
	}
	if ns, found, err := unstructured.NestedString(item.Object, "object", "metadata", "namespace"); err == nil && found {
		return ns
	}
	return ""
}