Skip to content

Commit

Permalink
Add support of attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Jun 6, 2023
1 parent 082478a commit 6e518a2
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 44 deletions.
162 changes: 120 additions & 42 deletions tests/integrationbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"os"
"reflect"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -231,7 +234,7 @@ func (s *IntegrationBase) randomizeStr(id string) string {

func (s *IntegrationBase) printWorkflowHistory(namespace string, execution *commonpb.WorkflowExecution) {
events := s.getHistory(namespace, execution)
common.PrettyPrint(events)
fmt.Println(s.formatHistory(&historypb.History{Events: events}))
}

//lint:ignore U1000 used for debugging.
Expand Down Expand Up @@ -348,61 +351,136 @@ func (s *IntegrationBase) formatHistoryCompact(history *historypb.History) strin
return ""
}

func (s *IntegrationBase) EqualHistory(expectedHistory string, actualHistory *historypb.History) {
func (s *IntegrationBase) formatHistory(history *historypb.History) string {
s.T().Helper()
expectedHistorySanitized := s.sanitizeHistory(expectedHistory)
actualHistoryStr := s.formatHistoryCompact(actualHistory)
s.Equal(expectedHistorySanitized, actualHistoryStr)
var sb strings.Builder
for _, event := range history.Events {
eventAttrs := reflect.ValueOf(event.Attributes).Elem().Field(0).Elem().Interface()
eventAttrsMap := s.structToMap(eventAttrs)
eventAttrsJson, err := json.Marshal(eventAttrsMap)
s.NoError(err)
sb.WriteString(fmt.Sprintf("%3d %s %s\n", event.GetEventId(), event.GetEventType(), string(eventAttrsJson)))
}
if sb.Len() > 0 {
return sb.String()[:sb.Len()-1]
}
return ""
}

func (s *IntegrationBase) structToMap(strct any) map[string]any {

strctV := reflect.ValueOf(strct)
strctT := strctV.Type()

ret := map[string]any{}

for i := 0; i < strctV.NumField(); i++ {
field := strctV.Field(i)
var fieldData any

if field.Kind() == reflect.Pointer && field.IsNil() {
continue
} else if field.Kind() == reflect.Pointer && field.Elem().Kind() == reflect.Struct {
fieldData = s.structToMap(field.Elem().Interface())
} else if field.Kind() == reflect.Struct {
fieldData = s.structToMap(field.Interface())
} else {
fieldData = field.Interface()
}
ret[strctT.Field(i).Name] = fieldData
}

return ret
}

func (s *IntegrationBase) EqualHistoryEvents(expectedHistory string, actualHistoryEvents []*historypb.HistoryEvent) {
s.T().Helper()
s.EqualHistory(expectedHistory, &historypb.History{Events: actualHistoryEvents})
}

// sanitizeHistory removes all empty new lines and comments from expectedHistory string.
func (s *IntegrationBase) sanitizeHistory(expectedHistory string) string {
type state int
const (
stateStart state = iota
stateNormal
stateComment
stateNewLine
)
func (s *IntegrationBase) EqualHistory(expectedHistory string, actualHistory *historypb.History) {
s.T().Helper()
expectedCompactHistory, expectedEventsAttributes := s.parseHistory(expectedHistory)
actualCompactHistory := s.formatHistoryCompact(actualHistory)
s.Equal(expectedCompactHistory, actualCompactHistory)
for eventID, expectedEventAttributes := range expectedEventsAttributes {
actualEventAttributes := reflect.ValueOf(actualHistory.Events[eventID-1].Attributes).Elem().Field(0).Elem()
s.equalStructToMap(expectedEventAttributes, actualEventAttributes, eventID, "")
}
}

func (s *IntegrationBase) equalStructToMap(expectedMap map[string]any, actualStructV reflect.Value, eventID int64, attrPrefix string) {
s.T().Helper()

for attrName, expectedValue := range expectedMap {
actualFieldV := actualStructV.FieldByName(attrName)
if actualFieldV.Kind() == reflect.Invalid {
s.Failf("", "Expected property %s%s wasn't found for EventID=%v", attrPrefix, attrName, eventID)
}

var ret strings.Builder
st := stateStart
spaceCounter := 0
for _, r := range expectedHistory {
switch r {
case '\n':
if st == stateStart {
continue
if ep, ok := expectedValue.(map[string]any); ok {
if actualFieldV.IsNil() {
s.Failf("", "Value of property %s%s for EventID=%v expected to be struct but was nil", attrPrefix, attrName, eventID)
}
spaceCounter = 0
st = stateNewLine
case '/':
spaceCounter = 0
st = stateComment
case ' ':
if st == stateComment {
continue
if actualFieldV.Kind() == reflect.Pointer {
actualFieldV = actualFieldV.Elem()
}
spaceCounter++
default:
if st == stateComment {
continue
if actualFieldV.Kind() != reflect.Struct {
s.Failf("", "Value of property %s%s for EventID=%v expected to be struct but was of type %s", attrPrefix, attrName, eventID, actualFieldV.Type().String())
}
if st == stateNewLine {
_, _ = ret.WriteRune('\n')
s.equalStructToMap(ep, actualFieldV, eventID, attrPrefix+attrName+".")
continue
}
actualFieldValue := actualFieldV.Interface()
s.EqualValues(expectedValue, actualFieldValue, "Values of %s%s property are not equal for EventID=%v", attrPrefix, attrName, eventID)
}
}

// parseHistory accept history in a formatHistory format and returns compact history string and map of eventID to map of event attributes.
func (s *IntegrationBase) parseHistory(expectedHistory string) (string, map[int64]map[string]any) {
s.T().Helper()
h := &historypb.History{}
eventsAttrs := make(map[int64]map[string]any)
prevEventID := 0
for lineNum, eventLine := range strings.Split(expectedHistory, "\n") {
fields := strings.Fields(eventLine)
if len(fields) == 0 {
continue
}
if len(fields) < 2 {
s.FailNowf("", "Not enough fields in line %d", lineNum+1)
}
eventID, err := strconv.Atoi(fields[0])
if err != nil {
s.FailNowf(err.Error(), "Failed to parse EventID in line %d", lineNum+1)
}
if eventID != prevEventID+1 && prevEventID != 0 {
s.FailNowf("", "Wrong EventID sequence after EventID %d in line %d", prevEventID, lineNum+1)
}
prevEventID = eventID
eventType, ok := enumspb.EventType_value[fields[1]]
if !ok {
s.FailNowf("", "Unknown event type %s for EventID=%d", fields[1], lineNum+1)
}
h.Events = append(h.Events, &historypb.HistoryEvent{
EventId: int64(eventID),
EventType: enumspb.EventType(eventType),
})
var jb strings.Builder
for i := 2; i < len(fields); i++ {
if strings.HasPrefix(fields[i], "//") {
break
}
if spaceCounter > 0 {
_, _ = ret.WriteString(strings.Repeat(" ", spaceCounter))
spaceCounter = 0
jb.WriteString(fields[i])
}
if jb.Len() > 0 {
var eventAttrs map[string]any
err := json.Unmarshal([]byte(jb.String()), &eventAttrs)
if err != nil {
s.FailNowf(err.Error(), "Failed to unmarshal attributes %q for EventID=%d", jb.String(), lineNum+1)
}
_, _ = ret.WriteRune(r)
st = stateNormal
eventsAttrs[int64(eventID)] = eventAttrs
}
}
return ret.String()
return s.formatHistoryCompact(h), eventsAttrs
}
5 changes: 3 additions & 2 deletions tests/update_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,17 +308,18 @@ func (s *integrationSuite) TestUpdateWorkflow_NewSpeculativeWorkflowTask_AcceptC

events := s.getHistory(s.namespace, tv.WorkflowExecution())

s.printWorkflowHistory(s.namespace, tv.WorkflowExecution())
s.EqualHistoryEvents(`
1 WorkflowExecutionStarted
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled // Was speculative WT but was converted to normal WT.
6 WorkflowTaskScheduled // Normal WT events were written for speculative WT at complete.
7 WorkflowTaskStarted
8 WorkflowTaskCompleted
9 WorkflowExecutionUpdateAccepted
10 WorkflowExecutionUpdateCompleted
10 WorkflowExecutionUpdateCompleted {"AcceptedEventId": 9}
11 WorkflowTaskScheduled
12 WorkflowTaskStarted
13 WorkflowTaskCompleted
Expand Down

0 comments on commit 6e518a2

Please sign in to comment.