From 3cbab42bc9922d7fb01295f29ca77fe44f1b318a Mon Sep 17 00:00:00 2001 From: Saman Barghi Date: Thu, 11 May 2023 14:38:00 -0700 Subject: [PATCH] Fix partial namespace creation (#4267) * Fix partial namespace creation * Check return result * Fix some bugs * Return Unavailable Error * Check duplicate name condition * Update id value --- .../persistence/cassandra/metadata_store.go | 44 ++++++++++++++----- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/common/persistence/cassandra/metadata_store.go b/common/persistence/cassandra/metadata_store.go index eda95b6176e..76d19eb24be 100644 --- a/common/persistence/cassandra/metadata_store.go +++ b/common/persistence/cassandra/metadata_store.go @@ -125,15 +125,22 @@ func (m *MetadataStore) CreateNamespace( ctx context.Context, request *p.InternalCreateNamespaceRequest, ) (*p.CreateNamespaceResponse, error) { + query := m.session.Query(templateCreateNamespaceQuery, request.ID, request.Name).WithContext(ctx) - applied, err := query.MapScanCAS(make(map[string]interface{})) + existingRow := make(map[string]interface{}) + applied, err := query.MapScanCAS(existingRow) if err != nil { return nil, serviceerror.NewUnavailable(fmt.Sprintf("CreateNamespace operation failed. Inserting into namespaces table. Error: %v", err)) } + if !applied { - return nil, serviceerror.NewNamespaceAlreadyExists("CreateNamespace operation failed because of uuid collision.") + // if the id with the same name exists in `namespaces_by_id`, fall through and either add a row in `namespaces` table + // or fail if name exists in that table already. This is to make sure we do not end up with a row in `namespaces_by_id` + // table and no entry in `namespaces` table + if name, ok := existingRow["name"]; !ok || name != request.Name { + return nil, serviceerror.NewNamespaceAlreadyExists("CreateNamespace operation failed because of uuid collision.") + } } - return m.CreateNamespaceInV2Table(ctx, request) } @@ -161,23 +168,40 @@ func (m *MetadataStore) CreateNamespaceInV2Table( previous := make(map[string]interface{}) applied, iter, err := m.session.MapExecuteBatchCAS(batch, previous) + deleteOrphanNamespace := func() { + // Delete namespace from `namespaces_by_id` + if errDelete := m.session.Query(templateDeleteNamespaceQuery, request.ID).WithContext(ctx).Exec(); errDelete != nil { + m.logger.Warn("Unable to delete orphan namespace record. Error", tag.Error(errDelete)) + } + } + if err != nil { return nil, serviceerror.NewUnavailable(fmt.Sprintf("CreateNamespace operation failed. Inserting into namespaces table. Error: %v", err)) } + defer func() { _ = iter.Close() }() if !applied { - // Namespace already exist. Delete orphan namespace record before returning back to user - if errDelete := m.session.Query(templateDeleteNamespaceQuery, request.ID).WithContext(ctx).Exec(); errDelete != nil { - m.logger.Warn("Unable to delete orphan namespace record. Error", tag.Error(errDelete)) - } - if id, ok := previous["Id"].([]byte); ok { - msg := fmt.Sprintf("Namespace already exists. NamespaceId: %v", primitives.UUIDString(id)) + // if conditional failure is due to a duplicate name in namespaces table + if name, ok := previous["name"]; ok && name == request.Name { + existingID := request.ID + if id, ok := previous["id"]; ok && gocql.UUIDToString(id) != request.ID { + existingID = gocql.UUIDToString(id) + // Delete orphan namespace record before returning back to user + deleteOrphanNamespace() + } + + msg := fmt.Sprintf("Namespace already exists. NamespaceId: %v", existingID) return nil, serviceerror.NewNamespaceAlreadyExists(msg) + } - return nil, serviceerror.NewNamespaceAlreadyExists("CreateNamespace operation failed because of conditional failure.") + // If namespace does not exist already and applied is false, + // notification_version does not match our expectations and it's conditional failure. + // Delete orphan namespace record before returning back to user + deleteOrphanNamespace() + return nil, serviceerror.NewUnavailable("CreateNamespace operation failed because of conditional failure.") } return &p.CreateNamespaceResponse{ID: request.ID}, nil