Skip to content

Commit

Permalink
Change update namespace to upsert custom search attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou committed Mar 22, 2023
1 parent 2af5bf4 commit 4892c3e
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 42 deletions.
54 changes: 37 additions & 17 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,17 +342,21 @@ func (adh *AdminHandler) addSearchAttributesSQL(
if nsName == "" {
return errNamespaceNotSet
}
ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName))
resp, err := client.DescribeNamespace(
ctx,
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName))
}

customSearchAttributes := currentSearchAttributes.Custom()
mapper := ns.CustomSearchAttributesMapper()
fieldToAliasMap := util.CloneMapNonNil(mapper.FieldToAliasMap())
upsertFieldToAliasMap := make(map[string]string)
fieldToAliasMap := resp.Config.CustomSearchAttributeAliases
aliasToFieldMap := util.InverseMap(fieldToAliasMap)
for saName, saType := range request.GetSearchAttributes() {
// check if alias is already in use
if _, err := mapper.GetFieldName(saName, nsName); err == nil {
if _, ok := aliasToFieldMap[saName]; ok {
return serviceerror.NewAlreadyExist(
fmt.Sprintf(errSearchAttributeAlreadyExistsMessage, saName),
)
Expand All @@ -375,13 +379,13 @@ func (adh *AdminHandler) addSearchAttributesSQL(
fmt.Sprintf(errTooManySearchAttributesMessage, cntUsed, saType.String()),
)
}
fieldToAliasMap[targetFieldName] = saName
upsertFieldToAliasMap[targetFieldName] = saName
}

_, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
Namespace: nsName,
Config: &namespacepb.NamespaceConfig{
CustomSearchAttributeAliases: fieldToAliasMap,
CustomSearchAttributeAliases: upsertFieldToAliasMap,
},
})
return err
Expand Down Expand Up @@ -470,25 +474,28 @@ func (adh *AdminHandler) removeSearchAttributesSQL(
if nsName == "" {
return errNamespaceNotSet
}
ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName))
resp, err := client.DescribeNamespace(
ctx,
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName))
}

mapper := ns.CustomSearchAttributesMapper()
fieldToAliasMap := maps.Clone(mapper.FieldToAliasMap())
upsertFieldToAliasMap := make(map[string]string)
aliasToFieldMap := util.InverseMap(resp.Config.CustomSearchAttributeAliases)
for _, saName := range request.GetSearchAttributes() {
fieldName, err := mapper.GetFieldName(saName, nsName)
if err != nil {
fieldName, ok := aliasToFieldMap[saName]
if !ok {
return serviceerror.NewNotFound(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName))
}
delete(fieldToAliasMap, fieldName)
upsertFieldToAliasMap[fieldName] = ""
}

_, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
Namespace: nsName,
Config: &namespacepb.NamespaceConfig{
CustomSearchAttributeAliases: fieldToAliasMap,
CustomSearchAttributeAliases: upsertFieldToAliasMap,
},
})
return err
Expand Down Expand Up @@ -523,7 +530,7 @@ func (adh *AdminHandler) GetSearchAttributes(
if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
return adh.getSearchAttributesElasticsearch(ctx, indexName, searchAttributes)
}
return adh.getSearchAttributesSQL(request, searchAttributes)
return adh.getSearchAttributesSQL(ctx, request, searchAttributes)
}

func (adh *AdminHandler) getSearchAttributesElasticsearch(
Expand Down Expand Up @@ -567,23 +574,36 @@ func (adh *AdminHandler) getSearchAttributesElasticsearch(
}

func (adh *AdminHandler) getSearchAttributesSQL(
ctx context.Context,
request *adminservice.GetSearchAttributesRequest,
searchAttributes searchattribute.NameTypeMap,
) (*adminservice.GetSearchAttributesResponse, error) {
_, client, err := adh.clientFactory.NewLocalFrontendClientWithTimeout(
frontend.DefaultTimeout,
frontend.DefaultLongPollTimeout,
)
if err != nil {
return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToCreateFrontendClientMessage, err))
}

nsName := request.GetNamespace()
if nsName == "" {
return nil, errNamespaceNotSet
}
ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName))
resp, err := client.DescribeNamespace(
ctx,
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
)
if err != nil {
return nil, serviceerror.NewUnavailable(
fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName),
)
}
mapper := ns.CustomSearchAttributesMapper()

fieldToAliasMap := resp.Config.CustomSearchAttributeAliases
customSearchAttributes := make(map[string]enumspb.IndexedValueType)
for field, tp := range searchAttributes.Custom() {
if alias, err := mapper.GetAlias(field, nsName); err == nil {
if alias, ok := fieldToAliasMap[field]; ok {
customSearchAttributes[alias] = tp
}
}
Expand Down
2 changes: 0 additions & 2 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,6 @@ func OperatorHandlerProvider(
saManager searchattribute.Manager,
healthServer *health.Server,
historyClient historyservice.HistoryServiceClient,
namespaceRegistry namespace.Registry,
clusterMetadataManager persistence.ClusterMetadataManager,
clusterMetadata cluster.Metadata,
clientFactory client.Factory,
Expand All @@ -523,7 +522,6 @@ func OperatorHandlerProvider(
saManager,
healthServer,
historyClient,
namespaceRegistry,
clusterMetadataManager,
clusterMetadata,
clientFactory,
Expand Down
20 changes: 18 additions & 2 deletions service/frontend/namespace_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/util"
)

type (
Expand Down Expand Up @@ -512,9 +513,24 @@ func (d *namespaceHandlerImpl) UpdateNamespace(
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("Total resetBinaries cannot exceed the max limit: %v", maxLength))
}
}
if updatedConfig.CustomSearchAttributeAliases != nil {
if len(updatedConfig.CustomSearchAttributeAliases) > 0 {
configurationChanged = true
config.CustomSearchAttributeAliases = updatedConfig.CustomSearchAttributeAliases
customSearchAttributeAliases := util.CloneMapNonNil(config.CustomSearchAttributeAliases)
for saFieldName, saAlias := range updatedConfig.CustomSearchAttributeAliases {
if saAlias == "" {
delete(config.CustomSearchAttributeAliases, saFieldName)
} else if _, ok := customSearchAttributeAliases[saFieldName]; !ok {
customSearchAttributeAliases[saFieldName] = saAlias
} else {
return nil, serviceerror.NewInvalidArgument(
fmt.Sprintf(
"Custom search attribute field name already allocated. Please retry.",
saFieldName,
),
)
}
}
config.CustomSearchAttributeAliases = customSearchAttributeAliases
}
}

Expand Down
57 changes: 37 additions & 20 deletions service/frontend/operator_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ type (
saManager searchattribute.Manager
healthServer *health.Server
historyClient historyservice.HistoryServiceClient
namespaceRegistry namespace.Registry
clusterMetadataManager persistence.ClusterMetadataManager
clusterMetadata clustermetadata.Metadata
clientFactory svc.Factory
Expand All @@ -101,7 +100,6 @@ type (
SaManager searchattribute.Manager
healthServer *health.Server
historyClient historyservice.HistoryServiceClient
namespaceRegistry namespace.Registry
clusterMetadataManager persistence.ClusterMetadataManager
clusterMetadata clustermetadata.Metadata
clientFactory svc.Factory
Expand All @@ -125,7 +123,6 @@ func NewOperatorHandlerImpl(
saManager: args.SaManager,
healthServer: args.healthServer,
historyClient: args.historyClient,
namespaceRegistry: args.namespaceRegistry,
clusterMetadataManager: args.clusterMetadataManager,
clusterMetadata: args.clusterMetadata,
clientFactory: args.clientFactory,
Expand Down Expand Up @@ -280,18 +277,22 @@ func (h *OperatorHandlerImpl) addSearchAttributesSQL(
if nsName == "" {
return errNamespaceNotSet
}
ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName))
resp, err := client.DescribeNamespace(
ctx,
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName))
}

dbCustomSearchAttributes := searchattribute.GetSqlDbIndexSearchAttributes().CustomSearchAttributes
cmCustomSearchAttributes := currentSearchAttributes.Custom()
mapper := ns.CustomSearchAttributesMapper()
fieldToAliasMap := util.CloneMapNonNil(mapper.FieldToAliasMap())
upsertFieldToAliasMap := make(map[string]string)
fieldToAliasMap := resp.Config.CustomSearchAttributeAliases
aliasToFieldMap := util.InverseMap(fieldToAliasMap)
for saName, saType := range request.GetSearchAttributes() {
// check if alias is already in use
if _, err := mapper.GetFieldName(saName, nsName); err == nil {
if _, ok := aliasToFieldMap[saName]; ok {
return serviceerror.NewAlreadyExist(
fmt.Sprintf(errSearchAttributeAlreadyExistsMessage, saName),
)
Expand All @@ -318,13 +319,13 @@ func (h *OperatorHandlerImpl) addSearchAttributesSQL(
fmt.Sprintf(errTooManySearchAttributesMessage, cntUsed, saType.String()),
)
}
fieldToAliasMap[targetFieldName] = saName
upsertFieldToAliasMap[targetFieldName] = saName
}

_, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
Namespace: nsName,
Config: &namespacepb.NamespaceConfig{
CustomSearchAttributeAliases: fieldToAliasMap,
CustomSearchAttributeAliases: upsertFieldToAliasMap,
},
})
return err
Expand Down Expand Up @@ -410,25 +411,28 @@ func (h *OperatorHandlerImpl) removeSearchAttributesSQL(
if nsName == "" {
return errNamespaceNotSet
}
ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName))
resp, err := client.DescribeNamespace(
ctx,
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName))
}

mapper := ns.CustomSearchAttributesMapper()
fieldToAliasMap := maps.Clone(mapper.FieldToAliasMap())
upsertFieldToAliasMap := make(map[string]string)
aliasToFieldMap := util.InverseMap(resp.Config.CustomSearchAttributeAliases)
for _, saName := range request.GetSearchAttributes() {
fieldName, err := mapper.GetFieldName(saName, nsName)
if err != nil {
fieldName, ok := aliasToFieldMap[saName]
if !ok {
return serviceerror.NewNotFound(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName))
}
delete(fieldToAliasMap, fieldName)
upsertFieldToAliasMap[fieldName] = ""
}

_, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
Namespace: nsName,
Config: &namespacepb.NamespaceConfig{
CustomSearchAttributeAliases: fieldToAliasMap,
CustomSearchAttributeAliases: upsertFieldToAliasMap,
},
})
return err
Expand Down Expand Up @@ -460,7 +464,7 @@ func (h *OperatorHandlerImpl) ListSearchAttributes(
if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
return h.listSearchAttributesElasticsearch(ctx, indexName, searchAttributes)
}
return h.listSearchAttributesSQL(request, searchAttributes)
return h.listSearchAttributesSQL(ctx, request, searchAttributes)
}

func (h *OperatorHandlerImpl) listSearchAttributesElasticsearch(
Expand All @@ -486,23 +490,36 @@ func (h *OperatorHandlerImpl) listSearchAttributesElasticsearch(
}

func (h *OperatorHandlerImpl) listSearchAttributesSQL(
ctx context.Context,
request *operatorservice.ListSearchAttributesRequest,
searchAttributes searchattribute.NameTypeMap,
) (*operatorservice.ListSearchAttributesResponse, error) {
_, client, err := h.clientFactory.NewLocalFrontendClientWithTimeout(
frontend.DefaultTimeout,
frontend.DefaultLongPollTimeout,
)
if err != nil {
return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToCreateFrontendClientMessage, err))
}

nsName := request.GetNamespace()
if nsName == "" {
return nil, errNamespaceNotSet
}
ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName))
resp, err := client.DescribeNamespace(
ctx,
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
)
if err != nil {
return nil, serviceerror.NewUnavailable(
fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName),
)
}
mapper := ns.CustomSearchAttributesMapper()

fieldToAliasMap := resp.Config.CustomSearchAttributeAliases
customSearchAttributes := make(map[string]enumspb.IndexedValueType)
for field, tp := range searchAttributes.Custom() {
if alias, err := mapper.GetAlias(field, nsName); err == nil {
if alias, ok := fieldToAliasMap[field]; ok {
customSearchAttributes[alias] = tp
}
}
Expand Down
1 change: 0 additions & 1 deletion service/frontend/operator_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (s *operatorHandlerSuite) SetupTest() {
s.mockResource.GetSearchAttributesManager(),
health.NewServer(),
s.mockResource.GetHistoryClient(),
s.mockResource.GetNamespaceRegistry(),
s.mockResource.GetClusterMetadataManager(),
s.mockResource.GetClusterMetadata(),
s.mockResource.GetClientFactory(),
Expand Down

0 comments on commit 4892c3e

Please sign in to comment.