Skip to content

Commit f118360

Browse files
committed
Merge branch 'main' of github.com:grafana/loki into have-extra-scheduler-grpc-config
2 parents 9cb6360 + fcd544c commit f118360

File tree

143 files changed

+5228
-1500
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

143 files changed

+5228
-1500
lines changed

.github/pull_request_template.md

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Fixes #<issue number>
1010
- [ ] Documentation added
1111
- [ ] Tests updated
1212
- [ ] Title matches the required conventional commits format, see [here](https://www.conventionalcommits.org/en/v1.0.0/)
13+
- **Note** that Promtail is considered to be feature complete, and future development for logs collection will be in [Grafana Alloy](https://github.com/grafana/alloy). As such, `feat` PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
1314
- [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md`
1415
- [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213)
1516
- [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](https://github.com/grafana/loki/pull/10840/commits/0d4416a4b03739583349934b96f272fb4f685d15)

CONTRIBUTING.md

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ Loki uses GitHub to manage reviews of pull requests:
66
- If you plan to do something more involved, discuss your ideas on the relevant GitHub issue.
77
- Make sure to follow the prerequisites below before marking your PR as ready for review.
88

9+
**Note that Promtail is considered to be feature complete, and future development for logs collection will be in [Grafana Alloy](https://github.com/grafana/alloy)**
10+
911
## Loki Improvement Documents (LIDs)
1012

1113
Before creating a large pull request to change or add functionality, please create a _Loki Improvement Document (LID)_. We use LIDs to discuss and vet ideas submitted by maintainers or the community in an open and transparent way. As of Jan 2023, we are starting with a lightweight LID process and we may add more structure, inspired by Python's [PEP](https://peps.python.org/pep-0001/) and Kafka's [KIP](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals) approaches.

LICENSING.md

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ The following folders and their subfolders are licensed under Apache-2.0:
1010

1111
```
1212
clients/
13+
pkg/framedstdcopy/
1314
pkg/ingester/wal
1415
pkg/logproto/
1516
pkg/loghttp/

Makefile

+2
Original file line numberDiff line numberDiff line change
@@ -301,12 +301,14 @@ ifeq ($(SKIP_ARM),true)
301301
CGO_ENABLED=0 $(GOX) -osarch="linux/amd64 darwin/amd64 windows/amd64 freebsd/amd64" ./cmd/loki
302302
CGO_ENABLED=0 $(GOX) -osarch="linux/amd64 darwin/amd64 windows/amd64 freebsd/amd64" ./cmd/logcli
303303
CGO_ENABLED=0 $(GOX) -osarch="linux/amd64 darwin/amd64 windows/amd64 freebsd/amd64" ./cmd/loki-canary
304+
CGO_ENABLED=0 $(GOX) -osarch="linux/amd64 darwin/amd64 windows/amd64 freebsd/amd64" ./cmd/lokitool
304305
CGO_ENABLED=0 $(GOX) -osarch="darwin/amd64 windows/amd64 windows/386 freebsd/amd64" ./clients/cmd/promtail
305306
CGO_ENABLED=1 $(CGO_GOX) -tags promtail_journal_enabled -osarch="linux/amd64" ./clients/cmd/promtail
306307
else
307308
CGO_ENABLED=0 $(GOX) -osarch="linux/amd64 linux/arm64 linux/arm darwin/amd64 darwin/arm64 windows/amd64 freebsd/amd64" ./cmd/loki
308309
CGO_ENABLED=0 $(GOX) -osarch="linux/amd64 linux/arm64 linux/arm darwin/amd64 darwin/arm64 windows/amd64 freebsd/amd64" ./cmd/logcli
309310
CGO_ENABLED=0 $(GOX) -osarch="linux/amd64 linux/arm64 linux/arm darwin/amd64 darwin/arm64 windows/amd64 freebsd/amd64" ./cmd/loki-canary
311+
CGO_ENABLED=0 $(GOX) -osarch="linux/amd64 linux/arm64 linux/arm darwin/amd64 darwin/arm64 windows/amd64 freebsd/amd64" ./cmd/lokitool
310312
CGO_ENABLED=0 $(GOX) -osarch="darwin/amd64 darwin/arm64 windows/amd64 windows/386 freebsd/amd64" ./clients/cmd/promtail
311313
PKG_CONFIG_PATH="/usr/lib/aarch64-linux-gnu/pkgconfig" CC="aarch64-linux-gnu-gcc" $(CGO_GOX) -tags promtail_journal_enabled -osarch="linux/arm64" ./clients/cmd/promtail
312314
PKG_CONFIG_PATH="/usr/lib/arm-linux-gnueabihf/pkgconfig" CC="arm-linux-gnueabihf-gcc" $(CGO_GOX) -tags promtail_journal_enabled -osarch="linux/arm" ./clients/cmd/promtail

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ A Loki-based logging stack consists of 3 components:
2424
- `loki` is the main server, responsible for storing logs and processing queries.
2525
- [Grafana](https://github.com/grafana/grafana) for querying and displaying the logs.
2626

27+
**Note that Promtail is considered to be feature complete, and future development for logs collection will be in [Grafana Alloy](https://github.com/grafana/alloy)**
28+
2729
Loki is like Prometheus, but for logs: we prefer a multidimensional label-based approach to indexing, and want a single-binary, easy to operate system with no dependencies.
2830
Loki differs from Prometheus by focusing on logs instead of metrics, and delivering logs via push, instead of pull.
2931

clients/pkg/promtail/promtail.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func (p *Promtail) reloadConfig(cfg *config.Config) error {
184184
entryHandlers = append(entryHandlers, p.client)
185185
p.entriesFanout = utils.NewFanoutEntryHandler(timeoutUntilFanoutHardStop, entryHandlers...)
186186

187-
tms, err := targets.NewTargetManagers(p, p.reg, p.logger, cfg.PositionsConfig, p.entriesFanout, cfg.ScrapeConfig, &cfg.TargetConfig, cfg.Global.FileWatch)
187+
tms, err := targets.NewTargetManagers(p, p.reg, p.logger, cfg.PositionsConfig, p.entriesFanout, cfg.ScrapeConfig, &cfg.TargetConfig, cfg.Global.FileWatch, &cfg.LimitsConfig)
188188
if err != nil {
189189
return err
190190
}

clients/pkg/promtail/targets/docker/target.go

+101-61
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
11
package docker
22

33
import (
4-
"bufio"
54
"context"
65
"fmt"
7-
"io"
86
"strconv"
97
"strings"
108
"sync"
119
"time"
1210

1311
"github.com/docker/docker/api/types/container"
1412
"github.com/docker/docker/client"
15-
"github.com/docker/docker/pkg/stdcopy"
1613
"github.com/go-kit/log"
1714
"github.com/go-kit/log/level"
1815
"github.com/prometheus/common/model"
@@ -24,6 +21,7 @@ import (
2421
"github.com/grafana/loki/v3/clients/pkg/promtail/positions"
2522
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/target"
2623

24+
"github.com/grafana/loki/v3/pkg/framedstdcopy"
2725
"github.com/grafana/loki/v3/pkg/logproto"
2826
)
2927

@@ -36,6 +34,7 @@ type Target struct {
3634
labels model.LabelSet
3735
relabelConfig []*relabel.Config
3836
metrics *Metrics
37+
maxLineSize int
3938

4039
cancel context.CancelFunc
4140
client client.APIClient
@@ -53,6 +52,7 @@ func NewTarget(
5352
labels model.LabelSet,
5453
relabelConfig []*relabel.Config,
5554
client client.APIClient,
55+
maxLineSize int,
5656
) (*Target, error) {
5757

5858
pos, err := position.Get(positions.CursorKey(containerName))
@@ -73,6 +73,7 @@ func NewTarget(
7373
labels: labels,
7474
relabelConfig: relabelConfig,
7575
metrics: metrics,
76+
maxLineSize: maxLineSize,
7677

7778
client: client,
7879
running: atomic.NewBool(false),
@@ -109,22 +110,22 @@ func (t *Target) processLoop(ctx context.Context) {
109110
}
110111

111112
// Start transferring
112-
rstdout, wstdout := io.Pipe()
113-
rstderr, wstderr := io.Pipe()
113+
cstdout := make(chan []byte)
114+
cstderr := make(chan []byte)
114115
t.wg.Add(1)
115116
go func() {
116117
defer func() {
117118
t.wg.Done()
118-
wstdout.Close()
119-
wstderr.Close()
119+
close(cstdout)
120+
close(cstderr)
120121
t.Stop()
121122
}()
122123
var written int64
123124
var err error
124125
if inspectInfo.Config.Tty {
125-
written, err = io.Copy(wstdout, logs)
126+
written, err = framedstdcopy.NoHeaderFramedStdCopy(cstdout, logs)
126127
} else {
127-
written, err = stdcopy.StdCopy(wstdout, wstderr, logs)
128+
written, err = framedstdcopy.FramedStdCopy(cstdout, cstderr, logs)
128129
}
129130
if err != nil {
130131
level.Warn(t.logger).Log("msg", "could not transfer logs", "written", written, "container", t.containerName, "err", err)
@@ -135,8 +136,8 @@ func (t *Target) processLoop(ctx context.Context) {
135136

136137
// Start processing
137138
t.wg.Add(2)
138-
go t.process(rstdout, "stdout")
139-
go t.process(rstderr, "stderr")
139+
go t.process(cstdout, "stdout")
140+
go t.process(cstderr, "stderr")
140141

141142
// Wait until done
142143
<-ctx.Done()
@@ -149,81 +150,120 @@ func (t *Target) processLoop(ctx context.Context) {
149150
func extractTs(line string) (time.Time, string, error) {
150151
pair := strings.SplitN(line, " ", 2)
151152
if len(pair) != 2 {
152-
return time.Now(), line, fmt.Errorf("Could not find timestamp in '%s'", line)
153+
return time.Now(), line, fmt.Errorf("could not find timestamp in '%s'", line)
153154
}
154155
ts, err := time.Parse("2006-01-02T15:04:05.999999999Z07:00", pair[0])
155156
if err != nil {
156-
return time.Now(), line, fmt.Errorf("Could not parse timestamp from '%s': %w", pair[0], err)
157+
return time.Now(), line, fmt.Errorf("could not parse timestamp from '%s': %w", pair[0], err)
157158
}
158159
return ts, pair[1], nil
159160
}
160161

161-
// https://devmarkpro.com/working-big-files-golang
162-
func readLine(r *bufio.Reader) (string, error) {
162+
func (t *Target) process(frames chan []byte, logStream string) {
163+
defer func() {
164+
t.wg.Done()
165+
}()
166+
163167
var (
164-
isPrefix = true
165-
err error
166-
line, ln []byte
168+
sizeLimit = t.maxLineSize
169+
discardRemainingLine = false
170+
payloadAcc strings.Builder
171+
curTs = time.Now()
167172
)
168173

169-
for isPrefix && err == nil {
170-
line, isPrefix, err = r.ReadLine()
171-
ln = append(ln, line...)
174+
// If max_line_size is disabled (set to 0), we can in theory have infinite buffer growth.
175+
// We can't guarantee that there's any bound on Docker logs, they could be an infinite stream
176+
// without newlines for all we know. To protect promtail from OOM in that case, we introduce
177+
// this safety limit into the Docker target, inspired by the default Loki max_line_size value:
178+
// https://grafana.com/docs/loki/latest/configure/#limits_config
179+
if sizeLimit == 0 {
180+
sizeLimit = 256 * 1024
172181
}
173182

174-
return string(ln), err
175-
}
176-
177-
func (t *Target) process(r io.Reader, logStream string) {
178-
defer func() {
179-
t.wg.Done()
180-
}()
181-
182-
reader := bufio.NewReader(r)
183-
for {
184-
line, err := readLine(reader)
183+
for frame := range frames {
184+
// Split frame into timestamp and payload
185+
ts, payload, err := extractTs(string(frame))
185186
if err != nil {
186-
if err == io.EOF {
187-
break
187+
if payloadAcc.Len() == 0 {
188+
// If we are currently accumulating a line split over multiple frames, we would still expect
189+
// timestamps in every frame, but since we don't use those secondary ones, we don't log an error in that case.
190+
level.Error(t.logger).Log("msg", "error reading docker log line, skipping line", "err", err)
191+
t.metrics.dockerErrors.Inc()
192+
continue
188193
}
189-
level.Error(t.logger).Log("msg", "error reading docker log line, skipping line", "err", err)
190-
t.metrics.dockerErrors.Inc()
194+
ts = curTs
191195
}
192196

193-
ts, line, err := extractTs(line)
194-
if err != nil {
195-
level.Error(t.logger).Log("msg", "could not extract timestamp, skipping line", "err", err)
196-
t.metrics.dockerErrors.Inc()
197+
// If time has changed, we are looking at a new event (although we should have seen a new line..),
198+
// so flush the buffer if we have one.
199+
if ts != curTs {
200+
discardRemainingLine = false
201+
if payloadAcc.Len() > 0 {
202+
t.handleOutput(logStream, curTs, payloadAcc.String())
203+
payloadAcc.Reset()
204+
}
205+
}
206+
207+
// Check if we have the end of the event
208+
var isEol = strings.HasSuffix(payload, "\n")
209+
210+
// If we are currently discarding a line (due to size limits), skip ahead, but don't skip the next
211+
// frame if we saw the end of the line.
212+
if discardRemainingLine {
213+
discardRemainingLine = !isEol
197214
continue
198215
}
199216

200-
// Add all labels from the config, relabel and filter them.
201-
lb := labels.NewBuilder(nil)
202-
for k, v := range t.labels {
203-
lb.Set(string(k), string(v))
217+
// Strip newline ending if we have it
218+
payload = strings.TrimRight(payload, "\r\n")
219+
220+
// Fast path: Most log lines are a single frame. If we have a full line in frame and buffer is empty,
221+
// then don't use the buffer at all.
222+
if payloadAcc.Len() == 0 && isEol {
223+
t.handleOutput(logStream, ts, payload)
224+
continue
204225
}
205-
lb.Set(dockerLabelLogStream, logStream)
206-
processed, _ := relabel.Process(lb.Labels(), t.relabelConfig...)
207226

208-
filtered := make(model.LabelSet)
209-
for _, lbl := range processed {
210-
if strings.HasPrefix(lbl.Name, "__") {
211-
continue
212-
}
213-
filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
227+
// Add to buffer
228+
payloadAcc.WriteString(payload)
229+
curTs = ts
230+
231+
// Send immediately if line ended or we built a very large event
232+
if isEol || payloadAcc.Len() > sizeLimit {
233+
discardRemainingLine = !isEol
234+
t.handleOutput(logStream, curTs, payloadAcc.String())
235+
payloadAcc.Reset()
214236
}
237+
}
238+
}
239+
240+
func (t *Target) handleOutput(logStream string, ts time.Time, payload string) {
241+
// Add all labels from the config, relabel and filter them.
242+
lb := labels.NewBuilder(nil)
243+
for k, v := range t.labels {
244+
lb.Set(string(k), string(v))
245+
}
246+
lb.Set(dockerLabelLogStream, logStream)
247+
processed, _ := relabel.Process(lb.Labels(), t.relabelConfig...)
215248

216-
t.handler.Chan() <- api.Entry{
217-
Labels: filtered,
218-
Entry: logproto.Entry{
219-
Timestamp: ts,
220-
Line: line,
221-
},
249+
filtered := make(model.LabelSet)
250+
for _, lbl := range processed {
251+
if strings.HasPrefix(lbl.Name, "__") {
252+
continue
222253
}
223-
t.metrics.dockerEntries.Inc()
224-
t.positions.Put(positions.CursorKey(t.containerName), ts.Unix())
225-
t.since = ts.Unix()
254+
filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
255+
}
256+
257+
t.handler.Chan() <- api.Entry{
258+
Labels: filtered,
259+
Entry: logproto.Entry{
260+
Timestamp: ts,
261+
Line: payload,
262+
},
226263
}
264+
t.metrics.dockerEntries.Inc()
265+
t.positions.Put(positions.CursorKey(t.containerName), ts.Unix())
266+
t.since = ts.Unix()
227267
}
228268

229269
// startIfNotRunning starts processing container logs. The operation is idempotent, i.e. the processing cannot be started twice.

clients/pkg/promtail/targets/docker/target_group.go

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type targetGroup struct {
3636
httpClientConfig config.HTTPClientConfig
3737
client client.APIClient
3838
refreshInterval model.Duration
39+
maxLineSize int
3940

4041
mtx sync.Mutex
4142
targets map[string]*Target
@@ -120,6 +121,7 @@ func (tg *targetGroup) addTarget(id string, discoveredLabels model.LabelSet) err
120121
discoveredLabels.Merge(tg.defaultLabels),
121122
tg.relabelConfig,
122123
tg.client,
124+
tg.maxLineSize,
123125
)
124126
if err != nil {
125127
return err

0 commit comments

Comments
 (0)