-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Take the top level schema
into account when creating UnionExec
#4753
Take the top level schema
into account when creating UnionExec
#4753
Conversation
Signed-off-by: remzi <[email protected]>
Signed-off-by: remzi <[email protected]>
Signed-off-by: remzi <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @HaoYang670 -- looks good to me
I tried this on main and it fails:
DataFusion CLI v15.0.0
❯ CREATE table t as SELECT 1 as a;
0 rows in set. Query took 0.022 seconds.
❯ SELECT COUNT(*) FROM (SELECT a from t UNION ALL SELECT a from t);
Internal("create_physical_expr expected same number of fields, got got Arrow schema with 1 and DataFusion schema with 0")
❯
And it passes after this PR 👍
async fn union_all_with_count() -> Result<()> { | ||
let ctx = SessionContext::new(); | ||
execute_to_batches(&ctx, "CREATE table t as SELECT 1 as a").await; | ||
let sql = "SELECT COUNT(*) FROM (SELECT a from t UNION ALL SELECT a from t)"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 agrees with postgres:
postgres=# CREATE table t as SELECT 1 as a;
SELECT 1
postgres=# SELECT COUNT(*) FROM (SELECT a from t UNION ALL SELECT a from t);
ERROR: subquery in FROM must have an alias
LINE 1: SELECT COUNT(*) FROM (SELECT a from t UNION ALL SELECT a fro...
^
HINT: For example, FROM (SELECT ...) [AS] foo.
postgres=# SELECT COUNT(*) FROM (SELECT a from t UNION ALL SELECT a from t) as sq;
count
-------
2
(1 row)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, I'm not sure if it was a bug, but the PushDownProjection
doesn't push down the empty projection to the child plans of Union
in this example. Maybe @jackwener knows more about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, I'm not sure if it was a bug, but the
PushDownProjection
doesn't push down the empty projection to the child plans ofUnion
in this example. Maybe @jackwener knows more about it.
we need an issue to track this problem, I think there are some different ways of dealing with this problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please file an issue @jackwener ?
let physical_plans = futures::stream::iter(inputs) | ||
.then(|lp| self.create_initial_plan(lp, session_state)) | ||
.try_collect::<Vec<_>>() | ||
.await?; | ||
Ok(Arc::new(UnionExec::new(physical_plans))) | ||
if schema.fields().len() < physical_plans[0].schema().fields().len() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder what is the reason to not always run UnionExec::try_new_with_schema
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original implementation ignores the top level schema, but I don't know why. (Maybe @gandronchik knows more about it). So I try to minimize the scope of this change to avoid introducing new bugs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good -- maybe I'll try to simply the code once we have merged this PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, after double thinking, I am uncertain whether it is correct to ignore the top level schema when generating physical plans. It would lose the information about type coercion and metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A great job to me.
But I am a little confusing about this PR. Why we can't correct the LogicalPlan but fix them in PhysicalPlan?
Current problem is cause by schema
, maybe we fix schema
in LogicalPlan?
I feel like this PR is good enough to merge in the sense that a query that used to fail now runs correctly and there is additional test coverage. It sounds like there are likely more improvements possible as well Thanks for the comments and review @jackwener and @HaoYang670 |
Benchmark runs are scheduled for baseline = cf45eb9 and contender = 5b125b1. 5b125b1 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
We could revert this PR if the bug in logical optimizer could be solved. This PR is just a quick fix. |
Signed-off-by: remzi [email protected]
Which issue does this PR close?
Closes #4677.
Rationale for this change
When creating
UnionExec
fromLogicalPlan::Union
, the schema of the top plan is ignored. This could cause errors when the top level schema doesn't match the child schema. (see the example in #4677)Before
After
What changes are included in this PR?
try_new_with_schema
for creatingUnionExec
try_new_schema
, else usenew
Are these changes tested?
Yes.
Are there any user-facing changes?
No.