Skip to content

Commit

Permalink
Implement Nexus incoming service operator APIs (temporalio#5621)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Added implementations for Operator service CRUD APIs for Nexus incoming
services
Some details:
* Create, Update, and Delete requests are forwarded to matching
* Read (Get and List) requests are sent directly to persistence
* Frontend does not do any caching of services

Depends on temporalio#5616

## Why?
<!-- Tell your future self why have you made these changes -->
So users can interact with Nexus incoming services

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
New functional tests
  • Loading branch information
pdoerner authored Mar 29, 2024
1 parent 4c78ceb commit 65b0fc8
Show file tree
Hide file tree
Showing 11 changed files with 1,042 additions and 108 deletions.
17 changes: 13 additions & 4 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,22 @@ const (
ReachabilityQuerySetDurationSinceDefault = "frontend.reachabilityQuerySetDurationSinceDefault"
// TaskQueuesPerBuildIdLimit limits the number of task queue names that can be mapped to a single build id.
TaskQueuesPerBuildIdLimit = "limit.taskQueuesPerBuildId"
// NexusOutgoingServiceURLMaxLength is the maximum length of an outgoing service URL

// NexusIncomingServiceNameMaxLength is the maximum length of a Nexus incoming service name.
NexusIncomingServiceNameMaxLength = "limit.incomingServiceNameMaxLength"
// NexusIncomingServiceMaxSize is the maximum size of a Nexus incoming service in bytes.
NexusIncomingServiceMaxSize = "limit.incomingServiceMaxSize"
// NexusIncomingServiceListDefaultPageSize is the default page size for listing Nexus incoming services.
NexusIncomingServiceListDefaultPageSize = "limit.incomingServiceListDefaultPageSize"
// NexusIncomingServiceListMaxPageSize is the maximum page size for listing Nexus incoming services.
NexusIncomingServiceListMaxPageSize = "limit.incomingServiceListMaxPageSize"
// NexusOutgoingServiceURLMaxLength is the maximum length of an outgoing service URL.
NexusOutgoingServiceURLMaxLength = "limit.outgoingServiceURLMaxLength"
// NexusOutgoingServiceNameMaxLength is the maximum length of an outgoing service name
// NexusOutgoingServiceNameMaxLength is the maximum length of an outgoing service name.
NexusOutgoingServiceNameMaxLength = "limit.outgoingServiceNameMaxLength"
// NexusOutgoingServiceListDefaultPageSize is the default page size for listing outgoing services
// NexusOutgoingServiceListDefaultPageSize is the default page size for listing outgoing services.
NexusOutgoingServiceListDefaultPageSize = "limit.outgoingServiceListDefaultPageSize"
// NexusOutgoingServiceListMaxPageSize is the maximum page size for listing outgoing services
// NexusOutgoingServiceListMaxPageSize is the maximum page size for listing outgoing services.
NexusOutgoingServiceListMaxPageSize = "limit.outgoingServiceListMaxPageSize"

// RemovableBuildIdDurationSinceDefault is the minimum duration since a build id was last default in its containing
Expand Down
48 changes: 48 additions & 0 deletions common/nexus/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// The MIT License
//
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package nexus

import (
"go.temporal.io/api/nexus/v1"
"google.golang.org/protobuf/types/known/timestamppb"

persistencepb "go.temporal.io/server/api/persistence/v1"
hlc "go.temporal.io/server/common/clock/hybrid_logical_clock"
)

func IncomingServicePersistedEntryToExternalAPI(entry *persistencepb.NexusIncomingServiceEntry) *nexus.IncomingService {
var lastModifiedTime *timestamppb.Timestamp
// Only set last modified if there were modifications as stated in the UI contract.
if entry.Version > 1 {
lastModifiedTime = timestamppb.New(hlc.UTC(entry.Service.Clock))
}

return &nexus.IncomingService{
Version: entry.Version,
Id: entry.Id,
Spec: entry.Service.Spec,
CreatedTime: entry.Service.CreatedTime,
LastModifiedTime: lastModifiedTime,
UrlPrefix: "/" + Routes().DispatchNexusTaskByService.Path(entry.Id),
}
}
9 changes: 6 additions & 3 deletions common/persistence/cassandra/nexus_incoming_service_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (s *NexusIncomingServiceStore) ListNexusIncomingServices(
ctx context.Context,
request *p.ListNexusIncomingServicesRequest,
) (*p.InternalListNexusIncomingServicesResponse, error) {
if request.LastKnownTableVersion == 0 {
if request.LastKnownTableVersion == 0 && request.NextPageToken == nil {
return s.listFirstPageWithVersion(ctx, request)
}

Expand Down Expand Up @@ -230,8 +230,11 @@ func (s *NexusIncomingServiceStore) ListNexusIncomingServices(

response.TableVersion = currentTableVersion

if request.LastKnownTableVersion != currentTableVersion {
// If table has been updated during pagination, throw error to indicate caller must start over
if request.LastKnownTableVersion != 0 && request.LastKnownTableVersion != currentTableVersion {
// If request.LastKnownTableVersion == 0 then caller does not care about checking whether they have the most
// current view while paginating.
// Otherwise, if there is a version mismatch, then the table has been updated during pagination, and throw
// error to indicate caller must start over.
return nil, fmt.Errorf("%w. provided table version: %v current table version: %v",
p.ErrNexusTableVersionConflict,
request.LastKnownTableVersion,
Expand Down
23 changes: 22 additions & 1 deletion service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ import (
"net"

"github.com/gorilla/mux"
"go.temporal.io/server/common/nexus"
"go.uber.org/fx"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/keepalive"

"go.temporal.io/server/common/nexus"

"go.temporal.io/server/client"
"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
Expand Down Expand Up @@ -113,6 +114,7 @@ var Module = fx.Options(
fx.Provide(OpenAPIHTTPHandlerProvider),
fx.Provide(HTTPAPIServerProvider),
fx.Provide(NewServiceProvider),
fx.Provide(IncomingServiceClientProvider),
fx.Provide(OutgoingServiceRegistryProvider),
fx.Invoke(ServiceLifetimeHooks),
)
Expand Down Expand Up @@ -590,6 +592,7 @@ func OperatorHandlerProvider(
clusterMetadataManager persistence.ClusterMetadataManager,
clusterMetadata cluster.Metadata,
clientFactory client.Factory,
incomingServiceClient *NexusIncomingServiceClient,
outgoingServiceRegistry *nexus.OutgoingServiceRegistry,
) *OperatorHandlerImpl {
args := NewOperatorHandlerImplArgs{
Expand All @@ -605,6 +608,7 @@ func OperatorHandlerProvider(
clusterMetadataManager,
clusterMetadata,
clientFactory,
incomingServiceClient,
outgoingServiceRegistry,
}
return NewOperatorHandlerImpl(args)
Expand Down Expand Up @@ -743,6 +747,23 @@ func HTTPAPIServerProvider(
)
}

func IncomingServiceClientProvider(
dc *dynamicconfig.Collection,
namespaceRegistry namespace.Registry,
matchingClient resource.MatchingClient,
incomingServiceManager persistence.NexusIncomingServiceManager,
logger log.Logger,
) *NexusIncomingServiceClient {
clientConfig := newNexusIncomingServiceClientConfig(dc)
return newNexusIncomingServiceClient(
clientConfig,
namespaceRegistry,
matchingClient,
incomingServiceManager,
logger,
)
}

func OutgoingServiceRegistryProvider(
metadataManager persistence.MetadataManager,
dc *dynamicconfig.Collection,
Expand Down
Loading

0 comments on commit 65b0fc8

Please sign in to comment.