diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 419e7bb120f..4b0af7d6e81 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -165,23 +165,32 @@ std::size_t estimate_size_per_subchunk(std::size_t chunk_size)
 }
 
 /**
- * @brief Return the upper bound on the batch size for the JSON reader.
+ * @brief Return the batch size for the JSON reader.
  *
- * The datasources passed to the JSON reader are split into batches demarcated by byte range
- * offsets and read iteratively. The batch size is capped at INT_MAX bytes, which is the
- * default value returned by the function. This value can be overridden at runtime using the
- * environment variable LIBCUDF_JSON_BATCH_SIZE
+ * The datasources passed to the JSON reader are read iteratively in batches demarcated by byte
+ * range offsets. The tokenizer requires the JSON buffer read in each batch to be of size at most
+ * INT_MAX bytes.
+ * Since the byte range corresponding to a given batch can cause the last JSON line
+ * in the batch to be incomplete, the batch size returned by this function allows for an additional
+ * `max_subchunks_prealloced` subchunks to be allocated beyond the byte range offsets. Since the
+ * size of the subchunk depends on the size of the byte range, the batch size is variable and cannot
+ * be directly controlled by the user. As a workaround, the environment variable
+ * LIBCUDF_JSON_BATCH_SIZE can be used to set a fixed batch size at runtime. A value of zero is
+ * ignored, and larger values are capped at the limit described above.
  *
+ * @param chunk_size Size in bytes of the byte range being read, used to estimate the subchunk size
  * @return size in bytes
  */
-std::size_t get_batch_size_upper_bound()
+std::size_t get_batch_size(std::size_t chunk_size)
 {
-  auto const batch_size_str         = std::getenv("LIBCUDF_JSON_BATCH_SIZE");
-  int64_t const batch_size          = batch_size_str != nullptr ? std::atol(batch_size_str) : 0L;
-  auto const batch_limit            = static_cast(std::numeric_limits::max());
-  auto const batch_size_upper_bound = static_cast(
-    (batch_size > 0 && batch_size < batch_limit) ? batch_size : batch_limit);
-  return batch_size_upper_bound;
+  auto const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
+  auto const int_max           = static_cast<std::size_t>(std::numeric_limits<int32_t>::max());
+  auto const prealloc_size     = max_subchunks_prealloced * size_per_subchunk;
+  // Avoid unsigned wrap-around when the preallocated subchunks alone reach INT_MAX bytes
+  auto const batch_limit = prealloc_size < int_max ? int_max - prealloc_size : int_max;
+  auto const requested   = getenv_or("LIBCUDF_JSON_BATCH_SIZE", batch_limit);
+  // Ignore a zero override, as the replaced upper-bound helper did, so the size stays positive
+  return requested > 0 ? std::min(batch_limit, requested) : batch_limit;
 }
 
 /**
@@ -295,6 +304,10 @@ datasource::owning_buffer get_record_range_raw_input(
     }
   }
 
+  auto const batch_limit = static_cast(std::numeric_limits::max());
+  CUDF_EXPECTS(static_cast(next_delim_pos - first_delim_pos - shift_for_nonzero_offset) <
+                 batch_limit,
+               "The size of the JSON buffer returned by every batch cannot exceed INT_MAX bytes");
   return datasource::owning_buffer(
     std::move(buffer),
     reinterpret_cast(buffer.data()) + first_delim_pos + shift_for_nonzero_offset,
@@ -365,17 +378,11 @@ table_with_metadata read_json_impl(host_span> source
     reader_opts.is_enabled_lines() || total_source_size < std::numeric_limits::max(),
     "Parsing Regular JSON inputs of size greater than INT_MAX bytes is not supported");
 
-  std::size_t chunk_offset = reader_opts.get_byte_range_offset();
-  std::size_t chunk_size   = reader_opts.get_byte_range_size();
-  chunk_size               = !chunk_size ? total_source_size - chunk_offset
-                                         : std::min(chunk_size, total_source_size - chunk_offset);
-
-  std::size_t const size_per_subchunk      = estimate_size_per_subchunk(chunk_size);
-  std::size_t const batch_size_upper_bound = get_batch_size_upper_bound();
-  std::size_t const batch_size =
-    batch_size_upper_bound < (max_subchunks_prealloced * size_per_subchunk)
-      ? batch_size_upper_bound
-      : batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk);
+  std::size_t chunk_offset     = reader_opts.get_byte_range_offset();
+  std::size_t chunk_size       = reader_opts.get_byte_range_size();
+  chunk_size                   = !chunk_size ? total_source_size - chunk_offset
+                                             : std::min(chunk_size, total_source_size - chunk_offset);
+  std::size_t const batch_size = get_batch_size(chunk_size);
 
   /*
    * Identify the position (zero-indexed) of starting source file from which to begin
@@ -490,11 +497,19 @@ table_with_metadata read_json_impl(host_span> source
   // Dispatch individual batches to read_batch and push the resulting table into
   // partial_tables array. Note that the reader options need to be updated for each
   // batch to adjust byte range offset and byte range size.
-  for (std::size_t i = 1; i < batch_offsets.size() - 1; i++) {
-    batched_reader_opts.set_byte_range_offset(batch_offsets[i]);
-    batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]);
-    partial_tables.emplace_back(
-      read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()));
+  for (std::size_t batch_offset_pos = 1; batch_offset_pos < batch_offsets.size() - 1;
+       batch_offset_pos++) {
+    batched_reader_opts.set_byte_range_offset(batch_offsets[batch_offset_pos]);
+    batched_reader_opts.set_byte_range_size(batch_offsets[batch_offset_pos + 1] -
+                                            batch_offsets[batch_offset_pos]);
+    auto partial_table =
+      read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref());
+    if (partial_table.tbl->num_columns() == 0 && partial_table.tbl->num_rows() == 0) {
+      CUDF_EXPECTS(batch_offset_pos == batch_offsets.size() - 2,
+                   "Only the partial table generated by the last batch can be empty");
+      break;
+    }
+    partial_tables.emplace_back(std::move(partial_table));
   }
 
   auto expects_schema_equality =
diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp
index 23ca5734ded..00f46975fdc 100644
--- a/cpp/tests/io/json/json_test.cpp
+++ b/cpp/tests/io/json/json_test.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2025, NVIDIA CORPORATION.
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -3461,4 +3461,46 @@ TEST_F(JsonReaderTest, MismatchedBeginEndTokens) EXPECT_THROW(cudf::io::read_json(opts), cudf::logic_error); } +/** + * @brief Base test fixture for JSON batched reader tests + */ +struct JsonBatchedReaderTest : public cudf::test::BaseFixture { + public: + void set_batch_size(size_t batch_size_upper_bound) + { + setenv("LIBCUDF_JSON_BATCH_SIZE", std::to_string(batch_size_upper_bound).c_str(), 1); + } + + ~JsonBatchedReaderTest() { unsetenv("LIBCUDF_JSON_BATCH_SIZE"); } +}; + +TEST_F(JsonBatchedReaderTest, EmptyLastBatch) +{ + std::string data = R"( + {"a": "b"} + {"a": "b"} + {"a": "b"} + {"a": "b"} + )"; + size_t size_of_last_batch = 5; + // This test constructs two batches by setting the batch size such that the last batch is an + // incomplete line. The JSON string corresponding to the first batch is + // '\n{"a": "b"}\n{"a": "b"}\n{"a": "b"}\n{"a": ' + // The JSON string corresponding to the second batch is + // '"b"}\n' + this->set_batch_size(data.size() - size_of_last_batch); + auto opts = + cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()}) + .lines(true) + .build(); + auto result = cudf::io::read_json(opts); + + EXPECT_EQ(result.tbl->num_columns(), 1); + EXPECT_EQ(result.tbl->num_rows(), 4); + EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::STRING); + EXPECT_EQ(result.metadata.schema_info[0].name, "a"); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), + cudf::test::strings_column_wrapper{{"b", "b", "b", "b"}}); +} + CUDF_TEST_PROGRAM_MAIN()