diff --git a/.chloggen/over-capacity-issue.yaml b/.chloggen/over-capacity-issue.yaml new file mode 100644 index 000000000000..c012f2639721 --- /dev/null +++ b/.chloggen/over-capacity-issue.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: splunkhecexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Make sure to not return an error if we are over capacity, just return that we cannot accept the event. + +# One or more tracking issues related to the change +issues: [20481] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/splunkhecexporter/buffer.go b/exporter/splunkhecexporter/buffer.go index 720e098f7af7..a195fe11360f 100644 --- a/exporter/splunkhecexporter/buffer.go +++ b/exporter/splunkhecexporter/buffer.go @@ -102,6 +102,10 @@ func (b *bufferState) accept(data []byte) (bool, error) { // if the byte writer was over capacity, try to write the new entry in the zip writer: if overCapacity { if _, err2 := zipWriter.Write(data); err2 != nil { + overCapacity2 := errors.Is(err2, errOverCapacity) + if overCapacity2 { + return false, nil + } return false, err2 } diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go index c9e0f3e2ac6d..4432d900130e 100644 --- a/exporter/splunkhecexporter/client_test.go +++ b/exporter/splunkhecexporter/client_test.go @@ -144,6 +144,18 @@ func repeat(what int, times int) []int { return result } +// these runes are used to generate long log messages that will compress down to a number of bytes we can rely on for testing. +var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789wersgdgr43q3zvbcgv65ew 346xx$gt5/kuopo89.nytqasdfghjklpoiuy") + +func repeatableString(length int) string { + b := make([]rune, length) + for i := range b { + l := i % len(letterRunes) + b[i] = letterRunes[l] + } + return string(b) +} + func createLogDataWithCustomLibraries(numResources int, libraries []string, numRecords []int) plog.Logs { logs := plog.NewLogs() logs.ResourceLogs().EnsureCapacity(numResources) @@ -641,6 +653,27 @@ func TestReceiveLogs(t *testing.T) { compressed: true, }, }, + { + name: "one event with 1340 bytes, then one triggering compression (going over 1500 bytes) and bypassing the max length, moving to a separate batch", + logs: func() plog.Logs { + firstLog := createLogData(1, 1, 2) + firstLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(repeatableString(1340)) + firstLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Body().SetStr(repeatableString(2800000)) + return firstLog + }(), + conf: func() *Config { + cfg := NewFactory().CreateDefaultConfig().(*Config) + cfg.MaxContentLengthLogs = 10000 // small so we can reproduce without allocating big logs. + return cfg + }(), + want: wantType{ + batches: [][]string{ + {`"otel.log.name":"0_0_0"`}, {`"otel.log.name":"0_0_1"`}, + }, + numBatches: 2, + compressed: true, + }, + }, } for _, test := range tests { @@ -648,7 +681,7 @@ func TestReceiveLogs(t *testing.T) { got, err := runLogExport(test.conf, test.logs, test.want.numBatches, t) require.NoError(t, err) - require.Len(t, got, test.want.numBatches) + require.Equal(t, test.want.numBatches, len(got)) for i := 0; i < test.want.numBatches; i++ { require.NotZero(t, got[i])