Skip to content

Commit

Permalink
Comments and event attributes in expected history in tests (#4426)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Jun 7, 2023
1 parent b61c3f3 commit a51abcf
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 16 deletions.
141 changes: 134 additions & 7 deletions tests/integrationbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"os"
"reflect"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -231,13 +234,13 @@ func (s *IntegrationBase) randomizeStr(id string) string {

func (s *IntegrationBase) printWorkflowHistory(namespace string, execution *commonpb.WorkflowExecution) {
events := s.getHistory(namespace, execution)
common.PrettyPrint(events)
_, _ = fmt.Println(s.formatHistory(&historypb.History{Events: events}))
}

//lint:ignore U1000 used for debugging.
func (s *IntegrationBase) printWorkflowHistoryCompact(namespace string, execution *commonpb.WorkflowExecution) {
events := s.getHistory(namespace, execution)
fmt.Println(s.formatHistoryCompact(&historypb.History{Events: events}))
_, _ = fmt.Println(s.formatHistoryCompact(&historypb.History{Events: events}))
}

func (s *IntegrationBase) getHistory(namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent {
Expand Down Expand Up @@ -340,22 +343,146 @@ func (s *IntegrationBase) formatHistoryCompact(history *historypb.History) strin
s.T().Helper()
var sb strings.Builder
for _, event := range history.Events {
sb.WriteString(fmt.Sprintf("%3d %s\n", event.GetEventId(), event.GetEventType()))
_, _ = sb.WriteString(fmt.Sprintf("%3d %s\n", event.GetEventId(), event.GetEventType()))
}
if sb.Len() > 0 {
return sb.String()[:sb.Len()-1]
}
return ""
}

func (s *IntegrationBase) EqualHistory(expectedHistory string, actualHistory *historypb.History) {
func (s *IntegrationBase) formatHistory(history *historypb.History) string {
s.T().Helper()
expectedHistoryTrimmed := strings.Trim(expectedHistory, "\n")
actualHistoryStr := s.formatHistoryCompact(actualHistory)
s.Equal(expectedHistoryTrimmed, actualHistoryStr)
var sb strings.Builder
for _, event := range history.Events {
eventAttrs := reflect.ValueOf(event.Attributes).Elem().Field(0).Elem().Interface()
eventAttrsMap := s.structToMap(eventAttrs)
eventAttrsJson, err := json.Marshal(eventAttrsMap)
s.NoError(err)
_, _ = sb.WriteString(fmt.Sprintf("%3d %s %s\n", event.GetEventId(), event.GetEventType(), string(eventAttrsJson)))
}
if sb.Len() > 0 {
return sb.String()[:sb.Len()-1]
}
return ""
}

func (s *IntegrationBase) structToMap(strct any) map[string]any {

strctV := reflect.ValueOf(strct)
strctT := strctV.Type()

ret := map[string]any{}

for i := 0; i < strctV.NumField(); i++ {
field := strctV.Field(i)
var fieldData any

if field.Kind() == reflect.Pointer && field.IsNil() {
continue
} else if field.Kind() == reflect.Pointer && field.Elem().Kind() == reflect.Struct {
fieldData = s.structToMap(field.Elem().Interface())
} else if field.Kind() == reflect.Struct {
fieldData = s.structToMap(field.Interface())
} else {
fieldData = field.Interface()
}
ret[strctT.Field(i).Name] = fieldData
}

return ret
}

func (s *IntegrationBase) EqualHistoryEvents(expectedHistory string, actualHistoryEvents []*historypb.HistoryEvent) {
s.T().Helper()
s.EqualHistory(expectedHistory, &historypb.History{Events: actualHistoryEvents})
}

func (s *IntegrationBase) EqualHistory(expectedHistory string, actualHistory *historypb.History) {
s.T().Helper()
expectedCompactHistory, expectedEventsAttributes := s.parseHistory(expectedHistory)
actualCompactHistory := s.formatHistoryCompact(actualHistory)
s.Equal(expectedCompactHistory, actualCompactHistory)
for _, actualHistoryEvent := range actualHistory.Events {
if expectedEventAttributes, ok := expectedEventsAttributes[actualHistoryEvent.EventId]; ok {
actualEventAttributes := reflect.ValueOf(actualHistoryEvent.Attributes).Elem().Field(0).Elem()
s.equalStructToMap(expectedEventAttributes, actualEventAttributes, actualHistoryEvent.EventId, "")
}
}
}

func (s *IntegrationBase) equalStructToMap(expectedMap map[string]any, actualStructV reflect.Value, eventID int64, attrPrefix string) {
s.T().Helper()

for attrName, expectedValue := range expectedMap {
actualFieldV := actualStructV.FieldByName(attrName)
if actualFieldV.Kind() == reflect.Invalid {
s.Failf("", "Expected property %s%s wasn't found for EventID=%v", attrPrefix, attrName, eventID)
}

if ep, ok := expectedValue.(map[string]any); ok {
if actualFieldV.IsNil() {
s.Failf("", "Value of property %s%s for EventID=%v expected to be struct but was nil", attrPrefix, attrName, eventID)
}
if actualFieldV.Kind() == reflect.Pointer {
actualFieldV = actualFieldV.Elem()
}
if actualFieldV.Kind() != reflect.Struct {
s.Failf("", "Value of property %s%s for EventID=%v expected to be struct but was of type %s", attrPrefix, attrName, eventID, actualFieldV.Type().String())
}
s.equalStructToMap(ep, actualFieldV, eventID, attrPrefix+attrName+".")
continue
}
actualFieldValue := actualFieldV.Interface()
s.EqualValues(expectedValue, actualFieldValue, "Values of %s%s property are not equal for EventID=%v", attrPrefix, attrName, eventID)
}
}

// parseHistory accept history in a formatHistory format and returns compact history string and map of eventID to map of event attributes.
func (s *IntegrationBase) parseHistory(expectedHistory string) (string, map[int64]map[string]any) {
s.T().Helper()
h := &historypb.History{}
eventsAttrs := make(map[int64]map[string]any)
prevEventID := 0
for lineNum, eventLine := range strings.Split(expectedHistory, "\n") {
fields := strings.Fields(eventLine)
if len(fields) == 0 {
continue
}
if len(fields) < 2 {
s.FailNowf("", "Not enough fields on line %d", lineNum+1)
}
eventID, err := strconv.Atoi(fields[0])
if err != nil {
s.FailNowf(err.Error(), "Failed to parse EventID on line %d", lineNum+1)
}
if eventID != prevEventID+1 && prevEventID != 0 {
s.FailNowf("", "Wrong EventID sequence after EventID %d on line %d", prevEventID, lineNum+1)
}
prevEventID = eventID
eventType, ok := enumspb.EventType_value[fields[1]]
if !ok {
s.FailNowf("", "Unknown event type %s for EventID=%d", fields[1], lineNum+1)
}
h.Events = append(h.Events, &historypb.HistoryEvent{
EventId: int64(eventID),
EventType: enumspb.EventType(eventType),
})
var jb strings.Builder
for i := 2; i < len(fields); i++ {
if strings.HasPrefix(fields[i], "//") {
break
}
_, _ = jb.WriteString(fields[i])
}
if jb.Len() > 0 {
var eventAttrs map[string]any
err := json.Unmarshal([]byte(jb.String()), &eventAttrs)
if err != nil {
s.FailNowf(err.Error(), "Failed to unmarshal attributes %q for EventID=%d", jb.String(), lineNum+1)
}
eventsAttrs[int64(eventID)] = eventAttrs
}
}
return s.formatHistoryCompact(h), eventsAttrs
}
13 changes: 4 additions & 9 deletions tests/update_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,15 +314,16 @@ func (s *integrationSuite) TestUpdateWorkflow_NewSpeculativeWorkflowTask_AcceptC
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
6 WorkflowTaskScheduled // Normal WT events were written for speculative WT at complete.
7 WorkflowTaskStarted
8 WorkflowTaskCompleted
9 WorkflowExecutionUpdateAccepted
10 WorkflowExecutionUpdateCompleted
10 WorkflowExecutionUpdateCompleted {"AcceptedEventId": 9}
11 WorkflowTaskScheduled
12 WorkflowTaskStarted
13 WorkflowTaskCompleted
14 WorkflowExecutionCompleted`, events)
14 WorkflowExecutionCompleted
`, events)
})
}
}
Expand Down Expand Up @@ -2965,8 +2966,6 @@ func (s *integrationSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Heartbeat(
s.Equal(4, wtHandlerCalls)
s.Equal(4, msgHandlerCalls)

s.printWorkflowHistoryCompact(s.namespace, tv.WorkflowExecution())

events := s.getHistory(s.namespace, tv.WorkflowExecution())
s.EqualHistoryEvents(`
1 WorkflowExecutionStarted
Expand Down Expand Up @@ -3636,15 +3635,11 @@ func (s *integrationSuite) TestUpdateWorkflow_CompletedSpeculativeWorkflowTask_D
updateResult := <-updateResultCh
s.EqualValues(tv.String("success-result", "1"), decodeString(s, updateResult.GetOutcome().GetSuccess()))

s.printWorkflowHistoryCompact(s.namespace, tv.WorkflowExecution())

if tc.CloseShard {
// Close shard to make sure that for completed updates deduplication works even after shard reload.
s.closeShard(tv.WorkflowID())
}

s.printWorkflowHistoryCompact(s.namespace, tv.WorkflowExecution())

// Send second update with the same ID.
updateResultCh2 := make(chan *workflowservice.UpdateWorkflowExecutionResponse)
go func() {
Expand Down

0 comments on commit a51abcf

Please sign in to comment.