feat: Supports UUID column #395
thanks @huaxingao should we get this covered by tests?
This is for iceberg/comet integration. I don't think there is an easy way to test this now. I have tested this locally, though.
Maybe we can return the uuid value and assert on it somehow, although it's non-deterministic? I'm wondering whether we can be protected from regressions if anyone else changes this code later.
I am thinking of adding Iceberg tests in Comet after the Iceberg/Comet integration is complete, to ensure that the changes won't regress.
cc @viirya
@@ -169,6 +170,7 @@ public void close() {

  /** Returns a decoded {@link CometDecodedVector Comet vector}. */
  public CometDecodedVector loadVector() {

Unnecessary change.
removed extra line
@@ -207,6 +214,7 @@ public CometDecodedVector loadVector() {
    DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();

    CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
    cometVector.setIsUuid(isUuid);
Why not put it into constructor parameter list?
added isUuid in constructor parameter list
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #395      +/-   ##
============================================
+ Coverage     33.47%   34.18%   +0.70%
- Complexity      795      851      +56
============================================
  Files           110      116       +6
  Lines         37533    38545    +1012
  Branches       8215     8521     +306
============================================
+ Hits          12563    13175     +612
- Misses        22322    22606     +284
- Partials       2648     2764     +116
      return UTF8String.fromBytes(result);
    } else {
      return UTF8String.fromString(convertToUuid(result).toString());
    }
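For reference, a minimal sketch of the kind of conversion the UUID branch above relies on. This is not Comet's actual convertToUuid; the class and method names here (UuidBytesSketch, uuidFromBytes, uuidToBytes) are hypothetical. It assumes the 16 bytes follow Parquet's UUID layout, that is, big-endian with the most significant bits first.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical helper, for illustration only; not part of Comet.
final class UuidBytesSketch {
  private UuidBytesSketch() {}

  /** Interprets 16 big-endian bytes as a UUID (assumed Parquet UUID layout). */
  static UUID uuidFromBytes(byte[] bytes) {
    if (bytes == null || bytes.length != 16) {
      throw new IllegalArgumentException("expected exactly 16 bytes");
    }
    // ByteBuffer reads big-endian by default.
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    long mostSigBits = buf.getLong();
    long leastSigBits = buf.getLong();
    return new UUID(mostSigBits, leastSigBits);
  }

  /** Inverse of uuidFromBytes: serializes a UUID into 16 big-endian bytes. */
  static byte[] uuidToBytes(UUID uuid) {
    return ByteBuffer.allocate(16)
        .putLong(uuid.getMostSignificantBits())
        .putLong(uuid.getLeastSignificantBits())
        .array();
  }
}
```

Calling uuidFromBytes(bytes).toString() gives the canonical hyphenated form, which is what would then be wrapped into a UTF8String. The inverse helper is the shape a test would need to produce deterministic values for a FIXED_LEN_BYTE_ARRAY(16) column.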
For the test, I wonder if it is possible to create a Parquet file with a uuid column in a unit test and read it? With this change the column should be read as uuid instead of string.
Spark doesn't support uuid data type. Can we create a table with uuid column?
We create parquet files in makeParquetFileAllTypes, for example. It uses the parquet writer to write parquet files directly instead of using the Dataset/DataFrame API. So you don't need to have a uuid column in a table.
+1 for creating a parquet file with the UUID logical annotation.
Or, maybe we can have an iceberg-integration module in the comet project and include iceberg as a dep in that module. We can generate iceberg parquet files with a UUID column in that module directly and then test there.
I looked at makeParquetFileAllTypes; it seems I can only use the parquet types INT64, INT32, BOOLEAN, BINARY, FLOAT, DOUBLE, INT96, FIXED_LEN_BYTE_ARRAY. It doesn't seem I can use a UUID logical type in a Parquet schema. When I did my local test using iceberg, I was using iceberg's UUIDType.
Might be something wrong with the UUID value. I will check
I fixed the UUID data problem, but now I get illegalParquetTypeError from Spark. I don't think Spark supports Parquet's UUID. Iceberg maps UUID to UTF8String.
Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (192.168.50.140 executor driver): org.apache.spark.sql.AnalysisException: Illegal Parquet type: FIXED_LEN_BYTE_ARRAY (UUID).
at org.apache.spark.sql.errors.QueryCompilationErrors$.illegalParquetTypeError(QueryCompilationErrors.scala:1762)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.illegalType$1(ParquetSchemaConverter.scala:206)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertPrimitiveField$2(ParquetSchemaConverter.scala:310)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertPrimitiveField(ParquetSchemaConverter.scala:224)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertField(ParquetSchemaConverter.scala:187)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$3(ParquetSchemaConverter.scala:147)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$3$adapted(ParquetSchemaConverter.scala:117)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.immutable.Range.foreach(Range.scala:158)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertInternal(ParquetSchemaConverter.scala:117)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convert(ParquetSchemaConverter.scala:87)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readSchemaFromFooter$2(ParquetFileFormat.scala:493)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readSchemaFromFooter(ParquetFileFormat.scala:493)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$2(ParquetFileFormat.scala:473)
at scala.collection.immutable.Stream.map(Stream.scala:418)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:473)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:464)
Hmm, okay, we still use Spark ParquetToSparkSchemaConverter to convert Parquet schema to Spark schema? I thought that we may have some custom one in Comet.
It failed here. It's Spark code, not Comet code yet. That's why it uses Spark's ParquetToSparkSchemaConverter
The failure comes from Spark reading the Parquet file using its own data source. Can you try to read it with Comet? If Comet scan doesn't use ParquetToSparkSchemaConverter, I think we won't hit the above error?
The change looks okay. Just wondering if it is possible to add a test.
      typeAnnotation match {
        case _: DecimalLogicalTypeAnnotation =>
          makeDecimalType(Decimal.maxPrecisionForBytes(parquetType.getTypeLength))
        case _: UUIDLogicalTypeAnnotation => StringType
All the code is copied from Spark's ParquetToSparkSchemaConverter except this line.
Are we adding this just to test? Or is this likely to be useful in other places?
Also, instead of copying, could we not just extend the Spark class and override convertField? We can then call our impl of convertPrimitiveField for UUID and let the parent implementation handle the rest.
We are likely to miss changes made in Spark if we make a copy.
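To make the override-and-delegate suggestion concrete, here is a toy sketch. Every class and method name in it is hypothetical and the types are simplified to strings; it does not use Spark's ParquetToSparkSchemaConverter API, it only shows the shape of the pattern: handle the UUID case in the subclass and let the parent handle everything else.

```java
// Hypothetical stand-in for an upstream converter; not a Spark class.
class BaseSchemaConverterSketch {
  /** Maps a (physical type, logical annotation) pair to a type name. */
  String convertPrimitive(String physicalType, String logicalAnnotation) {
    if ("INT32".equals(physicalType)) {
      return "int";
    }
    if ("FIXED_LEN_BYTE_ARRAY".equals(physicalType) && "DECIMAL".equals(logicalAnnotation)) {
      return "decimal";
    }
    // Anything the parent does not know about is rejected.
    throw new IllegalArgumentException(
        "Illegal Parquet type: " + physicalType + " (" + logicalAnnotation + ")");
  }
}

// Hypothetical subclass: adds only the UUID case and delegates the rest.
class UuidAwareConverterSketch extends BaseSchemaConverterSketch {
  @Override
  String convertPrimitive(String physicalType, String logicalAnnotation) {
    if ("FIXED_LEN_BYTE_ARRAY".equals(physicalType) && "UUID".equals(logicalAnnotation)) {
      return "string"; // UUID surfaces as a string column
    }
    // Parent logic stays in one place, so upstream changes are picked up.
    return super.convertPrimitive(physicalType, logicalAnnotation);
  }
}
```

The appeal of this shape is that only the UUID branch is owned locally. It only works when the overridden method's signature is the same across all supported versions of the parent class.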
The reason that I didn't override convertField is that it returns ParquetColumn, which only exists in Spark 3.3 and Spark 3.4. Since we need to make Spark 3.2 work, I made our own version, CometParquetColumn, and was trying to make it work for Spark 3.2 too. It actually took quite some effort to make CometParquetColumn work for all three versions of Spark, because ParquetColumn uses different methods in Spark 3.3 (which uses parquet 1.12.x) and Spark 3.4 (which uses parquet 1.13.x).
I had an offline discussion with @viirya. We will merge this PR without a test for now. After iceberg integration is done, we can probably add some iceberg test with uuid.
I will remove the tests for now.
6d87d34 to 6e3cc14
Merged. Thanks @huaxingao @advancedxy @comphead @parthchandra
Thanks, everyone!
* fix uuid * address comments --------- Co-authored-by: Huaxin Gao <[email protected]> (cherry picked from commit 7b0a7e0)
Which issue does this PR close?
Closes #.
Rationale for this change
Supports UUID column. This is for Iceberg/Comet integration
What changes are included in this PR?
How are these changes tested?
This has been tested locally using iceberg