Allow starting many replication pollers in one shard #3790
Changes from all commits: 52ab61f, d5ce7d2, 811faa2, da5bb3a, fe395fd, ce06871, 48d1d0c
@@ -26,6 +26,7 @@ package replication

 import (
 	"context"
+	"fmt"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -49,6 +50,10 @@ import (
 	wcache "go.temporal.io/server/service/history/workflow/cache"
 )

+const (
+	clusterCallbackKey = "%s-%d" // <cluster name>-<polling shard id>
+)
+
 type (
 	// taskProcessorManagerImpl is to manage replication task processors
 	taskProcessorManagerImpl struct {
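The new clusterCallbackKey format string keys each per-shard task processor by source cluster name and polling shard ID. A minimal sketch of how such a key is built (the constant and comment come from the diff; the cluster name and shard ID values below are made up for illustration):

	package main

	import "fmt"

	const clusterCallbackKey = "%s-%d" // <cluster name>-<polling shard id>

	func main() {
		// Hypothetical values: a remote cluster "clusterB", source shard 5.
		key := fmt.Sprintf(clusterCallbackKey, "clusterB", 5)
		fmt.Println(key) // Output: clusterB-5
	}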
@@ -62,6 +67,7 @@ type (
 		workflowCache        wcache.Cache
 		resender             xdc.NDCHistoryResender
 		taskExecutorProvider TaskExecutorProvider
+		taskPollerManager    pollerManager
 		metricsHandler       metrics.Handler
 		logger               log.Logger
@@ -110,6 +116,7 @@ func NewTaskProcessorManager(
 		metricsHandler:       shard.GetMetricsHandler(),
 		taskProcessors:       make(map[string]TaskProcessor),
 		taskExecutorProvider: taskExecutorProvider,
+		taskPollerManager:    newPollerManager(shard.GetShardID(), shard.GetClusterMetadata()),
 		minTxAckedTaskID:     persistence.EmptyQueueMessageID,
 		shutdownChan:         make(chan struct{}),
 	}
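The pollerManager type itself is not part of this diff, but the PR title and the getSourceClusterShardIDs call imply its job: when the local and source clusters have different shard counts, one local shard may need to poll several source-cluster shards. A rough sketch of that fan-out, assuming 1-based shard IDs matched by congruence modulo the local shard count (the function signature and mapping rule here are assumptions, not the PR's actual implementation):

	package main

	import "fmt"

	// getSourceClusterShardIDs returns the source-cluster shard IDs a local
	// shard should poll, assuming shards are matched by ID modulo the local
	// shard count. Illustrative sketch only.
	func getSourceClusterShardIDs(localShardID, localShardCount, sourceShardCount int32) []int32 {
		var shardIDs []int32
		if localShardCount <= sourceShardCount {
			// Source cluster has more shards: this local shard polls every
			// source shard congruent to it modulo the local shard count.
			for s := localShardID; s <= sourceShardCount; s += localShardCount {
				shardIDs = append(shardIDs, s)
			}
			return shardIDs
		}
		// Source cluster has fewer shards: only local shards whose ID falls
		// within the source range have a matching source shard to poll.
		if localShardID <= sourceShardCount {
			shardIDs = append(shardIDs, localShardID)
		}
		return shardIDs
	}

	func main() {
		// Local cluster has 4 shards, source has 16: local shard 2 polls
		// source shards 2, 6, 10, and 14.
		fmt.Println(getSourceClusterShardIDs(2, 4, 16)) // [2 6 10 14]
		// Local cluster has 16 shards, source has 4: local shard 2 polls
		// source shard 2; local shard 9 polls nothing.
		fmt.Println(getSourceClusterShardIDs(2, 16, 4)) // [2]
		fmt.Println(getSourceClusterShardIDs(9, 16, 4)) // []
	}

Under this mapping, each task processor created in the loop below corresponds to exactly one (cluster, source shard) pair, which is why the processor map is keyed by clusterCallbackKey rather than by cluster name alone.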
@@ -167,36 +174,39 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate(
 		if clusterName == currentClusterName {
 			continue
 		}
-		// The metadata triggers a update when the following fields update: 1. Enabled 2. Initial Failover Version 3. Cluster address
-		// The callback covers three cases:
-		// Case 1: Remove a cluster Case 2: Add a new cluster Case 3: Refresh cluster metadata.
-
-		if processor, ok := r.taskProcessors[clusterName]; ok {
-			// Case 1 and Case 3
-			processor.Stop()
-			delete(r.taskProcessors, clusterName)
-		}
-
-		if clusterInfo := newClusterMetadata[clusterName]; clusterInfo != nil && clusterInfo.Enabled {
-			// Case 2 and Case 3
-			fetcher := r.replicationTaskFetcherFactory.GetOrCreateFetcher(clusterName)
-			replicationTaskProcessor := NewTaskProcessor(
-				r.shard,
-				r.engine,
-				r.config,
-				r.shard.GetMetricsHandler(),
-				fetcher,
-				r.taskExecutorProvider(TaskExecutorParams{
-					RemoteCluster:   clusterName,
-					Shard:           r.shard,
-					HistoryResender: r.resender,
-					DeleteManager:   r.deleteMgr,
-					WorkflowCache:   r.workflowCache,
-				}),
-				r.eventSerializer,
-			)
-			replicationTaskProcessor.Start()
-			r.taskProcessors[clusterName] = replicationTaskProcessor
-		}
+		sourceShardIds := r.taskPollerManager.getSourceClusterShardIDs(clusterName)
+		for _, sourceShardId := range sourceShardIds {
+			perShardTaskProcessorKey := fmt.Sprintf(clusterCallbackKey, clusterName, sourceShardId)
+			// The metadata triggers an update when the following fields update: 1. Enabled 2. Initial Failover Version 3. Cluster address
+			// The callback covers three cases:
+			// Case 1: Remove a cluster Case 2: Add a new cluster Case 3: Refresh cluster metadata.
+			if processor, ok := r.taskProcessors[perShardTaskProcessorKey]; ok {
+				// Case 1 and Case 3
+				processor.Stop()
+				delete(r.taskProcessors, perShardTaskProcessorKey)
+			}
+			if clusterInfo := newClusterMetadata[clusterName]; clusterInfo != nil && clusterInfo.Enabled {
+				// Case 2 and Case 3
+				fetcher := r.replicationTaskFetcherFactory.GetOrCreateFetcher(clusterName)
+				replicationTaskProcessor := NewTaskProcessor(
+					sourceShardId,
+					r.shard,
+					r.engine,
+					r.config,
+					r.shard.GetMetricsHandler(),
+					fetcher,
+					r.taskExecutorProvider(TaskExecutorParams{
+						RemoteCluster:   clusterName,
+						Shard:           r.shard,
+						HistoryResender: r.resender,
+						DeleteManager:   r.deleteMgr,
+						WorkflowCache:   r.workflowCache,
+					}),
+					r.eventSerializer,
+				)
+				replicationTaskProcessor.Start()
+				r.taskProcessors[perShardTaskProcessorKey] = replicationTaskProcessor
+			}
+		}
 	}
 }

Inline review comment on this hunk:

Reviewer: What if a cluster exists in both the old and the new cluster metadata? Stop and restart?

Author: If a cluster exists in both old and new, the metadata was updated, so we stop the processor and restart it to load the new metadata. If a cluster exists only in the new metadata, it was newly added, so we just start a new processor.
Inline review comment:

Reviewer: Why will it break? Using the local shard ID for storing the DLQ seems like the right behavior to me.

Author: Yes, here it is fine. But when hydrating a replication task from the source cluster, it needs to calculate the source cluster's shard ID instead of using this one. We will not hit this case in the short term.
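The distinction the author draws is that a DLQ entry can be stored under the local shard ID, but re-fetching (hydrating) the task's data from the source cluster must target the shard the workflow hashes to over there, which differs when shard counts differ. A hypothetical sketch of that routing arithmetic (the hash function, workflow ID, and shard counts are illustrative, not taken from the PR):

	package main

	import (
		"fmt"
		"hash/fnv"
	)

	// shardIDFor mimics the common routing rule: a workflow hashes to a
	// shard in [1, shardCount]. Illustrative only.
	func shardIDFor(workflowID string, shardCount int32) int32 {
		h := fnv.New32a()
		h.Write([]byte(workflowID))
		return int32(h.Sum32()%uint32(shardCount)) + 1
	}

	func main() {
		const workflowID = "order-12345"     // hypothetical workflow
		local := shardIDFor(workflowID, 16)  // local cluster: 16 shards
		source := shardIDFor(workflowID, 4)  // source cluster: 4 shards
		// The DLQ entry can live under the local shard ID, but a re-fetch
		// from the source cluster must use the source cluster's shard ID.
		fmt.Printf("local shard: %d, source shard: %d\n", local, source)
	}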