Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply 5 min delay for standby task #920

Merged
merged 9 commits into from
Jul 18, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ const (
`domain_id: ?, ` +
`workflow_id: ?, ` +
`run_id: ?, ` +
`visibility_ts: ?, ` +
`task_id: ?, ` +
`target_domain_id: ?, ` +
`target_workflow_id: ?, ` +
Expand Down Expand Up @@ -2470,6 +2471,7 @@ func (d *cassandraPersistence) createTransferTasks(batch *gocql.Batch, transferT
domainID,
workflowID,
runID,
task.GetVisibilityTimestamp(),
task.GetTaskID(),
targetDomainID,
targetWorkflowID,
Expand Down Expand Up @@ -3156,6 +3158,8 @@ func createTransferTaskInfo(result map[string]interface{}) *TransferTaskInfo {
info.WorkflowID = v.(string)
case "run_id":
info.RunID = v.(gocql.UUID).String()
case "visibility_ts":
info.VisibilityTimestamp = v.(time.Time)
case "task_id":
info.TaskID = v.(int64)
case "target_domain_id":
Expand Down
16 changes: 10 additions & 6 deletions common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,13 +993,14 @@ func (s *cassandraPersistenceSuite) TestTransferTasks() {
targetWorkflowID := "some random target domain ID"
targetRunID := uuid.New()
currentTransferID := s.GetTransferReadLevel()
now := time.Now()
tasks := []Task{
&ActivityTask{currentTransferID + 10001, domainID, tasklist, scheduleID, 111},
&DecisionTask{currentTransferID + 10002, domainID, tasklist, scheduleID, 222},
&CloseExecutionTask{currentTransferID + 10003, 333},
&CancelExecutionTask{currentTransferID + 10004, targetDomainID, targetWorkflowID, targetRunID, true, scheduleID, 444},
&SignalExecutionTask{currentTransferID + 10005, targetDomainID, targetWorkflowID, targetRunID, true, scheduleID, 555},
&StartChildExecutionTask{currentTransferID + 10006, targetDomainID, targetWorkflowID, scheduleID, 666},
&ActivityTask{now, currentTransferID + 10001, domainID, tasklist, scheduleID, 111},
&DecisionTask{now, currentTransferID + 10002, domainID, tasklist, scheduleID, 222},
&CloseExecutionTask{now, currentTransferID + 10003, 333},
&CancelExecutionTask{now, currentTransferID + 10004, targetDomainID, targetWorkflowID, targetRunID, true, scheduleID, 444},
&SignalExecutionTask{now, currentTransferID + 10005, targetDomainID, targetWorkflowID, targetRunID, true, scheduleID, 555},
&StartChildExecutionTask{now, currentTransferID + 10006, targetDomainID, targetWorkflowID, scheduleID, 666},
}
err2 := s.UpdateWorklowStateAndReplication(updatedInfo, nil, nil, nil, int64(3), tasks)
s.Nil(err2, "No error expected.")
Expand All @@ -1008,6 +1009,9 @@ func (s *cassandraPersistenceSuite) TestTransferTasks() {
s.Nil(err1, "No error expected.")
s.NotNil(txTasks, "expected valid list of tasks.")
s.Equal(len(tasks), len(txTasks))
for index := range tasks {
s.True(timeComparator(tasks[index].GetVisibilityTimestamp(), txTasks[index].VisibilityTimestamp, timePrecision))
}
s.Equal(TransferTaskTypeActivityTask, txTasks[0].TaskType)
s.Equal(TransferTaskTypeDecisionTask, txTasks[1].TaskType)
s.Equal(TransferTaskTypeCloseExecution, txTasks[2].TaskType)
Expand Down
139 changes: 122 additions & 17 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ type (
DomainID string
WorkflowID string
RunID string
VisibilityTimestamp time.Time
TaskID int64
TargetDomainID string
TargetWorkflowID string
Expand Down Expand Up @@ -265,30 +266,35 @@ type (
SetVersion(version int64)
GetTaskID() int64
SetTaskID(id int64)
GetVisibilityTimestamp() time.Time
SetVisibilityTimestamp(timestamp time.Time)
}

// ActivityTask identifies a transfer task for activity
ActivityTask struct {
TaskID int64
DomainID string
TaskList string
ScheduleID int64
Version int64
VisibilityTimestamp time.Time
TaskID int64
DomainID string
TaskList string
ScheduleID int64
Version int64
}

// DecisionTask identifies a transfer task for decision
DecisionTask struct {
TaskID int64
DomainID string
TaskList string
ScheduleID int64
Version int64
VisibilityTimestamp time.Time
TaskID int64
DomainID string
TaskList string
ScheduleID int64
Version int64
}

// CloseExecutionTask identifies a transfer task for deletion of execution
CloseExecutionTask struct {
TaskID int64
Version int64
VisibilityTimestamp time.Time
TaskID int64
Version int64
}

// DeleteHistoryEventTask identifies a timer task for deletion of history events of completed execution.
Expand Down Expand Up @@ -317,6 +323,7 @@ type (

// CancelExecutionTask identifies a transfer task for cancel of execution
CancelExecutionTask struct {
VisibilityTimestamp time.Time
TaskID int64
TargetDomainID string
TargetWorkflowID string
Expand All @@ -328,6 +335,7 @@ type (

// SignalExecutionTask identifies a transfer task for signal execution
SignalExecutionTask struct {
VisibilityTimestamp time.Time
TaskID int64
TargetDomainID string
TargetWorkflowID string
Expand All @@ -339,11 +347,12 @@ type (

// StartChildExecutionTask identifies a transfer task for starting child execution
StartChildExecutionTask struct {
TaskID int64
TargetDomainID string
TargetWorkflowID string
InitiatedID int64
Version int64
VisibilityTimestamp time.Time
TaskID int64
TargetDomainID string
TargetWorkflowID string
InitiatedID int64
Version int64
}

// ActivityTimeoutTask identifies a timeout task.
Expand Down Expand Up @@ -375,6 +384,7 @@ type (

// HistoryReplicationTask is the transfer task created for shipping history replication events to other clusters
HistoryReplicationTask struct {
VisibilityTimestamp time.Time
TaskID int64
FirstEventID int64
NextEventID int64
Expand Down Expand Up @@ -1009,6 +1019,16 @@ func (a *ActivityTask) SetTaskID(id int64) {
a.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (a *ActivityTask) GetVisibilityTimestamp() time.Time {
return a.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (a *ActivityTask) SetVisibilityTimestamp(timestamp time.Time) {
a.VisibilityTimestamp = timestamp
}

// GetType returns the type of the decision task
func (d *DecisionTask) GetType() int {
return TransferTaskTypeDecisionTask
Expand All @@ -1034,6 +1054,16 @@ func (d *DecisionTask) SetTaskID(id int64) {
d.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (d *DecisionTask) GetVisibilityTimestamp() time.Time {
return d.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (d *DecisionTask) SetVisibilityTimestamp(timestamp time.Time) {
d.VisibilityTimestamp = timestamp
}

// GetType returns the type of the close execution task
func (a *CloseExecutionTask) GetType() int {
return TransferTaskTypeCloseExecution
Expand All @@ -1059,6 +1089,16 @@ func (a *CloseExecutionTask) SetTaskID(id int64) {
a.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (a *CloseExecutionTask) GetVisibilityTimestamp() time.Time {
return a.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (a *CloseExecutionTask) SetVisibilityTimestamp(timestamp time.Time) {
a.VisibilityTimestamp = timestamp
}

// GetType returns the type of the delete execution task
func (a *DeleteHistoryEventTask) GetType() int {
return TaskTypeDeleteHistoryEvent
Expand All @@ -1084,6 +1124,16 @@ func (a *DeleteHistoryEventTask) SetTaskID(id int64) {
a.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (a *DeleteHistoryEventTask) GetVisibilityTimestamp() time.Time {
return a.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (a *DeleteHistoryEventTask) SetVisibilityTimestamp(timestamp time.Time) {
a.VisibilityTimestamp = timestamp
}

// GetType returns the type of the timer task
func (d *DecisionTimeoutTask) GetType() int {
return TaskTypeDecisionTimeout
Expand Down Expand Up @@ -1284,6 +1334,16 @@ func (u *CancelExecutionTask) SetTaskID(id int64) {
u.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (u *CancelExecutionTask) GetVisibilityTimestamp() time.Time {
return u.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (u *CancelExecutionTask) SetVisibilityTimestamp(timestamp time.Time) {
u.VisibilityTimestamp = timestamp
}

// GetType returns the type of the signal transfer task
func (u *SignalExecutionTask) GetType() int {
return TransferTaskTypeSignalExecution
Expand All @@ -1309,6 +1369,16 @@ func (u *SignalExecutionTask) SetTaskID(id int64) {
u.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (u *SignalExecutionTask) GetVisibilityTimestamp() time.Time {
return u.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (u *SignalExecutionTask) SetVisibilityTimestamp(timestamp time.Time) {
u.VisibilityTimestamp = timestamp
}

// GetType returns the type of the start child transfer task
func (u *StartChildExecutionTask) GetType() int {
return TransferTaskTypeStartChildExecution
Expand All @@ -1334,6 +1404,16 @@ func (u *StartChildExecutionTask) SetTaskID(id int64) {
u.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (u *StartChildExecutionTask) GetVisibilityTimestamp() time.Time {
return u.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (u *StartChildExecutionTask) SetVisibilityTimestamp(timestamp time.Time) {
u.VisibilityTimestamp = timestamp
}

// GetType returns the type of the history replication task
func (a *HistoryReplicationTask) GetType() int {
return ReplicationTaskTypeHistory
Expand All @@ -1359,6 +1439,16 @@ func (a *HistoryReplicationTask) SetTaskID(id int64) {
a.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (a *HistoryReplicationTask) GetVisibilityTimestamp() time.Time {
return a.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (a *HistoryReplicationTask) SetVisibilityTimestamp(timestamp time.Time) {
a.VisibilityTimestamp = timestamp
}

// GetTaskID returns the task ID for transfer task
func (t *TransferTaskInfo) GetTaskID() int64 {
return t.TaskID
Expand All @@ -1374,6 +1464,11 @@ func (t *TransferTaskInfo) GetTaskType() int {
return t.TaskType
}

// GetVisibilityTimestamp returns the task type for transfer task
func (t *TransferTaskInfo) GetVisibilityTimestamp() time.Time {
return t.VisibilityTimestamp
}

// String returns string
func (t *TransferTaskInfo) String() string {
return fmt.Sprintf(
Expand All @@ -1397,6 +1492,11 @@ func (t *ReplicationTaskInfo) GetTaskType() int {
return t.TaskType
}

// GetVisibilityTimestamp returns the task type for transfer task
func (t *ReplicationTaskInfo) GetVisibilityTimestamp() time.Time {
return time.Time{}
}

// GetTaskID returns the task ID for timer task
func (t *TimerTaskInfo) GetTaskID() int64 {
return t.TaskID
Expand All @@ -1412,6 +1512,11 @@ func (t *TimerTaskInfo) GetTaskType() int {
return t.TaskType
}

// GetVisibilityTimestamp returns the task type for transfer task
func (t *TimerTaskInfo) GetVisibilityTimestamp() time.Time {
return t.VisibilityTimestamp
}

// GetTaskType returns the task type for timer task
func (t *TimerTaskInfo) String() string {
return fmt.Sprintf(
Expand Down
9 changes: 3 additions & 6 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ var keys = map[Key]string{
HistoryCacheMaxSize: "history.cacheMaxSize",
HistoryCacheTTL: "history.cacheTTL",
AcquireShardInterval: "history.acquireShardInterval",
StandbyClusterDelay: "history.standbyClusterDelay",
TimerTaskBatchSize: "history.timerTaskBatchSize",
TimerTaskWorkerCount: "history.timerTaskWorkerCount",
TimerTaskMaxRetryCount: "history.timerTaskMaxRetryCount",
Expand All @@ -92,7 +93,6 @@ var keys = map[Key]string{
TimerProcessorMaxPollRPS: "history.timerProcessorMaxPollRPS",
TimerProcessorMaxPollInterval: "history.timerProcessorMaxPollInterval",
TimerProcessorMaxPollIntervalJitterCoefficient: "history.timerProcessorMaxPollIntervalJitterCoefficient",
TimerProcessorStandbyTaskDelay: "history.timerProcessorStandbyTaskDelay",
TransferTaskBatchSize: "history.transferTaskBatchSize",
TransferProcessorFailoverMaxPollRPS: "history.transferProcessorFailoverMaxPollRPS",
TransferProcessorMaxPollRPS: "history.transferProcessorMaxPollRPS",
Expand All @@ -106,7 +106,6 @@ var keys = map[Key]string{
TransferProcessorMaxPollIntervalJitterCoefficient: "history.transferProcessorMaxPollIntervalJitterCoefficient",
TransferProcessorUpdateAckInterval: "history.transferProcessorUpdateAckInterval",
TransferProcessorCompleteTransferInterval: "history.transferProcessorCompleteTransferInterval",
TransferProcessorStandbyTaskDelay: "history.transferProcessorStandbyTaskDelay",
ReplicatorTaskBatchSize: "history.replicatorTaskBatchSize",
ReplicatorTaskWorkerCount: "history.replicatorTaskWorkerCount",
ReplicatorTaskMaxRetryCount: "history.replicatorTaskMaxRetryCount",
Expand Down Expand Up @@ -198,6 +197,8 @@ const (
HistoryCacheTTL
// AcquireShardInterval is interval that timer used to acquire shard
AcquireShardInterval
// StandbyClusterDelay is the atrificial delay added to standby cluster's view of active cluster's time
StandbyClusterDelay
// TimerTaskBatchSize is batch size for timer processor to process tasks
TimerTaskBatchSize
// TimerTaskWorkerCount is number of task workers for timer processor
Expand Down Expand Up @@ -226,8 +227,6 @@ const (
TimerProcessorMaxPollInterval
// TimerProcessorMaxPollIntervalJitterCoefficient is the max poll interval jitter coefficient
TimerProcessorMaxPollIntervalJitterCoefficient
// TimerProcessorStandbyTaskDelay is task delay for standby task in timer processor
TimerProcessorStandbyTaskDelay
// TransferTaskBatchSize is batch size for transferQueueProcessor
TransferTaskBatchSize
// TransferProcessorFailoverMaxPollRPS is max poll rate per second for transferQueueProcessor
Expand All @@ -254,8 +253,6 @@ const (
TransferProcessorUpdateAckInterval
// TransferProcessorCompleteTransferInterval is complete timer interval for transferQueueProcessor
TransferProcessorCompleteTransferInterval
// TransferProcessorStandbyTaskDelay is delay time for standby task in transferQueueProcessor
TransferProcessorStandbyTaskDelay
// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
ReplicatorTaskBatchSize
// ReplicatorTaskWorkerCount is number of worker for ReplicatorProcessor
Expand Down
7 changes: 4 additions & 3 deletions common/time_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewRealTimeSource() *RealTimeSource {

// Now return the real current time
func (ts *RealTimeSource) Now() time.Time {
return time.Now()
return time.Now().UTC()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

usually the API is UtcNow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no UtcNow() function
ref: https://golang.org/pkg/time/

}

// NewFakeTimeSource returns a time source that servers
Expand All @@ -62,6 +62,7 @@ func (ts *FakeTimeSource) Now() time.Time {
}

// Update update the fake current time
func (ts *FakeTimeSource) Update(now time.Time) {
ts.now = now
func (ts *FakeTimeSource) Update(now time.Time) *FakeTimeSource {
ts.now = now.UTC()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't do UTC here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason?

return ts
}
Loading