Skip to content

Commit

Permalink
[pkg/stanza] improve performance (open-telemetry#16027)
Browse files Browse the repository at this point in the history
* [pkg/stanza] improve performance
  • Loading branch information
newly12 authored and shalper2 committed Dec 6, 2022
1 parent 15a9f27 commit a556d1d
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 17 deletions.
16 changes: 16 additions & 0 deletions .chloggen/pkg-stanza-perf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "improve performance"

# One or more tracking issues related to the change
issues: [16028]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
4 changes: 1 addition & 3 deletions pkg/stanza/fileconsumer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ func testReaderFactory(t *testing.T) (*readerFactory, chan *emitParams) {
readerConfig: &readerConfig{
fingerprintSize: DefaultFingerprintSize,
maxLogSize: defaultMaxLogSize,
emit: func(_ context.Context, attrs *FileAttributes, token []byte) {
emitChan <- &emitParams{attrs, token}
},
emit: testEmitFunc(emitChan),
},
fromBeginning: true,
splitterFactory: newMultilineSplitterFactory(
Expand Down
12 changes: 9 additions & 3 deletions pkg/stanza/fileconsumer/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

func testEmitFunc(emitChan chan *emitParams) EmitFunc {
return func(_ context.Context, attrs *FileAttributes, token []byte) {
copied := make([]byte, len(token))
copy(copied, token)
emitChan <- &emitParams{attrs, copied}
}
}

// includeDir is a builder-like helper for quickly setting up a test config
func (c *Config) includeDir(dir string) *Config {
c.Include = append(c.Include, fmt.Sprintf("%s/*", dir))
Expand All @@ -56,9 +64,7 @@ func buildTestManager(t *testing.T, cfg *Config) (*Manager, chan *emitParams) {
}

func buildTestManagerWithEmit(t *testing.T, cfg *Config, emitChan chan *emitParams) *Manager {
input, err := cfg.Build(testutil.Logger(t), func(_ context.Context, attrs *FileAttributes, token []byte) {
emitChan <- &emitParams{attrs, token}
})
input, err := cfg.Build(testutil.Logger(t), testEmitFunc(emitChan))
require.NoError(t, err)
return input
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/stanza/operator/helper/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,28 @@ func (c EncodingConfig) Build() (Encoding, error) {
}

return Encoding{
Encoding: enc,
Encoding: enc,
decodeBuffer: make([]byte, 1<<12),
decoder: enc.NewDecoder(),
}, nil
}

type Encoding struct {
Encoding encoding.Encoding
Encoding encoding.Encoding
decoder *encoding.Decoder
decodeBuffer []byte
}

// decode converts the bytes in msgBuf to utf-8 from the configured encoding
// Decode converts the bytes in msgBuf to utf-8 from the configured encoding
func (e *Encoding) Decode(msgBuf []byte) ([]byte, error) {
decodeBuffer := make([]byte, 1<<12)
decoder := e.Encoding.NewDecoder()

for {
decoder.Reset()
nDst, _, err := decoder.Transform(decodeBuffer, msgBuf, true)
e.decoder.Reset()
nDst, _, err := e.decoder.Transform(e.decodeBuffer, msgBuf, true)
if err == nil {
return decodeBuffer[:nDst], nil
return e.decodeBuffer[:nDst], nil
}
if errors.Is(err, transform.ErrShortDst) {
decodeBuffer = make([]byte, len(decodeBuffer)*2)
e.decodeBuffer = make([]byte, len(e.decodeBuffer)*2)
continue
}
return nil, fmt.Errorf("transform encoding: %w", err)
Expand Down
4 changes: 3 additions & 1 deletion pkg/stanza/operator/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}
if helper.IsNop(c.Config.Splitter.EncodingConfig.Encoding) {
toBody = func(token []byte) interface{} {
return token
copied := make([]byte, len(token))
copy(copied, token)
return copied
}
}

Expand Down

0 comments on commit a556d1d

Please sign in to comment.