From abcac15ce336798090a4612c9f90d4dc8d636021 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Mon, 3 Apr 2023 11:31:04 -0700 Subject: [PATCH 1/5] Implement version set merge algorithm --- .../hybrid_logical_clock.go | 85 ++++++++ .../hybrid_logical_clock_test.go | 55 +++++ service/matching/matchingEngine.go | 3 +- service/matching/version_sets.go | 34 +--- service/matching/version_sets_merge.go | 185 +++++++++++++++++ service/matching/version_sets_merge_test.go | 192 ++++++++++++++++++ service/matching/version_sets_test.go | 45 ++-- 7 files changed, 546 insertions(+), 53 deletions(-) create mode 100644 common/clock/hybrid_logical_clock/hybrid_logical_clock.go create mode 100644 common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go create mode 100644 service/matching/version_sets_merge.go create mode 100644 service/matching/version_sets_merge_test.go diff --git a/common/clock/hybrid_logical_clock/hybrid_logical_clock.go b/common/clock/hybrid_logical_clock/hybrid_logical_clock.go new file mode 100644 index 00000000000..e5d8cc1a5b5 --- /dev/null +++ b/common/clock/hybrid_logical_clock/hybrid_logical_clock.go @@ -0,0 +1,85 @@ +package hybrid_logical_clock + +import ( + "errors" + "time" + + clockpb "go.temporal.io/server/api/clock/v1" +) + +var ErrClocksEqual = errors.New("HybridLogicalClocks are equal") + +type Clock = clockpb.HybridLogicalClock + +// Next generates the next clock timestamp given the current clock. +// HybridLogicalClock requires the previous clock to ensure that time doesn't move backwards and the next clock is +// monotonically increasing. +func Next(clock Clock) Clock { + wallclock := time.Now().UnixMilli() + return next(clock, wallclock) +} + +func next(clock Clock, wallclock int64) Clock { + // Ensure time does not move backwards + if wallclock < clock.GetWallClock() { + wallclock = clock.GetWallClock() + } + // Ensure timestamp is monotonically increasing + if wallclock == clock.GetWallClock() { + clock.Version = clock.GetVersion() + 1 + } else { + clock.Version = 0 + clock.WallClock = wallclock + } + + return Clock{WallClock: wallclock, Version: clock.Version, ClusterId: clock.ClusterId} +} + +// Zero generates a zeroed logical clock for the cluster ID. +func Zero(clusterID int64) Clock { + return Clock{WallClock: 0, Version: 0, ClusterId: clusterID} +} + +func sign[T int64 | int32](x T) int { + if x > 0 { + return 1 + } + if x < 0 { + return -1 + } + return 0 +} + +// Compare 2 clocks, returns 0 if a == b, -1 if a > b, 1 if a < b +func Compare(a Clock, b Clock) int { + if a.WallClock == b.WallClock { + if a.Version == b.Version { + return sign(b.ClusterId - a.ClusterId) + } + return sign(b.Version - a.Version) + } + return sign(b.WallClock - a.WallClock) +} + +// Greater returns true if a is greater than b +func Greater(a Clock, b Clock) bool { + return Compare(b, a) > 0 +} + +// Greater returns true if a is greater than b +func Less(a Clock, b Clock) bool { + return Compare(a, b) > 0 +} + +// Max returns the maximum of two clocks +func Max(a Clock, b Clock) Clock { + if Compare(a, b) > 0 { + return b + } + return a +} + +// Equal returns whether two clocks are equal +func Equal(a Clock, b Clock) bool { + return Compare(a, b) == 0 +} diff --git a/common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go b/common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go new file mode 100644 index 00000000000..7ce1ab93317 --- /dev/null +++ b/common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go @@ -0,0 +1,55 @@ +package hybrid_logical_clock + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_Next_ReturnsGreaterClock(t *testing.T) { + t0 := Zero(1) + // Same wallclock + t1 := next(t0, 0) + assert.Equal(t, Compare(t0, t1), 1) + // Greater wallclock + t2 := next(t1, 1) + assert.Equal(t, Compare(t1, t2), 1) +} + +func Test_Compare(t *testing.T) { + var t0 Clock + var t1 Clock + + t0 = Clock{WallClock: 1, Version: 1, ClusterId: 1} + t1 = Clock{WallClock: 1, Version: 1, ClusterId: 1} + assert.Equal(t, Compare(t0, t1), 0) + assert.True(t, Equal(t0, t1)) + + t0 = Clock{WallClock: 1, Version: 1, ClusterId: 1} + t1 = Clock{WallClock: 1, Version: 1, ClusterId: 2} + assert.Equal(t, Compare(t0, t1), 1) + // Let's get a -1 in there for sanity + assert.Equal(t, Compare(t1, t0), -1) + + t0 = Clock{WallClock: 1, Version: 1, ClusterId: 1} + t1 = Clock{WallClock: 1, Version: 2, ClusterId: 1} + assert.Equal(t, Compare(t0, t1), 1) + + t0 = Clock{WallClock: 1, Version: 1, ClusterId: 1} + t1 = Clock{WallClock: 2, Version: 1, ClusterId: 1} + assert.Equal(t, Compare(t0, t1), 1) + + assert.True(t, Greater(t1, t0)) + assert.True(t, Less(t0, t1)) +} + +func Test_Max_ReturnsMaximum(t *testing.T) { + t0 := Zero(1) + t1 := Zero(2) + + max := Max(t0, t1) + assert.Equal(t, max, t1) + // Just in case it doesn't work in reverse order... + max = Max(t1, t0) + assert.Equal(t, max, t1) +} diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index 8e4c81e1e17..3dad4afbf2b 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -45,6 +45,7 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" tokenspb "go.temporal.io/server/api/token/v1" "go.temporal.io/server/common" + hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -723,7 +724,7 @@ func (e *matchingEngineImpl) UpdateWorkerBuildIdCompatibility( err = tqMgr.UpdateUserData(hCtx.Context, func(data *persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, error) { clock := data.GetClock() if clock == nil { - tmp := zeroHLC(e.clusterMeta.GetClusterID()) + tmp := hlc.Zero(e.clusterMeta.GetClusterID()) clock = &tmp } diff --git a/service/matching/version_sets.go b/service/matching/version_sets.go index 91c4e70d900..f67ca2e8d11 100644 --- a/service/matching/version_sets.go +++ b/service/matching/version_sets.go @@ -27,14 +27,13 @@ package matching import ( "encoding/binary" "fmt" - "time" "github.com/dgryski/go-farm" "go.temporal.io/api/serviceerror" "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" - clockpb "go.temporal.io/server/api/clock/v1" "go.temporal.io/server/api/persistence/v1" + hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" ) // ToBuildIdOrderingResponse transforms the internal VersioningData representation to public representation. @@ -117,8 +116,8 @@ func checkLimits(g *persistence.VersioningData, maxSets, maxBuildIDs int) error // Deletions are performed by a background process which verifies build IDs are no longer in use and safe to delete (not yet implemented). // // Update may fail with FailedPrecondition if it would cause exceeding the supplied limits. -func UpdateVersionSets(clock clockpb.HybridLogicalClock, data *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest, maxSets, maxBuildIDs int) (clockpb.HybridLogicalClock, *persistence.VersioningData, error) { - clock = generateHLCTimestamp(clock) +func UpdateVersionSets(clock hlc.Clock, data *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest, maxSets, maxBuildIDs int) (hlc.Clock, *persistence.VersioningData, error) { + clock = hlc.Next(clock) data, err := updateImpl(clock, data, req) if err != nil { return clock, nil, err @@ -130,7 +129,7 @@ func UpdateVersionSets(clock clockpb.HybridLogicalClock, data *persistence.Versi } //nolint:revive // cyclomatic complexity -func updateImpl(timestamp clockpb.HybridLogicalClock, existingData *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest) (*persistence.VersioningData, error) { +func updateImpl(timestamp hlc.Clock, existingData *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest) (*persistence.VersioningData, error) { // First find if the targeted version is already in the sets targetedVersion := extractTargetedVersion(req) findRes := findVersion(existingData, targetedVersion) @@ -231,7 +230,7 @@ func findVersion(data *persistence.VersioningData, buildID string) findVersionRe } } -func makeDefaultSet(data *persistence.VersioningData, setIx int, timestamp *clockpb.HybridLogicalClock) { +func makeDefaultSet(data *persistence.VersioningData, setIx int, timestamp *hlc.Clock) { data.DefaultUpdateTimestamp = timestamp if len(data.VersionSets) <= 1 { return @@ -244,7 +243,7 @@ func makeDefaultSet(data *persistence.VersioningData, setIx int, timestamp *cloc } } -func makeVersionInSetDefault(data *persistence.VersioningData, setIx, versionIx int, timestamp *clockpb.HybridLogicalClock) { +func makeVersionInSetDefault(data *persistence.VersioningData, setIx, versionIx int, timestamp *hlc.Clock) { data.VersionSets[setIx].DefaultUpdateTimestamp = timestamp setVersions := data.VersionSets[setIx].BuildIds if len(setVersions) <= 1 { @@ -257,24 +256,3 @@ func makeVersionInSetDefault(data *persistence.VersioningData, setIx, versionIx setVersions[len(setVersions)-1] = moveMe } } - -func generateHLCTimestamp(clock clockpb.HybridLogicalClock) clockpb.HybridLogicalClock { - wallclock := time.Now().UnixMilli() - // Ensure time does not move backwards - if wallclock < clock.GetWallClock() { - wallclock = clock.GetWallClock() - } - // Ensure timestamp is monotonically increasing - if wallclock == clock.GetWallClock() { - clock.Version = clock.GetVersion() + 1 - } else { - clock.Version = 0 - clock.WallClock = wallclock - } - - return clockpb.HybridLogicalClock{WallClock: wallclock, Version: clock.Version, ClusterId: clock.ClusterId} -} - -func zeroHLC(clusterID int64) clockpb.HybridLogicalClock { - return clockpb.HybridLogicalClock{WallClock: 0, Version: 0, ClusterId: clusterID} -} diff --git a/service/matching/version_sets_merge.go b/service/matching/version_sets_merge.go new file mode 100644 index 00000000000..e082b8de9d3 --- /dev/null +++ b/service/matching/version_sets_merge.go @@ -0,0 +1,185 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package matching + +import ( + "sort" + + persistencepb "go.temporal.io/server/api/persistence/v1" + hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" +) + +// Merge and sort two sets of set IDs +func mergeSetIDs(a []string, b []string) []string { + var mergedSetIDs []string + seenSetIDs := make(map[string]struct{}, len(a)) + mergedSetIDs = append(mergedSetIDs, a...) + for _, setID := range a { + seenSetIDs[setID] = struct{}{} + } + for _, setID := range b { + if _, found := seenSetIDs[setID]; !found { + mergedSetIDs = append(mergedSetIDs, setID) + } + } + sort.Strings(mergedSetIDs) + return mergedSetIDs +} + +// Merge and sort two sets of set IDs +func setContainsSetIDs(set *persistencepb.CompatibleVersionSet, ids []string) bool { + for _, needle := range ids { + for _, id := range set.SetIds { + if needle == id { + return true + } + } + } + return false +} + +func findSetWithSetIDs(sets []*persistencepb.CompatibleVersionSet, ids []string) *persistencepb.CompatibleVersionSet { + for _, set := range sets { + if setContainsSetIDs(set, ids) { + return set + } + } + return nil +} + +type buildIDInfo struct { + state persistencepb.BuildID_State + stateUpdateTimestamp hlc.Clock + setIDs []string + madeDefaultAt hlc.Clock +} + +// MergeVersioningData merges two VersioningData structs. +// If a build ID appears in both data structures, the merged structure will include that latest status and timestamp. +// If a build ID appears in different sets in the different structures, those sets will be merged. +// The merged data's per set default and global default will be set according to the latest timestamps in the sources. +func MergeVersioningData(a *persistencepb.VersioningData, b *persistencepb.VersioningData) *persistencepb.VersioningData { + buildIDToInfo := make(map[string]buildIDInfo, 0) + // Collect information about each build ID from both sources + for _, set := range append(a.VersionSets, b.VersionSets...) { + lastIdx := len(set.BuildIds) - 1 + for setIdx, buildID := range set.BuildIds { + if info, found := buildIDToInfo[buildID.Id]; found { + // A build ID appears in more than one source, merge its information, and track it + state := info.state + stateUpdateTimestamp := hlc.Max(*buildID.StateUpdateTimestamp, info.stateUpdateTimestamp) + if hlc.Equal(stateUpdateTimestamp, *buildID.StateUpdateTimestamp) { + state = buildID.State + } + madeDefaultAt := info.madeDefaultAt + if setIdx == lastIdx { + madeDefaultAt = hlc.Max(*set.DefaultUpdateTimestamp, madeDefaultAt) + } + + buildIDToInfo[buildID.Id] = buildIDInfo{ + state: state, + stateUpdateTimestamp: stateUpdateTimestamp, + setIDs: mergeSetIDs(info.setIDs, set.SetIds), + madeDefaultAt: madeDefaultAt, + } + } else { + // A build ID was seen for the first time, track it + madeDefaultAt := hlc.Zero(0) + if setIdx == lastIdx { + madeDefaultAt = *set.DefaultUpdateTimestamp + } + buildIDToInfo[buildID.Id] = buildIDInfo{ + state: buildID.State, + stateUpdateTimestamp: *buildID.StateUpdateTimestamp, + setIDs: set.SetIds, + madeDefaultAt: madeDefaultAt, + } + } + } + } + // Build the merged compatible sets using collected build ID information + mergedSets := make([]*persistencepb.CompatibleVersionSet, 0) + for id, info := range buildIDToInfo { + set := findSetWithSetIDs(mergedSets, info.setIDs) + if set == nil { + defaultTimestamp := hlc.Zero(0) + set = &persistencepb.CompatibleVersionSet{ + SetIds: info.setIDs, + BuildIds: make([]*persistencepb.BuildID, 0), + DefaultUpdateTimestamp: &defaultTimestamp, + } + mergedSets = append(mergedSets, set) + } else { + set.SetIds = mergeSetIDs(set.SetIds, info.setIDs) + } + timestamp := info.stateUpdateTimestamp + buildID := &persistencepb.BuildID{ + Id: id, + State: info.state, + StateUpdateTimestamp: ×tamp, + } + defaultTimestamp := info.madeDefaultAt + + // Insert the build ID in the right order based on whether it is the default or by its update timestamp + if hlc.Greater(*set.DefaultUpdateTimestamp, defaultTimestamp) { + // Can't be the last element, it's the default already + lastIdx := len(set.BuildIds) - 1 + for idx, curr := range set.BuildIds { + if idx == lastIdx || hlc.Greater(*curr.StateUpdateTimestamp, timestamp) { + // Insert just before + set.BuildIds = append(set.BuildIds[:idx+1], set.BuildIds[idx:]...) + set.BuildIds[idx] = buildID + break + } + } + } else { + set.DefaultUpdateTimestamp = &defaultTimestamp + set.BuildIds = append(set.BuildIds, buildID) + } + } + maxDefaultTimestamp := hlc.Max(*b.DefaultUpdateTimestamp, *a.DefaultUpdateTimestamp) + + // Sort the sets based on their default update timestamp, ensuring the default set comes last + defaultSetIds := a.VersionSets[len(a.VersionSets)-1].SetIds + if hlc.Equal(maxDefaultTimestamp, *b.DefaultUpdateTimestamp) { + defaultSetIds = b.VersionSets[len(b.VersionSets)-1].SetIds + } + sort.Slice(mergedSets, func(i, j int) bool { + si := mergedSets[i] + sj := mergedSets[j] + if setContainsSetIDs(si, defaultSetIds) { + return false + } + if setContainsSetIDs(sj, defaultSetIds) { + return true + } + return hlc.Less(*si.DefaultUpdateTimestamp, *sj.DefaultUpdateTimestamp) + }) + + return &persistencepb.VersioningData{ + VersionSets: mergedSets, + DefaultUpdateTimestamp: &maxDefaultTimestamp, + } +} diff --git a/service/matching/version_sets_merge_test.go b/service/matching/version_sets_merge_test.go new file mode 100644 index 00000000000..905ce42d592 --- /dev/null +++ b/service/matching/version_sets_merge_test.go @@ -0,0 +1,192 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package matching + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + persistencepb "go.temporal.io/server/api/persistence/v1" + hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" +) + +func fromWallClock(wallclock int64) *hlc.Clock { + return &hlc.Clock{WallClock: wallclock, Version: 0, ClusterId: 1} +} + +func buildID(wallclock int64, id string, optionalState ...persistencepb.BuildID_State) *persistencepb.BuildID { + state := persistencepb.STATE_ACTIVE + if len(optionalState) == 1 { + state = optionalState[0] + } + + return &persistencepb.BuildID{ + Id: id, + State: state, + StateUpdateTimestamp: fromWallClock(wallclock), + } +} + +func mkBuildIDs(buildIDs ...*persistencepb.BuildID) []*persistencepb.BuildID { + buildIDStructs := make([]*persistencepb.BuildID, len(buildIDs)) + for i, buildID := range buildIDs { + buildIDStructs[i] = &persistencepb.BuildID{ + Id: buildID.Id, + State: persistencepb.STATE_ACTIVE, + StateUpdateTimestamp: buildID.StateUpdateTimestamp, + } + } + return buildIDStructs +} + +func mkSet(setID string, buildIDs ...*persistencepb.BuildID) *persistencepb.CompatibleVersionSet { + return &persistencepb.CompatibleVersionSet{ + SetIds: []string{setID}, + BuildIds: mkBuildIDs(buildIDs...), + DefaultUpdateTimestamp: buildIDs[len(buildIDs)-1].StateUpdateTimestamp, + } +} + +func mkSingleSetData(setID string, buildIDs ...*persistencepb.BuildID) *persistencepb.VersioningData { + return &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{mkSet(setID, buildIDs...)}, + DefaultUpdateTimestamp: buildIDs[len(buildIDs)-1].StateUpdateTimestamp, + } +} + +func TestSetMerge_IdenticalBuildIDsAndGreaterUpdateTimestamp_SetsMaxUpdateTimestamp(t *testing.T) { + // look here 👇 + a := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(6, "0.2")) + b := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(3, "0.2")) + assert.Equal(t, a, MergeVersioningData(a, b)) + assert.Equal(t, a, MergeVersioningData(b, a)) +} + +func TestSetMerge_AdditionalBuildIDAndGreaterUpdateTimestamp_MergesBuildIDsAndSetsMaxUpdateTimestamp(t *testing.T) { + a := mkSingleSetData("0.1", buildID(6, "0.1")) + b := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(3, "0.2")) + expected := mkSingleSetData("0.1", buildID(3, "0.2"), buildID(6, "0.1")) + assert.Equal(t, expected, MergeVersioningData(a, b)) + assert.Equal(t, expected, MergeVersioningData(b, a)) +} + +func TestSetMerge_NewerDefault_PrefersDefaultAndSetsMaxUpdatedAt(t *testing.T) { + a := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(6, "0.3")) + b := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(3, "0.2")) + expected := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(3, "0.2"), buildID(6, "0.3")) + assert.Equal(t, expected, MergeVersioningData(a, b)) + assert.Equal(t, expected, MergeVersioningData(b, a)) +} + +func TestDataMerge_PrefersNewerDefaultAndMergesDefault(t *testing.T) { + a := &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{ + mkSet("0.1", buildID(2, "0.1")), + mkSet("1.0", buildID(3, "1.0")), + }, + DefaultUpdateTimestamp: fromWallClock(3), + } + b := &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{ + mkSet("1.0", buildID(3, "1.0")), + mkSet("0.1", buildID(2, "0.1"), buildID(4, "0.2")), + }, + DefaultUpdateTimestamp: fromWallClock(4), + } + assert.Equal(t, b, MergeVersioningData(a, b)) + assert.Equal(t, b, MergeVersioningData(b, a)) +} + +func TestSetMerge_DifferentSetIDs_MergesSetIDs(t *testing.T) { + a := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(6, "0.2")) + b := mkSingleSetData("0.2", buildID(3, "0.2")) + expected := &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{{ + SetIds: []string{"0.1", "0.2"}, + BuildIds: mkBuildIDs(buildID(1, "0.1"), buildID(6, "0.2")), + DefaultUpdateTimestamp: fromWallClock(6), + }}, + DefaultUpdateTimestamp: fromWallClock(6), + } + assert.Equal(t, expected, MergeVersioningData(a, b)) + assert.Equal(t, expected, MergeVersioningData(b, a)) +} + +func TestSetMerge_DifferentStates_UpdatesTimestampsAndState(t *testing.T) { + a := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(6, "0.2", persistencepb.STATE_DELETED), buildID(7, "0.3", persistencepb.STATE_DELETED)) + b := mkSingleSetData("0.1", buildID(3, "0.1", persistencepb.STATE_DELETED), buildID(7, "0.2"), buildID(8, "0.3")) + assert.Equal(t, b, MergeVersioningData(a, b)) + assert.Equal(t, b, MergeVersioningData(b, a)) +} + +func TestSetMerge_MultipleMatches_MergesSets(t *testing.T) { + a := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(3, "0.2")) + b := &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{ + mkSet("0.1", buildID(1, "0.1")), + mkSet("0.2", buildID(2, "0.2")), + }, + DefaultUpdateTimestamp: fromWallClock(2), + } + expected := &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{{ + SetIds: []string{"0.1", "0.2"}, + BuildIds: mkBuildIDs(buildID(1, "0.1"), buildID(3, "0.2")), + DefaultUpdateTimestamp: fromWallClock(3), + }}, + DefaultUpdateTimestamp: fromWallClock(3), + } + assert.Equal(t, expected, MergeVersioningData(a, b)) + assert.Equal(t, expected, MergeVersioningData(b, a)) +} + +func TestSetMerge_BuildIDPromoted_PreservesSetDefault(t *testing.T) { + a := mkSingleSetData("0.1", buildID(2, "0.1"), buildID(1, "0.2")) + a.VersionSets[0].DefaultUpdateTimestamp = fromWallClock(3) + b := mkSingleSetData("0.1", buildID(2, "0.1"), buildID(1, "0.2")) + b.VersionSets[0].DefaultUpdateTimestamp = fromWallClock(3) + assert.Equal(t, b, MergeVersioningData(a, b)) + assert.Equal(t, b, MergeVersioningData(b, a)) +} + +func TestSetMerge_SetPromoted_PreservesGlobalDefault(t *testing.T) { + a := &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{ + mkSet("0.2", buildID(2, "0.2")), + mkSet("0.1", buildID(1, "0.1")), + }, + DefaultUpdateTimestamp: fromWallClock(3), + } + b := &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{ + mkSet("0.2", buildID(2, "0.2")), + mkSet("0.1", buildID(1, "0.1")), + }, + DefaultUpdateTimestamp: fromWallClock(3), + } + assert.Equal(t, b, MergeVersioningData(a, b)) + assert.Equal(t, b, MergeVersioningData(b, a)) +} diff --git a/service/matching/version_sets_test.go b/service/matching/version_sets_test.go index 5b71c38f0e4..5b30293e4e6 100644 --- a/service/matching/version_sets_test.go +++ b/service/matching/version_sets_test.go @@ -35,6 +35,7 @@ import ( "go.temporal.io/api/workflowservice/v1" clockpb "go.temporal.io/server/api/clock/v1" persistencepb "go.temporal.io/server/api/persistence/v1" + hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" ) func mkNewSet(id string, clock clockpb.HybridLogicalClock) *persistencepb.CompatibleVersionSet { @@ -57,7 +58,7 @@ func mkInitialData(numSets int, clock clockpb.HybridLogicalClock) *persistencepb } func mkUserData(numSets int) *persistencepb.TaskQueueUserData { - clock := zeroHLC(1) + clock := hlc.Zero(1) return &persistencepb.TaskQueueUserData{ Clock: &clock, VersioningData: mkInitialData(numSets, clock), @@ -97,17 +98,13 @@ func mkPromoteInSet(id string) *workflowservice.UpdateWorkerBuildIdCompatibility } } +// Asserts that clock1 is greater than clock2 func assertClockGreater(t *testing.T, clock1 clockpb.HybridLogicalClock, clock2 clockpb.HybridLogicalClock) { - if clock1.Version == clock2.Version { - assert.GreaterOrEqual(t, clock1.WallClock, clock2.WallClock) - } else { - assert.Equal(t, clock1.WallClock, clock2.WallClock) - } - assert.Equal(t, clock1.ClusterId, clock2.ClusterId) + assert.True(t, hlc.Greater(clock1, clock2)) } func TestNewDefaultUpdate(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) initialData := mkInitialData(2, clock) req := mkNewDefReq("2") @@ -143,7 +140,7 @@ func TestNewDefaultUpdate(t *testing.T) { } func TestNewDefaultSetUpdateOfEmptyData(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) initialData := mkInitialData(0, clock) req := mkNewDefReq("1") @@ -166,7 +163,7 @@ func TestNewDefaultSetUpdateOfEmptyData(t *testing.T) { } func TestNewDefaultSetUpdateCompatWithCurDefault(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) initialData := mkInitialData(2, clock) req := mkNewCompatReq("1.1", "1", true) @@ -197,7 +194,7 @@ func TestNewDefaultSetUpdateCompatWithCurDefault(t *testing.T) { } func TestNewDefaultSetUpdateCompatWithNonDefaultSet(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) initialData := mkInitialData(2, clock) req := mkNewCompatReq("0.1", "0", true) @@ -228,7 +225,7 @@ func TestNewDefaultSetUpdateCompatWithNonDefaultSet(t *testing.T) { } func TestNewCompatibleWithVerInOlderSet(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) initialData := mkInitialData(2, clock) req := mkNewCompatReq("0.1", "0", false) @@ -262,7 +259,7 @@ func TestNewCompatibleWithVerInOlderSet(t *testing.T) { } func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) { - clock0 := zeroHLC(1) + clock0 := hlc.Zero(1) data := mkInitialData(2, clock0) req := mkNewCompatReq("0.1", "0", false) @@ -324,7 +321,7 @@ func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) { } func TestCompatibleTargetsNotFound(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) data := mkInitialData(1, clock) req := mkNewCompatReq("1.1", "1", false) @@ -334,7 +331,7 @@ func TestCompatibleTargetsNotFound(t *testing.T) { } func TestMakeExistingSetDefault(t *testing.T) { - clock0 := zeroHLC(1) + clock0 := hlc.Zero(1) data := mkInitialData(3, clock0) req := mkExistingDefault("1") @@ -398,7 +395,7 @@ func TestMakeExistingSetDefault(t *testing.T) { } func TestSayVersionIsCompatWithDifferentSetThanItsAlreadyCompatWithNotAllowed(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) data := mkInitialData(3, clock) req := mkNewCompatReq("0.1", "0", false) @@ -412,7 +409,7 @@ func TestSayVersionIsCompatWithDifferentSetThanItsAlreadyCompatWithNotAllowed(t } func TestLimitsMaxSets(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) maxSets := 10 data := mkInitialData(maxSets, clock) @@ -423,7 +420,7 @@ func TestLimitsMaxSets(t *testing.T) { } func TestLimitsMaxBuildIDs(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) maxBuildIDs := 10 data := mkInitialData(maxBuildIDs, clock) @@ -434,7 +431,7 @@ func TestLimitsMaxBuildIDs(t *testing.T) { } func TestPromoteWithinVersion(t *testing.T) { - clock0 := zeroHLC(1) + clock0 := hlc.Zero(1) data := mkInitialData(2, clock0) req := mkNewCompatReq("0.1", "0", false) @@ -470,7 +467,7 @@ func TestPromoteWithinVersion(t *testing.T) { } func TestAddAlreadyExtantVersionAsDefaultErrors(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) data := mkInitialData(3, clock) req := mkNewDefReq("0") @@ -480,7 +477,7 @@ func TestAddAlreadyExtantVersionAsDefaultErrors(t *testing.T) { } func TestAddAlreadyExtantVersionToAnotherSetErrors(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) data := mkInitialData(3, clock) req := mkNewCompatReq("0", "1", false) @@ -490,7 +487,7 @@ func TestAddAlreadyExtantVersionToAnotherSetErrors(t *testing.T) { } func TestMakeSetDefaultTargetingNonexistentVersionErrors(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) data := mkInitialData(3, clock) req := mkExistingDefault("crab boi") @@ -500,7 +497,7 @@ func TestMakeSetDefaultTargetingNonexistentVersionErrors(t *testing.T) { } func TestPromoteWithinSetTargetingNonexistentVersionErrors(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) data := mkInitialData(3, clock) req := mkPromoteInSet("i'd rather be writing rust ;)") @@ -510,7 +507,7 @@ func TestPromoteWithinSetTargetingNonexistentVersionErrors(t *testing.T) { } func TestToBuildIdOrderingResponseTrimsResponse(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) data := mkInitialData(3, clock) actual := ToBuildIdOrderingResponse(data, 2) expected := []*taskqueuepb.CompatibleVersionSet{{BuildIds: []string{"1"}}, {BuildIds: []string{"2"}}} From e40895bd0186d654e7e32e4ceaaa189d19e89257 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Fri, 21 Apr 2023 16:14:37 -0700 Subject: [PATCH 2/5] Use TimeSource interface --- .../hybrid_logical_clock.go | 10 +-- .../hybrid_logical_clock_test.go | 10 ++- service/matching/matchingEngine.go | 9 ++- service/matching/version_sets.go | 9 ++- service/matching/version_sets_test.go | 69 ++++++++++--------- 5 files changed, 58 insertions(+), 49 deletions(-) diff --git a/common/clock/hybrid_logical_clock/hybrid_logical_clock.go b/common/clock/hybrid_logical_clock/hybrid_logical_clock.go index e5d8cc1a5b5..65c7c734d3a 100644 --- a/common/clock/hybrid_logical_clock/hybrid_logical_clock.go +++ b/common/clock/hybrid_logical_clock/hybrid_logical_clock.go @@ -2,9 +2,9 @@ package hybrid_logical_clock import ( "errors" - "time" clockpb "go.temporal.io/server/api/clock/v1" + commonclock "go.temporal.io/server/common/clock" ) var ErrClocksEqual = errors.New("HybridLogicalClocks are equal") @@ -14,12 +14,8 @@ type Clock = clockpb.HybridLogicalClock // Next generates the next clock timestamp given the current clock. // HybridLogicalClock requires the previous clock to ensure that time doesn't move backwards and the next clock is // monotonically increasing. -func Next(clock Clock) Clock { - wallclock := time.Now().UnixMilli() - return next(clock, wallclock) -} - -func next(clock Clock, wallclock int64) Clock { +func Next(clock Clock, source commonclock.TimeSource) Clock { + wallclock := source.Now().UnixMilli() // Ensure time does not move backwards if wallclock < clock.GetWallClock() { wallclock = clock.GetWallClock() diff --git a/common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go b/common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go index 7ce1ab93317..7c5b3f0149a 100644 --- a/common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go +++ b/common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go @@ -2,17 +2,23 @@ package hybrid_logical_clock import ( "testing" + "time" "github.com/stretchr/testify/assert" + commonclock "go.temporal.io/server/common/clock" ) func Test_Next_ReturnsGreaterClock(t *testing.T) { t0 := Zero(1) + timesource := commonclock.NewEventTimeSource() + // Same wallclock - t1 := next(t0, 0) + timesource.Update(time.Unix(0, 0).UTC()) + t1 := Next(t0, timesource) assert.Equal(t, Compare(t0, t1), 1) // Greater wallclock - t2 := next(t1, 1) + timesource.Update(time.Unix(0, 1).UTC()) + t2 := Next(t1, timesource) assert.Equal(t, Compare(t1, t2), 1) } diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index 3dad4afbf2b..2375692135d 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -45,6 +45,7 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" tokenspb "go.temporal.io/server/api/token/v1" "go.temporal.io/server/common" + "go.temporal.io/server/common/clock" hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/log" @@ -104,6 +105,7 @@ type ( namespaceRegistry namespace.Registry keyResolver membership.ServiceResolver clusterMeta cluster.Metadata + timeSource clock.TimeSource } ) @@ -151,6 +153,7 @@ func NewEngine( namespaceRegistry: namespaceRegistry, keyResolver: resolver, clusterMeta: clusterMeta, + timeSource: clock.NewRealTimeSource(), // No need to mock this at the moment } } @@ -727,9 +730,9 @@ func (e *matchingEngineImpl) UpdateWorkerBuildIdCompatibility( tmp := hlc.Zero(e.clusterMeta.GetClusterID()) clock = &tmp } - - updatedClock, versioningData, err := UpdateVersionSets( - *clock, + updatedClock := hlc.Next(*clock, e.timeSource) + versioningData, err := UpdateVersionSets( + updatedClock, data.GetVersioningData(), req.GetRequest(), e.config.VersionCompatibleSetLimitPerQueue(), diff --git a/service/matching/version_sets.go b/service/matching/version_sets.go index f67ca2e8d11..e45b0c6ca56 100644 --- a/service/matching/version_sets.go +++ b/service/matching/version_sets.go @@ -116,16 +116,15 @@ func checkLimits(g *persistence.VersioningData, maxSets, maxBuildIDs int) error // Deletions are performed by a background process which verifies build IDs are no longer in use and safe to delete (not yet implemented). // // Update may fail with FailedPrecondition if it would cause exceeding the supplied limits. -func UpdateVersionSets(clock hlc.Clock, data *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest, maxSets, maxBuildIDs int) (hlc.Clock, *persistence.VersioningData, error) { - clock = hlc.Next(clock) +func UpdateVersionSets(clock hlc.Clock, data *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest, maxSets, maxBuildIDs int) (*persistence.VersioningData, error) { data, err := updateImpl(clock, data, req) if err != nil { - return clock, nil, err + return nil, err } if err := checkLimits(data, maxSets, maxBuildIDs); err != nil { - return clock, nil, err + return nil, err } - return clock, data, nil + return data, nil } //nolint:revive // cyclomatic complexity diff --git a/service/matching/version_sets_test.go b/service/matching/version_sets_test.go index 5b30293e4e6..4f621f3583e 100644 --- a/service/matching/version_sets_test.go +++ b/service/matching/version_sets_test.go @@ -35,6 +35,7 @@ import ( "go.temporal.io/api/workflowservice/v1" clockpb "go.temporal.io/server/api/clock/v1" persistencepb "go.temporal.io/server/api/persistence/v1" + commonclock "go.temporal.io/server/common/clock" hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" ) @@ -98,19 +99,14 @@ func mkPromoteInSet(id string) *workflowservice.UpdateWorkerBuildIdCompatibility } } -// Asserts that clock1 is greater than clock2 -func assertClockGreater(t *testing.T, clock1 clockpb.HybridLogicalClock, clock2 clockpb.HybridLogicalClock) { - assert.True(t, hlc.Greater(clock1, clock2)) -} - func TestNewDefaultUpdate(t *testing.T) { clock := hlc.Zero(1) initialData := mkInitialData(2, clock) req := mkNewDefReq("2") - nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0) + nextClock := hlc.Next(clock, commonclock.NewRealTimeSource()) + updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0) assert.NoError(t, err) - assertClockGreater(t, nextClock, clock) assert.Equal(t, mkInitialData(2, clock), initialData) expected := &persistencepb.VersioningData{ @@ -144,9 +140,9 @@ func TestNewDefaultSetUpdateOfEmptyData(t *testing.T) { initialData := mkInitialData(0, clock) req := mkNewDefReq("1") - nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0) + nextClock := hlc.Next(clock, commonclock.NewRealTimeSource()) + updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0) assert.NoError(t, err) - assertClockGreater(t, nextClock, clock) assert.Equal(t, mkInitialData(0, clock), initialData) expected := &persistencepb.VersioningData{ @@ -167,9 +163,9 @@ func TestNewDefaultSetUpdateCompatWithCurDefault(t *testing.T) { initialData := mkInitialData(2, clock) req := mkNewCompatReq("1.1", "1", true) - nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0) + nextClock := hlc.Next(clock, commonclock.NewRealTimeSource()) + updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0) assert.NoError(t, err) - assertClockGreater(t, nextClock, clock) assert.Equal(t, mkInitialData(2, clock), initialData) expected := &persistencepb.VersioningData{ @@ -198,9 +194,9 @@ func TestNewDefaultSetUpdateCompatWithNonDefaultSet(t *testing.T) { initialData := mkInitialData(2, clock) req := mkNewCompatReq("0.1", "0", true) - nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0) + nextClock := hlc.Next(clock, commonclock.NewRealTimeSource()) + updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0) assert.NoError(t, err) - assertClockGreater(t, nextClock, clock) assert.Equal(t, mkInitialData(2, clock), initialData) expected := &persistencepb.VersioningData{ @@ -229,9 +225,9 @@ func TestNewCompatibleWithVerInOlderSet(t *testing.T) { initialData := mkInitialData(2, clock) req := mkNewCompatReq("0.1", "0", false) - nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0) + nextClock := hlc.Next(clock, commonclock.NewRealTimeSource()) + updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0) assert.NoError(t, err) - assertClockGreater(t, nextClock, clock) assert.Equal(t, mkInitialData(2, clock), initialData) expected := &persistencepb.VersioningData{ @@ -263,11 +259,13 @@ func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) { data := mkInitialData(2, clock0) req := mkNewCompatReq("0.1", "0", false) - clock1, data, err := UpdateVersionSets(clock0, data, req, 0, 0) + clock1 := hlc.Next(clock0, commonclock.NewRealTimeSource()) + data, err := UpdateVersionSets(clock1, data, req, 0, 0) assert.NoError(t, err) req = mkNewCompatReq("0.2", "0.1", false) - clock2, data, err := UpdateVersionSets(clock1, data, req, 0, 0) + clock2 := hlc.Next(clock1, commonclock.NewRealTimeSource()) + data, err = UpdateVersionSets(clock2, data, req, 0, 0) assert.NoError(t, err) expected := &persistencepb.VersioningData{ @@ -293,7 +291,8 @@ func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) { assert.Equal(t, expected, data) // Ensure setting a compatible version which targets a non-leaf compat version ends up without a branch req = mkNewCompatReq("0.3", "0.1", false) - clock3, data, err := UpdateVersionSets(clock2, data, req, 0, 0) + clock3 := hlc.Next(clock1, commonclock.NewRealTimeSource()) + data, err = UpdateVersionSets(clock3, data, req, 0, 0) assert.NoError(t, err) expected = &persistencepb.VersioningData{ @@ -325,7 +324,8 @@ func TestCompatibleTargetsNotFound(t *testing.T) { data := mkInitialData(1, clock) req := mkNewCompatReq("1.1", "1", false) - _, _, err := UpdateVersionSets(clock, data, req, 0, 0) + nextClock := hlc.Next(clock, commonclock.NewRealTimeSource()) + _, err := UpdateVersionSets(nextClock, data, req, 0, 0) assert.Error(t, err) assert.IsType(t, &serviceerror.NotFound{}, err) } @@ -335,7 +335,8 @@ func TestMakeExistingSetDefault(t *testing.T) { data := mkInitialData(3, clock0) req := mkExistingDefault("1") - clock1, data, err := UpdateVersionSets(clock0, data, req, 0, 0) + clock1 := hlc.Next(clock0, commonclock.NewRealTimeSource()) + data, err := UpdateVersionSets(clock1, data, req, 0, 0) assert.NoError(t, err) expected := &persistencepb.VersioningData{ @@ -365,7 +366,8 @@ func TestMakeExistingSetDefault(t *testing.T) { // Add a compatible version to a set and then make that set the default via the compatible version req = mkNewCompatReq("0.1", "0", true) - clock2, data, err := UpdateVersionSets(clock1, data, req, 0, 0) + clock2 := hlc.Next(clock1, commonclock.NewRealTimeSource()) + data, err = UpdateVersionSets(clock2, data, req, 0, 0) assert.NoError(t, err) expected = &persistencepb.VersioningData{ @@ -399,11 +401,11 @@ func TestSayVersionIsCompatWithDifferentSetThanItsAlreadyCompatWithNotAllowed(t data := mkInitialData(3, clock) req := mkNewCompatReq("0.1", "0", false) - _, data, err := UpdateVersionSets(clock, data, req, 0, 0) + data, err := UpdateVersionSets(clock, data, req, 0, 0) assert.NoError(t, err) req = mkNewCompatReq("0.1", "1", false) - _, _, err = UpdateVersionSets(clock, data, req, 0, 0) + _, err = UpdateVersionSets(clock, data, req, 0, 0) assert.Error(t, err) assert.IsType(t, &serviceerror.InvalidArgument{}, err) } @@ -414,7 +416,7 @@ func TestLimitsMaxSets(t *testing.T) { data := mkInitialData(maxSets, clock) req := mkNewDefReq("10") - _, _, err := UpdateVersionSets(clock, data, req, maxSets, 0) + _, err := UpdateVersionSets(clock, data, req, maxSets, 0) assert.Error(t, err) assert.IsType(t, &serviceerror.FailedPrecondition{}, err) } @@ -425,7 +427,7 @@ func TestLimitsMaxBuildIDs(t *testing.T) { data := mkInitialData(maxBuildIDs, clock) req := mkNewDefReq("10") - _, _, err := UpdateVersionSets(clock, data, req, 0, maxBuildIDs) + _, err := UpdateVersionSets(clock, data, req, 0, maxBuildIDs) assert.Error(t, err) assert.IsType(t, &serviceerror.FailedPrecondition{}, err) } @@ -435,13 +437,16 @@ func TestPromoteWithinVersion(t *testing.T) { data := mkInitialData(2, clock0) req := mkNewCompatReq("0.1", "0", false) - clock1, data, err := UpdateVersionSets(clock0, data, req, 0, 0) + clock1 := hlc.Next(clock0, commonclock.NewRealTimeSource()) + data, err := UpdateVersionSets(clock1, data, req, 0, 0) assert.NoError(t, err) req = mkNewCompatReq("0.2", "0", false) - clock2, data, err := UpdateVersionSets(clock1, data, req, 0, 0) + clock2 := hlc.Next(clock1, commonclock.NewRealTimeSource()) + data, err = UpdateVersionSets(clock2, data, req, 0, 0) assert.NoError(t, err) req = mkPromoteInSet("0.1") - clock3, data, err := UpdateVersionSets(clock2, data, req, 0, 0) + clock3 := hlc.Next(clock2, commonclock.NewRealTimeSource()) + data, err = UpdateVersionSets(clock3, data, req, 0, 0) assert.NoError(t, err) expected := &persistencepb.VersioningData{ @@ -471,7 +476,7 @@ func TestAddAlreadyExtantVersionAsDefaultErrors(t *testing.T) { data := mkInitialData(3, clock) req := mkNewDefReq("0") - _, _, err := UpdateVersionSets(clock, data, req, 0, 0) + _, err := UpdateVersionSets(clock, data, req, 0, 0) assert.Error(t, err) assert.IsType(t, &serviceerror.InvalidArgument{}, err) } @@ -481,7 +486,7 @@ func TestAddAlreadyExtantVersionToAnotherSetErrors(t *testing.T) { data := mkInitialData(3, clock) req := mkNewCompatReq("0", "1", false) - _, _, err := UpdateVersionSets(clock, data, req, 0, 0) + _, err := UpdateVersionSets(clock, data, req, 0, 0) assert.Error(t, err) assert.IsType(t, &serviceerror.InvalidArgument{}, err) } @@ -491,7 +496,7 @@ func TestMakeSetDefaultTargetingNonexistentVersionErrors(t *testing.T) { data := mkInitialData(3, clock) req := mkExistingDefault("crab boi") - _, _, err := UpdateVersionSets(clock, data, req, 0, 0) + _, err := UpdateVersionSets(clock, data, req, 0, 0) assert.Error(t, err) assert.IsType(t, &serviceerror.NotFound{}, err) } @@ -501,7 +506,7 @@ func TestPromoteWithinSetTargetingNonexistentVersionErrors(t *testing.T) { data := mkInitialData(3, clock) req := mkPromoteInSet("i'd rather be writing rust ;)") - _, _, err := UpdateVersionSets(clock, data, req, 0, 0) + _, err := UpdateVersionSets(clock, data, req, 0, 0) assert.Error(t, err) assert.IsType(t, &serviceerror.NotFound{}, err) } From b565b4a1821e7d601f41adf148742496aa16aa3a Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Fri, 21 Apr 2023 16:45:13 -0700 Subject: [PATCH 3/5] Simplify merge code --- service/matching/version_sets_merge.go | 58 ++++++++++++++++---------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/service/matching/version_sets_merge.go b/service/matching/version_sets_merge.go index e082b8de9d3..b5aa339ae81 100644 --- a/service/matching/version_sets_merge.go +++ b/service/matching/version_sets_merge.go @@ -76,14 +76,9 @@ type buildIDInfo struct { madeDefaultAt hlc.Clock } -// MergeVersioningData merges two VersioningData structs. -// If a build ID appears in both data structures, the merged structure will include that latest status and timestamp. -// If a build ID appears in different sets in the different structures, those sets will be merged. -// The merged data's per set default and global default will be set according to the latest timestamps in the sources. -func MergeVersioningData(a *persistencepb.VersioningData, b *persistencepb.VersioningData) *persistencepb.VersioningData { +func collectBuildIDInfo(sets []*persistencepb.CompatibleVersionSet) map[string]buildIDInfo { buildIDToInfo := make(map[string]buildIDInfo, 0) - // Collect information about each build ID from both sources - for _, set := range append(a.VersionSets, b.VersionSets...) { + for _, set := range sets { lastIdx := len(set.BuildIds) - 1 for setIdx, buildID := range set.BuildIds { if info, found := buildIDToInfo[buildID.Id]; found { @@ -119,10 +114,13 @@ func MergeVersioningData(a *persistencepb.VersioningData, b *persistencepb.Versi } } } - // Build the merged compatible sets using collected build ID information - mergedSets := make([]*persistencepb.CompatibleVersionSet, 0) + return buildIDToInfo +} + +func intoVersionSets(buildIDToInfo map[string]buildIDInfo, defaultSetIds []string) []*persistencepb.CompatibleVersionSet { + sets := make([]*persistencepb.CompatibleVersionSet, 0) for id, info := range buildIDToInfo { - set := findSetWithSetIDs(mergedSets, info.setIDs) + set := findSetWithSetIDs(sets, info.setIDs) if set == nil { defaultTimestamp := hlc.Zero(0) set = &persistencepb.CompatibleVersionSet{ @@ -130,7 +128,7 @@ func MergeVersioningData(a *persistencepb.VersioningData, b *persistencepb.Versi BuildIds: make([]*persistencepb.BuildID, 0), DefaultUpdateTimestamp: &defaultTimestamp, } - mergedSets = append(mergedSets, set) + sets = append(sets, set) } else { set.SetIds = mergeSetIDs(set.SetIds, info.setIDs) } @@ -159,16 +157,15 @@ func MergeVersioningData(a *persistencepb.VersioningData, b *persistencepb.Versi set.BuildIds = append(set.BuildIds, buildID) } } - maxDefaultTimestamp := hlc.Max(*b.DefaultUpdateTimestamp, *a.DefaultUpdateTimestamp) - // Sort the sets based on their default update timestamp, ensuring the default set comes last - defaultSetIds := a.VersionSets[len(a.VersionSets)-1].SetIds - if hlc.Equal(maxDefaultTimestamp, *b.DefaultUpdateTimestamp) { - defaultSetIds = b.VersionSets[len(b.VersionSets)-1].SetIds - } - sort.Slice(mergedSets, func(i, j int) bool { - si := mergedSets[i] - sj := mergedSets[j] + sortSets(sets, defaultSetIds) + return sets +} + +func sortSets(sets []*persistencepb.CompatibleVersionSet, defaultSetIds []string) { + sort.Slice(sets, func(i, j int) bool { + si := sets[i] + sj := sets[j] if setContainsSetIDs(si, defaultSetIds) { return false } @@ -177,9 +174,28 @@ func MergeVersioningData(a *persistencepb.VersioningData, b *persistencepb.Versi } return hlc.Less(*si.DefaultUpdateTimestamp, *sj.DefaultUpdateTimestamp) }) +} + +// MergeVersioningData merges two VersioningData structs. +// If a build ID appears in both data structures, the merged structure will include that latest status and timestamp. +// If a build ID appears in different sets in the different structures, those sets will be merged. +// The merged data's per set default and global default will be set according to the latest timestamps in the sources. +func MergeVersioningData(a *persistencepb.VersioningData, b *persistencepb.VersioningData) *persistencepb.VersioningData { + // Collect information about each build ID from both sources + buildIDToInfo := collectBuildIDInfo(append(a.VersionSets, b.VersionSets...)) + + maxDefaultTimestamp := hlc.Max(*b.DefaultUpdateTimestamp, *a.DefaultUpdateTimestamp) + + defaultSetIds := a.VersionSets[len(a.VersionSets)-1].SetIds + if hlc.Equal(maxDefaultTimestamp, *b.DefaultUpdateTimestamp) { + defaultSetIds = b.VersionSets[len(b.VersionSets)-1].SetIds + } + + // Build the merged compatible sets using collected build ID information + sets := intoVersionSets(buildIDToInfo, defaultSetIds) return &persistencepb.VersioningData{ - VersionSets: mergedSets, + VersionSets: sets, DefaultUpdateTimestamp: &maxDefaultTimestamp, } } From f9377fd749819af6ebda2d310ae93a0233bc68f0 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Fri, 21 Apr 2023 16:54:28 -0700 Subject: [PATCH 4/5] Add missing license files --- .../hybrid_logical_clock.go | 24 +++++++++++++++++++ .../hybrid_logical_clock_test.go | 24 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/common/clock/hybrid_logical_clock/hybrid_logical_clock.go b/common/clock/hybrid_logical_clock/hybrid_logical_clock.go index 65c7c734d3a..dac9e9cb3c5 100644 --- a/common/clock/hybrid_logical_clock/hybrid_logical_clock.go +++ b/common/clock/hybrid_logical_clock/hybrid_logical_clock.go @@ -1,3 +1,27 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package hybrid_logical_clock import ( diff --git a/common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go b/common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go index 7c5b3f0149a..b5c84e925ed 100644 --- a/common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go +++ b/common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go @@ -1,3 +1,27 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package hybrid_logical_clock import ( From aeca51143bcb2f262d4093eaa52c4b193adceae5 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Mon, 24 Apr 2023 17:37:27 -0700 Subject: [PATCH 5/5] Address review comments, fix unit test --- common/clock/hybrid_logical_clock/hybrid_logical_clock.go | 4 ---- service/matching/matchingEngine_test.go | 2 ++ service/matching/version_sets_merge.go | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/common/clock/hybrid_logical_clock/hybrid_logical_clock.go b/common/clock/hybrid_logical_clock/hybrid_logical_clock.go index dac9e9cb3c5..5198515682b 100644 --- a/common/clock/hybrid_logical_clock/hybrid_logical_clock.go +++ b/common/clock/hybrid_logical_clock/hybrid_logical_clock.go @@ -25,14 +25,10 @@ package hybrid_logical_clock import ( - "errors" - clockpb "go.temporal.io/server/api/clock/v1" commonclock "go.temporal.io/server/common/clock" ) -var ErrClocksEqual = errors.New("HybridLogicalClocks are equal") - type Clock = clockpb.HybridLogicalClock // Next generates the next clock timestamp given the current clock. diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go index 41a4ad9fa32..d34cccadcc3 100644 --- a/service/matching/matchingEngine_test.go +++ b/service/matching/matchingEngine_test.go @@ -55,6 +55,7 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" tokenspb "go.temporal.io/server/api/token/v1" "go.temporal.io/server/common" + "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" @@ -157,6 +158,7 @@ func newMatchingEngine( config: config, namespaceRegistry: mockNamespaceCache, clusterMeta: cluster.NewMetadataForTest(cluster.NewTestClusterMetadataConfig(false, true)), + timeSource: clock.NewRealTimeSource(), } } diff --git a/service/matching/version_sets_merge.go b/service/matching/version_sets_merge.go index b5aa339ae81..f0856d8f8e2 100644 --- a/service/matching/version_sets_merge.go +++ b/service/matching/version_sets_merge.go @@ -48,7 +48,7 @@ func mergeSetIDs(a []string, b []string) []string { return mergedSetIDs } -// Merge and sort two sets of set IDs +// Check if a set contains any of the given set IDs. func setContainsSetIDs(set *persistencepb.CompatibleVersionSet, ids []string) bool { for _, needle := range ids { for _, id := range set.SetIds {