diff --git a/api/adminservice/v1/service.pb.go b/api/adminservice/v1/service.pb.go index 17991fd327f8..61cb44b7c331 100644 --- a/api/adminservice/v1/service.pb.go +++ b/api/adminservice/v1/service.pb.go @@ -113,7 +113,7 @@ var fileDescriptor_cf5ca5e0c737570d = []byte{ // Reference imports to suppress errors if they are not otherwise used. var _ context.Context -var _ grpc.ClientConnInterface +var _ grpc.ClientConn // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. @@ -185,10 +185,10 @@ type AdminServiceClient interface { } type adminServiceClient struct { - cc grpc.ClientConnInterface + cc *grpc.ClientConn } -func NewAdminServiceClient(cc grpc.ClientConnInterface) AdminServiceClient { +func NewAdminServiceClient(cc *grpc.ClientConn) AdminServiceClient { return &adminServiceClient{cc} } diff --git a/api/historyservice/v1/service.pb.go b/api/historyservice/v1/service.pb.go index e2a13d403ec0..a85aaa58ddc5 100644 --- a/api/historyservice/v1/service.pb.go +++ b/api/historyservice/v1/service.pb.go @@ -137,7 +137,7 @@ var fileDescriptor_655983da427ae822 = []byte{ // Reference imports to suppress errors if they are not otherwise used. var _ context.Context -var _ grpc.ClientConnInterface +var _ grpc.ClientConn // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. @@ -310,10 +310,10 @@ type HistoryServiceClient interface { } type historyServiceClient struct { - cc grpc.ClientConnInterface + cc *grpc.ClientConn } -func NewHistoryServiceClient(cc grpc.ClientConnInterface) HistoryServiceClient { +func NewHistoryServiceClient(cc *grpc.ClientConn) HistoryServiceClient { return &historyServiceClient{cc} } diff --git a/api/matchingservice/v1/service.pb.go b/api/matchingservice/v1/service.pb.go index bcef4f139cf6..d80a788ea1e3 100644 --- a/api/matchingservice/v1/service.pb.go +++ b/api/matchingservice/v1/service.pb.go @@ -94,7 +94,7 @@ var fileDescriptor_1a5c83076e651916 = []byte{ // Reference imports to suppress errors if they are not otherwise used. var _ context.Context -var _ grpc.ClientConnInterface +var _ grpc.ClientConn // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. @@ -146,10 +146,10 @@ type MatchingServiceClient interface { } type matchingServiceClient struct { - cc grpc.ClientConnInterface + cc *grpc.ClientConn } -func NewMatchingServiceClient(cc grpc.ClientConnInterface) MatchingServiceClient { +func NewMatchingServiceClient(cc *grpc.ClientConn) MatchingServiceClient { return &matchingServiceClient{cc} } diff --git a/api/persistence/v1/tasks.pb.go b/api/persistence/v1/tasks.pb.go index 6095d0600222..fe6b71b85a5a 100644 --- a/api/persistence/v1/tasks.pb.go +++ b/api/persistence/v1/tasks.pb.go @@ -303,12 +303,7 @@ func (m *TaskQueueInfo) GetVersioningData() *VersioningData { // Holds all the data related to worker versioning for a task queue. // Backwards-incompatible changes cannot be made, as this would make existing stored data unreadable type VersioningData struct { - // The currently established default worker build id version. - CurrentDefault *v12.VersionIdNode `protobuf:"bytes,1,opt,name=current_default,json=currentDefault,proto3" json:"current_default,omitempty"` - // Other current latest-compatible versions who are not the overall default. These are the - // versions that will be used when generating new tasks by following the graph from the - // version of the last task out to a leaf. - CompatibleLeaves []*v12.VersionIdNode `protobuf:"bytes,2,rep,name=compatible_leaves,json=compatibleLeaves,proto3" json:"compatible_leaves,omitempty"` + VersionSets []*v12.CompatibleVersionSet `protobuf:"bytes,1,rep,name=version_sets,json=versionSets,proto3" json:"version_sets,omitempty"` } func (m *VersioningData) Reset() { *m = VersioningData{} } @@ -343,16 +338,9 @@ func (m *VersioningData) XXX_DiscardUnknown() { var xxx_messageInfo_VersioningData proto.InternalMessageInfo -func (m *VersioningData) GetCurrentDefault() *v12.VersionIdNode { +func (m *VersioningData) GetVersionSets() []*v12.CompatibleVersionSet { if m != nil { - return m.CurrentDefault - } - return nil -} - -func (m *VersioningData) GetCompatibleLeaves() []*v12.VersionIdNode { - if m != nil { - return m.CompatibleLeaves + return m.VersionSets } return nil } @@ -421,53 +409,52 @@ func init() { } var fileDescriptor_f9c734e3b35cf986 = []byte{ - // 734 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0x4f, 0x4f, 0x13, 0x4f, - 0x18, 0xee, 0xd2, 0xd2, 0x3f, 0xd3, 0xdf, 0xaf, 0xc0, 0x24, 0xc6, 0x06, 0x93, 0x05, 0x1a, 0xa3, - 0x68, 0xc8, 0x6e, 0x40, 0x0f, 0x26, 0xc6, 0x28, 0x88, 0x87, 0x0a, 0x31, 0x61, 0x03, 0x1c, 0xf4, - 0xd0, 0x0c, 0x3b, 0x6f, 0xeb, 0xda, 0xed, 0xcc, 0x3a, 0x33, 0xbb, 0xd8, 0x9b, 0x1f, 0x81, 0x8f, - 0xe1, 0x67, 0xf0, 0x13, 0x78, 0xf0, 0xc0, 0x91, 0x78, 0x51, 0xca, 0xc5, 0x23, 0x1f, 0xc1, 0xcc, - 0x6c, 0xb7, 0xd0, 0x08, 0xb1, 0x26, 0xde, 0xf6, 0x9d, 0xf7, 0x79, 0x9e, 0x79, 0xe7, 0x79, 0x66, - 0x5a, 0xe4, 0x28, 0xe8, 0x45, 0x5c, 0x90, 0xd0, 0x95, 0x20, 0x12, 0x10, 0x2e, 0x89, 0x02, 0x37, - 0x02, 0x21, 0x03, 0xa9, 0x80, 0xf9, 0xe0, 0x26, 0xab, 0xae, 0x22, 0xb2, 0x2b, 0x9d, 0x48, 0x70, - 0xc5, 0x71, 0x23, 0xc3, 0x3b, 0x29, 0xde, 0x21, 0x51, 0xe0, 0x5c, 0xc2, 0x3b, 0xc9, 0xea, 0xfc, - 0x42, 0x87, 0xf3, 0x4e, 0x08, 0xae, 0x61, 0x1c, 0xc4, 0x6d, 0x57, 0x05, 0x3d, 0x90, 0x8a, 0xf4, - 0xa2, 0x54, 0x64, 0x7e, 0x89, 0x42, 0x04, 0x8c, 0x02, 0xf3, 0x03, 0x90, 0x6e, 0x87, 0x77, 0xb8, - 0x59, 0x37, 0x5f, 0x43, 0xc8, 0x9d, 0xd1, 0x5c, 0x7a, 0x20, 0x60, 0x71, 0x4f, 0x66, 0xa3, 0xb4, - 0xde, 0xc7, 0x10, 0xc3, 0x10, 0x77, 0x77, 0x0c, 0xa7, 0xdb, 0xa6, 0xab, 0xb1, 0x3d, 0x90, 0x92, - 0x74, 0x32, 0xe0, 0xfd, 0xab, 0x0e, 0xea, 0x87, 0xdc, 0xef, 0xfe, 0x86, 0x6d, 0x30, 0x34, 0xb7, - 0x1e, 0x86, 0xdc, 0x27, 0x0a, 0xe8, 0x2e, 0x91, 0xdd, 0x26, 0x6b, 0x73, 0xfc, 0x0c, 0x15, 0x28, - 0x51, 0xa4, 0x6e, 0x2d, 0x5a, 0xcb, 0xd5, 0xb5, 0x15, 0xe7, 0xcf, 0x46, 0x38, 0x19, 0xd7, 0x33, - 0x4c, 0x7c, 0x13, 0x95, 0xcc, 0xfc, 0x01, 0xad, 0x4f, 0x2d, 0x5a, 0xcb, 0x79, 0xaf, 0xa8, 0xcb, - 0x26, 0x6d, 0x7c, 0x9b, 0x42, 0xe5, 0xd1, 0x3e, 0x4b, 0xe8, 0x3f, 0x46, 0x7a, 0x20, 0x23, 0xe2, - 0x83, 0x86, 0xea, 0xfd, 0x2a, 0x5e, 0x75, 0xb4, 0xd6, 0xa4, 0x78, 0x01, 0x55, 0x0f, 0xb9, 0xe8, - 0xb6, 0x43, 0x7e, 0x98, 0x89, 0x55, 0x3c, 0x94, 0x2d, 0x35, 0x29, 0xbe, 0x81, 0x8a, 0x22, 0x66, - 0xba, 0x97, 0x37, 0xbd, 0x69, 0x11, 0xb3, 0x26, 0xc5, 0x2b, 0x08, 0x4b, 0xff, 0x2d, 0xd0, 0x38, - 0x04, 0xda, 0x82, 0x04, 0x98, 0xd2, 0x90, 0x82, 0x99, 0x65, 0x76, 0xd4, 0x79, 0xa1, 0x1b, 0x4d, - 0x8a, 0xd7, 0x51, 0xd5, 0x17, 0x40, 0x14, 0xb4, 0x74, 0x7e, 0xf5, 0x69, 0x73, 0xee, 0x79, 0x27, - 0x0d, 0xd7, 0xc9, 0xc2, 0x75, 0x76, 0xb3, 0x70, 0x37, 0x0a, 0x47, 0xdf, 0x17, 0x2c, 0x0f, 0xa5, - 0x24, 0xbd, 0xac, 0x25, 0xe0, 0x43, 0x14, 0x88, 0x7e, 0x2a, 0x51, 0x9c, 0x54, 0x22, 0x25, 0x19, - 0x89, 0xa7, 0x68, 0xda, 0xa4, 0x54, 0x2f, 0x19, 0xf2, 0xbd, 0x2b, 0x7d, 0x37, 0x08, 0xed, 0xf8, - 0x3e, 0xf8, 0x8a, 0x8b, 0xe7, 0xba, 0xf4, 0x52, 0x5e, 0xe3, 0x6b, 0x1e, 0xfd, 0xaf, 0xcd, 0xdd, - 0xd1, 0xf7, 0x62, 0x52, 0x87, 0x31, 0x2a, 0xe8, 0x72, 0x68, 0xad, 0xf9, 0xc6, 0xeb, 0xa8, 0x62, - 0xe2, 0x53, 0xfd, 0x08, 0x8c, 0xaf, 0xb5, 0xb5, 0xdb, 0x17, 0xd3, 0xe8, 0x31, 0xcc, 0x35, 0xcd, - 0x82, 0x37, 0xfb, 0xed, 0xf6, 0x23, 0xf0, 0xca, 0x9a, 0xa6, 0xbf, 0xf0, 0x23, 0x54, 0xe8, 0x06, - 0x2c, 0xb5, 0x7c, 0x02, 0xf6, 0x56, 0xc0, 0xa8, 0x67, 0x18, 0xf8, 0x16, 0xaa, 0x10, 0xbf, 0xdb, - 0x0a, 0x21, 0x81, 0xd0, 0x44, 0x91, 0xf7, 0xca, 0xc4, 0xef, 0x6e, 0xeb, 0xfa, 0x5f, 0xd8, 0xfc, - 0x12, 0xcd, 0x86, 0x44, 0xaa, 0x56, 0x1c, 0xd1, 0x51, 0xe2, 0xa5, 0x09, 0x75, 0x6a, 0x9a, 0xb9, - 0x67, 0x88, 0x46, 0xeb, 0x0d, 0x9a, 0x49, 0xf4, 0x43, 0xe0, 0x2c, 0x60, 0x9d, 0x96, 0x79, 0x34, - 0x65, 0x23, 0xb5, 0x36, 0xc9, 0xa3, 0xd9, 0x1f, 0x51, 0x37, 0x89, 0x22, 0x5e, 0x2d, 0x19, 0xab, - 0x1b, 0x9f, 0x2d, 0x54, 0x1b, 0x87, 0xe0, 0x1d, 0x34, 0xe3, 0xc7, 0x42, 0xe8, 0xeb, 0x4c, 0xa1, - 0x4d, 0xe2, 0x50, 0x0d, 0x1f, 0xe9, 0xf2, 0xb8, 0xc1, 0xa3, 0x5f, 0x87, 0x4b, 0xdb, 0x34, 0xe9, - 0x2b, 0x4e, 0xc1, 0xab, 0x0d, 0x05, 0x36, 0x53, 0x3e, 0xde, 0x43, 0x73, 0x3e, 0xef, 0x45, 0x44, - 0x05, 0x07, 0x21, 0xb4, 0x42, 0x20, 0x09, 0xc8, 0xfa, 0xd4, 0x62, 0xfe, 0xaf, 0x44, 0x67, 0x2f, - 0x24, 0xb6, 0x8d, 0x42, 0x83, 0xa0, 0x92, 0x0e, 0x77, 0x0b, 0xfa, 0xf8, 0x09, 0xaa, 0xb4, 0x03, - 0x31, 0x74, 0xda, 0x9a, 0xd0, 0xe9, 0xb2, 0xa6, 0x18, 0x8f, 0xaf, 0xfb, 0x2d, 0xd9, 0x78, 0x77, - 0x7c, 0x6a, 0xe7, 0x4e, 0x4e, 0xed, 0xdc, 0xf9, 0xa9, 0x6d, 0x7d, 0x1c, 0xd8, 0xd6, 0xa7, 0x81, - 0x6d, 0x7d, 0x19, 0xd8, 0xd6, 0xf1, 0xc0, 0xb6, 0x7e, 0x0c, 0x6c, 0xeb, 0xe7, 0xc0, 0xce, 0x9d, - 0x0f, 0x6c, 0xeb, 0xe8, 0xcc, 0xce, 0x1d, 0x9f, 0xd9, 0xb9, 0x93, 0x33, 0x3b, 0xf7, 0xfa, 0x61, - 0x87, 0x5f, 0x1c, 0x2b, 0xe0, 0xd7, 0xff, 0x19, 0x3c, 0xbe, 0x54, 0x1e, 0x14, 0xcd, 0xa0, 0x0f, - 0x7e, 0x05, 0x00, 0x00, 0xff, 0xff, 0xe0, 0x4b, 0xc0, 0xf0, 0x45, 0x06, 0x00, 0x00, + // 705 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0xcd, 0x6e, 0xd3, 0x4c, + 0x14, 0x8d, 0x9b, 0x34, 0x3f, 0x93, 0x7e, 0xf9, 0xca, 0x48, 0x88, 0xa8, 0x48, 0x6e, 0x1b, 0x21, + 0x28, 0xa8, 0xb2, 0xd5, 0xc0, 0x02, 0x09, 0x21, 0x68, 0x0b, 0x8b, 0x50, 0x36, 0x98, 0xd2, 0x05, + 0x2c, 0xa2, 0xa9, 0xe7, 0x26, 0x98, 0xd8, 0x33, 0xc6, 0x33, 0x76, 0xc9, 0x8e, 0x47, 0xe8, 0x63, + 0xf0, 0x28, 0x2c, 0x58, 0x74, 0x59, 0xb1, 0x81, 0xba, 0x1b, 0x96, 0x7d, 0x04, 0x34, 0xe3, 0xd8, + 0x6d, 0x44, 0x2b, 0xb2, 0x60, 0x37, 0x77, 0xee, 0x39, 0x67, 0xee, 0x9c, 0xe3, 0x31, 0xb2, 0x24, + 0x04, 0x21, 0x8f, 0x88, 0x6f, 0x0b, 0x88, 0x12, 0x88, 0x6c, 0x12, 0x7a, 0x76, 0x08, 0x91, 0xf0, + 0x84, 0x04, 0xe6, 0x82, 0x9d, 0x6c, 0xd8, 0x92, 0x88, 0x91, 0xb0, 0xc2, 0x88, 0x4b, 0x8e, 0x3b, + 0x39, 0xde, 0xca, 0xf0, 0x16, 0x09, 0x3d, 0xeb, 0x02, 0xde, 0x4a, 0x36, 0x96, 0x96, 0x87, 0x9c, + 0x0f, 0x7d, 0xb0, 0x35, 0x63, 0x3f, 0x1e, 0xd8, 0xd2, 0x0b, 0x40, 0x48, 0x12, 0x84, 0x99, 0xc8, + 0xd2, 0x2a, 0x85, 0x10, 0x18, 0x05, 0xe6, 0x7a, 0x20, 0xec, 0x21, 0x1f, 0x72, 0xbd, 0xaf, 0x57, + 0x13, 0xc8, 0xed, 0x62, 0x2e, 0x35, 0x10, 0xb0, 0x38, 0x10, 0xf9, 0x28, 0xfd, 0x8f, 0x31, 0xc4, + 0x30, 0xc1, 0xdd, 0x99, 0xc2, 0xa9, 0xb6, 0xee, 0x2a, 0x6c, 0x00, 0x42, 0x90, 0x61, 0x0e, 0xbc, + 0x77, 0xd9, 0x45, 0x5d, 0x9f, 0xbb, 0xa3, 0x3f, 0xb0, 0x1d, 0x86, 0xae, 0x6d, 0xfa, 0x3e, 0x77, + 0x89, 0x04, 0xba, 0x4b, 0xc4, 0xa8, 0xc7, 0x06, 0x1c, 0x3f, 0x45, 0x15, 0x4a, 0x24, 0x69, 0x1b, + 0x2b, 0xc6, 0x5a, 0xb3, 0xbb, 0x6e, 0xfd, 0xdd, 0x08, 0x2b, 0xe7, 0x3a, 0x9a, 0x89, 0x6f, 0xa0, + 0x9a, 0x9e, 0xdf, 0xa3, 0xed, 0xb9, 0x15, 0x63, 0xad, 0xec, 0x54, 0x55, 0xd9, 0xa3, 0x9d, 0xef, + 0x73, 0xa8, 0x5e, 0x9c, 0xb3, 0x8a, 0x16, 0x18, 0x09, 0x40, 0x84, 0xc4, 0x05, 0x05, 0x55, 0xe7, + 0x35, 0x9c, 0x66, 0xb1, 0xd7, 0xa3, 0x78, 0x19, 0x35, 0x0f, 0x78, 0x34, 0x1a, 0xf8, 0xfc, 0x20, + 0x17, 0x6b, 0x38, 0x28, 0xdf, 0xea, 0x51, 0x7c, 0x1d, 0x55, 0xa3, 0x98, 0xa9, 0x5e, 0x59, 0xf7, + 0xe6, 0xa3, 0x98, 0xf5, 0x28, 0x5e, 0x47, 0x58, 0xb8, 0xef, 0x81, 0xc6, 0x3e, 0xd0, 0x3e, 0x24, + 0xc0, 0xa4, 0x82, 0x54, 0xf4, 0x2c, 0x8b, 0x45, 0xe7, 0xb9, 0x6a, 0xf4, 0x28, 0xde, 0x44, 0x4d, + 0x37, 0x02, 0x22, 0xa1, 0xaf, 0xf2, 0x6b, 0xcf, 0xeb, 0x7b, 0x2f, 0x59, 0x59, 0xb8, 0x56, 0x1e, + 0xae, 0xb5, 0x9b, 0x87, 0xbb, 0x55, 0x39, 0xfc, 0xb1, 0x6c, 0x38, 0x28, 0x23, 0xa9, 0x6d, 0x25, + 0x01, 0x9f, 0x42, 0x2f, 0x1a, 0x67, 0x12, 0xd5, 0x59, 0x25, 0x32, 0x92, 0x96, 0x78, 0x82, 0xe6, + 0x75, 0x4a, 0xed, 0x9a, 0x26, 0xdf, 0xbd, 0xd4, 0x77, 0x8d, 0x50, 0x8e, 0xef, 0x81, 0x2b, 0x79, + 0xb4, 0xad, 0x4a, 0x27, 0xe3, 0x75, 0xbe, 0x95, 0xd1, 0x7f, 0xca, 0xdc, 0x57, 0xea, 0xbb, 0x98, + 0xd5, 0x61, 0x8c, 0x2a, 0xaa, 0x9c, 0x58, 0xab, 0xd7, 0x78, 0x13, 0x35, 0x74, 0x7c, 0x72, 0x1c, + 0x82, 0xf6, 0xb5, 0xd5, 0xbd, 0x75, 0x3e, 0x8d, 0x1a, 0x43, 0x7f, 0xa6, 0x79, 0xf0, 0xfa, 0xbc, + 0xdd, 0x71, 0x08, 0x4e, 0x5d, 0xd1, 0xd4, 0x0a, 0x3f, 0x44, 0x95, 0x91, 0xc7, 0x32, 0xcb, 0x67, + 0x60, 0xef, 0x78, 0x8c, 0x3a, 0x9a, 0x81, 0x6f, 0xa2, 0x06, 0x71, 0x47, 0x7d, 0x1f, 0x12, 0xf0, + 0x75, 0x14, 0x65, 0xa7, 0x4e, 0xdc, 0xd1, 0x4b, 0x55, 0xff, 0x0b, 0x9b, 0x5f, 0xa0, 0x45, 0x9f, + 0x08, 0xd9, 0x8f, 0x43, 0x5a, 0x24, 0x5e, 0x9b, 0x51, 0xa7, 0xa5, 0x98, 0x6f, 0x34, 0x51, 0x6b, + 0xbd, 0x43, 0xff, 0x27, 0xea, 0x21, 0x70, 0xe6, 0xb1, 0x61, 0x5f, 0x3f, 0x9a, 0xba, 0x96, 0xea, + 0xce, 0xf2, 0x68, 0xf6, 0x0a, 0xea, 0x33, 0x22, 0x89, 0xd3, 0x4a, 0xa6, 0xea, 0x0e, 0x45, 0xad, + 0x69, 0x04, 0x76, 0xd0, 0xc2, 0x04, 0xd3, 0x17, 0x20, 0x45, 0xdb, 0x58, 0x29, 0xaf, 0x35, 0xbb, + 0xf6, 0xb4, 0xb9, 0xc5, 0x9f, 0x41, 0x1d, 0xb1, 0xcd, 0x83, 0x90, 0x48, 0x6f, 0xdf, 0x87, 0x89, + 0xd4, 0x6b, 0x90, 0x4e, 0x33, 0x29, 0xd6, 0xa2, 0x43, 0x50, 0x4d, 0xa5, 0xb0, 0x03, 0x63, 0xfc, + 0x18, 0x35, 0x06, 0x5e, 0x34, 0xb1, 0xc4, 0x98, 0xd1, 0x92, 0xba, 0xa2, 0x68, 0x33, 0xae, 0x7a, + 0xf4, 0x5b, 0x1f, 0x8e, 0x4e, 0xcc, 0xd2, 0xf1, 0x89, 0x59, 0x3a, 0x3b, 0x31, 0x8d, 0xcf, 0xa9, + 0x69, 0x7c, 0x49, 0x4d, 0xe3, 0x6b, 0x6a, 0x1a, 0x47, 0xa9, 0x69, 0xfc, 0x4c, 0x4d, 0xe3, 0x57, + 0x6a, 0x96, 0xce, 0x52, 0xd3, 0x38, 0x3c, 0x35, 0x4b, 0x47, 0xa7, 0x66, 0xe9, 0xf8, 0xd4, 0x2c, + 0xbd, 0x7d, 0x30, 0xe4, 0xe7, 0x17, 0xf3, 0xf8, 0xd5, 0x7f, 0xed, 0x47, 0x17, 0xca, 0xfd, 0xaa, + 0x1e, 0xf4, 0xfe, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0xac, 0xd4, 0xe2, 0x22, 0xee, 0x05, 0x00, + 0x00, } func (this *AllocatedTaskInfo) Equal(that interface{}) bool { @@ -619,14 +606,11 @@ func (this *VersioningData) Equal(that interface{}) bool { } else if this == nil { return false } - if !this.CurrentDefault.Equal(that1.CurrentDefault) { - return false - } - if len(this.CompatibleLeaves) != len(that1.CompatibleLeaves) { + if len(this.VersionSets) != len(that1.VersionSets) { return false } - for i := range this.CompatibleLeaves { - if !this.CompatibleLeaves[i].Equal(that1.CompatibleLeaves[i]) { + for i := range this.VersionSets { + if !this.VersionSets[i].Equal(that1.VersionSets[i]) { return false } } @@ -717,13 +701,10 @@ func (this *VersioningData) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 5) s = append(s, "&persistence.VersioningData{") - if this.CurrentDefault != nil { - s = append(s, "CurrentDefault: "+fmt.Sprintf("%#v", this.CurrentDefault)+",\n") - } - if this.CompatibleLeaves != nil { - s = append(s, "CompatibleLeaves: "+fmt.Sprintf("%#v", this.CompatibleLeaves)+",\n") + if this.VersionSets != nil { + s = append(s, "VersionSets: "+fmt.Sprintf("%#v", this.VersionSets)+",\n") } s = append(s, "}") return strings.Join(s, "") @@ -972,10 +953,10 @@ func (m *VersioningData) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if len(m.CompatibleLeaves) > 0 { - for iNdEx := len(m.CompatibleLeaves) - 1; iNdEx >= 0; iNdEx-- { + if len(m.VersionSets) > 0 { + for iNdEx := len(m.VersionSets) - 1; iNdEx >= 0; iNdEx-- { { - size, err := m.CompatibleLeaves[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + size, err := m.VersionSets[iNdEx].MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -983,21 +964,9 @@ func (m *VersioningData) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintTasks(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x12 + dAtA[i] = 0xa } } - if m.CurrentDefault != nil { - { - size, err := m.CurrentDefault.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintTasks(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0xa - } return len(dAtA) - i, nil } @@ -1027,12 +996,12 @@ func (m *TaskKey) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0x10 } if m.FireTime != nil { - n9, err9 := github_com_gogo_protobuf_types.StdTimeMarshalTo(*m.FireTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(*m.FireTime):]) - if err9 != nil { - return 0, err9 + n8, err8 := github_com_gogo_protobuf_types.StdTimeMarshalTo(*m.FireTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(*m.FireTime):]) + if err8 != nil { + return 0, err8 } - i -= n9 - i = encodeVarintTasks(dAtA, i, uint64(n9)) + i -= n8 + i = encodeVarintTasks(dAtA, i, uint64(n8)) i-- dAtA[i] = 0xa } @@ -1146,12 +1115,8 @@ func (m *VersioningData) Size() (n int) { } var l int _ = l - if m.CurrentDefault != nil { - l = m.CurrentDefault.Size() - n += 1 + l + sovTasks(uint64(l)) - } - if len(m.CompatibleLeaves) > 0 { - for _, e := range m.CompatibleLeaves { + if len(m.VersionSets) > 0 { + for _, e := range m.VersionSets { l = e.Size() n += 1 + l + sovTasks(uint64(l)) } @@ -1229,14 +1194,13 @@ func (this *VersioningData) String() string { if this == nil { return "nil" } - repeatedStringForCompatibleLeaves := "[]*VersionIdNode{" - for _, f := range this.CompatibleLeaves { - repeatedStringForCompatibleLeaves += strings.Replace(fmt.Sprintf("%v", f), "VersionIdNode", "v12.VersionIdNode", 1) + "," + repeatedStringForVersionSets := "[]*CompatibleVersionSet{" + for _, f := range this.VersionSets { + repeatedStringForVersionSets += strings.Replace(fmt.Sprintf("%v", f), "CompatibleVersionSet", "v12.CompatibleVersionSet", 1) + "," } - repeatedStringForCompatibleLeaves += "}" + repeatedStringForVersionSets += "}" s := strings.Join([]string{`&VersioningData{`, - `CurrentDefault:` + strings.Replace(fmt.Sprintf("%v", this.CurrentDefault), "VersionIdNode", "v12.VersionIdNode", 1) + `,`, - `CompatibleLeaves:` + repeatedStringForCompatibleLeaves + `,`, + `VersionSets:` + repeatedStringForVersionSets + `,`, `}`, }, "") return s @@ -1957,43 +1921,7 @@ func (m *VersioningData) Unmarshal(dAtA []byte) error { switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field CurrentDefault", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowTasks - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthTasks - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthTasks - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.CurrentDefault == nil { - m.CurrentDefault = &v12.VersionIdNode{} - } - if err := m.CurrentDefault.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field CompatibleLeaves", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field VersionSets", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -2020,8 +1948,8 @@ func (m *VersioningData) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.CompatibleLeaves = append(m.CompatibleLeaves, &v12.VersionIdNode{}) - if err := m.CompatibleLeaves[len(m.CompatibleLeaves)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + m.VersionSets = append(m.VersionSets, &v12.CompatibleVersionSet{}) + if err := m.VersionSets[len(m.VersionSets)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex diff --git a/proto/internal/temporal/server/api/persistence/v1/tasks.proto b/proto/internal/temporal/server/api/persistence/v1/tasks.proto index 558c1af0d51c..d149df98d135 100644 --- a/proto/internal/temporal/server/api/persistence/v1/tasks.proto +++ b/proto/internal/temporal/server/api/persistence/v1/tasks.proto @@ -63,12 +63,7 @@ message TaskQueueInfo { // Holds all the data related to worker versioning for a task queue. // Backwards-incompatible changes cannot be made, as this would make existing stored data unreadable message VersioningData { - // The currently established default worker build id version. - temporal.api.taskqueue.v1.VersionIdNode current_default = 1; - // Other current latest-compatible versions who are not the overall default. These are the - // versions that will be used when generating new tasks by following the graph from the - // version of the last task out to a leaf. - repeated temporal.api.taskqueue.v1.VersionIdNode compatible_leaves = 2; + repeated temporal.api.taskqueue.v1.CompatibleVersionSet version_sets = 1; } message TaskKey { diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 1b933bdfb7f4..f4d0e026d5ae 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -4266,21 +4266,52 @@ func (wh *WorkflowHandler) validateBuildIdOrderingUpdate( ) error { errstr := "request to update worker build id ordering requires:" hadErr := false + + checkIdLen := func(id string) { + if len(id) > wh.config.WorkerBuildIdSizeLimit() { + errstr += fmt.Sprintf(" Worker build IDs to be no larger than %v characters", + wh.config.WorkerBuildIdSizeLimit()) + hadErr = true + } + } + if req.GetNamespace() == "" { - errstr += " `namespace` to be set" + errstr += " `namespace` to be set." hadErr = true } if req.GetTaskQueue() == "" { - errstr += " `task_queue` to be set" + errstr += " `task_queue` to be set." hadErr = true } - if req.GetVersionId().GetWorkerBuildId() == "" { - errstr += " targeting a valid version identifier" + if req.GetOperation() == nil { + errstr += " an operation to be specified." hadErr = true } - if len(req.GetVersionId().GetWorkerBuildId()) > wh.config.WorkerBuildIdSizeLimit() { - errstr += fmt.Sprintf(" Worker build IDs to be no larger than %v characters", wh.config.WorkerBuildIdSizeLimit()) - hadErr = true + if x, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdOrderingRequest_NewCompatibleVersion_); ok { + if x.NewCompatibleVersion.GetNewVersionId() == "" { + errstr += " `new_version_id` to be set." + hadErr = true + } else { + checkIdLen(x.NewCompatibleVersion.GetNewVersionId()) + } + if x.NewCompatibleVersion.GetExistingCompatibleVersion() == "" { + errstr += " `existing_compatible_version` to be set." + hadErr = true + } + } else if x, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdOrderingRequest_NewDefaultVersionId); ok { + if x.NewDefaultVersionId == "" { + errstr += " `new_default_major_version_id` to be set." + hadErr = true + } else { + checkIdLen(x.NewDefaultVersionId) + } + } else if x, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdOrderingRequest_ExistingVersionIdInSetToPromote); ok { + if x.ExistingVersionIdInSetToPromote == "" { + errstr += " `existing_version_id_in_set_to_promote` to be set." + hadErr = true + } else { + checkIdLen(x.ExistingVersionIdInSetToPromote) + } } if hadErr { return serviceerror.NewInvalidArgument(errstr) diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index 794ebdb7ac9d..23cebb004052 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -744,7 +744,7 @@ func (e *matchingEngineImpl) GetWorkerBuildIdOrdering( return nil, err } return &matchingservice.GetWorkerBuildIdOrderingResponse{ - Response: ToBuildIdOrderingResponse(verDat, int(req.GetRequest().GetMaxDepth())), + Response: ToBuildIdOrderingResponse(verDat, int(req.GetRequest().GetMaxSets())), }, nil } diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go index 2bda8e3aeede..64dcc7e39c88 100644 --- a/service/matching/matchingEngine_test.go +++ b/service/matching/matchingEngine_test.go @@ -1856,7 +1856,7 @@ func (s *matchingEngineSuite) TestGetVersioningData() { Request: &workflowservice.GetWorkerBuildIdOrderingRequest{ Namespace: namespaceID.String(), TaskQueue: tq, - MaxDepth: 0, + MaxSets: 0, }, }) s.NoError(err) @@ -1864,14 +1864,15 @@ func (s *matchingEngineSuite) TestGetVersioningData() { // Set a long list of versions for i := 0; i < 100; i++ { - id := mkVerId(fmt.Sprintf("%d", i)) + id := fmt.Sprintf("%d", i) res, err := s.matchingEngine.UpdateWorkerBuildIdOrdering(s.handlerContext, &matchingservice.UpdateWorkerBuildIdOrderingRequest{ NamespaceId: namespaceID.String(), Request: &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - Namespace: namespaceID.String(), - TaskQueue: tq, - VersionId: id, - BecomeDefault: true, + Namespace: namespaceID.String(), + TaskQueue: tq, + Operation: &workflowservice.UpdateWorkerBuildIdOrderingRequest_NewDefaultVersionId{ + NewDefaultVersionId: id, + }, }, }) s.NoError(err) @@ -1879,18 +1880,23 @@ func (s *matchingEngineSuite) TestGetVersioningData() { } // Make a long compat-versions chain for i := 0; i < 10; i++ { - id := mkVerId(fmt.Sprintf("99.%d", i)) - prevCompat := mkVerId(fmt.Sprintf("99.%d", i-1)) + id := fmt.Sprintf("99.%d", i) + prevCompat := fmt.Sprintf("99.%d", i-1) if i == 0 { - prevCompat = mkVerId("99") + prevCompat = "99" } res, err := s.matchingEngine.UpdateWorkerBuildIdOrdering(s.handlerContext, &matchingservice.UpdateWorkerBuildIdOrderingRequest{ NamespaceId: namespaceID.String(), Request: &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - Namespace: namespaceID.String(), - TaskQueue: tq, - VersionId: id, - PreviousCompatible: prevCompat, + Namespace: namespaceID.String(), + TaskQueue: tq, + Operation: &workflowservice.UpdateWorkerBuildIdOrderingRequest_NewCompatibleVersion_{ + NewCompatibleVersion: &workflowservice.UpdateWorkerBuildIdOrderingRequest_NewCompatibleVersion{ + NewVersionId: id, + ExistingCompatibleVersion: prevCompat, + BecomeDefault: false, + }, + }, }, }) s.NoError(err) @@ -1903,18 +1909,18 @@ func (s *matchingEngineSuite) TestGetVersioningData() { Request: &workflowservice.GetWorkerBuildIdOrderingRequest{ Namespace: namespaceID.String(), TaskQueue: tq, - MaxDepth: 0, + MaxSets: 0, }, }) s.NoError(err) - s.NotNil(res.GetResponse().GetCurrentDefault()) - lastNode := res.GetResponse().GetCurrentDefault() - s.Equal(mkVerId("99"), lastNode.GetVersion()) - for lastNode.GetPreviousIncompatible() != nil { - lastNode = lastNode.GetPreviousIncompatible() - } - s.Equal(mkVerId("0"), lastNode.GetVersion()) - s.Equal(mkVerId("99.9"), res.GetResponse().GetCompatibleLeaves()[0].GetVersion()) + //s.NotNil(res.GetResponse().GetCurrentDefault()) + //lastNode := res.GetResponse().GetCurrentDefault() + //s.Equal(mkVerId("99"), lastNode.GetVersion()) + //for lastNode.GetPreviousIncompatible() != nil { + // lastNode = lastNode.GetPreviousIncompatible() + //} + //s.Equal(mkVerId("0"), lastNode.GetVersion()) + //s.Equal(mkVerId("99.9"), res.GetResponse().GetCompatibleLeaves()[0].GetVersion()) // Ensure depth limiting works res, err = s.matchingEngine.GetWorkerBuildIdOrdering(s.handlerContext, &matchingservice.GetWorkerBuildIdOrderingRequest{ @@ -1922,40 +1928,40 @@ func (s *matchingEngineSuite) TestGetVersioningData() { Request: &workflowservice.GetWorkerBuildIdOrderingRequest{ Namespace: namespaceID.String(), TaskQueue: tq, - MaxDepth: 1, + MaxSets: 1, }, }) s.NoError(err) - s.NotNil(res.GetResponse().GetCurrentDefault()) - s.Nil(res.GetResponse().GetCurrentDefault().GetPreviousIncompatible()) - s.Nil(res.GetResponse().GetCompatibleLeaves()[0].GetPreviousCompatible()) + //s.NotNil(res.GetResponse().GetCurrentDefault()) + //s.Nil(res.GetResponse().GetCurrentDefault().GetPreviousIncompatible()) + //s.Nil(res.GetResponse().GetCompatibleLeaves()[0].GetPreviousCompatible()) res, err = s.matchingEngine.GetWorkerBuildIdOrdering(s.handlerContext, &matchingservice.GetWorkerBuildIdOrderingRequest{ NamespaceId: namespaceID.String(), Request: &workflowservice.GetWorkerBuildIdOrderingRequest{ Namespace: namespaceID.String(), TaskQueue: tq, - MaxDepth: 5, + MaxSets: 5, }, }) s.NoError(err) - s.NotNil(res.GetResponse().GetCurrentDefault()) - lastNode = res.GetResponse().GetCurrentDefault() - for { - if lastNode.GetPreviousIncompatible() == nil { - break - } - lastNode = lastNode.GetPreviousIncompatible() - } - s.Equal(mkVerId("95"), lastNode.GetVersion()) - lastNode = res.GetResponse().GetCompatibleLeaves()[0] - for { - if lastNode.GetPreviousCompatible() == nil { - break - } - lastNode = lastNode.GetPreviousCompatible() - } - s.Equal(mkVerId("99.5"), lastNode.GetVersion()) + //s.NotNil(res.GetResponse().GetCurrentDefault()) + //lastNode = res.GetResponse().GetCurrentDefault() + //for { + // if lastNode.GetPreviousIncompatible() == nil { + // break + // } + // lastNode = lastNode.GetPreviousIncompatible() + //} + //s.Equal(mkVerId("95"), lastNode.GetVersion()) + //lastNode = res.GetResponse().GetCompatibleLeaves()[0] + //for { + // if lastNode.GetPreviousCompatible() == nil { + // break + // } + // lastNode = lastNode.GetPreviousCompatible() + //} + //s.Equal(mkVerId("99.5"), lastNode.GetVersion()) } func (s *matchingEngineSuite) TestActivityQueueMetadataInvalidate() { @@ -1974,7 +1980,6 @@ func (s *matchingEngineSuite) TestActivityQueueMetadataInvalidate() { Request: &workflowservice.GetWorkerBuildIdOrderingRequest{ Namespace: namespaceID.String(), TaskQueue: tq, - MaxDepth: 0, }, }) s.NoError(err) @@ -1988,10 +1993,12 @@ func (s *matchingEngineSuite) TestActivityQueueMetadataInvalidate() { s.NotNil(ttqm) _, err = s.matchingEngine.InvalidateTaskQueueMetadata(s.handlerContext, &matchingservice.InvalidateTaskQueueMetadataRequest{ - NamespaceId: namespaceID.String(), - TaskQueue: tq, - TaskQueueType: enumspb.TASK_QUEUE_TYPE_ACTIVITY, - VersioningData: &persistencespb.VersioningData{CurrentDefault: mkVerIdNode("hi")}, + NamespaceId: namespaceID.String(), + TaskQueue: tq, + TaskQueueType: enumspb.TASK_QUEUE_TYPE_ACTIVITY, + VersioningData: &persistencespb.VersioningData{ + VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("hi")}, + }, }) s.NoError(err) } diff --git a/service/matching/taskQueueManager_test.go b/service/matching/taskQueueManager_test.go index c8fec94ee335..0d8c7d1e1776 100644 --- a/service/matching/taskQueueManager_test.go +++ b/service/matching/taskQueueManager_test.go @@ -28,6 +28,7 @@ import ( "context" "errors" "fmt" + "go.temporal.io/api/taskqueue/v1" "math" "sync/atomic" "testing" @@ -512,7 +513,7 @@ func TestTaskQueueSubParitionFetchesVersioningInfoFromRootPartitionOnInit(t *tes tqCfg.tqId = subTqId data := &persistencespb.VersioningData{ - CurrentDefault: mkVerIdNode("0"), + VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")}, } asResp := &matchingservice.GetTaskQueueMetadataResponse{ VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{ @@ -551,7 +552,7 @@ func TestTaskQueueSubParitionSendsCurrentHashOfVersioningDataWhenFetching(t *tes tqCfg.tqId = subTqId data := &persistencespb.VersioningData{ - CurrentDefault: mkVerIdNode("0"), + VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")}, } asResp := &matchingservice.GetTaskQueueMetadataResponse{ VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{ @@ -617,7 +618,7 @@ func TestTaskQueueRootPartitionNotifiesChildrenOfInvalidation(t *testing.T) { // Make a change, mock verifies children are invalidated require.NoError(t, rootTq.MutateVersioningData(ctx, func(vd *persistencespb.VersioningData) error { *vd = persistencespb.VersioningData{ - CurrentDefault: mkVerIdNode("0"), + VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")}, } return nil })) @@ -639,7 +640,7 @@ func TestTaskQueueSubPartitionPollsPeriodically(t *testing.T) { asResp := &matchingservice.GetTaskQueueMetadataResponse{ VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{ VersioningData: &persistencespb.VersioningData{ - CurrentDefault: mkVerIdNode("0"), + VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")}, }, }, } @@ -677,7 +678,7 @@ func TestTaskQueueSubPartitionDoesNotPollIfNoDataThenPollsWhenInvalidated(t *tes }, } verDat := &persistencespb.VersioningData{ - CurrentDefault: mkVerIdNode("0"), + VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")}, } hasDatResp := &matchingservice.GetTaskQueueMetadataResponse{ VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{ @@ -724,7 +725,7 @@ func TestTaskQueueManagerWaitInitFailThenPass(t *testing.T) { } data := &persistencespb.VersioningData{ - CurrentDefault: mkVerIdNode("0"), + VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")}, } asResp := &matchingservice.GetTaskQueueMetadataResponse{ VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{ @@ -791,7 +792,7 @@ func TestActivityQueueGetsVersioningDataFromWorkflowQueue(t *testing.T) { ctx := context.Background() data := &persistencespb.VersioningData{ - CurrentDefault: mkVerIdNode("0"), + VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")}, } asResp := &matchingservice.GetTaskQueueMetadataResponse{ VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{ diff --git a/service/matching/version_graph.go b/service/matching/version_graph.go deleted file mode 100644 index 4ee1d929ca3a..000000000000 --- a/service/matching/version_graph.go +++ /dev/null @@ -1,274 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package matching - -import ( - "encoding/binary" - "fmt" - - "github.com/dgryski/go-farm" - - "github.com/gogo/protobuf/proto" - "go.temporal.io/api/serviceerror" - taskqueuepb "go.temporal.io/api/taskqueue/v1" - "go.temporal.io/api/workflowservice/v1" - "go.temporal.io/server/api/persistence/v1" -) - -func ToBuildIdOrderingResponse(g *persistence.VersioningData, maxDepth int) *workflowservice.GetWorkerBuildIdOrderingResponse { - return depthLimiter(g, maxDepth, true) -} - -// HashVersioningData returns a farm.Fingerprint64 hash of the versioning data as bytes. If the data is nonexistent or -// invalid, returns nil. -func HashVersioningData(data *persistence.VersioningData) []byte { - if data == nil || data.GetCurrentDefault() == nil { - return nil - } - asBytes, err := data.Marshal() - if err != nil { - return nil - } - b := make([]byte, 8) - binary.LittleEndian.PutUint64(b, farm.Fingerprint64(asBytes)) - return b -} - -func depthLimiter(g *persistence.VersioningData, maxDepth int, noMutate bool) *workflowservice.GetWorkerBuildIdOrderingResponse { - curDefault := g.GetCurrentDefault() - compatLeaves := g.GetCompatibleLeaves() - if maxDepth > 0 { - if noMutate { - curDefault = proto.Clone(g.GetCurrentDefault()).(*taskqueuepb.VersionIdNode) - } - curNode := curDefault - curDepth := 1 - for curDepth < maxDepth { - if curNode.GetPreviousIncompatible() == nil { - break - } - curNode = curNode.GetPreviousIncompatible() - curDepth++ - } - if curNode != nil { - curNode.PreviousIncompatible = nil - } - // Apply to compatible leaves as well - newCompatLeaves := make([]*taskqueuepb.VersionIdNode, len(g.GetCompatibleLeaves())) - for ix := range compatLeaves { - compatLeaf := compatLeaves[ix] - if noMutate { - compatLeaf = proto.Clone(compatLeaves[ix]).(*taskqueuepb.VersionIdNode) - } - curNode = compatLeaf - curDepth = 1 - for curDepth < maxDepth { - if curNode.GetPreviousCompatible() == nil { - break - } - curNode = curNode.GetPreviousCompatible() - curDepth++ - } - if curNode != nil { - curNode.PreviousCompatible = nil - } - newCompatLeaves[ix] = compatLeaf - } - compatLeaves = newCompatLeaves - } - return &workflowservice.GetWorkerBuildIdOrderingResponse{ - CurrentDefault: curDefault, - CompatibleLeaves: compatLeaves, - } -} - -// Given an existing graph and an update request, update the graph appropriately. -// -// See the API docs for more detail. In short, the graph looks like one long line of default versions, each of which -// is incompatible with the previous, optionally with branching compatibility branches. Like so: -// -// ─┬─1.0───2.0─┬─3.0───4.0 -// │ ├─3.1 -// │ └─3.2 -// ├─1.1 -// ├─1.2 -// └─1.3 -// -// In the above graph, 4.0 is the current default, and [1.3, 3.2] is the set of current compatible leaves. Links -// going left are incompatible relationships, and links going up are compatible relationships. -// -// A request may: -// 1. Add a new version to the graph, as a default version -// 2. Add a new version to the graph, compatible with some existing version. -// 3. Add a new version to the graph, compatible with some existing version and as the new default. -// 4. Unset a version as a default. It will be dropped and its previous incompatible version becomes default. -// 5. Unset a version as a compatible. It will be dropped and its previous compatible version will become the new -// compatible leaf for that branch. -func UpdateVersionsGraph(existingData *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdOrderingRequest, maxSize int) error { - if req.GetVersionId().GetWorkerBuildId() == "" { - return serviceerror.NewInvalidArgument( - "request to update worker build id ordering is missing a valid version identifier") - } - err := updateImpl(existingData, req) - if err != nil { - return err - } - // Limit graph size if it's grown too large - depthLimiter(existingData, maxSize, false) - return nil -} - -func updateImpl(existingData *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdOrderingRequest) error { - // If the version is to become the new default, add it to the list of current defaults, possibly replacing - // the currently set one. - if req.GetBecomeDefault() { - curDefault := existingData.GetCurrentDefault() - isCompatWithCurDefault := - req.GetPreviousCompatible().GetWorkerBuildId() == curDefault.GetVersion().GetWorkerBuildId() - if req.GetPreviousCompatible() != nil && !isCompatWithCurDefault { - // It does not make sense to introduce a version which is the new overall default, but is somehow also - // supposed to be compatible with some existing version, as that would necessarily imply that the newly - // added version is somehow both compatible and incompatible with the same target version. - return serviceerror.NewInvalidArgument("adding a new default version which is compatible " + - " with any version other than the existing default is not allowed.") - } - if curDefault != nil { - // If the current default is going to be the previous compat version with the one we're adding, - // then we need to skip over it when setting the previous *incompatible* version. - if isCompatWithCurDefault { - existingData.CurrentDefault = &taskqueuepb.VersionIdNode{ - Version: req.VersionId, - PreviousCompatible: curDefault, - PreviousIncompatible: curDefault.PreviousIncompatible, - } - } else { - // Otherwise, set the previous incompatible version to the current default. - existingData.CurrentDefault = &taskqueuepb.VersionIdNode{ - Version: req.VersionId, - PreviousCompatible: nil, - PreviousIncompatible: curDefault, - } - } - } else { - existingData.CurrentDefault = &taskqueuepb.VersionIdNode{ - Version: req.VersionId, - PreviousCompatible: nil, - PreviousIncompatible: nil, - } - } - } else { - if req.GetPreviousCompatible() != nil { - prevCompat, indexInCompatLeaves := findCompatibleNode(existingData, req.GetPreviousCompatible()) - if prevCompat != nil { - newNode := &taskqueuepb.VersionIdNode{ - Version: req.VersionId, - PreviousCompatible: prevCompat, - PreviousIncompatible: nil, - } - if indexInCompatLeaves >= 0 { - existingData.CompatibleLeaves[indexInCompatLeaves] = newNode - } else { - existingData.CompatibleLeaves = append(existingData.CompatibleLeaves, newNode) - } - } else { - return serviceerror.NewNotFound( - fmt.Sprintf("previous compatible version %v not found", req.GetPreviousCompatible())) - } - } else { - // Check if the version is already a default, and remove it from being one if it is. - curDefault := existingData.GetCurrentDefault() - if curDefault.GetVersion().Equal(req.GetVersionId()) { - existingData.CurrentDefault = nil - if curDefault.GetPreviousCompatible() != nil { - existingData.CurrentDefault = curDefault.GetPreviousCompatible() - } else if curDefault.GetPreviousIncompatible() != nil { - existingData.CurrentDefault = curDefault.GetPreviousIncompatible() - } - return nil - } - // Check if it's a compatible leaf, and remove it from being one if it is. - for i, def := range existingData.GetCompatibleLeaves() { - if def.GetVersion().Equal(req.GetVersionId()) { - existingData.CompatibleLeaves = - append(existingData.CompatibleLeaves[:i], existingData.CompatibleLeaves[i+1:]...) - if def.GetPreviousCompatible() != nil { - existingData.CompatibleLeaves = - append(existingData.CompatibleLeaves, def.GetPreviousCompatible()) - } - return nil - } - } - return serviceerror.NewInvalidArgument( - "requests to update build id ordering cannot create a new non-default version with no links") - } - } - return nil -} - -// Finds the node that the provided version should point at, given that it says it's compatible with the provided -// version. Note that this does not necessary mean *that* node. If the version being targeted as compatible has nodes -// which already point at it as their previous compatible version, that chain will be followed out to the leaf, which -// will be returned. -func findCompatibleNode( - existingData *persistence.VersioningData, - versionId *taskqueuepb.VersionId, -) (*taskqueuepb.VersionIdNode, int) { - // First search down from all existing compatible leaves, as if any of those chains point at the desired version, - // we will need to return that leaf. - for ix, node := range existingData.GetCompatibleLeaves() { - if node.GetVersion().Equal(versionId) { - return node, ix - } - if findInNode(node, versionId) != nil { - return node, ix - } - } - // Otherwise, this must be targeting some version in the default/incompatible chain, and it will become a new leaf - curDefault := existingData.GetCurrentDefault() - if curDefault.GetVersion().Equal(versionId) { - return curDefault, -1 - } - if nn := findInNode(curDefault, versionId); nn != nil { - return nn, -1 - } - - return nil, -1 -} - -func findInNode( - node *taskqueuepb.VersionIdNode, - versionId *taskqueuepb.VersionId, -) *taskqueuepb.VersionIdNode { - if node.GetVersion().Equal(versionId) { - return node - } - if node.GetPreviousCompatible() != nil { - return findInNode(node.GetPreviousCompatible(), versionId) - } - if node.GetPreviousIncompatible() != nil { - return findInNode(node.GetPreviousIncompatible(), versionId) - } - return nil -} diff --git a/service/matching/version_graph_test.go b/service/matching/version_graph_test.go deleted file mode 100644 index 439144b848bf..000000000000 --- a/service/matching/version_graph_test.go +++ /dev/null @@ -1,408 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package matching - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - "go.temporal.io/api/serviceerror" - taskqueuepb "go.temporal.io/api/taskqueue/v1" - "go.temporal.io/api/workflowservice/v1" - - persistencepb "go.temporal.io/server/api/persistence/v1" -) - -func mkVerIdNode(id string) *taskqueuepb.VersionIdNode { - return &taskqueuepb.VersionIdNode{ - Version: mkVerId(id), - } -} - -func mkVerId(id string) *taskqueuepb.VersionId { - return &taskqueuepb.VersionId{ - WorkerBuildId: id, - } -} - -func TestNewDefaultGraphUpdate(t *testing.T) { - n0 := mkVerIdNode("0") - n1 := mkVerIdNode("1") - n1.PreviousIncompatible = n0 - data := &persistencepb.VersioningData{ - CurrentDefault: n1, - } - - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("2"), - BecomeDefault: true, - } - - err := UpdateVersionsGraph(data, req, 0) - assert.NoError(t, err) - - assert.True(t, data.CurrentDefault.Version.Equal(req.VersionId)) - assert.Equal(t, "2", data.CurrentDefault.Version.GetWorkerBuildId()) - assert.Equal(t, n1, data.CurrentDefault.PreviousIncompatible) - assert.Equal(t, "1", data.CurrentDefault.PreviousIncompatible.Version.GetWorkerBuildId()) - assert.Equal(t, n0, data.CurrentDefault.PreviousIncompatible.PreviousIncompatible) - assert.Equal(t, "0", data.CurrentDefault.PreviousIncompatible.PreviousIncompatible.Version.GetWorkerBuildId()) - - asResp := ToBuildIdOrderingResponse(data, 0) - assert.Equal(t, 0, len(asResp.GetCompatibleLeaves())) -} - -func TestNewDefaultGraphUpdateOfEmptyGraph(t *testing.T) { - data := &persistencepb.VersioningData{} - - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("1"), - BecomeDefault: true, - } - - err := UpdateVersionsGraph(data, req, 0) - assert.NoError(t, err) - - assert.True(t, data.CurrentDefault.Version.Equal(req.VersionId)) - assert.Equal(t, "1", data.CurrentDefault.Version.GetWorkerBuildId()) - assert.Nil(t, data.CurrentDefault.GetPreviousIncompatible()) - assert.Nil(t, data.CurrentDefault.GetPreviousCompatible()) -} - -func TestNewDefaultGraphUpdateCompatWithCurDefault(t *testing.T) { - n0 := mkVerIdNode("0") - n1 := mkVerIdNode("1") - n1.PreviousIncompatible = n0 - data := &persistencepb.VersioningData{ - CurrentDefault: n1, - } - - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("2"), - PreviousCompatible: mkVerId("1"), - BecomeDefault: true, - } - - err := UpdateVersionsGraph(data, req, 0) - assert.NoError(t, err) - - assert.True(t, data.CurrentDefault.Version.Equal(req.VersionId)) - assert.Equal(t, "2", data.CurrentDefault.Version.GetWorkerBuildId()) - assert.Equal(t, n1, data.CurrentDefault.PreviousCompatible) - assert.Equal(t, "1", data.CurrentDefault.PreviousCompatible.Version.GetWorkerBuildId()) - assert.Equal(t, n0, data.CurrentDefault.PreviousIncompatible) - assert.Equal(t, "0", data.CurrentDefault.PreviousIncompatible.Version.GetWorkerBuildId()) -} - -func TestNewDefaultGraphUpdateCompatWithSomethingElse(t *testing.T) { - n0 := mkVerIdNode("0") - n1 := mkVerIdNode("1") - n1.PreviousIncompatible = n0 - data := &persistencepb.VersioningData{ - CurrentDefault: n1, - } - - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("2"), - PreviousCompatible: mkVerId("0"), - BecomeDefault: true, - } - - err := UpdateVersionsGraph(data, req, 0) - assert.Error(t, err) - assert.IsType(t, &serviceerror.InvalidArgument{}, err) -} - -func TestNewCompatibleWithNodeDeepInIncompatChain(t *testing.T) { - n0 := mkVerIdNode("0") - n1 := mkVerIdNode("1") - n1.PreviousIncompatible = n0 - n2 := mkVerIdNode("2") - n2.PreviousIncompatible = n1 - data := &persistencepb.VersioningData{ - CurrentDefault: n2, - } - - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("0.1"), - PreviousCompatible: mkVerId("0"), - } - - err := UpdateVersionsGraph(data, req, 0) - assert.NoError(t, err) - - assert.Equal(t, "2", data.CurrentDefault.Version.GetWorkerBuildId()) - assert.True(t, data.CompatibleLeaves[0].Version.Equal(req.VersionId)) - assert.Equal(t, "0.1", data.CompatibleLeaves[0].Version.GetWorkerBuildId()) - assert.Equal(t, "0", data.CompatibleLeaves[0].PreviousCompatible.Version.GetWorkerBuildId()) - - asResp := ToBuildIdOrderingResponse(data, 0) - assert.Equal(t, 1, len(asResp.GetCompatibleLeaves())) - assert.Equal(t, "0.1", asResp.CompatibleLeaves[0].Version.GetWorkerBuildId()) -} - -func TestNewCompatibleWithNonDefaultGraphUpdate(t *testing.T) { - n0 := mkVerIdNode("0") - n1 := mkVerIdNode("1") - n1.PreviousIncompatible = n0 - data := &persistencepb.VersioningData{ - CurrentDefault: n1, - } - - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("0.1"), - PreviousCompatible: mkVerId("0"), - } - err := UpdateVersionsGraph(data, req, 0) - assert.NoError(t, err) - - assert.True(t, data.CurrentDefault.Version.Equal(n1.Version)) - assert.Equal(t, "1", data.CurrentDefault.Version.GetWorkerBuildId()) - assert.Equal(t, 1, len(data.CompatibleLeaves)) - assert.True(t, data.CompatibleLeaves[0].Version.Equal(req.VersionId)) - assert.Equal(t, "0.1", data.CompatibleLeaves[0].Version.GetWorkerBuildId()) - - req = &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("0.2"), - PreviousCompatible: mkVerId("0.1"), - } - err = UpdateVersionsGraph(data, req, 0) - assert.NoError(t, err) - - assert.True(t, data.CurrentDefault.Version.Equal(n1.Version)) - assert.Equal(t, "1", data.CurrentDefault.Version.GetWorkerBuildId()) - assert.Equal(t, 1, len(data.CompatibleLeaves)) - assert.True(t, data.CompatibleLeaves[0].Version.Equal(req.VersionId)) - assert.Equal(t, "0.2", data.CompatibleLeaves[0].Version.GetWorkerBuildId()) - - // Ensure setting a compatible version which targets a non-leaf compat version ends up without a branch - req = &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("0.3"), - PreviousCompatible: mkVerId("0.1"), - } - err = UpdateVersionsGraph(data, req, 0) - assert.NoError(t, err) - - assert.True(t, data.CurrentDefault.Version.Equal(n1.Version)) - assert.Equal(t, "1", data.CurrentDefault.Version.GetWorkerBuildId()) - assert.Equal(t, 1, len(data.CompatibleLeaves)) - assert.True(t, data.CompatibleLeaves[0].Version.Equal(req.VersionId)) - assert.Equal(t, "0.3", data.CompatibleLeaves[0].Version.GetWorkerBuildId()) - assert.Equal(t, "0.2", data.CompatibleLeaves[0].PreviousCompatible.Version.GetWorkerBuildId()) -} - -func TestAddingNewNodeCompatWithPreviousWhenNoDefaultNotAllowed(t *testing.T) { - data := &persistencepb.VersioningData{} - - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("0.1"), - PreviousCompatible: mkVerId("0"), - BecomeDefault: true, - } - err := UpdateVersionsGraph(data, req, 0) - assert.Error(t, err) - assert.IsType(t, &serviceerror.InvalidArgument{}, err) -} - -func TestAddingNewNodeWithNoLinksNotAllowed(t *testing.T) { - data := &persistencepb.VersioningData{} - - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("0.1"), - } - err := UpdateVersionsGraph(data, req, 0) - assert.Error(t, err) - assert.IsType(t, &serviceerror.InvalidArgument{}, err) -} - -func TestUnsetCurrentDefault(t *testing.T) { - n1 := mkVerIdNode("1") - data := &persistencepb.VersioningData{ - CurrentDefault: n1, - } - - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("1"), - } - - err := UpdateVersionsGraph(data, req, 0) - assert.NoError(t, err) - - assert.Nil(t, data.CurrentDefault) -} - -func TestUnsetCurrentDefaultPreviousIncompatBecomesDefault(t *testing.T) { - n0 := mkVerIdNode("0") - n1 := mkVerIdNode("1") - n1.PreviousIncompatible = n0 - data := &persistencepb.VersioningData{ - CurrentDefault: n1, - } - - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("1"), - } - - err := UpdateVersionsGraph(data, req, 0) - assert.NoError(t, err) - - assert.True(t, data.CurrentDefault.Version.Equal(n0.Version)) - assert.Equal(t, "0", data.CurrentDefault.Version.GetWorkerBuildId()) -} - -func TestUnsetCurrentDefaultPreviousCompatBecomesDefault(t *testing.T) { - n0 := mkVerIdNode("0") - n1 := mkVerIdNode("1") - n1d1 := mkVerIdNode("1.1") - n1.PreviousIncompatible = n0 - n1d1.PreviousCompatible = n1 - data := &persistencepb.VersioningData{ - CurrentDefault: n1d1, - } - - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("1.1"), - } - - err := UpdateVersionsGraph(data, req, 0) - assert.NoError(t, err) - - assert.True(t, data.CurrentDefault.Version.Equal(n1.Version)) - assert.Equal(t, "1", data.CurrentDefault.Version.GetWorkerBuildId()) -} - -func TestDropCompatibleLeaf(t *testing.T) { - n0 := mkVerIdNode("0") - n1 := mkVerIdNode("1") - n1d1 := mkVerIdNode("1.1") - n1.PreviousIncompatible = n0 - n1d1.PreviousCompatible = n1 - data := &persistencepb.VersioningData{ - CurrentDefault: n1, - CompatibleLeaves: []*taskqueuepb.VersionIdNode{n1d1}, - } - - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("1.1"), - } - - err := UpdateVersionsGraph(data, req, 0) - assert.NoError(t, err) - - assert.True(t, data.CurrentDefault.Version.Equal(n1.Version)) - assert.Equal(t, "1", data.CurrentDefault.Version.GetWorkerBuildId()) - assert.Equal(t, 1, len(data.CompatibleLeaves)) - assert.Equal(t, "1", data.CompatibleLeaves[0].Version.GetWorkerBuildId()) -} - -func TestCompatibleTargetsNotFound(t *testing.T) { - n0 := mkVerIdNode("0") - data := &persistencepb.VersioningData{ - CurrentDefault: n0, - } - - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: mkVerId("1.1"), - PreviousCompatible: mkVerId("1"), - } - - err := UpdateVersionsGraph(data, req, 0) - assert.Error(t, err) - assert.IsType(t, &serviceerror.NotFound{}, err) -} - -func TestLimitsMaxSize(t *testing.T) { - data := &persistencepb.VersioningData{} - maxSize := 1000 - - for i := 0; i < 1024; i++ { - id := mkVerId(fmt.Sprintf("%d", i)) - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: id, - BecomeDefault: true, - } - err := UpdateVersionsGraph(data, req, maxSize) - assert.NoError(t, err) - } - for i := 0; i < 1024; i++ { - id := mkVerId(fmt.Sprintf("50.%d", i)) - var compatId *taskqueuepb.VersionId - if i == 0 { - compatId = mkVerId("50") - } else { - compatId = mkVerId(fmt.Sprintf("50.%d", i-1)) - } - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: id, - PreviousCompatible: compatId, - } - err := UpdateVersionsGraph(data, req, maxSize) - assert.NoError(t, err) - } - - lastNode := data.GetCurrentDefault() - for { - if lastNode.GetPreviousIncompatible() == nil { - break - } - lastNode = lastNode.GetPreviousIncompatible() - } - assert.Equal(t, mkVerId("24"), lastNode.GetVersion()) - assert.Equal(t, 1, len(data.GetCompatibleLeaves())) - lastNode = data.GetCompatibleLeaves()[0] - for { - if lastNode.GetPreviousCompatible() == nil { - break - } - lastNode = lastNode.GetPreviousCompatible() - } - assert.Equal(t, mkVerId("50.24"), lastNode.GetVersion()) -} - -func FuzzVersionGraphEnsureNoSameTypeDefaults(f *testing.F) { - f.Fuzz(func(t *testing.T, numUpdates, willPickCompatMod, compatModTarget uint8) { - addedNodes := make([]*taskqueuepb.VersionId, 0, numUpdates) - data := &persistencepb.VersioningData{} - - for i := uint8(0); i < numUpdates; i++ { - id := mkVerId(fmt.Sprintf("%d", i)) - req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ - VersionId: id, - BecomeDefault: true, - } - if willPickCompatMod > 0 && compatModTarget > 0 && - numUpdates%willPickCompatMod == 0 && - uint8(len(addedNodes)) > numUpdates%compatModTarget { - compatTarget := addedNodes[numUpdates%compatModTarget] - req.PreviousCompatible = compatTarget - } - addedNodes = append(addedNodes, id) - err := UpdateVersionsGraph(data, req, 0) - assert.NoError(t, err) - assert.NotNil(t, ToBuildIdOrderingResponse(data, 0)) - } - }) -} diff --git a/service/matching/version_sets.go b/service/matching/version_sets.go new file mode 100644 index 000000000000..183bf4341b6c --- /dev/null +++ b/service/matching/version_sets.go @@ -0,0 +1,178 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package matching + +import ( + "encoding/binary" + "fmt" + "github.com/dgryski/go-farm" + "go.temporal.io/api/serviceerror" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/api/persistence/v1" +) + +func ToBuildIdOrderingResponse(g *persistence.VersioningData, maxDepth int) *workflowservice.GetWorkerBuildIdOrderingResponse { + // TODO: Current default pointer not represented in response. Can either shuffle it to front or change API. + return depthLimiter(g, maxDepth, false) +} + +// HashVersioningData returns a farm.Fingerprint64 hash of the versioning data as bytes. If the data is nonexistent or +// invalid, returns nil. +func HashVersioningData(data *persistence.VersioningData) []byte { + if data == nil || data.GetVersionSets() == nil { + return nil + } + asBytes, err := data.Marshal() + if err != nil { + return nil + } + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, farm.Fingerprint64(asBytes)) + return b +} + +func depthLimiter(g *persistence.VersioningData, maxDepth int, mutate bool) *workflowservice.GetWorkerBuildIdOrderingResponse { + if maxDepth <= 0 || maxDepth >= len(g.GetVersionSets()) { + return &workflowservice.GetWorkerBuildIdOrderingResponse{MajorVersionSets: g.VersionSets} + } + shortened := g.GetVersionSets()[maxDepth:] + if mutate { + g.VersionSets = shortened + } + return &workflowservice.GetWorkerBuildIdOrderingResponse{MajorVersionSets: shortened} +} + +// Given existing versioning data and an update request, update the version sets appropriately. The request is expected +// to have already been validated. +// +// See the API docs for more detail. In short, the versioning data representation consists of a sequence of sequences of +// compatible versions. Like so: +// +// * +// ┬─1.0───2.0─┬─3.0───4.0 +// │ ├─3.1 +// │ └─3.2 +// ├─1.1 +// ├─1.2 +// └─1.3 +// +// In the above example, 4.0 is the current default version and no other versions are compatible with it. The previous +// compatible set is the 3.x set, with 3.2 being the current default for that set, and so on. The * represents the +// current default set pointer, which can be shifted around by the user. +// +// A request may: +// 1. Add a new version possibly as the new overall default version, creating a new set. +// 2. Add a new version, compatible with some existing version, adding it to that existing set and making it the new +// default for that set. +// 3. Target some existing version, marking it (and thus its set) as the default set. +// +// Deletions are not permitted, as inserting new versions can accomplish the same goals with less complexity. However, +// sets may be dropped when the number of sets limit is reached. They are dropped oldest first - the current default set +// is never dropped, instead dropping the next oldest set. +func UpdateVersionsGraph(existingData *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdOrderingRequest, maxSize int) error { + err := updateImpl(existingData, req) + if err != nil { + return err + } + // Limit graph size if it's grown too large + depthLimiter(existingData, maxSize, true) + return nil +} + +func updateImpl(existingData *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdOrderingRequest) error { + // First find if the targeted version is already in the sets + targetedVersion := extractTargetedVersion(req) + targetSetIx, _ := findVersion(existingData, targetedVersion) + + if _, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdOrderingRequest_NewDefaultVersionId); ok { + // If it's not already in the sets, add it as the new default set + if targetSetIx != -1 { + return serviceerror.NewInvalidArgument(fmt.Sprintf("version %s already exists", targetedVersion)) + } + + existingData.VersionSets = append(existingData.GetVersionSets(), &taskqueuepb.CompatibleVersionSet{ + Versions: []string{targetedVersion}, + }) + } else if op, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdOrderingRequest_NewCompatibleVersion_); ok { + compatVer := op.NewCompatibleVersion.GetExistingCompatibleVersion() + compatSetIx, _ := findVersion(existingData, compatVer) + if compatSetIx == -1 { + return serviceerror.NewNotFound( + fmt.Sprintf("targeted compatible_version %v not found", compatVer)) + } + if targetSetIx != -1 { + // If the version does exist, this operation can't do anything meaningful, but we can fail if the user + // says the version is now compatible with some different set. + return serviceerror.NewInvalidArgument(fmt.Sprintf("version %s already exists", targetedVersion)) + } + + // If the version doesn't exist, add it to the compatible set + existingData.VersionSets[compatSetIx].Versions = + append(existingData.VersionSets[compatSetIx].Versions, targetedVersion) + if op.NewCompatibleVersion.GetBecomeDefault() { + makeDefaultSet(existingData, compatSetIx) + } + } else if _, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdOrderingRequest_ExistingVersionIdInSetToPromote); ok { + if targetSetIx == -1 { + return serviceerror.NewNotFound(fmt.Sprintf("targeted version %v not found", targetedVersion)) + } + makeDefaultSet(existingData, targetSetIx) + } + + return nil +} + +func extractTargetedVersion(req *workflowservice.UpdateWorkerBuildIdOrderingRequest) string { + if req.GetNewCompatibleVersion() != nil { + return req.GetNewCompatibleVersion().GetNewVersionId() + } else if req.GetExistingVersionIdInSetToPromote() != "" { + return req.GetExistingVersionIdInSetToPromote() + } + return req.GetNewDefaultVersionId() +} + +// Finds the version in the version sets, returning (set index, index within that set) +// Returns -1, -1 if not found. +func findVersion(data *persistence.VersioningData, id string) (int, int) { + for setIx, set := range data.GetVersionSets() { + for versionIx, version := range set.GetVersions() { + if version == id { + return setIx, versionIx + } + } + } + return -1, -1 +} + +func makeDefaultSet(data *persistence.VersioningData, setIx int) { + if len(data.VersionSets) <= 1 { + return + } + // Move the set to the end and shift all the others down + moveMe := data.VersionSets[setIx] + copy(data.VersionSets[setIx:], data.VersionSets[setIx+1:]) + data.VersionSets[len(data.VersionSets)-1] = moveMe +} diff --git a/service/matching/version_sets_test.go b/service/matching/version_sets_test.go new file mode 100644 index 000000000000..11a27a747264 --- /dev/null +++ b/service/matching/version_sets_test.go @@ -0,0 +1,239 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package matching + +import ( + "fmt" + "go.temporal.io/api/serviceerror" + "testing" + + "github.com/stretchr/testify/assert" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" + + persistencepb "go.temporal.io/server/api/persistence/v1" +) + +func mkNewSet(id string) *taskqueuepb.CompatibleVersionSet { + return &taskqueuepb.CompatibleVersionSet{ + Versions: []string{id}, + } +} + +func mkInitialData(numSets int) *persistencepb.VersioningData { + sets := make([]*taskqueuepb.CompatibleVersionSet, numSets) + for i := 0; i < numSets; i++ { + sets[i] = mkNewSet(fmt.Sprintf("%v", i)) + } + return &persistencepb.VersioningData{ + VersionSets: sets, + } +} + +func mkNewDefReq(id string) *workflowservice.UpdateWorkerBuildIdOrderingRequest { + return &workflowservice.UpdateWorkerBuildIdOrderingRequest{ + Operation: &workflowservice.UpdateWorkerBuildIdOrderingRequest_NewDefaultVersionId{ + NewDefaultVersionId: id, + }, + } +} +func mkNewCompatReq(id, compat string, becomeDefault bool) *workflowservice.UpdateWorkerBuildIdOrderingRequest { + return &workflowservice.UpdateWorkerBuildIdOrderingRequest{ + Operation: &workflowservice.UpdateWorkerBuildIdOrderingRequest_NewCompatibleVersion_{ + NewCompatibleVersion: &workflowservice.UpdateWorkerBuildIdOrderingRequest_NewCompatibleVersion{ + NewVersionId: id, + ExistingCompatibleVersion: compat, + BecomeDefault: becomeDefault, + }, + }, + } +} +func mkExistingDefault(id string) *workflowservice.UpdateWorkerBuildIdOrderingRequest { + return &workflowservice.UpdateWorkerBuildIdOrderingRequest{ + Operation: &workflowservice.UpdateWorkerBuildIdOrderingRequest_ExistingVersionIdInSetToPromote{ + ExistingVersionIdInSetToPromote: id, + }, + } +} + +func TestNewDefaultUpdate(t *testing.T) { + data := mkInitialData(2) + + req := mkNewDefReq("2") + err := UpdateVersionsGraph(data, req, 0) + assert.NoError(t, err) + + curd := data.VersionSets[len(data.VersionSets)-1] + assert.Equal(t, "2", curd.Versions[0]) + assert.Equal(t, "1", data.VersionSets[1].Versions[0]) + assert.Equal(t, "0", data.VersionSets[0].Versions[0]) + + asResp := ToBuildIdOrderingResponse(data, 0) + assert.Equal(t, "2", asResp.MajorVersionSets[2].Versions[0]) +} + +func TestNewDefaultGraphUpdateOfEmptyGraph(t *testing.T) { + data := &persistencepb.VersioningData{} + + req := mkNewDefReq("1") + err := UpdateVersionsGraph(data, req, 0) + assert.NoError(t, err) + + curd := data.VersionSets[len(data.VersionSets)-1] + assert.Equal(t, "1", curd.Versions[0]) + assert.Equal(t, 1, len(data.VersionSets)) +} + +func TestNewDefaultGraphUpdateCompatWithCurDefault(t *testing.T) { + data := mkInitialData(2) + + req := mkNewCompatReq("1.1", "1", true) + err := UpdateVersionsGraph(data, req, 0) + assert.NoError(t, err) + + curd := data.VersionSets[len(data.VersionSets)-1] + assert.Equal(t, "1.1", curd.Versions[1]) + assert.Equal(t, "1", curd.Versions[0]) + assert.Equal(t, "0", data.VersionSets[0].Versions[0]) +} + +func TestNewDefaultGraphUpdateCompatWithNonDefaultSet(t *testing.T) { + data := mkInitialData(2) + + req := mkNewCompatReq("0.1", "0", true) + err := UpdateVersionsGraph(data, req, 0) + assert.NoError(t, err) + + curd := data.VersionSets[len(data.VersionSets)-1] + assert.Equal(t, "0.1", curd.Versions[1]) + assert.Equal(t, "0", curd.Versions[0]) + assert.Equal(t, "1", data.VersionSets[0].Versions[0]) +} + +func TestNewCompatibleWithVerInOlderSet(t *testing.T) { + data := mkInitialData(3) + + req := mkNewCompatReq("0.1", "0", false) + err := UpdateVersionsGraph(data, req, 0) + assert.NoError(t, err) + + curd := data.VersionSets[len(data.VersionSets)-1] + assert.Equal(t, "2", curd.Versions[0]) + assert.Equal(t, "0.1", data.VersionSets[0].Versions[1]) + assert.Equal(t, "0", data.VersionSets[0].Versions[0]) + + asResp := ToBuildIdOrderingResponse(data, 0) + assert.Equal(t, "0.1", asResp.MajorVersionSets[0].Versions[1]) +} + +func TestNewCompatibleWithNonDefaultGraphUpdate(t *testing.T) { + data := mkInitialData(2) + + req := mkNewCompatReq("0.1", "0", false) + err := UpdateVersionsGraph(data, req, 0) + assert.NoError(t, err) + + req = mkNewCompatReq("0.2", "0.1", false) + err = UpdateVersionsGraph(data, req, 0) + assert.NoError(t, err) + + curd := data.VersionSets[len(data.VersionSets)-1] + assert.Equal(t, "1", curd.Versions[0]) + assert.Equal(t, "0", data.VersionSets[0].Versions[0]) + assert.Equal(t, "0.1", data.VersionSets[0].Versions[1]) + assert.Equal(t, "0.2", data.VersionSets[0].Versions[2]) + + // Ensure setting a compatible version which targets a non-leaf compat version ends up without a branch + req = mkNewCompatReq("0.3", "0.1", false) + err = UpdateVersionsGraph(data, req, 0) + assert.NoError(t, err) + + assert.Equal(t, "1", curd.Versions[0]) + assert.Equal(t, "0", data.VersionSets[0].Versions[0]) + assert.Equal(t, "0.1", data.VersionSets[0].Versions[1]) + assert.Equal(t, "0.2", data.VersionSets[0].Versions[2]) + assert.Equal(t, "0.3", data.VersionSets[0].Versions[3]) +} + +func TestCompatibleTargetsNotFound(t *testing.T) { + data := mkInitialData(1) + + req := mkNewCompatReq("1.1", "1", false) + err := UpdateVersionsGraph(data, req, 0) + assert.Error(t, err) + assert.IsType(t, &serviceerror.NotFound{}, err) +} + +func TestMakeExistingSetDefault(t *testing.T) { + data := mkInitialData(4) + + req := mkExistingDefault("2") + err := UpdateVersionsGraph(data, req, 0) + + assert.NoError(t, err) + assert.Equal(t, "0", data.VersionSets[0].Versions[0]) + assert.Equal(t, "1", data.VersionSets[1].Versions[0]) + assert.Equal(t, "3", data.VersionSets[2].Versions[0]) + assert.Equal(t, "2", data.VersionSets[3].Versions[0]) + + // Add a compatible version to a set and then make that set the default via the compatible version + req = mkNewCompatReq("1.1", "1", true) + + err = UpdateVersionsGraph(data, req, 0) + assert.NoError(t, err) + assert.Equal(t, "0", data.VersionSets[0].Versions[0]) + assert.Equal(t, "3", data.VersionSets[1].Versions[0]) + assert.Equal(t, "2", data.VersionSets[2].Versions[0]) + assert.Equal(t, "1", data.VersionSets[3].Versions[0]) +} + +func TestSayVersionIsCompatWithDifferentSetThanItsAlreadyCompatWithNotAllowed(t *testing.T) { + data := mkInitialData(3) + + req := mkNewCompatReq("0.1", "0", false) + err := UpdateVersionsGraph(data, req, 0) + assert.NoError(t, err) + + req = mkNewCompatReq("0.1", "1", false) + err = UpdateVersionsGraph(data, req, 0) + assert.Error(t, err) + assert.IsType(t, &serviceerror.InvalidArgument{}, err) +} + +func TestLimitsMaxSize(t *testing.T) { + data := &persistencepb.VersioningData{} + maxSize := 10 + + for i := 0; i < 20; i++ { + id := fmt.Sprintf("%d", i) + req := mkNewDefReq(id) + err := UpdateVersionsGraph(data, req, maxSize) + assert.NoError(t, err) + } + + for i := 0; i < len(data.VersionSets); i++ { + assert.Equal(t, fmt.Sprintf("%d", i+10), data.VersionSets[i].Versions[0]) + } +}