From 02a07dc7cb42a66238423c60caace8aa8f8cc739 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Mon, 2 Sep 2024 17:00:07 -0700 Subject: [PATCH] Remove pooling from intake v2 processor Signed-off-by: Marc Lopez Rubio --- input/elasticapm/processor.go | 41 +++++------------------------------ 1 file changed, 5 insertions(+), 36 deletions(-) diff --git a/input/elasticapm/processor.go b/input/elasticapm/processor.go index 0994687a..63e25e05 100644 --- a/input/elasticapm/processor.go +++ b/input/elasticapm/processor.go @@ -23,7 +23,6 @@ import ( "errors" "fmt" "io" - "sync" "go.elastic.co/apm/v2" "go.uber.org/zap" @@ -40,8 +39,6 @@ var ( errUnrecognizedObject = errors.New("did not recognize object type") errEmptyBody = errors.New("empty body") - - batchPool sync.Pool ) const ( @@ -63,10 +60,9 @@ const ( // The buffered channel is meant to be shared between all the processors so // the concurrency limit is shared between all the intake endpoints. type Processor struct { - streamReaderPool sync.Pool - sem input.Semaphore - logger *zap.Logger - MaxEventSize int + sem input.Semaphore + logger *zap.Logger + MaxEventSize int } // Config holds configuration for Processor constructors. @@ -256,10 +252,7 @@ func (p *Processor) HandleStream( sr := p.getStreamReader(reader) // Release the semaphore on early exit - defer func() { - sr.release() - p.sem.Release(1) - }() + defer p.sem.Release(1) // The first item is the metadata object. if err := p.readMetadata(sr, baseEvent); err != nil { @@ -276,15 +269,7 @@ func (p *Processor) HandleStream( } } - var batch modelpb.Batch - if b, ok := batchPool.Get().(*modelpb.Batch); ok { - batch = (*b)[:0] - } else { - batch = make(modelpb.Batch, 0, batchSize) - } - - defer batchPool.Put(&batch) - + batch := make(modelpb.Batch, 0, batchSize) for { // reuse the batch for future iterations without pooling each time batch = batch[:0] @@ -321,20 +306,11 @@ func (p *Processor) handleStream( // processBatch processes the batch and returns the events to the pool after it's been processed. func (p *Processor) processBatch(ctx context.Context, processor modelpb.BatchProcessor, batch *modelpb.Batch) error { - defer func() { - for i := range *batch { - (*batch)[i].ReturnToVTPool() - } - }() return processor.ProcessBatch(ctx, batch) } // getStreamReader returns a streamReader that reads ND-JSON lines from r. func (p *Processor) getStreamReader(r io.Reader) *streamReader { - if sr, ok := p.streamReaderPool.Get().(*streamReader); ok { - sr.Reset(r) - return sr - } return &streamReader{ processor: p, NDJSONStreamDecoder: decoder.NewNDJSONStreamDecoder(r, p.MaxEventSize), @@ -354,13 +330,6 @@ type streamReader struct { *decoder.NDJSONStreamDecoder } -// release releases the streamReader, adding it to its Processor's sync.Pool. -// The streamReader must not be used after release returns. -func (sr *streamReader) release() { - sr.Reset(nil) - sr.processor.streamReaderPool.Put(sr) -} - func (sr *streamReader) wrapError(err error) error { if _, ok := err.(decoder.JSONDecodeError); ok { return &InvalidInputError{