From 018d5973d0bbd1e00b7cd9b368eba11f1fe69841 Mon Sep 17 00:00:00 2001 From: Wenquan Xing Date: Thu, 31 May 2018 10:26:13 -0700 Subject: [PATCH 1/6] bugfix: reset mutable state --- service/history/conflictResolver.go | 3 ++- service/history/historyReplicator.go | 2 +- service/history/mutableStateBuilder.go | 7 +++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/service/history/conflictResolver.go b/service/history/conflictResolver.go index ed855baadd2..42ef9a847af 100644 --- a/service/history/conflictResolver.go +++ b/service/history/conflictResolver.go @@ -52,7 +52,7 @@ func newConflictResolver(shard ShardContext, context *workflowExecutionContext, } } -func (r *conflictResolver) reset(replayEventID int64, startTime time.Time) (*mutableStateBuilder, error) { +func (r *conflictResolver) reset(sourceCluster string, replayEventID int64, startTime time.Time) (*mutableStateBuilder, error) { domainID := r.context.domainID execution := r.context.workflowExecution replayNextEventID := replayEventID + 1 @@ -106,6 +106,7 @@ func (r *conflictResolver) reset(replayEventID int64, startTime time.Time) (*mut resetMutableStateBuilder.executionInfo.StartTimestamp = startTime // the last updated time is not important here, since this should be updated with event time afterwards resetMutableStateBuilder.executionInfo.LastUpdatedTimestamp = startTime + resetMutableStateBuilder.updateReplicationStateLastEventID(sourceCluster, replayEventID) r.logger.Infof("All events applied for execution. WorkflowID: %v, RunID: %v, NextEventID: %v", execution.GetWorkflowId(), execution.GetRunId(), resetMutableStateBuilder.GetNextEventID()) diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index 38b2a39cfaf..403c95dd976 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -142,7 +142,7 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE request.GetSourceCluster(), request.GetVersion(), request.GetFirstEventId(), request.GetNextEventId()) resolver := newConflictResolver(r.shard, context, r.historyMgr, r.logger) - msBuilder, err = resolver.reset(ri.GetLastEventId(), msBuilder.executionInfo.StartTimestamp) + msBuilder, err = resolver.reset(request.GetSourceCluster(), ri.GetLastEventId(), msBuilder.executionInfo.StartTimestamp) r.logger.Infof("Completed Resetting of workflow execution: Err: %v", err) if err != nil { return err diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index c8f3eb253e7..59261b7f03e 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -163,8 +163,11 @@ func newMutableStateBuilder(config *Config, logger bark.Logger) *mutableStateBui func newMutableStateBuilderWithReplicationState(config *Config, logger bark.Logger, version int64) *mutableStateBuilder { s := newMutableStateBuilder(config, logger) s.replicationState = &persistence.ReplicationState{ - StartVersion: version, - CurrentVersion: version, + StartVersion: version, + CurrentVersion: version, + LastWriteVersion: common.EmptyVersion, + LastWriteEventID: common.EmptyEventID, + LastReplicationInfo: make(map[string]*persistence.ReplicationInfo), } return s } From 3cb63cedcee0e91f7c0d7edbe6e693f45334484a Mon Sep 17 00:00:00 2001 From: Wenquan Xing Date: Thu, 31 May 2018 14:26:54 -0700 Subject: [PATCH 2/6] UT for conflict resolver --- service/history/conflictResolver.go | 10 +- service/history/conflictResolver_test.go | 299 +++++++++++++++++++++++ service/history/historyReplicator.go | 4 +- service/history/mutableStateBuilder.go | 1 + service/history/stateBuilder.go | 6 +- 5 files changed, 310 insertions(+), 10 deletions(-) create mode 100644 service/history/conflictResolver_test.go diff --git a/service/history/conflictResolver.go b/service/history/conflictResolver.go index ed855baadd2..319de98cd15 100644 --- a/service/history/conflictResolver.go +++ b/service/history/conflictResolver.go @@ -23,7 +23,6 @@ package history import ( "time" - "github.com/pborman/uuid" "github.com/uber-common/bark" "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" @@ -52,7 +51,7 @@ func newConflictResolver(shard ShardContext, context *workflowExecutionContext, } } -func (r *conflictResolver) reset(replayEventID int64, startTime time.Time) (*mutableStateBuilder, error) { +func (r *conflictResolver) reset(requestID string, sourceCluster string, replayEventID int64, startTime time.Time) (*mutableStateBuilder, error) { domainID := r.context.domainID execution := r.context.workflowExecution replayNextEventID := replayEventID + 1 @@ -63,7 +62,6 @@ func (r *conflictResolver) reset(replayEventID int64, startTime time.Time) (*mut var sBuilder *stateBuilder var lastFirstEventID int64 eventsToApply := replayNextEventID - common.FirstEventID - requestID := uuid.New() for hasMore := true; hasMore; hasMore = len(nextPageToken) > 0 { history, nextPageToken, lastFirstEventID, err = r.getHistory(domainID, execution, common.FirstEventID, replayNextEventID, nextPageToken) @@ -80,7 +78,7 @@ func (r *conflictResolver) reset(replayEventID int64, startTime time.Time) (*mut history.Events = history.Events[0:eventsToApply] } - eventsToApply -= batchSize + eventsToApply -= int64(len(history.Events)) if len(history.Events) == 0 { break @@ -94,7 +92,7 @@ func (r *conflictResolver) reset(replayEventID int64, startTime time.Time) (*mut sBuilder = newStateBuilder(r.shard, resetMutableStateBuilder, r.logger) } - _, _, _, err = sBuilder.applyEvents(common.EmptyVersion, "", domainID, requestID, execution, history, nil) + _, _, _, err = sBuilder.applyEvents(sourceCluster, domainID, requestID, execution, history, nil) if err != nil { return nil, err } @@ -146,5 +144,5 @@ func (r *conflictResolver) getHistory(domainID string, execution shared.Workflow executionHistory := &shared.History{} executionHistory.Events = historyEvents - return executionHistory, nextPageToken, lastFirstEventID, nil + return executionHistory, response.NextPageToken, lastFirstEventID, nil } diff --git a/service/history/conflictResolver_test.go b/service/history/conflictResolver_test.go new file mode 100644 index 00000000000..6633470f788 --- /dev/null +++ b/service/history/conflictResolver_test.go @@ -0,0 +1,299 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "os" + "testing" + "time" + + "github.com/pborman/uuid" + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "github.com/uber-common/bark" + "github.com/uber-go/tally" + "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/mocks" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service" + "github.com/uber/cadence/common/service/dynamicconfig" +) + +type ( + conflictResolverSuite struct { + suite.Suite + logger bark.Logger + mockExecutionMgr *mocks.ExecutionManager + mockHistoryMgr *mocks.HistoryManager + mockShardManager *mocks.ShardManager + mockClusterMetadata *mocks.ClusterMetadata + mockProducer *mocks.KafkaProducer + mockMetadataMgr *mocks.MetadataManager + mockMessagingClient messaging.Client + mockService service.Service + mockShard *shardContextImpl + mockContext *workflowExecutionContext + + conflictResolver *conflictResolver + } +) + +func TestConflictResolverSuite(t *testing.T) { + s := new(conflictResolverSuite) + suite.Run(t, s) +} + +func (s *conflictResolverSuite) SetupSuite() { + if testing.Verbose() { + log.SetOutput(os.Stdout) + } + +} + +func (s *conflictResolverSuite) TearDownSuite() { + +} + +func (s *conflictResolverSuite) SetupTest() { + log2 := log.New() + log2.Level = log.DebugLevel + s.logger = bark.NewLoggerFromLogrus(log2) + s.mockHistoryMgr = &mocks.HistoryManager{} + s.mockExecutionMgr = &mocks.ExecutionManager{} + s.mockClusterMetadata = &mocks.ClusterMetadata{} + s.mockShardManager = &mocks.ShardManager{} + s.mockProducer = &mocks.KafkaProducer{} + s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil) + s.mockMetadataMgr = &mocks.MetadataManager{} + metricsClient := metrics.NewClient(tally.NoopScope, metrics.History) + s.mockService = service.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, metricsClient, s.logger) + + s.mockShard = &shardContextImpl{ + service: s.mockService, + shardInfo: &persistence.ShardInfo{ShardID: 0, RangeID: 1, TransferAckLevel: 0}, + transferSequenceNumber: 1, + executionManager: s.mockExecutionMgr, + shardManager: s.mockShardManager, + historyMgr: s.mockHistoryMgr, + maxTransferSequenceNumber: 100000, + closeCh: make(chan int, 100), + config: NewConfig(dynamicconfig.NewNopCollection(), 1), + logger: s.logger, + domainCache: cache.NewDomainCache(s.mockMetadataMgr, s.mockClusterMetadata, s.logger), + metricsClient: metrics.NewClient(tally.NoopScope, metrics.History), + } + s.mockContext = newWorkflowExecutionContext(validDomainID, shared.WorkflowExecution{ + WorkflowId: common.StringPtr("some random workflow ID"), + RunId: common.StringPtr(validRunID), + }, s.mockShard, s.mockExecutionMgr, s.logger) + + s.conflictResolver = newConflictResolver(s.mockShard, s.mockContext, s.mockHistoryMgr, s.logger) +} + +func (s *conflictResolverSuite) TearDownTest() { +} + +func (s *conflictResolverSuite) TestGetHistory() { + domainID := s.mockContext.domainID + execution := s.mockContext.workflowExecution + nextEventID := int64(101) + + event1 := &shared.HistoryEvent{ + EventId: common.Int64Ptr(1), + WorkflowExecutionStartedEventAttributes: &shared.WorkflowExecutionStartedEventAttributes{}, + } + event2 := &shared.HistoryEvent{ + EventId: common.Int64Ptr(2), + DecisionTaskScheduledEventAttributes: &shared.DecisionTaskScheduledEventAttributes{}, + } + event3 := &shared.HistoryEvent{ + EventId: common.Int64Ptr(3), + DecisionTaskStartedEventAttributes: &shared.DecisionTaskStartedEventAttributes{}, + } + event4 := &shared.HistoryEvent{ + EventId: common.Int64Ptr(4), + DecisionTaskCompletedEventAttributes: &shared.DecisionTaskCompletedEventAttributes{}, + } + event5 := &shared.HistoryEvent{ + EventId: common.Int64Ptr(5), + ActivityTaskScheduledEventAttributes: &shared.ActivityTaskScheduledEventAttributes{}, + } + + historySerializer := persistence.NewJSONHistorySerializer() + serializedBatch1, _ := historySerializer.Serialize(persistence.NewHistoryEventBatch(persistence.GetDefaultHistoryVersion(), []*shared.HistoryEvent{event1, event2})) + serializedBatch2, _ := historySerializer.Serialize(persistence.NewHistoryEventBatch(persistence.GetDefaultHistoryVersion(), []*shared.HistoryEvent{event3})) + serializedBatch3, _ := historySerializer.Serialize(persistence.NewHistoryEventBatch(persistence.GetDefaultHistoryVersion(), []*shared.HistoryEvent{event4, event5})) + + pageToken := []byte("some random token") + s.mockHistoryMgr.On("GetWorkflowExecutionHistory", &persistence.GetWorkflowExecutionHistoryRequest{ + DomainID: domainID, + Execution: execution, + FirstEventID: common.FirstEventID, + NextEventID: nextEventID, + PageSize: defaultHistoryPageSize, + NextPageToken: nil, + }).Return(&persistence.GetWorkflowExecutionHistoryResponse{ + Events: []persistence.SerializedHistoryEventBatch{*serializedBatch1}, + NextPageToken: pageToken, + }, nil) + history, token, firstEventID, err := s.conflictResolver.getHistory(domainID, execution, common.FirstEventID, nextEventID, nil) + s.Nil(err) + s.Equal(history.Events, []*shared.HistoryEvent{event1, event2}) + s.Equal(pageToken, token) + s.Equal(firstEventID, event1.GetEventId()) + + s.mockHistoryMgr.On("GetWorkflowExecutionHistory", &persistence.GetWorkflowExecutionHistoryRequest{ + DomainID: domainID, + Execution: execution, + FirstEventID: common.FirstEventID, + NextEventID: nextEventID, + PageSize: defaultHistoryPageSize, + NextPageToken: pageToken, + }).Return(&persistence.GetWorkflowExecutionHistoryResponse{ + Events: []persistence.SerializedHistoryEventBatch{*serializedBatch2, *serializedBatch3}, + NextPageToken: nil, + }, nil) + history, token, firstEventID, err = s.conflictResolver.getHistory(domainID, execution, common.FirstEventID, nextEventID, token) + s.Nil(err) + s.Equal(history.Events, []*shared.HistoryEvent{event3, event4, event5}) + s.Empty(token) + s.Equal(firstEventID, event4.GetEventId()) +} + +func (s *conflictResolverSuite) TestReset() { + sourceCluster := "some random source cluster" + startTime := time.Now() + domainID := s.mockContext.domainID + execution := s.mockContext.workflowExecution + nextEventID := int64(2) + + event1 := &shared.HistoryEvent{ + EventId: common.Int64Ptr(1), + Version: common.Int64Ptr(12), + WorkflowExecutionStartedEventAttributes: &shared.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &shared.WorkflowType{Name: common.StringPtr("some random workflow type")}, + TaskList: &shared.TaskList{Name: common.StringPtr("some random workflow type")}, + Input: []byte("some random input"), + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(123), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(233), + Identity: common.StringPtr("some random identity"), + }, + } + event2 := &shared.HistoryEvent{ + EventId: common.Int64Ptr(2), + DecisionTaskScheduledEventAttributes: &shared.DecisionTaskScheduledEventAttributes{}, + } + + historySerializer := persistence.NewJSONHistorySerializer() + serializedBatch, _ := historySerializer.Serialize(persistence.NewHistoryEventBatch(persistence.GetDefaultHistoryVersion(), []*shared.HistoryEvent{event1, event2})) + + s.mockHistoryMgr.On("GetWorkflowExecutionHistory", &persistence.GetWorkflowExecutionHistoryRequest{ + DomainID: domainID, + Execution: execution, + FirstEventID: common.FirstEventID, + NextEventID: nextEventID, + PageSize: defaultHistoryPageSize, + NextPageToken: nil, + }).Return(&persistence.GetWorkflowExecutionHistoryResponse{ + Events: []persistence.SerializedHistoryEventBatch{*serializedBatch}, + NextPageToken: nil, + }, nil) + + s.mockContext.updateCondition = int64(59) + createRequestID := uuid.New() + // this is only a shallow test, meaning + // the mutable state only has the minimal information + // so we can test the conflict resolver + s.mockExecutionMgr.On("ResetMutableState", &persistence.ResetMutableStateRequest{ + ExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: domainID, + WorkflowID: execution.GetWorkflowId(), + RunID: execution.GetRunId(), + TaskList: event1.WorkflowExecutionStartedEventAttributes.TaskList.GetName(), + WorkflowTypeName: event1.WorkflowExecutionStartedEventAttributes.WorkflowType.GetName(), + WorkflowTimeout: *event1.WorkflowExecutionStartedEventAttributes.ExecutionStartToCloseTimeoutSeconds, + DecisionTimeoutValue: *event1.WorkflowExecutionStartedEventAttributes.TaskStartToCloseTimeoutSeconds, + State: persistence.WorkflowStateCreated, + CloseStatus: persistence.WorkflowCloseStatusNone, + LastFirstEventID: event1.GetEventId(), + NextEventID: nextEventID, + LastProcessedEvent: common.EmptyEventID, + StartTimestamp: startTime, + LastUpdatedTimestamp: startTime, + DecisionVersion: common.EmptyVersion, + DecisionScheduleID: common.EmptyEventID, + DecisionStartedID: common.EmptyEventID, + DecisionRequestID: emptyUUID, + DecisionTimeout: 0, + DecisionAttempt: 0, + DecisionTimestamp: 0, + CreateRequestID: createRequestID, + }, + ReplicationState: &persistence.ReplicationState{ + CurrentVersion: event1.GetVersion(), + StartVersion: event1.GetVersion(), + LastWriteVersion: event1.GetVersion(), + LastWriteEventID: event1.GetEventId(), + LastReplicationInfo: map[string]*persistence.ReplicationInfo{ + sourceCluster: &persistence.ReplicationInfo{ + Version: event1.GetVersion(), + LastEventID: event1.GetEventId(), + }, + }, + }, + Condition: s.mockContext.updateCondition, + RangeID: s.mockShard.shardInfo.RangeID, + InsertActivityInfos: []*persistence.ActivityInfo{}, + InsertTimerInfos: []*persistence.TimerInfo{}, + InsertChildExecutionInfos: []*persistence.ChildExecutionInfo{}, + InsertRequestCancelInfos: []*persistence.RequestCancelInfo{}, + InsertSignalInfos: []*persistence.SignalInfo{}, + InsertSignalRequestedIDs: []string{}, + }).Return(nil).Once() + s.mockExecutionMgr.On("GetWorkflowExecution", &persistence.GetWorkflowExecutionRequest{ + DomainID: domainID, + Execution: execution, + }).Return(&persistence.GetWorkflowExecutionResponse{}, nil).Once() // return empty resoonse since we are not testing the load + s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(true) + s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( + &persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{}, + Config: &persistence.DomainConfig{}, + ReplicationConfig: &persistence.DomainReplicationConfig{ + ActiveClusterName: cluster.TestAlternativeClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + &persistence.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName}, + &persistence.ClusterReplicationConfig{ClusterName: cluster.TestAlternativeClusterName}, + }, + }, + IsGlobalDomain: true, + }, + nil, + ) + _, err := s.conflictResolver.reset(createRequestID, sourceCluster, nextEventID-1, startTime) + s.Nil(err) +} diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index 38b2a39cfaf..7f2ef4e2a7b 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -142,7 +142,7 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE request.GetSourceCluster(), request.GetVersion(), request.GetFirstEventId(), request.GetNextEventId()) resolver := newConflictResolver(r.shard, context, r.historyMgr, r.logger) - msBuilder, err = resolver.reset(ri.GetLastEventId(), msBuilder.executionInfo.StartTimestamp) + msBuilder, err = resolver.reset(uuid.New(), request.GetSourceCluster(), ri.GetLastEventId(), msBuilder.executionInfo.StartTimestamp) r.logger.Infof("Completed Resetting of workflow execution: Err: %v", err) if err != nil { return err @@ -198,7 +198,7 @@ func (r *historyReplicator) ApplyReplicationTask(context *workflowExecutionConte requestID := uuid.New() // requestID used for start workflow execution request. This is not on the history event. sBuilder := newStateBuilder(r.shard, msBuilder, r.logger) - lastEvent, di, newRunStateBuilder, err := sBuilder.applyEvents(request.GetVersion(), request.GetSourceCluster(), + lastEvent, di, newRunStateBuilder, err := sBuilder.applyEvents(request.GetSourceCluster(), domainID, requestID, execution, request.History, request.NewRunHistory) if err != nil { return err diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index c8f3eb253e7..bcd8d881288 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -1157,6 +1157,7 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionStartedEvent(domainID st e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusNone e.executionInfo.LastProcessedEvent = common.EmptyEventID e.executionInfo.CreateRequestID = requestID + e.executionInfo.DecisionVersion = common.EmptyVersion e.executionInfo.DecisionScheduleID = common.EmptyEventID e.executionInfo.DecisionStartedID = common.EmptyEventID e.executionInfo.DecisionRequestID = emptyUUID diff --git a/service/history/stateBuilder.go b/service/history/stateBuilder.go index 49e29de3190..173e545a444 100644 --- a/service/history/stateBuilder.go +++ b/service/history/stateBuilder.go @@ -55,7 +55,7 @@ func newStateBuilder(shard ShardContext, msBuilder *mutableStateBuilder, logger } } -func (b *stateBuilder) applyEvents(version int64, sourceClusterName string, domainID, requestID string, +func (b *stateBuilder) applyEvents(sourceClusterName string, domainID, requestID string, execution shared.WorkflowExecution, history *shared.History, newRunHistory *shared.History) (*shared.HistoryEvent, *decisionInfo, *mutableStateBuilder, error) { var lastEvent *shared.HistoryEvent @@ -63,6 +63,8 @@ func (b *stateBuilder) applyEvents(version int64, sourceClusterName string, doma var newRunStateBuilder *mutableStateBuilder for _, event := range history.Events { lastEvent = event + // must set the current version, since this is standby here, not active + b.msBuilder.updateReplicationStateVersion(event.GetVersion()) switch event.GetEventType() { case shared.EventTypeWorkflowExecutionStarted: attributes := event.WorkflowExecutionStartedEventAttributes @@ -321,7 +323,7 @@ func (b *stateBuilder) applyEvents(version int64, sourceClusterName string, doma } // Create mutable state updates for the new run - newRunStateBuilder = newMutableStateBuilderWithReplicationState(b.shard.GetConfig(), b.logger, version) + newRunStateBuilder = newMutableStateBuilderWithReplicationState(b.shard.GetConfig(), b.logger, event.GetVersion()) newRunStateBuilder.ReplicateWorkflowExecutionStartedEvent(domainID, parentDomainID, newExecution, uuid.New(), startedAttributes) di := newRunStateBuilder.ReplicateDecisionTaskScheduledEvent( From ab296cb7094709a8d82e1be0d16b2ab2fec27e40 Mon Sep 17 00:00:00 2001 From: Samar Abbas Date: Mon, 4 Jun 2018 15:58:19 -0700 Subject: [PATCH 3/6] Address code review comments --- common/logging/tags.go | 4 ++ service/history/conflictResolver.go | 4 +- service/history/historyReplicator.go | 45 +++++++++++++++------ service/history/mutableStateBuilder.go | 6 ++- service/history/stateBuilder.go | 6 +-- service/history/workflowExecutionContext.go | 11 +++-- 6 files changed, 53 insertions(+), 23 deletions(-) diff --git a/common/logging/tags.go b/common/logging/tags.go index 45d178b962a..68d7964a4d2 100644 --- a/common/logging/tags.go +++ b/common/logging/tags.go @@ -50,6 +50,9 @@ const ( TagOffset = "offset" TagScope = "scope" TagFailover = "failover" + TagVersion = "version" + TagFirstEventID = "first-event-id" + TagNextEventID = "next-event-id" // workflow logging tag values // TagWorkflowComponent Values @@ -63,6 +66,7 @@ const ( TagValueMatchingEngineComponent = "matching-engine" TagValueReplicatorComponent = "replicator" TagValueReplicationTaskProcessorComponent = "replication-task-processor" + TagValueHistoryReplicatorComponent = "history-replicator" // TagHistoryBuilderAction values TagValueActionWorkflowStarted = "add-workflowexecution-started-event" diff --git a/service/history/conflictResolver.go b/service/history/conflictResolver.go index cb69f89f708..faf4c164fcf 100644 --- a/service/history/conflictResolver.go +++ b/service/history/conflictResolver.go @@ -64,6 +64,7 @@ func (r *conflictResolver) reset(requestID string, replayEventID int64, startTim var resetMutableStateBuilder *mutableStateBuilder var sBuilder *stateBuilder var lastFirstEventID int64 + var lastEvent *shared.HistoryEvent eventsToApply := replayNextEventID - common.FirstEventID for hasMore := true; hasMore; hasMore = len(nextPageToken) > 0 { history, nextPageToken, lastFirstEventID, err = r.getHistory(domainID, execution, common.FirstEventID, @@ -88,6 +89,7 @@ func (r *conflictResolver) reset(requestID string, replayEventID int64, startTim } firstEvent := history.Events[0] + lastEvent = history.Events[len(history.Events)-1] if firstEvent.GetEventId() == common.FirstEventID { resetMutableStateBuilder = newMutableStateBuilderWithReplicationState(r.shard.GetConfig(), r.logger, firstEvent.GetVersion()) @@ -109,7 +111,7 @@ func (r *conflictResolver) reset(requestID string, replayEventID int64, startTim resetMutableStateBuilder.executionInfo.LastUpdatedTimestamp = startTime sourceCluster := r.clusterMetadata.ClusterNameForFailoverVersion(resetMutableStateBuilder.GetCurrentVersion()) - resetMutableStateBuilder.updateReplicationStateLastEventID(sourceCluster, replayEventID) + resetMutableStateBuilder.updateReplicationStateLastEventID(sourceCluster, lastEvent.GetVersion(), replayEventID) r.logger.Infof("All events applied for execution. WorkflowID: %v, RunID: %v, NextEventID: %v", execution.GetWorkflowId(), execution.GetRunId(), resetMutableStateBuilder.GetNextEventID()) diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index f72e90242dc..b8b5030db54 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -59,7 +59,7 @@ func newHistoryReplicator(shard ShardContext, historyEngine *historyEngineImpl, historyMgr: historyMgr, historySerializer: persistence.NewJSONHistorySerializer(), metadataMgr: shard.GetService().GetClusterMetadata(), - logger: logger, + logger: logger.WithField(logging.TagWorkflowComponent, logging.TagValueHistoryReplicatorComponent), } return replicator @@ -84,6 +84,14 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE } defer func() { release(retError) }() + logger := r.logger.WithFields(bark.Fields{ + logging.TagWorkflowExecutionID: execution.GetWorkflowId(), + logging.TagWorkflowRunID: execution.GetRunId(), + logging.TagSourceCluster: request.GetSourceCluster(), + logging.TagVersion: request.GetVersion(), + logging.TagFirstEventID: request.GetFirstEventId(), + logging.TagNextEventID: request.GetNextEventId(), + }) var msBuilder *mutableStateBuilder firstEvent := request.History.Events[0] switch firstEvent.GetEventType() { @@ -91,8 +99,7 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE msBuilder, err = context.loadWorkflowExecution() if err == nil { // Workflow execution already exist, looks like a duplicate start event, it is safe to ignore it - r.logger.Infof("Dropping stale replication task for start event. WorkflowID: %v, RunID: %v, Version: %v", - execution.GetWorkflowId(), execution.GetRunId(), request.GetVersion()) + logger.Info("Dropping stale replication task for start event.") return nil } @@ -115,19 +122,19 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE if rState.LastWriteVersion > request.GetVersion() { // Replication state is already on a higher version, we can drop this event // TODO: We need to replay external events like signal to the new version - r.logger.Warnf("Dropping stale replication task. LastWriteV: %v, CurrentV: %v, TaskV: %v", - rState.LastWriteVersion, rState.CurrentVersion, request.GetVersion()) + logger.Warnf("Dropping stale replication task. CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v", + rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID) return nil } // Check if this is the first event after failover if rState.LastWriteVersion < request.GetVersion() { - r.logger.Infof("First Event after replication. WorkflowID: %v, RunID: %v, CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v", - execution.GetWorkflowId(), execution.GetRunId(), rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID) + logger.Infof("First Event after replication. CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v", + rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID) previousActiveCluster := r.metadataMgr.ClusterNameForFailoverVersion(rState.LastWriteVersion) ri, ok := request.ReplicationInfo[previousActiveCluster] if !ok { - r.logger.Errorf("No replication information found for previous active cluster. Previous: %v, Request: %v, ReplicationInfo: %v", + logger.Errorf("No replication information found for previous active cluster. Previous: %v, Request: %v, ReplicationInfo: %v", previousActiveCluster, request.GetSourceCluster(), request.ReplicationInfo) // TODO: Handle missing replication information @@ -136,14 +143,13 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE // Detect conflict if ri.GetLastEventId() != rState.LastWriteEventID { - r.logger.Infof("Conflict detected. State: {V: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevC: %v, V: %v, LastEvent: %v}, Task: {SourceC: %v, V: %v, First: %v, Next: %v}", + logger.Infof("Conflict detected. State: {V: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevC: %v, V: %v, LastEvent: %v}", rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID, - previousActiveCluster, ri.GetVersion(), ri.GetLastEventId(), - request.GetSourceCluster(), request.GetVersion(), request.GetFirstEventId(), request.GetNextEventId()) + previousActiveCluster, ri.GetVersion(), ri.GetLastEventId()) resolver := newConflictResolver(r.shard, context, r.historyMgr, r.logger) msBuilder, err = resolver.reset(uuid.New(), ri.GetLastEventId(), msBuilder.executionInfo.StartTimestamp) - r.logger.Infof("Completed Resetting of workflow execution: Err: %v", err) + logger.Infof("Completed Resetting of workflow execution. NextEventID:%v. Err: %v", msBuilder.GetNextEventID(), err) if err != nil { return err } @@ -152,12 +158,19 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE // Check for duplicate processing of replication task if firstEvent.GetEventId() < msBuilder.GetNextEventID() { + logger.Warnf("Dropping replication task. State: {NextEvent: %v, Version: %v, LastWriteV: %v, LastWriteEvent: %v}", + msBuilder.GetNextEventID(), msBuilder.replicationState.CurrentVersion, + msBuilder.replicationState.LastWriteVersion, msBuilder.replicationState.LastWriteEventID) return nil } // Check for out of order replication task and store it in the buffer if firstEvent.GetEventId() > msBuilder.GetNextEventID() { + logger.Infof("Buffer out of order replication task. NextEvent: %v, FirstEvent: %v", + msBuilder.GetNextEventID(), firstEvent.GetEventId()) + if err := msBuilder.BufferReplicationTask(request); err != nil { + logger.Errorf("Failed to buffer out of order replication task. Err: %v", err) return errors.New("failed to add buffered replication task") } @@ -168,17 +181,25 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE // First check if there are events which needs to be flushed before applying the update err = r.FlushBuffer(context, msBuilder, request) if err != nil { + logger.Errorf("Fail to flush buffer. NextEvent: %v, FirstEvent: %v, Err: %v", msBuilder.GetNextEventID(), + firstEvent.GetEventId(), err) return err } // Apply the replication task err = r.ApplyReplicationTask(context, msBuilder, request) if err != nil { + logger.Errorf("Fail to Apply Replication task. NextEvent: %v, FirstEvent: %v, Err: %v", msBuilder.GetNextEventID(), + firstEvent.GetEventId(), err) return err } // Flush buffered replication tasks after applying the update err = r.FlushBuffer(context, msBuilder, request) + if err != nil { + logger.Errorf("Fail to flush buffer. NextEvent: %v, FirstEvent: %v, Err: %v", msBuilder.GetNextEventID(), + firstEvent.GetEventId(), err) + } return err } diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index 504e7cd9a91..b9ec883284a 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -324,7 +324,9 @@ func (e *mutableStateBuilder) updateReplicationStateVersion(version int64) { // Assumption: It is expected CurrentVersion on replication state is updated at the start of transaction when // mutableState is loaded for this workflow execution. -func (e *mutableStateBuilder) updateReplicationStateLastEventID(clusterName string, lastEventID int64) { +func (e *mutableStateBuilder) updateReplicationStateLastEventID(clusterName string, lastWriteVersion, + lastEventID int64) { + e.replicationState.LastWriteVersion = lastWriteVersion e.replicationState.LastWriteVersion = e.replicationState.CurrentVersion // TODO: Rename this to NextEventID to stay consistent naming convention with rest of code base e.replicationState.LastWriteEventID = lastEventID @@ -2137,7 +2139,7 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionContinuedAsNewEvent(sour } if newStateBuilder.replicationState != nil { - newStateBuilder.updateReplicationStateLastEventID(sourceClusterName, di.ScheduleID) + newStateBuilder.updateReplicationStateLastEventID(sourceClusterName, startedEvent.GetVersion(), di.ScheduleID) } newTransferTasks := []persistence.Task{&persistence.DecisionTask{ diff --git a/service/history/stateBuilder.go b/service/history/stateBuilder.go index 32bb46f9e8f..8e171c3fb35 100644 --- a/service/history/stateBuilder.go +++ b/service/history/stateBuilder.go @@ -58,16 +58,14 @@ func newStateBuilder(shard ShardContext, msBuilder *mutableStateBuilder, logger } } -func (b *stateBuilder) applyEvents(domainID, requestID string, execution shared.WorkflowExecution, - history *shared.History, newRunHistory *shared.History) (*shared.HistoryEvent, +func (b *stateBuilder) applyEvents(domainID, requestID string, + execution shared.WorkflowExecution, history *shared.History, newRunHistory *shared.History) (*shared.HistoryEvent, *decisionInfo, *mutableStateBuilder, error) { var lastEvent *shared.HistoryEvent var lastDecision *decisionInfo var newRunStateBuilder *mutableStateBuilder for _, event := range history.Events { lastEvent = event - // must set the current version, since this is standby here, not active - b.msBuilder.updateReplicationStateVersion(event.GetVersion()) switch event.GetEventType() { case shared.EventTypeWorkflowExecutionStarted: attributes := event.WorkflowExecutionStartedEventAttributes diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index ff31abaf7ad..f56d02e88bf 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -141,7 +141,8 @@ func (c *workflowExecutionContext) replicateWorkflowExecution(request *h.Replica c.msBuilder.executionInfo.NextEventID = nextEventID builder := newHistoryBuilderFromEvents(request.History.Events, c.logger) - return c.updateHelper(builder, transferTasks, timerTasks, false, request.GetSourceCluster(), transactionID) + return c.updateHelper(builder, transferTasks, timerTasks, false, request.GetSourceCluster(), request.GetVersion(), + transactionID) } func (c *workflowExecutionContext) updateVersion() error { @@ -166,11 +167,13 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi // Only generate replication task if this is a global domain createReplicationTask := c.msBuilder.replicationState != nil - return c.updateHelper(nil, transferTasks, timerTasks, createReplicationTask, "", transactionID) + + lastWriteVersion := c.msBuilder.GetCurrentVersion() + return c.updateHelper(nil, transferTasks, timerTasks, createReplicationTask, "", lastWriteVersion, transactionID) } func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transferTasks []persistence.Task, - timerTasks []persistence.Task, createReplicationTask bool, sourceCluster string, + timerTasks []persistence.Task, createReplicationTask bool, sourceCluster string, lastWriteVersion int64, transactionID int64) (errRet error) { defer func() { @@ -192,7 +195,7 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe crossDCEnabled := c.msBuilder.replicationState != nil if crossDCEnabled { lastEventID := c.msBuilder.GetNextEventID() - 1 - c.msBuilder.updateReplicationStateLastEventID(sourceCluster, lastEventID) + c.msBuilder.updateReplicationStateLastEventID(sourceCluster, lastWriteVersion, lastEventID) } // Replicator passes in a custom builder as it already has the events From d49d82e793c6264e37a2896c2b3f5aaad239d1bc Mon Sep 17 00:00:00 2001 From: Samar Abbas Date: Mon, 4 Jun 2018 17:19:31 -0700 Subject: [PATCH 4/6] fix small typo --- service/history/mutableStateBuilder.go | 1 - 1 file changed, 1 deletion(-) diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index b9ec883284a..776d576918d 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -327,7 +327,6 @@ func (e *mutableStateBuilder) updateReplicationStateVersion(version int64) { func (e *mutableStateBuilder) updateReplicationStateLastEventID(clusterName string, lastWriteVersion, lastEventID int64) { e.replicationState.LastWriteVersion = lastWriteVersion - e.replicationState.LastWriteVersion = e.replicationState.CurrentVersion // TODO: Rename this to NextEventID to stay consistent naming convention with rest of code base e.replicationState.LastWriteEventID = lastEventID if clusterName != "" { From ad84e1c7d09da8ae49c8bf892d20dd5927f23bc1 Mon Sep 17 00:00:00 2001 From: Samar Abbas Date: Mon, 4 Jun 2018 17:49:46 -0700 Subject: [PATCH 5/6] fix panic on version check for close execution processing --- service/history/mutableStateBuilder.go | 7 +++++++ service/history/timerQueueProcessorBase.go | 2 +- service/history/transferQueueActiveProcessor.go | 2 +- service/history/transferQueueStandbyProcessor.go | 2 +- 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index 776d576918d..b5df666cd41 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -318,6 +318,13 @@ func (e *mutableStateBuilder) GetCurrentVersion() int64 { return e.replicationState.CurrentVersion } +func (e *mutableStateBuilder) GetLastWriteVersion() int64 { + if e.replicationState == nil { + return common.EmptyVersion + } + return e.replicationState.LastWriteVersion +} + func (e *mutableStateBuilder) updateReplicationStateVersion(version int64) { e.replicationState.CurrentVersion = version } diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index c1e0e125463..0c7ba2e9df4 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -386,7 +386,7 @@ func (t *timerQueueProcessorBase) processDeleteHistoryEvent(task *persistence.Ti } else if msBuilder == nil { return nil } - ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, msBuilder.GetCurrentVersion(), task) + ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, msBuilder.GetLastWriteVersion(), task) if err != nil { return err } else if !ok { diff --git a/service/history/transferQueueActiveProcessor.go b/service/history/transferQueueActiveProcessor.go index dd1443e637f..502076c6c97 100644 --- a/service/history/transferQueueActiveProcessor.go +++ b/service/history/transferQueueActiveProcessor.go @@ -440,7 +440,7 @@ func (t *transferQueueActiveProcessorImpl) processCloseExecution(task *persisten return nil } - ok, err := verifyTransferTaskVersion(t.shard, domainID, msBuilder.GetCurrentVersion(), task) + ok, err := verifyTransferTaskVersion(t.shard, domainID, msBuilder.GetLastWriteVersion(), task) if err != nil { return err } else if !ok { diff --git a/service/history/transferQueueStandbyProcessor.go b/service/history/transferQueueStandbyProcessor.go index 71323aa0600..0f0480b2117 100644 --- a/service/history/transferQueueStandbyProcessor.go +++ b/service/history/transferQueueStandbyProcessor.go @@ -252,7 +252,7 @@ func (t *transferQueueStandbyProcessorImpl) processCloseExecution(transferTask * processTaskIfClosed := true return t.processTransfer(processTaskIfClosed, transferTask, func(msBuilder *mutableStateBuilder) error { - ok, err := verifyTransferTaskVersion(t.shard, transferTask.DomainID, msBuilder.GetCurrentVersion(), transferTask) + ok, err := verifyTransferTaskVersion(t.shard, transferTask.DomainID, msBuilder.GetLastWriteVersion(), transferTask) if err != nil { return err } else if !ok { From 99748b597e6de3d0419ad95cda557c8126368122 Mon Sep 17 00:00:00 2001 From: Samar Abbas Date: Tue, 5 Jun 2018 09:01:56 -0700 Subject: [PATCH 6/6] fix unit tests --- service/history/historyReplicator.go | 2 +- service/history/transferQueueActiveProcessor_test.go | 2 ++ service/history/transferQueueStandbyProcessor_test.go | 1 + 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index b8b5030db54..8c827739881 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -149,7 +149,7 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE resolver := newConflictResolver(r.shard, context, r.historyMgr, r.logger) msBuilder, err = resolver.reset(uuid.New(), ri.GetLastEventId(), msBuilder.executionInfo.StartTimestamp) - logger.Infof("Completed Resetting of workflow execution. NextEventID:%v. Err: %v", msBuilder.GetNextEventID(), err) + logger.Infof("Completed Resetting of workflow execution. NextEventID: %v. Err: %v", msBuilder.GetNextEventID(), err) if err != nil { return err } diff --git a/service/history/transferQueueActiveProcessor_test.go b/service/history/transferQueueActiveProcessor_test.go index 82e7bb816bd..e729e4442f7 100644 --- a/service/history/transferQueueActiveProcessor_test.go +++ b/service/history/transferQueueActiveProcessor_test.go @@ -524,6 +524,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCloseExecution_HasParent( taskID := int64(59) event = addCompleteWorkflowEvent(msBuilder, event.GetEventId(), nil) + msBuilder.updateReplicationStateLastEventID("", version, event.GetEventId()) transferTask := &persistence.TransferTaskInfo{ Version: version, @@ -582,6 +583,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCloseExecution_NoParent() taskID := int64(59) event = addCompleteWorkflowEvent(msBuilder, event.GetEventId(), nil) + msBuilder.updateReplicationStateLastEventID("", version, event.GetEventId()) transferTask := &persistence.TransferTaskInfo{ Version: version, diff --git a/service/history/transferQueueStandbyProcessor_test.go b/service/history/transferQueueStandbyProcessor_test.go index 3014c2019b4..6b60e42a7dd 100644 --- a/service/history/transferQueueStandbyProcessor_test.go +++ b/service/history/transferQueueStandbyProcessor_test.go @@ -446,6 +446,7 @@ func (s *transferQueueStandbyProcessorSuite) TestProcessCloseExecution() { taskID := int64(59) event = addCompleteWorkflowEvent(msBuilder, event.GetEventId(), nil) + msBuilder.updateReplicationStateLastEventID("", version, event.GetEventId()) transferTask := &persistence.TransferTaskInfo{ Version: version,