Skip to content

Commit

Permalink
bulk-cdk-toolkit-extract-jdbc: remove cursor from WHERE clause in ini…
Browse files Browse the repository at this point in the history
…tial syncs (#44011)
  • Loading branch information
Marius Posta authored Aug 13, 2024
1 parent 42c9bc1 commit 75e0adf
Showing 1 changed file with 1 addition and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ fun StreamPartitionReader.Input.querySpec(
checkpointUpperBound = primaryKeyUpperBound,
isOrdered,
limit,
extraConjWhereClauses = arrayOf(LesserOrEqual(cursor, cursorUpperBound)),
)
is StreamPartitionReader.CursorIncrementalInput ->
querySpecForStreamPartitionReader(
Expand All @@ -205,7 +204,6 @@ private fun querySpecForStreamPartitionReader(
checkpointUpperBound: List<JsonNode>?,
isOrdered: Boolean,
limit: Long?,
vararg extraConjWhereClauses: WhereClauseNode,
): SelectQuerySpec {
val selectColumns: List<Field> =
if (isOrdered) {
Expand Down Expand Up @@ -240,11 +238,10 @@ private fun querySpecForStreamPartitionReader(
} + listOf(lastLeaf),
)
}
val whereClause = And(listOf(Or(lowerBoundDisj), Or(upperBoundDisj)) + extraConjWhereClauses)
return SelectQuerySpec(
SelectColumns(selectColumns),
From(stream.name, stream.namespace),
Where(whereClause),
Where(And(Or(lowerBoundDisj), Or(upperBoundDisj))),
if (isOrdered) OrderBy(checkpointColumns) else NoOrderBy,
if (limit == null) NoLimit else Limit(limit),
)
Expand Down

0 comments on commit 75e0adf

Please sign in to comment.