Skip to content

Commit

Permalink
Freeport 2.0 (temporalio#6966)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->

New freeport implementation; taken from battle-tested Temporal projects.

## Why?
<!-- Tell your future self why have you made these changes -->

Follow-up to temporalio#6915; making it
simpler and more robust.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

- [x] works on Linux (ie CI)
- [x] works on Mac (ie my computer)

[First CI
run](https://github.com/temporalio/temporal/actions/runs/12269035797/attempts/1?pr=6966),
`TestNexusCallbackReplicated` failed with `Error while dialing: dial tcp
127.0.0.1:33279: connect: connection refused`. Looks like there was an
unrelated data race (that I've reported to Slack).

[Second CI
run](https://github.com/temporalio/temporal/actions/runs/12269035797/attempts/2?pr=6966)
passed.

[Third CI
run](https://github.com/temporalio/temporal/actions/runs/12269035797?pr=6966),
Versioning suite seems to have [timed
out](https://github.com/temporalio/temporal/actions/runs/12269035797/job/34233452135?pr=6966#step:8:966).
Unrelated.

So it seems to be fine from a random port perspective.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
stephanos authored Dec 12, 2024
1 parent 09a61bf commit 53b1425
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 140 deletions.
9 changes: 3 additions & 6 deletions common/nexus/nexustest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,11 @@ import (

"github.com/nexus-rpc/sdk-go/nexus"
"github.com/stretchr/testify/require"
"go.temporal.io/server/internal/temporalite"
"go.temporal.io/server/internal/freeport"
)

func AllocListenAddress(t *testing.T) string {
pp := temporalite.NewPortProvider()
listenAddr := fmt.Sprintf("localhost:%d", pp.MustGetFreePort())
require.NoError(t, pp.Close())
return listenAddr
func AllocListenAddress() string {
return fmt.Sprintf("localhost:%d", freeport.MustGetFreePort())
}

func NewNexusServer(t *testing.T, listenAddr string, handler nexus.Handler) {
Expand Down
4 changes: 2 additions & 2 deletions components/nexusoperations/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func TestProcessInvocationTask(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
listenAddr := nexustest.AllocListenAddress(t)
listenAddr := nexustest.AllocListenAddress()
h := handler{}
h.OnStartOperation = func(
ctx context.Context,
Expand Down Expand Up @@ -716,7 +716,7 @@ func TestProcessCancelationTask(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
listenAddr := nexustest.AllocListenAddress(t)
listenAddr := nexustest.AllocListenAddress()
h := handler{}
h.OnCancelOperation = tc.onCancelOperation
nexustest.NewNexusServer(t, listenAddr, h)
Expand Down
96 changes: 96 additions & 0 deletions internal/freeport/freeport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// The MIT License
//
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package freeport

import (
"fmt"
"net"
"runtime"
)

// MustGetFreePort returns a TCP port that is available to listen on,
// for the given (local) host.
//
// This works by binding a new TCP socket on port 0, which requests the OS to
// allocate a free port. There is no strict guarantee that the port will remain
// available after this function returns, but it should be safe to assume that
// a given port will not be allocated again to any process on this machine
// within a few seconds.
//
// On Unix-based systems, binding to the port returned by this function requires
// setting the `SO_REUSEADDR` socket option (Go already does that by default,
// but other languages may not); otherwise, the OS may fail with a message such
// as "address already in use". Windows default behavior is already appropriate
// in this regard; on that platform, `SO_REUSEADDR` has a different meaning and
// should not be set (setting it may have unpredictable consequences).
func MustGetFreePort() int {
port, err := getFreePort("127.0.0.1")
if err != nil {
// try ipv6
port, err = getFreePort("[::1]")
if err != nil {
panic(fmt.Errorf("failed assigning ephemeral port: %w", err))
}
}
return port
}

func getFreePort(host string) (int, error) {
l, err := net.Listen("tcp", host+":0")
if err != nil {
return 0, fmt.Errorf("failed to assign a free port: %v", err)
}
defer l.Close()
port := l.Addr().(*net.TCPAddr).Port

// On Linux and some BSD variants, ephemeral ports are randomized, and may
// consequently repeat within a short time frame after the listening end
// has been closed. To avoid this, we make a connection to the port, then
// close that connection from the server's side (this is very important),
// which puts the connection in TIME_WAIT state for some time (by default,
// 60s on Linux). While it remains in that state, the OS will not reallocate
// that port number for bind(:0) syscalls, yet we are not prevented from
// explicitly binding to it (thanks to SO_REUSEADDR).
//
// On macOS and Windows, the above technique is not necessary, as the OS
// allocates ephemeral ports sequentially, meaning a port number will only
// be reused after the entire range has been exhausted. Quite the opposite,
// given that these OSes use a significantly smaller range for ephemeral
// ports, making an extra connection just to reserve a port might actually
// be harmful (by hastening ephemeral port exhaustion).
if runtime.GOOS != "darwin" && runtime.GOOS != "windows" {
r, err := net.DialTCP("tcp", nil, l.Addr().(*net.TCPAddr))
if err != nil {
return 0, fmt.Errorf("failed to assign a free port: %v", err)
}
c, err := l.Accept()
if err != nil {
return 0, fmt.Errorf("failed to assign a free port: %v", err)
}
// Closing the socket from the server side
_ = c.Close()
defer r.Close()
}

return port, nil
}
75 changes: 0 additions & 75 deletions internal/temporalite/freeport.go

This file was deleted.

29 changes: 13 additions & 16 deletions internal/temporalite/lite_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
sqliteplugin "go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite"
"go.temporal.io/server/internal/freeport"
"go.temporal.io/server/schema/sqlite"
"go.temporal.io/server/temporal"
expmaps "golang.org/x/exp/maps"
Expand Down Expand Up @@ -96,7 +97,7 @@ type LiteServerConfig struct {
SearchAttributes map[string]enums.IndexedValueType
}

func (cfg *LiteServerConfig) apply(serverConfig *config.Config, provider *PortProvider) {
func (cfg *LiteServerConfig) apply(serverConfig *config.Config) {
sqliteConfig := config.SQL{
PluginName: sqliteplugin.PluginName,
ConnectAttributes: make(map[string]string),
Expand All @@ -117,12 +118,12 @@ func (cfg *LiteServerConfig) apply(serverConfig *config.Config, provider *PortPr
}

if cfg.FrontendPort == 0 {
cfg.FrontendPort = provider.MustGetFreePort()
cfg.FrontendPort = freeport.MustGetFreePort()
}
if cfg.MetricsPort == 0 {
cfg.MetricsPort = provider.MustGetFreePort()
cfg.MetricsPort = freeport.MustGetFreePort()
}
pprofPort := provider.MustGetFreePort()
pprofPort := freeport.MustGetFreePort()

serverConfig.Global.Membership = config.Membership{
MaxJoinDuration: 30 * time.Second,
Expand Down Expand Up @@ -160,10 +161,10 @@ func (cfg *LiteServerConfig) apply(serverConfig *config.Config, provider *PortPr
Policy: "noop",
}
serverConfig.Services = map[string]config.Service{
"frontend": cfg.mustGetService(0, provider),
"history": cfg.mustGetService(1, provider),
"matching": cfg.mustGetService(2, provider),
"worker": cfg.mustGetService(3, provider),
"frontend": cfg.mustGetService(0),
"history": cfg.mustGetService(1),
"matching": cfg.mustGetService(2),
"worker": cfg.mustGetService(3),
}
serverConfig.Archival = config.Archival{
History: config.HistoryArchival{
Expand Down Expand Up @@ -244,11 +245,7 @@ func NewLiteServer(liteConfig *LiteServerConfig, opts ...temporal.ServerOption)
return nil, err
}

p := NewPortProvider()
liteConfig.apply(liteConfig.BaseConfig, p)
if err := p.Close(); err != nil {
return nil, err
}
liteConfig.apply(liteConfig.BaseConfig)

sqlConfig := liteConfig.BaseConfig.Persistence.DataStores[sqliteplugin.PluginName].SQL

Expand Down Expand Up @@ -379,19 +376,19 @@ func getAllowedPragmas() []string {
return allowedPragmaList
}

func (cfg *LiteServerConfig) mustGetService(frontendPortOffset int, provider *PortProvider) config.Service {
func (cfg *LiteServerConfig) mustGetService(frontendPortOffset int) config.Service {
svc := config.Service{
RPC: config.RPC{
GRPCPort: cfg.FrontendPort + frontendPortOffset,
MembershipPort: provider.MustGetFreePort(),
MembershipPort: freeport.MustGetFreePort(),
BindOnLocalHost: true,
BindOnIP: "",
},
}

// Assign any open port when configured to use dynamic ports
if frontendPortOffset != 0 {
svc.RPC.GRPCPort = provider.MustGetFreePort()
svc.RPC.GRPCPort = freeport.MustGetFreePort()
}

// Optionally bind frontend to IPv4 address
Expand Down
10 changes: 3 additions & 7 deletions tests/callbacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
"go.temporal.io/sdk/workflow"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/components/callbacks"
"go.temporal.io/server/internal/temporalite"
"go.temporal.io/server/internal/freeport"
"go.temporal.io/server/tests/testcore"
"google.golang.org/protobuf/types/known/durationpb"
)
Expand Down Expand Up @@ -250,7 +250,6 @@ func (s *CallbacksSuite) TestWorkflowNexusCallbacks_CarriedOver() {
Namespace: s.Namespace(),
})
s.NoError(err)
pp := temporalite.NewPortProvider()

taskQueue := testcore.RandomizeStr(s.T().Name())
workflowType := "test"
Expand All @@ -259,8 +258,7 @@ func (s *CallbacksSuite) TestWorkflowNexusCallbacks_CarriedOver() {
requestCh: make(chan *nexus.CompletionRequest, 1),
requestCompleteCh: make(chan error, 1),
}
callbackAddress := fmt.Sprintf("localhost:%d", pp.MustGetFreePort())
s.NoError(pp.Close())
callbackAddress := fmt.Sprintf("localhost:%d", freeport.MustGetFreePort())
shutdownServer := s.runNexusCompletionHTTPServer(ch, callbackAddress)
t.Cleanup(func() {
require.NoError(t, shutdownServer())
Expand Down Expand Up @@ -349,16 +347,14 @@ func (s *CallbacksSuite) TestNexusResetWorkflowWithCallback() {
Namespace: s.Namespace(),
})
s.NoError(err)
pp := temporalite.NewPortProvider()

taskQueue := testcore.RandomizeStr(s.T().Name())

ch := &completionHandler{
requestCh: make(chan *nexus.CompletionRequest, 1),
requestCompleteCh: make(chan error, 1),
}
callbackAddress := fmt.Sprintf("localhost:%d", pp.MustGetFreePort())
s.NoError(pp.Close())
callbackAddress := fmt.Sprintf("localhost:%d", freeport.MustGetFreePort())
shutdownServer := s.runNexusCompletionHTTPServer(ch, callbackAddress)
s.T().Cleanup(func() {
require.NoError(s.T(), shutdownServer())
Expand Down
Loading

0 comments on commit 53b1425

Please sign in to comment.