Skip to content

Commit

Permalink
Worker versioning - add BuildIDs search attribute (#4284)
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy authored and dnr committed May 11, 2023
1 parent bf12d79 commit fa33616
Show file tree
Hide file tree
Showing 37 changed files with 687 additions and 294 deletions.
160 changes: 80 additions & 80 deletions api/persistence/v1/task_queues.pb.go

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ const (
// queue. Update requests which would cause the versioning data to exceed this number will fail with a
// FailedPrecondition error.
VersionCompatibleSetLimitPerQueue = "limit.versionCompatibleSetLimitPerQueue"
// VersionBuildIDLimitPerQueue is the max number of build IDs allowed to be defined in the versioning data for a
// VersionBuildIdLimitPerQueue is the max number of build IDs allowed to be defined in the versioning data for a
// task queue. Update requests which would cause the versioning data to exceed this number will fail with a
// FailedPrecondition error.
VersionBuildIDLimitPerQueue = "limit.versionBuildIDLimitPerQueue"
VersionBuildIdLimitPerQueue = "limit.versionBuildIdLimitPerQueue"

// keys for frontend

Expand Down Expand Up @@ -608,6 +608,8 @@ const (
DefaultWorkflowRetryPolicy = "history.defaultWorkflowRetryPolicy"
// HistoryMaxAutoResetPoints is the key for max number of auto reset points stored in mutableState
HistoryMaxAutoResetPoints = "history.historyMaxAutoResetPoints"
// HistoryMaxTrackedBuildIds indicates the max number of build IDs to store in the BuildIds search attribute
HistoryMaxTrackedBuildIds = "history.maxTrackedBuildIds"
// EnableParentClosePolicy whether to ParentClosePolicy
EnableParentClosePolicy = "history.enableParentClosePolicy"
// ParentClosePolicyThreshold decides that parent close policy will be processed by sys workers(if enabled) if
Expand Down
2 changes: 2 additions & 0 deletions common/searchattribute/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
StateTransitionCount = "StateTransitionCount"
TemporalChangeVersion = "TemporalChangeVersion"
BinaryChecksums = "BinaryChecksums"
BuildIds = "BuildIds"
BatcherNamespace = "BatcherNamespace"
BatcherUser = "BatcherUser"
HistorySizeBytes = "HistorySizeBytes"
Expand Down Expand Up @@ -88,6 +89,7 @@ var (
predefined = map[string]enumspb.IndexedValueType{
TemporalChangeVersion: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST,
BinaryChecksums: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST,
BuildIds: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST,
BatcherNamespace: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
BatcherUser: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
TemporalScheduledStartTime: enumspb.INDEXED_VALUE_TYPE_DATETIME,
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ require (
go.opentelemetry.io/otel/metric v0.36.0
go.opentelemetry.io/otel/sdk v1.13.0
go.opentelemetry.io/otel/sdk/metric v0.36.0
go.temporal.io/api v1.19.1-0.20230504042653-5484c8a340a0
go.temporal.io/sdk v1.22.2-0.20230503164257-9f11e8c73dbc
go.temporal.io/api v1.19.1-0.20230505041445-71aa6a37fc2e
go.temporal.io/sdk v1.22.2-0.20230505041526-809dc9f34e08
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.10.0
go.uber.org/automaxprocs v1.5.2
Expand Down
9 changes: 4 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1117,11 +1117,10 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.temporal.io/api v1.19.1-0.20230503164115-e6a655167ace/go.mod h1:OiRzMU/dM++aM7IexDcAk0yLc1Pktdr/MUMD0gdpXy8=
go.temporal.io/api v1.19.1-0.20230504042653-5484c8a340a0 h1:C2zhJnL7FvHT7FrTCfzk7B+Ra67ZYvmLp1YISHCsLos=
go.temporal.io/api v1.19.1-0.20230504042653-5484c8a340a0/go.mod h1:OiRzMU/dM++aM7IexDcAk0yLc1Pktdr/MUMD0gdpXy8=
go.temporal.io/sdk v1.22.2-0.20230503164257-9f11e8c73dbc h1:BABNHYopDR0C/9SF/sLI/Ktbt8JBZ2uqOtpRlAAcFZY=
go.temporal.io/sdk v1.22.2-0.20230503164257-9f11e8c73dbc/go.mod h1:mOHv2LGMQ3NHeCixq42+7041i7hayymv/Q9C1BobtRE=
go.temporal.io/api v1.19.1-0.20230505041445-71aa6a37fc2e h1:axHivcPAaz72OzVsjkEeH0nEZb3DGsqAhtzmWHjv68Y=
go.temporal.io/api v1.19.1-0.20230505041445-71aa6a37fc2e/go.mod h1:OiRzMU/dM++aM7IexDcAk0yLc1Pktdr/MUMD0gdpXy8=
go.temporal.io/sdk v1.22.2-0.20230505041526-809dc9f34e08 h1:nXYZBaIqZeHSU41DBi0p0Tx0N7HOS61C/H9cErCY52k=
go.temporal.io/sdk v1.22.2-0.20230505041526-809dc9f34e08/go.mod h1:5w9eNoNN6tHBJikoA1fJ/DBWbs3YrmjA9LxZ2h1swxc=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ option go_package = "go.temporal.io/server/api/persistence/v1;persistence";

import "temporal/server/api/clock/v1/message.proto";

// BuildID is an identifier with a timestamped status used to identify workers for task queue versioning purposes.
message BuildID {
// BuildId is an identifier with a timestamped status used to identify workers for task queue versioning purposes.
message BuildId {
enum State {
STATE_UNSPECIFIED = 0;
STATE_ACTIVE = 1;
Expand All @@ -49,8 +49,8 @@ message CompatibleVersionSet {
// case a set might end up with more than one ID.
repeated string set_ids = 1;
// All the compatible versions, unordered except for the last element, which is considered the set "default".
repeated BuildID build_ids = 2;
// HLC timestamp representing when the set default was updated. Different from BuildID.state_update_timestamp, which
repeated BuildId build_ids = 2;
// HLC timestamp representing when the set default was updated. Different from BuildId.state_update_timestamp, which
// refers to the build ID status.
// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
Expand Down
2 changes: 1 addition & 1 deletion schema/elasticsearch/visibility/index_template_v7.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
{
"order": 0,
"index_patterns": ["temporal_visibility_v1*"],
"settings": {
"index": {
"number_of_shards": "1",
"number_of_replicas": "0",
"auto_expand_replicas": "0-2",
"search.idle.after": "365d",
"sort.field": ["CloseTime", "StartTime", "RunId"],
"sort.order": ["desc", "desc", "desc"],
"sort.missing": ["_first", "_first", "_first"]
}
},
"mappings": {
"dynamic": "false",
"properties": {
"NamespaceId": {
"type": "keyword"
},
"TemporalNamespaceDivision": {
"type": "keyword"
},
"WorkflowId": {
"type": "keyword"
},
"RunId": {
"type": "keyword"
},
"WorkflowType": {
"type": "keyword"
},
"StartTime": {
"type": "date_nanos"
},
"ExecutionTime": {
"type": "date_nanos"
},
"CloseTime": {
"type": "date_nanos"
},
"ExecutionDuration": {
"type": "long"
},
"ExecutionStatus": {
"type": "keyword"
},
"TaskQueue": {
"type": "keyword"
},
"TemporalChangeVersion": {
"type": "keyword"
},
"BatcherNamespace": {
"type": "keyword"
},
"BatcherUser": {
"type": "keyword"
},
"BinaryChecksums": {
"type": "keyword"
},
"HistoryLength": {
"type": "long"
},
"StateTransitionCount": {
"type": "long"
},
"TemporalScheduledStartTime": {
"type": "date_nanos"
},
"TemporalScheduledById": {
"type": "keyword"
},
"TemporalSchedulePaused": {
"type": "boolean"
},
"HistorySizeBytes": {
"type": "long"
},
"BuildIds": {
"type": "keyword"
}
}
},
"aliases": {}
}
54 changes: 54 additions & 0 deletions schema/elasticsearch/visibility/versioned/v5/upgrade.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env bash

set -eu -o pipefail

# Prerequisites:
# - jq
# - curl

# Input parameters.
: "${ES_SCHEME:=http}"
: "${ES_SERVER:=127.0.0.1}"
: "${ES_PORT:=9200}"
: "${ES_USER:=}"
: "${ES_PWD:=}"
: "${ES_VERSION:=v7}"
: "${ES_VIS_INDEX_V1:=temporal_visibility_v1_dev}"
: "${AUTO_CONFIRM:=}"
: "${SLICES_COUNT:=auto}"

es_endpoint="${ES_SCHEME}://${ES_SERVER}:${ES_PORT}"

echo "=== Step 0. Sanity check if Elasticsearch index is accessible ==="

if ! curl --silent --fail --user "${ES_USER}":"${ES_PWD}" "${es_endpoint}/${ES_VIS_INDEX_V1}/_stats/docs" --write-out "\n"; then
echo "Elasticsearch index ${ES_VIS_INDEX_V1} is not accessible at ${es_endpoint}."
exit 1
fi

echo "=== Step 1. Add new builtin search attributes ==="

new_mapping='
{
"properties": {
"BuildIds": {
"type": "keyword"
}
}
}
'

if [ -z "${AUTO_CONFIRM}" ]; then
read -p "Add new builtin search attributes to the index ${ES_VIS_INDEX_V1}? (N/y)" -n 1 -r
echo
else
REPLY="y"
fi
if [ "${REPLY}" = "y" ]; then
curl --silent --fail --user "${ES_USER}":"${ES_PWD}" -X PUT "${es_endpoint}/${ES_VIS_INDEX_V1}/_mapping" -H "Content-Type: application/json" --data-binary "$new_mapping" | jq
# Wait for mapping changes to go through.
until curl --silent --user "${ES_USER}":"${ES_PWD}" "${es_endpoint}/_cluster/health/${ES_VIS_INDEX_V1}" | jq --exit-status '.status=="green" | .'; do
echo "Waiting for Elasticsearch index ${ES_VIS_INDEX_V1} become green."
sleep 1
done
fi
2 changes: 2 additions & 0 deletions schema/mysql/v8/visibility/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ CREATE TABLE executions_visibility (
TemporalScheduledById VARCHAR(255) GENERATED ALWAYS AS (search_attributes->>"$.TemporalScheduledById"),
TemporalSchedulePaused BOOLEAN GENERATED ALWAYS AS (search_attributes->"$.TemporalSchedulePaused"),
TemporalNamespaceDivision VARCHAR(255) GENERATED ALWAYS AS (search_attributes->>"$.TemporalNamespaceDivision"),
BuildIds JSON GENERATED ALWAYS AS (search_attributes->"$.BuildIds"),

PRIMARY KEY (namespace_id, run_id)
);
Expand All @@ -53,6 +54,7 @@ CREATE INDEX by_task_queue ON executions_visibility (namespace_id, task
-- Indexes for the predefined search attributes
CREATE INDEX by_temporal_change_version ON executions_visibility (namespace_id, (CAST(TemporalChangeVersion AS CHAR(255) ARRAY)), (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
CREATE INDEX by_binary_checksums ON executions_visibility (namespace_id, (CAST(BinaryChecksums AS CHAR(255) ARRAY)), (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
CREATE INDEX by_build_ids ON executions_visibility (namespace_id, (CAST(BuildIds AS CHAR(255) ARRAY)), (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
CREATE INDEX by_batcher_user ON executions_visibility (namespace_id, BatcherUser, (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
CREATE INDEX by_temporal_scheduled_start_time ON executions_visibility (namespace_id, TemporalScheduledStartTime, (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
CREATE INDEX by_temporal_scheduled_by_id ON executions_visibility (namespace_id, TemporalScheduledById, (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE executions_visibility ADD COLUMN BuildIds JSON GENERATED ALWAYS AS (search_attributes->'BuildIds');
CREATE INDEX by_build_ids ON executions_visibility (namespace_id, (CAST(BuildIds AS CHAR(255) ARRAY)), (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
5 changes: 3 additions & 2 deletions schema/mysql/v8/visibility/versioned/v1.3/manifest.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{
"CurrVersion": "1.3",
"MinCompatibleVersion": "0.1",
"Description": "add history size bytes",
"Description": "add history size bytes and build IDs visibility columns and indices",
"SchemaUpdateCqlFiles": [
"add_history_size_bytes.sql"
"add_history_size_bytes.sql",
"add_build_ids_search_attribute.sql"
]
}
2 changes: 2 additions & 0 deletions schema/postgresql/v12/visibility/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ CREATE TABLE executions_visibility (
TemporalScheduledById VARCHAR(255) GENERATED ALWAYS AS (search_attributes->>'TemporalScheduledById') STORED,
TemporalSchedulePaused BOOLEAN GENERATED ALWAYS AS ((search_attributes->'TemporalSchedulePaused')::boolean) STORED,
TemporalNamespaceDivision VARCHAR(255) GENERATED ALWAYS AS (search_attributes->>'TemporalNamespaceDivision') STORED,
BuildIds JSONB GENERATED ALWAYS AS (search_attributes->'BuildIds') STORED,

-- Pre-allocated custom search attributes
Bool01 BOOLEAN GENERATED ALWAYS AS ((search_attributes->'Bool01')::boolean) STORED,
Expand Down Expand Up @@ -82,6 +83,7 @@ CREATE INDEX by_task_queue ON executions_visibility (namespace_id, task
-- Indexes for the predefined search attributes
CREATE INDEX by_temporal_change_version ON executions_visibility USING GIN (namespace_id, TemporalChangeVersion jsonb_path_ops);
CREATE INDEX by_binary_checksums ON executions_visibility USING GIN (namespace_id, BinaryChecksums jsonb_path_ops);
CREATE INDEX by_build_ids ON executions_visibility USING GIN (namespace_id, BuildIds jsonb_path_ops);
CREATE INDEX by_batcher_user ON executions_visibility (namespace_id, BatcherUser, (COALESCE(close_time, '9999-12-31 23:59:59')) DESC, start_time DESC, run_id);
CREATE INDEX by_temporal_scheduled_start_time ON executions_visibility (namespace_id, TemporalScheduledStartTime, (COALESCE(close_time, '9999-12-31 23:59:59')) DESC, start_time DESC, run_id);
CREATE INDEX by_temporal_scheduled_by_id ON executions_visibility (namespace_id, TemporalScheduledById, (COALESCE(close_time, '9999-12-31 23:59:59')) DESC, start_time DESC, run_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE executions_visibility ADD COLUMN BuildIds JSONB GENERATED ALWAYS AS (search_attributes->'BuildIds') STORED;
CREATE INDEX by_build_ids ON executions_visibility USING GIN (namespace_id, BuildIds jsonb_path_ops);
5 changes: 3 additions & 2 deletions schema/postgresql/v12/visibility/versioned/v1.3/manifest.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{
"CurrVersion": "1.3",
"MinCompatibleVersion": "0.1",
"Description": "add history size bytes",
"Description": "add history size bytes and build IDs visibility columns and indices",
"SchemaUpdateCqlFiles": [
"add_history_size_bytes.sql"
"add_history_size_bytes.sql",
"add_build_ids_search_attribute.sql"
]
}
10 changes: 10 additions & 0 deletions schema/sqlite/v3/visibility/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ CREATE TABLE executions_visibility (
TemporalScheduledById VARCHAR(255) GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.TemporalScheduledById")),
TemporalSchedulePaused BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.TemporalSchedulePaused")),
TemporalNamespaceDivision VARCHAR(255) GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.TemporalNamespaceDivision")),
BuildIds TEXT GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.BuildIds")) STORED,

-- Pre-allocated custom search attributes
Bool01 BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.Bool01")),
Expand Down Expand Up @@ -114,6 +115,7 @@ CREATE VIRTUAL TABLE executions_visibility_fts_text USING fts5 (
CREATE VIRTUAL TABLE executions_visibility_fts_keyword_list USING fts5 (
TemporalChangeVersion,
BinaryChecksums,
BuildIds,
KeywordList01,
KeywordList02,
KeywordList03,
Expand All @@ -140,13 +142,15 @@ BEGIN
rowid,
TemporalChangeVersion,
BinaryChecksums,
BuildIds,
KeywordList01,
KeywordList02,
KeywordList03
) VALUES (
NEW.rowid,
NEW.TemporalChangeVersion,
NEW.BinaryChecksums,
NEW.BuildIds,
NEW.KeywordList01,
NEW.KeywordList02,
NEW.KeywordList03
Expand Down Expand Up @@ -175,6 +179,7 @@ BEGIN
rowid,
TemporalChangeVersion,
BinaryChecksums,
BuildIds,
KeywordList01,
KeywordList02,
KeywordList03
Expand All @@ -183,6 +188,7 @@ BEGIN
OLD.rowid,
OLD.TemporalChangeVersion,
OLD.BinaryChecksums,
OLD.BuildIds,
OLD.KeywordList01,
OLD.KeywordList02,
OLD.KeywordList03
Expand Down Expand Up @@ -222,6 +228,7 @@ BEGIN
rowid,
TemporalChangeVersion,
BinaryChecksums,
BuildIds,
KeywordList01,
KeywordList02,
KeywordList03
Expand All @@ -230,6 +237,7 @@ BEGIN
OLD.rowid,
OLD.TemporalChangeVersion,
OLD.BinaryChecksums,
OLD.BuildIds,
OLD.KeywordList01,
OLD.KeywordList02,
OLD.KeywordList03
Expand All @@ -238,13 +246,15 @@ BEGIN
rowid,
TemporalChangeVersion,
BinaryChecksums,
BuildIds,
KeywordList01,
KeywordList02,
KeywordList03
) VALUES (
NEW.rowid,
NEW.TemporalChangeVersion,
NEW.BinaryChecksums,
NEW.BuildIds,
NEW.KeywordList01,
NEW.KeywordList02,
NEW.KeywordList03
Expand Down
3 changes: 2 additions & 1 deletion service/frontend/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ var (
errCronAndStartDelaySet = serviceerror.NewInvalidArgument("CronSchedule and WorkflowStartDelay may not be used together.")
errInvalidWorkflowStartDelaySeconds = serviceerror.NewInvalidArgument("An invalid WorkflowStartDelaySeconds is set on request.")
errRaceConditionAddingSearchAttributes = serviceerror.NewUnavailable("Generated search attributes mapping unavailble.")
errUseVersioningWithoutBuildID = serviceerror.NewInvalidArgument("WorkerVersionStamp must be present if UseVersioning is true.")
errUseVersioningWithoutBuildId = serviceerror.NewInvalidArgument("WorkerVersionStamp must be present if UseVersioning is true.")
errBuildIdTooLong = serviceerror.NewInvalidArgument("Build ID exceeds configured limit.workerBuildIdSize, use a shorter build ID.")

errUpdateMetaNotSet = serviceerror.NewInvalidArgument("Update meta is not set on request.")
errUpdateInputNotSet = serviceerror.NewInvalidArgument("Update input is not set on request.")
Expand Down
Loading

0 comments on commit fa33616

Please sign in to comment.