Skip to content

Commit

Permalink
Add per build id timestamp for when it was last made set default (#4526)
Browse files Browse the repository at this point in the history
* Add per build id timestamp for when it was last made set default

* Add missing timestamp when reviving a build id

* Address PR comments
  • Loading branch information
bergundy authored Jun 22, 2023
1 parent cdf158e commit 5b4e2e3
Show file tree
Hide file tree
Showing 8 changed files with 370 additions and 467 deletions.
252 changes: 125 additions & 127 deletions api/persistence/v1/task_queues.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ message BuildId {
// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
temporal.server.api.clock.v1.HybridLogicalClock state_update_timestamp = 3;
// HLC timestamp representing when this build id was last made default in its version set.
// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
temporal.server.api.clock.v1.HybridLogicalClock became_default_timestamp = 4;
}

// An internal represenation of temporal.api.taskqueue.v1.CompatibleVersionSet
Expand All @@ -50,15 +54,10 @@ message CompatibleVersionSet {
repeated string set_ids = 1;
// All the compatible versions, unordered except for the last element, which is considered the set "default".
repeated BuildId build_ids = 2;
// HLC timestamp representing when the set default was updated. Different from BuildId.state_update_timestamp, which
// refers to the build ID status.
// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
temporal.server.api.clock.v1.HybridLogicalClock default_update_timestamp = 3;
// HLC timestamp representing when this set was last made the default for the queue.
// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
temporal.server.api.clock.v1.HybridLogicalClock queue_default_update_timestamp = 4;
temporal.server.api.clock.v1.HybridLogicalClock became_default_timestamp = 4;
}

// Holds all the data related to worker versioning for a task queue.
Expand Down
7 changes: 4 additions & 3 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,8 +1592,9 @@ func (e *matchingEngineImpl) reviveBuildId(ns *namespace.Namespace, taskQueue st
tag.WorkflowTaskQueueName(taskQueue),
tag.BuildId(buildId.Id))
return &persistencespb.BuildId{
Id: buildId.GetId(),
State: persistencespb.STATE_ACTIVE,
StateUpdateTimestamp: &stamp,
Id: buildId.GetId(),
State: persistencespb.STATE_ACTIVE,
StateUpdateTimestamp: &stamp,
BecameDefaultTimestamp: buildId.BecameDefaultTimestamp,
}
}
41 changes: 18 additions & 23 deletions service/matching/version_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func gatherBuildIds(data *persistencespb.VersioningData) map[string]struct{} {
return buildIds
}

// RemoveBuildIds removes given buildIds from versioning data.
// Assumes that build ids are safe to remove, ex: a set default is never removed unless it is a single set member and
// that set is not default for the queue.
func RemoveBuildIds(clock hlc.Clock, versioningData *persistencespb.VersioningData, buildIds []string) *persistencespb.VersioningData {
buildIdsMap := make(map[string]struct{}, len(buildIds))
for _, buildId := range buildIds {
Expand All @@ -113,9 +116,10 @@ func RemoveBuildIds(clock hlc.Clock, versioningData *persistencespb.VersioningDa
for buildIdIdx, buildId := range set.BuildIds {
if _, found := buildIdsMap[buildId.Id]; found {
set.BuildIds[buildIdIdx] = &persistencespb.BuildId{
Id: buildId.Id,
State: persistencespb.STATE_DELETED,
StateUpdateTimestamp: &clock,
Id: buildId.Id,
State: persistencespb.STATE_DELETED,
StateUpdateTimestamp: &clock,
BecameDefaultTimestamp: buildId.BecameDefaultTimestamp,
}
}
}
Expand Down Expand Up @@ -158,10 +162,9 @@ func shallowCloneVersioningData(data *persistencespb.VersioningData) *persistenc

func shallowCloneVersionSet(set *persistencespb.CompatibleVersionSet) *persistencespb.CompatibleVersionSet {
clone := &persistencespb.CompatibleVersionSet{
SetIds: set.SetIds,
BuildIds: make([]*persistencespb.BuildId, len(set.BuildIds)),
DefaultUpdateTimestamp: set.DefaultUpdateTimestamp,
QueueDefaultUpdateTimestamp: set.QueueDefaultUpdateTimestamp,
SetIds: set.SetIds,
BuildIds: make([]*persistencespb.BuildId, len(set.BuildIds)),
BecameDefaultTimestamp: set.BecameDefaultTimestamp,
}
copy(clone.BuildIds, set.BuildIds)
return clone
Expand Down Expand Up @@ -297,21 +300,13 @@ func updateImpl(timestamp hlc.Clock, data *persistencespb.VersioningData, req *w
}
// Merge the sets together, preserving the primary set's default by making it have the most recent timestamp.
primarySet := data.VersionSets[targetSetIdx]
primaryBuildId := primarySet.BuildIds[len(primarySet.BuildIds)-1]
primaryBuildId.BecameDefaultTimestamp = &timestamp
justPrimaryData := &persistencespb.VersioningData{
VersionSets: []*persistencespb.CompatibleVersionSet{{
SetIds: primarySet.SetIds,
BuildIds: primarySet.BuildIds,
DefaultUpdateTimestamp: &timestamp,
QueueDefaultUpdateTimestamp: primarySet.QueueDefaultUpdateTimestamp,
}},
VersionSets: []*persistencespb.CompatibleVersionSet{primarySet},
}
secondarySet := data.VersionSets[secondarySetIdx]
data.VersionSets[secondarySetIdx] = &persistencespb.CompatibleVersionSet{
SetIds: mergeSetIDs(primarySet.SetIds, secondarySet.SetIds),
BuildIds: secondarySet.BuildIds,
DefaultUpdateTimestamp: secondarySet.DefaultUpdateTimestamp,
QueueDefaultUpdateTimestamp: secondarySet.QueueDefaultUpdateTimestamp,
}
secondarySet.SetIds = mergeSetIDs(primarySet.SetIds, secondarySet.SetIds)
data = MergeVersioningData(justPrimaryData, data)
}

Expand Down Expand Up @@ -349,7 +344,7 @@ func findVersion(data *persistencespb.VersioningData, buildID string) (setIndex,

func makeDefaultSet(data *persistencespb.VersioningData, setIx int, timestamp *hlc.Clock) {
set := data.VersionSets[setIx]
set.QueueDefaultUpdateTimestamp = timestamp
set.BecameDefaultTimestamp = timestamp

if setIx < len(data.VersionSets)-1 {
// Move the set to the end and shift all the others down
Expand All @@ -359,16 +354,16 @@ func makeDefaultSet(data *persistencespb.VersioningData, setIx int, timestamp *h
}

func makeVersionInSetDefault(data *persistencespb.VersioningData, setIx, versionIx int, timestamp *hlc.Clock) {
data.VersionSets[setIx].DefaultUpdateTimestamp = timestamp
setVersions := data.VersionSets[setIx].BuildIds
buildId := setVersions[versionIx]
buildId.BecameDefaultTimestamp = timestamp
if len(setVersions) <= 1 {
return
}
if versionIx < len(setVersions)-1 {
// Move the build ID to the end and shift all the others down
moveMe := setVersions[versionIx]
copy(setVersions[versionIx:], setVersions[versionIx+1:])
setVersions[len(setVersions)-1] = moveMe
setVersions[len(setVersions)-1] = buildId
}
}

Expand Down
70 changes: 25 additions & 45 deletions service/matching/version_sets_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,39 +80,29 @@ type buildIDInfo struct {
func collectBuildIdInfo(sets []*persistencespb.CompatibleVersionSet) map[string]buildIDInfo {
buildIDToInfo := make(map[string]buildIDInfo, 0)
for _, set := range sets {
lastIdx := len(set.BuildIds) - 1
for setIdx, buildID := range set.BuildIds {
for _, buildID := range set.BuildIds {
if info, found := buildIDToInfo[buildID.Id]; found {
// A build ID appears in more than one source, merge its information, and track it
state := info.state
stateUpdateTimestamp := hlc.Max(*buildID.StateUpdateTimestamp, info.stateUpdateTimestamp)
if hlc.Equal(stateUpdateTimestamp, *buildID.StateUpdateTimestamp) {
state = buildID.State
}
madeDefaultAt := info.madeDefaultAt
if setIdx == lastIdx {
madeDefaultAt = hlc.Max(*set.DefaultUpdateTimestamp, madeDefaultAt)
}

buildIDToInfo[buildID.Id] = buildIDInfo{
state: state,
stateUpdateTimestamp: stateUpdateTimestamp,
setIDs: mergeSetIDs(info.setIDs, set.SetIds),
madeDefaultAt: madeDefaultAt,
setMadeDefaultAt: hlc.Max(*set.QueueDefaultUpdateTimestamp, info.setMadeDefaultAt),
madeDefaultAt: hlc.Max(*buildID.BecameDefaultTimestamp, info.madeDefaultAt),
setMadeDefaultAt: hlc.Max(*set.BecameDefaultTimestamp, info.setMadeDefaultAt),
}
} else {
// A build ID was seen for the first time, track it
madeDefaultAt := hlc.Zero(0)
if setIdx == lastIdx {
madeDefaultAt = *set.DefaultUpdateTimestamp
}
buildIDToInfo[buildID.Id] = buildIDInfo{
state: buildID.State,
stateUpdateTimestamp: *buildID.StateUpdateTimestamp,
setIDs: set.SetIds,
madeDefaultAt: madeDefaultAt,
setMadeDefaultAt: *set.QueueDefaultUpdateTimestamp,
madeDefaultAt: *buildID.BecameDefaultTimestamp,
setMadeDefaultAt: *set.BecameDefaultTimestamp,
}
}
}
Expand All @@ -123,54 +113,44 @@ func collectBuildIdInfo(sets []*persistencespb.CompatibleVersionSet) map[string]
func intoVersionSets(buildIDToInfo map[string]buildIDInfo) []*persistencespb.CompatibleVersionSet {
sets := make([]*persistencespb.CompatibleVersionSet, 0)
for id, info := range buildIDToInfo {
info := info
set := findSetWithSetIDs(sets, info.setIDs)
if set == nil {
set = &persistencespb.CompatibleVersionSet{
SetIds: info.setIDs,
BuildIds: make([]*persistencespb.BuildId, 0),
DefaultUpdateTimestamp: hlc.Ptr(hlc.Zero(0)),
QueueDefaultUpdateTimestamp: hlc.Ptr(hlc.Zero(0)),
SetIds: info.setIDs,
BuildIds: make([]*persistencespb.BuildId, 0),
BecameDefaultTimestamp: &info.setMadeDefaultAt,
}
sets = append(sets, set)
} else {
set.SetIds = mergeSetIDs(set.SetIds, info.setIDs)
set.BecameDefaultTimestamp = hlc.Ptr(hlc.Max(info.setMadeDefaultAt, *set.BecameDefaultTimestamp))
}
timestamp := info.stateUpdateTimestamp
buildID := &persistencespb.BuildId{
Id: id,
State: info.state,
StateUpdateTimestamp: &timestamp,
}
defaultTimestamp := info.madeDefaultAt
set.QueueDefaultUpdateTimestamp = hlc.Ptr(hlc.Max(info.setMadeDefaultAt, *set.QueueDefaultUpdateTimestamp))

// Insert the build ID in the right order based on whether it is the default or by its update timestamp
if hlc.Greater(*set.DefaultUpdateTimestamp, defaultTimestamp) {
// Can't be the last element, it's the default already
lastIdx := len(set.BuildIds) - 1
for idx, curr := range set.BuildIds {
if idx == lastIdx || hlc.Greater(*curr.StateUpdateTimestamp, timestamp) {
// Insert just before
set.BuildIds = append(set.BuildIds[:idx+1], set.BuildIds[idx:]...)
set.BuildIds[idx] = buildID
break
}
}
} else {
set.DefaultUpdateTimestamp = &defaultTimestamp
set.BuildIds = append(set.BuildIds, buildID)
Id: id,
State: info.state,
StateUpdateTimestamp: &info.stateUpdateTimestamp,
BecameDefaultTimestamp: &info.madeDefaultAt,
}
set.BuildIds = append(set.BuildIds, buildID)
}
// Sort the sets based on their default update timestamp, ensuring the default set comes last
sortSets(sets)
for _, set := range sets {
sortBuildIds(set.BuildIds)
}
return sets
}

func sortSets(sets []*persistencespb.CompatibleVersionSet) {
sort.Slice(sets, func(i, j int) bool {
si := sets[i]
sj := sets[j]
return hlc.Less(*si.QueueDefaultUpdateTimestamp, *sj.QueueDefaultUpdateTimestamp)
return hlc.Less(*sets[i].BecameDefaultTimestamp, *sets[j].BecameDefaultTimestamp)
})
}

func sortBuildIds(buildIds []*persistencespb.BuildId) {
sort.Slice(buildIds, func(i, j int) bool {
return hlc.Less(*buildIds[i].BecameDefaultTimestamp, *buildIds[j].BecameDefaultTimestamp)
})
}

Expand Down
46 changes: 16 additions & 30 deletions service/matching/version_sets_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,30 +44,18 @@ func buildID(wallclock int64, id string, optionalState ...persistencespb.BuildId
}

return &persistencespb.BuildId{
Id: id,
State: state,
StateUpdateTimestamp: fromWallClock(wallclock),
Id: id,
State: state,
StateUpdateTimestamp: fromWallClock(wallclock),
BecameDefaultTimestamp: fromWallClock(wallclock),
}
}

func mkBuildIds(buildIDs ...*persistencespb.BuildId) []*persistencespb.BuildId {
buildIDStructs := make([]*persistencespb.BuildId, len(buildIDs))
for i, buildID := range buildIDs {
buildIDStructs[i] = &persistencespb.BuildId{
Id: buildID.Id,
State: persistencespb.STATE_ACTIVE,
StateUpdateTimestamp: buildID.StateUpdateTimestamp,
}
}
return buildIDStructs
}

func mkSet(setID string, buildIDs ...*persistencespb.BuildId) *persistencespb.CompatibleVersionSet {
return &persistencespb.CompatibleVersionSet{
SetIds: []string{setID},
BuildIds: mkBuildIds(buildIDs...),
DefaultUpdateTimestamp: buildIDs[len(buildIDs)-1].StateUpdateTimestamp,
QueueDefaultUpdateTimestamp: buildIDs[len(buildIDs)-1].StateUpdateTimestamp,
SetIds: []string{setID},
BuildIds: buildIDs,
BecameDefaultTimestamp: buildIDs[len(buildIDs)-1].BecameDefaultTimestamp,
}
}

Expand Down Expand Up @@ -131,10 +119,9 @@ func TestSetMerge_DifferentSetIDs_MergesSetIDs(t *testing.T) {
b := mkSingleSetData("0.2", buildID(3, "0.2"))
expected := &persistencespb.VersioningData{
VersionSets: []*persistencespb.CompatibleVersionSet{{
SetIds: []string{"0.1", "0.2"},
BuildIds: mkBuildIds(buildID(1, "0.1"), buildID(6, "0.2")),
DefaultUpdateTimestamp: fromWallClock(6),
QueueDefaultUpdateTimestamp: fromWallClock(6),
SetIds: []string{"0.1", "0.2"},
BuildIds: []*persistencespb.BuildId{buildID(1, "0.1"), buildID(6, "0.2")},
BecameDefaultTimestamp: fromWallClock(6),
}},
}
assert.Equal(t, expected, MergeVersioningData(a, b))
Expand All @@ -158,10 +145,9 @@ func TestSetMerge_MultipleMatches_MergesSets(t *testing.T) {
}
expected := &persistencespb.VersioningData{
VersionSets: []*persistencespb.CompatibleVersionSet{{
SetIds: []string{"0.1", "0.2"},
BuildIds: mkBuildIds(buildID(1, "0.1"), buildID(3, "0.2")),
DefaultUpdateTimestamp: fromWallClock(3),
QueueDefaultUpdateTimestamp: fromWallClock(3),
SetIds: []string{"0.1", "0.2"},
BuildIds: []*persistencespb.BuildId{buildID(1, "0.1"), buildID(3, "0.2")},
BecameDefaultTimestamp: fromWallClock(3),
}},
}
assert.Equal(t, expected, MergeVersioningData(a, b))
Expand All @@ -170,16 +156,16 @@ func TestSetMerge_MultipleMatches_MergesSets(t *testing.T) {

func TestSetMerge_BuildIdPromoted_PreservesSetDefault(t *testing.T) {
a := mkSingleSetData("0.1", buildID(2, "0.1"), buildID(1, "0.2"))
a.VersionSets[0].DefaultUpdateTimestamp = fromWallClock(3)
a.VersionSets[0].BuildIds[len(a.VersionSets[0].BuildIds)-1].BecameDefaultTimestamp = fromWallClock(3)
b := mkSingleSetData("0.1", buildID(2, "0.1"), buildID(1, "0.2"))
b.VersionSets[0].DefaultUpdateTimestamp = fromWallClock(3)
b.VersionSets[0].BuildIds[len(b.VersionSets[0].BuildIds)-1].BecameDefaultTimestamp = fromWallClock(3)
assert.Equal(t, b, MergeVersioningData(a, b))
assert.Equal(t, b, MergeVersioningData(b, a))
}

func TestSetMerge_SetPromoted_PreservesGlobalDefault(t *testing.T) {
set01 := mkSet("0.1", buildID(1, "0.1"))
set01.QueueDefaultUpdateTimestamp = fromWallClock(3)
set01.BecameDefaultTimestamp = fromWallClock(3)
a := &persistencespb.VersioningData{
VersionSets: []*persistencespb.CompatibleVersionSet{
mkSet("0.2", buildID(2, "0.2")),
Expand Down
Loading

0 comments on commit 5b4e2e3

Please sign in to comment.