From 4892c3e922daa2259524163bfef77bf52a1e2a7f Mon Sep 17 00:00:00 2001 From: rodrigozhou Date: Wed, 22 Mar 2023 16:00:12 -0700 Subject: [PATCH] Change update namespace to upsert custom search attributes --- service/frontend/adminHandler.go | 54 ++++++++++++++------- service/frontend/fx.go | 2 - service/frontend/namespace_handler.go | 20 +++++++- service/frontend/operator_handler.go | 57 +++++++++++++++-------- service/frontend/operator_handler_test.go | 1 - 5 files changed, 92 insertions(+), 42 deletions(-) diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index 14827c0f2c82..325bbd8111ad 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -342,17 +342,21 @@ func (adh *AdminHandler) addSearchAttributesSQL( if nsName == "" { return errNamespaceNotSet } - ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName)) + resp, err := client.DescribeNamespace( + ctx, + &workflowservice.DescribeNamespaceRequest{Namespace: nsName}, + ) if err != nil { return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName)) } customSearchAttributes := currentSearchAttributes.Custom() - mapper := ns.CustomSearchAttributesMapper() - fieldToAliasMap := util.CloneMapNonNil(mapper.FieldToAliasMap()) + upsertFieldToAliasMap := make(map[string]string) + fieldToAliasMap := resp.Config.CustomSearchAttributeAliases + aliasToFieldMap := util.InverseMap(fieldToAliasMap) for saName, saType := range request.GetSearchAttributes() { // check if alias is already in use - if _, err := mapper.GetFieldName(saName, nsName); err == nil { + if _, ok := aliasToFieldMap[saName]; ok { return serviceerror.NewAlreadyExist( fmt.Sprintf(errSearchAttributeAlreadyExistsMessage, saName), ) @@ -375,13 +379,13 @@ func (adh *AdminHandler) addSearchAttributesSQL( fmt.Sprintf(errTooManySearchAttributesMessage, cntUsed, saType.String()), ) } - fieldToAliasMap[targetFieldName] = saName + upsertFieldToAliasMap[targetFieldName] = saName } _, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{ Namespace: nsName, Config: &namespacepb.NamespaceConfig{ - CustomSearchAttributeAliases: fieldToAliasMap, + CustomSearchAttributeAliases: upsertFieldToAliasMap, }, }) return err @@ -470,25 +474,28 @@ func (adh *AdminHandler) removeSearchAttributesSQL( if nsName == "" { return errNamespaceNotSet } - ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName)) + resp, err := client.DescribeNamespace( + ctx, + &workflowservice.DescribeNamespaceRequest{Namespace: nsName}, + ) if err != nil { return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName)) } - mapper := ns.CustomSearchAttributesMapper() - fieldToAliasMap := maps.Clone(mapper.FieldToAliasMap()) + upsertFieldToAliasMap := make(map[string]string) + aliasToFieldMap := util.InverseMap(resp.Config.CustomSearchAttributeAliases) for _, saName := range request.GetSearchAttributes() { - fieldName, err := mapper.GetFieldName(saName, nsName) - if err != nil { + fieldName, ok := aliasToFieldMap[saName] + if !ok { return serviceerror.NewNotFound(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName)) } - delete(fieldToAliasMap, fieldName) + upsertFieldToAliasMap[fieldName] = "" } _, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{ Namespace: nsName, Config: &namespacepb.NamespaceConfig{ - CustomSearchAttributeAliases: fieldToAliasMap, + CustomSearchAttributeAliases: upsertFieldToAliasMap, }, }) return err @@ -523,7 +530,7 @@ func (adh *AdminHandler) GetSearchAttributes( if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" { return adh.getSearchAttributesElasticsearch(ctx, indexName, searchAttributes) } - return adh.getSearchAttributesSQL(request, searchAttributes) + return adh.getSearchAttributesSQL(ctx, request, searchAttributes) } func (adh *AdminHandler) getSearchAttributesElasticsearch( @@ -567,23 +574,36 @@ func (adh *AdminHandler) getSearchAttributesElasticsearch( } func (adh *AdminHandler) getSearchAttributesSQL( + ctx context.Context, request *adminservice.GetSearchAttributesRequest, searchAttributes searchattribute.NameTypeMap, ) (*adminservice.GetSearchAttributesResponse, error) { + _, client, err := adh.clientFactory.NewLocalFrontendClientWithTimeout( + frontend.DefaultTimeout, + frontend.DefaultLongPollTimeout, + ) + if err != nil { + return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToCreateFrontendClientMessage, err)) + } + nsName := request.GetNamespace() if nsName == "" { return nil, errNamespaceNotSet } - ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName)) + resp, err := client.DescribeNamespace( + ctx, + &workflowservice.DescribeNamespaceRequest{Namespace: nsName}, + ) if err != nil { return nil, serviceerror.NewUnavailable( fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName), ) } - mapper := ns.CustomSearchAttributesMapper() + + fieldToAliasMap := resp.Config.CustomSearchAttributeAliases customSearchAttributes := make(map[string]enumspb.IndexedValueType) for field, tp := range searchAttributes.Custom() { - if alias, err := mapper.GetAlias(field, nsName); err == nil { + if alias, ok := fieldToAliasMap[field]; ok { customSearchAttributes[alias] = tp } } diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 6dc04f2c9410..b1330462f268 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -507,7 +507,6 @@ func OperatorHandlerProvider( saManager searchattribute.Manager, healthServer *health.Server, historyClient historyservice.HistoryServiceClient, - namespaceRegistry namespace.Registry, clusterMetadataManager persistence.ClusterMetadataManager, clusterMetadata cluster.Metadata, clientFactory client.Factory, @@ -523,7 +522,6 @@ func OperatorHandlerProvider( saManager, healthServer, historyClient, - namespaceRegistry, clusterMetadataManager, clusterMetadata, clientFactory, diff --git a/service/frontend/namespace_handler.go b/service/frontend/namespace_handler.go index e6e4febe795d..1ce1762348fe 100644 --- a/service/frontend/namespace_handler.go +++ b/service/frontend/namespace_handler.go @@ -52,6 +52,7 @@ import ( "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/util" ) type ( @@ -512,9 +513,24 @@ func (d *namespaceHandlerImpl) UpdateNamespace( return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("Total resetBinaries cannot exceed the max limit: %v", maxLength)) } } - if updatedConfig.CustomSearchAttributeAliases != nil { + if len(updatedConfig.CustomSearchAttributeAliases) > 0 { configurationChanged = true - config.CustomSearchAttributeAliases = updatedConfig.CustomSearchAttributeAliases + customSearchAttributeAliases := util.CloneMapNonNil(config.CustomSearchAttributeAliases) + for saFieldName, saAlias := range updatedConfig.CustomSearchAttributeAliases { + if saAlias == "" { + delete(config.CustomSearchAttributeAliases, saFieldName) + } else if _, ok := customSearchAttributeAliases[saFieldName]; !ok { + customSearchAttributeAliases[saFieldName] = saAlias + } else { + return nil, serviceerror.NewInvalidArgument( + fmt.Sprintf( + "Custom search attribute field name already allocated. Please retry.", + saFieldName, + ), + ) + } + } + config.CustomSearchAttributeAliases = customSearchAttributeAliases } } diff --git a/service/frontend/operator_handler.go b/service/frontend/operator_handler.go index 8e532b0bcce6..8c65a949e1b8 100644 --- a/service/frontend/operator_handler.go +++ b/service/frontend/operator_handler.go @@ -84,7 +84,6 @@ type ( saManager searchattribute.Manager healthServer *health.Server historyClient historyservice.HistoryServiceClient - namespaceRegistry namespace.Registry clusterMetadataManager persistence.ClusterMetadataManager clusterMetadata clustermetadata.Metadata clientFactory svc.Factory @@ -101,7 +100,6 @@ type ( SaManager searchattribute.Manager healthServer *health.Server historyClient historyservice.HistoryServiceClient - namespaceRegistry namespace.Registry clusterMetadataManager persistence.ClusterMetadataManager clusterMetadata clustermetadata.Metadata clientFactory svc.Factory @@ -125,7 +123,6 @@ func NewOperatorHandlerImpl( saManager: args.SaManager, healthServer: args.healthServer, historyClient: args.historyClient, - namespaceRegistry: args.namespaceRegistry, clusterMetadataManager: args.clusterMetadataManager, clusterMetadata: args.clusterMetadata, clientFactory: args.clientFactory, @@ -280,18 +277,22 @@ func (h *OperatorHandlerImpl) addSearchAttributesSQL( if nsName == "" { return errNamespaceNotSet } - ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName)) + resp, err := client.DescribeNamespace( + ctx, + &workflowservice.DescribeNamespaceRequest{Namespace: nsName}, + ) if err != nil { return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName)) } dbCustomSearchAttributes := searchattribute.GetSqlDbIndexSearchAttributes().CustomSearchAttributes cmCustomSearchAttributes := currentSearchAttributes.Custom() - mapper := ns.CustomSearchAttributesMapper() - fieldToAliasMap := util.CloneMapNonNil(mapper.FieldToAliasMap()) + upsertFieldToAliasMap := make(map[string]string) + fieldToAliasMap := resp.Config.CustomSearchAttributeAliases + aliasToFieldMap := util.InverseMap(fieldToAliasMap) for saName, saType := range request.GetSearchAttributes() { // check if alias is already in use - if _, err := mapper.GetFieldName(saName, nsName); err == nil { + if _, ok := aliasToFieldMap[saName]; ok { return serviceerror.NewAlreadyExist( fmt.Sprintf(errSearchAttributeAlreadyExistsMessage, saName), ) @@ -318,13 +319,13 @@ func (h *OperatorHandlerImpl) addSearchAttributesSQL( fmt.Sprintf(errTooManySearchAttributesMessage, cntUsed, saType.String()), ) } - fieldToAliasMap[targetFieldName] = saName + upsertFieldToAliasMap[targetFieldName] = saName } _, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{ Namespace: nsName, Config: &namespacepb.NamespaceConfig{ - CustomSearchAttributeAliases: fieldToAliasMap, + CustomSearchAttributeAliases: upsertFieldToAliasMap, }, }) return err @@ -410,25 +411,28 @@ func (h *OperatorHandlerImpl) removeSearchAttributesSQL( if nsName == "" { return errNamespaceNotSet } - ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName)) + resp, err := client.DescribeNamespace( + ctx, + &workflowservice.DescribeNamespaceRequest{Namespace: nsName}, + ) if err != nil { return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName)) } - mapper := ns.CustomSearchAttributesMapper() - fieldToAliasMap := maps.Clone(mapper.FieldToAliasMap()) + upsertFieldToAliasMap := make(map[string]string) + aliasToFieldMap := util.InverseMap(resp.Config.CustomSearchAttributeAliases) for _, saName := range request.GetSearchAttributes() { - fieldName, err := mapper.GetFieldName(saName, nsName) - if err != nil { + fieldName, ok := aliasToFieldMap[saName] + if !ok { return serviceerror.NewNotFound(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName)) } - delete(fieldToAliasMap, fieldName) + upsertFieldToAliasMap[fieldName] = "" } _, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{ Namespace: nsName, Config: &namespacepb.NamespaceConfig{ - CustomSearchAttributeAliases: fieldToAliasMap, + CustomSearchAttributeAliases: upsertFieldToAliasMap, }, }) return err @@ -460,7 +464,7 @@ func (h *OperatorHandlerImpl) ListSearchAttributes( if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" { return h.listSearchAttributesElasticsearch(ctx, indexName, searchAttributes) } - return h.listSearchAttributesSQL(request, searchAttributes) + return h.listSearchAttributesSQL(ctx, request, searchAttributes) } func (h *OperatorHandlerImpl) listSearchAttributesElasticsearch( @@ -486,23 +490,36 @@ func (h *OperatorHandlerImpl) listSearchAttributesElasticsearch( } func (h *OperatorHandlerImpl) listSearchAttributesSQL( + ctx context.Context, request *operatorservice.ListSearchAttributesRequest, searchAttributes searchattribute.NameTypeMap, ) (*operatorservice.ListSearchAttributesResponse, error) { + _, client, err := h.clientFactory.NewLocalFrontendClientWithTimeout( + frontend.DefaultTimeout, + frontend.DefaultLongPollTimeout, + ) + if err != nil { + return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToCreateFrontendClientMessage, err)) + } + nsName := request.GetNamespace() if nsName == "" { return nil, errNamespaceNotSet } - ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName)) + resp, err := client.DescribeNamespace( + ctx, + &workflowservice.DescribeNamespaceRequest{Namespace: nsName}, + ) if err != nil { return nil, serviceerror.NewUnavailable( fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName), ) } - mapper := ns.CustomSearchAttributesMapper() + + fieldToAliasMap := resp.Config.CustomSearchAttributeAliases customSearchAttributes := make(map[string]enumspb.IndexedValueType) for field, tp := range searchAttributes.Custom() { - if alias, err := mapper.GetAlias(field, nsName); err == nil { + if alias, ok := fieldToAliasMap[field]; ok { customSearchAttributes[alias] = tp } } diff --git a/service/frontend/operator_handler_test.go b/service/frontend/operator_handler_test.go index ab2befec294f..ddcc90190c91 100644 --- a/service/frontend/operator_handler_test.go +++ b/service/frontend/operator_handler_test.go @@ -88,7 +88,6 @@ func (s *operatorHandlerSuite) SetupTest() { s.mockResource.GetSearchAttributesManager(), health.NewServer(), s.mockResource.GetHistoryClient(), - s.mockResource.GetNamespaceRegistry(), s.mockResource.GetClusterMetadataManager(), s.mockResource.GetClusterMetadata(), s.mockResource.GetClientFactory(),