From 92eb96228aca430bda11b64aff689d8b2fd85cf4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 24 Jan 2024 09:20:23 -0700 Subject: [PATCH 1/8] repro integration test --- integration_tests/src/main/python/json_test.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index ac76139111d..8276cff7db6 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -822,6 +822,17 @@ def test_from_json_struct_of_list(schema): .select(f.from_json('a', schema)), conf={"spark.rapids.sql.expression.JsonToStructs": True}) +@pytest.mark.parametrize('schema', [ + 'struct' +]) +@allow_non_gpu(*non_utc_allow) +def test_from_json_mixed_types_list_struct(schema): + json_string_gen = StringGen(r'{"a": (\[1,2,3\]|{"b": "[a-z]{2}"}) }') + assert_gpu_and_cpu_are_equal_collect( + lambda spark : unary_op_df(spark, json_string_gen) \ + .select('a', f.from_json('a', schema)), + conf={"spark.rapids.sql.expression.JsonToStructs": True}) + @pytest.mark.parametrize('schema', ['struct', 'struct']) @allow_non_gpu(*non_utc_allow) def test_from_json_struct_all_empty_string_input(schema): From ba154f733e86794767c7510a100cd9c0ccad1525 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 24 Jan 2024 09:31:10 -0700 Subject: [PATCH 2/8] enable mixed types --- integration_tests/src/main/python/json_test.py | 2 +- .../scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index 8276cff7db6..e3ccdfde3b7 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -827,7 +827,7 @@ def test_from_json_struct_of_list(schema): ]) @allow_non_gpu(*non_utc_allow) def test_from_json_mixed_types_list_struct(schema): - json_string_gen = StringGen(r'{"a": (\[1,2,3\]|{"b": "[a-z]{2}"}) }') + json_string_gen = StringGen(r'{"a": (\[1,2,3\]|{"b":"[a-z]{2}"}) }') assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, json_string_gen) \ .select('a', f.from_json('a', schema)), diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala index 1e3f232c3ab..867006e9823 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala @@ -177,9 +177,7 @@ case class GpuJsonToStructs( val jsonOptions = cudf.JSONOptions.builder() .withRecoverWithNull(true) - // tracking issue for enabling mixed type as string - // https://github.com/NVIDIA/spark-rapids/issues/10253 - .withMixedTypesAsStrings(false) + .withMixedTypesAsStrings(true) .build() withResource(cudf.Table.readJSON(jsonOptions, data, start, length)) { tableWithMeta => val names = tableWithMeta.getColumnNames From f587f2b251033e2f6c9ea7f21e27d65c9cf60bc6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 25 Jan 2024 11:51:17 -0700 Subject: [PATCH 3/8] Create config for enabling mixedTypesAsString --- docs/compatibility.md | 6 ++++++ integration_tests/src/main/python/json_test.py | 4 +++- .../com/nvidia/spark/rapids/GpuOverrides.scala | 6 ++++-- .../com/nvidia/spark/rapids/RapidsConf.scala | 8 ++++++++ .../sql/catalyst/json/rapids/GpuJsonScan.scala | 16 ++++++++++------ .../json/rapids/GpuReadJsonFileFormat.scala | 3 ++- .../spark/sql/rapids/GpuJsonToStructs.scala | 3 ++- 7 files changed, 35 insertions(+), 11 deletions(-) diff --git a/docs/compatibility.md b/docs/compatibility.md index 2644c873e98..735c27d840d 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -378,6 +378,12 @@ In particular, the output map is not resulted from a regular JSON parsing but in * If the input JSON is given as multiple rows, any row containing invalid JSON format will be parsed as an empty struct instead of a null value ([#9592](https://github.com/NVIDIA/spark-rapids/issues/9592)). +When a JSON attribute contains mixed types (different types in different rows), such as a mix of dictionaries +and lists, Spark will return a string representation of the JSON, but when running on GPU, the default +behavior is to throw an exception. There is an experimental setting +`spark.rapids.sql.json.read.mixedTypesAsString.enabled` that can be set to true to support reading +mixed types as string, but there are known issues where it could also read structs as string in some cases. + ### `to_json` function The `to_json` function is disabled by default because it is experimental and has some known incompatibilities diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index e3ccdfde3b7..49a0c11882c 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -380,7 +380,9 @@ def test_read_invalid_json(spark_tmp_table_factory, std_input_path, read_func, f @pytest.mark.parametrize('schema', [_int_schema]) @pytest.mark.parametrize('v1_enabled_list', ["", "json"]) def test_read_valid_json(spark_tmp_table_factory, std_input_path, read_func, filename, schema, v1_enabled_list): - conf = copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf = copy_and_update(_enable_all_types_conf, + {'spark.sql.sources.useV1SourceList': v1_enabled_list, + 'spark.rapids.sql.json.read.mixedTypesAsString.enabled': True}) assert_gpu_and_cpu_are_equal_collect( read_func(std_input_path + '/' + filename, schema, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index b330eb5b52d..dbeb7a41461 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3662,7 +3662,8 @@ object GpuOverrides extends Logging { override def convertToGpu(child: Expression): GpuExpression = // GPU implementation currently does not support duplicated json key names in input - GpuJsonToStructs(a.schema, a.options, child, a.timeZoneId) + GpuJsonToStructs(a.schema, a.options, child, conf.isJsonMixedTypesAsStringEnabled, + a.timeZoneId) }).disabledByDefault("parsing JSON from a column has a large number of issues and " + "should be considered beta quality right now."), expr[StructsToJson]( @@ -3850,7 +3851,8 @@ object GpuOverrides extends Logging { a.dataFilters, conf.maxReadBatchSizeRows, conf.maxReadBatchSizeBytes, - conf.maxGpuColumnSizeBytes) + conf.maxGpuColumnSizeBytes, + conf.isJsonMixedTypesAsStringEnabled) })).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap val scans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 4857bde2ac0..cac7b5b13c9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1197,6 +1197,12 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") .booleanConf .createWithDefault(false) + val ENABLE_READ_JSON_MIXED_TYPES_AS_STRING = + conf("spark.rapids.sql.json.read.mixedTypesAsString.enabled") + .doc("JSON reading is not 100% compatible when reading mixed types as string.") + .booleanConf + .createWithDefault(false) + val ENABLE_AVRO = conf("spark.rapids.sql.format.avro.enabled") .doc("When set to true enables all avro input and output acceleration. " + "(only input is currently supported anyways)") @@ -2621,6 +2627,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isJsonDecimalReadEnabled: Boolean = get(ENABLE_READ_JSON_DECIMALS) + lazy val isJsonMixedTypesAsStringEnabled: Boolean = get(ENABLE_READ_JSON_MIXED_TYPES_AS_STRING) + lazy val isAvroEnabled: Boolean = get(ENABLE_AVRO) lazy val isAvroReadEnabled: Boolean = get(ENABLE_AVRO_READ) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index 53f86e3d75e..138f99b0c72 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -249,7 +249,8 @@ case class GpuJsonScan( dataFilters: Seq[Expression], maxReaderBatchSizeRows: Integer, maxReaderBatchSizeBytes: Long, - maxGpuColumnSizeBytes: Long) + maxGpuColumnSizeBytes: Long, + mixedTypesAsStringEnabled: Boolean) extends TextBasedFileScan(sparkSession, options) with GpuScan { private lazy val parsedOptions: JSONOptions = new JSONOptions( @@ -272,7 +273,8 @@ case class GpuJsonScan( GpuJsonPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, dataSchema, readDataSchema, readPartitionSchema, parsedOptions, maxReaderBatchSizeRows, - maxReaderBatchSizeBytes, maxGpuColumnSizeBytes, metrics, options.asScala.toMap) + maxReaderBatchSizeBytes, maxGpuColumnSizeBytes, metrics, options.asScala.toMap, + mixedTypesAsStringEnabled) } override def withInputFile(): GpuScan = this @@ -290,7 +292,8 @@ case class GpuJsonPartitionReaderFactory( maxReaderBatchSizeBytes: Long, maxGpuColumnSizeBytes: Long, metrics: Map[String, GpuMetric], - @transient params: Map[String, String]) extends ShimFilePartitionReaderFactory(params) { + @transient params: Map[String, String], + mixedTypesAsStringEnabled: Boolean) extends ShimFilePartitionReaderFactory(params) { override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { throw new IllegalStateException("ROW BASED PARSING IS NOT SUPPORTED ON THE GPU...") @@ -300,7 +303,7 @@ case class GpuJsonPartitionReaderFactory( val conf = broadcastedConf.value.value val reader = new PartitionReaderWithBytesRead(new JsonPartitionReader(conf, partFile, dataSchema, readDataSchema, parsedOptions, maxReaderBatchSizeRows, maxReaderBatchSizeBytes, - metrics)) + metrics, mixedTypesAsStringEnabled)) ColumnarPartitionReaderWithPartitionValues.newReader(partFile, reader, partitionSchema, maxGpuColumnSizeBytes) } @@ -346,7 +349,8 @@ class JsonPartitionReader( parsedOptions: JSONOptions, maxRowsPerChunk: Integer, maxBytesPerChunk: Long, - execMetrics: Map[String, GpuMetric]) + execMetrics: Map[String, GpuMetric], + enableMixedTypesAsString: Boolean) extends GpuTextBasedPartitionReader[HostLineBufferer, HostLineBuffererFactory.type](conf, partFile, dataSchema, readDataSchema, parsedOptions.lineSeparatorInRead, maxRowsPerChunk, maxBytesPerChunk, execMetrics, HostLineBuffererFactory) { @@ -354,7 +358,7 @@ class JsonPartitionReader( def buildJsonOptions(parsedOptions: JSONOptions): cudf.JSONOptions = { cudf.JSONOptions.builder() .withRecoverWithNull(true) - .withMixedTypesAsStrings(true) + .withMixedTypesAsStrings(enableMixedTypesAsString) .build } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuReadJsonFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuReadJsonFileFormat.scala index 464d0a7cb15..f182e5d8d88 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuReadJsonFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuReadJsonFileFormat.scala @@ -67,7 +67,8 @@ class GpuReadJsonFileFormat extends JsonFileFormat with GpuReadFileFormatWithMet rapidsConf.maxReadBatchSizeBytes, rapidsConf.maxGpuColumnSizeBytes, metrics, - options) + options, + rapidsConf.isJsonMixedTypesAsStringEnabled) PartitionReaderIterator.buildReader(factory) } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala index 867006e9823..a5cef4c7fbb 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala @@ -33,6 +33,7 @@ case class GpuJsonToStructs( schema: DataType, options: Map[String, String], child: Expression, + enableMixedTypesAsString: Boolean, timeZoneId: Option[String] = None) extends GpuUnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes with NullIntolerant { @@ -177,7 +178,7 @@ case class GpuJsonToStructs( val jsonOptions = cudf.JSONOptions.builder() .withRecoverWithNull(true) - .withMixedTypesAsStrings(true) + .withMixedTypesAsStrings(enableMixedTypesAsString) .build() withResource(cudf.Table.readJSON(jsonOptions, data, start, length)) { tableWithMeta => val names = tableWithMeta.getColumnNames From 05849d0928a4cc07c8e1465cb6f4262f89306e22 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 25 Jan 2024 11:57:02 -0700 Subject: [PATCH 4/8] signoff Signed-off-by: Andy Grove From c00f8efc6838dc130e093d5b04f5157e5677bb20 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 25 Jan 2024 12:01:17 -0700 Subject: [PATCH 5/8] update docs --- docs/compatibility.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/compatibility.md b/docs/compatibility.md index 735c27d840d..36da8800212 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -382,7 +382,9 @@ When a JSON attribute contains mixed types (different types in different rows), and lists, Spark will return a string representation of the JSON, but when running on GPU, the default behavior is to throw an exception. There is an experimental setting `spark.rapids.sql.json.read.mixedTypesAsString.enabled` that can be set to true to support reading -mixed types as string, but there are known issues where it could also read structs as string in some cases. +mixed types as string, but there are known issues where it could also read structs as string in some cases. There +can also be minor formatting differences. Spark will return a parsed and formatted representation, but the +GPU implementation returns the unparsed JSON string. ### `to_json` function From 0b97ee2008b93d8d4475c278e1c298844bead51d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 25 Jan 2024 12:09:56 -0700 Subject: [PATCH 6/8] update a test --- integration_tests/src/main/python/json_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index 49a0c11882c..54605455353 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -833,7 +833,8 @@ def test_from_json_mixed_types_list_struct(schema): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, json_string_gen) \ .select('a', f.from_json('a', schema)), - conf={"spark.rapids.sql.expression.JsonToStructs": True}) + conf={"spark.rapids.sql.expression.JsonToStructs": True, + 'spark.rapids.sql.json.read.mixedTypesAsString.enabled': True}) @pytest.mark.parametrize('schema', ['struct', 'struct']) @allow_non_gpu(*non_utc_allow) From b2d9dc92a1803ce33149d00bae3a93a03a8cd94b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 25 Jan 2024 12:38:50 -0700 Subject: [PATCH 7/8] generated config docs --- docs/additional-functionality/advanced_configs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index 3644b09951e..da04b2ddee5 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -128,6 +128,7 @@ Name | Description | Default Value | Applicable at spark.rapids.sql.json.read.decimal.enabled|JSON reading is not 100% compatible when reading decimals.|false|Runtime spark.rapids.sql.json.read.double.enabled|JSON reading is not 100% compatible when reading doubles.|true|Runtime spark.rapids.sql.json.read.float.enabled|JSON reading is not 100% compatible when reading floats.|true|Runtime +spark.rapids.sql.json.read.mixedTypesAsString.enabled|JSON reading is not 100% compatible when reading mixed types as string.|false|Runtime spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. This config can not be changed at runtime, you must restart the application for it to take affect. The default mode is executeOnGPU, which means the RAPIDS Accelerator plugin convert the Spark operations and execute them on the GPU when possible. The explainOnly mode allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL, this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu|Startup spark.rapids.sql.optimizer.joinReorder.enabled|When enabled, joins may be reordered for improved query performance|true|Runtime spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false|Runtime From a7eedc374d1af4d69f33b1d0418fd5373280256b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 26 Jan 2024 09:55:12 -0700 Subject: [PATCH 8/8] copyright years --- .../spark/sql/catalyst/json/rapids/GpuReadJsonFileFormat.scala | 2 +- .../scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuReadJsonFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuReadJsonFileFormat.scala index f182e5d8d88..3cec8943cf6 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuReadJsonFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuReadJsonFileFormat.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala index a5cef4c7fbb..a6dcb9d8edf 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.