-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[chore][pkg/stanza] Cleanup recombine operator files
- Loading branch information
1 parent
c5316c6
commit cd94644
Showing
4 changed files
with
366 additions
and
352 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package recombine // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/recombine" | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/expr-lang/expr/vm" | ||
"go.uber.org/zap" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" | ||
) | ||
|
||
const ( | ||
operatorType = "recombine" | ||
defaultCombineWith = "\n" | ||
) | ||
|
||
func init() { | ||
operator.Register(operatorType, func() operator.Builder { return NewConfig() }) | ||
} | ||
|
||
// NewConfig creates a new recombine config with default values | ||
func NewConfig() *Config { | ||
return NewConfigWithID(operatorType) | ||
} | ||
|
||
// NewConfigWithID creates a new recombine config with default values | ||
func NewConfigWithID(operatorID string) *Config { | ||
return &Config{ | ||
TransformerConfig: helper.NewTransformerConfig(operatorID, operatorType), | ||
MaxBatchSize: 1000, | ||
MaxSources: 1000, | ||
CombineWith: defaultCombineWith, | ||
OverwriteWith: "newest", | ||
ForceFlushTimeout: 5 * time.Second, | ||
SourceIdentifier: entry.NewAttributeField("file.path"), | ||
} | ||
} | ||
|
||
// Config is the configuration of a recombine operator | ||
type Config struct { | ||
helper.TransformerConfig `mapstructure:",squash"` | ||
IsFirstEntry string `mapstructure:"is_first_entry"` | ||
IsLastEntry string `mapstructure:"is_last_entry"` | ||
MaxBatchSize int `mapstructure:"max_batch_size"` | ||
CombineField entry.Field `mapstructure:"combine_field"` | ||
CombineWith string `mapstructure:"combine_with"` | ||
SourceIdentifier entry.Field `mapstructure:"source_identifier"` | ||
OverwriteWith string `mapstructure:"overwrite_with"` | ||
ForceFlushTimeout time.Duration `mapstructure:"force_flush_period"` | ||
MaxSources int `mapstructure:"max_sources"` | ||
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"` | ||
} | ||
|
||
// Build creates a new Transformer from a config | ||
func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { | ||
transformer, err := c.TransformerConfig.Build(logger) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to build transformer config: %w", err) | ||
} | ||
|
||
if c.IsLastEntry != "" && c.IsFirstEntry != "" { | ||
return nil, fmt.Errorf("only one of is_first_entry and is_last_entry can be set") | ||
} | ||
|
||
if c.IsLastEntry == "" && c.IsFirstEntry == "" { | ||
return nil, fmt.Errorf("one of is_first_entry and is_last_entry must be set") | ||
} | ||
|
||
var matchesFirst bool | ||
var prog *vm.Program | ||
if c.IsFirstEntry != "" { | ||
matchesFirst = true | ||
prog, err = helper.ExprCompileBool(c.IsFirstEntry) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to compile is_first_entry: %w", err) | ||
} | ||
} else { | ||
matchesFirst = false | ||
prog, err = helper.ExprCompileBool(c.IsLastEntry) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to compile is_last_entry: %w", err) | ||
} | ||
} | ||
|
||
if c.CombineField.FieldInterface == nil { | ||
return nil, fmt.Errorf("missing required argument 'combine_field'") | ||
} | ||
|
||
var overwriteWithOldest bool | ||
switch c.OverwriteWith { | ||
case "newest": | ||
overwriteWithOldest = false | ||
case "oldest", "": | ||
overwriteWithOldest = true | ||
default: | ||
return nil, fmt.Errorf("invalid value '%s' for parameter 'overwrite_with'", c.OverwriteWith) | ||
} | ||
|
||
return &Transformer{ | ||
TransformerOperator: transformer, | ||
matchFirstLine: matchesFirst, | ||
prog: prog, | ||
maxBatchSize: c.MaxBatchSize, | ||
maxSources: c.MaxSources, | ||
overwriteWithOldest: overwriteWithOldest, | ||
batchMap: make(map[string]*sourceBatch), | ||
batchPool: sync.Pool{ | ||
New: func() any { | ||
return &sourceBatch{ | ||
recombined: &bytes.Buffer{}, | ||
} | ||
}, | ||
}, | ||
combineField: c.CombineField, | ||
combineWith: c.CombineWith, | ||
forceFlushTimeout: c.ForceFlushTimeout, | ||
ticker: time.NewTicker(c.ForceFlushTimeout), | ||
chClose: make(chan struct{}), | ||
sourceIdentifier: c.SourceIdentifier, | ||
maxLogSize: int64(c.MaxLogSize), | ||
}, nil | ||
} |
Oops, something went wrong.