Skip to content

Commit

Permalink
Merge branch 'master' into rate-limiter-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
pdoerner committed May 22, 2023
2 parents c037475 + 981d22b commit 9543764
Show file tree
Hide file tree
Showing 41 changed files with 1,436 additions and 640 deletions.
639 changes: 340 additions & 299 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

479 changes: 261 additions & 218 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,6 @@ type (
// DCRedirectionPolicy contains the frontend datacenter redirection policy
DCRedirectionPolicy struct {
Policy string `yaml:"policy"`
ToDC string `yaml:"toDC"`
}

// Archival contains the config for archival
Expand Down
1 change: 1 addition & 0 deletions common/rpc/interceptor/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (ti *TelemetryInterceptor) handleError(
// we emit service_error_with_type metrics, no need to emit specific metric for these known error types.
case *serviceerror.AlreadyExists,
*serviceerror.CancellationAlreadyRequested,
*serviceerror.FailedPrecondition,
*serviceerror.NamespaceInvalidState,
*serviceerror.NamespaceNotActive,
*serviceerror.NamespaceNotFound,
Expand Down
1 change: 0 additions & 1 deletion config/development-cass-archival.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "noop"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion config/development-cass-es.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "noop"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion config/development-cass-s3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "noop"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion config/development-cass.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "noop"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion config/development-cluster-a.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "selected-apis-forwarding"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion config/development-cluster-b.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "selected-apis-forwarding"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion config/development-cluster-c.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "selected-apis-forwarding"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion config/development-mysql-es.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "noop"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion config/development-mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "noop"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion config/development-mysql8.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "noop"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion config/development-postgres-es.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "noop"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion config/development-postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "noop"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion config/development-postgres12.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "noop"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion config/development-sqlite-file.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "noop"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion config/development-sqlite.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "noop"
toDC: ""

archival:
history:
Expand Down
1 change: 0 additions & 1 deletion docker/config_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,6 @@ clusterMetadata:

dcRedirectionPolicy:
policy: "noop"
toDC: ""

archival:
history:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ message GenerateLastHistoryReplicationTasksRequest {
}

message GenerateLastHistoryReplicationTasksResponse {
int64 state_transition_count = 1;
}

message GetReplicationStatusRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ message WorkflowExecutionInfo {
int64 user_timer_count = 73;
int64 request_cancel_external_count = 74;
int64 signal_external_count = 75;
int64 update_count = 77;
reserved 47;
reserved 48;
reserved 49;
Expand Down
14 changes: 12 additions & 2 deletions service/history/api/queryworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/server/common/log/tag"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common"
Expand All @@ -48,6 +50,9 @@ import (
"go.temporal.io/server/service/history/workflow"
)

// Fail query fast if workflow task keeps failing (attempt >= 3).
const failQueryWorkflowTaskAttemptCount = 3

func Invoke(
ctx context.Context,
request *historyservice.QueryWorkflowRequest,
Expand Down Expand Up @@ -117,10 +122,15 @@ func Invoke(
return nil, consts.ErrWorkflowTaskNotScheduled
}

if mutableState.IsTransientWorkflowTask() {
if mutableState.GetExecutionInfo().WorkflowTaskAttempt >= failQueryWorkflowTaskAttemptCount {
// while workflow task is failing, the query to that workflow will also fail. Failing fast here to prevent wasting
// resources to load history for a query that will fail.
return nil, serviceerror.NewFailedPrecondition("Cannot query workflow due to Workflow Task in failed state.")
shard.GetLogger().Info("Fail query fast due to WorkflowTask in failed state.",
tag.WorkflowNamespace(request.Request.Namespace),
tag.WorkflowNamespaceID(workflowKey.NamespaceID),
tag.WorkflowID(workflowKey.WorkflowID),
tag.WorkflowRunID(workflowKey.RunID))
return nil, serviceerror.NewWorkflowNotReady("Unable to query workflow due to Workflow Task in failed state.")
}

// There are two ways in which queries get dispatched to workflow worker. First, queries can be dispatched on workflow tasks.
Expand Down
7 changes: 5 additions & 2 deletions service/history/api/replication/generate_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func GenerateTask(
}
defer func() { wfContext.GetReleaseFn()(retError) }()

task, err := wfContext.GetMutableState().GenerateMigrationTasks()
mutableState := wfContext.GetMutableState()
task, stateTransitionCount, err := mutableState.GenerateMigrationTasks()
if err != nil {
return nil, err
}
Expand All @@ -83,5 +84,7 @@ func GenerateTask(
if err != nil {
return nil, err
}
return &historyservice.GenerateLastHistoryReplicationTasksResponse{}, nil
return &historyservice.GenerateLastHistoryReplicationTasksResponse{
StateTransitionCount: stateTransitionCount,
}, nil
}
19 changes: 19 additions & 0 deletions service/history/api/updateworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/namespace"
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/internal/effect"
Expand All @@ -52,6 +53,11 @@ import (
"go.temporal.io/server/service/history/workflow/update"
)

const (
// Fail update fast if workflow task keeps failing (attempt >= 3).
failUpdateWorkflowTaskAttemptCount = 3
)

func Invoke(
ctx context.Context,
req *historyservice.UpdateWorkflowExecutionRequest,
Expand Down Expand Up @@ -127,6 +133,19 @@ func Invoke(
return consts.ErrWorkflowExecutionNotFound
}

if ms.GetExecutionInfo().WorkflowTaskAttempt >= failUpdateWorkflowTaskAttemptCount {
// If workflow task is constantly failing, the update to that workflow will also fail.
// Additionally, workflow update can't "fix" workflow state because updates (delivered with messages)
// are applied after events.
// Failing API call fast here to prevent wasting resources for an update that will fail.
shardCtx.GetLogger().Info("Fail update fast due to WorkflowTask in failed state.",
tag.WorkflowNamespace(req.Request.Namespace),
tag.WorkflowNamespaceID(wfKey.NamespaceID),
tag.WorkflowID(wfKey.WorkflowID),
tag.WorkflowRunID(wfKey.RunID))
return serviceerror.NewWorkflowNotReady("Unable to perform workflow execution update due to Workflow Task in failed state.")
}

updateID := req.GetRequest().GetRequest().GetMeta().GetUpdateId()
updateReg := weCtx.GetUpdateRegistry(ctx)
var alreadyExisted bool
Expand Down
2 changes: 0 additions & 2 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ type Config struct {
ReplicationEnableDLQMetrics dynamicconfig.BoolPropertyFn

ReplicationStreamSyncStatusDuration dynamicconfig.DurationPropertyFn
ReplicationStreamMinReconnectDuration dynamicconfig.DurationPropertyFn
ReplicationProcessorSchedulerQueueSize dynamicconfig.IntPropertyFn
ReplicationProcessorSchedulerWorkerCount dynamicconfig.IntPropertyFn

Expand Down Expand Up @@ -419,7 +418,6 @@ func NewConfig(
ReplicationEnableDLQMetrics: dc.GetBoolProperty(dynamicconfig.ReplicationEnableDLQMetrics, true),

ReplicationStreamSyncStatusDuration: dc.GetDurationProperty(dynamicconfig.ReplicationStreamSyncStatusDuration, 1*time.Second),
ReplicationStreamMinReconnectDuration: dc.GetDurationProperty(dynamicconfig.ReplicationStreamMinReconnectDuration, 4*time.Second),
ReplicationProcessorSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.ReplicationProcessorSchedulerQueueSize, 128),
ReplicationProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ReplicationProcessorSchedulerWorkerCount, 512),

Expand Down
7 changes: 7 additions & 0 deletions service/history/replication/bi_direction_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type (
Send(Req) error
Recv() (<-chan StreamResp[Resp], error)
Close()
IsValid() bool
}
StreamResp[Resp any] struct {
Resp Resp
Expand Down Expand Up @@ -135,6 +136,12 @@ func (s *BiDirectionStreamImpl[Req, Resp]) Close() {
s.closeLocked()
}

func (s *BiDirectionStreamImpl[Req, Resp]) IsValid() bool {
s.Lock()
defer s.Unlock()
return s.status != streamStatusClosed
}

func (s *BiDirectionStreamImpl[Req, Resp]) closeLocked() {
if s.status == streamStatusClosed {
return
Expand Down
20 changes: 8 additions & 12 deletions service/history/replication/bi_direction_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,21 @@ func (s *biDirectionStreamSuite) TestLazyInit() {
s.biDirectionStream.Unlock()
s.NoError(err)
s.Equal(s.streamClient, s.biDirectionStream.streamingClient)
s.True(s.biDirectionStream.IsValid())

s.biDirectionStream.Lock()
err = s.biDirectionStream.lazyInitLocked()
s.biDirectionStream.Unlock()
s.NoError(err)
s.Equal(s.streamClient, s.biDirectionStream.streamingClient)
s.True(s.biDirectionStream.IsValid())

s.biDirectionStream.Close()
s.biDirectionStream.Lock()
err = s.biDirectionStream.lazyInitLocked()
s.biDirectionStream.Unlock()
s.Error(err)
s.False(s.biDirectionStream.IsValid())
}

func (s *biDirectionStreamSuite) TestSend() {
Expand All @@ -140,9 +143,7 @@ func (s *biDirectionStreamSuite) TestSend() {
s.NoError(err)
}
s.Equal(reqs, s.streamClient.requests)
s.biDirectionStream.Lock()
defer s.biDirectionStream.Unlock()
s.Equal(streamStatusOpen, s.biDirectionStream.status)
s.True(s.biDirectionStream.IsValid())
}

func (s *biDirectionStreamSuite) TestSend_Err() {
Expand All @@ -152,9 +153,7 @@ func (s *biDirectionStreamSuite) TestSend_Err() {

err := s.biDirectionStream.Send(rand.Int())
s.Error(err)
s.biDirectionStream.Lock()
defer s.biDirectionStream.Unlock()
s.Equal(streamStatusClosed, s.biDirectionStream.status)
s.False(s.biDirectionStream.IsValid())
}

func (s *biDirectionStreamSuite) TestRecv() {
Expand All @@ -168,9 +167,7 @@ func (s *biDirectionStreamSuite) TestRecv() {
resps = append(resps, streamResp.Resp)
}
s.Equal(s.streamClient.responses, resps)
s.biDirectionStream.Lock()
defer s.biDirectionStream.Unlock()
s.Equal(streamStatusClosed, s.biDirectionStream.status)
s.False(s.biDirectionStream.IsValid())
}

func (s *biDirectionStreamSuite) TestRecv_Err() {
Expand All @@ -183,9 +180,8 @@ func (s *biDirectionStreamSuite) TestRecv_Err() {
s.Error(streamResp.Err)
_, ok := <-streamRespChan
s.False(ok)
s.biDirectionStream.Lock()
defer s.biDirectionStream.Unlock()
s.Equal(streamStatusClosed, s.biDirectionStream.status)
s.False(s.biDirectionStream.IsValid())

}

func (p *mockStreamClientProvider) Get(
Expand Down
Loading

0 comments on commit 9543764

Please sign in to comment.