From 84c40458f1ab9cde8c75947990038b11573938cc Mon Sep 17 00:00:00 2001 From: rodrigozhou Date: Fri, 24 Mar 2023 16:58:51 -0700 Subject: [PATCH] Change update namespace to upsert custom search attributes --- service/frontend/adminHandler.go | 57 ++++++++++++++------- service/frontend/errors.go | 1 + service/frontend/fx.go | 2 - service/frontend/namespace_handler.go | 31 +++++++++++- service/frontend/operator_handler.go | 60 +++++++++++++++-------- service/frontend/operator_handler_test.go | 1 - 6 files changed, 110 insertions(+), 42 deletions(-) diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index e5bfee0591e..af306df3308 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -344,17 +344,21 @@ func (adh *AdminHandler) addSearchAttributesSQL( if nsName == "" { return errNamespaceNotSet } - ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName)) + resp, err := client.DescribeNamespace( + ctx, + &workflowservice.DescribeNamespaceRequest{Namespace: nsName}, + ) if err != nil { return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName)) } customSearchAttributes := currentSearchAttributes.Custom() - mapper := ns.CustomSearchAttributesMapper() - fieldToAliasMap := util.CloneMapNonNil(mapper.FieldToAliasMap()) + upsertFieldToAliasMap := make(map[string]string) + fieldToAliasMap := resp.Config.CustomSearchAttributeAliases + aliasToFieldMap := util.InverseMap(fieldToAliasMap) for saName, saType := range request.GetSearchAttributes() { // check if alias is already in use - if _, err := mapper.GetFieldName(saName, nsName); err == nil { + if _, ok := aliasToFieldMap[saName]; ok { return serviceerror.NewAlreadyExist( fmt.Sprintf(errSearchAttributeAlreadyExistsMessage, saName), ) @@ -377,15 +381,18 @@ func (adh *AdminHandler) addSearchAttributesSQL( fmt.Sprintf(errTooManySearchAttributesMessage, cntUsed, saType.String()), ) } - fieldToAliasMap[targetFieldName] = saName + upsertFieldToAliasMap[targetFieldName] = saName } _, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{ Namespace: nsName, Config: &namespacepb.NamespaceConfig{ - CustomSearchAttributeAliases: fieldToAliasMap, + CustomSearchAttributeAliases: upsertFieldToAliasMap, }, }) + if err.Error() == errCustomSearchAttributeFieldAlreadyAllocated.Error() { + return errRaceConditionAddingSearchAttributes + } return err } @@ -472,25 +479,28 @@ func (adh *AdminHandler) removeSearchAttributesSQL( if nsName == "" { return errNamespaceNotSet } - ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName)) + resp, err := client.DescribeNamespace( + ctx, + &workflowservice.DescribeNamespaceRequest{Namespace: nsName}, + ) if err != nil { return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName)) } - mapper := ns.CustomSearchAttributesMapper() - fieldToAliasMap := maps.Clone(mapper.FieldToAliasMap()) + upsertFieldToAliasMap := make(map[string]string) + aliasToFieldMap := util.InverseMap(resp.Config.CustomSearchAttributeAliases) for _, saName := range request.GetSearchAttributes() { - fieldName, err := mapper.GetFieldName(saName, nsName) - if err != nil { + fieldName, ok := aliasToFieldMap[saName] + if !ok { return serviceerror.NewNotFound(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName)) } - delete(fieldToAliasMap, fieldName) + upsertFieldToAliasMap[fieldName] = "" } _, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{ Namespace: nsName, Config: &namespacepb.NamespaceConfig{ - CustomSearchAttributeAliases: fieldToAliasMap, + CustomSearchAttributeAliases: upsertFieldToAliasMap, }, }) return err @@ -525,7 +535,7 @@ func (adh *AdminHandler) GetSearchAttributes( if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" { return adh.getSearchAttributesElasticsearch(ctx, indexName, searchAttributes) } - return adh.getSearchAttributesSQL(request, searchAttributes) + return adh.getSearchAttributesSQL(ctx, request, searchAttributes) } func (adh *AdminHandler) getSearchAttributesElasticsearch( @@ -569,23 +579,36 @@ func (adh *AdminHandler) getSearchAttributesElasticsearch( } func (adh *AdminHandler) getSearchAttributesSQL( + ctx context.Context, request *adminservice.GetSearchAttributesRequest, searchAttributes searchattribute.NameTypeMap, ) (*adminservice.GetSearchAttributesResponse, error) { + _, client, err := adh.clientFactory.NewLocalFrontendClientWithTimeout( + frontend.DefaultTimeout, + frontend.DefaultLongPollTimeout, + ) + if err != nil { + return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToCreateFrontendClientMessage, err)) + } + nsName := request.GetNamespace() if nsName == "" { return nil, errNamespaceNotSet } - ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName)) + resp, err := client.DescribeNamespace( + ctx, + &workflowservice.DescribeNamespaceRequest{Namespace: nsName}, + ) if err != nil { return nil, serviceerror.NewUnavailable( fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName), ) } - mapper := ns.CustomSearchAttributesMapper() + + fieldToAliasMap := resp.Config.CustomSearchAttributeAliases customSearchAttributes := make(map[string]enumspb.IndexedValueType) for field, tp := range searchAttributes.Custom() { - if alias, err := mapper.GetAlias(field, nsName); err == nil { + if alias, ok := fieldToAliasMap[field]; ok { customSearchAttributes[alias] = tp } } diff --git a/service/frontend/errors.go b/service/frontend/errors.go index 4fe52d951fb..150884324ae 100644 --- a/service/frontend/errors.go +++ b/service/frontend/errors.go @@ -80,6 +80,7 @@ var ( errBatchOperationNotSet = serviceerror.NewInvalidArgument("Batch operation is not set on request.") errCronAndStartDelaySet = serviceerror.NewInvalidArgument("CronSchedule and WorkflowStartDelay may not be used together.") errInvalidWorkflowStartDelaySeconds = serviceerror.NewInvalidArgument("An invalid WorkflowStartDelaySeconds is set on request.") + errRaceConditionAddingSearchAttributes = serviceerror.NewUnavailable("Generated search attributes mapping unavailble.") errUpdateMetaNotSet = serviceerror.NewInvalidArgument("Update meta is not set on request.") errUpdateInputNotSet = serviceerror.NewInvalidArgument("Update input is not set on request.") diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 6dc04f2c941..b1330462f26 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -507,7 +507,6 @@ func OperatorHandlerProvider( saManager searchattribute.Manager, healthServer *health.Server, historyClient historyservice.HistoryServiceClient, - namespaceRegistry namespace.Registry, clusterMetadataManager persistence.ClusterMetadataManager, clusterMetadata cluster.Metadata, clientFactory client.Factory, @@ -523,7 +522,6 @@ func OperatorHandlerProvider( saManager, healthServer, historyClient, - namespaceRegistry, clusterMetadataManager, clusterMetadata, clientFactory, diff --git a/service/frontend/namespace_handler.go b/service/frontend/namespace_handler.go index e6e4febe795..ab78899dd84 100644 --- a/service/frontend/namespace_handler.go +++ b/service/frontend/namespace_handler.go @@ -52,6 +52,7 @@ import ( "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/util" ) type ( @@ -107,6 +108,8 @@ var ( errCannotDoNamespaceFailoverAndUpdate = serviceerror.NewInvalidArgument("Cannot set active cluster to current cluster when other parameters are set.") errInvalidRetentionPeriod = serviceerror.NewInvalidArgument("A valid retention period is not set on request.") errInvalidNamespaceStateUpdate = serviceerror.NewInvalidArgument("Invalid namespace state update.") + + errCustomSearchAttributeFieldAlreadyAllocated = serviceerror.NewInvalidArgument("Custom search attribute field name already allocated.") ) // newNamespaceHandler create a new namespace handler @@ -512,9 +515,16 @@ func (d *namespaceHandlerImpl) UpdateNamespace( return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("Total resetBinaries cannot exceed the max limit: %v", maxLength)) } } - if updatedConfig.CustomSearchAttributeAliases != nil { + if len(updatedConfig.CustomSearchAttributeAliases) > 0 { configurationChanged = true - config.CustomSearchAttributeAliases = updatedConfig.CustomSearchAttributeAliases + csaAliases, err := d.upsertCustomSearchAttributesAliases( + config.CustomSearchAttributeAliases, + updatedConfig.CustomSearchAttributeAliases, + ) + if err != nil { + return nil, err + } + config.CustomSearchAttributeAliases = csaAliases } } @@ -778,6 +788,23 @@ func (d *namespaceHandlerImpl) mergeNamespaceData( return old } +func (d *namespaceHandlerImpl) upsertCustomSearchAttributesAliases( + current map[string]string, + upsert map[string]string, +) (map[string]string, error) { + result := util.CloneMapNonNil(current) + for key, value := range upsert { + if value == "" { + delete(result, key) + } else if _, ok := current[key]; !ok { + result[key] = value + } else { + return nil, errCustomSearchAttributeFieldAlreadyAllocated + } + } + return result, nil +} + func (d *namespaceHandlerImpl) toArchivalRegisterEvent( state enumspb.ArchivalState, URI string, diff --git a/service/frontend/operator_handler.go b/service/frontend/operator_handler.go index 8e532b0bcce..8297a2cdafd 100644 --- a/service/frontend/operator_handler.go +++ b/service/frontend/operator_handler.go @@ -84,7 +84,6 @@ type ( saManager searchattribute.Manager healthServer *health.Server historyClient historyservice.HistoryServiceClient - namespaceRegistry namespace.Registry clusterMetadataManager persistence.ClusterMetadataManager clusterMetadata clustermetadata.Metadata clientFactory svc.Factory @@ -101,7 +100,6 @@ type ( SaManager searchattribute.Manager healthServer *health.Server historyClient historyservice.HistoryServiceClient - namespaceRegistry namespace.Registry clusterMetadataManager persistence.ClusterMetadataManager clusterMetadata clustermetadata.Metadata clientFactory svc.Factory @@ -125,7 +123,6 @@ func NewOperatorHandlerImpl( saManager: args.SaManager, healthServer: args.healthServer, historyClient: args.historyClient, - namespaceRegistry: args.namespaceRegistry, clusterMetadataManager: args.clusterMetadataManager, clusterMetadata: args.clusterMetadata, clientFactory: args.clientFactory, @@ -280,18 +277,22 @@ func (h *OperatorHandlerImpl) addSearchAttributesSQL( if nsName == "" { return errNamespaceNotSet } - ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName)) + resp, err := client.DescribeNamespace( + ctx, + &workflowservice.DescribeNamespaceRequest{Namespace: nsName}, + ) if err != nil { return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName)) } dbCustomSearchAttributes := searchattribute.GetSqlDbIndexSearchAttributes().CustomSearchAttributes cmCustomSearchAttributes := currentSearchAttributes.Custom() - mapper := ns.CustomSearchAttributesMapper() - fieldToAliasMap := util.CloneMapNonNil(mapper.FieldToAliasMap()) + upsertFieldToAliasMap := make(map[string]string) + fieldToAliasMap := resp.Config.CustomSearchAttributeAliases + aliasToFieldMap := util.InverseMap(fieldToAliasMap) for saName, saType := range request.GetSearchAttributes() { // check if alias is already in use - if _, err := mapper.GetFieldName(saName, nsName); err == nil { + if _, ok := aliasToFieldMap[saName]; ok { return serviceerror.NewAlreadyExist( fmt.Sprintf(errSearchAttributeAlreadyExistsMessage, saName), ) @@ -318,15 +319,18 @@ func (h *OperatorHandlerImpl) addSearchAttributesSQL( fmt.Sprintf(errTooManySearchAttributesMessage, cntUsed, saType.String()), ) } - fieldToAliasMap[targetFieldName] = saName + upsertFieldToAliasMap[targetFieldName] = saName } _, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{ Namespace: nsName, Config: &namespacepb.NamespaceConfig{ - CustomSearchAttributeAliases: fieldToAliasMap, + CustomSearchAttributeAliases: upsertFieldToAliasMap, }, }) + if err.Error() == errCustomSearchAttributeFieldAlreadyAllocated.Error() { + return errRaceConditionAddingSearchAttributes + } return err } @@ -410,25 +414,28 @@ func (h *OperatorHandlerImpl) removeSearchAttributesSQL( if nsName == "" { return errNamespaceNotSet } - ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName)) + resp, err := client.DescribeNamespace( + ctx, + &workflowservice.DescribeNamespaceRequest{Namespace: nsName}, + ) if err != nil { return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName)) } - mapper := ns.CustomSearchAttributesMapper() - fieldToAliasMap := maps.Clone(mapper.FieldToAliasMap()) + upsertFieldToAliasMap := make(map[string]string) + aliasToFieldMap := util.InverseMap(resp.Config.CustomSearchAttributeAliases) for _, saName := range request.GetSearchAttributes() { - fieldName, err := mapper.GetFieldName(saName, nsName) - if err != nil { + fieldName, ok := aliasToFieldMap[saName] + if !ok { return serviceerror.NewNotFound(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName)) } - delete(fieldToAliasMap, fieldName) + upsertFieldToAliasMap[fieldName] = "" } _, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{ Namespace: nsName, Config: &namespacepb.NamespaceConfig{ - CustomSearchAttributeAliases: fieldToAliasMap, + CustomSearchAttributeAliases: upsertFieldToAliasMap, }, }) return err @@ -460,7 +467,7 @@ func (h *OperatorHandlerImpl) ListSearchAttributes( if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" { return h.listSearchAttributesElasticsearch(ctx, indexName, searchAttributes) } - return h.listSearchAttributesSQL(request, searchAttributes) + return h.listSearchAttributesSQL(ctx, request, searchAttributes) } func (h *OperatorHandlerImpl) listSearchAttributesElasticsearch( @@ -486,23 +493,36 @@ func (h *OperatorHandlerImpl) listSearchAttributesElasticsearch( } func (h *OperatorHandlerImpl) listSearchAttributesSQL( + ctx context.Context, request *operatorservice.ListSearchAttributesRequest, searchAttributes searchattribute.NameTypeMap, ) (*operatorservice.ListSearchAttributesResponse, error) { + _, client, err := h.clientFactory.NewLocalFrontendClientWithTimeout( + frontend.DefaultTimeout, + frontend.DefaultLongPollTimeout, + ) + if err != nil { + return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToCreateFrontendClientMessage, err)) + } + nsName := request.GetNamespace() if nsName == "" { return nil, errNamespaceNotSet } - ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName)) + resp, err := client.DescribeNamespace( + ctx, + &workflowservice.DescribeNamespaceRequest{Namespace: nsName}, + ) if err != nil { return nil, serviceerror.NewUnavailable( fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName), ) } - mapper := ns.CustomSearchAttributesMapper() + + fieldToAliasMap := resp.Config.CustomSearchAttributeAliases customSearchAttributes := make(map[string]enumspb.IndexedValueType) for field, tp := range searchAttributes.Custom() { - if alias, err := mapper.GetAlias(field, nsName); err == nil { + if alias, ok := fieldToAliasMap[field]; ok { customSearchAttributes[alias] = tp } } diff --git a/service/frontend/operator_handler_test.go b/service/frontend/operator_handler_test.go index ab2befec294..ddcc90190c9 100644 --- a/service/frontend/operator_handler_test.go +++ b/service/frontend/operator_handler_test.go @@ -88,7 +88,6 @@ func (s *operatorHandlerSuite) SetupTest() { s.mockResource.GetSearchAttributesManager(), health.NewServer(), s.mockResource.GetHistoryClient(), - s.mockResource.GetNamespaceRegistry(), s.mockResource.GetClusterMetadataManager(), s.mockResource.GetClusterMetadata(), s.mockResource.GetClientFactory(),