Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy committed Jun 22, 2023
1 parent e86ee19 commit 53c35a9
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 324 deletions.
137 changes: 68 additions & 69 deletions api/persistence/v1/task_queues.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ message BuildId {
// HLC timestamp representing when this build id was last made default in its version set.
// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
temporal.server.api.clock.v1.HybridLogicalClock set_default_update_timestamp = 4;
temporal.server.api.clock.v1.HybridLogicalClock became_default_timestamp = 4;
}

// An internal represenation of temporal.api.taskqueue.v1.CompatibleVersionSet
Expand All @@ -57,7 +57,7 @@ message CompatibleVersionSet {
// HLC timestamp representing when this set was last made the default for the queue.
// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
temporal.server.api.clock.v1.HybridLogicalClock queue_default_update_timestamp = 4;
temporal.server.api.clock.v1.HybridLogicalClock became_default_timestamp = 4;
}

// Holds all the data related to worker versioning for a task queue.
Expand Down
8 changes: 4 additions & 4 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,9 +1592,9 @@ func (e *matchingEngineImpl) reviveBuildId(ns *namespace.Namespace, taskQueue st
tag.WorkflowTaskQueueName(taskQueue),
tag.BuildId(buildId.Id))
return &persistencespb.BuildId{
Id: buildId.GetId(),
State: persistencespb.STATE_ACTIVE,
StateUpdateTimestamp: &stamp,
SetDefaultUpdateTimestamp: buildId.SetDefaultUpdateTimestamp,
Id: buildId.GetId(),
State: persistencespb.STATE_ACTIVE,
StateUpdateTimestamp: &stamp,
BecameDefaultTimestamp: buildId.BecameDefaultTimestamp,
}
}
20 changes: 10 additions & 10 deletions service/matching/version_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ func RemoveBuildIds(clock hlc.Clock, versioningData *persistencespb.VersioningDa
for buildIdIdx, buildId := range set.BuildIds {
if _, found := buildIdsMap[buildId.Id]; found {
set.BuildIds[buildIdIdx] = &persistencespb.BuildId{
Id: buildId.Id,
State: persistencespb.STATE_DELETED,
StateUpdateTimestamp: &clock,
SetDefaultUpdateTimestamp: buildId.SetDefaultUpdateTimestamp,
Id: buildId.Id,
State: persistencespb.STATE_DELETED,
StateUpdateTimestamp: &clock,
BecameDefaultTimestamp: buildId.BecameDefaultTimestamp,
}
}
}
Expand Down Expand Up @@ -162,9 +162,9 @@ func shallowCloneVersioningData(data *persistencespb.VersioningData) *persistenc

func shallowCloneVersionSet(set *persistencespb.CompatibleVersionSet) *persistencespb.CompatibleVersionSet {
clone := &persistencespb.CompatibleVersionSet{
SetIds: set.SetIds,
BuildIds: make([]*persistencespb.BuildId, len(set.BuildIds)),
QueueDefaultUpdateTimestamp: set.QueueDefaultUpdateTimestamp,
SetIds: set.SetIds,
BuildIds: make([]*persistencespb.BuildId, len(set.BuildIds)),
BecameDefaultTimestamp: set.BecameDefaultTimestamp,
}
copy(clone.BuildIds, set.BuildIds)
return clone
Expand Down Expand Up @@ -301,7 +301,7 @@ func updateImpl(timestamp hlc.Clock, data *persistencespb.VersioningData, req *w
// Merge the sets together, preserving the primary set's default by making it have the most recent timestamp.
primarySet := data.VersionSets[targetSetIdx]
primaryBuildId := primarySet.BuildIds[len(primarySet.BuildIds)-1]
primaryBuildId.SetDefaultUpdateTimestamp = &timestamp
primaryBuildId.BecameDefaultTimestamp = &timestamp
justPrimaryData := &persistencespb.VersioningData{
VersionSets: []*persistencespb.CompatibleVersionSet{primarySet},
}
Expand Down Expand Up @@ -344,7 +344,7 @@ func findVersion(data *persistencespb.VersioningData, buildID string) (setIndex,

func makeDefaultSet(data *persistencespb.VersioningData, setIx int, timestamp *hlc.Clock) {
set := data.VersionSets[setIx]
set.QueueDefaultUpdateTimestamp = timestamp
set.BecameDefaultTimestamp = timestamp

if setIx < len(data.VersionSets)-1 {
// Move the set to the end and shift all the others down
Expand All @@ -356,7 +356,7 @@ func makeDefaultSet(data *persistencespb.VersioningData, setIx int, timestamp *h
func makeVersionInSetDefault(data *persistencespb.VersioningData, setIx, versionIx int, timestamp *hlc.Clock) {
setVersions := data.VersionSets[setIx].BuildIds
buildId := setVersions[versionIx]
buildId.SetDefaultUpdateTimestamp = timestamp
buildId.BecameDefaultTimestamp = timestamp
if len(setVersions) <= 1 {
return
}
Expand Down
28 changes: 14 additions & 14 deletions service/matching/version_sets_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,17 @@ func collectBuildIdInfo(sets []*persistencespb.CompatibleVersionSet) map[string]
state: state,
stateUpdateTimestamp: stateUpdateTimestamp,
setIDs: mergeSetIDs(info.setIDs, set.SetIds),
madeDefaultAt: hlc.Max(*buildID.SetDefaultUpdateTimestamp, info.madeDefaultAt),
setMadeDefaultAt: hlc.Max(*set.QueueDefaultUpdateTimestamp, info.setMadeDefaultAt),
madeDefaultAt: hlc.Max(*buildID.BecameDefaultTimestamp, info.madeDefaultAt),
setMadeDefaultAt: hlc.Max(*set.BecameDefaultTimestamp, info.setMadeDefaultAt),
}
} else {
// A build ID was seen for the first time, track it
buildIDToInfo[buildID.Id] = buildIDInfo{
state: buildID.State,
stateUpdateTimestamp: *buildID.StateUpdateTimestamp,
setIDs: set.SetIds,
madeDefaultAt: *buildID.SetDefaultUpdateTimestamp,
setMadeDefaultAt: *set.QueueDefaultUpdateTimestamp,
madeDefaultAt: *buildID.BecameDefaultTimestamp,
setMadeDefaultAt: *set.BecameDefaultTimestamp,
}
}
}
Expand All @@ -117,20 +117,20 @@ func intoVersionSets(buildIDToInfo map[string]buildIDInfo) []*persistencespb.Com
set := findSetWithSetIDs(sets, info.setIDs)
if set == nil {
set = &persistencespb.CompatibleVersionSet{
SetIds: info.setIDs,
BuildIds: make([]*persistencespb.BuildId, 0),
QueueDefaultUpdateTimestamp: &info.setMadeDefaultAt,
SetIds: info.setIDs,
BuildIds: make([]*persistencespb.BuildId, 0),
BecameDefaultTimestamp: &info.setMadeDefaultAt,
}
sets = append(sets, set)
} else {
set.SetIds = mergeSetIDs(set.SetIds, info.setIDs)
set.QueueDefaultUpdateTimestamp = hlc.Ptr(hlc.Max(info.setMadeDefaultAt, *set.QueueDefaultUpdateTimestamp))
set.BecameDefaultTimestamp = hlc.Ptr(hlc.Max(info.setMadeDefaultAt, *set.BecameDefaultTimestamp))
}
buildID := &persistencespb.BuildId{
Id: id,
State: info.state,
StateUpdateTimestamp: &info.stateUpdateTimestamp,
SetDefaultUpdateTimestamp: &info.madeDefaultAt,
Id: id,
State: info.state,
StateUpdateTimestamp: &info.stateUpdateTimestamp,
BecameDefaultTimestamp: &info.madeDefaultAt,
}
set.BuildIds = append(set.BuildIds, buildID)
}
Expand All @@ -144,13 +144,13 @@ func intoVersionSets(buildIDToInfo map[string]buildIDInfo) []*persistencespb.Com

func sortSets(sets []*persistencespb.CompatibleVersionSet) {
sort.Slice(sets, func(i, j int) bool {
return hlc.Less(*sets[i].QueueDefaultUpdateTimestamp, *sets[j].QueueDefaultUpdateTimestamp)
return hlc.Less(*sets[i].BecameDefaultTimestamp, *sets[j].BecameDefaultTimestamp)
})
}

func sortBuildIds(buildIds []*persistencespb.BuildId) {
sort.Slice(buildIds, func(i, j int) bool {
return hlc.Less(*buildIds[i].SetDefaultUpdateTimestamp, *buildIds[j].SetDefaultUpdateTimestamp)
return hlc.Less(*buildIds[i].BecameDefaultTimestamp, *buildIds[j].BecameDefaultTimestamp)
})
}

Expand Down
32 changes: 16 additions & 16 deletions service/matching/version_sets_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ func buildID(wallclock int64, id string, optionalState ...persistencespb.BuildId
}

return &persistencespb.BuildId{
Id: id,
State: state,
StateUpdateTimestamp: fromWallClock(wallclock),
SetDefaultUpdateTimestamp: fromWallClock(wallclock),
Id: id,
State: state,
StateUpdateTimestamp: fromWallClock(wallclock),
BecameDefaultTimestamp: fromWallClock(wallclock),
}
}

func mkSet(setID string, buildIDs ...*persistencespb.BuildId) *persistencespb.CompatibleVersionSet {
return &persistencespb.CompatibleVersionSet{
SetIds: []string{setID},
BuildIds: buildIDs,
QueueDefaultUpdateTimestamp: buildIDs[len(buildIDs)-1].SetDefaultUpdateTimestamp,
SetIds: []string{setID},
BuildIds: buildIDs,
BecameDefaultTimestamp: buildIDs[len(buildIDs)-1].BecameDefaultTimestamp,
}
}

Expand Down Expand Up @@ -119,9 +119,9 @@ func TestSetMerge_DifferentSetIDs_MergesSetIDs(t *testing.T) {
b := mkSingleSetData("0.2", buildID(3, "0.2"))
expected := &persistencespb.VersioningData{
VersionSets: []*persistencespb.CompatibleVersionSet{{
SetIds: []string{"0.1", "0.2"},
BuildIds: []*persistencespb.BuildId{buildID(1, "0.1"), buildID(6, "0.2")},
QueueDefaultUpdateTimestamp: fromWallClock(6),
SetIds: []string{"0.1", "0.2"},
BuildIds: []*persistencespb.BuildId{buildID(1, "0.1"), buildID(6, "0.2")},
BecameDefaultTimestamp: fromWallClock(6),
}},
}
assert.Equal(t, expected, MergeVersioningData(a, b))
Expand All @@ -145,9 +145,9 @@ func TestSetMerge_MultipleMatches_MergesSets(t *testing.T) {
}
expected := &persistencespb.VersioningData{
VersionSets: []*persistencespb.CompatibleVersionSet{{
SetIds: []string{"0.1", "0.2"},
BuildIds: []*persistencespb.BuildId{buildID(1, "0.1"), buildID(3, "0.2")},
QueueDefaultUpdateTimestamp: fromWallClock(3),
SetIds: []string{"0.1", "0.2"},
BuildIds: []*persistencespb.BuildId{buildID(1, "0.1"), buildID(3, "0.2")},
BecameDefaultTimestamp: fromWallClock(3),
}},
}
assert.Equal(t, expected, MergeVersioningData(a, b))
Expand All @@ -156,16 +156,16 @@ func TestSetMerge_MultipleMatches_MergesSets(t *testing.T) {

func TestSetMerge_BuildIdPromoted_PreservesSetDefault(t *testing.T) {
a := mkSingleSetData("0.1", buildID(2, "0.1"), buildID(1, "0.2"))
a.VersionSets[0].BuildIds[len(a.VersionSets[0].BuildIds)-1].SetDefaultUpdateTimestamp = fromWallClock(3)
a.VersionSets[0].BuildIds[len(a.VersionSets[0].BuildIds)-1].BecameDefaultTimestamp = fromWallClock(3)
b := mkSingleSetData("0.1", buildID(2, "0.1"), buildID(1, "0.2"))
b.VersionSets[0].BuildIds[len(b.VersionSets[0].BuildIds)-1].SetDefaultUpdateTimestamp = fromWallClock(3)
b.VersionSets[0].BuildIds[len(b.VersionSets[0].BuildIds)-1].BecameDefaultTimestamp = fromWallClock(3)
assert.Equal(t, b, MergeVersioningData(a, b))
assert.Equal(t, b, MergeVersioningData(b, a))
}

func TestSetMerge_SetPromoted_PreservesGlobalDefault(t *testing.T) {
set01 := mkSet("0.1", buildID(1, "0.1"))
set01.QueueDefaultUpdateTimestamp = fromWallClock(3)
set01.BecameDefaultTimestamp = fromWallClock(3)
a := &persistencespb.VersioningData{
VersionSets: []*persistencespb.CompatibleVersionSet{
mkSet("0.2", buildID(2, "0.2")),
Expand Down
Loading

0 comments on commit 53c35a9

Please sign in to comment.