Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handover callback #3847

Merged
merged 2 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (e *historyEngineImpl) registerNamespaceStateChangeCallback() {

e.shard.GetNamespaceRegistry().RegisterStateChangeCallback(e, func(ns *namespace.Namespace, deletedFromDb bool) {
if e.shard.GetClusterMetadata().IsGlobalNamespaceEnabled() {
e.shard.UpdateHandoverNamespaces(ns, deletedFromDb)
e.shard.UpdateHandoverNamespace(ns, deletedFromDb)
}

if deletedFromDb {
Expand Down
2 changes: 1 addition & 1 deletion service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type (
// TODO: deprecate UpdateNamespaceNotificationVersion in v1.21 and remove
// NamespaceNotificationVersion from shardInfo proto blob
UpdateNamespaceNotificationVersion(namespaceNotificationVersion int64) error
UpdateHandoverNamespaces(ns *namespace.Namespace, deletedFromDb bool)
UpdateHandoverNamespace(ns *namespace.Namespace, deletedFromDb bool)

AppendHistoryEvents(ctx context.Context, request *persistence.AppendHistoryNodesRequest, namespaceID namespace.ID, execution commonpb.WorkflowExecution) (int, error)

Expand Down
36 changes: 17 additions & 19 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,11 +578,17 @@ func (s *ContextImpl) UpdateNamespaceNotificationVersion(namespaceNotificationVe
return nil
}

func (s *ContextImpl) UpdateHandoverNamespaces(ns *namespace.Namespace, deletedFromDb bool) {
func (s *ContextImpl) UpdateHandoverNamespace(ns *namespace.Namespace, deletedFromDb bool) {
nsName := ns.Name()
// NOTE: replication state field won't be replicated and currently we only update a namespace
// to handover state from active cluster, so the second condition will always be true. Adding
// it here to be more safe in case above assumption no longer holds in the future.
isHandoverNamespace := ns.IsGlobalNamespace() &&
ns.ActiveInCluster(s.GetClusterMetadata().GetCurrentClusterName()) &&
ns.ReplicationState() == enums.REPLICATION_STATE_HANDOVER

s.wLock()
if deletedFromDb {
if deletedFromDb || !isHandoverNamespace {
delete(s.handoverNamespaces, ns.Name())
s.wUnlock()
return
Expand All @@ -595,23 +601,15 @@ func (s *ContextImpl) UpdateHandoverNamespaces(ns *namespace.Namespace, deletedF
maxReplicationTaskID = pendingMaxReplicationTaskID
}

// NOTE: replication state field won't be replicated and currently we only update a namespace
// to handover state from active cluster, so the second condition will always be true. Adding
// it here to be more safe in case above assumption no longer holds in the future.
if ns.IsGlobalNamespace() &&
ns.ActiveInCluster(s.GetClusterMetadata().GetCurrentClusterName()) &&
ns.ReplicationState() == enums.REPLICATION_STATE_HANDOVER {

if handover, ok := s.handoverNamespaces[nsName]; ok {
if handover.NotificationVersion < ns.NotificationVersion() {
handover.NotificationVersion = ns.NotificationVersion()
handover.MaxReplicationTaskID = maxReplicationTaskID
}
} else {
s.handoverNamespaces[nsName] = &namespaceHandOverInfo{
NotificationVersion: ns.NotificationVersion(),
MaxReplicationTaskID: maxReplicationTaskID,
}
if handover, ok := s.handoverNamespaces[nsName]; ok {
if handover.NotificationVersion < ns.NotificationVersion() {
handover.NotificationVersion = ns.NotificationVersion()
handover.MaxReplicationTaskID = maxReplicationTaskID
}
} else {
s.handoverNamespaces[nsName] = &namespaceHandOverInfo{
NotificationVersion: ns.NotificationVersion(),
MaxReplicationTaskID: maxReplicationTaskID,
}
}

Expand Down
12 changes: 6 additions & 6 deletions service/history/shard/context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.