Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return the correct timestamp for the right-hand table of the ASOF join #4867

Merged
merged 6 commits into from
Apr 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 59 additions & 39 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ void Join::setSampleBlock(const Block & block)

sample_block_with_columns_to_add = materializeBlock(block);

blocklist_sample = Block(block.getColumnsWithTypeAndName());
prepareBlockListStructure(blocklist_sample);

/// Move from `sample_block_with_columns_to_add` key columns to `sample_block_with_keys`, keeping the order.
size_t pos = 0;
while (pos < sample_block_with_columns_to_add.columns())
Expand Down Expand Up @@ -482,10 +485,47 @@ namespace
}
}

void Join::prepareBlockListStructure(Block & stored_block)
{
if (isRightOrFull(kind))
{
/** Move the key columns to the beginning of the block.
* This is where NonJoinedBlockInputStream will expect.
*/
size_t key_num = 0;
for (const auto & name : key_names_right)
{
size_t pos = stored_block.getPositionByName(name);
ColumnWithTypeAndName col = stored_block.safeGetByPosition(pos);
stored_block.erase(pos);
stored_block.insert(key_num, std::move(col));
++key_num;
}
}
else
{
NameSet erased; /// HOTFIX: there could be duplicates in JOIN ON section

/// Remove the key columns from stored_block, as they are not needed.
/// However, do not erase the ASOF column if this is an asof join
for (const auto & name : key_names_right)
{
if (strictness == ASTTableJoin::Strictness::Asof && name == key_names_right.back())
{
LOG_DEBUG(log, "preventing removal of ASOF join column with name=" << name);
break; // this is the last column so break is OK
}

if (!erased.count(name))
stored_block.erase(stored_block.getPositionByName(name));
erased.insert(name);
}
}
}

bool Join::insertFromBlock(const Block & block)
{
std::unique_lock lock(rwlock);
LOG_DEBUG(log, "joinBlock: " << block.dumpStructure());

if (empty())
throw Exception("Logical error: Join was not initialized", ErrorCodes::LOGICAL_ERROR);
Expand Down Expand Up @@ -514,33 +554,9 @@ bool Join::insertFromBlock(const Block & block)
blocks.push_back(block);
Block * stored_block = &blocks.back();

if (isRightOrFull(kind))
{
/** Move the key columns to the beginning of the block.
* This is where NonJoinedBlockInputStream will expect.
*/
size_t key_num = 0;
for (const auto & name : key_names_right)
{
size_t pos = stored_block->getPositionByName(name);
ColumnWithTypeAndName col = stored_block->safeGetByPosition(pos);
stored_block->erase(pos);
stored_block->insert(key_num, std::move(col));
++key_num;
}
}
else
{
NameSet erased; /// HOTFIX: there could be duplicates in JOIN ON section
prepareBlockListStructure(*stored_block);

/// Remove the key columns from stored_block, as they are not needed.
for (const auto & name : key_names_right)
{
if (!erased.count(name))
stored_block->erase(stored_block->getPositionByName(name));
erased.insert(name);
}
}
LOG_DEBUG(log, "insertFromBlock stored_block=" << stored_block->dumpStructure());

size_t size = stored_block->columns();

Expand Down Expand Up @@ -579,7 +595,9 @@ class AddedColumns

AddedColumns(const Block & sample_block_with_columns_to_add,
const Block & block_with_columns_to_add,
const Block & block, size_t num_columns_to_skip)
const Block & block,
const Block & blocklist_sample,
const ColumnsWithTypeAndName & extras)
{
size_t num_columns_to_add = sample_block_with_columns_to_add.columns();

Expand All @@ -593,8 +611,14 @@ class AddedColumns

/// Don't insert column if it's in left block or not explicitly required.
if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name))
addColumn(src_column, num_columns_to_skip + i);
addColumn(src_column);
}

for (auto & extra : extras)
addColumn(extra);

for (auto & tn : type_name)
right_indexes.push_back(blocklist_sample.getPositionByName(tn.second));
}

size_t size() const { return columns.size(); }
Expand Down Expand Up @@ -622,12 +646,11 @@ class AddedColumns
MutableColumns columns;
std::vector<size_t> right_indexes;

void addColumn(const ColumnWithTypeAndName & src_column, size_t idx)
void addColumn(const ColumnWithTypeAndName & src_column)
{
columns.push_back(src_column.column->cloneEmpty());
columns.back()->reserve(src_column.column->size());
type_name.emplace_back(src_column.type, src_column.name);
right_indexes.push_back(idx);
}
};

Expand Down Expand Up @@ -819,14 +842,12 @@ void Join::joinBlockImpl(
/** For LEFT/INNER JOIN, the saved blocks do not contain keys.
* For FULL/RIGHT JOIN, the saved blocks contain keys;
* but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped.
* For ASOF, the last column is used as the ASOF column
*/
size_t num_columns_to_skip = 0;
if constexpr (right_or_full)
num_columns_to_skip = keys_size;

/// Add new columns to the block.

AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, num_columns_to_skip);
ColumnsWithTypeAndName extras;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
extras.push_back(sample_block_with_keys.getByName(key_names_right.back()));
AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, blocklist_sample, extras);

std::unique_ptr<IColumn::Offsets> offsets_to_replicate;

Expand All @@ -837,7 +858,6 @@ void Join::joinBlockImpl(
block.insert(added.moveColumn(i));

/// Filter & insert missing rows

auto right_keys = requiredRightKeys(key_names_right, columns_added_by_join);

if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any || STRICTNESS == ASTTableJoin::Strictness::Asof)
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,9 @@ class Join
/// Block with key columns in the same order they appear in the right-side table.
Block sample_block_with_keys;

/// Block as it would appear in the BlockList
Block blocklist_sample;

Poco::Logger * log;

/// Limits for maximum map size.
Expand All @@ -393,6 +396,11 @@ class Join

void init(Type type_);

/** Take an inserted block and discard everything that does not need to be stored
* Example, remove the keys as they come from the LHS block, but do keep the ASOF timestamps
*/
void prepareBlockListStructure(Block & stored_block);

/// Throw an exception if blocks have different types of key columns.
void checkTypesOfKeys(const Block & block_left, const Names & key_names_left, const Block & block_right) const;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
1 101 1 0 0 0
1 102 2 2 102 1
1 103 3 2 102 1
1 104 4 4 104 1
1 105 5 4 104 1
1 101 1 0 0 0
1 102 2 2 102 1
1 103 3 2 102 1
1 104 4 4 104 1
1 105 5 4 104 1
1 101 1 0 0 0
1 102 2 2 102 1
1 103 3 2 102 1
1 104 4 4 104 1
1 105 5 4 104 1
25 changes: 25 additions & 0 deletions dbms/tests/queries/0_stateless/00927_asof_join_correct_bt.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
USE test;

DROP TABLE IF EXISTS A;
DROP TABLE IF EXISTS B;

CREATE TABLE A(k UInt32, t UInt32, a UInt64) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO A(k,t,a) VALUES (1,101,1),(1,102,2),(1,103,3),(1,104,4),(1,105,5);

CREATE TABLE B(k UInt32, t UInt32, b UInt64) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO B(k,t,b) VALUES (1,102,2), (1,104,4);
SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t);
DROP TABLE B;


CREATE TABLE B(t UInt32, k UInt32, b UInt64) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO B(k,t,b) VALUES (1,102,2), (1,104,4);
SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t);
DROP TABLE B;

CREATE TABLE B(k UInt32, b UInt64, t UInt32) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO B(k,t,b) VALUES (1,102,2), (1,104,4);
SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t);
DROP TABLE B;

DROP TABLE A;
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
1 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0
1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:04 1
2 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0
2 1970-01-01 00:00:02 2 0 0000-00-00 00:00:00 0
2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:03 2
3 1970-01-01 00:00:01 1 0 0000-00-00 00:00:00 0
3 1970-01-01 00:00:02 2 0 0000-00-00 00:00:00 0
3 1970-01-01 00:00:03 3 0 0000-00-00 00:00:00 0
3 1970-01-01 00:00:04 4 0 0000-00-00 00:00:00 0
3 1970-01-01 00:00:05 5 0 0000-00-00 00:00:00 0
1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:04 1
2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:03 2
1 1970-01-01 00:00:02 2 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:03 1
1 1970-01-01 00:00:03 3 2 1970-01-01 00:00:02 1
1 1970-01-01 00:00:04 4 4 1970-01-01 00:00:04 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:05 1
1 1970-01-01 00:00:05 5 4 1970-01-01 00:00:04 1
2 1970-01-01 00:00:03 3 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:04 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:05 2
2 1970-01-01 00:00:04 4 3 1970-01-01 00:00:03 2
2 1970-01-01 00:00:05 5 3 1970-01-01 00:00:03 2