From 086c4ba1f4fdecc2e0e16b7dc5791e4c92860d19 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Thu, 1 Feb 2024 11:02:24 -0600 Subject: [PATCH] [chore][pkg/stanza] Remove unnecessary parameter in recombine operator (#30930) This PR is just a minor cleanup step broken out from #30784 --- .../transformer/recombine/recombine.go | 20 +++++++------------ .../transformer/recombine/recombine_test.go | 3 +-- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/pkg/stanza/operator/transformer/recombine/recombine.go b/pkg/stanza/operator/transformer/recombine/recombine.go index 3b0a13444813..429ac1f9d03f 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine.go +++ b/pkg/stanza/operator/transformer/recombine/recombine.go @@ -178,7 +178,7 @@ func (r *Transformer) flushLoop() { if timeSinceFirstEntry < r.forceFlushTimeout { continue } - if err := r.flushSource(context.Background(), source, true); err != nil { + if err := r.flushSource(context.Background(), source); err != nil { r.Errorf("there was error flushing combined logs %s", err) } } @@ -241,7 +241,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error { // This is the first entry in the next batch case matches && r.matchIndicatesFirst(): // Flush the existing batch - err := r.flushSource(ctx, s, true) + err := r.flushSource(ctx, s) if err != nil { return err } @@ -252,7 +252,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error { // This is the last entry in a complete batch case matches && r.matchIndicatesLast(): r.addToBatch(ctx, e, s) - return r.flushSource(ctx, s, true) + return r.flushSource(ctx, s) } // This is neither the first entry of a new log, @@ -302,7 +302,7 @@ func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source str batch.recombined.WriteString(s) if (r.maxLogSize > 0 && int64(batch.recombined.Len()) > r.maxLogSize) || len(batch.entries) >= r.maxBatchSize { - if err := r.flushSource(ctx, source, false); err != nil { + if err := r.flushSource(ctx, source); err != nil { r.Errorf("there was error flushing combined logs %s", err) } } @@ -326,7 +326,7 @@ func (r *Transformer) flushUncombined(ctx context.Context) { func (r *Transformer) flushAllSources(ctx context.Context) { var errs []error for source := range r.batchMap { - if err := r.flushSource(ctx, source, true); err != nil { + if err := r.flushSource(ctx, source); err != nil { errs = append(errs, err) } } @@ -337,7 +337,7 @@ func (r *Transformer) flushAllSources(ctx context.Context) { // flushSource combines the entries currently in the batch into a single entry, // then forwards them to the next operator in the pipeline -func (r *Transformer) flushSource(ctx context.Context, source string, deleteSource bool) error { +func (r *Transformer) flushSource(ctx context.Context, source string) error { batch := r.batchMap[source] // Skip flushing a combined log if the batch is empty if batch == nil { @@ -366,13 +366,7 @@ func (r *Transformer) flushSource(ctx context.Context, source string, deleteSour } r.Write(ctx, base) - if deleteSource { - r.removeBatch(source) - } else { - batch.entries = batch.entries[:0] - batch.recombined.Reset() - } - + r.removeBatch(source) return nil } diff --git a/pkg/stanza/operator/transformer/recombine/recombine_test.go b/pkg/stanza/operator/transformer/recombine/recombine_test.go index 90164ee4276c..34e1ee29cc44 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine_test.go +++ b/pkg/stanza/operator/transformer/recombine/recombine_test.go @@ -756,9 +756,8 @@ func TestSourceBatchDelete(t *testing.T) { ctx := context.Background() require.NoError(t, recombine.Process(ctx, start)) - require.NoError(t, recombine.Process(ctx, next)) require.Equal(t, 1, len(recombine.batchMap)) - require.NoError(t, recombine.flushSource(ctx, "file1", true)) + require.NoError(t, recombine.Process(ctx, next)) require.Equal(t, 0, len(recombine.batchMap)) fake.ExpectEntry(t, expect) require.NoError(t, recombine.Stop())