Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port: Support for Vitess server to send binlog events #317

Merged
merged 8 commits into from
Mar 12, 2024
90 changes: 90 additions & 0 deletions go/mysql/binlog_dump.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright 2022 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mysql

import (
"encoding/binary"
vtrpcpb "github.com/dolthub/vitess/go/vt/proto/vtrpc"
"github.com/dolthub/vitess/go/vt/vterrors"
"io"
)

var (
// BinglogMagicNumber is 4-byte number at the beginning of every binary log
BinglogMagicNumber = []byte{0xfe, 0x62, 0x69, 0x6e}
readPacketErr = vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error reading BinlogDumpGTID packet")
)

const (
BinlogDumpNonBlock = 0x01
BinlogThroughPosition = 0x02
BinlogThroughGTID = 0x04
)

func (c *Conn) parseComBinlogDump(data []byte) (logFile string, binlogPos uint32, err error) {
pos := 1

binlogPos, pos, ok := readUint32(data, pos)
if !ok {
return logFile, binlogPos, readPacketErr
}

pos += 2 // flags
pos += 4 // server-id

logFile = string(data[pos:])
return logFile, binlogPos, nil
}

func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint64, position Position, err error) {
// see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
pos := 1

flags := binary.LittleEndian.Uint16(data[pos : pos+2])
pos += 2 // flags
pos += 4 // server-id

fileNameLen, pos, ok := readUint32(data, pos)
if !ok {
return logFile, logPos, position, readPacketErr
}
logFile = string(data[pos : pos+int(fileNameLen)])
pos += int(fileNameLen)

logPos, pos, ok = readUint64(data, pos)
if !ok {
return logFile, logPos, position, readPacketErr
}

if flags&BinlogDumpNonBlock != 0 {
return logFile, logPos, position, io.EOF
}
if flags&BinlogThroughGTID != 0 {
dataSize, pos, ok := readUint32(data, pos)
if !ok {
return logFile, logPos, position, readPacketErr
}
if gtid := string(data[pos : pos+int(dataSize)]); gtid != "" {
position, err = DecodePosition(gtid)
if err != nil {
return logFile, logPos, position, err
}
}
}

return logFile, logPos, position, nil
}
3 changes: 3 additions & 0 deletions go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ type BinlogEvent interface {

// IsPseudo is for custom implementations of GTID.
IsPseudo() bool

// Bytes returns the binary representation of the event
Bytes() []byte
}

// BinlogFormat contains relevant data from the FORMAT_DESCRIPTION_EVENT.
Expand Down
8 changes: 8 additions & 0 deletions go/mysql/binlog_event_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ type filePosQueryEvent struct {
filePosFakeEvent
}

func (ev filePosQueryEvent) Bytes() []byte {
panic("not implemented")
}

func newFilePosQueryEvent(query string, ts uint32) filePosQueryEvent {
return filePosQueryEvent{
query: query,
Expand Down Expand Up @@ -221,6 +225,10 @@ type filePosGTIDEvent struct {
gtid filePosGTID
}

func (ev filePosGTIDEvent) Bytes() []byte {
panic("not implemented")
}

func newFilePosGTIDEvent(file string, pos int, timestamp uint32) filePosGTIDEvent {
return filePosGTIDEvent{
filePosFakeEvent: filePosFakeEvent{
Expand Down
92 changes: 65 additions & 27 deletions go/mysql/binlog_event_make.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ package mysql

import (
"encoding/binary"
"hash/crc32"
)

const (
FlagLogEventArtificial = 0x20
)

// This file contains utility methods to create binlog replication
Expand Down Expand Up @@ -100,7 +105,12 @@ func (s *FakeBinlogStream) Packetize(f BinlogFormat, typ byte, flags uint16, dat
}

result := make([]byte, length)
binary.LittleEndian.PutUint32(result[0:4], s.Timestamp)
switch typ {
case eRotateEvent, eHeartbeatEvent:
// timestamp remains zero
default:
binary.LittleEndian.PutUint32(result[0:4], s.Timestamp)
}
result[4] = typ
binary.LittleEndian.PutUint32(result[5:9], s.ServerID)
binary.LittleEndian.PutUint32(result[9:13], uint32(length))
Expand All @@ -109,6 +119,13 @@ func (s *FakeBinlogStream) Packetize(f BinlogFormat, typ byte, flags uint16, dat
binary.LittleEndian.PutUint16(result[17:19], flags)
}
copy(result[f.HeaderLength:], data)

switch f.ChecksumAlgorithm {
case BinlogChecksumAlgCRC32:
checksum := crc32.ChecksumIEEE(result[0 : length-4])
binary.LittleEndian.PutUint32(result[length-4:], checksum)
}

return result
}

Expand Down Expand Up @@ -157,12 +174,38 @@ func NewRotateEvent(f BinlogFormat, s *FakeBinlogStream, position uint64, filena
len(filename)
data := make([]byte, length)
binary.LittleEndian.PutUint64(data[0:8], position)
copy(data[8:], filename)

ev := s.Packetize(f, eRotateEvent, 0, data)
ev[0] = 0
ev[1] = 0
ev[2] = 0
ev[3] = 0
return NewMysql56BinlogEvent(ev)
}

func NewFakeRotateEvent(f BinlogFormat, s *FakeBinlogStream, filename string) BinlogEvent {
length := 8 + // position
len(filename)
data := make([]byte, length)
binary.LittleEndian.PutUint64(data[0:8], 4)
copy(data[8:], filename)

ev := s.Packetize(f, eRotateEvent, FlagLogEventArtificial, data)
return NewMysql56BinlogEvent(ev)
}

// NewHeartbeatEvent returns a HeartbeatEvent.
// see https://dev.mysql.com/doc/internals/en/heartbeat-event.html
func NewHeartbeatEvent(f BinlogFormat, s *FakeBinlogStream) BinlogEvent {
ev := s.Packetize(f, eHeartbeatEvent, 0, []byte{})
return NewMysql56BinlogEvent(ev)
}

// NewHeartbeatEvent returns a HeartbeatEvent.
// see https://dev.mysql.com/doc/internals/en/heartbeat-event.html
func NewHeartbeatEventWithLogFile(f BinlogFormat, s *FakeBinlogStream, filename string) BinlogEvent {
length := len(filename)
data := make([]byte, length)
copy(data, filename)

ev := s.Packetize(f, eHeartbeatEvent, 0, data)
return NewMysql56BinlogEvent(ev)
}

Expand All @@ -172,7 +215,7 @@ func NewQueryEvent(f BinlogFormat, s *FakeBinlogStream, q Query) BinlogEvent {
if q.Charset != nil {
statusVarLength += 1 + 2 + 2 + 2
}
length := 4 + // slave proxy id
length := 4 + // proxy id
4 + // execution time
1 + // schema length
2 + // error code
Expand Down Expand Up @@ -296,9 +339,9 @@ func NewTableMapEvent(f BinlogFormat, s *FakeBinlogStream, tableID uint64, tm *T
1 + // table name length
len(tm.Name) +
1 + // [00]
1 + // column-count FIXME(alainjobart) len enc
lenEncIntSize(uint64(len(tm.Types))) + // column-count len enc
len(tm.Types) +
1 + // lenenc-str column-meta-def FIXME(alainjobart) len enc
lenEncIntSize(uint64(metadataLength)) + // lenenc-str column-meta-def
metadataLength +
len(tm.CanBeNull.data)
data := make([]byte, length)
Expand All @@ -320,15 +363,10 @@ func NewTableMapEvent(f BinlogFormat, s *FakeBinlogStream, tableID uint64, tm *T
data[pos] = 0
pos++

data[pos] = byte(len(tm.Types)) // FIXME(alainjobart) lenenc
pos++

pos = writeLenEncInt(data, pos, uint64(len(tm.Types)))
pos += copy(data[pos:], tm.Types)

// Per-column meta data. Starting with len-enc length.
// FIXME(alainjobart) lenenc
data[pos] = byte(metadataLength)
pos++
pos = writeLenEncInt(data, pos, uint64(metadataLength))
for c, typ := range tm.Types {
pos = metadataWrite(data, pos, typ, tm.Metadata[c])
}
Expand Down Expand Up @@ -366,10 +404,20 @@ func newRowsEvent(f BinlogFormat, s *FakeBinlogStream, typ byte, tableID uint64,
panic("Not implemented, post_header_length==6")
}

hasIdentify := typ == eUpdateRowsEventV1 || typ == eUpdateRowsEventV2 ||
typ == eDeleteRowsEventV1 || typ == eDeleteRowsEventV2
hasData := typ == eWriteRowsEventV1 || typ == eWriteRowsEventV2 ||
typ == eUpdateRowsEventV1 || typ == eUpdateRowsEventV2

rowLen := rows.DataColumns.Count()
if hasIdentify {
rowLen = rows.IdentifyColumns.Count()
}

length := 6 + // table id
2 + // flags
2 + // extra data length, no extra data.
1 + // num columns FIXME(alainjobart) len enc
lenEncIntSize(uint64(rowLen)) + // num columns
len(rows.IdentifyColumns.data) + // only > 0 for Update & Delete
len(rows.DataColumns.data) // only > 0 for Write & Update
for _, row := range rows.Rows {
Expand All @@ -380,11 +428,6 @@ func newRowsEvent(f BinlogFormat, s *FakeBinlogStream, typ byte, tableID uint64,
}
data := make([]byte, length)

hasIdentify := typ == eUpdateRowsEventV1 || typ == eUpdateRowsEventV2 ||
typ == eDeleteRowsEventV1 || typ == eDeleteRowsEventV2
hasData := typ == eWriteRowsEventV1 || typ == eWriteRowsEventV2 ||
typ == eUpdateRowsEventV1 || typ == eUpdateRowsEventV2

data[0] = byte(tableID)
data[1] = byte(tableID >> 8)
data[2] = byte(tableID >> 16)
Expand All @@ -396,12 +439,7 @@ func newRowsEvent(f BinlogFormat, s *FakeBinlogStream, typ byte, tableID uint64,
data[8] = 0x02
data[9] = 0x00

if hasIdentify {
data[10] = byte(rows.IdentifyColumns.Count()) // FIXME(alainjobart) len
} else {
data[10] = byte(rows.DataColumns.Count()) // FIXME(alainjobart) len
}
pos := 11
pos := writeLenEncInt(data, 10, uint64(rowLen))

if hasIdentify {
pos += copy(data[pos:], rows.IdentifyColumns.data)
Expand Down
2 changes: 1 addition & 1 deletion go/mysql/binlog_event_mysql56_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,4 @@ func TestMysql56ParsePosition(t *testing.T) {
if !got.Equal(want) {
t.Errorf("(&mysql56{}).ParsePosition(%#v) = %#v, want %#v", input, got, want)
}
}
}
43 changes: 43 additions & 0 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1359,6 +1359,19 @@ func (c *Conn) handleNextCommand(handler Handler) error {
c.writeErrorPacketFromError(err)
}

case ComBinlogDumpGTID:
ok := c.handleComBinlogDumpGTID(handler, data)
if !ok {
return fmt.Errorf("error handling ComBinlogDumpGTID packet: %v", data)
}
return nil

case ComRegisterReplica:
// TODO: Seems like we probably need this command implemented, too, but it hasn't been needed
// yet in a simple Vitess <-> Vitess replication test, so skipping for now.
//return c.handleComRegisterReplica(handler, data)
return fmt.Errorf("ComRegisterReplica not implemented")

default:
log.Errorf("Got unhandled packet (default) from %s, returning error: %v", c, data)
c.recycleReadPacket()
Expand All @@ -1371,6 +1384,36 @@ func (c *Conn) handleNextCommand(handler Handler) error {
return nil
}

func (c *Conn) handleComBinlogDumpGTID(handler Handler, data []byte) (kontinue bool) {
binlogReplicaHandler, ok := handler.(BinlogReplicaHandler)
if !ok {
log.Warningf("received BINLOG_DUMP_GTID command, but handler does not implement BinlogReplicaHandler")
return true
}

c.recycleReadPacket()
kontinue = true

c.startWriterBuffering()
defer func() {
if err := c.flush(); err != nil {
log.Errorf("conn %v: flush() failed: %v", c.ID(), err)
kontinue = false
}
}()

logFile, logPos, position, err := c.parseComBinlogDumpGTID(data)
if err != nil {
log.Errorf("conn %v: parseComBinlogDumpGTID failed: %v", c.ID(), err)
return false
}
if err := binlogReplicaHandler.ComBinlogDumpGTID(c, logFile, logPos, position.GTIDSet); err != nil {
log.Error(err.Error())
return false
}
return kontinue
}

// writeNumRows writes the specified number of rows to the handler, the end result, and flushes
func (c *Conn) writeNumRows(numRows int) (err error) {
origRows := c.cs.pending.Rows
Expand Down
3 changes: 3 additions & 0 deletions go/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ const (
// ComBinlogDumpGTID is COM_BINLOG_DUMP_GTID.
ComBinlogDumpGTID = 0x1e

// ComRegisterReplica is COM_REGISTER_REPLICA
ComRegisterReplica = 0x15

// OKPacket is the header of the OK packet.
OKPacket = 0x00

Expand Down
28 changes: 28 additions & 0 deletions go/mysql/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,31 @@ func (c *Conn) SemiSyncExtensionLoaded() bool {
}
return len(qr.Rows) >= 1
}

// WriteBinlogEvent writes a binlog event as part of a replication stream
// https://dev.mysql.com/doc/internals/en/binlog-network-stream.html
// https://dev.mysql.com/doc/internals/en/binlog-event.html
func (c *Conn) WriteBinlogEvent(ev BinlogEvent, semiSyncEnabled bool) error {
extraBytes := 1 // OK packet
if semiSyncEnabled {
extraBytes += 2
}

// NOTE: The latest Vitess code has changed startEphemeralPacket to startEphemeralPacketWithHeader,
// but we haven't ported that over yet, so instead, we use startEphemeralPacket and assign
// 0 to pos to indicate no header was included.
//data, pos := c.startEphemeralPacketWithHeader(len(ev.Bytes()) + extraBytes)

data, pos := c.startEphemeralPacket(len(ev.Bytes()) + extraBytes), 0
pos = writeByte(data, pos, 0) // "OK" prefix
if semiSyncEnabled {
pos = writeByte(data, pos, 0xef) // semi sync indicator
pos = writeByte(data, pos, 0) // no ack expected
}
_ = writeEOFString(data, pos, string(ev.Bytes()))
if err := c.writeEphemeralPacket(); err != nil {
return NewSQLError(CRServerGone, SSUnknownSQLState, "%v", err)
}
return nil
}

Loading
Loading