Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read cluster shard count from DB #3788

Merged
merged 2 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ type (
InitialFailoverVersion int64 `yaml:"initialFailoverVersion"`
// Address indicate the remote service address(Host:Port). Host can be DNS name.
RPCAddress string `yaml:"rpcAddress"`
ShardCount int32 `yaml:"-"` // Ignore this field when loading config.
// private field to track cluster information updates
version int64
}
Expand Down Expand Up @@ -400,6 +401,7 @@ func (m *metadataImpl) RegisterMetadataChangeCallback(callbackId any, cb Callbac
Enabled: clusterInfo.Enabled,
InitialFailoverVersion: clusterInfo.InitialFailoverVersion,
RPCAddress: clusterInfo.RPCAddress,
ShardCount: clusterInfo.ShardCount,
version: clusterInfo.version,
}
}
Expand Down Expand Up @@ -453,6 +455,7 @@ func (m *metadataImpl) refreshClusterMetadata(ctx context.Context) error {
Enabled: newClusterInfo.Enabled,
InitialFailoverVersion: newClusterInfo.InitialFailoverVersion,
RPCAddress: newClusterInfo.RPCAddress,
ShardCount: newClusterInfo.ShardCount,
version: newClusterInfo.version,
}
} else if newClusterInfo.version > oldClusterInfo.version {
Expand All @@ -467,12 +470,14 @@ func (m *metadataImpl) refreshClusterMetadata(ctx context.Context) error {
Enabled: oldClusterInfo.Enabled,
InitialFailoverVersion: oldClusterInfo.InitialFailoverVersion,
RPCAddress: oldClusterInfo.RPCAddress,
ShardCount: oldClusterInfo.ShardCount,
version: oldClusterInfo.version,
}
newEntries[clusterName] = &ClusterInformation{
Enabled: newClusterInfo.Enabled,
InitialFailoverVersion: newClusterInfo.InitialFailoverVersion,
RPCAddress: newClusterInfo.RPCAddress,
ShardCount: newClusterInfo.ShardCount,
version: newClusterInfo.version,
}
}
Expand Down Expand Up @@ -576,6 +581,7 @@ func (m *metadataImpl) listAllClusterMetadataFromDB(
Enabled: getClusterResp.GetIsConnectionEnabled(),
InitialFailoverVersion: getClusterResp.GetInitialFailoverVersion(),
RPCAddress: getClusterResp.GetClusterAddress(),
ShardCount: getClusterResp.GetHistoryShardCount(),
version: getClusterResp.Version,
}
}
Expand Down
6 changes: 6 additions & 0 deletions common/cluster/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,14 @@ func (s *metadataSuite) SetupTest() {
Enabled: true,
InitialFailoverVersion: int64(1),
RPCAddress: uuid.New(),
ShardCount: 1,
version: 1,
},
s.secondClusterName: {
Enabled: true,
InitialFailoverVersion: int64(4),
RPCAddress: uuid.New(),
ShardCount: 2,
version: 1,
},
}
Expand Down Expand Up @@ -174,6 +176,7 @@ func (s *metadataSuite) Test_RefreshClusterMetadata_Success() {
ClusterName: s.clusterName,
IsConnectionEnabled: true,
InitialFailoverVersion: 1,
HistoryShardCount: 1,
ClusterAddress: uuid.New(),
},
Version: 1,
Expand All @@ -183,6 +186,7 @@ func (s *metadataSuite) Test_RefreshClusterMetadata_Success() {
ClusterName: id,
IsConnectionEnabled: true,
InitialFailoverVersion: 2,
HistoryShardCount: 2,
ClusterAddress: uuid.New(),
},
Version: 2,
Expand All @@ -207,6 +211,7 @@ func (s *metadataSuite) Test_ListAllClusterMetadataFromDB_Success() {
ClusterName: s.clusterName,
IsConnectionEnabled: true,
InitialFailoverVersion: 1,
HistoryShardCount: 1,
ClusterAddress: uuid.New(),
},
Version: 1,
Expand All @@ -225,6 +230,7 @@ func (s *metadataSuite) Test_ListAllClusterMetadataFromDB_Success() {
ClusterName: newClusterName,
IsConnectionEnabled: true,
InitialFailoverVersion: 2,
HistoryShardCount: 2,
ClusterAddress: uuid.New(),
},
Version: 2,
Expand Down
6 changes: 6 additions & 0 deletions temporal/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,10 +733,16 @@ func loadClusterInformationFromStore(ctx context.Context, config *config.Config,
return err
}
metadata := item.(*persistence.GetClusterMetadataResponse)
shardCount := metadata.HistoryShardCount
if shardCount == 0 {
// This is to add backward compatibility to the config based cluster connection.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand correct this only applies to ancient versions of Temporal Server?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recent versions should always have this stored in DB?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. If the clusters are using tctl to connect, then it should have the shard count. If the clusters connection is the legacy config-base, the shard count could be 0. We don't want to support the config base connection so I just assume the clusters shard count must be equal if they are using config base connection.

shardCount = config.Persistence.NumHistoryShards
}
newMetadata := cluster.ClusterInformation{
Enabled: metadata.IsConnectionEnabled,
InitialFailoverVersion: metadata.InitialFailoverVersion,
RPCAddress: metadata.ClusterAddress,
ShardCount: shardCount,
}
if staticClusterMetadata, ok := config.ClusterMetadata.ClusterInformation[metadata.ClusterName]; ok {
if metadata.ClusterName != config.ClusterMetadata.CurrentClusterName {
Expand Down