Skip to content

Commit

Permalink
Proto and persistence definitions for replicated versioning (#4172)
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy authored Apr 20, 2023
1 parent 74d3f15 commit 57b6718
Show file tree
Hide file tree
Showing 12 changed files with 298 additions and 33 deletions.
22 changes: 11 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ require (
go.opentelemetry.io/otel/metric v0.36.0
go.opentelemetry.io/otel/sdk v1.13.0
go.opentelemetry.io/otel/sdk/metric v0.36.0
go.temporal.io/api v1.19.1-0.20230322213042-07fb271d475b
go.temporal.io/sdk v1.22.1
go.temporal.io/api v1.19.1-0.20230414223259-557a75eaec60
go.temporal.io/sdk v1.22.2-0.20230414224335-da95b25113ee
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.10.0
go.uber.org/fx v1.19.1
go.uber.org/multierr v1.9.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
golang.org/x/oauth2 v0.5.0
golang.org/x/oauth2 v0.6.0
golang.org/x/sync v0.1.0
golang.org/x/time v0.3.0
google.golang.org/api v0.110.0
google.golang.org/api v0.114.0
google.golang.org/grpc v1.54.0
google.golang.org/grpc/examples v0.0.0-20230216223317-abff344ead8f
gopkg.in/square/go-jose.v2 v2.6.0
Expand All @@ -66,9 +66,9 @@ require (

require (
cloud.google.com/go v0.110.0 // indirect
cloud.google.com/go/compute v1.18.0 // indirect
cloud.google.com/go/compute v1.19.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v0.12.0 // indirect
cloud.google.com/go/iam v0.13.0 // indirect
github.com/apache/thrift v0.18.0 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand All @@ -86,7 +86,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.0 // indirect
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
Expand Down Expand Up @@ -123,13 +123,13 @@ require (
go.uber.org/dig v1.16.1 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230322174352-cde4c949918d // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
lukechampine.com/uint128 v1.2.0 // indirect
Expand Down
124 changes: 113 additions & 11 deletions go.sum

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions proto/internal/temporal/server/api/clock/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,17 @@ message VectorClock {
int64 clock = 2;
int64 cluster_id = 3;
}

// A Hybrid Logical Clock timestamp.
// Guarantees strict total ordering for conflict resolution purposes.
message HybridLogicalClock {
// Wall clock - A single time source MUST guarantee that 2 consecutive timestamps are monotonically non-decreasing.
// e.g. by storing the last wall clock and returning max(gettimeofday(), lastWallClock).
int64 wall_clock = 1;
// Incremental sequence that is reset every time the system's wallclock moves forward.
// Ensures the clock generates monotonically increasing timestamps.
int32 version = 2;
// The cluster version ID as described in the XDC docs - used as a tie breaker.
// See: https://github.com/uber/cadence/blob/master/docs/design/2290-cadence-ndc.md
int64 cluster_id = 3;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) 2020 Temporal Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

syntax = "proto3";

package temporal.server.api.persistence.v1;
option go_package = "go.temporal.io/server/api/persistence/v1;persistence";

import "temporal/server/api/clock/v1/message.proto";

// BuildID is an identifier with a timestamped status used to identify workers for task queue versioning purposes.
message BuildID {
enum State {
STATE_UNSPECIFIED = 0;
STATE_ACTIVE = 1;
STATE_DELETED = 2;
};

string id = 1;
State state = 2;
// HLC timestamp representing when the state was updated or the when build ID was originally inserted.
// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
temporal.server.api.clock.v1.HybridLogicalClock state_update_timestamp = 3;
}

// An internal represenation of temporal.api.taskqueue.v1.CompatibleVersionSet
message CompatibleVersionSet {
// Set IDs are used internally by matching.
// A set typically has one set ID and extra care is taken to enforce this.
// In split brain scenarios, there may be conflicting concurrent writes to the task queue versioning data, in which
// case a set might end up with more than one ID.
repeated string set_ids = 1;
// All the compatible versions, unordered except for the last element, which is considered the set "default".
repeated BuildID build_ids = 2;
// HLC timestamp representing when the set default was updated. Different from BuildID.state_update_timestamp, which
// refers to the build ID status.
// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
temporal.server.api.clock.v1.HybridLogicalClock default_update_timestamp = 3;
}

// Holds all the data related to worker versioning for a task queue.
// Backwards-incompatible changes cannot be made, as this would make existing stored data unreadable.
message VersioningData {
// All the incompatible version sets, unordered except for the last element, which is considered the set "default".
repeated CompatibleVersionSet version_sets = 1;
// HLC timestamp representing when the default set was last updated or established.
// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
temporal.server.api.clock.v1.HybridLogicalClock default_update_timestamp = 2;
}

// Container for all persistent user provided data for a task queue.
// Task queue as a named concept here is close to how users interpret them, rather than relating to some specific type
// (workflow vs activity, etc) and thus, as a consequence, any data that applies to a specific type (say, activity rate
// limiting) should be defined as such within this structure.
// This data must all fit in a single DB column and is kept cached in-memory, take extra care to ensure data added here
// has reasonable size limits imposed on it.
message TaskQueueUserData {
// The last recorded cluster-local Hybrid Logical Clock timestamp for _this_ task queue.
// Updated whenever user data is directly updated due to a user action but not when applying replication events.
// The clock is referenced when new timestamps are generated to ensure it produces monotonically increasing
// timestamps.
temporal.server.api.clock.v1.HybridLogicalClock clock = 1;
VersioningData versioning_data = 2;

// For future use: description, rate limits, manual partition control, etc...
}
8 changes: 0 additions & 8 deletions proto/internal/temporal/server/api/persistence/v1/tasks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import "google/protobuf/timestamp.proto";
import "dependencies/gogoproto/gogo.proto";

import "temporal/api/enums/v1/task_queue.proto";
import "temporal/api/taskqueue/v1/message.proto";

import "temporal/server/api/clock/v1/message.proto";

Expand Down Expand Up @@ -57,13 +56,6 @@ message TaskQueueInfo {
int64 ack_level = 5;
google.protobuf.Timestamp expiry_time = 6 [(gogoproto.stdtime) = true];
google.protobuf.Timestamp last_update_time = 7 [(gogoproto.stdtime) = true];
VersioningData versioning_data = 8;
}

// Holds all the data related to worker versioning for a task queue.
// Backwards-incompatible changes cannot be made, as this would make existing stored data unreadable
message VersioningData {
repeated temporal.api.taskqueue.v1.CompatibleVersionSet version_sets = 1;
}

message TaskKey {
Expand Down
16 changes: 16 additions & 0 deletions schema/cassandra/temporal/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,22 @@ CREATE TABLE tasks (
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
};

-- Stores task queue information such as user provided versioning data
CREATE TABLE task_queue_user_data (
namespace_id uuid,
task_queue_name text,
data blob, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding text, -- Encoding type used for serialization, in practice this should always be proto3
version bigint, -- Version of this row, used for optimistic concurrency
-- task_queue_name is not a part of the parititioning key to allow cheaply iterating all task queues in a single
-- namespace. Access to this table should be infrequent enough that a single partition per namespace can be used.
-- Note that this imposes a limit on total task queue user data within one namespace (see the relevant single
-- partition Cassandra limits).
PRIMARY KEY ((namespace_id), task_queue_name)
) WITH COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
};

-- this table is only used for storage of mapping of namespace uuid to namespace name
CREATE TABLE namespaces_by_id (
id uuid,
Expand Down
11 changes: 11 additions & 0 deletions schema/mysql/v57/temporal/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ CREATE TABLE tasks (
PRIMARY KEY (range_hash, task_queue_id, task_id)
);

-- Stores ephemeral task queue information such as ack levels and expiry times
CREATE TABLE task_queues (
range_hash INT UNSIGNED NOT NULL,
task_queue_id VARBINARY(272) NOT NULL,
Expand All @@ -89,6 +90,16 @@ CREATE TABLE task_queues (
PRIMARY KEY (range_hash, task_queue_id)
);

-- Stores task queue information such as user provided versioning data
CREATE TABLE task_queue_user_data (
namespace_id BINARY(16) NOT NULL,
task_queue_name VARCHAR(255) NOT NULL,
data MEDIUMBLOB NOT NULL, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding VARCHAR(16) NOT NULL, -- Encoding type used for serialization, in practice this should always be proto3
version BIGINT NOT NULL, -- Version of this row, used for optimistic concurrency
PRIMARY KEY (namespace_id, task_queue_name)
);

CREATE TABLE history_immediate_tasks(
shard_id INT NOT NULL,
category_id INT NOT NULL,
Expand Down
11 changes: 11 additions & 0 deletions schema/mysql/v8/temporal/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ CREATE TABLE tasks (
PRIMARY KEY (range_hash, task_queue_id, task_id)
);

-- Stores ephemeral task queue information such as ack levels and expiry times
CREATE TABLE task_queues (
range_hash INT UNSIGNED NOT NULL,
task_queue_id VARBINARY(272) NOT NULL,
Expand All @@ -89,6 +90,16 @@ CREATE TABLE task_queues (
PRIMARY KEY (range_hash, task_queue_id)
);

-- Stores task queue information such as user provided versioning data
CREATE TABLE task_queue_user_data (
namespace_id BINARY(16) NOT NULL,
task_queue_name VARCHAR(255) NOT NULL,
data MEDIUMBLOB NOT NULL, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding VARCHAR(16) NOT NULL, -- Encoding type used for serialization, in practice this should always be proto3
version BIGINT NOT NULL, -- Version of this row, used for optimistic concurrency
PRIMARY KEY (namespace_id, task_queue_name)
);

CREATE TABLE history_immediate_tasks(
shard_id INT NOT NULL,
category_id INT NOT NULL,
Expand Down
13 changes: 12 additions & 1 deletion schema/postgresql/v12/temporal/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ CREATE TABLE tasks (
PRIMARY KEY (range_hash, task_queue_id, task_id)
);

-- Stores ephemeral task queue information such as ack levels and expiry times
CREATE TABLE task_queues (
range_hash BIGINT NOT NULL,
task_queue_id BYTEA NOT NULL,
Expand All @@ -89,6 +90,16 @@ CREATE TABLE task_queues (
PRIMARY KEY (range_hash, task_queue_id)
);

-- Stores task queue information such as user provided versioning data
CREATE TABLE task_queue_user_data (
namespace_id BYTEA NOT NULL,
task_queue_name VARCHAR(255) NOT NULL,
data BYTEA NOT NULL, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding VARCHAR(16) NOT NULL, -- Encoding type used for serialization, in practice this should always be proto3
version BIGINT NOT NULL, -- Version of this row, used for optimistic concurrency
PRIMARY KEY (namespace_id, task_queue_name)
);

CREATE TABLE history_immediate_tasks(
shard_id INTEGER NOT NULL,
category_id INTEGER NOT NULL,
Expand Down Expand Up @@ -295,4 +306,4 @@ CREATE UNIQUE INDEX cm_idx_rolehost ON cluster_membership (role, host_id);
CREATE INDEX cm_idx_rolelasthb ON cluster_membership (role, last_heartbeat);
CREATE INDEX cm_idx_rpchost ON cluster_membership (rpc_address, role);
CREATE INDEX cm_idx_lasthb ON cluster_membership (last_heartbeat);
CREATE INDEX cm_idx_recordexpiry ON cluster_membership (record_expiry);
CREATE INDEX cm_idx_recordexpiry ON cluster_membership (record_expiry);
13 changes: 12 additions & 1 deletion schema/postgresql/v96/temporal/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ CREATE TABLE tasks (
PRIMARY KEY (range_hash, task_queue_id, task_id)
);

-- Stores ephemeral task queue information such as ack levels and expiry times
CREATE TABLE task_queues (
range_hash BIGINT NOT NULL,
task_queue_id BYTEA NOT NULL,
Expand All @@ -89,6 +90,16 @@ CREATE TABLE task_queues (
PRIMARY KEY (range_hash, task_queue_id)
);

-- Stores task queue information such as user provided versioning data
CREATE TABLE task_queue_user_data (
namespace_id BYTEA NOT NULL,
task_queue_name VARCHAR(255) NOT NULL,
data BYTEA NOT NULL, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding VARCHAR(16) NOT NULL, -- Encoding type used for serialization, in practice this should always be proto3
version BIGINT NOT NULL, -- Version of this row, used for optimistic concurrency
PRIMARY KEY (namespace_id, task_queue_name)
);

CREATE TABLE history_immediate_tasks(
shard_id INTEGER NOT NULL,
category_id INTEGER NOT NULL,
Expand Down Expand Up @@ -295,4 +306,4 @@ CREATE UNIQUE INDEX cm_idx_rolehost ON cluster_membership (role, host_id);
CREATE INDEX cm_idx_rolelasthb ON cluster_membership (role, last_heartbeat);
CREATE INDEX cm_idx_rpchost ON cluster_membership (rpc_address, role);
CREATE INDEX cm_idx_lasthb ON cluster_membership (last_heartbeat);
CREATE INDEX cm_idx_recordexpiry ON cluster_membership (record_expiry);
CREATE INDEX cm_idx_recordexpiry ON cluster_membership (record_expiry);
11 changes: 11 additions & 0 deletions schema/sqlite/v3/temporal/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ CREATE TABLE tasks (
PRIMARY KEY (range_hash, task_queue_id, task_id)
);

-- Stores ephemeral task queue information such as ack levels and expiry times
CREATE TABLE task_queues (
range_hash INT UNSIGNED NOT NULL,
task_queue_id VARBINARY(272) NOT NULL,
Expand All @@ -88,6 +89,16 @@ CREATE TABLE task_queues (
PRIMARY KEY (range_hash, task_queue_id)
);

-- Stores task queue information such as user provided versioning data
CREATE TABLE task_queue_user_data (
namespace_id BINARY(16) NOT NULL,
task_queue_name VARCHAR(255) NOT NULL,
data MEDIUMBLOB NOT NULL, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding VARCHAR(16) NOT NULL, -- Encoding type used for serialization, in practice this should always be proto3
version BIGINT NOT NULL, -- Version of this row, used for optimistic concurrency
PRIMARY KEY (namespace_id, task_queue_name)
);

CREATE TABLE history_immediate_tasks(
shard_id INT NOT NULL,
category_id INT NOT NULL,
Expand Down

0 comments on commit 57b6718

Please sign in to comment.