From d3087b17ee08cc8437c54d08a409dd9875501021 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Wed, 31 May 2023 17:57:54 -0700 Subject: [PATCH 1/6] Add abiliy to add comments to exepcted history in tests --- tests/integrationbase.go | 51 +++++++++++++++++++++++++++++++++-- tests/update_workflow_test.go | 5 ++-- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/tests/integrationbase.go b/tests/integrationbase.go index 680dad651c8..46936918d78 100644 --- a/tests/integrationbase.go +++ b/tests/integrationbase.go @@ -350,12 +350,59 @@ func (s *IntegrationBase) formatHistoryCompact(history *historypb.History) strin func (s *IntegrationBase) EqualHistory(expectedHistory string, actualHistory *historypb.History) { s.T().Helper() - expectedHistoryTrimmed := strings.Trim(expectedHistory, "\n") + expectedHistorySanitized := s.sanitizeHistory(expectedHistory) actualHistoryStr := s.formatHistoryCompact(actualHistory) - s.Equal(expectedHistoryTrimmed, actualHistoryStr) + s.Equal(expectedHistorySanitized, actualHistoryStr) } func (s *IntegrationBase) EqualHistoryEvents(expectedHistory string, actualHistoryEvents []*historypb.HistoryEvent) { s.T().Helper() s.EqualHistory(expectedHistory, &historypb.History{Events: actualHistoryEvents}) } + +// sanitizeHistory removes all empty new lines and comments from expectedHistory string. +func (s *IntegrationBase) sanitizeHistory(expectedHistory string) string { + type state int + const ( + stateStart state = iota + stateNormal + stateComment + stateNewLine + ) + + var ret strings.Builder + st := stateStart + spaceCounter := 0 + for _, r := range expectedHistory { + switch r { + case '\n': + if st == stateStart { + continue + } + spaceCounter = 0 + st = stateNewLine + case '/': + spaceCounter = 0 + st = stateComment + case ' ': + if st == stateComment { + continue + } + spaceCounter++ + default: + if st == stateComment { + continue + } + if st == stateNewLine { + ret.WriteRune('\n') + } + if spaceCounter > 0 { + ret.WriteString(strings.Repeat(" ", spaceCounter)) + spaceCounter = 0 + } + ret.WriteRune(r) + st = stateNormal + } + } + return ret.String() +} diff --git a/tests/update_workflow_test.go b/tests/update_workflow_test.go index bb68cafa56c..d485368dbbf 100644 --- a/tests/update_workflow_test.go +++ b/tests/update_workflow_test.go @@ -314,7 +314,7 @@ func (s *integrationSuite) TestUpdateWorkflow_NewSpeculativeWorkflowTask_AcceptC 3 WorkflowTaskStarted 4 WorkflowTaskCompleted 5 ActivityTaskScheduled - 6 WorkflowTaskScheduled + 6 WorkflowTaskScheduled // Was speculative WT but was converted to normal WT. 7 WorkflowTaskStarted 8 WorkflowTaskCompleted 9 WorkflowExecutionUpdateAccepted @@ -322,7 +322,8 @@ func (s *integrationSuite) TestUpdateWorkflow_NewSpeculativeWorkflowTask_AcceptC 11 WorkflowTaskScheduled 12 WorkflowTaskStarted 13 WorkflowTaskCompleted - 14 WorkflowExecutionCompleted`, events) + 14 WorkflowExecutionCompleted +`, events) }) } } From a188a2c752b61cae6e07c9dd7045b45f14a603b9 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Wed, 31 May 2023 18:48:30 -0700 Subject: [PATCH 2/6] Fix lint errors --- tests/integrationbase.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integrationbase.go b/tests/integrationbase.go index 46936918d78..2c9fff5ff90 100644 --- a/tests/integrationbase.go +++ b/tests/integrationbase.go @@ -394,13 +394,13 @@ func (s *IntegrationBase) sanitizeHistory(expectedHistory string) string { continue } if st == stateNewLine { - ret.WriteRune('\n') + _, _ = ret.WriteRune('\n') } if spaceCounter > 0 { - ret.WriteString(strings.Repeat(" ", spaceCounter)) + _, _ = ret.WriteString(strings.Repeat(" ", spaceCounter)) spaceCounter = 0 } - ret.WriteRune(r) + _, _ = ret.WriteRune(r) st = stateNormal } } From 90d45f67cd997098be38ae8749b133bfb779cef5 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Mon, 5 Jun 2023 18:39:28 -0700 Subject: [PATCH 3/6] Add support of attributes --- tests/integrationbase.go | 162 +++++++++++++++++++++++++--------- tests/update_workflow_test.go | 5 +- 2 files changed, 123 insertions(+), 44 deletions(-) diff --git a/tests/integrationbase.go b/tests/integrationbase.go index 2c9fff5ff90..52d1c81b34b 100644 --- a/tests/integrationbase.go +++ b/tests/integrationbase.go @@ -28,8 +28,11 @@ import ( "bytes" "context" "encoding/binary" + "encoding/json" "fmt" "os" + "reflect" + "strconv" "strings" "time" @@ -231,7 +234,7 @@ func (s *IntegrationBase) randomizeStr(id string) string { func (s *IntegrationBase) printWorkflowHistory(namespace string, execution *commonpb.WorkflowExecution) { events := s.getHistory(namespace, execution) - common.PrettyPrint(events) + fmt.Println(s.formatHistory(&historypb.History{Events: events})) } //lint:ignore U1000 used for debugging. @@ -348,11 +351,46 @@ func (s *IntegrationBase) formatHistoryCompact(history *historypb.History) strin return "" } -func (s *IntegrationBase) EqualHistory(expectedHistory string, actualHistory *historypb.History) { +func (s *IntegrationBase) formatHistory(history *historypb.History) string { s.T().Helper() - expectedHistorySanitized := s.sanitizeHistory(expectedHistory) - actualHistoryStr := s.formatHistoryCompact(actualHistory) - s.Equal(expectedHistorySanitized, actualHistoryStr) + var sb strings.Builder + for _, event := range history.Events { + eventAttrs := reflect.ValueOf(event.Attributes).Elem().Field(0).Elem().Interface() + eventAttrsMap := s.structToMap(eventAttrs) + eventAttrsJson, err := json.Marshal(eventAttrsMap) + s.NoError(err) + sb.WriteString(fmt.Sprintf("%3d %s %s\n", event.GetEventId(), event.GetEventType(), string(eventAttrsJson))) + } + if sb.Len() > 0 { + return sb.String()[:sb.Len()-1] + } + return "" +} + +func (s *IntegrationBase) structToMap(strct any) map[string]any { + + strctV := reflect.ValueOf(strct) + strctT := strctV.Type() + + ret := map[string]any{} + + for i := 0; i < strctV.NumField(); i++ { + field := strctV.Field(i) + var fieldData any + + if field.Kind() == reflect.Pointer && field.IsNil() { + continue + } else if field.Kind() == reflect.Pointer && field.Elem().Kind() == reflect.Struct { + fieldData = s.structToMap(field.Elem().Interface()) + } else if field.Kind() == reflect.Struct { + fieldData = s.structToMap(field.Interface()) + } else { + fieldData = field.Interface() + } + ret[strctT.Field(i).Name] = fieldData + } + + return ret } func (s *IntegrationBase) EqualHistoryEvents(expectedHistory string, actualHistoryEvents []*historypb.HistoryEvent) { @@ -360,49 +398,89 @@ func (s *IntegrationBase) EqualHistoryEvents(expectedHistory string, actualHisto s.EqualHistory(expectedHistory, &historypb.History{Events: actualHistoryEvents}) } -// sanitizeHistory removes all empty new lines and comments from expectedHistory string. -func (s *IntegrationBase) sanitizeHistory(expectedHistory string) string { - type state int - const ( - stateStart state = iota - stateNormal - stateComment - stateNewLine - ) +func (s *IntegrationBase) EqualHistory(expectedHistory string, actualHistory *historypb.History) { + s.T().Helper() + expectedCompactHistory, expectedEventsAttributes := s.parseHistory(expectedHistory) + actualCompactHistory := s.formatHistoryCompact(actualHistory) + s.Equal(expectedCompactHistory, actualCompactHistory) + for eventID, expectedEventAttributes := range expectedEventsAttributes { + actualEventAttributes := reflect.ValueOf(actualHistory.Events[eventID-1].Attributes).Elem().Field(0).Elem() + s.equalStructToMap(expectedEventAttributes, actualEventAttributes, eventID, "") + } +} + +func (s *IntegrationBase) equalStructToMap(expectedMap map[string]any, actualStructV reflect.Value, eventID int64, attrPrefix string) { + s.T().Helper() + + for attrName, expectedValue := range expectedMap { + actualFieldV := actualStructV.FieldByName(attrName) + if actualFieldV.Kind() == reflect.Invalid { + s.Failf("", "Expected property %s%s wasn't found for EventID=%v", attrPrefix, attrName, eventID) + } - var ret strings.Builder - st := stateStart - spaceCounter := 0 - for _, r := range expectedHistory { - switch r { - case '\n': - if st == stateStart { - continue + if ep, ok := expectedValue.(map[string]any); ok { + if actualFieldV.IsNil() { + s.Failf("", "Value of property %s%s for EventID=%v expected to be struct but was nil", attrPrefix, attrName, eventID) } - spaceCounter = 0 - st = stateNewLine - case '/': - spaceCounter = 0 - st = stateComment - case ' ': - if st == stateComment { - continue + if actualFieldV.Kind() == reflect.Pointer { + actualFieldV = actualFieldV.Elem() } - spaceCounter++ - default: - if st == stateComment { - continue + if actualFieldV.Kind() != reflect.Struct { + s.Failf("", "Value of property %s%s for EventID=%v expected to be struct but was of type %s", attrPrefix, attrName, eventID, actualFieldV.Type().String()) } - if st == stateNewLine { - _, _ = ret.WriteRune('\n') + s.equalStructToMap(ep, actualFieldV, eventID, attrPrefix+attrName+".") + continue + } + actualFieldValue := actualFieldV.Interface() + s.EqualValues(expectedValue, actualFieldValue, "Values of %s%s property are not equal for EventID=%v", attrPrefix, attrName, eventID) + } +} + +// parseHistory accept history in a formatHistory format and returns compact history string and map of eventID to map of event attributes. +func (s *IntegrationBase) parseHistory(expectedHistory string) (string, map[int64]map[string]any) { + s.T().Helper() + h := &historypb.History{} + eventsAttrs := make(map[int64]map[string]any) + prevEventID := 0 + for lineNum, eventLine := range strings.Split(expectedHistory, "\n") { + fields := strings.Fields(eventLine) + if len(fields) == 0 { + continue + } + if len(fields) < 2 { + s.FailNowf("", "Not enough fields in line %d", lineNum+1) + } + eventID, err := strconv.Atoi(fields[0]) + if err != nil { + s.FailNowf(err.Error(), "Failed to parse EventID in line %d", lineNum+1) + } + if eventID != prevEventID+1 && prevEventID != 0 { + s.FailNowf("", "Wrong EventID sequence after EventID %d in line %d", prevEventID, lineNum+1) + } + prevEventID = eventID + eventType, ok := enumspb.EventType_value[fields[1]] + if !ok { + s.FailNowf("", "Unknown event type %s for EventID=%d", fields[1], lineNum+1) + } + h.Events = append(h.Events, &historypb.HistoryEvent{ + EventId: int64(eventID), + EventType: enumspb.EventType(eventType), + }) + var jb strings.Builder + for i := 2; i < len(fields); i++ { + if strings.HasPrefix(fields[i], "//") { + break } - if spaceCounter > 0 { - _, _ = ret.WriteString(strings.Repeat(" ", spaceCounter)) - spaceCounter = 0 + jb.WriteString(fields[i]) + } + if jb.Len() > 0 { + var eventAttrs map[string]any + err := json.Unmarshal([]byte(jb.String()), &eventAttrs) + if err != nil { + s.FailNowf(err.Error(), "Failed to unmarshal attributes %q for EventID=%d", jb.String(), lineNum+1) } - _, _ = ret.WriteRune(r) - st = stateNormal + eventsAttrs[int64(eventID)] = eventAttrs } } - return ret.String() + return s.formatHistoryCompact(h), eventsAttrs } diff --git a/tests/update_workflow_test.go b/tests/update_workflow_test.go index d485368dbbf..6140f46be07 100644 --- a/tests/update_workflow_test.go +++ b/tests/update_workflow_test.go @@ -308,17 +308,18 @@ func (s *integrationSuite) TestUpdateWorkflow_NewSpeculativeWorkflowTask_AcceptC events := s.getHistory(s.namespace, tv.WorkflowExecution()) + s.printWorkflowHistory(s.namespace, tv.WorkflowExecution()) s.EqualHistoryEvents(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled 3 WorkflowTaskStarted 4 WorkflowTaskCompleted 5 ActivityTaskScheduled - 6 WorkflowTaskScheduled // Was speculative WT but was converted to normal WT. + 6 WorkflowTaskScheduled // Normal WT events were written for speculative WT at complete. 7 WorkflowTaskStarted 8 WorkflowTaskCompleted 9 WorkflowExecutionUpdateAccepted - 10 WorkflowExecutionUpdateCompleted + 10 WorkflowExecutionUpdateCompleted {"AcceptedEventId": 9} 11 WorkflowTaskScheduled 12 WorkflowTaskStarted 13 WorkflowTaskCompleted From a1a9b51e994b5d4a5ae63a9d186dae3926fc50f4 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Tue, 6 Jun 2023 14:36:46 -0700 Subject: [PATCH 4/6] Fix lint errors --- tests/integrationbase.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/integrationbase.go b/tests/integrationbase.go index 52d1c81b34b..637fa198347 100644 --- a/tests/integrationbase.go +++ b/tests/integrationbase.go @@ -234,13 +234,13 @@ func (s *IntegrationBase) randomizeStr(id string) string { func (s *IntegrationBase) printWorkflowHistory(namespace string, execution *commonpb.WorkflowExecution) { events := s.getHistory(namespace, execution) - fmt.Println(s.formatHistory(&historypb.History{Events: events})) + _, _ = fmt.Println(s.formatHistory(&historypb.History{Events: events})) } //lint:ignore U1000 used for debugging. func (s *IntegrationBase) printWorkflowHistoryCompact(namespace string, execution *commonpb.WorkflowExecution) { events := s.getHistory(namespace, execution) - fmt.Println(s.formatHistoryCompact(&historypb.History{Events: events})) + _, _ = fmt.Println(s.formatHistoryCompact(&historypb.History{Events: events})) } func (s *IntegrationBase) getHistory(namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent { @@ -343,7 +343,7 @@ func (s *IntegrationBase) formatHistoryCompact(history *historypb.History) strin s.T().Helper() var sb strings.Builder for _, event := range history.Events { - sb.WriteString(fmt.Sprintf("%3d %s\n", event.GetEventId(), event.GetEventType())) + _, _ = sb.WriteString(fmt.Sprintf("%3d %s\n", event.GetEventId(), event.GetEventType())) } if sb.Len() > 0 { return sb.String()[:sb.Len()-1] @@ -359,7 +359,7 @@ func (s *IntegrationBase) formatHistory(history *historypb.History) string { eventAttrsMap := s.structToMap(eventAttrs) eventAttrsJson, err := json.Marshal(eventAttrsMap) s.NoError(err) - sb.WriteString(fmt.Sprintf("%3d %s %s\n", event.GetEventId(), event.GetEventType(), string(eventAttrsJson))) + _, _ = sb.WriteString(fmt.Sprintf("%3d %s %s\n", event.GetEventId(), event.GetEventType(), string(eventAttrsJson))) } if sb.Len() > 0 { return sb.String()[:sb.Len()-1] @@ -471,7 +471,7 @@ func (s *IntegrationBase) parseHistory(expectedHistory string) (string, map[int6 if strings.HasPrefix(fields[i], "//") { break } - jb.WriteString(fields[i]) + _, _ = jb.WriteString(fields[i]) } if jb.Len() > 0 { var eventAttrs map[string]any From 569a721bcfdb36624026017b4c84812d44045e69 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Tue, 6 Jun 2023 21:37:57 -0700 Subject: [PATCH 5/6] Reverse lookup --- tests/integrationbase.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/integrationbase.go b/tests/integrationbase.go index 637fa198347..dfd41f5de32 100644 --- a/tests/integrationbase.go +++ b/tests/integrationbase.go @@ -403,9 +403,11 @@ func (s *IntegrationBase) EqualHistory(expectedHistory string, actualHistory *hi expectedCompactHistory, expectedEventsAttributes := s.parseHistory(expectedHistory) actualCompactHistory := s.formatHistoryCompact(actualHistory) s.Equal(expectedCompactHistory, actualCompactHistory) - for eventID, expectedEventAttributes := range expectedEventsAttributes { - actualEventAttributes := reflect.ValueOf(actualHistory.Events[eventID-1].Attributes).Elem().Field(0).Elem() - s.equalStructToMap(expectedEventAttributes, actualEventAttributes, eventID, "") + for _, actualHistoryEvent := range actualHistory.Events { + if expectedEventAttributes, ok := expectedEventsAttributes[actualHistoryEvent.EventId]; ok { + actualEventAttributes := reflect.ValueOf(actualHistoryEvent.Attributes).Elem().Field(0).Elem() + s.equalStructToMap(expectedEventAttributes, actualEventAttributes, actualHistoryEvent.EventId, "") + } } } @@ -448,14 +450,14 @@ func (s *IntegrationBase) parseHistory(expectedHistory string) (string, map[int6 continue } if len(fields) < 2 { - s.FailNowf("", "Not enough fields in line %d", lineNum+1) + s.FailNowf("", "Not enough fields on line %d", lineNum+1) } eventID, err := strconv.Atoi(fields[0]) if err != nil { - s.FailNowf(err.Error(), "Failed to parse EventID in line %d", lineNum+1) + s.FailNowf(err.Error(), "Failed to parse EventID on line %d", lineNum+1) } if eventID != prevEventID+1 && prevEventID != 0 { - s.FailNowf("", "Wrong EventID sequence after EventID %d in line %d", prevEventID, lineNum+1) + s.FailNowf("", "Wrong EventID sequence after EventID %d on line %d", prevEventID, lineNum+1) } prevEventID = eventID eventType, ok := enumspb.EventType_value[fields[1]] From d28e85c1cd6af2d1e9159275cda32ac452ce668b Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Wed, 7 Jun 2023 10:07:36 -0700 Subject: [PATCH 6/6] Remove debugging prints --- tests/update_workflow_test.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tests/update_workflow_test.go b/tests/update_workflow_test.go index 6140f46be07..39dbd10d45e 100644 --- a/tests/update_workflow_test.go +++ b/tests/update_workflow_test.go @@ -308,7 +308,6 @@ func (s *integrationSuite) TestUpdateWorkflow_NewSpeculativeWorkflowTask_AcceptC events := s.getHistory(s.namespace, tv.WorkflowExecution()) - s.printWorkflowHistory(s.namespace, tv.WorkflowExecution()) s.EqualHistoryEvents(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled @@ -2967,8 +2966,6 @@ func (s *integrationSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Heartbeat( s.Equal(4, wtHandlerCalls) s.Equal(4, msgHandlerCalls) - s.printWorkflowHistoryCompact(s.namespace, tv.WorkflowExecution()) - events := s.getHistory(s.namespace, tv.WorkflowExecution()) s.EqualHistoryEvents(` 1 WorkflowExecutionStarted @@ -3638,15 +3635,11 @@ func (s *integrationSuite) TestUpdateWorkflow_CompletedSpeculativeWorkflowTask_D updateResult := <-updateResultCh s.EqualValues(tv.String("success-result", "1"), decodeString(s, updateResult.GetOutcome().GetSuccess())) - s.printWorkflowHistoryCompact(s.namespace, tv.WorkflowExecution()) - if tc.CloseShard { // Close shard to make sure that for completed updates deduplication works even after shard reload. s.closeShard(tv.WorkflowID()) } - s.printWorkflowHistoryCompact(s.namespace, tv.WorkflowExecution()) - // Send second update with the same ID. updateResultCh2 := make(chan *workflowservice.UpdateWorkflowExecutionResponse) go func() {