Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add maximum timeout protection #946

Merged
merged 6 commits into from
Jul 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,6 @@ type (
// EncodingType is an enum that represents various data encoding types
EncodingType string
)

// MaxTaskTimeout is the maximum task timeout allowed, in seconds.
// It equals 366 days: 366 * 24 hours * 60 minutes * 60 seconds = 31622400.
const MaxTaskTimeout = 366 * 24 * 60 * 60
10 changes: 10 additions & 0 deletions common/logging/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,13 @@ func LogCriticalErrorEvent(logger bark.Logger, msg string, err error) {
TagWorkflowErr: err,
}).Error(msg)
}

// LogDecisionTimeoutTooLarge writes a warning-level entry with the message
// "Decision timeout is too large". The entry carries the domain, workflow ID,
// workflow type and the decision timeout value t as structured fields.
// This helper performs no comparison itself; callers decide when a timeout
// counts as too large.
func LogDecisionTimeoutTooLarge(logger bark.Logger, t int32, domain, wid, wfType string) {
	fields := bark.Fields{
		"Domain":          domain,
		"WorkflowID":      wid,
		"WorkflowType":    wfType,
		"DecisionTimeout": t,
	}
	logger.WithFields(fields).Warn("Decision timeout is too large")
}
82 changes: 59 additions & 23 deletions common/persistence/cassandraVisibilityPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,33 @@ const (
domainPartition = 0
defaultCloseTTLSeconds = 86400
openExecutionTTLBuffer = int64(86400) // setting it to a day to account for shard going down

maxCassandraTTL = int64(630720000) // Cassandra TTL maximum, 20 years in second
)

const (
templateCreateWorkflowExecutionStarted = `INSERT INTO open_executions (` +
templateCreateWorkflowExecutionStartedWithTTL = `INSERT INTO open_executions (` +
`domain_id, domain_partition, workflow_id, run_id, start_time, workflow_type_name) ` +
`VALUES (?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionStarted = `INSERT INTO open_executions (` +
`domain_id, domain_partition, workflow_id, run_id, start_time, workflow_type_name) ` +
`VALUES (?, ?, ?, ?, ?, ?)`

templateDeleteWorkflowExecutionStarted = `DELETE FROM open_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
`AND start_time = ? ` +
`AND run_id = ?`

templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions (` +
templateCreateWorkflowExecutionClosedWithTTL = `INSERT INTO closed_executions (` +
`domain_id, domain_partition, workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions (` +
`domain_id, domain_partition, workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`

templateGetOpenWorkflowExecutions = `SELECT workflow_id, run_id, start_time, workflow_type_name ` +
`FROM open_executions ` +
`WHERE domain_id = ? ` +
Expand Down Expand Up @@ -151,15 +161,27 @@ func (v *cassandraVisibilityPersistence) Close() {
func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted(
request *RecordWorkflowExecutionStartedRequest) error {
ttl := request.WorkflowTimeout + openExecutionTTLBuffer
query := v.session.Query(templateCreateWorkflowExecutionStarted,
request.DomainUUID,
domainPartition,
*request.Execution.WorkflowId,
*request.Execution.RunId,
common.UnixNanoToCQLTimestamp(request.StartTimestamp),
request.WorkflowTypeName,
ttl,
)
var query *gocql.Query
if ttl > maxCassandraTTL {
query = v.session.Query(templateCreateWorkflowExecutionStarted,
request.DomainUUID,
domainPartition,
*request.Execution.WorkflowId,
*request.Execution.RunId,
common.UnixNanoToCQLTimestamp(request.StartTimestamp),
request.WorkflowTypeName,
)
} else {
query = v.session.Query(templateCreateWorkflowExecutionStartedWithTTL,
request.DomainUUID,
domainPartition,
*request.Execution.WorkflowId,
*request.Execution.RunId,
common.UnixNanoToCQLTimestamp(request.StartTimestamp),
request.WorkflowTypeName,
ttl,
)
}
query = query.WithTimestamp(common.UnixNanoToCQLTimestamp(request.StartTimestamp))
err := query.Exec()
if err != nil {
Expand Down Expand Up @@ -196,18 +218,32 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
retention = defaultCloseTTLSeconds
}

batch.Query(templateCreateWorkflowExecutionClosed,
request.DomainUUID,
domainPartition,
*request.Execution.WorkflowId,
*request.Execution.RunId,
common.UnixNanoToCQLTimestamp(request.StartTimestamp),
common.UnixNanoToCQLTimestamp(request.CloseTimestamp),
request.WorkflowTypeName,
request.Status,
request.HistoryLength,
retention,
)
if retention > maxCassandraTTL {
batch.Query(templateCreateWorkflowExecutionClosed,
request.DomainUUID,
domainPartition,
*request.Execution.WorkflowId,
*request.Execution.RunId,
common.UnixNanoToCQLTimestamp(request.StartTimestamp),
common.UnixNanoToCQLTimestamp(request.CloseTimestamp),
request.WorkflowTypeName,
request.Status,
request.HistoryLength,
)
} else {
batch.Query(templateCreateWorkflowExecutionClosedWithTTL,
request.DomainUUID,
domainPartition,
*request.Execution.WorkflowId,
*request.Execution.RunId,
common.UnixNanoToCQLTimestamp(request.StartTimestamp),
common.UnixNanoToCQLTimestamp(request.CloseTimestamp),
request.WorkflowTypeName,
request.Status,
request.HistoryLength,
retention,
)
}

batch = batch.WithTimestamp(common.UnixNanoToCQLTimestamp(request.CloseTimestamp))
err := v.session.ExecuteBatch(batch)
Expand Down
13 changes: 8 additions & 5 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ var keys = map[Key]string{
EnableGlobalDomain: "system.enableGlobalDomain",

// frontend settings
FrontendPersistenceMaxQPS: "frontend.persistenceMaxQPS",
FrontendVisibilityMaxPageSize: "frontend.visibilityMaxPageSize",
FrontendHistoryMaxPageSize: "frontend.historyMaxPageSize",
FrontendRPS: "frontend.rps",
FrontendHistoryMgrNumConns: "frontend.historyMgrNumConns",
FrontendPersistenceMaxQPS: "frontend.persistenceMaxQPS",
FrontendVisibilityMaxPageSize: "frontend.visibilityMaxPageSize",
FrontendHistoryMaxPageSize: "frontend.historyMaxPageSize",
FrontendRPS: "frontend.rps",
FrontendHistoryMgrNumConns: "frontend.historyMgrNumConns",
MaxDecisionStartToCloseTimeout: "frontend.maxDecisionStartToCloseTimeout",

// matching settings
MatchingPersistenceMaxQPS: "matching.persistenceMaxQPS",
Expand Down Expand Up @@ -155,6 +156,8 @@ const (
FrontendRPS
// FrontendHistoryMgrNumConns is for persistence cluster.NumConns
FrontendHistoryMgrNumConns
// MaxDecisionStartToCloseTimeout is max decision timeout in seconds
MaxDecisionStartToCloseTimeout

// key for matching

Expand Down
8 changes: 8 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,11 @@ func CreateMatchingPollForDecisionTaskResponse(historyResponse *h.RecordDecision
}
return matchingResp
}

// MinInt32 returns the smaller of the two int32 inputs.
// When both are equal, that common value is returned.
func MinInt32(a, b int32) int32 {
	smaller := b
	if a < b {
		smaller = a
	}
	return smaller
}
4 changes: 2 additions & 2 deletions host/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ func testActivity(ctx context.Context, msg string) (string, error) {

func testDataConverterWorkflow(ctx workflow.Context, tl string) (string, error) {
ao := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
ScheduleToStartTimeout: 20 * time.Second,
StartToCloseTimeout: 40 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)

Expand Down
13 changes: 8 additions & 5 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,19 @@ type Config struct {

// Persistence settings
HistoryMgrNumConns dynamicconfig.IntPropertyFn

MaxDecisionStartToCloseTimeout dynamicconfig.IntPropertyFnWithDomainFilter
}

// NewConfig returns new service config with default values.
// Every field is obtained from the dynamic config collection dc by key.
// The literal passed as the second argument to each getter is presumably the
// fallback used when the key is not configured (confirm against
// dynamicconfig.Collection).
func NewConfig(dc *dynamicconfig.Collection) *Config {
return &Config{
PersistenceMaxQPS: dc.GetFloat64Property(dynamicconfig.FrontendPersistenceMaxQPS, 2000),
VisibilityMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendVisibilityMaxPageSize, 1000),
HistoryMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendHistoryMaxPageSize, 1000),
RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 1200),
HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.FrontendHistoryMgrNumConns, 10),
PersistenceMaxQPS: dc.GetFloat64Property(dynamicconfig.FrontendPersistenceMaxQPS, 2000),
VisibilityMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendVisibilityMaxPageSize, 1000),
HistoryMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendHistoryMaxPageSize, 1000),
RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 1200),
HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.FrontendHistoryMgrNumConns, 10),
MaxDecisionStartToCloseTimeout: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MaxDecisionStartToCloseTimeout, 600),
}
}

Expand Down
34 changes: 34 additions & 0 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,23 @@ func (wh *WorkflowHandler) StartWorkflowExecution(
Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."}, scope)
}

// Per-domain upper bound for the decision (task) start-to-close timeout, in seconds.
maxDecisionTimeout := int32(wh.config.MaxDecisionStartToCloseTimeout(startRequest.GetDomain()))
// TODO: remove this assignment and logging in future, so that frontend will just return bad request for large decision timeout
if requestedTimeout := startRequest.GetTaskStartToCloseTimeoutSeconds(); requestedTimeout > maxDecisionTimeout {
	// Log the timeout the caller actually asked for BEFORE clamping it;
	// logging after the assignment would always report maxDecisionTimeout
	// and hide the offending value.
	logging.LogDecisionTimeoutTooLarge(wh.Service.GetLogger(),
		requestedTimeout,
		startRequest.GetDomain(),
		startRequest.GetWorkflowId(),
		startRequest.WorkflowType.GetName(),
	)
	startRequest.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(maxDecisionTimeout)
}
// Reject decision timeouts that exceed the workflow execution timeout. The
// comparison against maxDecisionTimeout cannot trigger while the clamp above
// is in place; it is kept so the check is already enforced once the clamp is
// removed (see TODO).
if startRequest.GetTaskStartToCloseTimeoutSeconds() > startRequest.GetExecutionStartToCloseTimeoutSeconds() ||
	startRequest.GetTaskStartToCloseTimeoutSeconds() > maxDecisionTimeout {
	return nil, wh.error(&gen.BadRequestError{
		Message: fmt.Sprintf("TaskStartToCloseTimeoutSeconds is larger than ExecutionStartToCloseTimeout or MaxDecisionStartToCloseTimeout (%ds).", maxDecisionTimeout)}, scope)
}

domainName := startRequest.GetDomain()
wh.Service.GetLogger().Debugf("Start workflow execution request domain: %v", domainName)
domainID, err := wh.domainCache.GetDomainID(domainName)
Expand Down Expand Up @@ -1626,6 +1643,23 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."}, scope)
}

// Per-domain upper bound for the decision (task) start-to-close timeout, in seconds.
maxDecisionTimeout := int32(wh.config.MaxDecisionStartToCloseTimeout(signalWithStartRequest.GetDomain()))
// TODO: remove this assignment and logging in future, so that frontend will just return bad request for large decision timeout
if requestedTimeout := signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds(); requestedTimeout > maxDecisionTimeout {
	// Log the timeout the caller actually asked for BEFORE clamping it;
	// logging after the assignment would always report maxDecisionTimeout
	// and hide the offending value.
	logging.LogDecisionTimeoutTooLarge(wh.Service.GetLogger(),
		requestedTimeout,
		signalWithStartRequest.GetDomain(),
		signalWithStartRequest.GetWorkflowId(),
		signalWithStartRequest.WorkflowType.GetName(),
	)
	signalWithStartRequest.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(maxDecisionTimeout)
}
// Reject decision timeouts that exceed the workflow execution timeout. The
// comparison against maxDecisionTimeout cannot trigger while the clamp above
// is in place; it is kept so the check is already enforced once the clamp is
// removed (see TODO).
if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() ||
	signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > maxDecisionTimeout {
	return nil, wh.error(&gen.BadRequestError{
		Message: fmt.Sprintf("TaskStartToCloseTimeoutSeconds is larger than ExecutionStartToCloseTimeout or MaxDecisionStartToCloseTimeout (%ds).", maxDecisionTimeout)}, scope)
}

domainID, err := wh.domainCache.GetDomainID(signalWithStartRequest.GetDomain())
if err != nil {
return nil, wh.error(err, scope)
Expand Down
21 changes: 19 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ Update_History_Loop:
targetDomainID = domainEntry.GetInfo().ID
}

if err = validateActivityScheduleAttributes(attributes); err != nil {
if err = validateActivityScheduleAttributes(attributes, executionInfo.WorkflowTimeout); err != nil {
failDecision = true
failCause = workflow.DecisionTaskFailedCauseBadScheduleActivityAttributes
break Process_Decision_Loop
Expand Down Expand Up @@ -2428,7 +2428,7 @@ func (s *shardContextWrapper) NotifyNewHistoryEvent(event *historyEventNotificat
return err
}

func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTaskDecisionAttributes) error {
func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTaskDecisionAttributes, wfTimeout int32) error {
if attributes == nil {
return &workflow.BadRequestError{Message: "ScheduleActivityTaskDecisionAttributes is not set on decision."}
}
Expand Down Expand Up @@ -2463,6 +2463,20 @@ func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTas
return &workflow.BadRequestError{Message: "A valid timeout may not be negative."}
}

// ensure that no activity timeout is larger than the workflow timeout
if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout {
attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(wfTimeout)
}
if attributes.GetScheduleToStartTimeoutSeconds() > wfTimeout {
attributes.ScheduleToStartTimeoutSeconds = common.Int32Ptr(wfTimeout)
}
if attributes.GetStartToCloseTimeoutSeconds() > wfTimeout {
attributes.StartToCloseTimeoutSeconds = common.Int32Ptr(wfTimeout)
}
if attributes.GetHeartbeatTimeoutSeconds() > wfTimeout {
attributes.HeartbeatTimeoutSeconds = common.Int32Ptr(wfTimeout)
}

validScheduleToClose := attributes.GetScheduleToCloseTimeoutSeconds() > 0
validScheduleToStart := attributes.GetScheduleToStartTimeoutSeconds() > 0
validStartToClose := attributes.GetStartToCloseTimeoutSeconds() > 0
Expand All @@ -2476,6 +2490,9 @@ func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTas
}
} else if validScheduleToStart && validStartToClose {
attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToStartTimeoutSeconds() + attributes.GetStartToCloseTimeoutSeconds())
if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout {
attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(wfTimeout)
}
} else {
// Deduction failed as there's not enough information to fill in missing timeouts.
return &workflow.BadRequestError{Message: "A valid ScheduleToCloseTimeout is not set on decision."}
Expand Down
Loading