Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce batch write api #4

Merged
merged 3 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/paimon-python-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ jobs:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Run lint-python.sh
run: |
chmod +x dev/lint-python.sh
Expand Down
13 changes: 9 additions & 4 deletions dev/lint-python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ function get_os_index() {
local sys_os=$(uname -s)
echo "Detected OS: ${sys_os}"
if [ ${sys_os} == "Darwin" ]; then
return 0
echo 0
elif [[ ${sys_os} == "Linux" ]]; then
return 1
echo 1
else
echo "Unsupported OS: ${sys_os}"
exit 1
Expand Down Expand Up @@ -360,8 +360,13 @@ function install_environment() {
print_function "STAGE" "installing environment"

# Get the index into the SUPPORT_OS array, for convenience when installing tools.
get_os_index $sys_os
local os_index=$?
local os_index=$(get_os_index | tail -n1)

# In some Linux distributions, md5sum is installed instead of md5. But our miniconda installation shell uses md5
if [ "$os_index" -eq 1 ] && [ ! -f /usr/local/bin/md5 ]; then
echo "Creating symlink for md5 to md5sum..."
sudo ln -s $(which md5sum) /usr/local/bin/md5
fi

# step-1 install wget
# the file size of the miniconda.sh is too big to use "wget" tool to download instead
Expand Down
61 changes: 58 additions & 3 deletions java_based_implementation/api_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

from java_based_implementation.java_gateway import get_gateway
from java_based_implementation.util.java_utils import to_j_catalog_context
from paimon_python_api import catalog, read_builder, table_scan, split, table_read
from paimon_python_api import table
from pyarrow import RecordBatchReader
from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read,
write_builder, table_write, commit_message, table_commit)
from pyarrow import RecordBatchReader, RecordBatch
from typing import List
from typing_extensions import Self

Expand Down Expand Up @@ -53,6 +53,10 @@ def new_read_builder(self) -> 'ReadBuilder':
j_read_builder = self._j_table.newReadBuilder()
return ReadBuilder(j_read_builder)

def new_batch_write_builder(self) -> 'BatchWriteBuilder':
    """Return a BatchWriteBuilder wrapping the result of the Java table's newBatchWriteBuilder."""
    return BatchWriteBuilder(self._j_table.newBatchWriteBuilder())


class ReadBuilder(read_builder.ReadBuilder):

Expand Down Expand Up @@ -110,3 +114,54 @@ class TableRead(table_read.TableRead):
def create_reader(self, split: Split) -> RecordBatchReader:
    """Create a reader for the given split.

    Not implemented yet: the split is ignored and None is returned.
    """
    # TODO
    return None


class BatchWriteBuilder(write_builder.BatchWriteBuilder):
    """Batch write builder that forwards every call to a wrapped Java builder object."""

    def __init__(self, j_batch_write_builder):
        # Handle to the Java-side builder; all methods below delegate to it.
        self._j_batch_write_builder = j_batch_write_builder

    def with_overwrite(self, static_partition: dict) -> Self:
        """Pass ``static_partition`` to the Java ``withOverwrite`` and return this builder.

        NOTE(review): the dict is handed to the Java call unchanged; presumably the
        gateway converts it to a Java map -- confirm.
        """
        self._j_batch_write_builder.withOverwrite(static_partition)
        return self

    def new_write(self) -> 'BatchTableWrite':
        """Wrap the object returned by the Java ``newWrite`` in a BatchTableWrite."""
        return BatchTableWrite(self._j_batch_write_builder.newWrite())

    def new_commit(self) -> 'BatchTableCommit':
        """Wrap the object returned by the Java ``newCommit`` in a BatchTableCommit."""
        return BatchTableCommit(self._j_batch_write_builder.newCommit())


class BatchTableWrite(table_write.BatchTableWrite):
    """Batch table write that delegates to a wrapped Java writer object."""

    def __init__(self, j_batch_table_write):
        # Handle to the Java-side writer used by prepare_commit.
        self._j_batch_table_write = j_batch_table_write

    def write(self, record_batch: RecordBatch):
        """Not implemented yet: the batch is ignored and nothing is written."""
        # TODO
        pass

    def prepare_commit(self) -> List['CommitMessage']:
        """Call the Java ``prepareCommit`` and wrap each returned item in a CommitMessage."""
        j_messages = self._j_batch_table_write.prepareCommit()
        return [CommitMessage(j_message) for j_message in j_messages]


class CommitMessage(commit_message.CommitMessage):
    """Thin wrapper that holds one Java-side commit message object."""

    def __init__(self, j_commit_message):
        # Kept as-is so it can be handed back to Java unchanged.
        self._j_commit_message = j_commit_message

    def to_j_commit_message(self):
        """Return the wrapped Java commit message object."""
        return self._j_commit_message


class BatchTableCommit(table_commit.BatchTableCommit):
    """Batch table commit that delegates to a wrapped Java commit object."""

    def __init__(self, j_batch_table_commit):
        # Handle to the Java-side committer.
        self._j_batch_table_commit = j_batch_table_commit

    def commit(self, commit_messages: List[CommitMessage]):
        """Unwrap each message to its Java object and pass the list to the Java ``commit``.

        NOTE(review): a Python list is handed to the Java call; presumably the
        gateway converts it to a Java list -- confirm.
        """
        j_messages = [message.to_j_commit_message() for message in commit_messages]
        self._j_batch_table_commit.commit(j_messages)
23 changes: 23 additions & 0 deletions paimon_python_api/commit_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################

from abc import ABC


class CommitMessage(ABC):
    """
    Commit message collected from writer.

    This base class declares no methods of its own.
    """
5 changes: 5 additions & 0 deletions paimon_python_api/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from abc import ABC, abstractmethod
from read_builder import ReadBuilder
from write_builder import BatchWriteBuilder


class Table(ABC):
Expand All @@ -26,3 +27,7 @@ class Table(ABC):
@abstractmethod
def new_read_builder(self) -> ReadBuilder:
"""Return a builder for building table scan and table read."""

@abstractmethod
def new_batch_write_builder(self) -> BatchWriteBuilder:
"""Returns a builder for building batch table write and table commit."""
32 changes: 32 additions & 0 deletions paimon_python_api/table_commit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################

from abc import ABC, abstractmethod
from commit_message import CommitMessage
from typing import List


class BatchTableCommit(ABC):
    """A table commit for batch processing. Recommended for one-time committing."""

    @abstractmethod
    def commit(self, commit_messages: List[CommitMessage]):
        """
        Commit the given commit messages to generate snapshots.

        A single commit may generate up to two snapshots: one for adding new files
        and the other for compaction.

        :param commit_messages: the commit messages to commit.
        """
34 changes: 34 additions & 0 deletions paimon_python_api/table_write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################

from abc import ABC, abstractmethod
from commit_message import CommitMessage
from pyarrow import RecordBatch
from typing import List


class BatchTableWrite(ABC):
    """A table write for batch processing. Recommended for one-time committing."""

    @abstractmethod
    def write(self, record_batch: RecordBatch):
        """
        Write a batch to the writer.

        :param record_batch: the record batch to write.
        """

    @abstractmethod
    def prepare_commit(self) -> List[CommitMessage]:
        """
        Prepare commit message for TableCommit. Collect incremental files for this writer.

        :return: the list of commit messages collected from this writer.
        """
41 changes: 41 additions & 0 deletions paimon_python_api/write_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################

from abc import ABC, abstractmethod
from table_commit import BatchTableCommit
from table_write import BatchTableWrite
from typing_extensions import Self


class BatchWriteBuilder(ABC):
    """An interface for building the BatchTableWrite and BatchTableCommit."""

    @abstractmethod
    def with_overwrite(self, static_partition: dict) -> Self:
        """
        Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' semantics of SQL.
        If you pass an empty dict, it means OVERWRITE whole table.

        :param static_partition: the static partition to overwrite; an empty dict
            means the whole table.
        """

    @abstractmethod
    def new_write(self) -> BatchTableWrite:
        """Create a BatchTableWrite to perform batch writing."""

    @abstractmethod
    def new_commit(self) -> BatchTableCommit:
        """Create a BatchTableCommit to perform batch committing."""
Loading