Add read API to convert result to DuckDB and Ray (#28)
yuzelin authored Nov 26, 2024
1 parent f09dc58 commit 03108ec
Showing 4 changed files with 70 additions and 12 deletions.
2 changes: 2 additions & 0 deletions dev/dev-requirements.txt
@@ -26,3 +26,5 @@ numpy>=1.22.4
 python-dateutil>=2.8.0,<3
 pytz>=2018.3
 pytest~=7.0
+duckdb>=0.5.0,<2.0.0
+ray~=2.10.0
16 changes: 15 additions & 1 deletion paimon_python_api/table_read.py
@@ -18,10 +18,12 @@
 
 import pandas as pd
 import pyarrow as pa
+import ray
 
 from abc import ABC, abstractmethod
+from duckdb.duckdb import DuckDBPyConnection
 from paimon_python_api import Split
-from typing import List
+from typing import List, Optional
 
 
 class TableRead(ABC):
@@ -38,3 +40,15 @@ def to_arrow_batch_reader(self, splits: List[Split]) -> pa.RecordBatchReader:
     @abstractmethod
     def to_pandas(self, splits: List[Split]) -> pd.DataFrame:
         """Read data from splits and convert to pandas.DataFrame format."""
+
+    @abstractmethod
+    def to_duckdb(
+            self,
+            splits: List[Split],
+            table_name: str,
+            connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
+        """Convert splits into an in-memory DuckDB table that can be queried."""
+
+    @abstractmethod
+    def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset:
+        """Convert splits into a Ray dataset."""
15 changes: 15 additions & 0 deletions paimon_python_java/pypaimon.py
@@ -16,9 +16,12 @@
 # limitations under the License.
 ################################################################################
 
+import duckdb
 import pandas as pd
 import pyarrow as pa
+import ray
 
+from duckdb.duckdb import DuckDBPyConnection
 from paimon_python_java.java_gateway import get_gateway
 from paimon_python_java.util import java_utils, constants
 from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read,
@@ -161,6 +164,18 @@ def to_arrow_batch_reader(self, splits):
     def to_pandas(self, splits: List[Split]) -> pd.DataFrame:
         return self.to_arrow(splits).to_pandas()
 
+    def to_duckdb(
+            self,
+            splits: List[Split],
+            table_name: str,
+            connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
+        con = connection or duckdb.connect(database=":memory:")
+        con.register(table_name, self.to_arrow(splits))
+        return con
+
+    def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset:
+        return ray.data.from_arrow(self.to_arrow(splits))
+
     def _init(self):
         if self._j_bytes_reader is None:
             # get thread num
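
Two details of the implementation above are worth noting. to_duckdb does not load data into DuckDB: con.register exposes the pyarrow Table produced by to_arrow(splits) as a view named table_name, and the optional connection parameter lets callers attach that view to an existing session so Paimon data can be joined against their own tables. A standalone sketch of the same registration pattern, with made-up data:

import duckdb
import pyarrow as pa

# Register an in-memory Arrow table as a DuckDB view and query it by name.
arrow_table = pa.table({'f0': [1, 2, 3], 'f1': ['a', 'b', 'c']})
con = duckdb.connect(database=':memory:')
con.register('T', arrow_table)
print(con.query('SELECT f1 FROM T WHERE f0 > 1').fetchdf())

Note that both conversions go through self.to_arrow(splits), so the full scan result is materialized before the hand-off to DuckDB or Ray.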
49 changes: 38 additions & 11 deletions paimon_python_java/tests/test_write_and_read.py
@@ -267,17 +267,20 @@ def testAllWriteAndReadApi(self):
         table_write.close()
         table_commit.close()
 
+        all_data = pd.DataFrame({
+            'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
+            'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
+        })
+        all_data['f0'] = all_data['f0'].astype('int32')
+
         read_builder = table.new_read_builder()
         table_scan = read_builder.new_scan()
         table_read = read_builder.new_read()
         splits = table_scan.plan().splits()
 
         # to_arrow
         actual = table_read.to_arrow(splits)
-        expected = pa.Table.from_pydict({
-            'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
-            'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
-        }, schema=self.simple_pa_schema)
+        expected = pa.Table.from_pandas(all_data, schema=self.simple_pa_schema)
         self.assertEqual(actual, expected)
 
         # to_arrow_batch_reader
@@ -286,18 +289,42 @@
             for batch in table_read.to_arrow_batch_reader(splits)
         ]
         actual = pd.concat(data_frames)
-        expected = pd.DataFrame({
-            'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
-            'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
-        })
-        expected['f0'] = expected['f0'].astype('int32')
         pd.testing.assert_frame_equal(
-            actual.reset_index(drop=True), expected.reset_index(drop=True))
+            actual.reset_index(drop=True), all_data.reset_index(drop=True))
 
         # to_pandas
         actual = table_read.to_pandas(splits)
         pd.testing.assert_frame_equal(
-            actual.reset_index(drop=True), expected.reset_index(drop=True))
+            actual.reset_index(drop=True), all_data.reset_index(drop=True))
 
+        # to_duckdb
+        duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')
+        # select *
+        result1 = duckdb_con.query("SELECT * FROM duckdb_table").fetchdf()
+        pd.testing.assert_frame_equal(
+            result1.reset_index(drop=True), all_data.reset_index(drop=True))
+        # select * where
+        result2 = duckdb_con.query("SELECT * FROM duckdb_table WHERE f0 < 4").fetchdf()
+        expected2 = pd.DataFrame({
+            'f0': [1, 2, 3],
+            'f1': ['a', 'b', 'c']
+        })
+        expected2['f0'] = expected2['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            result2.reset_index(drop=True), expected2.reset_index(drop=True))
+        # select f0 where
+        result3 = duckdb_con.query("SELECT f0 FROM duckdb_table WHERE f0 < 4").fetchdf()
+        expected3 = pd.DataFrame({
+            'f0': [1, 2, 3]
+        })
+        expected3['f0'] = expected3['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            result3.reset_index(drop=True), expected3.reset_index(drop=True))
+
+        # to_ray
+        ray_dataset = table_read.to_ray(splits)
+        pd.testing.assert_frame_equal(
+            ray_dataset.to_pandas().reset_index(drop=True), all_data.reset_index(drop=True))
+
     def test_overwrite(self):
         schema = Schema(self.simple_pa_schema, partition_keys=['f0'],
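
The Ray conversion exercised above rests on ray.data.from_arrow, which wraps the materialized Arrow table in a Ray Dataset; the usual Dataset operations then apply. A standalone sketch with made-up data:

import pyarrow as pa
import ray

# Build a Ray Dataset from an in-memory Arrow table.
arrow_table = pa.table({'f0': [1, 2, 3], 'f1': ['a', 'b', 'c']})
ds = ray.data.from_arrow(arrow_table)

print(ds.count())      # 3
print(ds.take(2))      # first two rows as dicts
print(ds.to_pandas())  # back to pandas, as the test asserts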
