Skip to content

Commit

Permalink
Merge branch 'master' into handle-max-signal-count
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Mar 13, 2023
2 parents f1aaba8 + 9206f15 commit 6e604a8
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 73 deletions.
5 changes: 5 additions & 0 deletions buf.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
version: v1
breaking:
ignore:
# TODO: Remove after PR 237
- temporal/api/taskqueue/v1
- temporal/api/workflowservice/v1
- temporal/api/history/v1
use:
- PACKAGE
lint:
Expand Down
17 changes: 17 additions & 0 deletions temporal/api/common/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,20 @@ message MeteringMetadata {
// aip.dev/not-precedent: Negative values make no sense to represent. --)
uint32 nonfirst_local_activity_execution_attempts = 13;
}

// Identifies the version(s) of a worker that processed a task
message WorkerVersionStamp {
// An opaque whole-worker identifier
string build_id = 1;
// Set if the worker used a dynamically loadable bundle to process
// the task. The bundle could be a WASM blob, JS bundle, etc.
string bundle_id = 2;
}

// Identifies the version(s) that a worker is compatible with when polling or identifying itself
message WorkerVersionCapabilities {
// An opaque whole-worker identifier
string build_id = 1;

// Later, may include info like "I can process WASM and/or JS bundles"
}
7 changes: 4 additions & 3 deletions temporal/api/history/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,10 @@ message WorkflowTaskCompletedEventAttributes {
string identity = 3;
// Binary ID of the worker who completed this task
string binary_checksum = 4;
// ID of the worker who picked up this workflow task, or missing if worker
// is not using versioning.
temporal.api.taskqueue.v1.VersionId worker_versioning_id = 5;
// Version info of the worker who processed this workflow task, or missing if worker is not
// using versioning. If present, the `build_id` field within is also used as `binary_checksum`,
// which may be omitted in that case (it may also be populated to preserve compatability).
temporal.api.common.v1.WorkerVersionStamp worker_version = 5;
// Data the SDK wishes to record for itself, but server need not interpret, and does not
// directly impact workflow state.
temporal.api.sdk.v1.WorkflowTaskCompletedMetadata sdk_metadata = 6;
Expand Down
34 changes: 12 additions & 22 deletions temporal/api/taskqueue/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import "google/protobuf/wrappers.proto";
import "dependencies/gogoproto/gogo.proto";

import "temporal/api/enums/v1/task_queue.proto";
import "temporal/api/common/v1/message.proto";

// See https://docs.temporal.io/docs/concepts/task-queues/
message TaskQueue {
Expand Down Expand Up @@ -71,13 +72,12 @@ message TaskQueuePartitionMetadata {
}

message PollerInfo {
// Unix Nano
google.protobuf.Timestamp last_access_time = 1 [(gogoproto.stdtime) = true];
string identity = 2;
double rate_per_second = 3;
// If a worker has specified an ID for use with the worker versioning feature while polling,
// that id must appear here.
VersionId worker_versioning_id = 4;
// If a worker has opted into the worker versioning feature while polling, its capabilities will
// appear here.
temporal.api.common.v1.WorkerVersionCapabilities worker_version_capabilities = 4;
}

message StickyExecutionAttributes {
Expand All @@ -87,22 +87,12 @@ message StickyExecutionAttributes {
google.protobuf.Duration schedule_to_start_timeout = 2 [(gogoproto.stdduration) = true];
}

// Used by the worker versioning APIs, represents a node in the version graph for a particular
// task queue
message VersionIdNode {
VersionId version = 1;
// A pointer to the previous version this version is considered to be compatible with
VersionIdNode previous_compatible = 2;
// A pointer to the last incompatible version (previous major version)
VersionIdNode previous_incompatible = 3;
// Used by the worker versioning APIs, represents an ordering of one or more versions which are
// considered to be compatible with each other. Currently the versions are always worker build ids.
message CompatibleVersionSet {
// A unique identifier for this version set. Users don't need to understand or care about this
// value, but it has value for debugging purposes.
string version_set_id = 1;
// All the compatible versions, ordered from oldest to newest
repeated string build_ids = 2;
}

// Used by the worker versioning APIs, represents a specific version of something
// Currently, that's just a whole-worker id. In the future, if we support
// WASM workflow bundle based versioning, for example, then the inside of this
// message may become a oneof of different version types.
message VersionId {
// An opaque whole-worker identifier
string worker_build_id = 1;
}

2 changes: 2 additions & 0 deletions temporal/api/workflow/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ message WorkflowExecutionInfo {
string task_queue = 13;
int64 state_transition_count = 14;
int64 history_size_bytes = 15;
// If set, the most recent worker version stamp that appeared in a workflow task completion
temporal.api.common.v1.WorkerVersionStamp most_recent_worker_version_stamp = 16;
}

message WorkflowExecutionConfig {
Expand Down
144 changes: 102 additions & 42 deletions temporal/api/workflowservice/v1/request_response.proto
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,12 @@ message PollWorkflowTaskQueueRequest {
// Each worker process should provide an ID unique to the specific set of code it is running
// "checksum" in this field name isn't very accurate, it should be though of as an id.
string binary_checksum = 4;
// If set, the worker is opting in to build-id based versioning and wishes to only
// receive tasks that are considered compatible with the version provided.
// If set, the worker is opting in to versioning and wishes to only
// receive tasks that are considered compatible with the version capabilities provided.
// Doing so only makes sense in conjunction with the `UpdateWorkerBuildIdOrdering` API.
// When `worker_versioning_id` has a `worker_build_id`, and `binary_checksum` is not
// When this field has a `worker_build_id`, and `binary_checksum` is not
// set, that value should also be considered as the `binary_checksum`.
temporal.api.taskqueue.v1.VersionId worker_versioning_id = 5;
temporal.api.common.v1.WorkerVersionCapabilities worker_version_capabilities = 5;
}

message PollWorkflowTaskQueueResponse {
Expand Down Expand Up @@ -310,11 +310,12 @@ message RespondWorkflowTaskCompletedRequest {
// Responses to the `queries` field in the task being responded to
map<string, temporal.api.query.v1.WorkflowQueryResult> query_results = 8;
string namespace = 9;
// If using versioning, worker should send the same id here that it used to
// poll for the workflow task.
// When `worker_versioning_id` has a `worker_build_id`, and `binary_checksum` is not
// set, that value should also be considered as the `binary_checksum`.
temporal.api.taskqueue.v1.VersionId worker_versioning_id = 10;
// If using versioning, the worker uses this field to indicate what version(s) it used to
// process the task. When this field has a `worker_build_id`, and `binary_checksum` is not set,
// that value should also be considered as the `binary_checksum`. Leaving this field empty when
// replying to a task has had this field previously populated in history in an error, and such
// a completion will be rejected.
temporal.api.common.v1.WorkerVersionStamp worker_version_stamp = 10;
// Protocol messages piggybacking on a WFT as a transport
repeated temporal.api.protocol.v1.Message messages = 11;
// Data the SDK wishes to record for itself, but server need not interpret, and does not
Expand Down Expand Up @@ -359,10 +360,10 @@ message PollActivityTaskQueueRequest {
// The identity of the worker/client
string identity = 3;
temporal.api.taskqueue.v1.TaskQueueMetadata task_queue_metadata = 4;
// If set, the worker is opting in to build-id based versioning and wishes to only
// receive tasks that are considered compatible with the version provided.
// If set, the worker is opting in to versioning and wishes to only
// receive tasks that are considered compatible with the capabilities provided.
// Doing so only makes sense in conjunction with the `UpdateWorkerBuildIdOrdering` API.
temporal.api.taskqueue.v1.VersionId worker_versioning_id = 5;
temporal.api.common.v1.WorkerVersionCapabilities worker_version_capabilities = 5;
}

message PollActivityTaskQueueResponse {
Expand Down Expand Up @@ -1041,44 +1042,103 @@ message ListSchedulesResponse {
// aip.dev/not-precedent: UpdateWorkerBuildIdOrderingRequest doesn't follow Google API format --)
// (-- api-linter: core::0134::request-resource-required=disabled
// aip.dev/not-precedent: UpdateWorkerBuildIdOrderingRequest RPC doesn't follow Google API format. --)
message UpdateWorkerBuildIdOrderingRequest {
message UpdateWorkerBuildIdCompatabilityRequest {
message AddNewCompatibleVersion {
// A new id to be added to an existing compatible set.
string new_build_id = 1;
// A build id which must already exist in the version sets known by the task queue. The new
// id will be stored in the set containing this id, marking it as compatible with
// the versions within.
string existing_compatible_build_id = 2;
// When set, establishes the compatible set being targeted as the overall default for the
// queue. If a different set was the current default, the targeted set will replace it as
// the new default.
bool make_set_default = 3;
}

string namespace = 1;
// Must be set, the task queue to apply changes to. Because all workers on
// a given task queue must have the same set of workflow & activity
// implementations, there is no reason to specify a task queue type here.
// Must be set, the task queue to apply changes to. Because all workers on a given task queue
// must have the same set of workflow & activity implementations, there is no reason to specify
// a task queue type here.
string task_queue = 2;
// The version id we are targeting.
temporal.api.taskqueue.v1.VersionId version_id = 3;
// When set, indicates that the `version_id` in this message is compatible
// with the one specified in this field. Because compatability should form
// a DAG, any build id can only be the "next compatible" version for one
// other ID of a certain type at a time, and any setting which would create a cycle is invalid.
temporal.api.taskqueue.v1.VersionId previous_compatible = 4;
// When set, establishes the specified `version_id` as the default of it's type
// for the queue. Workers matching it will begin processing new workflow executions.
// The existing default will be marked as a previous incompatible version
// to this one, assuming it is not also in `is_compatible_with`.
bool become_default = 5;
}
message UpdateWorkerBuildIdOrderingResponse {}
oneof operation {
// A new build id. This operation will create a new set which will be the new overall
// default version for the queue, with this id as its only member. This new set is
// incompatible with all previous sets/versions.
//
// (-- api-linter: core::0140::prepositions=disabled
// aip.dev/not-precedent: In makes perfect sense here. --)
string add_new_build_id_in_new_default_set = 3;
// Adds a new id to an existing compatible set, see sub-message definition for more.
AddNewCompatibleVersion add_new_compatible_build_id = 4;
// Promote an existing set to be the current default (if it isn't already) by targeting
// an existing build id within it. This field's value is the extant build id.
//
// (-- api-linter: core::0140::prepositions=disabled
// aip.dev/not-precedent: Names are hard. --)
string promote_set_by_build_id = 5;
// Promote an existing build id within some set to be the current default for that set.
//
// (-- api-linter: core::0140::prepositions=disabled
// aip.dev/not-precedent: Within makes perfect sense here. --)
string promote_build_id_within_set = 6;
}
}
message UpdateWorkerBuildIdCompatabilityResponse {
// The id of the compatible set that the updated version was added to, or exists in. Users don't
// need to understand or care about this value, but it has value for debugging purposes.
string version_set_id = 1;
}

// (-- api-linter: core::0134::request-resource-required=disabled
// aip.dev/not-precedent: GetWorkerBuildIdOrderingRequest RPC doesn't follow Google API format. --)
message GetWorkerBuildIdOrderingRequest {
message GetWorkerBuildIdCompatabilityRequest {
string namespace = 1;
// Must be set, the task queue to interrogate about worker id ordering
string task_queue = 2;
// Limits how deep the returned DAG will go. 1 will return only the
// default build id. A default/0 value will return the entire graph.
int32 max_depth = 3;
}
message GetWorkerBuildIdOrderingResponse {
// The currently established default version
temporal.api.taskqueue.v1.VersionIdNode current_default = 1;
// Other current latest-compatible versions who are not the overall default. These are the
// versions that will be used when generating new tasks by following the graph from the
// version of the last task out to a leaf.
repeated temporal.api.taskqueue.v1.VersionIdNode compatible_leaves = 2;
// Limits how many compatible sets will be returned. Specify 1 to only return the current
// default major version set. 0 returns all sets.
int32 max_sets = 3;
// If set, the response will include information about worker versions which are ready to be
// retired.
bool include_retirement_candidates = 4;
// If set, the response will include information about which versions have open workflows, and
// whether or not there are currently polling workers who are compatible with those versions.
bool include_poller_compatability = 5;
}
message GetWorkerBuildIdCompatabilityResponse {
// Major version sets, in order from oldest to newest. The last element of the list will always
// be the current default major version. IE: New workflows will target the most recent version
// in that version set.
//
// There may be fewer sets returned than exist, if the request chose to limit this response.
repeated temporal.api.taskqueue.v1.CompatibleVersionSet major_version_sets = 1;

message RetirementCandidate {
// The worker build id which is ready for retirement
string build_id = 1;
// If true, there are no open *or* closed workflows, meaning there is no reason at all
// to keep the worker alive, not even to service queries on closed workflows. If not true,
// then there are no open workflows, but some closed ones.
bool all_workflows_are_archived = 2;
// Currently polling workers who match the build id ready for retirement
repeated temporal.api.taskqueue.v1.PollerInfo pollers = 3;
}

// A list of workers who are still live and polling the task queue, but may no longer be needed
// to make progress on open workflows.
repeated RetirementCandidate retirement_candidates = 2;

message VersionsWithCompatiblePollers {
// The latest build id which completed a workflow task on some open workflow
string most_recent_build_id = 1;
// Currently polling workers who are compatible with `most_recent_build_id`.
repeated temporal.api.taskqueue.v1.PollerInfo pollers = 2;
}

// A list of versions and pollers who are capable of processing tasks at that version (if any)
// for which there are currently open workflows.
repeated VersionsWithCompatiblePollers active_versions_and_pollers = 3;
}

// (-- api-linter: core::0134=disabled
Expand Down
14 changes: 8 additions & 6 deletions temporal/api/workflowservice/v1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -380,16 +380,18 @@ service WorkflowService {
rpc ListSchedules (ListSchedulesRequest) returns (ListSchedulesResponse) {
}

// Allows users to specify a graph of worker build id based versions on a
// per task queue basis. Versions are ordered, and may be either compatible
// with some extant version, or a new incompatible version.
// Allows users to specify sets of worker build id versions on a per task queue basis. Versions
// are ordered, and may be either compatible with some extant version, or a new incompatible
// version, forming sets of ids which are incompatible with each other, but whose contained
// members are compatible with one another.
//
// (-- api-linter: core::0134::response-message-name=disabled
// aip.dev/not-precedent: UpdateWorkerBuildIdOrdering RPC doesn't follow Google API format. --)
// (-- api-linter: core::0134::method-signature=disabled
// aip.dev/not-precedent: UpdateWorkerBuildIdOrdering RPC doesn't follow Google API format. --)
rpc UpdateWorkerBuildIdOrdering (UpdateWorkerBuildIdOrderingRequest) returns (UpdateWorkerBuildIdOrderingResponse) {}
// Fetches the worker build id versioning graph for some task queue.
rpc GetWorkerBuildIdOrdering (GetWorkerBuildIdOrderingRequest) returns (GetWorkerBuildIdOrderingResponse) {}
rpc UpdateWorkerBuildIdCompatability (UpdateWorkerBuildIdCompatabilityRequest) returns (UpdateWorkerBuildIdCompatabilityResponse) {}
// Fetches the worker build id versioning sets for some task queue and related metadata.
rpc GetWorkerBuildIdCompatability (GetWorkerBuildIdCompatabilityRequest) returns (GetWorkerBuildIdCompatabilityResponse) {}

// Invokes the specified update function on user workflow code.
// (-- api-linter: core::0134=disabled
Expand Down

0 comments on commit 6e604a8

Please sign in to comment.