Skip to content

Commit

Permalink
Refactor workflow execution context & mutable state (#2221)
Browse files Browse the repository at this point in the history
* Remove redundant append first event batch APIs
* Remove mutableStateSessionUpdates in favor of WorkflowSnapshot & WorkflowMutation
* Move force fail decision for failover / buffered events size limit into CloseTransactionAsMutation & CloseTransactionAsSnapshot
* Mutable state will now keep track of all transfer / timer tasks by AddTransferTasks & AddTimerTasks
* Remove the requirement for transactionID from shard when create / update workflow
* Remove special handling of LimitExceededError since buffered events size limit is handled before close transaction
  • Loading branch information
wxing1292 authored Jul 18, 2019
1 parent eaf9410 commit e18bb92
Show file tree
Hide file tree
Showing 33 changed files with 1,349 additions and 1,596 deletions.
2 changes: 0 additions & 2 deletions client/clientBean.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,6 @@ func (d *dnsUpdater) Start() {
d.logger.Error("Failed to update peerList", tag.Error(err), tag.Address(d.dnsAddress))
}
d.currentPeers = res.newPeers
} else {
d.logger.Debug("No change in DNS lookup", tag.Address(d.dnsAddress))
}
sleepDu := now.Add(d.interval).Sub(now)
time.Sleep(sleepDu)
Expand Down
21 changes: 18 additions & 3 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ import (
"github.com/uber/cadence/common/persistence"
)

// ReplicationPolicy is the domain's replication policy,
// derived from domain's replication config
type ReplicationPolicy int

const (
// ReplicationPolicyOneCluster indicate that workflows does not need to be replicated
// applicable to local domain & global domain with one cluster
ReplicationPolicyOneCluster ReplicationPolicy = 0
// ReplicationPolicyMultiCluster indicate that workflows need to be replicated
ReplicationPolicyMultiCluster ReplicationPolicy = 1
)

const (
domainCacheInitialSize = 10 * 1024
domainCacheMaxSize = 64 * 1024
Expand Down Expand Up @@ -685,11 +697,14 @@ func (entry *DomainCacheEntry) IsDomainActive() bool {
return entry.clusterMetadata.GetCurrentClusterName() == entry.replicationConfig.ActiveClusterName
}

// CanReplicateEvent return whether the workflows within this domain should be replicated
func (entry *DomainCacheEntry) CanReplicateEvent() bool {
// GetReplicationPolicy return the derived workflow replication policy
func (entry *DomainCacheEntry) GetReplicationPolicy() ReplicationPolicy {
// frontend guarantee that the clusters always contains the active domain, so if the # of clusters is 1
// then we do not need to send out any events for replication
return entry.isGlobalDomain && len(entry.replicationConfig.Clusters) > 1
if entry.isGlobalDomain && len(entry.replicationConfig.Clusters) > 1 {
return ReplicationPolicyMultiCluster
}
return ReplicationPolicyOneCluster
}

// GetDomainNotActiveErr return err if domain is not active, nil otherwise
Expand Down
2 changes: 1 addition & 1 deletion common/service/config/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (d *RPCFactory) CreateDispatcher() *yarpc.Dispatcher {
func (d *RPCFactory) CreateDispatcherForOutbound(
callerName, serviceName, hostName string) *yarpc.Dispatcher {
// Setup dispatcher(outbound) for onebox
d.logger.Info("Created RPC dispatcher outbound for service '%v' for host '%v'", tag.Service(d.serviceName), tag.Address(hostName))
d.logger.Info("Created RPC dispatcher outbound", tag.Service(d.serviceName), tag.Address(hostName))
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: callerName,
Outbounds: yarpc.Outbounds{
Expand Down
116 changes: 57 additions & 59 deletions service/history/MockMutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,29 +1122,6 @@ func (_m *mockMutableState) ClearStickyness() {
_m.Called()
}

// CloseUpdateSession provides a mock function with given fields:
func (_m *mockMutableState) CloseUpdateSession() (*mutableStateSessionUpdates, error) {
ret := _m.Called()

var r0 *mutableStateSessionUpdates
if rf, ok := ret.Get(0).(func() *mutableStateSessionUpdates); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*mutableStateSessionUpdates)
}
}

var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// CopyToPersistence provides a mock function with given fields:
func (_m *mockMutableState) CopyToPersistence() *persistence.WorkflowMutableState {
ret := _m.Called()
Expand Down Expand Up @@ -1467,22 +1444,6 @@ func (_m *mockMutableState) GetCompletionEvent() (*shared.HistoryEvent, bool) {
return r0, r1
}

// GetContinueAsNew provides a mock function with given fields:
func (_m *mockMutableState) GetContinueAsNew() *persistence.WorkflowSnapshot {
ret := _m.Called()

var r0 *persistence.WorkflowSnapshot
if rf, ok := ret.Get(0).(func() *persistence.WorkflowSnapshot); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.WorkflowSnapshot)
}
}

return r0
}

// GetCronBackoffDuration provides a mock function with given fields:
func (_m *mockMutableState) GetCronBackoffDuration() time.Duration {
ret := _m.Called()
Expand Down Expand Up @@ -2730,6 +2691,11 @@ func (_m *mockMutableState) UpdateDecision(_a0 *decisionInfo) {
_m.Called(_a0)
}

// UpdateReplicationPolicy provides a mock function with given fields: _a0
func (_m *mockMutableState) UpdateReplicationPolicy(_a0 cache.ReplicationPolicy) {
_m.Called(_a0)
}

// UpdateReplicationStateLastEventID provides a mock function with given fields: _a0, _a1
func (_m *mockMutableState) UpdateReplicationStateLastEventID(_a0 int64, _a1 int64) {
_m.Called(_a0, _a1)
Expand All @@ -2745,73 +2711,105 @@ func (_m *mockMutableState) UpdateUserTimer(_a0 string, _a1 *persistence.TimerIn
_m.Called(_a0, _a1)
}

// UpdateUserTimer provides a mock function with given fields: _a0
// AddTransferTasks provides a mock function with given fields: _a0
func (_m *mockMutableState) AddTransferTasks(_a0 ...persistence.Task) {
_m.Called(_a0)
}

// UpdateUserTimer provides a mock function with given fields: _a0
// GetTransferTasks provides a mock function with given fields:
func (_m *mockMutableState) GetTransferTasks() []persistence.Task {
ret := _m.Called()

var r0 []persistence.Task
if rf, ok := ret.Get(0).(func() []persistence.Task); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]persistence.Task)
}
}

return r0
}

// AddTimerTasks provides a mock function with given fields: _a0
func (_m *mockMutableState) AddTimerTasks(_a0 ...persistence.Task) {
_m.Called(_a0)
}

// UpdateUserTimer provides a mock function with given fields: _a0
func (_m *mockMutableState) CloseTransactionAsMutation(_a0 time.Time) (*persistence.WorkflowMutation, []*persistence.WorkflowEvents, error) {
ret := _m.Called(_a0)
// GetTimerTasks provides a mock function with given fields:
func (_m *mockMutableState) GetTimerTasks() []persistence.Task {
ret := _m.Called()

var r0 []persistence.Task
if rf, ok := ret.Get(0).(func() []persistence.Task); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]persistence.Task)
}
}

return r0
}

// CloseTransactionAsMutation provides a mock function with given fields: _a0, _a1
func (_m *mockMutableState) CloseTransactionAsMutation(_a0 time.Time, _a1 transactionPolicy) (*persistence.WorkflowMutation, []*persistence.WorkflowEvents, error) {
ret := _m.Called(_a0, _a1)

var r0 *persistence.WorkflowMutation
if rf, ok := ret.Get(0).(func(_a0 time.Time) *persistence.WorkflowMutation); ok {
r0 = rf(_a0)
if rf, ok := ret.Get(0).(func(time.Time, transactionPolicy) *persistence.WorkflowMutation); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.WorkflowMutation)
}
}

var r1 []*persistence.WorkflowEvents
if rf, ok := ret.Get(1).(func(_a0 time.Time) []*persistence.WorkflowEvents); ok {
r1 = rf(_a0)
if rf, ok := ret.Get(1).(func(time.Time, transactionPolicy) []*persistence.WorkflowEvents); ok {
r1 = rf(_a0, _a1)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).([]*persistence.WorkflowEvents)
}
}

var r2 error
if rf, ok := ret.Get(2).(func(_a0 time.Time) error); ok {
r2 = rf(_a0)
if rf, ok := ret.Get(2).(func(time.Time, transactionPolicy) error); ok {
r2 = rf(_a0, _a1)
} else {
r2 = ret.Error(2)
}

return r0, r1, r2
}

// UpdateUserTimer provides a mock function with given fields: _a0, _a1
func (_m *mockMutableState) CloseTransactionAsSnapshot(_a0 time.Time) (*persistence.WorkflowSnapshot, []*persistence.WorkflowEvents, error) {
ret := _m.Called(_a0)
// CloseTransactionAsSnapshot provides a mock function with given fields: _a0, _a1
func (_m *mockMutableState) CloseTransactionAsSnapshot(_a0 time.Time, _a1 transactionPolicy) (*persistence.WorkflowSnapshot, []*persistence.WorkflowEvents, error) {
ret := _m.Called(_a0, _a1)

var r0 *persistence.WorkflowSnapshot
if rf, ok := ret.Get(0).(func(_a0 time.Time) *persistence.WorkflowSnapshot); ok {
r0 = rf(_a0)
if rf, ok := ret.Get(0).(func(time.Time, transactionPolicy) *persistence.WorkflowSnapshot); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.WorkflowSnapshot)
}
}

var r1 []*persistence.WorkflowEvents
if rf, ok := ret.Get(1).(func(_a0 time.Time) []*persistence.WorkflowEvents); ok {
r1 = rf(_a0)
if rf, ok := ret.Get(1).(func(time.Time, transactionPolicy) []*persistence.WorkflowEvents); ok {
r1 = rf(_a0, _a1)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).([]*persistence.WorkflowEvents)
}
}

var r2 error
if rf, ok := ret.Get(2).(func(_a0 time.Time) error); ok {
r2 = rf(_a0)
if rf, ok := ret.Get(2).(func(time.Time, transactionPolicy) error); ok {
r2 = rf(_a0, _a1)
} else {
r2 = ret.Error(2)
}
Expand Down
Loading

0 comments on commit e18bb92

Please sign in to comment.