diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 6bf1fcb10331..e437a062cc37 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -26,7 +26,7 @@ ALL_TESTS = [ "//pkg/ccl/sqlproxyccl/cache:cache_test", "//pkg/ccl/sqlproxyccl/denylist:denylist_test", "//pkg/ccl/sqlproxyccl/idle:idle_test", - "//pkg/ccl/sqlproxyccl/tenantdirsvr:tenantdirsvr_test", + "//pkg/ccl/sqlproxyccl/tenant:tenant_test", "//pkg/ccl/sqlproxyccl/throttler:throttler_test", "//pkg/ccl/sqlproxyccl:sqlproxyccl_test", "//pkg/ccl/storageccl/engineccl:engineccl_test", diff --git a/pkg/ccl/sqlproxyccl/BUILD.bazel b/pkg/ccl/sqlproxyccl/BUILD.bazel index 8c4f2370770f..af3926bab327 100644 --- a/pkg/ccl/sqlproxyccl/BUILD.bazel +++ b/pkg/ccl/sqlproxyccl/BUILD.bazel @@ -38,6 +38,8 @@ go_library( "@com_github_cockroachdb_logtags//:logtags", "@com_github_jackc_pgproto3_v2//:pgproto3", "@org_golang_google_grpc//:go_default_library", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", ], ) @@ -51,17 +53,13 @@ go_test( "proxy_handler_test.go", "server_test.go", ], - data = [ - ":testserver.crt", - ":testserver.key", - ":testserver_config.cnf", - ], + data = [":testdata"], embed = [":sqlproxyccl"], deps = [ "//pkg/base", "//pkg/ccl/kvccl/kvtenantccl", "//pkg/ccl/sqlproxyccl/denylist", - "//pkg/ccl/sqlproxyccl/tenant", + "//pkg/ccl/sqlproxyccl/tenantdirsvr", "//pkg/ccl/utilccl", "//pkg/roachpb", "//pkg/security", diff --git a/pkg/ccl/sqlproxyccl/backend_dialer.go b/pkg/ccl/sqlproxyccl/backend_dialer.go index 70b37b760a47..f83c08cc04cf 100644 --- a/pkg/ccl/sqlproxyccl/backend_dialer.go +++ b/pkg/ccl/sqlproxyccl/backend_dialer.go @@ -42,8 +42,8 @@ var backendDial = func( return conn, nil } -// sslOverlay attempts to upgrade the PG connection to use SSL -// if a tls.Config is specified.. +// sslOverlay attempts to upgrade the PG connection to use SSL if a tls.Config +// is specified. func sslOverlay(conn net.Conn, tlsConfig *tls.Config) (net.Conn, error) { if tlsConfig == nil { return conn, nil diff --git a/pkg/ccl/sqlproxyccl/cache/cache.go b/pkg/ccl/sqlproxyccl/cache/cache.go index a5c1553ad823..cd566d26659b 100644 --- a/pkg/ccl/sqlproxyccl/cache/cache.go +++ b/pkg/ccl/sqlproxyccl/cache/cache.go @@ -39,7 +39,8 @@ type cappedConnCache struct { } } -// NewCappedConnCache returns a cache service that has a limit on the total entries. +// NewCappedConnCache returns a cache service that has a limit on the total +// entries. func NewCappedConnCache(maxMapSize int) ConnCache { c := &cappedConnCache{ maxMapSize: maxMapSize, diff --git a/pkg/ccl/sqlproxyccl/denylist/file.go b/pkg/ccl/sqlproxyccl/denylist/file.go index 67bdb768ec37..de389f4e6d6b 100644 --- a/pkg/ccl/sqlproxyccl/denylist/file.go +++ b/pkg/ccl/sqlproxyccl/denylist/file.go @@ -70,11 +70,12 @@ type Denylist struct { ctx context.Context } -// NewDenylistWithFile returns a new denylist that automatically watches updates to a file. -// Note: this currently does not return an error. This is by design, since even if we trouble -// initiating a denylist with file, we can always update the file with correct content during -// runtime. We don't want sqlproxy fail to start just because there's something wrong with -// contents of a denylist file. +// NewDenylistWithFile returns a new denylist that automatically watches updates +// to a file. +// Note: this currently does not return an error. This is by design, since even +// if we trouble initiating a denylist with file, we can always update the file +// with correct content during runtime. We don't want sqlproxy fail to start +// just because there's something wrong with contents of a denylist file. func NewDenylistWithFile(ctx context.Context, filename string, opts ...Option) *Denylist { ret := &Denylist{ pollingInterval: defaultPollingInterval, @@ -89,9 +90,10 @@ func NewDenylistWithFile(ctx context.Context, filename string, opts ...Option) * } err := ret.update(filename) if err != nil { - // don't return just yet; sqlproxy should be able to carry on without a proper denylist - // and we still have a chance to recover. - // TODO(ye): add monitoring for failed updates; we don't want silent failures + // don't return just yet; sqlproxy should be able to carry on without a + // proper denylist and we still have a chance to recover. + // TODO(ye): add monitoring for failed updates; we don't want silent + // failures. log.Errorf(ctx, "error when reading from file %s: %v", filename, err) } @@ -103,7 +105,8 @@ func NewDenylistWithFile(ctx context.Context, filename string, opts ...Option) * // Option allows configuration of a denylist service. type Option func(*Denylist) -// WithPollingInterval specifies interval between polling for config file changes. +// WithPollingInterval specifies interval between polling for config file +// changes. func WithPollingInterval(d time.Duration) Option { return func(dl *Denylist) { dl.pollingInterval = d @@ -163,7 +166,8 @@ func (dl *Denylist) Denied(entity DenyEntity) (*Entry, error) { func (dl *Denylist) watchForUpdate(filename string) { go func() { // TODO(ye): use notification via SIGHUP instead. - // TODO(ye): use inotify or similar mechanism for watching file updates instead of polling. + // TODO(ye): use inotify or similar mechanism for watching file updates + // instead of polling. t := timeutil.NewTimer() defer t.Stop() for { diff --git a/pkg/ccl/sqlproxyccl/denylist/mocks_generated.go b/pkg/ccl/sqlproxyccl/denylist/mocks_generated.go deleted file mode 100644 index d40c42f4982b..000000000000 --- a/pkg/ccl/sqlproxyccl/denylist/mocks_generated.go +++ /dev/null @@ -1,49 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: service.go - -// Package denylist is a generated GoMock package. -package denylist - -import ( - reflect "reflect" - - gomock "github.com/golang/mock/gomock" -) - -// MockService is a mock of Service interface. -type MockService struct { - ctrl *gomock.Controller - recorder *MockServiceMockRecorder -} - -// MockServiceMockRecorder is the mock recorder for MockService. -type MockServiceMockRecorder struct { - mock *MockService -} - -// NewMockService creates a new mock instance. -func NewMockService(ctrl *gomock.Controller) *MockService { - mock := &MockService{ctrl: ctrl} - mock.recorder = &MockServiceMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockService) EXPECT() *MockServiceMockRecorder { - return m.recorder -} - -// Denied mocks base method. -func (m *MockService) Denied(entity DenyEntity) (*Entry, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Denied", entity) - ret0, _ := ret[0].(*Entry) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Denied indicates an expected call of Denied. -func (mr *MockServiceMockRecorder) Denied(entity interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Denied", reflect.TypeOf((*MockService)(nil).Denied), entity) -} diff --git a/pkg/ccl/sqlproxyccl/denylist/service.go b/pkg/ccl/sqlproxyccl/denylist/service.go index 5e15726b89c5..bfa3cea27d84 100644 --- a/pkg/ccl/sqlproxyccl/denylist/service.go +++ b/pkg/ccl/sqlproxyccl/denylist/service.go @@ -10,8 +10,6 @@ package denylist import "strings" -//go:generate mockgen -package=denylist -destination=mocks_generated.go -source=service.go . Service - // Entry records the reason for putting an item on the denylist. // TODO(spaskob): add codes for different denial reasons. type Entry struct { diff --git a/pkg/ccl/sqlproxyccl/frontend_admitter.go b/pkg/ccl/sqlproxyccl/frontend_admitter.go index 7811b1d5f323..4f74aa73c460 100644 --- a/pkg/ccl/sqlproxyccl/frontend_admitter.go +++ b/pkg/ccl/sqlproxyccl/frontend_admitter.go @@ -16,11 +16,11 @@ import ( ) // frontendAdmit is the default implementation of a frontend admitter. It can -// upgrade to an optional SSL connection, and will handle and verify -// the startup message received from the PG SQL client. -// The connection returned should never be nil in case of error. Depending -// on whether the error happened before the connection was upgraded to TLS or not -// it will either be the original or the TLS connection. +// upgrade to an optional SSL connection, and will handle and verify the startup +// message received from the PG SQL client. The connection returned should never +// be nil in case of error. Depending on whether the error happened before the +// connection was upgraded to TLS or not it will either be the original or the +// TLS connection. var frontendAdmit = func( conn net.Conn, incomingTLSConfig *tls.Config, ) (net.Conn, *pgproto3.StartupMessage, error) { @@ -38,9 +38,9 @@ var frontendAdmit = func( switch m.(type) { case *pgproto3.SSLRequest: case *pgproto3.CancelRequest: - // Ignore CancelRequest explicitly. We don't need to do this but it makes - // testing easier by avoiding a call to sendErrToClient on this path - // (which would confuse assertCtx). + // Ignore CancelRequest explicitly. We don't need to do this but it + // makes testing easier by avoiding a call to sendErrToClient on this + // path (which would confuse assertCtx). return conn, nil, nil default: code := codeUnexpectedInsecureStartupMessage diff --git a/pkg/ccl/sqlproxyccl/frontend_admitter_test.go b/pkg/ccl/sqlproxyccl/frontend_admitter_test.go index 5d6fc04f2418..f485d395a2fc 100644 --- a/pkg/ccl/sqlproxyccl/frontend_admitter_test.go +++ b/pkg/ccl/sqlproxyccl/frontend_admitter_test.go @@ -23,7 +23,7 @@ import ( ) func tlsConfig() (*tls.Config, error) { - cer, err := tls.LoadX509KeyPair("testserver.crt", "testserver.key") + cer, err := tls.LoadX509KeyPair("testdata/testserver.crt", "testdata/testserver.key") if err != nil { return nil, err } diff --git a/pkg/ccl/sqlproxyccl/idle/idle_disconnect_connection_test.go b/pkg/ccl/sqlproxyccl/idle/idle_disconnect_connection_test.go index edb074dbd7cb..a929ce0e5ef1 100644 --- a/pkg/ccl/sqlproxyccl/idle/idle_disconnect_connection_test.go +++ b/pkg/ccl/sqlproxyccl/idle/idle_disconnect_connection_test.go @@ -82,7 +82,8 @@ func benchmarkSocketRead(timeout time.Duration, b *testing.B) { } // No statistically significant difference in a single roundtrip time between -// using and not using deadline as implemented above. Both show the same value in my tests. +// using and not using deadline as implemented above. Both show the same value +// in my tests. // SocketReadWithDeadline-32 11.1µs ± 1% // SocketReadWithoutDeadline-32 11.0µs ± 3% func BenchmarkSocketReadWithoutDeadline(b *testing.B) { diff --git a/pkg/ccl/sqlproxyccl/metrics.go b/pkg/ccl/sqlproxyccl/metrics.go index bea2ff15d3de..7b5afdf2c012 100644 --- a/pkg/ccl/sqlproxyccl/metrics.go +++ b/pkg/ccl/sqlproxyccl/metrics.go @@ -13,8 +13,7 @@ import ( "github.com/cockroachdb/errors" ) -// metrics contains pointers to the metrics for monitoring proxy -// operations. +// metrics contains pointers to the metrics for monitoring proxy operations. type metrics struct { BackendDisconnectCount *metric.Counter IdleDisconnectCount *metric.Counter @@ -112,8 +111,8 @@ func makeProxyMetrics() metrics { } } -// updateForError updates the metrics relevant for the type of the -// error message. +// updateForError updates the metrics relevant for the type of the error +// message. func (metrics *metrics) updateForError(err error) { if err == nil { return diff --git a/pkg/ccl/sqlproxyccl/proxy.go b/pkg/ccl/sqlproxyccl/proxy.go index cdd275b73e61..2f6484ccfbf6 100644 --- a/pkg/ccl/sqlproxyccl/proxy.go +++ b/pkg/ccl/sqlproxyccl/proxy.go @@ -87,10 +87,10 @@ func connectionCopy(crdbConn, conn net.Conn) error { select { // NB: when using pgx, we see a nil errIncoming first on clean connection // termination. Using psql I see a nil errOutgoing first. I think the PG - // protocol stipulates sending a message to the server at which point - // the server closes the connection (errIncoming), but presumably the - // client gets to close the connection once it's sent that message, - // meaning either case is possible. + // protocol stipulates sending a message to the server at which point the + // server closes the connection (errIncoming), but presumably the client + // gets to close the connection once it's sent that message, meaning either + // case is possible. case err := <-errIncoming: if err == nil { return nil diff --git a/pkg/ccl/sqlproxyccl/proxy_handler.go b/pkg/ccl/sqlproxyccl/proxy_handler.go index 5e877aa731ac..c650679750da 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler.go @@ -34,6 +34,8 @@ import ( "github.com/cockroachdb/logtags" "github.com/jackc/pgproto3/v2" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var ( @@ -65,13 +67,14 @@ type ProxyOptions struct { Denylist string // ListenAddr is the listen address for incoming connections. ListenAddr string - // ListenCert is the file containing PEM-encoded x509 certificate for listen address. - // Set to "*" to auto-generate self-signed cert. + // ListenCert is the file containing PEM-encoded x509 certificate for listen + // address. Set to "*" to auto-generate self-signed cert. ListenCert string // ListenKey is the file containing PEM-encoded x509 key for listen address. // Set to "*" to auto-generate self-signed cert. ListenKey string - // MetricsAddress is the listen address for incoming connections for metrics retrieval. + // MetricsAddress is the listen address for incoming connections for metrics + // retrieval. MetricsAddress string // SkipVerify if set will skip the identity verification of the // backend. This is for testing only. @@ -82,9 +85,9 @@ type ProxyOptions struct { // connection. Optionally use '{{clusterName}}' // which will be substituted with the cluster name. RoutingRule string - // DirectoryAddr specified optional {HOSTNAME}:{PORT} for service that does the resolution - // from backend id to IP address. If specified - it will be used instead of the - // routing rule above. + // DirectoryAddr specified optional {HOSTNAME}:{PORT} for service that does + // the resolution from backend id to IP address. If specified - it will be + // used instead of the routing rule above. DirectoryAddr string // RatelimitBaseDelay is the initial backoff after a failed login attempt. // Set to 0 to disable rate limiting. @@ -92,9 +95,11 @@ type ProxyOptions struct { // ValidateAccessInterval is the time interval between validations, confirming // that current connections are still valid. ValidateAccessInterval time.Duration - // PollConfigInterval defines polling interval for pickup up changes in config file. + // PollConfigInterval defines polling interval for pickup up changes in + // config file. PollConfigInterval time.Duration - // IdleTimeout if set, will close connections that have been idle for that duration. + // IdleTimeout if set, will close connections that have been idle for that + // duration. IdleTimeout time.Duration } @@ -147,8 +152,12 @@ func newProxyHandler( } ctx, _ = stopper.WithCancelOnQuiesce(ctx) - handler.denyListService = denylist.NewDenylistWithFile(ctx, options.Denylist, - denylist.WithPollingInterval(options.PollConfigInterval)) + + // If denylist functionality is requested, create the denylist service. + if options.Denylist != "" { + handler.denyListService = denylist.NewDenylistWithFile(ctx, options.Denylist, + denylist.WithPollingInterval(options.PollConfigInterval)) + } handler.throttleService = throttler.NewLocalService(throttler.WithBaseDelay(options.RatelimitBaseDelay)) handler.connCache = cache.NewCappedConnCache(maxKnownConnCacheSize) @@ -181,7 +190,7 @@ func (handler *proxyHandler) handle(ctx context.Context, proxyConn *conn) error } // This currently only happens for CancelRequest type of startup messages - // that we don't support + // that we don't support. if msg == nil { return nil } @@ -221,15 +230,30 @@ func (handler *proxyHandler) handle(ctx context.Context, proxyConn *conn) error var crdbConn net.Conn var outgoingAddress string - retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond, MaxBackoff: time.Second} + + // Repeatedly try to make a connection. Any failures are assumed to be + // transient unless the tenant cannot be found (e.g. because it was + // deleted). We will simply loop forever, or until the context is canceled + // (e.g. by client disconnect). This is preferable to terminating client + // connections, because in most cases those connections will simply be + // retried, further increasing load on the system. + retryOpts := retry.Options{ + InitialBackoff: 10 * time.Millisecond, + MaxBackoff: 5 * time.Second, + } + outgoingAddressErr := log.Every(time.Minute) backendDialErr := log.Every(time.Minute) reportFailureErr := log.Every(time.Minute) var outgoingAddressErrs, codeBackendDownErrs, reportFailureErrs int + for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { + // Get the DNS/IP address of the backend server to dial. outgoingAddress, err = handler.outgoingAddress(ctx, clusterName, tenID) if err != nil { - if err.Error() != "not found" { + // Failure is assumed to be transient (and should be retried) except + // in case where the server was not found. + if status.Code(err) != codes.NotFound { outgoingAddressErrs++ if outgoingAddressErr.ShouldLog() { log.Ops.Errorf(ctx, @@ -241,19 +265,24 @@ func (handler *proxyHandler) handle(ctx context.Context, proxyConn *conn) error } continue } - clientErr := newErrorf(codeParamsRoutingFailed, "cluster %s-%d not found", clusterName, tenID.ToUint64()) - updateMetricsAndSendErrToClient(clientErr, conn, handler.metrics) + + // Remap error for external consumption. + err = newErrorf( + codeParamsRoutingFailed, "cluster %s-%d not found", clusterName, tenID.ToUint64()) break } + // Now actually dial the backend server. crdbConn, err = backendDial(backendStartupMsg, outgoingAddress, TLSConf) - // If we get a backend down error and are using the directory - report the - // error to the directory and retry the connection. - codeErr := (*codeError)(nil) - if err != nil && - errors.As(err, &codeErr) && - codeErr.code == codeBackendDown && - handler.directory != nil { + + // If we get a backend down error, retry the connection. + var codeErr *codeError + if err != nil && errors.As(err, &codeErr) && codeErr.code == codeBackendDown { + if handler.directory == nil { + // Don't retry unless directory is enabled (for now). + break + } + codeBackendDownErrs++ if backendDialErr.ShouldLog() { log.Ops.Errorf(ctx, @@ -263,6 +292,9 @@ func (handler *proxyHandler) handle(ctx context.Context, proxyConn *conn) error ) codeBackendDownErrs = 0 } + + // Report the failure to the directory so that it can refresh any + // stale information that may have caused the problem. err = handler.directory.ReportFailure(ctx, tenID, outgoingAddress) if err != nil { reportFailureErrs++ @@ -285,10 +317,12 @@ func (handler *proxyHandler) handle(ctx context.Context, proxyConn *conn) error return err } + // Set up the idle timeout monitor. crdbConn = idle.DisconnectOverlay(crdbConn, handler.IdleTimeout) defer func() { _ = crdbConn.Close() }() + // Perform user authentication. if err := authenticate(conn, crdbConn); err != nil { handler.metrics.updateForError(err) log.Ops.Errorf(ctx, "authenticate: %s", err) @@ -307,9 +341,9 @@ func (handler *proxyHandler) handle(ctx context.Context, proxyConn *conn) error ctx, cancel := context.WithCancel(ctx) defer cancel() - // TODO(darinpp): starting a new go routine for every connection here is inefficient. - // Change to maintain a map of connections by IP/tenant and have single - // go routine that closes connections that are blocklisted. + // TODO(darinpp): starting a new go routine for every connection here is + // inefficient. Change to maintain a map of connections by IP/tenant and + // have single go routine that closes connections that are blocklisted. go func() { errExpired <- func(ctx context.Context) error { t := timeutil.NewTimer() @@ -336,6 +370,8 @@ func (handler *proxyHandler) handle(ctx context.Context, proxyConn *conn) error log.Infof(ctx, "closing after %.2fs", timeutil.Since(connBegin).Seconds()) }() + // Copy all pgwire messages from frontend to backend connection until we + // encounter an error or shutdown signal. go func() { err := connectionCopy(crdbConn, conn) errConnectionCopy <- err @@ -374,7 +410,7 @@ func (handler *proxyHandler) outgoingAddress( return addr, nil } - // This doesn't verify the name part of the tenant and relies just on the int id. + // Lookup tenant in directory. addr, err := handler.directory.EnsureTenantIP(ctx, tenID, name) if err != nil { return "", err @@ -406,8 +442,12 @@ func (handler *proxyHandler) validateAccessAndThrottle( func (handler *proxyHandler) validateAccess( ctx context.Context, tenID roachpb.TenantID, ipAddr string, ) error { - // First validate against the deny list service + // Validate against the denylist service if it exists. list := handler.denyListService + if list == nil { + return nil + } + if entry, err := list.Denied(denylist.DenyEntity{Item: tenID.String(), Type: denylist.ClusterType}); err != nil { // Log error but don't return since this could be transient. log.Errorf(ctx, "could not consult denied list for tenant: %d", tenID.ToUint64()) diff --git a/pkg/ccl/sqlproxyccl/proxy_handler_test.go b/pkg/ccl/sqlproxyccl/proxy_handler_test.go index b54dd41305c2..dd842bae9116 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler_test.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler_test.go @@ -23,7 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/denylist" - "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" + "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" @@ -71,12 +71,12 @@ func newSecureProxyServer( ) (server *Server, addr string) { // Created via: const _ = ` -openssl genrsa -out testserver.key 2048 -openssl req -new -x509 -sha256 -key testserver.key -out testserver.crt \ - -days 3650 -config testserver_config.cnf +openssl genrsa -out testdata/testserver.key 2048 +openssl req -new -x509 -sha256 -key testdata/testserver.key -out testdata/testserver.crt \ + -days 3650 -config testdata/testserver_config.cnf ` - opts.ListenKey = "testserver.key" - opts.ListenCert = "testserver.crt" + opts.ListenKey = "testdata/testserver.key" + opts.ListenCert = "testdata/testserver.crt" return newProxyServer(ctx, t, stopper, opts) } @@ -204,11 +204,11 @@ func TestFailedConnection(t *testing.T) { } ac.assertConnectErr( - ctx, t, u, "?options=--cluster=dim-dog-28&sslmode=verify-ca&sslrootcert=testserver.crt", + ctx, t, u, "?options=--cluster=dim-dog-28&sslmode=verify-ca&sslrootcert=testdata/testserver.crt", codeBackendDown, "unable to reach backend SQL server", ) ac.assertConnectErr( - ctx, t, u, "?options=--cluster=dim-dog-28&sslmode=verify-full&sslrootcert=testserver.crt", + ctx, t, u, "?options=--cluster=dim-dog-28&sslmode=verify-full&sslrootcert=testdata/testserver.crt", codeBackendDown, "unable to reach backend SQL server", ) require.Equal(t, int64(4), s.metrics.BackendDownCount.Count()) @@ -595,7 +595,10 @@ func TestDenylistUpdate(t *testing.T) { defer sql.Stopper().Stop(ctx) - s, addr := newSecureProxyServer(ctx, t, sql.Stopper(), &ProxyOptions{Denylist: denyList.Name()}) + s, addr := newSecureProxyServer(ctx, t, sql.Stopper(), &ProxyOptions{ + Denylist: denyList.Name(), + PollConfigInterval: 10 * time.Millisecond, + }) defer func() { _ = os.Remove(denyList.Name()) }() url := fmt.Sprintf("postgres://root:admin@%s/defaultdb_29?sslmode=require&options=--cluster=dim-dog-28", addr) @@ -684,11 +687,11 @@ func newDirectoryServer( return err == nil }, 30*time.Second, time.Second) require.NoError(t, err) - tds, err := tenant.NewTestDirectoryServer(tdsStopper) + tds, err := tenantdirsvr.New(tdsStopper) require.NoError(t, err) - tds.TenantStarterFunc = func(ctx context.Context, tenantID uint64) (*tenant.Process, error) { + tds.TenantStarterFunc = func(ctx context.Context, tenantID uint64) (*tenantdirsvr.Process, error) { log.TestingClearServerIdentifiers() - tenantStopper := tenant.NewSubStopper(tdsStopper) + tenantStopper := tenantdirsvr.NewSubStopper(tdsStopper) ten, err := srv.StartTenant(ctx, base.TestTenantArgs{ Existing: true, TenantID: roachpb.MakeTenantID(tenantID), @@ -699,7 +702,7 @@ func newDirectoryServer( sqlAddr, err := net.ResolveTCPAddr("tcp", ten.SQLAddr()) require.NoError(t, err) ten.PGServer().(*pgwire.Server).TestingSetTrustClientProvidedRemoteAddr(true) - return &tenant.Process{SQL: sqlAddr, Stopper: tenantStopper}, nil + return &tenantdirsvr.Process{SQL: sqlAddr, Stopper: tenantStopper}, nil } go func() { require.NoError(t, tds.Serve(listener)) }() return tdsStopper, listener.Addr().(*net.TCPAddr) diff --git a/pkg/ccl/sqlproxyccl/server.go b/pkg/ccl/sqlproxyccl/server.go index e772e8e9959a..e5ac4b56a43e 100644 --- a/pkg/ccl/sqlproxyccl/server.go +++ b/pkg/ccl/sqlproxyccl/server.go @@ -29,9 +29,9 @@ import ( // individual new incoming connection. type proxyConnHandler func(ctx context.Context, proxyConn *conn) error -// Server is a TCP server that proxies SQL connections to a -// configurable backend. It may also run an HTTP server to expose a -// health check and prometheus metrics. +// Server is a TCP server that proxies SQL connections to a configurable +// backend. It may also run an HTTP server to expose a health check and +// prometheus metrics. type Server struct { Stopper *stop.Stopper connHandler proxyConnHandler diff --git a/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel b/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel index 243234607fa9..0a6350862f17 100644 --- a/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel +++ b/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel @@ -1,5 +1,5 @@ load("@bazel_gomock//:gomock.bzl", "gomock") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") load("@rules_proto//proto:defs.bzl", "proto_library") @@ -25,7 +25,6 @@ go_library( srcs = [ "directory.go", "entry.go", - "test_directory_svr.go", ":mocks_tenant", # keep ], embed = [":tenant_go_proto"], @@ -39,9 +38,8 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", - "@com_github_cockroachdb_logtags//:logtags", "@com_github_golang_mock//gomock", # keep - "@org_golang_google_grpc//:go_default_library", + "@org_golang_google_grpc//codes", "@org_golang_google_grpc//metadata", # keep "@org_golang_google_grpc//status", ], @@ -58,3 +56,34 @@ gomock( package = "tenant", self_package = "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant", ) + +go_test( + name = "tenant_test", + srcs = [ + "directory_test.go", + "main_test.go", + ], + deps = [ + ":tenant", + "//pkg/base", + "//pkg/ccl", + "//pkg/ccl/kvccl/kvtenantccl", + "//pkg/ccl/sqlproxyccl/tenantdirsvr", + "//pkg/ccl/utilccl", + "//pkg/roachpb", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/sql", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/randutil", + "//pkg/util/stop", + "@com_github_stretchr_testify//require", + "@org_golang_google_grpc//:go_default_library", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", + ], +) diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.go b/pkg/ccl/sqlproxyccl/tenant/directory.go index efc46470a262..def7a283f3ac 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory.go @@ -19,12 +19,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/errors" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -//go:generate mockgen -package=tenant -destination=mocks_generated.go . DirectoryClient,Directory_WatchEndpointsClient - // dirOptions control the behavior of tenant.Directory. type dirOptions struct { deterministic bool @@ -36,9 +34,9 @@ type dirOptions struct { type DirOption func(opts *dirOptions) // RefreshDelay specifies the minimum amount of time that must elapse between -// attempts to refresh endpoints for a given tenant after ReportFailure is called. -// This delay has the effect of throttling calls to directory server, in order to -// avoid overloading it. +// attempts to refresh endpoints for a given tenant after ReportFailure is +// called. This delay has the effect of throttling calls to directory server, in +// order to avoid overloading it. // // RefreshDelay defaults to 100ms. Use -1 to never throttle. func RefreshDelay(delay time.Duration) func(opts *dirOptions) { @@ -47,14 +45,15 @@ func RefreshDelay(delay time.Duration) func(opts *dirOptions) { } } -// Directory tracks the network locations of SQL tenant processes. It is used by the -// sqlproxy to route incoming traffic to the correct backend process. Process -// information is populated and kept relatively up-to-date using a streaming watcher. -// However, since watchers deliver slightly stale information, the directory will also make -// direct server calls to fetch the latest information about a process that is not yet -// in the cache, or when a process is suspected to have failed. When a new tenant is -// created, or is resumed from suspension, this capability allows the directory -// to immediately return the IP address for the new process. +// Directory tracks the network locations of SQL tenant processes. It is used by +// the sqlproxy to route incoming traffic to the correct backend process. +// Process information is populated and kept relatively up-to-date using a +// streaming watcher. However, since watchers deliver slightly stale +// information, the directory will also make direct server calls to fetch the +// latest information about a process that is not yet in the cache, or when a +// process is suspected to have failed. When a new tenant is created, or is +// resumed from suspension, this capability allows the directory to immediately +// return the IP address for the new process. // // All methods in the directory are thread-safe. Methods are intended to be // called concurrently by many threads at once, and so locking is carefully @@ -63,7 +62,8 @@ func RefreshDelay(delay time.Duration) func(opts *dirOptions) { // has its own locks that are used to synchronize per-tenant operations such as // making directory server calls to fetch updated tenant information. type Directory struct { - // client is the directory client instance used to make directory server calls. + // client is the directory client instance used to make directory server + // calls. client DirectoryClient // stopper use used for graceful shutdown of the endpoint watcher. @@ -73,8 +73,8 @@ type Directory struct { options dirOptions // mut synchronizes access to the in-memory tenant entry caches. Take care - // to never hold this lock during directory server calls - it should only be used - // while adding and removing tenant entries to/from the caches. + // to never hold this lock during directory server calls - it should only be + // used while adding and removing tenant entries to/from the caches. mut struct { syncutil.Mutex @@ -85,11 +85,12 @@ type Directory struct { } } -// NewDirectory constructs a new Directory instance that tracks SQL tenant processes -// managed by a given Directory server. The given context is used for tracing -// endpoint watcher activity. +// NewDirectory constructs a new Directory instance that tracks SQL tenant +// processes managed by a given Directory server. The given context is used for +// tracing endpoint watcher activity. // -// NOTE: stopper.Stop must be called on the directory when it is no longer needed. +// NOTE: stopper.Stop must be called on the directory when it is no longer +// needed. func NewDirectory( ctx context.Context, stopper *stop.Stopper, client DirectoryClient, opts ...DirOption, ) (*Directory, error) { @@ -112,17 +113,19 @@ func NewDirectory( return dir, nil } -// EnsureTenantIP returns the IP address of one of the given tenant's SQL processes. -// If the tenant was just created or is suspended, such that there are no -// available processes, then EnsureTenantIP will trigger resumption of a new instance and -// block until the process is ready. If there are multiple processes for -// the tenant, then LookupTenantIPs will choose one of them (note that currently -// there is always at most one SQL process per tenant). +// EnsureTenantIP returns the IP address of one of the given tenant's SQL +// processes. If the tenant was just created or is suspended, such that there +// are no available processes, then EnsureTenantIP will trigger resumption of a +// new instance and block until the process is ready. If there are multiple +// processes for the tenant, then LookupTenantIPs will choose one of them (note +// that currently there is always at most one SQL process per tenant). // -// If clusterName is non-empty, then an error is returned if no endpoints match the -// cluster name. This can be used to ensure that the incoming SQL connection "knows" -// some additional information about the tenant, such as the name of the -// cluster, before being allowed to connect. +// If clusterName is non-empty, then a GRPC NotFound error is returned if no +// endpoints match the cluster name. This can be used to ensure that the +// incoming SQL connection "knows" some additional information about the tenant, +// such as the name of the cluster, before being allowed to connect. Similarly, +// if the tenant does not exist (e.g. because it was deleted), EnsureTenantIP +// returns a GRPC NotFound error. func (d *Directory) EnsureTenantIP( ctx context.Context, tenantID roachpb.TenantID, clusterName string, ) (string, error) { @@ -134,23 +137,28 @@ func (d *Directory) EnsureTenantIP( // Check the cluster name matches, if specified. if clusterName != "" && clusterName != entry.ClusterName { + // Return a GRPC NotFound error. log.Errorf(ctx, "cluster name %s doesn't match expected %s", clusterName, entry.ClusterName) - return "", errors.New("not found") + return "", status.Errorf(codes.NotFound, + "cluster name %s doesn't match expected %s", clusterName, entry.ClusterName) } + ctx, _ = d.stopper.WithCancelOnQuiesce(ctx) ip, err := entry.ChooseEndpointIP(ctx, d.client, d.options.deterministic) if err != nil { - if s, ok := status.FromError(err); ok && s.Message() == "not found" { + if status.Code(err) == codes.NotFound { d.deleteEntry(tenantID) } } return ip, err } -// LookupTenantIPs returns the IP addresses for all available SQL processes for the -// given tenant. It returns an error if the tenant has not yet been created. If -// no processes are available for the tenant, LookupTenantIPs will return the empty -// set (unlike EnsureTenantIP). +// LookupTenantIPs returns the IP addresses for all available SQL processes for +// the given tenant. It returns a GRPC NotFound error if the tenant does not +// exist (e.g. it has not yet been created) or if it has not yet been fetched +// into the directory's cache (LookupTenantIPs will never attempt to fetch it). +// If no processes are available for the tenant, LookupTenantIPs will return the +// empty set (unlike EnsureTenantIP). func (d *Directory) LookupTenantIPs( ctx context.Context, tenantID roachpb.TenantID, ) ([]string, error) { @@ -161,7 +169,8 @@ func (d *Directory) LookupTenantIPs( } if entry == nil { - return nil, errors.New("not found") + return nil, status.Errorf( + codes.NotFound, "tenant %d not in directory cache", tenantID.ToUint64()) } return entry.getEndpointIPs(), nil } @@ -171,9 +180,9 @@ func (d *Directory) LookupTenantIPs( // ReportFailure will attempt to refresh the cache with the latest information // about available tenant processes. // TODO(andyk): In the future, the ip parameter will be used to mark a -// particular endpoint as "unhealthy" so that it's less likely to be chosen. However, -// today there can be at most one endpoint for a given tenant, so it must always be -// chosen. Keep the parameter as a placeholder for the future. +// particular endpoint as "unhealthy" so that it's less likely to be chosen. +// However, today there can be at most one endpoint for a given tenant, so it +// must always be chosen. Keep the parameter as a placeholder for the future. func (d *Directory) ReportFailure(ctx context.Context, tenantID roachpb.TenantID, ip string) error { entry, err := d.getEntry(ctx, tenantID, false /* allowCreate */) if err != nil { @@ -253,16 +262,16 @@ func (d *Directory) deleteEntry(tenantID roachpb.TenantID) { delete(d.mut.tenants, tenantID) } -// watchEndpoints establishes a watcher that looks for changes to tenant endpoint addresses. -// Whenever tenant processes start or terminate, the watcher will get -// a notification and update the directory to reflect that change. +// watchEndpoints establishes a watcher that looks for changes to tenant +// endpoint addresses. Whenever tenant processes start or terminate, the watcher +// will get a notification and update the directory to reflect that change. func (d *Directory) watchEndpoints(ctx context.Context, stopper *stop.Stopper) error { req := WatchEndpointsRequest{} - // The loop that processes the event stream is running in a separate go routine. - // It is desirable however, before we return, to have a guarantee that the - // separate go routine started processing events. This wait group helps us - // achieve this. Without the wait group, it will be possible to: + // The loop that processes the event stream is running in a separate go + // routine. It is desirable however, before we return, to have a guarantee + // that the separate go routine started processing events. This wait group + // helps us achieve this. Without the wait group, it will be possible to: // // 1. call watchEndpoints // 2. call EnsureTenantIP @@ -324,7 +333,8 @@ func (d *Directory) watchEndpoints(ctx context.Context, stopper *stop.Stopper) e if grpcutil.IsContextCanceled(err) { break } - // This should never happen. + // This should only happen in case of a deleted tenant or a transient + // error during fetch of tenant metadata. log.Errorf(ctx, "ignoring error getting entry for tenant %d: %v", resp.TenantID, err) continue } @@ -348,6 +358,7 @@ func (d *Directory) watchEndpoints(ctx context.Context, stopper *stop.Stopper) e if err != nil { return err } + // Block until the initial endpoint watcher client stream is constructed. waitInit.Wait() return err diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.pb.go b/pkg/ccl/sqlproxyccl/tenant/directory.pb.go index 5490250fbafe..ae873721af6e 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.pb.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory.pb.go @@ -27,7 +27,8 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -// EventType shows the event type of the notifications that the server streams to its clients. +// EventType shows the event type of the notifications that the server streams +// to its clients. type EventType int32 const ( @@ -89,8 +90,8 @@ func (m *WatchEndpointsRequest) XXX_DiscardUnknown() { var xxx_messageInfo_WatchEndpointsRequest proto.InternalMessageInfo -// WatchEndpointsResponse represents the notifications that the server sends to its clients when clients -// want to monitor the directory server activity. +// WatchEndpointsResponse represents the notifications that the server sends to +// its clients when clients want to monitor the directory server activity. type WatchEndpointsResponse struct { // EventType is the type of the notifications - added, modified, deleted. Typ EventType `protobuf:"varint,1,opt,name=typ,proto3,enum=cockroach.ccl.sqlproxyccl.tenant.EventType" json:"typ,omitempty"` @@ -129,9 +130,11 @@ func (m *WatchEndpointsResponse) XXX_DiscardUnknown() { var xxx_messageInfo_WatchEndpointsResponse proto.InternalMessageInfo -// ListEndpointsRequest is used to query the server for the list of current endpoints of a given tenant. +// ListEndpointsRequest is used to query the server for the list of current +// endpoints of a given tenant. type ListEndpointsRequest struct { - // TenantID identifies the tenant for which the client is requesting a list of the endpoints. + // TenantID identifies the tenant for which the client is requesting a list of + // the endpoints. TenantID uint64 `protobuf:"varint,1,opt,name=tenant_id,json=tenantId,proto3" json:"tenant_id,omitempty"` } @@ -164,8 +167,9 @@ func (m *ListEndpointsRequest) XXX_DiscardUnknown() { var xxx_messageInfo_ListEndpointsRequest proto.InternalMessageInfo -// EnsureEndpointRequest is used to ensure that a tenant's backend is active. If there is an active backend then the -// server doesn't have to do anything. If there isn't an active backend, then the server has to bring a new one up. +// EnsureEndpointRequest is used to ensure that a tenant's backend is active. If +// there is an active backend then the server doesn't have to do anything. If +// there isn't an active backend, then the server has to bring a new one up. type EnsureEndpointRequest struct { // TenantID is the id of the tenant for which an active backend is requested. TenantID uint64 `protobuf:"varint,1,opt,name=tenant_id,json=tenantId,proto3" json:"tenant_id,omitempty"` @@ -200,7 +204,8 @@ func (m *EnsureEndpointRequest) XXX_DiscardUnknown() { var xxx_messageInfo_EnsureEndpointRequest proto.InternalMessageInfo -// EnsureEndpointResponse is empty and indicates that the server processed the request. +// EnsureEndpointResponse is empty and indicates that the server processed the +// request. type EnsureEndpointResponse struct { } @@ -233,8 +238,8 @@ func (m *EnsureEndpointResponse) XXX_DiscardUnknown() { var xxx_messageInfo_EnsureEndpointResponse proto.InternalMessageInfo -// Endpoint contains the information about a tenant endpoint. Most often it is a combination of an ip address and port. -// i.e. 132.130.1.11:34576 +// Endpoint contains the information about a tenant endpoint. Most often it is a +// combination of an ip address and port, e.g. 132.130.1.11:34576. type Endpoint struct { // IP is the ip and port combo identifying the tenant endpoint. IP string `protobuf:"bytes,1,opt,name=IP,proto3" json:"IP,omitempty"` @@ -269,7 +274,8 @@ func (m *Endpoint) XXX_DiscardUnknown() { var xxx_messageInfo_Endpoint proto.InternalMessageInfo -// ListEndpointsResponse is sent back as a result of requesting the list of endpoints for a given tenant. +// ListEndpointsResponse is sent back as a result of requesting the list of +// endpoints for a given tenant. type ListEndpointsResponse struct { // Endpoints is the list of endpoints currently active for the requested tenant. Endpoints []*Endpoint `protobuf:"bytes,1,rep,name=endpoints,proto3" json:"endpoints,omitempty"` @@ -304,7 +310,8 @@ func (m *ListEndpointsResponse) XXX_DiscardUnknown() { var xxx_messageInfo_ListEndpointsResponse proto.InternalMessageInfo -// GetTenantRequest is used by a client to request from the sever metadata related to a given tenant. +// GetTenantRequest is used by a client to request from the sever metadata +// related to a given tenant. type GetTenantRequest struct { // TenantID identifies the tenant for which the metadata is being requested. TenantID uint64 `protobuf:"varint,1,opt,name=tenant_id,json=tenantId,proto3" json:"tenant_id,omitempty"` @@ -440,13 +447,15 @@ const _ = grpc.SupportPackageIsVersion4 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type DirectoryClient interface { - // ListEndpoints is used to query the server for the list of current endpoints of a given tenant. + // ListEndpoints is used to query the server for the list of current endpoints + // of a given tenant. ListEndpoints(ctx context.Context, in *ListEndpointsRequest, opts ...grpc.CallOption) (*ListEndpointsResponse, error) - // WatchEndpoints is used to get a stream, that is used to receive notifications about changes in tenant - // backend's state - added, modified and deleted. + // WatchEndpoints is used to get a stream, that is used to receive notifications + // about changes in tenant backend's state - added, modified and deleted. WatchEndpoints(ctx context.Context, in *WatchEndpointsRequest, opts ...grpc.CallOption) (Directory_WatchEndpointsClient, error) - // EnsureEndpoint is used to ensure that a tenant's backend is active. If there is an active backend then the - // server doesn't have to do anything. If there isn't an active backend, then the server has to bring a new one up. + // EnsureEndpoint is used to ensure that a tenant's backend is active. If there + // is an active backend then the server doesn't have to do anything. If there + // isn't an active backend, then the server has to bring a new one up. EnsureEndpoint(ctx context.Context, in *EnsureEndpointRequest, opts ...grpc.CallOption) (*EnsureEndpointResponse, error) // GetTenant is used to fetch the metadata of a specific tenant. GetTenant(ctx context.Context, in *GetTenantRequest, opts ...grpc.CallOption) (*GetTenantResponse, error) @@ -521,13 +530,15 @@ func (c *directoryClient) GetTenant(ctx context.Context, in *GetTenantRequest, o // DirectoryServer is the server API for Directory service. type DirectoryServer interface { - // ListEndpoints is used to query the server for the list of current endpoints of a given tenant. + // ListEndpoints is used to query the server for the list of current endpoints + // of a given tenant. ListEndpoints(context.Context, *ListEndpointsRequest) (*ListEndpointsResponse, error) - // WatchEndpoints is used to get a stream, that is used to receive notifications about changes in tenant - // backend's state - added, modified and deleted. + // WatchEndpoints is used to get a stream, that is used to receive notifications + // about changes in tenant backend's state - added, modified and deleted. WatchEndpoints(*WatchEndpointsRequest, Directory_WatchEndpointsServer) error - // EnsureEndpoint is used to ensure that a tenant's backend is active. If there is an active backend then the - // server doesn't have to do anything. If there isn't an active backend, then the server has to bring a new one up. + // EnsureEndpoint is used to ensure that a tenant's backend is active. If there + // is an active backend then the server doesn't have to do anything. If there + // isn't an active backend, then the server has to bring a new one up. EnsureEndpoint(context.Context, *EnsureEndpointRequest) (*EnsureEndpointResponse, error) // GetTenant is used to fetch the metadata of a specific tenant. GetTenant(context.Context, *GetTenantRequest) (*GetTenantResponse, error) diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.proto b/pkg/ccl/sqlproxyccl/tenant/directory.proto index f62217774d43..682fdb08cf4c 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.proto +++ b/pkg/ccl/sqlproxyccl/tenant/directory.proto @@ -15,7 +15,8 @@ import "gogoproto/gogo.proto"; // WatchEndpointsRequest is empty as we want to get all notifications. message WatchEndpointsRequest {} -// EventType shows the event type of the notifications that the server streams to its clients. +// EventType shows the event type of the notifications that the server streams +// to its clients. enum EventType { option (gogoproto.goproto_enum_prefix) = false; @@ -24,8 +25,8 @@ enum EventType { DELETED = 2; } -// WatchEndpointsResponse represents the notifications that the server sends to its clients when clients -// want to monitor the directory server activity. +// WatchEndpointsResponse represents the notifications that the server sends to +// its clients when clients want to monitor the directory server activity. message WatchEndpointsResponse { // EventType is the type of the notifications - added, modified, deleted. EventType typ = 1; @@ -35,37 +36,43 @@ message WatchEndpointsResponse { uint64 tenant_id = 3[(gogoproto.customname) = "TenantID"]; } -// ListEndpointsRequest is used to query the server for the list of current endpoints of a given tenant. +// ListEndpointsRequest is used to query the server for the list of current +// endpoints of a given tenant. message ListEndpointsRequest { - // TenantID identifies the tenant for which the client is requesting a list of the endpoints. + // TenantID identifies the tenant for which the client is requesting a list of + // the endpoints. uint64 tenant_id = 1[(gogoproto.customname) = "TenantID"]; } -// EnsureEndpointRequest is used to ensure that a tenant's backend is active. If there is an active backend then the -// server doesn't have to do anything. If there isn't an active backend, then the server has to bring a new one up. +// EnsureEndpointRequest is used to ensure that a tenant's backend is active. If +// there is an active backend then the server doesn't have to do anything. If +// there isn't an active backend, then the server has to bring a new one up. message EnsureEndpointRequest { // TenantID is the id of the tenant for which an active backend is requested. uint64 tenant_id = 1[(gogoproto.customname) = "TenantID"]; } -// EnsureEndpointResponse is empty and indicates that the server processed the request. +// EnsureEndpointResponse is empty and indicates that the server processed the +// request. message EnsureEndpointResponse { } -// Endpoint contains the information about a tenant endpoint. Most often it is a combination of an ip address and port. -// i.e. 132.130.1.11:34576 +// Endpoint contains the information about a tenant endpoint. Most often it is a +// combination of an ip address and port, e.g. 132.130.1.11:34576. message Endpoint { // IP is the ip and port combo identifying the tenant endpoint. string IP = 1[(gogoproto.customname) = "IP"]; } -// ListEndpointsResponse is sent back as a result of requesting the list of endpoints for a given tenant. +// ListEndpointsResponse is sent back as a result of requesting the list of +// endpoints for a given tenant. message ListEndpointsResponse { // Endpoints is the list of endpoints currently active for the requested tenant. repeated Endpoint endpoints = 1; } -// GetTenantRequest is used by a client to request from the sever metadata related to a given tenant. +// GetTenantRequest is used by a client to request from the sever metadata +// related to a given tenant. message GetTenantRequest { // TenantID identifies the tenant for which the metadata is being requested. uint64 tenant_id = 1[(gogoproto.customname) = "TenantID"]; @@ -77,15 +84,18 @@ message GetTenantResponse { string cluster_name = 1; // add more metadata if needed } -// Directory specifies a service that keeps track and manages tenant backends, related metadata and their endpoints. +// Directory specifies a service that keeps track and manages tenant backends, +// related metadata and their endpoints. service Directory { - // ListEndpoints is used to query the server for the list of current endpoints of a given tenant. + // ListEndpoints is used to query the server for the list of current endpoints + // of a given tenant. rpc ListEndpoints(ListEndpointsRequest) returns (ListEndpointsResponse); - // WatchEndpoints is used to get a stream, that is used to receive notifications about changes in tenant - // backend's state - added, modified and deleted. + // WatchEndpoints is used to get a stream, that is used to receive notifications + // about changes in tenant backend's state - added, modified and deleted. rpc WatchEndpoints(WatchEndpointsRequest) returns (stream WatchEndpointsResponse); - // EnsureEndpoint is used to ensure that a tenant's backend is active. If there is an active backend then the - // server doesn't have to do anything. If there isn't an active backend, then the server has to bring a new one up. + // EnsureEndpoint is used to ensure that a tenant's backend is active. If there + // is an active backend then the server doesn't have to do anything. If there + // isn't an active backend, then the server has to bring a new one up. rpc EnsureEndpoint(EnsureEndpointRequest) returns (EnsureEndpointResponse); // GetTenant is used to fetch the metadata of a specific tenant. rpc GetTenant(GetTenantRequest) returns (GetTenantResponse); diff --git a/pkg/ccl/sqlproxyccl/tenantdirsvr/directory_test.go b/pkg/ccl/sqlproxyccl/tenant/directory_test.go similarity index 89% rename from pkg/ccl/sqlproxyccl/tenantdirsvr/directory_test.go rename to pkg/ccl/sqlproxyccl/tenant/directory_test.go index 1c50c075c7dc..29fbf5432399 100644 --- a/pkg/ccl/sqlproxyccl/tenantdirsvr/directory_test.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package tenantdirsvr +package tenant_test import ( "context" @@ -18,15 +18,17 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" + "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // To ensure tenant startup code is included. @@ -44,19 +46,19 @@ func TestDirectoryErrors(t *testing.T) { defer tc.Stopper().Stop(ctx) _, err := dir.LookupTenantIPs(ctx, roachpb.MakeTenantID(1000)) - assert.Regexp(t, "not found", err) + require.EqualError(t, err, "rpc error: code = NotFound desc = tenant 1000 not in directory cache") _, err = dir.LookupTenantIPs(ctx, roachpb.MakeTenantID(1001)) - assert.Regexp(t, "not found", err) + require.EqualError(t, err, "rpc error: code = NotFound desc = tenant 1001 not in directory cache") _, err = dir.LookupTenantIPs(ctx, roachpb.MakeTenantID(1002)) - assert.Regexp(t, "not found", err) + require.EqualError(t, err, "rpc error: code = NotFound desc = tenant 1002 not in directory cache") // Fail to find tenant that does not exist. _, err = dir.EnsureTenantIP(ctx, roachpb.MakeTenantID(1000), "") - assert.Regexp(t, "not found", err) + require.EqualError(t, err, "rpc error: code = NotFound desc = tenant 1000 not found") // Fail to find tenant when cluster name doesn't match. _, err = dir.EnsureTenantIP(ctx, roachpb.MakeTenantID(tenantID), "unknown") - assert.Regexp(t, "not found", err) + require.EqualError(t, err, "rpc error: code = NotFound desc = cluster name unknown doesn't match expected tenant-cluster") // No-op when reporting failure for tenant that doesn't exit. require.NoError(t, dir.ReportFailure(ctx, roachpb.MakeTenantID(1000), "")) @@ -193,7 +195,7 @@ func TestResume(t *testing.T) { }(i) } - var processes map[net.Addr]*tenant.Process + var processes map[net.Addr]*tenantdirsvr.Process // Eventually the tenant process will be resumed. require.Eventually(t, func() bool { processes = tds.Get(tenantID) @@ -257,9 +259,9 @@ func TestDeleteTenant(t *testing.T) { // Now EnsureTenantIP should return an error and the directory should no // longer cache the tenant. _, err = dir.EnsureTenantIP(ctx, tenantID, "") - require.Regexp(t, "not found", err) + require.EqualError(t, err, "rpc error: code = NotFound desc = tenant 50 not found") ips, err = dir.LookupTenantIPs(ctx, tenantID) - require.Regexp(t, "not found", err) + require.EqualError(t, err, "rpc error: code = NotFound desc = tenant 50 not in directory cache") require.Nil(t, ips) } @@ -330,9 +332,9 @@ func destroyTenant(tc serverutils.TestClusterInterface, id roachpb.TenantID) err func startTenant( ctx context.Context, srv serverutils.TestServerInterface, id uint64, -) (*tenant.Process, error) { +) (*tenantdirsvr.Process, error) { log.TestingClearServerIdentifiers() - tenantStopper := tenant.NewSubStopper(srv.Stopper()) + tenantStopper := tenantdirsvr.NewSubStopper(srv.Stopper()) t, err := srv.StartTenant( ctx, base.TestTenantArgs{ @@ -342,13 +344,17 @@ func startTenant( Stopper: tenantStopper, }) if err != nil { + // Remap tenant "not found" error to GRPC NotFound error. + if err.Error() == "not found" { + return nil, status.Errorf(codes.NotFound, "tenant %d not found", id) + } return nil, err } sqlAddr, err := net.ResolveTCPAddr("tcp", t.SQLAddr()) if err != nil { return nil, err } - return &tenant.Process{SQL: sqlAddr, Stopper: tenantStopper}, nil + return &tenantdirsvr.Process{SQL: sqlAddr, Stopper: tenantStopper}, nil } // Setup directory that uses a client connected to a test directory server @@ -358,7 +364,7 @@ func newTestDirectory( ) ( tc serverutils.TestClusterInterface, directory *tenant.Directory, - tds *tenant.TestDirectoryServer, + tds *tenantdirsvr.TestDirectoryServer, ) { tc = serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{ // We need to start the cluster insecure in order to not @@ -369,9 +375,9 @@ func newTestDirectory( }) clusterStopper := tc.Stopper() var err error - tds, err = tenant.NewTestDirectoryServer(clusterStopper) + tds, err = tenantdirsvr.New(clusterStopper) require.NoError(t, err) - tds.TenantStarterFunc = func(ctx context.Context, tenantID uint64) (*tenant.Process, error) { + tds.TenantStarterFunc = func(ctx context.Context, tenantID uint64) (*tenantdirsvr.Process, error) { t.Logf("starting tenant %d", tenantID) process, err := startTenant(ctx, tc.Server(0), tenantID) if err != nil { diff --git a/pkg/ccl/sqlproxyccl/tenant/entry.go b/pkg/ccl/sqlproxyccl/tenant/entry.go index 34b681231cd4..0ecaa1eed18d 100644 --- a/pkg/ccl/sqlproxyccl/tenant/entry.go +++ b/pkg/ccl/sqlproxyccl/tenant/entry.go @@ -21,8 +21,8 @@ import ( ) // tenantEntry is an entry in the tenant directory that records information -// about a single tenant, including its ID, cluster name, and the IP addresses for -// available endpoints. +// about a single tenant, including its ID, cluster name, and the IP addresses +// for available endpoints. type tenantEntry struct { // These fields can be read by callers without synchronization, since // they're written once during initialization, and are immutable thereafter. @@ -35,7 +35,8 @@ type tenantEntry struct { ClusterName string // RefreshDelay is the minimum amount of time that must elapse between - // attempts to refresh endpoints for this tenant after ReportFailure is called. + // attempts to refresh endpoints for this tenant after ReportFailure is + // called. RefreshDelay time.Duration // initialized is set to true once Initialized has been called. @@ -45,27 +46,27 @@ type tenantEntry struct { // error occurred). initError error - // endpoints synchronizes access to information about the tenant's SQL endpoints. - // These fields can be updated over time, so a lock must be obtained before - // accessing them. + // endpoints synchronizes access to information about the tenant's SQL + // endpoints. These fields can be updated over time, so a lock must be + // obtained before accessing them. endpoints struct { syncutil.Mutex ips []string } // calls synchronizes calls to the K8s API for this tenant (e.g. calls to - // RefreshEndpoints). Synchronization is needed to ensure that only one thread at - // a time is calling on behalf of a tenant, and that calls are rate limited - // to prevent storms. + // RefreshEndpoints). Synchronization is needed to ensure that only one + // thread at a time is calling on behalf of a tenant, and that calls are rate + // limited to prevent storms. calls struct { syncutil.Mutex lastRefresh time.Time } } -// Initialize fetches metadata about a tenant, such as its cluster name, and stores -// that in the entry. After this is called once, all future calls return the -// same result (and do nothing). +// Initialize fetches metadata about a tenant, such as its cluster name, and +// stores that in the entry. After this is called once, all future calls return +// the same result (and do nothing). func (e *tenantEntry) Initialize(ctx context.Context, client DirectoryClient) error { // Synchronize multiple threads trying to initialize. Only the first thread // does the initialization. @@ -90,8 +91,9 @@ func (e *tenantEntry) Initialize(ctx context.Context, client DirectoryClient) er return nil } -// RefreshEndpoints makes a synchronous directory server call to fetch the latest information -// about the tenant's available endpoints, such as their IP addresses. +// RefreshEndpoints makes a synchronous directory server call to fetch the +// latest information about the tenant's available endpoints, such as their IP +// addresses. func (e *tenantEntry) RefreshEndpoints(ctx context.Context, client DirectoryClient) error { if !e.initialized { return errors.AssertionFailedf("entry for tenant %d is not initialized", e.TenantID) @@ -114,14 +116,15 @@ func (e *tenantEntry) RefreshEndpoints(ctx context.Context, client DirectoryClie return err } -// ChooseEndpointIP returns the IP address of one of this tenant's available endpoints. -// If a tenant has multiple endpoints, then ChooseEndpointIP returns the IP address of one -// of those endpoints. If the tenant is suspended and no endpoints are available, then -// ChooseEndpointIP will trigger resumption of the tenant and return the IP address -// of the new endpoint. Note that resuming a tenant requires directory server calls, so -// ChooseEndpointIP can block for some time, until the resumption process is -// complete. However, if errorIfNoEndpoints is true, then ChooseEndpointIP returns an -// error if there are no endpoints available rather than blocking. +// ChooseEndpointIP returns the IP address of one of this tenant's available +// endpoints. If a tenant has multiple endpoints, then ChooseEndpointIP returns +// the IP address of one of those endpoints. If the tenant is suspended and no +// endpoints are available, then ChooseEndpointIP will trigger resumption of the +// tenant and return the IP address of the new endpoint. Note that resuming a +// tenant requires directory server calls, so ChooseEndpointIP can block for +// some time, until the resumption process is complete. However, if +// errorIfNoEndpoints is true, then ChooseEndpointIP returns an error if there +// are no endpoints available rather than blocking. // // TODO(andyk): Use better load-balancing algorithm once tenants can have more // than one endpoint. @@ -134,9 +137,9 @@ func (e *tenantEntry) ChooseEndpointIP( ips := e.getEndpointIPs() if len(ips) == 0 { - // There are no known endpoint IP addresses, so fetch endpoint information from - // the directory server. Resume the tenant if it is suspended; that will - // always result in at least one endpoint IP address (or an error). + // There are no known endpoint IP addresses, so fetch endpoint information + // from the directory server. Resume the tenant if it is suspended; that + // will always result in at least one endpoint IP address (or an error). var err error if ips, err = e.ensureTenantEndpoint(ctx, client, errorIfNoEndpoints); err != nil { return "", err @@ -145,8 +148,8 @@ func (e *tenantEntry) ChooseEndpointIP( return ips[0], nil } -// AddEndpointIP inserts the given IP address into the tenant's list of Endpoint IPs. If -// it is already present, then AddEndpointIP returns false. +// AddEndpointIP inserts the given IP address into the tenant's list of Endpoint +// IPs. If it is already present, then AddEndpointIP returns false. func (e *tenantEntry) AddEndpointIP(ip string) bool { e.endpoints.Lock() defer e.endpoints.Unlock() @@ -161,8 +164,8 @@ func (e *tenantEntry) AddEndpointIP(ip string) bool { return true } -// RemoveEndpointIP removes the given IP address from the tenant's list of Endpoint IPs. -// If it was not present, RemoveEndpointIP returns false. +// RemoveEndpointIP removes the given IP address from the tenant's list of +// Endpoint IPs. If it was not present, RemoveEndpointIP returns false. func (e *tenantEntry) RemoveEndpointIP(ip string) bool { e.endpoints.Lock() defer e.endpoints.Unlock() @@ -177,8 +180,8 @@ func (e *tenantEntry) RemoveEndpointIP(ip string) bool { return false } -// getEndpointIPs gets the current list of endpoint IP addresses within scope of lock and -// returns them. +// getEndpointIPs gets the current list of endpoint IP addresses within scope of +// lock and returns them. func (e *tenantEntry) getEndpointIPs() []string { e.endpoints.Lock() defer e.endpoints.Unlock() @@ -187,8 +190,8 @@ func (e *tenantEntry) getEndpointIPs() []string { // ensureTenantEndpoint ensures that at least one SQL process exists for this // tenant, and is ready for connection attempts to its IP address. If -// errorIfNoEndpoints is true, then ensureTenantEndpoint returns an error if there are no -// endpoints available rather than blocking. +// errorIfNoEndpoints is true, then ensureTenantEndpoint returns an error if +// there are no endpoints available rather than blocking. func (e *tenantEntry) ensureTenantEndpoint( ctx context.Context, client DirectoryClient, errorIfNoEndpoints bool, ) (ips []string, err error) { @@ -205,37 +208,28 @@ func (e *tenantEntry) ensureTenantEndpoint( return ips, nil } - // Get up-to-date count of endpoints for the tenant from the K8s server. - resp, err := client.ListEndpoints(ctx, &ListEndpointsRequest{TenantID: e.TenantID.ToUint64()}) - if err != nil { - return nil, err - } - for { // Check for context cancellation or timeout. if err = ctx.Err(); err != nil { return nil, err } - // Check if tenant needs to be resumed. - if len(resp.Endpoints) == 0 { - log.Infof(ctx, "resuming tenant %d", e.TenantID) - - if _, err := client.EnsureEndpoint( - ctx, &EnsureEndpointRequest{e.TenantID.ToUint64()}, - ); err != nil { - return nil, err - } + // Try to resume the tenant if not yet resumed. + _, err = client.EnsureEndpoint(ctx, &EnsureEndpointRequest{e.TenantID.ToUint64()}) + if err != nil { + return nil, err } - // Get endpoint information for the newly resumed tenant. Except in rare race - // conditions, this is expected to immediately find an IP address, since - // the above call started a tenant process that already has an IP address. + // Get endpoint information for the newly resumed tenant. Except in rare + // race conditions, this is expected to immediately find an IP address, + // since the above call started a tenant process that already has an IP + // address. ips, err = e.fetchEndpointsLocked(ctx, client) if err != nil { return nil, err } if len(ips) != 0 { + log.Infof(ctx, "resumed tenant %d", e.TenantID) break } @@ -250,8 +244,9 @@ func (e *tenantEntry) ensureTenantEndpoint( return ips, nil } -// fetchEndpointsLocked makes a synchronous directory server call to get the latest -// information about the tenant's available endpoints, such as their IP addresses. +// fetchEndpointsLocked makes a synchronous directory server call to get the +// latest information about the tenant's available endpoints, such as their IP +// addresses. // // NOTE: Caller must lock the "calls" mutex before calling fetchEndpointsLocked. func (e *tenantEntry) fetchEndpointsLocked( @@ -263,7 +258,8 @@ func (e *tenantEntry) fetchEndpointsLocked( return nil, err } - // Get updated list of running process endpoint IP addresses and save it to the entry. + // Get updated list of running process endpoint IP addresses and save it to + // the entry. ips = make([]string, 0, len(list.Endpoints)) for i := range list.Endpoints { endpoint := list.Endpoints[i] @@ -284,8 +280,8 @@ func (e *tenantEntry) fetchEndpointsLocked( } // canRefreshLocked returns true if it's been at least X milliseconds since the -// last time the tenant endpoint information was refreshed. This has the effect of -// rate limiting RefreshEndpoints calls. +// last time the tenant endpoint information was refreshed. This has the effect +// of rate limiting RefreshEndpoints calls. // // NOTE: Caller must lock the "calls" mutex before calling canRefreshLocked. func (e *tenantEntry) canRefreshLocked() bool { diff --git a/pkg/ccl/sqlproxyccl/tenantdirsvr/main_test.go b/pkg/ccl/sqlproxyccl/tenant/main_test.go similarity index 98% rename from pkg/ccl/sqlproxyccl/tenantdirsvr/main_test.go rename to pkg/ccl/sqlproxyccl/tenant/main_test.go index 90199cefb604..78839929b3c2 100644 --- a/pkg/ccl/sqlproxyccl/tenantdirsvr/main_test.go +++ b/pkg/ccl/sqlproxyccl/tenant/main_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package tenantdirsvr +package tenant_test import ( "os" diff --git a/pkg/ccl/sqlproxyccl/tenant/mocks_generated.go b/pkg/ccl/sqlproxyccl/tenant/mocks_generated.go deleted file mode 100644 index 15c887ef9ea2..000000000000 --- a/pkg/ccl/sqlproxyccl/tenant/mocks_generated.go +++ /dev/null @@ -1,240 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant (interfaces: DirectoryClient,Directory_WatchEndpointsClient) - -// Package tenant is a generated GoMock package. -package tenant - -import ( - context "context" - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - grpc "google.golang.org/grpc" - metadata "google.golang.org/grpc/metadata" -) - -// MockDirectoryClient is a mock of DirectoryClient interface. -type MockDirectoryClient struct { - ctrl *gomock.Controller - recorder *MockDirectoryClientMockRecorder -} - -// MockDirectoryClientMockRecorder is the mock recorder for MockDirectoryClient. -type MockDirectoryClientMockRecorder struct { - mock *MockDirectoryClient -} - -// NewMockDirectoryClient creates a new mock instance. -func NewMockDirectoryClient(ctrl *gomock.Controller) *MockDirectoryClient { - mock := &MockDirectoryClient{ctrl: ctrl} - mock.recorder = &MockDirectoryClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockDirectoryClient) EXPECT() *MockDirectoryClientMockRecorder { - return m.recorder -} - -// EnsureEndpoint mocks base method. -func (m *MockDirectoryClient) EnsureEndpoint(arg0 context.Context, arg1 *EnsureEndpointRequest, arg2 ...grpc.CallOption) (*EnsureEndpointResponse, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "EnsureEndpoint", varargs...) - ret0, _ := ret[0].(*EnsureEndpointResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// EnsureEndpoint indicates an expected call of EnsureEndpoint. -func (mr *MockDirectoryClientMockRecorder) EnsureEndpoint(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnsureEndpoint", reflect.TypeOf((*MockDirectoryClient)(nil).EnsureEndpoint), varargs...) -} - -// GetTenant mocks base method. -func (m *MockDirectoryClient) GetTenant(arg0 context.Context, arg1 *GetTenantRequest, arg2 ...grpc.CallOption) (*GetTenantResponse, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "GetTenant", varargs...) - ret0, _ := ret[0].(*GetTenantResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetTenant indicates an expected call of GetTenant. -func (mr *MockDirectoryClientMockRecorder) GetTenant(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTenant", reflect.TypeOf((*MockDirectoryClient)(nil).GetTenant), varargs...) -} - -// ListEndpoints mocks base method. -func (m *MockDirectoryClient) ListEndpoints(arg0 context.Context, arg1 *ListEndpointsRequest, arg2 ...grpc.CallOption) (*ListEndpointsResponse, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "ListEndpoints", varargs...) - ret0, _ := ret[0].(*ListEndpointsResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListEndpoints indicates an expected call of ListEndpoints. -func (mr *MockDirectoryClientMockRecorder) ListEndpoints(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListEndpoints", reflect.TypeOf((*MockDirectoryClient)(nil).ListEndpoints), varargs...) -} - -// WatchEndpoints mocks base method. -func (m *MockDirectoryClient) WatchEndpoints(arg0 context.Context, arg1 *WatchEndpointsRequest, arg2 ...grpc.CallOption) (Directory_WatchEndpointsClient, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "WatchEndpoints", varargs...) - ret0, _ := ret[0].(Directory_WatchEndpointsClient) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// WatchEndpoints indicates an expected call of WatchEndpoints. -func (mr *MockDirectoryClientMockRecorder) WatchEndpoints(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WatchEndpoints", reflect.TypeOf((*MockDirectoryClient)(nil).WatchEndpoints), varargs...) -} - -// MockDirectory_WatchEndpointsClient is a mock of Directory_WatchEndpointsClient interface. -type MockDirectory_WatchEndpointsClient struct { - ctrl *gomock.Controller - recorder *MockDirectory_WatchEndpointsClientMockRecorder -} - -// MockDirectory_WatchEndpointsClientMockRecorder is the mock recorder for MockDirectory_WatchEndpointsClient. -type MockDirectory_WatchEndpointsClientMockRecorder struct { - mock *MockDirectory_WatchEndpointsClient -} - -// NewMockDirectory_WatchEndpointsClient creates a new mock instance. -func NewMockDirectory_WatchEndpointsClient(ctrl *gomock.Controller) *MockDirectory_WatchEndpointsClient { - mock := &MockDirectory_WatchEndpointsClient{ctrl: ctrl} - mock.recorder = &MockDirectory_WatchEndpointsClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockDirectory_WatchEndpointsClient) EXPECT() *MockDirectory_WatchEndpointsClientMockRecorder { - return m.recorder -} - -// CloseSend mocks base method. -func (m *MockDirectory_WatchEndpointsClient) CloseSend() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CloseSend") - ret0, _ := ret[0].(error) - return ret0 -} - -// CloseSend indicates an expected call of CloseSend. -func (mr *MockDirectory_WatchEndpointsClientMockRecorder) CloseSend() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockDirectory_WatchEndpointsClient)(nil).CloseSend)) -} - -// Context mocks base method. -func (m *MockDirectory_WatchEndpointsClient) Context() context.Context { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Context") - ret0, _ := ret[0].(context.Context) - return ret0 -} - -// Context indicates an expected call of Context. -func (mr *MockDirectory_WatchEndpointsClientMockRecorder) Context() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockDirectory_WatchEndpointsClient)(nil).Context)) -} - -// Header mocks base method. -func (m *MockDirectory_WatchEndpointsClient) Header() (metadata.MD, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Header") - ret0, _ := ret[0].(metadata.MD) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Header indicates an expected call of Header. -func (mr *MockDirectory_WatchEndpointsClientMockRecorder) Header() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockDirectory_WatchEndpointsClient)(nil).Header)) -} - -// Recv mocks base method. -func (m *MockDirectory_WatchEndpointsClient) Recv() (*WatchEndpointsResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Recv") - ret0, _ := ret[0].(*WatchEndpointsResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Recv indicates an expected call of Recv. -func (mr *MockDirectory_WatchEndpointsClientMockRecorder) Recv() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockDirectory_WatchEndpointsClient)(nil).Recv)) -} - -// RecvMsg mocks base method. -func (m *MockDirectory_WatchEndpointsClient) RecvMsg(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RecvMsg", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// RecvMsg indicates an expected call of RecvMsg. -func (mr *MockDirectory_WatchEndpointsClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockDirectory_WatchEndpointsClient)(nil).RecvMsg), arg0) -} - -// SendMsg mocks base method. -func (m *MockDirectory_WatchEndpointsClient) SendMsg(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendMsg", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// SendMsg indicates an expected call of SendMsg. -func (mr *MockDirectory_WatchEndpointsClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockDirectory_WatchEndpointsClient)(nil).SendMsg), arg0) -} - -// Trailer mocks base method. -func (m *MockDirectory_WatchEndpointsClient) Trailer() metadata.MD { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Trailer") - ret0, _ := ret[0].(metadata.MD) - return ret0 -} - -// Trailer indicates an expected call of Trailer. -func (mr *MockDirectory_WatchEndpointsClientMockRecorder) Trailer() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockDirectory_WatchEndpointsClient)(nil).Trailer)) -} diff --git a/pkg/ccl/sqlproxyccl/tenantdirsvr/BUILD.bazel b/pkg/ccl/sqlproxyccl/tenantdirsvr/BUILD.bazel index 475c9d728605..66ab50368c40 100644 --- a/pkg/ccl/sqlproxyccl/tenantdirsvr/BUILD.bazel +++ b/pkg/ccl/sqlproxyccl/tenantdirsvr/BUILD.bazel @@ -1,30 +1,18 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_test") +load("@io_bazel_rules_go//go:def.bzl", "go_library") -go_test( - name = "tenantdirsvr_test", - srcs = [ - "directory_test.go", - "main_test.go", - ], +go_library( + name = "tenantdirsvr", + srcs = ["test_directory_svr.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr", + visibility = ["//visibility:public"], deps = [ - "//pkg/base", - "//pkg/ccl", - "//pkg/ccl/kvccl/kvtenantccl", "//pkg/ccl/sqlproxyccl/tenant", - "//pkg/ccl/utilccl", "//pkg/roachpb", - "//pkg/security", - "//pkg/security/securitytest", - "//pkg/server", - "//pkg/sql", - "//pkg/testutils/serverutils", - "//pkg/testutils/testcluster", - "//pkg/util/leaktest", "//pkg/util/log", - "//pkg/util/randutil", "//pkg/util/stop", - "@com_github_stretchr_testify//assert", - "@com_github_stretchr_testify//require", + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_logtags//:logtags", "@org_golang_google_grpc//:go_default_library", ], ) diff --git a/pkg/ccl/sqlproxyccl/tenant/test_directory_svr.go b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go similarity index 77% rename from pkg/ccl/sqlproxyccl/tenant/test_directory_svr.go rename to pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go index d783a81e398d..37e96fc8b7a3 100644 --- a/pkg/ccl/sqlproxyccl/tenant/test_directory_svr.go +++ b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package tenant +package tenantdirsvr import ( "bytes" @@ -18,6 +18,7 @@ import ( "os/exec" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -27,8 +28,8 @@ import ( "google.golang.org/grpc" ) -// Making sure that TestDirectoryServer implements the DirectoryServer interface. -var _ DirectoryServer = (*TestDirectoryServer)(nil) +// Make sure that TestDirectoryServer implements the DirectoryServer interface. +var _ tenant.DirectoryServer = (*TestDirectoryServer)(nil) // Process stores information about a running tenant process. type Process struct { @@ -37,12 +38,11 @@ type Process struct { SQL net.Addr } -// NewSubStopper creates a new stopper that will be stopped -// when either the parent is stopped or its own Stop is called. -// The code is slightly more complicated that simply calling -// NewStopper followed by AddCloser since there is a possibility that between -// the two calls, the parent stopper completes a stop and then the leak detection -// may find a leaked stopper. +// NewSubStopper creates a new stopper that will be stopped when either the +// parent is stopped or its own Stop is called. The code is slightly more +// complicated that simply calling NewStopper followed by AddCloser since there +// is a possibility that between the two calls, the parent stopper completes a +// stop and then the leak detection may find a leaked stopper. func NewSubStopper(parentStopper *stop.Stopper) *stop.Stopper { mu := &syncutil.Mutex{} var subStopper *stop.Stopper @@ -62,8 +62,8 @@ func NewSubStopper(parentStopper *stop.Stopper) *stop.Stopper { return subStopper } -// TestDirectoryServer is a directory server implementation that is used -// for testing. +// TestDirectoryServer is a directory server implementation that is used for +// testing. type TestDirectoryServer struct { stopper *stop.Stopper grpcServer *grpc.Server @@ -71,8 +71,8 @@ type TestDirectoryServer struct { // TenantStarterFunc will be used to launch a new tenant process. TenantStarterFunc func(ctx context.Context, tenantID uint64) (*Process, error) - // When both mutexes need to be held, the locking should always be - // proc first and listen second. + // When both mutexes need to be held, the locking should always be proc + // first and listen second. proc struct { syncutil.RWMutex processByAddrByTenantID map[uint64]map[net.Addr]*Process @@ -83,7 +83,28 @@ type TestDirectoryServer struct { } } -// Get a tenant's list of endpoints and the process information for each endpoint. +// New will create a new server. +func New(stopper *stop.Stopper) (*TestDirectoryServer, error) { + // Determine the path to cockroach executable. + cockroachExecutable, err := os.Executable() + if err != nil { + return nil, err + } + dir := &TestDirectoryServer{ + grpcServer: grpc.NewServer(), + stopper: stopper, + cockroachExecutable: cockroachExecutable, + } + dir.TenantStarterFunc = dir.startTenantLocked + dir.proc.processByAddrByTenantID = map[uint64]map[net.Addr]*Process{} + dir.listen.eventListeners = list.New() + stopper.AddCloser(stop.CloserFn(func() { dir.grpcServer.GracefulStop() })) + tenant.RegisterDirectoryServer(dir.grpcServer, dir) + return dir, nil +} + +// Get a tenant's list of endpoints and the process information for each +// endpoint. func (s *TestDirectoryServer) Get(id roachpb.TenantID) (result map[net.Addr]*Process) { result = make(map[net.Addr]*Process) s.proc.RLock() @@ -97,30 +118,31 @@ func (s *TestDirectoryServer) Get(id roachpb.TenantID) (result map[net.Addr]*Pro return } -// GetTenant returns tenant metadata for a given ID. Hard coded to return -// every tenant's cluster name as "tenant-cluster" +// GetTenant returns tenant metadata for a given ID. Hard coded to return every +// tenant's cluster name as "tenant-cluster" func (s *TestDirectoryServer) GetTenant( - _ context.Context, _ *GetTenantRequest, -) (*GetTenantResponse, error) { - return &GetTenantResponse{ + _ context.Context, _ *tenant.GetTenantRequest, +) (*tenant.GetTenantResponse, error) { + return &tenant.GetTenantResponse{ ClusterName: "tenant-cluster", }, nil } -// ListEndpoints returns a list of tenant process endpoints as well as status of the -// processes. +// ListEndpoints returns a list of tenant process endpoints as well as status of +// the processes. func (s *TestDirectoryServer) ListEndpoints( - ctx context.Context, req *ListEndpointsRequest, -) (*ListEndpointsResponse, error) { + ctx context.Context, req *tenant.ListEndpointsRequest, +) (*tenant.ListEndpointsResponse, error) { ctx = logtags.AddTag(ctx, "tenant", req.TenantID) s.proc.RLock() defer s.proc.RUnlock() return s.listLocked(ctx, req) } -// WatchEndpoints returns a new stream, that can be used to monitor server activity. +// WatchEndpoints returns a new stream, that can be used to monitor server +// activity. func (s *TestDirectoryServer) WatchEndpoints( - _ *WatchEndpointsRequest, server Directory_WatchEndpointsServer, + _ *tenant.WatchEndpointsRequest, server tenant.Directory_WatchEndpointsServer, ) error { select { case <-s.stopper.ShouldQuiesce(): @@ -129,7 +151,7 @@ func (s *TestDirectoryServer) WatchEndpoints( } // Make the channel with a small buffer to allow for a burst of notifications // and a slow receiver. - c := make(chan *WatchEndpointsResponse, 10) + c := make(chan *tenant.WatchEndpointsResponse, 10) s.listen.Lock() elem := s.listen.eventListeners.PushBack(c) s.listen.Unlock() @@ -161,28 +183,28 @@ func (s *TestDirectoryServer) WatchEndpoints( return err } -func (s *TestDirectoryServer) notifyEventListenersLocked(req *WatchEndpointsResponse) { +func (s *TestDirectoryServer) notifyEventListenersLocked(req *tenant.WatchEndpointsResponse) { for e := s.listen.eventListeners.Front(); e != nil; { select { - case e.Value.(chan *WatchEndpointsResponse) <- req: + case e.Value.(chan *tenant.WatchEndpointsResponse) <- req: e = e.Next() default: // The receiver is unable to consume fast enough. Close the channel and // remove it from the list. eToClose := e e = e.Next() - close(eToClose.Value.(chan *WatchEndpointsResponse)) + close(eToClose.Value.(chan *tenant.WatchEndpointsResponse)) s.listen.eventListeners.Remove(eToClose) } } } -// EnsureEndpoint will ensure that there is either an already active tenant process or -// it will start a new one. It will return an error if starting a new tenant -// process is impossible. +// EnsureEndpoint will ensure that there is either an already active tenant +// process or it will start a new one. It will return an error if starting a new +// tenant process is impossible. func (s *TestDirectoryServer) EnsureEndpoint( - ctx context.Context, req *EnsureEndpointRequest, -) (*EnsureEndpointResponse, error) { + ctx context.Context, req *tenant.EnsureEndpointRequest, +) (*tenant.EnsureEndpointResponse, error) { select { case <-s.stopper.ShouldQuiesce(): return nil, context.Canceled @@ -194,7 +216,7 @@ func (s *TestDirectoryServer) EnsureEndpoint( s.proc.Lock() defer s.proc.Unlock() - lst, err := s.listLocked(ctx, &ListEndpointsRequest{TenantID: req.TenantID}) + lst, err := s.listLocked(ctx, &tenant.ListEndpointsRequest{TenantID: req.TenantID}) if err != nil { return nil, err } @@ -209,7 +231,7 @@ func (s *TestDirectoryServer) EnsureEndpoint( })) } - return &EnsureEndpointResponse{}, nil + return &tenant.EnsureEndpointResponse{}, nil } // Serve requests on the given listener. @@ -217,36 +239,16 @@ func (s *TestDirectoryServer) Serve(listener net.Listener) error { return s.grpcServer.Serve(listener) } -// NewTestDirectoryServer will create a new server. -func NewTestDirectoryServer(stopper *stop.Stopper) (*TestDirectoryServer, error) { - // Determine the path to cockroach executable. - cockroachExecutable, err := os.Executable() - if err != nil { - return nil, err - } - dir := &TestDirectoryServer{ - grpcServer: grpc.NewServer(), - stopper: stopper, - cockroachExecutable: cockroachExecutable, - } - dir.TenantStarterFunc = dir.startTenantLocked - dir.proc.processByAddrByTenantID = map[uint64]map[net.Addr]*Process{} - dir.listen.eventListeners = list.New() - stopper.AddCloser(stop.CloserFn(func() { dir.grpcServer.GracefulStop() })) - RegisterDirectoryServer(dir.grpcServer, dir) - return dir, nil -} - func (s *TestDirectoryServer) listLocked( - _ context.Context, req *ListEndpointsRequest, -) (*ListEndpointsResponse, error) { + _ context.Context, req *tenant.ListEndpointsRequest, +) (*tenant.ListEndpointsResponse, error) { processByAddr, ok := s.proc.processByAddrByTenantID[req.TenantID] if !ok { - return &ListEndpointsResponse{}, nil + return &tenant.ListEndpointsResponse{}, nil } - resp := ListEndpointsResponse{} + resp := tenant.ListEndpointsResponse{} for addr := range processByAddr { - resp.Endpoints = append(resp.Endpoints, &Endpoint{IP: addr.String()}) + resp.Endpoints = append(resp.Endpoints, &tenant.Endpoint{IP: addr.String()}) } return &resp, nil } @@ -261,8 +263,8 @@ func (s *TestDirectoryServer) registerInstanceLocked(tenantID uint64, process *P s.listen.RLock() defer s.listen.RUnlock() - s.notifyEventListenersLocked(&WatchEndpointsResponse{ - Typ: ADDED, + s.notifyEventListenersLocked(&tenant.WatchEndpointsResponse{ + Typ: tenant.ADDED, IP: process.SQL.String(), TenantID: tenantID, }) @@ -281,8 +283,8 @@ func (s *TestDirectoryServer) deregisterInstance(tenantID uint64, sql net.Addr) s.listen.RLock() defer s.listen.RUnlock() - s.notifyEventListenersLocked(&WatchEndpointsResponse{ - Typ: DELETED, + s.notifyEventListenersLocked(&tenant.WatchEndpointsResponse{ + Typ: tenant.DELETED, IP: sql.String(), TenantID: tenantID, }) diff --git a/pkg/ccl/sqlproxyccl/testserver.crt b/pkg/ccl/sqlproxyccl/testdata/testserver.crt similarity index 100% rename from pkg/ccl/sqlproxyccl/testserver.crt rename to pkg/ccl/sqlproxyccl/testdata/testserver.crt diff --git a/pkg/ccl/sqlproxyccl/testserver.key b/pkg/ccl/sqlproxyccl/testdata/testserver.key similarity index 100% rename from pkg/ccl/sqlproxyccl/testserver.key rename to pkg/ccl/sqlproxyccl/testdata/testserver.key diff --git a/pkg/ccl/sqlproxyccl/testserver_config.cnf b/pkg/ccl/sqlproxyccl/testdata/testserver_config.cnf similarity index 100% rename from pkg/ccl/sqlproxyccl/testserver_config.cnf rename to pkg/ccl/sqlproxyccl/testdata/testserver_config.cnf diff --git a/pkg/ccl/sqlproxyccl/throttler/local_test.go b/pkg/ccl/sqlproxyccl/throttler/local_test.go index 22bde2acace0..6af98a751aae 100644 --- a/pkg/ccl/sqlproxyccl/throttler/local_test.go +++ b/pkg/ccl/sqlproxyccl/throttler/local_test.go @@ -100,7 +100,7 @@ func TestLocalService_Eviction(t *testing.T) { s := newTestLocalService() s.maxMapSize = 10 - for i := 0; i < 10000; i++ { + for i := 0; i < 20; i++ { ipAddress := fmt.Sprintf("%d", i) _ = s.LoginCheck(ipAddress, s.clock.Now()) require.Less(t, len(s.mu.limiters), 11) diff --git a/pkg/ccl/sqlproxyccl/throttler/mocks_generated.go b/pkg/ccl/sqlproxyccl/throttler/mocks_generated.go deleted file mode 100644 index 5499db346db9..000000000000 --- a/pkg/ccl/sqlproxyccl/throttler/mocks_generated.go +++ /dev/null @@ -1,49 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: service.go - -// Package throttler is a generated GoMock package. -package throttler - -import ( - reflect "reflect" - time "time" - - gomock "github.com/golang/mock/gomock" -) - -// MockService is a mock of Service interface. -type MockService struct { - ctrl *gomock.Controller - recorder *MockServiceMockRecorder -} - -// MockServiceMockRecorder is the mock recorder for MockService. -type MockServiceMockRecorder struct { - mock *MockService -} - -// NewMockService creates a new mock instance. -func NewMockService(ctrl *gomock.Controller) *MockService { - mock := &MockService{ctrl: ctrl} - mock.recorder = &MockServiceMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockService) EXPECT() *MockServiceMockRecorder { - return m.recorder -} - -// LoginCheck mocks base method. -func (m *MockService) LoginCheck(ipAddress string, now time.Time) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "LoginCheck", ipAddress, now) - ret0, _ := ret[0].(error) - return ret0 -} - -// LoginCheck indicates an expected call of LoginCheck. -func (mr *MockServiceMockRecorder) LoginCheck(ipAddress, now interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoginCheck", reflect.TypeOf((*MockService)(nil).LoginCheck), ipAddress, now) -} diff --git a/pkg/ccl/sqlproxyccl/throttler/service.go b/pkg/ccl/sqlproxyccl/throttler/service.go index bac2218ac7e9..e91d5a9cfe77 100644 --- a/pkg/ccl/sqlproxyccl/throttler/service.go +++ b/pkg/ccl/sqlproxyccl/throttler/service.go @@ -11,10 +11,8 @@ package throttler import "time" -//go:generate mockgen -package=throttler -destination=mocks_generated.go -source=service.go . Service - // Service provides the interface for performing throttle checks before -// allowing requests into the managed service system. +// requests into the managed service system. type Service interface { // LoginCheck determines whether a login request should be allowed to // proceed. It rate limits login attempts from IP addresses and it rate diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel index 9681be3fec92..0fe3c1d2f9cd 100644 --- a/pkg/cli/BUILD.bazel +++ b/pkg/cli/BUILD.bazel @@ -97,7 +97,7 @@ go_library( "//pkg/base", "//pkg/build", "//pkg/ccl/sqlproxyccl", - "//pkg/ccl/sqlproxyccl/tenant", + "//pkg/ccl/sqlproxyccl/tenantdirsvr", "//pkg/cli/cliflags", "//pkg/cli/exit", "//pkg/cli/syncbench", diff --git a/pkg/cli/mt_test_directory.go b/pkg/cli/mt_test_directory.go index 8f4789a37597..b35a2912b077 100644 --- a/pkg/cli/mt_test_directory.go +++ b/pkg/cli/mt_test_directory.go @@ -15,16 +15,17 @@ import ( "fmt" "net" - "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" + "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/spf13/cobra" ) var mtTestDirectorySvr = &cobra.Command{ Use: "test-directory", - Short: "Run a test directory service.", + Short: "run a test directory service", Long: ` -Run a test directory service. +Run a test directory service that starts and manages tenant SQL instances as +processes on the local machine. `, Args: cobra.NoArgs, RunE: MaybeDecorateGRPCError(runDirectorySvr), @@ -40,7 +41,7 @@ func runDirectorySvr(cmd *cobra.Command, args []string) (returnErr error) { } defer stopper.Stop(ctx) - tds, err := tenant.NewTestDirectoryServer(stopper) + tds, err := tenantdirsvr.New(stopper) if err != nil { return err } diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index ac50901b59f1..2e2f5282f948 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -840,7 +840,7 @@ func TestLint(t *testing.T) { ":!util/grpcutil/grpc_util_test.go", ":!server/testserver.go", ":!util/tracing/*_test.go", - ":!ccl/sqlproxyccl/tenant/test_directory_svr.go", + ":!ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go", ) if err != nil { t.Fatal(err)