Fix a rare deadlock in scanner.Stop
MichaelSnowden committed Dec 19, 2022
1 parent 41b96bb commit bc05257
Showing 2 changed files with 99 additions and 7 deletions.
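
For context on the fix: Start() launches one goroutine per enabled scanner type, and each goroutine can sit inside startWorkflowWithRetry's retry loop for as long as the frontend is unreachable. Stop() previously had no way to interrupt that loop, so s.wg.Wait() could block indefinitely. The change threads a ShutdownOnce through the retries: Stop() signals it, the propagated context is canceled, and the goroutines exit. Below is a minimal, self-contained sketch of that pattern; the shutdownOnce type here is a simplified stand-in inferred from how channel.ShutdownOnce is used in this diff, not the real implementation in go.temporal.io/server/common/channel.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// shutdownOnce is a simplified stand-in for channel.ShutdownOnce, written
// from the usage in this diff; the real type may differ.
type shutdownOnce struct {
	once sync.Once
	ch   chan struct{}
}

func newShutdownOnce() *shutdownOnce { return &shutdownOnce{ch: make(chan struct{})} }

// Shutdown closes the signal channel exactly once, releasing all waiters.
func (s *shutdownOnce) Shutdown() { s.once.Do(func() { close(s.ch) }) }

// PropagateShutdown derives a context that is canceled once Shutdown is called.
func (s *shutdownOnce) PropagateShutdown(parent context.Context) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)
	go func() {
		select {
		case <-s.ch:
			cancel()
		case <-ctx.Done():
		}
	}()
	return ctx, cancel
}

func main() {
	var wg sync.WaitGroup
	so := newShutdownOnce()

	wg.Add(1)
	go func() {
		defer wg.Done()
		ctx, cancel := so.PropagateShutdown(context.Background())
		defer cancel()
		// Stand-in for the retry loop around startWorkflow: without the
		// propagated context, this goroutine would block forever and
		// wg.Wait() below would deadlock.
		<-ctx.Done()
	}()

	time.Sleep(10 * time.Millisecond)
	so.Shutdown() // what Stop() now does before wg.Wait()
	wg.Wait()
	fmt.Println("stopped cleanly")
}
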
27 changes: 20 additions & 7 deletions service/worker/scanner/scanner.go
@@ -37,6 +37,7 @@ import (
 
 	"go.temporal.io/server/api/adminservice/v1"
 	"go.temporal.io/server/api/historyservice/v1"
+	"go.temporal.io/server/common/channel"
 	"go.temporal.io/server/common/config"
 	"go.temporal.io/server/common/headers"
 	"go.temporal.io/server/common/log"
@@ -102,8 +103,9 @@ type (
 	// of database tables to cleanup resources, monitor anamolies
 	// and emit stats for analytics
 	Scanner struct {
-		context scannerContext
-		wg      sync.WaitGroup
+		context      scannerContext
+		wg           sync.WaitGroup
+		shutdownOnce channel.ShutdownOnce
 	}
 )

@@ -137,6 +139,7 @@ func New(
 			workerFactory:     workerFactory,
 			namespaceRegistry: registry,
 		},
+		shutdownOnce: channel.NewShutdownOnce(),
 	}
 }

@@ -192,6 +195,7 @@ func (s *Scanner) Start() error {
 }
 
 func (s *Scanner) Stop() {
+	s.shutdownOnce.Shutdown()
 	s.wg.Wait()
 }

@@ -205,24 +209,33 @@ func (s *Scanner) startWorkflowWithRetry(
 	policy := backoff.NewExponentialRetryPolicy(time.Second).
 		WithMaximumInterval(time.Minute).
 		WithExpirationInterval(backoff.NoInterval)
-	err := backoff.ThrottleRetry(func() error {
-		return s.startWorkflow(s.context.sdkClientFactory.GetSystemClient(), options, workflowType, workflowArgs...)
+	ctx, cancel := s.shutdownOnce.PropagateShutdown(context.Background())
+	defer cancel()
+	err := backoff.ThrottleRetryContext(ctx, func(ctx context.Context) error {
+		return s.startWorkflow(
+			ctx,
+			s.context.sdkClientFactory.GetSystemClient(),
+			options,
+			workflowType,
+			workflowArgs...,
+		)
 	}, policy, func(err error) bool {
 		return true
 	})
-	if err != nil {
+	// if the scanner shuts down before the workflow is started, then the error will be context canceled
+	if err != nil && err != context.Canceled {
 		s.context.logger.Fatal("unable to start scanner", tag.WorkflowType(workflowType), tag.Error(err))
 	}
 }
 
 func (s *Scanner) startWorkflow(
+	ctx context.Context,
 	client sdkclient.Client,
 	options sdkclient.StartWorkflowOptions,
 	workflowType string,
 	workflowArgs ...interface{},
 ) error {
-
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
 	_, err := client.ExecuteWorkflow(ctx, options, workflowType, workflowArgs...)
 	cancel()
 	if err != nil {
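
The load-bearing change above is the switch from backoff.ThrottleRetry to backoff.ThrottleRetryContext: a context-aware retry loop observes cancellation between attempts and returns the context's error instead of retrying, which startWorkflowWithRetry then treats as a clean shutdown rather than a fatal startup failure. The sketch below shows the general shape of such a loop; it is an illustration only, not the actual go.temporal.io/server/common/backoff implementation, which also applies the exponential policy shown above.

package backoffsketch

import (
	"context"
	"time"
)

// throttleRetryContext is an illustrative loop with the same shape as the
// ThrottleRetryContext call in this diff; not Temporal's implementation.
func throttleRetryContext(
	ctx context.Context,
	op func(context.Context) error,
	interval time.Duration,
	isRetryable func(error) bool,
) error {
	for {
		err := op(ctx)
		if err == nil || !isRetryable(err) {
			return err
		}
		select {
		case <-ctx.Done():
			// On shutdown the caller sees context.Canceled, which the new
			// err != context.Canceled check above swallows deliberately.
			return ctx.Err()
		case <-time.After(interval):
		}
	}
}
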
79 changes: 79 additions & 0 deletions service/worker/scanner/scanner_test.go
@@ -25,10 +25,12 @@
 package scanner
 
 import (
+	"context"
 	"testing"
 
 	"github.com/golang/mock/gomock"
 	"github.com/stretchr/testify/suite"
+	"go.temporal.io/sdk/client"
 
 	"go.temporal.io/server/api/adminservicemock/v1"
 	"go.temporal.io/server/api/historyservicemock/v1"
@@ -216,3 +218,80 @@ func (s *scannerTestSuite) TestScannerEnabled() {
 		})
 	}
 }
+
+// TestScannerShutdown tests that the scanner can be shut down even when it hasn't finished starting.
+// This covers a rare issue that can occur when Stop() is called quickly after Start(). When Start() is called, the
+// scanner starts a new goroutine for each scanner type. In that goroutine, an sdk client is created which dials the
+// frontend service. If the test driver calls Stop() on the server, then the server stops the frontend service and the
+// history service. In some cases, the frontend service stops before the sdk client has finished connecting to it.
+// This causes the startWorkflow() call to fail with an error. However, startWorkflowWithRetry retries the call for
+// a whole minute, which causes the test to take a long time to fail. So, instead we immediately cancel all async
+// requests when Stop() is called.
+func (s *scannerTestSuite) TestScannerShutdown() {
+	ctrl := gomock.NewController(s.T())
+
+	logger := log.NewTestLogger()
+	mockSdkClientFactory := sdk.NewMockClientFactory(ctrl)
+	mockSdkClient := mocksdk.NewMockClient(ctrl)
+	mockNamespaceRegistry := namespace.NewMockRegistry(ctrl)
+	mockAdminClient := adminservicemock.NewMockAdminServiceClient(ctrl)
+	mockWorkerFactory := sdk.NewMockWorkerFactory(ctrl)
+	worker := mocksdk.NewMockWorker(ctrl)
+	scanner := New(
+		logger,
+		&Config{
+			MaxConcurrentActivityExecutionSize: func() int {
+				return 1
+			},
+			MaxConcurrentWorkflowTaskExecutionSize: func() int {
+				return 1
+			},
+			MaxConcurrentActivityTaskPollers: func() int {
+				return 1
+			},
+			MaxConcurrentWorkflowTaskPollers: func() int {
+				return 1
+			},
+			HistoryScannerEnabled: func() bool {
+				return true
+			},
+			ExecutionsScannerEnabled: func() bool {
+				return false
+			},
+			TaskQueueScannerEnabled: func() bool {
+				return false
+			},
+			Persistence: &config.Persistence{
+				DefaultStore: config.StoreTypeNoSQL,
+				DataStores: map[string]config.DataStore{
+					config.StoreTypeNoSQL: {},
+				},
+			},
+		},
+		mockSdkClientFactory,
+		metrics.NoopMetricsHandler,
+		p.NewMockExecutionManager(ctrl),
+		p.NewMockTaskManager(ctrl),
+		historyservicemock.NewMockHistoryServiceClient(ctrl),
+		mockAdminClient,
+		mockNamespaceRegistry,
+		mockWorkerFactory,
+	)
+	mockSdkClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes()
+	worker.EXPECT().RegisterActivityWithOptions(gomock.Any(), gomock.Any()).AnyTimes()
+	worker.EXPECT().RegisterWorkflowWithOptions(gomock.Any(), gomock.Any()).AnyTimes()
+	worker.EXPECT().Start()
+	mockWorkerFactory.EXPECT().New(gomock.Any(), gomock.Any(), gomock.Any()).Return(worker)
+	mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(
+		ctx context.Context,
+		options client.StartWorkflowOptions,
+		workflow interface{},
+		args ...interface{},
+	) (client.WorkflowRun, error) {
+		<-ctx.Done()
+		return nil, ctx.Err()
+	})
+	err := scanner.Start()
+	s.NoError(err)
+	scanner.Stop()
+}
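
The mocked ExecuteWorkflow blocks on <-ctx.Done(), so this test only passes if Stop() actually cancels the propagated context; before the fix it would hang on s.wg.Wait() until the test run hit its deadline. For similar shutdown tests, a small watchdog turns a regression into a fast, descriptive failure. The helper below is hypothetical, not part of this change:

package scanner

import (
	"testing"
	"time"
)

// stopWithTimeout is a hypothetical test helper: it fails fast if stop()
// never returns, instead of letting the whole test run time out.
func stopWithTimeout(t *testing.T, stop func(), timeout time.Duration) {
	t.Helper()
	done := make(chan struct{})
	go func() {
		stop()
		close(done)
	}()
	select {
	case <-done:
	case <-time.After(timeout):
		t.Fatal("Stop() did not return; likely deadlocked")
	}
}
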
