Skip to content

Commit ec54c72

Browse files
feat(lambda/promtail): support dropping labels (#10755)
**What this PR does / why we need it**: **Which issue(s) this PR fixes**: Fixes #10669 **Special notes for your reviewer**: **Checklist** - [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [x] Documentation added - [x] Tests updated - [x] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](d10549e) --------- Signed-off-by: hainenber <[email protected]> Co-authored-by: Michel Hollands <[email protected]>
1 parent d0cf3b0 commit ec54c72

File tree

10 files changed

+90
-26
lines changed

10 files changed

+90
-26
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
* [10416](https://github.com/grafana/loki/pull/10416) **lpugoy**: Lambda-Promtail: Add support for WAF logs in S3
4343
* [10301](https://github.com/grafana/loki/pull/10301) **wildum**: users can now define `additional_fields` in cloudflare configuration.
44+
* [10755](https://github.com/grafana/loki/pull/10755) **hainenber**: Lambda-Promtail: Add support for dropping labels passed via env var
4445

4546
##### Changes
4647

tools/lambda-promtail/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@ Then use Terraform to deploy:
5454

5555
```bash
5656
## use cloudwatch log group
57-
terraform apply -var "<ecr-repo>:<tag>" -var "write_address=https://your-loki-url/loki/api/v1/push" -var "password=<basic-auth-pw>" -var "username=<basic-auth-username>" -var 'bearer_token=<bearer-token>' -var 'log_group_names=["log-group-01", "log-group-02"]' -var 'extra_labels="name1,value1,name2,value2"' -var "tenant_id=<value>" -var 'skip_tls_verify="false"'
57+
terraform apply -var "<ecr-repo>:<tag>" -var "write_address=https://your-loki-url/loki/api/v1/push" -var "password=<basic-auth-pw>" -var "username=<basic-auth-username>" -var 'bearer_token=<bearer-token>' -var 'log_group_names=["log-group-01", "log-group-02"]' -var 'extra_labels="name1,value1,name2,value2"' -var 'drop_labels="name1,name2"' -var "tenant_id=<value>" -var 'skip_tls_verify="false"'
5858
```
5959

6060
```bash
6161
## use kinesis data stream
62-
terraform apply -var "<ecr-repo>:<tag>" -var "write_address=https://your-loki-url/loki/api/v1/push" -var "password=<basic-auth-pw>" -var "username=<basic-auth-username>" -var 'kinesis_stream_name=["kinesis-stream-01", "kinesis-stream-02"]' -var 'extra_labels="name1,value1,name2,value2"' -var "tenant_id=<value>" -var 'skip_tls_verify="false"'
62+
terraform apply -var "<ecr-repo>:<tag>" -var "write_address=https://your-loki-url/loki/api/v1/push" -var "password=<basic-auth-pw>" -var "username=<basic-auth-username>" -var 'kinesis_stream_name=["kinesis-stream-01", "kinesis-stream-02"]' -var 'extra_labels="name1,value1,name2,value2"' -var 'drop_labels="name1,name2"' -var "tenant_id=<value>" -var 'skip_tls_verify="false"'
6363
```
6464

6565
or CloudFormation:

tools/lambda-promtail/lambda-promtail/cw.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func parseCWEvent(ctx context.Context, b *batch, ev *events.CloudwatchLogsEvent)
2626
labels[model.LabelName("__aws_cloudwatch_log_stream")] = model.LabelValue(data.LogStream)
2727
}
2828

29-
labels = applyExtraLabels(labels)
29+
labels = applyLabels(labels)
3030

3131
for _, event := range data.LogEvents {
3232
timestamp := time.UnixMilli(event.Timestamp)

tools/lambda-promtail/lambda-promtail/kinesis.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func parseKinesisEvent(ctx context.Context, b batchIf, ev *events.KinesisEvent)
2626
model.LabelName("__aws_kinesis_event_source_arn"): model.LabelValue(record.EventSourceArn),
2727
}
2828

29-
labels = applyExtraLabels(labels)
29+
labels = applyLabels(labels)
3030

3131
// Check if the data is gzipped by inspecting the 'data' field
3232
if isGzipped(record.Kinesis.Data) {

tools/lambda-promtail/lambda-promtail/main.go

+39-11
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,19 @@ const (
2525

2626
maxErrMsgLen = 1024
2727

28-
invalidExtraLabelsError = "Invalid value for environment variable EXTRA_LABELS. Expected a comma separated list with an even number of entries. "
28+
invalidExtraLabelsError = "invalid value for environment variable EXTRA_LABELS. Expected a comma separated list with an even number of entries. "
2929
)
3030

3131
var (
32-
writeAddress *url.URL
33-
username, password, extraLabelsRaw, tenantID, bearerToken string
34-
keepStream bool
35-
batchSize int
36-
s3Clients map[string]*s3.Client
37-
extraLabels model.LabelSet
38-
skipTlsVerify bool
39-
printLogLine bool
32+
writeAddress *url.URL
33+
username, password, extraLabelsRaw, dropLabelsRaw, tenantID, bearerToken string
34+
keepStream bool
35+
batchSize int
36+
s3Clients map[string]*s3.Client
37+
extraLabels model.LabelSet
38+
dropLabels []model.LabelName
39+
skipTlsVerify bool
40+
printLogLine bool
4041
)
4142

4243
func setupArguments() {
@@ -60,6 +61,11 @@ func setupArguments() {
6061
panic(err)
6162
}
6263

64+
dropLabels, err = getDropLabels()
65+
if err != nil {
66+
panic(err)
67+
}
68+
6369
username = os.Getenv("USERNAME")
6470
password = os.Getenv("PASSWORD")
6571
// If either username or password is set then both must be.
@@ -128,8 +134,30 @@ func parseExtraLabels(extraLabelsRaw string, omitPrefix bool) (model.LabelSet, e
128134
return extractedLabels, nil
129135
}
130136

131-
func applyExtraLabels(labels model.LabelSet) model.LabelSet {
132-
return labels.Merge(extraLabels)
137+
func getDropLabels() ([]model.LabelName, error) {
138+
var result []model.LabelName
139+
140+
dropLabelsRaw = os.Getenv("DROP_LABELS")
141+
dropLabelsRawSplit := strings.Split(dropLabelsRaw, ",")
142+
for _, dropLabelRaw := range dropLabelsRawSplit {
143+
dropLabel := model.LabelName(dropLabelRaw)
144+
if !dropLabel.IsValid() {
145+
return []model.LabelName{}, fmt.Errorf("invalid label name %s", dropLabelRaw)
146+
}
147+
result = append(result, dropLabel)
148+
}
149+
150+
return result, nil
151+
}
152+
153+
func applyLabels(labels model.LabelSet) model.LabelSet {
154+
finalLabels := labels.Merge(extraLabels)
155+
156+
for _, dropLabel := range dropLabels {
157+
delete(finalLabels, dropLabel)
158+
}
159+
160+
return finalLabels
133161
}
134162

135163
func checkEventType(ev map[string]interface{}) (interface{}, error) {

tools/lambda-promtail/lambda-promtail/main_test.go

+25
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"os"
45
"testing"
56

67
"github.com/prometheus/common/model"
@@ -34,3 +35,27 @@ func TestLambdaPromtail_TestParseLabelsNoneProvided(t *testing.T) {
3435
require.Len(t, extraLabels, 0)
3536
require.Nil(t, err)
3637
}
38+
39+
func TestLambdaPromtail_TestDropLabels(t *testing.T) {
40+
os.Setenv("DROP_LABELS", "A1,A2")
41+
42+
// Reset the shared global variables
43+
defer func() {
44+
os.Unsetenv("DROP_LABELS")
45+
dropLabels = []model.LabelName{}
46+
}()
47+
48+
var err error
49+
dropLabels, err = getDropLabels()
50+
require.Nil(t, err)
51+
require.Contains(t, dropLabels, model.LabelName("A1"))
52+
53+
defaultLabelSet := model.LabelSet{
54+
model.LabelName("default"): model.LabelValue("default"),
55+
model.LabelName("A1"): model.LabelValue("A1"),
56+
model.LabelName("B2"): model.LabelValue("B2"),
57+
}
58+
modifiedLabels := applyLabels(defaultLabelSet)
59+
require.NotContains(t, modifiedLabels, model.LabelName("A1"))
60+
require.Contains(t, modifiedLabels, model.LabelName("B2"))
61+
}

tools/lambda-promtail/lambda-promtail/promtail.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,11 @@ func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
126126
}
127127

128128
func (b *batch) flushBatch(ctx context.Context) error {
129-
err := b.client.sendToPromtail(ctx, b)
130-
if err != nil {
131-
return err
129+
if b.client != nil {
130+
err := b.client.sendToPromtail(ctx, b)
131+
if err != nil {
132+
return err
133+
}
132134
}
133135
b.resetBatch()
134136

tools/lambda-promtail/lambda-promtail/s3.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
162162
model.LabelName(fmt.Sprintf("__aws_%s_owner", parser.logTypeLabel)): model.LabelValue(labels[parser.ownerLabelKey]),
163163
}
164164

165-
ls = applyExtraLabels(ls)
165+
ls = applyLabels(ls)
166166

167167
// extract the timestamp of the nested event and sends the rest as raw json
168168
if labels["type"] == CLOUDTRAIL_LOG_TYPE {
@@ -341,13 +341,14 @@ func stringToRawEvent(body string) (map[string]interface{}, error) {
341341
// It also makes use of the fact that the log10 of a number in base 10 is its number of digits - 1.
342342
// It returns early if the fractional seconds is 0 because getting the log10 of 0 results in -Inf.
343343
// For example, given a string 1234567890123:
344-
// iLog10 = 12 // the parsed int is 13 digits long
345-
// multiplier = 0.001 // to get the seconds part it must be divided by 1000
346-
// sec = 1234567890123 * 0.001 = 1234567890 // this is the seconds part of the Unix time
347-
// fractionalSec = 123 // the rest of the parsed int
348-
// fractionalSecLog10 = 2 // it is 3 digits long
349-
// multiplier = 1000000 // nano is 10^-9, so the nanoseconds part is 9 digits long
350-
// nsec = 123000000 // this is the nanoseconds part of the Unix time
344+
//
345+
// iLog10 = 12 // the parsed int is 13 digits long
346+
// multiplier = 0.001 // to get the seconds part it must be divided by 1000
347+
// sec = 1234567890123 * 0.001 = 1234567890 // this is the seconds part of the Unix time
348+
// fractionalSec = 123 // the rest of the parsed int
349+
// fractionalSecLog10 = 2 // it is 3 digits long
350+
// multiplier = 1000000 // nano is 10^-9, so the nanoseconds part is 9 digits long
351+
// nsec = 123000000 // this is the nanoseconds part of the Unix time
351352
func getUnixSecNsec(s string) (sec int64, nsec int64, err error) {
352353
const (
353354
UNIX_SEC_LOG10 = 9

tools/lambda-promtail/main.tf

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ resource "aws_lambda_function" "this" {
174174
KEEP_STREAM = var.keep_stream
175175
BATCH_SIZE = var.batch_size
176176
EXTRA_LABELS = var.extra_labels
177+
DROP_LABELS = var.drop_labels
177178
OMIT_EXTRA_LABELS_PREFIX = var.omit_extra_labels_prefix ? "true" : "false"
178179
TENANT_ID = var.tenant_id
179180
SKIP_TLS_VERIFY = var.skip_tls_verify

tools/lambda-promtail/variables.tf

+6
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ variable "extra_labels" {
7272
default = ""
7373
}
7474

75+
# Label names to remove from entries before they are forwarded.
# Passed to the Lambda function as the DROP_LABELS environment variable.
variable "drop_labels" {
  type        = string
  description = "Comma separated list of label names, in the format 'name1,name2,...,nameN', to be dropped from entries forwarded by lambda-promtail."
  default     = ""
}
80+
7581
variable "omit_extra_labels_prefix" {
7682
type = bool
7783
description = "Whether or not to omit the prefix `__extra_` from extra labels defined in the variable `extra_labels`."

0 commit comments

Comments
 (0)