Skip to content

Commit

Permalink
Move SetStickyWorkflowCacheSize to global config setup (#1836)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuandrew authored Feb 19, 2025
1 parent 7e3d821 commit e44e74e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 31 deletions.
51 changes: 20 additions & 31 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,6 @@ func (ts *IntegrationTestSuite) SetupTest() {
WorkflowPanicPolicy: panicPolicy,
}

worker.SetStickyWorkflowCacheSize(ts.config.maxWorkflowCacheSize)

if strings.Contains(ts.T().Name(), "Session") {
options.EnableSessionWorker = true
// Limit the session execution size
Expand Down Expand Up @@ -3033,35 +3031,26 @@ func (ts *IntegrationTestSuite) waitForQueryTrue(run client.WorkflowRun, query s
ts.True(result, "query didn't return true in reasonable amount of time")
}

//func (ts *IntegrationTestSuite) TestNumPollersCounter() {
// _, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// defer cancel()
// assertNumPollersEventually := func(expected float64, pollerType string, tags ...string) {
// // Try for two seconds
// var lastCount float64
// for start := time.Now(); time.Since(start) <= 10*time.Second; {
// lastCount = ts.metricGauge(
// metrics.NumPoller,
// "poller_type", pollerType,
// "task_queue", ts.taskQueueName,
// )
// if lastCount == expected {
// return
// }
// time.Sleep(50 * time.Millisecond)
// }
// // Will fail
// ts.Equal(expected, lastCount)
// }
// if ts.config.maxWorkflowCacheSize == 0 {
// assertNumPollersEventually(2, "workflow_task")
// assertNumPollersEventually(0, "workflow_sticky_task")
// } else {
// assertNumPollersEventually(1, "workflow_task")
// assertNumPollersEventually(1, "workflow_sticky_task")
// }
// assertNumPollersEventually(2, "activity_task")
//}
func (ts *IntegrationTestSuite) TestNumPollersCounter() {
assertNumPollersEventually := func(expected float64, pollerType string) {
ts.Require().EventuallyWithT(func(t *assert.CollectT) {
lastCount := ts.metricGauge(
metrics.NumPoller,
"poller_type", pollerType,
"task_queue", ts.taskQueueName,
)
assert.Equal(t, expected, lastCount)
}, 10*time.Second, 50*time.Millisecond)
}
if ts.config.maxWorkflowCacheSize == 0 {
assertNumPollersEventually(2, "workflow_task")
assertNumPollersEventually(0, "workflow_sticky_task")
} else {
assertNumPollersEventually(1, "workflow_task")
assertNumPollersEventually(1, "workflow_sticky_task")
}
assertNumPollersEventually(2, "activity_task")
}

func (ts *IntegrationTestSuite) TestSlotsAvailableCounter() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down
2 changes: 2 additions & 0 deletions test/test_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"crypto/tls"
"fmt"
"go.temporal.io/sdk/worker"
"log"
"net"
"os"
Expand Down Expand Up @@ -92,6 +93,7 @@ func NewConfig() Config {
}
cfg.maxWorkflowCacheSize = asInt
}
worker.SetStickyWorkflowCacheSize(cfg.maxWorkflowCacheSize)
if debug := getDebug(); debug != "" {
cfg.Debug = debug == "true"
}
Expand Down

0 comments on commit e44e74e

Please sign in to comment.