diff --git a/tests/integrationbase.go b/tests/integrationbase.go index 680dad651c8..dfd41f5de32 100644 --- a/tests/integrationbase.go +++ b/tests/integrationbase.go @@ -28,8 +28,11 @@ import ( "bytes" "context" "encoding/binary" + "encoding/json" "fmt" "os" + "reflect" + "strconv" "strings" "time" @@ -231,13 +234,13 @@ func (s *IntegrationBase) randomizeStr(id string) string { func (s *IntegrationBase) printWorkflowHistory(namespace string, execution *commonpb.WorkflowExecution) { events := s.getHistory(namespace, execution) - common.PrettyPrint(events) + _, _ = fmt.Println(s.formatHistory(&historypb.History{Events: events})) } //lint:ignore U1000 used for debugging. func (s *IntegrationBase) printWorkflowHistoryCompact(namespace string, execution *commonpb.WorkflowExecution) { events := s.getHistory(namespace, execution) - fmt.Println(s.formatHistoryCompact(&historypb.History{Events: events})) + _, _ = fmt.Println(s.formatHistoryCompact(&historypb.History{Events: events})) } func (s *IntegrationBase) getHistory(namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent { @@ -340,7 +343,7 @@ func (s *IntegrationBase) formatHistoryCompact(history *historypb.History) strin s.T().Helper() var sb strings.Builder for _, event := range history.Events { - sb.WriteString(fmt.Sprintf("%3d %s\n", event.GetEventId(), event.GetEventType())) + _, _ = sb.WriteString(fmt.Sprintf("%3d %s\n", event.GetEventId(), event.GetEventType())) } if sb.Len() > 0 { return sb.String()[:sb.Len()-1] @@ -348,14 +351,138 @@ func (s *IntegrationBase) formatHistoryCompact(history *historypb.History) strin return "" } -func (s *IntegrationBase) EqualHistory(expectedHistory string, actualHistory *historypb.History) { +func (s *IntegrationBase) formatHistory(history *historypb.History) string { s.T().Helper() - expectedHistoryTrimmed := strings.Trim(expectedHistory, "\n") - actualHistoryStr := s.formatHistoryCompact(actualHistory) - s.Equal(expectedHistoryTrimmed, actualHistoryStr) + var sb strings.Builder + for _, event := range history.Events { + eventAttrs := reflect.ValueOf(event.Attributes).Elem().Field(0).Elem().Interface() + eventAttrsMap := s.structToMap(eventAttrs) + eventAttrsJson, err := json.Marshal(eventAttrsMap) + s.NoError(err) + _, _ = sb.WriteString(fmt.Sprintf("%3d %s %s\n", event.GetEventId(), event.GetEventType(), string(eventAttrsJson))) + } + if sb.Len() > 0 { + return sb.String()[:sb.Len()-1] + } + return "" +} + +func (s *IntegrationBase) structToMap(strct any) map[string]any { + + strctV := reflect.ValueOf(strct) + strctT := strctV.Type() + + ret := map[string]any{} + + for i := 0; i < strctV.NumField(); i++ { + field := strctV.Field(i) + var fieldData any + + if field.Kind() == reflect.Pointer && field.IsNil() { + continue + } else if field.Kind() == reflect.Pointer && field.Elem().Kind() == reflect.Struct { + fieldData = s.structToMap(field.Elem().Interface()) + } else if field.Kind() == reflect.Struct { + fieldData = s.structToMap(field.Interface()) + } else { + fieldData = field.Interface() + } + ret[strctT.Field(i).Name] = fieldData + } + + return ret } func (s *IntegrationBase) EqualHistoryEvents(expectedHistory string, actualHistoryEvents []*historypb.HistoryEvent) { s.T().Helper() s.EqualHistory(expectedHistory, &historypb.History{Events: actualHistoryEvents}) } + +func (s *IntegrationBase) EqualHistory(expectedHistory string, actualHistory *historypb.History) { + s.T().Helper() + expectedCompactHistory, expectedEventsAttributes := s.parseHistory(expectedHistory) + actualCompactHistory := s.formatHistoryCompact(actualHistory) + s.Equal(expectedCompactHistory, actualCompactHistory) + for _, actualHistoryEvent := range actualHistory.Events { + if expectedEventAttributes, ok := expectedEventsAttributes[actualHistoryEvent.EventId]; ok { + actualEventAttributes := reflect.ValueOf(actualHistoryEvent.Attributes).Elem().Field(0).Elem() + s.equalStructToMap(expectedEventAttributes, actualEventAttributes, actualHistoryEvent.EventId, "") + } + } +} + +func (s *IntegrationBase) equalStructToMap(expectedMap map[string]any, actualStructV reflect.Value, eventID int64, attrPrefix string) { + s.T().Helper() + + for attrName, expectedValue := range expectedMap { + actualFieldV := actualStructV.FieldByName(attrName) + if actualFieldV.Kind() == reflect.Invalid { + s.Failf("", "Expected property %s%s wasn't found for EventID=%v", attrPrefix, attrName, eventID) + } + + if ep, ok := expectedValue.(map[string]any); ok { + if actualFieldV.IsNil() { + s.Failf("", "Value of property %s%s for EventID=%v expected to be struct but was nil", attrPrefix, attrName, eventID) + } + if actualFieldV.Kind() == reflect.Pointer { + actualFieldV = actualFieldV.Elem() + } + if actualFieldV.Kind() != reflect.Struct { + s.Failf("", "Value of property %s%s for EventID=%v expected to be struct but was of type %s", attrPrefix, attrName, eventID, actualFieldV.Type().String()) + } + s.equalStructToMap(ep, actualFieldV, eventID, attrPrefix+attrName+".") + continue + } + actualFieldValue := actualFieldV.Interface() + s.EqualValues(expectedValue, actualFieldValue, "Values of %s%s property are not equal for EventID=%v", attrPrefix, attrName, eventID) + } +} + +// parseHistory accept history in a formatHistory format and returns compact history string and map of eventID to map of event attributes. +func (s *IntegrationBase) parseHistory(expectedHistory string) (string, map[int64]map[string]any) { + s.T().Helper() + h := &historypb.History{} + eventsAttrs := make(map[int64]map[string]any) + prevEventID := 0 + for lineNum, eventLine := range strings.Split(expectedHistory, "\n") { + fields := strings.Fields(eventLine) + if len(fields) == 0 { + continue + } + if len(fields) < 2 { + s.FailNowf("", "Not enough fields on line %d", lineNum+1) + } + eventID, err := strconv.Atoi(fields[0]) + if err != nil { + s.FailNowf(err.Error(), "Failed to parse EventID on line %d", lineNum+1) + } + if eventID != prevEventID+1 && prevEventID != 0 { + s.FailNowf("", "Wrong EventID sequence after EventID %d on line %d", prevEventID, lineNum+1) + } + prevEventID = eventID + eventType, ok := enumspb.EventType_value[fields[1]] + if !ok { + s.FailNowf("", "Unknown event type %s for EventID=%d", fields[1], lineNum+1) + } + h.Events = append(h.Events, &historypb.HistoryEvent{ + EventId: int64(eventID), + EventType: enumspb.EventType(eventType), + }) + var jb strings.Builder + for i := 2; i < len(fields); i++ { + if strings.HasPrefix(fields[i], "//") { + break + } + _, _ = jb.WriteString(fields[i]) + } + if jb.Len() > 0 { + var eventAttrs map[string]any + err := json.Unmarshal([]byte(jb.String()), &eventAttrs) + if err != nil { + s.FailNowf(err.Error(), "Failed to unmarshal attributes %q for EventID=%d", jb.String(), lineNum+1) + } + eventsAttrs[int64(eventID)] = eventAttrs + } + } + return s.formatHistoryCompact(h), eventsAttrs +} diff --git a/tests/update_workflow_test.go b/tests/update_workflow_test.go index bb68cafa56c..39dbd10d45e 100644 --- a/tests/update_workflow_test.go +++ b/tests/update_workflow_test.go @@ -314,15 +314,16 @@ func (s *integrationSuite) TestUpdateWorkflow_NewSpeculativeWorkflowTask_AcceptC 3 WorkflowTaskStarted 4 WorkflowTaskCompleted 5 ActivityTaskScheduled - 6 WorkflowTaskScheduled + 6 WorkflowTaskScheduled // Normal WT events were written for speculative WT at complete. 7 WorkflowTaskStarted 8 WorkflowTaskCompleted 9 WorkflowExecutionUpdateAccepted - 10 WorkflowExecutionUpdateCompleted + 10 WorkflowExecutionUpdateCompleted {"AcceptedEventId": 9} 11 WorkflowTaskScheduled 12 WorkflowTaskStarted 13 WorkflowTaskCompleted - 14 WorkflowExecutionCompleted`, events) + 14 WorkflowExecutionCompleted +`, events) }) } } @@ -2965,8 +2966,6 @@ func (s *integrationSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Heartbeat( s.Equal(4, wtHandlerCalls) s.Equal(4, msgHandlerCalls) - s.printWorkflowHistoryCompact(s.namespace, tv.WorkflowExecution()) - events := s.getHistory(s.namespace, tv.WorkflowExecution()) s.EqualHistoryEvents(` 1 WorkflowExecutionStarted @@ -3636,15 +3635,11 @@ func (s *integrationSuite) TestUpdateWorkflow_CompletedSpeculativeWorkflowTask_D updateResult := <-updateResultCh s.EqualValues(tv.String("success-result", "1"), decodeString(s, updateResult.GetOutcome().GetSuccess())) - s.printWorkflowHistoryCompact(s.namespace, tv.WorkflowExecution()) - if tc.CloseShard { // Close shard to make sure that for completed updates deduplication works even after shard reload. s.closeShard(tv.WorkflowID()) } - s.printWorkflowHistoryCompact(s.namespace, tv.WorkflowExecution()) - // Send second update with the same ID. updateResultCh2 := make(chan *workflowservice.UpdateWorkflowExecutionResponse) go func() {