Skip to content

Commit

Permalink
Merge #58742 #58894
Browse files Browse the repository at this point in the history
58742: sqlproxy: complete connection handshake r=spaskob a=spaskob

Currently the proxy receives the startup message from the server
but does not actually check if it is of type `pgproto3.ReadyForQuery`
which is signifies that the connection has been established and the
server can start serve queries. Because of this we cannot recognize
successful connections.

Release note: none.

58894: tree: fix serialization of null values in tuples with enums r=yuzefovich a=yuzefovich

Enum values need to be serialized in a special way - with the type
annotation containing an Oid type reference. Previously, NULL values
when used in a tuple with enum values had a different type annotation
which caused the remote nodes to hit an assertion about not being able
to resolve the type by name.

Fixes: #58889.

Release note (bug fix): Previously, CockroachDB could encounter an
internal error when executing queries with tuples containing null values
and enums in a distributed setting, and this is now fixed.

Co-authored-by: Spas Bojanov <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
3 people committed Jan 14, 2021
3 parents b1f5a62 + 24890c0 + de2a466 commit a989dd4
Show file tree
Hide file tree
Showing 12 changed files with 380 additions and 74 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "sqlproxyccl",
srcs = [
"authentication.go",
"backend_dialer.go",
"error.go",
"errorcode_string.go",
Expand All @@ -29,6 +30,7 @@ go_library(
go_test(
name = "sqlproxyccl_test",
srcs = [
"authentication_test.go",
"frontend_admitter_test.go",
"idle_disconnect_connection_test.go",
"main_test.go",
Expand All @@ -42,7 +44,9 @@ go_test(
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/randutil",
Expand Down
77 changes: 77 additions & 0 deletions pkg/ccl/sqlproxyccl/authentication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package sqlproxyccl

import (
"net"

"github.com/jackc/pgproto3/v2"
)

func authenticate(clientConn, crdbConn net.Conn) error {
fe := pgproto3.NewBackend(pgproto3.NewChunkReader(clientConn), clientConn)
be := pgproto3.NewFrontend(pgproto3.NewChunkReader(crdbConn), crdbConn)

// The auth step should require only a few back and forths so 20 iterations
// should be enough.
var i int
for ; i < 20; i++ {
// Read the server response and forward it to the client.
// TODO(spaskob): in verbose mode, log these messages.
backendMsg, err := be.Receive()
if err != nil {
return NewErrorf(CodeBackendReadFailed, "unable to receive message from backend: %v", err)
}

err = fe.Send(backendMsg)
if err != nil {
return NewErrorf(
CodeClientWriteFailed, "unable to send message %v to client: %v", backendMsg, err,
)
}

// Decide what to do based on the type of the server response.
switch tp := backendMsg.(type) {
case *pgproto3.ReadyForQuery:
// Server has authenticated the connection successfully and is ready to
// serve queries.
return nil
case *pgproto3.AuthenticationOk:
// Server has authenticated the connection; keep reading messages until
// `pgproto3.ReadyForQuery` is encountered which signifies that server
// is ready to serve queries.
case *pgproto3.ParameterStatus:
// Server sent status message; keep reading messages until
// `pgproto3.ReadyForQuery` is encountered.
case *pgproto3.ErrorResponse:
// Server has rejected the authentication response from the client and
// has closed the connection.
return NewErrorf(CodeAuthFailed, "authentication failed: %v", backendMsg)
case
*pgproto3.AuthenticationCleartextPassword,
*pgproto3.AuthenticationMD5Password,
*pgproto3.AuthenticationSASL:
// The backend is requesting the user to authenticate.
// Read the client response and forward it to server.
fntMsg, err := fe.Receive()
if err != nil {
return NewErrorf(CodeClientReadFailed, "unable to receive message from client: %v", err)
}
err = be.Send(fntMsg)
if err != nil {
return NewErrorf(
CodeBackendWriteFailed, "unable to send message %v to backend: %v", fntMsg, err,
)
}
default:
return NewErrorf(CodeBackendDisconnected, "received unexpected backend message type: %v", tp)
}
}
return NewErrorf(CodeBackendDisconnected, "authentication took more than %d iterations", i)
}
123 changes: 123 additions & 0 deletions pkg/ccl/sqlproxyccl/authentication_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package sqlproxyccl

import (
"net"
"testing"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/jackc/pgproto3/v2"
"github.com/stretchr/testify/require"
)

func TestAuthenticateOK(t *testing.T) {
defer leaktest.AfterTest(t)()

cli, srv := net.Pipe()
be := pgproto3.NewBackend(pgproto3.NewChunkReader(srv), srv)
fe := pgproto3.NewFrontend(pgproto3.NewChunkReader(cli), cli)

go func() {
err := be.Send(&pgproto3.ReadyForQuery{})
require.NoError(t, err)
beMsg, err := fe.Receive()
require.NoError(t, err)
require.Equal(t, beMsg, &pgproto3.ReadyForQuery{})
}()

require.NoError(t, authenticate(srv, cli))
}

func TestAuthenticateClearText(t *testing.T) {
defer leaktest.AfterTest(t)()

cli, srv := net.Pipe()
be := pgproto3.NewBackend(pgproto3.NewChunkReader(srv), srv)
fe := pgproto3.NewFrontend(pgproto3.NewChunkReader(cli), cli)

go func() {
err := be.Send(&pgproto3.AuthenticationCleartextPassword{})
require.NoError(t, err)
beMsg, err := fe.Receive()
require.NoError(t, err)
require.Equal(t, beMsg, &pgproto3.AuthenticationCleartextPassword{})

err = fe.Send(&pgproto3.PasswordMessage{Password: "password"})
require.NoError(t, err)
feMsg, err := be.Receive()
require.NoError(t, err)
require.Equal(t, feMsg, &pgproto3.PasswordMessage{Password: "password"})

err = be.Send(&pgproto3.AuthenticationOk{})
require.NoError(t, err)
beMsg, err = fe.Receive()
require.NoError(t, err)
require.Equal(t, beMsg, &pgproto3.AuthenticationOk{})

err = be.Send(&pgproto3.ParameterStatus{Name: "Server Version", Value: "1.3"})
require.NoError(t, err)
beMsg, err = fe.Receive()
require.NoError(t, err)
require.Equal(t, beMsg, &pgproto3.ParameterStatus{Name: "Server Version", Value: "1.3"})

err = be.Send(&pgproto3.ReadyForQuery{})
require.NoError(t, err)
beMsg, err = fe.Receive()
require.NoError(t, err)
require.Equal(t, beMsg, &pgproto3.ReadyForQuery{})
}()

require.NoError(t, authenticate(srv, cli))
}

func TestAuthenticateError(t *testing.T) {
defer leaktest.AfterTest(t)()

cli, srv := net.Pipe()
be := pgproto3.NewBackend(pgproto3.NewChunkReader(srv), srv)
fe := pgproto3.NewFrontend(pgproto3.NewChunkReader(cli), cli)

go func() {
err := be.Send(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "foo"})
require.NoError(t, err)
beMsg, err := fe.Receive()
require.NoError(t, err)
require.Equal(t, beMsg, &pgproto3.ErrorResponse{Severity: "FATAL", Code: "foo"})
}()

err := authenticate(srv, cli)
require.Error(t, err)
codeErr := (*CodeError)(nil)
require.True(t, errors.As(err, &codeErr))
require.Equal(t, CodeAuthFailed, codeErr.code)
}

func TestAuthenticateUnexpectedMessage(t *testing.T) {
defer leaktest.AfterTest(t)()

cli, srv := net.Pipe()
be := pgproto3.NewBackend(pgproto3.NewChunkReader(srv), srv)
fe := pgproto3.NewFrontend(pgproto3.NewChunkReader(cli), cli)

go func() {
err := be.Send(&pgproto3.BackendKeyData{})
require.NoError(t, err)
beMsg, err := fe.Receive()
require.NoError(t, err)
require.Equal(t, beMsg, &pgproto3.BackendKeyData{})
}()

err := authenticate(srv, cli)
require.Error(t, err)
codeErr := (*CodeError)(nil)
require.True(t, errors.As(err, &codeErr))
require.Equal(t, CodeBackendDisconnected, codeErr.code)
}
10 changes: 10 additions & 0 deletions pkg/ccl/sqlproxyccl/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ type ErrorCode int

const (
_ ErrorCode = iota

// CodeAuthFailed indicates that client authentication attempt has failed and
// backend has closed the connection.
CodeAuthFailed

// CodeBackendReadFailed indicates an error reading from backend connection.
CodeBackendReadFailed
// CodeBackendWriteFailed indicates an error writing to backend connection.
CodeBackendWriteFailed

// CodeClientReadFailed indicates an error reading from the client connection
CodeClientReadFailed
// CodeClientWriteFailed indicates an error writing to the client connection.
Expand Down
33 changes: 18 additions & 15 deletions pkg/ccl/sqlproxyccl/errorcode_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/ccl/sqlproxyccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Metrics struct {
RoutingErrCount *metric.Counter
RefusedConnCount *metric.Counter
SuccessfulConnCount *metric.Counter
AuthFailedCount *metric.Counter
ExpiredClientConnCount *metric.Counter
}

Expand Down Expand Up @@ -78,6 +79,12 @@ var (
Measurement: "Successful Connections",
Unit: metric.Unit_COUNT,
}
metaAuthFailedCount = metric.Metadata{
Name: "proxy.sql.authentication_failures",
Help: "Number of authentication failures",
Measurement: "Authentication Failures",
Unit: metric.Unit_COUNT,
}
metaExpiredClientConnCount = metric.Metadata{
Name: "proxy.sql.expired_client_conns",
Help: "Number of expired client connections",
Expand All @@ -97,6 +104,7 @@ func MakeProxyMetrics() Metrics {
RoutingErrCount: metric.NewCounter(metaRoutingErrCount),
RefusedConnCount: metric.NewCounter(metaRefusedConnCount),
SuccessfulConnCount: metric.NewCounter(metaSuccessfulConnCount),
AuthFailedCount: metric.NewCounter(metaAuthFailedCount),
ExpiredClientConnCount: metric.NewCounter(metaExpiredClientConnCount),
}
}
9 changes: 9 additions & 0 deletions pkg/ccl/sqlproxyccl/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,15 @@ func (s *Server) Proxy(proxyConn *Conn) error {
}
defer func() { _ = crdbConn.Close() }()

if err := authenticate(conn, crdbConn); err != nil {
s.metrics.AuthFailedCount.Inc(1)
if codeErr := (*CodeError)(nil); errors.As(err, &codeErr) {
sendErrToClient(conn, codeErr.code, codeErr.Error())
return err
}
return errors.AssertionFailedf("unrecognized auth failure")
}

s.metrics.SuccessfulConnCount.Inc(1)

// These channels are buffered because we'll only consume one of them.
Expand Down
Loading

0 comments on commit a989dd4

Please sign in to comment.