Skip to content

Commit

Permalink
Reimplement DC redirection handler (#3887)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Feb 4, 2023
1 parent 008f056 commit ca5be29
Show file tree
Hide file tree
Showing 22 changed files with 745 additions and 3,154 deletions.
159 changes: 77 additions & 82 deletions client/clientBean.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"google.golang.org/grpc"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
Expand All @@ -48,14 +49,16 @@ type (
// Bean is a collection of clients
Bean interface {
GetHistoryClient() historyservice.HistoryServiceClient
SetHistoryClient(client historyservice.HistoryServiceClient)
GetMatchingClient(namespaceIDToName NamespaceIDToNameFunc) (matchingservice.MatchingServiceClient, error)
SetMatchingClient(client matchingservice.MatchingServiceClient)
GetFrontendClient() workflowservice.WorkflowServiceClient
SetFrontendClient(client workflowservice.WorkflowServiceClient)
GetRemoteAdminClient(cluster string) (adminservice.AdminServiceClient, error)
SetRemoteAdminClient(cluster string, client adminservice.AdminServiceClient)
GetRemoteFrontendClient(cluster string) (workflowservice.WorkflowServiceClient, error)
GetRemoteAdminClient(string) (adminservice.AdminServiceClient, error)
SetRemoteAdminClient(string, adminservice.AdminServiceClient)
GetRemoteFrontendClient(string) (grpc.ClientConnInterface, workflowservice.WorkflowServiceClient, error)
}

frontendClient struct {
connection grpc.ClientConnInterface
workflowservice.WorkflowServiceClient
}

clientBeanImpl struct {
Expand All @@ -68,7 +71,7 @@ type (
adminClientsLock sync.RWMutex
adminClients map[string]adminservice.AdminServiceClient
frontendClientsLock sync.RWMutex
frontendClients map[string]workflowservice.WorkflowServiceClient
frontendClients map[string]frontendClient
}
)

Expand All @@ -81,7 +84,7 @@ func NewClientBean(factory Factory, clusterMetadata cluster.Metadata) (Bean, err
}

adminClients := map[string]adminservice.AdminServiceClient{}
frontendClients := map[string]workflowservice.WorkflowServiceClient{}
frontendClients := map[string]frontendClient{}

currentClusterName := clusterMetadata.GetCurrentClusterName()
// Init local cluster client with membership info
Expand All @@ -92,15 +95,18 @@ func NewClientBean(factory Factory, clusterMetadata cluster.Metadata) (Bean, err
if err != nil {
return nil, err
}
frontendClient, err := factory.NewLocalFrontendClientWithTimeout(
conn, client, err := factory.NewLocalFrontendClientWithTimeout(
frontend.DefaultTimeout,
frontend.DefaultLongPollTimeout,
)
if err != nil {
return nil, err
}
adminClients[currentClusterName] = adminClient
frontendClients[currentClusterName] = frontendClient
frontendClients[currentClusterName] = frontendClient{
connection: conn,
WorkflowServiceClient: client,
}

for clusterName, info := range clusterMetadata.GetAllClusterInfo() {
if !info.Enabled || clusterName == currentClusterName {
Expand All @@ -111,13 +117,16 @@ func NewClientBean(factory Factory, clusterMetadata cluster.Metadata) (Bean, err
admin.DefaultTimeout,
admin.DefaultLargeTimeout,
)
frontendClient = factory.NewRemoteFrontendClientWithTimeout(
conn, client = factory.NewRemoteFrontendClientWithTimeout(
info.RPCAddress,
frontend.DefaultTimeout,
frontend.DefaultLongPollTimeout,
)
adminClients[clusterName] = adminClient
frontendClients[clusterName] = frontendClient
frontendClients[clusterName] = frontendClient{
connection: conn,
WorkflowServiceClient: client,
}
}

bean := &clientBeanImpl{
Expand Down Expand Up @@ -154,64 +163,49 @@ func (h *clientBeanImpl) GetHistoryClient() historyservice.HistoryServiceClient
return h.historyClient
}

func (h *clientBeanImpl) SetHistoryClient(
client historyservice.HistoryServiceClient,
) {
h.historyClient = client
}

func (h *clientBeanImpl) GetMatchingClient(namespaceIDToName NamespaceIDToNameFunc) (matchingservice.MatchingServiceClient, error) {
if client := h.matchingClient.Load(); client != nil {
return client.(matchingservice.MatchingServiceClient), nil
}
return h.lazyInitMatchingClient(namespaceIDToName)
}

func (h *clientBeanImpl) SetMatchingClient(
client matchingservice.MatchingServiceClient,
) {
h.matchingClient.Store(client)
}

func (h *clientBeanImpl) GetFrontendClient() workflowservice.WorkflowServiceClient {
return h.frontendClients[h.clusterMetadata.GetCurrentClusterName()]
}

func (h *clientBeanImpl) SetFrontendClient(
client workflowservice.WorkflowServiceClient,
) {
h.frontendClients[h.clusterMetadata.GetCurrentClusterName()] = client
}

func (h *clientBeanImpl) GetRemoteAdminClient(cluster string) (adminservice.AdminServiceClient, error) {
h.adminClientsLock.RLock()
client, ok := h.adminClients[cluster]
h.adminClientsLock.RUnlock()
if ok {
return client, nil
}

if !ok {
clusterInfo, clusterFound := h.clusterMetadata.GetAllClusterInfo()[cluster]
if !clusterFound {
return nil, &serviceerror.NotFound{
Message: fmt.Sprintf(
"Unknown cluster name: %v with given cluster information map: %v.",
cluster,
clusterInfo,
),
}
clusterInfo, clusterFound := h.clusterMetadata.GetAllClusterInfo()[cluster]
if !clusterFound {
return nil, &serviceerror.NotFound{
Message: fmt.Sprintf(
"Unknown cluster name: %v with given cluster information map: %v.",
cluster,
clusterInfo,
),
}
}

h.adminClientsLock.Lock()
defer h.adminClientsLock.Unlock()
client, ok = h.adminClients[cluster]
if !ok {
client = h.factory.NewRemoteAdminClientWithTimeout(
clusterInfo.RPCAddress,
admin.DefaultTimeout,
admin.DefaultLargeTimeout,
)
h.setRemoteAdminClientLocked(cluster, client)
}
h.adminClientsLock.Lock()
defer h.adminClientsLock.Unlock()
client, ok = h.adminClients[cluster]
if ok {
return client, nil
}

client = h.factory.NewRemoteAdminClientWithTimeout(
clusterInfo.RPCAddress,
admin.DefaultTimeout,
admin.DefaultLargeTimeout,
)
h.adminClients[cluster] = client
return client, nil
}

Expand All @@ -222,46 +216,47 @@ func (h *clientBeanImpl) SetRemoteAdminClient(
h.adminClientsLock.Lock()
defer h.adminClientsLock.Unlock()

h.setRemoteAdminClientLocked(cluster, client)
h.adminClients[cluster] = client
}

func (h *clientBeanImpl) GetRemoteFrontendClient(cluster string) (workflowservice.WorkflowServiceClient, error) {
func (h *clientBeanImpl) GetRemoteFrontendClient(clusterName string) (grpc.ClientConnInterface, workflowservice.WorkflowServiceClient, error) {
h.frontendClientsLock.RLock()
client, ok := h.frontendClients[cluster]
client, ok := h.frontendClients[clusterName]
h.frontendClientsLock.RUnlock()
if ok {
return client.connection, client, nil
}

if !ok {
clusterInfo, clusterFound := h.clusterMetadata.GetAllClusterInfo()[cluster]
if !clusterFound {
return nil, &serviceerror.NotFound{
Message: fmt.Sprintf(
"Unknown cluster name: %v with given cluster information map: %v.",
cluster,
clusterInfo,
),
}
clusterInfo, clusterFound := h.clusterMetadata.GetAllClusterInfo()[clusterName]
if !clusterFound {
return nil, nil, &serviceerror.NotFound{
Message: fmt.Sprintf(
"Unknown clusterName name: %v with given clusterName information map: %v.",
clusterName,
clusterInfo,
),
}
}

h.frontendClientsLock.Lock()
defer h.frontendClientsLock.Unlock()
client, ok = h.frontendClients[cluster]
if !ok {
client = h.factory.NewRemoteFrontendClientWithTimeout(
clusterInfo.RPCAddress,
frontend.DefaultTimeout,
frontend.DefaultLongPollTimeout,
)
h.setRemoteFrontendClientLocked(cluster, client)
}
h.frontendClientsLock.Lock()
defer h.frontendClientsLock.Unlock()

client, ok = h.frontendClients[clusterName]
if ok {
return client.connection, client, nil
}
return client, nil
}

func (h *clientBeanImpl) setRemoteFrontendClientLocked(
cluster string,
client workflowservice.WorkflowServiceClient,
) {
h.frontendClients[cluster] = client
conn, fClient := h.factory.NewRemoteFrontendClientWithTimeout(
clusterInfo.RPCAddress,
frontend.DefaultTimeout,
frontend.DefaultLongPollTimeout,
)
client = frontendClient{
connection: conn,
WorkflowServiceClient: fClient,
}
h.frontendClients[clusterName] = client
return client.connection, client, nil
}

func (h *clientBeanImpl) setRemoteAdminClientLocked(
Expand Down
68 changes: 17 additions & 51 deletions client/clientBean_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ca5be29

Please sign in to comment.