Refactor versioning API #237

Merged
merged 10 commits into from
Mar 13, 2023
5 changes: 5 additions & 0 deletions buf.yaml
@@ -1,5 +1,10 @@
version: v1
breaking:
ignore:
# TODO: Remove after PR 237
- temporal/api/taskqueue/v1
- temporal/api/workflowservice/v1
- temporal/api/history/v1
use:
- PACKAGE
lint:
17 changes: 17 additions & 0 deletions temporal/api/common/v1/message.proto
@@ -121,3 +121,20 @@ message MeteringMetadata {
// aip.dev/not-precedent: Negative values make no sense to represent. --)
uint32 nonfirst_local_activity_execution_attempts = 13;
}

// Identifies the version(s) of a worker that processed a task
message WorkerVersionStamp {
// An opaque whole-worker identifier
string build_id = 1;
// Set if the worker used a dynamically loadable bundle to process
// the task. The bundle could be a WASM blob, JS bundle, etc.
string bundle_id = 2;
Member

Is bundle_id included in build_id, i.e. does build_id definitely change if bundle_id changes? The compatible set logic is all about build ids, not bundle ids, so if one build_id can use multiple bundles, it seems like this won't work.

Member Author

@Sushisource Feb 27, 2023

My intention was that any worker with a given build_id should be able to load up all bundle_ids that any other compatible worker (as defined by build_id compatibility) would also be able to load. Hence by construction it should slot into the idea of build_id compatibility.

To directly answer your question: no, build_id doesn't necessarily change if bundle_id changes. However, from a compatibility perspective it should be consistent. For example, if workers A and B have the same (or compatible) build_ids, this scenario can happen and work:

  • A takes task one and replies with build_id foo and bundle_id b_1
  • B takes the next task and replies with build_id foo.1 and bundle_id b_2
  • A takes the next task. It still has build_id foo and may not have b_2, but it can download it, and that is expected to work because foo is compatible with foo.1

Member

How does A know that it should use b_2 at that point? I know this feature isn't fully designed yet, just trying to make sure we won't have to make incompatible changes here later, e.g. needing to have bundle ids also (i.e. the whole WorkerVersionStamp) where this change just has build ids

Member Author

Ah, because it simply looks at the history which includes the version stamp on the last WFT complete, and downloads that bundle.
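
A minimal sketch of that lookup, assuming a simplified in-memory history. `WorkerVersionStamp` mirrors the proto message in this PR; `HistoryEvent`, `bundle_to_load`, and the event layout are hypothetical names used only for illustration, not SDK or server code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class WorkerVersionStamp:
    # Mirrors the proto message added in this PR.
    build_id: str
    bundle_id: str = ""


@dataclass(frozen=True)
class HistoryEvent:
    # Hypothetical stand-in for a workflow history event.
    event_type: str
    worker_version: Optional[WorkerVersionStamp] = None


def bundle_to_load(history: list[HistoryEvent]) -> Optional[str]:
    """Return the bundle_id stamped on the most recent completed workflow task, if any."""
    for event in reversed(history):
        if event.event_type == "WorkflowTaskCompleted" and event.worker_version:
            # An empty bundle_id means the worker did not use a loadable bundle.
            return event.worker_version.bundle_id or None
    return None


# Worker A (build_id foo) picks up a task after worker B (foo.1) completed one with b_2.
history = [
    HistoryEvent("WorkflowTaskCompleted", WorkerVersionStamp("foo", "b_1")),
    HistoryEvent("WorkflowTaskCompleted", WorkerVersionStamp("foo.1", "b_2")),
]
print(bundle_to_load(history))
```

The server never has to route on bundle_id in this sketch: it only matches build ids, and the worker resolves the bundle from the stamp it finds in history.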

Member

Oh duh, I'm thinking too server-centric

}

// Identifies the version(s) that a worker is compatible with when polling or identifying itself
message WorkerVersionCapabilities {
// An opaque whole-worker identifier
string build_id = 1;

// Later, may include info like "I can process WASM and/or JS bundles"
}
7 changes: 4 additions & 3 deletions temporal/api/history/v1/message.proto
@@ -193,9 +193,10 @@ message WorkflowTaskCompletedEventAttributes {
string identity = 3;
// Binary ID of the worker who completed this task
string binary_checksum = 4;
// ID of the worker who picked up this workflow task, or missing if worker
// is not using versioning.
temporal.api.taskqueue.v1.VersionId worker_versioning_id = 5;
// Version info of the worker who processed this workflow task, or missing if worker is not
// using versioning. If present, the `build_id` field within is also used as `binary_checksum`,
// which may be omitted in that case (it may also be populated to preserve compatibility).
temporal.api.common.v1.WorkerVersionStamp worker_version = 5;
// Data the SDK wishes to record for itself, but server need not interpret, and does not
// directly impact workflow state.
temporal.api.sdk.v1.WorkflowTaskCompletedMetadata sdk_metadata = 6;
34 changes: 12 additions & 22 deletions temporal/api/taskqueue/v1/message.proto
@@ -38,6 +38,7 @@ import "google/protobuf/wrappers.proto";
import "dependencies/gogoproto/gogo.proto";

import "temporal/api/enums/v1/task_queue.proto";
import "temporal/api/common/v1/message.proto";

// See https://docs.temporal.io/docs/concepts/task-queues/
message TaskQueue {
@@ -71,13 +72,12 @@ message TaskQueuePartitionMetadata {
}

message PollerInfo {
// Unix Nano
google.protobuf.Timestamp last_access_time = 1 [(gogoproto.stdtime) = true];
string identity = 2;
double rate_per_second = 3;
// If a worker has specified an ID for use with the worker versioning feature while polling,
// that id must appear here.
VersionId worker_versioning_id = 4;
// If a worker has opted into the worker versioning feature while polling, its capabilities will
// appear here.
temporal.api.common.v1.WorkerVersionCapabilities worker_version_capabilities = 4;
}

message StickyExecutionAttributes {
@@ -87,22 +87,12 @@ message StickyExecutionAttributes {
google.protobuf.Duration schedule_to_start_timeout = 2 [(gogoproto.stdduration) = true];
}

// Used by the worker versioning APIs, represents a node in the version graph for a particular
// task queue
message VersionIdNode {
VersionId version = 1;
// A pointer to the previous version this version is considered to be compatible with
VersionIdNode previous_compatible = 2;
// A pointer to the last incompatible version (previous major version)
VersionIdNode previous_incompatible = 3;
// Used by the worker versioning APIs, represents an ordering of one or more versions which are
// considered to be compatible with each other. Currently the versions are always worker build ids.
message CompatibleVersionSet {
// A unique identifier for this version set. Users don't need to understand or care about this
// value, but it has value for debugging purposes.
string version_set_id = 1;
// All the compatible versions, ordered from oldest to newest
Member

I find oldest to newest to be confusing since I can use the API to mark a previous version as the default which would not make it the "newest".

Member Author

It will do that

Member

If that's what you consider "newest", it doesn't read that way to me.
I'd just add that the "default" will always be the last version in the list.

Member Author

Well, within a set, the order never changes. You only mark overall sets as defaults, and the comment for that is clear about it. There's no reason to have some kind of "inner" default, since all the versions are compatible, we always pick the latest one.

Member

Oh I see, so there's no way to revert marking a build ID as "latest" (or default) in the set?

What if I mark a build as compatible in the set and later change my mind? What if the deployment for that build fails?
How do I tell the server to route requests in that set to a previous build?

Member Author

I'll add a promote-within-set op

Member

I still think that we should have a default attribute on this message to mark the default in the set, so we can leave the versions array sorted by insertion time.
Another thing we might want to consider is storing more information, such as when each version was added to the set and by whom (using the identity in the request).
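
The alternative proposed in this comment could look roughly like the sketch below. It is not what the PR implements: `VersionSetWithDefault` and its `default_build_id` attribute are hypothetical, and only `version_set_id` and `build_ids` come from the `CompatibleVersionSet` message in the diff.

```python
from dataclasses import dataclass, field


@dataclass
class VersionSetWithDefault:
    # Hypothetical variant of CompatibleVersionSet with an explicit default marker.
    version_set_id: str
    build_ids: list[str] = field(default_factory=list)  # insertion order, never reordered
    default_build_id: str = ""

    def add(self, build_id: str) -> None:
        """Append a new compatible build id and make it the set default."""
        if build_id in self.build_ids:
            raise ValueError(f"{build_id} is already in the set")
        self.build_ids.append(build_id)
        self.default_build_id = build_id

    def promote(self, build_id: str) -> None:
        """Point the set default at an existing build id without touching list order."""
        if build_id not in self.build_ids:
            raise ValueError(f"{build_id} is not in the set")
        self.default_build_id = build_id


s = VersionSetWithDefault("set-1")
s.add("foo")
s.add("foo.1")
s.promote("foo")  # e.g. the foo.1 deployment failed, so route back to foo
print(s.build_ids, s.default_build_id)
```

The trade-off is an extra field that must stay consistent with the list, in exchange for a list order that always reflects insertion time.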

repeated string build_ids = 2;
}

// Used by the worker versioning APIs, represents a specific version of something
// Currently, that's just a whole-worker id. In the future, if we support
// WASM workflow bundle based versioning, for example, then the inside of this
// message may become a oneof of different version types.
message VersionId {
// An opaque whole-worker identifier
string worker_build_id = 1;
}

2 changes: 2 additions & 0 deletions temporal/api/workflow/v1/message.proto
@@ -57,6 +57,8 @@ message WorkflowExecutionInfo {
string task_queue = 13;
int64 state_transition_count = 14;
int64 history_size_bytes = 15;
// If set, the most recent worker version stamp that appeared in a workflow task completion
temporal.api.common.v1.WorkerVersionStamp most_recent_worker_version_stamp = 16;
}

message WorkflowExecutionConfig {
144 changes: 102 additions & 42 deletions temporal/api/workflowservice/v1/request_response.proto
@@ -237,12 +237,12 @@ message PollWorkflowTaskQueueRequest {
// Each worker process should provide an ID unique to the specific set of code it is running.
// "checksum" in this field name isn't very accurate; it should be thought of as an id.
string binary_checksum = 4;
// If set, the worker is opting in to build-id based versioning and wishes to only
// receive tasks that are considered compatible with the version provided.
// If set, the worker is opting in to versioning and wishes to only
// receive tasks that are considered compatible with the version capabilities provided.
// Doing so only makes sense in conjunction with the `UpdateWorkerBuildIdOrdering` API.
// When `worker_versioning_id` has a `worker_build_id`, and `binary_checksum` is not
// When this field has a `worker_build_id`, and `binary_checksum` is not
// set, that value should also be considered as the `binary_checksum`.
temporal.api.taskqueue.v1.VersionId worker_versioning_id = 5;
temporal.api.common.v1.WorkerVersionCapabilities worker_version_capabilities = 5;
Member

Is there going to be a capability flag for this? How should a versioned worker behave if it connects to a server that doesn't support versioning?

Member Author

There is -

bool build_id_based_versioning = 6;

Good question. We hadn't defined that. IMO the right thing to do is for the worker to throw errors on startup (we get capabilities on startup, and if you configured your worker to use versioning but the server can't do it, we can throw then)
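
A rough sketch of that startup check, under the assumption that the worker has already fetched the server capabilities. Only the `build_id_based_versioning` flag comes from this thread; `ServerCapabilities`, `VersioningUnsupportedError`, and `check_versioning_support` are hypothetical names, not an SDK API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ServerCapabilities:
    # Only the flag quoted above is modeled here.
    build_id_based_versioning: bool = False


class VersioningUnsupportedError(RuntimeError):
    """Raised when a worker is configured for versioning but the server cannot do it."""


def check_versioning_support(use_versioning: bool, caps: ServerCapabilities) -> None:
    """Fail fast at worker startup instead of polling with a field the server ignores."""
    if use_versioning and not caps.build_id_based_versioning:
        raise VersioningUnsupportedError(
            "worker is configured to use versioning, but the server does not report "
            "build_id_based_versioning"
        )


try:
    check_versioning_support(True, ServerCapabilities(build_id_based_versioning=False))
except VersioningUnsupportedError as err:
    print(f"worker startup aborted: {err}")
```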

Member

In a discussion this week we decided we want to be able to turn this on for individual namespaces, so we might need more than capabilities. We could figure that out in another PR

}

message PollWorkflowTaskQueueResponse {
@@ -310,11 +310,12 @@ message RespondWorkflowTaskCompletedRequest {
// Responses to the `queries` field in the task being responded to
map<string, temporal.api.query.v1.WorkflowQueryResult> query_results = 8;
string namespace = 9;
// If using versioning, worker should send the same id here that it used to
// poll for the workflow task.
// When `worker_versioning_id` has a `worker_build_id`, and `binary_checksum` is not
// set, that value should also be considered as the `binary_checksum`.
temporal.api.taskqueue.v1.VersionId worker_versioning_id = 10;
// If using versioning, the worker uses this field to indicate what version(s) it used to
// process the task. When this field has a `worker_build_id`, and `binary_checksum` is not set,
// that value should also be considered as the `binary_checksum`. Leaving this field empty when
// replying to a task that has had this field previously populated in history is an error, and
// such a completion will be rejected.
temporal.api.common.v1.WorkerVersionStamp worker_version_stamp = 10;
// Protocol messages piggybacking on a WFT as a transport
repeated temporal.api.protocol.v1.Message messages = 11;
// Data the SDK wishes to record for itself, but server need not interpret, and does not
@@ -359,10 +360,10 @@ message PollActivityTaskQueueRequest {
// The identity of the worker/client
string identity = 3;
temporal.api.taskqueue.v1.TaskQueueMetadata task_queue_metadata = 4;
// If set, the worker is opting in to build-id based versioning and wishes to only
// receive tasks that are considered compatible with the version provided.
// If set, the worker is opting in to versioning and wishes to only
// receive tasks that are considered compatible with the capabilities provided.
// Doing so only makes sense in conjunction with the `UpdateWorkerBuildIdOrdering` API.
temporal.api.taskqueue.v1.VersionId worker_versioning_id = 5;
temporal.api.common.v1.WorkerVersionCapabilities worker_version_capabilities = 5;
}

message PollActivityTaskQueueResponse {
@@ -1041,44 +1042,103 @@ message ListSchedulesResponse {
// aip.dev/not-precedent: UpdateWorkerBuildIdOrderingRequest doesn't follow Google API format --)
// (-- api-linter: core::0134::request-resource-required=disabled
// aip.dev/not-precedent: UpdateWorkerBuildIdOrderingRequest RPC doesn't follow Google API format. --)
message UpdateWorkerBuildIdOrderingRequest {
message UpdateWorkerBuildIdCompatabilityRequest {
message AddNewCompatibleVersion {
// A new id to be added to an existing compatible set.
string new_build_id = 1;
// A build id which must already exist in the version sets known by the task queue. The new
// id will be stored in the set containing this id, marking it as compatible with
// the versions within.
string existing_compatible_build_id = 2;
// When set, establishes the compatible set being targeted as the overall default for the
// queue. If a different set was the current default, the targeted set will replace it as
// the new default.
bool make_set_default = 3;
}

string namespace = 1;
// Must be set, the task queue to apply changes to. Because all workers on
// a given task queue must have the same set of workflow & activity
// implementations, there is no reason to specify a task queue type here.
// Must be set, the task queue to apply changes to. Because all workers on a given task queue
// must have the same set of workflow & activity implementations, there is no reason to specify
// a task queue type here.
string task_queue = 2;
// The version id we are targeting.
temporal.api.taskqueue.v1.VersionId version_id = 3;
// When set, indicates that the `version_id` in this message is compatible
// with the one specified in this field. Because compatability should form
// a DAG, any build id can only be the "next compatible" version for one
// other ID of a certain type at a time, and any setting which would create a cycle is invalid.
temporal.api.taskqueue.v1.VersionId previous_compatible = 4;
// When set, establishes the specified `version_id` as the default of it's type
// for the queue. Workers matching it will begin processing new workflow executions.
// The existing default will be marked as a previous incompatible version
// to this one, assuming it is not also in `is_compatible_with`.
bool become_default = 5;
}
message UpdateWorkerBuildIdOrderingResponse {}
oneof operation {
Member

All of these are exactly what we need but I feel like I'd have to look at the docs every time to know what each of these operations mean.

This is better IMHO:

message AddNewVersion {
  string version = 1;
  string compatible_with_version = 2;
  bool default_for_queue = 3;
}

message PromoteExistingVersion {
  string version = 1;
  bool default_for_queue = 2;
}

There's one case that we might not want to allow: AddNewVersion(version, default_for_queue=false) but I think the readability makes up for that.

Member Author

I don't think we should be optimizing our gRPC API for readability over correctness. We make this easy to use / readable in our clients and tctl. The API should optimize for correctness, so that when we implement those things mistakes are easily prevented. Looking at the docstring is quick and easy in every editing environment.

Member

I think we should be optimizing for readability wherever possible TBH.
Notice the consistency in my proposed message names, they start with verbs as one might expect for an operation, they follow the same format, etc.
I'd be down to see an alternative proposal that disallows AddNewVersion(version, default_for_queue=false) and is easier to parse.

Member Author

@Sushisource Nov 11, 2022

I'm talking about relative values, and correctness over readability is I think pretty clear.

But, regardless, I'm really not following what's hard to read about this. I could just make the field names slightly longer and that would seem to do it. I think your version is largely more clear to you because you wrote it

Member

I'd say there are 3 issues with your proposed API:

  1. These 2 variants sound the same, existing_version_id_in_set_to_promote and promote_version_id_within_set.
  2. Some variant names sound like operations while others don't (the ones that don't start with a verb).
  3. The term "default" is overloaded, there's a set default and a queue default, anywhere we mention default we should explicitly disambiguate.

Member Author

I have made the field names a bit clearer to address this

// A new build id. This operation will create a new set which will be the new overall
// default version for the queue, with this id as its only member. This new set is
// incompatible with all previous sets/versions.
//
// (-- api-linter: core::0140::prepositions=disabled
// aip.dev/not-precedent: In makes perfect sense here. --)
string add_new_build_id_in_new_default_set = 3;
// Adds a new id to an existing compatible set, see sub-message definition for more.
AddNewCompatibleVersion add_new_compatible_build_id = 4;
// Promote an existing set to be the current default (if it isn't already) by targeting
// an existing build id within it. This field's value is the extant build id.
//
// (-- api-linter: core::0140::prepositions=disabled
// aip.dev/not-precedent: Names are hard. --)
string promote_set_by_build_id = 5;
// Promote an existing build id within some set to be the current default for that set.
//
// (-- api-linter: core::0140::prepositions=disabled
// aip.dev/not-precedent: Within makes perfect sense here. --)
string promote_build_id_within_set = 6;
}
}
message UpdateWorkerBuildIdCompatabilityResponse {
// The id of the compatible set that the updated version was added to, or exists in. Users don't
// need to understand or care about this value, but it has value for debugging purposes.
string version_set_id = 1;
}
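
To make the four operations in the `oneof` above concrete, here is a toy in-memory model. The method names follow the proto field names. The storage layout is an assumption: the last set in the list is treated as the queue default, the last id within a set as that set's default, and the first build id of a set is reused as its `version_set_id`. `TaskQueueVersions` is a hypothetical class, not server code.

```python
from dataclasses import dataclass, field


@dataclass
class CompatibleVersionSet:
    version_set_id: str
    build_ids: list[str] = field(default_factory=list)  # last entry = set default (assumed)


class TaskQueueVersions:
    """Toy model of one task queue; the last set is the queue default (assumed)."""

    def __init__(self) -> None:
        self.sets: list[CompatibleVersionSet] = []

    def _find_set(self, build_id: str) -> CompatibleVersionSet:
        for s in self.sets:
            if build_id in s.build_ids:
                return s
        raise KeyError(f"unknown build id: {build_id}")

    def _require_unknown(self, build_id: str) -> None:
        if any(build_id in s.build_ids for s in self.sets):
            raise ValueError(f"build id already exists: {build_id}")

    def _make_queue_default(self, target: CompatibleVersionSet) -> None:
        self.sets.remove(target)
        self.sets.append(target)

    def add_new_build_id_in_new_default_set(self, build_id: str) -> str:
        self._require_unknown(build_id)
        # Assumption: reuse the first build id as the set id.
        self.sets.append(CompatibleVersionSet(build_id, [build_id]))
        return build_id

    def add_new_compatible_build_id(
        self,
        new_build_id: str,
        existing_compatible_build_id: str,
        make_set_default: bool = False,
    ) -> str:
        self._require_unknown(new_build_id)
        target = self._find_set(existing_compatible_build_id)
        target.build_ids.append(new_build_id)
        if make_set_default:
            self._make_queue_default(target)
        return target.version_set_id

    def promote_set_by_build_id(self, build_id: str) -> str:
        target = self._find_set(build_id)
        self._make_queue_default(target)
        return target.version_set_id

    def promote_build_id_within_set(self, build_id: str) -> str:
        target = self._find_set(build_id)
        target.build_ids.remove(build_id)
        target.build_ids.append(build_id)
        return target.version_set_id

    def queue_default_build_id(self) -> str:
        return self.sets[-1].build_ids[-1]


q = TaskQueueVersions()
q.add_new_build_id_in_new_default_set("1.0")
q.add_new_compatible_build_id("1.1", "1.0")
q.add_new_build_id_in_new_default_set("2.0")
q.promote_build_id_within_set("1.0")  # roll back within the 1.x set
q.promote_set_by_build_id("1.0")      # make the 1.x set the queue default again
print([s.build_ids for s in q.sets], q.queue_default_build_id())
```

Each method returns the id of the set it touched, matching the `version_set_id` field of the response message.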

// (-- api-linter: core::0134::request-resource-required=disabled
// aip.dev/not-precedent: GetWorkerBuildIdOrderingRequest RPC doesn't follow Google API format. --)
message GetWorkerBuildIdOrderingRequest {
message GetWorkerBuildIdCompatabilityRequest {
string namespace = 1;
// Must be set, the task queue to interrogate about worker id ordering
string task_queue = 2;
// Limits how deep the returned DAG will go. 1 will return only the
// default build id. A default/0 value will return the entire graph.
int32 max_depth = 3;
}
message GetWorkerBuildIdOrderingResponse {
// The currently established default version
temporal.api.taskqueue.v1.VersionIdNode current_default = 1;
// Other current latest-compatible versions who are not the overall default. These are the
// versions that will be used when generating new tasks by following the graph from the
// version of the last task out to a leaf.
repeated temporal.api.taskqueue.v1.VersionIdNode compatible_leaves = 2;
// Limits how many compatible sets will be returned. Specify 1 to only return the current
// default major version set. 0 returns all sets.
int32 max_sets = 3;
Member

limit on max versions per set also?

Member Author

I dunno if it's worth it. The most we would return now is 10k versions, if every set is maxed out, and that's still not a gigantor response.

This mostly exists just to provide a "get only the current default" toggle.
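
A small sketch of how `max_sets` could be applied, assuming sets are stored oldest to newest with the current default last, as the response comment in this diff describes. Returning the most recent sets when the limit is greater than 1 is an assumption, and `limit_sets` is a hypothetical helper.

```python
def limit_sets(major_version_sets: list[list[str]], max_sets: int = 0) -> list[list[str]]:
    """Apply the max_sets limit: 0 returns every set, 1 returns only the current default set."""
    if max_sets < 0:
        raise ValueError("max_sets must not be negative")
    if max_sets == 0:
        return list(major_version_sets)
    # Assumption: keep the newest sets, so the default (last) set is always included.
    return major_version_sets[-max_sets:]


sets = [["1.0", "1.1"], ["2.0"], ["3.0", "3.1"]]
print(limit_sets(sets, 1))
print(limit_sets(sets))
```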

// If set, the response will include information about worker versions which are ready to be
// retired.
bool include_retirement_candidates = 4;
// If set, the response will include information about which versions have open workflows, and
// whether or not there are currently polling workers who are compatible with those versions.
bool include_poller_compatability = 5;
}
message GetWorkerBuildIdCompatabilityResponse {
// Major version sets, in order from oldest to newest. The last element of the list will always
// be the current default major version. IE: New workflows will target the most recent version
// in that version set.
//
// There may be fewer sets returned than exist, if the request chose to limit this response.
repeated temporal.api.taskqueue.v1.CompatibleVersionSet major_version_sets = 1;

message RetirementCandidate {
// The worker build id which is ready for retirement
string build_id = 1;
// If true, there are no open *or* closed workflows, meaning there is no reason at all
// to keep the worker alive, not even to service queries on closed workflows. If not true,
// then there are no open workflows, but some closed ones.
bool all_workflows_are_archived = 2;
// Currently polling workers who match the build id ready for retirement
repeated temporal.api.taskqueue.v1.PollerInfo pollers = 3;
}

// A list of workers who are still live and polling the task queue, but may no longer be needed
// to make progress on open workflows.
repeated RetirementCandidate retirement_candidates = 2;

message VersionsWithCompatiblePollers {
// The latest build id which completed a workflow task on some open workflow
string most_recent_build_id = 1;
// Currently polling workers who are compatible with `most_recent_build_id`.
repeated temporal.api.taskqueue.v1.PollerInfo pollers = 2;
}

// A list of versions and pollers who are capable of processing tasks at that version (if any)
// for which there are currently open workflows.
repeated VersionsWithCompatiblePollers active_versions_and_pollers = 3;
}

// (-- api-linter: core::0134=disabled
14 changes: 8 additions & 6 deletions temporal/api/workflowservice/v1/service.proto
@@ -380,16 +380,18 @@ service WorkflowService {
rpc ListSchedules (ListSchedulesRequest) returns (ListSchedulesResponse) {
}

// Allows users to specify a graph of worker build id based versions on a
// per task queue basis. Versions are ordered, and may be either compatible
// with some extant version, or a new incompatible version.
// Allows users to specify sets of worker build id versions on a per task queue basis. Versions
// are ordered, and may be either compatible with some extant version, or a new incompatible
// version, forming sets of ids which are incompatible with each other, but whose contained
// members are compatible with one another.
//
// (-- api-linter: core::0134::response-message-name=disabled
// aip.dev/not-precedent: UpdateWorkerBuildIdOrdering RPC doesn't follow Google API format. --)
// (-- api-linter: core::0134::method-signature=disabled
// aip.dev/not-precedent: UpdateWorkerBuildIdOrdering RPC doesn't follow Google API format. --)
rpc UpdateWorkerBuildIdOrdering (UpdateWorkerBuildIdOrderingRequest) returns (UpdateWorkerBuildIdOrderingResponse) {}
// Fetches the worker build id versioning graph for some task queue.
rpc GetWorkerBuildIdOrdering (GetWorkerBuildIdOrderingRequest) returns (GetWorkerBuildIdOrderingResponse) {}
rpc UpdateWorkerBuildIdCompatability (UpdateWorkerBuildIdCompatabilityRequest) returns (UpdateWorkerBuildIdCompatabilityResponse) {}
// Fetches the worker build id versioning sets for some task queue and related metadata.
rpc GetWorkerBuildIdCompatability (GetWorkerBuildIdCompatabilityRequest) returns (GetWorkerBuildIdCompatabilityResponse) {}

// Invokes the specified update function on user workflow code.
// (-- api-linter: core::0134=disabled