diff --git a/api/persistence/v1/task_queues.pb.go b/api/persistence/v1/task_queues.pb.go index de023e290c7..cc61ede4c81 100644 --- a/api/persistence/v1/task_queues.pb.go +++ b/api/persistence/v1/task_queues.pb.go @@ -51,51 +51,51 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -type BuildID_State int32 +type BuildId_State int32 const ( - STATE_UNSPECIFIED BuildID_State = 0 - STATE_ACTIVE BuildID_State = 1 - STATE_DELETED BuildID_State = 2 + STATE_UNSPECIFIED BuildId_State = 0 + STATE_ACTIVE BuildId_State = 1 + STATE_DELETED BuildId_State = 2 ) -var BuildID_State_name = map[int32]string{ +var BuildId_State_name = map[int32]string{ 0: "StateUnspecified", 1: "StateActive", 2: "StateDeleted", } -var BuildID_State_value = map[string]int32{ +var BuildId_State_value = map[string]int32{ "StateUnspecified": 0, "StateActive": 1, "StateDeleted": 2, } -func (BuildID_State) EnumDescriptor() ([]byte, []int) { +func (BuildId_State) EnumDescriptor() ([]byte, []int) { return fileDescriptor_0cb9a0f256d1327d, []int{0, 0} } -// BuildID is an identifier with a timestamped status used to identify workers for task queue versioning purposes. -type BuildID struct { +// BuildId is an identifier with a timestamped status used to identify workers for task queue versioning purposes. +type BuildId struct { Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - State BuildID_State `protobuf:"varint,2,opt,name=state,proto3,enum=temporal.server.api.persistence.v1.BuildID_State" json:"state,omitempty"` + State BuildId_State `protobuf:"varint,2,opt,name=state,proto3,enum=temporal.server.api.persistence.v1.BuildId_State" json:"state,omitempty"` // HLC timestamp representing when the state was updated or the when build ID was originally inserted. // (-- api-linter: core::0142::time-field-type=disabled // aip.dev/not-precedent: Using HLC instead of wall clock. 
--) StateUpdateTimestamp *v1.HybridLogicalClock `protobuf:"bytes,3,opt,name=state_update_timestamp,json=stateUpdateTimestamp,proto3" json:"state_update_timestamp,omitempty"` } -func (m *BuildID) Reset() { *m = BuildID{} } -func (*BuildID) ProtoMessage() {} -func (*BuildID) Descriptor() ([]byte, []int) { +func (m *BuildId) Reset() { *m = BuildId{} } +func (*BuildId) ProtoMessage() {} +func (*BuildId) Descriptor() ([]byte, []int) { return fileDescriptor_0cb9a0f256d1327d, []int{0} } -func (m *BuildID) XXX_Unmarshal(b []byte) error { +func (m *BuildId) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } -func (m *BuildID) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { +func (m *BuildId) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { - return xxx_messageInfo_BuildID.Marshal(b, m, deterministic) + return xxx_messageInfo_BuildId.Marshal(b, m, deterministic) } else { b = b[:cap(b)] n, err := m.MarshalToSizedBuffer(b) @@ -105,33 +105,33 @@ func (m *BuildID) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } -func (m *BuildID) XXX_Merge(src proto.Message) { - xxx_messageInfo_BuildID.Merge(m, src) +func (m *BuildId) XXX_Merge(src proto.Message) { + xxx_messageInfo_BuildId.Merge(m, src) } -func (m *BuildID) XXX_Size() int { +func (m *BuildId) XXX_Size() int { return m.Size() } -func (m *BuildID) XXX_DiscardUnknown() { - xxx_messageInfo_BuildID.DiscardUnknown(m) +func (m *BuildId) XXX_DiscardUnknown() { + xxx_messageInfo_BuildId.DiscardUnknown(m) } -var xxx_messageInfo_BuildID proto.InternalMessageInfo +var xxx_messageInfo_BuildId proto.InternalMessageInfo -func (m *BuildID) GetId() string { +func (m *BuildId) GetId() string { if m != nil { return m.Id } return "" } -func (m *BuildID) GetState() BuildID_State { +func (m *BuildId) GetState() BuildId_State { if m != nil { return m.State } return STATE_UNSPECIFIED } -func (m *BuildID) GetStateUpdateTimestamp() *v1.HybridLogicalClock { +func (m *BuildId) GetStateUpdateTimestamp() *v1.HybridLogicalClock { if m != nil { return m.StateUpdateTimestamp } @@ -146,8 +146,8 @@ type CompatibleVersionSet struct { // case a set might end up with more than one ID. SetIds []string `protobuf:"bytes,1,rep,name=set_ids,json=setIds,proto3" json:"set_ids,omitempty"` // All the compatible versions, unordered except for the last element, which is considered the set "default". - BuildIds []*BuildID `protobuf:"bytes,2,rep,name=build_ids,json=buildIds,proto3" json:"build_ids,omitempty"` - // HLC timestamp representing when the set default was updated. Different from BuildID.state_update_timestamp, which + BuildIds []*BuildId `protobuf:"bytes,2,rep,name=build_ids,json=buildIds,proto3" json:"build_ids,omitempty"` + // HLC timestamp representing when the set default was updated. Different from BuildId.state_update_timestamp, which // refers to the build ID status. // (-- api-linter: core::0142::time-field-type=disabled // aip.dev/not-precedent: Using HLC instead of wall clock. 
--) @@ -193,7 +193,7 @@ func (m *CompatibleVersionSet) GetSetIds() []string { return nil } -func (m *CompatibleVersionSet) GetBuildIds() []*BuildID { +func (m *CompatibleVersionSet) GetBuildIds() []*BuildId { if m != nil { return m.BuildIds } @@ -378,8 +378,8 @@ func (m *VersionedTaskQueueUserData) GetVersion() int64 { } func init() { - proto.RegisterEnum("temporal.server.api.persistence.v1.BuildID_State", BuildID_State_name, BuildID_State_value) - proto.RegisterType((*BuildID)(nil), "temporal.server.api.persistence.v1.BuildID") + proto.RegisterEnum("temporal.server.api.persistence.v1.BuildId_State", BuildId_State_name, BuildId_State_value) + proto.RegisterType((*BuildId)(nil), "temporal.server.api.persistence.v1.BuildId") proto.RegisterType((*CompatibleVersionSet)(nil), "temporal.server.api.persistence.v1.CompatibleVersionSet") proto.RegisterType((*VersioningData)(nil), "temporal.server.api.persistence.v1.VersioningData") proto.RegisterType((*TaskQueueUserData)(nil), "temporal.server.api.persistence.v1.TaskQueueUserData") @@ -391,7 +391,7 @@ func init() { } var fileDescriptor_0cb9a0f256d1327d = []byte{ - // 577 bytes of a gzipped FileDescriptorProto + // 578 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x94, 0x41, 0x6f, 0xd3, 0x3c, 0x1c, 0xc6, 0xe3, 0xf4, 0xdd, 0xf6, 0xce, 0x1d, 0xa5, 0xb5, 0xc6, 0x88, 0x76, 0xb0, 0xaa, 0x9e, 0x2a, 0x90, 0x52, 0x5a, 0x86, 0x84, 0xc4, 0x69, 0x6b, 0x33, 0x16, 0x69, 0x42, 0x90, 0xa6, 0x3b, @@ -402,50 +402,50 @@ var fileDescriptor_0cb9a0f256d1327d = []byte{ 0x0d, 0x39, 0xe3, 0x82, 0x8e, 0x7a, 0xb4, 0x16, 0xd5, 0x6b, 0x82, 0xf0, 0x81, 0xf3, 0x6a, 0x4c, 0xc7, 0x94, 0xeb, 0x41, 0xe8, 0x0b, 0x1f, 0x55, 0xe6, 0x2a, 0x3d, 0x55, 0xe9, 0x24, 0x60, 0xfa, 0x25, 0x95, 0x1e, 0xd5, 0xb7, 0xef, 0x2d, 0x22, 0xf7, 0x3c, 0xbf, 0x37, 0x48, 0x98, 0x43, 0xca, - 0x39, 0xe9, 0xd3, 0x94, 0x57, 0x79, 0xa7, 0xc2, 0xb5, 0xbd, 0x31, 0xf3, 0x5c, 0xb3, 0x85, 0x0a, - 0x50, 0x65, 0xae, 0x06, 0xca, 0xa0, 0xba, 0x6e, 0xa9, 0xcc, 0x45, 0x4f, 0xe1, 0x0a, 0x17, 0x44, - 0x50, 0x4d, 0x2d, 0x83, 0x6a, 0xa1, 0x51, 0xd7, 0xff, 0xec, 0xad, 0x67, 0x2c, 0xbd, 0x9d, 0x08, - 0xad, 0x54, 0x8f, 0x4e, 0xe0, 0x96, 0x5c, 0x38, 0xe3, 0xc0, 0x4d, 0x7e, 0x04, 0x1b, 0x52, 0x2e, - 0xc8, 0x30, 0xd0, 0x72, 0x65, 0x50, 0xcd, 0x37, 0x1e, 0x2c, 0x24, 0xcb, 0x13, 0x27, 0xcc, 0x83, - 0xd7, 0xdd, 0x90, 0xb9, 0x87, 0x7e, 0x9f, 0xf5, 0x88, 0xd7, 0x4c, 0xa6, 0xd6, 0xa6, 0xe4, 0x75, - 0x24, 0xce, 0x9e, 0xd3, 0x2a, 0x4d, 0xb8, 0x22, 0x7d, 0xd1, 0x1d, 0x58, 0x6a, 0xdb, 0xbb, 0xb6, - 0xe1, 0x74, 0x9e, 0xb5, 0x9f, 0x1b, 0x4d, 0x73, 0xdf, 0x34, 0x5a, 0x45, 0x05, 0x15, 0xe1, 0x46, - 0x3a, 0xde, 0x6d, 0xda, 0xe6, 0x91, 0x51, 0x04, 0xa8, 0x04, 0x6f, 0xa5, 0x93, 0x96, 0x71, 0x68, - 0xd8, 0x46, 0xab, 0xa8, 0x56, 0xbe, 0x02, 0xb8, 0xd9, 0xf4, 0x87, 0x01, 0x11, 0xac, 0xeb, 0xd1, - 0xa3, 0x24, 0x9e, 0x3f, 0x6a, 0x53, 0x81, 0xee, 0xc2, 0x35, 0x4e, 0x85, 0xc3, 0x5c, 0xae, 0x81, - 0x72, 0xae, 0xba, 0x6e, 0xad, 0x72, 0x2a, 0x4c, 0x97, 0xa3, 0x03, 0xb8, 0xde, 0x4d, 0x62, 0xcb, - 0xbf, 0xd4, 0x72, 0xae, 0x9a, 0x6f, 0xdc, 0xff, 0x8b, 0xae, 0xac, 0xff, 0xa5, 0x3a, 0x21, 0x9d, - 0x42, 0xcd, 0xa5, 0x27, 0x64, 0xec, 0x89, 0x9b, 0xab, 0x6a, 0x2b, 0x23, 0xfe, 0x5c, 0xd6, 0x67, - 0x00, 0x0b, 0x59, 0x3a, 0x36, 0xea, 0xb7, 0x88, 0x20, 0xe8, 0x18, 0x6e, 0x44, 0xe9, 0xc4, 0xe1, - 0x54, 0xa4, 0x31, 0xf3, 0x8d, 0xc7, 0xcb, 0x64, 0x59, 0xd4, 0x98, 0x95, 0x8f, 0x2e, 0xd6, 0xbf, - 0xcf, 0xa6, 0xde, 0x70, 0xb6, 0x8f, 0x00, 0x96, 0x6c, 0xc2, 0x07, 0x2f, 0x92, 0x4f, 0xa7, 0xc3, - 0x69, 0x28, 0xe3, 0xed, 
0xc3, 0x15, 0x09, 0x93, 0xaf, 0xf8, 0xbf, 0xd8, 0xa5, 0x72, 0x74, 0x0c, - 0x6f, 0x47, 0x17, 0xc5, 0x39, 0x2e, 0x11, 0x24, 0x0b, 0xd0, 0x58, 0xa6, 0xa9, 0xab, 0x9d, 0x5b, - 0x85, 0xe8, 0xca, 0xbe, 0xf2, 0x16, 0xc0, 0xed, 0xec, 0x11, 0xea, 0x5e, 0xcf, 0x60, 0xc2, 0xff, - 0xa4, 0x61, 0x1a, 0xe1, 0xd1, 0x32, 0x86, 0xd7, 0x20, 0x96, 0x44, 0x20, 0x0d, 0xae, 0x65, 0xde, - 0xf2, 0xf8, 0x39, 0x6b, 0xbe, 0xdd, 0x3b, 0x9d, 0x4c, 0xb1, 0x72, 0x36, 0xc5, 0xca, 0xf9, 0x14, - 0x83, 0x37, 0x31, 0x06, 0x1f, 0x62, 0x0c, 0x3e, 0xc5, 0x18, 0x4c, 0x62, 0x0c, 0xbe, 0xc4, 0x18, - 0x7c, 0x8b, 0xb1, 0x72, 0x1e, 0x63, 0xf0, 0x7e, 0x86, 0x95, 0xc9, 0x0c, 0x2b, 0x67, 0x33, 0xac, - 0xbc, 0xdc, 0xe9, 0xfb, 0x3f, 0x8e, 0xc3, 0xfc, 0x5f, 0x5f, 0x6b, 0x4f, 0x2e, 0x6d, 0xbb, 0xab, - 0xf2, 0x1e, 0x7a, 0xf8, 0x3d, 0x00, 0x00, 0xff, 0xff, 0x5d, 0xb3, 0x9f, 0xdd, 0x0f, 0x05, 0x00, - 0x00, -} - -func (x BuildID_State) String() string { - s, ok := BuildID_State_name[int32(x)] + 0x39, 0xe9, 0xd3, 0x94, 0x57, 0x79, 0xa7, 0xc2, 0xb5, 0xbd, 0x31, 0xf3, 0x5c, 0xd3, 0x45, 0x05, + 0xa8, 0x32, 0x57, 0x03, 0x65, 0x50, 0x5d, 0xb7, 0x54, 0xe6, 0xa2, 0xa7, 0x70, 0x85, 0x0b, 0x22, + 0xa8, 0xa6, 0x96, 0x41, 0xb5, 0xd0, 0xa8, 0xeb, 0x7f, 0xf6, 0xd6, 0x33, 0x96, 0xde, 0x4e, 0x84, + 0x56, 0xaa, 0x47, 0x27, 0x70, 0x4b, 0x2e, 0x9c, 0x71, 0xe0, 0x26, 0x3f, 0x82, 0x0d, 0x29, 0x17, + 0x64, 0x18, 0x68, 0xb9, 0x32, 0xa8, 0xe6, 0x1b, 0x0f, 0x16, 0x92, 0xe5, 0x89, 0x13, 0xe6, 0xc1, + 0xeb, 0x6e, 0xc8, 0xdc, 0x43, 0xbf, 0xcf, 0x7a, 0xc4, 0x6b, 0x26, 0x53, 0x6b, 0x53, 0xf2, 0x3a, + 0x12, 0x67, 0xcf, 0x69, 0x95, 0x26, 0x5c, 0x91, 0xbe, 0xe8, 0x0e, 0x2c, 0xb5, 0xed, 0x5d, 0xdb, + 0x70, 0x3a, 0xcf, 0xda, 0xcf, 0x8d, 0xa6, 0xb9, 0x6f, 0x1a, 0xad, 0xa2, 0x82, 0x8a, 0x70, 0x23, + 0x1d, 0xef, 0x36, 0x6d, 0xf3, 0xc8, 0x28, 0x02, 0x54, 0x82, 0xb7, 0xd2, 0x49, 0xcb, 0x38, 0x34, + 0x6c, 0xa3, 0x55, 0x54, 0x2b, 0x5f, 0x01, 0xdc, 0x6c, 0xfa, 0xc3, 0x80, 0x08, 0xd6, 0xf5, 0xe8, + 0x51, 0x12, 0xcf, 0x1f, 0xb5, 0xa9, 0x40, 0x77, 0xe1, 0x1a, 0xa7, 0xc2, 0x61, 0x2e, 0xd7, 0x40, + 0x39, 0x57, 0x5d, 0xb7, 0x56, 0x39, 0x15, 0xa6, 0xcb, 0xd1, 0x01, 0x5c, 0xef, 0x26, 0xb1, 0xe5, + 0x5f, 0x6a, 0x39, 0x57, 0xcd, 0x37, 0xee, 0xff, 0x45, 0x57, 0xd6, 0xff, 0xdd, 0x74, 0xc1, 0xd1, + 0x29, 0xd4, 0x5c, 0x7a, 0x42, 0xc6, 0x9e, 0xb8, 0xb9, 0xaa, 0xb6, 0x32, 0xe2, 0xcf, 0x65, 0x7d, + 0x06, 0xb0, 0x90, 0xa5, 0x63, 0xa3, 0x7e, 0x8b, 0x08, 0x82, 0x8e, 0xe1, 0x46, 0x94, 0x4e, 0x1c, + 0x4e, 0x45, 0x1a, 0x33, 0xdf, 0x78, 0xbc, 0x4c, 0x96, 0x45, 0x8d, 0x59, 0xf9, 0xe8, 0x62, 0xfd, + 0xfb, 0x6c, 0xea, 0x0d, 0x67, 0xfb, 0x08, 0x60, 0xc9, 0x26, 0x7c, 0xf0, 0x22, 0xf9, 0x74, 0x3a, + 0x9c, 0x86, 0x32, 0xde, 0x3e, 0x5c, 0x91, 0x30, 0xf9, 0x8a, 0xff, 0x8b, 0x5d, 0x2a, 0x47, 0xc7, + 0xf0, 0x76, 0x74, 0x51, 0x9c, 0xe3, 0x12, 0x41, 0xb2, 0x00, 0x8d, 0x65, 0x9a, 0xba, 0xda, 0xb9, + 0x55, 0x88, 0xae, 0xec, 0x2b, 0x6f, 0x01, 0xdc, 0xce, 0x1e, 0xa1, 0xee, 0xf5, 0x0c, 0x26, 0xfc, + 0x4f, 0x1a, 0xa6, 0x11, 0x1e, 0x2d, 0x63, 0x78, 0x0d, 0x62, 0x49, 0x04, 0xd2, 0xe0, 0x5a, 0xe6, + 0x2d, 0x8f, 0x9f, 0xb3, 0xe6, 0xdb, 0xbd, 0xd3, 0xc9, 0x14, 0x2b, 0x67, 0x53, 0xac, 0x9c, 0x4f, + 0x31, 0x78, 0x13, 0x63, 0xf0, 0x21, 0xc6, 0xe0, 0x53, 0x8c, 0xc1, 0x24, 0xc6, 0xe0, 0x4b, 0x8c, + 0xc1, 0xb7, 0x18, 0x2b, 0xe7, 0x31, 0x06, 0xef, 0x67, 0x58, 0x99, 0xcc, 0xb0, 0x72, 0x36, 0xc3, + 0xca, 0xcb, 0x9d, 0xbe, 0xff, 0xe3, 0x38, 0xcc, 0xff, 0xf5, 0xb5, 0xf6, 0xe4, 0xd2, 0xb6, 0xbb, + 0x2a, 0xef, 0xa1, 0x87, 0xdf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x99, 0xb5, 0x16, 0x5c, 0x0f, 0x05, + 0x00, 0x00, +} + +func (x BuildId_State) String() 
string { + s, ok := BuildId_State_name[int32(x)] if ok { return s } return strconv.Itoa(int(x)) } -func (this *BuildID) Equal(that interface{}) bool { +func (this *BuildId) Equal(that interface{}) bool { if that == nil { return this == nil } - that1, ok := that.(*BuildID) + that1, ok := that.(*BuildId) if !ok { - that2, ok := that.(BuildID) + that2, ok := that.(BuildId) if ok { that1 = &that2 } else { @@ -594,12 +594,12 @@ func (this *VersionedTaskQueueUserData) Equal(that interface{}) bool { } return true } -func (this *BuildID) GoString() string { +func (this *BuildId) GoString() string { if this == nil { return "nil" } s := make([]string, 0, 7) - s = append(s, "&persistence.BuildID{") + s = append(s, "&persistence.BuildId{") s = append(s, "Id: "+fmt.Sprintf("%#v", this.Id)+",\n") s = append(s, "State: "+fmt.Sprintf("%#v", this.State)+",\n") if this.StateUpdateTimestamp != nil { @@ -675,7 +675,7 @@ func valueToGoStringTaskQueues(v interface{}, typ string) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) } -func (m *BuildID) Marshal() (dAtA []byte, err error) { +func (m *BuildId) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -685,12 +685,12 @@ func (m *BuildID) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *BuildID) MarshalTo(dAtA []byte) (int, error) { +func (m *BuildId) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *BuildID) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *BuildId) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int @@ -927,7 +927,7 @@ func encodeVarintTaskQueues(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } -func (m *BuildID) Size() (n int) { +func (m *BuildId) Size() (n int) { if m == nil { return 0 } @@ -1030,11 +1030,11 @@ func sovTaskQueues(x uint64) (n int) { func sozTaskQueues(x uint64) (n int) { return sovTaskQueues(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } -func (this *BuildID) String() string { +func (this *BuildId) String() string { if this == nil { return "nil" } - s := strings.Join([]string{`&BuildID{`, + s := strings.Join([]string{`&BuildId{`, `Id:` + fmt.Sprintf("%v", this.Id) + `,`, `State:` + fmt.Sprintf("%v", this.State) + `,`, `StateUpdateTimestamp:` + strings.Replace(fmt.Sprintf("%v", this.StateUpdateTimestamp), "HybridLogicalClock", "v1.HybridLogicalClock", 1) + `,`, @@ -1046,9 +1046,9 @@ func (this *CompatibleVersionSet) String() string { if this == nil { return "nil" } - repeatedStringForBuildIds := "[]*BuildID{" + repeatedStringForBuildIds := "[]*BuildId{" for _, f := range this.BuildIds { - repeatedStringForBuildIds += strings.Replace(f.String(), "BuildID", "BuildID", 1) + "," + repeatedStringForBuildIds += strings.Replace(f.String(), "BuildId", "BuildId", 1) + "," } repeatedStringForBuildIds += "}" s := strings.Join([]string{`&CompatibleVersionSet{`, @@ -1105,7 +1105,7 @@ func valueToStringTaskQueues(v interface{}) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("*%v", pv) } -func (m *BuildID) Unmarshal(dAtA []byte) error { +func (m *BuildId) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -1128,10 +1128,10 @@ func (m *BuildID) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: BuildID: wiretype end group 
for non-group") + return fmt.Errorf("proto: BuildId: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: BuildID: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: BuildId: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: @@ -1180,7 +1180,7 @@ func (m *BuildID) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.State |= BuildID_State(b&0x7F) << shift + m.State |= BuildId_State(b&0x7F) << shift if b < 0x80 { break } @@ -1335,7 +1335,7 @@ func (m *CompatibleVersionSet) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.BuildIds = append(m.BuildIds, &BuildID{}) + m.BuildIds = append(m.BuildIds, &BuildId{}) if err := m.BuildIds[len(m.BuildIds)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index c8e50459889..25f970ec974 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -162,10 +162,10 @@ const ( // queue. Update requests which would cause the versioning data to exceed this number will fail with a // FailedPrecondition error. VersionCompatibleSetLimitPerQueue = "limit.versionCompatibleSetLimitPerQueue" - // VersionBuildIDLimitPerQueue is the max number of build IDs allowed to be defined in the versioning data for a + // VersionBuildIdLimitPerQueue is the max number of build IDs allowed to be defined in the versioning data for a // task queue. Update requests which would cause the versioning data to exceed this number will fail with a // FailedPrecondition error. - VersionBuildIDLimitPerQueue = "limit.versionBuildIDLimitPerQueue" + VersionBuildIdLimitPerQueue = "limit.versionBuildIdLimitPerQueue" // keys for frontend @@ -604,6 +604,8 @@ const ( DefaultWorkflowRetryPolicy = "history.defaultWorkflowRetryPolicy" // HistoryMaxAutoResetPoints is the key for max number of auto reset points stored in mutableState HistoryMaxAutoResetPoints = "history.historyMaxAutoResetPoints" + // HistoryMaxTrackedBuildIds indicates the max number of build IDs to store in the BuildIds search attribute + HistoryMaxTrackedBuildIds = "history.maxTrackedBuildIds" // EnableParentClosePolicy whether to ParentClosePolicy EnableParentClosePolicy = "history.enableParentClosePolicy" // ParentClosePolicyThreshold decides that parent close policy will be processed by sys workers(if enabled) if diff --git a/common/searchattribute/defs.go b/common/searchattribute/defs.go index 04d6b9fa4ef..37a455e2b65 100644 --- a/common/searchattribute/defs.go +++ b/common/searchattribute/defs.go @@ -46,6 +46,7 @@ const ( StateTransitionCount = "StateTransitionCount" TemporalChangeVersion = "TemporalChangeVersion" BinaryChecksums = "BinaryChecksums" + BuildIds = "BuildIds" BatcherNamespace = "BatcherNamespace" BatcherUser = "BatcherUser" HistorySizeBytes = "HistorySizeBytes" @@ -88,6 +89,7 @@ var ( predefined = map[string]enumspb.IndexedValueType{ TemporalChangeVersion: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, BinaryChecksums: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, + BuildIds: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, BatcherNamespace: enumspb.INDEXED_VALUE_TYPE_KEYWORD, BatcherUser: enumspb.INDEXED_VALUE_TYPE_KEYWORD, TemporalScheduledStartTime: enumspb.INDEXED_VALUE_TYPE_DATETIME, diff --git a/go.mod b/go.mod index bcf3b4399b5..b26838c3cbe 100644 --- a/go.mod +++ b/go.mod @@ -44,8 +44,8 @@ require ( go.opentelemetry.io/otel/metric v0.36.0 go.opentelemetry.io/otel/sdk v1.13.0 
go.opentelemetry.io/otel/sdk/metric v0.36.0 - go.temporal.io/api v1.19.1-0.20230504042653-5484c8a340a0 - go.temporal.io/sdk v1.22.2-0.20230503164257-9f11e8c73dbc + go.temporal.io/api v1.19.1-0.20230505041445-71aa6a37fc2e + go.temporal.io/sdk v1.22.2-0.20230505041526-809dc9f34e08 go.temporal.io/version v0.3.0 go.uber.org/atomic v1.10.0 go.uber.org/fx v1.19.1 diff --git a/go.sum b/go.sum index 3dbb665ca9b..48829c11088 100644 --- a/go.sum +++ b/go.sum @@ -1116,11 +1116,10 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= -go.temporal.io/api v1.19.1-0.20230503164115-e6a655167ace/go.mod h1:OiRzMU/dM++aM7IexDcAk0yLc1Pktdr/MUMD0gdpXy8= -go.temporal.io/api v1.19.1-0.20230504042653-5484c8a340a0 h1:C2zhJnL7FvHT7FrTCfzk7B+Ra67ZYvmLp1YISHCsLos= -go.temporal.io/api v1.19.1-0.20230504042653-5484c8a340a0/go.mod h1:OiRzMU/dM++aM7IexDcAk0yLc1Pktdr/MUMD0gdpXy8= -go.temporal.io/sdk v1.22.2-0.20230503164257-9f11e8c73dbc h1:BABNHYopDR0C/9SF/sLI/Ktbt8JBZ2uqOtpRlAAcFZY= -go.temporal.io/sdk v1.22.2-0.20230503164257-9f11e8c73dbc/go.mod h1:mOHv2LGMQ3NHeCixq42+7041i7hayymv/Q9C1BobtRE= +go.temporal.io/api v1.19.1-0.20230505041445-71aa6a37fc2e h1:axHivcPAaz72OzVsjkEeH0nEZb3DGsqAhtzmWHjv68Y= +go.temporal.io/api v1.19.1-0.20230505041445-71aa6a37fc2e/go.mod h1:OiRzMU/dM++aM7IexDcAk0yLc1Pktdr/MUMD0gdpXy8= +go.temporal.io/sdk v1.22.2-0.20230505041526-809dc9f34e08 h1:nXYZBaIqZeHSU41DBi0p0Tx0N7HOS61C/H9cErCY52k= +go.temporal.io/sdk v1.22.2-0.20230505041526-809dc9f34e08/go.mod h1:5w9eNoNN6tHBJikoA1fJ/DBWbs3YrmjA9LxZ2h1swxc= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= diff --git a/proto/api b/proto/api index 1772b53e86a..0536bf45606 160000 --- a/proto/api +++ b/proto/api @@ -1 +1 @@ -Subproject commit 1772b53e86a8b7cbc5208eebe13e24800ab63d97 +Subproject commit 0536bf456066c6929ff8bc85583c845016d45aac diff --git a/proto/internal/temporal/server/api/persistence/v1/task_queues.proto b/proto/internal/temporal/server/api/persistence/v1/task_queues.proto index 6d023a63481..3127e7d5011 100644 --- a/proto/internal/temporal/server/api/persistence/v1/task_queues.proto +++ b/proto/internal/temporal/server/api/persistence/v1/task_queues.proto @@ -25,8 +25,8 @@ option go_package = "go.temporal.io/server/api/persistence/v1;persistence"; import "temporal/server/api/clock/v1/message.proto"; -// BuildID is an identifier with a timestamped status used to identify workers for task queue versioning purposes. -message BuildID { +// BuildId is an identifier with a timestamped status used to identify workers for task queue versioning purposes. +message BuildId { enum State { STATE_UNSPECIFIED = 0; STATE_ACTIVE = 1; @@ -49,8 +49,8 @@ message CompatibleVersionSet { // case a set might end up with more than one ID. repeated string set_ids = 1; // All the compatible versions, unordered except for the last element, which is considered the set "default". - repeated BuildID build_ids = 2; - // HLC timestamp representing when the set default was updated. 
Different from BuildID.state_update_timestamp, which + repeated BuildId build_ids = 2; + // HLC timestamp representing when the set default was updated. Different from BuildId.state_update_timestamp, which // refers to the build ID status. // (-- api-linter: core::0142::time-field-type=disabled // aip.dev/not-precedent: Using HLC instead of wall clock. --) diff --git a/schema/elasticsearch/visibility/index_template_v7.json b/schema/elasticsearch/visibility/index_template_v7.json index 69b2a5b7631..5c7e0072a06 120000 --- a/schema/elasticsearch/visibility/index_template_v7.json +++ b/schema/elasticsearch/visibility/index_template_v7.json @@ -1 +1 @@ -versioned/v4/index_template_v7.json \ No newline at end of file +versioned/v5/index_template_v7.json \ No newline at end of file diff --git a/schema/elasticsearch/visibility/versioned/v5/index_template_v7.json b/schema/elasticsearch/visibility/versioned/v5/index_template_v7.json new file mode 100644 index 00000000000..9529de530d6 --- /dev/null +++ b/schema/elasticsearch/visibility/versioned/v5/index_template_v7.json @@ -0,0 +1,87 @@ +{ + "order": 0, + "index_patterns": ["temporal_visibility_v1*"], + "settings": { + "index": { + "number_of_shards": "1", + "number_of_replicas": "0", + "auto_expand_replicas": "0-2", + "search.idle.after": "365d", + "sort.field": ["CloseTime", "StartTime", "RunId"], + "sort.order": ["desc", "desc", "desc"], + "sort.missing": ["_first", "_first", "_first"] + } + }, + "mappings": { + "dynamic": "false", + "properties": { + "NamespaceId": { + "type": "keyword" + }, + "TemporalNamespaceDivision": { + "type": "keyword" + }, + "WorkflowId": { + "type": "keyword" + }, + "RunId": { + "type": "keyword" + }, + "WorkflowType": { + "type": "keyword" + }, + "StartTime": { + "type": "date_nanos" + }, + "ExecutionTime": { + "type": "date_nanos" + }, + "CloseTime": { + "type": "date_nanos" + }, + "ExecutionDuration": { + "type": "long" + }, + "ExecutionStatus": { + "type": "keyword" + }, + "TaskQueue": { + "type": "keyword" + }, + "TemporalChangeVersion": { + "type": "keyword" + }, + "BatcherNamespace": { + "type": "keyword" + }, + "BatcherUser": { + "type": "keyword" + }, + "BinaryChecksums": { + "type": "keyword" + }, + "HistoryLength": { + "type": "long" + }, + "StateTransitionCount": { + "type": "long" + }, + "TemporalScheduledStartTime": { + "type": "date_nanos" + }, + "TemporalScheduledById": { + "type": "keyword" + }, + "TemporalSchedulePaused": { + "type": "boolean" + }, + "HistorySizeBytes": { + "type": "long" + }, + "BuildIds": { + "type": "keyword" + } + } + }, + "aliases": {} +} diff --git a/schema/elasticsearch/visibility/versioned/v5/upgrade.sh b/schema/elasticsearch/visibility/versioned/v5/upgrade.sh new file mode 100755 index 00000000000..7ebb0d28265 --- /dev/null +++ b/schema/elasticsearch/visibility/versioned/v5/upgrade.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash + +set -eu -o pipefail + +# Prerequisites: +# - jq +# - curl + +# Input parameters. +: "${ES_SCHEME:=http}" +: "${ES_SERVER:=127.0.0.1}" +: "${ES_PORT:=9200}" +: "${ES_USER:=}" +: "${ES_PWD:=}" +: "${ES_VERSION:=v7}" +: "${ES_VIS_INDEX_V1:=temporal_visibility_v1_dev}" +: "${AUTO_CONFIRM:=}" +: "${SLICES_COUNT:=auto}" + +es_endpoint="${ES_SCHEME}://${ES_SERVER}:${ES_PORT}" + +echo "=== Step 0. Sanity check if Elasticsearch index is accessible ===" + +if ! 
curl --silent --fail --user "${ES_USER}":"${ES_PWD}" "${es_endpoint}/${ES_VIS_INDEX_V1}/_stats/docs" --write-out "\n"; then
+  echo "Elasticsearch index ${ES_VIS_INDEX_V1} is not accessible at ${es_endpoint}."
+  exit 1
+fi
+
+echo "=== Step 1. Add new builtin search attributes ==="
+
+new_mapping='
+{
+  "properties": {
+    "BuildIds": {
+      "type": "keyword"
+    }
+  }
+}
+'
+
+if [ -z "${AUTO_CONFIRM}" ]; then
+  read -p "Add new builtin search attributes to the index ${ES_VIS_INDEX_V1}? (N/y)" -n 1 -r
+  echo
+else
+  REPLY="y"
+fi
+if [ "${REPLY}" = "y" ]; then
+  curl --silent --fail --user "${ES_USER}":"${ES_PWD}" -X PUT "${es_endpoint}/${ES_VIS_INDEX_V1}/_mapping" -H "Content-Type: application/json" --data-binary "$new_mapping" | jq
+  # Wait for mapping changes to go through.
+  until curl --silent --user "${ES_USER}":"${ES_PWD}" "${es_endpoint}/_cluster/health/${ES_VIS_INDEX_V1}" | jq --exit-status '.status=="green" | .'; do
+    echo "Waiting for Elasticsearch index ${ES_VIS_INDEX_V1} to become green."
+    sleep 1
+  done
+fi
diff --git a/schema/mysql/v8/visibility/schema.sql b/schema/mysql/v8/visibility/schema.sql
index 65fc576ff4d..1ec784d00b9 100644
--- a/schema/mysql/v8/visibility/schema.sql
+++ b/schema/mysql/v8/visibility/schema.sql
@@ -37,6 +37,7 @@ CREATE TABLE executions_visibility (
 TemporalScheduledById VARCHAR(255) GENERATED ALWAYS AS (search_attributes->>"$.TemporalScheduledById"),
 TemporalSchedulePaused BOOLEAN GENERATED ALWAYS AS (search_attributes->"$.TemporalSchedulePaused"),
 TemporalNamespaceDivision VARCHAR(255) GENERATED ALWAYS AS (search_attributes->>"$.TemporalNamespaceDivision"),
+BuildIds JSON GENERATED ALWAYS AS (search_attributes->"$.BuildIds"),
 PRIMARY KEY (namespace_id, run_id)
 );
@@ -53,6 +54,7 @@ CREATE INDEX by_task_queue ON executions_visibility (namespace_id, task
 -- Indexes for the predefined search attributes
 CREATE INDEX by_temporal_change_version ON executions_visibility (namespace_id, (CAST(TemporalChangeVersion AS CHAR(255) ARRAY)), (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
 CREATE INDEX by_binary_checksums ON executions_visibility (namespace_id, (CAST(BinaryChecksums AS CHAR(255) ARRAY)), (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
+CREATE INDEX by_build_ids ON executions_visibility (namespace_id, (CAST(BuildIds AS CHAR(255) ARRAY)), (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
 CREATE INDEX by_batcher_user ON executions_visibility (namespace_id, BatcherUser, (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
 CREATE INDEX by_temporal_scheduled_start_time ON executions_visibility (namespace_id, TemporalScheduledStartTime, (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
 CREATE INDEX by_temporal_scheduled_by_id ON executions_visibility (namespace_id, TemporalScheduledById, (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
diff --git a/schema/mysql/v8/visibility/versioned/v1.3/add_build_ids_search_attribute.sql b/schema/mysql/v8/visibility/versioned/v1.3/add_build_ids_search_attribute.sql
new file mode 100644
index 00000000000..921052cbebe
--- /dev/null
+++ b/schema/mysql/v8/visibility/versioned/v1.3/add_build_ids_search_attribute.sql
@@ -0,0 +1,2 @@
+ALTER TABLE executions_visibility ADD COLUMN BuildIds JSON GENERATED ALWAYS AS (search_attributes->'BuildIds');
+CREATE INDEX
by_build_ids ON executions_visibility (namespace_id, (CAST(BuildIds AS CHAR(255) ARRAY)), (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id); diff --git a/schema/mysql/v8/visibility/versioned/v1.3/manifest.json b/schema/mysql/v8/visibility/versioned/v1.3/manifest.json index 7a40fe1ffa2..5c9a6ae8be6 100644 --- a/schema/mysql/v8/visibility/versioned/v1.3/manifest.json +++ b/schema/mysql/v8/visibility/versioned/v1.3/manifest.json @@ -1,8 +1,9 @@ { "CurrVersion": "1.3", "MinCompatibleVersion": "0.1", - "Description": "add history size bytes", + "Description": "add history size bytes and build IDs visibility columns and indices", "SchemaUpdateCqlFiles": [ - "add_history_size_bytes.sql" + "add_history_size_bytes.sql", + "add_build_ids_search_attribute.sql" ] } diff --git a/schema/postgresql/v12/visibility/schema.sql b/schema/postgresql/v12/visibility/schema.sql index 5805e5d85db..523a74b6090 100644 --- a/schema/postgresql/v12/visibility/schema.sql +++ b/schema/postgresql/v12/visibility/schema.sql @@ -36,6 +36,7 @@ CREATE TABLE executions_visibility ( TemporalScheduledById VARCHAR(255) GENERATED ALWAYS AS (search_attributes->>'TemporalScheduledById') STORED, TemporalSchedulePaused BOOLEAN GENERATED ALWAYS AS ((search_attributes->'TemporalSchedulePaused')::boolean) STORED, TemporalNamespaceDivision VARCHAR(255) GENERATED ALWAYS AS (search_attributes->>'TemporalNamespaceDivision') STORED, + BuildIds JSONB GENERATED ALWAYS AS (search_attributes->'BuildIds') STORED, -- Pre-allocated custom search attributes Bool01 BOOLEAN GENERATED ALWAYS AS ((search_attributes->'Bool01')::boolean) STORED, @@ -82,6 +83,7 @@ CREATE INDEX by_task_queue ON executions_visibility (namespace_id, task -- Indexes for the predefined search attributes CREATE INDEX by_temporal_change_version ON executions_visibility USING GIN (namespace_id, TemporalChangeVersion jsonb_path_ops); CREATE INDEX by_binary_checksums ON executions_visibility USING GIN (namespace_id, BinaryChecksums jsonb_path_ops); +CREATE INDEX by_build_ids ON executions_visibility USING GIN (namespace_id, BuildIds jsonb_path_ops); CREATE INDEX by_batcher_user ON executions_visibility (namespace_id, BatcherUser, (COALESCE(close_time, '9999-12-31 23:59:59')) DESC, start_time DESC, run_id); CREATE INDEX by_temporal_scheduled_start_time ON executions_visibility (namespace_id, TemporalScheduledStartTime, (COALESCE(close_time, '9999-12-31 23:59:59')) DESC, start_time DESC, run_id); CREATE INDEX by_temporal_scheduled_by_id ON executions_visibility (namespace_id, TemporalScheduledById, (COALESCE(close_time, '9999-12-31 23:59:59')) DESC, start_time DESC, run_id); diff --git a/schema/postgresql/v12/visibility/versioned/v1.3/add_build_ids_search_attribute.sql b/schema/postgresql/v12/visibility/versioned/v1.3/add_build_ids_search_attribute.sql new file mode 100644 index 00000000000..72d1e24e613 --- /dev/null +++ b/schema/postgresql/v12/visibility/versioned/v1.3/add_build_ids_search_attribute.sql @@ -0,0 +1,2 @@ +ALTER TABLE executions_visibility ADD COLUMN BuildIds JSONB GENERATED ALWAYS AS (search_attributes->'BuildIds') STORED; +CREATE INDEX by_build_ids ON executions_visibility USING GIN (namespace_id, BuildIds jsonb_path_ops); diff --git a/schema/postgresql/v12/visibility/versioned/v1.3/manifest.json b/schema/postgresql/v12/visibility/versioned/v1.3/manifest.json index 7a40fe1ffa2..5c9a6ae8be6 100644 --- a/schema/postgresql/v12/visibility/versioned/v1.3/manifest.json +++ 
b/schema/postgresql/v12/visibility/versioned/v1.3/manifest.json @@ -1,8 +1,9 @@ { "CurrVersion": "1.3", "MinCompatibleVersion": "0.1", - "Description": "add history size bytes", + "Description": "add history size bytes and build IDs visibility columns and indices", "SchemaUpdateCqlFiles": [ - "add_history_size_bytes.sql" + "add_history_size_bytes.sql", + "add_build_ids_search_attribute.sql" ] } diff --git a/schema/sqlite/v3/visibility/schema.sql b/schema/sqlite/v3/visibility/schema.sql index ccc76a19903..6274e2a84ef 100644 --- a/schema/sqlite/v3/visibility/schema.sql +++ b/schema/sqlite/v3/visibility/schema.sql @@ -22,6 +22,7 @@ CREATE TABLE executions_visibility ( TemporalScheduledById VARCHAR(255) GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.TemporalScheduledById")), TemporalSchedulePaused BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.TemporalSchedulePaused")), TemporalNamespaceDivision VARCHAR(255) GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.TemporalNamespaceDivision")), + BuildIds TEXT GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.BuildIds")) STORED, -- Pre-allocated custom search attributes Bool01 BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.Bool01")), @@ -114,6 +115,7 @@ CREATE VIRTUAL TABLE executions_visibility_fts_text USING fts5 ( CREATE VIRTUAL TABLE executions_visibility_fts_keyword_list USING fts5 ( TemporalChangeVersion, BinaryChecksums, + BuildIds, KeywordList01, KeywordList02, KeywordList03, @@ -140,6 +142,7 @@ BEGIN rowid, TemporalChangeVersion, BinaryChecksums, + BuildIds, KeywordList01, KeywordList02, KeywordList03 @@ -147,6 +150,7 @@ BEGIN NEW.rowid, NEW.TemporalChangeVersion, NEW.BinaryChecksums, + NEW.BuildIds, NEW.KeywordList01, NEW.KeywordList02, NEW.KeywordList03 @@ -175,6 +179,7 @@ BEGIN rowid, TemporalChangeVersion, BinaryChecksums, + BuildIds, KeywordList01, KeywordList02, KeywordList03 @@ -183,6 +188,7 @@ BEGIN OLD.rowid, OLD.TemporalChangeVersion, OLD.BinaryChecksums, + OLD.BuildIds, OLD.KeywordList01, OLD.KeywordList02, OLD.KeywordList03 @@ -222,6 +228,7 @@ BEGIN rowid, TemporalChangeVersion, BinaryChecksums, + BuildIds, KeywordList01, KeywordList02, KeywordList03 @@ -230,6 +237,7 @@ BEGIN OLD.rowid, OLD.TemporalChangeVersion, OLD.BinaryChecksums, + OLD.BuildIds, OLD.KeywordList01, OLD.KeywordList02, OLD.KeywordList03 @@ -238,6 +246,7 @@ BEGIN rowid, TemporalChangeVersion, BinaryChecksums, + BuildIds, KeywordList01, KeywordList02, KeywordList03 @@ -245,6 +254,7 @@ BEGIN NEW.rowid, NEW.TemporalChangeVersion, NEW.BinaryChecksums, + NEW.BuildIds, NEW.KeywordList01, NEW.KeywordList02, NEW.KeywordList03 diff --git a/service/frontend/errors.go b/service/frontend/errors.go index 66b1058ddae..9bfb454d86d 100644 --- a/service/frontend/errors.go +++ b/service/frontend/errors.go @@ -81,7 +81,8 @@ var ( errCronAndStartDelaySet = serviceerror.NewInvalidArgument("CronSchedule and WorkflowStartDelay may not be used together.") errInvalidWorkflowStartDelaySeconds = serviceerror.NewInvalidArgument("An invalid WorkflowStartDelaySeconds is set on request.") errRaceConditionAddingSearchAttributes = serviceerror.NewUnavailable("Generated search attributes mapping unavailble.") - errUseVersioningWithoutBuildID = serviceerror.NewInvalidArgument("WorkerVersionStamp must be present if UseVersioning is true.") + errUseVersioningWithoutBuildId = serviceerror.NewInvalidArgument("WorkerVersionStamp must be present if UseVersioning is true.") + errBuildIdTooLong = serviceerror.NewInvalidArgument("Build ID exceeds 
configured limit.workerBuildIdSize, use a shorter build ID.") errUpdateMetaNotSet = serviceerror.NewInvalidArgument("Update meta is not set on request.") errUpdateInputNotSet = serviceerror.NewInvalidArgument("Update input is not set on request.") diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 753d297f20f..cf7a68c1a84 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -874,6 +874,10 @@ func (wh *WorkflowHandler) PollWorkflowTaskQueue(ctx context.Context, request *w return nil, errIdentityTooLong } + if len(request.GetWorkerVersionCapabilities().GetBuildId()) > wh.config.WorkerBuildIdSizeLimit() { + return nil, errBuildIdTooLong + } + if err := wh.validateTaskQueue(request.TaskQueue); err != nil { return nil, err } @@ -966,6 +970,9 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted( if len(request.GetIdentity()) > wh.config.MaxIDLengthLimit() { return nil, errIdentityTooLong } + if len(request.GetWorkerVersionStamp().GetBuildId()) > wh.config.WorkerBuildIdSizeLimit() { + return nil, errBuildIdTooLong + } taskToken, err := wh.tokenSerializer.Deserialize(request.TaskToken) if err != nil { @@ -973,16 +980,8 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted( } namespaceId := namespace.ID(taskToken.GetNamespaceId()) - if request.UseVersioning && len(request.WorkerVersionStamp.GetBuildId()) == 0 { - return nil, errUseVersioningWithoutBuildID - } - - // Copy WorkerVersionStamp to BinaryChecksum if BinaryChecksum is missing (small - // optimization to save space in the request). - if request.WorkerVersionStamp != nil { - if len(request.WorkerVersionStamp.BuildId) > 0 && len(request.BinaryChecksum) == 0 { - request.BinaryChecksum = request.WorkerVersionStamp.BuildId - } + if request.WorkerVersionStamp.GetUseVersioning() && len(request.WorkerVersionStamp.GetBuildId()) == 0 { + return nil, errUseVersioningWithoutBuildId } wh.overrides.DisableEagerActivityDispatchForBuggyClients(ctx, request) @@ -1131,6 +1130,10 @@ func (wh *WorkflowHandler) PollActivityTaskQueue(ctx context.Context, request *w return nil, errIdentityTooLong } + if len(request.GetWorkerVersionCapabilities().GetBuildId()) > wh.config.WorkerBuildIdSizeLimit() { + return nil, errBuildIdTooLong + } + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) if err != nil { return nil, err diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 5a5d8274a23..4a9d4b6ddf9 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -56,6 +56,7 @@ type Config struct { EmitShardLagLog dynamicconfig.BoolPropertyFn MaxAutoResetPoints dynamicconfig.IntPropertyFnWithNamespaceFilter + MaxTrackedBuildIds dynamicconfig.IntPropertyFnWithNamespaceFilter ThrottledLogRPS dynamicconfig.IntPropertyFn EnableStickyQuery dynamicconfig.BoolPropertyFnWithNamespaceFilter ShutdownDrainDuration dynamicconfig.DurationPropertyFn @@ -298,6 +299,7 @@ type Config struct { const ( DefaultHistoryMaxAutoResetPoints = 20 + DefaultHistoryMaxTrackedBuildIds = 20 ) // NewConfig returns new service config with default values @@ -320,6 +322,7 @@ func NewConfig( EnablePersistencePriorityRateLimiting: dc.GetBoolProperty(dynamicconfig.HistoryEnablePersistencePriorityRateLimiting, true), ShutdownDrainDuration: dc.GetDurationProperty(dynamicconfig.HistoryShutdownDrainDuration, 0*time.Second), MaxAutoResetPoints: 
dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryMaxAutoResetPoints, DefaultHistoryMaxAutoResetPoints), + MaxTrackedBuildIds: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryMaxTrackedBuildIds, DefaultHistoryMaxTrackedBuildIds), DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout), ContinueAsNewMinInterval: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.ContinueAsNewMinInterval, time.Second), diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index c5a79e51663..1d8ba25346c 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -5384,7 +5384,7 @@ func addWorkflowTaskCompletedEvent(s *suite.Suite, ms workflow.MutableState, sch event, _ := ms.AddWorkflowTaskCompletedEvent(workflowTask, &workflowservice.RespondWorkflowTaskCompletedRequest{ Identity: identity, - }, configs.DefaultHistoryMaxAutoResetPoints) + }, defaultWorkflowTaskCompletionLimits) ms.FlushBufferedEvents() diff --git a/service/history/transferQueueActiveTaskExecutor_test.go b/service/history/transferQueueActiveTaskExecutor_test.go index 74681c07207..527555de943 100644 --- a/service/history/transferQueueActiveTaskExecutor_test.go +++ b/service/history/transferQueueActiveTaskExecutor_test.go @@ -125,6 +125,8 @@ type ( } ) +var defaultWorkflowTaskCompletionLimits = workflow.WorkflowTaskCompletionLimits{MaxResetPoints: configs.DefaultHistoryMaxAutoResetPoints, MaxTrackedBuildIds: configs.DefaultHistoryMaxTrackedBuildIds} + func TestTransferQueueActiveTaskExecutorSuite(t *testing.T) { s := new(transferQueueActiveTaskExecutorSuite) suite.Run(t, s) @@ -949,7 +951,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen }}, }, }, - }, configs.DefaultHistoryMaxAutoResetPoints) + }, defaultWorkflowTaskCompletionLimits) _, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), uuid.New(), &commandpb.StartChildWorkflowExecutionCommandAttributes{ Namespace: "child namespace1", @@ -1074,7 +1076,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen event, _ = mutableState.AddWorkflowTaskCompletedEvent(wt, &workflowservice.RespondWorkflowTaskCompletedRequest{ Identity: "some random identity", Commands: commands, - }, configs.DefaultHistoryMaxAutoResetPoints) + }, defaultWorkflowTaskCompletionLimits) for i := 0; i < 10; i++ { _, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), uuid.New(), &commandpb.StartChildWorkflowExecutionCommandAttributes{ @@ -1168,7 +1170,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen event, _ = mutableState.AddWorkflowTaskCompletedEvent(wt, &workflowservice.RespondWorkflowTaskCompletedRequest{ Identity: "some random identity", Commands: commands, - }, configs.DefaultHistoryMaxAutoResetPoints) + }, defaultWorkflowTaskCompletionLimits) for i := 0; i < 10; i++ { _, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), uuid.New(), &commandpb.StartChildWorkflowExecutionCommandAttributes{ @@ -1267,7 +1269,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen }}, }, }, - }, configs.DefaultHistoryMaxAutoResetPoints) + }, defaultWorkflowTaskCompletionLimits) _, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), uuid.New(), 
&commandpb.StartChildWorkflowExecutionCommandAttributes{ Namespace: "child namespace1", diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index 5fe2744f1a5..3925e5462b2 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -99,6 +99,11 @@ type ( HistorySizeBytes int64 } + WorkflowTaskCompletionLimits struct { + MaxResetPoints int + MaxTrackedBuildIds int + } + MutableState interface { AddActivityTaskCancelRequestedEvent(int64, int64, string) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error) AddActivityTaskCanceledEvent(int64, int64, int64, *commonpb.Payloads, string) (*historypb.HistoryEvent, error) @@ -115,7 +120,7 @@ type ( AddChildWorkflowExecutionTimedOutEvent(int64, *commonpb.WorkflowExecution, *historypb.WorkflowExecutionTimedOutEventAttributes) (*historypb.HistoryEvent, error) AddCompletedWorkflowEvent(int64, *commandpb.CompleteWorkflowExecutionCommandAttributes, string) (*historypb.HistoryEvent, error) AddContinueAsNewEvent(context.Context, int64, int64, namespace.Name, *commandpb.ContinueAsNewWorkflowExecutionCommandAttributes) (*historypb.HistoryEvent, MutableState, error) - AddWorkflowTaskCompletedEvent(*WorkflowTaskInfo, *workflowservice.RespondWorkflowTaskCompletedRequest, int) (*historypb.HistoryEvent, error) + AddWorkflowTaskCompletedEvent(*WorkflowTaskInfo, *workflowservice.RespondWorkflowTaskCompletedRequest, WorkflowTaskCompletionLimits) (*historypb.HistoryEvent, error) AddWorkflowTaskFailedEvent(workflowTask *WorkflowTaskInfo, cause enumspb.WorkflowTaskFailedCause, failure *failurepb.Failure, identity, binChecksum, baseRunID, newRunID string, forkEventVersion int64) (*historypb.HistoryEvent, error) AddWorkflowTaskScheduleToStartTimeoutEvent(int64) (*historypb.HistoryEvent, error) AddFirstWorkflowTaskScheduled(parentClock *clockspb.VectorClock, event *historypb.HistoryEvent, bypassTaskGeneration bool) (int64, error) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 7f04cc3546f..a4f4da88bd4 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -1898,57 +1898,119 @@ func (ms *MutableStateImpl) addBinaryCheckSumIfNotExists( if len(binChecksum) == 0 { return nil } + if !ms.addResetPointFromCompletion(binChecksum, event.GetEventId(), maxResetPoints) { + return nil + } exeInfo := ms.executionInfo - var currResetPoints []*workflowpb.ResetPointInfo - if exeInfo.AutoResetPoints != nil && exeInfo.AutoResetPoints.Points != nil { - currResetPoints = ms.executionInfo.AutoResetPoints.Points - } else { - currResetPoints = make([]*workflowpb.ResetPointInfo, 0, 1) + resetPoints := exeInfo.AutoResetPoints.Points + // List of all recent binary checksums associated with the workflow. + recentBinaryChecksums := make([]string, len(resetPoints)) + + for i, rp := range resetPoints { + recentBinaryChecksums[i] = rp.GetBinaryChecksum() } - // List of all recent binary checksums associated with the workflow. 
- var recentBinaryChecksums []string + checksumsPayload, err := searchattribute.EncodeValue(recentBinaryChecksums, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST) + if err != nil { + return err + } + if exeInfo.SearchAttributes == nil { + exeInfo.SearchAttributes = make(map[string]*commonpb.Payload, 1) + } + exeInfo.SearchAttributes[searchattribute.BinaryChecksums] = checksumsPayload + return ms.taskGenerator.GenerateUpsertVisibilityTask() +} - for _, rp := range currResetPoints { - recentBinaryChecksums = append(recentBinaryChecksums, rp.GetBinaryChecksum()) - if rp.GetBinaryChecksum() == binChecksum { - // this checksum already exists - return nil +// Add a reset point for current task completion if needed. +// Returns true if the reset point was added or false if there was no need or no ability to add. +func (ms *MutableStateImpl) addResetPointFromCompletion( + binaryChecksum string, + eventID int64, + maxResetPoints int, +) bool { + if maxResetPoints < 1 { + // Nothing to do here + return false + } + exeInfo := ms.executionInfo + var resetPoints []*workflowpb.ResetPointInfo + if exeInfo.AutoResetPoints != nil && exeInfo.AutoResetPoints.Points != nil { + resetPoints = ms.executionInfo.AutoResetPoints.Points + if len(resetPoints) >= maxResetPoints { + // If limit is exceeded, drop the oldest ones. + resetPoints = resetPoints[len(resetPoints)-maxResetPoints+1:] } + } else { + resetPoints = make([]*workflowpb.ResetPointInfo, 0, 1) } - if len(currResetPoints) == maxResetPoints { - // If exceeding the max limit, do rotation by taking the oldest one out. - currResetPoints = currResetPoints[1:] - recentBinaryChecksums = recentBinaryChecksums[1:] + for _, rp := range resetPoints { + if rp.GetBinaryChecksum() == binaryChecksum { + return false + } } - // Adding current version of the binary checksum. - recentBinaryChecksums = append(recentBinaryChecksums, binChecksum) - resettable := true - err := ms.CheckResettable() - if err != nil { - resettable = false - } info := &workflowpb.ResetPointInfo{ - BinaryChecksum: binChecksum, + BinaryChecksum: binaryChecksum, RunId: ms.executionState.GetRunId(), - FirstWorkflowTaskCompletedId: event.GetEventId(), + FirstWorkflowTaskCompletedId: eventID, CreateTime: timestamp.TimePtr(ms.timeSource.Now()), - Resettable: resettable, + Resettable: ms.CheckResettable() == nil, } - currResetPoints = append(currResetPoints, info) exeInfo.AutoResetPoints = &workflowpb.ResetPoints{ - Points: currResetPoints, + Points: append(resetPoints, info), } - checksumsPayload, err := searchattribute.EncodeValue(recentBinaryChecksums, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST) + return true +} + +// Similar to (the to-be-deprecated) addBinaryCheckSumIfNotExists but works on build IDs. 
+func (ms *MutableStateImpl) trackBuildIdFromCompletion( + buildID string, + eventID int64, + limits WorkflowTaskCompletionLimits, +) error { + ms.addResetPointFromCompletion(buildID, eventID, limits.MaxResetPoints) + if limits.MaxTrackedBuildIds < 1 { + // Can't track this build ID + return nil + } + searchAttributes := ms.executionInfo.SearchAttributes + if searchAttributes == nil { + searchAttributes = make(map[string]*commonpb.Payload, 1) + ms.executionInfo.SearchAttributes = searchAttributes + } + + var buildIDs []string + saPayload, found := searchAttributes[searchattribute.BuildIds] + if !found { + buildIDs = make([]string, 0, 1) + } else { + decoded, err := searchattribute.DecodeValue(saPayload, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, true) + if err != nil { + return err + } + var ok bool + buildIDs, ok = decoded.([]string) + if !ok { + return serviceerror.NewInternal("invalid search attribute value stored for BuildIds") + } + for _, exisitingID := range buildIDs { + if exisitingID == buildID { + return nil + } + } + if len(buildIDs) >= limits.MaxTrackedBuildIds { + buildIDs = buildIDs[len(buildIDs)-limits.MaxTrackedBuildIds+1:] + } + } + + buildIDs = append(buildIDs, buildID) + + saPayload, err := searchattribute.EncodeValue(buildIDs, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST) if err != nil { return err } - if exeInfo.SearchAttributes == nil { - exeInfo.SearchAttributes = make(map[string]*commonpb.Payload, 1) - } - exeInfo.SearchAttributes[searchattribute.BinaryChecksums] = checksumsPayload + searchAttributes[searchattribute.BuildIds] = saPayload return ms.taskGenerator.GenerateUpsertVisibilityTask() } @@ -1971,13 +2033,13 @@ func (ms *MutableStateImpl) CheckResettable() error { func (ms *MutableStateImpl) AddWorkflowTaskCompletedEvent( workflowTask *WorkflowTaskInfo, request *workflowservice.RespondWorkflowTaskCompletedRequest, - maxResetPoints int, + limits WorkflowTaskCompletionLimits, ) (*historypb.HistoryEvent, error) { opTag := tag.WorkflowActionWorkflowTaskCompleted if err := ms.checkMutability(opTag); err != nil { return nil, err } - return ms.workflowTaskManager.AddWorkflowTaskCompletedEvent(workflowTask, request, maxResetPoints) + return ms.workflowTaskManager.AddWorkflowTaskCompletedEvent(workflowTask, request, limits) } func (ms *MutableStateImpl) ReplicateWorkflowTaskCompletedEvent( diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 12c9d93c5ea..ae2b12855c8 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -57,6 +57,7 @@ import ( "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/shard" @@ -1049,3 +1050,64 @@ func (s *mutableStateSuite) TestSpeculativeWorkflowTaskNotPersisted() { }) } } + +func (s *mutableStateSuite) TestTrackBuildIdFromCompletion() { + dbState := s.buildWorkflowMutableState() + var err error + s.mutableState, err = newMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, tests.LocalNamespaceEntry, dbState, 123) + s.NoError(err) + + // Max 0 + err = s.mutableState.trackBuildIdFromCompletion("0.1", 4, WorkflowTaskCompletionLimits{MaxResetPoints: 0, MaxTrackedBuildIds: 0}) + s.NoError(err) + s.Equal([]string{}, 
s.getBuildIdsFromMutableState()) + s.Equal([]string{}, s.getResetPointsBinaryChecksumsFromMutableState()) + + err = s.mutableState.trackBuildIdFromCompletion("0.1", 4, WorkflowTaskCompletionLimits{MaxResetPoints: 2, MaxTrackedBuildIds: 2}) + s.NoError(err) + s.Equal([]string{"0.1"}, s.getBuildIdsFromMutableState()) + s.Equal([]string{"0.1"}, s.getResetPointsBinaryChecksumsFromMutableState()) + + // Add the same build ID + err = s.mutableState.trackBuildIdFromCompletion("0.1", 4, WorkflowTaskCompletionLimits{MaxResetPoints: 2, MaxTrackedBuildIds: 2}) + s.NoError(err) + s.Equal([]string{"0.1"}, s.getBuildIdsFromMutableState()) + s.Equal([]string{"0.1"}, s.getResetPointsBinaryChecksumsFromMutableState()) + + err = s.mutableState.trackBuildIdFromCompletion("0.2", 4, WorkflowTaskCompletionLimits{MaxResetPoints: 2, MaxTrackedBuildIds: 2}) + s.NoError(err) + s.Equal([]string{"0.1", "0.2"}, s.getBuildIdsFromMutableState()) + s.Equal([]string{"0.1", "0.2"}, s.getResetPointsBinaryChecksumsFromMutableState()) + + // Limit applies + err = s.mutableState.trackBuildIdFromCompletion("0.3", 4, WorkflowTaskCompletionLimits{MaxResetPoints: 2, MaxTrackedBuildIds: 2}) + s.NoError(err) + s.Equal([]string{"0.2", "0.3"}, s.getBuildIdsFromMutableState()) + s.Equal([]string{"0.2", "0.3"}, s.getResetPointsBinaryChecksumsFromMutableState()) +} + +func (s *mutableStateSuite) getBuildIdsFromMutableState() []string { + searchAttributes := s.mutableState.executionInfo.SearchAttributes + if searchAttributes == nil { + return []string{} + } + + payload, found := searchAttributes[searchattribute.BuildIds] + if !found { + return []string{} + } + decoded, err := searchattribute.DecodeValue(payload, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, true) + s.NoError(err) + buildIDs, ok := decoded.([]string) + s.True(ok) + return buildIDs +} + +func (s *mutableStateSuite) getResetPointsBinaryChecksumsFromMutableState() []string { + resetPoints := s.mutableState.executionInfo.GetAutoResetPoints().GetPoints() + binaryChecksums := make([]string, len(resetPoints)) + for i, point := range resetPoints { + binaryChecksums[i] = point.GetBinaryChecksum() + } + return binaryChecksums +} diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index 6bd58b38561..676b4a6df48 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -714,7 +714,7 @@ func (mr *MockMutableStateMockRecorder) AddWorkflowPropertiesModifiedEvent(arg0, } // AddWorkflowTaskCompletedEvent mocks base method. 
-func (m *MockMutableState) AddWorkflowTaskCompletedEvent(arg0 *WorkflowTaskInfo, arg1 *v17.RespondWorkflowTaskCompletedRequest, arg2 int) (*v13.HistoryEvent, error) { +func (m *MockMutableState) AddWorkflowTaskCompletedEvent(arg0 *WorkflowTaskInfo, arg1 *v17.RespondWorkflowTaskCompletedRequest, arg2 WorkflowTaskCompletionLimits) (*v13.HistoryEvent, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddWorkflowTaskCompletedEvent", arg0, arg1, arg2) ret0, _ := ret[0].(*v13.HistoryEvent) diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index f82de1ae26f..034b63b9879 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -212,7 +212,7 @@ func (m *workflowTaskStateMachine) ReplicateWorkflowTaskCompletedEvent( event *historypb.HistoryEvent, ) error { m.beforeAddWorkflowTaskCompletedEvent() - return m.afterAddWorkflowTaskCompletedEvent(event, math.MaxInt32) + return m.afterAddWorkflowTaskCompletedEvent(event, WorkflowTaskCompletionLimits{math.MaxInt32, math.MaxInt32}) } func (m *workflowTaskStateMachine) ReplicateWorkflowTaskFailedEvent() error { @@ -500,7 +500,7 @@ func (m *workflowTaskStateMachine) skipWorkflowTaskCompletedEvent(workflowTaskTy func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent( workflowTask *WorkflowTaskInfo, request *workflowservice.RespondWorkflowTaskCompletedRequest, - maxResetPoints int, + limits WorkflowTaskCompletionLimits, ) (*historypb.HistoryEvent, error) { // Capture if WorkflowTaskScheduled and WorkflowTaskStarted events were created @@ -534,22 +534,17 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent( } // Now write the completed event - versionStamp := request.WorkerVersionStamp - if !request.UseVersioning { - versionStamp = nil - } - // TODO: omit BinaryChecksum if version stamp is present and matches BinaryChecksum event := m.ms.hBuilder.AddWorkflowTaskCompletedEvent( workflowTask.ScheduledEventID, workflowTask.StartedEventID, request.Identity, request.BinaryChecksum, - versionStamp, + request.WorkerVersionStamp, request.SdkMetadata, request.MeteringMetadata, ) - err := m.afterAddWorkflowTaskCompletedEvent(event, maxResetPoints) + err := m.afterAddWorkflowTaskCompletedEvent(event, limits) if err != nil { return nil, err } @@ -876,12 +871,16 @@ func (m *workflowTaskStateMachine) beforeAddWorkflowTaskCompletedEvent() { func (m *workflowTaskStateMachine) afterAddWorkflowTaskCompletedEvent( event *historypb.HistoryEvent, - maxResetPoints int, + limits WorkflowTaskCompletionLimits, ) error { attrs := event.GetWorkflowTaskCompletedEventAttributes() m.ms.executionInfo.LastWorkflowTaskStartedEventId = attrs.GetStartedEventId() m.ms.executionInfo.WorkerVersionStamp = attrs.GetWorkerVersion() - return m.ms.addBinaryCheckSumIfNotExists(event, maxResetPoints) + buildID := attrs.GetWorkerVersion().GetBuildId() + if buildID != "" { + return m.ms.trackBuildIdFromCompletion(buildID, event.GetEventId(), limits) + } + return m.ms.addBinaryCheckSumIfNotExists(event, limits.MaxResetPoints) } func (m *workflowTaskStateMachine) emitWorkflowTaskAttemptStats( diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index c9fcf61b9a9..f38755c1db4 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -406,14 +406,16 @@ func (handler *workflowTaskHandlerCallbacksImpl) 
handleWorkflowTaskCompleted( } // It's an error if the workflow has used versioning in the past but this task has no versioning info. - if ms.GetWorkerVersionStamp() != nil { - if !request.UseVersioning || len(request.WorkerVersionStamp.GetBuildId()) == 0 { - return nil, serviceerror.NewInvalidArgument("Workflow using versioning must continue to use versioning.") - } + if ms.GetWorkerVersionStamp().GetUseVersioning() && !request.GetWorkerVersionStamp().GetUseVersioning() { + return nil, serviceerror.NewInvalidArgument("Workflow using versioning must continue to use versioning.") } - maxResetPoints := handler.config.MaxAutoResetPoints(namespaceEntry.Name().String()) - if ms.GetExecutionInfo().AutoResetPoints != nil && maxResetPoints == len(ms.GetExecutionInfo().AutoResetPoints.Points) { + limits := workflow.WorkflowTaskCompletionLimits{ + MaxResetPoints: handler.config.MaxAutoResetPoints(namespaceEntry.Name().String()), + MaxTrackedBuildIds: handler.config.MaxTrackedBuildIds(namespaceEntry.Name().String()), + } + // TODO: this metric is inaccurate, it should only be emitted if a new binary checksum (or build ID) is added in this completion. + if ms.GetExecutionInfo().AutoResetPoints != nil && limits.MaxResetPoints == len(ms.GetExecutionInfo().AutoResetPoints.Points) { handler.metricsHandler.Counter(metrics.AutoResetPointsLimitExceededCounter.GetMetricName()).Record( 1, metrics.OperationTag(metrics.HistoryRespondWorkflowTaskCompletedScope)) @@ -442,13 +444,13 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( } ms.ClearStickyness() } else { - completedEvent, err = ms.AddWorkflowTaskCompletedEvent(currentWorkflowTask, request, maxResetPoints) + completedEvent, err = ms.AddWorkflowTaskCompletedEvent(currentWorkflowTask, request, limits) if err != nil { return nil, err } } } else { - completedEvent, err = ms.AddWorkflowTaskCompletedEvent(currentWorkflowTask, request, maxResetPoints) + completedEvent, err = ms.AddWorkflowTaskCompletedEvent(currentWorkflowTask, request, limits) if err != nil { return nil, err } diff --git a/service/matching/config.go b/service/matching/config.go index 36d17dd9bc2..31f9b3b6e44 100644 --- a/service/matching/config.go +++ b/service/matching/config.go @@ -58,7 +58,7 @@ type ( ForwarderMaxRatePerSecond dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters ForwarderMaxChildrenPerNode dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters VersionCompatibleSetLimitPerQueue dynamicconfig.IntPropertyFn - VersionBuildIDLimitPerQueue dynamicconfig.IntPropertyFn + VersionBuildIdLimitPerQueue dynamicconfig.IntPropertyFn UserDataPollFrequency dynamicconfig.DurationPropertyFn // Time to hold a poll request before returning an empty response if there are no tasks @@ -149,7 +149,7 @@ func NewConfig(dc *dynamicconfig.Collection) *Config { ForwarderMaxChildrenPerNode: dc.GetIntPropertyFilteredByTaskQueueInfo(dynamicconfig.MatchingForwarderMaxChildrenPerNode, 20), ShutdownDrainDuration: dc.GetDurationProperty(dynamicconfig.MatchingShutdownDrainDuration, 0*time.Second), VersionCompatibleSetLimitPerQueue: dc.GetIntProperty(dynamicconfig.VersionCompatibleSetLimitPerQueue, 10), - VersionBuildIDLimitPerQueue: dc.GetIntProperty(dynamicconfig.VersionBuildIDLimitPerQueue, 1000), + VersionBuildIdLimitPerQueue: dc.GetIntProperty(dynamicconfig.VersionBuildIdLimitPerQueue, 1000), UserDataPollFrequency: dc.GetDurationProperty(dynamicconfig.MatchingUserDataPollFrequency, 5*time.Minute), AdminNamespaceToPartitionDispatchRate:
dc.GetFloatPropertyFilteredByNamespace(dynamicconfig.AdminMatchingNamespaceToPartitionDispatchRate, 10000), diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index 6a4b11982f7..2e0af68990b 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -767,7 +767,7 @@ func (e *matchingEngineImpl) UpdateWorkerBuildIdCompatibility( data.GetVersioningData(), req.GetRequest(), e.config.VersionCompatibleSetLimitPerQueue(), - e.config.VersionBuildIDLimitPerQueue(), + e.config.VersionBuildIdLimitPerQueue(), ) if err != nil { return nil, err @@ -1185,6 +1185,9 @@ func (e *matchingEngineImpl) redirectToVersionedQueueForAdd( if kind == enumspb.TASK_QUEUE_KIND_STICKY { return taskQueue, nil } + if !stamp.GetUseVersioning() { + return taskQueue, nil + } unversionedTQM, err := e.getTaskQueueManager(ctx, taskQueue, kind, true) if err != nil { return nil, err diff --git a/service/matching/version_sets.go b/service/matching/version_sets.go index 8f558d4f90d..afe9eb6d4a5 100644 --- a/service/matching/version_sets.go +++ b/service/matching/version_sets.go @@ -40,7 +40,7 @@ import ( var ( // TODO: all of these errors are temporary, we'll handle all these cases in future PRs - errBuildIDNotFound = serviceerror.NewInvalidArgument("build ID not found") + errBuildIdNotFound = serviceerror.NewInvalidArgument("build ID not found") errEmptyVersioningData = serviceerror.NewInvalidArgument("versioning data is empty") errPollWithVersionOnUnversionedQueue = serviceerror.NewInvalidArgument("poll with version capabilities on unversioned queue") errPollOnVersionedQueueWithNoVersion = serviceerror.NewInvalidArgument("poll on versioned queue with no version capabilities") @@ -67,20 +67,20 @@ func ToBuildIdOrderingResponse(data *persistencespb.VersioningData, maxSets int) return &workflowservice.GetWorkerBuildIdCompatibilityResponse{MajorVersionSets: versionSets} } -func checkLimits(g *persistencespb.VersioningData, maxSets, maxBuildIDs int) error { +func checkLimits(g *persistencespb.VersioningData, maxSets, maxBuildIds int) error { sets := g.GetVersionSets() if maxSets > 0 && len(sets) > maxSets { return serviceerror.NewFailedPrecondition(fmt.Sprintf("update would exceed number of compatible version sets permitted in namespace dynamic config (%v/%v)", len(sets), maxSets)) } - if maxBuildIDs == 0 { + if maxBuildIds == 0 { return nil } - numBuildIDs := 0 + numBuildIds := 0 for _, set := range sets { - numBuildIDs += len(set.GetBuildIds()) + numBuildIds += len(set.GetBuildIds()) } - if numBuildIDs > maxBuildIDs { - return serviceerror.NewFailedPrecondition(fmt.Sprintf("update would exceed number of build IDs permitted in namespace dynamic config (%v/%v)", numBuildIDs, maxBuildIDs)) + if numBuildIds > maxBuildIds { + return serviceerror.NewFailedPrecondition(fmt.Sprintf("update would exceed number of build IDs permitted in namespace dynamic config (%v/%v)", numBuildIds, maxBuildIds)) } return nil } @@ -112,18 +112,18 @@ func checkLimits(g *persistencespb.VersioningData, maxSets, maxBuildIDs int) err // Deletions are performed by a background process which verifies build IDs are no longer in use and safe to delete (not yet implemented). // // Update may fail with FailedPrecondition if it would cause exceeding the supplied limits. 
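The doc comment above describes how UpdateVersionSets reports limit violations as FailedPrecondition. As a rough illustration only (not code from this PR), a caller passes both caps — the matching engine uses VersionCompatibleSetLimitPerQueue and VersionBuildIdLimitPerQueue — and distinguishes "limit exceeded" from other failures roughly as in the sketch below. applyCompatibilityUpdate is a hypothetical name, the constant values mirror the dynamic-config defaults shown above, and the usual errors / go.temporal.io/api/serviceerror imports are assumed alongside the package aliases already used in this file.

// Hypothetical caller sketch; not part of this change.
func applyCompatibilityUpdate(
	clock hlc.Clock,
	data *persistencespb.VersioningData,
	req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest,
) (*persistencespb.VersioningData, error) {
	const maxSets = 10       // mirrors the VersionCompatibleSetLimitPerQueue default
	const maxBuildIds = 1000 // mirrors the VersionBuildIdLimitPerQueue default

	updated, err := UpdateVersionSets(clock, data, req, maxSets, maxBuildIds)
	var failedPrecondition *serviceerror.FailedPrecondition
	if errors.As(err, &failedPrecondition) {
		// checkLimits rejected the update: it would exceed maxSets or maxBuildIds.
		// Nothing was persisted, so the error is simply surfaced to the caller.
		return nil, err
	}
	if err != nil {
		return nil, err
	}
	return updated, nil
}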
-func UpdateVersionSets(clock hlc.Clock, data *persistencespb.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest, maxSets, maxBuildIDs int) (*persistencespb.VersioningData, error) { +func UpdateVersionSets(clock hlc.Clock, data *persistencespb.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest, maxSets, maxBuildIds int) (*persistencespb.VersioningData, error) { data, err := updateImpl(clock, data, req) if err != nil { return nil, err } - if err := checkLimits(data, maxSets, maxBuildIDs); err != nil { + if err := checkLimits(data, maxSets, maxBuildIds); err != nil { return nil, err } return data, nil } -func hashBuildID(buildID string) string { +func hashBuildId(buildID string) string { bytes := []byte(buildID) summed := sha256.Sum256(bytes) // 20 base64 chars of entropy is enough for this case @@ -155,8 +155,8 @@ func updateImpl(timestamp hlc.Clock, existingData *persistencespb.VersioningData } modifiedData.VersionSets = append(modifiedData.VersionSets, &persistencespb.CompatibleVersionSet{ - SetIds: []string{hashBuildID(targetedVersion)}, - BuildIds: []*persistencespb.BuildID{{Id: targetedVersion, State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &timestamp}}, + SetIds: []string{hashBuildId(targetedVersion)}, + BuildIds: []*persistencespb.BuildId{{Id: targetedVersion, State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &timestamp}}, }) makeVersionInSetDefault(&modifiedData, len(modifiedData.VersionSets)-1, 0, &timestamp) makeDefaultSet(&modifiedData, len(modifiedData.VersionSets)-1, &timestamp) @@ -187,13 +187,13 @@ func updateImpl(timestamp hlc.Clock, existingData *persistencespb.VersioningData lastIdx := len(existingData.VersionSets[compatSetIdx].BuildIds) modifiedData.VersionSets[compatSetIdx] = &persistencespb.CompatibleVersionSet{ SetIds: existingData.VersionSets[compatSetIdx].SetIds, - BuildIds: make([]*persistencespb.BuildID, lastIdx+1), + BuildIds: make([]*persistencespb.BuildId, lastIdx+1), } copy(modifiedData.VersionSets[compatSetIdx].BuildIds, existingData.VersionSets[compatSetIdx].BuildIds) // If the version doesn't exist, add it to the compatible set modifiedData.VersionSets[compatSetIdx].BuildIds[lastIdx] = - &persistencespb.BuildID{Id: targetedVersion, State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &timestamp} + &persistencespb.BuildId{Id: targetedVersion, State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &timestamp} makeVersionInSetDefault(&modifiedData, compatSetIdx, lastIdx, &timestamp) if addNew.GetMakeSetDefault() { makeDefaultSet(&modifiedData, compatSetIdx, &timestamp) @@ -216,8 +216,8 @@ func updateImpl(timestamp hlc.Clock, existingData *persistencespb.VersioningData return existingData, nil } // We're gonna have to copy here to avoid mutating the original - numBuildIDs := len(existingData.GetVersionSets()[targetSetIdx].BuildIds) - buildIDsCopy := make([]*persistencespb.BuildID, numBuildIDs) + numBuildIds := len(existingData.GetVersionSets()[targetSetIdx].BuildIds) + buildIDsCopy := make([]*persistencespb.BuildId, numBuildIds) copy(buildIDsCopy, existingData.VersionSets[targetSetIdx].BuildIds) modifiedData.VersionSets[targetSetIdx] = &persistencespb.CompatibleVersionSet{ SetIds: existingData.VersionSets[targetSetIdx].SetIds, @@ -287,7 +287,7 @@ func lookupVersionSetForPoll(data *persistencespb.VersioningData, caps *commonpb setIdx, _ := findVersion(data, caps.BuildId) if setIdx < 0 { // TODO: consider making an ephemeral set so we can match even if replication fails - return "", errBuildIDNotFound + return "",
errBuildIdNotFound } set := data.VersionSets[setIdx] latestInSet := set.BuildIds[len(set.BuildIds)-1].Id @@ -314,7 +314,7 @@ func lookupVersionSetForAdd(data *persistencespb.VersioningData, stamp *commonpb setIdx, _ := findVersion(data, stamp.BuildId) if setIdx < 0 { // TODO: consider making an ephemeral set so we can match even if replication fails - return "", errBuildIDNotFound + return "", errBuildIdNotFound } set = data.VersionSets[setIdx] } diff --git a/service/matching/version_sets_merge.go b/service/matching/version_sets_merge.go index 163776a9b6c..bc459243af9 100644 --- a/service/matching/version_sets_merge.go +++ b/service/matching/version_sets_merge.go @@ -70,13 +70,13 @@ func findSetWithSetIDs(sets []*persistencespb.CompatibleVersionSet, ids []string } type buildIDInfo struct { - state persistencespb.BuildID_State + state persistencespb.BuildId_State stateUpdateTimestamp hlc.Clock setIDs []string madeDefaultAt hlc.Clock } -func collectBuildIDInfo(sets []*persistencespb.CompatibleVersionSet) map[string]buildIDInfo { +func collectBuildIdInfo(sets []*persistencespb.CompatibleVersionSet) map[string]buildIDInfo { buildIDToInfo := make(map[string]buildIDInfo, 0) for _, set := range sets { lastIdx := len(set.BuildIds) - 1 @@ -125,7 +125,7 @@ func intoVersionSets(buildIDToInfo map[string]buildIDInfo, defaultSetIds []strin defaultTimestamp := hlc.Zero(0) set = &persistencespb.CompatibleVersionSet{ SetIds: info.setIDs, - BuildIds: make([]*persistencespb.BuildID, 0), + BuildIds: make([]*persistencespb.BuildId, 0), DefaultUpdateTimestamp: &defaultTimestamp, } sets = append(sets, set) @@ -133,7 +133,7 @@ func intoVersionSets(buildIDToInfo map[string]buildIDInfo, defaultSetIds []strin set.SetIds = mergeSetIDs(set.SetIds, info.setIDs) } timestamp := info.stateUpdateTimestamp - buildID := &persistencespb.BuildID{ + buildID := &persistencespb.BuildId{ Id: id, State: info.state, StateUpdateTimestamp: &timestamp, @@ -189,7 +189,7 @@ func MergeVersioningData(a *persistencespb.VersioningData, b *persistencespb.Ver } // Collect information about each build ID from both sources - buildIDToInfo := collectBuildIDInfo(append(a.VersionSets, b.VersionSets...)) + buildIDToInfo := collectBuildIdInfo(append(a.VersionSets, b.VersionSets...)) maxDefaultTimestamp := hlc.Max(*b.DefaultUpdateTimestamp, *a.DefaultUpdateTimestamp) diff --git a/service/matching/version_sets_merge_test.go b/service/matching/version_sets_merge_test.go index 9f0533b98aa..99ed3d1437a 100644 --- a/service/matching/version_sets_merge_test.go +++ b/service/matching/version_sets_merge_test.go @@ -37,23 +37,23 @@ func fromWallClock(wallclock int64) *hlc.Clock { return &hlc.Clock{WallClock: wallclock, Version: 0, ClusterId: 1} } -func buildID(wallclock int64, id string, optionalState ...persistencespb.BuildID_State) *persistencespb.BuildID { +func buildID(wallclock int64, id string, optionalState ...persistencespb.BuildId_State) *persistencespb.BuildId { state := persistencespb.STATE_ACTIVE if len(optionalState) == 1 { state = optionalState[0] } - return &persistencespb.BuildID{ + return &persistencespb.BuildId{ Id: id, State: state, StateUpdateTimestamp: fromWallClock(wallclock), } } -func mkBuildIDs(buildIDs ...*persistencespb.BuildID) []*persistencespb.BuildID { - buildIDStructs := make([]*persistencespb.BuildID, len(buildIDs)) +func mkBuildIds(buildIDs ...*persistencespb.BuildId) []*persistencespb.BuildId { + buildIDStructs := make([]*persistencespb.BuildId, len(buildIDs)) for i, buildID := range buildIDs { - buildIDStructs[i] =
&persistencespb.BuildID{ + buildIDStructs[i] = &persistencespb.BuildId{ Id: buildID.Id, State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: buildID.StateUpdateTimestamp, @@ -62,22 +62,22 @@ func mkBuildIDs(buildIDs ...*persistencespb.BuildID) []*persistencespb.BuildID { return buildIDStructs } -func mkSet(setID string, buildIDs ...*persistencespb.BuildID) *persistencespb.CompatibleVersionSet { +func mkSet(setID string, buildIDs ...*persistencespb.BuildId) *persistencespb.CompatibleVersionSet { return &persistencespb.CompatibleVersionSet{ SetIds: []string{setID}, - BuildIds: mkBuildIDs(buildIDs...), + BuildIds: mkBuildIds(buildIDs...), DefaultUpdateTimestamp: buildIDs[len(buildIDs)-1].StateUpdateTimestamp, } } -func mkSingleSetData(setID string, buildIDs ...*persistencespb.BuildID) *persistencespb.VersioningData { +func mkSingleSetData(setID string, buildIDs ...*persistencespb.BuildId) *persistencespb.VersioningData { return &persistencespb.VersioningData{ VersionSets: []*persistencespb.CompatibleVersionSet{mkSet(setID, buildIDs...)}, DefaultUpdateTimestamp: buildIDs[len(buildIDs)-1].StateUpdateTimestamp, } } -func TestSetMerge_IdenticalBuildIDsAndGreaterUpdateTimestamp_SetsMaxUpdateTimestamp(t *testing.T) { +func TestSetMerge_IdenticalBuildIdsAndGreaterUpdateTimestamp_SetsMaxUpdateTimestamp(t *testing.T) { a := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(6, "0.2")) b := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(3, "0.2")) @@ -93,7 +93,7 @@ func TestSetMerge_DataIsNil(t *testing.T) { assert.Equal(t, nilData, MergeVersioningData(nil, nil)) } -func TestSetMerge_AdditionalBuildIDAndGreaterUpdateTimestamp_MergesBuildIDsAndSetsMaxUpdateTimestamp(t *testing.T) { +func TestSetMerge_AdditionalBuildIdAndGreaterUpdateTimestamp_MergesBuildIdsAndSetsMaxUpdateTimestamp(t *testing.T) { a := mkSingleSetData("0.1", buildID(6, "0.1")) b := mkSingleSetData("0.1", buildID(1, "0.1"), buildID(3, "0.2")) expected := mkSingleSetData("0.1", buildID(3, "0.2"), buildID(6, "0.1")) @@ -134,7 +134,7 @@ func TestSetMerge_DifferentSetIDs_MergesSetIDs(t *testing.T) { expected := &persistencespb.VersioningData{ VersionSets: []*persistencespb.CompatibleVersionSet{{ SetIds: []string{"0.1", "0.2"}, - BuildIds: mkBuildIDs(buildID(1, "0.1"), buildID(6, "0.2")), + BuildIds: mkBuildIds(buildID(1, "0.1"), buildID(6, "0.2")), DefaultUpdateTimestamp: fromWallClock(6), }}, DefaultUpdateTimestamp: fromWallClock(6), @@ -162,7 +162,7 @@ func TestSetMerge_MultipleMatches_MergesSets(t *testing.T) { expected := &persistencespb.VersioningData{ VersionSets: []*persistencespb.CompatibleVersionSet{{ SetIds: []string{"0.1", "0.2"}, - BuildIds: mkBuildIDs(buildID(1, "0.1"), buildID(3, "0.2")), + BuildIds: mkBuildIds(buildID(1, "0.1"), buildID(3, "0.2")), DefaultUpdateTimestamp: fromWallClock(3), }}, DefaultUpdateTimestamp: fromWallClock(3), @@ -171,7 +171,7 @@ func TestSetMerge_MultipleMatches_MergesSets(t *testing.T) { assert.Equal(t, expected, MergeVersioningData(b, a)) } -func TestSetMerge_BuildIDPromoted_PreservesSetDefault(t *testing.T) { +func TestSetMerge_BuildIdPromoted_PreservesSetDefault(t *testing.T) { a := mkSingleSetData("0.1", buildID(2, "0.1"), buildID(1, "0.2")) a.VersionSets[0].DefaultUpdateTimestamp = fromWallClock(3) b := mkSingleSetData("0.1", buildID(2, "0.1"), buildID(1, "0.2")) diff --git a/service/matching/version_sets_test.go b/service/matching/version_sets_test.go index f077dac2ea4..2d55ef738ac 100644 --- a/service/matching/version_sets_test.go +++ 
b/service/matching/version_sets_test.go @@ -41,8 +41,8 @@ import ( func mkNewSet(id string, clock clockspb.HybridLogicalClock) *persistencespb.CompatibleVersionSet { return &persistencespb.CompatibleVersionSet{ - SetIds: []string{hashBuildID(id)}, - BuildIds: []*persistencespb.BuildID{{Id: id, State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, + SetIds: []string{hashBuildId(id)}, + BuildIds: []*persistencespb.BuildId{{Id: id, State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, DefaultUpdateTimestamp: &clock, } } @@ -113,18 +113,18 @@ func TestNewDefaultUpdate(t *testing.T) { DefaultUpdateTimestamp: &nextClock, VersionSets: []*persistencespb.CompatibleVersionSet{ { - SetIds: []string{hashBuildID("0")}, - BuildIds: []*persistencespb.BuildID{{Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, + SetIds: []string{hashBuildId("0")}, + BuildIds: []*persistencespb.BuildId{{Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, DefaultUpdateTimestamp: &clock, }, { - SetIds: []string{hashBuildID("1")}, - BuildIds: []*persistencespb.BuildID{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, + SetIds: []string{hashBuildId("1")}, + BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, DefaultUpdateTimestamp: &clock, }, { - SetIds: []string{hashBuildID("2")}, - BuildIds: []*persistencespb.BuildID{{Id: "2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &nextClock}}, + SetIds: []string{hashBuildId("2")}, + BuildIds: []*persistencespb.BuildId{{Id: "2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &nextClock}}, DefaultUpdateTimestamp: &nextClock, }, }, @@ -149,8 +149,8 @@ func TestNewDefaultSetUpdateOfEmptyData(t *testing.T) { DefaultUpdateTimestamp: &nextClock, VersionSets: []*persistencespb.CompatibleVersionSet{ { - SetIds: []string{hashBuildID("1")}, - BuildIds: []*persistencespb.BuildID{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &nextClock}}, + SetIds: []string{hashBuildId("1")}, + BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &nextClock}}, DefaultUpdateTimestamp: &nextClock, }, }, @@ -172,13 +172,13 @@ func TestNewDefaultSetUpdateCompatWithCurDefault(t *testing.T) { DefaultUpdateTimestamp: &nextClock, VersionSets: []*persistencespb.CompatibleVersionSet{ { - SetIds: []string{hashBuildID("0")}, - BuildIds: []*persistencespb.BuildID{{Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, + SetIds: []string{hashBuildId("0")}, + BuildIds: []*persistencespb.BuildId{{Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, DefaultUpdateTimestamp: &clock, }, { - SetIds: []string{hashBuildID("1")}, - BuildIds: []*persistencespb.BuildID{ + SetIds: []string{hashBuildId("1")}, + BuildIds: []*persistencespb.BuildId{ {Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}, {Id: "1.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &nextClock}, }, @@ -203,13 +203,13 @@ func TestNewDefaultSetUpdateCompatWithNonDefaultSet(t *testing.T) { DefaultUpdateTimestamp: &nextClock, VersionSets: []*persistencespb.CompatibleVersionSet{ { - SetIds: []string{hashBuildID("1")}, - BuildIds: []*persistencespb.BuildID{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, + SetIds: []string{hashBuildId("1")}, + BuildIds: []*persistencespb.BuildId{{Id: "1", State: 
persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, DefaultUpdateTimestamp: &clock, }, { - SetIds: []string{hashBuildID("0")}, - BuildIds: []*persistencespb.BuildID{ + SetIds: []string{hashBuildId("0")}, + BuildIds: []*persistencespb.BuildId{ {Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}, {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &nextClock}, }, @@ -234,16 +234,16 @@ func TestNewCompatibleWithVerInOlderSet(t *testing.T) { DefaultUpdateTimestamp: &clock, VersionSets: []*persistencespb.CompatibleVersionSet{ { - SetIds: []string{hashBuildID("0")}, - BuildIds: []*persistencespb.BuildID{ + SetIds: []string{hashBuildId("0")}, + BuildIds: []*persistencespb.BuildId{ {Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}, {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &nextClock}, }, DefaultUpdateTimestamp: &nextClock, }, { - SetIds: []string{hashBuildID("1")}, - BuildIds: []*persistencespb.BuildID{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, + SetIds: []string{hashBuildId("1")}, + BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock}}, DefaultUpdateTimestamp: &clock, }, }, @@ -272,8 +272,8 @@ func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) { DefaultUpdateTimestamp: &clock0, VersionSets: []*persistencespb.CompatibleVersionSet{ { - SetIds: []string{hashBuildID("0")}, - BuildIds: []*persistencespb.BuildID{ + SetIds: []string{hashBuildId("0")}, + BuildIds: []*persistencespb.BuildId{ {Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}, {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock1}, {Id: "0.2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock2}, @@ -281,8 +281,8 @@ func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) { DefaultUpdateTimestamp: &clock2, }, { - SetIds: []string{hashBuildID("1")}, - BuildIds: []*persistencespb.BuildID{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, + SetIds: []string{hashBuildId("1")}, + BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, DefaultUpdateTimestamp: &clock0, }, }, @@ -299,8 +299,8 @@ func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) { DefaultUpdateTimestamp: &clock0, VersionSets: []*persistencespb.CompatibleVersionSet{ { - SetIds: []string{hashBuildID("0")}, - BuildIds: []*persistencespb.BuildID{ + SetIds: []string{hashBuildId("0")}, + BuildIds: []*persistencespb.BuildId{ {Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}, {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock1}, {Id: "0.2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock2}, @@ -309,8 +309,8 @@ func TestNewCompatibleWithNonDefaultSetUpdate(t *testing.T) { DefaultUpdateTimestamp: &clock3, }, { - SetIds: []string{hashBuildID("1")}, - BuildIds: []*persistencespb.BuildID{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, + SetIds: []string{hashBuildId("1")}, + BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, DefaultUpdateTimestamp: &clock0, }, }, @@ -343,20 +343,20 @@ func TestMakeExistingSetDefault(t *testing.T) { DefaultUpdateTimestamp: &clock1, VersionSets: []*persistencespb.CompatibleVersionSet{ { - SetIds: []string{hashBuildID("0")}, - BuildIds: 
[]*persistencespb.BuildID{ + SetIds: []string{hashBuildId("0")}, + BuildIds: []*persistencespb.BuildId{ {Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}, }, DefaultUpdateTimestamp: &clock0, }, { - SetIds: []string{hashBuildID("2")}, - BuildIds: []*persistencespb.BuildID{{Id: "2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, + SetIds: []string{hashBuildId("2")}, + BuildIds: []*persistencespb.BuildId{{Id: "2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, DefaultUpdateTimestamp: &clock0, }, { - SetIds: []string{hashBuildID("1")}, - BuildIds: []*persistencespb.BuildID{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, + SetIds: []string{hashBuildId("1")}, + BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, DefaultUpdateTimestamp: &clock0, }, }, @@ -374,18 +374,18 @@ func TestMakeExistingSetDefault(t *testing.T) { DefaultUpdateTimestamp: &clock2, VersionSets: []*persistencespb.CompatibleVersionSet{ { - SetIds: []string{hashBuildID("2")}, - BuildIds: []*persistencespb.BuildID{{Id: "2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, + SetIds: []string{hashBuildId("2")}, + BuildIds: []*persistencespb.BuildId{{Id: "2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, DefaultUpdateTimestamp: &clock0, }, { - SetIds: []string{hashBuildID("1")}, - BuildIds: []*persistencespb.BuildID{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, + SetIds: []string{hashBuildId("1")}, + BuildIds: []*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, DefaultUpdateTimestamp: &clock0, }, { - SetIds: []string{hashBuildID("0")}, - BuildIds: []*persistencespb.BuildID{ + SetIds: []string{hashBuildId("0")}, + BuildIds: []*persistencespb.BuildId{ {Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}, {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock2}, }, @@ -421,13 +421,13 @@ func TestLimitsMaxSets(t *testing.T) { assert.ErrorAs(t, err, &failedPrecondition) } -func TestLimitsMaxBuildIDs(t *testing.T) { +func TestLimitsMaxBuildIds(t *testing.T) { clock := hlc.Zero(1) - maxBuildIDs := 10 - data := mkInitialData(maxBuildIDs, clock) + maxBuildIds := 10 + data := mkInitialData(maxBuildIds, clock) req := mkNewDefReq("10") - _, err := UpdateVersionSets(clock, data, req, 0, maxBuildIDs) + _, err := UpdateVersionSets(clock, data, req, 0, maxBuildIds) var failedPrecondition *serviceerror.FailedPrecondition assert.ErrorAs(t, err, &failedPrecondition) } @@ -453,8 +453,8 @@ func TestPromoteWithinVersion(t *testing.T) { DefaultUpdateTimestamp: &clock0, VersionSets: []*persistencespb.CompatibleVersionSet{ { - SetIds: []string{hashBuildID("0")}, - BuildIds: []*persistencespb.BuildID{ + SetIds: []string{hashBuildId("0")}, + BuildIds: []*persistencespb.BuildId{ {Id: "0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}, {Id: "0.2", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock2}, {Id: "0.1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock1}, @@ -462,8 +462,8 @@ func TestPromoteWithinVersion(t *testing.T) { DefaultUpdateTimestamp: &clock3, }, { - SetIds: []string{hashBuildID("1")}, - BuildIds: []*persistencespb.BuildID{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, + SetIds: []string{hashBuildId("1")}, + BuildIds: 
[]*persistencespb.BuildId{{Id: "1", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &clock0}}, DefaultUpdateTimestamp: &clock0, }, }, diff --git a/tests/advanced_visibility_test.go b/tests/advanced_visibility_test.go index c70f699b973..dd7b24d0c34 100644 --- a/tests/advanced_visibility_test.go +++ b/tests/advanced_visibility_test.go @@ -1816,6 +1816,116 @@ func (s *advancedVisibilitySuite) Test_LongWorkflowID() { s.testHelperForReadOnce(we.GetRunId(), query, false) } +func (s *advancedVisibilitySuite) Test_BuildIdIndexedOnCompletion() { + ctx := NewContext() + id := s.randomizeStr(s.T().Name()) + workflowType := "integration-build-id" + taskQueue := s.randomizeStr(s.T().Name()) + + request := s.createStartWorkflowExecutionRequest(id, workflowType, taskQueue) + _, err := s.engine.StartWorkflowExecution(ctx, request) + s.NoError(err) + + pollRequest := &workflowservice.PollWorkflowTaskQueueRequest{Namespace: s.namespace, TaskQueue: request.TaskQueue, Identity: id} + task, err := s.engine.PollWorkflowTaskQueue(ctx, pollRequest) + s.NoError(err) + s.Greater(len(task.TaskToken), 0) + _, err = s.engine.RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ + Namespace: s.namespace, + Identity: id, + WorkerVersionStamp: &commonpb.WorkerVersionStamp{BuildId: "1.0"}, + TaskToken: task.TaskToken, + }) + s.NoError(err) + + buildIDs := s.getBuildIds(ctx, task.WorkflowExecution) + s.Equal([]string{"1.0"}, buildIDs) + + _, err = s.engine.SignalWorkflowExecution(ctx, &workflowservice.SignalWorkflowExecutionRequest{Namespace: s.namespace, WorkflowExecution: task.WorkflowExecution, SignalName: "continue"}) + s.NoError(err) + + task, err = s.engine.PollWorkflowTaskQueue(ctx, pollRequest) + s.NoError(err) + s.Greater(len(task.TaskToken), 0) + _, err = s.engine.RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ + Namespace: s.namespace, + Identity: id, + WorkerVersionStamp: &commonpb.WorkerVersionStamp{BuildId: "1.1"}, + TaskToken: task.TaskToken, + Commands: []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ + ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{ + WorkflowType: task.WorkflowType, + TaskQueue: request.TaskQueue, + }, + }, + }}, + }) + s.NoError(err) + + buildIDs = s.getBuildIds(ctx, task.WorkflowExecution) + s.Equal([]string{"1.0", "1.1"}, buildIDs) + + task, err = s.engine.PollWorkflowTaskQueue(ctx, pollRequest) + s.NoError(err) + s.Greater(len(task.TaskToken), 0) + + buildIDs = s.getBuildIds(ctx, task.WorkflowExecution) + s.Equal([]string{}, buildIDs) + + _, err = s.engine.RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ + Namespace: s.namespace, + Identity: id, + WorkerVersionStamp: &commonpb.WorkerVersionStamp{BuildId: "1.2"}, + TaskToken: task.TaskToken, + Commands: []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ + CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}, + }, + }}, + }) + s.NoError(err) + + buildIDs = s.getBuildIds(ctx, task.WorkflowExecution) + s.Equal([]string{"1.2"}, buildIDs) + + for minor := 1; minor <= 2; minor++ { + s.Eventually(func() bool { + response, err := s.engine.ListWorkflowExecutions(ctx, 
&workflowservice.ListWorkflowExecutionsRequest{ + Namespace: s.namespace, + Query: fmt.Sprintf("BuildIds = '1.%d'", minor), + PageSize: defaultPageSize, + }) + if err != nil { + return false + } + if len(response.Executions) == 0 { + return false + } + s.Equal(id, response.Executions[0].Execution.WorkflowId) + return true + }, 10*time.Second, 100*time.Millisecond) + } +} + +func (s *advancedVisibilitySuite) getBuildIds(ctx context.Context, execution *commonpb.WorkflowExecution) []string { + description, err := s.engine.DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: s.namespace, + Execution: execution, + }) + s.NoError(err) + attr, found := description.WorkflowExecutionInfo.SearchAttributes.IndexedFields[searchattribute.BuildIds] + if !found { + return []string{} + } + var buildIDs []string + err = payload.Decode(attr, &buildIDs) + s.NoError(err) + return buildIDs +} + func (s *advancedVisibilitySuite) updateMaxResultWindow() { esConfig := s.testClusterConfig.ESConfig diff --git a/tests/xdc/base.go b/tests/xdc/base.go index 373a22935ed..4a3d56d31e9 100644 --- a/tests/xdc/base.go +++ b/tests/xdc/base.go @@ -141,19 +141,3 @@ func (s *xdcBaseSuite) setupTest() { // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil s.Assertions = require.New(s.T()) } - -func (s *xdcBaseSuite) retry(attempts int, interval time.Duration, fn func() error) { - var lastErr error - for attempt := 1; attempt <= attempts; attempt++ { - lastErr = fn() - if lastErr == nil { - return - } - time.Sleep(interval) - } - s.FailNow(fmt.Sprintf("%v after %v attempts", lastErr, attempts)) -} - -func (s *xdcBaseSuite) retryReasonably(fn func() error) { - s.retry(30, 500*time.Millisecond, fn) -} diff --git a/tests/xdc/user_data_replication_test.go b/tests/xdc/user_data_replication_test.go index 2c5d903a381..9e1a6d2e7f7 100644 --- a/tests/xdc/user_data_replication_test.go +++ b/tests/xdc/user_data_replication_test.go @@ -29,7 +29,6 @@ package xdc import ( - "errors" "flag" "fmt" "testing" @@ -102,7 +101,7 @@ func (s *userDataReplicationTestSuite) TestUserDataIsReplicatedFromActiveToPassi }) s.Require().NoError(err) - s.retryReasonably(func() error { + s.Eventually(func() bool { // Call matching directly in case frontend is configured to redirect API calls to the active cluster response, err := standbyMatchingClient.GetWorkerBuildIdCompatibility(tests.NewContext(), &matchingservice.GetWorkerBuildIdCompatibilityRequest{ NamespaceId: description.GetNamespaceInfo().Id, @@ -112,13 +111,10 @@ func (s *userDataReplicationTestSuite) TestUserDataIsReplicatedFromActiveToPassi }, }) if err != nil { - return err + return false } - if len(response.GetResponse().GetMajorVersionSets()) != 1 { - return errors.New("passive has no versioning data") - } - return nil - }) + return len(response.GetResponse().GetMajorVersionSets()) == 1 + }, 15*time.Second, 500*time.Millisecond) } func (s *userDataReplicationTestSuite) TestUserDataIsReplicatedFromPassiveToActive() { @@ -140,29 +136,26 @@ func (s *userDataReplicationTestSuite) TestUserDataIsReplicatedFromPassiveToActi standbyFrontendClient := s.cluster2.GetFrontendClient() s.cluster1.GetExecutionManager() - s.retryReasonably(func() error { + s.Eventually(func() bool { // Call matching directly in case frontend is configured to redirect API calls to the active cluster _, err = standbyFrontendClient.UpdateWorkerBuildIdCompatibility(tests.NewContext(), 
&workflowservice.UpdateWorkerBuildIdCompatibilityRequest{ Namespace: namespace, TaskQueue: taskQueue, Operation: &workflowservice.UpdateWorkerBuildIdCompatibilityRequest_AddNewBuildIdInNewDefaultSet{AddNewBuildIdInNewDefaultSet: "0.1"}, }) - return err - }) + return err == nil + }, 15*time.Second, 500*time.Millisecond) - s.retryReasonably(func() error { + s.Eventually(func() bool { response, err := activeFrontendClient.GetWorkerBuildIdCompatibility(tests.NewContext(), &workflowservice.GetWorkerBuildIdCompatibilityRequest{ Namespace: namespace, TaskQueue: taskQueue, }) if err != nil { - return err + return false } - if len(response.GetMajorVersionSets()) != 1 { - return errors.New("active has no versioning data") - } - return nil - }) + return len(response.GetMajorVersionSets()) == 1 + }, 15*time.Second, 500*time.Millisecond) } func (s *userDataReplicationTestSuite) TestUserDataEntriesAreReplicatedOnDemand() {