Remove pooling from intake v2 processor
Signed-off-by: Marc Lopez Rubio <[email protected]>
marclop committed Sep 3, 2024
1 parent cb1b411 commit 02a07dc
Showing 1 changed file with 5 additions and 36 deletions.
41 changes: 5 additions & 36 deletions input/elasticapm/processor.go
@@ -23,7 +23,6 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"sync"

 	"go.elastic.co/apm/v2"
 	"go.uber.org/zap"
@@ -40,8 +39,6 @@
 	errUnrecognizedObject = errors.New("did not recognize object type")

 	errEmptyBody = errors.New("empty body")
-
-	batchPool sync.Pool
 )

 const (
@@ -63,10 +60,9 @@ const (
 // The buffered channel is meant to be shared between all the processors so
 // the concurrency limit is shared between all the intake endpoints.
 type Processor struct {
-	streamReaderPool sync.Pool
-	sem              input.Semaphore
-	logger           *zap.Logger
-	MaxEventSize     int
+	sem          input.Semaphore
+	logger       *zap.Logger
+	MaxEventSize int
 }

 // Config holds configuration for Processor constructors.
@@ -256,10 +252,7 @@ func (p *Processor) HandleStream(
 	sr := p.getStreamReader(reader)

 	// Release the semaphore on early exit
-	defer func() {
-		sr.release()
-		p.sem.Release(1)
-	}()
+	defer p.sem.Release(1)

 	// The first item is the metadata object.
 	if err := p.readMetadata(sr, baseEvent); err != nil {
@@ -276,15 +269,7 @@
 		}
 	}

-	var batch modelpb.Batch
-	if b, ok := batchPool.Get().(*modelpb.Batch); ok {
-		batch = (*b)[:0]
-	} else {
-		batch = make(modelpb.Batch, 0, batchSize)
-	}
-
-	defer batchPool.Put(&batch)
-
+	batch := make(modelpb.Batch, 0, batchSize)
 	for {
 		// reuse the batch for future iterations without pooling each time
 		batch = batch[:0]
@@ -321,20 +306,11 @@ func (p *Processor) handleStream(

 // processBatch processes the batch and returns the events to the pool after it's been processed.
 func (p *Processor) processBatch(ctx context.Context, processor modelpb.BatchProcessor, batch *modelpb.Batch) error {
-	defer func() {
-		for i := range *batch {
-			(*batch)[i].ReturnToVTPool()
-		}
-	}()
 	return processor.ProcessBatch(ctx, batch)
 }

 // getStreamReader returns a streamReader that reads ND-JSON lines from r.
 func (p *Processor) getStreamReader(r io.Reader) *streamReader {
-	if sr, ok := p.streamReaderPool.Get().(*streamReader); ok {
-		sr.Reset(r)
-		return sr
-	}
 	return &streamReader{
 		processor: p,
 		NDJSONStreamDecoder: decoder.NewNDJSONStreamDecoder(r, p.MaxEventSize),
@@ -354,13 +330,6 @@ type streamReader struct {
 	*decoder.NDJSONStreamDecoder
 }

-// release releases the streamReader, adding it to its Processor's sync.Pool.
-// The streamReader must not be used after release returns.
-func (sr *streamReader) release() {
-	sr.Reset(nil)
-	sr.processor.streamReaderPool.Put(sr)
-}
-
 func (sr *streamReader) wrapError(err error) error {
 	if _, ok := err.(decoder.JSONDecodeError); ok {
 		return &InvalidInputError{

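For readers skimming the diff: the commit replaces two sync.Pool uses (the package-level batchPool for modelpb.Batch and the per-Processor streamReaderPool) with plain per-stream allocations, reusing the batch slice across loop iterations via batch = batch[:0]. Below is a minimal, self-contained sketch contrasting the two allocation patterns; the event and batch types and helper names are illustrative assumptions, not code from this repository.

// Sketch only: contrasts a sync.Pool-backed batch with per-stream
// allocation plus slice reuse, the pattern this commit moves to.
package main

import (
	"fmt"
	"sync"
)

type event struct{ name string }
type batch []*event

const batchSize = 10

// Pattern being removed: batches come from, and return to, a package-level pool.
var batchPool = sync.Pool{
	New: func() any { b := make(batch, 0, batchSize); return &b },
}

func withPool(process func(batch)) {
	b := batchPool.Get().(*batch)
	defer batchPool.Put(b)
	*b = append((*b)[:0], &event{name: "tx"})
	process(*b)
}

// Pattern the commit adopts: allocate one batch per stream and re-slice it
// to zero length on every iteration, reusing the backing array.
func withoutPool(process func(batch)) {
	b := make(batch, 0, batchSize)
	for i := 0; i < 3; i++ {
		b = b[:0] // reuse without pooling each time
		b = append(b, &event{name: fmt.Sprintf("tx-%d", i)})
		process(b)
	}
}

func main() {
	process := func(b batch) { fmt.Println(len(b), "event(s)") }
	withPool(process)
	withoutPool(process)
}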