Skip to content

Commit

Permalink
Add (TCP) passthrough MySQL processor (#1211)
Browse files Browse the repository at this point in the history
This PR adds a new processor and frontend/backend connectors that allow
Quesma to act as a MySQL proxy.

Added components:
- `TcpMysqlConnectionHandler` - splits incoming TCP stream into MySQL
wire protocol packets
- `TcpPassthroughMysqlProcessor` - reads incoming MySQL packets,
forwards them to a real MySQL server (via `TcpBackendConnector`) and
forwards back any responses from the real MySQL server to the connected
client.
- `TcpBackendConnector` - a raw TCP connection to some server (in this
case MySQL server)

This is a very early experimental version. In particular it has (at
least) these limitations:
- No support for encrypted MySQL traffic
- Manually parses MySQL packets - we should probably use Vitess for that
- Stores the connection inside metadata, not some `Session` abstraction
- Doesn't support asynchronous packets from MySQL server - we read from
the real MySQL server only after receiving some client request
- Reading from real MySQL server is very naive - it tries to read as
much as possible in a loop with 1s deadline. When 1s deadline passes, it
assumes that MySQL server sent everything
- TCP connection to real MySQL server is not closed properly (leak)

You can try the experiment by setting `mysql_passthrough_experiment` to
`true` in `main.go`. It assumes that MySQL is running at
`localhost:3306` and it will expose Quesma at `localhost:13306`. You can
connect to it with `mysql` CLI tool:
```
docker run --name some-mysql -e MYSQL_ROOT_PASSWORD=my-secret-pw -p 3306:3306 -d mysql:latest

# Start Quesma (mysql_passthrough_experiment = true in code)

mysql --skip-ssl -h localhost -P 13306 -u root -p
```

---------

Signed-off-by: Piotr Grabowski <[email protected]>
Co-authored-by: Jacek Migdal <[email protected]>
  • Loading branch information
avelanarius and jakozaur authored Jan 24, 2025
1 parent 2661385 commit c21d75d
Show file tree
Hide file tree
Showing 6 changed files with 379 additions and 0 deletions.
114 changes: 114 additions & 0 deletions quesma/backend_connectors/tcp_backend_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package backend_connectors

import (
"bytes"
"context"
"errors"
"fmt"
"log"
"net"
"os"
"time"

quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
)

// TcpBackendConnector is a backend connector that hands out raw TCP
// connections to a single remote address.
type TcpBackendConnector struct {
	// addr is the "host:port" address dialed by NewConnection.
	addr string
}

// NewTcpBackendConnector returns a connector bound to addr.
// No connection is opened here; the returned error is always nil.
func NewTcpBackendConnector(addr string) (*TcpBackendConnector, error) {
	connector := new(TcpBackendConnector)
	connector.addr = addr
	return connector, nil
}

// InstanceName returns the fixed, human-readable name of this connector.
func (TcpBackendConnector) InstanceName() string {
	const name = "TcpBackendConnector"
	return name
}

// GetId reports the connector type, which is always quesma_api.TcpBackend.
func (TcpBackendConnector) GetId() quesma_api.BackendConnectorType {
	return quesma_api.TcpBackend
}

// Open is a no-op: connections are created on demand by NewConnection.
// It always returns nil.
func (TcpBackendConnector) Open() error {
	return nil
}

// FIXME: those functions below are only relevant for SQL connectors, we could potentially remove them from the BackendConnector interface

// Query is not supported by the raw TCP connector; it always returns nil rows
// and an error.
func (TcpBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
	return nil, errors.New("query is not available in TcpBackendConnector")
}

// QueryRow is not supported by the raw TCP connector. The signature has no
// error result, so calling it terminates the process via log.Fatal.
func (TcpBackendConnector) QueryRow(ctx context.Context, query string, args ...interface{}) quesma_api.Row {
	log.Fatal("QueryRow is not available in TcpBackendConnector")
	return nil // unreachable; log.Fatal exits the process
}

// Exec is not supported by the raw TCP connector; it always returns an error.
func (TcpBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
	return errors.New("exec is not available in TcpBackendConnector")
}

// Stats is not supported by the raw TCP connector. The signature has no
// error result, so calling it terminates the process via log.Fatal.
func (t TcpBackendConnector) Stats() quesma_api.DBStats {
	// The message names Stats (it previously named QueryRow, a copy-paste slip
	// that pointed at the wrong method in the logs).
	log.Fatal("Stats is not available in TcpBackendConnector")
	return quesma_api.DBStats{} // unreachable; log.Fatal exits the process
}

// Close is a no-op and always returns nil. The connector itself holds no
// connection; connections returned by NewConnection are not tracked here.
func (TcpBackendConnector) Close() error {
	return nil
}

// Ping is a no-op and always returns nil; it does not contact the remote
// address.
func (TcpBackendConnector) Ping() error {
	return nil
}

// NewConnection dials a fresh TCP connection to the configured address.
// The caller owns the returned connection.
func (t TcpBackendConnector) NewConnection() (net.Conn, error) {
	const network = "tcp"
	return net.Dial(network, t.addr)
}

// FIXME: those functions below are just TCP net.Conn helpers, not actually a part of the backend connector logic

// ConnWrite writes data to conn in a single Write call.
//
// It returns an error when conn is nil, when the write itself fails, or when
// fewer than len(data) bytes were written.
func ConnWrite(conn net.Conn, data []byte) error {
	if conn == nil {
		return fmt.Errorf("connection is nil")
	}

	written, err := conn.Write(data)
	switch {
	case err != nil:
		return err
	case written != len(data):
		return fmt.Errorf("short write: wrote %d bytes but expected to write %d bytes", written, len(data))
	}
	return nil
}

func ConnRead(conn net.Conn, n int) ([]byte, error) {
if conn == nil {
return nil, fmt.Errorf("connection is nil")
}

var result bytes.Buffer

err := conn.SetReadDeadline(time.Now().Add(1000 * time.Millisecond))
if err != nil {
return result.Bytes(), err
}

tmp := make([]byte, n)
n, err = conn.Read(tmp)
if err != nil {
if errors.Is(err, os.ErrDeadlineExceeded) {
return result.Bytes(), nil
}
return result.Bytes(), err
}

result.Write(tmp[:n])
return result.Bytes(), nil
}
4 changes: 4 additions & 0 deletions quesma/frontend_connectors/basic_tcp_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (t *TCPListener) GetEndpoint() string {
return t.Endpoint
}

// InstanceName returns the fixed, human-readable name of this listener.
func (t *TCPListener) InstanceName() string {
	const name = "TCPListener"
	return name
}

// AddConnectionHandler sets the handler stored on the listener, replacing
// any handler set earlier.
func (t *TCPListener) AddConnectionHandler(handler quesma_api.TCPConnectionHandler) {
	t.handler = handler
}
Expand Down
108 changes: 108 additions & 0 deletions quesma/frontend_connectors/tcp_mysql_connection_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
// Experimental alpha frontend for MySQL protocol

package frontend_connectors

import (
"encoding/binary"
"fmt"
"github.com/QuesmaOrg/quesma/quesma/backend_connectors"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"io"
"net"
)

// TcpMysqlConnectionHandler reads MySQL wire-protocol packets from a client
// connection and dispatches them to its processors.
type TcpMysqlConnectionHandler struct {
	// processors receive every dispatched message; set via SetHandlers.
	processors []quesma_api.Processor
}

// ErrInvalidPacket is returned by ReadMysqlPacket when a complete packet
// could not be read from the connection.
var ErrInvalidPacket error = fmt.Errorf("invalid packet")

func ReadMysqlPacket(conn net.Conn) ([]byte, error) {
// MySQL wire protocol packet format (see https://dev.mysql.com/doc/dev/mysql-server/8.4.3/PAGE_PROTOCOL.html):
// - 3 bytes: length of the packet (= LEN)
// - 1 byte: sequence ID
// - LEN bytes: packet body
//
// TODO: when packet is larger than 16MB, it's split into multiple packets. This code does NOT support this case yet.

packetLengthBytes, err := backend_connectors.ConnRead(conn, 3)
if err == io.EOF {
return nil, err
}
if err != nil || len(packetLengthBytes) != 3 {
return nil, ErrInvalidPacket
}
packetLength := int(binary.LittleEndian.Uint32(append(packetLengthBytes, 0)))

sequenceId, err := backend_connectors.ConnRead(conn, 1)
if err == io.EOF {
return nil, err
}
if err != nil || len(sequenceId) != 1 {
return nil, ErrInvalidPacket
}

body, err := backend_connectors.ConnRead(conn, packetLength)
if err == io.EOF {
return nil, err
}
if err != nil || len(body) != packetLength {
return nil, ErrInvalidPacket
}

fullPacketBytes := packetLengthBytes
fullPacketBytes = append(fullPacketBytes, sequenceId...)
fullPacketBytes = append(fullPacketBytes, body...)

return fullPacketBytes, nil
}

// HandleConnection serves one client connection until the client disconnects.
//
// It first dispatches a nil message so the processors can produce the server
// greeting, then repeatedly reads a MySQL packet from conn, dispatches it, and
// writes any response back to conn.
//
// It returns nil when ReadMysqlPacket reports io.EOF, and an error when a
// response cannot be written or a processor returns a non-[]byte message.
func (h *TcpMysqlConnectionHandler) HandleConnection(conn net.Conn) error {
	dispatcher := quesma_api.Dispatcher{}
	metadata := make(map[string]interface{})

	var message any

	// When you connect to MySQL, the server sends a greeting packet.
	// Therefore, we dispatch a dummy nil message to the processor for it to be able to try to receive that initial packet
	// (from its TCP backend connector).
	metadata, message = dispatcher.Dispatch(h.processors, metadata, nil)
	if err := writeMysqlResponse(conn, message); err != nil {
		return err
	}

	for {
		fullPacketBytes, err := ReadMysqlPacket(conn)
		if err == io.EOF {
			break
		}
		if err != nil {
			// No complete packet was read (e.g. the client was idle); try again.
			// NOTE(review): ReadMysqlPacket reports every non-EOF failure as
			// ErrInvalidPacket, so a persistent read error would be retried
			// indefinitely here - confirm and consider a distinct error.
			continue
		}

		message = fullPacketBytes

		metadata, message = dispatcher.Dispatch(h.processors, metadata, message)
		if err := writeMysqlResponse(conn, message); err != nil {
			return err
		}
	}

	return nil
}

// writeMysqlResponse writes a dispatched response to conn. A nil message means
// "nothing to send". A non-[]byte message is reported as an error rather than
// panicking on the type assertion.
func writeMysqlResponse(conn net.Conn, message any) error {
	if message == nil {
		return nil
	}
	payload, ok := message.([]byte)
	if !ok {
		return fmt.Errorf("error sending response: unexpected message type %T", message)
	}
	if _, err := conn.Write(payload); err != nil {
		return fmt.Errorf("error sending response: %w", err)
	}
	return nil
}

// SetHandlers replaces the processors that HandleConnection dispatches
// messages to. The slice is stored as-is, not copied.
func (h *TcpMysqlConnectionHandler) SetHandlers(processors []quesma_api.Processor) {
	h.processors = processors
}
36 changes: 36 additions & 0 deletions quesma/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,26 @@ import (
"fmt"
"github.com/QuesmaOrg/quesma/quesma/ab_testing"
"github.com/QuesmaOrg/quesma/quesma/ab_testing/sender"
"github.com/QuesmaOrg/quesma/quesma/backend_connectors"
"github.com/QuesmaOrg/quesma/quesma/buildinfo"
"github.com/QuesmaOrg/quesma/quesma/clickhouse"
"github.com/QuesmaOrg/quesma/quesma/common_table"
"github.com/QuesmaOrg/quesma/quesma/connectors"
"github.com/QuesmaOrg/quesma/quesma/elasticsearch"
"github.com/QuesmaOrg/quesma/quesma/elasticsearch/feature"
"github.com/QuesmaOrg/quesma/quesma/frontend_connectors"
"github.com/QuesmaOrg/quesma/quesma/ingest"
"github.com/QuesmaOrg/quesma/quesma/licensing"
"github.com/QuesmaOrg/quesma/quesma/logger"
"github.com/QuesmaOrg/quesma/quesma/persistence"
"github.com/QuesmaOrg/quesma/quesma/processors"
"github.com/QuesmaOrg/quesma/quesma/quesma"
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
"github.com/QuesmaOrg/quesma/quesma/quesma/ui"
"github.com/QuesmaOrg/quesma/quesma/schema"
"github.com/QuesmaOrg/quesma/quesma/table_resolver"
"github.com/QuesmaOrg/quesma/quesma/telemetry"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"log"
"os"
"os/signal"
Expand Down Expand Up @@ -53,6 +57,13 @@ const EnableConcurrencyProfiling = false
//}

func main() {
// TODO: Experimental feature, move to the configuration after architecture v2
const mysql_passthrough_experiment = false
if mysql_passthrough_experiment {
launchMysqlPassthrough()
return
}

if EnableConcurrencyProfiling {
runtime.SetBlockProfileRate(1)
runtime.SetMutexProfileFraction(1)
Expand Down Expand Up @@ -148,6 +159,31 @@ func main() {

}

// launchMysqlPassthrough runs the experimental MySQL passthrough proxy.
//
// It listens on ":13306" and wires a TcpMysqlConnectionHandler and the MySQL
// passthrough processor to a TCP backend at "localhost:3306". It then blocks
// until the process receives an interrupt signal and stops Quesma.
// Any error while building the pipeline causes a panic.
func launchMysqlPassthrough() {
	var frontendConn = frontend_connectors.NewTCPConnector(":13306")
	var tcpProcessor quesma_api.Processor = processors.NewTcpMysqlPassthroughProcessor()
	var tcpMysqlHandler = frontend_connectors.TcpMysqlConnectionHandler{}
	frontendConn.AddConnectionHandler(&tcpMysqlHandler)

	var mysqlPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
	mysqlPipeline.AddProcessor(tcpProcessor)
	mysqlPipeline.AddFrontendConnector(frontendConn)

	var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())
	backendConn, err := backend_connectors.NewTcpBackendConnector("localhost:3306")
	if err != nil {
		panic(err)
	}
	mysqlPipeline.AddBackendConnector(backendConn)
	quesmaBuilder.AddPipeline(mysqlPipeline)

	qb, err := quesmaBuilder.Build()
	if err != nil {
		panic(err)
	}
	qb.Start()

	// Register for interrupts. Without signal.Notify nothing is ever delivered
	// on the channel, so the receive below would block forever and Stop would
	// never run.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt)
	<-stop
	qb.Stop(context.Background())
}

func constructQuesma(cfg *config.QuesmaConfiguration, sl clickhouse.TableDiscovery, lm *clickhouse.LogManager, ip *ingest.IngestProcessor, schemaRegistry schema.Registry, phoneHomeAgent telemetry.PhoneHomeAgent, quesmaManagementConsole *ui.QuesmaManagementConsole, logChan <-chan logger.LogWithLevel, abResultsrepository ab_testing.Sender, indexRegistry table_resolver.TableResolver) *quesma.Quesma {
if cfg.TransparentProxy {
return quesma.NewQuesmaTcpProxy(cfg, quesmaManagementConsole, logChan, false)
Expand Down
Loading

0 comments on commit c21d75d

Please sign in to comment.