Skip to content

Commit

Permalink
Fix and Make aggregation planner handle aggregation functions better (#…
Browse files Browse the repository at this point in the history
…13228)

* handle unpushed aggregation better

Signed-off-by: Andres Taylor <[email protected]>

* distinct on aggregator and changed distinct engine primitive to take an offset rather than a truncate bool

Signed-off-by: Harshit Gangal <[email protected]>

* handle SUM with the new operator horizon planning

Signed-off-by: Andres Taylor <[email protected]>

* empty t10 after running tests

Signed-off-by: Andres Taylor <[email protected]>

* add collations and weight_string for aggregations where engine primitive is supported

Signed-off-by: Harshit Gangal <[email protected]>

* compare columns using semantic equality

Signed-off-by: Andres Taylor <[email protected]>

* handle grouping expressions that are returned in multiple columns

Signed-off-by: Andres Taylor <[email protected]>

* add end2end test to show that query works

Signed-off-by: Andres Taylor <[email protected]>

* on aggregate count and sum splitting create new aggr and update the column offset based on where it is pushed

Signed-off-by: Harshit Gangal <[email protected]>

* add waitForAuthoritative for last insert id test

Signed-off-by: Harshit Gangal <[email protected]>

* fix: push nil for min max cases for other side

Signed-off-by: Harshit Gangal <[email protected]>

* saves vtgate startup time as most of the tests only have a primary

Signed-off-by: Harshit Gangal <[email protected]>

* allow min/max without weight_string, projection to be created always on aggregation pushing with join, compact done later

Signed-off-by: Harshit Gangal <[email protected]>

---------

Signed-off-by: Andres Taylor <[email protected]>
Signed-off-by: Harshit Gangal <[email protected]>
Co-authored-by: Andres Taylor <[email protected]>
  • Loading branch information
harshit-gangal and systay authored Jun 8, 2023
1 parent fae1e38 commit bb26a7d
Show file tree
Hide file tree
Showing 29 changed files with 1,253 additions and 726 deletions.
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ func (cluster *LocalProcessCluster) NewVtgateInstance() *VtgateProcess {
cluster.Cell,
cluster.Cell,
cluster.Hostname,
"PRIMARY,REPLICA",
"PRIMARY",
cluster.TopoProcess.Port,
cluster.TmpDirectory,
cluster.VtGateExtraArgs,
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,15 @@ func AssertMatchesWithTimeout(t *testing.T, conn *mysql.Conn, query, expected st
}

// WaitForAuthoritative waits for a table to become authoritative
func WaitForAuthoritative(t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl string) error {
func WaitForAuthoritative(t *testing.T, ks, tbl string, readVSchema func() (*interface{}, error)) error {
timeout := time.After(10 * time.Second)
for {
select {
case <-timeout:
return fmt.Errorf("schema tracking didn't mark table t2 as authoritative until timeout")
default:
time.Sleep(1 * time.Second)
res, err := vtgateProcess.ReadVSchema()
res, err := readVSchema()
require.NoError(t, err, res)
t2Map := getTableT2Map(res, ks, tbl)
authoritative, fieldPresent := t2Map["column_list_authoritative"]
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/vtgate/gen4/gen4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,9 +430,9 @@ func TestOuterJoin(t *testing.T) {
}

func TestUsingJoin(t *testing.T) {
require.NoError(t, utils.WaitForAuthoritative(t, clusterInstance.VtgateProcess, shardedKs, "t1"))
require.NoError(t, utils.WaitForAuthoritative(t, clusterInstance.VtgateProcess, shardedKs, "t2"))
require.NoError(t, utils.WaitForAuthoritative(t, clusterInstance.VtgateProcess, shardedKs, "t3"))
require.NoError(t, utils.WaitForAuthoritative(t, shardedKs, "t1", clusterInstance.VtgateProcess.ReadVSchema))
require.NoError(t, utils.WaitForAuthoritative(t, shardedKs, "t2", clusterInstance.VtgateProcess.ReadVSchema))
require.NoError(t, utils.WaitForAuthoritative(t, shardedKs, "t3", clusterInstance.VtgateProcess.ReadVSchema))

mcmp, closer := start(t)
defer closer()
Expand Down
30 changes: 29 additions & 1 deletion go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func start(t *testing.T) (utils.MySQLCompare, func()) {
deleteAll := func() {
_, _ = utils.ExecAllowError(t, mcmp.VtConn, "set workload = oltp")

tables := []string{"t9", "aggr_test", "t3", "t7_xxhash", "aggr_test_dates", "t7_xxhash_idx", "t1", "t2"}
tables := []string{"t9", "aggr_test", "t3", "t7_xxhash", "aggr_test_dates", "t7_xxhash_idx", "t1", "t2", "t10"}
for _, table := range tables {
_, _ = mcmp.ExecAndIgnore("delete from " + table)
}
Expand Down Expand Up @@ -424,3 +424,31 @@ func TestAggregationRandomOnAnAggregatedValue(t *testing.T) {
mcmp.AssertMatchesNoOrder("select /*vt+ PLANNER=gen4 */ A.a, A.b, (A.a / A.b) as d from (select sum(a) as a, sum(b) as b from t10 where a = 100) A;",
`[[DECIMAL(100) DECIMAL(10) DECIMAL(10.0000)]]`)
}

// TestBuggyQueries is a regression test: it seeds t10 with three rows (one of
// which has NULL in both a and b) and asserts the results of aggregations over
// a join of t10 with itself.
func TestBuggyQueries(t *testing.T) {
// These queries were found by the query fuzzer to produce wrong results.
// They are kept as end2end tests to make sure we never get them wrong again.
mcmp, closer := start(t)
defer closer()

mcmp.Exec("insert into t10(k, a, b) values (0, 100, 10), (10, 200, 20), (20, null, null)")

// No join condition, so every t1 row is paired with all three t2 rows:
// (100 + 200) * 3 = 900.
mcmp.AssertMatches("select /*vt+ PLANNER=Gen4 */ sum(t1.a) from t10 as t1, t10 as t2",
`[[DECIMAL(900)]]`)

// The grouping column and both aggregates are selected twice, so each must
// come back in two output columns; every t1.a group holds three joined rows.
mcmp.AssertMatches("select /*vt+ PLANNER=gen4 */t1.a, sum(t1.a), count(*), t1.a, sum(t1.a), count(*) from t10 as t1, t10 as t2 group by t1.a",
"[[NULL NULL INT64(3) NULL NULL INT64(3)] "+
"[INT32(100) DECIMAL(300) INT64(3) INT32(100) DECIMAL(300) INT64(3)] "+
"[INT32(200) DECIMAL(600) INT64(3) INT32(200) DECIMAL(600) INT64(3)]]")
}

// TestMinMaxAcrossJoins checks min() and max() when one aggregate reads from
// each side of a join between t1 and t2, grouped by a column from each table.
func TestMinMaxAcrossJoins(t *testing.T) {
mcmp, closer := start(t)
defer closer()
mcmp.Exec("insert into t1(t1_id, name, value, shardKey) values (1, 'name 1', 'value 1', 1), (2, 'name 2', 'value 2', 2)")
mcmp.Exec("insert into t2(id, shardKey) values (1, 10), (2, 20)")

// With this data t1.t1_id (1, 2) never equals t2.shardKey (10, 20), so the
// != predicate keeps all four row pairs, one per (t1.name, t2.shardKey) group.
mcmp.AssertMatchesNoOrder(
`SELECT /*vt+ PLANNER=gen4 */ t1.name, max(t1.shardKey), t2.shardKey, min(t2.id) FROM t1 JOIN t2 ON t1.t1_id != t2.shardKey GROUP BY t1.name, t2.shardKey`,
`[[VARCHAR("name 2") INT64(2) INT64(10) INT64(1)] [VARCHAR("name 1") INT64(1) INT64(10) INT64(1)] [VARCHAR("name 2") INT64(2) INT64(20) INT64(2)] [VARCHAR("name 1") INT64(1) INT64(20) INT64(2)]]`)
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,6 @@ func TestDistinctIt(t *testing.T) {
mcmp.AssertMatches("select /*vt+ PLANNER=Gen4 */ distinct val1 from aggr_test order by val1 desc", `[[VARCHAR("e")] [VARCHAR("d")] [VARCHAR("c")] [VARCHAR("b")] [VARCHAR("a")]]`)
mcmp.AssertMatchesNoOrder("select /*vt+ PLANNER=Gen4 */ distinct val1, count(*) from aggr_test group by val1", `[[VARCHAR("a") INT64(2)] [VARCHAR("b") INT64(1)] [VARCHAR("c") INT64(2)] [VARCHAR("d") INT64(1)] [VARCHAR("e") INT64(2)]]`)
mcmp.AssertMatchesNoOrder("select /*vt+ PLANNER=Gen4 */ distinct val1+val2 from aggr_test", `[[NULL] [FLOAT64(1)] [FLOAT64(3)] [FLOAT64(4)]]`)
mcmp.AssertMatchesNoOrder("select /*vt+ PLANNER=Gen4 */ distinct count(*) from aggr_test group by val1", `[[INT64(2)] [INT64(1)]]`)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestFoundRows(t *testing.T) {
// Wait for schema tracking to run and mark t2 as authoritative before we try out the queries.
// Some of the queries depend on schema tracking to run successfully to be able to replace the StarExpr
// in the select clause with the definitive column list.
err = utils.WaitForAuthoritative(t, clusterInstance.VtgateProcess, keyspaceName, "t2")
err = utils.WaitForAuthoritative(t, keyspaceName, "t2", clusterInstance.VtgateProcess.ReadVSchema)
require.NoError(t, err)
runTests := func(workload string) {
mcmp.AssertFoundRowsValue("select * from t2", workload, 5)
Expand Down
10 changes: 8 additions & 2 deletions go/vt/vtgate/endtoend/last_insert_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ import (
"fmt"
"testing"

"vitess.io/vitess/go/vt/vtgate/evalengine"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/vtgate/evalengine"
)

func TestLastInsertId(t *testing.T) {
require.NoError(t,
utils.WaitForAuthoritative(t, "ks", "t1_last_insert_id", cluster.VTProcess().ReadVSchema))

ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
Expand All @@ -53,6 +56,9 @@ func TestLastInsertId(t *testing.T) {
}

func TestLastInsertIdWithRollback(t *testing.T) {
require.NoError(t,
utils.WaitForAuthoritative(t, "ks", "t1_last_insert_id", cluster.VTProcess().ReadVSchema))

ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
Expand Down
84 changes: 5 additions & 79 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ package endtoend

import (
"context"
_ "embed"
"fmt"
"os"
"testing"

_flag "vitess.io/vitess/go/internal/flag"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vttest"

vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vttestpb "vitess.io/vitess/go/vt/proto/vttest"
"vitess.io/vitess/go/vt/vttest"
)

var (
Expand All @@ -37,82 +37,8 @@ var (
mysqlParams mysql.ConnParams
grpcAddress string

schema = `
create table t1(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
create table t1_copy_basic(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
create table t1_copy_all(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
create table t1_copy_resume(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
create table t1_id2_idx(
id2 bigint,
keyspace_id varbinary(10),
primary key(id2)
) Engine=InnoDB;
create table vstream_test(
id bigint,
val bigint,
primary key(id)
) Engine=InnoDB;
create table aggr_test(
id bigint,
val1 varchar(16),
val2 bigint,
primary key(id)
) Engine=InnoDB;
create table t2(
id3 bigint,
id4 bigint,
primary key(id3)
) Engine=InnoDB;
create table t2_id4_idx(
id bigint not null auto_increment,
id4 bigint,
id3 bigint,
primary key(id),
key idx_id4(id4)
) Engine=InnoDB;
create table t1_last_insert_id(
id bigint not null auto_increment,
id1 bigint,
primary key(id)
) Engine=InnoDB;
create table t1_row_count(
id bigint not null,
id1 bigint,
primary key(id)
) Engine=InnoDB;
create table t1_sharded(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
`
//go:embed schema.sql
Schema string

vschema = &vschemapb.Keyspace{
Sharded: true,
Expand Down Expand Up @@ -281,7 +207,7 @@ func TestMain(m *testing.M) {
},
},
}
if err := cfg.InitSchemas("ks", schema, vschema); err != nil {
if err := cfg.InitSchemas("ks", Schema, vschema); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.RemoveAll(cfg.SchemaDir)
return 1
Expand Down
74 changes: 74 additions & 0 deletions go/vt/vtgate/endtoend/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
-- Table definitions for the vtgate endtoend test suite.
-- All tables use InnoDB and a single-column bigint primary key.
create table t1(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;

create table t1_copy_basic(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;

create table t1_copy_all(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;

create table t1_copy_resume(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;

create table t1_id2_idx(
id2 bigint,
keyspace_id varbinary(10),
primary key(id2)
) Engine=InnoDB;

create table vstream_test(
id bigint,
val bigint,
primary key(id)
) Engine=InnoDB;

create table aggr_test(
id bigint,
val1 varchar(16),
val2 bigint,
primary key(id)
) Engine=InnoDB;

create table t2(
id3 bigint,
id4 bigint,
primary key(id3)
) Engine=InnoDB;

create table t2_id4_idx(
id bigint not null auto_increment,
id4 bigint,
id3 bigint,
primary key(id),
key idx_id4(id4)
) Engine=InnoDB;

create table t1_last_insert_id(
id bigint not null auto_increment,
id1 bigint,
primary key(id)
) Engine=InnoDB;

create table t1_row_count(
id bigint not null,
id1 bigint,
primary key(id)
) Engine=InnoDB;

create table t1_sharded(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
26 changes: 0 additions & 26 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit bb26a7d

Please sign in to comment.