Fix visibility processor panic on add after stop #3830

Merged
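The diff below guards Add with a read lock and a status check, so a call that lands after (or races with) Stop gets an error instead of hitting state that Stop previously set to nil. As a quick illustration of that pattern — not code from this PR; the processor type, its pending map, and the status constants are simplified stand-ins:

package main

import (
    "errors"
    "fmt"
    "sync"
    "sync/atomic"
)

const (
    statusStarted int32 = 1
    statusStopped int32 = 2
)

var errShutdown = errors.New("processor was shut down")

type processor struct {
    status       int32
    shutdownLock sync.RWMutex
    // pending stands in for the real sharded concurrent mapToAckFuture;
    // a plain map is fine here because this demo calls Add from one goroutine.
    pending map[string]chan bool
}

func newProcessor() *processor {
    return &processor{
        status:  statusStarted,
        pending: make(map[string]chan bool),
    }
}

// Stop flips the status, then takes the write lock so any in-flight Add
// (holding the read lock) finishes before teardown work runs.
func (p *processor) Stop() {
    if !atomic.CompareAndSwapInt32(&p.status, statusStarted, statusStopped) {
        return
    }
    p.shutdownLock.Lock()
    defer p.shutdownLock.Unlock()
    // Teardown would happen here. Shared state is left non-nil so a
    // late caller gets an error instead of a nil-pointer panic.
}

// Add takes the read lock and rejects work once the processor is stopped.
func (p *processor) Add(key string) (chan bool, error) {
    p.shutdownLock.RLock()
    defer p.shutdownLock.RUnlock()

    if atomic.LoadInt32(&p.status) == statusStopped {
        return nil, errShutdown
    }
    ack := make(chan bool, 1)
    p.pending[key] = ack
    return ack, nil
}

func main() {
    p := newProcessor()
    if _, err := p.Add("before-stop"); err != nil {
        fmt.Println("unexpected error:", err)
    }
    p.Stop()
    if _, err := p.Add("after-stop"); err != nil {
        fmt.Println("rejected as expected:", err)
    }
}

Because Stop holds the write lock until its teardown finishes, any Add that already passed the status check under the read lock completes against still-valid state.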
24 changes: 21 additions & 3 deletions common/persistence/visibility/store/elasticsearch/processor.go
@@ -33,6 +33,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

@@ -69,6 +70,7 @@ type (
logger log.Logger
metricsHandler metrics.Handler
indexerConcurrency uint32
shutdownLock sync.RWMutex
}

// ProcessorConfig contains all configs for processor
@@ -97,6 +99,10 @@ const (
visibilityProcessorName = "visibility-processor"
)

var (
errVisibilityShutdown = errors.New("visibility processor was shut down")
)

// NewProcessor create new processorImpl
func NewProcessor(
cfg *ProcessorConfig,
@@ -150,14 +156,15 @@ func (p *processorImpl) Stop() {
return
}

p.shutdownLock.Lock()
defer p.shutdownLock.Unlock()

err := p.bulkProcessor.Stop()
if err != nil {
// This could happen if ES is down when we're trying to shut down the server.
p.logger.Error("Unable to stop Elasticsearch processor.", tag.LifeCycleStopFailed, tag.Error(err))
return
}
- p.mapToAckFuture = nil
- p.bulkProcessor = nil
}

func (p *processorImpl) hashFn(key interface{}) uint32 {
@@ -172,7 +179,18 @@ func (p *processorImpl) hashFn(key interface{}) uint32 {

// Add request to the bulk and return a future object which will receive ack signal when request is processed.
func (p *processorImpl) Add(request *client.BulkableRequest, visibilityTaskKey string) *future.FutureImpl[bool] {
- newFuture := newAckFuture()
+ newFuture := newAckFuture() // Create future first to measure impact of following RWLock on latency

p.shutdownLock.RLock()
defer p.shutdownLock.RUnlock()

if atomic.LoadInt32(&p.status) == common.DaemonStatusStopped {
p.logger.Warn("Rejecting ES request for visibility task key because processor has been shut down.", tag.Key(visibilityTaskKey), tag.ESDocID(request.ID), tag.Value(request.Doc))
errFuture := future.NewFuture[bool]()
errFuture.Set(false, errVisibilityShutdown)
return errFuture
Review comment (Member):

nit: now we can reuse the future in newFuture and avoid allocating a new channel.

Not related to your PR but it seems like the assignment on L202 will cause the underlying channel in newFuture not getting closed. I am not sure if golang GC can handle this case or result in some leak. cc @alexshtin
}
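A sketch of what that suggestion could look like for the shutdown branch above — not a drop-in patch; it assumes ackFuture keeps its underlying *future.FutureImpl[bool] in a field named future, which is a guess at the internals rather than something this diff shows:

if atomic.LoadInt32(&p.status) == common.DaemonStatusStopped {
    p.logger.Warn("Rejecting ES request for visibility task key because processor has been shut down.", tag.Key(visibilityTaskKey), tag.ESDocID(request.ID), tag.Value(request.Doc))
    // Reuse the future already allocated by newAckFuture() above instead of
    // creating a second future (and a second underlying channel).
    newFuture.future.Set(false, errVisibilityShutdown) // "future" field name is hypothetical
    return newFuture.future
}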

_, isDup, _ := p.mapToAckFuture.PutOrDo(visibilityTaskKey, newFuture, func(key interface{}, value interface{}) error {
existingFuture, ok := value.(*ackFuture)
if !ok {
common/persistence/visibility/store/elasticsearch/processor_test.go
@@ -36,6 +36,7 @@ import (
"github.com/olivere/elastic/v7"
"github.com/stretchr/testify/suite"

"go.temporal.io/server/common"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/future"
@@ -89,6 +90,7 @@ func (s *processorSuite) SetupTest() {
// esProcessor.Start mock
s.esProcessor.mapToAckFuture = collection.NewShardedConcurrentTxMap(1024, s.esProcessor.hashFn)
s.esProcessor.bulkProcessor = s.mockBulkProcessor
s.esProcessor.status = common.DaemonStatusStarted
}

func (s *processorSuite) TearDownTest() {
@@ -126,8 +128,8 @@ func (s *processorSuite) TestNewESProcessorAndStartStop() {
s.NotNil(p.bulkProcessor)

p.Stop()
- s.Nil(p.mapToAckFuture)
- s.Nil(p.bulkProcessor)
+ s.NotNil(p.mapToAckFuture)
+ s.NotNil(p.bulkProcessor)
}

func (s *processorSuite) TestAdd() {
@@ -219,6 +221,43 @@ func (s *processorSuite) TestAdd_ConcurrentAdd_Duplicates() {
s.Equal(1, s.esProcessor.mapToAckFuture.Len(), "only one request should be in the bulk")
}

func (s *processorSuite) TestAdd_ConcurrentAdd_Shutdown() {
request := &client.BulkableRequest{}
docsCount := 1000
parallelFactor := 10
futures := make([]future.Future[bool], docsCount)

s.mockBulkProcessor.EXPECT().Add(request).MaxTimes(docsCount + 2) // +2 for explicit adds before and after shutdown
s.mockBulkProcessor.EXPECT().Stop().Return(nil).Times(1)
s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorWaitAddLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc).MaxTimes(docsCount + 2)

addBefore := s.esProcessor.Add(request, "test-key-before")

wg := sync.WaitGroup{}
wg.Add(parallelFactor + 1) // +1 for separate shutdown goroutine
for i := 0; i < parallelFactor; i++ {
go func(i int) {
for j := 0; j < docsCount/parallelFactor; j++ {
futures[i*docsCount/parallelFactor+j] = s.esProcessor.Add(request, fmt.Sprintf("test-key-%d-%d", i, j))
}
wg.Done()
}(i)
}
go func() {
time.Sleep(1 * time.Millisecond) // slight delay so at least a few docs get added
s.esProcessor.Stop()
wg.Done()
}()

wg.Wait()
addAfter := s.esProcessor.Add(request, "test-key-after")

s.False(addBefore.Ready()) // first request should be in bulk
s.True(addAfter.Ready()) // final request should be only error
_, err := addAfter.Get(context.Background())
s.ErrorIs(err, errVisibilityShutdown)
}

func (s *processorSuite) TestBulkAfterAction_Ack() {
version := int64(3)
testKey := "testKey"