Skip to content

Commit 69fc6aa

Browse files
committed
Fix a row index entry error in ORC writer issue (rapidsai#10989)
Issue rapidsai#10755 Fixes an issue in protobuf writer where the length on the row index entry was being written into a single byte. This would cause errors when the size is larger than 127. The issue was uncovered when row group statistics were added. String statistics contain copies to min/max strings, so the size is unbounded. This PR changes the protobuf writer to write the entry size as a generic uint, allowing larger entries. Also fixed `start_row` in row group info array in the reader (unrelated). Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu) - David Wendt (https://github.com/davidwendt) - GALI PREM SAGAR (https://github.com/galipremsagar) URL: rapidsai#10989
1 parent d0b4e30 commit 69fc6aa

File tree

4 files changed

+60
-27
lines changed

4 files changed

+60
-27
lines changed

cpp/src/io/orc/orc.cpp

+32-24
Original file line numberDiff line numberDiff line change
@@ -212,51 +212,59 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk,
212212
TypeKind kind,
213213
ColStatsBlob const* stats)
214214
{
215-
size_t sz = 0, lpos;
216-
put_uint(encode_field_number(1, ProtofType::FIXEDLEN)); // 1:RowIndex.entry
217-
lpos = m_buf->size();
218-
put_byte(0xcd); // sz+2
219-
put_uint(encode_field_number(1, ProtofType::FIXEDLEN)); // 1:positions[packed=true]
220-
put_byte(0xcd); // sz
221-
if (present_blk >= 0) sz += put_uint(present_blk);
215+
std::vector<uint8_t> positions_data;
216+
ProtobufWriter position_writer(&positions_data);
217+
auto const positions_size_offset = position_writer.put_uint(
218+
encode_field_number(1, ProtofType::FIXEDLEN)); // 1:positions[packed=true]
219+
position_writer.put_byte(0xcd); // positions size placeholder
220+
uint32_t positions_size = 0;
221+
if (present_blk >= 0) positions_size += position_writer.put_uint(present_blk);
222222
if (present_ofs >= 0) {
223-
sz += put_uint(present_ofs);
224-
sz += put_byte(0); // run pos = 0
225-
sz += put_byte(0); // bit pos = 0
223+
positions_size += position_writer.put_uint(present_ofs);
224+
positions_size += position_writer.put_byte(0); // run pos = 0
225+
positions_size += position_writer.put_byte(0); // bit pos = 0
226226
}
227-
if (data_blk >= 0) { sz += put_uint(data_blk); }
227+
if (data_blk >= 0) { positions_size += position_writer.put_uint(data_blk); }
228228
if (data_ofs >= 0) {
229-
sz += put_uint(data_ofs);
229+
positions_size += position_writer.put_uint(data_ofs);
230230
if (kind != STRING && kind != FLOAT && kind != DOUBLE && kind != DECIMAL) {
231231
// RLE run pos always zero (assumes RLE aligned with row index boundaries)
232-
sz += put_byte(0);
232+
positions_size += position_writer.put_byte(0);
233233
if (kind == BOOLEAN) {
234234
// bit position in byte, always zero
235-
sz += put_byte(0);
235+
positions_size += position_writer.put_byte(0);
236236
}
237237
}
238238
}
239239
// INT kind can be passed in to bypass 2nd stream index (dictionary length streams)
240240
if (kind != INT) {
241-
if (data2_blk >= 0) { sz += put_uint(data2_blk); }
241+
if (data2_blk >= 0) { positions_size += position_writer.put_uint(data2_blk); }
242242
if (data2_ofs >= 0) {
243-
sz += put_uint(data2_ofs);
243+
positions_size += position_writer.put_uint(data2_ofs);
244244
// RLE run pos always zero (assumes RLE aligned with row index boundaries)
245-
sz += put_byte(0);
245+
positions_size += position_writer.put_byte(0);
246246
}
247247
}
248248
// size of the field 1
249-
m_buf->data()[lpos + 2] = (uint8_t)(sz);
249+
positions_data[positions_size_offset] = static_cast<uint8_t>(positions_size);
250+
251+
auto const stats_size = (stats == nullptr)
252+
? 0
253+
: varint_size(encode_field_number<decltype(*stats)>(2)) +
254+
varint_size(stats->size()) + stats->size();
255+
auto const entry_size = positions_data.size() + stats_size;
256+
257+
// 1:RowIndex.entry
258+
put_uint(encode_field_number(1, ProtofType::FIXEDLEN));
259+
put_uint(entry_size);
260+
put_bytes<uint8_t>(positions_data);
250261

251262
if (stats != nullptr) {
252-
sz += put_uint(encode_field_number<decltype(*stats)>(2)); // 2: statistics
263+
put_uint(encode_field_number<decltype(*stats)>(2)); // 2: statistics
253264
// Statistics field contains its length as varint and dtype specific data (encoded on the GPU)
254-
sz += put_uint(stats->size());
255-
sz += put_bytes<typename ColStatsBlob::value_type>(*stats);
265+
put_uint(stats->size());
266+
put_bytes<typename ColStatsBlob::value_type>(*stats);
256267
}
257-
258-
// size of the whole row index entry
259-
m_buf->data()[lpos] = (uint8_t)(sz + 2);
260268
}
261269

262270
size_t ProtobufWriter::write(const PostScript& s)

cpp/src/io/orc/orc.hpp

+11
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,17 @@ class ProtobufWriter {
495495
put_byte(static_cast<uint8_t>(v));
496496
return l;
497497
}
498+
499+
uint32_t varint_size(uint64_t val)
500+
{
501+
auto len = 1u;
502+
while (val > 0x7f) {
503+
val >>= 7;
504+
++len;
505+
}
506+
return len;
507+
}
508+
498509
uint32_t put_int(int64_t v)
499510
{
500511
int64_t s = (v < 0);

cpp/src/io/orc/stripe_init.cu

+3-3
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,8 @@ enum row_entry_state_e {
239239
* @return bytes consumed
240240
*/
241241
static uint32_t __device__ ProtobufParseRowIndexEntry(rowindex_state_s* s,
242-
const uint8_t* start,
243-
const uint8_t* end)
242+
uint8_t const* const start,
243+
uint8_t const* const end)
244244
{
245245
constexpr uint32_t pb_rowindexentry_id = ProtofType::FIXEDLEN + 8;
246246

@@ -471,7 +471,7 @@ __global__ void __launch_bounds__(128, 8) gpuParseRowGroupIndex(RowGroup* row_gr
471471
: row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_rows;
472472
auto const start_row =
473473
(use_base_stride)
474-
? rowidx_stride
474+
? i * rowidx_stride
475475
: row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].start_row;
476476
for (int j = t4; j < rowgroup_size4; j += 4) {
477477
((uint32_t*)&row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x])[j] =

python/cudf/cudf/tests/test_orc.py

+14
Original file line numberDiff line numberDiff line change
@@ -1729,3 +1729,17 @@ def test_orc_reader_zstd_compression(list_struct_buff):
17291729
assert_eq(expected, got)
17301730
except RuntimeError:
17311731
pytest.mark.xfail(reason="zstd support is not enabled")
1732+
1733+
1734+
def test_writer_protobuf_large_rowindexentry():
1735+
s = [
1736+
"Length of the two strings needs to add up to at least ~120",
1737+
"So that the encoded statistics are larger than 128 bytes",
1738+
] * 5001 # generate more than 10K rows to have two row groups
1739+
df = cudf.DataFrame({"s1": s})
1740+
1741+
buff = BytesIO()
1742+
df.to_orc(buff)
1743+
1744+
got = cudf.read_orc(buff)
1745+
assert_frame_equal(df, got)

0 commit comments

Comments
 (0)