From 372d58dc46e74fe9e367ce285e6d8cdffbddecaa Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Mon, 24 Apr 2023 20:21:48 -0700 Subject: [PATCH] Versioning data merge algo and hybrid logical clock utils (#4205) --- .../hybrid_logical_clock.go | 101 +++++++++ .../hybrid_logical_clock_test.go | 85 ++++++++ service/matching/matchingEngine.go | 12 +- service/matching/matchingEngine_test.go | 2 + service/matching/version_sets.go | 39 +--- service/matching/version_sets_merge.go | 201 ++++++++++++++++++ service/matching/version_sets_merge_test.go | 192 +++++++++++++++++ service/matching/version_sets_test.go | 110 +++++----- 8 files changed, 653 insertions(+), 89 deletions(-) create mode 100644 common/clock/hybrid_logical_clock/hybrid_logical_clock.go create mode 100644 common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go create mode 100644 service/matching/version_sets_merge.go create mode 100644 service/matching/version_sets_merge_test.go diff --git a/common/clock/hybrid_logical_clock/hybrid_logical_clock.go b/common/clock/hybrid_logical_clock/hybrid_logical_clock.go new file mode 100644 index 00000000000..5198515682b --- /dev/null +++ b/common/clock/hybrid_logical_clock/hybrid_logical_clock.go @@ -0,0 +1,101 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package hybrid_logical_clock + +import ( + clockpb "go.temporal.io/server/api/clock/v1" + commonclock "go.temporal.io/server/common/clock" +) + +type Clock = clockpb.HybridLogicalClock + +// Next generates the next clock timestamp given the current clock. +// HybridLogicalClock requires the previous clock to ensure that time doesn't move backwards and the next clock is +// monotonically increasing. +func Next(clock Clock, source commonclock.TimeSource) Clock { + wallclock := source.Now().UnixMilli() + // Ensure time does not move backwards + if wallclock < clock.GetWallClock() { + wallclock = clock.GetWallClock() + } + // Ensure timestamp is monotonically increasing + if wallclock == clock.GetWallClock() { + clock.Version = clock.GetVersion() + 1 + } else { + clock.Version = 0 + clock.WallClock = wallclock + } + + return Clock{WallClock: wallclock, Version: clock.Version, ClusterId: clock.ClusterId} +} + +// Zero generates a zeroed logical clock for the cluster ID. +func Zero(clusterID int64) Clock { + return Clock{WallClock: 0, Version: 0, ClusterId: clusterID} +} + +func sign[T int64 | int32](x T) int { + if x > 0 { + return 1 + } + if x < 0 { + return -1 + } + return 0 +} + +// Compare 2 clocks, returns 0 if a == b, -1 if a > b, 1 if a < b +func Compare(a Clock, b Clock) int { + if a.WallClock == b.WallClock { + if a.Version == b.Version { + return sign(b.ClusterId - a.ClusterId) + } + return sign(b.Version - a.Version) + } + return sign(b.WallClock - a.WallClock) +} + +// Greater returns true if a is greater than b +func Greater(a Clock, b Clock) bool { + return Compare(b, a) > 0 +} + +// Greater returns true if a is greater than b +func Less(a Clock, b Clock) bool { + return Compare(a, b) > 0 +} + +// Max returns the maximum of two clocks +func Max(a Clock, b Clock) Clock { + if Compare(a, b) > 0 { + return b + } + return a +} + +// Equal returns whether two clocks are equal +func Equal(a Clock, b Clock) bool { + return Compare(a, b) == 0 +} diff --git a/common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go b/common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go new file mode 100644 index 00000000000..b5c84e925ed --- /dev/null +++ b/common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go @@ -0,0 +1,85 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package hybrid_logical_clock + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + commonclock "go.temporal.io/server/common/clock" +) + +func Test_Next_ReturnsGreaterClock(t *testing.T) { + t0 := Zero(1) + timesource := commonclock.NewEventTimeSource() + + // Same wallclock + timesource.Update(time.Unix(0, 0).UTC()) + t1 := Next(t0, timesource) + assert.Equal(t, Compare(t0, t1), 1) + // Greater wallclock + timesource.Update(time.Unix(0, 1).UTC()) + t2 := Next(t1, timesource) + assert.Equal(t, Compare(t1, t2), 1) +} + +func Test_Compare(t *testing.T) { + var t0 Clock + var t1 Clock + + t0 = Clock{WallClock: 1, Version: 1, ClusterId: 1} + t1 = Clock{WallClock: 1, Version: 1, ClusterId: 1} + assert.Equal(t, Compare(t0, t1), 0) + assert.True(t, Equal(t0, t1)) + + t0 = Clock{WallClock: 1, Version: 1, ClusterId: 1} + t1 = Clock{WallClock: 1, Version: 1, ClusterId: 2} + assert.Equal(t, Compare(t0, t1), 1) + // Let's get a -1 in there for sanity + assert.Equal(t, Compare(t1, t0), -1) + + t0 = Clock{WallClock: 1, Version: 1, ClusterId: 1} + t1 = Clock{WallClock: 1, Version: 2, ClusterId: 1} + assert.Equal(t, Compare(t0, t1), 1) + + t0 = Clock{WallClock: 1, Version: 1, ClusterId: 1} + t1 = Clock{WallClock: 2, Version: 1, ClusterId: 1} + assert.Equal(t, Compare(t0, t1), 1) + + assert.True(t, Greater(t1, t0)) + assert.True(t, Less(t0, t1)) +} + +func Test_Max_ReturnsMaximum(t *testing.T) { + t0 := Zero(1) + t1 := Zero(2) + + max := Max(t0, t1) + assert.Equal(t, max, t1) + // Just in case it doesn't work in reverse order... + max = Max(t1, t0) + assert.Equal(t, max, t1) +} diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index 8e4c81e1e17..2375692135d 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -45,6 +45,8 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" tokenspb "go.temporal.io/server/api/token/v1" "go.temporal.io/server/common" + "go.temporal.io/server/common/clock" + hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -103,6 +105,7 @@ type ( namespaceRegistry namespace.Registry keyResolver membership.ServiceResolver clusterMeta cluster.Metadata + timeSource clock.TimeSource } ) @@ -150,6 +153,7 @@ func NewEngine( namespaceRegistry: namespaceRegistry, keyResolver: resolver, clusterMeta: clusterMeta, + timeSource: clock.NewRealTimeSource(), // No need to mock this at the moment } } @@ -723,12 +727,12 @@ func (e *matchingEngineImpl) UpdateWorkerBuildIdCompatibility( err = tqMgr.UpdateUserData(hCtx.Context, func(data *persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, error) { clock := data.GetClock() if clock == nil { - tmp := zeroHLC(e.clusterMeta.GetClusterID()) + tmp := hlc.Zero(e.clusterMeta.GetClusterID()) clock = &tmp } - - updatedClock, versioningData, err := UpdateVersionSets( - *clock, + updatedClock := hlc.Next(*clock, e.timeSource) + versioningData, err := UpdateVersionSets( + updatedClock, data.GetVersioningData(), req.GetRequest(), e.config.VersionCompatibleSetLimitPerQueue(), diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go index 41a4ad9fa32..d34cccadcc3 100644 --- a/service/matching/matchingEngine_test.go +++ b/service/matching/matchingEngine_test.go @@ -55,6 +55,7 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" tokenspb "go.temporal.io/server/api/token/v1" "go.temporal.io/server/common" + "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" @@ -157,6 +158,7 @@ func newMatchingEngine( config: config, namespaceRegistry: mockNamespaceCache, clusterMeta: cluster.NewMetadataForTest(cluster.NewTestClusterMetadataConfig(false, true)), + timeSource: clock.NewRealTimeSource(), } } diff --git a/service/matching/version_sets.go b/service/matching/version_sets.go index 91c4e70d900..e45b0c6ca56 100644 --- a/service/matching/version_sets.go +++ b/service/matching/version_sets.go @@ -27,14 +27,13 @@ package matching import ( "encoding/binary" "fmt" - "time" "github.com/dgryski/go-farm" "go.temporal.io/api/serviceerror" "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" - clockpb "go.temporal.io/server/api/clock/v1" "go.temporal.io/server/api/persistence/v1" + hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" ) // ToBuildIdOrderingResponse transforms the internal VersioningData representation to public representation. @@ -117,20 +116,19 @@ func checkLimits(g *persistence.VersioningData, maxSets, maxBuildIDs int) error // Deletions are performed by a background process which verifies build IDs are no longer in use and safe to delete (not yet implemented). // // Update may fail with FailedPrecondition if it would cause exceeding the supplied limits. -func UpdateVersionSets(clock clockpb.HybridLogicalClock, data *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest, maxSets, maxBuildIDs int) (clockpb.HybridLogicalClock, *persistence.VersioningData, error) { - clock = generateHLCTimestamp(clock) +func UpdateVersionSets(clock hlc.Clock, data *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest, maxSets, maxBuildIDs int) (*persistence.VersioningData, error) { data, err := updateImpl(clock, data, req) if err != nil { - return clock, nil, err + return nil, err } if err := checkLimits(data, maxSets, maxBuildIDs); err != nil { - return clock, nil, err + return nil, err } - return clock, data, nil + return data, nil } //nolint:revive // cyclomatic complexity -func updateImpl(timestamp clockpb.HybridLogicalClock, existingData *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest) (*persistence.VersioningData, error) { +func updateImpl(timestamp hlc.Clock, existingData *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest) (*persistence.VersioningData, error) { // First find if the targeted version is already in the sets targetedVersion := extractTargetedVersion(req) findRes := findVersion(existingData, targetedVersion) @@ -231,7 +229,7 @@ func findVersion(data *persistence.VersioningData, buildID string) findVersionRe } } -func makeDefaultSet(data *persistence.VersioningData, setIx int, timestamp *clockpb.HybridLogicalClock) { +func makeDefaultSet(data *persistence.VersioningData, setIx int, timestamp *hlc.Clock) { data.DefaultUpdateTimestamp = timestamp if len(data.VersionSets) <= 1 { return @@ -244,7 +242,7 @@ func makeDefaultSet(data *persistence.VersioningData, setIx int, timestamp *cloc } } -func makeVersionInSetDefault(data *persistence.VersioningData, setIx, versionIx int, timestamp *clockpb.HybridLogicalClock) { +func makeVersionInSetDefault(data *persistence.VersioningData, setIx, versionIx int, timestamp *hlc.Clock) { data.VersionSets[setIx].DefaultUpdateTimestamp = timestamp setVersions := data.VersionSets[setIx].BuildIds if len(setVersions) <= 1 { @@ -257,24 +255,3 @@ func makeVersionInSetDefault(data *persistence.VersioningData, setIx, versionIx setVersions[len(setVersions)-1] = moveMe } } - -func generateHLCTimestamp(clock clockpb.HybridLogicalClock) clockpb.HybridLogicalClock { - wallclock := time.Now().UnixMilli() - // Ensure time does not move backwards - if wallclock < clock.GetWallClock() { - wallclock = clock.GetWallClock() - } - // Ensure timestamp is monotonically increasing - if wallclock == clock.GetWallClock() { - clock.Version = clock.GetVersion() + 1 - } else { - clock.Version = 0 - clock.WallClock = wallclock - } - - return clockpb.HybridLogicalClock{WallClock: wallclock, Version: clock.Version, ClusterId: clock.ClusterId} -} - -func zeroHLC(clusterID int64) clockpb.HybridLogicalClock { - return clockpb.HybridLogicalClock{WallClock: 0, Version: 0, ClusterId: clusterID} -} diff --git a/service/matching/version_sets_merge.go b/service/matching/version_sets_merge.go new file mode 100644 index 00000000000..f0856d8f8e2 --- /dev/null +++ b/service/matching/version_sets_merge.go @@ -0,0 +1,201 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package matching + +import ( + "sort" + + persistencepb "go.temporal.io/server/api/persistence/v1" + hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" +) + +// Merge and sort two sets of set IDs +func mergeSetIDs(a []string, b []string) []string { + var mergedSetIDs []string + seenSetIDs := make(map[string]struct{}, len(a)) + mergedSetIDs = append(mergedSetIDs, a...) + for _, setID := range a { + seenSetIDs[setID] = struct{}{} + } + for _, setID := range b { + if _, found := seenSetIDs[setID]; !found { + mergedSetIDs = append(mergedSetIDs, setID) + } + } + sort.Strings(mergedSetIDs) + return mergedSetIDs +} + +// Check if a set contains any of the given set IDs. +func setContainsSetIDs(set *persistencepb.CompatibleVersionSet, ids []string) bool { + for _, needle := range ids { + for _, id := range set.SetIds { + if needle == id { + return true + } + } + } + return false +} + +func findSetWithSetIDs(sets []*persistencepb.CompatibleVersionSet, ids []string) *persistencepb.CompatibleVersionSet { + for _, set := range sets { + if setContainsSetIDs(set, ids) { + return set + } + } + return nil +} + +type buildIDInfo struct { + state persistencepb.BuildID_State + stateUpdateTimestamp hlc.Clock + setIDs []string + madeDefaultAt hlc.Clock +} + +func collectBuildIDInfo(sets []*persistencepb.CompatibleVersionSet) map[string]buildIDInfo { + buildIDToInfo := make(map[string]buildIDInfo, 0) + for _, set := range sets { + lastIdx := len(set.BuildIds) - 1 + for setIdx, buildID := range set.BuildIds { + if info, found := buildIDToInfo[buildID.Id]; found { + // A build ID appears in more than one source, merge its information, and track it + state := info.state + stateUpdateTimestamp := hlc.Max(*buildID.StateUpdateTimestamp, info.stateUpdateTimestamp) + if hlc.Equal(stateUpdateTimestamp, *buildID.StateUpdateTimestamp) { + state = buildID.State + } + madeDefaultAt := info.madeDefaultAt + if setIdx == lastIdx { + madeDefaultAt = hlc.Max(*set.DefaultUpdateTimestamp, madeDefaultAt) + } + + buildIDToInfo[buildID.Id] = buildIDInfo{ + state: state, + stateUpdateTimestamp: stateUpdateTimestamp, + setIDs: mergeSetIDs(info.setIDs, set.SetIds), + madeDefaultAt: madeDefaultAt, + } + } else { + // A build ID was seen for the first time, track it + madeDefaultAt := hlc.Zero(0) + if setIdx == lastIdx { + madeDefaultAt = *set.DefaultUpdateTimestamp + } + buildIDToInfo[buildID.Id] = buildIDInfo{ + state: buildID.State, + stateUpdateTimestamp: *buildID.StateUpdateTimestamp, + setIDs: set.SetIds, + madeDefaultAt: madeDefaultAt, + } + } + } + } + return buildIDToInfo +} + +func intoVersionSets(buildIDToInfo map[string]buildIDInfo, defaultSetIds []string) []*persistencepb.CompatibleVersionSet { + sets := make([]*persistencepb.CompatibleVersionSet, 0) + for id, info := range buildIDToInfo { + set := findSetWithSetIDs(sets, info.setIDs) + if set == nil { + defaultTimestamp := hlc.Zero(0) + set = &persistencepb.CompatibleVersionSet{ + SetIds: info.setIDs, + BuildIds: make([]*persistencepb.BuildID, 0), + DefaultUpdateTimestamp: &defaultTimestamp, + } + sets = append(sets, set) + } else { + set.SetIds = mergeSetIDs(set.SetIds, info.setIDs) + } + timestamp := info.stateUpdateTimestamp + buildID := &persistencepb.BuildID{ + Id: id, + State: info.state, + StateUpdateTimestamp: ×tamp, + } + defaultTimestamp := info.madeDefaultAt + + // Insert the build ID in the right order based on whether it is the default or by its update timestamp + if hlc.Greater(*set.DefaultUpdateTimestamp, defaultTimestamp) { + // Can't be the last element, it's the default already + lastIdx := len(set.BuildIds) - 1 + for idx, curr := range set.BuildIds { + if idx == lastIdx || hlc.Greater(*curr.StateUpdateTimestamp, timestamp) { + // Insert just before + set.BuildIds = append(set.BuildIds[:idx+1], set.BuildIds[idx:]...) + set.BuildIds[idx] = buildID + break + } + } + } else { + set.DefaultUpdateTimestamp = &defaultTimestamp + set.BuildIds = append(set.BuildIds, buildID) + } + } + // Sort the sets based on their default update timestamp, ensuring the default set comes last + sortSets(sets, defaultSetIds) + return sets +} + +func sortSets(sets []*persistencepb.CompatibleVersionSet, defaultSetIds []string) { + sort.Slice(sets, func(i, j int) bool { + si := sets[i] + sj := sets[j] + if setContainsSetIDs(si, defaultSetIds) { + return false + } + if setContainsSetIDs(sj, defaultSetIds) { + return true + } + return hlc.Less(*si.DefaultUpdateTimestamp, *sj.DefaultUpdateTimestamp) + }) +} + +// MergeVersioningData merges two VersioningData structs. +// If a build ID appears in both data structures, the merged structure will include that latest status and timestamp. +// If a build ID appears in different sets in the different structures, those sets will be merged. +// The merged data's per set default and global default will be set according to the latest timestamps in the sources. +func MergeVersioningData(a *persistencepb.VersioningData, b *persistencepb.VersioningData) *persistencepb.VersioningData { + // Collect information about each build ID from both sources + buildIDToInfo := collectBuildIDInfo(append(a.VersionSets, b.VersionSets...)) + + maxDefaultTimestamp := hlc.Max(*b.DefaultUpdateTimestamp, *a.DefaultUpdateTimestamp) + + defaultSetIds := a.VersionSets[len(a.VersionSets)-1].SetIds + if hlc.Equal(maxDefaultTimestamp, *b.DefaultUpdateTimestamp) { + defaultSetIds = b.VersionSets[len(b.VersionSets)-1].SetIds + } + + // Build the merged compatible sets using collected build ID information + sets := intoVersionSets(buildIDToInfo, defaultSetIds) + + return &persistencepb.VersioningData{ + VersionSets: sets, + DefaultUpdateTimestamp: &maxDefaultTimestamp, + } +} diff --git a/service/matching/version_sets_merge_test.go b/service/matching/version_sets_merge_test.go new file mode 100644 index 00000000000..905ce42d592 --- /dev/null +++ b/service/matching/version_sets_merge_test.go @@ -0,0 +1,192 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package matching + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + persistencepb "go.temporal.io/server/api/persistence/v1" + hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" +) + +func fromWallClock(wallclock int64) *hlc.Clock { + return &hlc.Clock{WallClock: wallclock, Version: 0, ClusterId: 1} +} + +func buildID(wallclock int64, id string, optionalState ...persistencepb.BuildID_State) *persistencepb.BuildID { + state := persistencepb.STATE_ACTIVE + if len(optionalState) == 1 { + state = optionalState[0] + } + + return &persistencepb.BuildID{ + Id: id, + State: state, + StateUpdateTimestamp: fromWallClock(wallclock), + } +} + +func mkBuildIDs(buildIDs ...*persistencepb.BuildID) []*persistencepb.BuildID { + buildIDStructs := make([]*persistencepb.BuildID, len(buildIDs)) + for i, buildID := range buildIDs { + buildIDStructs[i] = &persistencepb.BuildID{ + Id: buildID.Id, + State: persistencepb.STATE_ACTIVE, + StateUpdateTimestamp: buildID.StateUpdateTimestamp, + } + } + return buildIDStructs +} + +func mkSet(setID string, buildIDs ...*persistencepb.BuildID) *persistencepb.CompatibleVersionSet { + return &persistencepb.CompatibleVersionSet{ + SetIds: []string{setID}, + BuildIds: mkBuildIDs(buildIDs...), + DefaultUpdateTimestamp: buildIDs[len(buildIDs)-1].StateUpdateTimestamp, + } +} + +func mkSingleSetData(setID string, buildIDs ...*persistencepb.BuildID) *persistencepb.VersioningData { + return &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{mkSet(setID, buildIDs...)}, + DefaultUpdateTimestamp: buildIDs[len(buildIDs)-1].StateUpdateTimestamp, + } +} + +func TestSetMerge_IdenticalBuildIDsAndGreaterUpdateTimestamp_SetsMaxUpdateTimestamp(t *testing.T) { + // look here 👇 + a := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(6, "0.2")) + b := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(3, "0.2")) + assert.Equal(t, a, MergeVersioningData(a, b)) + assert.Equal(t, a, MergeVersioningData(b, a)) +} + +func TestSetMerge_AdditionalBuildIDAndGreaterUpdateTimestamp_MergesBuildIDsAndSetsMaxUpdateTimestamp(t *testing.T) { + a := mkSingleSetData("0.1", buildID(6, "0.1")) + b := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(3, "0.2")) + expected := mkSingleSetData("0.1", buildID(3, "0.2"), buildID(6, "0.1")) + assert.Equal(t, expected, MergeVersioningData(a, b)) + assert.Equal(t, expected, MergeVersioningData(b, a)) +} + +func TestSetMerge_NewerDefault_PrefersDefaultAndSetsMaxUpdatedAt(t *testing.T) { + a := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(6, "0.3")) + b := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(3, "0.2")) + expected := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(3, "0.2"), buildID(6, "0.3")) + assert.Equal(t, expected, MergeVersioningData(a, b)) + assert.Equal(t, expected, MergeVersioningData(b, a)) +} + +func TestDataMerge_PrefersNewerDefaultAndMergesDefault(t *testing.T) { + a := &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{ + mkSet("0.1", buildID(2, "0.1")), + mkSet("1.0", buildID(3, "1.0")), + }, + DefaultUpdateTimestamp: fromWallClock(3), + } + b := &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{ + mkSet("1.0", buildID(3, "1.0")), + mkSet("0.1", buildID(2, "0.1"), buildID(4, "0.2")), + }, + DefaultUpdateTimestamp: fromWallClock(4), + } + assert.Equal(t, b, MergeVersioningData(a, b)) + assert.Equal(t, b, MergeVersioningData(b, a)) +} + +func TestSetMerge_DifferentSetIDs_MergesSetIDs(t *testing.T) { + a := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(6, "0.2")) + b := mkSingleSetData("0.2", buildID(3, "0.2")) + expected := &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{{ + SetIds: []string{"0.1", "0.2"}, + BuildIds: mkBuildIDs(buildID(1, "0.1"), buildID(6, "0.2")), + DefaultUpdateTimestamp: fromWallClock(6), + }}, + DefaultUpdateTimestamp: fromWallClock(6), + } + assert.Equal(t, expected, MergeVersioningData(a, b)) + assert.Equal(t, expected, MergeVersioningData(b, a)) +} + +func TestSetMerge_DifferentStates_UpdatesTimestampsAndState(t *testing.T) { + a := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(6, "0.2", persistencepb.STATE_DELETED), buildID(7, "0.3", persistencepb.STATE_DELETED)) + b := mkSingleSetData("0.1", buildID(3, "0.1", persistencepb.STATE_DELETED), buildID(7, "0.2"), buildID(8, "0.3")) + assert.Equal(t, b, MergeVersioningData(a, b)) + assert.Equal(t, b, MergeVersioningData(b, a)) +} + +func TestSetMerge_MultipleMatches_MergesSets(t *testing.T) { + a := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(3, "0.2")) + b := &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{ + mkSet("0.1", buildID(1, "0.1")), + mkSet("0.2", buildID(2, "0.2")), + }, + DefaultUpdateTimestamp: fromWallClock(2), + } + expected := &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{{ + SetIds: []string{"0.1", "0.2"}, + BuildIds: mkBuildIDs(buildID(1, "0.1"), buildID(3, "0.2")), + DefaultUpdateTimestamp: fromWallClock(3), + }}, + DefaultUpdateTimestamp: fromWallClock(3), + } + assert.Equal(t, expected, MergeVersioningData(a, b)) + assert.Equal(t, expected, MergeVersioningData(b, a)) +} + +func TestSetMerge_BuildIDPromoted_PreservesSetDefault(t *testing.T) { + a := mkSingleSetData("0.1", buildID(2, "0.1"), buildID(1, "0.2")) + a.VersionSets[0].DefaultUpdateTimestamp = fromWallClock(3) + b := mkSingleSetData("0.1", buildID(2, "0.1"), buildID(1, "0.2")) + b.VersionSets[0].DefaultUpdateTimestamp = fromWallClock(3) + assert.Equal(t, b, MergeVersioningData(a, b)) + assert.Equal(t, b, MergeVersioningData(b, a)) +} + +func TestSetMerge_SetPromoted_PreservesGlobalDefault(t *testing.T) { + a := &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{ + mkSet("0.2", buildID(2, "0.2")), + mkSet("0.1", buildID(1, "0.1")), + }, + DefaultUpdateTimestamp: fromWallClock(3), + } + b := &persistencepb.VersioningData{ + VersionSets: []*persistencepb.CompatibleVersionSet{ + mkSet("0.2", buildID(2, "0.2")), + mkSet("0.1", buildID(1, "0.1")), + }, + DefaultUpdateTimestamp: fromWallClock(3), + } + assert.Equal(t, b, MergeVersioningData(a, b)) + assert.Equal(t, b, MergeVersioningData(b, a)) +} diff --git a/service/matching/version_sets_test.go b/service/matching/version_sets_test.go index 5b71c38f0e4..4f621f3583e 100644 --- a/service/matching/version_sets_test.go +++ b/service/matching/version_sets_test.go @@ -35,6 +35,8 @@ import ( "go.temporal.io/api/workflowservice/v1" clockpb "go.temporal.io/server/api/clock/v1" persistencepb "go.temporal.io/server/api/persistence/v1" + commonclock "go.temporal.io/server/common/clock" + hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" ) func mkNewSet(id string, clock clockpb.HybridLogicalClock) *persistencepb.CompatibleVersionSet { @@ -57,7 +59,7 @@ func mkInitialData(numSets int, clock clockpb.HybridLogicalClock) *persistencepb } func mkUserData(numSets int) *persistencepb.TaskQueueUserData { - clock := zeroHLC(1) + clock := hlc.Zero(1) return &persistencepb.TaskQueueUserData{ Clock: &clock, VersioningData: mkInitialData(numSets, clock), @@ -97,23 +99,14 @@ func mkPromoteInSet(id string) *workflowservice.UpdateWorkerBuildIdCompatibility } } -func assertClockGreater(t *testing.T, clock1 clockpb.HybridLogicalClock, clock2 clockpb.HybridLogicalClock) { - if clock1.Version == clock2.Version { - assert.GreaterOrEqual(t, clock1.WallClock, clock2.WallClock) - } else { - assert.Equal(t, clock1.WallClock, clock2.WallClock) - } - assert.Equal(t, clock1.ClusterId, clock2.ClusterId) -} - func TestNewDefaultUpdate(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) initialData := mkInitialData(2, clock) req := mkNewDefReq("2") - nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0) + nextClock := hlc.Next(clock, commonclock.NewRealTimeSource()) + updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0) assert.NoError(t, err) - assertClockGreater(t, nextClock, clock) assert.Equal(t, mkInitialData(2, clock), initialData) expected := &persistencepb.VersioningData{ @@ -143,13 +136,13 @@ func TestNewDefaultUpdate(t *testing.T) { } func TestNewDefaultSetUpdateOfEmptyData(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) initialData := mkInitialData(0, clock) req := mkNewDefReq("1") - nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0) + nextClock := hlc.Next(clock, commonclock.NewRealTimeSource()) + updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0) assert.NoError(t, err) - assertClockGreater(t, nextClock, clock) assert.Equal(t, mkInitialData(0, clock), initialData) expected := &persistencepb.VersioningData{ @@ -166,13 +159,13 @@ func TestNewDefaultSetUpdateOfEmptyData(t *testing.T) { } func TestNewDefaultSetUpdateCompatWithCurDefault(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) initialData := mkInitialData(2, clock) req := mkNewCompatReq("1.1", "1", true) - nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0) + nextClock := hlc.Next(clock, commonclock.NewRealTimeSource()) + updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0) assert.NoError(t, err) - assertClockGreater(t, nextClock, clock) assert.Equal(t, mkInitialData(2, clock), initialData) expected := &persistencepb.VersioningData{ @@ -197,13 +190,13 @@ func TestNewDefaultSetUpdateCompatWithCurDefault(t *testing.T) { } func TestNewDefaultSetUpdateCompatWithNonDefaultSet(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) initialData := mkInitialData(2, clock) req := mkNewCompatReq("0.1", "0", true) - nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0) + nextClock := hlc.Next(clock, commonclock.NewRealTimeSource()) + updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0) assert.NoError(t, err) - assertClockGreater(t, nextClock, clock) assert.Equal(t, mkInitialData(2, clock), initialData) expected := &persistencepb.VersioningData{ @@ -228,13 +221,13 @@ func TestNewDefaultSetUpdateCompatWithNonDefaultSet(t *testing.T) { } func TestNewCompatibleWithVerInOlderSet(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) initialData := mkInitialData(2, clock) req := mkNewCompatReq("0.1", "0", false) - nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0) + nextClock := hlc.Next(clock, commonclock.NewRealTimeSource()) + updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0) assert.NoError(t, err) - assertClockGreater(t, nextClock, clock) assert.Equal(t, mkInitialData(2, clock), initialData) expected := &persistencepb.VersioningData{ @@ -262,15 +255,17 @@ func TestNewCompatibleWithVerInOlderSet(t *testing.T) { } func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) { - clock0 := zeroHLC(1) + clock0 := hlc.Zero(1) data := mkInitialData(2, clock0) req := mkNewCompatReq("0.1", "0", false) - clock1, data, err := UpdateVersionSets(clock0, data, req, 0, 0) + clock1 := hlc.Next(clock0, commonclock.NewRealTimeSource()) + data, err := UpdateVersionSets(clock1, data, req, 0, 0) assert.NoError(t, err) req = mkNewCompatReq("0.2", "0.1", false) - clock2, data, err := UpdateVersionSets(clock1, data, req, 0, 0) + clock2 := hlc.Next(clock1, commonclock.NewRealTimeSource()) + data, err = UpdateVersionSets(clock2, data, req, 0, 0) assert.NoError(t, err) expected := &persistencepb.VersioningData{ @@ -296,7 +291,8 @@ func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) { assert.Equal(t, expected, data) // Ensure setting a compatible version which targets a non-leaf compat version ends up without a branch req = mkNewCompatReq("0.3", "0.1", false) - clock3, data, err := UpdateVersionSets(clock2, data, req, 0, 0) + clock3 := hlc.Next(clock1, commonclock.NewRealTimeSource()) + data, err = UpdateVersionSets(clock3, data, req, 0, 0) assert.NoError(t, err) expected = &persistencepb.VersioningData{ @@ -324,21 +320,23 @@ func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) { } func TestCompatibleTargetsNotFound(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) data := mkInitialData(1, clock) req := mkNewCompatReq("1.1", "1", false) - _, _, err := UpdateVersionSets(clock, data, req, 0, 0) + nextClock := hlc.Next(clock, commonclock.NewRealTimeSource()) + _, err := UpdateVersionSets(nextClock, data, req, 0, 0) assert.Error(t, err) assert.IsType(t, &serviceerror.NotFound{}, err) } func TestMakeExistingSetDefault(t *testing.T) { - clock0 := zeroHLC(1) + clock0 := hlc.Zero(1) data := mkInitialData(3, clock0) req := mkExistingDefault("1") - clock1, data, err := UpdateVersionSets(clock0, data, req, 0, 0) + clock1 := hlc.Next(clock0, commonclock.NewRealTimeSource()) + data, err := UpdateVersionSets(clock1, data, req, 0, 0) assert.NoError(t, err) expected := &persistencepb.VersioningData{ @@ -368,7 +366,8 @@ func TestMakeExistingSetDefault(t *testing.T) { // Add a compatible version to a set and then make that set the default via the compatible version req = mkNewCompatReq("0.1", "0", true) - clock2, data, err := UpdateVersionSets(clock1, data, req, 0, 0) + clock2 := hlc.Next(clock1, commonclock.NewRealTimeSource()) + data, err = UpdateVersionSets(clock2, data, req, 0, 0) assert.NoError(t, err) expected = &persistencepb.VersioningData{ @@ -398,53 +397,56 @@ func TestMakeExistingSetDefault(t *testing.T) { } func TestSayVersionIsCompatWithDifferentSetThanItsAlreadyCompatWithNotAllowed(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) data := mkInitialData(3, clock) req := mkNewCompatReq("0.1", "0", false) - _, data, err := UpdateVersionSets(clock, data, req, 0, 0) + data, err := UpdateVersionSets(clock, data, req, 0, 0) assert.NoError(t, err) req = mkNewCompatReq("0.1", "1", false) - _, _, err = UpdateVersionSets(clock, data, req, 0, 0) + _, err = UpdateVersionSets(clock, data, req, 0, 0) assert.Error(t, err) assert.IsType(t, &serviceerror.InvalidArgument{}, err) } func TestLimitsMaxSets(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) maxSets := 10 data := mkInitialData(maxSets, clock) req := mkNewDefReq("10") - _, _, err := UpdateVersionSets(clock, data, req, maxSets, 0) + _, err := UpdateVersionSets(clock, data, req, maxSets, 0) assert.Error(t, err) assert.IsType(t, &serviceerror.FailedPrecondition{}, err) } func TestLimitsMaxBuildIDs(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) maxBuildIDs := 10 data := mkInitialData(maxBuildIDs, clock) req := mkNewDefReq("10") - _, _, err := UpdateVersionSets(clock, data, req, 0, maxBuildIDs) + _, err := UpdateVersionSets(clock, data, req, 0, maxBuildIDs) assert.Error(t, err) assert.IsType(t, &serviceerror.FailedPrecondition{}, err) } func TestPromoteWithinVersion(t *testing.T) { - clock0 := zeroHLC(1) + clock0 := hlc.Zero(1) data := mkInitialData(2, clock0) req := mkNewCompatReq("0.1", "0", false) - clock1, data, err := UpdateVersionSets(clock0, data, req, 0, 0) + clock1 := hlc.Next(clock0, commonclock.NewRealTimeSource()) + data, err := UpdateVersionSets(clock1, data, req, 0, 0) assert.NoError(t, err) req = mkNewCompatReq("0.2", "0", false) - clock2, data, err := UpdateVersionSets(clock1, data, req, 0, 0) + clock2 := hlc.Next(clock1, commonclock.NewRealTimeSource()) + data, err = UpdateVersionSets(clock2, data, req, 0, 0) assert.NoError(t, err) req = mkPromoteInSet("0.1") - clock3, data, err := UpdateVersionSets(clock2, data, req, 0, 0) + clock3 := hlc.Next(clock2, commonclock.NewRealTimeSource()) + data, err = UpdateVersionSets(clock3, data, req, 0, 0) assert.NoError(t, err) expected := &persistencepb.VersioningData{ @@ -470,47 +472,47 @@ func TestPromoteWithinVersion(t *testing.T) { } func TestAddAlreadyExtantVersionAsDefaultErrors(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) data := mkInitialData(3, clock) req := mkNewDefReq("0") - _, _, err := UpdateVersionSets(clock, data, req, 0, 0) + _, err := UpdateVersionSets(clock, data, req, 0, 0) assert.Error(t, err) assert.IsType(t, &serviceerror.InvalidArgument{}, err) } func TestAddAlreadyExtantVersionToAnotherSetErrors(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) data := mkInitialData(3, clock) req := mkNewCompatReq("0", "1", false) - _, _, err := UpdateVersionSets(clock, data, req, 0, 0) + _, err := UpdateVersionSets(clock, data, req, 0, 0) assert.Error(t, err) assert.IsType(t, &serviceerror.InvalidArgument{}, err) } func TestMakeSetDefaultTargetingNonexistentVersionErrors(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) data := mkInitialData(3, clock) req := mkExistingDefault("crab boi") - _, _, err := UpdateVersionSets(clock, data, req, 0, 0) + _, err := UpdateVersionSets(clock, data, req, 0, 0) assert.Error(t, err) assert.IsType(t, &serviceerror.NotFound{}, err) } func TestPromoteWithinSetTargetingNonexistentVersionErrors(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) data := mkInitialData(3, clock) req := mkPromoteInSet("i'd rather be writing rust ;)") - _, _, err := UpdateVersionSets(clock, data, req, 0, 0) + _, err := UpdateVersionSets(clock, data, req, 0, 0) assert.Error(t, err) assert.IsType(t, &serviceerror.NotFound{}, err) } func TestToBuildIdOrderingResponseTrimsResponse(t *testing.T) { - clock := zeroHLC(1) + clock := hlc.Zero(1) data := mkInitialData(3, clock) actual := ToBuildIdOrderingResponse(data, 2) expected := []*taskqueuepb.CompatibleVersionSet{{BuildIds: []string{"1"}}, {BuildIds: []string{"2"}}}