Skip to content

Commit

Permalink
Fix error on empty snapshot (#235)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Jan 9, 2025
1 parent 53755a2 commit 9bea3c0
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 16 deletions.
8 changes: 3 additions & 5 deletions source/snapshot/fetch_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,9 @@ func (f *FetchWorker) updateSnapshotEnd(ctx context.Context, tx pgx.Tx) error {
return nil
}

if err := tx.QueryRow(
ctx,
fmt.Sprintf("SELECT max(%s) FROM %s", f.conf.Key, f.conf.Table),
).Scan(&f.snapshotEnd); err != nil {
return fmt.Errorf("failed to query max on %q.%q: %w", f.conf.Table, f.conf.Key, err)
query := fmt.Sprintf("SELECT COALESCE(max(%s), 0) FROM %s", f.conf.Key, f.conf.Table)
if err := tx.QueryRow(ctx, query).Scan(&f.snapshotEnd); err != nil {
return fmt.Errorf("failed to get snapshot end with query %q: %w", query, err)
}

return nil
Expand Down
50 changes: 42 additions & 8 deletions source/snapshot/fetch_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,40 @@ func Test_FetcherValidate(t *testing.T) {
})
}

func Test_FetcherRun_EmptySnapshot(t *testing.T) {
var (
is = is.New(t)
ctx = test.Context(t)
pool = test.ConnectPool(context.Background(), t, test.RegularConnString)
table = test.SetupEmptyTestTable(context.Background(), t, pool)
out = make(chan FetchData)
testTomb = &tomb.Tomb{}
)

f := NewFetchWorker(pool, out, FetchConfig{
Table: table,
Key: "id",
})

testTomb.Go(func() error {
ctx = testTomb.Context(ctx)
defer close(out)

if err := f.Validate(ctx); err != nil {
return err
}
return f.Run(ctx)
})

var gotFetchData []FetchData
for data := range out {
gotFetchData = append(gotFetchData, data)
}

is.NoErr(testTomb.Err())
is.True(len(gotFetchData) == 0)
}

func Test_FetcherRun_Initial(t *testing.T) {
var (
pool = test.ConnectPool(context.Background(), t, test.RegularConnString)
Expand All @@ -226,13 +260,13 @@ func Test_FetcherRun_Initial(t *testing.T) {
return f.Run(ctx)
})

var dd []FetchData
var gotFetchData []FetchData
for data := range out {
dd = append(dd, data)
gotFetchData = append(gotFetchData, data)
}

is.NoErr(tt.Err())
is.True(len(dd) == 4)
is.True(len(gotFetchData) == 4)

expectedMatch := []opencdc.StructuredData{
{"id": int64(1), "key": []uint8{49}, "column1": "foo", "column2": int32(123), "column3": false, "column4": 12.2, "column5": int64(4)},
Expand All @@ -241,17 +275,17 @@ func Test_FetcherRun_Initial(t *testing.T) {
{"id": int64(4), "key": []uint8{52}, "column1": nil, "column2": nil, "column3": nil, "column4": 91.1, "column5": nil},
}

for i, d := range dd {
for i, got := range gotFetchData {
t.Run(fmt.Sprintf("payload_%d", i+1), func(t *testing.T) {
is := is.New(t)
is.Equal(d.Key, opencdc.StructuredData{"id": int64(i + 1)})
is.Equal("", cmp.Diff(expectedMatch[i], d.Payload))
is.Equal(got.Key, opencdc.StructuredData{"id": int64(i + 1)})
is.Equal("", cmp.Diff(expectedMatch[i], got.Payload))

is.Equal(d.Position, position.SnapshotPosition{
is.Equal(got.Position, position.SnapshotPosition{
LastRead: int64(i + 1),
SnapshotEnd: 4,
})
is.Equal(d.Table, table)
is.Equal(got.Table, table)
})
}
}
Expand Down
14 changes: 11 additions & 3 deletions test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func ConnectSimple(ctx context.Context, t *testing.T, connString string) *pgx.Co
}

// SetupTestTable creates a new table and returns its name.
func SetupTestTable(ctx context.Context, t *testing.T, conn Querier) string {
func SetupEmptyTestTable(ctx context.Context, t *testing.T, conn Querier) string {
is := is.New(t)

table := RandomIdentifier(t)
Expand All @@ -189,14 +189,22 @@ func SetupTestTable(ctx context.Context, t *testing.T, conn Querier) string {
is.NoErr(err)
})

query = `
return table
}

// SetupTestTable creates a new table and returns its name.
func SetupTestTable(ctx context.Context, t *testing.T, conn Querier) string {
is := is.New(t)
table := SetupEmptyTestTable(ctx, t, conn)

query := `
INSERT INTO %s (key, column1, column2, column3, column4, column5)
VALUES ('1', 'foo', 123, false, 12.2, 4),
('2', 'bar', 456, true, 13.42, 8),
('3', 'baz', 789, false, null, 9),
('4', null, null, null, 91.1, null)`
query = fmt.Sprintf(query, table)
_, err = conn.Exec(ctx, query)
_, err := conn.Exec(ctx, query)
is.NoErr(err)

return table
Expand Down

0 comments on commit 9bea3c0

Please sign in to comment.