Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Comments and event attributes in expected history in tests #4426

Merged
merged 6 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 134 additions & 7 deletions tests/integrationbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"os"
"reflect"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -231,13 +234,13 @@ func (s *IntegrationBase) randomizeStr(id string) string {

func (s *IntegrationBase) printWorkflowHistory(namespace string, execution *commonpb.WorkflowExecution) {
events := s.getHistory(namespace, execution)
common.PrettyPrint(events)
_, _ = fmt.Println(s.formatHistory(&historypb.History{Events: events}))
}

//lint:ignore U1000 used for debugging.
func (s *IntegrationBase) printWorkflowHistoryCompact(namespace string, execution *commonpb.WorkflowExecution) {
events := s.getHistory(namespace, execution)
fmt.Println(s.formatHistoryCompact(&historypb.History{Events: events}))
_, _ = fmt.Println(s.formatHistoryCompact(&historypb.History{Events: events}))
}

func (s *IntegrationBase) getHistory(namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent {
Expand Down Expand Up @@ -340,22 +343,146 @@ func (s *IntegrationBase) formatHistoryCompact(history *historypb.History) strin
s.T().Helper()
var sb strings.Builder
for _, event := range history.Events {
sb.WriteString(fmt.Sprintf("%3d %s\n", event.GetEventId(), event.GetEventType()))
_, _ = sb.WriteString(fmt.Sprintf("%3d %s\n", event.GetEventId(), event.GetEventType()))
}
if sb.Len() > 0 {
return sb.String()[:sb.Len()-1]
}
return ""
}

func (s *IntegrationBase) EqualHistory(expectedHistory string, actualHistory *historypb.History) {
func (s *IntegrationBase) formatHistory(history *historypb.History) string {
s.T().Helper()
expectedHistoryTrimmed := strings.Trim(expectedHistory, "\n")
actualHistoryStr := s.formatHistoryCompact(actualHistory)
s.Equal(expectedHistoryTrimmed, actualHistoryStr)
var sb strings.Builder
for _, event := range history.Events {
eventAttrs := reflect.ValueOf(event.Attributes).Elem().Field(0).Elem().Interface()
eventAttrsMap := s.structToMap(eventAttrs)
eventAttrsJson, err := json.Marshal(eventAttrsMap)
s.NoError(err)
_, _ = sb.WriteString(fmt.Sprintf("%3d %s %s\n", event.GetEventId(), event.GetEventType(), string(eventAttrsJson)))
}
if sb.Len() > 0 {
return sb.String()[:sb.Len()-1]
}
return ""
}

func (s *IntegrationBase) structToMap(strct any) map[string]any {

strctV := reflect.ValueOf(strct)
strctT := strctV.Type()

ret := map[string]any{}

for i := 0; i < strctV.NumField(); i++ {
field := strctV.Field(i)
var fieldData any

if field.Kind() == reflect.Pointer && field.IsNil() {
continue
} else if field.Kind() == reflect.Pointer && field.Elem().Kind() == reflect.Struct {
fieldData = s.structToMap(field.Elem().Interface())
} else if field.Kind() == reflect.Struct {
fieldData = s.structToMap(field.Interface())
} else {
fieldData = field.Interface()
}
ret[strctT.Field(i).Name] = fieldData
}

return ret
}

func (s *IntegrationBase) EqualHistoryEvents(expectedHistory string, actualHistoryEvents []*historypb.HistoryEvent) {
s.T().Helper()
s.EqualHistory(expectedHistory, &historypb.History{Events: actualHistoryEvents})
}

func (s *IntegrationBase) EqualHistory(expectedHistory string, actualHistory *historypb.History) {
s.T().Helper()
expectedCompactHistory, expectedEventsAttributes := s.parseHistory(expectedHistory)
actualCompactHistory := s.formatHistoryCompact(actualHistory)
s.Equal(expectedCompactHistory, actualCompactHistory)
for _, actualHistoryEvent := range actualHistory.Events {
if expectedEventAttributes, ok := expectedEventsAttributes[actualHistoryEvent.EventId]; ok {
actualEventAttributes := reflect.ValueOf(actualHistoryEvent.Attributes).Elem().Field(0).Elem()
s.equalStructToMap(expectedEventAttributes, actualEventAttributes, actualHistoryEvent.EventId, "")
}
}
}

func (s *IntegrationBase) equalStructToMap(expectedMap map[string]any, actualStructV reflect.Value, eventID int64, attrPrefix string) {
s.T().Helper()

for attrName, expectedValue := range expectedMap {
actualFieldV := actualStructV.FieldByName(attrName)
if actualFieldV.Kind() == reflect.Invalid {
s.Failf("", "Expected property %s%s wasn't found for EventID=%v", attrPrefix, attrName, eventID)
}

if ep, ok := expectedValue.(map[string]any); ok {
if actualFieldV.IsNil() {
s.Failf("", "Value of property %s%s for EventID=%v expected to be struct but was nil", attrPrefix, attrName, eventID)
}
if actualFieldV.Kind() == reflect.Pointer {
actualFieldV = actualFieldV.Elem()
}
if actualFieldV.Kind() != reflect.Struct {
s.Failf("", "Value of property %s%s for EventID=%v expected to be struct but was of type %s", attrPrefix, attrName, eventID, actualFieldV.Type().String())
}
s.equalStructToMap(ep, actualFieldV, eventID, attrPrefix+attrName+".")
continue
}
actualFieldValue := actualFieldV.Interface()
s.EqualValues(expectedValue, actualFieldValue, "Values of %s%s property are not equal for EventID=%v", attrPrefix, attrName, eventID)
}
}

// parseHistory accept history in a formatHistory format and returns compact history string and map of eventID to map of event attributes.
func (s *IntegrationBase) parseHistory(expectedHistory string) (string, map[int64]map[string]any) {
s.T().Helper()
h := &historypb.History{}
eventsAttrs := make(map[int64]map[string]any)
prevEventID := 0
for lineNum, eventLine := range strings.Split(expectedHistory, "\n") {
fields := strings.Fields(eventLine)
if len(fields) == 0 {
continue
}
if len(fields) < 2 {
s.FailNowf("", "Not enough fields on line %d", lineNum+1)
}
eventID, err := strconv.Atoi(fields[0])
if err != nil {
s.FailNowf(err.Error(), "Failed to parse EventID on line %d", lineNum+1)
}
if eventID != prevEventID+1 && prevEventID != 0 {
s.FailNowf("", "Wrong EventID sequence after EventID %d on line %d", prevEventID, lineNum+1)
}
prevEventID = eventID
eventType, ok := enumspb.EventType_value[fields[1]]
if !ok {
s.FailNowf("", "Unknown event type %s for EventID=%d", fields[1], lineNum+1)
}
h.Events = append(h.Events, &historypb.HistoryEvent{
EventId: int64(eventID),
EventType: enumspb.EventType(eventType),
})
var jb strings.Builder
for i := 2; i < len(fields); i++ {
if strings.HasPrefix(fields[i], "//") {
break
}
_, _ = jb.WriteString(fields[i])
}
if jb.Len() > 0 {
var eventAttrs map[string]any
err := json.Unmarshal([]byte(jb.String()), &eventAttrs)
if err != nil {
s.FailNowf(err.Error(), "Failed to unmarshal attributes %q for EventID=%d", jb.String(), lineNum+1)
}
eventsAttrs[int64(eventID)] = eventAttrs
}
}
return s.formatHistoryCompact(h), eventsAttrs
}
13 changes: 4 additions & 9 deletions tests/update_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,15 +314,16 @@ func (s *integrationSuite) TestUpdateWorkflow_NewSpeculativeWorkflowTask_AcceptC
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
6 WorkflowTaskScheduled // Normal WT events were written for speculative WT at complete.
7 WorkflowTaskStarted
8 WorkflowTaskCompleted
9 WorkflowExecutionUpdateAccepted
10 WorkflowExecutionUpdateCompleted
10 WorkflowExecutionUpdateCompleted {"AcceptedEventId": 9}
11 WorkflowTaskScheduled
12 WorkflowTaskStarted
13 WorkflowTaskCompleted
14 WorkflowExecutionCompleted`, events)
14 WorkflowExecutionCompleted
`, events)
})
}
}
Expand Down Expand Up @@ -2965,8 +2966,6 @@ func (s *integrationSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Heartbeat(
s.Equal(4, wtHandlerCalls)
s.Equal(4, msgHandlerCalls)

s.printWorkflowHistoryCompact(s.namespace, tv.WorkflowExecution())

events := s.getHistory(s.namespace, tv.WorkflowExecution())
s.EqualHistoryEvents(`
1 WorkflowExecutionStarted
Expand Down Expand Up @@ -3636,15 +3635,11 @@ func (s *integrationSuite) TestUpdateWorkflow_CompletedSpeculativeWorkflowTask_D
updateResult := <-updateResultCh
s.EqualValues(tv.String("success-result", "1"), decodeString(s, updateResult.GetOutcome().GetSuccess()))

s.printWorkflowHistoryCompact(s.namespace, tv.WorkflowExecution())

if tc.CloseShard {
// Close shard to make sure that for completed updates deduplication works even after shard reload.
s.closeShard(tv.WorkflowID())
}

s.printWorkflowHistoryCompact(s.namespace, tv.WorkflowExecution())

// Send second update with the same ID.
updateResultCh2 := make(chan *workflowservice.UpdateWorkflowExecutionResponse)
go func() {
Expand Down