From e83d9caeccdd55a386d425968a2b894f49065740 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 26 Jan 2022 16:34:22 -0700
Subject: [PATCH 01/13] Improve support for reading CSV and JSON floating-point values

Signed-off-by: Andy Grove
---
 integration_tests/src/main/python/csv_test.py | 22 +++++-----
 .../src/main/python/json_test.py              |  8 ++--
 .../rapids/GpuTextBasedPartitionReader.scala  | 41 ++++++++++++++++---
 3 files changed, 48 insertions(+), 23 deletions(-)

diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py
index f10166fc078..4047e60941a 100644
--- a/integration_tests/src/main/python/csv_test.py
+++ b/integration_tests/src/main/python/csv_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2021, NVIDIA CORPORATION.
+# Copyright (c) 2020-2022, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -194,7 +194,7 @@ def read_impl(spark):
 @pytest.mark.parametrize('name,schema,options', [
     ('Acquisition_2007Q3.txt', _acq_schema, {'sep': '|'}),
     ('Performance_2007Q3.txt_0', _perf_schema, {'sep': '|'}),
-    pytest.param('ts.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1091')),
+    pytest.param('ts.csv', _date_schema, {}),
     pytest.param('date.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1111')),
     ('ts.csv', _ts_schema, {}),
     ('str.csv', _bad_str_schema, {'header': 'true'}),
@@ -224,11 +224,11 @@ def read_impl(spark):
     pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
     pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
     pytest.param('empty_int_values.csv', _empty_long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
-    pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
-    pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
+    pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}),
+    pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}),
     pytest.param('nan_and_inf.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
-    pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124, https://github.com/NVIDIA/spark-rapids/issues/125i, https://github.com/NVIDIA/spark-rapids/issues/126')),
-    pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124, https://github.com/NVIDIA/spark-rapids/issues/125, https://github.com/NVIDIA/spark-rapids/issues/126')),
+    pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
+    pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
     pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2071')),
     pytest.param('ints_with_whitespace.csv', _number_as_string_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2069')),
     pytest.param('ints_with_whitespace.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130'))
@@ -246,12 +246,10 @@ def test_basic_read(std_input_path, name, schema, options, read_func, v1_enabled
     StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
     StringGen('[aAbB ]{0,10}'),
     byte_gen, short_gen, int_gen, long_gen, boolean_gen, date_gen,
-    DoubleGen(no_nans=True), # NaN, Inf, and -Inf are not supported
-    # Once https://github.com/NVIDIA/spark-rapids/issues/125 and https://github.com/NVIDIA/spark-rapids/issues/124
-    # are fixed we should not have to special case float values any more.
-    pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
-    pytest.param(FloatGen(no_nans=True), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124')),
-    pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
+    DoubleGen(no_nans=False),
+    pytest.param(double_gen),
+    pytest.param(FloatGen(no_nans=False)),
+    pytest.param(float_gen),
     TimestampGen()]
 
 @approximate_float
diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py
index 436ea277786..454c71d8f34 100644
--- a/integration_tests/src/main/python/json_test.py
+++ b/integration_tests/src/main/python/json_test.py
@@ -26,11 +26,9 @@
     StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
     StringGen('[aAbB ]{0,10}'),
     byte_gen, short_gen, int_gen, long_gen, boolean_gen,
-    # Once https://github.com/NVIDIA/spark-rapids/issues/125 and https://github.com/NVIDIA/spark-rapids/issues/124
-    # are fixed we should not have to special case float values any more.
-    pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
-    pytest.param(FloatGen(no_nans=True), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124')),
-    pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
+    pytest.param(double_gen),
+    pytest.param(FloatGen(no_nans=True)),
+    pytest.param(float_gen),
     DoubleGen(no_nans=True)
 ]
 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
index d67b710382a..993cf73a77f 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
@@ -17,19 +17,20 @@
 package com.nvidia.spark.rapids
 
 import scala.math.max
-
-import ai.rapids.cudf.{HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table}
+import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.compress.CompressionCodecFactory
-
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.connector.read.PartitionReader
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
+import scala.collection.mutable.ListBuffer
+
 /**
  * The text based PartitionReader
  * @param conf the Hadoop configuration
@@ -164,7 +165,18 @@ abstract class GpuTextBasedPartitionReader(
       } else {
         readDataSchema
       }
-      val cudfSchema = GpuColumnVector.from(dataSchema)
+
+      val dataSchemaWithStrings = StructType(dataSchema.fields
+        .map(f => {
+          f.dataType match {
+            case DataTypes.FloatType | DataTypes.DoubleType =>
+              f.copy(dataType = DataTypes.StringType)
+            case _ =>
+              f
+          }
+        }))
+      val cudfSchema = GpuColumnVector.from(dataSchemaWithStrings)
+
 
       // about to start using the GPU
       GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))
@@ -175,7 +187,24 @@ abstract class GpuTextBasedPartitionReader(
         }
         maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table),
           maxDeviceMemory)
-        handleResult(newReadDataSchema, table)
+        val t2 = withResource(table) { _ =>
+          val columns = new ListBuffer[ColumnVector]()
+          val ansiEnabled = SQLConf.get.ansiEnabled
+          for (i <- 0 until table.getNumberOfColumns) {
+            val castColumn = dataSchema.fields(i).dataType match {
+              case DataTypes.FloatType =>
+                GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32)
+              case DataTypes.DoubleType =>
+                GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT64)
+              case _ =>
+                table.getColumn(i)
+            }
+            columns += castColumn
+          }
+          new Table(columns: _*)
+        }
+
+        handleResult(newReadDataSchema, t2)
       }
     } finally {
       dataBuffer.close()

From 81b11556d51ddac4791a575cf30567309275abac Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 26 Jan 2022 16:46:22 -0700
Subject: [PATCH 02/13] scalastyle and add comments

---
 integration_tests/src/main/python/json_test.py      |  4 ++--
 .../spark/rapids/GpuTextBasedPartitionReader.scala  | 11 +++++++----
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py
index 454c71d8f34..c7d16c42ea5 100644
--- a/integration_tests/src/main/python/json_test.py
+++ b/integration_tests/src/main/python/json_test.py
@@ -27,9 +27,9 @@
     StringGen('[aAbB ]{0,10}'),
     byte_gen, short_gen, int_gen, long_gen, boolean_gen,
     pytest.param(double_gen),
-    pytest.param(FloatGen(no_nans=True)),
+    pytest.param(FloatGen(no_nans=False)),
     pytest.param(float_gen),
-    DoubleGen(no_nans=True)
+    DoubleGen(no_nans=False)
 ]
 
 _enable_all_types_conf = {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
index 993cf73a77f..9a5b6d2998d 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
@@ -16,11 +16,14 @@
 
 package com.nvidia.spark.rapids
 
+import scala.collection.mutable.ListBuffer
 import scala.math.max
+
 import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.compress.CompressionCodecFactory
+
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.connector.read.PartitionReader
 import org.apache.spark.sql.execution.QueryExecutionException
@@ -29,8 +32,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
-import scala.collection.mutable.ListBuffer
-
 /**
  * The text based PartitionReader
  * @param conf the Hadoop configuration
@@ -166,6 +167,7 @@ abstract class GpuTextBasedPartitionReader(
         readDataSchema
       }
 
+      // read floating-point columns as strings in cuDF
       val dataSchemaWithStrings = StructType(dataSchema.fields
         .map(f => {
           f.dataType match {
@@ -187,7 +189,8 @@ abstract class GpuTextBasedPartitionReader(
         }
         maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table),
           maxDeviceMemory)
-        val t2 = withResource(table) { _ =>
+        // parse floating-point columns that were read as strings
+        val castTable = withResource(table) { _ =>
           val columns = new ListBuffer[ColumnVector]()
           val ansiEnabled = SQLConf.get.ansiEnabled
           for (i <- 0 until table.getNumberOfColumns) {
@@ -204,7 +207,7 @@ abstract class GpuTextBasedPartitionReader(
           new Table(columns: _*)
         }
 
-        handleResult(newReadDataSchema, t2)
+        handleResult(newReadDataSchema, castTable)
       }
     } finally {
       dataBuffer.close()

From aa45a265409fcb39939e6e432ec2df5c7d080945 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 27 Jan 2022 08:42:23 -0700
Subject: [PATCH 03/13] Fix resource leak

Signed-off-by: Andy Grove
---
 .../nvidia/spark/rapids/GpuTextBasedPartitionReader.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
index 9a5b6d2998d..621823f0427 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
@@ -200,11 +200,15 @@ abstract class GpuTextBasedPartitionReader(
               case DataTypes.DoubleType =>
                 GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT64)
               case _ =>
-                table.getColumn(i)
+                table.getColumn(i).incRefCount()
             }
             columns += castColumn
           }
-          new Table(columns: _*)
+          // Table increases the ref counts on the columns so we have
+          // to close them after creating the table
+          withResource(columns) { _ =>
+            new Table(columns: _*)
+          }
         }
 
         handleResult(newReadDataSchema, castTable)

From 45abe0878fb107fae5936823dd0c1e672bc2f9e4 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 27 Jan 2022 10:22:04 -0700
Subject: [PATCH 04/13] Add file reader tests for JSON

---
 integration_tests/src/main/python/csv_test.py |  2 +
 .../src/main/python/json_test.py              | 40 ++++++++++++++++++-
 .../src/test/resources/nan_and_inf.json       | 12 ++++++
 3 files changed, 53 insertions(+), 1 deletion(-)
 create mode 100644 integration_tests/src/test/resources/nan_and_inf.json

diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py
index 4047e60941a..86e7db6a21b 100644
--- a/integration_tests/src/main/python/csv_test.py
+++ b/integration_tests/src/main/python/csv_test.py
@@ -245,6 +245,8 @@ def test_basic_read(std_input_path, name, schema, options, read_func, v1_enabled
     # This would require multiLine reads to work correctly so we avoid these chars
     StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
     StringGen('[aAbB ]{0,10}'),
+    StringGen('[nN][aA][nN]'),
+    StringGen('[+-]?[iI][nN][fF]([iI][nN][iI][tT][yY])?'),
     byte_gen, short_gen, int_gen, long_gen, boolean_gen, date_gen,
     DoubleGen(no_nans=False),
     pytest.param(double_gen),
diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py
index c7d16c42ea5..47e5179e222 100644
--- a/integration_tests/src/main/python/json_test.py
+++ b/integration_tests/src/main/python/json_test.py
@@ -25,6 +25,8 @@
     # This would require multiLine reads to work correctly, so we avoid these chars
     StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
     StringGen('[aAbB ]{0,10}'),
+    StringGen('[nN][aA][nN]'),
+    StringGen('[+-]?[iI][nN][fF]([iI][nN][iI][tT][yY])?'),
     byte_gen, short_gen, int_gen, long_gen, boolean_gen,
     pytest.param(double_gen),
     pytest.param(FloatGen(no_nans=False)),
@@ -36,6 +38,31 @@
     'spark.rapids.sql.format.json.enabled': 'true',
     'spark.rapids.sql.format.json.read.enabled': 'true'}
 
+_float_schema = StructType([
+    StructField('number', FloatType())])
+
+_double_schema = StructType([
+    StructField('number', DoubleType())])
+
+def read_json_df(data_path, schema, options = {}):
+    def read_impl(spark):
+        reader = spark.read
+        if not schema is None:
+            reader = reader.schema(schema)
+        for key, value in options.items():
+            reader = reader.option(key, value)
+        return debug_df(reader.json(data_path))
+    return read_impl
+
+def read_json_sql(data_path, schema, options = {}):
+    opts = options
+    if not schema is None:
+        opts = copy_and_update(options, {'schema': schema})
+    def read_impl(spark):
+        spark.sql('DROP TABLE IF EXISTS `TMP_json_TABLE`')
+        return spark.catalog.createTable('TMP_json_TABLE', source='json', path=data_path, **opts)
+    return read_impl
+
 @approximate_float
 @pytest.mark.parametrize('data_gen', [
     StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
@@ -136,4 +163,15 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
             .schema(schema)\
             .option('timestampFormat', full_format)\
             .json(data_path),
-        conf=updated_conf)
\ No newline at end of file
+        conf=updated_conf)
+
+@pytest.mark.parametrize('filename', ['nan_and_inf.json'])
+@pytest.mark.parametrize('schema', [_float_schema, _double_schema])
+@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
+@pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"])
+def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_numeric_numbers):
+    assert_gpu_and_cpu_are_equal_collect(
+        read_func(std_input_path + '/' + filename,
+                  schema,
+                  { "allowNonNumericNumbers": allow_non_numeric_numbers }),
+        conf=_enable_all_types_conf)
diff --git a/integration_tests/src/test/resources/nan_and_inf.json b/integration_tests/src/test/resources/nan_and_inf.json
new file mode 100644
index 00000000000..72a078c1f6d
--- /dev/null
+++ b/integration_tests/src/test/resources/nan_and_inf.json
@@ -0,0 +1,12 @@
+{ "number": "NaN" }
+{ "number": "NAN" }
+{ "number": "nan" }
+{ "number": "INF" }
+{ "number": "+INF" }
+{ "number": "-INF" }
+{ "number": "Inf" }
+{ "number": "+Inf" }
+{ "number": "-Inf" }
+{ "number": "Infinity" }
+{ "number": "+Infinity" }
+{ "number": "-Infinity" }
\ No newline at end of file

From 6ebaf4af9dea161f0ca702fd7d8fa474a8a3e9e8 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 27 Jan 2022 12:28:50 -0700
Subject: [PATCH 05/13] more tests

---
 docs/compatibility.md                                | 14 +++++++++++++-
 integration_tests/src/main/python/json_test.py       |  8 ++++++-
 integration_tests/src/test/resources/floats.json     |  5 +++++
 .../src/test/resources/floats_edge_cases.json        |  3 +++
 .../src/test/resources/nan_and_inf.csv               |  5 +++++
 .../src/test/resources/nan_and_inf.json              |  9 ---------
 .../src/test/resources/nan_and_inf_edge_cases.json   | 12 ++++++++++++
 7 files changed, 45 insertions(+), 11 deletions(-)
 create mode 100644 integration_tests/src/test/resources/floats.json
 create mode 100644 integration_tests/src/test/resources/floats_edge_cases.json
 create mode 100644 integration_tests/src/test/resources/nan_and_inf_edge_cases.json

diff --git a/docs/compatibility.md b/docs/compatibility.md
index 635eeff2762..ee1b89cb790 100644
--- a/docs/compatibility.md
+++ b/docs/compatibility.md
@@ -480,7 +480,19 @@ The nested types(array, map and struct) are not supported yet in current version
 
 ### JSON Floating Point
 
-Like the CSV reader, the JSON reader has the same floating point issue. Please refer to [CSV Floating Point](#csv-floating-point) section.
+The GPU JSON reader does not support `NaN` and `Inf` values with full compatibility with Spark.
+
+The following are the only formats that are parsed consistently between CPU and GPU. Any other variation, including
+these formats when unquoted, will produce `null` on the CPU and may produce valid `NaN` and `Inf` results on the GPU.
+
+```json
+{ "number": "NaN" }
+{ "number": "Infinity" }
+{ "number": "-Infinity" }
+```
+
+Another limitation of the GPU JSON reader is that it will parse numeric values in quotes where Spark will just
+return `null`.
 
 ## LIKE
 
diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py
index 47e5179e222..23a75e4a2c5 100644
--- a/integration_tests/src/main/python/json_test.py
+++ b/integration_tests/src/main/python/json_test.py
@@ -165,7 +165,13 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
             .json(data_path),
         conf=updated_conf)
 
-@pytest.mark.parametrize('filename', ['nan_and_inf.json'])
+@approximate_float
+@pytest.mark.parametrize('filename', [
+    'nan_and_inf.json',
+    pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4646')),
+    'floats.json',
+    pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
+])
 @pytest.mark.parametrize('schema', [_float_schema, _double_schema])
 @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
 @pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"])
diff --git a/integration_tests/src/test/resources/floats.json b/integration_tests/src/test/resources/floats.json
new file mode 100644
index 00000000000..b24a003c1ab
--- /dev/null
+++ b/integration_tests/src/test/resources/floats.json
@@ -0,0 +1,5 @@
+{ "number": -3.141592 }
+{ "number": 3.141592 }
+{ "number": 0.0 }
+{ "number": -0.0 }
+{ "number": -3.4028234663852886e+38 }
\ No newline at end of file
diff --git a/integration_tests/src/test/resources/floats_edge_cases.json b/integration_tests/src/test/resources/floats_edge_cases.json
new file mode 100644
index 00000000000..504d783418d
--- /dev/null
+++ b/integration_tests/src/test/resources/floats_edge_cases.json
@@ -0,0 +1,3 @@
+{ "number": "-3.141592" }
+{ "number": "3.141592" }
+{ "number": "-3.4028234663852886e+38" }
diff --git a/integration_tests/src/test/resources/nan_and_inf.csv b/integration_tests/src/test/resources/nan_and_inf.csv
index b2f8f78e751..51e148e9d37 100644
--- a/integration_tests/src/test/resources/nan_and_inf.csv
+++ b/integration_tests/src/test/resources/nan_and_inf.csv
@@ -1,8 +1,13 @@
 "number"
 NaN
 Inf
++Inf
 -Inf
 NAN
 nan
 INF
++INF
 -INF
+Infinity
++Infinity
+-Infinity
diff --git a/integration_tests/src/test/resources/nan_and_inf.json b/integration_tests/src/test/resources/nan_and_inf.json
index 72a078c1f6d..e4aab168de4 100644
--- a/integration_tests/src/test/resources/nan_and_inf.json
+++ b/integration_tests/src/test/resources/nan_and_inf.json
@@ -1,12 +1,3 @@
 { "number": "NaN" }
-{ "number": "NAN" }
-{ "number": "nan" }
-{ "number": "INF" }
-{ "number": "+INF" }
-{ "number": "-INF" }
-{ "number": "Inf" }
-{ "number": "+Inf" }
-{ "number": "-Inf" }
 { "number": "Infinity" }
-{ "number": "+Infinity" }
 { "number": "-Infinity" }
\ No newline at end of file
diff --git a/integration_tests/src/test/resources/nan_and_inf_edge_cases.json b/integration_tests/src/test/resources/nan_and_inf_edge_cases.json
new file mode 100644
index 00000000000..c27a2291626
--- /dev/null
+++ b/integration_tests/src/test/resources/nan_and_inf_edge_cases.json
@@ -0,0 +1,12 @@
+{ "number": "NAN" }
+{ "number": "nan" }
+{ "number": "INF" }
+{ "number": "+INF" }
+{ "number": "-INF" }
+{ "number": INF }
+{ "number": +INF }
+{ "number": -INF }
+{ "number": "Inf" }
+{ "number": "+Inf" }
+{ "number": "-Inf" }
+{ "number": "+Infinity" }
\ No newline at end of file

From 6f35a331acc9ef63bcf9fad095a893e9eee60c60 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 27 Jan 2022 15:00:54 -0700
Subject: [PATCH 06/13] docs update

---
 docs/compatibility.md | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)

diff --git a/docs/compatibility.md b/docs/compatibility.md
index ee1b89cb790..833dbed5390 100644
--- a/docs/compatibility.md
+++ b/docs/compatibility.md
@@ -377,13 +377,6 @@ date. Typically, one that overflowed.
 
 ### CSV Floating Point
 
-The CSV parser is not able to parse `NaN` values. These are
-likely to be turned into null values, as described in this
-[issue](https://github.com/NVIDIA/spark-rapids/issues/125).
-
-Some floating-point values also appear to overflow but do not for the CPU as described in this
-[issue](https://github.com/NVIDIA/spark-rapids/issues/124).
-
 Any number that overflows will not be turned into a null value.
 
 Also parsing of some values will not produce bit for bit identical results to what the CPU does.
@@ -491,8 +484,8 @@ these formats when unquoted, will produce `null` on the CPU and may produce vali
 { "number": "-Infinity" }
 ```
 
-Another limitation of the GPU JSON reader is that it will parse numeric values in quotes where Spark will just
-return `null`.
+Another limitation of the GPU JSON reader is that it will parse strings containing floating-point values where
+Spark will treat them as invalid inputs and will just return `null`.
 
 ## LIKE
 

From 01bd6471cfa1ddb948ebdda7ce2e01f0798c24b9 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 27 Jan 2022 17:08:12 -0700
Subject: [PATCH 07/13] ansi mode tests and bug fix

Signed-off-by: Andy Grove
---
 integration_tests/src/main/python/csv_test.py             | 8 ++++++--
 integration_tests/src/main/python/json_test.py            | 6 ++++--
 .../nvidia/spark/rapids/GpuTextBasedPartitionReader.scala | 4 ++--
 3 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py
index 86e7db6a21b..1c5e942368a 100644
--- a/integration_tests/src/main/python/csv_test.py
+++ b/integration_tests/src/main/python/csv_test.py
@@ -235,8 +235,12 @@ def read_impl(spark):
     ], ids=idfn)
 @pytest.mark.parametrize('read_func', [read_csv_df, read_csv_sql])
 @pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
-def test_basic_read(std_input_path, name, schema, options, read_func, v1_enabled_list):
-    updated_conf=copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
+@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
+def test_basic_csv_read(std_input_path, name, schema, options, read_func, v1_enabled_list, ansi_enabled):
+    updated_conf=copy_and_update(_enable_all_types_conf, {
+        'spark.sql.sources.useV1SourceList': v1_enabled_list,
+        'spark.sql.ansi.enabled': ansi_enabled
+    })
     assert_gpu_and_cpu_are_equal_collect(read_func(std_input_path + '/' + name, schema, options),
             conf=updated_conf)
 
diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py
index 23a75e4a2c5..3758442aa2f 100644
--- a/integration_tests/src/main/python/json_test.py
+++ b/integration_tests/src/main/python/json_test.py
@@ -175,9 +175,11 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
 @pytest.mark.parametrize('schema', [_float_schema, _double_schema])
 @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
 @pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"])
-def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_numeric_numbers):
+@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
+def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_numeric_numbers, ansi_enabled):
+    updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.ansi.enabled': ansi_enabled})
     assert_gpu_and_cpu_are_equal_collect(
         read_func(std_input_path + '/' + filename,
                   schema,
                   { "allowNonNumericNumbers": allow_non_numeric_numbers }),
-        conf=_enable_all_types_conf)
+        conf=updated_conf)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
index 621823f0427..d099af1715c 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
@@ -28,7 +28,6 @@ import org.apache.spark.TaskContext
 import org.apache.spark.sql.connector.read.PartitionReader
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -192,7 +191,8 @@ abstract class GpuTextBasedPartitionReader(
         // parse floating-point columns that were read as strings
         val castTable = withResource(table) { _ =>
           val columns = new ListBuffer[ColumnVector]()
-          val ansiEnabled = SQLConf.get.ansiEnabled
+          // ansi mode does not apply to text inputs
+          val ansiEnabled = false
           for (i <- 0 until table.getNumberOfColumns) {
             val castColumn = dataSchema.fields(i).dataType match {
               case DataTypes.FloatType =>

From 8d1f9c2855cd0f6b625beb451c61eda8d721471a Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 28 Jan 2022 08:45:31 -0700
Subject: [PATCH 08/13] add invalid float values so that ansi is covered
 correctly by the tests

---
 integration_tests/src/main/python/json_test.py            | 1 +
 integration_tests/src/test/resources/floats_invalid.json  | 3 +++
 2 files changed, 4 insertions(+)
 create mode 100644 integration_tests/src/test/resources/floats_invalid.json

diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py
index 3758442aa2f..d2725b2f7b8 100644
--- a/integration_tests/src/main/python/json_test.py
+++ b/integration_tests/src/main/python/json_test.py
@@ -170,6 +170,7 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
     'nan_and_inf.json',
     pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4646')),
     'floats.json',
+    'floats_invalid.json',
     pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
 ])
diff --git a/integration_tests/src/test/resources/floats_invalid.json b/integration_tests/src/test/resources/floats_invalid.json
new file mode 100644
index 00000000000..60b1845ebf1
--- /dev/null
+++ b/integration_tests/src/test/resources/floats_invalid.json
@@ -0,0 +1,3 @@
+{ "number": true }
+{ "number": "not a float" }
+{ "number": "" }
\ No newline at end of file

From 334a2fc91985c8edb871e51b87d96fc53f3bd988 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 28 Jan 2022 09:13:19 -0700
Subject: [PATCH 09/13] add invalid floats to csv_test

---
 integration_tests/src/main/python/csv_test.py            | 2 ++
 integration_tests/src/test/resources/floats_invalid.csv  | 5 +++++
 2 files changed, 7 insertions(+)
 create mode 100644 integration_tests/src/test/resources/floats_invalid.csv

diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py
index 1c5e942368a..9421923eb7a 100644
--- a/integration_tests/src/main/python/csv_test.py
+++ b/integration_tests/src/main/python/csv_test.py
@@ -227,6 +227,8 @@ def read_impl(spark):
     pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}),
     pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}),
     pytest.param('nan_and_inf.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
+    pytest.param('floats_invalid.csv', _float_schema, {'header': 'true'}),
+    pytest.param('floats_invalid.csv', _double_schema, {'header': 'true'}),
     pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
     pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
     pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2071')),
diff --git a/integration_tests/src/test/resources/floats_invalid.csv b/integration_tests/src/test/resources/floats_invalid.csv
new file mode 100644
index 00000000000..ccfdaaf08ae
--- /dev/null
+++ b/integration_tests/src/test/resources/floats_invalid.csv
@@ -0,0 +1,5 @@
+"number"
+true
+false
+bad
+"bad"
\ No newline at end of file

From 240ac198b120aefefe392eab729202545d9bdb58 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 28 Jan 2022 15:28:44 -0700
Subject: [PATCH 10/13] Remove Nan/Inf values from simple_float_values.csv and
 enable overflow tests. Also update compatibility guide.

Signed-off-by: Andy Grove
---
 docs/compatibility.md                            |  4 +++-
 integration_tests/src/main/python/csv_test.py    |  4 ++--
 .../src/test/resources/simple_float_values.csv   | 10 +---------
 3 files changed, 6 insertions(+), 12 deletions(-)

diff --git a/docs/compatibility.md b/docs/compatibility.md
index 833dbed5390..89ee85c706a 100644
--- a/docs/compatibility.md
+++ b/docs/compatibility.md
@@ -377,7 +377,7 @@ date. Typically, one that overflowed.
 
 ### CSV Floating Point
 
-Any number that overflows will not be turned into a null value.
+Parsing floating-point values has the same limitations as [casting from string to float](#String-to-Float).
 
 Also parsing of some values will not produce bit for bit identical results to what the CPU does.
 They are within round-off errors except when they are close enough to overflow to Inf or -Inf which
@@ -473,6 +473,8 @@ The nested types(array, map and struct) are not supported yet in current version
 
 ### JSON Floating Point
 
+Parsing floating-point values has the same limitations as [casting from string to float](#String-to-Float).
+
 The GPU JSON reader does not support `NaN` and `Inf` values with full compatibility with Spark.
 
 The following are the only formats that are parsed consistently between CPU and GPU. Any other variation, including
diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py
index 9421923eb7a..0aa1cb50b4d 100644
--- a/integration_tests/src/main/python/csv_test.py
+++ b/integration_tests/src/main/python/csv_test.py
@@ -229,8 +229,8 @@ def read_impl(spark):
     pytest.param('nan_and_inf.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
     pytest.param('floats_invalid.csv', _float_schema, {'header': 'true'}),
     pytest.param('floats_invalid.csv', _double_schema, {'header': 'true'}),
-    pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
-    pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
+    pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}),
+    pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}),
     pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2071')),
     pytest.param('ints_with_whitespace.csv', _number_as_string_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2069')),
     pytest.param('ints_with_whitespace.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130'))
diff --git a/integration_tests/src/test/resources/simple_float_values.csv b/integration_tests/src/test/resources/simple_float_values.csv
index f7a20131283..1fdc6e048b5 100644
--- a/integration_tests/src/test/resources/simple_float_values.csv
+++ b/integration_tests/src/test/resources/simple_float_values.csv
@@ -16,12 +16,4 @@ bad
 1.7976931348623157E308
 1.7976931348623157e+308
 1.7976931348623158E308
-1.2e-234
-NAN
-nan
-NaN
-Inf
--Inf
-INF
--INF
-
+1.2e-234
\ No newline at end of file

From 001d549c576d104d0b1bcfc42093ea0ed6e04f17 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 31 Jan 2022 10:47:14 -0700
Subject: [PATCH 11/13] move withResource to enclose for loop

---
 .../rapids/GpuTextBasedPartitionReader.scala | 26 +++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
index d099af1715c..515db04fc11 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
@@ -191,22 +191,22 @@ abstract class GpuTextBasedPartitionReader(
         // parse floating-point columns that were read as strings
         val castTable = withResource(table) { _ =>
           val columns = new ListBuffer[ColumnVector]()
-          // ansi mode does not apply to text inputs
-          val ansiEnabled = false
-          for (i <- 0 until table.getNumberOfColumns) {
-            val castColumn = dataSchema.fields(i).dataType match {
-              case DataTypes.FloatType =>
-                GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32)
-              case DataTypes.DoubleType =>
-                GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT64)
-              case _ =>
-                table.getColumn(i).incRefCount()
-            }
-            columns += castColumn
-          }
           // Table increases the ref counts on the columns so we have
           // to close them after creating the table
           withResource(columns) { _ =>
+            // ansi mode does not apply to text inputs
+            val ansiEnabled = false
+            for (i <- 0 until table.getNumberOfColumns) {
+              val castColumn = dataSchema.fields(i).dataType match {
+                case DataTypes.FloatType =>
+                  GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32)
+                case DataTypes.DoubleType =>
+                  GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT64)
+                case _ =>
+                  table.getColumn(i).incRefCount()
+              }
+              columns += castColumn
+            }
             new Table(columns: _*)
           }
         }

From 085a5cbed2886f7cc7461d64bcabe435974ef82f Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 1 Feb 2022 08:38:14 -0700
Subject: [PATCH 12/13] fix test regression

---
 .../com/nvidia/spark/rapids/HashAggregatesSuite.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala
index 95ed3a5b2be..308ab374337 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -59,7 +59,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
       conf: SparkConf = new SparkConf(),
       execsAllowedNonGpu: Seq[String] = Seq.empty,
       batchSize: Int = 0,
-      repart: Int = 1)
+      repart: Int = 1,
+      maxFloatDiff: Double = 0.0)
       (fn: DataFrame => DataFrame) {
     if (batchSize > 0) {
       makeBatchedBytes(batchSize, conf)
@@ -69,7 +70,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
     testSparkResultsAreEqual(testName, df,
       conf = conf, repart = repart,
       execsAllowedNonGpu = execsAllowedNonGpu,
-      incompat = true, sort = true)(fn)
+      incompat = true, sort = true, maxFloatDiff = maxFloatDiff)(fn)
   }
 
   def firstDf(spark: SparkSession): DataFrame = {
@@ -637,6 +638,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
   FLOAT_TEST_testSparkResultsAreEqual(
     "doubles basic aggregates group by doubles",
     doubleCsvDf,
+    maxFloatDiff = 0.0001,
     conf = makeBatchedBytes(3, enableCsvConf())) {
     frame => frame.groupBy("doubles").agg(
       lit(456f),
@@ -653,6 +655,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
   FLOAT_TEST_testSparkResultsAreEqual(
     "doubles basic aggregates group by more_doubles",
     doubleCsvDf,
+    maxFloatDiff = 0.0001,
     conf = makeBatchedBytes(3, enableCsvConf())) {
     frame => frame.groupBy("more_doubles").agg(
       lit(456f),

From 5d70ced2167db81400fc96f0ea02f99ebf28ca47 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 1 Feb 2022 12:53:43 -0700
Subject: [PATCH 13/13] change maxFloatDiff to 1e-6

---
 .../scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala
index 308ab374337..05104801314 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala
@@ -638,7 +638,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
   FLOAT_TEST_testSparkResultsAreEqual(
     "doubles basic aggregates group by doubles",
     doubleCsvDf,
-    maxFloatDiff = 0.0001,
+    maxFloatDiff = 0.000001,
     conf = makeBatchedBytes(3, enableCsvConf())) {
     frame => frame.groupBy("doubles").agg(
       lit(456f),
@@ -655,7 +655,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
   FLOAT_TEST_testSparkResultsAreEqual(
     "doubles basic aggregates group by more_doubles",
     doubleCsvDf,
-    maxFloatDiff = 0.0001,
+    maxFloatDiff = 0.000001,
     conf = makeBatchedBytes(3, enableCsvConf())) {
     frame => frame.groupBy("more_doubles").agg(
       lit(456f),
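
Taken together, patches 01, 03, 07, and 11 converge on a single pattern in `GpuTextBasedPartitionReader`: declare every float and double column as a string in the cuDF read schema, read the file, then cast the string columns back to floating point on the GPU. The sketch below condenses that final state for reference; it is assembled from the diffs above, not a standalone program, and assumes the plugin's own helpers (`withResource`, `GpuCast.castStringToFloats`, `GpuColumnVector`). The function names `schemaWithStrings` and `castStringColumns` are illustrative only.

```scala
import scala.collection.mutable.ListBuffer

import ai.rapids.cudf.{ColumnVector, DType, Table}

import org.apache.spark.sql.types.{DataTypes, StructType}

// Patch 01: report float/double columns to cuDF as strings so the raw
// text survives the read and can be parsed Spark-compatibly afterwards.
def schemaWithStrings(dataSchema: StructType): StructType =
  StructType(dataSchema.fields.map { f =>
    f.dataType match {
      case DataTypes.FloatType | DataTypes.DoubleType =>
        f.copy(dataType = DataTypes.StringType)
      case _ => f
    }
  })

// Patches 03/07/11: cast the string columns back to their declared types.
def castStringColumns(table: Table, dataSchema: StructType): Table =
  withResource(table) { _ =>
    val columns = new ListBuffer[ColumnVector]()
    // Table increases the ref counts on the columns, so the buffered
    // columns must be closed after the table is built (patch 03) -- and
    // closed even if a cast throws part-way through (patch 11).
    withResource(columns) { _ =>
      // ansi mode does not apply to text inputs (patch 07)
      val ansiEnabled = false
      for (i <- 0 until table.getNumberOfColumns) {
        columns += (dataSchema.fields(i).dataType match {
          case DataTypes.FloatType =>
            GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32)
          case DataTypes.DoubleType =>
            GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT64)
          case _ =>
            // pass-through columns get an extra reference so the
            // enclosing withResource does not free the original
            table.getColumn(i).incRefCount()
        })
      }
      new Table(columns: _*)
    }
  }
```

In the reader itself these two pieces bracket the cuDF read: `GpuColumnVector.from(schemaWithStrings(dataSchema))` produces the schema handed to cuDF, and the resulting table is passed through the cast step before `handleResult`.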