Skip to content

Commit

Permalink
fix: (*RowsEvent).handleUnsigned() panic (#856)
Browse files Browse the repository at this point in the history
* fix: (*RowsEvent).handleUnsigned()  panic

* add unit test: TestRowsEvent_handleUnsigned()

* import require
  • Loading branch information
equnchen authored Apr 11, 2024
1 parent 2199dfb commit f0df38a
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 0 deletions.
7 changes: 7 additions & 0 deletions canal/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ func (r *RowsEvent) handleUnsigned() {

for i := 0; i < len(r.Rows); i++ {
for _, columnIdx := range r.Table.UnsignedColumns {
// When Canal.delay is big, after call Canal.StartFromGTID(),
// we will get the newest table schema (for example, after DDL "alter table add column xxx unsigned..."),
// but the binlog data can be very old (before DDL "alter table add column xxx unsigned..."),
// results in max(columnIdx) >= len(r.Rows[i]), then r.Rows[i][columnIdx] panic.
if columnIdx >= len(r.Rows[i]) {
continue
}
switch value := r.Rows[i][columnIdx].(type) {
case int8:
r.Rows[i][columnIdx] = uint8(value)
Expand Down
60 changes: 60 additions & 0 deletions canal/rows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package canal

import (
"testing"

"github.com/go-mysql-org/go-mysql/replication"
"github.com/go-mysql-org/go-mysql/schema"
"github.com/stretchr/testify/require"
)

func TestRowsEvent_handleUnsigned(t *testing.T) {
type fields struct {
Table *schema.Table
Action string
Rows [][]interface{}
Header *replication.EventHeader
}
tests := []struct {
name string
fields fields
wantRows [][]interface{}
}{
{
name: "rows_event_handle_unsigned",
fields: fields{
Table: &schema.Table{
// columns 1,3,5,7,9 should be converted from signed to unsigned,
// column 10 is out of range and should be ignored, don't panic.
UnsignedColumns: []int{1, 3, 5, 7, 9, 10},
},
Rows: [][]interface{}{{
int8(8), int8(8),
int16(16), int16(16),
int32(32), int32(32),
int64(64), int64(64),
int(128), int(128)},
},
},
wantRows: [][]interface{}{{
int8(8), uint8(8),
int16(16), uint16(16),
int32(32), uint32(32),
int64(64), uint64(64),
int(128), uint(128)},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &RowsEvent{
Table: tt.fields.Table,
Action: tt.fields.Action,
Rows: tt.fields.Rows,
Header: tt.fields.Header,
}
r.handleUnsigned()
require.Equal(t, tt.fields.Rows, tt.wantRows)
})
}
}

0 comments on commit f0df38a

Please sign in to comment.