Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add to_parquet_row_groups #2979

Merged
merged 23 commits into main from feat-add-to-parquet-row-groups
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ebd11a4
Set up, but not writing in batches correctly.
zbilodea Jan 22, 2024
8686679
style: pre-commit fixes
pre-commit-ci[bot] Jan 22, 2024
6596f13
with pre-commit...
zbilodea Jan 22, 2024
87cf382
Writes correct row groups, don't think it's properly iterative yet
zbilodea Jan 23, 2024
2ce2d38
Merge branch 'feat-add-to-parquet-row-groups' of https://github.com/s…
zbilodea Jan 23, 2024
5b2de81
test with HZZ file works as expected.
zbilodea Jan 24, 2024
73a59a7
style: pre-commit fixes
pre-commit-ci[bot] Jan 24, 2024
2b5d079
typo...
zbilodea Jan 24, 2024
84caa70
added importorskip for uproot
zbilodea Jan 24, 2024
e3f3f85
style: pre-commit fixes
pre-commit-ci[bot] Jan 24, 2024
2ee1409
Change record.Record to .Record
zbilodea Jan 24, 2024
5c5f42d
Merge branch 'main' into feat-add-to-parquet-row-groups
zbilodea Jan 24, 2024
51ecd0b
add to docs
zbilodea Jan 24, 2024
fcbed3d
adding test
zbilodea Jan 24, 2024
2ba3f6b
corrected tests
zbilodea Jan 26, 2024
a1e139b
Merge branch 'main' into feat-add-to-parquet-row-groups
zbilodea Jan 26, 2024
d37bdb1
add importorskip?
zbilodea Jan 26, 2024
631025d
style: pre-commit fixes
pre-commit-ci[bot] Jan 26, 2024
1c3aaaf
removed batch-iterator function, layouts should only be created for o…
zbilodea Jan 26, 2024
f3dba30
Merge branch 'feat-add-to-parquet-row-groups' of https://github.com/s…
zbilodea Jan 26, 2024
6fae650
removed batch-iterator function, layouts should only be created for o…
zbilodea Jan 26, 2024
1da5b07
refactored implementation and made the tests more like other tests
jpivarski Jan 29, 2024
14087c8
Merge branch 'main' into feat-add-to-parquet-row-groups
zbilodea Jan 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 58 additions & 56 deletions src/awkward/operations/ak_to_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def _impl(
parquet_byte_stream_split,
parquet_coerce_timestamps,
parquet_old_int96_timestamps,
parquet_compliant_nested, # https://issues.apache.org/jira/browse/ARROW-16348
parquet_compliant_nested,
parquet_extra_options,
storage_options,
write_iteratively,
Expand All @@ -242,9 +242,9 @@ def _impl(
pyarrow_parquet = awkward._connect.pyarrow.import_pyarrow_parquet("ak.to_parquet")
fsspec = awkward._connect.pyarrow.import_fsspec("ak.to_parquet")

if not write_iteratively:
def get_layout_and_table(x):
layout = ak.operations.ak_to_layout._impl(
data,
x,
allow_record=True,
allow_unknown=False,
none_policy="error",
Expand All @@ -253,7 +253,6 @@ def _impl(
primitive_policy="error",
string_policy="as-characters",
)

table = ak.operations.ak_to_arrow_table._impl(
layout,
list_to32,
Expand All @@ -264,27 +263,44 @@ def _impl(
extensionarray,
count_nulls,
)
return layout, table

if not write_iteratively:
layout, table = get_layout_and_table(data)

else:
layout = ak.operations.ak_to_layout._impl(
next(data),
allow_record=True,
allow_unknown=False,
none_policy="error",
regulararray=True,
use_from_iter=True,
primitive_policy="error",
string_policy="as-characters",
)
table = ak.operations.ak_to_arrow_table._impl(
layout,
list_to32,
string_to32,
bytestring_to32,
emptyarray_to,
categorical_as_dictionary,
extensionarray,
count_nulls,
)
if (
isinstance(
data,
(
ak.highlevel.Array,
ak.highlevel.Record,
ak.highlevel.ArrayBuilder,
ak.contents.Content,
ak.record.Record,
),
)
or hasattr(data, "dtype")
or hasattr(data, "shape")
):
raise TypeError(
"The first argument of ak.to_parquet_row_groups should be an "
"iterable collection of arrays to put into separate row groups, "
"not a single array. If this was intended, wrap the first argument "
"with the `iter` function."
)

data = iter(data) # necessary for iterables; no-op for iterators

try:
first_array = next(data)
except StopIteration:
raise ValueError(
"The first argument of ak.to_parquet_row_groups must contain at least one array."
) from None

# if `write_iteratively`, then `layout` and `table` represent only the FIRST array, not all of them
layout, table = get_layout_and_table(first_array)

if parquet_compliant_nested:
list_indicator = "list.element"
Expand Down Expand Up @@ -415,38 +431,24 @@ def parquet_columns(specifier, only=None):
metadata_collector=metalist,
**parquet_extra_options,
) as writer:
if write_iteratively:
writer.write_table(table, row_group_size=row_group_size)
try:
while True:
try:
layout = ak.operations.ak_to_layout._impl(
next(data),
allow_record=True,
allow_unknown=False,
none_policy="error",
regulararray=True,
use_from_iter=True,
primitive_policy="error",
string_policy="as-characters",
)
table = ak.operations.ak_to_arrow_table._impl(
layout,
list_to32,
string_to32,
bytestring_to32,
emptyarray_to,
categorical_as_dictionary,
extensionarray,
count_nulls,
)
writer.write_table(table, row_group_size=row_group_size)
except StopIteration:
break
finally:
writer.close()
else:
writer.write_table(table, row_group_size=row_group_size)
try:
if not write_iteratively:
writer.write_table(table, row_group_size=row_group_size)

else:
# this `table` is JUST for the first array
writer.write_table(table, row_group_size=row_group_size)

# this `data` is a one-shot iterator (not a re-iterable collection), positioned AFTER the first array
# a `for` loop implicitly calls `next` and stops at `StopIteration`
for item in data:
layout, table = get_layout_and_table(item)
writer.write_table(table, row_group_size=row_group_size)

finally:
# ensure that the ParquetWriter is closed for iterative and non-iterative cases
writer.close()
zbilodea marked this conversation as resolved.
Show resolved Hide resolved

meta = metalist[0]
meta.set_file_path(destination.rsplit("/", 1)[-1])
return meta
Expand Down
Loading
Loading