Skip to content

Commit

Permalink
Implement Tiered replication stream sender (temporalio#5899)
Browse files Browse the repository at this point in the history
- Initial commit
- Implement tiered replication sender

## What changed?
<!-- Describe what has changed in this PR -->
Implement tiered replication sender
## Why?
<!-- Tell your future self why have you made these changes -->
To allow sender to send task based on priority
## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
unit test
## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
n/a
## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
n/a
## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
no
  • Loading branch information
xwduan authored May 16, 2024
1 parent e55476d commit 12f46f7
Show file tree
Hide file tree
Showing 5 changed files with 699 additions and 38 deletions.
5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2115,6 +2115,11 @@ that task will be sent to DLQ.`,
false,
`EnableReplicateLocalGeneratedEvents is a feature flag for replicating locally generated events`,
)
EnableReplicationTaskTieredProcessing = NewGlobalBoolSetting(
"history.EnableReplicationTaskTieredProcessing",
false,
`EnableReplicationTaskTieredProcessing is a feature flag for enabling tiered replication task processing stack`,
)

// keys for worker

Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ type Config struct {
EnableReplicationEagerRefreshNamespace dynamicconfig.BoolPropertyFn
EnableReplicationTaskBatching dynamicconfig.BoolPropertyFn
EnableReplicateLocalGeneratedEvent dynamicconfig.BoolPropertyFn
EnableReplicationTaskTieredProcessing dynamicconfig.BoolPropertyFn

// The following are used by consistent query
MaxBufferedQueryCount dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -490,6 +491,7 @@ func NewConfig(
EnableReplicationEagerRefreshNamespace: dynamicconfig.EnableEagerNamespaceRefresher.Get(dc),
EnableReplicationTaskBatching: dynamicconfig.EnableReplicationTaskBatching.Get(dc),
EnableReplicateLocalGeneratedEvent: dynamicconfig.EnableReplicateLocalGeneratedEvents.Get(dc),
EnableReplicationTaskTieredProcessing: dynamicconfig.EnableReplicationTaskTieredProcessing.Get(dc),

MaximumBufferedEventsBatch: dynamicconfig.MaximumBufferedEventsBatch.Get(dc),
MaximumBufferedEventsSizeInBytes: dynamicconfig.MaximumBufferedEventsSizeInBytes.Get(dc),
Expand Down
1 change: 1 addition & 0 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2069,6 +2069,7 @@ func (h *Handler) StreamWorkflowReplicationMessages(
clientClusterName,
replication.NewClusterShardKey(clientClusterShardID.ClusterID, clientClusterShardID.ShardID),
replication.NewClusterShardKey(serverClusterShardID.ClusterID, serverClusterShardID.ShardID),
h.config,
)
h.streamReceiverMonitor.RegisterInboundStream(streamSender)
streamSender.Start()
Expand Down
Loading

0 comments on commit 12f46f7

Please sign in to comment.