Skip to content

Commit

Permalink
Merge pull request #17910 from rapidsai/branch-25.02
Browse files Browse the repository at this point in the history
Forward-merge branch-25.02 into branch-25.04
  • Loading branch information
GPUtester authored Feb 4, 2025
2 parents 81c383c + 8b89ea0 commit 19dea56
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 29 deletions.
65 changes: 37 additions & 28 deletions cpp/src/io/json/read_json.cu
Original file line number Diff line number Diff line change
Expand Up @@ -165,23 +165,26 @@ std::size_t estimate_size_per_subchunk(std::size_t chunk_size)
}

/**
* @brief Return the upper bound on the batch size for the JSON reader.
* @brief Return the batch size for the JSON reader.
*
* The datasources passed to the JSON reader are split into batches demarcated by byte range
* offsets and read iteratively. The batch size is capped at INT_MAX bytes, which is the
* default value returned by the function. This value can be overridden at runtime using the
* environment variable LIBCUDF_JSON_BATCH_SIZE
* The datasources passed to the JSON reader are read iteratively in batches demarcated by byte
* range offsets. The tokenizer requires the JSON buffer read in each batch to be of size at most
* INT_MAX bytes.
* Since the byte range corresponding to a given batch can cause the last JSON line
* in the batch to be incomplete, the batch size returned by this function allows for an additional
* `max_subchunks_prealloced` subchunks to be allocated beyond the byte range offsets. Since the
* size of the subchunk depends on the size of the byte range, the batch size is variable and cannot
* be directly controlled by the user. As a workaround, the environment variable
* LIBCUDF_JSON_BATCH_SIZE can be used to set a fixed batch size at runtime.
*
* @return size in bytes
*/
std::size_t get_batch_size_upper_bound()
std::size_t get_batch_size(std::size_t chunk_size)
{
auto const batch_size_str = std::getenv("LIBCUDF_JSON_BATCH_SIZE");
int64_t const batch_size = batch_size_str != nullptr ? std::atol(batch_size_str) : 0L;
auto const batch_limit = static_cast<int64_t>(std::numeric_limits<int32_t>::max());
auto const batch_size_upper_bound = static_cast<std::size_t>(
(batch_size > 0 && batch_size < batch_limit) ? batch_size : batch_limit);
return batch_size_upper_bound;
auto const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
auto const batch_limit = static_cast<std::size_t>(std::numeric_limits<int32_t>::max()) -
(max_subchunks_prealloced * size_per_subchunk);
return std::min(batch_limit, getenv_or<std::size_t>("LIBCUDF_JSON_BATCH_SIZE", batch_limit));
}

/**
Expand Down Expand Up @@ -295,6 +298,10 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
}
}

auto const batch_limit = static_cast<size_t>(std::numeric_limits<int32_t>::max());
CUDF_EXPECTS(static_cast<size_t>(next_delim_pos - first_delim_pos - shift_for_nonzero_offset) <
batch_limit,
"The size of the JSON buffer returned by every batch cannot exceed INT_MAX bytes");
return datasource::owning_buffer<rmm::device_buffer>(
std::move(buffer),
reinterpret_cast<uint8_t*>(buffer.data()) + first_delim_pos + shift_for_nonzero_offset,
Expand Down Expand Up @@ -365,17 +372,11 @@ table_with_metadata read_json_impl(host_span<std::unique_ptr<datasource>> source
reader_opts.is_enabled_lines() || total_source_size < std::numeric_limits<int32_t>::max(),
"Parsing Regular JSON inputs of size greater than INT_MAX bytes is not supported");

std::size_t chunk_offset = reader_opts.get_byte_range_offset();
std::size_t chunk_size = reader_opts.get_byte_range_size();
chunk_size = !chunk_size ? total_source_size - chunk_offset
: std::min(chunk_size, total_source_size - chunk_offset);

std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
std::size_t const batch_size_upper_bound = get_batch_size_upper_bound();
std::size_t const batch_size =
batch_size_upper_bound < (max_subchunks_prealloced * size_per_subchunk)
? batch_size_upper_bound
: batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk);
std::size_t chunk_offset = reader_opts.get_byte_range_offset();
std::size_t chunk_size = reader_opts.get_byte_range_size();
chunk_size = !chunk_size ? total_source_size - chunk_offset
: std::min(chunk_size, total_source_size - chunk_offset);
std::size_t const batch_size = get_batch_size(chunk_size);

/*
* Identify the position (zero-indexed) of starting source file from which to begin
Expand Down Expand Up @@ -490,11 +491,19 @@ table_with_metadata read_json_impl(host_span<std::unique_ptr<datasource>> source
// Dispatch individual batches to read_batch and push the resulting table into
// partial_tables array. Note that the reader options need to be updated for each
// batch to adjust byte range offset and byte range size.
for (std::size_t i = 1; i < batch_offsets.size() - 1; i++) {
batched_reader_opts.set_byte_range_offset(batch_offsets[i]);
batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]);
partial_tables.emplace_back(
read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()));
for (std::size_t batch_offset_pos = 1; batch_offset_pos < batch_offsets.size() - 1;
batch_offset_pos++) {
batched_reader_opts.set_byte_range_offset(batch_offsets[batch_offset_pos]);
batched_reader_opts.set_byte_range_size(batch_offsets[batch_offset_pos + 1] -
batch_offsets[batch_offset_pos]);
auto partial_table =
read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref());
if (partial_table.tbl->num_columns() == 0 && partial_table.tbl->num_rows() == 0) {
CUDF_EXPECTS(batch_offset_pos == batch_offsets.size() - 2,
"Only the partial table generated by the last batch can be empty");
break;
}
partial_tables.emplace_back(std::move(partial_table));
}

auto expects_schema_equality =
Expand Down
44 changes: 43 additions & 1 deletion cpp/tests/io/json/json_test.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -3461,4 +3461,46 @@ TEST_F(JsonReaderTest, MismatchedBeginEndTokens)
EXPECT_THROW(cudf::io::read_json(opts), cudf::logic_error);
}

/**
* @brief Base test fixture for JSON batched reader tests
*/
struct JsonBatchedReaderTest : public cudf::test::BaseFixture {
public:
void set_batch_size(size_t batch_size_upper_bound)
{
setenv("LIBCUDF_JSON_BATCH_SIZE", std::to_string(batch_size_upper_bound).c_str(), 1);
}

~JsonBatchedReaderTest() { unsetenv("LIBCUDF_JSON_BATCH_SIZE"); }
};

TEST_F(JsonBatchedReaderTest, EmptyLastBatch)
{
std::string data = R"(
{"a": "b"}
{"a": "b"}
{"a": "b"}
{"a": "b"}
)";
size_t size_of_last_batch = 5;
// This test constructs two batches by setting the batch size such that the last batch is an
// incomplete line. The JSON string corresponding to the first batch is
// '\n{"a": "b"}\n{"a": "b"}\n{"a": "b"}\n{"a": '
// The JSON string corresponding to the second batch is
// '"b"}\n'
this->set_batch_size(data.size() - size_of_last_batch);
auto opts =
cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()})
.lines(true)
.build();
auto result = cudf::io::read_json(opts);

EXPECT_EQ(result.tbl->num_columns(), 1);
EXPECT_EQ(result.tbl->num_rows(), 4);
EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::STRING);
EXPECT_EQ(result.metadata.schema_info[0].name, "a");
CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0),
cudf::test::strings_column_wrapper{{"b", "b", "b", "b"}});
}

CUDF_TEST_PROGRAM_MAIN()

0 comments on commit 19dea56

Please sign in to comment.