Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix(promtail): Handle docker logs when a log is split in multiple frames #12374

Merged
9 commits merged
Apr 26, 2024
2 changes: 1 addition & 1 deletion clients/pkg/promtail/promtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (p *Promtail) reloadConfig(cfg *config.Config) error {
entryHandlers = append(entryHandlers, p.client)
p.entriesFanout = utils.NewFanoutEntryHandler(timeoutUntilFanoutHardStop, entryHandlers...)

tms, err := targets.NewTargetManagers(p, p.reg, p.logger, cfg.PositionsConfig, p.entriesFanout, cfg.ScrapeConfig, &cfg.TargetConfig, cfg.Global.FileWatch)
tms, err := targets.NewTargetManagers(p, p.reg, p.logger, cfg.PositionsConfig, p.entriesFanout, cfg.ScrapeConfig, &cfg.TargetConfig, cfg.Global.FileWatch, &cfg.LimitsConfig)
if err != nil {
return err
}
Expand Down
66 changes: 45 additions & 21 deletions clients/pkg/promtail/targets/docker/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Target struct {
labels model.LabelSet
relabelConfig []*relabel.Config
metrics *Metrics
maxLineSize int

cancel context.CancelFunc
client client.APIClient
Expand All @@ -51,6 +52,7 @@ func NewTarget(
labels model.LabelSet,
relabelConfig []*relabel.Config,
client client.APIClient,
maxLineSize int,
) (*Target, error) {

pos, err := position.Get(positions.CursorKey(containerName))
Expand All @@ -71,6 +73,7 @@ func NewTarget(
labels: labels,
relabelConfig: relabelConfig,
metrics: metrics,
maxLineSize: maxLineSize,

client: client,
running: atomic.NewBool(false),
Expand Down Expand Up @@ -107,8 +110,8 @@ func (t *Target) processLoop(ctx context.Context) {
}

// Start transferring
cstdout := make(chan []byte, 1)
cstderr := make(chan []byte, 1)
cstdout := make(chan []byte)
cstderr := make(chan []byte)
t.wg.Add(1)
go func() {
defer func() {
Expand Down Expand Up @@ -161,16 +164,21 @@ func (t *Target) process(frames chan []byte, logStream string) {
t.wg.Done()
}()

// softMaxBuffer is to prevent unlimited memory growth.
// In principle this introduces the same issue that we are compensating for,
// i.e. split messages, however there should be some limit and so this number
// is chosen to be "very high" just not "OOM high" (which admittedly is not
// a universally defined threshold).
const softMaxBuffer = 16 * 1024 * 1024
var (
payloadAcc = new(strings.Builder)
curTs = time.Now()
sizeLimit = t.maxLineSize
discardRemainingLine = false
payloadAcc = new(strings.Builder)
curTs = time.Now()
)

// If max_line_size is disabled (set to 0), we can in theory have infinite buffer growth.
// We can't guarantee that there's any bound on Docker logs, they could be an infinite stream
// without newlines for all we know. To protect promtail from OOM in that case, we introduce
// this safety limit.
if sizeLimit == 0 {
sizeLimit = 256 * 1024
}

for frame := range frames {
// Split frame into timestamp and payload
ts, payload, err := extractTs(string(frame))
Expand All @@ -185,34 +193,50 @@ func (t *Target) process(frames chan []byte, logStream string) {
ts = curTs
}

// If time has changed, we are looking at a new event (although we should have seen a new line..),
// so flush the buffer if we have one.
if ts != curTs {
discardRemainingLine = false
if payloadAcc.Len() > 0 {
t.handleOutput(logStream, curTs, payloadAcc.String())
payloadAcc = new(strings.Builder)
}
}

// Check if we have the end of the event
var isEol = strings.HasSuffix(payload, "\n")

// If we are currently discarding a line (due to size limits), skip ahead, but don't skip the next
// frame if we saw the end of the line.
if discardRemainingLine {
discardRemainingLine = !isEol
continue
}

// Strip newline ending if we have it
payload = strings.TrimRight(payload, "\r\n")

// Fast path: Most log lines are a single frame. If we have a full line in frame and buffer is empty,
// then don't use the buffer at all.
if payloadAcc.Len() == 0 && strings.HasSuffix(payload, "\n") {
if payloadAcc.Len() == 0 && isEol {
t.handleOutput(logStream, ts, payload)
continue
}

// If time has changed, we are looking at a different event (although we should have seen a new line..),
// so flush the buffer.
if payloadAcc.Len() > 0 && ts != curTs {
t.handleOutput(logStream, curTs, payloadAcc.String())
payloadAcc = new(strings.Builder)
}
// Add to buffer
payloadAcc.WriteString(payload)
curTs = ts

// Send immediately if line ended or we built a very large event
if strings.HasSuffix(payload, "\n") || payloadAcc.Len() >= softMaxBuffer {
if isEol || payloadAcc.Len() > sizeLimit {
discardRemainingLine = !isEol
t.handleOutput(logStream, curTs, payloadAcc.String())
payloadAcc = new(strings.Builder)
}
}
}

func (t *Target) handleOutput(logStream string, ts time.Time, payload string) {
// We don't output trailing newlines
payload = strings.TrimSuffix(payload, "\n")
payload = strings.TrimSuffix(payload, "\r")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be removing these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this into the process function which now has a TrimRight call for these. The reason is that process keeps track of the length of the buffer, to see if it's over the new limit we set, but this length should exclude the trailing newline (because we don't output the newline). And I thought it was just easier to take away the trailing newline earlier in the process for this purpose (otherwise we'd have to keep track of the buffer length in a more complicated manner than just calling payloadAcc.Len()).

// Add all labels from the config, relabel and filter them.
lb := labels.NewBuilder(nil)
for k, v := range t.labels {
Expand Down
2 changes: 2 additions & 0 deletions clients/pkg/promtail/targets/docker/target_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type targetGroup struct {
httpClientConfig config.HTTPClientConfig
client client.APIClient
refreshInterval model.Duration
maxLineSize int

mtx sync.Mutex
targets map[string]*Target
Expand Down Expand Up @@ -120,6 +121,7 @@ func (tg *targetGroup) addTarget(id string, discoveredLabels model.LabelSet) err
discoveredLabels.Merge(tg.defaultLabels),
tg.relabelConfig,
tg.client,
tg.maxLineSize,
)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions clients/pkg/promtail/targets/docker/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func Test_DockerTarget(t *testing.T) {
model.LabelSet{"job": "docker"},
[]*relabel.Config{},
client,
0,
)
require.NoError(t, err)

Expand Down Expand Up @@ -149,6 +150,7 @@ func doTestPartial(t *testing.T, tty bool) {
model.LabelSet{"job": "docker"},
[]*relabel.Config{},
client,
0,
)
require.NoError(t, err)

Expand Down
2 changes: 2 additions & 0 deletions clients/pkg/promtail/targets/docker/targetmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func NewTargetManager(
positions positions.Positions,
pushClient api.EntryHandler,
scrapeConfigs []scrapeconfig.Config,
maxLineSize int,
) (*TargetManager, error) {
noopRegistry := util.NoopRegistry{}
noopSdMetrics, err := discovery.CreateAndRegisterSDMetrics(noopRegistry)
Expand Down Expand Up @@ -94,6 +95,7 @@ func NewTargetManager(
host: sdConfig.Host,
httpClientConfig: sdConfig.HTTPClientConfig,
refreshInterval: sdConfig.RefreshInterval,
maxLineSize: maxLineSize,
}
}
configs[syncerKey] = append(configs[syncerKey], sdConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func Test_TargetManager(t *testing.T) {
ps,
entryHandler,
cfgs,
0,
)
require.NoError(t, err)
require.True(t, ta.Ready())
Expand Down
4 changes: 3 additions & 1 deletion clients/pkg/promtail/targets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/v3/clients/pkg/promtail/api"
"github.com/grafana/loki/v3/clients/pkg/promtail/limit"
"github.com/grafana/loki/v3/clients/pkg/promtail/positions"
"github.com/grafana/loki/v3/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/azureeventhubs"
Expand Down Expand Up @@ -76,6 +77,7 @@ func NewTargetManagers(
scrapeConfigs []scrapeconfig.Config,
targetConfig *file.Config,
watchConfig file.WatchConfig,
limitsConfig *limit.Config,
) (*TargetManagers, error) {
if targetConfig.Stdin {
level.Debug(logger).Log("msg", "configured to read from stdin")
Expand Down Expand Up @@ -273,7 +275,7 @@ func NewTargetManagers(
if err != nil {
return nil, err
}
cfTargetManager, err := docker.NewTargetManager(dockerMetrics, logger, pos, client, scrapeConfigs)
cfTargetManager, err := docker.NewTargetManager(dockerMetrics, logger, pos, client, scrapeConfigs, limitsConfig.MaxLineSize.Val())
if err != nil {
return nil, errors.Wrap(err, "failed to make Docker service discovery target manager")
}
Expand Down
Loading