diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index e3cd9d9be391..852490efd040 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -45,7 +45,6 @@ ALL_TESTS = [ "//pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl:spanconfigsqlwatcherccl_test", "//pkg/ccl/sqlproxyccl/balancer:balancer_test", "//pkg/ccl/sqlproxyccl/denylist:denylist_test", - "//pkg/ccl/sqlproxyccl/idle:idle_test", "//pkg/ccl/sqlproxyccl/interceptor:interceptor_test", "//pkg/ccl/sqlproxyccl/tenant:tenant_test", "//pkg/ccl/sqlproxyccl/throttler:throttler_test", diff --git a/pkg/ccl/sqlproxyccl/BUILD.bazel b/pkg/ccl/sqlproxyccl/BUILD.bazel index dc06d31379f2..81ce3bf7228b 100644 --- a/pkg/ccl/sqlproxyccl/BUILD.bazel +++ b/pkg/ccl/sqlproxyccl/BUILD.bazel @@ -23,7 +23,6 @@ go_library( "//pkg/base", "//pkg/ccl/sqlproxyccl/balancer", "//pkg/ccl/sqlproxyccl/denylist", - "//pkg/ccl/sqlproxyccl/idle", "//pkg/ccl/sqlproxyccl/interceptor", "//pkg/ccl/sqlproxyccl/tenant", "//pkg/ccl/sqlproxyccl/tenantdirsvr", diff --git a/pkg/ccl/sqlproxyccl/connector.go b/pkg/ccl/sqlproxyccl/connector.go index fdcca77d303d..9b3b46cd7d29 100644 --- a/pkg/ccl/sqlproxyccl/connector.go +++ b/pkg/ccl/sqlproxyccl/connector.go @@ -72,16 +72,6 @@ type connector struct { // NOTE: This field is optional. TLSConfig *tls.Config - // IdleMonitorWrapperFn is used to wrap the connection to the SQL pod with - // an idle monitor. If not specified, the raw connection to the SQL pod - // will be returned. - // - // In the case of connecting with an authentication phase, the connection - // will be wrapped before starting the authentication. - // - // NOTE: This field is optional. - IdleMonitorWrapperFn func(serverConn net.Conn) net.Conn - // Testing knobs for internal connector calls. If specified, these will // be called instead of the actual logic. testingKnobs struct { @@ -112,10 +102,6 @@ func (c *connector) OpenTenantConnWithToken( } }() - if c.IdleMonitorWrapperFn != nil { - serverConn = c.IdleMonitorWrapperFn(serverConn) - } - // When we use token-based authentication, we will still get the initial // connection data messages (e.g. ParameterStatus and BackendKeyData). // Since this method is only used during connection migration (i.e. proxy @@ -161,10 +147,6 @@ func (c *connector) OpenTenantConnWithAuth( } }() - if c.IdleMonitorWrapperFn != nil { - serverConn = c.IdleMonitorWrapperFn(serverConn) - } - // Perform user authentication for non-token-based auth methods. This will // block until the server has authenticated the client. if err := authenticate(clientConn, serverConn, throttleHook); err != nil { diff --git a/pkg/ccl/sqlproxyccl/connector_test.go b/pkg/ccl/sqlproxyccl/connector_test.go index 0be2b6d3eded..001f021bd6d1 100644 --- a/pkg/ccl/sqlproxyccl/connector_test.go +++ b/pkg/ccl/sqlproxyccl/connector_test.go @@ -152,59 +152,6 @@ func TestConnector_OpenTenantConnWithToken(t *testing.T) { _, ok := c.StartupMsg.Parameters[sessionRevivalTokenStartupParam] require.False(t, ok) }) - - t.Run("idle monitor wrapper is called", func(t *testing.T) { - var wrapperCalled bool - f := &forwarder{} - c := &connector{ - StartupMsg: &pgproto3.StartupMessage{ - Parameters: make(map[string]string), - }, - IdleMonitorWrapperFn: func(crdbConn net.Conn) net.Conn { - wrapperCalled = true - return crdbConn - }, - } - - conn, _ := net.Pipe() - defer conn.Close() - - var openCalled bool - c.testingKnobs.dialTenantCluster = func( - ctx context.Context, requester balancer.ConnectionHandle, - ) (net.Conn, error) { - require.Equal(t, f, requester) - openCalled = true - - // Validate that token is set. - str, ok := c.StartupMsg.Parameters[sessionRevivalTokenStartupParam] - require.True(t, ok) - require.Equal(t, token, str) - - return conn, nil - } - - var authCalled bool - defer testutils.TestingHook( - &readTokenAuthResult, - func(serverConn net.Conn) error { - authCalled = true - require.Equal(t, conn, serverConn) - return nil - }, - )() - - crdbConn, err := c.OpenTenantConnWithToken(ctx, f, token) - require.True(t, wrapperCalled) - require.True(t, openCalled) - require.True(t, authCalled) - require.NoError(t, err) - require.Equal(t, conn, crdbConn) - - // Ensure that token is deleted. - _, ok := c.StartupMsg.Parameters[sessionRevivalTokenStartupParam] - require.False(t, ok) - }) } func TestConnector_OpenTenantConnWithAuth(t *testing.T) { @@ -334,68 +281,6 @@ func TestConnector_OpenTenantConnWithAuth(t *testing.T) { require.False(t, sentToClient) require.Equal(t, serverConn, crdbConn) }) - - t.Run("idle monitor wrapper is called", func(t *testing.T) { - clientConn, _ := net.Pipe() - defer clientConn.Close() - - serverConn, _ := net.Pipe() - defer serverConn.Close() - - var wrapperCalled bool - f := &forwarder{} - c := &connector{ - StartupMsg: &pgproto3.StartupMessage{ - Parameters: map[string]string{ - // Passing in a token should have no effect. - sessionRevivalTokenStartupParam: "foo", - }, - }, - IdleMonitorWrapperFn: func(crdbConn net.Conn) net.Conn { - wrapperCalled = true - return crdbConn - }, - } - - var openCalled bool - c.testingKnobs.dialTenantCluster = func( - ctx context.Context, requester balancer.ConnectionHandle, - ) (net.Conn, error) { - require.Equal(t, f, requester) - openCalled = true - - // Validate that token is not set. - _, ok := c.StartupMsg.Parameters[sessionRevivalTokenStartupParam] - require.False(t, ok) - - return serverConn, nil - } - - var authCalled bool - defer testutils.TestingHook( - &authenticate, - func( - client net.Conn, - server net.Conn, - throttleHook func(status throttler.AttemptStatus) error, - ) error { - authCalled = true - require.Equal(t, clientConn, client) - require.NotNil(t, server) - require.Equal(t, reflect.ValueOf(dummyHook).Pointer(), - reflect.ValueOf(throttleHook).Pointer()) - return nil - }, - )() - - crdbConn, sentToClient, err := c.OpenTenantConnWithAuth(ctx, f, clientConn, dummyHook) - require.True(t, openCalled) - require.True(t, wrapperCalled) - require.True(t, authCalled) - require.NoError(t, err) - require.False(t, sentToClient) - require.Equal(t, serverConn, crdbConn) - }) } func TestConnector_dialTenantCluster(t *testing.T) { diff --git a/pkg/ccl/sqlproxyccl/error.go b/pkg/ccl/sqlproxyccl/error.go index 1eb79f6f3a85..13b31bf7724f 100644 --- a/pkg/ccl/sqlproxyccl/error.go +++ b/pkg/ccl/sqlproxyccl/error.go @@ -41,10 +41,6 @@ const ( // or vice versa. codeUnexpectedInsecureStartupMessage - // codeSNIRoutingFailed indicates an error choosing a backend address based on - // the client's SNI header. - codeSNIRoutingFailed - // codeUnexpectedStartupMessage indicates an unexpected startup message // received from the client after TLS negotiation. codeUnexpectedStartupMessage @@ -77,10 +73,6 @@ const ( // has expired and should be closed. codeExpiredClientConnection - // codeIdleDisconnect indicates that the connection was disconnected for - // being idle for longer than the specified timeout. - codeIdleDisconnect - // codeUnavailable indicates that the backend SQL server exists but is not // accepting connections. For example, a tenant cluster that has maxPods set to 0. codeUnavailable diff --git a/pkg/ccl/sqlproxyccl/errorcode_string.go b/pkg/ccl/sqlproxyccl/errorcode_string.go index 908b6bc1e6cb..d14e5b70a361 100644 --- a/pkg/ccl/sqlproxyccl/errorcode_string.go +++ b/pkg/ccl/sqlproxyccl/errorcode_string.go @@ -14,22 +14,20 @@ func _() { _ = x[codeClientReadFailed-4] _ = x[codeClientWriteFailed-5] _ = x[codeUnexpectedInsecureStartupMessage-6] - _ = x[codeSNIRoutingFailed-7] - _ = x[codeUnexpectedStartupMessage-8] - _ = x[codeParamsRoutingFailed-9] - _ = x[codeBackendDown-10] - _ = x[codeBackendRefusedTLS-11] - _ = x[codeBackendDisconnected-12] - _ = x[codeClientDisconnected-13] - _ = x[codeProxyRefusedConnection-14] - _ = x[codeExpiredClientConnection-15] - _ = x[codeIdleDisconnect-16] - _ = x[codeUnavailable-17] + _ = x[codeUnexpectedStartupMessage-7] + _ = x[codeParamsRoutingFailed-8] + _ = x[codeBackendDown-9] + _ = x[codeBackendRefusedTLS-10] + _ = x[codeBackendDisconnected-11] + _ = x[codeClientDisconnected-12] + _ = x[codeProxyRefusedConnection-13] + _ = x[codeExpiredClientConnection-14] + _ = x[codeUnavailable-15] } -const _errorCode_name = "codeAuthFailedcodeBackendReadFailedcodeBackendWriteFailedcodeClientReadFailedcodeClientWriteFailedcodeUnexpectedInsecureStartupMessagecodeSNIRoutingFailedcodeUnexpectedStartupMessagecodeParamsRoutingFailedcodeBackendDowncodeBackendRefusedTLScodeBackendDisconnectedcodeClientDisconnectedcodeProxyRefusedConnectioncodeExpiredClientConnectioncodeIdleDisconnectcodeUnavailable" +const _errorCode_name = "codeAuthFailedcodeBackendReadFailedcodeBackendWriteFailedcodeClientReadFailedcodeClientWriteFailedcodeUnexpectedInsecureStartupMessagecodeUnexpectedStartupMessagecodeParamsRoutingFailedcodeBackendDowncodeBackendRefusedTLScodeBackendDisconnectedcodeClientDisconnectedcodeProxyRefusedConnectioncodeExpiredClientConnectioncodeUnavailable" -var _errorCode_index = [...]uint16{0, 14, 35, 57, 77, 98, 134, 154, 182, 205, 220, 241, 264, 286, 312, 339, 357, 372} +var _errorCode_index = [...]uint16{0, 14, 35, 57, 77, 98, 134, 162, 185, 200, 221, 244, 266, 292, 319, 334} func (i errorCode) String() string { i -= 1 diff --git a/pkg/ccl/sqlproxyccl/idle/BUILD.bazel b/pkg/ccl/sqlproxyccl/idle/BUILD.bazel deleted file mode 100644 index 45a09e715e5d..000000000000 --- a/pkg/ccl/sqlproxyccl/idle/BUILD.bazel +++ /dev/null @@ -1,29 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "idle", - srcs = [ - "monitor.go", - "monitor_conn.go", - ], - importpath = "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/idle", - visibility = ["//visibility:public"], - deps = [ - "//pkg/util/syncutil", - "//pkg/util/timeutil", - ], -) - -go_test( - name = "idle_test", - srcs = [ - "monitor_conn_test.go", - "monitor_test.go", - ], - embed = [":idle"], - deps = [ - "//pkg/util/leaktest", - "//pkg/util/log", - "@com_github_stretchr_testify//require", - ], -) diff --git a/pkg/ccl/sqlproxyccl/idle/monitor.go b/pkg/ccl/sqlproxyccl/idle/monitor.go deleted file mode 100644 index 6c2592039520..000000000000 --- a/pkg/ccl/sqlproxyccl/idle/monitor.go +++ /dev/null @@ -1,241 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package idle - -import ( - "context" - "net" - "sync/atomic" - "time" - - "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" -) - -// maxIdleConns limits the number of idle connections that can be found in a -// given pass of the idle monitor. -const maxIdleConns = 1000 - -// OnIdleFunc is called by the monitor when a connection has been idle for the -// timeout period. -type OnIdleFunc func() - -// connMap maps from a wrapped monitorConn to the idle function which should be -// called if the connection goes idle. -type connMap map[*monitorConn]OnIdleFunc - -// Monitor detects connections which have had no read or write activity for a -// timeout period. Connections are grouped by their "remote" address (i.e. the -// address to which they connect). Idle checks can be turned off and on for each -// group. Example usage: -// -// mon := NewMonitor(ctx, 10*time.Second) -// wrapped := mon.DetectIdle(conn, func() { -// // Handle idle connection. -// }) -// defer wrapped.Close() -// mon.SetIdleChecks(conn.RemoteAddr().String()) -// -// NOTE: All methods on Monitor are thread-safe. -type Monitor struct { - timeout time.Duration - - mu struct { - syncutil.Mutex - - // conns maps from remote address to a map of connections to that address. - conns map[string]connMap - - // checks is a set of remote addresses. All connections to those addresses - // will be monitored for idleness. - checks map[string]struct{} - } -} - -// NewMonitor constructs a new idle connection monitor. It runs periodically on -// a background thread, until the given context is canceled. Callers must -// always ensure the context is canceled to avoid a goroutine leak. -func NewMonitor(ctx context.Context, timeout time.Duration) *Monitor { - if timeout == 0 { - panic("monitor should never be constructed with a zero timeout") - } - - m := &Monitor{timeout: timeout} - m.mu.conns = make(map[string]connMap) - m.mu.checks = make(map[string]struct{}) - - go m.start(ctx) - - return m -} - -// SetIdleChecks instructs the monitor to detect idleness for any connection -// to the given remote address. -func (m *Monitor) SetIdleChecks(addr string) { - m.mu.Lock() - defer m.mu.Unlock() - m.mu.checks[addr] = struct{}{} -} - -// ClearIdleChecks disables idle detection for connections to the given remote -// address. -func (m *Monitor) ClearIdleChecks(addr string) { - m.mu.Lock() - defer m.mu.Unlock() - delete(m.mu.checks, addr) -} - -// DetectIdle adds a new connection to the monitor. The monitor will call the -// given "onIdle" callback function if that connection ever becomes idle. -// DetectIdle returns a "wrapper" connection that intercepts Read/Write calls -// to the given connection in order to detect idleness. Callers must therefore -// redirect all traffic through the returned connection rather than the original -// connection, and be sure to close the wrapper connection (it will remove -// itself from the monitor when closed). -// NOTE: The callback function will be called from a background goroutine. It -// will be called repeatedly for the same connection each time the Monitor runs, -// until the connection is closed or activity on the connection resumes. -func (m *Monitor) DetectIdle(conn net.Conn, onIdle OnIdleFunc) net.Conn { - m.mu.Lock() - defer m.mu.Unlock() - - wrapper := newMonitorConn(m, conn) - - addr := conn.RemoteAddr().String() - existing, ok := m.mu.conns[addr] - if !ok { - existing = make(map[*monitorConn]OnIdleFunc) - m.mu.conns[addr] = existing - } - - existing[wrapper] = onIdle - return wrapper -} - -// start runs on a background goroutine. It runs 10 times per timeout period, so -// the idle detection granularity error can be as high as ~10%. Each time it -// wakes up, it scans all checked connections for any that have been idle for at -// least the timeout period. -func (m *Monitor) start(ctx context.Context) { - var checkAddrs []string - var idleFuncs []OnIdleFunc - - for { - select { - case <-ctx.Done(): - return - - case <-time.After(m.timeout / 10): - } - - // Get the addresses of all pods that need to be monitored for idleness. - // Copy the addresses into a separate slice in order to avoid holding - // the mutex for too long (just long enough to copy). - checkAddrs = m.getAddrsToCheck(checkAddrs) - for _, addr := range checkAddrs { - // Get the idle callback functions for all connections to this pod - // address that have gone idle. Any given pod should have at worst a - // few thousand connections to it. Benchmarks show that even scanning - // 10,000 connections for idleness takes only 250us - it's OK to hold - // the lock for that long. - idleFuncs = m.findIdleConns(addr, idleFuncs) - - // Callback functions are copied to a separate slice outside the - // scope of a lock, so there are no concerns with the onIdle callback - // function causing re-entrancy deadlocks. - for _, onIdle := range idleFuncs { - onIdle() - } - } - } -} - -// getAddrsToCheck returns the addresses of all pods that require idle timeouts -// to be checked. The list is appended to the given "checkAddrs" slice and the -// slice is returned. Copying into the slice minimizes the amount of time the -// lock needs to be held. -func (m *Monitor) getAddrsToCheck(checkAddrs []string) []string { - checkAddrs = checkAddrs[:0] - - m.mu.Lock() - defer m.mu.Unlock() - for addr := range m.mu.checks { - checkAddrs = append(checkAddrs, addr) - } - return checkAddrs -} - -// findIdleConns finds all connections to the given pod address that have been -// idle for longer than the timeout period. It appends the idle callback -// functions associated with those connections to the given "idleFuncs" slice -// and returns that slice. Copying into the slice minimizes the amount of time -// the lock needs to be held. -func (m *Monitor) findIdleConns(addr string, idleFuncs []OnIdleFunc) []OnIdleFunc { - idleFuncs = idleFuncs[:0] - now := timeutil.Now().UnixNano() - - // Compute new deadline for connections with recent activity. - deadline := now + int64(m.timeout) - - m.mu.Lock() - defer m.mu.Unlock() - - connMap := m.mu.conns[addr] - for conn, onIdle := range connMap { - // Get current deadline of the connection. - // See monitorConn comment for more details on how the deadline field is - // used by the monitor and connection. - connDeadline := atomic.LoadInt64(&conn.deadline) - if connDeadline != 0 { - // Check if the deadline has passed, in which case the connection is - // considered idle. - if now > connDeadline { - idleFuncs = append(idleFuncs, onIdle) - if len(idleFuncs) >= maxIdleConns { - // Limit max size of the slice. Additional idle connections - // can be found the next time the monitor runs. - break - } - } - } else { - // Set a new deadline for the connection. - atomic.StoreInt64(&conn.deadline, deadline) - } - } - - return idleFuncs -} - -// removeConn is called by a monitorConn when it is called, in order to remove -// itself from the connection map. -func (m *Monitor) removeConn(conn *monitorConn) { - m.mu.Lock() - defer m.mu.Unlock() - - addr := conn.RemoteAddr().String() - if connMap, ok := m.mu.conns[addr]; ok { - delete(connMap, conn) - if len(connMap) == 0 { - // No more connections to this address, so remove map entries. - delete(m.mu.conns, addr) - } - } -} - -func (m *Monitor) countConnsToAddr(addr string) int { - m.mu.Lock() - defer m.mu.Unlock() - return len(m.mu.conns[addr]) -} - -func (m *Monitor) countAddrsToCheck() int { - m.mu.Lock() - defer m.mu.Unlock() - return len(m.mu.checks) -} diff --git a/pkg/ccl/sqlproxyccl/idle/monitor_conn.go b/pkg/ccl/sqlproxyccl/idle/monitor_conn.go deleted file mode 100644 index 1dc4346d8bcb..000000000000 --- a/pkg/ccl/sqlproxyccl/idle/monitor_conn.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package idle - -import ( - "net" - "sync/atomic" -) - -// monitorConn is a wrapper around net.Conn that intercepts reads/writes and -// registers the activity so that the idle monitor will not trigger connection -// closure. This is accomplished by using the "deadline" field as the central -// point of coordination between the monitor and this connection. The monitor -// atomically sets the deadline each time it wakes up, and the connection -// atomically clears it, by setting it to zero when activity occurs. If no -// activity occurs (and the connection is not clearing the deadline), then -// eventually the deadline expires and the monitor reports the connection as -// idle. -type monitorConn struct { - net.Conn - monitor *Monitor - deadline int64 -} - -// newMonitorConn wraps the given connection with a monitorConn. -func newMonitorConn(monitor *Monitor, conn net.Conn) *monitorConn { - return &monitorConn{Conn: conn, monitor: monitor} -} - -// Read reads data from the connection with timeout. -func (c *monitorConn) Read(b []byte) (n int, err error) { - c.clearDeadline() - return c.Conn.Read(b) -} - -// Write writes data to the connection and sets the read timeout. -func (c *monitorConn) Write(b []byte) (n int, err error) { - c.clearDeadline() - return c.Conn.Write(b) -} - -// Close removes this connection from the monitor and passes through the call to -// the wrapped connection. -func (c *monitorConn) Close() error { - c.monitor.removeConn(c) - return c.Conn.Close() -} - -// clearDeadline atomically sets the deadline field to zero in order to prevent -// the monitor from declaring the connection as idle. -func (c *monitorConn) clearDeadline() { - deadline := atomic.LoadInt64(&c.deadline) - if deadline == 0 { - return - } - atomic.StoreInt64(&c.deadline, 0) -} diff --git a/pkg/ccl/sqlproxyccl/idle/monitor_conn_test.go b/pkg/ccl/sqlproxyccl/idle/monitor_conn_test.go deleted file mode 100644 index 07a52689b888..000000000000 --- a/pkg/ccl/sqlproxyccl/idle/monitor_conn_test.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package idle - -import ( - "context" - "net" - "testing" - "time" - - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/stretchr/testify/require" -) - -func TestMonitorConn(t *testing.T) { - defer leaktest.AfterTest(t)() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - mon := NewMonitor(ctx, time.Second) - - conn := &testConn{Addr: "addr"} - wrapped := newMonitorConn(mon, conn) - defer func() { - _ = wrapped.Close() - }() - - wrapped.deadline = 1 - _, _ = wrapped.Read(nil) - require.Equal(t, int64(0), wrapped.deadline) - - wrapped.deadline = 1 - _, _ = wrapped.Write(nil) - require.Equal(t, int64(0), wrapped.deadline) -} - -type testConn struct { - net.Conn - Addr string -} - -func (c *testConn) RemoteAddr() net.Addr { - return &net.UnixAddr{Name: c.Addr, Net: "unix"} -} - -func (c *testConn) Read(b []byte) (n int, err error) { - return len(b), nil -} - -func (c *testConn) Write(b []byte) (n int, err error) { - return len(b), nil -} - -func (c *testConn) Close() error { - return nil -} diff --git a/pkg/ccl/sqlproxyccl/idle/monitor_test.go b/pkg/ccl/sqlproxyccl/idle/monitor_test.go deleted file mode 100644 index a53178576afb..000000000000 --- a/pkg/ccl/sqlproxyccl/idle/monitor_test.go +++ /dev/null @@ -1,213 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package idle - -import ( - "context" - "fmt" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/stretchr/testify/require" -) - -func TestMonitor(t *testing.T) { - const timeout = 100 * time.Millisecond - defer leaktest.AfterTest(t)() - - t.Run("subset of connections idled", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - mon := NewMonitor(ctx, timeout) - - var idled int64 - createTestConns := func(addr string, n int) { - for i := 0; i < n; i++ { - conn := &testConn{Addr: addr} - wrapped := mon.DetectIdle(conn, func() { - if addr != "addr2" { - t.Errorf("did not expect %s connections to be idled", addr) - return - } - atomic.AddInt64(&idled, 1) - }) - t.Cleanup(func() { - _ = wrapped.Close() - }) - } - } - - createTestConns("addr1", 5) - createTestConns("addr2", 5) - createTestConns("addr3", 5) - - // Idle only connections to addr2. - mon.SetIdleChecks("addr2") - - require.Eventually(t, func() bool { - // >= 5 to handle race condition where the same connection is marked - // idle multiple times. - return atomic.LoadInt64(&idled) >= 5 - }, 10*time.Second, 10*time.Millisecond) - }) - - t.Run("don't idle busy connections", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - mon := NewMonitor(ctx, timeout) - - wrapped1 := mon.DetectIdle(&testConn{Addr: "test"}, func() { - t.Error("did not expect conn #1 to be idled") - }) - - wrapped2 := mon.DetectIdle(&testConn{Addr: "test"}, func() { - t.Error("did not expect conn #2 to be idled") - }) - - var idled int64 - _ = mon.DetectIdle(&testConn{Addr: "test"}, func() { - atomic.AddInt64(&idled, 1) - }) - - mon.SetIdleChecks("test") - - require.Eventually(t, func() bool { - _, _ = wrapped1.Read(nil) - _, _ = wrapped2.Write(nil) - - // >= 1 to handle race condition where the same connection is marked - // idle multiple times. - return atomic.LoadInt64(&idled) >= 1 - }, 10*time.Second, 10*time.Millisecond) - }) - - t.Run("ClearIdleChecks", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - mon := NewMonitor(ctx, timeout) - - _ = mon.DetectIdle(&testConn{Addr: "test"}, func() { - t.Error("did not expect connection to be idled") - }) - - require.Equal(t, 0, mon.countAddrsToCheck()) - mon.SetIdleChecks("test") - require.Equal(t, 1, mon.countAddrsToCheck()) - mon.ClearIdleChecks("test") - require.Equal(t, 0, mon.countAddrsToCheck()) - }) - - t.Run("close wrapped connection", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - mon := NewMonitor(ctx, timeout) - - wrapped1 := mon.DetectIdle(&testConn{Addr: "test"}, func() {}) - wrapped2 := mon.DetectIdle(&testConn{Addr: "test"}, func() {}) - _ = mon.DetectIdle(&testConn{Addr: "test2"}, func() {}) - - require.Equal(t, 2, mon.countConnsToAddr("test")) - - require.Equal(t, 0, mon.countAddrsToCheck()) - mon.SetIdleChecks("test") - require.Equal(t, 1, mon.countAddrsToCheck()) - - // Closing the wrapped connection should remove it from the set of - // monitored connections. - wrapped1.Close() - require.Equal(t, 1, mon.countConnsToAddr("test")) - require.Equal(t, 1, mon.countAddrsToCheck()) - - wrapped2.Close() - require.Equal(t, 0, mon.countConnsToAddr("test")) - require.Equal(t, 1, mon.countAddrsToCheck()) - - mon.ClearIdleChecks("test") - require.Equal(t, 0, mon.countAddrsToCheck()) - }) -} - -func TestMonitorStress(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - const numConns = 100 - const timeout = 100 * time.Millisecond - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - mon := NewMonitor(ctx, timeout) - - var wg sync.WaitGroup - var idled [numConns]int64 - for i := 0; i < numConns; i++ { - wg.Add(1) - go func(i int) { - addr := fmt.Sprintf("addr%d", i%3) - log.Infof(ctx, "Start connection %s", addr) - - conn := &testConn{Addr: addr} - wrapped := mon.DetectIdle(conn, func() { - atomic.StoreInt64(&idled[i], 1) - }) - t.Cleanup(func() { - _ = wrapped.Close() - }) - - wg.Done() - - // Send traffic on 1/2 the connections. - if i%2 == 0 { - for { - // Read 1/2 the connections, write the other 1/2. - if i%4 == 0 { - _, _ = wrapped.Read(nil) - } else { - _, _ = wrapped.Write(nil) - } - time.Sleep(timeout / 10) - if ctx.Err() != nil { - break - } - } - } - }(i) - } - - // Wait for all goroutines to get running. - wg.Wait() - - // Detect idleness on 2 of 3 addresses. - log.Info(ctx, "SetIdleChecks") - mon.SetIdleChecks("addr0") - mon.SetIdleChecks("addr2") - - // Let the background routines run for a bit. - time.Sleep(timeout * 3) - - // Cancel the traffic and ensure the right number of connections have been - // idled. - log.Info(ctx, "Cancel") - cancel() - for i := 0; i < numConns; i++ { - idled := int(atomic.LoadInt64(&idled[i])) - - // Only odd connections are idled, and only if they are not targeting - // addr1. - if i%2 == 0 || i%3 == 1 { - require.Equal(t, 0, idled, "connection #%d", i) - } else { - require.Equal(t, 1, idled, "connection #%d", i) - } - } -} diff --git a/pkg/ccl/sqlproxyccl/metrics.go b/pkg/ccl/sqlproxyccl/metrics.go index da0253fd7b24..fa8f572aeace 100644 --- a/pkg/ccl/sqlproxyccl/metrics.go +++ b/pkg/ccl/sqlproxyccl/metrics.go @@ -200,8 +200,6 @@ func (metrics *metrics) updateForError(err error) { metrics.BackendDisconnectCount.Inc(1) case codeClientDisconnected: metrics.ClientDisconnectCount.Inc(1) - case codeIdleDisconnect: - metrics.IdleDisconnectCount.Inc(1) case codeProxyRefusedConnection: metrics.RefusedConnCount.Inc(1) metrics.BackendDownCount.Inc(1) diff --git a/pkg/ccl/sqlproxyccl/proxy.go b/pkg/ccl/sqlproxyccl/proxy.go index 726c0a912339..781a93310e22 100644 --- a/pkg/ccl/sqlproxyccl/proxy.go +++ b/pkg/ccl/sqlproxyccl/proxy.go @@ -41,7 +41,6 @@ func toPgError(err error) *pgproto3.ErrorResponse { codeBackendDisconnected, codeAuthFailed, codeProxyRefusedConnection, - codeIdleDisconnect, codeUnavailable: msg = codeErr.Error() // The rest - the message sent back is sanitized. @@ -49,16 +48,9 @@ func toPgError(err error) *pgproto3.ErrorResponse { msg = "server requires encryption" } - var pgCode string - if codeErr.code == codeIdleDisconnect { - pgCode = pgcode.AdminShutdown.String() - } else { - pgCode = pgcode.SQLserverRejectedEstablishmentOfSQLconnection.String() - } - return &pgproto3.ErrorResponse{ Severity: "FATAL", - Code: pgCode, + Code: pgcode.SQLserverRejectedEstablishmentOfSQLconnection.String(), Message: msg, Hint: errors.FlattenHints(err), } diff --git a/pkg/ccl/sqlproxyccl/proxy_handler.go b/pkg/ccl/sqlproxyccl/proxy_handler.go index 9fe39f72de57..9c85db56e4ce 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/balancer" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/denylist" - "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/idle" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler" @@ -96,9 +95,6 @@ type ProxyOptions struct { // PollConfigInterval defines polling interval for pickup up changes in // config file. PollConfigInterval time.Duration - // DrainTimeout if set, will close DRAINING connections that have been idle - // for this duration. - DrainTimeout time.Duration // ThrottleBaseDelay is the initial exponential backoff triggered in // response to the first connection failure. ThrottleBaseDelay time.Duration @@ -135,9 +131,6 @@ type proxyHandler struct { // throttleService will do throttling of incoming connection requests. throttleService throttler.Service - // idleMonitor will detect idle connections to DRAINING pods. - idleMonitor *idle.Monitor - // directoryCache is used to resolve tenants to their IP addresses. directoryCache tenant.DirectoryCache @@ -227,18 +220,9 @@ func newProxyHandler( _ = conn.Close() // nolint:grpcconnclose })) - // If a drain timeout has been specified, then start the idle monitor and - // the pod watcher. When a pod enters the DRAINING state, the pod watcher - // will set the idle monitor to detect connections without activity and - // terminate them. var dirOpts []tenant.DirOption - if options.DrainTimeout != 0 { - handler.idleMonitor = idle.NewMonitor(ctx, options.DrainTimeout) - - podWatcher := make(chan *tenant.Pod) - go handler.startPodWatcher(ctx, podWatcher) - dirOpts = append(dirOpts, tenant.PodWatcher(podWatcher)) - } + podWatcher := make(chan *tenant.Pod) + dirOpts = append(dirOpts, tenant.PodWatcher(podWatcher)) if handler.testingKnobs.dirOpts != nil { dirOpts = append(dirOpts, handler.testingKnobs.dirOpts...) } @@ -256,6 +240,10 @@ func newProxyHandler( return nil, err } + // Only start the pod watcher once everything has been initialized. This + // will depend on the balancer eventually. + go handler.startPodWatcher(ctx, podWatcher) + return &handler, nil } @@ -346,19 +334,6 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn connector.TLSConfig = &tls.Config{InsecureSkipVerify: handler.SkipVerify} } - // Monitor for idle connection, if requested. - if handler.idleMonitor != nil { - connector.IdleMonitorWrapperFn = func(serverConn net.Conn) net.Conn { - return handler.idleMonitor.DetectIdle(serverConn, func() { - err := newErrorf(codeIdleDisconnect, "idle connection closed") - select { - case errConnection <- err: /* error reported */ - default: /* the channel already contains an error */ - } - }) - } - } - f := newForwarder(ctx, connector, handler.metrics, nil /* timeSource */) defer f.Close() @@ -420,7 +395,7 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn case err := <-f.errCh: // From forwarder. handler.metrics.updateForError(err) return err - case err := <-errConnection: // From denyListWatcher or idleMonitor. + case err := <-errConnection: // From denyListWatcher. handler.metrics.updateForError(err) return err case <-handler.stopper.ShouldQuiesce(): @@ -435,20 +410,18 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn // are subject to an idle timeout that closes them after a short period of // inactivity. If a pod transitions back to the RUNNING state or to the DELETING // state, then the idle timeout needs to be cleared. +// +// TODO(jaylim-crl): Update comment above. func (handler *proxyHandler) startPodWatcher(ctx context.Context, podWatcher chan *tenant.Pod) { for { select { case <-ctx.Done(): return - case pod := <-podWatcher: + case <-podWatcher: // TODO(jaylim-crl): Invoke rebalance logic here whenever we see // a new SQL pod. - if pod.State == tenant.DRAINING { - handler.idleMonitor.SetIdleChecks(pod.Addr) - } else { - // Clear idle checks either for RUNNING or DELETING. - handler.idleMonitor.ClearIdleChecks(pod.Addr) - } + // + // Do nothing for now. } } } diff --git a/pkg/ccl/sqlproxyccl/proxy_handler_test.go b/pkg/ccl/sqlproxyccl/proxy_handler_test.go index 41a54f74104d..0e88996c057a 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler_test.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler_test.go @@ -679,14 +679,12 @@ func TestDirectoryConnect(t *testing.T) { // New proxy server using the directory. Define both the directory and the // routing rule so that fallback to the routing rule can be tested. - const drainTimeout = 200 * time.Millisecond opts := &ProxyOptions{ RoutingRule: srv.ServingSQLAddr(), DirectoryAddr: tdsAddr.String(), Insecure: true, - DrainTimeout: drainTimeout, } - proxy, addr := newProxyServer(ctx, t, srv.Stopper(), opts) + _, addr := newProxyServer(ctx, t, srv.Stopper(), opts) t.Run("fallback when tenant not found", func(t *testing.T) { url := fmt.Sprintf( @@ -763,35 +761,6 @@ func TestDirectoryConnect(t *testing.T) { return true }, 30*time.Second, 100*time.Millisecond) }) - - t.Run("drain connection", func(t *testing.T) { - url := fmt.Sprintf("postgres://root:admin@%s/?sslmode=disable&options=--cluster=tenant-cluster-28", addr) - te.TestConnect(ctx, t, url, func(conn *pgx.Conn) { - // The current connection count can take a bit of time to drop to 1, - // since the previous successful connection asynchronously closes. - // PGX cuts the connection on the client side, but it can take time - // for the proxy to get the notification and react. - require.Eventually(t, func() bool { - return proxy.metrics.CurConnCount.Value() == 1 - }, 10*time.Second, 10*time.Millisecond) - - // Connection should be forcefully terminated after the drain timeout, - // even though it's being continuously used. - require.Eventually(t, func() bool { - // Trigger drain of connections. Do this repeatedly inside the - // loop in order to avoid race conditions where the proxy is not - // yet hooked up to the directory server (and thus misses any - // one-time DRAIN notifications). - tds2.Drain() - - // Run query until it fails (because connection was closed). - return runTestQuery(ctx, conn) != nil - }, 30*time.Second, 5*drainTimeout) - - // Ensure failure was due to forced drain disconnection. - require.Equal(t, int64(1), proxy.metrics.IdleDisconnectCount.Count()) - }) - }) } func TestConnectionMigration(t *testing.T) { diff --git a/pkg/cli/cliflags/flags_mt.go b/pkg/cli/cliflags/flags_mt.go index 583ca72d111b..731e79751eca 100644 --- a/pkg/cli/cliflags/flags_mt.go +++ b/pkg/cli/cliflags/flags_mt.go @@ -91,11 +91,6 @@ var ( Description: "Polling interval changes in config file.", } - DrainTimeout = FlagInfo{ - Name: "drain-timeout", - Description: "Close DRAINING connections idle for this duration.", - } - TestDirectoryListenPort = FlagInfo{ Name: "port", Description: "Test directory server binds and listens on this port.", diff --git a/pkg/cli/context.go b/pkg/cli/context.go index 148a2f66d65a..78839a907d0b 100644 --- a/pkg/cli/context.go +++ b/pkg/cli/context.go @@ -649,7 +649,6 @@ func setProxyContextDefaults() { proxyContext.RatelimitBaseDelay = 50 * time.Millisecond proxyContext.ValidateAccessInterval = 30 * time.Second proxyContext.PollConfigInterval = 30 * time.Second - proxyContext.DrainTimeout = 0 proxyContext.ThrottleBaseDelay = time.Second } diff --git a/pkg/cli/flags.go b/pkg/cli/flags.go index fce8381d614f..b6506c3374d3 100644 --- a/pkg/cli/flags.go +++ b/pkg/cli/flags.go @@ -1032,7 +1032,6 @@ func init() { boolFlag(f, &proxyContext.Insecure, cliflags.InsecureBackend) durationFlag(f, &proxyContext.ValidateAccessInterval, cliflags.ValidateAccessInterval) durationFlag(f, &proxyContext.PollConfigInterval, cliflags.PollConfigInterval) - durationFlag(f, &proxyContext.DrainTimeout, cliflags.DrainTimeout) durationFlag(f, &proxyContext.ThrottleBaseDelay, cliflags.ThrottleBaseDelay) } // Multi-tenancy test directory command flags.