Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Docker time stamps as metric time stamps #7434

Merged
merged 5 commits into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 39 additions & 16 deletions plugins/inputs/docker_log/docker_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package docker_log

import (
"bufio"
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"strings"
"sync"
Expand Down Expand Up @@ -287,7 +289,7 @@ func (d *DockerLogs) tailContainerLogs(
logOptions := types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Timestamps: false,
Timestamps: true,
Details: false,
Follow: true,
Tail: tail,
Expand All @@ -311,6 +313,30 @@ func (d *DockerLogs) tailContainerLogs(
}
}

func parseLine(line []byte) (time.Time, string, error) {
parts := bytes.SplitN(line, []byte(" "), 2)

switch len(parts) {
case 1:
parts = append(parts, []byte(""))
}

tsString := string(parts[0])

// Keep any leading space, but remove whitespace from end of line.
// This preserves space in, for example, stacktraces, while removing
// annoying end of line characters and is similar to how other logging
// plugins such as syslog behave.
message := bytes.TrimRightFunc(parts[1], unicode.IsSpace)

ts, err := time.Parse(time.RFC3339Nano, tsString)
if err != nil {
return time.Time{}, "", fmt.Errorf("error parsing timestamp %q: %v", tsString, err)
}

return ts, string(message), nil
}

func tailStream(
acc telegraf.Accumulator,
baseTags map[string]string,
Expand All @@ -328,22 +354,19 @@ func tailStream(

r := bufio.NewReaderSize(reader, 64*1024)

var err error
var message string
for {
message, err = r.ReadString('\n')

// Keep any leading space, but remove whitespace from end of line.
// This preserves space in, for example, stacktraces, while removing
// annoying end of line characters and is similar to how other logging
// plugins such as syslog behave.
message = strings.TrimRightFunc(message, unicode.IsSpace)

if len(message) != 0 {
acc.AddFields("docker_log", map[string]interface{}{
"container_id": containerID,
"message": message,
}, tags)
line, err := r.ReadBytes('\n')

if len(line) != 0 {
ts, message, err := parseLine(line)
if err != nil {
acc.AddError(err)
} else {
acc.AddFields("docker_log", map[string]interface{}{
"container_id": containerID,
"message": message,
}, tags, ts)
}
}

if err != nil {
Expand Down
20 changes: 15 additions & 5 deletions plugins/inputs/docker_log/docker_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ func (r *Response) Close() error {
return nil
}

func MustParse(layout, value string) time.Time {
tm, err := time.Parse(layout, value)
if err != nil {
panic(err)
}
return tm
}

func Test(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -87,7 +95,7 @@ func Test(t *testing.T) {
}, nil
},
ContainerLogsF: func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
return &Response{Reader: bytes.NewBuffer([]byte("hello\n"))}, nil
return &Response{Reader: bytes.NewBuffer([]byte("2020-04-28T18:43:16.432691200Z hello\n"))}, nil
},
},
expected: []telegraf.Metric{
Expand All @@ -104,7 +112,7 @@ func Test(t *testing.T) {
"container_id": "deadbeef",
"message": "hello",
},
time.Now(),
MustParse(time.RFC3339Nano, "2020-04-28T18:43:16.432691200Z"),
),
},
},
Expand All @@ -130,7 +138,7 @@ func Test(t *testing.T) {
ContainerLogsF: func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
var buf bytes.Buffer
w := stdcopy.NewStdWriter(&buf, stdcopy.Stdout)
w.Write([]byte("hello from stdout"))
w.Write([]byte("2020-04-28T18:42:16.432691200Z hello from stdout"))
return &Response{Reader: &buf}, nil
},
},
Expand All @@ -148,7 +156,7 @@ func Test(t *testing.T) {
"container_id": "deadbeef",
"message": "hello from stdout",
},
time.Now(),
MustParse(time.RFC3339Nano, "2020-04-28T18:42:16.432691200Z"),
),
},
},
Expand All @@ -172,7 +180,9 @@ func Test(t *testing.T) {
acc.Wait(len(tt.expected))
plugin.Stop()

testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
require.Nil(t, acc.Errors) // no errors during gathering

testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics())
})
}
}