Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address comments from #4346 review #4405

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/matching/client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/tools/rpcwrappers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func makeGetMatchingClient(reqType reflect.Type) string {
switch t.Name() {
case "GetBuildIdTaskQueueMappingRequest":
// Pick a random node for this request, it's not associated with a specific task queue.
tqPath = "&taskqueuepb.TaskQueue{Name: fmt.Sprintf(\"not-applicable-%%s\", rand.Int())}"
tqPath = "&taskqueuepb.TaskQueue{Name: fmt.Sprintf(\"not-applicable-%s\", rand.Int())}"
tqtPath = "enumspb.TASK_QUEUE_TYPE_UNSPECIFIED"
return fmt.Sprintf("client, err := c.getClientForTaskqueue(%s, %s, %s)", nsIDPath, tqPath, tqtPath)
case "UpdateTaskQueueUserDataRequest",
Expand Down
20 changes: 10 additions & 10 deletions common/persistence/cassandra/matching_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,30 +134,30 @@ const (

templateGetTaskQueueUserDataQuery = `SELECT data, data_encoding, version
FROM task_queue_user_data
WHERE namespace_id = ? AND build_id_if_row_is_an_index = ''
WHERE namespace_id = ? AND build_id = ''
AND task_queue_name = ?`

templateUpdateTaskQueueUserDataQuery = `UPDATE task_queue_user_data SET
data = ?,
data_encoding = ?,
version = ?
WHERE namespace_id = ?
AND build_id_if_row_is_an_index = ''
AND build_id = ''
AND task_queue_name = ?
IF version = ?`

templateInsertTaskQueueUserDataQuery = `INSERT INTO task_queue_user_data
(namespace_id, build_id_if_row_is_an_index, task_queue_name, data, data_encoding, version) VALUES
(? , '' , ? , ? , ? , 1 ) IF NOT EXISTS`
(namespace_id, build_id, task_queue_name, data, data_encoding, version) VALUES
(? , '' , ? , ? , ? , 1 ) IF NOT EXISTS`

templateInsertBuildIdTaskQueueMappingQuery = `INSERT INTO task_queue_user_data
(namespace_id, build_id_if_row_is_an_index, task_queue_name) VALUES
(? , ? , ?)`
(namespace_id, build_id, task_queue_name) VALUES
(? , ? , ?)`
templateDeleteBuildIdTaskQueueMappingQuery = `DELETE FROM task_queue_user_data
WHERE namespace_id = ? AND build_id_if_row_is_an_index = ? AND task_queue_name = ?`
templateListTaskQueueUserDataQuery = `SELECT task_queue_name, data, data_encoding FROM task_queue_user_data WHERE namespace_id = ? AND build_id_if_row_is_an_index = ''`
templateListTaskQueueNamesByBuildIdQuery = `SELECT task_queue_name FROM task_queue_user_data WHERE namespace_id = ? AND build_id_if_row_is_an_index = ?`
templateCountTaskQueueByBuildIdQuery = `SELECT COUNT(*) FROM task_queue_user_data WHERE namespace_id = ? AND build_id_if_row_is_an_index = ?`
WHERE namespace_id = ? AND build_id = ? AND task_queue_name = ?`
templateListTaskQueueUserDataQuery = `SELECT task_queue_name, data, data_encoding FROM task_queue_user_data WHERE namespace_id = ? AND build_id = ''`
templateListTaskQueueNamesByBuildIdQuery = `SELECT task_queue_name FROM task_queue_user_data WHERE namespace_id = ? AND build_id = ?`
templateCountTaskQueueByBuildIdQuery = `SELECT COUNT(*) FROM task_queue_user_data WHERE namespace_id = ? AND build_id = ?`

// Not much of a need to make this configurable, we're just reading some strings
listTaskQueueNamesByBuildIdPageSize = 100
Expand Down
5 changes: 4 additions & 1 deletion common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,10 @@ type (
BuildID string
}

CountTaskQueuesByBuildIdRequest = GetTaskQueuesByBuildIdRequest
CountTaskQueuesByBuildIdRequest struct {
NamespaceID string
BuildID string
}

// ListTaskQueueRequest contains the request params needed to invoke ListTaskQueue API
ListTaskQueueRequest struct {
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/mock/store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type (
UpdateTaskQueueUserData(ctx context.Context, request *InternalUpdateTaskQueueUserDataRequest) error
ListTaskQueueUserDataEntries(ctx context.Context, request *ListTaskQueueUserDataEntriesRequest) (*InternalListTaskQueueUserDataEntriesResponse, error)
GetTaskQueuesByBuildId(ctx context.Context, request *GetTaskQueuesByBuildIdRequest) ([]string, error)
CountTaskQueuesByBuildId(ctx context.Context, request *GetTaskQueuesByBuildIdRequest) (int, error)
CountTaskQueuesByBuildId(ctx context.Context, request *CountTaskQueuesByBuildIdRequest) (int, error)
}
// MetadataStore is a lower level of MetadataManager
MetadataStore interface {
Expand Down
13 changes: 8 additions & 5 deletions common/persistence/sql/sqlplugin/matching_task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ type (
DataEncoding string
}

AddBuildIdsToTaskQueueMapping struct {
AddToBuildIdToTaskQueueMapping struct {
NamespaceID []byte
TaskQueueName string
BuildIds []string
}

RemoveBuildIdsToTaskQueueMapping struct {
RemoveFromBuildIdToTaskQueueMapping struct {
NamespaceID []byte
TaskQueueName string
BuildIds []string
Expand All @@ -81,7 +81,10 @@ type (
BuildID string
}

CountTaskQueuesByBuildIdRequest = GetTaskQueuesByBuildIdRequest
CountTaskQueuesByBuildIdRequest struct {
NamespaceID []byte
BuildID string
}

VersionedBlob struct {
Version int64
Expand Down Expand Up @@ -114,8 +117,8 @@ type (
LockTaskQueues(ctx context.Context, filter TaskQueuesFilter) (int64, error)
GetTaskQueueUserData(ctx context.Context, request *GetTaskQueueUserDataRequest) (*VersionedBlob, error)
UpdateTaskQueueUserData(ctx context.Context, request *UpdateTaskQueueDataRequest) error
AddBuildIdToTaskQueueMapping(ctx context.Context, request AddBuildIdsToTaskQueueMapping) error
RemoveBuildIdToTaskQueueMapping(ctx context.Context, request RemoveBuildIdsToTaskQueueMapping) error
AddBuildIdToTaskQueueMapping(ctx context.Context, request AddToBuildIdToTaskQueueMapping) error
RemoveBuildIdToTaskQueueMapping(ctx context.Context, request RemoveFromBuildIdToTaskQueueMapping) error
ListTaskQueueUserDataEntries(ctx context.Context, request *ListTaskQueueUserDataEntriesRequest) ([]TaskQueueUserDataEntry, error)
GetTaskQueuesByBuildId(ctx context.Context, request *GetTaskQueuesByBuildIdRequest) ([]string, error)
CountTaskQueuesByBuildId(ctx context.Context, request *CountTaskQueuesByBuildIdRequest) (int, error)
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/sql/sqlplugin/mysql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (mdb *db) UpdateTaskQueueUserData(ctx context.Context, request *sqlplugin.U
return nil
}

func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddBuildIdsToTaskQueueMapping) error {
func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddToBuildIdToTaskQueueMapping) error {
query := addBuildIdToTaskQueueMappingQry
var params []any
for idx, buildId := range request.BuildIds {
Expand All @@ -356,7 +356,7 @@ func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplug
return err
}

func (mdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveBuildIdsToTaskQueueMapping) error {
func (mdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveFromBuildIdToTaskQueueMapping) error {
// TODO(bergundy): implement when we support deletion
panic("not implemented")
}
Expand All @@ -380,7 +380,7 @@ func (mdb *db) GetTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.Ge
return taskQueues, err
}

func (mdb *db) CountTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.GetTaskQueuesByBuildIdRequest) (int, error) {
func (mdb *db) CountTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.CountTaskQueuesByBuildIdRequest) (int, error) {
var count int
err := mdb.conn.GetContext(ctx, &count, countTaskQueuesByBuildIdQry, request.NamespaceID, request.BuildID)
return count, err
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/sql/sqlplugin/postgresql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (pdb *db) UpdateTaskQueueUserData(ctx context.Context, request *sqlplugin.U
return nil
}

func (pdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddBuildIdsToTaskQueueMapping) error {
func (pdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddToBuildIdToTaskQueueMapping) error {
query := addBuildIdToTaskQueueMappingQry
var params []any
for idx, buildId := range request.BuildIds {
Expand All @@ -355,7 +355,7 @@ func (pdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplug
return err
}

func (pdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveBuildIdsToTaskQueueMapping) error {
func (pdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveFromBuildIdToTaskQueueMapping) error {
// TODO(bergundy): implement when we support deletion
panic("not implemented")
}
Expand All @@ -379,7 +379,7 @@ func (pdb *db) GetTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.Ge
return taskQueues, err
}

func (pdb *db) CountTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.GetTaskQueuesByBuildIdRequest) (int, error) {
func (pdb *db) CountTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.CountTaskQueuesByBuildIdRequest) (int, error) {
var count int
err := pdb.conn.GetContext(ctx, &count, countTaskQueuesByBuildIdQry, request.NamespaceID, request.BuildID)
return count, err
Expand Down
24 changes: 12 additions & 12 deletions common/persistence/sql/sqlplugin/sqlite/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"context"
"database/sql"
"fmt"
"strings"

"go.temporal.io/api/serviceerror"

Expand Down Expand Up @@ -110,7 +111,7 @@ task_queue_id = :task_queue_id
listTaskQueueUserDataQry = `SELECT task_queue_name, data, data_encoding FROM task_queue_user_data WHERE namespace_id = ? AND task_queue_name > ? LIMIT ?`

addBuildIdToTaskQueueMappingQry = `INSERT INTO build_id_to_task_queue (namespace_id, build_id, task_queue_name) VALUES `
removeBuildIdToTaskQueueMappingQry = `DELETE FROM build_id_to_task_queue WHERE `
removeBuildIdToTaskQueueMappingQry = `DELETE FROM build_id_to_task_queue WHERE namespace_id = ? AND task_queue_name = ? AND build_id IN (`
listTaskQueuesByBuildIdQry = `SELECT task_queue_name FROM build_id_to_task_queue WHERE namespace_id = ? AND build_id = ?`
countTaskQueuesByBuildIdQry = `SELECT COUNT(*) FROM build_id_to_task_queue WHERE namespace_id = ? AND build_id = ?`
)
Expand Down Expand Up @@ -346,7 +347,7 @@ func (mdb *db) UpdateTaskQueueUserData(ctx context.Context, request *sqlplugin.U
return nil
}

func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddBuildIdsToTaskQueueMapping) error {
func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddToBuildIdToTaskQueueMapping) error {
query := addBuildIdToTaskQueueMappingQry
var params []any
for idx, buildId := range request.BuildIds {
Expand All @@ -362,15 +363,14 @@ func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplug
return err
}

func (mdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveBuildIdsToTaskQueueMapping) error {
query := removeBuildIdToTaskQueueMappingQry
var params []any
for idx, buildId := range request.BuildIds {
query += "namespace_id = ? AND build_id = ? AND task_queue_name = ?"
if idx < len(request.BuildIds)-1 {
query += " OR "
}
params = append(params, request.NamespaceID, buildId, request.TaskQueueName)
func (mdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveFromBuildIdToTaskQueueMapping) error {
query := removeBuildIdToTaskQueueMappingQry + strings.Repeat("?, ", len(request.BuildIds)-1) + "?)"
// Golang doesn't support appending a string slice to an any slice which is essentially what we're doing here.
params := make([]any, len(request.BuildIds)+2)
params[0] = request.NamespaceID
params[1] = request.TaskQueueName
for i, buildId := range request.BuildIds {
params[i+2] = buildId
}

_, err := mdb.conn.ExecContext(ctx, query, params...)
Expand Down Expand Up @@ -400,7 +400,7 @@ func (mdb *db) GetTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.Ge
return taskQueues, err
}

func (mdb *db) CountTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.GetTaskQueuesByBuildIdRequest) (int, error) {
func (mdb *db) CountTaskQueuesByBuildId(ctx context.Context, request *sqlplugin.CountTaskQueuesByBuildIdRequest) (int, error) {
var count int
err := mdb.conn.GetContext(ctx, &count, countTaskQueuesByBuildIdQry, request.NamespaceID, request.BuildID)
return count, err
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/sql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (m *sqlTaskManager) UpdateTaskQueueUserData(ctx context.Context, request *p
return err
}
if len(request.BuildIdsAdded) > 0 {
err = tx.AddBuildIdToTaskQueueMapping(ctx, sqlplugin.AddBuildIdsToTaskQueueMapping{
err = tx.AddBuildIdToTaskQueueMapping(ctx, sqlplugin.AddToBuildIdToTaskQueueMapping{
NamespaceID: namespaceID,
TaskQueueName: request.TaskQueue,
BuildIds: request.BuildIdsAdded,
Expand All @@ -528,7 +528,7 @@ func (m *sqlTaskManager) UpdateTaskQueueUserData(ctx context.Context, request *p
}
}
if len(request.BuildIdsRemoved) > 0 {
err = tx.RemoveBuildIdToTaskQueueMapping(ctx, sqlplugin.RemoveBuildIdsToTaskQueueMapping{
err = tx.RemoveBuildIdToTaskQueueMapping(ctx, sqlplugin.RemoveFromBuildIdToTaskQueueMapping{
NamespaceID: namespaceID,
TaskQueueName: request.TaskQueue,
BuildIds: request.BuildIdsRemoved,
Expand Down Expand Up @@ -599,7 +599,7 @@ func (m *sqlTaskManager) CountTaskQueuesByBuildId(ctx context.Context, request *
if err != nil {
return 0, serviceerror.NewInternal(err.Error())
}
return m.Db.CountTaskQueuesByBuildId(ctx, &sqlplugin.GetTaskQueuesByBuildIdRequest{NamespaceID: namespaceID, BuildID: request.BuildID})
return m.Db.CountTaskQueuesByBuildId(ctx, &sqlplugin.CountTaskQueuesByBuildIdRequest{NamespaceID: namespaceID, BuildID: request.BuildID})
}

// Returns uint32 hash for a particular TaskQueue/Task given a Namespace, TaskQueueName and TaskQueueType
Expand Down
35 changes: 28 additions & 7 deletions common/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@

package common

import commonpb "go.temporal.io/api/common/v1"
import (
commonpb "go.temporal.io/api/common/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
)

const buildIdSearchAttributePrefixVersioned = "versioned"
const buildIdSearchAttributePrefixUnversioned = "unversioned"
const BuildIdSearchAttributeDelimiter = ":"

// UnversionedSearchAttribute is the sentinel value used to mark all unversioned workflows
const UnversionedSearchAttribute = buildIdSearchAttributePrefixUnversioned
const (
buildIdSearchAttributePrefixVersioned = "versioned"
buildIdSearchAttributePrefixUnversioned = "unversioned"
BuildIdSearchAttributeDelimiter = ":"
// UnversionedSearchAttribute is the sentinel value used to mark all unversioned workflows
UnversionedSearchAttribute = buildIdSearchAttributePrefixUnversioned
)

// VersionedBuildIdSearchAttribute returns the search attribute value for an unversioned build id
func VersionedBuildIdSearchAttribute(buildId string) string {
Expand All @@ -53,3 +57,20 @@ func VersionStampToBuildIdSearchAttribute(stamp *commonpb.WorkerVersionStamp) st
}
return UnversionedBuildIdSearchAttribute(stamp.BuildId)
}

func FindBuildId(versionSets []*taskqueuepb.CompatibleVersionSet, buildId string) (setIndex, indexInSet int) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I'll move them into a versioning package

setIndex = -1
indexInSet = -1
if len(versionSets) > 0 {
for sidx, set := range versionSets {
for bidx, id := range set.BuildIds {
if buildId == id {
setIndex = sidx
indexInSet = bidx
break
}
}
}
}
return setIndex, indexInSet
}
15 changes: 7 additions & 8 deletions schema/cassandra/temporal/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,17 @@ CREATE TABLE tasks (
-- OR
-- Used as a mapping from build id to task queue
CREATE TABLE task_queue_user_data (
namespace_id uuid,
task_queue_name text,
build_id_if_row_is_an_index text, -- If this row is used as a mapping of build id to task queue, this will not be empty
data blob, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding text, -- Encoding type used for serialization, in practice this should always be proto3
build_ids set<text>, -- All active build ids in all version sets on this task queue (used in an index below)
version bigint, -- Version of this row, used for optimistic concurrency
namespace_id uuid,
task_queue_name text,
build_id text, -- If this row is used as a mapping of build id to task queue, this will not be empty
data blob, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding text, -- Encoding type used for serialization, in practice this should always be proto3
version bigint, -- Version of this row, used for optimistic concurrency
-- task_queue_name is not a part of the parititioning key to allow cheaply iterating all task queues in a single
-- namespace. Access to this table should be infrequent enough that a single partition per namespace can be used.
-- Note that this imposes a limit on total task queue user data within one namespace (see the relevant single
-- partition Cassandra limits).
PRIMARY KEY ((namespace_id), build_id_if_row_is_an_index, task_queue_name)
PRIMARY KEY ((namespace_id), build_id, task_queue_name)
) WITH COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@
-- OR
-- Used as a mapping from build id to task queue
CREATE TABLE task_queue_user_data (
namespace_id uuid,
task_queue_name text,
build_id_if_row_is_an_index text, -- If this row is used as a mapping of build id to task queue, this will not be empty
data blob, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding text, -- Encoding type used for serialization, in practice this should always be proto3
build_ids set<text>, -- All active build ids in all version sets on this task queue (used in an index below)
version bigint, -- Version of this row, used for optimistic concurrency
namespace_id uuid,
task_queue_name text,
build_id text, -- If this row is used as a mapping of build id to task queue, this will not be empty
data blob, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding text, -- Encoding type used for serialization, in practice this should always be proto3
version bigint, -- Version of this row, used for optimistic concurrency
-- task_queue_name is not a part of the parititioning key to allow cheaply iterating all task queues in a single
-- namespace. Access to this table should be infrequent enough that a single partition per namespace can be used.
-- Note that this imposes a limit on total task queue user data within one namespace (see the relevant single
-- partition Cassandra limits).
PRIMARY KEY ((namespace_id), build_id_if_row_is_an_index, task_queue_name)
PRIMARY KEY ((namespace_id), build_id, task_queue_name)
) WITH COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
};
Loading