From 432b2f5f591ec157b460e538d5f27ae9fa89d169 Mon Sep 17 00:00:00 2001 From: Andrew Kimball Date: Thu, 10 Jun 2021 19:56:22 -0700 Subject: [PATCH 1/4] sqlproxyccl: reorganize/tidy up files Do a bit of reorganization/tidying of files: 1. Remove mock files, which are never used. 2. Move tests into package directory they are testing. Avoid import cycles by qualifying their package name with "_test". 3. Move the TestDirectoryServer into the tenantdirsvr package and rename the NewTestDirectoryServer method to just be New. 4. Move certificate files into new testdata directory. Release note: None --- pkg/BUILD.bazel | 2 +- pkg/ccl/sqlproxyccl/BUILD.bazel | 10 +- .../sqlproxyccl/denylist/mocks_generated.go | 49 ---- pkg/ccl/sqlproxyccl/denylist/service.go | 2 - pkg/ccl/sqlproxyccl/frontend_admitter_test.go | 2 +- pkg/ccl/sqlproxyccl/proxy_handler_test.go | 14 +- pkg/ccl/sqlproxyccl/tenant/BUILD.bazel | 37 ++- pkg/ccl/sqlproxyccl/tenant/directory.go | 2 - pkg/ccl/sqlproxyccl/tenant/directory.pb.go | 55 ++-- .../directory_test.go | 17 +- .../{tenantdirsvr => tenant}/main_test.go | 2 +- pkg/ccl/sqlproxyccl/tenant/mocks_generated.go | 240 ------------------ pkg/ccl/sqlproxyccl/tenantdirsvr/BUILD.bazel | 30 +-- .../test_directory_svr.go | 91 +++---- .../sqlproxyccl/{ => testdata}/testserver.crt | 0 .../sqlproxyccl/{ => testdata}/testserver.key | 0 .../{ => testdata}/testserver_config.cnf | 0 .../sqlproxyccl/throttler/mocks_generated.go | 49 ---- pkg/ccl/sqlproxyccl/throttler/service.go | 2 - pkg/cli/BUILD.bazel | 2 +- pkg/cli/mt_test_directory.go | 4 +- 21 files changed, 147 insertions(+), 463 deletions(-) delete mode 100644 pkg/ccl/sqlproxyccl/denylist/mocks_generated.go rename pkg/ccl/sqlproxyccl/{tenantdirsvr => tenant}/directory_test.go (96%) rename pkg/ccl/sqlproxyccl/{tenantdirsvr => tenant}/main_test.go (98%) delete mode 100644 pkg/ccl/sqlproxyccl/tenant/mocks_generated.go rename pkg/ccl/sqlproxyccl/{tenant => tenantdirsvr}/test_directory_svr.go (85%) rename pkg/ccl/sqlproxyccl/{ => testdata}/testserver.crt (100%) rename pkg/ccl/sqlproxyccl/{ => testdata}/testserver.key (100%) rename pkg/ccl/sqlproxyccl/{ => testdata}/testserver_config.cnf (100%) delete mode 100644 pkg/ccl/sqlproxyccl/throttler/mocks_generated.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 6bf1fcb10331..e437a062cc37 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -26,7 +26,7 @@ ALL_TESTS = [ "//pkg/ccl/sqlproxyccl/cache:cache_test", "//pkg/ccl/sqlproxyccl/denylist:denylist_test", "//pkg/ccl/sqlproxyccl/idle:idle_test", - "//pkg/ccl/sqlproxyccl/tenantdirsvr:tenantdirsvr_test", + "//pkg/ccl/sqlproxyccl/tenant:tenant_test", "//pkg/ccl/sqlproxyccl/throttler:throttler_test", "//pkg/ccl/sqlproxyccl:sqlproxyccl_test", "//pkg/ccl/storageccl/engineccl:engineccl_test", diff --git a/pkg/ccl/sqlproxyccl/BUILD.bazel b/pkg/ccl/sqlproxyccl/BUILD.bazel index 8c4f2370770f..3fa48fa37e23 100644 --- a/pkg/ccl/sqlproxyccl/BUILD.bazel +++ b/pkg/ccl/sqlproxyccl/BUILD.bazel @@ -36,8 +36,10 @@ go_library( "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", + "@com_github_gogo_status//:status", "@com_github_jackc_pgproto3_v2//:pgproto3", "@org_golang_google_grpc//:go_default_library", + "@org_golang_google_grpc//codes", ], ) @@ -51,17 +53,13 @@ go_test( "proxy_handler_test.go", "server_test.go", ], - data = [ - ":testserver.crt", - ":testserver.key", - ":testserver_config.cnf", - ], + data = [":testdata"], embed = [":sqlproxyccl"], deps = [ "//pkg/base", "//pkg/ccl/kvccl/kvtenantccl", "//pkg/ccl/sqlproxyccl/denylist", - "//pkg/ccl/sqlproxyccl/tenant", + "//pkg/ccl/sqlproxyccl/tenantdirsvr", "//pkg/ccl/utilccl", "//pkg/roachpb", "//pkg/security", diff --git a/pkg/ccl/sqlproxyccl/denylist/mocks_generated.go b/pkg/ccl/sqlproxyccl/denylist/mocks_generated.go deleted file mode 100644 index d40c42f4982b..000000000000 --- a/pkg/ccl/sqlproxyccl/denylist/mocks_generated.go +++ /dev/null @@ -1,49 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: service.go - -// Package denylist is a generated GoMock package. -package denylist - -import ( - reflect "reflect" - - gomock "github.com/golang/mock/gomock" -) - -// MockService is a mock of Service interface. -type MockService struct { - ctrl *gomock.Controller - recorder *MockServiceMockRecorder -} - -// MockServiceMockRecorder is the mock recorder for MockService. -type MockServiceMockRecorder struct { - mock *MockService -} - -// NewMockService creates a new mock instance. -func NewMockService(ctrl *gomock.Controller) *MockService { - mock := &MockService{ctrl: ctrl} - mock.recorder = &MockServiceMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockService) EXPECT() *MockServiceMockRecorder { - return m.recorder -} - -// Denied mocks base method. -func (m *MockService) Denied(entity DenyEntity) (*Entry, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Denied", entity) - ret0, _ := ret[0].(*Entry) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Denied indicates an expected call of Denied. -func (mr *MockServiceMockRecorder) Denied(entity interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Denied", reflect.TypeOf((*MockService)(nil).Denied), entity) -} diff --git a/pkg/ccl/sqlproxyccl/denylist/service.go b/pkg/ccl/sqlproxyccl/denylist/service.go index 5e15726b89c5..bfa3cea27d84 100644 --- a/pkg/ccl/sqlproxyccl/denylist/service.go +++ b/pkg/ccl/sqlproxyccl/denylist/service.go @@ -10,8 +10,6 @@ package denylist import "strings" -//go:generate mockgen -package=denylist -destination=mocks_generated.go -source=service.go . Service - // Entry records the reason for putting an item on the denylist. // TODO(spaskob): add codes for different denial reasons. type Entry struct { diff --git a/pkg/ccl/sqlproxyccl/frontend_admitter_test.go b/pkg/ccl/sqlproxyccl/frontend_admitter_test.go index 5d6fc04f2418..f485d395a2fc 100644 --- a/pkg/ccl/sqlproxyccl/frontend_admitter_test.go +++ b/pkg/ccl/sqlproxyccl/frontend_admitter_test.go @@ -23,7 +23,7 @@ import ( ) func tlsConfig() (*tls.Config, error) { - cer, err := tls.LoadX509KeyPair("testserver.crt", "testserver.key") + cer, err := tls.LoadX509KeyPair("testdata/testserver.crt", "testdata/testserver.key") if err != nil { return nil, err } diff --git a/pkg/ccl/sqlproxyccl/proxy_handler_test.go b/pkg/ccl/sqlproxyccl/proxy_handler_test.go index b54dd41305c2..94273f829ec8 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler_test.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler_test.go @@ -71,12 +71,12 @@ func newSecureProxyServer( ) (server *Server, addr string) { // Created via: const _ = ` -openssl genrsa -out testserver.key 2048 -openssl req -new -x509 -sha256 -key testserver.key -out testserver.crt \ - -days 3650 -config testserver_config.cnf +openssl genrsa -out testdata/testserver.key 2048 +openssl req -new -x509 -sha256 -key testdata/testserver.key -out testdata/testserver.crt \ + -days 3650 -config testdata/testserver_config.cnf ` - opts.ListenKey = "testserver.key" - opts.ListenCert = "testserver.crt" + opts.ListenKey = "testdata/testserver.key" + opts.ListenCert = "testdata/testserver.crt" return newProxyServer(ctx, t, stopper, opts) } @@ -204,11 +204,11 @@ func TestFailedConnection(t *testing.T) { } ac.assertConnectErr( - ctx, t, u, "?options=--cluster=dim-dog-28&sslmode=verify-ca&sslrootcert=testserver.crt", + ctx, t, u, "?options=--cluster=dim-dog-28&sslmode=verify-ca&sslrootcert=testdata/testserver.crt", codeBackendDown, "unable to reach backend SQL server", ) ac.assertConnectErr( - ctx, t, u, "?options=--cluster=dim-dog-28&sslmode=verify-full&sslrootcert=testserver.crt", + ctx, t, u, "?options=--cluster=dim-dog-28&sslmode=verify-full&sslrootcert=testdata/testserver.crt", codeBackendDown, "unable to reach backend SQL server", ) require.Equal(t, int64(4), s.metrics.BackendDownCount.Count()) diff --git a/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel b/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel index 243234607fa9..0a6350862f17 100644 --- a/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel +++ b/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel @@ -1,5 +1,5 @@ load("@bazel_gomock//:gomock.bzl", "gomock") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") load("@rules_proto//proto:defs.bzl", "proto_library") @@ -25,7 +25,6 @@ go_library( srcs = [ "directory.go", "entry.go", - "test_directory_svr.go", ":mocks_tenant", # keep ], embed = [":tenant_go_proto"], @@ -39,9 +38,8 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", - "@com_github_cockroachdb_logtags//:logtags", "@com_github_golang_mock//gomock", # keep - "@org_golang_google_grpc//:go_default_library", + "@org_golang_google_grpc//codes", "@org_golang_google_grpc//metadata", # keep "@org_golang_google_grpc//status", ], @@ -58,3 +56,34 @@ gomock( package = "tenant", self_package = "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant", ) + +go_test( + name = "tenant_test", + srcs = [ + "directory_test.go", + "main_test.go", + ], + deps = [ + ":tenant", + "//pkg/base", + "//pkg/ccl", + "//pkg/ccl/kvccl/kvtenantccl", + "//pkg/ccl/sqlproxyccl/tenantdirsvr", + "//pkg/ccl/utilccl", + "//pkg/roachpb", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/sql", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/randutil", + "//pkg/util/stop", + "@com_github_stretchr_testify//require", + "@org_golang_google_grpc//:go_default_library", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", + ], +) diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.go b/pkg/ccl/sqlproxyccl/tenant/directory.go index efc46470a262..7e49ed451247 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory.go @@ -23,8 +23,6 @@ import ( "google.golang.org/grpc/status" ) -//go:generate mockgen -package=tenant -destination=mocks_generated.go . DirectoryClient,Directory_WatchEndpointsClient - // dirOptions control the behavior of tenant.Directory. type dirOptions struct { deterministic bool diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.pb.go b/pkg/ccl/sqlproxyccl/tenant/directory.pb.go index 5490250fbafe..ae873721af6e 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.pb.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory.pb.go @@ -27,7 +27,8 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -// EventType shows the event type of the notifications that the server streams to its clients. +// EventType shows the event type of the notifications that the server streams +// to its clients. type EventType int32 const ( @@ -89,8 +90,8 @@ func (m *WatchEndpointsRequest) XXX_DiscardUnknown() { var xxx_messageInfo_WatchEndpointsRequest proto.InternalMessageInfo -// WatchEndpointsResponse represents the notifications that the server sends to its clients when clients -// want to monitor the directory server activity. +// WatchEndpointsResponse represents the notifications that the server sends to +// its clients when clients want to monitor the directory server activity. type WatchEndpointsResponse struct { // EventType is the type of the notifications - added, modified, deleted. Typ EventType `protobuf:"varint,1,opt,name=typ,proto3,enum=cockroach.ccl.sqlproxyccl.tenant.EventType" json:"typ,omitempty"` @@ -129,9 +130,11 @@ func (m *WatchEndpointsResponse) XXX_DiscardUnknown() { var xxx_messageInfo_WatchEndpointsResponse proto.InternalMessageInfo -// ListEndpointsRequest is used to query the server for the list of current endpoints of a given tenant. +// ListEndpointsRequest is used to query the server for the list of current +// endpoints of a given tenant. type ListEndpointsRequest struct { - // TenantID identifies the tenant for which the client is requesting a list of the endpoints. + // TenantID identifies the tenant for which the client is requesting a list of + // the endpoints. TenantID uint64 `protobuf:"varint,1,opt,name=tenant_id,json=tenantId,proto3" json:"tenant_id,omitempty"` } @@ -164,8 +167,9 @@ func (m *ListEndpointsRequest) XXX_DiscardUnknown() { var xxx_messageInfo_ListEndpointsRequest proto.InternalMessageInfo -// EnsureEndpointRequest is used to ensure that a tenant's backend is active. If there is an active backend then the -// server doesn't have to do anything. If there isn't an active backend, then the server has to bring a new one up. +// EnsureEndpointRequest is used to ensure that a tenant's backend is active. If +// there is an active backend then the server doesn't have to do anything. If +// there isn't an active backend, then the server has to bring a new one up. type EnsureEndpointRequest struct { // TenantID is the id of the tenant for which an active backend is requested. TenantID uint64 `protobuf:"varint,1,opt,name=tenant_id,json=tenantId,proto3" json:"tenant_id,omitempty"` @@ -200,7 +204,8 @@ func (m *EnsureEndpointRequest) XXX_DiscardUnknown() { var xxx_messageInfo_EnsureEndpointRequest proto.InternalMessageInfo -// EnsureEndpointResponse is empty and indicates that the server processed the request. +// EnsureEndpointResponse is empty and indicates that the server processed the +// request. type EnsureEndpointResponse struct { } @@ -233,8 +238,8 @@ func (m *EnsureEndpointResponse) XXX_DiscardUnknown() { var xxx_messageInfo_EnsureEndpointResponse proto.InternalMessageInfo -// Endpoint contains the information about a tenant endpoint. Most often it is a combination of an ip address and port. -// i.e. 132.130.1.11:34576 +// Endpoint contains the information about a tenant endpoint. Most often it is a +// combination of an ip address and port, e.g. 132.130.1.11:34576. type Endpoint struct { // IP is the ip and port combo identifying the tenant endpoint. IP string `protobuf:"bytes,1,opt,name=IP,proto3" json:"IP,omitempty"` @@ -269,7 +274,8 @@ func (m *Endpoint) XXX_DiscardUnknown() { var xxx_messageInfo_Endpoint proto.InternalMessageInfo -// ListEndpointsResponse is sent back as a result of requesting the list of endpoints for a given tenant. +// ListEndpointsResponse is sent back as a result of requesting the list of +// endpoints for a given tenant. type ListEndpointsResponse struct { // Endpoints is the list of endpoints currently active for the requested tenant. Endpoints []*Endpoint `protobuf:"bytes,1,rep,name=endpoints,proto3" json:"endpoints,omitempty"` @@ -304,7 +310,8 @@ func (m *ListEndpointsResponse) XXX_DiscardUnknown() { var xxx_messageInfo_ListEndpointsResponse proto.InternalMessageInfo -// GetTenantRequest is used by a client to request from the sever metadata related to a given tenant. +// GetTenantRequest is used by a client to request from the sever metadata +// related to a given tenant. type GetTenantRequest struct { // TenantID identifies the tenant for which the metadata is being requested. TenantID uint64 `protobuf:"varint,1,opt,name=tenant_id,json=tenantId,proto3" json:"tenant_id,omitempty"` @@ -440,13 +447,15 @@ const _ = grpc.SupportPackageIsVersion4 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type DirectoryClient interface { - // ListEndpoints is used to query the server for the list of current endpoints of a given tenant. + // ListEndpoints is used to query the server for the list of current endpoints + // of a given tenant. ListEndpoints(ctx context.Context, in *ListEndpointsRequest, opts ...grpc.CallOption) (*ListEndpointsResponse, error) - // WatchEndpoints is used to get a stream, that is used to receive notifications about changes in tenant - // backend's state - added, modified and deleted. + // WatchEndpoints is used to get a stream, that is used to receive notifications + // about changes in tenant backend's state - added, modified and deleted. WatchEndpoints(ctx context.Context, in *WatchEndpointsRequest, opts ...grpc.CallOption) (Directory_WatchEndpointsClient, error) - // EnsureEndpoint is used to ensure that a tenant's backend is active. If there is an active backend then the - // server doesn't have to do anything. If there isn't an active backend, then the server has to bring a new one up. + // EnsureEndpoint is used to ensure that a tenant's backend is active. If there + // is an active backend then the server doesn't have to do anything. If there + // isn't an active backend, then the server has to bring a new one up. EnsureEndpoint(ctx context.Context, in *EnsureEndpointRequest, opts ...grpc.CallOption) (*EnsureEndpointResponse, error) // GetTenant is used to fetch the metadata of a specific tenant. GetTenant(ctx context.Context, in *GetTenantRequest, opts ...grpc.CallOption) (*GetTenantResponse, error) @@ -521,13 +530,15 @@ func (c *directoryClient) GetTenant(ctx context.Context, in *GetTenantRequest, o // DirectoryServer is the server API for Directory service. type DirectoryServer interface { - // ListEndpoints is used to query the server for the list of current endpoints of a given tenant. + // ListEndpoints is used to query the server for the list of current endpoints + // of a given tenant. ListEndpoints(context.Context, *ListEndpointsRequest) (*ListEndpointsResponse, error) - // WatchEndpoints is used to get a stream, that is used to receive notifications about changes in tenant - // backend's state - added, modified and deleted. + // WatchEndpoints is used to get a stream, that is used to receive notifications + // about changes in tenant backend's state - added, modified and deleted. WatchEndpoints(*WatchEndpointsRequest, Directory_WatchEndpointsServer) error - // EnsureEndpoint is used to ensure that a tenant's backend is active. If there is an active backend then the - // server doesn't have to do anything. If there isn't an active backend, then the server has to bring a new one up. + // EnsureEndpoint is used to ensure that a tenant's backend is active. If there + // is an active backend then the server doesn't have to do anything. If there + // isn't an active backend, then the server has to bring a new one up. EnsureEndpoint(context.Context, *EnsureEndpointRequest) (*EnsureEndpointResponse, error) // GetTenant is used to fetch the metadata of a specific tenant. GetTenant(context.Context, *GetTenantRequest) (*GetTenantResponse, error) diff --git a/pkg/ccl/sqlproxyccl/tenantdirsvr/directory_test.go b/pkg/ccl/sqlproxyccl/tenant/directory_test.go similarity index 96% rename from pkg/ccl/sqlproxyccl/tenantdirsvr/directory_test.go rename to pkg/ccl/sqlproxyccl/tenant/directory_test.go index 1c50c075c7dc..ccef757e4178 100644 --- a/pkg/ccl/sqlproxyccl/tenantdirsvr/directory_test.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package tenantdirsvr +package tenant_test import ( "context" @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" + "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -193,7 +194,7 @@ func TestResume(t *testing.T) { }(i) } - var processes map[net.Addr]*tenant.Process + var processes map[net.Addr]*tenantdirsvr.Process // Eventually the tenant process will be resumed. require.Eventually(t, func() bool { processes = tds.Get(tenantID) @@ -330,9 +331,9 @@ func destroyTenant(tc serverutils.TestClusterInterface, id roachpb.TenantID) err func startTenant( ctx context.Context, srv serverutils.TestServerInterface, id uint64, -) (*tenant.Process, error) { +) (*tenantdirsvr.Process, error) { log.TestingClearServerIdentifiers() - tenantStopper := tenant.NewSubStopper(srv.Stopper()) + tenantStopper := tenantdirsvr.NewSubStopper(srv.Stopper()) t, err := srv.StartTenant( ctx, base.TestTenantArgs{ @@ -348,7 +349,7 @@ func startTenant( if err != nil { return nil, err } - return &tenant.Process{SQL: sqlAddr, Stopper: tenantStopper}, nil + return &tenantdirsvr.Process{SQL: sqlAddr, Stopper: tenantStopper}, nil } // Setup directory that uses a client connected to a test directory server @@ -358,7 +359,7 @@ func newTestDirectory( ) ( tc serverutils.TestClusterInterface, directory *tenant.Directory, - tds *tenant.TestDirectoryServer, + tds *tenantdirsvr.TestDirectoryServer, ) { tc = serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{ // We need to start the cluster insecure in order to not @@ -369,9 +370,9 @@ func newTestDirectory( }) clusterStopper := tc.Stopper() var err error - tds, err = tenant.NewTestDirectoryServer(clusterStopper) + tds, err = tenantdirsvr.New(clusterStopper) require.NoError(t, err) - tds.TenantStarterFunc = func(ctx context.Context, tenantID uint64) (*tenant.Process, error) { + tds.TenantStarterFunc = func(ctx context.Context, tenantID uint64) (*tenantdirsvr.Process, error) { t.Logf("starting tenant %d", tenantID) process, err := startTenant(ctx, tc.Server(0), tenantID) if err != nil { diff --git a/pkg/ccl/sqlproxyccl/tenantdirsvr/main_test.go b/pkg/ccl/sqlproxyccl/tenant/main_test.go similarity index 98% rename from pkg/ccl/sqlproxyccl/tenantdirsvr/main_test.go rename to pkg/ccl/sqlproxyccl/tenant/main_test.go index 90199cefb604..78839929b3c2 100644 --- a/pkg/ccl/sqlproxyccl/tenantdirsvr/main_test.go +++ b/pkg/ccl/sqlproxyccl/tenant/main_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package tenantdirsvr +package tenant_test import ( "os" diff --git a/pkg/ccl/sqlproxyccl/tenant/mocks_generated.go b/pkg/ccl/sqlproxyccl/tenant/mocks_generated.go deleted file mode 100644 index 15c887ef9ea2..000000000000 --- a/pkg/ccl/sqlproxyccl/tenant/mocks_generated.go +++ /dev/null @@ -1,240 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant (interfaces: DirectoryClient,Directory_WatchEndpointsClient) - -// Package tenant is a generated GoMock package. -package tenant - -import ( - context "context" - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - grpc "google.golang.org/grpc" - metadata "google.golang.org/grpc/metadata" -) - -// MockDirectoryClient is a mock of DirectoryClient interface. -type MockDirectoryClient struct { - ctrl *gomock.Controller - recorder *MockDirectoryClientMockRecorder -} - -// MockDirectoryClientMockRecorder is the mock recorder for MockDirectoryClient. -type MockDirectoryClientMockRecorder struct { - mock *MockDirectoryClient -} - -// NewMockDirectoryClient creates a new mock instance. -func NewMockDirectoryClient(ctrl *gomock.Controller) *MockDirectoryClient { - mock := &MockDirectoryClient{ctrl: ctrl} - mock.recorder = &MockDirectoryClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockDirectoryClient) EXPECT() *MockDirectoryClientMockRecorder { - return m.recorder -} - -// EnsureEndpoint mocks base method. -func (m *MockDirectoryClient) EnsureEndpoint(arg0 context.Context, arg1 *EnsureEndpointRequest, arg2 ...grpc.CallOption) (*EnsureEndpointResponse, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "EnsureEndpoint", varargs...) - ret0, _ := ret[0].(*EnsureEndpointResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// EnsureEndpoint indicates an expected call of EnsureEndpoint. -func (mr *MockDirectoryClientMockRecorder) EnsureEndpoint(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnsureEndpoint", reflect.TypeOf((*MockDirectoryClient)(nil).EnsureEndpoint), varargs...) -} - -// GetTenant mocks base method. -func (m *MockDirectoryClient) GetTenant(arg0 context.Context, arg1 *GetTenantRequest, arg2 ...grpc.CallOption) (*GetTenantResponse, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "GetTenant", varargs...) - ret0, _ := ret[0].(*GetTenantResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetTenant indicates an expected call of GetTenant. -func (mr *MockDirectoryClientMockRecorder) GetTenant(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTenant", reflect.TypeOf((*MockDirectoryClient)(nil).GetTenant), varargs...) -} - -// ListEndpoints mocks base method. -func (m *MockDirectoryClient) ListEndpoints(arg0 context.Context, arg1 *ListEndpointsRequest, arg2 ...grpc.CallOption) (*ListEndpointsResponse, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "ListEndpoints", varargs...) - ret0, _ := ret[0].(*ListEndpointsResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListEndpoints indicates an expected call of ListEndpoints. -func (mr *MockDirectoryClientMockRecorder) ListEndpoints(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListEndpoints", reflect.TypeOf((*MockDirectoryClient)(nil).ListEndpoints), varargs...) -} - -// WatchEndpoints mocks base method. -func (m *MockDirectoryClient) WatchEndpoints(arg0 context.Context, arg1 *WatchEndpointsRequest, arg2 ...grpc.CallOption) (Directory_WatchEndpointsClient, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "WatchEndpoints", varargs...) - ret0, _ := ret[0].(Directory_WatchEndpointsClient) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// WatchEndpoints indicates an expected call of WatchEndpoints. -func (mr *MockDirectoryClientMockRecorder) WatchEndpoints(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WatchEndpoints", reflect.TypeOf((*MockDirectoryClient)(nil).WatchEndpoints), varargs...) -} - -// MockDirectory_WatchEndpointsClient is a mock of Directory_WatchEndpointsClient interface. -type MockDirectory_WatchEndpointsClient struct { - ctrl *gomock.Controller - recorder *MockDirectory_WatchEndpointsClientMockRecorder -} - -// MockDirectory_WatchEndpointsClientMockRecorder is the mock recorder for MockDirectory_WatchEndpointsClient. -type MockDirectory_WatchEndpointsClientMockRecorder struct { - mock *MockDirectory_WatchEndpointsClient -} - -// NewMockDirectory_WatchEndpointsClient creates a new mock instance. -func NewMockDirectory_WatchEndpointsClient(ctrl *gomock.Controller) *MockDirectory_WatchEndpointsClient { - mock := &MockDirectory_WatchEndpointsClient{ctrl: ctrl} - mock.recorder = &MockDirectory_WatchEndpointsClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockDirectory_WatchEndpointsClient) EXPECT() *MockDirectory_WatchEndpointsClientMockRecorder { - return m.recorder -} - -// CloseSend mocks base method. -func (m *MockDirectory_WatchEndpointsClient) CloseSend() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CloseSend") - ret0, _ := ret[0].(error) - return ret0 -} - -// CloseSend indicates an expected call of CloseSend. -func (mr *MockDirectory_WatchEndpointsClientMockRecorder) CloseSend() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockDirectory_WatchEndpointsClient)(nil).CloseSend)) -} - -// Context mocks base method. -func (m *MockDirectory_WatchEndpointsClient) Context() context.Context { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Context") - ret0, _ := ret[0].(context.Context) - return ret0 -} - -// Context indicates an expected call of Context. -func (mr *MockDirectory_WatchEndpointsClientMockRecorder) Context() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockDirectory_WatchEndpointsClient)(nil).Context)) -} - -// Header mocks base method. -func (m *MockDirectory_WatchEndpointsClient) Header() (metadata.MD, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Header") - ret0, _ := ret[0].(metadata.MD) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Header indicates an expected call of Header. -func (mr *MockDirectory_WatchEndpointsClientMockRecorder) Header() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockDirectory_WatchEndpointsClient)(nil).Header)) -} - -// Recv mocks base method. -func (m *MockDirectory_WatchEndpointsClient) Recv() (*WatchEndpointsResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Recv") - ret0, _ := ret[0].(*WatchEndpointsResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Recv indicates an expected call of Recv. -func (mr *MockDirectory_WatchEndpointsClientMockRecorder) Recv() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockDirectory_WatchEndpointsClient)(nil).Recv)) -} - -// RecvMsg mocks base method. -func (m *MockDirectory_WatchEndpointsClient) RecvMsg(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RecvMsg", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// RecvMsg indicates an expected call of RecvMsg. -func (mr *MockDirectory_WatchEndpointsClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockDirectory_WatchEndpointsClient)(nil).RecvMsg), arg0) -} - -// SendMsg mocks base method. -func (m *MockDirectory_WatchEndpointsClient) SendMsg(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendMsg", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// SendMsg indicates an expected call of SendMsg. -func (mr *MockDirectory_WatchEndpointsClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockDirectory_WatchEndpointsClient)(nil).SendMsg), arg0) -} - -// Trailer mocks base method. -func (m *MockDirectory_WatchEndpointsClient) Trailer() metadata.MD { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Trailer") - ret0, _ := ret[0].(metadata.MD) - return ret0 -} - -// Trailer indicates an expected call of Trailer. -func (mr *MockDirectory_WatchEndpointsClientMockRecorder) Trailer() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockDirectory_WatchEndpointsClient)(nil).Trailer)) -} diff --git a/pkg/ccl/sqlproxyccl/tenantdirsvr/BUILD.bazel b/pkg/ccl/sqlproxyccl/tenantdirsvr/BUILD.bazel index 475c9d728605..66ab50368c40 100644 --- a/pkg/ccl/sqlproxyccl/tenantdirsvr/BUILD.bazel +++ b/pkg/ccl/sqlproxyccl/tenantdirsvr/BUILD.bazel @@ -1,30 +1,18 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_test") +load("@io_bazel_rules_go//go:def.bzl", "go_library") -go_test( - name = "tenantdirsvr_test", - srcs = [ - "directory_test.go", - "main_test.go", - ], +go_library( + name = "tenantdirsvr", + srcs = ["test_directory_svr.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr", + visibility = ["//visibility:public"], deps = [ - "//pkg/base", - "//pkg/ccl", - "//pkg/ccl/kvccl/kvtenantccl", "//pkg/ccl/sqlproxyccl/tenant", - "//pkg/ccl/utilccl", "//pkg/roachpb", - "//pkg/security", - "//pkg/security/securitytest", - "//pkg/server", - "//pkg/sql", - "//pkg/testutils/serverutils", - "//pkg/testutils/testcluster", - "//pkg/util/leaktest", "//pkg/util/log", - "//pkg/util/randutil", "//pkg/util/stop", - "@com_github_stretchr_testify//assert", - "@com_github_stretchr_testify//require", + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_logtags//:logtags", "@org_golang_google_grpc//:go_default_library", ], ) diff --git a/pkg/ccl/sqlproxyccl/tenant/test_directory_svr.go b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go similarity index 85% rename from pkg/ccl/sqlproxyccl/tenant/test_directory_svr.go rename to pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go index d783a81e398d..5aed9494e0fe 100644 --- a/pkg/ccl/sqlproxyccl/tenant/test_directory_svr.go +++ b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package tenant +package tenantdirsvr import ( "bytes" @@ -18,6 +18,7 @@ import ( "os/exec" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -28,7 +29,7 @@ import ( ) // Making sure that TestDirectoryServer implements the DirectoryServer interface. -var _ DirectoryServer = (*TestDirectoryServer)(nil) +var _ tenant.DirectoryServer = (*TestDirectoryServer)(nil) // Process stores information about a running tenant process. type Process struct { @@ -83,6 +84,26 @@ type TestDirectoryServer struct { } } +// New will create a new server. +func New(stopper *stop.Stopper) (*TestDirectoryServer, error) { + // Determine the path to cockroach executable. + cockroachExecutable, err := os.Executable() + if err != nil { + return nil, err + } + dir := &TestDirectoryServer{ + grpcServer: grpc.NewServer(), + stopper: stopper, + cockroachExecutable: cockroachExecutable, + } + dir.TenantStarterFunc = dir.startTenantLocked + dir.proc.processByAddrByTenantID = map[uint64]map[net.Addr]*Process{} + dir.listen.eventListeners = list.New() + stopper.AddCloser(stop.CloserFn(func() { dir.grpcServer.GracefulStop() })) + tenant.RegisterDirectoryServer(dir.grpcServer, dir) + return dir, nil +} + // Get a tenant's list of endpoints and the process information for each endpoint. func (s *TestDirectoryServer) Get(id roachpb.TenantID) (result map[net.Addr]*Process) { result = make(map[net.Addr]*Process) @@ -100,9 +121,9 @@ func (s *TestDirectoryServer) Get(id roachpb.TenantID) (result map[net.Addr]*Pro // GetTenant returns tenant metadata for a given ID. Hard coded to return // every tenant's cluster name as "tenant-cluster" func (s *TestDirectoryServer) GetTenant( - _ context.Context, _ *GetTenantRequest, -) (*GetTenantResponse, error) { - return &GetTenantResponse{ + _ context.Context, _ *tenant.GetTenantRequest, +) (*tenant.GetTenantResponse, error) { + return &tenant.GetTenantResponse{ ClusterName: "tenant-cluster", }, nil } @@ -110,8 +131,8 @@ func (s *TestDirectoryServer) GetTenant( // ListEndpoints returns a list of tenant process endpoints as well as status of the // processes. func (s *TestDirectoryServer) ListEndpoints( - ctx context.Context, req *ListEndpointsRequest, -) (*ListEndpointsResponse, error) { + ctx context.Context, req *tenant.ListEndpointsRequest, +) (*tenant.ListEndpointsResponse, error) { ctx = logtags.AddTag(ctx, "tenant", req.TenantID) s.proc.RLock() defer s.proc.RUnlock() @@ -120,7 +141,7 @@ func (s *TestDirectoryServer) ListEndpoints( // WatchEndpoints returns a new stream, that can be used to monitor server activity. func (s *TestDirectoryServer) WatchEndpoints( - _ *WatchEndpointsRequest, server Directory_WatchEndpointsServer, + _ *tenant.WatchEndpointsRequest, server tenant.Directory_WatchEndpointsServer, ) error { select { case <-s.stopper.ShouldQuiesce(): @@ -129,7 +150,7 @@ func (s *TestDirectoryServer) WatchEndpoints( } // Make the channel with a small buffer to allow for a burst of notifications // and a slow receiver. - c := make(chan *WatchEndpointsResponse, 10) + c := make(chan *tenant.WatchEndpointsResponse, 10) s.listen.Lock() elem := s.listen.eventListeners.PushBack(c) s.listen.Unlock() @@ -161,17 +182,17 @@ func (s *TestDirectoryServer) WatchEndpoints( return err } -func (s *TestDirectoryServer) notifyEventListenersLocked(req *WatchEndpointsResponse) { +func (s *TestDirectoryServer) notifyEventListenersLocked(req *tenant.WatchEndpointsResponse) { for e := s.listen.eventListeners.Front(); e != nil; { select { - case e.Value.(chan *WatchEndpointsResponse) <- req: + case e.Value.(chan *tenant.WatchEndpointsResponse) <- req: e = e.Next() default: // The receiver is unable to consume fast enough. Close the channel and // remove it from the list. eToClose := e e = e.Next() - close(eToClose.Value.(chan *WatchEndpointsResponse)) + close(eToClose.Value.(chan *tenant.WatchEndpointsResponse)) s.listen.eventListeners.Remove(eToClose) } } @@ -181,8 +202,8 @@ func (s *TestDirectoryServer) notifyEventListenersLocked(req *WatchEndpointsResp // it will start a new one. It will return an error if starting a new tenant // process is impossible. func (s *TestDirectoryServer) EnsureEndpoint( - ctx context.Context, req *EnsureEndpointRequest, -) (*EnsureEndpointResponse, error) { + ctx context.Context, req *tenant.EnsureEndpointRequest, +) (*tenant.EnsureEndpointResponse, error) { select { case <-s.stopper.ShouldQuiesce(): return nil, context.Canceled @@ -194,7 +215,7 @@ func (s *TestDirectoryServer) EnsureEndpoint( s.proc.Lock() defer s.proc.Unlock() - lst, err := s.listLocked(ctx, &ListEndpointsRequest{TenantID: req.TenantID}) + lst, err := s.listLocked(ctx, &tenant.ListEndpointsRequest{TenantID: req.TenantID}) if err != nil { return nil, err } @@ -209,7 +230,7 @@ func (s *TestDirectoryServer) EnsureEndpoint( })) } - return &EnsureEndpointResponse{}, nil + return &tenant.EnsureEndpointResponse{}, nil } // Serve requests on the given listener. @@ -217,36 +238,16 @@ func (s *TestDirectoryServer) Serve(listener net.Listener) error { return s.grpcServer.Serve(listener) } -// NewTestDirectoryServer will create a new server. -func NewTestDirectoryServer(stopper *stop.Stopper) (*TestDirectoryServer, error) { - // Determine the path to cockroach executable. - cockroachExecutable, err := os.Executable() - if err != nil { - return nil, err - } - dir := &TestDirectoryServer{ - grpcServer: grpc.NewServer(), - stopper: stopper, - cockroachExecutable: cockroachExecutable, - } - dir.TenantStarterFunc = dir.startTenantLocked - dir.proc.processByAddrByTenantID = map[uint64]map[net.Addr]*Process{} - dir.listen.eventListeners = list.New() - stopper.AddCloser(stop.CloserFn(func() { dir.grpcServer.GracefulStop() })) - RegisterDirectoryServer(dir.grpcServer, dir) - return dir, nil -} - func (s *TestDirectoryServer) listLocked( - _ context.Context, req *ListEndpointsRequest, -) (*ListEndpointsResponse, error) { + _ context.Context, req *tenant.ListEndpointsRequest, +) (*tenant.ListEndpointsResponse, error) { processByAddr, ok := s.proc.processByAddrByTenantID[req.TenantID] if !ok { - return &ListEndpointsResponse{}, nil + return &tenant.ListEndpointsResponse{}, nil } - resp := ListEndpointsResponse{} + resp := tenant.ListEndpointsResponse{} for addr := range processByAddr { - resp.Endpoints = append(resp.Endpoints, &Endpoint{IP: addr.String()}) + resp.Endpoints = append(resp.Endpoints, &tenant.Endpoint{IP: addr.String()}) } return &resp, nil } @@ -261,8 +262,8 @@ func (s *TestDirectoryServer) registerInstanceLocked(tenantID uint64, process *P s.listen.RLock() defer s.listen.RUnlock() - s.notifyEventListenersLocked(&WatchEndpointsResponse{ - Typ: ADDED, + s.notifyEventListenersLocked(&tenant.WatchEndpointsResponse{ + Typ: tenant.ADDED, IP: process.SQL.String(), TenantID: tenantID, }) @@ -281,8 +282,8 @@ func (s *TestDirectoryServer) deregisterInstance(tenantID uint64, sql net.Addr) s.listen.RLock() defer s.listen.RUnlock() - s.notifyEventListenersLocked(&WatchEndpointsResponse{ - Typ: DELETED, + s.notifyEventListenersLocked(&tenant.WatchEndpointsResponse{ + Typ: tenant.DELETED, IP: sql.String(), TenantID: tenantID, }) diff --git a/pkg/ccl/sqlproxyccl/testserver.crt b/pkg/ccl/sqlproxyccl/testdata/testserver.crt similarity index 100% rename from pkg/ccl/sqlproxyccl/testserver.crt rename to pkg/ccl/sqlproxyccl/testdata/testserver.crt diff --git a/pkg/ccl/sqlproxyccl/testserver.key b/pkg/ccl/sqlproxyccl/testdata/testserver.key similarity index 100% rename from pkg/ccl/sqlproxyccl/testserver.key rename to pkg/ccl/sqlproxyccl/testdata/testserver.key diff --git a/pkg/ccl/sqlproxyccl/testserver_config.cnf b/pkg/ccl/sqlproxyccl/testdata/testserver_config.cnf similarity index 100% rename from pkg/ccl/sqlproxyccl/testserver_config.cnf rename to pkg/ccl/sqlproxyccl/testdata/testserver_config.cnf diff --git a/pkg/ccl/sqlproxyccl/throttler/mocks_generated.go b/pkg/ccl/sqlproxyccl/throttler/mocks_generated.go deleted file mode 100644 index 5499db346db9..000000000000 --- a/pkg/ccl/sqlproxyccl/throttler/mocks_generated.go +++ /dev/null @@ -1,49 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: service.go - -// Package throttler is a generated GoMock package. -package throttler - -import ( - reflect "reflect" - time "time" - - gomock "github.com/golang/mock/gomock" -) - -// MockService is a mock of Service interface. -type MockService struct { - ctrl *gomock.Controller - recorder *MockServiceMockRecorder -} - -// MockServiceMockRecorder is the mock recorder for MockService. -type MockServiceMockRecorder struct { - mock *MockService -} - -// NewMockService creates a new mock instance. -func NewMockService(ctrl *gomock.Controller) *MockService { - mock := &MockService{ctrl: ctrl} - mock.recorder = &MockServiceMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockService) EXPECT() *MockServiceMockRecorder { - return m.recorder -} - -// LoginCheck mocks base method. -func (m *MockService) LoginCheck(ipAddress string, now time.Time) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "LoginCheck", ipAddress, now) - ret0, _ := ret[0].(error) - return ret0 -} - -// LoginCheck indicates an expected call of LoginCheck. -func (mr *MockServiceMockRecorder) LoginCheck(ipAddress, now interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoginCheck", reflect.TypeOf((*MockService)(nil).LoginCheck), ipAddress, now) -} diff --git a/pkg/ccl/sqlproxyccl/throttler/service.go b/pkg/ccl/sqlproxyccl/throttler/service.go index bac2218ac7e9..679bde955528 100644 --- a/pkg/ccl/sqlproxyccl/throttler/service.go +++ b/pkg/ccl/sqlproxyccl/throttler/service.go @@ -11,8 +11,6 @@ package throttler import "time" -//go:generate mockgen -package=throttler -destination=mocks_generated.go -source=service.go . Service - // Service provides the interface for performing throttle checks before // allowing requests into the managed service system. type Service interface { diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel index 9681be3fec92..0fe3c1d2f9cd 100644 --- a/pkg/cli/BUILD.bazel +++ b/pkg/cli/BUILD.bazel @@ -97,7 +97,7 @@ go_library( "//pkg/base", "//pkg/build", "//pkg/ccl/sqlproxyccl", - "//pkg/ccl/sqlproxyccl/tenant", + "//pkg/ccl/sqlproxyccl/tenantdirsvr", "//pkg/cli/cliflags", "//pkg/cli/exit", "//pkg/cli/syncbench", diff --git a/pkg/cli/mt_test_directory.go b/pkg/cli/mt_test_directory.go index 8f4789a37597..e6ad7e8c39c8 100644 --- a/pkg/cli/mt_test_directory.go +++ b/pkg/cli/mt_test_directory.go @@ -15,7 +15,7 @@ import ( "fmt" "net" - "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" + "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/spf13/cobra" ) @@ -40,7 +40,7 @@ func runDirectorySvr(cmd *cobra.Command, args []string) (returnErr error) { } defer stopper.Stop(ctx) - tds, err := tenant.NewTestDirectoryServer(stopper) + tds, err := tenantdirsvr.New(stopper) if err != nil { return err } From 0eb21372517426a167b1bcac478cfafc4da65242 Mon Sep 17 00:00:00 2001 From: Andrew Kimball Date: Thu, 10 Jun 2021 20:01:58 -0700 Subject: [PATCH 2/4] sqlproxyccl: fix logging problem in tests The tests were spewing out 150,000 warnings to the log when run because there is no denylist file present. This commit changes the proxy to not create the denylist service if no denylist file is provided. Release note: None --- pkg/ccl/sqlproxyccl/proxy_handler.go | 14 +++++++++++--- pkg/ccl/sqlproxyccl/proxy_handler_test.go | 15 +++++++++------ pkg/ccl/sqlproxyccl/throttler/local_test.go | 2 +- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/pkg/ccl/sqlproxyccl/proxy_handler.go b/pkg/ccl/sqlproxyccl/proxy_handler.go index 5e877aa731ac..f7208a92de0e 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler.go @@ -147,8 +147,12 @@ func newProxyHandler( } ctx, _ = stopper.WithCancelOnQuiesce(ctx) - handler.denyListService = denylist.NewDenylistWithFile(ctx, options.Denylist, - denylist.WithPollingInterval(options.PollConfigInterval)) + + // If denylist functionality is requested, create the denylist service. + if options.Denylist != "" { + handler.denyListService = denylist.NewDenylistWithFile(ctx, options.Denylist, + denylist.WithPollingInterval(options.PollConfigInterval)) + } handler.throttleService = throttler.NewLocalService(throttler.WithBaseDelay(options.RatelimitBaseDelay)) handler.connCache = cache.NewCappedConnCache(maxKnownConnCacheSize) @@ -406,8 +410,12 @@ func (handler *proxyHandler) validateAccessAndThrottle( func (handler *proxyHandler) validateAccess( ctx context.Context, tenID roachpb.TenantID, ipAddr string, ) error { - // First validate against the deny list service + // Validate against the denylist service if it exists. list := handler.denyListService + if list == nil { + return nil + } + if entry, err := list.Denied(denylist.DenyEntity{Item: tenID.String(), Type: denylist.ClusterType}); err != nil { // Log error but don't return since this could be transient. log.Errorf(ctx, "could not consult denied list for tenant: %d", tenID.ToUint64()) diff --git a/pkg/ccl/sqlproxyccl/proxy_handler_test.go b/pkg/ccl/sqlproxyccl/proxy_handler_test.go index 94273f829ec8..dd842bae9116 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler_test.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler_test.go @@ -23,7 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/denylist" - "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" + "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" @@ -595,7 +595,10 @@ func TestDenylistUpdate(t *testing.T) { defer sql.Stopper().Stop(ctx) - s, addr := newSecureProxyServer(ctx, t, sql.Stopper(), &ProxyOptions{Denylist: denyList.Name()}) + s, addr := newSecureProxyServer(ctx, t, sql.Stopper(), &ProxyOptions{ + Denylist: denyList.Name(), + PollConfigInterval: 10 * time.Millisecond, + }) defer func() { _ = os.Remove(denyList.Name()) }() url := fmt.Sprintf("postgres://root:admin@%s/defaultdb_29?sslmode=require&options=--cluster=dim-dog-28", addr) @@ -684,11 +687,11 @@ func newDirectoryServer( return err == nil }, 30*time.Second, time.Second) require.NoError(t, err) - tds, err := tenant.NewTestDirectoryServer(tdsStopper) + tds, err := tenantdirsvr.New(tdsStopper) require.NoError(t, err) - tds.TenantStarterFunc = func(ctx context.Context, tenantID uint64) (*tenant.Process, error) { + tds.TenantStarterFunc = func(ctx context.Context, tenantID uint64) (*tenantdirsvr.Process, error) { log.TestingClearServerIdentifiers() - tenantStopper := tenant.NewSubStopper(tdsStopper) + tenantStopper := tenantdirsvr.NewSubStopper(tdsStopper) ten, err := srv.StartTenant(ctx, base.TestTenantArgs{ Existing: true, TenantID: roachpb.MakeTenantID(tenantID), @@ -699,7 +702,7 @@ func newDirectoryServer( sqlAddr, err := net.ResolveTCPAddr("tcp", ten.SQLAddr()) require.NoError(t, err) ten.PGServer().(*pgwire.Server).TestingSetTrustClientProvidedRemoteAddr(true) - return &tenant.Process{SQL: sqlAddr, Stopper: tenantStopper}, nil + return &tenantdirsvr.Process{SQL: sqlAddr, Stopper: tenantStopper}, nil } go func() { require.NoError(t, tds.Serve(listener)) }() return tdsStopper, listener.Addr().(*net.TCPAddr) diff --git a/pkg/ccl/sqlproxyccl/throttler/local_test.go b/pkg/ccl/sqlproxyccl/throttler/local_test.go index 22bde2acace0..6af98a751aae 100644 --- a/pkg/ccl/sqlproxyccl/throttler/local_test.go +++ b/pkg/ccl/sqlproxyccl/throttler/local_test.go @@ -100,7 +100,7 @@ func TestLocalService_Eviction(t *testing.T) { s := newTestLocalService() s.maxMapSize = 10 - for i := 0; i < 10000; i++ { + for i := 0; i < 20; i++ { ipAddress := fmt.Sprintf("%d", i) _ = s.LoginCheck(ipAddress, s.clock.Now()) require.Less(t, len(s.mu.limiters), 11) From 39956c4529e86969a1b81186db93cce5384c86a4 Mon Sep 17 00:00:00 2001 From: Andrew Kimball Date: Thu, 10 Jun 2021 20:25:10 -0700 Subject: [PATCH 3/4] sqlproxyccl: regularize comments to not go past 80 char limit Our coding standard is to keep comments within an 80 char limit, assuming two characters per tab. This commit fixes up the code to conform to that. Relase note: None --- pkg/ccl/sqlproxyccl/backend_dialer.go | 4 +- pkg/ccl/sqlproxyccl/cache/cache.go | 3 +- pkg/ccl/sqlproxyccl/denylist/file.go | 24 ++--- pkg/ccl/sqlproxyccl/frontend_admitter.go | 16 ++-- .../idle/idle_disconnect_connection_test.go | 3 +- pkg/ccl/sqlproxyccl/metrics.go | 7 +- pkg/ccl/sqlproxyccl/proxy.go | 8 +- pkg/ccl/sqlproxyccl/proxy_handler.go | 28 +++--- pkg/ccl/sqlproxyccl/server.go | 6 +- pkg/ccl/sqlproxyccl/tenant/directory.go | 85 +++++++++--------- pkg/ccl/sqlproxyccl/tenant/directory.proto | 46 ++++++---- pkg/ccl/sqlproxyccl/tenant/entry.go | 88 ++++++++++--------- .../tenantdirsvr/test_directory_svr.go | 41 ++++----- pkg/ccl/sqlproxyccl/throttler/service.go | 2 +- 14 files changed, 195 insertions(+), 166 deletions(-) diff --git a/pkg/ccl/sqlproxyccl/backend_dialer.go b/pkg/ccl/sqlproxyccl/backend_dialer.go index 70b37b760a47..f83c08cc04cf 100644 --- a/pkg/ccl/sqlproxyccl/backend_dialer.go +++ b/pkg/ccl/sqlproxyccl/backend_dialer.go @@ -42,8 +42,8 @@ var backendDial = func( return conn, nil } -// sslOverlay attempts to upgrade the PG connection to use SSL -// if a tls.Config is specified.. +// sslOverlay attempts to upgrade the PG connection to use SSL if a tls.Config +// is specified. func sslOverlay(conn net.Conn, tlsConfig *tls.Config) (net.Conn, error) { if tlsConfig == nil { return conn, nil diff --git a/pkg/ccl/sqlproxyccl/cache/cache.go b/pkg/ccl/sqlproxyccl/cache/cache.go index a5c1553ad823..cd566d26659b 100644 --- a/pkg/ccl/sqlproxyccl/cache/cache.go +++ b/pkg/ccl/sqlproxyccl/cache/cache.go @@ -39,7 +39,8 @@ type cappedConnCache struct { } } -// NewCappedConnCache returns a cache service that has a limit on the total entries. +// NewCappedConnCache returns a cache service that has a limit on the total +// entries. func NewCappedConnCache(maxMapSize int) ConnCache { c := &cappedConnCache{ maxMapSize: maxMapSize, diff --git a/pkg/ccl/sqlproxyccl/denylist/file.go b/pkg/ccl/sqlproxyccl/denylist/file.go index 67bdb768ec37..de389f4e6d6b 100644 --- a/pkg/ccl/sqlproxyccl/denylist/file.go +++ b/pkg/ccl/sqlproxyccl/denylist/file.go @@ -70,11 +70,12 @@ type Denylist struct { ctx context.Context } -// NewDenylistWithFile returns a new denylist that automatically watches updates to a file. -// Note: this currently does not return an error. This is by design, since even if we trouble -// initiating a denylist with file, we can always update the file with correct content during -// runtime. We don't want sqlproxy fail to start just because there's something wrong with -// contents of a denylist file. +// NewDenylistWithFile returns a new denylist that automatically watches updates +// to a file. +// Note: this currently does not return an error. This is by design, since even +// if we trouble initiating a denylist with file, we can always update the file +// with correct content during runtime. We don't want sqlproxy fail to start +// just because there's something wrong with contents of a denylist file. func NewDenylistWithFile(ctx context.Context, filename string, opts ...Option) *Denylist { ret := &Denylist{ pollingInterval: defaultPollingInterval, @@ -89,9 +90,10 @@ func NewDenylistWithFile(ctx context.Context, filename string, opts ...Option) * } err := ret.update(filename) if err != nil { - // don't return just yet; sqlproxy should be able to carry on without a proper denylist - // and we still have a chance to recover. - // TODO(ye): add monitoring for failed updates; we don't want silent failures + // don't return just yet; sqlproxy should be able to carry on without a + // proper denylist and we still have a chance to recover. + // TODO(ye): add monitoring for failed updates; we don't want silent + // failures. log.Errorf(ctx, "error when reading from file %s: %v", filename, err) } @@ -103,7 +105,8 @@ func NewDenylistWithFile(ctx context.Context, filename string, opts ...Option) * // Option allows configuration of a denylist service. type Option func(*Denylist) -// WithPollingInterval specifies interval between polling for config file changes. +// WithPollingInterval specifies interval between polling for config file +// changes. func WithPollingInterval(d time.Duration) Option { return func(dl *Denylist) { dl.pollingInterval = d @@ -163,7 +166,8 @@ func (dl *Denylist) Denied(entity DenyEntity) (*Entry, error) { func (dl *Denylist) watchForUpdate(filename string) { go func() { // TODO(ye): use notification via SIGHUP instead. - // TODO(ye): use inotify or similar mechanism for watching file updates instead of polling. + // TODO(ye): use inotify or similar mechanism for watching file updates + // instead of polling. t := timeutil.NewTimer() defer t.Stop() for { diff --git a/pkg/ccl/sqlproxyccl/frontend_admitter.go b/pkg/ccl/sqlproxyccl/frontend_admitter.go index 7811b1d5f323..4f74aa73c460 100644 --- a/pkg/ccl/sqlproxyccl/frontend_admitter.go +++ b/pkg/ccl/sqlproxyccl/frontend_admitter.go @@ -16,11 +16,11 @@ import ( ) // frontendAdmit is the default implementation of a frontend admitter. It can -// upgrade to an optional SSL connection, and will handle and verify -// the startup message received from the PG SQL client. -// The connection returned should never be nil in case of error. Depending -// on whether the error happened before the connection was upgraded to TLS or not -// it will either be the original or the TLS connection. +// upgrade to an optional SSL connection, and will handle and verify the startup +// message received from the PG SQL client. The connection returned should never +// be nil in case of error. Depending on whether the error happened before the +// connection was upgraded to TLS or not it will either be the original or the +// TLS connection. var frontendAdmit = func( conn net.Conn, incomingTLSConfig *tls.Config, ) (net.Conn, *pgproto3.StartupMessage, error) { @@ -38,9 +38,9 @@ var frontendAdmit = func( switch m.(type) { case *pgproto3.SSLRequest: case *pgproto3.CancelRequest: - // Ignore CancelRequest explicitly. We don't need to do this but it makes - // testing easier by avoiding a call to sendErrToClient on this path - // (which would confuse assertCtx). + // Ignore CancelRequest explicitly. We don't need to do this but it + // makes testing easier by avoiding a call to sendErrToClient on this + // path (which would confuse assertCtx). return conn, nil, nil default: code := codeUnexpectedInsecureStartupMessage diff --git a/pkg/ccl/sqlproxyccl/idle/idle_disconnect_connection_test.go b/pkg/ccl/sqlproxyccl/idle/idle_disconnect_connection_test.go index edb074dbd7cb..a929ce0e5ef1 100644 --- a/pkg/ccl/sqlproxyccl/idle/idle_disconnect_connection_test.go +++ b/pkg/ccl/sqlproxyccl/idle/idle_disconnect_connection_test.go @@ -82,7 +82,8 @@ func benchmarkSocketRead(timeout time.Duration, b *testing.B) { } // No statistically significant difference in a single roundtrip time between -// using and not using deadline as implemented above. Both show the same value in my tests. +// using and not using deadline as implemented above. Both show the same value +// in my tests. // SocketReadWithDeadline-32 11.1µs ± 1% // SocketReadWithoutDeadline-32 11.0µs ± 3% func BenchmarkSocketReadWithoutDeadline(b *testing.B) { diff --git a/pkg/ccl/sqlproxyccl/metrics.go b/pkg/ccl/sqlproxyccl/metrics.go index bea2ff15d3de..7b5afdf2c012 100644 --- a/pkg/ccl/sqlproxyccl/metrics.go +++ b/pkg/ccl/sqlproxyccl/metrics.go @@ -13,8 +13,7 @@ import ( "github.com/cockroachdb/errors" ) -// metrics contains pointers to the metrics for monitoring proxy -// operations. +// metrics contains pointers to the metrics for monitoring proxy operations. type metrics struct { BackendDisconnectCount *metric.Counter IdleDisconnectCount *metric.Counter @@ -112,8 +111,8 @@ func makeProxyMetrics() metrics { } } -// updateForError updates the metrics relevant for the type of the -// error message. +// updateForError updates the metrics relevant for the type of the error +// message. func (metrics *metrics) updateForError(err error) { if err == nil { return diff --git a/pkg/ccl/sqlproxyccl/proxy.go b/pkg/ccl/sqlproxyccl/proxy.go index cdd275b73e61..2f6484ccfbf6 100644 --- a/pkg/ccl/sqlproxyccl/proxy.go +++ b/pkg/ccl/sqlproxyccl/proxy.go @@ -87,10 +87,10 @@ func connectionCopy(crdbConn, conn net.Conn) error { select { // NB: when using pgx, we see a nil errIncoming first on clean connection // termination. Using psql I see a nil errOutgoing first. I think the PG - // protocol stipulates sending a message to the server at which point - // the server closes the connection (errIncoming), but presumably the - // client gets to close the connection once it's sent that message, - // meaning either case is possible. + // protocol stipulates sending a message to the server at which point the + // server closes the connection (errIncoming), but presumably the client + // gets to close the connection once it's sent that message, meaning either + // case is possible. case err := <-errIncoming: if err == nil { return nil diff --git a/pkg/ccl/sqlproxyccl/proxy_handler.go b/pkg/ccl/sqlproxyccl/proxy_handler.go index f7208a92de0e..c3e3008400bf 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler.go @@ -65,13 +65,14 @@ type ProxyOptions struct { Denylist string // ListenAddr is the listen address for incoming connections. ListenAddr string - // ListenCert is the file containing PEM-encoded x509 certificate for listen address. - // Set to "*" to auto-generate self-signed cert. + // ListenCert is the file containing PEM-encoded x509 certificate for listen + // address. Set to "*" to auto-generate self-signed cert. ListenCert string // ListenKey is the file containing PEM-encoded x509 key for listen address. // Set to "*" to auto-generate self-signed cert. ListenKey string - // MetricsAddress is the listen address for incoming connections for metrics retrieval. + // MetricsAddress is the listen address for incoming connections for metrics + // retrieval. MetricsAddress string // SkipVerify if set will skip the identity verification of the // backend. This is for testing only. @@ -82,9 +83,9 @@ type ProxyOptions struct { // connection. Optionally use '{{clusterName}}' // which will be substituted with the cluster name. RoutingRule string - // DirectoryAddr specified optional {HOSTNAME}:{PORT} for service that does the resolution - // from backend id to IP address. If specified - it will be used instead of the - // routing rule above. + // DirectoryAddr specified optional {HOSTNAME}:{PORT} for service that does + // the resolution from backend id to IP address. If specified - it will be + // used instead of the routing rule above. DirectoryAddr string // RatelimitBaseDelay is the initial backoff after a failed login attempt. // Set to 0 to disable rate limiting. @@ -92,9 +93,11 @@ type ProxyOptions struct { // ValidateAccessInterval is the time interval between validations, confirming // that current connections are still valid. ValidateAccessInterval time.Duration - // PollConfigInterval defines polling interval for pickup up changes in config file. + // PollConfigInterval defines polling interval for pickup up changes in + // config file. PollConfigInterval time.Duration - // IdleTimeout if set, will close connections that have been idle for that duration. + // IdleTimeout if set, will close connections that have been idle for that + // duration. IdleTimeout time.Duration } @@ -311,9 +314,9 @@ func (handler *proxyHandler) handle(ctx context.Context, proxyConn *conn) error ctx, cancel := context.WithCancel(ctx) defer cancel() - // TODO(darinpp): starting a new go routine for every connection here is inefficient. - // Change to maintain a map of connections by IP/tenant and have single - // go routine that closes connections that are blocklisted. + // TODO(darinpp): starting a new go routine for every connection here is + // inefficient. Change to maintain a map of connections by IP/tenant and + // have single go routine that closes connections that are blocklisted. go func() { errExpired <- func(ctx context.Context) error { t := timeutil.NewTimer() @@ -378,7 +381,8 @@ func (handler *proxyHandler) outgoingAddress( return addr, nil } - // This doesn't verify the name part of the tenant and relies just on the int id. + // This doesn't verify the name part of the tenant and relies just on the + // int id. addr, err := handler.directory.EnsureTenantIP(ctx, tenID, name) if err != nil { return "", err diff --git a/pkg/ccl/sqlproxyccl/server.go b/pkg/ccl/sqlproxyccl/server.go index e772e8e9959a..e5ac4b56a43e 100644 --- a/pkg/ccl/sqlproxyccl/server.go +++ b/pkg/ccl/sqlproxyccl/server.go @@ -29,9 +29,9 @@ import ( // individual new incoming connection. type proxyConnHandler func(ctx context.Context, proxyConn *conn) error -// Server is a TCP server that proxies SQL connections to a -// configurable backend. It may also run an HTTP server to expose a -// health check and prometheus metrics. +// Server is a TCP server that proxies SQL connections to a configurable +// backend. It may also run an HTTP server to expose a health check and +// prometheus metrics. type Server struct { Stopper *stop.Stopper connHandler proxyConnHandler diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.go b/pkg/ccl/sqlproxyccl/tenant/directory.go index 7e49ed451247..7bf39df10ff8 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory.go @@ -34,9 +34,9 @@ type dirOptions struct { type DirOption func(opts *dirOptions) // RefreshDelay specifies the minimum amount of time that must elapse between -// attempts to refresh endpoints for a given tenant after ReportFailure is called. -// This delay has the effect of throttling calls to directory server, in order to -// avoid overloading it. +// attempts to refresh endpoints for a given tenant after ReportFailure is +// called. This delay has the effect of throttling calls to directory server, in +// order to avoid overloading it. // // RefreshDelay defaults to 100ms. Use -1 to never throttle. func RefreshDelay(delay time.Duration) func(opts *dirOptions) { @@ -45,14 +45,15 @@ func RefreshDelay(delay time.Duration) func(opts *dirOptions) { } } -// Directory tracks the network locations of SQL tenant processes. It is used by the -// sqlproxy to route incoming traffic to the correct backend process. Process -// information is populated and kept relatively up-to-date using a streaming watcher. -// However, since watchers deliver slightly stale information, the directory will also make -// direct server calls to fetch the latest information about a process that is not yet -// in the cache, or when a process is suspected to have failed. When a new tenant is -// created, or is resumed from suspension, this capability allows the directory -// to immediately return the IP address for the new process. +// Directory tracks the network locations of SQL tenant processes. It is used by +// the sqlproxy to route incoming traffic to the correct backend process. +// Process information is populated and kept relatively up-to-date using a +// streaming watcher. However, since watchers deliver slightly stale +// information, the directory will also make direct server calls to fetch the +// latest information about a process that is not yet in the cache, or when a +// process is suspected to have failed. When a new tenant is created, or is +// resumed from suspension, this capability allows the directory to immediately +// return the IP address for the new process. // // All methods in the directory are thread-safe. Methods are intended to be // called concurrently by many threads at once, and so locking is carefully @@ -61,7 +62,8 @@ func RefreshDelay(delay time.Duration) func(opts *dirOptions) { // has its own locks that are used to synchronize per-tenant operations such as // making directory server calls to fetch updated tenant information. type Directory struct { - // client is the directory client instance used to make directory server calls. + // client is the directory client instance used to make directory server + // calls. client DirectoryClient // stopper use used for graceful shutdown of the endpoint watcher. @@ -71,8 +73,8 @@ type Directory struct { options dirOptions // mut synchronizes access to the in-memory tenant entry caches. Take care - // to never hold this lock during directory server calls - it should only be used - // while adding and removing tenant entries to/from the caches. + // to never hold this lock during directory server calls - it should only be + // used while adding and removing tenant entries to/from the caches. mut struct { syncutil.Mutex @@ -83,11 +85,12 @@ type Directory struct { } } -// NewDirectory constructs a new Directory instance that tracks SQL tenant processes -// managed by a given Directory server. The given context is used for tracing -// endpoint watcher activity. +// NewDirectory constructs a new Directory instance that tracks SQL tenant +// processes managed by a given Directory server. The given context is used for +// tracing endpoint watcher activity. // -// NOTE: stopper.Stop must be called on the directory when it is no longer needed. +// NOTE: stopper.Stop must be called on the directory when it is no longer +// needed. func NewDirectory( ctx context.Context, stopper *stop.Stopper, client DirectoryClient, opts ...DirOption, ) (*Directory, error) { @@ -110,16 +113,16 @@ func NewDirectory( return dir, nil } -// EnsureTenantIP returns the IP address of one of the given tenant's SQL processes. -// If the tenant was just created or is suspended, such that there are no -// available processes, then EnsureTenantIP will trigger resumption of a new instance and -// block until the process is ready. If there are multiple processes for -// the tenant, then LookupTenantIPs will choose one of them (note that currently -// there is always at most one SQL process per tenant). +// EnsureTenantIP returns the IP address of one of the given tenant's SQL +// processes. If the tenant was just created or is suspended, such that there +// are no available processes, then EnsureTenantIP will trigger resumption of a +// new instance and block until the process is ready. If there are multiple +// processes for the tenant, then LookupTenantIPs will choose one of them (note +// that currently there is always at most one SQL process per tenant). // -// If clusterName is non-empty, then an error is returned if no endpoints match the -// cluster name. This can be used to ensure that the incoming SQL connection "knows" -// some additional information about the tenant, such as the name of the +// If clusterName is non-empty, then an error is returned if no endpoints match +// the cluster name. This can be used to ensure that the incoming SQL connection +// "knows" some additional information about the tenant, such as the name of the // cluster, before being allowed to connect. func (d *Directory) EnsureTenantIP( ctx context.Context, tenantID roachpb.TenantID, clusterName string, @@ -145,10 +148,10 @@ func (d *Directory) EnsureTenantIP( return ip, err } -// LookupTenantIPs returns the IP addresses for all available SQL processes for the -// given tenant. It returns an error if the tenant has not yet been created. If -// no processes are available for the tenant, LookupTenantIPs will return the empty -// set (unlike EnsureTenantIP). +// LookupTenantIPs returns the IP addresses for all available SQL processes for +// the given tenant. It returns an error if the tenant has not yet been created. +// If no processes are available for the tenant, LookupTenantIPs will return the +// empty set (unlike EnsureTenantIP). func (d *Directory) LookupTenantIPs( ctx context.Context, tenantID roachpb.TenantID, ) ([]string, error) { @@ -169,9 +172,9 @@ func (d *Directory) LookupTenantIPs( // ReportFailure will attempt to refresh the cache with the latest information // about available tenant processes. // TODO(andyk): In the future, the ip parameter will be used to mark a -// particular endpoint as "unhealthy" so that it's less likely to be chosen. However, -// today there can be at most one endpoint for a given tenant, so it must always be -// chosen. Keep the parameter as a placeholder for the future. +// particular endpoint as "unhealthy" so that it's less likely to be chosen. +// However, today there can be at most one endpoint for a given tenant, so it +// must always be chosen. Keep the parameter as a placeholder for the future. func (d *Directory) ReportFailure(ctx context.Context, tenantID roachpb.TenantID, ip string) error { entry, err := d.getEntry(ctx, tenantID, false /* allowCreate */) if err != nil { @@ -251,16 +254,16 @@ func (d *Directory) deleteEntry(tenantID roachpb.TenantID) { delete(d.mut.tenants, tenantID) } -// watchEndpoints establishes a watcher that looks for changes to tenant endpoint addresses. -// Whenever tenant processes start or terminate, the watcher will get -// a notification and update the directory to reflect that change. +// watchEndpoints establishes a watcher that looks for changes to tenant +// endpoint addresses. Whenever tenant processes start or terminate, the watcher +// will get a notification and update the directory to reflect that change. func (d *Directory) watchEndpoints(ctx context.Context, stopper *stop.Stopper) error { req := WatchEndpointsRequest{} - // The loop that processes the event stream is running in a separate go routine. - // It is desirable however, before we return, to have a guarantee that the - // separate go routine started processing events. This wait group helps us - // achieve this. Without the wait group, it will be possible to: + // The loop that processes the event stream is running in a separate go + // routine. It is desirable however, before we return, to have a guarantee + // that the separate go routine started processing events. This wait group + // helps us achieve this. Without the wait group, it will be possible to: // // 1. call watchEndpoints // 2. call EnsureTenantIP diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.proto b/pkg/ccl/sqlproxyccl/tenant/directory.proto index f62217774d43..682fdb08cf4c 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.proto +++ b/pkg/ccl/sqlproxyccl/tenant/directory.proto @@ -15,7 +15,8 @@ import "gogoproto/gogo.proto"; // WatchEndpointsRequest is empty as we want to get all notifications. message WatchEndpointsRequest {} -// EventType shows the event type of the notifications that the server streams to its clients. +// EventType shows the event type of the notifications that the server streams +// to its clients. enum EventType { option (gogoproto.goproto_enum_prefix) = false; @@ -24,8 +25,8 @@ enum EventType { DELETED = 2; } -// WatchEndpointsResponse represents the notifications that the server sends to its clients when clients -// want to monitor the directory server activity. +// WatchEndpointsResponse represents the notifications that the server sends to +// its clients when clients want to monitor the directory server activity. message WatchEndpointsResponse { // EventType is the type of the notifications - added, modified, deleted. EventType typ = 1; @@ -35,37 +36,43 @@ message WatchEndpointsResponse { uint64 tenant_id = 3[(gogoproto.customname) = "TenantID"]; } -// ListEndpointsRequest is used to query the server for the list of current endpoints of a given tenant. +// ListEndpointsRequest is used to query the server for the list of current +// endpoints of a given tenant. message ListEndpointsRequest { - // TenantID identifies the tenant for which the client is requesting a list of the endpoints. + // TenantID identifies the tenant for which the client is requesting a list of + // the endpoints. uint64 tenant_id = 1[(gogoproto.customname) = "TenantID"]; } -// EnsureEndpointRequest is used to ensure that a tenant's backend is active. If there is an active backend then the -// server doesn't have to do anything. If there isn't an active backend, then the server has to bring a new one up. +// EnsureEndpointRequest is used to ensure that a tenant's backend is active. If +// there is an active backend then the server doesn't have to do anything. If +// there isn't an active backend, then the server has to bring a new one up. message EnsureEndpointRequest { // TenantID is the id of the tenant for which an active backend is requested. uint64 tenant_id = 1[(gogoproto.customname) = "TenantID"]; } -// EnsureEndpointResponse is empty and indicates that the server processed the request. +// EnsureEndpointResponse is empty and indicates that the server processed the +// request. message EnsureEndpointResponse { } -// Endpoint contains the information about a tenant endpoint. Most often it is a combination of an ip address and port. -// i.e. 132.130.1.11:34576 +// Endpoint contains the information about a tenant endpoint. Most often it is a +// combination of an ip address and port, e.g. 132.130.1.11:34576. message Endpoint { // IP is the ip and port combo identifying the tenant endpoint. string IP = 1[(gogoproto.customname) = "IP"]; } -// ListEndpointsResponse is sent back as a result of requesting the list of endpoints for a given tenant. +// ListEndpointsResponse is sent back as a result of requesting the list of +// endpoints for a given tenant. message ListEndpointsResponse { // Endpoints is the list of endpoints currently active for the requested tenant. repeated Endpoint endpoints = 1; } -// GetTenantRequest is used by a client to request from the sever metadata related to a given tenant. +// GetTenantRequest is used by a client to request from the sever metadata +// related to a given tenant. message GetTenantRequest { // TenantID identifies the tenant for which the metadata is being requested. uint64 tenant_id = 1[(gogoproto.customname) = "TenantID"]; @@ -77,15 +84,18 @@ message GetTenantResponse { string cluster_name = 1; // add more metadata if needed } -// Directory specifies a service that keeps track and manages tenant backends, related metadata and their endpoints. +// Directory specifies a service that keeps track and manages tenant backends, +// related metadata and their endpoints. service Directory { - // ListEndpoints is used to query the server for the list of current endpoints of a given tenant. + // ListEndpoints is used to query the server for the list of current endpoints + // of a given tenant. rpc ListEndpoints(ListEndpointsRequest) returns (ListEndpointsResponse); - // WatchEndpoints is used to get a stream, that is used to receive notifications about changes in tenant - // backend's state - added, modified and deleted. + // WatchEndpoints is used to get a stream, that is used to receive notifications + // about changes in tenant backend's state - added, modified and deleted. rpc WatchEndpoints(WatchEndpointsRequest) returns (stream WatchEndpointsResponse); - // EnsureEndpoint is used to ensure that a tenant's backend is active. If there is an active backend then the - // server doesn't have to do anything. If there isn't an active backend, then the server has to bring a new one up. + // EnsureEndpoint is used to ensure that a tenant's backend is active. If there + // is an active backend then the server doesn't have to do anything. If there + // isn't an active backend, then the server has to bring a new one up. rpc EnsureEndpoint(EnsureEndpointRequest) returns (EnsureEndpointResponse); // GetTenant is used to fetch the metadata of a specific tenant. rpc GetTenant(GetTenantRequest) returns (GetTenantResponse); diff --git a/pkg/ccl/sqlproxyccl/tenant/entry.go b/pkg/ccl/sqlproxyccl/tenant/entry.go index 34b681231cd4..c4cd7b9e43d3 100644 --- a/pkg/ccl/sqlproxyccl/tenant/entry.go +++ b/pkg/ccl/sqlproxyccl/tenant/entry.go @@ -21,8 +21,8 @@ import ( ) // tenantEntry is an entry in the tenant directory that records information -// about a single tenant, including its ID, cluster name, and the IP addresses for -// available endpoints. +// about a single tenant, including its ID, cluster name, and the IP addresses +// for available endpoints. type tenantEntry struct { // These fields can be read by callers without synchronization, since // they're written once during initialization, and are immutable thereafter. @@ -35,7 +35,8 @@ type tenantEntry struct { ClusterName string // RefreshDelay is the minimum amount of time that must elapse between - // attempts to refresh endpoints for this tenant after ReportFailure is called. + // attempts to refresh endpoints for this tenant after ReportFailure is + // called. RefreshDelay time.Duration // initialized is set to true once Initialized has been called. @@ -45,27 +46,27 @@ type tenantEntry struct { // error occurred). initError error - // endpoints synchronizes access to information about the tenant's SQL endpoints. - // These fields can be updated over time, so a lock must be obtained before - // accessing them. + // endpoints synchronizes access to information about the tenant's SQL + // endpoints. These fields can be updated over time, so a lock must be + // obtained before accessing them. endpoints struct { syncutil.Mutex ips []string } // calls synchronizes calls to the K8s API for this tenant (e.g. calls to - // RefreshEndpoints). Synchronization is needed to ensure that only one thread at - // a time is calling on behalf of a tenant, and that calls are rate limited - // to prevent storms. + // RefreshEndpoints). Synchronization is needed to ensure that only one + // thread at a time is calling on behalf of a tenant, and that calls are rate + // limited to prevent storms. calls struct { syncutil.Mutex lastRefresh time.Time } } -// Initialize fetches metadata about a tenant, such as its cluster name, and stores -// that in the entry. After this is called once, all future calls return the -// same result (and do nothing). +// Initialize fetches metadata about a tenant, such as its cluster name, and +// stores that in the entry. After this is called once, all future calls return +// the same result (and do nothing). func (e *tenantEntry) Initialize(ctx context.Context, client DirectoryClient) error { // Synchronize multiple threads trying to initialize. Only the first thread // does the initialization. @@ -90,8 +91,9 @@ func (e *tenantEntry) Initialize(ctx context.Context, client DirectoryClient) er return nil } -// RefreshEndpoints makes a synchronous directory server call to fetch the latest information -// about the tenant's available endpoints, such as their IP addresses. +// RefreshEndpoints makes a synchronous directory server call to fetch the +// latest information about the tenant's available endpoints, such as their IP +// addresses. func (e *tenantEntry) RefreshEndpoints(ctx context.Context, client DirectoryClient) error { if !e.initialized { return errors.AssertionFailedf("entry for tenant %d is not initialized", e.TenantID) @@ -114,14 +116,15 @@ func (e *tenantEntry) RefreshEndpoints(ctx context.Context, client DirectoryClie return err } -// ChooseEndpointIP returns the IP address of one of this tenant's available endpoints. -// If a tenant has multiple endpoints, then ChooseEndpointIP returns the IP address of one -// of those endpoints. If the tenant is suspended and no endpoints are available, then -// ChooseEndpointIP will trigger resumption of the tenant and return the IP address -// of the new endpoint. Note that resuming a tenant requires directory server calls, so -// ChooseEndpointIP can block for some time, until the resumption process is -// complete. However, if errorIfNoEndpoints is true, then ChooseEndpointIP returns an -// error if there are no endpoints available rather than blocking. +// ChooseEndpointIP returns the IP address of one of this tenant's available +// endpoints. If a tenant has multiple endpoints, then ChooseEndpointIP returns +// the IP address of one of those endpoints. If the tenant is suspended and no +// endpoints are available, then ChooseEndpointIP will trigger resumption of the +// tenant and return the IP address of the new endpoint. Note that resuming a +// tenant requires directory server calls, so ChooseEndpointIP can block for +// some time, until the resumption process is complete. However, if +// errorIfNoEndpoints is true, then ChooseEndpointIP returns an error if there +// are no endpoints available rather than blocking. // // TODO(andyk): Use better load-balancing algorithm once tenants can have more // than one endpoint. @@ -134,9 +137,9 @@ func (e *tenantEntry) ChooseEndpointIP( ips := e.getEndpointIPs() if len(ips) == 0 { - // There are no known endpoint IP addresses, so fetch endpoint information from - // the directory server. Resume the tenant if it is suspended; that will - // always result in at least one endpoint IP address (or an error). + // There are no known endpoint IP addresses, so fetch endpoint information + // from the directory server. Resume the tenant if it is suspended; that + // will always result in at least one endpoint IP address (or an error). var err error if ips, err = e.ensureTenantEndpoint(ctx, client, errorIfNoEndpoints); err != nil { return "", err @@ -145,8 +148,8 @@ func (e *tenantEntry) ChooseEndpointIP( return ips[0], nil } -// AddEndpointIP inserts the given IP address into the tenant's list of Endpoint IPs. If -// it is already present, then AddEndpointIP returns false. +// AddEndpointIP inserts the given IP address into the tenant's list of Endpoint +// IPs. If it is already present, then AddEndpointIP returns false. func (e *tenantEntry) AddEndpointIP(ip string) bool { e.endpoints.Lock() defer e.endpoints.Unlock() @@ -161,8 +164,8 @@ func (e *tenantEntry) AddEndpointIP(ip string) bool { return true } -// RemoveEndpointIP removes the given IP address from the tenant's list of Endpoint IPs. -// If it was not present, RemoveEndpointIP returns false. +// RemoveEndpointIP removes the given IP address from the tenant's list of +// Endpoint IPs. If it was not present, RemoveEndpointIP returns false. func (e *tenantEntry) RemoveEndpointIP(ip string) bool { e.endpoints.Lock() defer e.endpoints.Unlock() @@ -177,8 +180,8 @@ func (e *tenantEntry) RemoveEndpointIP(ip string) bool { return false } -// getEndpointIPs gets the current list of endpoint IP addresses within scope of lock and -// returns them. +// getEndpointIPs gets the current list of endpoint IP addresses within scope of +// lock and returns them. func (e *tenantEntry) getEndpointIPs() []string { e.endpoints.Lock() defer e.endpoints.Unlock() @@ -187,8 +190,8 @@ func (e *tenantEntry) getEndpointIPs() []string { // ensureTenantEndpoint ensures that at least one SQL process exists for this // tenant, and is ready for connection attempts to its IP address. If -// errorIfNoEndpoints is true, then ensureTenantEndpoint returns an error if there are no -// endpoints available rather than blocking. +// errorIfNoEndpoints is true, then ensureTenantEndpoint returns an error if +// there are no endpoints available rather than blocking. func (e *tenantEntry) ensureTenantEndpoint( ctx context.Context, client DirectoryClient, errorIfNoEndpoints bool, ) (ips []string, err error) { @@ -228,9 +231,10 @@ func (e *tenantEntry) ensureTenantEndpoint( } } - // Get endpoint information for the newly resumed tenant. Except in rare race - // conditions, this is expected to immediately find an IP address, since - // the above call started a tenant process that already has an IP address. + // Get endpoint information for the newly resumed tenant. Except in rare + // race conditions, this is expected to immediately find an IP address, + // since the above call started a tenant process that already has an IP + // address. ips, err = e.fetchEndpointsLocked(ctx, client) if err != nil { return nil, err @@ -250,8 +254,9 @@ func (e *tenantEntry) ensureTenantEndpoint( return ips, nil } -// fetchEndpointsLocked makes a synchronous directory server call to get the latest -// information about the tenant's available endpoints, such as their IP addresses. +// fetchEndpointsLocked makes a synchronous directory server call to get the +// latest information about the tenant's available endpoints, such as their IP +// addresses. // // NOTE: Caller must lock the "calls" mutex before calling fetchEndpointsLocked. func (e *tenantEntry) fetchEndpointsLocked( @@ -263,7 +268,8 @@ func (e *tenantEntry) fetchEndpointsLocked( return nil, err } - // Get updated list of running process endpoint IP addresses and save it to the entry. + // Get updated list of running process endpoint IP addresses and save it to + // the entry. ips = make([]string, 0, len(list.Endpoints)) for i := range list.Endpoints { endpoint := list.Endpoints[i] @@ -284,8 +290,8 @@ func (e *tenantEntry) fetchEndpointsLocked( } // canRefreshLocked returns true if it's been at least X milliseconds since the -// last time the tenant endpoint information was refreshed. This has the effect of -// rate limiting RefreshEndpoints calls. +// last time the tenant endpoint information was refreshed. This has the effect +// of rate limiting RefreshEndpoints calls. // // NOTE: Caller must lock the "calls" mutex before calling canRefreshLocked. func (e *tenantEntry) canRefreshLocked() bool { diff --git a/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go index 5aed9494e0fe..37e96fc8b7a3 100644 --- a/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go +++ b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go @@ -28,7 +28,7 @@ import ( "google.golang.org/grpc" ) -// Making sure that TestDirectoryServer implements the DirectoryServer interface. +// Make sure that TestDirectoryServer implements the DirectoryServer interface. var _ tenant.DirectoryServer = (*TestDirectoryServer)(nil) // Process stores information about a running tenant process. @@ -38,12 +38,11 @@ type Process struct { SQL net.Addr } -// NewSubStopper creates a new stopper that will be stopped -// when either the parent is stopped or its own Stop is called. -// The code is slightly more complicated that simply calling -// NewStopper followed by AddCloser since there is a possibility that between -// the two calls, the parent stopper completes a stop and then the leak detection -// may find a leaked stopper. +// NewSubStopper creates a new stopper that will be stopped when either the +// parent is stopped or its own Stop is called. The code is slightly more +// complicated that simply calling NewStopper followed by AddCloser since there +// is a possibility that between the two calls, the parent stopper completes a +// stop and then the leak detection may find a leaked stopper. func NewSubStopper(parentStopper *stop.Stopper) *stop.Stopper { mu := &syncutil.Mutex{} var subStopper *stop.Stopper @@ -63,8 +62,8 @@ func NewSubStopper(parentStopper *stop.Stopper) *stop.Stopper { return subStopper } -// TestDirectoryServer is a directory server implementation that is used -// for testing. +// TestDirectoryServer is a directory server implementation that is used for +// testing. type TestDirectoryServer struct { stopper *stop.Stopper grpcServer *grpc.Server @@ -72,8 +71,8 @@ type TestDirectoryServer struct { // TenantStarterFunc will be used to launch a new tenant process. TenantStarterFunc func(ctx context.Context, tenantID uint64) (*Process, error) - // When both mutexes need to be held, the locking should always be - // proc first and listen second. + // When both mutexes need to be held, the locking should always be proc + // first and listen second. proc struct { syncutil.RWMutex processByAddrByTenantID map[uint64]map[net.Addr]*Process @@ -104,7 +103,8 @@ func New(stopper *stop.Stopper) (*TestDirectoryServer, error) { return dir, nil } -// Get a tenant's list of endpoints and the process information for each endpoint. +// Get a tenant's list of endpoints and the process information for each +// endpoint. func (s *TestDirectoryServer) Get(id roachpb.TenantID) (result map[net.Addr]*Process) { result = make(map[net.Addr]*Process) s.proc.RLock() @@ -118,8 +118,8 @@ func (s *TestDirectoryServer) Get(id roachpb.TenantID) (result map[net.Addr]*Pro return } -// GetTenant returns tenant metadata for a given ID. Hard coded to return -// every tenant's cluster name as "tenant-cluster" +// GetTenant returns tenant metadata for a given ID. Hard coded to return every +// tenant's cluster name as "tenant-cluster" func (s *TestDirectoryServer) GetTenant( _ context.Context, _ *tenant.GetTenantRequest, ) (*tenant.GetTenantResponse, error) { @@ -128,8 +128,8 @@ func (s *TestDirectoryServer) GetTenant( }, nil } -// ListEndpoints returns a list of tenant process endpoints as well as status of the -// processes. +// ListEndpoints returns a list of tenant process endpoints as well as status of +// the processes. func (s *TestDirectoryServer) ListEndpoints( ctx context.Context, req *tenant.ListEndpointsRequest, ) (*tenant.ListEndpointsResponse, error) { @@ -139,7 +139,8 @@ func (s *TestDirectoryServer) ListEndpoints( return s.listLocked(ctx, req) } -// WatchEndpoints returns a new stream, that can be used to monitor server activity. +// WatchEndpoints returns a new stream, that can be used to monitor server +// activity. func (s *TestDirectoryServer) WatchEndpoints( _ *tenant.WatchEndpointsRequest, server tenant.Directory_WatchEndpointsServer, ) error { @@ -198,9 +199,9 @@ func (s *TestDirectoryServer) notifyEventListenersLocked(req *tenant.WatchEndpoi } } -// EnsureEndpoint will ensure that there is either an already active tenant process or -// it will start a new one. It will return an error if starting a new tenant -// process is impossible. +// EnsureEndpoint will ensure that there is either an already active tenant +// process or it will start a new one. It will return an error if starting a new +// tenant process is impossible. func (s *TestDirectoryServer) EnsureEndpoint( ctx context.Context, req *tenant.EnsureEndpointRequest, ) (*tenant.EnsureEndpointResponse, error) { diff --git a/pkg/ccl/sqlproxyccl/throttler/service.go b/pkg/ccl/sqlproxyccl/throttler/service.go index 679bde955528..e91d5a9cfe77 100644 --- a/pkg/ccl/sqlproxyccl/throttler/service.go +++ b/pkg/ccl/sqlproxyccl/throttler/service.go @@ -12,7 +12,7 @@ package throttler import "time" // Service provides the interface for performing throttle checks before -// allowing requests into the managed service system. +// requests into the managed service system. type Service interface { // LoginCheck determines whether a login request should be allowed to // proceed. It rate limits login attempts from IP addresses and it rate From 9f583e3bdda081ee5623803534802378689d0a2f Mon Sep 17 00:00:00 2001 From: Andrew Kimball Date: Thu, 10 Jun 2021 22:33:09 -0700 Subject: [PATCH 4/4] sqlproxy: fix some small issues in the code 1. updateMetricsAndSendErrToClient was called twice in proxyHandler. 2. Use GRPC NotFound error rather than error with text "not found". 3. Remove unnecessary code in ensureTenantEndpoint that was fetching endpoints before calling EnsureEndpoint. Release note: None --- pkg/ccl/sqlproxyccl/BUILD.bazel | 2 +- pkg/ccl/sqlproxyccl/proxy_handler.go | 56 +++++++++++++++----- pkg/ccl/sqlproxyccl/tenant/directory.go | 30 +++++++---- pkg/ccl/sqlproxyccl/tenant/directory_test.go | 21 +++++--- pkg/ccl/sqlproxyccl/tenant/entry.go | 20 ++----- pkg/cli/mt_test_directory.go | 5 +- pkg/testutils/lint/lint_test.go | 2 +- 7 files changed, 85 insertions(+), 51 deletions(-) diff --git a/pkg/ccl/sqlproxyccl/BUILD.bazel b/pkg/ccl/sqlproxyccl/BUILD.bazel index 3fa48fa37e23..af3926bab327 100644 --- a/pkg/ccl/sqlproxyccl/BUILD.bazel +++ b/pkg/ccl/sqlproxyccl/BUILD.bazel @@ -36,10 +36,10 @@ go_library( "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", - "@com_github_gogo_status//:status", "@com_github_jackc_pgproto3_v2//:pgproto3", "@org_golang_google_grpc//:go_default_library", "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", ], ) diff --git a/pkg/ccl/sqlproxyccl/proxy_handler.go b/pkg/ccl/sqlproxyccl/proxy_handler.go index c3e3008400bf..c650679750da 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler.go @@ -34,6 +34,8 @@ import ( "github.com/cockroachdb/logtags" "github.com/jackc/pgproto3/v2" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var ( @@ -188,7 +190,7 @@ func (handler *proxyHandler) handle(ctx context.Context, proxyConn *conn) error } // This currently only happens for CancelRequest type of startup messages - // that we don't support + // that we don't support. if msg == nil { return nil } @@ -228,15 +230,30 @@ func (handler *proxyHandler) handle(ctx context.Context, proxyConn *conn) error var crdbConn net.Conn var outgoingAddress string - retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond, MaxBackoff: time.Second} + + // Repeatedly try to make a connection. Any failures are assumed to be + // transient unless the tenant cannot be found (e.g. because it was + // deleted). We will simply loop forever, or until the context is canceled + // (e.g. by client disconnect). This is preferable to terminating client + // connections, because in most cases those connections will simply be + // retried, further increasing load on the system. + retryOpts := retry.Options{ + InitialBackoff: 10 * time.Millisecond, + MaxBackoff: 5 * time.Second, + } + outgoingAddressErr := log.Every(time.Minute) backendDialErr := log.Every(time.Minute) reportFailureErr := log.Every(time.Minute) var outgoingAddressErrs, codeBackendDownErrs, reportFailureErrs int + for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { + // Get the DNS/IP address of the backend server to dial. outgoingAddress, err = handler.outgoingAddress(ctx, clusterName, tenID) if err != nil { - if err.Error() != "not found" { + // Failure is assumed to be transient (and should be retried) except + // in case where the server was not found. + if status.Code(err) != codes.NotFound { outgoingAddressErrs++ if outgoingAddressErr.ShouldLog() { log.Ops.Errorf(ctx, @@ -248,19 +265,24 @@ func (handler *proxyHandler) handle(ctx context.Context, proxyConn *conn) error } continue } - clientErr := newErrorf(codeParamsRoutingFailed, "cluster %s-%d not found", clusterName, tenID.ToUint64()) - updateMetricsAndSendErrToClient(clientErr, conn, handler.metrics) + + // Remap error for external consumption. + err = newErrorf( + codeParamsRoutingFailed, "cluster %s-%d not found", clusterName, tenID.ToUint64()) break } + // Now actually dial the backend server. crdbConn, err = backendDial(backendStartupMsg, outgoingAddress, TLSConf) - // If we get a backend down error and are using the directory - report the - // error to the directory and retry the connection. - codeErr := (*codeError)(nil) - if err != nil && - errors.As(err, &codeErr) && - codeErr.code == codeBackendDown && - handler.directory != nil { + + // If we get a backend down error, retry the connection. + var codeErr *codeError + if err != nil && errors.As(err, &codeErr) && codeErr.code == codeBackendDown { + if handler.directory == nil { + // Don't retry unless directory is enabled (for now). + break + } + codeBackendDownErrs++ if backendDialErr.ShouldLog() { log.Ops.Errorf(ctx, @@ -270,6 +292,9 @@ func (handler *proxyHandler) handle(ctx context.Context, proxyConn *conn) error ) codeBackendDownErrs = 0 } + + // Report the failure to the directory so that it can refresh any + // stale information that may have caused the problem. err = handler.directory.ReportFailure(ctx, tenID, outgoingAddress) if err != nil { reportFailureErrs++ @@ -292,10 +317,12 @@ func (handler *proxyHandler) handle(ctx context.Context, proxyConn *conn) error return err } + // Set up the idle timeout monitor. crdbConn = idle.DisconnectOverlay(crdbConn, handler.IdleTimeout) defer func() { _ = crdbConn.Close() }() + // Perform user authentication. if err := authenticate(conn, crdbConn); err != nil { handler.metrics.updateForError(err) log.Ops.Errorf(ctx, "authenticate: %s", err) @@ -343,6 +370,8 @@ func (handler *proxyHandler) handle(ctx context.Context, proxyConn *conn) error log.Infof(ctx, "closing after %.2fs", timeutil.Since(connBegin).Seconds()) }() + // Copy all pgwire messages from frontend to backend connection until we + // encounter an error or shutdown signal. go func() { err := connectionCopy(crdbConn, conn) errConnectionCopy <- err @@ -381,8 +410,7 @@ func (handler *proxyHandler) outgoingAddress( return addr, nil } - // This doesn't verify the name part of the tenant and relies just on the - // int id. + // Lookup tenant in directory. addr, err := handler.directory.EnsureTenantIP(ctx, tenID, name) if err != nil { return "", err diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.go b/pkg/ccl/sqlproxyccl/tenant/directory.go index 7bf39df10ff8..def7a283f3ac 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory.go @@ -19,7 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/errors" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -120,10 +120,12 @@ func NewDirectory( // processes for the tenant, then LookupTenantIPs will choose one of them (note // that currently there is always at most one SQL process per tenant). // -// If clusterName is non-empty, then an error is returned if no endpoints match -// the cluster name. This can be used to ensure that the incoming SQL connection -// "knows" some additional information about the tenant, such as the name of the -// cluster, before being allowed to connect. +// If clusterName is non-empty, then a GRPC NotFound error is returned if no +// endpoints match the cluster name. This can be used to ensure that the +// incoming SQL connection "knows" some additional information about the tenant, +// such as the name of the cluster, before being allowed to connect. Similarly, +// if the tenant does not exist (e.g. because it was deleted), EnsureTenantIP +// returns a GRPC NotFound error. func (d *Directory) EnsureTenantIP( ctx context.Context, tenantID roachpb.TenantID, clusterName string, ) (string, error) { @@ -135,13 +137,16 @@ func (d *Directory) EnsureTenantIP( // Check the cluster name matches, if specified. if clusterName != "" && clusterName != entry.ClusterName { + // Return a GRPC NotFound error. log.Errorf(ctx, "cluster name %s doesn't match expected %s", clusterName, entry.ClusterName) - return "", errors.New("not found") + return "", status.Errorf(codes.NotFound, + "cluster name %s doesn't match expected %s", clusterName, entry.ClusterName) } + ctx, _ = d.stopper.WithCancelOnQuiesce(ctx) ip, err := entry.ChooseEndpointIP(ctx, d.client, d.options.deterministic) if err != nil { - if s, ok := status.FromError(err); ok && s.Message() == "not found" { + if status.Code(err) == codes.NotFound { d.deleteEntry(tenantID) } } @@ -149,7 +154,9 @@ func (d *Directory) EnsureTenantIP( } // LookupTenantIPs returns the IP addresses for all available SQL processes for -// the given tenant. It returns an error if the tenant has not yet been created. +// the given tenant. It returns a GRPC NotFound error if the tenant does not +// exist (e.g. it has not yet been created) or if it has not yet been fetched +// into the directory's cache (LookupTenantIPs will never attempt to fetch it). // If no processes are available for the tenant, LookupTenantIPs will return the // empty set (unlike EnsureTenantIP). func (d *Directory) LookupTenantIPs( @@ -162,7 +169,8 @@ func (d *Directory) LookupTenantIPs( } if entry == nil { - return nil, errors.New("not found") + return nil, status.Errorf( + codes.NotFound, "tenant %d not in directory cache", tenantID.ToUint64()) } return entry.getEndpointIPs(), nil } @@ -325,7 +333,8 @@ func (d *Directory) watchEndpoints(ctx context.Context, stopper *stop.Stopper) e if grpcutil.IsContextCanceled(err) { break } - // This should never happen. + // This should only happen in case of a deleted tenant or a transient + // error during fetch of tenant metadata. log.Errorf(ctx, "ignoring error getting entry for tenant %d: %v", resp.TenantID, err) continue } @@ -349,6 +358,7 @@ func (d *Directory) watchEndpoints(ctx context.Context, stopper *stop.Stopper) e if err != nil { return err } + // Block until the initial endpoint watcher client stream is constructed. waitInit.Wait() return err diff --git a/pkg/ccl/sqlproxyccl/tenant/directory_test.go b/pkg/ccl/sqlproxyccl/tenant/directory_test.go index ccef757e4178..29fbf5432399 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory_test.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory_test.go @@ -25,9 +25,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // To ensure tenant startup code is included. @@ -45,19 +46,19 @@ func TestDirectoryErrors(t *testing.T) { defer tc.Stopper().Stop(ctx) _, err := dir.LookupTenantIPs(ctx, roachpb.MakeTenantID(1000)) - assert.Regexp(t, "not found", err) + require.EqualError(t, err, "rpc error: code = NotFound desc = tenant 1000 not in directory cache") _, err = dir.LookupTenantIPs(ctx, roachpb.MakeTenantID(1001)) - assert.Regexp(t, "not found", err) + require.EqualError(t, err, "rpc error: code = NotFound desc = tenant 1001 not in directory cache") _, err = dir.LookupTenantIPs(ctx, roachpb.MakeTenantID(1002)) - assert.Regexp(t, "not found", err) + require.EqualError(t, err, "rpc error: code = NotFound desc = tenant 1002 not in directory cache") // Fail to find tenant that does not exist. _, err = dir.EnsureTenantIP(ctx, roachpb.MakeTenantID(1000), "") - assert.Regexp(t, "not found", err) + require.EqualError(t, err, "rpc error: code = NotFound desc = tenant 1000 not found") // Fail to find tenant when cluster name doesn't match. _, err = dir.EnsureTenantIP(ctx, roachpb.MakeTenantID(tenantID), "unknown") - assert.Regexp(t, "not found", err) + require.EqualError(t, err, "rpc error: code = NotFound desc = cluster name unknown doesn't match expected tenant-cluster") // No-op when reporting failure for tenant that doesn't exit. require.NoError(t, dir.ReportFailure(ctx, roachpb.MakeTenantID(1000), "")) @@ -258,9 +259,9 @@ func TestDeleteTenant(t *testing.T) { // Now EnsureTenantIP should return an error and the directory should no // longer cache the tenant. _, err = dir.EnsureTenantIP(ctx, tenantID, "") - require.Regexp(t, "not found", err) + require.EqualError(t, err, "rpc error: code = NotFound desc = tenant 50 not found") ips, err = dir.LookupTenantIPs(ctx, tenantID) - require.Regexp(t, "not found", err) + require.EqualError(t, err, "rpc error: code = NotFound desc = tenant 50 not in directory cache") require.Nil(t, ips) } @@ -343,6 +344,10 @@ func startTenant( Stopper: tenantStopper, }) if err != nil { + // Remap tenant "not found" error to GRPC NotFound error. + if err.Error() == "not found" { + return nil, status.Errorf(codes.NotFound, "tenant %d not found", id) + } return nil, err } sqlAddr, err := net.ResolveTCPAddr("tcp", t.SQLAddr()) diff --git a/pkg/ccl/sqlproxyccl/tenant/entry.go b/pkg/ccl/sqlproxyccl/tenant/entry.go index c4cd7b9e43d3..0ecaa1eed18d 100644 --- a/pkg/ccl/sqlproxyccl/tenant/entry.go +++ b/pkg/ccl/sqlproxyccl/tenant/entry.go @@ -208,27 +208,16 @@ func (e *tenantEntry) ensureTenantEndpoint( return ips, nil } - // Get up-to-date count of endpoints for the tenant from the K8s server. - resp, err := client.ListEndpoints(ctx, &ListEndpointsRequest{TenantID: e.TenantID.ToUint64()}) - if err != nil { - return nil, err - } - for { // Check for context cancellation or timeout. if err = ctx.Err(); err != nil { return nil, err } - // Check if tenant needs to be resumed. - if len(resp.Endpoints) == 0 { - log.Infof(ctx, "resuming tenant %d", e.TenantID) - - if _, err := client.EnsureEndpoint( - ctx, &EnsureEndpointRequest{e.TenantID.ToUint64()}, - ); err != nil { - return nil, err - } + // Try to resume the tenant if not yet resumed. + _, err = client.EnsureEndpoint(ctx, &EnsureEndpointRequest{e.TenantID.ToUint64()}) + if err != nil { + return nil, err } // Get endpoint information for the newly resumed tenant. Except in rare @@ -240,6 +229,7 @@ func (e *tenantEntry) ensureTenantEndpoint( return nil, err } if len(ips) != 0 { + log.Infof(ctx, "resumed tenant %d", e.TenantID) break } diff --git a/pkg/cli/mt_test_directory.go b/pkg/cli/mt_test_directory.go index e6ad7e8c39c8..b35a2912b077 100644 --- a/pkg/cli/mt_test_directory.go +++ b/pkg/cli/mt_test_directory.go @@ -22,9 +22,10 @@ import ( var mtTestDirectorySvr = &cobra.Command{ Use: "test-directory", - Short: "Run a test directory service.", + Short: "run a test directory service", Long: ` -Run a test directory service. +Run a test directory service that starts and manages tenant SQL instances as +processes on the local machine. `, Args: cobra.NoArgs, RunE: MaybeDecorateGRPCError(runDirectorySvr), diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index ac50901b59f1..2e2f5282f948 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -840,7 +840,7 @@ func TestLint(t *testing.T) { ":!util/grpcutil/grpc_util_test.go", ":!server/testserver.go", ":!util/tracing/*_test.go", - ":!ccl/sqlproxyccl/tenant/test_directory_svr.go", + ":!ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go", ) if err != nil { t.Fatal(err)