From 0d85aab857233d5cbbeab002f12ea5250126db70 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Wed, 11 Dec 2024 17:40:18 -0800 Subject: [PATCH] Freeport 2.0 (#6966) ## What changed? New freeport implementation; taken from battle-tested Temporal projects. ## Why? Follow-up to https://github.com/temporalio/temporal/pull/6915; making it simpler and more robust. ## How did you test it? - [x] works on Linux (ie CI) - [x] works on Mac (ie my computer) [First CI run](https://github.com/temporalio/temporal/actions/runs/12269035797/attempts/1?pr=6966), `TestNexusCallbackReplicated` failed with `Error while dialing: dial tcp 127.0.0.1:33279: connect: connection refused`. Looks like there was an unrelated data race (that I've reported to Slack). [Second CI run](https://github.com/temporalio/temporal/actions/runs/12269035797/attempts/2?pr=6966) passed. [Third CI run](https://github.com/temporalio/temporal/actions/runs/12269035797?pr=6966), Versioning suite seems to have [timed out](https://github.com/temporalio/temporal/actions/runs/12269035797/job/34233452135?pr=6966#step:8:966). Unrelated. So it seems to be fine from a random port perspective. ## Potential risks ## Documentation ## Is hotfix candidate? --- common/nexus/nexustest/server.go | 9 +- components/nexusoperations/executors_test.go | 4 +- internal/freeport/freeport.go | 96 ++++++++++++++++++++ internal/temporalite/freeport.go | 75 --------------- internal/temporalite/lite_server.go | 29 +++--- tests/callbacks_test.go | 10 +- tests/nexus_workflow_test.go | 14 +-- tests/testcore/test_cluster.go | 17 ++-- tests/xdc/nexus_request_forwarding_test.go | 2 +- tests/xdc/nexus_state_replication_test.go | 4 +- 10 files changed, 135 insertions(+), 125 deletions(-) create mode 100644 internal/freeport/freeport.go delete mode 100644 internal/temporalite/freeport.go diff --git a/common/nexus/nexustest/server.go b/common/nexus/nexustest/server.go index 70da19fb86c..7309c95bc01 100644 --- a/common/nexus/nexustest/server.go +++ b/common/nexus/nexustest/server.go @@ -32,14 +32,11 @@ import ( "github.com/nexus-rpc/sdk-go/nexus" "github.com/stretchr/testify/require" - "go.temporal.io/server/internal/temporalite" + "go.temporal.io/server/internal/freeport" ) -func AllocListenAddress(t *testing.T) string { - pp := temporalite.NewPortProvider() - listenAddr := fmt.Sprintf("localhost:%d", pp.MustGetFreePort()) - require.NoError(t, pp.Close()) - return listenAddr +func AllocListenAddress() string { + return fmt.Sprintf("localhost:%d", freeport.MustGetFreePort()) } func NewNexusServer(t *testing.T, listenAddr string, handler nexus.Handler) { diff --git a/components/nexusoperations/executors_test.go b/components/nexusoperations/executors_test.go index 1f3cea10f53..9421f5dd8ad 100644 --- a/components/nexusoperations/executors_test.go +++ b/components/nexusoperations/executors_test.go @@ -410,7 +410,7 @@ func TestProcessInvocationTask(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) - listenAddr := nexustest.AllocListenAddress(t) + listenAddr := nexustest.AllocListenAddress() h := handler{} h.OnStartOperation = func( ctx context.Context, @@ -699,7 +699,7 @@ func TestProcessCancelationTask(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) - listenAddr := nexustest.AllocListenAddress(t) + listenAddr := nexustest.AllocListenAddress() h := handler{} h.OnCancelOperation = tc.onCancelOperation nexustest.NewNexusServer(t, listenAddr, h) diff --git a/internal/freeport/freeport.go b/internal/freeport/freeport.go new file mode 100644 index 00000000000..b5dde1c7101 --- /dev/null +++ b/internal/freeport/freeport.go @@ -0,0 +1,96 @@ +// The MIT License +// +// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package freeport + +import ( + "fmt" + "net" + "runtime" +) + +// MustGetFreePort returns a TCP port that is available to listen on, +// for the given (local) host. +// +// This works by binding a new TCP socket on port 0, which requests the OS to +// allocate a free port. There is no strict guarantee that the port will remain +// available after this function returns, but it should be safe to assume that +// a given port will not be allocated again to any process on this machine +// within a few seconds. +// +// On Unix-based systems, binding to the port returned by this function requires +// setting the `SO_REUSEADDR` socket option (Go already does that by default, +// but other languages may not); otherwise, the OS may fail with a message such +// as "address already in use". Windows default behavior is already appropriate +// in this regard; on that platform, `SO_REUSEADDR` has a different meaning and +// should not be set (setting it may have unpredictable consequences). +func MustGetFreePort() int { + port, err := getFreePort("127.0.0.1") + if err != nil { + // try ipv6 + port, err = getFreePort("[::1]") + if err != nil { + panic(fmt.Errorf("failed assigning ephemeral port: %w", err)) + } + } + return port +} + +func getFreePort(host string) (int, error) { + l, err := net.Listen("tcp", host+":0") + if err != nil { + return 0, fmt.Errorf("failed to assign a free port: %v", err) + } + defer l.Close() + port := l.Addr().(*net.TCPAddr).Port + + // On Linux and some BSD variants, ephemeral ports are randomized, and may + // consequently repeat within a short time frame after the listening end + // has been closed. To avoid this, we make a connection to the port, then + // close that connection from the server's side (this is very important), + // which puts the connection in TIME_WAIT state for some time (by default, + // 60s on Linux). While it remains in that state, the OS will not reallocate + // that port number for bind(:0) syscalls, yet we are not prevented from + // explicitly binding to it (thanks to SO_REUSEADDR). + // + // On macOS and Windows, the above technique is not necessary, as the OS + // allocates ephemeral ports sequentially, meaning a port number will only + // be reused after the entire range has been exhausted. Quite the opposite, + // given that these OSes use a significantly smaller range for ephemeral + // ports, making an extra connection just to reserve a port might actually + // be harmful (by hastening ephemeral port exhaustion). + if runtime.GOOS != "darwin" && runtime.GOOS != "windows" { + r, err := net.DialTCP("tcp", nil, l.Addr().(*net.TCPAddr)) + if err != nil { + return 0, fmt.Errorf("failed to assign a free port: %v", err) + } + c, err := l.Accept() + if err != nil { + return 0, fmt.Errorf("failed to assign a free port: %v", err) + } + // Closing the socket from the server side + _ = c.Close() + defer r.Close() + } + + return port, nil +} diff --git a/internal/temporalite/freeport.go b/internal/temporalite/freeport.go deleted file mode 100644 index 1c6f69f8732..00000000000 --- a/internal/temporalite/freeport.go +++ /dev/null @@ -1,75 +0,0 @@ -// The MIT License -// -// Copyright (c) 2021 Datadog, Inc. -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package temporalite - -import ( - "fmt" - "net" -) - -func NewPortProvider() *PortProvider { - return &PortProvider{} -} - -type PortProvider struct { - listeners []*net.TCPListener -} - -// GetFreePort finds an open port on the system which is ready to use. -func (p *PortProvider) GetFreePort() (int, error) { - addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") - if err != nil { - if addr, err = net.ResolveTCPAddr("tcp6", "[::1]:0"); err != nil { - return 0, fmt.Errorf("failed to get free port: %w", err) - } - } - - l, err := net.ListenTCP("tcp", addr) - if err != nil { - return 0, err - } - - p.listeners = append(p.listeners, l) - - return l.Addr().(*net.TCPAddr).Port, nil -} - -// MustGetFreePort calls GetFreePort, panicking on error. -func (p *PortProvider) MustGetFreePort() int { - port, err := p.GetFreePort() - if err != nil { - panic(err) - } - return port -} - -func (p *PortProvider) Close() error { - for _, l := range p.listeners { - if err := l.Close(); err != nil { - return err - } - } - return nil -} diff --git a/internal/temporalite/lite_server.go b/internal/temporalite/lite_server.go index 1da1a6bd78d..ef5460567ea 100644 --- a/internal/temporalite/lite_server.go +++ b/internal/temporalite/lite_server.go @@ -48,6 +48,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" sqliteplugin "go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite" + "go.temporal.io/server/internal/freeport" "go.temporal.io/server/schema/sqlite" "go.temporal.io/server/temporal" expmaps "golang.org/x/exp/maps" @@ -96,7 +97,7 @@ type LiteServerConfig struct { SearchAttributes map[string]enums.IndexedValueType } -func (cfg *LiteServerConfig) apply(serverConfig *config.Config, provider *PortProvider) { +func (cfg *LiteServerConfig) apply(serverConfig *config.Config) { sqliteConfig := config.SQL{ PluginName: sqliteplugin.PluginName, ConnectAttributes: make(map[string]string), @@ -117,12 +118,12 @@ func (cfg *LiteServerConfig) apply(serverConfig *config.Config, provider *PortPr } if cfg.FrontendPort == 0 { - cfg.FrontendPort = provider.MustGetFreePort() + cfg.FrontendPort = freeport.MustGetFreePort() } if cfg.MetricsPort == 0 { - cfg.MetricsPort = provider.MustGetFreePort() + cfg.MetricsPort = freeport.MustGetFreePort() } - pprofPort := provider.MustGetFreePort() + pprofPort := freeport.MustGetFreePort() serverConfig.Global.Membership = config.Membership{ MaxJoinDuration: 30 * time.Second, @@ -160,10 +161,10 @@ func (cfg *LiteServerConfig) apply(serverConfig *config.Config, provider *PortPr Policy: "noop", } serverConfig.Services = map[string]config.Service{ - "frontend": cfg.mustGetService(0, provider), - "history": cfg.mustGetService(1, provider), - "matching": cfg.mustGetService(2, provider), - "worker": cfg.mustGetService(3, provider), + "frontend": cfg.mustGetService(0), + "history": cfg.mustGetService(1), + "matching": cfg.mustGetService(2), + "worker": cfg.mustGetService(3), } serverConfig.Archival = config.Archival{ History: config.HistoryArchival{ @@ -244,11 +245,7 @@ func NewLiteServer(liteConfig *LiteServerConfig, opts ...temporal.ServerOption) return nil, err } - p := NewPortProvider() - liteConfig.apply(liteConfig.BaseConfig, p) - if err := p.Close(); err != nil { - return nil, err - } + liteConfig.apply(liteConfig.BaseConfig) sqlConfig := liteConfig.BaseConfig.Persistence.DataStores[sqliteplugin.PluginName].SQL @@ -379,11 +376,11 @@ func getAllowedPragmas() []string { return allowedPragmaList } -func (cfg *LiteServerConfig) mustGetService(frontendPortOffset int, provider *PortProvider) config.Service { +func (cfg *LiteServerConfig) mustGetService(frontendPortOffset int) config.Service { svc := config.Service{ RPC: config.RPC{ GRPCPort: cfg.FrontendPort + frontendPortOffset, - MembershipPort: provider.MustGetFreePort(), + MembershipPort: freeport.MustGetFreePort(), BindOnLocalHost: true, BindOnIP: "", }, @@ -391,7 +388,7 @@ func (cfg *LiteServerConfig) mustGetService(frontendPortOffset int, provider *Po // Assign any open port when configured to use dynamic ports if frontendPortOffset != 0 { - svc.RPC.GRPCPort = provider.MustGetFreePort() + svc.RPC.GRPCPort = freeport.MustGetFreePort() } // Optionally bind frontend to IPv4 address diff --git a/tests/callbacks_test.go b/tests/callbacks_test.go index 1fa75829224..6295b4e7255 100644 --- a/tests/callbacks_test.go +++ b/tests/callbacks_test.go @@ -46,7 +46,7 @@ import ( "go.temporal.io/sdk/workflow" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/components/callbacks" - "go.temporal.io/server/internal/temporalite" + "go.temporal.io/server/internal/freeport" "go.temporal.io/server/tests/testcore" "google.golang.org/protobuf/types/known/durationpb" ) @@ -250,7 +250,6 @@ func (s *CallbacksSuite) TestWorkflowNexusCallbacks_CarriedOver() { Namespace: s.Namespace(), }) s.NoError(err) - pp := temporalite.NewPortProvider() taskQueue := testcore.RandomizeStr(s.T().Name()) workflowType := "test" @@ -259,8 +258,7 @@ func (s *CallbacksSuite) TestWorkflowNexusCallbacks_CarriedOver() { requestCh: make(chan *nexus.CompletionRequest, 1), requestCompleteCh: make(chan error, 1), } - callbackAddress := fmt.Sprintf("localhost:%d", pp.MustGetFreePort()) - s.NoError(pp.Close()) + callbackAddress := fmt.Sprintf("localhost:%d", freeport.MustGetFreePort()) shutdownServer := s.runNexusCompletionHTTPServer(ch, callbackAddress) t.Cleanup(func() { require.NoError(t, shutdownServer()) @@ -349,7 +347,6 @@ func (s *CallbacksSuite) TestNexusResetWorkflowWithCallback() { Namespace: s.Namespace(), }) s.NoError(err) - pp := temporalite.NewPortProvider() taskQueue := testcore.RandomizeStr(s.T().Name()) @@ -357,8 +354,7 @@ func (s *CallbacksSuite) TestNexusResetWorkflowWithCallback() { requestCh: make(chan *nexus.CompletionRequest, 1), requestCompleteCh: make(chan error, 1), } - callbackAddress := fmt.Sprintf("localhost:%d", pp.MustGetFreePort()) - s.NoError(pp.Close()) + callbackAddress := fmt.Sprintf("localhost:%d", freeport.MustGetFreePort()) shutdownServer := s.runNexusCompletionHTTPServer(ch, callbackAddress) s.T().Cleanup(func() { require.NoError(s.T(), shutdownServer()) diff --git a/tests/nexus_workflow_test.go b/tests/nexus_workflow_test.go index 4703871d93b..158b3730e77 100644 --- a/tests/nexus_workflow_test.go +++ b/tests/nexus_workflow_test.go @@ -87,7 +87,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationCancelation() { return nil }, } - listenAddr := nexustest.AllocListenAddress(s.T()) + listenAddr := nexustest.AllocListenAddress() nexustest.NewNexusServer(s.T(), listenAddr, h) _, err := s.OperatorClient().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ @@ -212,7 +212,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationSyncCompletion() { return &nexus.HandlerStartOperationResultSync[any]{Value: "result"}, nil }, } - listenAddr := nexustest.AllocListenAddress(s.T()) + listenAddr := nexustest.AllocListenAddress() nexustest.NewNexusServer(s.T(), listenAddr, h) _, err := s.OperatorClient().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ @@ -317,7 +317,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationSyncCompletion_LargePayload() return &nexus.HandlerStartOperationResultSync[any]{Value: strings.Repeat("a", (2*1024*1024)-10)}, nil }, } - listenAddr := nexustest.AllocListenAddress(s.T()) + listenAddr := nexustest.AllocListenAddress() nexustest.NewNexusServer(s.T(), listenAddr, h) _, err := s.OperatorClient().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ @@ -485,7 +485,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletion() { }, nil }, } - listenAddr := nexustest.AllocListenAddress(s.T()) + listenAddr := nexustest.AllocListenAddress() nexustest.NewNexusServer(s.T(), listenAddr, h) _, err = s.OperatorClient().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ @@ -951,7 +951,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncFailure() { return &nexus.HandlerStartOperationResultAsync{OperationID: "test"}, nil }, } - listenAddr := nexustest.AllocListenAddress(s.T()) + listenAddr := nexustest.AllocListenAddress() nexustest.NewNexusServer(s.T(), listenAddr, h) _, err := s.OperatorClient().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ @@ -1369,7 +1369,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationCancelBeforeStarted_Cancelati return nil }, } - listenAddr := nexustest.AllocListenAddress(s.T()) + listenAddr := nexustest.AllocListenAddress() nexustest.NewNexusServer(s.T(), listenAddr, h) _, err := s.OperatorClient().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ @@ -1485,7 +1485,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionAfterReset() { return &nexus.HandlerStartOperationResultAsync{OperationID: "test"}, nil }, } - listenAddr := nexustest.AllocListenAddress(s.T()) + listenAddr := nexustest.AllocListenAddress() nexustest.NewNexusServer(s.T(), listenAddr, h) _, err := s.OperatorClient().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ diff --git a/tests/testcore/test_cluster.go b/tests/testcore/test_cluster.go index 76dda3e8cd7..124e6cedb3a 100644 --- a/tests/testcore/test_cluster.go +++ b/tests/testcore/test_cluster.go @@ -63,7 +63,7 @@ import ( "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/rpc/encryption" "go.temporal.io/server/common/searchattribute" - "go.temporal.io/server/internal/temporalite" + "go.temporal.io/server/internal/freeport" "go.temporal.io/server/temporal" "go.temporal.io/server/tests/testutils" "go.uber.org/fx" @@ -209,16 +209,15 @@ func NewClusterWithPersistenceTestBaseFactory(t *testing.T, options *TestCluster } // allocate ports - pp := temporalite.NewPortProvider() hostsByProtocolByService := map[transferProtocol]map[primitives.ServiceName]static.Hosts{ grpcProtocol: { - primitives.FrontendService: {All: makeAddresses(pp, options.FrontendConfig.NumFrontendHosts)}, - primitives.MatchingService: {All: makeAddresses(pp, options.MatchingConfig.NumMatchingHosts)}, - primitives.HistoryService: {All: makeAddresses(pp, options.HistoryConfig.NumHistoryHosts)}, - primitives.WorkerService: {All: makeAddresses(pp, options.WorkerConfig.NumWorkers)}, + primitives.FrontendService: {All: makeAddresses(options.FrontendConfig.NumFrontendHosts)}, + primitives.MatchingService: {All: makeAddresses(options.MatchingConfig.NumMatchingHosts)}, + primitives.HistoryService: {All: makeAddresses(options.HistoryConfig.NumHistoryHosts)}, + primitives.WorkerService: {All: makeAddresses(options.WorkerConfig.NumWorkers)}, }, httpProtocol: { - primitives.FrontendService: {All: makeAddresses(pp, options.FrontendConfig.NumFrontendHosts)}, + primitives.FrontendService: {All: makeAddresses(options.FrontendConfig.NumFrontendHosts)}, }, } if err := pp.Close(); err != nil { @@ -654,10 +653,10 @@ func createFixedTLSConfigProvider() (*encryption.FixedTLSConfigProvider, error) }, nil } -func makeAddresses(pp *temporalite.PortProvider, count int) []string { +func makeAddresses(count int) []string { hosts := make([]string, count) for i := range hosts { - hosts[i] = fmt.Sprintf("127.0.0.1:%d", pp.MustGetFreePort()) + hosts[i] = fmt.Sprintf("127.0.0.1:%d", freeport.MustGetFreePort()) } return hosts } diff --git a/tests/xdc/nexus_request_forwarding_test.go b/tests/xdc/nexus_request_forwarding_test.go index ed8fe46a54b..4cc35700aa3 100644 --- a/tests/xdc/nexus_request_forwarding_test.go +++ b/tests/xdc/nexus_request_forwarding_test.go @@ -347,7 +347,7 @@ func (s *NexusRequestForwardingSuite) TestCompleteOperationForwardedFromStandbyT return &nexus.HandlerStartOperationResultAsync{OperationID: "test"}, nil }, } - listenAddr := nexustest.AllocListenAddress(s.T()) + listenAddr := nexustest.AllocListenAddress() nexustest.NewNexusServer(s.T(), listenAddr, h) createEndpointReq := &operatorservice.CreateNexusEndpointRequest{ diff --git a/tests/xdc/nexus_state_replication_test.go b/tests/xdc/nexus_state_replication_test.go index 0007c736dc4..1b65dc68a7c 100644 --- a/tests/xdc/nexus_state_replication_test.go +++ b/tests/xdc/nexus_state_replication_test.go @@ -114,7 +114,7 @@ func (s *NexusStateReplicationSuite) TestNexusOperationEventsReplicated() { return &nexus.HandlerStartOperationResultAsync{OperationID: "test"}, nil }, } - listenAddr := nexustest.AllocListenAddress(s.T()) + listenAddr := nexustest.AllocListenAddress() nexustest.NewNexusServer(s.T(), listenAddr, h) ctx := testcore.NewContext() @@ -260,7 +260,7 @@ func (s *NexusStateReplicationSuite) TestNexusOperationCancelationReplicated() { return nil, errors.New("injected error for failing nexus start operation") }, } - listenAddr := nexustest.AllocListenAddress(s.T()) + listenAddr := nexustest.AllocListenAddress() nexustest.NewNexusServer(s.T(), listenAddr, h) ctx := testcore.NewContext()