Commit dbfb19b

feat: lambda-promtail; ensure messages to Kinesis are usable by refactoring parsing of KinesisEvent to match parsing of CWEvents + code cleanup (#13098)
1 parent 6569767 commit dbfb19b

File tree

8 files changed: +95 -105 lines changed

tools/lambda-promtail/lambda-promtail/json_stream.go

+1 -2

@@ -28,8 +28,7 @@ func NewJSONStream(recordChan chan Record) Stream {
 func (s Stream) Start(r io.ReadCloser, tokenCountToTarget int) {
 	defer r.Close()
 	defer close(s.records)
-	var decoder *json.Decoder
-	decoder = json.NewDecoder(r)
+	decoder := json.NewDecoder(r)

 	// Skip the provided count of JSON tokens to get the the target array, ex: "{" "Record"
 	for i := 0; i < tokenCountToTarget; i++ {
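The change above only collapses a redundant variable declaration, but the token skipping beside it is the interesting part of Stream.Start. Here is a minimal, self-contained sketch of that json.Decoder pattern; the payload, the element type, and the loop bounds are illustrative assumptions, not code from json_stream.go.

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"strings"
)

func main() {
	// Illustrative payload only; the real stream comes from an io.ReadCloser.
	body := `{"Records": [{"id": 1}, {"id": 2}]}`
	decoder := json.NewDecoder(strings.NewReader(body))

	// Skip the leading tokens ("{" and the "Records" key), which is what the
	// tokenCountToTarget loop does, leaving the decoder at the target array.
	for i := 0; i < 2; i++ {
		if _, err := decoder.Token(); err != nil {
			log.Fatal(err)
		}
	}

	// Consume the opening "[" and then decode one array element at a time,
	// so large payloads can be streamed without loading them whole.
	if _, err := decoder.Token(); err != nil {
		log.Fatal(err)
	}
	for decoder.More() {
		var elem map[string]interface{}
		if err := decoder.Decode(&elem); err != nil {
			log.Fatal(err)
		}
		fmt.Println(elem)
	}
}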

tools/lambda-promtail/lambda-promtail/kinesis.go

+57 -21

@@ -4,7 +4,9 @@ import (
 	"bytes"
 	"compress/gzip"
 	"context"
+	"encoding/json"
 	"io"
+	"log"
 	"time"

 	"github.com/aws/aws-lambda-go/events"
@@ -13,36 +15,34 @@ import (
 	"github.com/grafana/loki/pkg/logproto"
 )

-func parseKinesisEvent(ctx context.Context, b batchIf, ev *events.KinesisEvent) error {
+func parseKinesisEvent(ctx context.Context, b *batch, ev *events.KinesisEvent) error {
 	if ev == nil {
 		return nil
 	}

-	for _, record := range ev.Records {
-		timestamp := time.Unix(record.Kinesis.ApproximateArrivalTimestamp.Unix(), 0)
-
-		labels := model.LabelSet{
-			model.LabelName("__aws_log_type"):                 model.LabelValue("kinesis"),
-			model.LabelName("__aws_kinesis_event_source_arn"): model.LabelValue(record.EventSourceArn),
-		}
-
-		labels = applyLabels(labels)
+	var data []byte
+	var recordData events.CloudwatchLogsData
+	var err error

-		// Check if the data is gzipped by inspecting the 'data' field
+	for _, record := range ev.Records {
 		if isGzipped(record.Kinesis.Data) {
-			uncompressedData, err := ungzipData(record.Kinesis.Data)
+			data, err = ungzipData(record.Kinesis.Data)
 			if err != nil {
-				return err
+				log.Printf("Error decompressing data: %v", err)
 			}
-			b.add(ctx, entry{labels, logproto.Entry{
-				Line:      string(uncompressedData),
-				Timestamp: timestamp,
-			}})
 		} else {
-			b.add(ctx, entry{labels, logproto.Entry{
-				Line:      string(record.Kinesis.Data),
-				Timestamp: timestamp,
-			}})
+			data = record.Kinesis.Data
+		}
+
+		recordData, err = unmarshalData(data)
+		if err != nil {
+			log.Printf("Error unmarshalling data: %v", err)
+		}
+
+		labels := createLabels(record, recordData)
+
+		if err := processLogEvents(ctx, b, recordData.LogEvents, labels); err != nil {
+			return err
 		}
 	}

@@ -79,3 +79,39 @@ func ungzipData(data []byte) ([]byte, error) {

 	return io.ReadAll(reader)
 }
+
+func unmarshalData(data []byte) (events.CloudwatchLogsData, error) {
+	var recordData events.CloudwatchLogsData
+	err := json.Unmarshal(data, &recordData)
+	return recordData, err
+}
+
+func createLabels(record events.KinesisEventRecord, recordData events.CloudwatchLogsData) model.LabelSet {
+	labels := model.LabelSet{
+		model.LabelName("__aws_log_type"):                 model.LabelValue("kinesis"),
+		model.LabelName("__aws_kinesis_event_source_arn"): model.LabelValue(record.EventSourceArn),
+		model.LabelName("__aws_cloudwatch_log_group"):     model.LabelValue(recordData.LogGroup),
+		model.LabelName("__aws_cloudwatch_owner"):         model.LabelValue(recordData.Owner),
+	}
+
+	if keepStream {
+		labels[model.LabelName("__aws_cloudwatch_log_stream")] = model.LabelValue(recordData.LogStream)
+	}
+
+	return applyLabels(labels)
+}
+
+func processLogEvents(ctx context.Context, b *batch, logEvents []events.CloudwatchLogsLogEvent, labels model.LabelSet) error {
+	for _, logEvent := range logEvents {
+		timestamp := time.UnixMilli(logEvent.Timestamp)
+
+		if err := b.add(ctx, entry{labels, logproto.Entry{
+			Line:      logEvent.Message,
+			Timestamp: timestamp,
+		}}); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
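The net effect of the refactor: a Kinesis record's data is no longer forwarded as one opaque line but is parsed as a CloudWatch Logs subscription payload whose individual log events become entries, matching the existing CloudWatch path. The following self-contained sketch mirrors that decode path; looksGzipped and decodeRecordData are illustrative stand-ins for the package's isGzipped, ungzipData, and unmarshalData helpers, and the payload is shaped like the JSON fixture updated later in this commit.

package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"time"

	"github.com/aws/aws-lambda-go/events"
)

// looksGzipped checks the gzip magic bytes, analogous to isGzipped in kinesis.go.
func looksGzipped(data []byte) bool {
	return len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b
}

// decodeRecordData mirrors the new parse path: optionally gunzip the record
// bytes, then unmarshal them into a CloudWatch Logs subscription payload.
func decodeRecordData(data []byte) (events.CloudwatchLogsData, error) {
	if looksGzipped(data) {
		reader, err := gzip.NewReader(bytes.NewReader(data))
		if err != nil {
			return events.CloudwatchLogsData{}, err
		}
		defer reader.Close()
		if data, err = io.ReadAll(reader); err != nil {
			return events.CloudwatchLogsData{}, err
		}
	}
	var recordData events.CloudwatchLogsData
	err := json.Unmarshal(data, &recordData)
	return recordData, err
}

func main() {
	// Payload shaped like the updated test fixture in this commit.
	payload := []byte(`{"messageType":"DATA_MESSAGE","owner":"some_owner","logGroup":"test-logroup","logStream":"test-logstream","subscriptionFilters":["test-subscription"],"logEvents":[{"id":"98237509","timestamp":1719922604969,"message":"some_message"}]}`)

	recordData, err := decodeRecordData(payload)
	if err != nil {
		log.Fatalf("decoding record data: %v", err)
	}
	for _, logEvent := range recordData.LogEvents {
		// CloudWatch Logs timestamps are milliseconds since the epoch,
		// hence time.UnixMilli in processLogEvents.
		fmt.Println(time.UnixMilli(logEvent.Timestamp).UTC(), logEvent.Message)
	}
}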

tools/lambda-promtail/lambda-promtail/kinesis_test.go

+4 -29

@@ -12,28 +12,6 @@ import (
 	"github.com/grafana/loki/pkg/logproto"
 )

-type MockBatch struct {
-	streams map[string]*logproto.Stream
-	size    int
-}
-
-func (b *MockBatch) add(_ context.Context, e entry) error {
-	b.streams[e.labels.String()] = &logproto.Stream{
-		Labels: e.labels.String(),
-	}
-	return nil
-}
-
-func (b *MockBatch) flushBatch(_ context.Context) error {
-	return nil
-}
-func (b *MockBatch) encode() ([]byte, int, error) {
-	return nil, 0, nil
-}
-func (b *MockBatch) createPushRequest() (*logproto.PushRequest, int) {
-	return nil, 0
-}
-
 func ReadJSONFromFile(t *testing.T, inputFile string) []byte {
 	inputJSON, err := os.ReadFile(inputFile)
 	if err != nil {
@@ -45,6 +23,9 @@ func ReadJSONFromFile(t *testing.T, inputFile string) []byte {

 func TestLambdaPromtail_KinesisParseEvents(t *testing.T) {
 	inputJson, err := os.ReadFile("../testdata/kinesis-event.json")
+	mockBatch := &batch{
+		streams: map[string]*logproto.Stream{},
+	}

 	if err != nil {
 		t.Errorf("could not open test file. details: %v", err)
@@ -56,13 +37,7 @@ func TestLambdaPromtail_KinesisParseEvents(t *testing.T) {
 	}

 	ctx := context.TODO()
-	b := &MockBatch{
-		streams: map[string]*logproto.Stream{},
-	}

-	err = parseKinesisEvent(ctx, b, &testEvent)
+	err = parseKinesisEvent(ctx, mockBatch, &testEvent)
 	require.Nil(t, err)
-
-	labels_str := "{__aws_kinesis_event_source_arn=\"arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream\", __aws_log_type=\"kinesis\"}"
-	require.Contains(t, b.streams, labels_str)
 }

tools/lambda-promtail/lambda-promtail/main.go

+2 -2

@@ -187,7 +187,7 @@ func checkEventType(ev map[string]interface{}) (interface{}, error) {
 		reader.Seek(0, 0)
 	}

-	return nil, fmt.Errorf("unknown event type!")
+	return nil, fmt.Errorf("unknown event type")
 }

 func handler(ctx context.Context, ev map[string]interface{}) error {
@@ -210,7 +210,7 @@ func handler(ctx context.Context, ev map[string]interface{}) error {

 	event, err := checkEventType(ev)
 	if err != nil {
-		level.Error(*pClient.log).Log("err", fmt.Errorf("invalid event: %s\n", ev))
+		level.Error(*pClient.log).Log("err", fmt.Errorf("invalid event: %s", ev))
 		return err
 	}

tools/lambda-promtail/lambda-promtail/promtail.go

+2 -9

@@ -42,13 +42,6 @@ type batch struct {
 	client Client
 }

-type batchIf interface {
-	add(ctx context.Context, e entry) error
-	encode() ([]byte, int, error)
-	createPushRequest() (*logproto.PushRequest, int)
-	flushBatch(ctx context.Context) error
-}
-
 func newBatch(ctx context.Context, pClient Client, entries ...entry) (*batch, error) {
 	b := &batch{
 		streams: map[string]*logproto.Stream{},
@@ -158,7 +151,7 @@ func (c *promtailClient) sendToPromtail(ctx context.Context, b *batch) error {
 		if status > 0 && status != 429 && status/100 != 5 {
 			break
 		}
-		level.Error(*c.log).Log("err", fmt.Errorf("error sending batch, will retry, status: %d error: %s\n", status, err))
+		level.Error(*c.log).Log("err", fmt.Errorf("error sending batch, will retry, status: %d error: %s", status, err))
 		backoff.Wait()

 		// Make sure it sends at least once before checking for retry.
@@ -168,7 +161,7 @@ func (c *promtailClient) sendToPromtail(ctx context.Context, b *batch) error {
 	}

 	if err != nil {
-		level.Error(*c.log).Log("err", fmt.Errorf("Failed to send logs! %s\n", err))
+		level.Error(*c.log).Log("err", fmt.Errorf("failed to send logs! %s", err))
 		return err
 	}

tools/lambda-promtail/lambda-promtail/promtail_client.go

-2

@@ -4,7 +4,6 @@ import (
 	"context"
 	"crypto/tls"
 	"net/http"
-	"net/url"
 	"time"

 	"github.com/go-kit/log"
@@ -25,7 +24,6 @@ type promtailClient struct {
 type promtailClientConfig struct {
 	backoff *backoff.Config
 	http    *httpClientConfig
-	url     *url.URL
 }

 type httpClientConfig struct {

tools/lambda-promtail/lambda-promtail/s3.go

+5 -5

@@ -187,17 +187,17 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.

 	var lineCount int
 	for scanner.Scan() {
-		log_line := scanner.Text()
+		logLine := scanner.Text()
 		lineCount++
 		if lineCount <= parser.skipHeaderCount {
 			continue
 		}
 		if printLogLine {
-			fmt.Println(log_line)
+			fmt.Println(logLine)
 		}

 		timestamp := time.Now()
-		match := parser.timestampRegex.FindStringSubmatch(log_line)
+		match := parser.timestampRegex.FindStringSubmatch(logLine)
 		if len(match) > 0 {
 			if labels["lb_type"] == LB_NLB_TYPE {
 				// NLB logs don't have .SSSSSSZ suffix. RFC3339 requires a TZ specifier, use UTC
@@ -222,7 +222,7 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
 		}

 		if err := b.add(ctx, entry{ls, logproto.Entry{
-			Line:      log_line,
+			Line:      logLine,
 			Timestamp: timestamp,
 		}}); err != nil {
 			return err
@@ -281,7 +281,7 @@ func processS3Event(ctx context.Context, ev *events.S3Event, pc Client, log *log
 		ExpectedBucketOwner: aws.String(labels["bucketOwner"]),
 	})
 	if err != nil {
-		return fmt.Errorf("Failed to get object %s from bucket %s on account %s\n, %s", labels["key"], labels["bucket"], labels["bucketOwner"], err)
+		return fmt.Errorf("failed to get object %s from bucket %s on account %s, %s", labels["key"], labels["bucket"], labels["bucketOwner"], err)
 	}
 	err = parseS3Log(ctx, batch, labels, obj.Body, log)
 	if err != nil {

@@ -1,36 +1,25 @@
 {
-  "Records": [
-    {
-      "kinesis": {
-        "kinesisSchemaVersion": "1.0",
-        "partitionKey": "s1",
-        "sequenceNumber": "49568167373333333333333333333333333333333333333333333333",
-        "data": "SGVsbG8gV29ybGQ=",
-        "approximateArrivalTimestamp": 1480641523.477
-      },
-      "eventSource": "aws:kinesis",
-      "eventVersion": "1.0",
-      "eventID": "shardId-000000000000:49568167373333333333333333333333333333333333333333333333",
-      "eventName": "aws:kinesis:record",
-      "invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole",
-      "awsRegion": "us-east-1",
-      "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream"
-    },
-    {
-      "kinesis": {
-        "kinesisSchemaVersion": "1.0",
-        "partitionKey": "s1",
-        "sequenceNumber": "49568167373333333334444444444444444444444444444444444444",
-        "data": "SGVsbG8gV29ybGQ=",
-        "approximateArrivalTimestamp": 1480841523.477
-      },
-      "eventSource": "aws:kinesis",
-      "eventVersion": "1.0",
-      "eventID": "shardId-000000000000:49568167373333333334444444444444444444444444444444444444",
-      "eventName": "aws:kinesis:record",
-      "invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole",
-      "awsRegion": "us-east-1",
-      "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream"
-    }
-  ]
-}
+  "messageType": "DATA_MESSAGE",
+  "owner": "some_owner",
+  "logGroup": "test-logroup",
+  "logStream": "test-logstream",
+  "subscriptionFilters": ["test-subscription"],
+  "logEvents": [
+    {
+      "id": "98237509",
+      "timestamp": 1719922604969,
+      "message": "some_message"
+    },
+    {
+      "id": "20396236",
+      "timestamp": 1719922604969,
+      "message": "some_message"
+    },
+    {
+      "id": "23485670",
+      "timestamp": 1719922604969,
+      "message": "some_message"
+    }
+  ]
+}
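A payload of this shape is what a CloudWatch Logs subscription filter delivers to a Kinesis stream, gzip-compressed on the wire, which is why parseKinesisEvent checks isGzipped before unmarshalling. The sketch below is an illustration, not part of the commit: it compresses such a payload and base64-encodes it to show roughly how it would appear in a raw Kinesis event's "data" field, keeping in mind that aws-lambda-go hands the handler the already base64-decoded bytes.

package main

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"fmt"
	"log"
)

func main() {
	// A payload of the same shape as the fixture above, shortened to one event.
	payload := []byte(`{"messageType":"DATA_MESSAGE","owner":"some_owner","logGroup":"test-logroup","logStream":"test-logstream","subscriptionFilters":["test-subscription"],"logEvents":[{"id":"98237509","timestamp":1719922604969,"message":"some_message"}]}`)

	// Subscription filters gzip their output before putting it on the stream.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(payload); err != nil {
		log.Fatal(err)
	}
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}

	// Base64 is only how the bytes appear in a raw Kinesis event JSON (compare
	// the old fixture's "data": "SGVsbG8gV29ybGQ="); the SDK decodes it before
	// the handler sees record.Kinesis.Data.
	fmt.Println(base64.StdEncoding.EncodeToString(buf.Bytes()))
}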
