Allow starting many replication pollers in one shard #3790
@@ -74,9 +74,10 @@ type (
 	// taskProcessorImpl is responsible for processing replication tasks for a shard.
 	taskProcessorImpl struct {
-		currentCluster string
-		status         int32
+		sourceCluster  string
+		status         int32
+		pollingShardID int32
 		shard             shard.Context
 		historyEngine     shard.Engine
 		historySerializer serialization.Serializer
@@ -109,6 +110,7 @@ type (
 // NewTaskProcessor creates a new replication task processor.
 func NewTaskProcessor(
+	pollingShardID int32,
 	shard shard.Context,
 	historyEngine shard.Engine,
 	config *configs.Config,
@@ -117,24 +119,23 @@ func NewTaskProcessor(
 	replicationTaskExecutor TaskExecutor,
 	eventSerializer serialization.Serializer,
 ) TaskProcessor {
-	shardID := shard.GetShardID()
-	taskRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(shardID)).
-		WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(shardID)).
-		WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(shardID)).
-		WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(shardID)).
-		WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(shardID))
+	taskRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(pollingShardID)).
+		WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(pollingShardID)).
+		WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(pollingShardID)).
+		WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(pollingShardID)).
+		WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(pollingShardID))

 	// TODO: define separate set of configs for dlq retry
-	dlqRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(shardID)).
-		WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(shardID)).
-		WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(shardID)).
-		WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(shardID)).
-		WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(shardID))
+	dlqRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(pollingShardID)).
+		WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(pollingShardID)).
+		WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(pollingShardID)).
+		WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(pollingShardID)).
+		WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(pollingShardID))

 	return &taskProcessorImpl{
-		currentCluster:    shard.GetClusterMetadata().GetCurrentClusterName(),
-		sourceCluster:     replicationTaskFetcher.getSourceCluster(),
 		status:            common.DaemonStatusInitialized,
+		pollingShardID:    pollingShardID,
+		sourceCluster:     replicationTaskFetcher.getSourceCluster(),
 		shard:             shard,
 		historyEngine:     historyEngine,
 		historySerializer: eventSerializer,
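For context, the chained With* calls above configure a standard exponential backoff. A self-contained sketch of the interval schedule such a policy produces (the concrete numbers here are illustrative, not the dynamic-config defaults):

package main

import (
	"fmt"
	"math"
	"time"
)

// nextInterval mirrors what an exponential retry policy like the one built
// above computes: initial * coefficient^attempt, capped at a maximum interval.
func nextInterval(initial time.Duration, coefficient float64, maxInterval time.Duration, attempt int) time.Duration {
	d := time.Duration(float64(initial) * math.Pow(coefficient, float64(attempt)))
	if d > maxInterval {
		return maxInterval
	}
	return d
}

func main() {
	// 1s initial wait, 2.0 coefficient, 30s cap: prints 1s 2s 4s 8s 16s 30s.
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Println(nextInterval(time.Second, 2.0, 30*time.Second, attempt))
	}
}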
@@ -370,6 +371,7 @@ func (p *taskProcessorImpl) convertTaskToDLQTask(
 	switch replicationTask.TaskType {
 	case enumsspb.REPLICATION_TASK_TYPE_SYNC_ACTIVITY_TASK:
 		taskAttributes := replicationTask.GetSyncActivityTaskAttributes()
+		// TODO: GetShardID will break GetDLQReplicationMessages we need to handle DLQ for cross shard replication.
 		return &persistence.PutReplicationTaskToDLQRequest{
 			ShardID:           p.shard.GetShardID(),
 			SourceClusterName: p.sourceCluster,
@@ -401,6 +403,7 @@ func (p *taskProcessorImpl) convertTaskToDLQTask(
 		// NOTE: last event vs next event, next event ID is exclusive
 		nextEventID := lastEvent.GetEventId() + 1

+		// TODO: GetShardID will break GetDLQReplicationMessages we need to handle DLQ for cross shard replication.
 		return &persistence.PutReplicationTaskToDLQRequest{
 			ShardID:           p.shard.GetShardID(),
 			SourceClusterName: p.sourceCluster,

Review comment (on the TODO above): Why will it break? Using the local shardID for storing DLQ tasks seems like the right behavior to me.

Author reply: Yes, here it is fine. But when the replication task is hydrated from the source cluster, the shard ID needs to be calculated instead of reusing this one. We will not hit this case in the short term.
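To make the author's point concrete: a history shard is derived from the namespace and workflow IDs, so when two clusters have different shard counts the source cluster's owning shard must be recomputed rather than copied. A hedged sketch of that recomputation — the helper name and the FNV hash are illustrative assumptions, not Temporal's actual routing function:

package main

import (
	"fmt"
	"hash/fnv"
)

// workflowIDToShard maps a workflow to a 1-based shard ID, illustrating the
// idea of recomputing the owning shard per cluster. The real server may use
// a different hash; this is only a sketch of the technique.
func workflowIDToShard(namespaceID, workflowID string, numShards int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(namespaceID + "_" + workflowID))
	return int32(h.Sum32()%uint32(numShards)) + 1
}

func main() {
	// The same workflow can land on different shard IDs when the two
	// clusters are configured with different shard counts.
	fmt.Println(workflowIDToShard("ns-1", "wf-1", 512))  // local cluster
	fmt.Println(workflowIDToShard("ns-1", "wf-1", 1024)) // source cluster
}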
@@ -429,6 +432,7 @@ func (p *taskProcessorImpl) convertTaskToDLQTask(
 			return nil, err
 		}

+		// TODO: GetShardID will break GetDLQReplicationMessages we need to handle DLQ for cross shard replication.
 		return &persistence.PutReplicationTaskToDLQRequest{
 			ShardID:           p.shard.GetShardID(),
 			SourceClusterName: p.sourceCluster,
@@ -451,7 +455,7 @@ func (p *taskProcessorImpl) paginationFn(_ []byte) ([]interface{}, []byte, error
 	respChan := make(chan *replicationspb.ReplicationMessages, 1)
 	p.requestChan <- &replicationTaskRequest{
 		token: &replicationspb.ReplicationToken{
-			ShardId:                     p.shard.GetShardID(),
+			ShardId:                     p.pollingShardID,
 			LastProcessedMessageId:      p.maxRxProcessedTaskID,
 			LastProcessedVisibilityTime: &p.maxRxProcessedTimestamp,
 			LastRetrievedMessageId:      p.maxRxReceivedTaskID,
@@ -486,7 +490,7 @@ func (p *taskProcessorImpl) paginationFn(_ []byte) ([]interface{}, []byte, error
 	if resp.GetHasMore() {
 		p.rxTaskBackoff = time.Duration(0)
 	} else {
-		p.rxTaskBackoff = p.config.ReplicationTaskProcessorNoTaskRetryWait(p.shard.GetShardID())
+		p.rxTaskBackoff = p.config.ReplicationTaskProcessorNoTaskRetryWait(p.pollingShardID)
 	}
 	return tasks, nil, nil
@@ -26,6 +26,7 @@ package replication
 import (
 	"context"
+	"fmt"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -49,6 +50,10 @@ import (
 	wcache "go.temporal.io/server/service/history/workflow/cache"
 )

+const (
+	clusterCallbackKey = "%s-%d" // <cluster name>-<polling shard id>
+)
+
 type (
 	// taskProcessorManagerImpl is to manage replication task processors
 	taskProcessorManagerImpl struct {
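With this format string, each (cluster, polling shard) pair gets its own map entry. A quick illustration (the cluster name is hypothetical):

package main

import "fmt"

const clusterCallbackKey = "%s-%d" // <cluster name>-<polling shard id>

func main() {
	// A processor polling shard 7 of a remote cluster named "cluster-b"
	// would be registered under the key "cluster-b-7".
	fmt.Println(fmt.Sprintf(clusterCallbackKey, "cluster-b", 7))
}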
@@ -62,6 +67,7 @@ type (
 		workflowCache        wcache.Cache
 		resender             xdc.NDCHistoryResender
 		taskExecutorProvider TaskExecutorProvider
+		taskPollerManager    pollerManager
 		metricsHandler       metrics.Handler
 		logger               log.Logger
@@ -110,6 +116,7 @@ func NewTaskProcessorManager(
 		metricsHandler:       shard.GetMetricsHandler(),
 		taskProcessors:       make(map[string]TaskProcessor),
 		taskExecutorProvider: taskExecutorProvider,
+		taskPollerManager:    newPollerManager(shard.GetShardID(), shard.GetClusterMetadata()),
 		minTxAckedTaskID:     persistence.EmptyQueueMessageID,
 		shutdownChan:         make(chan struct{}),
 	}
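The pollerManager type and its getSourceClusterShardIDs method are added elsewhere in this PR and are not shown in these hunks. A minimal sketch of the mapping such a manager could implement — assuming one local shard fans out to every congruent remote shard when the remote cluster has more shards, and polls at most its own ID otherwise (our assumption, not necessarily the PR's exact algorithm) — looks like this:

package main

import "fmt"

// getSourceClusterShardIDs sketches which remote (source cluster) shard IDs a
// given local shard should poll. Shard IDs are 1-based, as in Temporal.
func getSourceClusterShardIDs(localShardID, localShardCount, remoteShardCount int32) []int32 {
	var pollingShards []int32
	if localShardCount <= remoteShardCount {
		// Remote cluster has at least as many shards: poll every remote
		// shard congruent to this local shard modulo the local shard count.
		for id := localShardID; id <= remoteShardCount; id += localShardCount {
			pollingShards = append(pollingShards, id)
		}
		return pollingShards
	}
	// Remote cluster has fewer shards: only local shards whose ID exists
	// remotely have anything to poll.
	if localShardID <= remoteShardCount {
		pollingShards = append(pollingShards, localShardID)
	}
	return pollingShards
}

func main() {
	fmt.Println(getSourceClusterShardIDs(2, 4, 16)) // [2 6 10 14]
	fmt.Println(getSourceClusterShardIDs(2, 16, 4)) // [2]
	fmt.Println(getSourceClusterShardIDs(9, 16, 4)) // []
}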
@@ -167,37 +174,40 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate(
 		if clusterName == currentClusterName {
 			continue
 		}
-		// The metadata triggers a update when the following fields update: 1. Enabled 2. Initial Failover Version 3. Cluster address
-		// The callback covers three cases:
-		// Case 1: Remove a cluster Case 2: Add a new cluster Case 3: Refresh cluster metadata.
-		if processor, ok := r.taskProcessors[clusterName]; ok {
-			// Case 1 and Case 3
-			processor.Stop()
-			delete(r.taskProcessors, clusterName)
-		}
-		if clusterInfo := newClusterMetadata[clusterName]; clusterInfo != nil && clusterInfo.Enabled {
-			// Case 2 and Case 3
-			fetcher := r.replicationTaskFetcherFactory.GetOrCreateFetcher(clusterName)
-			replicationTaskProcessor := NewTaskProcessor(
-				r.shard,
-				r.engine,
-				r.config,
-				r.shard.GetMetricsHandler(),
-				fetcher,
-				r.taskExecutorProvider(TaskExecutorParams{
-					RemoteCluster:   clusterName,
-					Shard:           r.shard,
-					HistoryResender: r.resender,
-					HistoryEngine:   r.engine,
-					DeleteManager:   r.deleteMgr,
-					WorkflowCache:   r.workflowCache,
-				}),
-				r.eventSerializer,
-			)
-			replicationTaskProcessor.Start()
-			r.taskProcessors[clusterName] = replicationTaskProcessor
+		pollingShardIds := r.taskPollerManager.getSourceClusterShardIDs(clusterName)
+		for _, pollingShardId := range pollingShardIds {
+			perShardTaskProcessorKey := fmt.Sprintf(clusterCallbackKey, clusterName, pollingShardId)
+			// The metadata triggers an update when the following fields update: 1. Enabled 2. Initial Failover Version 3. Cluster address
+			// The callback covers three cases:
+			// Case 1: Remove a cluster Case 2: Add a new cluster Case 3: Refresh cluster metadata.
+			if processor, ok := r.taskProcessors[perShardTaskProcessorKey]; ok {
+				// Case 1 and Case 3
+				processor.Stop()
+				delete(r.taskProcessors, perShardTaskProcessorKey)
+			}
+			if clusterInfo := newClusterMetadata[clusterName]; clusterInfo != nil && clusterInfo.Enabled {
+				// Case 2 and Case 3
+				fetcher := r.replicationTaskFetcherFactory.GetOrCreateFetcher(clusterName)
+				replicationTaskProcessor := NewTaskProcessor(
+					pollingShardId,
+					r.shard,
+					r.engine,
+					r.config,
+					r.shard.GetMetricsHandler(),
+					fetcher,
+					r.taskExecutorProvider(TaskExecutorParams{
+						RemoteCluster:   clusterName,
+						Shard:           r.shard,
+						HistoryResender: r.resender,
+						HistoryEngine:   r.engine,
+						DeleteManager:   r.deleteMgr,
+						WorkflowCache:   r.workflowCache,
+					}),
+					r.eventSerializer,
+				)
+				replicationTaskProcessor.Start()
+				r.taskProcessors[perShardTaskProcessorKey] = replicationTaskProcessor
+			}
+		}
 	}
 }

Review comment: What if a cluster exists in both the old and new cluster metadata? Stop and restart?

Author reply: If a cluster exists in both old and new, the metadata was updated, so we stop the processor and restart it to load the new metadata. If a cluster exists only in the new metadata, it was newly added, so we just start a new processor.
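The restart semantics settled on above can be modeled in isolation. This toy sketch (all names hypothetical) shows why a cluster present in both the old and new metadata is stopped and then recreated, while a newly added cluster is simply started:

package main

import "fmt"

type processor struct{ key string }

func (p *processor) Stop()  { fmt.Println("stopped", p.key) }
func (p *processor) Start() { fmt.Println("started", p.key) }

func main() {
	// One existing per-shard processor, and a cluster still enabled in the
	// new metadata: the update handler stops it, then recreates it.
	processors := map[string]*processor{"cluster-b-7": {key: "cluster-b-7"}}
	enabled := map[string]bool{"cluster-b": true}

	key, cluster := "cluster-b-7", "cluster-b"
	if p, ok := processors[key]; ok {
		p.Stop() // Case 1 and Case 3
		delete(processors, key)
	}
	if enabled[cluster] {
		p := &processor{key: key}
		p.Start() // Case 2 and Case 3
		processors[key] = p
	}
}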
Review comment (on the retry configs now keyed by pollingShardID): This config cannot be based on pollingShardID, right? Or why would it be?

Review comment: If you have 3 clusters and the shard counts are all different, then what would you specify in the config?

Author reply: Yes, you are right. I am going to revert this.