From dd8b1f99ca4f6bcf9a394e0639d9b74588d70cef Mon Sep 17 00:00:00 2001 From: jiahua Date: Wed, 5 Mar 2025 13:58:10 +0800 Subject: [PATCH 1/2] feat: RowsEvent include NextPos(include name) --- canal/dump.go | 2 +- canal/rows.go | 6 +++++- canal/sync.go | 6 +++--- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/canal/dump.go b/canal/dump.go index 91b303976..58973feb9 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -110,7 +110,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error } } - events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs}, nil) + events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs}, nil, nil) return h.c.eventHandler.OnRow(events) } diff --git a/canal/rows.go b/canal/rows.go index 9f618a9fb..fc0947690 100644 --- a/canal/rows.go +++ b/canal/rows.go @@ -2,6 +2,7 @@ package canal import ( "fmt" + "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" "github.com/go-mysql-org/go-mysql/schema" @@ -26,15 +27,18 @@ type RowsEvent struct { Rows [][]interface{} // Header can be used to inspect the event Header *replication.EventHeader + // NextPos including binlog name + NextPos *mysql.Position } -func newRowsEvent(table *schema.Table, action string, rows [][]interface{}, header *replication.EventHeader) *RowsEvent { +func newRowsEvent(table *schema.Table, action string, rows [][]interface{}, header *replication.EventHeader, nextPos *mysql.Position) *RowsEvent { e := new(RowsEvent) e.Table = table e.Action = action e.Rows = rows e.Header = header + e.NextPos = nextPos e.handleUnsigned() diff --git a/canal/sync.go b/canal/sync.go index 6e4e538c3..f81484c18 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -101,7 +101,7 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error { } case *replication.RowsEvent: // we only focus row based event - err = c.handleRowsEvent(ev) + err = c.handleRowsEvent(ev, pos) if err != nil { c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err) return errors.Trace(err) @@ -262,7 +262,7 @@ func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) { atomic.StoreUint32(c.delay, newDelay) } -func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { +func (c *Canal) handleRowsEvent(e *replication.BinlogEvent, nextPos mysql.Position) error { ev := e.Event.(*replication.RowsEvent) // Caveat: table may be altered at runtime. @@ -290,7 +290,7 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { default: return errors.Errorf("%s not supported now", e.Header.EventType) } - events := newRowsEvent(t, action, ev.Rows, e.Header) + events := newRowsEvent(t, action, ev.Rows, e.Header, &nextPos) return c.eventHandler.OnRow(events) } From 2d5ba49d77a904247775c67886b4a70df4fa1cd8 Mon Sep 17 00:00:00 2001 From: jiahua Date: Thu, 6 Mar 2025 09:18:54 +0800 Subject: [PATCH 2/2] fix: format with gofumpt --- canal/rows.go | 1 + 1 file changed, 1 insertion(+) diff --git a/canal/rows.go b/canal/rows.go index fc0947690..08d545865 100644 --- a/canal/rows.go +++ b/canal/rows.go @@ -2,6 +2,7 @@ package canal import ( "fmt" + "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication"