-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implement Spark unhex #342
Conversation
let fun = BuiltinScalarFunction::from_str(fun_name); | ||
if fun.is_err() { | ||
|
||
if let Ok(fun) = BuiltinScalarFunction::from_str(fun_name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated, but more idiomatic IMO
let fun = BuiltinScalarFunction::from_str(fun_name); | ||
if fun.is_err() { | ||
Ok(ScalarFunctionDefinition::UDF(registry.udf(fun_name)?)) | ||
if let Ok(fun) = BuiltinScalarFunction::from_str(fun_name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated, but more idiomatic IMO
@@ -1396,6 +1396,17 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { | |||
val optExpr = scalarExprToProto("atan2", leftExpr, rightExpr) | |||
optExprWithInfo(optExpr, expr, left, right) | |||
|
|||
case e @ Unhex(child, failOnError) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/apache/spark/blob/59d5946cfd377e9203ccf572deb34f87fab7510c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala#L1152
vs
https://github.com/apache/spark/blob/45ba9224602eb18fe45e339cbb8cf2e8a4924f0b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala#L1125
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to use a shim approach. We currently have one folder with source for spark-3.x. We should add additional folders as needed e.g. spark-3.3.x / spark-3.4.x with any code that is specific to those versions.
I am happy to help with this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can handle this using existing shim, something like
case e @ Unhex =>
e.child...
getFailOnError(e) ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wrong, actually getFailOnError
needs to be modified for this approach as unhex
is unaryExpression
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@andygrove Would you mind having a look at the updated PR and let me know what you think of the shim implemntation? The thought was to setup a major and minor shim to keep the existing shim for all of spark 3 then apply the more granular difference in the minor. It gets a little weird in that I'm applying 3.3's shim to 3.2.
@kazuyukitanimura Thanks for looking!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @tshauck I will review this tomorrow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can have duplicated shim for 3.3 and 3.2
pom.xml
Outdated
@@ -88,7 +88,8 @@ under the License. | |||
<argLine>-ea -Xmx4g -Xss4m ${extraJavaTestArgs}</argLine> | |||
<additional.3_3.test.source>spark-3.3-plus</additional.3_3.test.source> | |||
<additional.3_4.test.source>spark-3.4</additional.3_4.test.source> | |||
<shims.source>spark-3.x</shims.source> | |||
<shims.majorSource>spark-3.x</shims.majorSource> | |||
<shims.minorSource>spark-3.4.x</shims.minorSource> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI spark-3.x
will be gone and supposed to be replaced by independent spark-3.2
, spark-3.3
, and spark-3.4
dirs
So let's go ahead and start using spark-3.4
instead of spark-3.4.x
? I.e.
<shims.source.shared>spark-3.x</shims.source.shared>
<shims.source>spark-3.4</shims.source>
pom.xml
Outdated
@@ -500,6 +501,7 @@ under the License. | |||
<!-- we don't add special test suits for spark-3.2, so a not existed dir is specified--> | |||
<additional.3_3.test.source>not-needed-yet</additional.3_3.test.source> | |||
<additional.3_4.test.source>not-needed-yet</additional.3_4.test.source> | |||
<shims.minorSource>spark-3.3.x</shims.minorSource> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this should be spark-3.2
?
val childCast = Cast(unHex._1, StringType) | ||
val failOnErrorCast = Cast(unHex._2, BooleanType) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would happen if we do not use Cast
?
import org.apache.comet.shims.ShimQueryPlanSerde | ||
|
||
/** | ||
* An utility object for query plan and expression serialization. | ||
*/ | ||
object QueryPlanSerde extends Logging with ShimQueryPlanSerde { | ||
object QueryPlanSerde extends Logging with ShimQueryPlanSerde with ShimCometUnhexExpr { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say ShimCometUnhexExpr
should be more generic name, not unhex specific. What about ShimCometExpr
(or if you have other idea, please feel free to propose)?
Otherwise, we will have to keep adding trait class per function.
Eventually we should merge ShimQueryPlanSerde
and ShimCometExpr
into one when we remove spark-3.x
dir.
Hi, I updated the shim handling for 3.2 and made various other updates (based on PR feedback and general cleanup). Please have another look and let me know what you think, thanks! |
spark/src/main/spark-3.2/org/apache/comet/shims/ShimCometUnhexExpr.scala
Outdated
Show resolved
Hide resolved
spark/src/main/spark-3.2/org/apache/comet/shims/ShimCometUnhexExpr.scala
Outdated
Show resolved
Hide resolved
spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Outdated
Show resolved
Hide resolved
|
||
// Adjust the string if it has an odd length, and prepare to add a padding byte if needed. | ||
let needs_padding = string.len() % 2 != 0; | ||
let adjusted_string = if needs_padding { &string[1..] } else { string }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand this correctly, string[0]
is discarded when the length is odd, is it intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is the logic in Spark 3.4.2 for handling the first char if the input is padded, for reference. It looks like there is some validation of the first digit that we do not have in this PR and it also looks like the unhexed digit is stored in the output is used in the return value if the length of the input string is 1. It would be good to make sure that we have tests covering this case.
if ((bytes.length & 0x01) != 0) {
// padding with '0'
if (bytes(0) < 0) {
return null
}
val v = Hex.unhexDigits(bytes(0))
if (v == -1) {
return null
}
out(0) = v
i += 1
oddShift = 1
}
fn test_unhex() -> Result<(), Box<dyn std::error::Error>> { | ||
let mut result = Vec::new(); | ||
|
||
unhex("537061726B2053514C", &mut result)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we also have a test for the case where the input is padded?
val table = "test" | ||
withTable(table) { | ||
sql(s"create table $table(col string) using parquet") | ||
sql(s"insert into $table values('537061726B2053514C')") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to see more values being tested here, both valid and invalid, and covering the padded case.
common/pom.xml
Outdated
@@ -179,7 +179,8 @@ under the License. | |||
</goals> | |||
<configuration> | |||
<sources> | |||
<source>src/main/${shims.source}</source> | |||
<source>src/main/${shims.majorSource}</source> | |||
<source>src/main/${shims.minorSource}</source> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding the minor version shims. These are going to help me with some of work around supporting cast
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution @tshauck. This is looking great. I added some comments around additional testing.
Thanks for all the feedback. I think I've addressed the build/naming/etc feedback, and will have a look at improving the tests and any associated implementation changes sometime tomorrow. I'll request a review via GH when it's ready. |
I think this is ready for review. I updated the |
Thanks for the updates @tshauck. I plan on reviewing later today. |
|
||
checkSparkAnswerAndOperator(s"SELECT unhex(col) FROM $table") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may be worth adding an ORDER BY
clause here to ensure that the test is deterministic.
checkSparkAnswerAndOperator(s"SELECT unhex(col) FROM $table") | |
checkSparkAnswerAndOperator(s"SELECT unhex(col) FROM $table ORDER BY col") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if the Spark 3.2 failure is at all related to ordering?
[WrappedArray(10, 27)] [WrappedArray(10, 27)]
![WrappedArray(115, 116, 114, 105, 110, 103)] [WrappedArray(10, 27)]
![WrappedArray(27, 0)] [WrappedArray(115, 116, 114, 105, 110, 103)]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at the spark repo a bit more, it looks like there was a bug in the 3.2 implementation that was fixed in subsequent versions.
How should I handle this? Like should I write a native implementation of the erroneous code in order to be faithful to the spark, have it be correct if different, or maybe just fallback to the spark implementation on that version (not sure if that's possible)? Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually adding the ORDER BY
causes the test to fail because it no longer runs fully native, so ignore that suggestion.
Also, I do see code differences in Spark between 3.2 and 3.4 in the unhex algorithm. 3.2 does not have the oddShift
variable. oddShift
was added in apache/spark@276abe3
I guess the options are:
- mark
unhex
as incompat just for 3.2 and skip the test for 3.2 (probably the easiest path) - implement per-spark-version logic in Rust for unhex
Let me know what you think or if you have any questions on this. This is a good example of the challenge of supporting multiple Spark versions 😓
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for looking, I think we found the same commit w.r.t. 3.2's differences.
I'll just go the easy route since matching 3.2 would be implementing buggy code and 3.2 is the oldest supported version (so presumably it'll be dropped at some point). I'll use this as a chance to poke around the repo, but may ask in discord if I get blocked on marking it incompat for 3.2 and skipping the tests for that version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made an update in 36baf8e that seems to work (https://github.com/tshauck/arrow-datafusion-comet/actions/runs/8994978806). Happy to get any feedback about the approach taken.
Co-authored-by: Andy Grove <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @tshauck. This is a great first contribution! I added a suggestion to fix the Scala 2.13 build failures and also a suggestion for an additional comment.
LGTM pending CI.
@viirya @kazuyukitanimura do you have any additional feedback? |
I plan on merging this tomorrow if there is no more feedback |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, one more comment
} else if fail_on_error { | ||
return exec_err!("Input to unhex is not a valid hex string: {s}"); | ||
} else { | ||
builder.append_null(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If unhex()
fails and fail_on_error=false
, the encoded
is not cleared? Is this a same behavior as Spark? It would be great if we could add a test to verify.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All good, thank you very much for looking. I think you're right in that there was a bug, which should be fixed and tested in c5c3fcd.
@@ -1396,6 +1397,16 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { | |||
val optExpr = scalarExprToProto("atan2", leftExpr, rightExpr) | |||
optExprWithInfo(optExpr, expr, left, right) | |||
|
|||
case e: Unhex if !isSpark32 => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated, but potentially we can vote in the community when to deprecate 3.2 support...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
I forgot to ask about dictionary and scalar. Filed #477 |
Which issue does this PR close?
Closes #341
Rationale for this change
unhex
is currently unsupported by comet. This my first PR into this repo, so certainly open to any feedback to make it more inline w/ expectations.What changes are included in this PR?
Add
unhex
as well as make some minor refactors.How are these changes tested?
Added simple tests to the rust and spark sql side of the code.