Skip to content

Commit

Permalink
Versioning data merge algo and hybrid logical clock utils (temporalio…
Browse files Browse the repository at this point in the history
…#4205)

Note: This commit came from a feature branch and is not expected to build.
  • Loading branch information
bergundy authored and dnr committed May 26, 2023
1 parent 5cc12ef commit b4ef426
Show file tree
Hide file tree
Showing 8 changed files with 653 additions and 89 deletions.
101 changes: 101 additions & 0 deletions common/clock/hybrid_logical_clock/hybrid_logical_clock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package hybrid_logical_clock

import (
clockpb "go.temporal.io/server/api/clock/v1"
commonclock "go.temporal.io/server/common/clock"
)

type Clock = clockpb.HybridLogicalClock

// Next generates the next clock timestamp given the current clock.
// HybridLogicalClock requires the previous clock to ensure that time doesn't move backwards and the next clock is
// monotonically increasing.
func Next(clock Clock, source commonclock.TimeSource) Clock {
wallclock := source.Now().UnixMilli()
// Ensure time does not move backwards
if wallclock < clock.GetWallClock() {
wallclock = clock.GetWallClock()
}
// Ensure timestamp is monotonically increasing
if wallclock == clock.GetWallClock() {
clock.Version = clock.GetVersion() + 1
} else {
clock.Version = 0
clock.WallClock = wallclock
}

return Clock{WallClock: wallclock, Version: clock.Version, ClusterId: clock.ClusterId}
}

// Zero generates a zeroed logical clock for the cluster ID.
func Zero(clusterID int64) Clock {
return Clock{WallClock: 0, Version: 0, ClusterId: clusterID}
}

func sign[T int64 | int32](x T) int {
if x > 0 {
return 1
}
if x < 0 {
return -1
}
return 0
}

// Compare 2 clocks, returns 0 if a == b, -1 if a > b, 1 if a < b
func Compare(a Clock, b Clock) int {
if a.WallClock == b.WallClock {
if a.Version == b.Version {
return sign(b.ClusterId - a.ClusterId)
}
return sign(b.Version - a.Version)
}
return sign(b.WallClock - a.WallClock)
}

// Greater returns true if a is greater than b
func Greater(a Clock, b Clock) bool {
return Compare(b, a) > 0
}

// Greater returns true if a is greater than b
func Less(a Clock, b Clock) bool {
return Compare(a, b) > 0
}

// Max returns the maximum of two clocks
func Max(a Clock, b Clock) Clock {
if Compare(a, b) > 0 {
return b
}
return a
}

// Equal returns whether two clocks are equal
func Equal(a Clock, b Clock) bool {
return Compare(a, b) == 0
}
85 changes: 85 additions & 0 deletions common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package hybrid_logical_clock

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
commonclock "go.temporal.io/server/common/clock"
)

func Test_Next_ReturnsGreaterClock(t *testing.T) {
t0 := Zero(1)
timesource := commonclock.NewEventTimeSource()

// Same wallclock
timesource.Update(time.Unix(0, 0).UTC())
t1 := Next(t0, timesource)
assert.Equal(t, Compare(t0, t1), 1)
// Greater wallclock
timesource.Update(time.Unix(0, 1).UTC())
t2 := Next(t1, timesource)
assert.Equal(t, Compare(t1, t2), 1)
}

func Test_Compare(t *testing.T) {
var t0 Clock
var t1 Clock

t0 = Clock{WallClock: 1, Version: 1, ClusterId: 1}
t1 = Clock{WallClock: 1, Version: 1, ClusterId: 1}
assert.Equal(t, Compare(t0, t1), 0)
assert.True(t, Equal(t0, t1))

t0 = Clock{WallClock: 1, Version: 1, ClusterId: 1}
t1 = Clock{WallClock: 1, Version: 1, ClusterId: 2}
assert.Equal(t, Compare(t0, t1), 1)
// Let's get a -1 in there for sanity
assert.Equal(t, Compare(t1, t0), -1)

t0 = Clock{WallClock: 1, Version: 1, ClusterId: 1}
t1 = Clock{WallClock: 1, Version: 2, ClusterId: 1}
assert.Equal(t, Compare(t0, t1), 1)

t0 = Clock{WallClock: 1, Version: 1, ClusterId: 1}
t1 = Clock{WallClock: 2, Version: 1, ClusterId: 1}
assert.Equal(t, Compare(t0, t1), 1)

assert.True(t, Greater(t1, t0))
assert.True(t, Less(t0, t1))
}

func Test_Max_ReturnsMaximum(t *testing.T) {
t0 := Zero(1)
t1 := Zero(2)

max := Max(t0, t1)
assert.Equal(t, max, t1)
// Just in case it doesn't work in reverse order...
max = Max(t1, t0)
assert.Equal(t, max, t1)
}
12 changes: 8 additions & 4 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
hlc "go.temporal.io/server/common/clock/hybrid_logical_clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -103,6 +105,7 @@ type (
namespaceRegistry namespace.Registry
keyResolver membership.ServiceResolver
clusterMeta cluster.Metadata
timeSource clock.TimeSource
}
)

Expand Down Expand Up @@ -150,6 +153,7 @@ func NewEngine(
namespaceRegistry: namespaceRegistry,
keyResolver: resolver,
clusterMeta: clusterMeta,
timeSource: clock.NewRealTimeSource(), // No need to mock this at the moment
}
}

Expand Down Expand Up @@ -726,12 +730,12 @@ func (e *matchingEngineImpl) UpdateWorkerBuildIdCompatibility(
err = tqMgr.UpdateUserData(ctx, func(data *persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, error) {
clock := data.GetClock()
if clock == nil {
tmp := zeroHLC(e.clusterMeta.GetClusterID())
tmp := hlc.Zero(e.clusterMeta.GetClusterID())
clock = &tmp
}

updatedClock, versioningData, err := UpdateVersionSets(
*clock,
updatedClock := hlc.Next(*clock, e.timeSource)
versioningData, err := UpdateVersionSets(
updatedClock,
data.GetVersioningData(),
req.GetRequest(),
e.config.VersionCompatibleSetLimitPerQueue(),
Expand Down
2 changes: 2 additions & 0 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -148,6 +149,7 @@ func newMatchingEngine(
config: config,
namespaceRegistry: mockNamespaceCache,
clusterMeta: cluster.NewMetadataForTest(cluster.NewTestClusterMetadataConfig(false, true)),
timeSource: clock.NewRealTimeSource(),
}
}

Expand Down
39 changes: 8 additions & 31 deletions service/matching/version_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,13 @@ package matching
import (
"encoding/binary"
"fmt"
"time"

"github.com/dgryski/go-farm"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
clockpb "go.temporal.io/server/api/clock/v1"
"go.temporal.io/server/api/persistence/v1"
hlc "go.temporal.io/server/common/clock/hybrid_logical_clock"
)

// ToBuildIdOrderingResponse transforms the internal VersioningData representation to public representation.
Expand Down Expand Up @@ -117,20 +116,19 @@ func checkLimits(g *persistence.VersioningData, maxSets, maxBuildIDs int) error
// Deletions are performed by a background process which verifies build IDs are no longer in use and safe to delete (not yet implemented).
//
// Update may fail with FailedPrecondition if it would cause exceeding the supplied limits.
func UpdateVersionSets(clock clockpb.HybridLogicalClock, data *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest, maxSets, maxBuildIDs int) (clockpb.HybridLogicalClock, *persistence.VersioningData, error) {
clock = generateHLCTimestamp(clock)
func UpdateVersionSets(clock hlc.Clock, data *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest, maxSets, maxBuildIDs int) (*persistence.VersioningData, error) {
data, err := updateImpl(clock, data, req)
if err != nil {
return clock, nil, err
return nil, err
}
if err := checkLimits(data, maxSets, maxBuildIDs); err != nil {
return clock, nil, err
return nil, err
}
return clock, data, nil
return data, nil
}

//nolint:revive // cyclomatic complexity
func updateImpl(timestamp clockpb.HybridLogicalClock, existingData *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest) (*persistence.VersioningData, error) {
func updateImpl(timestamp hlc.Clock, existingData *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest) (*persistence.VersioningData, error) {
// First find if the targeted version is already in the sets
targetedVersion := extractTargetedVersion(req)
findRes := findVersion(existingData, targetedVersion)
Expand Down Expand Up @@ -231,7 +229,7 @@ func findVersion(data *persistence.VersioningData, buildID string) findVersionRe
}
}

func makeDefaultSet(data *persistence.VersioningData, setIx int, timestamp *clockpb.HybridLogicalClock) {
func makeDefaultSet(data *persistence.VersioningData, setIx int, timestamp *hlc.Clock) {
data.DefaultUpdateTimestamp = timestamp
if len(data.VersionSets) <= 1 {
return
Expand All @@ -244,7 +242,7 @@ func makeDefaultSet(data *persistence.VersioningData, setIx int, timestamp *cloc
}
}

func makeVersionInSetDefault(data *persistence.VersioningData, setIx, versionIx int, timestamp *clockpb.HybridLogicalClock) {
func makeVersionInSetDefault(data *persistence.VersioningData, setIx, versionIx int, timestamp *hlc.Clock) {
data.VersionSets[setIx].DefaultUpdateTimestamp = timestamp
setVersions := data.VersionSets[setIx].BuildIds
if len(setVersions) <= 1 {
Expand All @@ -257,24 +255,3 @@ func makeVersionInSetDefault(data *persistence.VersioningData, setIx, versionIx
setVersions[len(setVersions)-1] = moveMe
}
}

func generateHLCTimestamp(clock clockpb.HybridLogicalClock) clockpb.HybridLogicalClock {
wallclock := time.Now().UnixMilli()
// Ensure time does not move backwards
if wallclock < clock.GetWallClock() {
wallclock = clock.GetWallClock()
}
// Ensure timestamp is monotonically increasing
if wallclock == clock.GetWallClock() {
clock.Version = clock.GetVersion() + 1
} else {
clock.Version = 0
clock.WallClock = wallclock
}

return clockpb.HybridLogicalClock{WallClock: wallclock, Version: clock.Version, ClusterId: clock.ClusterId}
}

func zeroHLC(clusterID int64) clockpb.HybridLogicalClock {
return clockpb.HybridLogicalClock{WallClock: 0, Version: 0, ClusterId: clusterID}
}
Loading

0 comments on commit b4ef426

Please sign in to comment.