Skip to content

Commit

Permalink
test: Remove e2e tests for BETWEEN and IN
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Feb 19, 2025
1 parent bb8c5bc commit 2a71721
Showing 1 changed file with 0 additions and 202 deletions.
202 changes: 0 additions & 202 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,208 +1191,6 @@ func TestVStreamPushdownFilters(t *testing.T) {
require.Equal(t, createdPauls, copyPhaseRowEvents+runningPhaseRowEvents)
}

// TestVStreamPushdownBetweenFilter is same as TestVStreamPushdownFilters, but it tests `BETWEEN`.
func TestVStreamPushdownBetweenFilter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
setSidecarDBName("_vt")
config := *mainClusterConfig
vc = NewVitessCluster(t, &clusterOptions{
clusterConfig: &config,
})
defer vc.TearDown()
require.NotNil(t, vc)
ks := "product"
shard := "0"
defaultCell := vc.Cells[vc.CellNames[0]]

_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
require.NoError(t, err)
verifyClusterHealth(t, vc)
insertInitialData(t)

vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()

_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('a')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('b')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('c')", ks), 1, false)
require.NoError(t, err)
res, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select count(*) from %s.customer where name between 'a' and 'c'", ks), 1, false)
require.NoError(t, err)
require.Len(t, res.Rows, 1)
startingRowCount, err := res.Rows[0][0].ToInt()
require.NoError(t, err)

// Coordinate go-routines.
streamCtx, streamCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer streamCancel()
done := make(chan struct{})

// First goroutine that keeps inserting rows into the table being streamed until the
// stream context is cancelled.
createdBetweenRows := startingRowCount
createdNonBetweenRows := 0
go func() {
id := 1
for {
select {
case <-streamCtx.Done():
// Give the VStream a little catch-up time before telling it to stop
// via the done channel.
time.Sleep(10 * time.Second)
close(done)
return
default:
if id%10 == 0 {
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('a')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('b')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('c')", ks), 1, false)
require.NoError(t, err)
createdBetweenRows += 3
} else {
insertRow(ks, "customer", id)
createdNonBetweenRows++
}
time.Sleep(10 * time.Millisecond)
id++
}
}
}()

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: shard,
Gtid: "",
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "customer",
Filter: "select * from customer where name between 'a' and 'c'",
}},
}
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer vstreamConn.Close()

copyPhaseRowEvents, runningPhaseRowEvents := runVStreamAndGetNumOfRowEvents(t, ctx, vstreamConn, vgtid, filter, done)

require.NotZero(t, createdBetweenRows)
require.NotZero(t, createdNonBetweenRows)
require.Greater(t, createdNonBetweenRows, createdBetweenRows)
require.NotZero(t, copyPhaseRowEvents)
require.NotZero(t, runningPhaseRowEvents)

require.Equal(t, createdBetweenRows, copyPhaseRowEvents+runningPhaseRowEvents)
}

// TestVStreamPushdownBetweenFilter is same as TestVStreamPushdownFilters, but it tests `IN`.
func TestVStreamPushdownInFilter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
setSidecarDBName("_vt")
config := *mainClusterConfig
vc = NewVitessCluster(t, &clusterOptions{
clusterConfig: &config,
})
defer vc.TearDown()
require.NotNil(t, vc)
ks := "product"
shard := "0"
defaultCell := vc.Cells[vc.CellNames[0]]

_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
require.NoError(t, err)
verifyClusterHealth(t, vc)
insertInitialData(t)

vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()

_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('a')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('b')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('c')", ks), 1, false)
require.NoError(t, err)
res, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select count(*) from %s.customer where name in ('a', 'b', 'c')", ks), 1, false)
require.NoError(t, err)
require.Len(t, res.Rows, 1)
startingRowCount, err := res.Rows[0][0].ToInt()
require.NoError(t, err)

// Coordinate go-routines.
streamCtx, streamCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer streamCancel()
done := make(chan struct{})

// First goroutine that keeps inserting rows into the table being streamed until the
// stream context is cancelled.
createdInRows := startingRowCount
createdNonInRows := 0
go func() {
id := 1
for {
select {
case <-streamCtx.Done():
// Give the VStream a little catch-up time before telling it to stop
// via the done channel.
time.Sleep(10 * time.Second)
close(done)
return
default:
if id%10 == 0 {
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('a')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('b')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('c')", ks), 1, false)
require.NoError(t, err)
createdInRows += 3
} else {
insertRow(ks, "customer", id)
createdNonInRows++
}
time.Sleep(10 * time.Millisecond)
id++
}
}
}()

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: shard,
Gtid: "",
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "customer",
Filter: "select * from customer where name in ('a', 'b', 'c')",
}},
}
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer vstreamConn.Close()

copyPhaseRowEvents, runningPhaseRowEvents := runVStreamAndGetNumOfRowEvents(t, ctx, vstreamConn, vgtid, filter, done)

require.NotZero(t, createdInRows)
require.NotZero(t, createdNonInRows)
require.Greater(t, createdNonInRows, createdInRows)
require.NotZero(t, copyPhaseRowEvents)
require.NotZero(t, runningPhaseRowEvents)

require.Equal(t, createdInRows, copyPhaseRowEvents+runningPhaseRowEvents)
}

// runVStreamAndGetNumOfRowEvents runs VStream with the specified filter and
// returns number of copy phase and running phase row events.
func runVStreamAndGetNumOfRowEvents(t *testing.T, ctx context.Context, vstreamConn *vtgateconn.VTGateConn,
Expand Down

0 comments on commit 2a71721

Please sign in to comment.