Skip to content

Commit

Permalink
Add maximum timeout protection (#946)
Browse files Browse the repository at this point in the history
* Add protection on multiple timeout values

* Add MaxDecisionStartToCloseTimeout

* Update host test

* Add force convert for large timeout

* make fmt

* Address comments
  • Loading branch information
vancexu authored Jul 13, 2018
1 parent 31b98fa commit 19e4de2
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 74 deletions.
3 changes: 3 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,6 @@ type (
// EncodingType is an enum that represents various data encoding types
EncodingType string
)

// MaxTaskTimeout is maximum task timeout allowed. 366 days in seconds
const MaxTaskTimeout = 31622400
10 changes: 10 additions & 0 deletions common/logging/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,13 @@ func LogCriticalErrorEvent(logger bark.Logger, msg string, err error) {
TagWorkflowErr: err,
}).Error(msg)
}

// LogDecisionTimeoutTooLarge is used to log warning msg for workflow that contains large decision timeout
func LogDecisionTimeoutTooLarge(logger bark.Logger, t int32, domain, wid, wfType string) {
logger.WithFields(bark.Fields{
"Domain": domain,
"WorkflowID": wid,
"WorkflowType": wfType,
"DecisionTimeout": t,
}).Warn("Decision timeout is too large")
}
82 changes: 59 additions & 23 deletions common/persistence/cassandraVisibilityPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,33 @@ const (
domainPartition = 0
defaultCloseTTLSeconds = 86400
openExecutionTTLBuffer = int64(86400) // setting it to a day to account for shard going down

maxCassandraTTL = int64(630720000) // Cassandra TTL maximum, 20 years in second
)

const (
templateCreateWorkflowExecutionStarted = `INSERT INTO open_executions (` +
templateCreateWorkflowExecutionStartedWithTTL = `INSERT INTO open_executions (` +
`domain_id, domain_partition, workflow_id, run_id, start_time, workflow_type_name) ` +
`VALUES (?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionStarted = `INSERT INTO open_executions (` +
`domain_id, domain_partition, workflow_id, run_id, start_time, workflow_type_name) ` +
`VALUES (?, ?, ?, ?, ?, ?)`

templateDeleteWorkflowExecutionStarted = `DELETE FROM open_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
`AND start_time = ? ` +
`AND run_id = ?`

templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions (` +
templateCreateWorkflowExecutionClosedWithTTL = `INSERT INTO closed_executions (` +
`domain_id, domain_partition, workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions (` +
`domain_id, domain_partition, workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`

templateGetOpenWorkflowExecutions = `SELECT workflow_id, run_id, start_time, workflow_type_name ` +
`FROM open_executions ` +
`WHERE domain_id = ? ` +
Expand Down Expand Up @@ -151,15 +161,27 @@ func (v *cassandraVisibilityPersistence) Close() {
func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted(
request *RecordWorkflowExecutionStartedRequest) error {
ttl := request.WorkflowTimeout + openExecutionTTLBuffer
query := v.session.Query(templateCreateWorkflowExecutionStarted,
request.DomainUUID,
domainPartition,
*request.Execution.WorkflowId,
*request.Execution.RunId,
common.UnixNanoToCQLTimestamp(request.StartTimestamp),
request.WorkflowTypeName,
ttl,
)
var query *gocql.Query
if ttl > maxCassandraTTL {
query = v.session.Query(templateCreateWorkflowExecutionStarted,
request.DomainUUID,
domainPartition,
*request.Execution.WorkflowId,
*request.Execution.RunId,
common.UnixNanoToCQLTimestamp(request.StartTimestamp),
request.WorkflowTypeName,
)
} else {
query = v.session.Query(templateCreateWorkflowExecutionStartedWithTTL,
request.DomainUUID,
domainPartition,
*request.Execution.WorkflowId,
*request.Execution.RunId,
common.UnixNanoToCQLTimestamp(request.StartTimestamp),
request.WorkflowTypeName,
ttl,
)
}
query = query.WithTimestamp(common.UnixNanoToCQLTimestamp(request.StartTimestamp))
err := query.Exec()
if err != nil {
Expand Down Expand Up @@ -196,18 +218,32 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
retention = defaultCloseTTLSeconds
}

batch.Query(templateCreateWorkflowExecutionClosed,
request.DomainUUID,
domainPartition,
*request.Execution.WorkflowId,
*request.Execution.RunId,
common.UnixNanoToCQLTimestamp(request.StartTimestamp),
common.UnixNanoToCQLTimestamp(request.CloseTimestamp),
request.WorkflowTypeName,
request.Status,
request.HistoryLength,
retention,
)
if retention > maxCassandraTTL {
batch.Query(templateCreateWorkflowExecutionClosed,
request.DomainUUID,
domainPartition,
*request.Execution.WorkflowId,
*request.Execution.RunId,
common.UnixNanoToCQLTimestamp(request.StartTimestamp),
common.UnixNanoToCQLTimestamp(request.CloseTimestamp),
request.WorkflowTypeName,
request.Status,
request.HistoryLength,
)
} else {
batch.Query(templateCreateWorkflowExecutionClosedWithTTL,
request.DomainUUID,
domainPartition,
*request.Execution.WorkflowId,
*request.Execution.RunId,
common.UnixNanoToCQLTimestamp(request.StartTimestamp),
common.UnixNanoToCQLTimestamp(request.CloseTimestamp),
request.WorkflowTypeName,
request.Status,
request.HistoryLength,
retention,
)
}

batch = batch.WithTimestamp(common.UnixNanoToCQLTimestamp(request.CloseTimestamp))
err := v.session.ExecuteBatch(batch)
Expand Down
13 changes: 8 additions & 5 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ var keys = map[Key]string{
EnableGlobalDomain: "system.enableGlobalDomain",

// frontend settings
FrontendPersistenceMaxQPS: "frontend.persistenceMaxQPS",
FrontendVisibilityMaxPageSize: "frontend.visibilityMaxPageSize",
FrontendHistoryMaxPageSize: "frontend.historyMaxPageSize",
FrontendRPS: "frontend.rps",
FrontendHistoryMgrNumConns: "frontend.historyMgrNumConns",
FrontendPersistenceMaxQPS: "frontend.persistenceMaxQPS",
FrontendVisibilityMaxPageSize: "frontend.visibilityMaxPageSize",
FrontendHistoryMaxPageSize: "frontend.historyMaxPageSize",
FrontendRPS: "frontend.rps",
FrontendHistoryMgrNumConns: "frontend.historyMgrNumConns",
MaxDecisionStartToCloseTimeout: "frontend.maxDecisionStartToCloseTimeout",

// matching settings
MatchingPersistenceMaxQPS: "matching.persistenceMaxQPS",
Expand Down Expand Up @@ -155,6 +156,8 @@ const (
FrontendRPS
// FrontendHistoryMgrNumConns is for persistence cluster.NumConns
FrontendHistoryMgrNumConns
// MaxDecisionStartToCloseTimeout is max decision timeout in seconds
MaxDecisionStartToCloseTimeout

// key for matching

Expand Down
8 changes: 8 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,11 @@ func CreateMatchingPollForDecisionTaskResponse(historyResponse *h.RecordDecision
}
return matchingResp
}

// MinInt32 return smaller one of two inputs int32
func MinInt32(a, b int32) int32 {
if a < b {
return a
}
return b
}
4 changes: 2 additions & 2 deletions host/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ func testActivity(ctx context.Context, msg string) (string, error) {

func testDataConverterWorkflow(ctx workflow.Context, tl string) (string, error) {
ao := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
ScheduleToStartTimeout: 20 * time.Second,
StartToCloseTimeout: 40 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)

Expand Down
13 changes: 8 additions & 5 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,19 @@ type Config struct {

// Persistence settings
HistoryMgrNumConns dynamicconfig.IntPropertyFn

MaxDecisionStartToCloseTimeout dynamicconfig.IntPropertyFnWithDomainFilter
}

// NewConfig returns new service config with default values
func NewConfig(dc *dynamicconfig.Collection) *Config {
return &Config{
PersistenceMaxQPS: dc.GetFloat64Property(dynamicconfig.FrontendPersistenceMaxQPS, 2000),
VisibilityMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendVisibilityMaxPageSize, 1000),
HistoryMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendHistoryMaxPageSize, 1000),
RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 1200),
HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.FrontendHistoryMgrNumConns, 10),
PersistenceMaxQPS: dc.GetFloat64Property(dynamicconfig.FrontendPersistenceMaxQPS, 2000),
VisibilityMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendVisibilityMaxPageSize, 1000),
HistoryMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendHistoryMaxPageSize, 1000),
RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 1200),
HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.FrontendHistoryMgrNumConns, 10),
MaxDecisionStartToCloseTimeout: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MaxDecisionStartToCloseTimeout, 600),
}
}

Expand Down
34 changes: 34 additions & 0 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,23 @@ func (wh *WorkflowHandler) StartWorkflowExecution(
Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."}, scope)
}

maxDecisionTimeout := int32(wh.config.MaxDecisionStartToCloseTimeout(startRequest.GetDomain()))
// TODO: remove this assignment and logging in future, so that frontend will just return bad request for large decision timeout
if startRequest.GetTaskStartToCloseTimeoutSeconds() > maxDecisionTimeout {
startRequest.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(maxDecisionTimeout)
logging.LogDecisionTimeoutTooLarge(wh.Service.GetLogger(),
startRequest.GetTaskStartToCloseTimeoutSeconds(),
startRequest.GetDomain(),
startRequest.GetWorkflowId(),
startRequest.WorkflowType.GetName(),
)
}
if startRequest.GetTaskStartToCloseTimeoutSeconds() > startRequest.GetExecutionStartToCloseTimeoutSeconds() ||
startRequest.GetTaskStartToCloseTimeoutSeconds() > maxDecisionTimeout {
return nil, wh.error(&gen.BadRequestError{
Message: fmt.Sprintf("TaskStartToCloseTimeoutSeconds is larger than ExecutionStartToCloseTimeout or MaxDecisionStartToCloseTimeout (%ds).", maxDecisionTimeout)}, scope)
}

domainName := startRequest.GetDomain()
wh.Service.GetLogger().Debugf("Start workflow execution request domain: %v", domainName)
domainID, err := wh.domainCache.GetDomainID(domainName)
Expand Down Expand Up @@ -1626,6 +1643,23 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."}, scope)
}

maxDecisionTimeout := int32(wh.config.MaxDecisionStartToCloseTimeout(signalWithStartRequest.GetDomain()))
// TODO: remove this assignment and logging in future, so that frontend will just return bad request for large decision timeout
if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > maxDecisionTimeout {
signalWithStartRequest.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(maxDecisionTimeout)
logging.LogDecisionTimeoutTooLarge(wh.Service.GetLogger(),
signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds(),
signalWithStartRequest.GetDomain(),
signalWithStartRequest.GetWorkflowId(),
signalWithStartRequest.WorkflowType.GetName(),
)
}
if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() ||
signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > maxDecisionTimeout {
return nil, wh.error(&gen.BadRequestError{
Message: fmt.Sprintf("TaskStartToCloseTimeoutSeconds is larger than ExecutionStartToCloseTimeout or MaxDecisionStartToCloseTimeout (%ds).", maxDecisionTimeout)}, scope)
}

domainID, err := wh.domainCache.GetDomainID(signalWithStartRequest.GetDomain())
if err != nil {
return nil, wh.error(err, scope)
Expand Down
21 changes: 19 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ Update_History_Loop:
targetDomainID = domainEntry.GetInfo().ID
}

if err = validateActivityScheduleAttributes(attributes); err != nil {
if err = validateActivityScheduleAttributes(attributes, executionInfo.WorkflowTimeout); err != nil {
failDecision = true
failCause = workflow.DecisionTaskFailedCauseBadScheduleActivityAttributes
break Process_Decision_Loop
Expand Down Expand Up @@ -2428,7 +2428,7 @@ func (s *shardContextWrapper) NotifyNewHistoryEvent(event *historyEventNotificat
return err
}

func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTaskDecisionAttributes) error {
func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTaskDecisionAttributes, wfTimeout int32) error {
if attributes == nil {
return &workflow.BadRequestError{Message: "ScheduleActivityTaskDecisionAttributes is not set on decision."}
}
Expand Down Expand Up @@ -2463,6 +2463,20 @@ func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTas
return &workflow.BadRequestError{Message: "A valid timeout may not be negative."}
}

// ensure activity timeout never larger than workflow timeout
if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout {
attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(wfTimeout)
}
if attributes.GetScheduleToStartTimeoutSeconds() > wfTimeout {
attributes.ScheduleToStartTimeoutSeconds = common.Int32Ptr(wfTimeout)
}
if attributes.GetStartToCloseTimeoutSeconds() > wfTimeout {
attributes.StartToCloseTimeoutSeconds = common.Int32Ptr(wfTimeout)
}
if attributes.GetHeartbeatTimeoutSeconds() > wfTimeout {
attributes.HeartbeatTimeoutSeconds = common.Int32Ptr(wfTimeout)
}

validScheduleToClose := attributes.GetScheduleToCloseTimeoutSeconds() > 0
validScheduleToStart := attributes.GetScheduleToStartTimeoutSeconds() > 0
validStartToClose := attributes.GetStartToCloseTimeoutSeconds() > 0
Expand All @@ -2476,6 +2490,9 @@ func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTas
}
} else if validScheduleToStart && validStartToClose {
attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToStartTimeoutSeconds() + attributes.GetStartToCloseTimeoutSeconds())
if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout {
attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(wfTimeout)
}
} else {
// Deduction failed as there's not enough information to fill in missing timeouts.
return &workflow.BadRequestError{Message: "A valid ScheduleToCloseTimeout is not set on decision."}
Expand Down
Loading

0 comments on commit 19e4de2

Please sign in to comment.