From e7d713b2f5a4958c68892493a1fd1d4fe9cc9928 Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Tue, 7 Jun 2022 19:37:09 +0200 Subject: [PATCH] Support proxy protocol v2 in MySQL (#12424) (#12993) #11684 added support for proxy protocol v2 for SSH and Postgres but MySQL uses different code path and it was missing. This change fixes that. It also adds tests for v2 protocol support for MySQL, Postgres, Mongo and Redis (cherry picked from commit 17fc07337994e7fe3e7984671205c7fe164ed486) --- lib/multiplexer/proxyline.go | 33 ++++++++++ lib/multiplexer/testproxy.go | 10 ++- lib/multiplexer/wrappers.go | 11 +++- lib/srv/db/mysql/proxy.go | 2 +- lib/srv/db/proxy_test.go | 117 ++++++++++++++++++++--------------- 5 files changed, 119 insertions(+), 54 deletions(-) diff --git a/lib/multiplexer/proxyline.go b/lib/multiplexer/proxyline.go index 02d2c3068f0de..7d379056d5cbe 100644 --- a/lib/multiplexer/proxyline.go +++ b/lib/multiplexer/proxyline.go @@ -61,6 +61,39 @@ func (p *ProxyLine) String() string { return fmt.Sprintf("PROXY %s %s %s %d %d\r\n", p.Protocol, p.Source.IP.String(), p.Destination.IP.String(), p.Source.Port, p.Destination.Port) } +// Bytes returns on-the wire bytes representation of proxy line conforming to the proxy v2 protocol +func (p *ProxyLine) Bytes() []byte { + b := &bytes.Buffer{} + header := proxyV2Header{VersionCommand: (Version2 << 4) | ProxyCommand} + copy(header.Signature[:], proxyV2Prefix) + var addr interface{} + switch p.Protocol { + case TCP4: + header.Protocol = ProtocolTCP4 + addr4 := proxyV2Address4{ + SourcePort: uint16(p.Source.Port), + DestinationPort: uint16(p.Destination.Port), + } + copy(addr4.Source[:], p.Source.IP.To4()) + copy(addr4.Destination[:], p.Destination.IP.To4()) + addr = addr4 + case TCP6: + header.Protocol = ProtocolTCP6 + addr6 := proxyV2Address6{ + SourcePort: uint16(p.Source.Port), + DestinationPort: uint16(p.Destination.Port), + } + copy(addr6.Source[:], p.Source.IP.To16()) + copy(addr6.Destination[:], p.Destination.IP.To16()) + addr = addr6 + } + header.Length = uint16(binary.Size(addr)) + binary.Write(b, binary.BigEndian, header) + binary.Write(b, binary.BigEndian, addr) + + return b.Bytes() +} + // ReadProxyLine reads proxy line protocol from the reader func ReadProxyLine(reader *bufio.Reader) (*ProxyLine, error) { line, err := reader.ReadString('\n') diff --git a/lib/multiplexer/testproxy.go b/lib/multiplexer/testproxy.go index b9f06ffb9f402..f2df2d1e31b66 100644 --- a/lib/multiplexer/testproxy.go +++ b/lib/multiplexer/testproxy.go @@ -33,11 +33,12 @@ type TestProxy struct { target string closeCh chan (struct{}) log logrus.FieldLogger + v2 bool } // NewTestProxy creates a new test proxy that sends a proxy-line when // proxying connections to the provided target address. -func NewTestProxy(target string) (*TestProxy, error) { +func NewTestProxy(target string, v2 bool) (*TestProxy, error) { listener, err := net.Listen("tcp", "localhost:0") if err != nil { return nil, trace.Wrap(err) @@ -47,6 +48,7 @@ func NewTestProxy(target string) (*TestProxy, error) { target: target, closeCh: make(chan struct{}), log: logrus.WithField(trace.Component, "test:proxy"), + v2: v2, }, nil } @@ -128,7 +130,11 @@ func (p *TestProxy) sendProxyLine(clientConn, serverConn net.Conn) error { Destination: net.TCPAddr{IP: net.ParseIP(serverAddr.Host()), Port: serverAddr.Port(0)}, } p.log.Debugf("Sending %v to %v.", proxyLine.String(), serverConn.RemoteAddr().String()) - _, err = serverConn.Write([]byte(proxyLine.String())) + if p.v2 { + _, err = serverConn.Write(proxyLine.Bytes()) + } else { + _, err = serverConn.Write([]byte(proxyLine.String())) + } if err != nil { return trace.Wrap(err) } diff --git a/lib/multiplexer/wrappers.go b/lib/multiplexer/wrappers.go index 6bb1805f8b3fb..e10afce3dc984 100644 --- a/lib/multiplexer/wrappers.go +++ b/lib/multiplexer/wrappers.go @@ -84,7 +84,16 @@ func (c *Conn) Detect() (Protocol, error) { // ReadProxyLine reads proxy-line from the connection. func (c *Conn) ReadProxyLine() (*ProxyLine, error) { - proxyLine, err := ReadProxyLine(c.reader) + var proxyLine *ProxyLine + protocol, err := c.Detect() + if err != nil { + return nil, trace.Wrap(err) + } + if protocol == ProtoProxyV2 { + proxyLine, err = ReadProxyLineV2(c.reader) + } else { + proxyLine, err = ReadProxyLine(c.reader) + } if err != nil { return nil, trace.Wrap(err) } diff --git a/lib/srv/db/mysql/proxy.go b/lib/srv/db/mysql/proxy.go index 2f7d5ddd965cc..d47e6798f5d06 100644 --- a/lib/srv/db/mysql/proxy.go +++ b/lib/srv/db/mysql/proxy.go @@ -212,7 +212,7 @@ func (p *Proxy) maybeReadProxyLine(conn *multiplexer.Conn) error { if err != nil { return trace.Wrap(err) } - if proto != multiplexer.ProtoProxy { + if proto != multiplexer.ProtoProxy && proto != multiplexer.ProtoProxyV2 { return nil } proxyLine, err := conn.ReadProxyLine() diff --git a/lib/srv/db/proxy_test.go b/lib/srv/db/proxy_test.go index 178ca1a87eda5..4522b60b0ba18 100644 --- a/lib/srv/db/proxy_test.go +++ b/lib/srv/db/proxy_test.go @@ -18,6 +18,7 @@ package db import ( "context" + "fmt" "testing" "time" @@ -40,17 +41,21 @@ func TestProxyProtocolPostgres(t *testing.T) { testCtx.createUserAndRole(ctx, t, "alice", "admin", []string{"postgres"}, []string{"postgres"}) - // Point our proxy to the Teleport's db listener on the multiplexer. - proxy, err := multiplexer.NewTestProxy(testCtx.mux.DB().Addr().String()) - require.NoError(t, err) - t.Cleanup(func() { proxy.Close() }) - go proxy.Serve() - - // Connect to the proxy instead of directly to Postgres listener and make - // sure the connection succeeds. - psql, err := testCtx.postgresClientWithAddr(ctx, proxy.Address(), "alice", "postgres", "postgres", "postgres") - require.NoError(t, err) - require.NoError(t, psql.Close(ctx)) + for _, v2 := range []bool{false, true} { + t.Run(fmt.Sprintf("v2=%v", v2), func(t *testing.T) { + // Point our proxy to the Teleport's db listener on the multiplexer. + proxy, err := multiplexer.NewTestProxy(testCtx.mux.DB().Addr().String(), v2) + require.NoError(t, err) + t.Cleanup(func() { proxy.Close() }) + go proxy.Serve() + + // Connect to the proxy instead of directly to Postgres listener and make + // sure the connection succeeds. + psql, err := testCtx.postgresClientWithAddr(ctx, proxy.Address(), "alice", "postgres", "postgres", "postgres") + require.NoError(t, err) + require.NoError(t, psql.Close(ctx)) + }) + } } // TestProxyProtocolMySQL ensures that clients can successfully connect to a @@ -63,17 +68,21 @@ func TestProxyProtocolMySQL(t *testing.T) { testCtx.createUserAndRole(ctx, t, "alice", "admin", []string{"root"}, []string{types.Wildcard}) - // Point our proxy to the Teleport's MySQL listener. - proxy, err := multiplexer.NewTestProxy(testCtx.mysqlListener.Addr().String()) - require.NoError(t, err) - t.Cleanup(func() { proxy.Close() }) - go proxy.Serve() - - // Connect to the proxy instead of directly to MySQL listener and make - // sure the connection succeeds. - mysql, err := testCtx.mysqlClientWithAddr(proxy.Address(), "alice", "mysql", "root") - require.NoError(t, err) - require.NoError(t, mysql.Close()) + for _, v2 := range []bool{false, true} { + t.Run(fmt.Sprintf("v2=%v", v2), func(t *testing.T) { + // Point our proxy to the Teleport's MySQL listener. + proxy, err := multiplexer.NewTestProxy(testCtx.mysqlListener.Addr().String(), v2) + require.NoError(t, err) + t.Cleanup(func() { proxy.Close() }) + go proxy.Serve() + + // Connect to the proxy instead of directly to MySQL listener and make + // sure the connection succeeds. + mysql, err := testCtx.mysqlClientWithAddr(proxy.Address(), "alice", "mysql", "root") + require.NoError(t, err) + require.NoError(t, mysql.Close()) + }) + } } // TestProxyProtocolMongo ensures that clients can successfully connect to a @@ -86,17 +95,21 @@ func TestProxyProtocolMongo(t *testing.T) { testCtx.createUserAndRole(ctx, t, "alice", "admin", []string{"admin"}, []string{types.Wildcard}) - // Point our proxy to the Teleport's TLS listener. - proxy, err := multiplexer.NewTestProxy(testCtx.webListener.Addr().String()) - require.NoError(t, err) - t.Cleanup(func() { proxy.Close() }) - go proxy.Serve() - - // Connect to the proxy instead of directly to Teleport listener and make - // sure the connection succeeds. - mongo, err := testCtx.mongoClientWithAddr(ctx, proxy.Address(), "alice", "mongo", "admin") - require.NoError(t, err) - require.NoError(t, mongo.Disconnect(ctx)) + for _, v2 := range []bool{false, true} { + t.Run(fmt.Sprintf("v2=%v", v2), func(t *testing.T) { + // Point our proxy to the Teleport's TLS listener. + proxy, err := multiplexer.NewTestProxy(testCtx.webListener.Addr().String(), v2) + require.NoError(t, err) + t.Cleanup(func() { proxy.Close() }) + go proxy.Serve() + + // Connect to the proxy instead of directly to Teleport listener and make + // sure the connection succeeds. + mongo, err := testCtx.mongoClientWithAddr(ctx, proxy.Address(), "alice", "mongo", "admin") + require.NoError(t, err) + require.NoError(t, mongo.Disconnect(ctx)) + }) + } } func TestProxyProtocolRedis(t *testing.T) { @@ -106,23 +119,27 @@ func TestProxyProtocolRedis(t *testing.T) { testCtx.createUserAndRole(ctx, t, "alice", "admin", []string{"admin"}, []string{types.Wildcard}) - // Point our proxy to the Teleport's TLS listener. - proxy, err := multiplexer.NewTestProxy(testCtx.webListener.Addr().String()) - require.NoError(t, err) - t.Cleanup(func() { proxy.Close() }) - go proxy.Serve() - - // Connect to the proxy instead of directly to Teleport listener and make - // sure the connection succeeds. - redisClient, err := testCtx.redisClientWithAddr(ctx, proxy.Address(), "alice", "redis", "admin") - require.NoError(t, err) - - // Send ECHO to Redis server and check if we get it back. - resp := redisClient.Echo(ctx, "hello") - require.NoError(t, resp.Err()) - require.Equal(t, "hello", resp.Val()) - - require.NoError(t, redisClient.Close()) + for _, v2 := range []bool{false, true} { + t.Run(fmt.Sprintf("v2=%v", v2), func(t *testing.T) { + // Point our proxy to the Teleport's TLS listener. + proxy, err := multiplexer.NewTestProxy(testCtx.webListener.Addr().String(), v2) + require.NoError(t, err) + t.Cleanup(func() { proxy.Close() }) + go proxy.Serve() + + // Connect to the proxy instead of directly to Teleport listener and make + // sure the connection succeeds. + redisClient, err := testCtx.redisClientWithAddr(ctx, proxy.Address(), "alice", "redis", "admin") + require.NoError(t, err) + + // Send ECHO to Redis server and check if we get it back. + resp := redisClient.Echo(ctx, "hello") + require.NoError(t, resp.Err()) + require.Equal(t, "hello", resp.Val()) + + require.NoError(t, redisClient.Close()) + }) + } } // TestProxyClientDisconnectDueToIdleConnection ensures that idle clients will be disconnected.