diff --git a/test/integration_test.go b/test/integration_test.go index dbaa4f2f4..24b74cfa5 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -225,8 +225,6 @@ func (ts *IntegrationTestSuite) SetupTest() { WorkflowPanicPolicy: panicPolicy, } - worker.SetStickyWorkflowCacheSize(ts.config.maxWorkflowCacheSize) - if strings.Contains(ts.T().Name(), "Session") { options.EnableSessionWorker = true // Limit the session execution size @@ -3033,35 +3031,26 @@ func (ts *IntegrationTestSuite) waitForQueryTrue(run client.WorkflowRun, query s ts.True(result, "query didn't return true in reasonable amount of time") } -//func (ts *IntegrationTestSuite) TestNumPollersCounter() { -// _, cancel := context.WithTimeout(context.Background(), 10*time.Second) -// defer cancel() -// assertNumPollersEventually := func(expected float64, pollerType string, tags ...string) { -// // Try for two seconds -// var lastCount float64 -// for start := time.Now(); time.Since(start) <= 10*time.Second; { -// lastCount = ts.metricGauge( -// metrics.NumPoller, -// "poller_type", pollerType, -// "task_queue", ts.taskQueueName, -// ) -// if lastCount == expected { -// return -// } -// time.Sleep(50 * time.Millisecond) -// } -// // Will fail -// ts.Equal(expected, lastCount) -// } -// if ts.config.maxWorkflowCacheSize == 0 { -// assertNumPollersEventually(2, "workflow_task") -// assertNumPollersEventually(0, "workflow_sticky_task") -// } else { -// assertNumPollersEventually(1, "workflow_task") -// assertNumPollersEventually(1, "workflow_sticky_task") -// } -// assertNumPollersEventually(2, "activity_task") -//} +func (ts *IntegrationTestSuite) TestNumPollersCounter() { + assertNumPollersEventually := func(expected float64, pollerType string) { + ts.Require().EventuallyWithT(func(t *assert.CollectT) { + lastCount := ts.metricGauge( + metrics.NumPoller, + "poller_type", pollerType, + "task_queue", ts.taskQueueName, + ) + assert.Equal(t, expected, lastCount) + }, 10*time.Second, 50*time.Millisecond) + } + if ts.config.maxWorkflowCacheSize == 0 { + assertNumPollersEventually(2, "workflow_task") + assertNumPollersEventually(0, "workflow_sticky_task") + } else { + assertNumPollersEventually(1, "workflow_task") + assertNumPollersEventually(1, "workflow_sticky_task") + } + assertNumPollersEventually(2, "activity_task") +} func (ts *IntegrationTestSuite) TestSlotsAvailableCounter() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) diff --git a/test/test_utils_test.go b/test/test_utils_test.go index 541775faa..913c2e7cd 100644 --- a/test/test_utils_test.go +++ b/test/test_utils_test.go @@ -28,6 +28,7 @@ import ( "context" "crypto/tls" "fmt" + "go.temporal.io/sdk/worker" "log" "net" "os" @@ -92,6 +93,7 @@ func NewConfig() Config { } cfg.maxWorkflowCacheSize = asInt } + worker.SetStickyWorkflowCacheSize(cfg.maxWorkflowCacheSize) if debug := getDebug(); debug != "" { cfg.Debug = debug == "true" }