diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 1d11e9c15e8..8c97c3ee197 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -208,13 +208,22 @@ const ( ReachabilityQuerySetDurationSinceDefault = "frontend.reachabilityQuerySetDurationSinceDefault" // TaskQueuesPerBuildIdLimit limits the number of task queue names that can be mapped to a single build id. TaskQueuesPerBuildIdLimit = "limit.taskQueuesPerBuildId" - // NexusOutgoingServiceURLMaxLength is the maximum length of an outgoing service URL + + // NexusIncomingServiceNameMaxLength is the maximum length of a Nexus incoming service name. + NexusIncomingServiceNameMaxLength = "limit.incomingServiceNameMaxLength" + // NexusIncomingServiceMaxSize is the maximum size of a Nexus incoming service in bytes. + NexusIncomingServiceMaxSize = "limit.incomingServiceMaxSize" + // NexusIncomingServiceListDefaultPageSize is the default page size for listing Nexus incoming services. + NexusIncomingServiceListDefaultPageSize = "limit.incomingServiceListDefaultPageSize" + // NexusIncomingServiceListMaxPageSize is the maximum page size for listing Nexus incoming services. + NexusIncomingServiceListMaxPageSize = "limit.incomingServiceListMaxPageSize" + // NexusOutgoingServiceURLMaxLength is the maximum length of an outgoing service URL. NexusOutgoingServiceURLMaxLength = "limit.outgoingServiceURLMaxLength" - // NexusOutgoingServiceNameMaxLength is the maximum length of an outgoing service name + // NexusOutgoingServiceNameMaxLength is the maximum length of an outgoing service name. NexusOutgoingServiceNameMaxLength = "limit.outgoingServiceNameMaxLength" - // NexusOutgoingServiceListDefaultPageSize is the default page size for listing outgoing services + // NexusOutgoingServiceListDefaultPageSize is the default page size for listing outgoing services. NexusOutgoingServiceListDefaultPageSize = "limit.outgoingServiceListDefaultPageSize" - // NexusOutgoingServiceListMaxPageSize is the maximum page size for listing outgoing services + // NexusOutgoingServiceListMaxPageSize is the maximum page size for listing outgoing services. NexusOutgoingServiceListMaxPageSize = "limit.outgoingServiceListMaxPageSize" // RemovableBuildIdDurationSinceDefault is the minimum duration since a build id was last default in its containing diff --git a/common/nexus/util.go b/common/nexus/util.go new file mode 100644 index 00000000000..0c3588b8e94 --- /dev/null +++ b/common/nexus/util.go @@ -0,0 +1,48 @@ +// The MIT License +// +// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package nexus + +import ( + "go.temporal.io/api/nexus/v1" + "google.golang.org/protobuf/types/known/timestamppb" + + persistencepb "go.temporal.io/server/api/persistence/v1" + hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" +) + +func IncomingServicePersistedEntryToExternalAPI(entry *persistencepb.NexusIncomingServiceEntry) *nexus.IncomingService { + var lastModifiedTime *timestamppb.Timestamp + // Only set last modified if there were modifications as stated in the UI contract. + if entry.Version > 1 { + lastModifiedTime = timestamppb.New(hlc.UTC(entry.Service.Clock)) + } + + return &nexus.IncomingService{ + Version: entry.Version, + Id: entry.Id, + Spec: entry.Service.Spec, + CreatedTime: entry.Service.CreatedTime, + LastModifiedTime: lastModifiedTime, + UrlPrefix: "/" + Routes().DispatchNexusTaskByService.Path(entry.Id), + } +} diff --git a/common/persistence/cassandra/nexus_incoming_service_store.go b/common/persistence/cassandra/nexus_incoming_service_store.go index 9248c2931ab..c2f20b99745 100644 --- a/common/persistence/cassandra/nexus_incoming_service_store.go +++ b/common/persistence/cassandra/nexus_incoming_service_store.go @@ -200,7 +200,7 @@ func (s *NexusIncomingServiceStore) ListNexusIncomingServices( ctx context.Context, request *p.ListNexusIncomingServicesRequest, ) (*p.InternalListNexusIncomingServicesResponse, error) { - if request.LastKnownTableVersion == 0 { + if request.LastKnownTableVersion == 0 && request.NextPageToken == nil { return s.listFirstPageWithVersion(ctx, request) } @@ -230,8 +230,11 @@ func (s *NexusIncomingServiceStore) ListNexusIncomingServices( response.TableVersion = currentTableVersion - if request.LastKnownTableVersion != currentTableVersion { - // If table has been updated during pagination, throw error to indicate caller must start over + if request.LastKnownTableVersion != 0 && request.LastKnownTableVersion != currentTableVersion { + // If request.LastKnownTableVersion == 0 then caller does not care about checking whether they have the most + // current view while paginating. + // Otherwise, if there is a version mismatch, then the table has been updated during pagination, and throw + // error to indicate caller must start over. return nil, fmt.Errorf("%w. provided table version: %v current table version: %v", p.ErrNexusTableVersionConflict, request.LastKnownTableVersion, diff --git a/service/frontend/fx.go b/service/frontend/fx.go index c2c8c14fd2a..c2f6839f481 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -29,12 +29,13 @@ import ( "net" "github.com/gorilla/mux" - "go.temporal.io/server/common/nexus" "go.uber.org/fx" "google.golang.org/grpc" "google.golang.org/grpc/health" "google.golang.org/grpc/keepalive" + "go.temporal.io/server/common/nexus" + "go.temporal.io/server/client" "go.temporal.io/server/common" "go.temporal.io/server/common/archiver" @@ -113,6 +114,7 @@ var Module = fx.Options( fx.Provide(OpenAPIHTTPHandlerProvider), fx.Provide(HTTPAPIServerProvider), fx.Provide(NewServiceProvider), + fx.Provide(IncomingServiceClientProvider), fx.Provide(OutgoingServiceRegistryProvider), fx.Invoke(ServiceLifetimeHooks), ) @@ -590,6 +592,7 @@ func OperatorHandlerProvider( clusterMetadataManager persistence.ClusterMetadataManager, clusterMetadata cluster.Metadata, clientFactory client.Factory, + incomingServiceClient *NexusIncomingServiceClient, outgoingServiceRegistry *nexus.OutgoingServiceRegistry, ) *OperatorHandlerImpl { args := NewOperatorHandlerImplArgs{ @@ -605,6 +608,7 @@ func OperatorHandlerProvider( clusterMetadataManager, clusterMetadata, clientFactory, + incomingServiceClient, outgoingServiceRegistry, } return NewOperatorHandlerImpl(args) @@ -743,6 +747,23 @@ func HTTPAPIServerProvider( ) } +func IncomingServiceClientProvider( + dc *dynamicconfig.Collection, + namespaceRegistry namespace.Registry, + matchingClient resource.MatchingClient, + incomingServiceManager persistence.NexusIncomingServiceManager, + logger log.Logger, +) *NexusIncomingServiceClient { + clientConfig := newNexusIncomingServiceClientConfig(dc) + return newNexusIncomingServiceClient( + clientConfig, + namespaceRegistry, + matchingClient, + incomingServiceManager, + logger, + ) +} + func OutgoingServiceRegistryProvider( metadataManager persistence.MetadataManager, dc *dynamicconfig.Collection, diff --git a/service/frontend/nexus_incoming_service_client.go b/service/frontend/nexus_incoming_service_client.go new file mode 100644 index 00000000000..4c1024967bb --- /dev/null +++ b/service/frontend/nexus_incoming_service_client.go @@ -0,0 +1,336 @@ +// The MIT License +// +// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package frontend + +import ( + "context" + "errors" + "fmt" + "regexp" + + "github.com/google/uuid" + "go.temporal.io/api/nexus/v1" + "go.temporal.io/api/operatorservice/v1" + "go.temporal.io/api/serviceerror" + + "go.temporal.io/server/api/matchingservice/v1" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/namespace" + cnexus "go.temporal.io/server/common/nexus" + p "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/rpc" +) + +// ServiceNameRegex is the regular expression that incoming service names must match. +var ServiceNameRegex = regexp.MustCompile(`[a-zA-Z_][a-zA-Z0-9_]*`) + +type ( + // NexusIncomingServiceClient manages frontend CRUD requests for Nexus incoming services. + // Create, Update, and Delete requests are forwarded to matching service which owns the incoming services table. + // Read (Get and List) requests are sent directly to persistence. This is to ensure read-after-write consistency. + NexusIncomingServiceClient struct { + config *nexusIncomingServiceClientConfig + + namespaceRegistry namespace.Registry // used to validate referenced namespaces exist + matchingClient matchingservice.MatchingServiceClient + persistence p.NexusIncomingServiceManager + + logger log.Logger + } + + nexusIncomingServiceClientConfig struct { + maxNameLength dynamicconfig.IntPropertyFn + maxTaskQueueLength dynamicconfig.IntPropertyFn + maxSize dynamicconfig.IntPropertyFn + listDefaultPageSize dynamicconfig.IntPropertyFn + listMaxPageSize dynamicconfig.IntPropertyFn + } +) + +func newNexusIncomingServiceClientConfig(dc *dynamicconfig.Collection) *nexusIncomingServiceClientConfig { + return &nexusIncomingServiceClientConfig{ + maxNameLength: dc.GetIntProperty(dynamicconfig.NexusIncomingServiceNameMaxLength, 200), + maxTaskQueueLength: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000), + maxSize: dc.GetIntProperty(dynamicconfig.NexusIncomingServiceMaxSize, 4*1024), + listDefaultPageSize: dc.GetIntProperty(dynamicconfig.NexusIncomingServiceListDefaultPageSize, 100), + listMaxPageSize: dc.GetIntProperty(dynamicconfig.NexusIncomingServiceListMaxPageSize, 1000), + } +} + +func newNexusIncomingServiceClient( + config *nexusIncomingServiceClientConfig, + namespaceRegistry namespace.Registry, + matchingClient matchingservice.MatchingServiceClient, + persistence p.NexusIncomingServiceManager, + logger log.Logger, +) *NexusIncomingServiceClient { + return &NexusIncomingServiceClient{ + config: config, + namespaceRegistry: namespaceRegistry, + matchingClient: matchingClient, + persistence: persistence, + logger: logger, + } +} + +func (c *NexusIncomingServiceClient) Create( + ctx context.Context, + request *operatorservice.CreateNexusIncomingServiceRequest, +) (*operatorservice.CreateNexusIncomingServiceResponse, error) { + if err := c.validateUpsertSpec(request.GetSpec()); err != nil { + return nil, err + } + + resp, err := c.matchingClient.CreateNexusIncomingService(ctx, &matchingservice.CreateNexusIncomingServiceRequest{ + Spec: request.Spec, + }) + if err != nil { + return nil, err + } + + return &operatorservice.CreateNexusIncomingServiceResponse{ + Service: resp.GetService(), + }, nil +} + +func (c *NexusIncomingServiceClient) Update( + ctx context.Context, + request *operatorservice.UpdateNexusIncomingServiceRequest, +) (*operatorservice.UpdateNexusIncomingServiceResponse, error) { + if err := c.validateUpsertSpec(request.GetSpec()); err != nil { + return nil, err + } + + resp, err := c.matchingClient.UpdateNexusIncomingService(ctx, &matchingservice.UpdateNexusIncomingServiceRequest{ + Id: request.Id, + Version: request.Version, + Spec: request.Spec, + }) + if err != nil { + return nil, err + } + + return &operatorservice.UpdateNexusIncomingServiceResponse{ + Service: resp.Service, + }, nil +} + +func (c *NexusIncomingServiceClient) Delete( + ctx context.Context, + request *operatorservice.DeleteNexusIncomingServiceRequest, +) (*operatorservice.DeleteNexusIncomingServiceResponse, error) { + if err := validateDeleteRequest(request); err != nil { + return nil, err + } + + _, err := c.matchingClient.DeleteNexusIncomingService(ctx, &matchingservice.DeleteNexusIncomingServiceRequest{ + Id: request.Id, + }) + if err != nil { + return nil, err + } + + return &operatorservice.DeleteNexusIncomingServiceResponse{}, nil +} + +func (c *NexusIncomingServiceClient) Get( + ctx context.Context, + request *operatorservice.GetNexusIncomingServiceRequest, +) (*operatorservice.GetNexusIncomingServiceResponse, error) { + if err := validateGetRequest(request); err != nil { + return nil, err + } + + entry, err := c.persistence.GetNexusIncomingService(ctx, &p.GetNexusIncomingServiceRequest{ + ServiceID: request.Id, + }) + if err != nil { + var notFound *serviceerror.NotFound + if errors.As(err, ¬Found) { + return nil, err + } + c.logger.Error(fmt.Sprintf("error looking up Nexus incoming service with ID `%v` from persistence", request.Id), tag.Error(err)) + return nil, serviceerror.NewInternal(fmt.Sprintf("error looking up Nexus incoming service with ID `%v`", request.Id)) + } + + return &operatorservice.GetNexusIncomingServiceResponse{ + Service: cnexus.IncomingServicePersistedEntryToExternalAPI(entry), + }, nil +} + +func (c *NexusIncomingServiceClient) List( + ctx context.Context, + request *operatorservice.ListNexusIncomingServicesRequest, +) (*operatorservice.ListNexusIncomingServicesResponse, error) { + if request.GetName() != "" { + return c.listAndFilterByName(ctx, request) + } + + pageSize := request.GetPageSize() + if pageSize == 0 { + pageSize = int32(c.config.listDefaultPageSize()) + } else if err := c.validatePageSize(pageSize); err != nil { + return nil, err + } + + resp, err := c.persistence.ListNexusIncomingServices(ctx, &p.ListNexusIncomingServicesRequest{ + LastKnownTableVersion: 0, + NextPageToken: request.NextPageToken, + PageSize: int(pageSize), + }) + if err != nil { + c.logger.Error(fmt.Sprintf("error listing Nexus incoming services from persistence. NextPageToken: %v PageSize: %d", request.NextPageToken, pageSize), tag.Error(err)) + return nil, serviceerror.NewInternal("error listing Nexus incoming services") + } + + services := make([]*nexus.IncomingService, len(resp.Entries)) + for i, entry := range resp.Entries { + services[i] = cnexus.IncomingServicePersistedEntryToExternalAPI(entry) + } + + return &operatorservice.ListNexusIncomingServicesResponse{ + NextPageToken: resp.NextPageToken, + Services: services, + }, nil +} + +// listAndFilterByName paginates over all services returned by persistence layer to find the service name +// indicated in the request. Returns that service if found or an empty response if not. +// PageSize and NextPageToken fields on the request are ignored. +func (c *NexusIncomingServiceClient) listAndFilterByName( + ctx context.Context, + request *operatorservice.ListNexusIncomingServicesRequest, +) (*operatorservice.ListNexusIncomingServicesResponse, error) { + result := &operatorservice.ListNexusIncomingServicesResponse{Services: []*nexus.IncomingService{}} + pageSize := c.config.listDefaultPageSize() + var currentPageToken []byte + + for ctx.Err() == nil { + resp, err := c.persistence.ListNexusIncomingServices(ctx, &p.ListNexusIncomingServicesRequest{ + LastKnownTableVersion: 0, + NextPageToken: currentPageToken, + PageSize: pageSize, + }) + if err != nil { + c.logger.Error(fmt.Sprintf("error listing Nexus incoming services from persistence with Name filter. CurrentPageToken: %v PageSize: %d ServiceName: %v", currentPageToken, pageSize, request.Name), tag.Error(err)) + return nil, serviceerror.NewInternal("error listing Nexus incoming services") + } + + for _, entry := range resp.Entries { + if request.Name == entry.Service.Spec.Name { + result.Services = []*nexus.IncomingService{cnexus.IncomingServicePersistedEntryToExternalAPI(entry)} + return result, nil + } + } + + if resp.NextPageToken == nil { + return result, nil + } + + currentPageToken = resp.NextPageToken + } + + return nil, ctx.Err() +} + +func (c *NexusIncomingServiceClient) getServiceNameIssues(name string) rpc.RequestIssues { + var issues rpc.RequestIssues + + if name == "" { + issues.Append("incoming service name not set") + return issues + } + + maxNameLength := c.config.maxNameLength() + if len(name) > maxNameLength { + issues.Appendf("incoming service name exceeds length limit of %d", maxNameLength) + } + + if !ServiceNameRegex.MatchString(name) { + issues.Appendf("incoming service name must match the regex: %q", ServiceNameRegex.String()) + } + + return issues +} + +func (c *NexusIncomingServiceClient) validateUpsertSpec(spec *nexus.IncomingServiceSpec) error { + issues := c.getServiceNameIssues(spec.GetName()) + + if spec.Namespace == "" { + issues.Append("incoming service namespace not set") + } else if _, nsErr := c.namespaceRegistry.GetNamespace(namespace.Name(spec.Namespace)); nsErr != nil { + return serviceerror.NewFailedPrecondition(fmt.Sprintf("could not verify namespace referenced by incoming service exists: %v", nsErr.Error())) + } + + if err := validateTaskQueueName(spec.GetTaskQueue(), c.config.maxTaskQueueLength()); err != nil { + issues.Appendf("invalid incoming service task queue: %q", err.Error()) + } + + maxSize := c.config.maxSize() + if spec.Size() > maxSize { + issues.Appendf("incoming service size exceeds limit of %d", maxSize) + } + + return issues.GetError() +} + +func getServiceIDIssues(ID string) rpc.RequestIssues { + var issues rpc.RequestIssues + if ID == "" { + issues.Append("incoming service ID not set") + } else if _, err := uuid.Parse(ID); err != nil { + issues.Appendf("malformed incoming service ID: %q", err.Error()) + } + return issues +} + +func validateDeleteRequest(request *operatorservice.DeleteNexusIncomingServiceRequest) error { + issues := getServiceIDIssues(request.GetId()) + + if request.GetVersion() <= 0 { + issues.Append("incoming service version is non-positive") + } + + return issues.GetError() +} + +func validateGetRequest(request *operatorservice.GetNexusIncomingServiceRequest) error { + issues := getServiceIDIssues(request.GetId()) + return issues.GetError() +} + +func (c *NexusIncomingServiceClient) validatePageSize(pageSize int32) error { + // pageSize == 0 is treated as unset and will be changed to the default and does not go through this validation + if pageSize < 0 { + return serviceerror.NewInvalidArgument("page_size is negative") + } + + maxPageSize := c.config.listMaxPageSize() + if pageSize > int32(maxPageSize) { + return serviceerror.NewInvalidArgument(fmt.Sprintf("page_size exceeds limit of %d", maxPageSize)) + } + + return nil +} diff --git a/service/frontend/operator_handler.go b/service/frontend/operator_handler.go index 29748071058..b0b1a769a6c 100644 --- a/service/frontend/operator_handler.go +++ b/service/frontend/operator_handler.go @@ -29,12 +29,11 @@ import ( "fmt" "sync/atomic" - cnexus "go.temporal.io/server/common/nexus" "golang.org/x/exp/maps" - "google.golang.org/grpc/codes" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/status" + + cnexus "go.temporal.io/server/common/nexus" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -91,6 +90,7 @@ type ( clusterMetadataManager persistence.ClusterMetadataManager clusterMetadata clustermetadata.Metadata clientFactory svc.Factory + incomingServiceClient *NexusIncomingServiceClient outgoingServiceRegistry *cnexus.OutgoingServiceRegistry } @@ -107,6 +107,7 @@ type ( clusterMetadataManager persistence.ClusterMetadataManager clusterMetadata clustermetadata.Metadata clientFactory svc.Factory + incomingServiceClient *NexusIncomingServiceClient outgoingServiceRegistry *cnexus.OutgoingServiceRegistry } ) @@ -136,6 +137,7 @@ func NewOperatorHandlerImpl( clusterMetadataManager: args.clusterMetadataManager, clusterMetadata: args.clusterMetadata, clientFactory: args.clientFactory, + incomingServiceClient: args.incomingServiceClient, outgoingServiceRegistry: args.outgoingServiceRegistry, } @@ -829,24 +831,44 @@ func (h *OperatorHandlerImpl) validateRemoteClusterMetadata(metadata *adminservi return nil } -func (*OperatorHandlerImpl) CreateNexusIncomingService(context.Context, *operatorservice.CreateNexusIncomingServiceRequest) (*operatorservice.CreateNexusIncomingServiceResponse, error) { - return nil, status.Error(codes.Unimplemented, "unimplemented") +func (h *OperatorHandlerImpl) CreateNexusIncomingService( + ctx context.Context, + request *operatorservice.CreateNexusIncomingServiceRequest, +) (_ *operatorservice.CreateNexusIncomingServiceResponse, retError error) { + defer log.CapturePanic(h.logger, &retError) + return h.incomingServiceClient.Create(ctx, request) } -func (*OperatorHandlerImpl) UpdateNexusIncomingService(context.Context, *operatorservice.UpdateNexusIncomingServiceRequest) (*operatorservice.UpdateNexusIncomingServiceResponse, error) { - return nil, status.Error(codes.Unimplemented, "unimplemented") +func (h *OperatorHandlerImpl) UpdateNexusIncomingService( + ctx context.Context, + request *operatorservice.UpdateNexusIncomingServiceRequest, +) (_ *operatorservice.UpdateNexusIncomingServiceResponse, retError error) { + defer log.CapturePanic(h.logger, &retError) + return h.incomingServiceClient.Update(ctx, request) } -func (*OperatorHandlerImpl) DeleteNexusIncomingService(context.Context, *operatorservice.DeleteNexusIncomingServiceRequest) (*operatorservice.DeleteNexusIncomingServiceResponse, error) { - return nil, status.Error(codes.Unimplemented, "unimplemented") +func (h *OperatorHandlerImpl) DeleteNexusIncomingService( + ctx context.Context, + request *operatorservice.DeleteNexusIncomingServiceRequest, +) (_ *operatorservice.DeleteNexusIncomingServiceResponse, retError error) { + defer log.CapturePanic(h.logger, &retError) + return h.incomingServiceClient.Delete(ctx, request) } -func (*OperatorHandlerImpl) GetNexusIncomingService(context.Context, *operatorservice.GetNexusIncomingServiceRequest) (*operatorservice.GetNexusIncomingServiceResponse, error) { - return nil, status.Error(codes.Unimplemented, "unimplemented") +func (h *OperatorHandlerImpl) GetNexusIncomingService( + ctx context.Context, + request *operatorservice.GetNexusIncomingServiceRequest, +) (_ *operatorservice.GetNexusIncomingServiceResponse, retError error) { + defer log.CapturePanic(h.logger, &retError) + return h.incomingServiceClient.Get(ctx, request) } -func (*OperatorHandlerImpl) ListNexusIncomingServices(context.Context, *operatorservice.ListNexusIncomingServicesRequest) (*operatorservice.ListNexusIncomingServicesResponse, error) { - return nil, status.Error(codes.Unimplemented, "unimplemented") +func (h *OperatorHandlerImpl) ListNexusIncomingServices( + ctx context.Context, + request *operatorservice.ListNexusIncomingServicesRequest, +) (_ *operatorservice.ListNexusIncomingServicesResponse, retError error) { + defer log.CapturePanic(h.logger, &retError) + return h.incomingServiceClient.List(ctx, request) } func (h *OperatorHandlerImpl) GetNexusOutgoingService( diff --git a/service/frontend/operator_handler_test.go b/service/frontend/operator_handler_test.go index adb868872ce..58d525258df 100644 --- a/service/frontend/operator_handler_test.go +++ b/service/frontend/operator_handler_test.go @@ -41,10 +41,11 @@ import ( "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" sdkclient "go.temporal.io/sdk/client" - cnexus "go.temporal.io/server/common/nexus" "golang.org/x/exp/maps" "google.golang.org/grpc/health" + cnexus "go.temporal.io/server/common/nexus" + "go.temporal.io/server/api/adminservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/cluster" @@ -91,10 +92,18 @@ func (s *operatorHandlerSuite) SetupTest() { s.mockResource = resourcetest.NewTest(s.controller, primitives.FrontendService) s.mockResource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New()).AnyTimes() + incomingServiceClient := newNexusIncomingServiceClient( + newNexusIncomingServiceClientConfig(dynamicconfig.NewNoopCollection()), + s.mockResource.NamespaceCache, + s.mockResource.MatchingClient, + persistence.NewMockNexusIncomingServiceManager(s.controller), + s.mockResource.Logger, + ) outgoingServiceRegistry := cnexus.NewOutgoingServiceRegistry( s.mockResource.MetadataMgr, cnexus.NewOutgoingServiceRegistryConfig(dynamicconfig.NewNoopCollection()), ) + args := NewOperatorHandlerImplArgs{ &Config{NumHistoryShards: 4}, s.mockResource.ESClient, @@ -108,6 +117,7 @@ func (s *operatorHandlerSuite) SetupTest() { s.mockResource.GetClusterMetadataManager(), s.mockResource.GetClusterMetadata(), s.mockResource.GetClientFactory(), + incomingServiceClient, outgoingServiceRegistry, } s.handler = NewOperatorHandlerImpl(args) diff --git a/service/frontend/validators.go b/service/frontend/validators.go index 848acb06623..07693672f62 100644 --- a/service/frontend/validators.go +++ b/service/frontend/validators.go @@ -27,6 +27,8 @@ package frontend import ( "github.com/pborman/uuid" commonpb "go.temporal.io/api/common/v1" + + "go.temporal.io/server/common" ) func validateExecution(w *commonpb.WorkflowExecution) error { @@ -41,3 +43,17 @@ func validateExecution(w *commonpb.WorkflowExecution) error { } return nil } + +// validateTaskQueueName does simple verification on a high-level (user-provided) task queue name. +// See also: service/frontend/workflow_handler.go#validateTaskQueue and +// service/history/command_checker.go#validateTaskQueue for more information on task queue validation. +// TODO: standardize task queue validation across the codebase +func validateTaskQueueName(name string, maxLength int) error { + if name == "" { + return errTaskQueueNotSet + } + if len(name) > maxLength { + return errTaskQueueTooLong + } + return common.ValidateUTF8String("TaskQueue", name) +} diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index c348cf245ee..8bcea74d3c3 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -4039,15 +4039,13 @@ func (wh *WorkflowHandler) validateSearchAttributes(searchAttributes *commonpb.S } func (wh *WorkflowHandler) validateTaskQueue(t *taskqueuepb.TaskQueue, namespaceName namespace.Name) error { - if t == nil || t.GetName() == "" { + if t == nil { return errTaskQueueNotSet } - if len(t.GetName()) > wh.config.MaxIDLengthLimit() { - return errTaskQueueTooLong - } - if err := common.ValidateUTF8String("TaskQueue", t.GetName()); err != nil { + if err := validateTaskQueueName(t.GetName(), wh.config.MaxIDLengthLimit()); err != nil { return err } + if t.GetKind() == enumspb.TASK_QUEUE_KIND_STICKY { if err := common.ValidateUTF8String("TaskQueue", t.GetNormalName()); err != nil { return err diff --git a/service/matching/incoming_nexus_service_manager.go b/service/matching/incoming_nexus_service_manager.go index d97a5fc30d6..3a6fbc102b0 100644 --- a/service/matching/incoming_nexus_service_manager.go +++ b/service/matching/incoming_nexus_service_manager.go @@ -147,13 +147,7 @@ func (m *incomingNexusServiceManager) CreateNexusIncomingService( close(ch) return &matchingservice.CreateNexusIncomingServiceResponse{ - Service: &nexuspb.IncomingService{ - Version: entry.Version, - Id: entry.Id, - Spec: entry.Service.Spec, - CreatedTime: entry.Service.CreatedTime, - UrlPrefix: "/" + commonnexus.Routes().DispatchNexusTaskByService.Path(entry.Id), - }, + Service: commonnexus.IncomingServicePersistedEntryToExternalAPI(entry), }, nil } @@ -208,14 +202,7 @@ func (m *incomingNexusServiceManager) UpdateNexusIncomingService( close(ch) return &matchingservice.UpdateNexusIncomingServiceResponse{ - Service: &nexuspb.IncomingService{ - Version: entry.Version, - Id: entry.Id, - Spec: entry.Service.Spec, - CreatedTime: entry.Service.CreatedTime, - LastModifiedTime: timestamppb.New(hlc.UTC(entry.Service.Clock)), - UrlPrefix: "/" + commonnexus.Routes().DispatchNexusTaskByService.Path(entry.Id), - }, + Service: commonnexus.IncomingServicePersistedEntryToExternalAPI(entry), }, nil } @@ -325,19 +312,7 @@ func (m *incomingNexusServiceManager) ListNexusIncomingServices( services := make([]*nexuspb.IncomingService, endIdx-startIdx) for i := 0; i < endIdx-startIdx; i++ { entry := m.services[i+startIdx] - var lastModifiedTime *timestamppb.Timestamp - // Only set last modified if there were modifications as stated in the UI contract. - if entry.Version > 1 { - lastModifiedTime = timestamppb.New(hlc.UTC(entry.Service.Clock)) - } - services[i] = &nexuspb.IncomingService{ - Version: entry.Version, - Id: entry.Id, - Spec: entry.Service.Spec, - CreatedTime: entry.Service.CreatedTime, - LastModifiedTime: lastModifiedTime, - UrlPrefix: "/" + commonnexus.Routes().DispatchNexusTaskByService.Path(entry.Id), - } + services[i] = commonnexus.IncomingServicePersistedEntryToExternalAPI(entry) } resp := &matchingservice.ListNexusIncomingServicesResponse{ diff --git a/tests/nexus_incoming_service_test.go b/tests/nexus_incoming_service_test.go index 292c3cced2c..f2334e48ca2 100644 --- a/tests/nexus_incoming_service_test.go +++ b/tests/nexus_incoming_service_test.go @@ -23,17 +23,145 @@ package tests import ( + "fmt" "testing" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/nexus/v1" + "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/serviceerror" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + "google.golang.org/protobuf/types/known/anypb" "go.temporal.io/server/api/matchingservice/v1" commonnexus "go.temporal.io/server/common/nexus" p "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/testing/protorequire" ) -func (s *FunctionalSuite) TestCreateNexusIncomingService_Matching() { +func TestNexusIncomingServicesFunctionalSuite(t *testing.T) { + t.Run("Common", func(t *testing.T) { + s := new(CommonSuite) + suite.Run(t, s) + }) + t.Run("Matching", func(t *testing.T) { + s := new(MatchingSuite) + suite.Run(t, s) + }) + t.Run("Operator", func(t *testing.T) { + s := new(OperatorSuite) + suite.Run(t, s) + }) +} + +type NexusIncomingServiceFunctionalSuite struct { + FunctionalTestBase + // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, + // not merely log an error + *require.Assertions + protorequire.ProtoAssertions +} + +func (s *NexusIncomingServiceFunctionalSuite) SetupSuite() { + s.setupSuite("testdata/es_cluster.yaml") +} + +func (s *NexusIncomingServiceFunctionalSuite) TearDownSuite() { + s.tearDownSuite() +} + +func (s *NexusIncomingServiceFunctionalSuite) SetupTest() { + // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil + s.Assertions = require.New(s.T()) + s.ProtoAssertions = protorequire.New(s.T()) +} + +type CommonSuite struct { + NexusIncomingServiceFunctionalSuite +} + +func (s *CommonSuite) TestListOrdering() { + // get initial table version since it has been modified by other tests + resp, err := s.testCluster.GetMatchingClient().ListNexusIncomingServices(NewContext(), &matchingservice.ListNexusIncomingServicesRequest{ + LastKnownTableVersion: 0, + PageSize: 0, + }) + s.NoError(err) + initialTableVersion := resp.TableVersion + + // create some services + numServices := 40 // minimum number of services to test, there may be more in DB from other tests + for i := 0; i < numServices; i++ { + s.createNexusIncomingService(s.randomizeStr("test-service-name")) + } + tableVersion := initialTableVersion + int64(numServices) + + // list from persistence manager level + persistence := s.testCluster.testBase.NexusIncomingServiceManager + persistenceResp1, err := persistence.ListNexusIncomingServices(NewContext(), &p.ListNexusIncomingServicesRequest{ + LastKnownTableVersion: tableVersion, + PageSize: numServices / 2, + }) + s.NoError(err) + s.Len(persistenceResp1.Entries, numServices/2) + s.NotNil(persistenceResp1.NextPageToken) + persistenceResp2, err := persistence.ListNexusIncomingServices(NewContext(), &p.ListNexusIncomingServicesRequest{ + LastKnownTableVersion: tableVersion, + PageSize: numServices / 2, + NextPageToken: persistenceResp1.NextPageToken, + }) + s.NoError(err) + s.Len(persistenceResp2.Entries, numServices/2) + + // list from matching level + matchingClient := s.testCluster.GetMatchingClient() + matchingResp1, err := matchingClient.ListNexusIncomingServices(NewContext(), &matchingservice.ListNexusIncomingServicesRequest{ + LastKnownTableVersion: tableVersion, + PageSize: int32(numServices / 2), + }) + s.NoError(err) + s.Len(matchingResp1.Services, numServices/2) + s.NotNil(matchingResp1.NextPageToken) + matchingResp2, err := matchingClient.ListNexusIncomingServices(NewContext(), &matchingservice.ListNexusIncomingServicesRequest{ + LastKnownTableVersion: tableVersion, + PageSize: int32(numServices / 2), + NextPageToken: matchingResp1.NextPageToken, + }) + s.NoError(err) + s.Len(matchingResp2.Services, numServices/2) + + // list from operator level + operatorResp1, err := s.operatorClient.ListNexusIncomingServices(NewContext(), &operatorservice.ListNexusIncomingServicesRequest{ + PageSize: int32(numServices / 2), + }) + s.NoError(err) + s.Len(operatorResp1.Services, numServices/2) + s.NotNil(operatorResp1.NextPageToken) + operatorResp2, err := s.operatorClient.ListNexusIncomingServices(NewContext(), &operatorservice.ListNexusIncomingServicesRequest{ + PageSize: int32(numServices / 2), + NextPageToken: operatorResp1.NextPageToken, + }) + s.NoError(err) + s.Len(operatorResp2.Services, numServices/2) + + // assert list orders match + for i := 0; i < numServices/2; i++ { + s.Equal(persistenceResp1.Entries[i].Id, matchingResp1.Services[i].Id) + s.Equal(persistenceResp2.Entries[i].Id, matchingResp2.Services[i].Id) + + s.Equal(persistenceResp1.Entries[i].Id, operatorResp1.Services[i].Id) + s.Equal(persistenceResp2.Entries[i].Id, operatorResp2.Services[i].Id) + } +} + +type MatchingSuite struct { + NexusIncomingServiceFunctionalSuite +} + +func (s *MatchingSuite) TestCreate() { service := s.createNexusIncomingService(s.T().Name()) s.Equal(int64(1), service.Version) s.Nil(service.LastModifiedTime) @@ -53,7 +181,8 @@ func (s *FunctionalSuite) TestCreateNexusIncomingService_Matching() { var existsErr *serviceerror.AlreadyExists s.ErrorAs(err, &existsErr) } -func (s *FunctionalSuite) TestUpdateNexusIncomingService_Matching() { + +func (s *MatchingSuite) TestUpdate() { service := s.createNexusIncomingService(s.T().Name()) type testcase struct { name string @@ -125,8 +254,8 @@ func (s *FunctionalSuite) TestUpdateNexusIncomingService_Matching() { } } -func (s *FunctionalSuite) TestDeleteNexusIncomingService_Matching() { - service := s.createNexusIncomingService("service-to-delete") +func (s *MatchingSuite) TestDelete() { + service := s.createNexusIncomingService("service-to-delete-matching") type testcase struct { name string serviceId string @@ -164,7 +293,7 @@ func (s *FunctionalSuite) TestDeleteNexusIncomingService_Matching() { } } -func (s *FunctionalSuite) TestListNexusIncomingServices_Matching() { +func (s *MatchingSuite) TestList() { // initialize some services s.createNexusIncomingService("list-test-service0") s.createNexusIncomingService("list-test-service1") @@ -307,64 +436,431 @@ func (s *FunctionalSuite) TestListNexusIncomingServices_Matching() { } } -func (s *FunctionalSuite) TestListNexusIncomingServicesOrdering_Matching() { - // get initial table version since it has been modified by other tests - resp, err := s.testCluster.GetMatchingClient().ListNexusIncomingServices(NewContext(), &matchingservice.ListNexusIncomingServicesRequest{ - LastKnownTableVersion: 0, - PageSize: 0, - }) - s.NoError(err) - initialTableVersion := resp.TableVersion +type OperatorSuite struct { + NexusIncomingServiceFunctionalSuite +} - // create some services - numServices := 40 // minimum number of services to test, there may be more in DB from other tests - for i := 0; i < numServices; i++ { - s.createNexusIncomingService(s.randomizeStr("test-service-name")) +func (s *OperatorSuite) TestCreate() { + type testcase struct { + name string + request *operatorservice.CreateNexusIncomingServiceRequest + assertion func(*operatorservice.CreateNexusIncomingServiceResponse, error) + } + testCases := []testcase{ + { + name: "valid create", + request: &operatorservice.CreateNexusIncomingServiceRequest{ + Spec: &nexus.IncomingServiceSpec{ + Name: s.T().Name(), + Namespace: s.namespace, + TaskQueue: s.defaultTaskQueue().Name, + }, + }, + assertion: func(resp *operatorservice.CreateNexusIncomingServiceResponse, err error) { + s.NotNil(resp.Service) + s.Equal(int64(1), resp.Service.Version) + s.Nil(resp.Service.LastModifiedTime) + s.NotNil(resp.Service.CreatedTime) + s.NotEmpty(resp.Service.Id) + s.Equal(resp.Service.Spec.Name, s.T().Name()) + s.Equal(resp.Service.Spec.Namespace, s.namespace) + s.Equal("/"+commonnexus.Routes().DispatchNexusTaskByService.Path(resp.Service.Id), resp.Service.UrlPrefix) + }, + }, + { + name: "invalid: name already in use", + request: &operatorservice.CreateNexusIncomingServiceRequest{ + Spec: &nexus.IncomingServiceSpec{ + Name: s.T().Name(), + Namespace: s.namespace, + TaskQueue: s.defaultTaskQueue().Name, + }, + }, + assertion: func(resp *operatorservice.CreateNexusIncomingServiceResponse, err error) { + var existsErr *serviceerror.AlreadyExists + s.ErrorAs(err, &existsErr) + }, + }, + { + name: "invalid: name unset", + request: &operatorservice.CreateNexusIncomingServiceRequest{ + Spec: &nexus.IncomingServiceSpec{ + Namespace: s.namespace, + TaskQueue: s.defaultTaskQueue().Name, + }, + }, + assertion: func(resp *operatorservice.CreateNexusIncomingServiceResponse, err error) { + s.ErrorContains(err, "incoming service name not set") + }, + }, + { + name: "invalid: name too long", + request: &operatorservice.CreateNexusIncomingServiceRequest{ + Spec: &nexus.IncomingServiceSpec{ + Name: string(make([]byte, 300)), + Namespace: s.namespace, + TaskQueue: s.defaultTaskQueue().Name, + }, + }, + assertion: func(resp *operatorservice.CreateNexusIncomingServiceResponse, err error) { + s.ErrorContains(err, "incoming service name exceeds length limit") + }, + }, + { + name: "invalid: malformed name", + request: &operatorservice.CreateNexusIncomingServiceRequest{ + Spec: &nexus.IncomingServiceSpec{ + Name: "\n```\n", + Namespace: s.namespace, + TaskQueue: s.defaultTaskQueue().Name, + }, + }, + assertion: func(resp *operatorservice.CreateNexusIncomingServiceResponse, err error) { + s.ErrorContains(err, "incoming service name must match the regex") + }, + }, + { + name: "invalid: namespace unset", + request: &operatorservice.CreateNexusIncomingServiceRequest{ + Spec: &nexus.IncomingServiceSpec{ + Name: s.randomizeStr(s.T().Name()), + TaskQueue: s.defaultTaskQueue().Name, + }, + }, + assertion: func(resp *operatorservice.CreateNexusIncomingServiceResponse, err error) { + s.ErrorContains(err, "incoming service namespace not set") + }, + }, + { + name: "invalid: namespace not found", + request: &operatorservice.CreateNexusIncomingServiceRequest{ + Spec: &nexus.IncomingServiceSpec{ + Name: s.randomizeStr(s.T().Name()), + Namespace: "missing-namespace", + TaskQueue: s.defaultTaskQueue().Name, + }, + }, + assertion: func(resp *operatorservice.CreateNexusIncomingServiceResponse, err error) { + var preCondErr *serviceerror.FailedPrecondition + s.ErrorAs(err, &preCondErr) + }, + }, + { + name: "invalid: task queue unset", + request: &operatorservice.CreateNexusIncomingServiceRequest{ + Spec: &nexus.IncomingServiceSpec{ + Name: s.randomizeStr(s.T().Name()), + Namespace: s.namespace, + }, + }, + assertion: func(resp *operatorservice.CreateNexusIncomingServiceResponse, err error) { + s.ErrorContains(err, "TaskQueue is not set on request") + }, + }, + { + name: "invalid: task queue too long", + request: &operatorservice.CreateNexusIncomingServiceRequest{ + Spec: &nexus.IncomingServiceSpec{ + Name: s.randomizeStr(s.T().Name()), + Namespace: s.namespace, + TaskQueue: string(make([]byte, 1005)), + }, + }, + assertion: func(resp *operatorservice.CreateNexusIncomingServiceResponse, err error) { + s.ErrorContains(err, "TaskQueue length exceeds limit") + }, + }, + { + name: "invalid: service too large", + request: &operatorservice.CreateNexusIncomingServiceRequest{ + Spec: &nexus.IncomingServiceSpec{ + Name: s.randomizeStr(s.T().Name()), + Namespace: s.namespace, + TaskQueue: s.defaultTaskQueue().Name, + Metadata: map[string]*anypb.Any{"k1": {Value: make([]byte, 4100)}}, + }, + }, + assertion: func(resp *operatorservice.CreateNexusIncomingServiceResponse, err error) { + s.ErrorContains(err, "incoming service size exceeds limit") + }, + }, } - tableVersion := initialTableVersion + int64(numServices) - // list from persistence manager level - persistence := s.testCluster.testBase.NexusIncomingServiceManager - persistenceResp1, err := persistence.ListNexusIncomingServices(NewContext(), &p.ListNexusIncomingServicesRequest{ - LastKnownTableVersion: tableVersion, - PageSize: numServices / 2, - }) - s.NoError(err) - s.Len(persistenceResp1.Entries, numServices/2) - s.NotNil(persistenceResp1.NextPageToken) - persistenceResp2, err := persistence.ListNexusIncomingServices(NewContext(), &p.ListNexusIncomingServicesRequest{ - LastKnownTableVersion: tableVersion, - PageSize: numServices / 2, - NextPageToken: persistenceResp1.NextPageToken, - }) - s.NoError(err) - s.Len(persistenceResp2.Entries, numServices/2) + for _, tc := range testCases { + tc := tc + s.T().Run(tc.name, func(t *testing.T) { + resp, err := s.operatorClient.CreateNexusIncomingService(NewContext(), tc.request) + tc.assertion(resp, err) + }) + } +} - // list from matching level - matchingClient := s.testCluster.GetMatchingClient() - matchingResp1, err := matchingClient.ListNexusIncomingServices(NewContext(), &matchingservice.ListNexusIncomingServicesRequest{ - LastKnownTableVersion: tableVersion, - PageSize: int32(numServices / 2), - }) +func (s *OperatorSuite) TestUpdate() { + service := s.createNexusIncomingService(s.T().Name()) + type testcase struct { + name string + request *operatorservice.UpdateNexusIncomingServiceRequest + assertion func(*operatorservice.UpdateNexusIncomingServiceResponse, error) + } + testCases := []testcase{ + { + name: "valid update", + request: &operatorservice.UpdateNexusIncomingServiceRequest{ + Version: 1, + Id: service.Id, + Spec: &nexus.IncomingServiceSpec{ + Name: "updated name", + Namespace: s.namespace, + TaskQueue: s.defaultTaskQueue().Name, + }, + }, + assertion: func(resp *operatorservice.UpdateNexusIncomingServiceResponse, err error) { + s.NoError(err) + s.NotNil(resp.Service) + s.Equal("/"+commonnexus.Routes().DispatchNexusTaskByService.Path(service.Id), service.UrlPrefix) + s.Equal(int64(2), resp.Service.Version) + s.Equal("updated name", resp.Service.Spec.Name) + s.NotNil(resp.Service.LastModifiedTime) + }, + }, + { + name: "invalid: service not found", + request: &operatorservice.UpdateNexusIncomingServiceRequest{ + Version: 1, + Id: "not-found", + Spec: &nexus.IncomingServiceSpec{ + Name: "updated name", + Namespace: s.namespace, + TaskQueue: s.defaultTaskQueue().Name, + }, + }, + assertion: func(resp *operatorservice.UpdateNexusIncomingServiceResponse, err error) { + var notFoundErr *serviceerror.NotFound + s.ErrorAs(err, ¬FoundErr) + }, + }, + { + name: "invalid: service version mismatch", + request: &operatorservice.UpdateNexusIncomingServiceRequest{ + Version: 1, + Id: service.Id, + Spec: &nexus.IncomingServiceSpec{ + Name: "updated name", + Namespace: s.namespace, + TaskQueue: s.defaultTaskQueue().Name, + }, + }, + assertion: func(resp *operatorservice.UpdateNexusIncomingServiceResponse, err error) { + var fpErr *serviceerror.FailedPrecondition + s.ErrorAs(err, &fpErr) + }, + }, + } + + for _, tc := range testCases { + tc := tc + s.T().Run(tc.name, func(t *testing.T) { + resp, err := s.operatorClient.UpdateNexusIncomingService(NewContext(), tc.request) + tc.assertion(resp, err) + }) + } +} + +func (s *OperatorSuite) TestDelete() { + service := s.createNexusIncomingService("service-to-delete-operator") + type testcase struct { + name string + serviceId string + assertion func(*operatorservice.DeleteNexusIncomingServiceResponse, error) + } + testCases := []testcase{ + { + name: "invalid delete: not found", + serviceId: uuid.NewString(), + assertion: func(resp *operatorservice.DeleteNexusIncomingServiceResponse, err error) { + var notFoundErr *serviceerror.NotFound + s.ErrorAs(err, ¬FoundErr) + }, + }, + { + name: "valid delete", + serviceId: service.Id, + assertion: func(resp *operatorservice.DeleteNexusIncomingServiceResponse, err error) { + s.NoError(err) + }, + }, + } + + for _, tc := range testCases { + tc := tc + s.T().Run(tc.name, func(t *testing.T) { + resp, err := s.operatorClient.DeleteNexusIncomingService( + NewContext(), + &operatorservice.DeleteNexusIncomingServiceRequest{ + Id: tc.serviceId, + Version: 1, + }) + tc.assertion(resp, err) + }) + } +} + +func (s *OperatorSuite) TestList() { + // initialize some services + s.createNexusIncomingService("operator-list-test-service0") + s.createNexusIncomingService("operator-list-test-service1") + serviceToFilter := s.createNexusIncomingService("operator-list-test-service2") + + // get ordered services for the course of the tests + resp, err := s.operatorClient.ListNexusIncomingServices(NewContext(), &operatorservice.ListNexusIncomingServicesRequest{}) s.NoError(err) - s.Len(matchingResp1.Services, numServices/2) - s.NotNil(matchingResp1.NextPageToken) - matchingResp2, err := matchingClient.ListNexusIncomingServices(NewContext(), &matchingservice.ListNexusIncomingServicesRequest{ - LastKnownTableVersion: tableVersion, - PageSize: int32(numServices / 2), - NextPageToken: matchingResp1.NextPageToken, - }) + s.NotNil(resp) + servicesOrdered := resp.Services + + resp, err = s.operatorClient.ListNexusIncomingServices(NewContext(), &operatorservice.ListNexusIncomingServicesRequest{PageSize: 2}) s.NoError(err) - s.Len(matchingResp2.Services, numServices/2) + s.NotNil(resp) + nextPageToken := resp.NextPageToken - // assert list orders match - for i := 0; i < numServices/2; i++ { - s.Equal(persistenceResp1.Entries[i].Id, matchingResp1.Services[i].Id) - s.Equal(persistenceResp2.Entries[i].Id, matchingResp2.Services[i].Id) + type testcase struct { + name string + request *operatorservice.ListNexusIncomingServicesRequest + assertion func(*operatorservice.ListNexusIncomingServicesResponse, error) + } + testCases := []testcase{ + { + name: "list first page", + request: &operatorservice.ListNexusIncomingServicesRequest{ + NextPageToken: nil, + PageSize: 2, + }, + assertion: func(resp *operatorservice.ListNexusIncomingServicesResponse, err error) { + s.NoError(err) + s.Equal(nextPageToken, resp.NextPageToken) + s.ProtoElementsMatch(resp.Services, servicesOrdered[0:2]) + }, + }, + { + name: "list non-first page", + request: &operatorservice.ListNexusIncomingServicesRequest{ + NextPageToken: nextPageToken, + PageSize: 2, + }, + assertion: func(resp *operatorservice.ListNexusIncomingServicesResponse, err error) { + s.NoError(err) + s.ProtoEqual(resp.Services[0], servicesOrdered[2]) + }, + }, + { + name: "list with no page size", + request: &operatorservice.ListNexusIncomingServicesRequest{}, + assertion: func(resp *operatorservice.ListNexusIncomingServicesResponse, err error) { + s.NoError(err) + s.NotEmpty(resp.Services) + }, + }, + { + name: "list with filter found", + request: &operatorservice.ListNexusIncomingServicesRequest{ + NextPageToken: nil, + PageSize: 2, + Name: serviceToFilter.Spec.Name, + }, + assertion: func(resp *operatorservice.ListNexusIncomingServicesResponse, err error) { + s.NoError(err) + s.Nil(resp.NextPageToken) + s.Len(resp.Services, 1) + s.ProtoEqual(resp.Services[0], serviceToFilter) + }, + }, + { + name: "list with filter not found", + request: &operatorservice.ListNexusIncomingServicesRequest{ + NextPageToken: nil, + PageSize: 2, + Name: "missing-service", + }, + assertion: func(resp *operatorservice.ListNexusIncomingServicesResponse, err error) { + s.NoError(err) + s.Nil(resp.NextPageToken) + s.Empty(resp.Services) + }, + }, + { + name: "list with page size too large", + request: &operatorservice.ListNexusIncomingServicesRequest{ + NextPageToken: nil, + PageSize: 1005, + }, + assertion: func(resp *operatorservice.ListNexusIncomingServicesResponse, err error) { + var invalidErr *serviceerror.InvalidArgument + s.ErrorAs(err, &invalidErr) + }, + }, + } + + for _, tc := range testCases { + tc := tc + s.T().Run(tc.name, func(t *testing.T) { + resp, err := s.operatorClient.ListNexusIncomingServices(NewContext(), tc.request) + tc.assertion(resp, err) + }) + } +} + +func (s *OperatorSuite) TestGet() { + service := s.createNexusIncomingService(s.T().Name()) + + type testcase struct { + name string + request *operatorservice.GetNexusIncomingServiceRequest + assertion func(*operatorservice.GetNexusIncomingServiceResponse, error) + } + testCases := []testcase{ + { + name: "valid get", + request: &operatorservice.GetNexusIncomingServiceRequest{ + Id: service.Id, + }, + assertion: func(response *operatorservice.GetNexusIncomingServiceResponse, err error) { + s.NoError(err) + s.ProtoEqual(service, response.Service) + }, + }, + { + name: "invalid: missing service", + request: &operatorservice.GetNexusIncomingServiceRequest{ + Id: uuid.NewString(), + }, + assertion: func(response *operatorservice.GetNexusIncomingServiceResponse, err error) { + var notFoundErr *serviceerror.NotFound + s.ErrorAs(err, ¬FoundErr) + }, + }, + { + name: "invalid: service ID not set", + request: &operatorservice.GetNexusIncomingServiceRequest{}, + assertion: func(response *operatorservice.GetNexusIncomingServiceResponse, err error) { + s.ErrorContains(err, "incoming service ID not set") + }, + }, + } + + for _, tc := range testCases { + tc := tc + s.T().Run(tc.name, func(t *testing.T) { + resp, err := s.operatorClient.GetNexusIncomingService(NewContext(), tc.request) + tc.assertion(resp, err) + }) } } -func (s *FunctionalSuite) createNexusIncomingService(name string) *nexus.IncomingService { +func (s *NexusIncomingServiceFunctionalSuite) defaultTaskQueue() *taskqueuepb.TaskQueue { + name := fmt.Sprintf("functional-queue-%v", s.T().Name()) + return &taskqueuepb.TaskQueue{Name: name, Kind: enumspb.TASK_QUEUE_KIND_NORMAL} +} + +func (s *NexusIncomingServiceFunctionalSuite) createNexusIncomingService(name string) *nexus.IncomingService { resp, err := s.testCluster.GetMatchingClient().CreateNexusIncomingService( NewContext(), &matchingservice.CreateNexusIncomingServiceRequest{