-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refine the logical and physical plan serialization and deserialization #4659
Conversation
…th the physical plan code
2d8bc8f
to
fb9ce98
Compare
Hi @alamb and @andygrove, could you help review this PR? After this PR merged, we can go further to upgrade the Datafusion dependency in Ballista, like yahoNanJing/arrow-ballista@279d9af does. The main impact at the Ballista side is that since we moved the physical plan part to the datafusion, we can not leverage the build-in way to deal with shuffle write and shuffle reader. Instead, we have to leverage the physical plan extension for Ballista specific shuffle reader and shuffler write. From the above code link, we have implemented by that way and works well. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Thanks @yahoNanJing
This makes the protobuf serialization code a bit nicer -- thank you
I can't say I was able to verify every line in this PR given it was so large (e.g I didn't double check that all the tests were correctly moved)
What I did see looks great to me 👍
@@ -1330,77 +1337,4 @@ message ColumnStats { | |||
ScalarValue max_value = 2; | |||
uint32 null_count = 3; | |||
uint32 distinct_count = 4; | |||
} | |||
|
|||
message PartitionLocation { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 -- I believe these are ballista specific
if let Some(field) = $PB.as_ref() { | ||
Ok(field.try_into()?) | ||
} else { | ||
Err(proto_error("Missing required field in protobuf")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it might be helpful (as a follow on PR) to include the field name that were missing
} | ||
|
||
#[cfg(test)] | ||
mod roundtrip_tests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests are moved to datafusion/proto/src/logical_plan/mod.rs
repeated uint32 projection = 4; | ||
ScanLimit limit = 5; | ||
Statistics statistics = 6; | ||
repeated string table_partition_cols = 7; | ||
string object_store_url = 8; | ||
repeated PhysicalSortExprNode output_ordering = 9; | ||
repeated ConfigOption options = 10; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I general I am not sure it is a good idea to serialize the config options as upon deserialization they will be their own copy.
However that being said, perhaps the issue is that FileScanExecConfig
has a copy of the config options in the first place. I think that should get better over time as we consolidate the configuration more
@@ -304,6 +306,96 @@ pub fn parse_protobuf_hash_partitioning( | |||
} | |||
} | |||
|
|||
pub fn parse_protobuf_file_scan_config( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
use datafusion_common::DataFusionError; | ||
|
||
impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> { | ||
impl TryFrom<Arc<dyn AggregateExpr>> for protobuf::PhysicalExprNode { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Since it's a big refactor for the proto mod, I propose to merge this PR as soon as possible. Otherwise, there will be many conflicts when new commits merged to the master branch. |
I'll merge it first. |
Benchmark runs are scheduled for baseline = 9f6387e and contender = 09d3378. 09d3378 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Thanks again @yahoNanJing |
Which issue does this PR close?
Closes #4597.
Rationale for this change
What changes are included in this PR?
After #4390, there're still parts which needs to follow up for the physical plan.
Remove Ballista related things in the datafusion.proto
Add missing fields for FileScanExecConf, like output_ordering and config_options. Otherwise, it will block some Ballista UT.
Make the code well organized for serialization and deserialization like physical plan does
Are these changes tested?
Are there any user-facing changes?