Skip to content

Commit

Permalink
Adding InclusiveManifestEvaluator and ResidualEvaluator
Browse files Browse the repository at this point in the history
  • Loading branch information
Ted Gooch committed Jun 5, 2019
1 parent 4a75dce commit 83431b4
Show file tree
Hide file tree
Showing 7 changed files with 481 additions and 22 deletions.
4 changes: 4 additions & 0 deletions python/iceberg/api/expressions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"FixedLiteral",
"FloatLiteral",
"inclusive",
"InclusiveManifestEvaluator",
"InclusiveMetricsEvaluator",
"InclusiveProjection",
"IntegerLiteral",
Expand All @@ -52,6 +53,7 @@
"Or",
"Predicate",
"Reference",
"ResidualEvaluator",
"strict",
"StrictMetricsEvaluator",
"StrictProjection",
Expand All @@ -73,6 +75,7 @@
TRUE,
TrueExp)
from .expressions import Expressions, ExpressionVisitors
from .inclusive_manifest_evaluator import InclusiveManifestEvaluator
from .inclusive_metrics_evaluator import InclusiveMetricsEvaluator
from .java_variables import (JAVA_MAX_FLOAT,
JAVA_MAX_INT,
Expand Down Expand Up @@ -102,4 +105,5 @@
from .reference import (BoundReference,
NamedReference,
Reference)
from .residual_evaluator import ResidualEvaluator
from .strict_metrics_evaluator import StrictMetricsEvaluator
15 changes: 9 additions & 6 deletions python/iceberg/api/expressions/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,25 @@
# specific language governing permissions and limitations
# under the License.

import threading

from .binder import Binder
from .expressions import ExpressionVisitors


class Evaluator(object):
THREAD_LOCAL_DATA = threading.local()

def visitor(self):
if not hasattr(Evaluator.THREAD_LOCAL_DATA, "visitors") :
Evaluator.THREAD_LOCAL_DATA.visitors = Evaluator.EvalVisitor()

return Evaluator.THREAD_LOCAL_DATA.visitors

def __init__(self, struct, unbound, case_sensitive=True):
self.expr = Binder.bind(struct, unbound, case_sensitive)
self.visitors = None

def visitor(self):
if self.visitors is None:
self.visitors = Evaluator.EvalVisitor()

return self.visitors

def eval(self, data):
return self.visitor().eval(data, self.expr)

Expand Down
161 changes: 161 additions & 0 deletions python/iceberg/api/expressions/inclusive_manifest_evaluator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import threading

from .binder import Binder
from .expressions import Expressions, ExpressionVisitors
from .projections import inclusive
from ..types import Conversions

ROWS_MIGHT_MATCH = True
ROWS_CANNOT_MATCH = False


class InclusiveManifestEvaluator(object):

def visitor(self):
if not hasattr(self.thread_local_data, "visitors"):
self.thread_local_data.visitors = ManifestEvalVistor(self.expr)

return self.thread_local_data.visitors

def __init__(self, spec, row_filter, case_sensitive=True):
self.struct = spec.partition_type()
self.expr = Binder.bind(self.struct,
Expressions.rewrite_not(inclusive(spec, case_sensitive=case_sensitive)
.project(row_filter)),
case_sensitive=case_sensitive)
self.thread_local_data = threading.local()

def eval(self, manifest):
return self.visitor().eval(manifest)


class ManifestEvalVistor(ExpressionVisitors.BoundExpressionVisitor):

def __init__(self, expr):
self.expr = expr
self.stats = None

def eval(self, manifest):
self.stats = manifest.partitions
if self.stats is None:
return ROWS_MIGHT_MATCH

return ExpressionVisitors.visit(self.expr, self)

def always_true(self):
return ROWS_MIGHT_MATCH

def always_false(self):
return ROWS_CANNOT_MATCH

def not_(self, result):
return not result

def and_(self, left_result, right_result):
return left_result and right_result

def or_(self, left_result, right_result):
return left_result or right_result

def is_null(self, ref):
if not self.stats[ref.pos].contains_null():
return ROWS_CANNOT_MATCH

return ROWS_MIGHT_MATCH

def not_null(self, ref):
lower_bound = self.stats[ref.pos].lower_bound()
if lower_bound is None:
return ROWS_CANNOT_MATCH

return ROWS_MIGHT_MATCH

def lt(self, ref, lit):
lower_bound = self.stats[ref.pos].lower_bound()
if lower_bound is None:
return ROWS_CANNOT_MATCH

lower = Conversions.from_byte_buffer(ref.type, lower_bound)

if lower >= lit.value:
return ROWS_CANNOT_MATCH

return ROWS_MIGHT_MATCH

def lt_eq(self, ref, lit):
lower_bound = self.stats[ref.pos].lower_bound()
if lower_bound is None:
return ROWS_CANNOT_MATCH

lower = Conversions.from_byte_buffer(ref.type, lower_bound)

if lower > lit.value:
return ROWS_CANNOT_MATCH

return ROWS_MIGHT_MATCH

def gt(self, ref, lit):
upper_bound = self.stats[ref.pos].upper_bound()
if upper_bound is None:
return ROWS_CANNOT_MATCH

upper = Conversions.from_byte_buffer(ref.type, upper_bound)

if upper <= lit.value:
return ROWS_CANNOT_MATCH

return ROWS_MIGHT_MATCH

def gt_eq(self, ref, lit):
upper_bound = self.stats[ref.pos].upper_bound()
if upper_bound is None:
return ROWS_CANNOT_MATCH

upper = Conversions.from_byte_buffer(ref.type, upper_bound)

if upper < lit.value:
return ROWS_CANNOT_MATCH

return ROWS_MIGHT_MATCH

def eq(self, ref, lit):
field_stats = self.stats[ref.pos]
if field_stats.lower_bound() is None:
return ROWS_CANNOT_MATCH

lower = Conversions.from_byte_buffer(ref.type, field_stats.lower_bound())
if lower > lit.value:
return ROWS_CANNOT_MATCH

upper = Conversions.from_byte_buffer(ref.type, field_stats.upper_bound())

if upper < lit.value:
return ROWS_CANNOT_MATCH

return ROWS_MIGHT_MATCH

def not_eq(self, ref, lit):
return ROWS_MIGHT_MATCH

def in_(self, ref, lit):
return ROWS_MIGHT_MATCH

def not_in(self, ref, lit):
return ROWS_MIGHT_MATCH
18 changes: 10 additions & 8 deletions python/iceberg/api/expressions/inclusive_metrics_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,30 @@
# specific language governing permissions and limitations
# under the License.

import threading

from .expressions import Expressions, ExpressionVisitors
from ..expressions.binder import Binder
from ..types import Conversions


class InclusiveMetricsEvaluator(object):

def visitor(self):
if not hasattr(self.thread_local_data, "visitors"):
self.thread_local_data.visitors = MetricsEvalVisitor(self.expr, self.schema, self.struct)

return self.thread_local_data.visitors

def __init__(self, schema, unbound, case_sensitive=True):
self.schema = schema
self.struct = schema.as_struct()
self.case_sensitive = case_sensitive
self.expr = Binder.bind(self.struct, Expressions.rewrite_not(unbound), case_sensitive)
self._visitors = None

def _visitor(self):
if self._visitors is None:
self._visitors = MetricsEvalVisitor(self.expr, self.schema, self.struct)

return self._visitors
self.thread_local_data = threading.local()

def eval(self, file):
return self._visitor().eval(file)
return self.visitor().eval(file)


class MetricsEvalVisitor(ExpressionVisitors.BoundExpressionVisitor):
Expand Down
119 changes: 119 additions & 0 deletions python/iceberg/api/expressions/residual_evaluator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import threading

from .expressions import Expressions, ExpressionVisitors
from .predicate import BoundPredicate, Predicate, UnboundPredicate


class ResidualEvaluator(object):

def visitor(self):
if not hasattr(self.thread_local_data, "visitors"):
self.thread_local_data.visitors = ResidualVisitor()

return self.thread_local_data.visitors

def __init__(self, spec, expr):
self._spec = spec
self._expr = expr
self.thread_local_data = threading.local()

def residual_for(self, partition_data):
return self.visitor().eval(partition_data)


class ResidualVisitor(ExpressionVisitors.BoundExpressionVisitor):

def __init__(self):
self.struct = None

def eval(self, struct):
self.struct = struct

def always_true(self):
return Expressions.always_true()

def always_false(self):
return Expressions.always_false()

def is_null(self, ref):
return self.always_true() if ref.get(self.struct) is None else self.always_false()

def not_null(self, ref):
return self.always_true() if ref.get(self.struct) is not None else self.always_false()

def lt(self, ref, lit):
return self.always_true() if ref.get(self.struct) < lit.value else self.always_false()

def lt_eq(self, ref, lit):
return self.always_true() if ref.get(self.struct) <= lit.value else self.always_false()

def gt(self, ref, lit):
return self.always_true() if ref.get(self.struct) > lit.value else self.always_false()

def gt_eq(self, ref, lit):
return self.always_true() if ref.get(self.struct) >= lit.value else self.always_false()

def eq(self, ref, lit):
return self.always_true() if ref.get(self.struct) == lit.value else self.always_false()

def not_eq(self, ref, lit):
return self.always_true() if ref.get(self.struct) != lit.value else self.always_false()

def not_(self, result):
return Expressions.not_(result)

def and_(self, left_result, right_result):
return Expressions.and_(left_result, right_result)

def or_(self, left_result, right_result):
return Expressions.or_(left_result, right_result)

def predicate(self, pred):
if isinstance(pred, BoundPredicate):
return self.bound_predicate(pred)
elif isinstance(pred, UnboundPredicate):
return self.unbound_predicate(pred)

raise RuntimeError("Invalid predicate argument %s" % pred)

def bound_predicate(self, pred):
part = self.spec.get_field_by_source_id(pred.ref.field_id)
if part is None:
return pred

strict_projection = part.transform.project_strict(part.name, pred)
if strict_projection is None:
bound = strict_projection.bind(self.spec.partition_type())
if isinstance(bound, BoundPredicate):
return super(ResidualVisitor, self).predicate(bound)
return bound

return pred

def unbound_predicate(self, pred):
bound = pred.bind(self.spec.schema.as_struct())

if isinstance(bound, BoundPredicate):
bound_residual = self.predicate(bound)
if isinstance(bound_residual, Predicate):
return pred
return bound_residual

return bound
Loading

0 comments on commit 83431b4

Please sign in to comment.