Spark: Fix empty scan issue when start timestamp retrieves root snapshot and end timestamp is missing (#11967)

Co-authored-by: Leon Lin <[email protected]>
lliangyu-lin and Leon Lin authored Jan 16, 2025
1 parent b128bba commit 246439a
Showing 6 changed files with 599 additions and 9 deletions.
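
The fix targets the case where create_changelog_view is given only a start timestamp older than the table's first (root) snapshot: the start snapshot id resolves to null, and buildChangelogScan previously mis-flagged the scan as empty even though every snapshot falls inside the requested range. A minimal sketch of the scenario, assuming a Spark session with an Iceberg catalog named "cat" (catalog, namespace, table, and class names here are illustrative, not from the commit):

import org.apache.spark.sql.SparkSession;

public class ChangelogRootSnapshotRepro {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    spark.sql("CREATE TABLE cat.db.t (id INT, data STRING) USING iceberg");
    spark.sql("INSERT INTO cat.db.t VALUES (1, 'a')");

    // start-timestamp predates the first (root) snapshot and no end bound is
    // set. Before this fix the resulting scan was incorrectly empty; with the
    // fix, the changelog view contains the INSERT of (1, 'a').
    spark.sql(
        "CALL cat.system.create_changelog_view("
            + "table => 'db.t', options => map('start-timestamp', '0'))");
    // t_changes is the procedure's default view name (short table name + "_changes").
    spark.sql("SELECT * FROM t_changes").show();
  }
}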
TestCreateChangelogViewProcedure.java
@@ -126,6 +126,60 @@ public void testNoSnapshotIdInput() {
        sql("select * from %s order by _change_ordinal, id", viewName));
  }

  @Test
  public void testOnlyStartSnapshotIdInput() {
    createTableWithTwoColumns();
    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
    Table table = validationCatalog.loadTable(tableIdent);
    Snapshot snap0 = table.currentSnapshot();

    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
    table.refresh();
    Snapshot snap1 = table.currentSnapshot();

    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
    table.refresh();
    Snapshot snap2 = table.currentSnapshot();

    List<Object[]> returns =
        sql(
            "CALL %s.system.create_changelog_view(table => '%s'," + "options => map('%s', '%s'))",
            catalogName, tableName, SparkReadOptions.START_SNAPSHOT_ID, snap0.snapshotId());

    assertEquals(
        "Rows should match",
        ImmutableList.of(
            row(2, "b", INSERT, 0, snap1.snapshotId()),
            row(-2, "b", INSERT, 1, snap2.snapshotId()),
            row(2, "b", DELETE, 1, snap2.snapshotId())),
        sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
  }

  @Test
  public void testOnlyEndSnapshotIdInput() {
    createTableWithTwoColumns();
    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
    Table table = validationCatalog.loadTable(tableIdent);
    Snapshot snap0 = table.currentSnapshot();

    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
    table.refresh();
    Snapshot snap1 = table.currentSnapshot();

    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);

    List<Object[]> returns =
        sql(
            "CALL %s.system.create_changelog_view(table => '%s'," + "options => map('%s', '%s'))",
            catalogName, tableName, SparkReadOptions.END_SNAPSHOT_ID, snap1.snapshotId());

    assertEquals(
        "Rows should match",
        ImmutableList.of(
            row(1, "a", INSERT, 0, snap0.snapshotId()), row(2, "b", INSERT, 1, snap1.snapshotId())),
        sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
  }

  @Test
  public void testTimestampsBasedQuery() {
    createTableWithTwoColumns();
@@ -186,6 +240,149 @@ public void testTimestampsBasedQuery() {
        sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
  }

  @Test
  public void testOnlyStartTimestampInput() {
    createTableWithTwoColumns();
    long beginning = System.currentTimeMillis();

    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
    Table table = validationCatalog.loadTable(tableIdent);
    Snapshot snap0 = table.currentSnapshot();
    long afterFirstInsert = waitUntilAfter(snap0.timestampMillis());

    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
    table.refresh();
    Snapshot snap1 = table.currentSnapshot();

    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
    table.refresh();
    Snapshot snap2 = table.currentSnapshot();

    List<Object[]> returns =
        sql(
            "CALL %s.system.create_changelog_view(table => '%s', " + "options => map('%s', '%s'))",
            catalogName, tableName, SparkReadOptions.START_TIMESTAMP, beginning);

    assertEquals(
        "Rows should match",
        ImmutableList.of(
            row(1, "a", INSERT, 0, snap0.snapshotId()),
            row(2, "b", INSERT, 1, snap1.snapshotId()),
            row(-2, "b", INSERT, 2, snap2.snapshotId()),
            row(2, "b", DELETE, 2, snap2.snapshotId())),
        sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));

    returns =
        sql(
            "CALL %s.system.create_changelog_view(table => '%s', " + "options => map('%s', '%s'))",
            catalogName, tableName, SparkReadOptions.START_TIMESTAMP, afterFirstInsert);

    assertEquals(
        "Rows should match",
        ImmutableList.of(
            row(2, "b", INSERT, 0, snap1.snapshotId()),
            row(-2, "b", INSERT, 1, snap2.snapshotId()),
            row(2, "b", DELETE, 1, snap2.snapshotId())),
        sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
  }

  @Test
  public void testOnlyEndTimestampInput() {
    createTableWithTwoColumns();

    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
    Table table = validationCatalog.loadTable(tableIdent);
    Snapshot snap0 = table.currentSnapshot();

    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
    table.refresh();
    Snapshot snap1 = table.currentSnapshot();
    long afterSecondInsert = waitUntilAfter(snap1.timestampMillis());

    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);

    List<Object[]> returns =
        sql(
            "CALL %s.system.create_changelog_view(table => '%s', " + "options => map('%s', '%s'))",
            catalogName, tableName, SparkReadOptions.END_TIMESTAMP, afterSecondInsert);

    assertEquals(
        "Rows should match",
        ImmutableList.of(
            row(1, "a", INSERT, 0, snap0.snapshotId()), row(2, "b", INSERT, 1, snap1.snapshotId())),
        sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
  }

  @Test
  public void testStartTimeStampEndSnapshotId() {
    createTableWithTwoColumns();

    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
    Table table = validationCatalog.loadTable(tableIdent);
    Snapshot snap0 = table.currentSnapshot();
    long afterFirstInsert = waitUntilAfter(snap0.timestampMillis());

    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
    table.refresh();
    Snapshot snap1 = table.currentSnapshot();

    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
    table.refresh();
    Snapshot snap2 = table.currentSnapshot();

    List<Object[]> returns =
        sql(
            "CALL %s.system.create_changelog_view(table => '%s', "
                + "options => map('%s', '%s', '%s', '%s'))",
            catalogName,
            tableName,
            SparkReadOptions.START_TIMESTAMP,
            afterFirstInsert,
            SparkReadOptions.END_SNAPSHOT_ID,
            snap2.snapshotId());

    assertEquals(
        "Rows should match",
        ImmutableList.of(
            row(2, "b", INSERT, 0, snap1.snapshotId()),
            row(-2, "b", INSERT, 1, snap2.snapshotId()),
            row(2, "b", DELETE, 1, snap2.snapshotId())),
        sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
  }

  @Test
  public void testStartSnapshotIdEndTimestamp() {
    createTableWithTwoColumns();

    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
    Table table = validationCatalog.loadTable(tableIdent);
    Snapshot snap0 = table.currentSnapshot();

    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
    table.refresh();
    Snapshot snap1 = table.currentSnapshot();
    long afterSecondInsert = waitUntilAfter(snap1.timestampMillis());

    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
    table.refresh();

    List<Object[]> returns =
        sql(
            "CALL %s.system.create_changelog_view(table => '%s', "
                + "options => map('%s', '%s', '%s', '%s'))",
            catalogName,
            tableName,
            SparkReadOptions.START_SNAPSHOT_ID,
            snap0.snapshotId(),
            SparkReadOptions.END_TIMESTAMP,
            afterSecondInsert);

    assertEquals(
        "Rows should match",
        ImmutableList.of(row(2, "b", INSERT, 0, snap1.snapshotId())),
        sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
  }

  @Test
  public void testUpdate() {
    createTableWithTwoColumns();
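
For reading the assertions above: a changelog view exposes the table's columns followed by Iceberg's changelog metadata columns _change_type, _change_ordinal, and _commit_snapshot_id, which is why each expected row is a five-element tuple for this two-column table. A small sketch of querying those columns directly (the session setup and the default t_changes view name are assumptions, not from the commit):

import org.apache.spark.sql.SparkSession;

public class InspectChangelogView {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();
    // Assumes create_changelog_view was already called for a table t with
    // columns (id, data); t_changes is the procedure's default view name.
    spark
        .sql(
            "SELECT id, data, _change_type, _change_ordinal, _commit_snapshot_id "
                + "FROM t_changes ORDER BY _change_ordinal, id")
        .show();
  }
}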
SparkScanBuilder.java
@@ -561,10 +561,11 @@ public Scan buildChangelogScan() {

     boolean emptyScan = false;
     if (startTimestamp != null) {
-      startSnapshotId = getStartSnapshotId(startTimestamp);
-      if (startSnapshotId == null && endTimestamp == null) {
+      if (table.currentSnapshot() == null
+          || startTimestamp > table.currentSnapshot().timestampMillis()) {
         emptyScan = true;
       }
+      startSnapshotId = getStartSnapshotId(startTimestamp);
     }

     if (endTimestamp != null) {
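The reordered logic above decides emptiness from the table's current snapshot rather than from a null start snapshot id. A simplified, self-contained restatement of the corrected check (the class, helper name, and reduced signature are illustrative, not the Iceberg API):

public class EmptyScanCheck {
  static boolean isEmptyChangelogScan(Long startTimestamp, Long currentSnapshotTimestampMillis) {
    // Empty only when the table has no snapshots at all, or the requested
    // start is later than the latest snapshot. A start timestamp older than
    // the oldest (root) snapshot now means "read from the first snapshot"
    // rather than incorrectly yielding an empty scan.
    return startTimestamp != null
        && (currentSnapshotTimestampMillis == null
            || startTimestamp > currentSnapshotTimestampMillis);
  }

  public static void main(String[] args) {
    // Start before any snapshot, with a current snapshot present: no longer
    // empty, which is the behavior this commit fixes.
    System.out.println(isEmptyChangelogScan(0L, 1_700_000_000_000L)); // false
    // Start after the latest snapshot: still correctly empty.
    System.out.println(isEmptyChangelogScan(1_800_000_000_000L, 1_700_000_000_000L)); // true
  }
}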
