feat: add to_parquet_row_groups #2979

Merged
merged 23 commits into main from feat-add-to-parquet-row-groups on Jan 31, 2024
Commits (23)
ebd11a4
Set up, but not writing in batches correctly.
zbilodea Jan 22, 2024
8686679
style: pre-commit fixes
pre-commit-ci[bot] Jan 22, 2024
6596f13
with pre-commit...
zbilodea Jan 22, 2024
87cf382
Writes correct row groups, don't think it's properly iterative yet
zbilodea Jan 23, 2024
2ce2d38
Merge branch 'feat-add-to-parquet-row-groups' of https://github.com/s…
zbilodea Jan 23, 2024
5b2de81
test with HZZ file works as expected.
zbilodea Jan 24, 2024
73a59a7
style: pre-commit fixes
pre-commit-ci[bot] Jan 24, 2024
2b5d079
typo...
zbilodea Jan 24, 2024
84caa70
added importerskip for uproot
zbilodea Jan 24, 2024
e3f3f85
style: pre-commit fixes
pre-commit-ci[bot] Jan 24, 2024
2ee1409
Change record.Record to .Record
zbilodea Jan 24, 2024
5c5f42d
Merge branch 'main' into feat-add-to-parquet-row-groups
zbilodea Jan 24, 2024
51ecd0b
add to docs
zbilodea Jan 24, 2024
fcbed3d
adding test
zbilodea Jan 24, 2024
2ba3f6b
corrected tests
zbilodea Jan 26, 2024
a1e139b
Merge branch 'main' into feat-add-to-parquet-row-groups
zbilodea Jan 26, 2024
d37bdb1
add importerskip?
zbilodea Jan 26, 2024
631025d
style: pre-commit fixes
pre-commit-ci[bot] Jan 26, 2024
1c3aaaf
removed batch-iterator function, layouts should only be created for o…
zbilodea Jan 26, 2024
f3dba30
Merge branch 'feat-add-to-parquet-row-groups' of https://github.com/s…
zbilodea Jan 26, 2024
6fae650
removed batch-iterator function, layouts should only be created for o…
zbilodea Jan 26, 2024
1da5b07
refactored implementation and made the tests more like other tests
jpivarski Jan 29, 2024
14087c8
Merge branch 'main' into feat-add-to-parquet-row-groups
zbilodea Jan 31, 2024
1 change: 1 addition & 0 deletions docs/reference/toctree.txt
@@ -42,6 +42,7 @@
    generated/ak.to_numpy
    generated/ak.to_packed
    generated/ak.to_parquet
    generated/ak.to_parquet_row_groups
    generated/ak.to_rdataframe

.. toctree::
1 change: 1 addition & 0 deletions src/awkward/operations/__init__.py
@@ -90,6 +90,7 @@
from awkward.operations.ak_to_numpy import *
from awkward.operations.ak_to_packed import *
from awkward.operations.ak_to_parquet import *
from awkward.operations.ak_to_parquet_row_groups import *
from awkward.operations.ak_to_rdataframe import *
from awkward.operations.ak_to_regular import *
from awkward.operations.ak_transform import *
156 changes: 134 additions & 22 deletions src/awkward/operations/ak_to_parquet.py
@@ -178,35 +178,130 @@ def to_parquet(
    # Dispatch
    yield (array,)

    return _impl(
        array,
        destination,
        list_to32,
        string_to32,
        bytestring_to32,
        emptyarray_to,
        categorical_as_dictionary,
        extensionarray,
        count_nulls,
        compression,
        compression_level,
        row_group_size,
        data_page_size,
        parquet_flavor,
        parquet_version,
        parquet_page_version,
        parquet_metadata_statistics,
        parquet_dictionary_encoding,
        parquet_byte_stream_split,
        parquet_coerce_timestamps,
        parquet_old_int96_timestamps,
        parquet_compliant_nested,  # https://issues.apache.org/jira/browse/ARROW-16348
        parquet_extra_options,
        storage_options,
        write_iteratively=False,
    )


def _impl(
    array,
    destination,
    list_to32,
    string_to32,
    bytestring_to32,
    emptyarray_to,
    categorical_as_dictionary,
    extensionarray,
    count_nulls,
    compression,
    compression_level,
    row_group_size,
    data_page_size,
    parquet_flavor,
    parquet_version,
    parquet_page_version,
    parquet_metadata_statistics,
    parquet_dictionary_encoding,
    parquet_byte_stream_split,
    parquet_coerce_timestamps,
    parquet_old_int96_timestamps,
    parquet_compliant_nested,
    parquet_extra_options,
    storage_options,
    write_iteratively,
):
    # Implementation
    import awkward._connect.pyarrow

    data = array

    pyarrow_parquet = awkward._connect.pyarrow.import_pyarrow_parquet("ak.to_parquet")
    fsspec = awkward._connect.pyarrow.import_fsspec("ak.to_parquet")

    def get_layout_and_table(x):
        layout = ak.operations.ak_to_layout._impl(
            x,
            allow_record=True,
            allow_unknown=False,
            none_policy="error",
            regulararray=True,
            use_from_iter=True,
            primitive_policy="error",
            string_policy="as-characters",
        )
        table = ak.operations.ak_to_arrow_table._impl(
            layout,
            list_to32,
            string_to32,
            bytestring_to32,
            emptyarray_to,
            categorical_as_dictionary,
            extensionarray,
            count_nulls,
        )
        return layout, table

    if not write_iteratively:
        layout, table = get_layout_and_table(data)

    else:
        if (
            isinstance(
                data,
                (
                    ak.highlevel.Array,
                    ak.highlevel.Record,
                    ak.highlevel.ArrayBuilder,
                    ak.contents.Content,
                    ak.record.Record,
                ),
            )
            or hasattr(data, "dtype")
            or hasattr(data, "shape")
        ):
            raise TypeError(
                "The first argument of ak.to_parquet_row_groups should be an "
                "iterable collection of arrays to put into separate row groups, "
                "not a single array. If this was intended, wrap the first argument "
                "with the `iter` function."
            )

        data = iter(data)  # necessary for iterables; a no-op for iterators

        try:
            first_array = next(data)
        except StopIteration:
            raise ValueError(
                "The first argument of ak.to_parquet_row_groups must contain at least one array."
            ) from None

        # if `write_iteratively`, then `layout` and `table` represent the FIRST array, not all of them
        layout, table = get_layout_and_table(first_array)

    if parquet_compliant_nested:
        list_indicator = "list.element"
    else:
@@ -312,7 +407,7 @@ def parquet_columns(specifier, only=None):
        destination = fsdecode(destination)
    except TypeError:
        raise TypeError(
            f"'destination' argument of 'ak.to_parquet' and 'ak.to_parquet_row_groups' must be a path-like, not {type(destination).__name__} ('array' argument is first; 'destination' second)"
        ) from None

    fs, destination = fsspec.core.url_to_fs(destination, **(storage_options or {}))
@@ -336,7 +431,24 @@ def parquet_columns(specifier, only=None):
        metadata_collector=metalist,
        **parquet_extra_options,
    ) as writer:
        try:
            if not write_iteratively:
                writer.write_table(table, row_group_size=row_group_size)

            else:
                # this `table` is JUST for the first array
                writer.write_table(table, row_group_size=row_group_size)

                # `data` is now an iterator (not just an iterable), positioned AFTER the first array;
                # a `for` loop implicitly calls `next` and stops at `StopIteration`
                for item in data:
                    layout, table = get_layout_and_table(item)
                    writer.write_table(table, row_group_size=row_group_size)

        finally:
            # ensure that the ParquetWriter is closed in both the iterative and non-iterative cases
            writer.close()

    meta = metalist[0]
    meta.set_file_path(destination.rsplit("/", 1)[-1])
    return meta
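
With this refactoring, each array produced by the input iterable is converted to an Arrow table and written as its own row group (or several, if it is longer than `row_group_size`), so the whole dataset never has to be materialized at once. Below is a minimal usage sketch; it assumes the public wrapper `ak.to_parquet_row_groups` (defined in the new `ak_to_parquet_row_groups.py`, not shown in this diff) forwards its arguments to `_impl` with `write_iteratively=True`, and the sample arrays and output path are purely illustrative:

import awkward as ak
import pyarrow.parquet as pq

def chunks():
    # each yielded array becomes its own row group, because the writer
    # calls write_table once per chunk
    yield ak.Array([{"x": 1.1, "y": [1]}, {"x": 2.2, "y": [1, 2]}])
    yield ak.Array([{"x": 3.3, "y": [1, 2, 3]}])
    yield ak.Array([{"x": 4.4, "y": [1, 2, 3, 4]}])

# a single array would raise the TypeError above; the first argument is
# expected to be an iterable of arrays (generator, list, or iterator)
meta = ak.to_parquet_row_groups(chunks(), "/tmp/example.parquet")

# the file should contain one row group per yielded chunk
print(pq.ParquetFile("/tmp/example.parquet").num_row_groups)  # 3

Reading selected chunks back (for example with `ak.from_parquet` and its `row_groups` argument) can then skip the rest of the file.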