Skip to content

Commit

Permalink
Address David's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy committed May 26, 2023
1 parent 00821c1 commit d844235
Show file tree
Hide file tree
Showing 18 changed files with 127 additions and 137 deletions.
2 changes: 1 addition & 1 deletion cmd/tools/rpcwrappers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func makeGetMatchingClient(reqType reflect.Type) string {
switch t.Name() {
case "GetBuildIdTaskQueueMappingRequest":
// Pick a random node for this request, it's not associated with a specific task queue.
tqPath = "&taskqueuepb.TaskQueue{Name: fmt.Sprintf(\"not-applicable-%%s\", rand.Int())}"
tqPath = "&taskqueuepb.TaskQueue{Name: fmt.Sprintf(\"not-applicable-%s\", rand.Int())}"
tqtPath = "enumspb.TASK_QUEUE_TYPE_UNSPECIFIED"
return fmt.Sprintf("client, err := c.getClientForTaskqueue(%s, %s, %s)", nsIDPath, tqPath, tqtPath)
case "UpdateTaskQueueUserDataRequest",
Expand Down
20 changes: 10 additions & 10 deletions common/persistence/cassandra/matching_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,30 +134,30 @@ const (

templateGetTaskQueueUserDataQuery = `SELECT data, data_encoding, version
FROM task_queue_user_data
WHERE namespace_id = ? AND build_id_if_row_is_an_index = ''
WHERE namespace_id = ? AND build_id = ''
AND task_queue_name = ?`

templateUpdateTaskQueueUserDataQuery = `UPDATE task_queue_user_data SET
data = ?,
data_encoding = ?,
version = ?
WHERE namespace_id = ?
AND build_id_if_row_is_an_index = ''
AND build_id = ''
AND task_queue_name = ?
IF version = ?`

templateInsertTaskQueueUserDataQuery = `INSERT INTO task_queue_user_data
(namespace_id, build_id_if_row_is_an_index, task_queue_name, data, data_encoding, version) VALUES
(? , '' , ? , ? , ? , 1 ) IF NOT EXISTS`
(namespace_id, build_id, task_queue_name, data, data_encoding, version) VALUES
(? , '' , ? , ? , ? , 1 ) IF NOT EXISTS`

templateInsertBuildIdTaskQueueMappingQuery = `INSERT INTO task_queue_user_data
(namespace_id, build_id_if_row_is_an_index, task_queue_name) VALUES
(? , ? , ?)`
(namespace_id, build_id, task_queue_name) VALUES
(? , ? , ?)`
templateDeleteBuildIdTaskQueueMappingQuery = `DELETE FROM task_queue_user_data
WHERE namespace_id = ? AND build_id_if_row_is_an_index = ? AND task_queue_name = ?`
templateListTaskQueueUserDataQuery = `SELECT task_queue_name, data, data_encoding FROM task_queue_user_data WHERE namespace_id = ? AND build_id_if_row_is_an_index = ''`
templateListTaskQueueNamesByBuildIdQuery = `SELECT task_queue_name FROM task_queue_user_data WHERE namespace_id = ? AND build_id_if_row_is_an_index = ?`
templateCountTaskQueueByBuildIdQuery = `SELECT COUNT(*) FROM task_queue_user_data WHERE namespace_id = ? AND build_id_if_row_is_an_index = ?`
WHERE namespace_id = ? AND build_id = ? AND task_queue_name = ?`
templateListTaskQueueUserDataQuery = `SELECT task_queue_name, data, data_encoding FROM task_queue_user_data WHERE namespace_id = ? AND build_id = ''`
templateListTaskQueueNamesByBuildIdQuery = `SELECT task_queue_name FROM task_queue_user_data WHERE namespace_id = ? AND build_id = ?`
templateCountTaskQueueByBuildIdQuery = `SELECT COUNT(*) FROM task_queue_user_data WHERE namespace_id = ? AND build_id = ?`

// Not much of a need to make this configurable, we're just reading some strings
listTaskQueueNamesByBuildIdPageSize = 100
Expand Down
5 changes: 4 additions & 1 deletion common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,10 @@ type (
BuildID string
}

CountTaskQueuesByBuildIdRequest = GetTaskQueuesByBuildIdRequest
CountTaskQueuesByBuildIdRequest struct {
NamespaceID string
BuildID string
}

// ListTaskQueueRequest contains the request params needed to invoke ListTaskQueue API
ListTaskQueueRequest struct {
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type (
UpdateTaskQueueUserData(ctx context.Context, request *InternalUpdateTaskQueueUserDataRequest) error
ListTaskQueueUserDataEntries(ctx context.Context, request *ListTaskQueueUserDataEntriesRequest) (*InternalListTaskQueueUserDataEntriesResponse, error)
GetTaskQueuesByBuildId(ctx context.Context, request *GetTaskQueuesByBuildIdRequest) ([]string, error)
CountTaskQueuesByBuildId(ctx context.Context, request *GetTaskQueuesByBuildIdRequest) (int, error)
CountTaskQueuesByBuildId(ctx context.Context, request *CountTaskQueuesByBuildIdRequest) (int, error)
}
// MetadataStore is a lower level of MetadataManager
MetadataStore interface {
Expand Down
13 changes: 8 additions & 5 deletions common/persistence/sql/sqlplugin/matching_task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ type (
DataEncoding string
}

AddBuildIdsToTaskQueueMapping struct {
AddToBuildIdToTaskQueueMapping struct {
NamespaceID []byte
TaskQueueName string
BuildIds []string
}

RemoveBuildIdsToTaskQueueMapping struct {
RemoveFromBuildIdToTaskQueueMapping struct {
NamespaceID []byte
TaskQueueName string
BuildIds []string
Expand All @@ -81,7 +81,10 @@ type (
BuildID string
}

CountTaskQueuesByBuildIdRequest = GetTaskQueuesByBuildIdRequest
CountTaskQueuesByBuildIdRequest struct {
NamespaceID []byte
BuildID string
}

VersionedBlob struct {
Version int64
Expand Down Expand Up @@ -114,8 +117,8 @@ type (
LockTaskQueues(ctx context.Context, filter TaskQueuesFilter) (int64, error)
GetTaskQueueUserData(ctx context.Context, request *GetTaskQueueUserDataRequest) (*VersionedBlob, error)
UpdateTaskQueueUserData(ctx context.Context, request *UpdateTaskQueueDataRequest) error
AddBuildIdToTaskQueueMapping(ctx context.Context, request AddBuildIdsToTaskQueueMapping) error
RemoveBuildIdToTaskQueueMapping(ctx context.Context, request RemoveBuildIdsToTaskQueueMapping) error
AddBuildIdToTaskQueueMapping(ctx context.Context, request AddToBuildIdToTaskQueueMapping) error
RemoveBuildIdToTaskQueueMapping(ctx context.Context, request RemoveFromBuildIdToTaskQueueMapping) error
ListTaskQueueUserDataEntries(ctx context.Context, request *ListTaskQueueUserDataEntriesRequest) ([]TaskQueueUserDataEntry, error)
GetTaskQueuesByBuildId(ctx context.Context, request *GetTaskQueuesByBuildIdRequest) ([]string, error)
CountTaskQueuesByBuildId(ctx context.Context, request *CountTaskQueuesByBuildIdRequest) (int, error)
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/sql/sqlplugin/mysql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (mdb *db) UpdateTaskQueueUserData(ctx context.Context, request *sqlplugin.U
return nil
}

func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddBuildIdsToTaskQueueMapping) error {
func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddToBuildIdToTaskQueueMapping) error {
query := addBuildIdToTaskQueueMappingQry
var params []any
for idx, buildId := range request.BuildIds {
Expand All @@ -356,7 +356,7 @@ func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplug
return err
}

func (mdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveBuildIdsToTaskQueueMapping) error {
func (mdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveFromBuildIdToTaskQueueMapping) error {
// TODO(bergundy): implement when we support deletion
panic("not implemented")
}
Expand All @@ -380,7 +380,7 @@ func (mdb *db) GetTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.Ge
return taskQueues, err
}

func (mdb *db) CountTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.GetTaskQueuesByBuildIdRequest) (int, error) {
func (mdb *db) CountTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.CountTaskQueuesByBuildIdRequest) (int, error) {
var count int
err := mdb.conn.GetContext(ctx, &count, countTaskQueuesByBuildIdQry, request.NamespaceID, request.BuildID)
return count, err
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/sql/sqlplugin/postgresql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (pdb *db) UpdateTaskQueueUserData(ctx context.Context, request *sqlplugin.U
return nil
}

func (pdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddBuildIdsToTaskQueueMapping) error {
func (pdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddToBuildIdToTaskQueueMapping) error {
query := addBuildIdToTaskQueueMappingQry
var params []any
for idx, buildId := range request.BuildIds {
Expand All @@ -355,7 +355,7 @@ func (pdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplug
return err
}

func (pdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveBuildIdsToTaskQueueMapping) error {
func (pdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveFromBuildIdToTaskQueueMapping) error {
// TODO(bergundy): implement when we support deletion
panic("not implemented")
}
Expand All @@ -379,7 +379,7 @@ func (pdb *db) GetTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.Ge
return taskQueues, err
}

func (pdb *db) CountTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.GetTaskQueuesByBuildIdRequest) (int, error) {
func (pdb *db) CountTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.CountTaskQueuesByBuildIdRequest) (int, error) {
var count int
err := pdb.conn.GetContext(ctx, &count, countTaskQueuesByBuildIdQry, request.NamespaceID, request.BuildID)
return count, err
Expand Down
24 changes: 12 additions & 12 deletions common/persistence/sql/sqlplugin/sqlite/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"context"
"database/sql"
"fmt"
"strings"

"go.temporal.io/api/serviceerror"

Expand Down Expand Up @@ -110,7 +111,7 @@ task_queue_id = :task_queue_id
listTaskQueueUserDataQry = `SELECT task_queue_name, data, data_encoding FROM task_queue_user_data WHERE namespace_id = ? AND task_queue_name > ? LIMIT ?`

addBuildIdToTaskQueueMappingQry = `INSERT INTO build_id_to_task_queue (namespace_id, build_id, task_queue_name) VALUES `
removeBuildIdToTaskQueueMappingQry = `DELETE FROM build_id_to_task_queue WHERE `
removeBuildIdToTaskQueueMappingQry = `DELETE FROM build_id_to_task_queue WHERE namespace_id = ? AND task_queue_name = ? AND build_id IN (`
listTaskQueuesByBuildIdQry = `SELECT task_queue_name FROM build_id_to_task_queue WHERE namespace_id = ? AND build_id = ?`
countTaskQueuesByBuildIdQry = `SELECT COUNT(*) FROM build_id_to_task_queue WHERE namespace_id = ? AND build_id = ?`
)
Expand Down Expand Up @@ -346,7 +347,7 @@ func (mdb *db) UpdateTaskQueueUserData(ctx context.Context, request *sqlplugin.U
return nil
}

func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddBuildIdsToTaskQueueMapping) error {
func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddToBuildIdToTaskQueueMapping) error {
query := addBuildIdToTaskQueueMappingQry
var params []any
for idx, buildId := range request.BuildIds {
Expand All @@ -362,15 +363,14 @@ func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplug
return err
}

func (mdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveBuildIdsToTaskQueueMapping) error {
query := removeBuildIdToTaskQueueMappingQry
var params []any
for idx, buildId := range request.BuildIds {
query += "namespace_id = ? AND build_id = ? AND task_queue_name = ?"
if idx < len(request.BuildIds)-1 {
query += " OR "
}
params = append(params, request.NamespaceID, buildId, request.TaskQueueName)
func (mdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveFromBuildIdToTaskQueueMapping) error {
query := removeBuildIdToTaskQueueMappingQry + strings.Repeat("?, ", len(request.BuildIds)-1) + "?)"
// Golang doesn't support appending a string slice to an any slice which is essentially what we're doing here.
params := make([]any, len(request.BuildIds)+2)
params[0] = request.NamespaceID
params[1] = request.TaskQueueName
for i, buildId := range request.BuildIds {
params[i+2] = buildId
}

_, err := mdb.conn.ExecContext(ctx, query, params...)
Expand Down Expand Up @@ -400,7 +400,7 @@ func (mdb *db) GetTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.Ge
return taskQueues, err
}

func (mdb *db) CountTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.GetTaskQueuesByBuildIdRequest) (int, error) {
func (mdb *db) CountTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.CountTaskQueuesByBuildIdRequest) (int, error) {
var count int
err := mdb.conn.GetContext(ctx, &count, countTaskQueuesByBuildIdQry, request.NamespaceID, request.BuildID)
return count, err
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/sql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (m *sqlTaskManager) UpdateTaskQueueUserData(ctx context.Context, request *p
return err
}
if len(request.BuildIdsAdded) > 0 {
err = tx.AddBuildIdToTaskQueueMapping(ctx, sqlplugin.AddBuildIdsToTaskQueueMapping{
err = tx.AddBuildIdToTaskQueueMapping(ctx, sqlplugin.AddToBuildIdToTaskQueueMapping{
NamespaceID: namespaceID,
TaskQueueName: request.TaskQueue,
BuildIds: request.BuildIdsAdded,
Expand All @@ -528,7 +528,7 @@ func (m *sqlTaskManager) UpdateTaskQueueUserData(ctx context.Context, request *p
}
}
if len(request.BuildIdsRemoved) > 0 {
err = tx.RemoveBuildIdToTaskQueueMapping(ctx, sqlplugin.RemoveBuildIdsToTaskQueueMapping{
err = tx.RemoveBuildIdToTaskQueueMapping(ctx, sqlplugin.RemoveFromBuildIdToTaskQueueMapping{
NamespaceID: namespaceID,
TaskQueueName: request.TaskQueue,
BuildIds: request.BuildIdsRemoved,
Expand Down Expand Up @@ -599,7 +599,7 @@ func (m *sqlTaskManager) CountTaskQueuesByBuildId(ctx context.Context, request *
if err != nil {
return 0, serviceerror.NewInternal(err.Error())
}
return m.Db.CountTaskQueuesByBuildId(ctx, &sqlplugin.GetTaskQueuesByBuildIdRequest{NamespaceID: namespaceID, BuildID: request.BuildID})
return m.Db.CountTaskQueuesByBuildId(ctx, &sqlplugin.CountTaskQueuesByBuildIdRequest{NamespaceID: namespaceID, BuildID: request.BuildID})
}

// Returns uint32 hash for a particular TaskQueue/Task given a Namespace, TaskQueueName and TaskQueueType
Expand Down
35 changes: 28 additions & 7 deletions common/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@

package common

import commonpb "go.temporal.io/api/common/v1"
import (
commonpb "go.temporal.io/api/common/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
)

const buildIdSearchAttributePrefixVersioned = "versioned"
const buildIdSearchAttributePrefixUnversioned = "unversioned"
const BuildIdSearchAttributeDelimiter = ":"

// UnversionedSearchAttribute is the sentinel value used to mark all unversioned workflows
const UnversionedSearchAttribute = buildIdSearchAttributePrefixUnversioned
const (
buildIdSearchAttributePrefixVersioned = "versioned"
buildIdSearchAttributePrefixUnversioned = "unversioned"
BuildIdSearchAttributeDelimiter = ":"
// UnversionedSearchAttribute is the sentinel value used to mark all unversioned workflows
UnversionedSearchAttribute = buildIdSearchAttributePrefixUnversioned
)

// VersionedBuildIdSearchAttribute returns the search attribute value for an unversioned build id
func VersionedBuildIdSearchAttribute(buildId string) string {
Expand All @@ -53,3 +57,20 @@ func VersionStampToBuildIdSearchAttribute(stamp *commonpb.WorkerVersionStamp) st
}
return UnversionedBuildIdSearchAttribute(stamp.BuildId)
}

func FindBuildId(versionSets []*taskqueuepb.CompatibleVersionSet, buildId string) (setIndex, indexInSet int) {
setIndex = -1
indexInSet = -1
if len(versionSets) > 0 {
for sidx, set := range versionSets {
for bidx, id := range set.BuildIds {
if buildId == id {
setIndex = sidx
indexInSet = bidx
break
}
}
}
}
return setIndex, indexInSet
}
15 changes: 7 additions & 8 deletions schema/cassandra/temporal/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,17 @@ CREATE TABLE tasks (
-- OR
-- Used as a mapping from build id to task queue
CREATE TABLE task_queue_user_data (
namespace_id uuid,
task_queue_name text,
build_id_if_row_is_an_index text, -- If this row is used as a mapping of build id to task queue, this will not be empty
data blob, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding text, -- Encoding type used for serialization, in practice this should always be proto3
build_ids set<text>, -- All active build ids in all version sets on this task queue (used in an index below)
version bigint, -- Version of this row, used for optimistic concurrency
namespace_id uuid,
task_queue_name text,
build_id text, -- If this row is used as a mapping of build id to task queue, this will not be empty
data blob, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding text, -- Encoding type used for serialization, in practice this should always be proto3
version bigint, -- Version of this row, used for optimistic concurrency
-- task_queue_name is not a part of the parititioning key to allow cheaply iterating all task queues in a single
-- namespace. Access to this table should be infrequent enough that a single partition per namespace can be used.
-- Note that this imposes a limit on total task queue user data within one namespace (see the relevant single
-- partition Cassandra limits).
PRIMARY KEY ((namespace_id), build_id_if_row_is_an_index, task_queue_name)
PRIMARY KEY ((namespace_id), build_id, task_queue_name)
) WITH COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@
-- OR
-- Used as a mapping from build id to task queue
CREATE TABLE task_queue_user_data (
namespace_id uuid,
task_queue_name text,
build_id_if_row_is_an_index text, -- If this row is used as a mapping of build id to task queue, this will not be empty
data blob, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding text, -- Encoding type used for serialization, in practice this should always be proto3
build_ids set<text>, -- All active build ids in all version sets on this task queue (used in an index below)
version bigint, -- Version of this row, used for optimistic concurrency
namespace_id uuid,
task_queue_name text,
build_id text, -- If this row is used as a mapping of build id to task queue, this will not be empty
data blob, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding text, -- Encoding type used for serialization, in practice this should always be proto3
version bigint, -- Version of this row, used for optimistic concurrency
-- task_queue_name is not a part of the parititioning key to allow cheaply iterating all task queues in a single
-- namespace. Access to this table should be infrequent enough that a single partition per namespace can be used.
-- Note that this imposes a limit on total task queue user data within one namespace (see the relevant single
-- partition Cassandra limits).
PRIMARY KEY ((namespace_id), build_id_if_row_is_an_index, task_queue_name)
PRIMARY KEY ((namespace_id), build_id, task_queue_name)
) WITH COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
};
Loading

0 comments on commit d844235

Please sign in to comment.