From 5b4e2e3170e88f1dbce2f5b9403d3168d49751bf Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Wed, 21 Jun 2023 17:55:30 -0700 Subject: [PATCH] Add per build id timestamp for when it was last made set default (#4526) * Add per build id timestamp for when it was last made set default * Add missing timestamp when reviving a build id * Address PR comments --- api/persistence/v1/task_queues.pb.go | 252 +++++++------ .../api/persistence/v1/task_queues.proto | 11 +- service/matching/matchingEngine.go | 7 +- service/matching/version_sets.go | 41 +-- service/matching/version_sets_merge.go | 70 ++-- service/matching/version_sets_merge_test.go | 46 +-- service/matching/version_sets_test.go | 334 +++++++----------- .../scanner/build_ids/scavenger_test.go | 76 ++-- 8 files changed, 370 insertions(+), 467 deletions(-) diff --git a/api/persistence/v1/task_queues.pb.go b/api/persistence/v1/task_queues.pb.go index d8a5982374d..adf7044ec7d 100644 --- a/api/persistence/v1/task_queues.pb.go +++ b/api/persistence/v1/task_queues.pb.go @@ -83,6 +83,10 @@ type BuildId struct { // (-- api-linter: core::0142::time-field-type=disabled // aip.dev/not-precedent: Using HLC instead of wall clock. --) StateUpdateTimestamp *v1.HybridLogicalClock `protobuf:"bytes,3,opt,name=state_update_timestamp,json=stateUpdateTimestamp,proto3" json:"state_update_timestamp,omitempty"` + // HLC timestamp representing when this build id was last made default in its version set. + // (-- api-linter: core::0142::time-field-type=disabled + // aip.dev/not-precedent: Using HLC instead of wall clock. --) + BecameDefaultTimestamp *v1.HybridLogicalClock `protobuf:"bytes,4,opt,name=became_default_timestamp,json=becameDefaultTimestamp,proto3" json:"became_default_timestamp,omitempty"` } func (m *BuildId) Reset() { *m = BuildId{} } @@ -138,6 +142,13 @@ func (m *BuildId) GetStateUpdateTimestamp() *v1.HybridLogicalClock { return nil } +func (m *BuildId) GetBecameDefaultTimestamp() *v1.HybridLogicalClock { + if m != nil { + return m.BecameDefaultTimestamp + } + return nil +} + // An internal represenation of temporal.api.taskqueue.v1.CompatibleVersionSet type CompatibleVersionSet struct { // Set IDs are used internally by matching. @@ -147,15 +158,10 @@ type CompatibleVersionSet struct { SetIds []string `protobuf:"bytes,1,rep,name=set_ids,json=setIds,proto3" json:"set_ids,omitempty"` // All the compatible versions, unordered except for the last element, which is considered the set "default". BuildIds []*BuildId `protobuf:"bytes,2,rep,name=build_ids,json=buildIds,proto3" json:"build_ids,omitempty"` - // HLC timestamp representing when the set default was updated. Different from BuildId.state_update_timestamp, which - // refers to the build ID status. - // (-- api-linter: core::0142::time-field-type=disabled - // aip.dev/not-precedent: Using HLC instead of wall clock. --) - DefaultUpdateTimestamp *v1.HybridLogicalClock `protobuf:"bytes,3,opt,name=default_update_timestamp,json=defaultUpdateTimestamp,proto3" json:"default_update_timestamp,omitempty"` // HLC timestamp representing when this set was last made the default for the queue. // (-- api-linter: core::0142::time-field-type=disabled // aip.dev/not-precedent: Using HLC instead of wall clock. --) - QueueDefaultUpdateTimestamp *v1.HybridLogicalClock `protobuf:"bytes,4,opt,name=queue_default_update_timestamp,json=queueDefaultUpdateTimestamp,proto3" json:"queue_default_update_timestamp,omitempty"` + BecameDefaultTimestamp *v1.HybridLogicalClock `protobuf:"bytes,4,opt,name=became_default_timestamp,json=becameDefaultTimestamp,proto3" json:"became_default_timestamp,omitempty"` } func (m *CompatibleVersionSet) Reset() { *m = CompatibleVersionSet{} } @@ -204,16 +210,9 @@ func (m *CompatibleVersionSet) GetBuildIds() []*BuildId { return nil } -func (m *CompatibleVersionSet) GetDefaultUpdateTimestamp() *v1.HybridLogicalClock { - if m != nil { - return m.DefaultUpdateTimestamp - } - return nil -} - -func (m *CompatibleVersionSet) GetQueueDefaultUpdateTimestamp() *v1.HybridLogicalClock { +func (m *CompatibleVersionSet) GetBecameDefaultTimestamp() *v1.HybridLogicalClock { if m != nil { - return m.QueueDefaultUpdateTimestamp + return m.BecameDefaultTimestamp } return nil } @@ -391,45 +390,44 @@ func init() { } var fileDescriptor_0cb9a0f256d1327d = []byte{ - // 593 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x94, 0x4f, 0x6f, 0xd3, 0x30, - 0x18, 0xc6, 0xe3, 0x74, 0x7f, 0x98, 0x3b, 0x4a, 0x67, 0x8d, 0x11, 0x0d, 0xc9, 0xaa, 0x72, 0xaa, - 0x40, 0x4a, 0x59, 0x18, 0x12, 0x12, 0xa7, 0xad, 0xcd, 0x58, 0xa4, 0x09, 0x41, 0x9a, 0xed, 0xc0, - 0x0e, 0x91, 0xdb, 0x78, 0x95, 0xb7, 0x64, 0x09, 0xb1, 0x13, 0x89, 0x1b, 0x7c, 0x02, 0xf8, 0x18, - 0x7c, 0x04, 0x3e, 0x02, 0xc7, 0x1d, 0x77, 0xe0, 0xc0, 0xd2, 0x0b, 0xc7, 0x7d, 0x04, 0x14, 0xa7, - 0x1d, 0x1b, 0xeb, 0x44, 0x99, 0x38, 0xd5, 0x7e, 0x9b, 0xf7, 0xf7, 0xbc, 0xcf, 0x13, 0xc7, 0x70, - 0x5d, 0xd0, 0x30, 0x8e, 0x12, 0x12, 0xb4, 0x38, 0x4d, 0x32, 0x9a, 0xb4, 0x48, 0xcc, 0x5a, 0x31, - 0x4d, 0x38, 0xe3, 0x82, 0x1e, 0xf7, 0x69, 0x2b, 0x5b, 0x6b, 0x09, 0xc2, 0x8f, 0xbc, 0x77, 0x29, - 0x4d, 0x29, 0x37, 0xe2, 0x24, 0x12, 0x11, 0xd2, 0xc7, 0x5d, 0x46, 0xd9, 0x65, 0x90, 0x98, 0x19, - 0x97, 0xba, 0x8c, 0x6c, 0x6d, 0xf5, 0xd1, 0x24, 0x72, 0x3f, 0x88, 0xfa, 0x47, 0x05, 0x33, 0xa4, - 0x9c, 0x93, 0x01, 0x2d, 0x79, 0xfa, 0x27, 0x15, 0xce, 0x6f, 0xa6, 0x2c, 0xf0, 0x6d, 0x1f, 0xd5, - 0xa0, 0xca, 0x7c, 0x0d, 0x34, 0x40, 0x73, 0xc1, 0x51, 0x99, 0x8f, 0x5e, 0xc2, 0x59, 0x2e, 0x88, - 0xa0, 0x9a, 0xda, 0x00, 0xcd, 0x9a, 0xb9, 0x66, 0xfc, 0x5d, 0xdb, 0x18, 0xb1, 0x8c, 0x6e, 0xd1, - 0xe8, 0x94, 0xfd, 0xe8, 0x00, 0xae, 0xc8, 0x85, 0x97, 0xc6, 0x7e, 0xf1, 0x23, 0x58, 0x48, 0xb9, - 0x20, 0x61, 0xac, 0x55, 0x1a, 0xa0, 0x59, 0x35, 0x9f, 0x4c, 0x24, 0xcb, 0x89, 0x0b, 0xe6, 0xf6, - 0xfb, 0x5e, 0xc2, 0xfc, 0x9d, 0x68, 0xc0, 0xfa, 0x24, 0x68, 0x17, 0x55, 0x67, 0x59, 0xf2, 0x76, - 0x25, 0xce, 0x1d, 0xd3, 0xf4, 0x36, 0x9c, 0x95, 0xba, 0xe8, 0x3e, 0x5c, 0xea, 0xba, 0x1b, 0xae, - 0xe5, 0xed, 0xbe, 0xea, 0xbe, 0xb6, 0xda, 0xf6, 0x96, 0x6d, 0x75, 0xea, 0x0a, 0xaa, 0xc3, 0xc5, - 0xb2, 0xbc, 0xd1, 0x76, 0xed, 0x3d, 0xab, 0x0e, 0xd0, 0x12, 0xbc, 0x5b, 0x56, 0x3a, 0xd6, 0x8e, - 0xe5, 0x5a, 0x9d, 0xba, 0xaa, 0x7f, 0x57, 0xe1, 0x72, 0x3b, 0x0a, 0x63, 0x22, 0x58, 0x2f, 0xa0, - 0x7b, 0x85, 0xbd, 0xe8, 0xb8, 0x4b, 0x05, 0x7a, 0x00, 0xe7, 0x39, 0x15, 0x1e, 0xf3, 0xb9, 0x06, - 0x1a, 0x95, 0xe6, 0x82, 0x33, 0xc7, 0xa9, 0xb0, 0x7d, 0x8e, 0xb6, 0xe1, 0x42, 0xaf, 0xb0, 0x2d, - 0xff, 0x52, 0x1b, 0x95, 0x66, 0xd5, 0x7c, 0xfc, 0x0f, 0x59, 0x39, 0x77, 0x7a, 0xe5, 0x82, 0xa3, - 0x43, 0xa8, 0xf9, 0xf4, 0x80, 0xa4, 0x81, 0xf8, 0x7f, 0x51, 0xad, 0x8c, 0x88, 0x7f, 0x84, 0x85, - 0x52, 0x88, 0xe5, 0xc9, 0xf2, 0x6e, 0x54, 0x9c, 0xb9, 0xa5, 0xe2, 0x43, 0xc9, 0xed, 0x4c, 0x94, - 0xd5, 0x43, 0x58, 0x1b, 0x65, 0xca, 0x8e, 0x07, 0x1d, 0x22, 0x08, 0xda, 0x87, 0x8b, 0x59, 0x59, - 0xf1, 0x38, 0x15, 0x65, 0xb8, 0x55, 0xf3, 0xf9, 0x34, 0x09, 0x4e, 0x7a, 0x4f, 0x4e, 0x35, 0xbb, - 0x58, 0x73, 0xfd, 0x2b, 0x80, 0x4b, 0x2e, 0xe1, 0x47, 0x6f, 0x8a, 0x91, 0x76, 0x39, 0x4d, 0xa4, - 0xe4, 0x16, 0x9c, 0x95, 0x06, 0xe4, 0x61, 0xbf, 0x8d, 0xc5, 0xb2, 0x1d, 0xed, 0xc3, 0x7b, 0xd9, - 0x85, 0x19, 0xcf, 0x27, 0x82, 0xc8, 0x6f, 0xa5, 0x6a, 0x9a, 0xd3, 0x4c, 0x7f, 0x35, 0x07, 0xa7, - 0x96, 0x5d, 0xd9, 0xeb, 0x1f, 0x01, 0x5c, 0x1d, 0x3d, 0x42, 0xfd, 0xeb, 0x1e, 0x6c, 0x38, 0x23, - 0x05, 0x4b, 0x0b, 0xcf, 0xa6, 0x11, 0xbc, 0x06, 0x71, 0x24, 0x02, 0x69, 0x70, 0x7e, 0xa4, 0x2d, - 0xc7, 0xaf, 0x38, 0xe3, 0xed, 0xe6, 0xe1, 0xc9, 0x19, 0x56, 0x4e, 0xcf, 0xb0, 0x72, 0x7e, 0x86, - 0xc1, 0x87, 0x1c, 0x83, 0x2f, 0x39, 0x06, 0xdf, 0x72, 0x0c, 0x4e, 0x72, 0x0c, 0x7e, 0xe4, 0x18, - 0xfc, 0xcc, 0xb1, 0x72, 0x9e, 0x63, 0xf0, 0x79, 0x88, 0x95, 0x93, 0x21, 0x56, 0x4e, 0x87, 0x58, - 0x79, 0xbb, 0x3e, 0x88, 0x7e, 0x8f, 0xc3, 0xa2, 0x9b, 0x2f, 0xb8, 0x17, 0x97, 0xb6, 0xbd, 0x39, - 0x79, 0x23, 0x3d, 0xfd, 0x15, 0x00, 0x00, 0xff, 0xff, 0xa3, 0xd1, 0x5f, 0x57, 0x19, 0x05, 0x00, - 0x00, + // 589 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x94, 0x4f, 0x6f, 0xd3, 0x3e, + 0x18, 0xc7, 0xe3, 0x74, 0x7f, 0x7e, 0x73, 0xf7, 0x2b, 0x9d, 0x35, 0x46, 0xb4, 0x83, 0x55, 0xe5, + 0x54, 0x81, 0x94, 0xb2, 0x30, 0x24, 0x24, 0x4e, 0x5b, 0x9b, 0xb1, 0x48, 0x13, 0x82, 0x34, 0xdb, + 0x81, 0x1d, 0x22, 0xa7, 0xf1, 0x2a, 0x6f, 0xc9, 0x12, 0x62, 0x37, 0x12, 0x37, 0x78, 0x07, 0xbc, + 0x0c, 0x5e, 0x02, 0x2f, 0x81, 0xe3, 0x8e, 0x3b, 0xb2, 0x4c, 0x48, 0x1c, 0x77, 0xe7, 0x82, 0x62, + 0x77, 0x63, 0xd3, 0x8a, 0x28, 0x3b, 0x70, 0xaa, 0xfd, 0x34, 0xdf, 0xcf, 0xf7, 0xf9, 0x3e, 0x89, + 0x0d, 0xd7, 0x05, 0x4d, 0xb2, 0x34, 0x27, 0x71, 0x87, 0xd3, 0xbc, 0xa0, 0x79, 0x87, 0x64, 0xac, + 0x93, 0xd1, 0x9c, 0x33, 0x2e, 0xe8, 0xf1, 0x80, 0x76, 0x8a, 0xb5, 0x8e, 0x20, 0xfc, 0x28, 0x78, + 0x3b, 0xa2, 0x23, 0xca, 0xad, 0x2c, 0x4f, 0x45, 0x8a, 0xcc, 0x4b, 0x95, 0xa5, 0x54, 0x16, 0xc9, + 0x98, 0x75, 0x4d, 0x65, 0x15, 0x6b, 0xab, 0x0f, 0x27, 0x91, 0x07, 0x71, 0x3a, 0x38, 0xaa, 0x98, + 0x09, 0xe5, 0x9c, 0x0c, 0xa9, 0xe2, 0x99, 0x3f, 0x74, 0x38, 0xbf, 0x39, 0x62, 0x71, 0xe4, 0x46, + 0xa8, 0x01, 0x75, 0x16, 0x19, 0xa0, 0x05, 0xda, 0x0b, 0x9e, 0xce, 0x22, 0xf4, 0x02, 0xce, 0x72, + 0x41, 0x04, 0x35, 0xf4, 0x16, 0x68, 0x37, 0xec, 0x35, 0xeb, 0xcf, 0xde, 0xd6, 0x98, 0x65, 0xf5, + 0x2b, 0xa1, 0xa7, 0xf4, 0xe8, 0x00, 0xae, 0xc8, 0x45, 0x30, 0xca, 0xa2, 0xea, 0x47, 0xb0, 0x84, + 0x72, 0x41, 0x92, 0xcc, 0xa8, 0xb5, 0x40, 0xbb, 0x6e, 0x3f, 0x9e, 0x48, 0x96, 0x1d, 0x57, 0xcc, + 0xed, 0x77, 0x61, 0xce, 0xa2, 0x9d, 0x74, 0xc8, 0x06, 0x24, 0xee, 0x56, 0x55, 0x6f, 0x59, 0xf2, + 0x76, 0x25, 0xce, 0xbf, 0xa4, 0xa1, 0x43, 0x68, 0x84, 0x74, 0x40, 0x12, 0x1a, 0x44, 0xf4, 0x80, + 0x8c, 0x62, 0x71, 0xcd, 0x69, 0xe6, 0x8e, 0x4e, 0x2b, 0x8a, 0xd8, 0x53, 0xc0, 0x2b, 0x2f, 0xb3, + 0x0b, 0x67, 0x65, 0x46, 0x74, 0x1f, 0x2e, 0xf5, 0xfd, 0x0d, 0xdf, 0x09, 0x76, 0x5f, 0xf6, 0x5f, + 0x39, 0x5d, 0x77, 0xcb, 0x75, 0x7a, 0x4d, 0x0d, 0x35, 0xe1, 0xa2, 0x2a, 0x6f, 0x74, 0x7d, 0x77, + 0xcf, 0x69, 0x02, 0xb4, 0x04, 0xff, 0x57, 0x95, 0x9e, 0xb3, 0xe3, 0xf8, 0x4e, 0xaf, 0xa9, 0x9b, + 0xdf, 0x00, 0x5c, 0xee, 0xa6, 0x49, 0x46, 0x04, 0x0b, 0x63, 0xba, 0x57, 0x8d, 0x32, 0x3d, 0xee, + 0x53, 0x81, 0x1e, 0xc0, 0x79, 0x4e, 0x45, 0xc0, 0x22, 0x6e, 0x80, 0x56, 0xad, 0xbd, 0xe0, 0xcd, + 0x71, 0x2a, 0xdc, 0x88, 0xa3, 0x6d, 0xb8, 0x10, 0x56, 0x23, 0x96, 0x7f, 0xe9, 0xad, 0x5a, 0xbb, + 0x6e, 0x3f, 0xfa, 0x8b, 0xf7, 0xe2, 0xfd, 0x17, 0xaa, 0x05, 0xff, 0xa7, 0xc3, 0x4a, 0x60, 0x63, + 0x1c, 0x8e, 0x1d, 0x0f, 0x7b, 0x44, 0x10, 0xb4, 0x0f, 0x17, 0x0b, 0x55, 0x09, 0x38, 0x15, 0x2a, + 0x65, 0xdd, 0x7e, 0x36, 0x4d, 0x94, 0x49, 0x03, 0xf3, 0xea, 0xc5, 0xd5, 0x9a, 0x9b, 0x9f, 0x01, + 0x5c, 0xf2, 0x09, 0x3f, 0x7a, 0x5d, 0x9d, 0x9c, 0x5d, 0x4e, 0x73, 0x69, 0xb9, 0x05, 0x67, 0x65, + 0xef, 0xf2, 0x0b, 0xbf, 0x4b, 0x3a, 0x25, 0x47, 0xfb, 0xf0, 0x5e, 0x71, 0x15, 0x26, 0x88, 0x88, + 0x20, 0xf2, 0x80, 0xd4, 0x6d, 0x7b, 0x9a, 0xee, 0x6f, 0xce, 0xc1, 0x6b, 0x14, 0x37, 0xf6, 0xe6, + 0x07, 0x00, 0x57, 0xc7, 0x8f, 0xd0, 0xe8, 0x76, 0x06, 0x17, 0xce, 0x48, 0x43, 0x15, 0xe1, 0xe9, + 0x34, 0x86, 0xb7, 0x20, 0x9e, 0x44, 0x20, 0x03, 0xce, 0x8f, 0xbd, 0x65, 0xfb, 0x35, 0xef, 0x72, + 0xbb, 0x79, 0x78, 0x72, 0x86, 0xb5, 0xd3, 0x33, 0xac, 0x5d, 0x9c, 0x61, 0xf0, 0xbe, 0xc4, 0xe0, + 0x53, 0x89, 0xc1, 0x97, 0x12, 0x83, 0x93, 0x12, 0x83, 0xaf, 0x25, 0x06, 0xdf, 0x4b, 0xac, 0x5d, + 0x94, 0x18, 0x7c, 0x3c, 0xc7, 0xda, 0xc9, 0x39, 0xd6, 0x4e, 0xcf, 0xb1, 0xf6, 0x66, 0x7d, 0x98, + 0xfe, 0x6a, 0x87, 0xa5, 0xbf, 0xbf, 0xd5, 0x9e, 0x5f, 0xdb, 0x86, 0x73, 0xf2, 0x1a, 0x7a, 0xf2, + 0x33, 0x00, 0x00, 0xff, 0xff, 0xce, 0x85, 0xd4, 0xc4, 0x0e, 0x05, 0x00, 0x00, } func (x BuildId_State) String() string { @@ -467,6 +465,9 @@ func (this *BuildId) Equal(that interface{}) bool { if !this.StateUpdateTimestamp.Equal(that1.StateUpdateTimestamp) { return false } + if !this.BecameDefaultTimestamp.Equal(that1.BecameDefaultTimestamp) { + return false + } return true } func (this *CompatibleVersionSet) Equal(that interface{}) bool { @@ -504,10 +505,7 @@ func (this *CompatibleVersionSet) Equal(that interface{}) bool { return false } } - if !this.DefaultUpdateTimestamp.Equal(that1.DefaultUpdateTimestamp) { - return false - } - if !this.QueueDefaultUpdateTimestamp.Equal(that1.QueueDefaultUpdateTimestamp) { + if !this.BecameDefaultTimestamp.Equal(that1.BecameDefaultTimestamp) { return false } return true @@ -599,13 +597,16 @@ func (this *BuildId) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 8) s = append(s, "&persistence.BuildId{") s = append(s, "Id: "+fmt.Sprintf("%#v", this.Id)+",\n") s = append(s, "State: "+fmt.Sprintf("%#v", this.State)+",\n") if this.StateUpdateTimestamp != nil { s = append(s, "StateUpdateTimestamp: "+fmt.Sprintf("%#v", this.StateUpdateTimestamp)+",\n") } + if this.BecameDefaultTimestamp != nil { + s = append(s, "BecameDefaultTimestamp: "+fmt.Sprintf("%#v", this.BecameDefaultTimestamp)+",\n") + } s = append(s, "}") return strings.Join(s, "") } @@ -613,17 +614,14 @@ func (this *CompatibleVersionSet) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 8) + s := make([]string, 0, 7) s = append(s, "&persistence.CompatibleVersionSet{") s = append(s, "SetIds: "+fmt.Sprintf("%#v", this.SetIds)+",\n") if this.BuildIds != nil { s = append(s, "BuildIds: "+fmt.Sprintf("%#v", this.BuildIds)+",\n") } - if this.DefaultUpdateTimestamp != nil { - s = append(s, "DefaultUpdateTimestamp: "+fmt.Sprintf("%#v", this.DefaultUpdateTimestamp)+",\n") - } - if this.QueueDefaultUpdateTimestamp != nil { - s = append(s, "QueueDefaultUpdateTimestamp: "+fmt.Sprintf("%#v", this.QueueDefaultUpdateTimestamp)+",\n") + if this.BecameDefaultTimestamp != nil { + s = append(s, "BecameDefaultTimestamp: "+fmt.Sprintf("%#v", this.BecameDefaultTimestamp)+",\n") } s = append(s, "}") return strings.Join(s, "") @@ -696,6 +694,18 @@ func (m *BuildId) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.BecameDefaultTimestamp != nil { + { + size, err := m.BecameDefaultTimestamp.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTaskQueues(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } if m.StateUpdateTimestamp != nil { { size, err := m.StateUpdateTimestamp.MarshalToSizedBuffer(dAtA[:i]) @@ -743,9 +753,9 @@ func (m *CompatibleVersionSet) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if m.QueueDefaultUpdateTimestamp != nil { + if m.BecameDefaultTimestamp != nil { { - size, err := m.QueueDefaultUpdateTimestamp.MarshalToSizedBuffer(dAtA[:i]) + size, err := m.BecameDefaultTimestamp.MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -755,18 +765,6 @@ func (m *CompatibleVersionSet) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x22 } - if m.DefaultUpdateTimestamp != nil { - { - size, err := m.DefaultUpdateTimestamp.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintTaskQueues(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x1a - } if len(m.BuildIds) > 0 { for iNdEx := len(m.BuildIds) - 1; iNdEx >= 0; iNdEx-- { { @@ -945,6 +943,10 @@ func (m *BuildId) Size() (n int) { l = m.StateUpdateTimestamp.Size() n += 1 + l + sovTaskQueues(uint64(l)) } + if m.BecameDefaultTimestamp != nil { + l = m.BecameDefaultTimestamp.Size() + n += 1 + l + sovTaskQueues(uint64(l)) + } return n } @@ -966,12 +968,8 @@ func (m *CompatibleVersionSet) Size() (n int) { n += 1 + l + sovTaskQueues(uint64(l)) } } - if m.DefaultUpdateTimestamp != nil { - l = m.DefaultUpdateTimestamp.Size() - n += 1 + l + sovTaskQueues(uint64(l)) - } - if m.QueueDefaultUpdateTimestamp != nil { - l = m.QueueDefaultUpdateTimestamp.Size() + if m.BecameDefaultTimestamp != nil { + l = m.BecameDefaultTimestamp.Size() n += 1 + l + sovTaskQueues(uint64(l)) } return n @@ -1039,6 +1037,7 @@ func (this *BuildId) String() string { `Id:` + fmt.Sprintf("%v", this.Id) + `,`, `State:` + fmt.Sprintf("%v", this.State) + `,`, `StateUpdateTimestamp:` + strings.Replace(fmt.Sprintf("%v", this.StateUpdateTimestamp), "HybridLogicalClock", "v1.HybridLogicalClock", 1) + `,`, + `BecameDefaultTimestamp:` + strings.Replace(fmt.Sprintf("%v", this.BecameDefaultTimestamp), "HybridLogicalClock", "v1.HybridLogicalClock", 1) + `,`, `}`, }, "") return s @@ -1055,8 +1054,7 @@ func (this *CompatibleVersionSet) String() string { s := strings.Join([]string{`&CompatibleVersionSet{`, `SetIds:` + fmt.Sprintf("%v", this.SetIds) + `,`, `BuildIds:` + repeatedStringForBuildIds + `,`, - `DefaultUpdateTimestamp:` + strings.Replace(fmt.Sprintf("%v", this.DefaultUpdateTimestamp), "HybridLogicalClock", "v1.HybridLogicalClock", 1) + `,`, - `QueueDefaultUpdateTimestamp:` + strings.Replace(fmt.Sprintf("%v", this.QueueDefaultUpdateTimestamp), "HybridLogicalClock", "v1.HybridLogicalClock", 1) + `,`, + `BecameDefaultTimestamp:` + strings.Replace(fmt.Sprintf("%v", this.BecameDefaultTimestamp), "HybridLogicalClock", "v1.HybridLogicalClock", 1) + `,`, `}`, }, "") return s @@ -1222,6 +1220,42 @@ func (m *BuildId) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field BecameDefaultTimestamp", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTaskQueues + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTaskQueues + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTaskQueues + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.BecameDefaultTimestamp == nil { + m.BecameDefaultTimestamp = &v1.HybridLogicalClock{} + } + if err := m.BecameDefaultTimestamp.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipTaskQueues(dAtA[iNdEx:]) @@ -1341,45 +1375,9 @@ func (m *CompatibleVersionSet) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex - case 3: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field DefaultUpdateTimestamp", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowTaskQueues - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthTaskQueues - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthTaskQueues - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.DefaultUpdateTimestamp == nil { - m.DefaultUpdateTimestamp = &v1.HybridLogicalClock{} - } - if err := m.DefaultUpdateTimestamp.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex case 4: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field QueueDefaultUpdateTimestamp", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field BecameDefaultTimestamp", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -1406,10 +1404,10 @@ func (m *CompatibleVersionSet) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.QueueDefaultUpdateTimestamp == nil { - m.QueueDefaultUpdateTimestamp = &v1.HybridLogicalClock{} + if m.BecameDefaultTimestamp == nil { + m.BecameDefaultTimestamp = &v1.HybridLogicalClock{} } - if err := m.QueueDefaultUpdateTimestamp.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + if err := m.BecameDefaultTimestamp.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex diff --git a/proto/internal/temporal/server/api/persistence/v1/task_queues.proto b/proto/internal/temporal/server/api/persistence/v1/task_queues.proto index 4f789947524..5221acd6c47 100644 --- a/proto/internal/temporal/server/api/persistence/v1/task_queues.proto +++ b/proto/internal/temporal/server/api/persistence/v1/task_queues.proto @@ -39,6 +39,10 @@ message BuildId { // (-- api-linter: core::0142::time-field-type=disabled // aip.dev/not-precedent: Using HLC instead of wall clock. --) temporal.server.api.clock.v1.HybridLogicalClock state_update_timestamp = 3; + // HLC timestamp representing when this build id was last made default in its version set. + // (-- api-linter: core::0142::time-field-type=disabled + // aip.dev/not-precedent: Using HLC instead of wall clock. --) + temporal.server.api.clock.v1.HybridLogicalClock became_default_timestamp = 4; } // An internal represenation of temporal.api.taskqueue.v1.CompatibleVersionSet @@ -50,15 +54,10 @@ message CompatibleVersionSet { repeated string set_ids = 1; // All the compatible versions, unordered except for the last element, which is considered the set "default". repeated BuildId build_ids = 2; - // HLC timestamp representing when the set default was updated. Different from BuildId.state_update_timestamp, which - // refers to the build ID status. - // (-- api-linter: core::0142::time-field-type=disabled - // aip.dev/not-precedent: Using HLC instead of wall clock. --) - temporal.server.api.clock.v1.HybridLogicalClock default_update_timestamp = 3; // HLC timestamp representing when this set was last made the default for the queue. // (-- api-linter: core::0142::time-field-type=disabled // aip.dev/not-precedent: Using HLC instead of wall clock. --) - temporal.server.api.clock.v1.HybridLogicalClock queue_default_update_timestamp = 4; + temporal.server.api.clock.v1.HybridLogicalClock became_default_timestamp = 4; } // Holds all the data related to worker versioning for a task queue. diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index b8ae049228f..af91b97a6d1 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -1592,8 +1592,9 @@ func (e *matchingEngineImpl) reviveBuildId(ns *namespace.Namespace, taskQueue st tag.WorkflowTaskQueueName(taskQueue), tag.BuildId(buildId.Id)) return &persistencespb.BuildId{ - Id: buildId.GetId(), - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: &stamp, + Id: buildId.GetId(), + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &stamp, + BecameDefaultTimestamp: buildId.BecameDefaultTimestamp, } } diff --git a/service/matching/version_sets.go b/service/matching/version_sets.go index 21420ced41b..cf689861fef 100644 --- a/service/matching/version_sets.go +++ b/service/matching/version_sets.go @@ -101,6 +101,9 @@ func gatherBuildIds(data *persistencespb.VersioningData) map[string]struct{} { return buildIds } +// RemoveBuildIds removes given buildIds from versioning data. +// Assumes that build ids are safe to remove, ex: a set default is never removed unless it is a single set member and +// that set is not default for the queue. func RemoveBuildIds(clock hlc.Clock, versioningData *persistencespb.VersioningData, buildIds []string) *persistencespb.VersioningData { buildIdsMap := make(map[string]struct{}, len(buildIds)) for _, buildId := range buildIds { @@ -113,9 +116,10 @@ func RemoveBuildIds(clock hlc.Clock, versioningData *persistencespb.VersioningDa for buildIdIdx, buildId := range set.BuildIds { if _, found := buildIdsMap[buildId.Id]; found { set.BuildIds[buildIdIdx] = &persistencespb.BuildId{ - Id: buildId.Id, - State: persistencespb.STATE_DELETED, - StateUpdateTimestamp: &clock, + Id: buildId.Id, + State: persistencespb.STATE_DELETED, + StateUpdateTimestamp: &clock, + BecameDefaultTimestamp: buildId.BecameDefaultTimestamp, } } } @@ -158,10 +162,9 @@ func shallowCloneVersioningData(data *persistencespb.VersioningData) *persistenc func shallowCloneVersionSet(set *persistencespb.CompatibleVersionSet) *persistencespb.CompatibleVersionSet { clone := &persistencespb.CompatibleVersionSet{ - SetIds: set.SetIds, - BuildIds: make([]*persistencespb.BuildId, len(set.BuildIds)), - DefaultUpdateTimestamp: set.DefaultUpdateTimestamp, - QueueDefaultUpdateTimestamp: set.QueueDefaultUpdateTimestamp, + SetIds: set.SetIds, + BuildIds: make([]*persistencespb.BuildId, len(set.BuildIds)), + BecameDefaultTimestamp: set.BecameDefaultTimestamp, } copy(clone.BuildIds, set.BuildIds) return clone @@ -297,21 +300,13 @@ func updateImpl(timestamp hlc.Clock, data *persistencespb.VersioningData, req *w } // Merge the sets together, preserving the primary set's default by making it have the most recent timestamp. primarySet := data.VersionSets[targetSetIdx] + primaryBuildId := primarySet.BuildIds[len(primarySet.BuildIds)-1] + primaryBuildId.BecameDefaultTimestamp = ×tamp justPrimaryData := &persistencespb.VersioningData{ - VersionSets: []*persistencespb.CompatibleVersionSet{{ - SetIds: primarySet.SetIds, - BuildIds: primarySet.BuildIds, - DefaultUpdateTimestamp: ×tamp, - QueueDefaultUpdateTimestamp: primarySet.QueueDefaultUpdateTimestamp, - }}, + VersionSets: []*persistencespb.CompatibleVersionSet{primarySet}, } secondarySet := data.VersionSets[secondarySetIdx] - data.VersionSets[secondarySetIdx] = &persistencespb.CompatibleVersionSet{ - SetIds: mergeSetIDs(primarySet.SetIds, secondarySet.SetIds), - BuildIds: secondarySet.BuildIds, - DefaultUpdateTimestamp: secondarySet.DefaultUpdateTimestamp, - QueueDefaultUpdateTimestamp: secondarySet.QueueDefaultUpdateTimestamp, - } + secondarySet.SetIds = mergeSetIDs(primarySet.SetIds, secondarySet.SetIds) data = MergeVersioningData(justPrimaryData, data) } @@ -349,7 +344,7 @@ func findVersion(data *persistencespb.VersioningData, buildID string) (setIndex, func makeDefaultSet(data *persistencespb.VersioningData, setIx int, timestamp *hlc.Clock) { set := data.VersionSets[setIx] - set.QueueDefaultUpdateTimestamp = timestamp + set.BecameDefaultTimestamp = timestamp if setIx < len(data.VersionSets)-1 { // Move the set to the end and shift all the others down @@ -359,16 +354,16 @@ func makeDefaultSet(data *persistencespb.VersioningData, setIx int, timestamp *h } func makeVersionInSetDefault(data *persistencespb.VersioningData, setIx, versionIx int, timestamp *hlc.Clock) { - data.VersionSets[setIx].DefaultUpdateTimestamp = timestamp setVersions := data.VersionSets[setIx].BuildIds + buildId := setVersions[versionIx] + buildId.BecameDefaultTimestamp = timestamp if len(setVersions) <= 1 { return } if versionIx < len(setVersions)-1 { // Move the build ID to the end and shift all the others down - moveMe := setVersions[versionIx] copy(setVersions[versionIx:], setVersions[versionIx+1:]) - setVersions[len(setVersions)-1] = moveMe + setVersions[len(setVersions)-1] = buildId } } diff --git a/service/matching/version_sets_merge.go b/service/matching/version_sets_merge.go index 8efa0f231af..aba571ec684 100644 --- a/service/matching/version_sets_merge.go +++ b/service/matching/version_sets_merge.go @@ -80,8 +80,7 @@ type buildIDInfo struct { func collectBuildIdInfo(sets []*persistencespb.CompatibleVersionSet) map[string]buildIDInfo { buildIDToInfo := make(map[string]buildIDInfo, 0) for _, set := range sets { - lastIdx := len(set.BuildIds) - 1 - for setIdx, buildID := range set.BuildIds { + for _, buildID := range set.BuildIds { if info, found := buildIDToInfo[buildID.Id]; found { // A build ID appears in more than one source, merge its information, and track it state := info.state @@ -89,30 +88,21 @@ func collectBuildIdInfo(sets []*persistencespb.CompatibleVersionSet) map[string] if hlc.Equal(stateUpdateTimestamp, *buildID.StateUpdateTimestamp) { state = buildID.State } - madeDefaultAt := info.madeDefaultAt - if setIdx == lastIdx { - madeDefaultAt = hlc.Max(*set.DefaultUpdateTimestamp, madeDefaultAt) - } - buildIDToInfo[buildID.Id] = buildIDInfo{ state: state, stateUpdateTimestamp: stateUpdateTimestamp, setIDs: mergeSetIDs(info.setIDs, set.SetIds), - madeDefaultAt: madeDefaultAt, - setMadeDefaultAt: hlc.Max(*set.QueueDefaultUpdateTimestamp, info.setMadeDefaultAt), + madeDefaultAt: hlc.Max(*buildID.BecameDefaultTimestamp, info.madeDefaultAt), + setMadeDefaultAt: hlc.Max(*set.BecameDefaultTimestamp, info.setMadeDefaultAt), } } else { // A build ID was seen for the first time, track it - madeDefaultAt := hlc.Zero(0) - if setIdx == lastIdx { - madeDefaultAt = *set.DefaultUpdateTimestamp - } buildIDToInfo[buildID.Id] = buildIDInfo{ state: buildID.State, stateUpdateTimestamp: *buildID.StateUpdateTimestamp, setIDs: set.SetIds, - madeDefaultAt: madeDefaultAt, - setMadeDefaultAt: *set.QueueDefaultUpdateTimestamp, + madeDefaultAt: *buildID.BecameDefaultTimestamp, + setMadeDefaultAt: *set.BecameDefaultTimestamp, } } } @@ -123,54 +113,44 @@ func collectBuildIdInfo(sets []*persistencespb.CompatibleVersionSet) map[string] func intoVersionSets(buildIDToInfo map[string]buildIDInfo) []*persistencespb.CompatibleVersionSet { sets := make([]*persistencespb.CompatibleVersionSet, 0) for id, info := range buildIDToInfo { + info := info set := findSetWithSetIDs(sets, info.setIDs) if set == nil { set = &persistencespb.CompatibleVersionSet{ - SetIds: info.setIDs, - BuildIds: make([]*persistencespb.BuildId, 0), - DefaultUpdateTimestamp: hlc.Ptr(hlc.Zero(0)), - QueueDefaultUpdateTimestamp: hlc.Ptr(hlc.Zero(0)), + SetIds: info.setIDs, + BuildIds: make([]*persistencespb.BuildId, 0), + BecameDefaultTimestamp: &info.setMadeDefaultAt, } sets = append(sets, set) } else { set.SetIds = mergeSetIDs(set.SetIds, info.setIDs) + set.BecameDefaultTimestamp = hlc.Ptr(hlc.Max(info.setMadeDefaultAt, *set.BecameDefaultTimestamp)) } - timestamp := info.stateUpdateTimestamp buildID := &persistencespb.BuildId{ - Id: id, - State: info.state, - StateUpdateTimestamp: ×tamp, - } - defaultTimestamp := info.madeDefaultAt - set.QueueDefaultUpdateTimestamp = hlc.Ptr(hlc.Max(info.setMadeDefaultAt, *set.QueueDefaultUpdateTimestamp)) - - // Insert the build ID in the right order based on whether it is the default or by its update timestamp - if hlc.Greater(*set.DefaultUpdateTimestamp, defaultTimestamp) { - // Can't be the last element, it's the default already - lastIdx := len(set.BuildIds) - 1 - for idx, curr := range set.BuildIds { - if idx == lastIdx || hlc.Greater(*curr.StateUpdateTimestamp, timestamp) { - // Insert just before - set.BuildIds = append(set.BuildIds[:idx+1], set.BuildIds[idx:]...) - set.BuildIds[idx] = buildID - break - } - } - } else { - set.DefaultUpdateTimestamp = &defaultTimestamp - set.BuildIds = append(set.BuildIds, buildID) + Id: id, + State: info.state, + StateUpdateTimestamp: &info.stateUpdateTimestamp, + BecameDefaultTimestamp: &info.madeDefaultAt, } + set.BuildIds = append(set.BuildIds, buildID) } // Sort the sets based on their default update timestamp, ensuring the default set comes last sortSets(sets) + for _, set := range sets { + sortBuildIds(set.BuildIds) + } return sets } func sortSets(sets []*persistencespb.CompatibleVersionSet) { sort.Slice(sets, func(i, j int) bool { - si := sets[i] - sj := sets[j] - return hlc.Less(*si.QueueDefaultUpdateTimestamp, *sj.QueueDefaultUpdateTimestamp) + return hlc.Less(*sets[i].BecameDefaultTimestamp, *sets[j].BecameDefaultTimestamp) + }) +} + +func sortBuildIds(buildIds []*persistencespb.BuildId) { + sort.Slice(buildIds, func(i, j int) bool { + return hlc.Less(*buildIds[i].BecameDefaultTimestamp, *buildIds[j].BecameDefaultTimestamp) }) } diff --git a/service/matching/version_sets_merge_test.go b/service/matching/version_sets_merge_test.go index 6307a74f758..d10fd7005a0 100644 --- a/service/matching/version_sets_merge_test.go +++ b/service/matching/version_sets_merge_test.go @@ -44,30 +44,18 @@ func buildID(wallclock int64, id string, optionalState ...persistencespb.BuildId } return &persistencespb.BuildId{ - Id: id, - State: state, - StateUpdateTimestamp: fromWallClock(wallclock), + Id: id, + State: state, + StateUpdateTimestamp: fromWallClock(wallclock), + BecameDefaultTimestamp: fromWallClock(wallclock), } } -func mkBuildIds(buildIDs ...*persistencespb.BuildId) []*persistencespb.BuildId { - buildIDStructs := make([]*persistencespb.BuildId, len(buildIDs)) - for i, buildID := range buildIDs { - buildIDStructs[i] = &persistencespb.BuildId{ - Id: buildID.Id, - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: buildID.StateUpdateTimestamp, - } - } - return buildIDStructs -} - func mkSet(setID string, buildIDs ...*persistencespb.BuildId) *persistencespb.CompatibleVersionSet { return &persistencespb.CompatibleVersionSet{ - SetIds: []string{setID}, - BuildIds: mkBuildIds(buildIDs...), - DefaultUpdateTimestamp: buildIDs[len(buildIDs)-1].StateUpdateTimestamp, - QueueDefaultUpdateTimestamp: buildIDs[len(buildIDs)-1].StateUpdateTimestamp, + SetIds: []string{setID}, + BuildIds: buildIDs, + BecameDefaultTimestamp: buildIDs[len(buildIDs)-1].BecameDefaultTimestamp, } } @@ -131,10 +119,9 @@ func TestSetMerge_DifferentSetIDs_MergesSetIDs(t *testing.T) { b := mkSingleSetData("0.2", buildID(3, "0.2")) expected := &persistencespb.VersioningData{ VersionSets: []*persistencespb.CompatibleVersionSet{{ - SetIds: []string{"0.1", "0.2"}, - BuildIds: mkBuildIds(buildID(1, "0.1"), buildID(6, "0.2")), - DefaultUpdateTimestamp: fromWallClock(6), - QueueDefaultUpdateTimestamp: fromWallClock(6), + SetIds: []string{"0.1", "0.2"}, + BuildIds: []*persistencespb.BuildId{buildID(1, "0.1"), buildID(6, "0.2")}, + BecameDefaultTimestamp: fromWallClock(6), }}, } assert.Equal(t, expected, MergeVersioningData(a, b)) @@ -158,10 +145,9 @@ func TestSetMerge_MultipleMatches_MergesSets(t *testing.T) { } expected := &persistencespb.VersioningData{ VersionSets: []*persistencespb.CompatibleVersionSet{{ - SetIds: []string{"0.1", "0.2"}, - BuildIds: mkBuildIds(buildID(1, "0.1"), buildID(3, "0.2")), - DefaultUpdateTimestamp: fromWallClock(3), - QueueDefaultUpdateTimestamp: fromWallClock(3), + SetIds: []string{"0.1", "0.2"}, + BuildIds: []*persistencespb.BuildId{buildID(1, "0.1"), buildID(3, "0.2")}, + BecameDefaultTimestamp: fromWallClock(3), }}, } assert.Equal(t, expected, MergeVersioningData(a, b)) @@ -170,16 +156,16 @@ func TestSetMerge_MultipleMatches_MergesSets(t *testing.T) { func TestSetMerge_BuildIdPromoted_PreservesSetDefault(t *testing.T) { a := mkSingleSetData("0.1", buildID(2, "0.1"), buildID(1, "0.2")) - a.VersionSets[0].DefaultUpdateTimestamp = fromWallClock(3) + a.VersionSets[0].BuildIds[len(a.VersionSets[0].BuildIds)-1].BecameDefaultTimestamp = fromWallClock(3) b := mkSingleSetData("0.1", buildID(2, "0.1"), buildID(1, "0.2")) - b.VersionSets[0].DefaultUpdateTimestamp = fromWallClock(3) + b.VersionSets[0].BuildIds[len(b.VersionSets[0].BuildIds)-1].BecameDefaultTimestamp = fromWallClock(3) assert.Equal(t, b, MergeVersioningData(a, b)) assert.Equal(t, b, MergeVersioningData(b, a)) } func TestSetMerge_SetPromoted_PreservesGlobalDefault(t *testing.T) { set01 := mkSet("0.1", buildID(1, "0.1")) - set01.QueueDefaultUpdateTimestamp = fromWallClock(3) + set01.BecameDefaultTimestamp = fromWallClock(3) a := &persistencespb.VersioningData{ VersionSets: []*persistencespb.CompatibleVersionSet{ mkSet("0.2", buildID(2, "0.2")), diff --git a/service/matching/version_sets_test.go b/service/matching/version_sets_test.go index 3f24cefd0ae..ea79a82f328 100644 --- a/service/matching/version_sets_test.go +++ b/service/matching/version_sets_test.go @@ -40,10 +40,9 @@ import ( func mkNewSet(id string, clock clockspb.HybridLogicalClock) *persistencespb.CompatibleVersionSet { return &persistencespb.CompatibleVersionSet{ - SetIds: []string{hashBuildId(id)}, - BuildIds: []*persistencespb.BuildId{{Id: id, State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, - DefaultUpdateTimestamp: &clock, - QueueDefaultUpdateTimestamp: &clock, + SetIds: []string{hashBuildId(id)}, + BuildIds: []*persistencespb.BuildId{{Id: id, State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock, BecameDefaultTimestamp: &clock}}, + BecameDefaultTimestamp: &clock, } } @@ -108,6 +107,23 @@ func mkMergeSet(primaryId string, secondaryId string) *workflowservice.UpdateWor } } +func mkBuildId(id string, clock hlc.Clock) *persistencespb.BuildId { + return &persistencespb.BuildId{ + Id: id, + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &clock, + BecameDefaultTimestamp: &clock, + } +} + +func mkSingleBuildIdSet(id string, clock hlc.Clock) *persistencespb.CompatibleVersionSet { + return &persistencespb.CompatibleVersionSet{ + SetIds: []string{hashBuildId(id)}, + BuildIds: []*persistencespb.BuildId{mkBuildId(id, clock)}, + BecameDefaultTimestamp: &clock, + } +} + func TestNewDefaultUpdate(t *testing.T) { t.Parallel() clock := hlc.Zero(1) @@ -121,24 +137,9 @@ func TestNewDefaultUpdate(t *testing.T) { expected := &persistencespb.VersioningData{ VersionSets: []*persistencespb.CompatibleVersionSet{ - { - SetIds: []string{hashBuildId("0")}, - BuildIds: []*persistencespb.BuildId{{Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, - DefaultUpdateTimestamp: &clock, - QueueDefaultUpdateTimestamp: &clock, - }, - { - SetIds: []string{hashBuildId("1")}, - BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, - DefaultUpdateTimestamp: &clock, - QueueDefaultUpdateTimestamp: &clock, - }, - { - SetIds: []string{hashBuildId("2")}, - BuildIds: []*persistencespb.BuildId{{Id: "2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &nextClock}}, - DefaultUpdateTimestamp: &nextClock, - QueueDefaultUpdateTimestamp: &nextClock, - }, + mkSingleBuildIdSet("0", clock), + mkSingleBuildIdSet("1", clock), + mkSingleBuildIdSet("2", nextClock), }, } assert.Equal(t, expected, updatedData) @@ -160,12 +161,7 @@ func TestNewDefaultSetUpdateOfEmptyData(t *testing.T) { expected := &persistencespb.VersioningData{ VersionSets: []*persistencespb.CompatibleVersionSet{ - { - SetIds: []string{hashBuildId("1")}, - BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &nextClock}}, - DefaultUpdateTimestamp: &nextClock, - QueueDefaultUpdateTimestamp: &nextClock, - }, + mkSingleBuildIdSet("1", nextClock), }, } assert.Equal(t, expected, updatedData) @@ -184,20 +180,14 @@ func TestNewDefaultSetUpdateCompatWithCurDefault(t *testing.T) { expected := &persistencespb.VersioningData{ VersionSets: []*persistencespb.CompatibleVersionSet{ - { - SetIds: []string{hashBuildId("0")}, - BuildIds: []*persistencespb.BuildId{{Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, - DefaultUpdateTimestamp: &clock, - QueueDefaultUpdateTimestamp: &clock, - }, + mkSingleBuildIdSet("0", clock), { SetIds: []string{hashBuildId("1")}, BuildIds: []*persistencespb.BuildId{ - {Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}, - {Id: "1.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &nextClock}, + mkBuildId("1", clock), + mkBuildId("1.1", nextClock), }, - DefaultUpdateTimestamp: &nextClock, - QueueDefaultUpdateTimestamp: &nextClock, + BecameDefaultTimestamp: &nextClock, }, }, } @@ -217,20 +207,14 @@ func TestNewDefaultSetUpdateCompatWithNonDefaultSet(t *testing.T) { expected := &persistencespb.VersioningData{ VersionSets: []*persistencespb.CompatibleVersionSet{ - { - SetIds: []string{hashBuildId("1")}, - BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, - DefaultUpdateTimestamp: &clock, - QueueDefaultUpdateTimestamp: &clock, - }, + mkSingleBuildIdSet("1", clock), { SetIds: []string{hashBuildId("0")}, BuildIds: []*persistencespb.BuildId{ - {Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}, - {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &nextClock}, + mkBuildId("0", clock), + mkBuildId("0.1", nextClock), }, - DefaultUpdateTimestamp: &nextClock, - QueueDefaultUpdateTimestamp: &nextClock, + BecameDefaultTimestamp: &nextClock, }, }, } @@ -253,18 +237,12 @@ func TestNewCompatibleWithVerInOlderSet(t *testing.T) { { SetIds: []string{hashBuildId("0")}, BuildIds: []*persistencespb.BuildId{ - {Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}, - {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &nextClock}, + mkBuildId("0", clock), + mkBuildId("0.1", nextClock), }, - DefaultUpdateTimestamp: &nextClock, - QueueDefaultUpdateTimestamp: &clock, - }, - { - SetIds: []string{hashBuildId("1")}, - BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, - DefaultUpdateTimestamp: &clock, - QueueDefaultUpdateTimestamp: &clock, + BecameDefaultTimestamp: &clock, }, + mkSingleBuildIdSet("1", clock), }, } @@ -293,19 +271,13 @@ func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) { { SetIds: []string{hashBuildId("0")}, BuildIds: []*persistencespb.BuildId{ - {Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}, - {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock1}, - {Id: "0.2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock2}, + mkBuildId("0", clock0), + mkBuildId("0.1", clock1), + mkBuildId("0.2", clock2), }, - DefaultUpdateTimestamp: &clock2, - QueueDefaultUpdateTimestamp: &clock0, - }, - { - SetIds: []string{hashBuildId("1")}, - BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, - DefaultUpdateTimestamp: &clock0, - QueueDefaultUpdateTimestamp: &clock0, + BecameDefaultTimestamp: &clock0, }, + mkSingleBuildIdSet("1", clock0), }, } @@ -321,20 +293,14 @@ func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) { { SetIds: []string{hashBuildId("0")}, BuildIds: []*persistencespb.BuildId{ - {Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}, - {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock1}, - {Id: "0.2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock2}, - {Id: "0.3", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock3}, + mkBuildId("0", clock0), + mkBuildId("0.1", clock1), + mkBuildId("0.2", clock2), + mkBuildId("0.3", clock3), }, - DefaultUpdateTimestamp: &clock3, - QueueDefaultUpdateTimestamp: &clock0, - }, - { - SetIds: []string{hashBuildId("1")}, - BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, - DefaultUpdateTimestamp: &clock0, - QueueDefaultUpdateTimestamp: &clock0, + BecameDefaultTimestamp: &clock0, }, + mkSingleBuildIdSet("1", clock0), }, } @@ -365,25 +331,12 @@ func TestMakeExistingSetDefault(t *testing.T) { expected := &persistencespb.VersioningData{ VersionSets: []*persistencespb.CompatibleVersionSet{ + mkSingleBuildIdSet("0", clock0), + mkSingleBuildIdSet("2", clock0), { - SetIds: []string{hashBuildId("0")}, - BuildIds: []*persistencespb.BuildId{ - {Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}, - }, - DefaultUpdateTimestamp: &clock0, - QueueDefaultUpdateTimestamp: &clock0, - }, - { - SetIds: []string{hashBuildId("2")}, - BuildIds: []*persistencespb.BuildId{{Id: "2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, - DefaultUpdateTimestamp: &clock0, - QueueDefaultUpdateTimestamp: &clock0, - }, - { - SetIds: []string{hashBuildId("1")}, - BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, - DefaultUpdateTimestamp: &clock0, - QueueDefaultUpdateTimestamp: &clock1, + SetIds: []string{hashBuildId("1")}, + BuildIds: []*persistencespb.BuildId{mkBuildId("1", clock0)}, + BecameDefaultTimestamp: &clock1, }, }, } @@ -398,26 +351,19 @@ func TestMakeExistingSetDefault(t *testing.T) { expected = &persistencespb.VersioningData{ VersionSets: []*persistencespb.CompatibleVersionSet{ + mkSingleBuildIdSet("2", clock0), { - SetIds: []string{hashBuildId("2")}, - BuildIds: []*persistencespb.BuildId{{Id: "2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, - DefaultUpdateTimestamp: &clock0, - QueueDefaultUpdateTimestamp: &clock0, - }, - { - SetIds: []string{hashBuildId("1")}, - BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, - DefaultUpdateTimestamp: &clock0, - QueueDefaultUpdateTimestamp: &clock1, + SetIds: []string{hashBuildId("1")}, + BuildIds: []*persistencespb.BuildId{mkBuildId("1", clock0)}, + BecameDefaultTimestamp: &clock1, }, { SetIds: []string{hashBuildId("0")}, BuildIds: []*persistencespb.BuildId{ - {Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}, - {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock2}, + mkBuildId("0", clock0), + mkBuildId("0.1", clock2), }, - DefaultUpdateTimestamp: &clock2, - QueueDefaultUpdateTimestamp: &clock2, + BecameDefaultTimestamp: &clock2, }, }, } @@ -486,19 +432,13 @@ func TestPromoteWithinVersion(t *testing.T) { { SetIds: []string{hashBuildId("0")}, BuildIds: []*persistencespb.BuildId{ - {Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}, - {Id: "0.2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock2}, - {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock1}, + mkBuildId("0", clock0), + mkBuildId("0.2", clock2), + {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock1, BecameDefaultTimestamp: &clock3}, }, - DefaultUpdateTimestamp: &clock3, - QueueDefaultUpdateTimestamp: &clock0, - }, - { - SetIds: []string{hashBuildId("1")}, - BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, - DefaultUpdateTimestamp: &clock0, - QueueDefaultUpdateTimestamp: &clock0, + BecameDefaultTimestamp: &clock0, }, + mkSingleBuildIdSet("1", clock0), }, } assert.Equal(t, expected, data) @@ -633,11 +573,10 @@ func TestToBuildIdOrderingResponseOmitsDeleted(t *testing.T) { { SetIds: []string{hashBuildId("0")}, BuildIds: []*persistencespb.BuildId{ - {Id: "0", State: persistencespb.STATE_DELETED, StateUpdateTimestamp: &clock}, - {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}, + {Id: "0", State: persistencespb.STATE_DELETED, StateUpdateTimestamp: &clock, BecameDefaultTimestamp: &clock}, + {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock, BecameDefaultTimestamp: &clock}, }, - DefaultUpdateTimestamp: &clock, - QueueDefaultUpdateTimestamp: &clock, + BecameDefaultTimestamp: &clock, }, }, } @@ -658,32 +597,28 @@ func TestGetBuildIdDeltas(t *testing.T) { prev := &persistencespb.VersioningData{ VersionSets: []*persistencespb.CompatibleVersionSet{ { - SetIds: []string{hashBuildId("0")}, - BuildIds: []*persistencespb.BuildId{{Id: "0", State: persistencespb.STATE_DELETED, StateUpdateTimestamp: &clock}, {Id: "0.1", State: persistencespb.STATE_ACTIVE}}, - DefaultUpdateTimestamp: &clock, - QueueDefaultUpdateTimestamp: &clock, + SetIds: []string{hashBuildId("0")}, + BuildIds: []*persistencespb.BuildId{{Id: "0", State: persistencespb.STATE_DELETED}, {Id: "0.1", State: persistencespb.STATE_ACTIVE}}, + BecameDefaultTimestamp: &clock, }, { - SetIds: []string{hashBuildId("1")}, - BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, - DefaultUpdateTimestamp: &clock, - QueueDefaultUpdateTimestamp: &clock, + SetIds: []string{hashBuildId("1")}, + BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE}}, + BecameDefaultTimestamp: &clock, }, }, } curr := &persistencespb.VersioningData{ VersionSets: []*persistencespb.CompatibleVersionSet{ { - SetIds: []string{hashBuildId("0")}, - BuildIds: []*persistencespb.BuildId{{Id: "0.1", State: persistencespb.STATE_ACTIVE}}, - DefaultUpdateTimestamp: &clock, - QueueDefaultUpdateTimestamp: &clock, + SetIds: []string{hashBuildId("0")}, + BuildIds: []*persistencespb.BuildId{{Id: "0.1", State: persistencespb.STATE_ACTIVE}}, + BecameDefaultTimestamp: &clock, }, { - SetIds: []string{hashBuildId("1")}, - BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_DELETED, StateUpdateTimestamp: &clock}, {Id: "1.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, - DefaultUpdateTimestamp: &clock, - QueueDefaultUpdateTimestamp: &clock, + SetIds: []string{hashBuildId("1")}, + BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_DELETED}, {Id: "1.1", State: persistencespb.STATE_ACTIVE}}, + BecameDefaultTimestamp: &clock, }, }, } @@ -712,37 +647,37 @@ func Test_RemoveBuildIds_PutsTombstonesOnSuppliedBuildIds(t *testing.T) { SetIds: []string{hashBuildId("0")}, BuildIds: []*persistencespb.BuildId{ { - Id: "0", - State: persistencespb.STATE_DELETED, - StateUpdateTimestamp: &c1, + Id: "0", + State: persistencespb.STATE_DELETED, + StateUpdateTimestamp: &c1, + BecameDefaultTimestamp: &c0, }, }, - DefaultUpdateTimestamp: &c0, - QueueDefaultUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, { SetIds: []string{hashBuildId("1")}, BuildIds: []*persistencespb.BuildId{ { - Id: "1", - State: persistencespb.STATE_DELETED, - StateUpdateTimestamp: &c1, + Id: "1", + State: persistencespb.STATE_DELETED, + StateUpdateTimestamp: &c1, + BecameDefaultTimestamp: &c0, }, }, - DefaultUpdateTimestamp: &c0, - QueueDefaultUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, { SetIds: []string{hashBuildId("2")}, BuildIds: []*persistencespb.BuildId{ { - Id: "2", - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: &c0, + Id: "2", + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, - DefaultUpdateTimestamp: &c0, - QueueDefaultUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, } @@ -764,42 +699,43 @@ func Test_ClearTombstones(t *testing.T) { SetIds: []string{hashBuildId("0")}, BuildIds: []*persistencespb.BuildId{ { - Id: "0", - State: persistencespb.STATE_DELETED, - StateUpdateTimestamp: &c0, + Id: "0", + State: persistencespb.STATE_DELETED, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, - DefaultUpdateTimestamp: &c0, - QueueDefaultUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, { SetIds: []string{hashBuildId("1")}, BuildIds: []*persistencespb.BuildId{ { - Id: "1", - State: persistencespb.STATE_DELETED, - StateUpdateTimestamp: &c0, + Id: "1", + State: persistencespb.STATE_DELETED, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, { - Id: "1.1", - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: &c0, + Id: "1.1", + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, - DefaultUpdateTimestamp: &c0, - QueueDefaultUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, { SetIds: []string{hashBuildId("2")}, BuildIds: []*persistencespb.BuildId{ { - Id: "2", - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: &c0, + Id: "2", + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, - DefaultUpdateTimestamp: &c0, - QueueDefaultUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, } @@ -810,25 +746,25 @@ func Test_ClearTombstones(t *testing.T) { SetIds: []string{hashBuildId("1")}, BuildIds: []*persistencespb.BuildId{ { - Id: "1.1", - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: &c0, + Id: "1.1", + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, - DefaultUpdateTimestamp: &c0, - QueueDefaultUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, { SetIds: []string{hashBuildId("2")}, BuildIds: []*persistencespb.BuildId{ { - Id: "2", - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: &c0, + Id: "2", + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, - DefaultUpdateTimestamp: &c0, - QueueDefaultUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, } @@ -844,9 +780,11 @@ func TestMergeSets(t *testing.T) { clock := hlc.Zero(1) clockQueueDefault := hlc.Next(clock, commonclock.NewRealTimeSource()) initialData := mkInitialData(4, clock) - initialData.VersionSets[1].QueueDefaultUpdateTimestamp = hlc.Ptr(hlc.Next(clock, commonclock.NewRealTimeSource())) - initialData.VersionSets[2].QueueDefaultUpdateTimestamp = hlc.Ptr(hlc.Next(*initialData.VersionSets[1].QueueDefaultUpdateTimestamp, commonclock.NewRealTimeSource())) - initialData.VersionSets[3].QueueDefaultUpdateTimestamp = hlc.Ptr(hlc.Next(*initialData.VersionSets[2].QueueDefaultUpdateTimestamp, commonclock.NewRealTimeSource())) + + // Make sure the clocks are incrementing per version set for the merge to be predictable + initialData.VersionSets[1].BecameDefaultTimestamp = hlc.Ptr(hlc.Next(clock, commonclock.NewRealTimeSource())) + initialData.VersionSets[2].BecameDefaultTimestamp = hlc.Ptr(hlc.Next(*initialData.VersionSets[1].BecameDefaultTimestamp, commonclock.NewRealTimeSource())) + initialData.VersionSets[3].BecameDefaultTimestamp = hlc.Ptr(hlc.Next(*initialData.VersionSets[2].BecameDefaultTimestamp, commonclock.NewRealTimeSource())) req := mkMergeSet("1", "2") nextClock := hlc.Next(clockQueueDefault, commonclock.NewRealTimeSource()) @@ -862,13 +800,13 @@ func TestMergeSets(t *testing.T) { // Ensure it has the set ids of both sets bothSetIds := mergeSetIDs([]string{hashBuildId("1")}, []string{hashBuildId("2")}) assert.Equal(t, bothSetIds, updatedData.GetVersionSets()[1].GetSetIds()) - assert.Equal(t, initialData.GetVersionSets()[2].QueueDefaultUpdateTimestamp, updatedData.GetVersionSets()[1].QueueDefaultUpdateTimestamp) - assert.Equal(t, nextClock, *updatedData.GetVersionSets()[1].DefaultUpdateTimestamp) + assert.Equal(t, initialData.GetVersionSets()[2].BecameDefaultTimestamp, updatedData.GetVersionSets()[1].BecameDefaultTimestamp) + buildIds := updatedData.VersionSets[1].BuildIds + assert.Equal(t, nextClock, *buildIds[len(buildIds)-1].BecameDefaultTimestamp) // Initial data should not have changed assert.Equal(t, 4, len(initialData.VersionSets)) for _, set := range initialData.VersionSets { assert.Equal(t, 1, len(set.GetSetIds())) - assert.Equal(t, clock, *set.DefaultUpdateTimestamp) } // Same merge request must be idempotent @@ -879,9 +817,10 @@ func TestMergeSets(t *testing.T) { assert.Equal(t, "3", updatedData2.GetVersionSets()[2].GetBuildIds()[0].Id) assert.Equal(t, "1", updatedData2.GetVersionSets()[1].GetBuildIds()[1].Id) assert.Equal(t, "2", updatedData2.GetVersionSets()[1].GetBuildIds()[0].Id) - assert.Equal(t, initialData.GetVersionSets()[2].QueueDefaultUpdateTimestamp, updatedData2.GetVersionSets()[1].QueueDefaultUpdateTimestamp) + assert.Equal(t, initialData.GetVersionSets()[2].BecameDefaultTimestamp, updatedData2.GetVersionSets()[1].BecameDefaultTimestamp) // Clock shouldn't have changed - assert.Equal(t, nextClock, *updatedData2.GetVersionSets()[1].DefaultUpdateTimestamp) + buildIds = updatedData2.VersionSets[1].BuildIds + assert.Equal(t, nextClock, *buildIds[len(buildIds)-1].BecameDefaultTimestamp) // Verify merging into the current default maintains that set as the default req = mkMergeSet("3", "0") @@ -893,8 +832,9 @@ func TestMergeSets(t *testing.T) { assert.Equal(t, "0", updatedData3.GetVersionSets()[1].GetBuildIds()[0].Id) assert.Equal(t, "1", updatedData3.GetVersionSets()[0].GetBuildIds()[1].Id) assert.Equal(t, "2", updatedData3.GetVersionSets()[0].GetBuildIds()[0].Id) - assert.Equal(t, initialData.GetVersionSets()[3].QueueDefaultUpdateTimestamp, updatedData3.GetVersionSets()[1].QueueDefaultUpdateTimestamp) - assert.Equal(t, nextClock3, *updatedData3.GetVersionSets()[1].DefaultUpdateTimestamp) + assert.Equal(t, initialData.GetVersionSets()[3].BecameDefaultTimestamp, updatedData3.GetVersionSets()[1].BecameDefaultTimestamp) + buildIds = updatedData3.VersionSets[1].BuildIds + assert.Equal(t, nextClock3, *buildIds[len(buildIds)-1].BecameDefaultTimestamp) } func TestMergeInvalidTargets(t *testing.T) { diff --git a/service/worker/scanner/build_ids/scavenger_test.go b/service/worker/scanner/build_ids/scavenger_test.go index b7434662f22..b564e1034d3 100644 --- a/service/worker/scanner/build_ids/scavenger_test.go +++ b/service/worker/scanner/build_ids/scavenger_test.go @@ -115,64 +115,67 @@ func Test_findBuildIdsToRemove_FindsAllBuildIdsToRemove(t *testing.T) { SetIds: []string{"v1"}, BuildIds: []*persistencespb.BuildId{ { - Id: "v1.0", - State: persistencespb.STATE_DELETED, - StateUpdateTimestamp: &c0, + Id: "v1.0", + State: persistencespb.STATE_DELETED, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, { - Id: "v1.1", - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: &c0, + Id: "v1.1", + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, { - Id: "v1.2", - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: &c0, + Id: "v1.2", + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, - DefaultUpdateTimestamp: &c0, - QueueDefaultUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, { SetIds: []string{"v2"}, BuildIds: []*persistencespb.BuildId{ { - Id: "v2.0", - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: &c0, + Id: "v2.0", + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, - DefaultUpdateTimestamp: &c0, - QueueDefaultUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, { SetIds: []string{"v3"}, BuildIds: []*persistencespb.BuildId{ { - Id: "v3.0", - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: &c0, + Id: "v3.0", + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, { - Id: "v3.1", - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: &c0, + Id: "v3.1", + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, - DefaultUpdateTimestamp: &c0, - QueueDefaultUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, { SetIds: []string{"v4"}, BuildIds: []*persistencespb.BuildId{ { - Id: "v4.0", - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: &c0, + Id: "v4.0", + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, - DefaultUpdateTimestamp: &c0, - QueueDefaultUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, }, @@ -334,17 +337,18 @@ func Test_ScavengeBuildIds_Heartbeats(t *testing.T) { SetIds: []string{"v1"}, BuildIds: []*persistencespb.BuildId{ { - Id: "v1.0", - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: &c0, + Id: "v1.0", + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, { - Id: "v1.1", - State: persistencespb.STATE_ACTIVE, - StateUpdateTimestamp: &c0, + Id: "v1.1", + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &c0, + BecameDefaultTimestamp: &c0, }, }, - DefaultUpdateTimestamp: &c0, }, }, },