Skip to content

Commit

Permalink
Support Worker Deployments 3.1 (#1832)
Browse files Browse the repository at this point in the history
  • Loading branch information
antlai-temporal authored Feb 22, 2025
1 parent d32c252 commit f5882aa
Show file tree
Hide file tree
Showing 41 changed files with 3,263 additions and 142 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/docker/dynamic-config-custom.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ frontend.workerVersioningWorkflowAPIs:
- value: true
system.enableDeployments:
- value: true
system.enableDeploymentVersions:
- value: true
matching.wv.VersionDrainageStatusVisibilityGracePeriod:
- value: 10
matching.wv.VersionDrainageStatusRefreshInterval:
- value: 1
worker.buildIdScavengerEnabled:
- value: true
worker.removableBuildIdDurationSinceDefault:
Expand Down
271 changes: 238 additions & 33 deletions client/client.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contrib/datadog/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/secure-systems-lab/go-securesystemslib v0.7.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tinylib/msgp v1.1.8 // indirect
go.temporal.io/api v1.44.0 // indirect
go.temporal.io/api v1.44.1 // indirect
go.uber.org/atomic v1.11.0 // indirect
go4.org/intern v0.0.0-20230525184215-6c62f75575cb // indirect
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions contrib/datadog/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.temporal.io/api v1.44.0 h1:AhaSyyAjG0X09GUFO7z0ttp/c1e67CrL/FbdNFQ/HyA=
go.temporal.io/api v1.44.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.44.1 h1:sb5Hq08AB0WtYvfLJMiWmHzxjqs2b+6Jmzg4c8IOeng=
go.temporal.io/api v1.44.1/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
Expand Down
2 changes: 1 addition & 1 deletion contrib/opentelemetry/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
go.opentelemetry.io/otel/metric v1.27.0
go.opentelemetry.io/otel/sdk/metric v1.27.0
go.temporal.io/api v1.44.0 // indirect
go.temporal.io/api v1.44.1 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions contrib/opentelemetry/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2N
go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw=
go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw=
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4=
go.temporal.io/api v1.44.0 h1:AhaSyyAjG0X09GUFO7z0ttp/c1e67CrL/FbdNFQ/HyA=
go.temporal.io/api v1.44.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.44.1 h1:sb5Hq08AB0WtYvfLJMiWmHzxjqs2b+6Jmzg4c8IOeng=
go.temporal.io/api v1.44.1/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
Expand Down
2 changes: 1 addition & 1 deletion contrib/opentracing/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
go.temporal.io/api v1.44.0 // indirect
go.temporal.io/api v1.44.1 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions contrib/opentracing/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.temporal.io/api v1.44.0 h1:AhaSyyAjG0X09GUFO7z0ttp/c1e67CrL/FbdNFQ/HyA=
go.temporal.io/api v1.44.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.44.1 h1:sb5Hq08AB0WtYvfLJMiWmHzxjqs2b+6Jmzg4c8IOeng=
go.temporal.io/api v1.44.1/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
Expand Down
2 changes: 1 addition & 1 deletion contrib/resourcetuner/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.temporal.io/api v1.44.0 // indirect
go.temporal.io/api v1.44.1 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions contrib/resourcetuner/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.einride.tech/pid v0.1.3 h1:yWAKSmD2Z10jxd4gYFhOjbBNqXeIQwAtnCO/XKCT7sQ=
go.einride.tech/pid v0.1.3/go.mod h1:33JSUbKrH/4v8DZf/0K8IC8Enjd92wB2birp+bCYQso=
go.temporal.io/api v1.44.0 h1:AhaSyyAjG0X09GUFO7z0ttp/c1e67CrL/FbdNFQ/HyA=
go.temporal.io/api v1.44.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.44.1 h1:sb5Hq08AB0WtYvfLJMiWmHzxjqs2b+6Jmzg4c8IOeng=
go.temporal.io/api v1.44.1/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
Expand Down
2 changes: 1 addition & 1 deletion contrib/tally/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/robfig/cron v1.2.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/twmb/murmur3 v1.1.5 // indirect
go.temporal.io/api v1.44.0 // indirect
go.temporal.io/api v1.44.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions contrib/tally/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ github.com/uber-go/tally/v4 v4.1.1/go.mod h1:aXeSTDMl4tNosyf6rdU8jlgScHyjEGGtfJ/
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.temporal.io/api v1.44.0 h1:AhaSyyAjG0X09GUFO7z0ttp/c1e67CrL/FbdNFQ/HyA=
go.temporal.io/api v1.44.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.44.1 h1:sb5Hq08AB0WtYvfLJMiWmHzxjqs2b+6Jmzg4c8IOeng=
go.temporal.io/api v1.44.1/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/pborman/uuid v1.2.1
github.com/robfig/cron v1.2.0
github.com/stretchr/testify v1.10.0
go.temporal.io/api v1.44.0
go.temporal.io/api v1.44.1
golang.org/x/sync v0.8.0
golang.org/x/sys v0.24.0
golang.org/x/time v0.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.temporal.io/api v1.44.0 h1:AhaSyyAjG0X09GUFO7z0ttp/c1e67CrL/FbdNFQ/HyA=
go.temporal.io/api v1.44.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.44.1 h1:sb5Hq08AB0WtYvfLJMiWmHzxjqs2b+6Jmzg4c8IOeng=
go.temporal.io/api v1.44.1/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
Expand Down
5 changes: 5 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,13 @@ type (
ScheduleClient() ScheduleClient

// DeploymentClient creates a new deployment client with the same gRPC connection as this client.
//
// Deprecated: Use [WorkerDeploymentClient]
DeploymentClient() DeploymentClient

// WorkerDeploymentClient creates a new worker deployment client with the same gRPC connection as this client.
WorkerDeploymentClient() WorkerDeploymentClient

// Close client and clean up underlying resources.
Close()
}
Expand Down
3 changes: 3 additions & 0 deletions internal/cmd/build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ func (b *builder) integrationTest() error {
"--dynamic-config-value", "worker.buildIdScavengerEnabled=true",
"--dynamic-config-value", "worker.removableBuildIdDurationSinceDefault=1",
"--dynamic-config-value", "system.enableDeployments=true",
"--dynamic-config-value", "system.enableDeploymentVersions=true",
"--dynamic-config-value", "matching.wv.VersionDrainageStatusVisibilityGracePeriod=10",
"--dynamic-config-value", "matching.wv.VersionDrainageStatusRefreshInterval=1",
"--http-port", "7243", // Nexus tests use the HTTP port directly
"--dynamic-config-value", `component.callbacks.allowedAddresses=[{"Pattern":"*","AllowInsecure":true}]`, // SDK tests use arbitrary callback URLs, permit that on the server
},
Expand Down
17 changes: 11 additions & 6 deletions internal/internal_nexus_task_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ func newNexusTaskPoller(
) *nexusTaskPoller {
return &nexusTaskPoller{
basePoller: basePoller{
metricsHandler: params.MetricsHandler,
stopC: params.WorkerStopChannel,
workerBuildID: params.getBuildID(),
useBuildIDVersioning: params.UseBuildIDForVersioning,
deploymentSeriesName: params.DeploymentSeriesName,
capabilities: params.capabilities,
metricsHandler: params.MetricsHandler,
stopC: params.WorkerStopChannel,
workerBuildID: params.getBuildID(),
useBuildIDVersioning: params.UseBuildIDForVersioning,
workerDeploymentVersion: params.WorkerDeploymentVersion,
deploymentSeriesName: params.DeploymentSeriesName,
capabilities: params.capabilities,
},
taskHandler: taskHandler,
service: service,
Expand Down Expand Up @@ -96,6 +97,10 @@ func (ntp *nexusTaskPoller) poll(ctx context.Context) (taskForWorker, error) {
UseVersioning: ntp.useBuildIDVersioning,
DeploymentSeriesName: ntp.deploymentSeriesName,
},
DeploymentOptions: workerDeploymentOptionsToProto(
ntp.useBuildIDVersioning,
ntp.workerDeploymentVersion,
),
}

response, err := ntp.pollNexusTaskQueue(ctx, request)
Expand Down
20 changes: 16 additions & 4 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type (
identity string
workerBuildID string
useBuildIDForVersioning bool
workerDeploymentVersion string
deploymentSeriesName string
defaultVersioningBehavior VersioningBehavior
enableLoggingInReplay bool
Expand Down Expand Up @@ -173,6 +174,7 @@ type (
maxHeartbeatThrottleInterval time.Duration
versionStamp *commonpb.WorkerVersionStamp
deployment *deploymentpb.Deployment
workerDeploymentOptions *deploymentpb.WorkerDeploymentOptions
}

// history wrapper method to help information about events.
Expand Down Expand Up @@ -559,6 +561,7 @@ func newWorkflowTaskHandler(params workerExecutionParameters, ppMgr pressurePoin
identity: params.Identity,
workerBuildID: params.getBuildID(),
useBuildIDForVersioning: params.UseBuildIDForVersioning,
workerDeploymentVersion: params.WorkerDeploymentVersion,
deploymentSeriesName: params.DeploymentSeriesName,
defaultVersioningBehavior: params.DefaultVersioningBehavior,
enableLoggingInReplay: params.EnableLoggingInReplay,
Expand Down Expand Up @@ -1945,11 +1948,16 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
BuildId: wth.workerBuildID,
SeriesName: wth.deploymentSeriesName,
},
DeploymentOptions: workerDeploymentOptionsToProto(
wth.useBuildIDForVersioning,
wth.workerDeploymentVersion,
),
}
if wth.capabilities != nil && wth.capabilities.BuildIdBasedVersioning {
builtRequest.BinaryChecksum = ""
}
if wth.useBuildIDForVersioning && wth.deploymentSeriesName != "" {
if (wth.useBuildIDForVersioning && wth.deploymentSeriesName != "") ||
wth.workerDeploymentVersion != "" {
workflowType := workflowContext.workflowInfo.WorkflowType
if behavior, ok := wth.registry.getWorkflowVersioningBehavior(workflowType); ok {
builtRequest.VersioningBehavior = versioningBehaviorToProto(behavior)
Expand Down Expand Up @@ -2014,6 +2022,10 @@ func newActivityTaskHandlerWithCustomProvider(
BuildId: params.getBuildID(),
SeriesName: params.DeploymentSeriesName,
},
workerDeploymentOptions: workerDeploymentOptionsToProto(
params.UseBuildIDForVersioning,
params.WorkerDeploymentVersion,
),
}
}

Expand Down Expand Up @@ -2222,7 +2234,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
metricsHandler.Counter(metrics.UnregisteredActivityInvocationCounter).Inc(1)
return convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil,
NewActivityNotRegisteredError(activityType, ath.getRegisteredActivityNames()),
ath.dataConverter, ath.failureConverter, ath.namespace, false, ath.versionStamp, ath.deployment), nil
ath.dataConverter, ath.failureConverter, ath.namespace, false, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions), nil
}

// panic handler
Expand All @@ -2240,7 +2252,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
metricsHandler.Counter(metrics.ActivityTaskErrorCounter).Inc(1)
panicErr := newPanicError(p, st)
result = convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil, panicErr,
ath.dataConverter, ath.failureConverter, ath.namespace, false, ath.versionStamp, ath.deployment)
ath.dataConverter, ath.failureConverter, ath.namespace, false, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions)
}
}()

Expand Down Expand Up @@ -2280,7 +2292,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
)
}
return convertActivityResultToRespondRequest(ath.identity, t.TaskToken, output, err,
ath.dataConverter, ath.failureConverter, ath.namespace, isActivityCancel, ath.versionStamp, ath.deployment), nil
ath.dataConverter, ath.failureConverter, ath.namespace, isActivityCancel, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions), nil
}

func (ath *activityTaskHandlerImpl) getActivity(name string) activity {
Expand Down
Loading

0 comments on commit f5882aa

Please sign in to comment.