Skip to content

Commit

Permalink
Use TimeSource interface
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy committed Apr 21, 2023
1 parent abcac15 commit e40895b
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 49 deletions.
10 changes: 3 additions & 7 deletions common/clock/hybrid_logical_clock/hybrid_logical_clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package hybrid_logical_clock

import (
"errors"
"time"

clockpb "go.temporal.io/server/api/clock/v1"
commonclock "go.temporal.io/server/common/clock"
)

var ErrClocksEqual = errors.New("HybridLogicalClocks are equal")
Expand All @@ -14,12 +14,8 @@ type Clock = clockpb.HybridLogicalClock
// Next generates the next clock timestamp given the current clock.
// HybridLogicalClock requires the previous clock to ensure that time doesn't move backwards and the next clock is
// monotonically increasing.
func Next(clock Clock) Clock {
wallclock := time.Now().UnixMilli()
return next(clock, wallclock)
}

func next(clock Clock, wallclock int64) Clock {
func Next(clock Clock, source commonclock.TimeSource) Clock {
wallclock := source.Now().UnixMilli()
// Ensure time does not move backwards
if wallclock < clock.GetWallClock() {
wallclock = clock.GetWallClock()
Expand Down
10 changes: 8 additions & 2 deletions common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@ package hybrid_logical_clock

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
commonclock "go.temporal.io/server/common/clock"
)

func Test_Next_ReturnsGreaterClock(t *testing.T) {
t0 := Zero(1)
timesource := commonclock.NewEventTimeSource()

// Same wallclock
t1 := next(t0, 0)
timesource.Update(time.Unix(0, 0).UTC())
t1 := Next(t0, timesource)
assert.Equal(t, Compare(t0, t1), 1)
// Greater wallclock
t2 := next(t1, 1)
timesource.Update(time.Unix(0, 1).UTC())
t2 := Next(t1, timesource)
assert.Equal(t, Compare(t1, t2), 1)
}

Expand Down
9 changes: 6 additions & 3 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
hlc "go.temporal.io/server/common/clock/hybrid_logical_clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -104,6 +105,7 @@ type (
namespaceRegistry namespace.Registry
keyResolver membership.ServiceResolver
clusterMeta cluster.Metadata
timeSource clock.TimeSource
}
)

Expand Down Expand Up @@ -151,6 +153,7 @@ func NewEngine(
namespaceRegistry: namespaceRegistry,
keyResolver: resolver,
clusterMeta: clusterMeta,
timeSource: clock.NewRealTimeSource(), // No need to mock this at the moment
}
}

Expand Down Expand Up @@ -727,9 +730,9 @@ func (e *matchingEngineImpl) UpdateWorkerBuildIdCompatibility(
tmp := hlc.Zero(e.clusterMeta.GetClusterID())
clock = &tmp
}

updatedClock, versioningData, err := UpdateVersionSets(
*clock,
updatedClock := hlc.Next(*clock, e.timeSource)
versioningData, err := UpdateVersionSets(
updatedClock,
data.GetVersioningData(),
req.GetRequest(),
e.config.VersionCompatibleSetLimitPerQueue(),
Expand Down
9 changes: 4 additions & 5 deletions service/matching/version_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,15 @@ func checkLimits(g *persistence.VersioningData, maxSets, maxBuildIDs int) error
// Deletions are performed by a background process which verifies build IDs are no longer in use and safe to delete (not yet implemented).
//
// Update may fail with FailedPrecondition if it would cause exceeding the supplied limits.
func UpdateVersionSets(clock hlc.Clock, data *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest, maxSets, maxBuildIDs int) (hlc.Clock, *persistence.VersioningData, error) {
clock = hlc.Next(clock)
func UpdateVersionSets(clock hlc.Clock, data *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest, maxSets, maxBuildIDs int) (*persistence.VersioningData, error) {
data, err := updateImpl(clock, data, req)
if err != nil {
return clock, nil, err
return nil, err
}
if err := checkLimits(data, maxSets, maxBuildIDs); err != nil {
return clock, nil, err
return nil, err
}
return clock, data, nil
return data, nil
}

//nolint:revive // cyclomatic complexity
Expand Down
69 changes: 37 additions & 32 deletions service/matching/version_sets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.temporal.io/api/workflowservice/v1"
clockpb "go.temporal.io/server/api/clock/v1"
persistencepb "go.temporal.io/server/api/persistence/v1"
commonclock "go.temporal.io/server/common/clock"
hlc "go.temporal.io/server/common/clock/hybrid_logical_clock"
)

Expand Down Expand Up @@ -98,19 +99,14 @@ func mkPromoteInSet(id string) *workflowservice.UpdateWorkerBuildIdCompatibility
}
}

// Asserts that clock1 is greater than clock2
func assertClockGreater(t *testing.T, clock1 clockpb.HybridLogicalClock, clock2 clockpb.HybridLogicalClock) {
assert.True(t, hlc.Greater(clock1, clock2))
}

func TestNewDefaultUpdate(t *testing.T) {
clock := hlc.Zero(1)
initialData := mkInitialData(2, clock)

req := mkNewDefReq("2")
nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0)
nextClock := hlc.Next(clock, commonclock.NewRealTimeSource())
updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0)
assert.NoError(t, err)
assertClockGreater(t, nextClock, clock)
assert.Equal(t, mkInitialData(2, clock), initialData)

expected := &persistencepb.VersioningData{
Expand Down Expand Up @@ -144,9 +140,9 @@ func TestNewDefaultSetUpdateOfEmptyData(t *testing.T) {
initialData := mkInitialData(0, clock)

req := mkNewDefReq("1")
nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0)
nextClock := hlc.Next(clock, commonclock.NewRealTimeSource())
updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0)
assert.NoError(t, err)
assertClockGreater(t, nextClock, clock)
assert.Equal(t, mkInitialData(0, clock), initialData)

expected := &persistencepb.VersioningData{
Expand All @@ -167,9 +163,9 @@ func TestNewDefaultSetUpdateCompatWithCurDefault(t *testing.T) {
initialData := mkInitialData(2, clock)

req := mkNewCompatReq("1.1", "1", true)
nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0)
nextClock := hlc.Next(clock, commonclock.NewRealTimeSource())
updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0)
assert.NoError(t, err)
assertClockGreater(t, nextClock, clock)
assert.Equal(t, mkInitialData(2, clock), initialData)

expected := &persistencepb.VersioningData{
Expand Down Expand Up @@ -198,9 +194,9 @@ func TestNewDefaultSetUpdateCompatWithNonDefaultSet(t *testing.T) {
initialData := mkInitialData(2, clock)

req := mkNewCompatReq("0.1", "0", true)
nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0)
nextClock := hlc.Next(clock, commonclock.NewRealTimeSource())
updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0)
assert.NoError(t, err)
assertClockGreater(t, nextClock, clock)
assert.Equal(t, mkInitialData(2, clock), initialData)

expected := &persistencepb.VersioningData{
Expand Down Expand Up @@ -229,9 +225,9 @@ func TestNewCompatibleWithVerInOlderSet(t *testing.T) {
initialData := mkInitialData(2, clock)

req := mkNewCompatReq("0.1", "0", false)
nextClock, updatedData, err := UpdateVersionSets(clock, initialData, req, 0, 0)
nextClock := hlc.Next(clock, commonclock.NewRealTimeSource())
updatedData, err := UpdateVersionSets(nextClock, initialData, req, 0, 0)
assert.NoError(t, err)
assertClockGreater(t, nextClock, clock)
assert.Equal(t, mkInitialData(2, clock), initialData)

expected := &persistencepb.VersioningData{
Expand Down Expand Up @@ -263,11 +259,13 @@ func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) {
data := mkInitialData(2, clock0)

req := mkNewCompatReq("0.1", "0", false)
clock1, data, err := UpdateVersionSets(clock0, data, req, 0, 0)
clock1 := hlc.Next(clock0, commonclock.NewRealTimeSource())
data, err := UpdateVersionSets(clock1, data, req, 0, 0)
assert.NoError(t, err)

req = mkNewCompatReq("0.2", "0.1", false)
clock2, data, err := UpdateVersionSets(clock1, data, req, 0, 0)
clock2 := hlc.Next(clock1, commonclock.NewRealTimeSource())
data, err = UpdateVersionSets(clock2, data, req, 0, 0)
assert.NoError(t, err)

expected := &persistencepb.VersioningData{
Expand All @@ -293,7 +291,8 @@ func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) {
assert.Equal(t, expected, data)
// Ensure setting a compatible version which targets a non-leaf compat version ends up without a branch
req = mkNewCompatReq("0.3", "0.1", false)
clock3, data, err := UpdateVersionSets(clock2, data, req, 0, 0)
clock3 := hlc.Next(clock1, commonclock.NewRealTimeSource())
data, err = UpdateVersionSets(clock3, data, req, 0, 0)
assert.NoError(t, err)

expected = &persistencepb.VersioningData{
Expand Down Expand Up @@ -325,7 +324,8 @@ func TestCompatibleTargetsNotFound(t *testing.T) {
data := mkInitialData(1, clock)

req := mkNewCompatReq("1.1", "1", false)
_, _, err := UpdateVersionSets(clock, data, req, 0, 0)
nextClock := hlc.Next(clock, commonclock.NewRealTimeSource())
_, err := UpdateVersionSets(nextClock, data, req, 0, 0)
assert.Error(t, err)
assert.IsType(t, &serviceerror.NotFound{}, err)
}
Expand All @@ -335,7 +335,8 @@ func TestMakeExistingSetDefault(t *testing.T) {
data := mkInitialData(3, clock0)

req := mkExistingDefault("1")
clock1, data, err := UpdateVersionSets(clock0, data, req, 0, 0)
clock1 := hlc.Next(clock0, commonclock.NewRealTimeSource())
data, err := UpdateVersionSets(clock1, data, req, 0, 0)
assert.NoError(t, err)

expected := &persistencepb.VersioningData{
Expand Down Expand Up @@ -365,7 +366,8 @@ func TestMakeExistingSetDefault(t *testing.T) {
// Add a compatible version to a set and then make that set the default via the compatible version
req = mkNewCompatReq("0.1", "0", true)

clock2, data, err := UpdateVersionSets(clock1, data, req, 0, 0)
clock2 := hlc.Next(clock1, commonclock.NewRealTimeSource())
data, err = UpdateVersionSets(clock2, data, req, 0, 0)
assert.NoError(t, err)

expected = &persistencepb.VersioningData{
Expand Down Expand Up @@ -399,11 +401,11 @@ func TestSayVersionIsCompatWithDifferentSetThanItsAlreadyCompatWithNotAllowed(t
data := mkInitialData(3, clock)

req := mkNewCompatReq("0.1", "0", false)
_, data, err := UpdateVersionSets(clock, data, req, 0, 0)
data, err := UpdateVersionSets(clock, data, req, 0, 0)
assert.NoError(t, err)

req = mkNewCompatReq("0.1", "1", false)
_, _, err = UpdateVersionSets(clock, data, req, 0, 0)
_, err = UpdateVersionSets(clock, data, req, 0, 0)
assert.Error(t, err)
assert.IsType(t, &serviceerror.InvalidArgument{}, err)
}
Expand All @@ -414,7 +416,7 @@ func TestLimitsMaxSets(t *testing.T) {
data := mkInitialData(maxSets, clock)

req := mkNewDefReq("10")
_, _, err := UpdateVersionSets(clock, data, req, maxSets, 0)
_, err := UpdateVersionSets(clock, data, req, maxSets, 0)
assert.Error(t, err)
assert.IsType(t, &serviceerror.FailedPrecondition{}, err)
}
Expand All @@ -425,7 +427,7 @@ func TestLimitsMaxBuildIDs(t *testing.T) {
data := mkInitialData(maxBuildIDs, clock)

req := mkNewDefReq("10")
_, _, err := UpdateVersionSets(clock, data, req, 0, maxBuildIDs)
_, err := UpdateVersionSets(clock, data, req, 0, maxBuildIDs)
assert.Error(t, err)
assert.IsType(t, &serviceerror.FailedPrecondition{}, err)
}
Expand All @@ -435,13 +437,16 @@ func TestPromoteWithinVersion(t *testing.T) {
data := mkInitialData(2, clock0)

req := mkNewCompatReq("0.1", "0", false)
clock1, data, err := UpdateVersionSets(clock0, data, req, 0, 0)
clock1 := hlc.Next(clock0, commonclock.NewRealTimeSource())
data, err := UpdateVersionSets(clock1, data, req, 0, 0)
assert.NoError(t, err)
req = mkNewCompatReq("0.2", "0", false)
clock2, data, err := UpdateVersionSets(clock1, data, req, 0, 0)
clock2 := hlc.Next(clock1, commonclock.NewRealTimeSource())
data, err = UpdateVersionSets(clock2, data, req, 0, 0)
assert.NoError(t, err)
req = mkPromoteInSet("0.1")
clock3, data, err := UpdateVersionSets(clock2, data, req, 0, 0)
clock3 := hlc.Next(clock2, commonclock.NewRealTimeSource())
data, err = UpdateVersionSets(clock3, data, req, 0, 0)
assert.NoError(t, err)

expected := &persistencepb.VersioningData{
Expand Down Expand Up @@ -471,7 +476,7 @@ func TestAddAlreadyExtantVersionAsDefaultErrors(t *testing.T) {
data := mkInitialData(3, clock)

req := mkNewDefReq("0")
_, _, err := UpdateVersionSets(clock, data, req, 0, 0)
_, err := UpdateVersionSets(clock, data, req, 0, 0)
assert.Error(t, err)
assert.IsType(t, &serviceerror.InvalidArgument{}, err)
}
Expand All @@ -481,7 +486,7 @@ func TestAddAlreadyExtantVersionToAnotherSetErrors(t *testing.T) {
data := mkInitialData(3, clock)

req := mkNewCompatReq("0", "1", false)
_, _, err := UpdateVersionSets(clock, data, req, 0, 0)
_, err := UpdateVersionSets(clock, data, req, 0, 0)
assert.Error(t, err)
assert.IsType(t, &serviceerror.InvalidArgument{}, err)
}
Expand All @@ -491,7 +496,7 @@ func TestMakeSetDefaultTargetingNonexistentVersionErrors(t *testing.T) {
data := mkInitialData(3, clock)

req := mkExistingDefault("crab boi")
_, _, err := UpdateVersionSets(clock, data, req, 0, 0)
_, err := UpdateVersionSets(clock, data, req, 0, 0)
assert.Error(t, err)
assert.IsType(t, &serviceerror.NotFound{}, err)
}
Expand All @@ -501,7 +506,7 @@ func TestPromoteWithinSetTargetingNonexistentVersionErrors(t *testing.T) {
data := mkInitialData(3, clock)

req := mkPromoteInSet("i'd rather be writing rust ;)")
_, _, err := UpdateVersionSets(clock, data, req, 0, 0)
_, err := UpdateVersionSets(clock, data, req, 0, 0)
assert.Error(t, err)
assert.IsType(t, &serviceerror.NotFound{}, err)
}
Expand Down

0 comments on commit e40895b

Please sign in to comment.