Skip to content

Commit

Permalink
Read interface support predicate (#22)
Browse files Browse the repository at this point in the history
(cherry picked from commit 5e7d468)
  • Loading branch information
yuzelin authored and yuzelin committed Nov 7, 2024
1 parent 46d44da commit cbb81bb
Show file tree
Hide file tree
Showing 11 changed files with 735 additions and 8 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/paimon-python-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,14 @@ jobs:
with:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- name: Set up hadoop dependency
run: |
mkdir -p ${{ github.workspace }}/temp
curl -L -o ${{ github.workspace }}/temp/bundled-hadoop.jar \
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
- name: Run lint-python.sh
env:
_PYPAIMON_HADOOP_CLASSPATH: ${{ github.workspace }}/temp/bundled-hadoop.jar
run: |
chmod +x dev/lint-python.sh
./dev/lint-python.sh
5 changes: 4 additions & 1 deletion paimon_python_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .split import Split
from .table_read import TableRead
from .table_scan import TableScan, Plan
from .predicate import Predicate, PredicateBuilder
from .read_builder import ReadBuilder
from .commit_message import CommitMessage
from .table_commit import BatchTableCommit
Expand All @@ -39,5 +40,7 @@
'BatchWriteBuilder',
'Table',
'Schema',
'Catalog'
'Catalog',
'Predicate',
'PredicateBuilder'
]
95 changes: 95 additions & 0 deletions paimon_python_api/predicate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################

from abc import ABC, abstractmethod
from typing import Any, List


class Predicate(ABC):
"""Predicate which evaluates to a boolean. Now it doesn't have
any methods because only paimon_python_java implement it and
the Java implementation convert it to Java object."""


class PredicateBuilder(ABC):
"""A utility class to create Predicate object for common filter conditions."""

@abstractmethod
def equal(self, field: str, literal: Any) -> Predicate:
"""field = literal"""

@abstractmethod
def not_equal(self, field: str, literal: Any) -> Predicate:
"""field <> literal"""

@abstractmethod
def less_than(self, field: str, literal: Any) -> Predicate:
"""field < literal"""

@abstractmethod
def less_or_equal(self, field: str, literal: Any) -> Predicate:
"""field <= literal"""

@abstractmethod
def greater_than(self, field: str, literal: Any) -> Predicate:
"""field > literal"""

@abstractmethod
def greater_or_equal(self, field: str, literal: Any) -> Predicate:
"""field >= literal"""

@abstractmethod
def is_null(self, field: str) -> Predicate:
"""field IS NULL"""

@abstractmethod
def is_not_null(self, field: str) -> Predicate:
"""field IS NOT NULL"""

@abstractmethod
def startswith(self, field: str, pattern_literal: Any) -> Predicate:
"""field.startswith"""

@abstractmethod
def endswith(self, field: str, pattern_literal: Any) -> Predicate:
"""field.endswith()"""

@abstractmethod
def contains(self, field: str, pattern_literal: Any) -> Predicate:
"""literal in field"""

@abstractmethod
def is_in(self, field: str, literals: List[Any]) -> Predicate:
"""field IN literals"""

@abstractmethod
def is_not_in(self, field: str, literals: List[Any]) -> Predicate:
"""field NOT IN literals"""

@abstractmethod
def between(self, field: str, included_lower_bound: Any, included_upper_bound: Any) \
-> Predicate:
"""field BETWEEN included_lower_bound AND included_upper_bound"""

@abstractmethod
def and_predicates(self, predicates: List[Predicate]) -> Predicate:
"""predicate1 AND predicate2 AND ..."""

@abstractmethod
def or_predicates(self, predicates: List[Predicate]) -> Predicate:
"""predicate1 OR predicate2 OR ..."""
13 changes: 12 additions & 1 deletion paimon_python_api/read_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@
#################################################################################

from abc import ABC, abstractmethod
from paimon_python_api import TableRead, TableScan
from paimon_python_api import TableRead, TableScan, Predicate, PredicateBuilder
from typing import List


class ReadBuilder(ABC):
"""An interface for building the TableScan and TableRead."""

@abstractmethod
def with_filter(self, predicate: Predicate):
"""
Push filters, will filter the data as much as possible,
but it is not guaranteed that it is a complete filter.
"""

@abstractmethod
def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder':
"""Push nested projection."""
Expand All @@ -39,3 +46,7 @@ def new_scan(self) -> TableScan:
@abstractmethod
def new_read(self) -> TableRead:
"""Create a TableRead to read splits."""

@abstractmethod
def new_predicate_builder(self) -> PredicateBuilder:
"""Create a builder for Predicate."""
7 changes: 5 additions & 2 deletions paimon_python_java/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

from .util import constants
from .pypaimon import (Catalog, Table, ReadBuilder, TableScan, Plan, Split, TableRead,
BatchWriteBuilder, BatchTableWrite, CommitMessage, BatchTableCommit)
BatchWriteBuilder, BatchTableWrite, CommitMessage, BatchTableCommit,
Predicate, PredicateBuilder)

__all__ = [
'constants',
Expand All @@ -32,5 +33,7 @@
'BatchWriteBuilder',
'BatchTableWrite',
'CommitMessage',
'BatchTableCommit'
'BatchTableCommit',
'Predicate',
'PredicateBuilder'
]
2 changes: 1 addition & 1 deletion paimon_python_java/gateway_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def _get_hadoop_classpath(env):
return env[constants.PYPAIMON_HADOOP_CLASSPATH]

if 'HADOOP_CLASSPATH' in env:
return None
return env['HADOOP_CLASSPATH']
else:
raise EnvironmentError(f"You haven't set '{constants.PYPAIMON_HADOOP_CLASSPATH}', \
and 'HADOOP_CLASSPATH' is also not set. Ensure one of them is set.")
1 change: 1 addition & 0 deletions paimon_python_java/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def import_paimon_view(gateway):
java_import(gateway.jvm, 'org.apache.paimon.types.*')
java_import(gateway.jvm, 'org.apache.paimon.python.*')
java_import(gateway.jvm, "org.apache.paimon.data.*")
java_import(gateway.jvm, "org.apache.paimon.predicate.PredicateBuilder")


class Watchdog(object):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.python;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;

import java.util.List;
import java.util.stream.Collectors;

/** For building Predicate. */
public class PredicationUtil {

public static Predicate build(
RowType rowType,
PredicateBuilder builder,
String method,
int index,
List<Object> literals) {
literals =
literals.stream()
.map(l -> convertJavaObject(rowType.getTypeAt(index), l))
.collect(Collectors.toList());
switch (method) {
case "equal":
return builder.equal(index, literals.get(0));
case "notEqual":
return builder.notEqual(index, literals.get(0));
case "lessThan":
return builder.lessThan(index, literals.get(0));
case "lessOrEqual":
return builder.lessOrEqual(index, literals.get(0));
case "greaterThan":
return builder.greaterThan(index, literals.get(0));
case "greaterOrEqual":
return builder.greaterOrEqual(index, literals.get(0));
case "isNull":
return builder.isNull(index);
case "isNotNull":
return builder.isNotNull(index);
case "startsWith":
return builder.startsWith(index, literals.get(0));
case "endsWith":
return builder.endsWith(index, literals.get(0));
case "contains":
return builder.contains(index, literals.get(0));
case "in":
return builder.in(index, literals);
case "notIn":
return builder.notIn(index, literals);
case "between":
return builder.between(index, literals.get(0), literals.get(1));
default:
throw new UnsupportedOperationException(
"Unknown PredicateBuilder method " + method);
}
}

/** Some type is not convenient to transfer from Python to Java. */
private static Object convertJavaObject(DataType literalType, Object literal) {
switch (literalType.getTypeRoot()) {
case BOOLEAN:
case DOUBLE:
case INTEGER:
return literal;
case CHAR:
case VARCHAR:
return BinaryString.fromString((String) literal);
case FLOAT:
return ((Number) literal).floatValue();
case TINYINT:
return ((Number) literal).byteValue();
case SMALLINT:
return ((Number) literal).shortValue();
case BIGINT:
return ((Number) literal).longValue();
default:
throw new UnsupportedOperationException(
"Unsupported predicate leaf type " + literalType.getTypeRoot().name());
}
}

public static Predicate buildAnd(List<Predicate> predicates) {
// 'and' is keyword of Python
return PredicateBuilder.and(predicates);
}

public static Predicate buildOr(List<Predicate> predicates) {
// 'or' is keyword of Python
return PredicateBuilder.or(predicates);
}
}
Loading

0 comments on commit cbb81bb

Please sign in to comment.