Skip to content

Commit

Permalink
Add an iterable of rows comparison function in FlinkTestBase and reve…
Browse files Browse the repository at this point in the history
…rt set comparisons
  • Loading branch information
kbendick committed Apr 27, 2022
1 parent f2b566c commit 8dc8866
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand Down Expand Up @@ -102,4 +103,10 @@ protected List<Row> sql(String query, Object... args) {
throw new RuntimeException("Failed to collect table result", e);
}
}

protected void assertSameElements(Iterable<Row> expected, Iterable<Row> actual) {
Assertions.assertThat(actual)
.isNotNull()
.containsExactlyInAnyOrderElementsOf(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.TableEnvironment;
Expand All @@ -32,7 +31,7 @@
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.events.ScanEvent;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -115,12 +114,12 @@ public void testLimitPushDown() {
String sqlLimitExceed = String.format("SELECT * FROM %s LIMIT 4", TABLE_NAME);
List<Row> resultExceed = sql(sqlLimitExceed);
Assert.assertEquals("Should have 3 records", 3, resultExceed.size());
Set<Row> expectedList = Sets.newHashSet(
List<Row> expectedList = Lists.newArrayList(
Row.of(1, "iceberg", 10.0),
Row.of(2, "b", 20.0),
Row.of(3, null, 30.0)
);
Assert.assertEquals("Should produce the expected records", expectedList, Sets.newHashSet(resultExceed));
assertSameElements(expectedList, resultExceed);

String sqlMixed = String.format("SELECT * FROM %s WHERE id = 1 LIMIT 2", TABLE_NAME);
List<Row> mixedResult = sql(sqlMixed);
Expand All @@ -132,13 +131,12 @@ public void testLimitPushDown() {
public void testNoFilterPushDown() {
String sql = String.format("SELECT * FROM %s ", TABLE_NAME);
List<Row> result = sql(sql);
Set<Row> expectedRecords = Sets.newHashSet(
List<Row> expectedRecords = Lists.newArrayList(
Row.of(1, "iceberg", 10.0),
Row.of(2, "b", 20.0),
Row.of(3, null, 30.0)
);
Assert.assertEquals("Should have 3 records", 3, expectedRecords.size());
Assert.assertEquals("Should produce the expected record", expectedRecords, Sets.newHashSet(result));
assertSameElements(result, expectedRecords);
Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
}

Expand Down Expand Up @@ -185,12 +183,11 @@ public void testFilterPushDownNoEqual() {
List<Row> resultNE = sql(sqlNE);
Assert.assertEquals("Should have 2 records", 2, resultNE.size());

Set<Row> expectedNE = Sets.newHashSet(
List<Row> expectedNE = Lists.newArrayList(
Row.of(2, "b", 20.0),
Row.of(3, null, 30.0)
);
Assert.assertEquals("Should have two records", 2, resultNE.size());
Assert.assertEquals("Should produce the expected record", expectedNE, Sets.newHashSet(resultNE));
assertSameElements(expectedNE, resultNE);
Assert.assertEquals("Should create only one scan", 1, scanEventCount);
Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
}
Expand Down Expand Up @@ -225,11 +222,11 @@ public void testFilterPushDownOr() {
List<Row> resultOr = sql(sqlOr);
Assert.assertEquals("Should have 2 record", 2, resultOr.size());

Set<Row> expectedOR = Sets.newHashSet(
List<Row> expectedOR = Lists.newArrayList(
Row.of(1, "iceberg", 10.0),
Row.of(2, "b", 20.0)
);
Assert.assertEquals("Should produce the expected record", expectedOR, Sets.newHashSet(resultOr));
assertSameElements(expectedOR, resultOr);

Assert.assertEquals("Should create only one scan", 1, scanEventCount);
Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
Expand All @@ -243,11 +240,11 @@ public void testFilterPushDownGreaterThan() {
List<Row> resultGT = sql(sqlGT);
Assert.assertEquals("Should have 2 record", 2, resultGT.size());

Set<Row> expectedGT = Sets.newHashSet(
List<Row> expectedGT = Lists.newArrayList(
Row.of(2, "b", 20.0),
Row.of(3, null, 30.0)
);
Assert.assertEquals("Should produce the expected record", expectedGT, Sets.newHashSet(resultGT));
assertSameElements(expectedGT, resultGT);

Assert.assertEquals("Should create only one scan", 1, scanEventCount);
Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
Expand All @@ -270,11 +267,11 @@ public void testFilterPushDownGreaterThanLiteralOnLeft() {
List<Row> resultGT = sql(sqlGT);
Assert.assertEquals("Should have 2 records", 2, resultGT.size());

Set<Row> expectedGT = Sets.newHashSet(
List<Row> expectedGT = Lists.newArrayList(
Row.of(1, "iceberg", 10.0),
Row.of(2, "b", 20.0)
);
Assert.assertEquals("Should produce the expected record", expectedGT, Sets.newHashSet(resultGT));
assertSameElements(expectedGT, resultGT);;

Assert.assertEquals("Should create only one scan", 1, scanEventCount);
Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
Expand All @@ -288,11 +285,11 @@ public void testFilterPushDownGreaterThanEqual() {
List<Row> resultGTE = sql(sqlGTE);
Assert.assertEquals("Should have 2 records", 2, resultGTE.size());

Set<Row> expectedGTE = Sets.newHashSet(
List<Row> expectedGTE = Lists.newArrayList(
Row.of(2, "b", 20.0),
Row.of(3, null, 30.0)
);
Assert.assertEquals("Should produce the expected record", expectedGTE, Sets.newHashSet(resultGTE));
assertSameElements(resultGTE, expectedGTE);

Assert.assertEquals("Should create only one scan", 1, scanEventCount);
Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
Expand All @@ -315,11 +312,11 @@ public void testFilterPushDownGreaterThanEqualLiteralOnLeft() {
List<Row> resultGTE = sql(sqlGTE);
Assert.assertEquals("Should have 2 records", 2, resultGTE.size());

Set<Row> expectedGTE = Sets.newHashSet(
List<Row> expectedGTE = Lists.newArrayList(
Row.of(1, "iceberg", 10.0),
Row.of(2, "b", 20.0)
);
Assert.assertEquals("Should produce the expected record", expectedGTE, Sets.newHashSet(resultGTE));
assertSameElements(expectedGTE, resultGTE);

Assert.assertEquals("Should create only one scan", 1, scanEventCount);
Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
Expand Down Expand Up @@ -402,11 +399,11 @@ public void testFilterPushDownIn() {
List<Row> resultIN = sql(sqlIN);
Assert.assertEquals("Should have 2 records", 2, resultIN.size());

Set<Row> expectedIN = Sets.newHashSet(
List<Row> expectedIN = Lists.newArrayList(
Row.of(1, "iceberg", 10.0),
Row.of(2, "b", 20.0)
);
Assert.assertEquals("Should produce the expected record", expectedIN, Sets.newHashSet(resultIN));
assertSameElements(expectedIN, resultIN);
Assert.assertEquals("Should create only one scan", 1, scanEventCount);
Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
}
Expand Down Expand Up @@ -449,11 +446,11 @@ public void testFilterPushDownIsNotNull() {
List<Row> resultNotNull = sql(sqlNotNull);
Assert.assertEquals("Should have 2 record", 2, resultNotNull.size());

Set<Row> expected = Sets.newHashSet(
List<Row> expected = Lists.newArrayList(
Row.of(1, "iceberg", 10.0),
Row.of(2, "b", 20.0)
);
Assert.assertEquals("Should produce the expected record", expected, Sets.newHashSet(resultNotNull));
assertSameElements(expected, resultNotNull);

Assert.assertEquals("Should create only one scan", 1, scanEventCount);
Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
Expand Down Expand Up @@ -492,12 +489,11 @@ public void testFilterPushDownBetween() {
List<Row> resultBetween = sql(sqlBetween);
Assert.assertEquals("Should have 2 record", 2, resultBetween.size());

Set<Row> expectedBetween = Sets.newHashSet(
List<Row> expectedBetween = Lists.newArrayList(
Row.of(1, "iceberg", 10.0),
Row.of(2, "b", 20.0)
);
Assert.assertEquals("Should have 2 records", 2, resultBetween.size());
Assert.assertEquals("Should produce the expected record", expectedBetween, Sets.newHashSet(resultBetween));
assertSameElements(expectedBetween, resultBetween);

Assert.assertEquals("Should create only one scan", 1, scanEventCount);
String expected = "(ref(name=\"id\") >= 1 and ref(name=\"id\") <= 2)";
Expand Down Expand Up @@ -553,12 +549,12 @@ public void testFilterNotPushDownLike() {
sqlNoPushDown = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE '%%' ";
resultLike = sql(sqlNoPushDown);
Assert.assertEquals("Should have 3 records", 3, resultLike.size());
Set<Row> expectedRecords = Sets.newHashSet(
List<Row> expectedRecords = Lists.newArrayList(
Row.of(1, "iceberg", 10.0),
Row.of(2, "b", 20.0),
Row.of(3, null, 30.0)
);
Assert.assertEquals("Should produce the expected record", expectedRecords, Sets.newHashSet(resultLike));
assertSameElements(expectedRecords, resultLike);
Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());

sqlNoPushDown = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE 'iceber_' ";
Expand All @@ -578,13 +574,12 @@ public void testFilterNotPushDownLike() {
public void testFilterPushDown2Literal() {
String sql2Literal = String.format("SELECT * FROM %s WHERE 1 > 0 ", TABLE_NAME);
List<Row> result = sql(sql2Literal);
Set<Row> expectedRecords = Sets.newHashSet(
List<Row> expectedRecords = Lists.newArrayList(
Row.of(1, "iceberg", 10.0),
Row.of(2, "b", 20.0),
Row.of(3, null, 30.0)
);
Assert.assertEquals("Should have 3 records", 3, result.size());
Assert.assertEquals("Should produce the expected record", expectedRecords, Sets.newHashSet(result));
assertSameElements(expectedRecords, result);
Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
}

Expand Down

0 comments on commit 8dc8866

Please sign in to comment.