-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy path k8s.go
318 lines (290 loc) · 9.69 KB
/
k8s.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package datasenders // import "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datasenders"
import (
"context"
"fmt"
"net"
"os"
"strconv"
"strings"
"time"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)
// FileLogK8sWriter represents abstract container k8s writer
type FileLogK8sWriter struct {
	file   *os.File // temp log file the receiver under test tails; lines are appended by ConsumeLogs
	config string   // receiver config template; expanded with the file name in GenConfigYAMLStr
}

// Ensure FileLogK8sWriter implements LogDataSender.
var _ testbed.LogDataSender = (*FileLogK8sWriter)(nil)
// NewFileLogK8sWriter creates a new data sender that will write kubernetes containerd
// log entries to a file, to be tailed by FileLogReceiver and sent to the collector.
//
// config is an Otelcol config appended to the receivers section after executing fmt.Sprintf on it.
// This implies few things:
//   - it should contain `%s` which will be replaced with the filename
//   - all `%` should be represented as `%%`
//   - indentation style matters. Spaces have to be used for indentation
//     and it should start with two spaces indentation
//
// Example config:
//
//	filelog:
//	  include: [ %s ]
//	  start_at: beginning
//	  operators:
//	    type: regex_parser
//	    regex: ^(?P<log>.*)$
//
// Panics if the temp directory or file cannot be created; these are
// unrecoverable setup failures in the testbed.
func NewFileLogK8sWriter(config string) *FileLogK8sWriter {
	// The directory names mimic the kubelet pod-log layout
	// (<namespace>_<pod>_<uid>/<container>/...), so the
	// extract_metadata_from_filepath operator in the generated configs
	// can parse metadata out of the resulting file path.
	dir, err := os.MkdirTemp("", "namespace-*_test-pod_000011112222333344445555666677778888")
	if err != nil {
		// Include the underlying error so setup failures are diagnosable.
		panic(fmt.Sprintf("failed to create temp dir: %v", err))
	}
	dir, err = os.MkdirTemp(dir, "*")
	if err != nil {
		panic(fmt.Sprintf("failed to create temp dir: %v", err))
	}
	file, err := os.CreateTemp(dir, "*.log")
	if err != nil {
		panic(fmt.Sprintf("failed to create temp file: %v", err))
	}
	return &FileLogK8sWriter{
		file:   file,
		config: config,
	}
}
// Capabilities reports that this sender never mutates the log data it consumes.
func (f *FileLogK8sWriter) Capabilities() consumer.Capabilities {
	// Zero value of Capabilities has MutatesData == false.
	return consumer.Capabilities{}
}
// Start is a no-op: the log file is already created by the constructor,
// so there is nothing to initialize before logs are consumed.
func (f *FileLogK8sWriter) Start() error {
	return nil
}
// ConsumeLogs renders every log record in logs as one text line and
// appends it (newline-terminated) to the underlying file.
func (f *FileLogK8sWriter) ConsumeLogs(_ context.Context, logs plog.Logs) error {
	resourceLogs := logs.ResourceLogs()
	for i := 0; i < resourceLogs.Len(); i++ {
		scopeLogs := resourceLogs.At(i).ScopeLogs()
		for j := 0; j < scopeLogs.Len(); j++ {
			records := scopeLogs.At(j).LogRecords()
			for k := 0; k < records.Len(); k++ {
				line := append(f.convertLogToTextLine(records.At(k)), '\n')
				if _, err := f.file.Write(line); err != nil {
					return err
				}
			}
		}
	}
	return nil
}
// convertLogToTextLine renders a single log record in the CRI container log
// line shape: "<timestamp> stderr F <severity> <body> k1=v1 k2=v2 ...".
func (f *FileLogK8sWriter) convertLogToTextLine(lr plog.LogRecord) []byte {
	sb := strings.Builder{}
	// Timestamp in the containerd layout the generated configs parse.
	// NOTE(review): the trailing "Z" is a literal in the layout and the time
	// is not converted to UTC first; acceptable for the testbed, where only
	// round-tripping through the receiver matters — confirm if reused elsewhere.
	sb.WriteString(time.Unix(0, int64(lr.Timestamp())).Format("2006-01-02T15:04:05.000000000Z"))
	// Fixed stream ("stderr") and CRI partial-line flag ("F"), then severity.
	sb.WriteString(" stderr F ")
	sb.WriteString(lr.SeverityText())
	sb.WriteString(" ")
	// Only plain string bodies are emitted inline.
	if lr.Body().Type() == pcommon.ValueTypeStr {
		sb.WriteString(lr.Body().Str())
	}
	// Append attributes as space-separated key=value pairs.
	lr.Attributes().Range(func(k string, v pcommon.Value) bool {
		sb.WriteString(" ")
		sb.WriteString(k)
		sb.WriteString("=")
		switch v.Type() {
		case pcommon.ValueTypeStr:
			sb.WriteString(v.Str())
		case pcommon.ValueTypeInt:
			sb.WriteString(strconv.FormatInt(v.Int(), 10))
		case pcommon.ValueTypeDouble:
			sb.WriteString(strconv.FormatFloat(v.Double(), 'f', -1, 64))
		case pcommon.ValueTypeBool:
			sb.WriteString(strconv.FormatBool(v.Bool()))
		default:
			// Previously this panicked with "missing case"; use the generic
			// string rendering so Map/Slice/Bytes/Empty attribute values
			// don't crash the testbed. Existing cases are unchanged.
			sb.WriteString(v.AsString())
		}
		return true
	})
	return []byte(sb.String())
}
// Flush syncs buffered log lines to disk so the tailing receiver can observe them.
func (f *FileLogK8sWriter) Flush() {
	// Best-effort in a test helper; the sync error is deliberately discarded.
	_ = f.file.Sync()
}
// GenConfigYAMLStr renders the receivers-section snippet for the agent config,
// substituting the temp log file path into the user-supplied template.
// Note that this generates a receiver config for agent.
// We are testing filelog receiver here.
func (f *FileLogK8sWriter) GenConfigYAMLStr() string {
	logPath := f.file.Name()
	return fmt.Sprintf(f.config, logPath)
}
// ProtocolName identifies the receiver type this data sender feeds.
func (f *FileLogK8sWriter) ProtocolName() string {
	const receiverType = "filelog"
	return receiverType
}
// GetEndpoint returns nil: this sender writes to a local file,
// so there is no network address to report.
func (f *FileLogK8sWriter) GetEndpoint() net.Addr {
	return nil
}
// NewKubernetesContainerWriter returns FileLogK8sWriter with configuration
// to recognize and parse kubernetes container logs.
// A router operator auto-detects the format (Docker JSON, CRI-O, or
// CRI-Containerd), the matching parser extracts time/stream/log fields, and
// pod metadata (namespace, pod, uid, container, restart count) is parsed out
// of the log file path and moved to k8s.* attributes.
func NewKubernetesContainerWriter() *FileLogK8sWriter {
	return NewFileLogK8sWriter(`
  filelog:
    include: [ %s ]
    start_at: beginning
    include_file_path: true
    include_file_name: false
    operators:
      # Find out which format is used by kubernetes
      - type: router
        id: get-format
        routes:
          - output: parser-docker
            expr: 'body matches "^\\{"'
          - output: parser-crio
            expr: 'body matches "^[^ Z]+ "'
          - output: parser-containerd
            expr: 'body matches "^[^ Z]+Z"'
      # Parse CRI-O format
      - type: regex_parser
        id: parser-crio
        regex: '^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
        output: extract_metadata_from_filepath
        timestamp:
          parse_from: attributes.time
          layout_type: gotime
          layout: '2006-01-02T15:04:05.000000000-07:00'
      # Parse CRI-Containerd format
      - type: regex_parser
        id: parser-containerd
        regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
        output: extract_metadata_from_filepath
        timestamp:
          parse_from: attributes.time
          layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
      # Parse Docker format
      - type: json_parser
        id: parser-docker
        output: extract_metadata_from_filepath
        timestamp:
          parse_from: attributes.time
          layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
      - type: move
        from: attributes.log
        to: body
      # Extract metadata from file path
      - type: regex_parser
        id: extract_metadata_from_filepath
        regex: '^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$'
        parse_from: attributes["log.file.path"]
      # Move out attributes to Attributes
      - type: move
        from: attributes.stream
        to: attributes["log.iostream"]
      - type: move
        from: attributes.container_name
        to: attributes["k8s.container.name"]
      - type: move
        from: attributes.namespace
        to: attributes["k8s.namespace.name"]
      - type: move
        from: attributes.pod_name
        to: attributes["k8s.pod.name"]
      - type: move
        from: attributes.restart_count
        to: attributes["k8s.container.restart_count"]
      - type: move
        from: attributes.uid
        to: attributes["k8s.pod.uid"]
`)
}
// NewKubernetesCRIContainerdWriter returns FileLogK8sWriter with configuration
// to parse only CRI-Containerd kubernetes logs.
// Unlike NewKubernetesContainerWriter there is no format router: the
// containerd regex parser runs unconditionally, then pod metadata is extracted
// from the file path and moved to k8s.* attributes.
func NewKubernetesCRIContainerdWriter() *FileLogK8sWriter {
	return NewFileLogK8sWriter(`
  filelog:
    include: [ %s ]
    start_at: beginning
    include_file_path: true
    include_file_name: false
    operators:
      # Parse CRI-Containerd format
      - type: regex_parser
        id: parser-containerd
        regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
        output: extract_metadata_from_filepath
        timestamp:
          parse_from: attributes.time
          layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
      # Extract metadata from file path
      - type: regex_parser
        id: extract_metadata_from_filepath
        regex: '^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$'
        parse_from: attributes["log.file.path"]
      - type: move
        from: attributes.log
        to: body
      # Move out attributes to Attributes
      - type: move
        from: attributes.stream
        to: attributes["log.iostream"]
      - type: move
        from: attributes.container_name
        to: attributes["k8s.container.name"]
      - type: move
        from: attributes.namespace
        to: attributes["k8s.namespace.name"]
      - type: move
        from: attributes.pod_name
        to: attributes["k8s.pod.name"]
      - type: move
        from: attributes.restart_count
        to: attributes["k8s.container.restart_count"]
      - type: move
        from: attributes.uid
        to: attributes["k8s.pod.uid"]
`)
}
// NewKubernetesCRIContainerdNoAttributesOpsWriter returns FileLogK8sWriter with configuration
// to parse only CRI-Containerd kubernetes logs without reformatting attributes:
// the regex-captured fields are parsed but no move operators rename them to
// k8s.* keys, which keeps the operator pipeline minimal for benchmarking.
func NewKubernetesCRIContainerdNoAttributesOpsWriter() *FileLogK8sWriter {
	return NewFileLogK8sWriter(`
  filelog:
    include: [ %s ]
    start_at: beginning
    include_file_path: true
    include_file_name: false
    operators:
      # Parse CRI-Containerd format
      - type: regex_parser
        id: parser-containerd
        regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
        output: extract_metadata_from_filepath
        timestamp:
          parse_from: attributes.time
          layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
      # Extract metadata from file path
      - type: regex_parser
        id: extract_metadata_from_filepath
        regex: '^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$'
        parse_from: attributes["log.file.path"]
`)
}
// NewCRIContainerdWriter returns FileLogK8sWriter with configuration
// to parse only CRI-Containerd logs (no extracting metadata from filename):
// a single regex parser splits time/stream/logtag/log and parses the timestamp.
func NewCRIContainerdWriter() *FileLogK8sWriter {
	return NewFileLogK8sWriter(`
  filelog:
    include: [ %s ]
    start_at: beginning
    include_file_path: true
    include_file_name: false
    operators:
      # Parse CRI-Containerd format
      - type: regex_parser
        id: parser-containerd
        regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
        timestamp:
          parse_from: attributes.time
          layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
`)
}