Skip to content

Commit

Permalink
[FLINK-36936][table] Introduce UpdatableTopNFunction in Rank with Asy…
Browse files Browse the repository at this point in the history
…nc State API
  • Loading branch information
Au-Miner committed Feb 24, 2025
1 parent 4091e85 commit f8c0e01
Show file tree
Hide file tree
Showing 10 changed files with 862 additions and 500 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.flink.table.runtime.operators.rank.async.AbstractAsyncStateTopNFunction;
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyTopNFunction;
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateFastTop1Function;
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateUpdatableTopNFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.runtime.util.StateConfigUtil;
Expand All @@ -86,7 +87,7 @@
@ExecNodeMetadata(
name = "stream-exec-rank",
version = 1,
consumedOptions = {"table.exec.rank.topn-cache-size"},
consumedOptions = {"table.exec.rank.topn-cache-size", "table.exec.async-state.enabled"},
producedTransformations = StreamExecRank.RANK_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down Expand Up @@ -346,18 +347,33 @@ protected Transformation<RowData> translateToPlanInternal(
planner.getFlinkContext().getClassLoader(),
primaryKeys,
inputRowTypeInfo);
processFunction =
new UpdatableTopNFunction(
ttlConfig,
inputRowTypeInfo,
rowKeySelector,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize);
if (isAsyncStateEnabled) {
processFunction =
new AsyncStateUpdatableTopNFunction(
ttlConfig,
inputRowTypeInfo,
rowKeySelector,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize);
} else {
processFunction =
new UpdatableTopNFunction(
ttlConfig,
inputRowTypeInfo,
rowKeySelector,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize);
}
}
} else if (rankStrategy instanceof RankProcessStrategy.RetractStrategy) {
EqualiserCodeGenerator equaliserCodeGen =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,6 @@ class RankHarnessTest(mode: StateBackendMode, enableAsyncState: Boolean)
DataTypes.INT().getLogicalType,
DataTypes.BIGINT().getLogicalType))

assertThat(isAsyncStateOperator(testHarness)).isFalse

(testHarness, assertor)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;

import java.util.Objects;

Expand All @@ -36,8 +35,6 @@ public abstract class AbstractSyncStateTopNFunction extends AbstractTopNFunction

private ValueState<Long> rankEndState;

protected long rankEnd;

public AbstractSyncStateTopNFunction(
StateTtlConfig ttlConfig,
InternalTypeInfo<RowData> inputRowType,
Expand Down Expand Up @@ -101,30 +98,4 @@ protected long initRankEnd(RowData row) throws Exception {
}
}
}

// ====== utility methods that omit the specified rank end ======

protected boolean isInRankEnd(long rank) {
return rank <= rankEnd;
}

protected boolean isInRankRange(long rank) {
return rank <= rankEnd && rank >= rankStart;
}

protected void collectInsert(Collector<RowData> out, RowData inputRow, long rank) {
collectInsert(out, inputRow, rank, rankEnd);
}

protected void collectDelete(Collector<RowData> out, RowData inputRow, long rank) {
collectDelete(out, inputRow, rank, rankEnd);
}

protected void collectUpdateAfter(Collector<RowData> out, RowData inputRow, long rank) {
collectUpdateAfter(out, inputRow, rank, rankEnd);
}

protected void collectUpdateBefore(Collector<RowData> out, RowData inputRow, long rank) {
collectUpdateBefore(out, inputRow, rank, rankEnd);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public abstract class AbstractTopNFunction extends KeyedProcessFunction<RowData,

protected final long rankStart;

protected long rankEnd;

// constant rank end
// if rank end is variable, this var is null
@Nullable protected final Long constantRankEnd;
Expand Down Expand Up @@ -228,6 +230,10 @@ protected void registerMetric(long heapSize, long requestCount, long hitCount) {
.<Long, Gauge<Long>>gauge("topn.cache.size", () -> heapSize);
}

protected void collectInsert(Collector<RowData> out, RowData inputRow, long rank) {
collectInsert(out, inputRow, rank, rankEnd);
}

protected void collectInsert(
Collector<RowData> out, RowData inputRow, long rank, long rankEnd) {
if (isInRankRange(rank, rankEnd)) {
Expand Down Expand Up @@ -282,6 +288,26 @@ protected boolean isInRankRange(long rank, long rankEnd) {
return rank <= rankEnd && rank >= rankStart;
}

public boolean isInRankEnd(long rank) {
return rank <= rankEnd;
}

protected boolean isInRankRange(long rank) {
return rank <= rankEnd && rank >= rankStart;
}

protected void collectDelete(Collector<RowData> out, RowData inputRow, long rank) {
collectDelete(out, inputRow, rank, rankEnd);
}

protected void collectUpdateAfter(Collector<RowData> out, RowData inputRow, long rank) {
collectUpdateAfter(out, inputRow, rank, rankEnd);
}

protected void collectUpdateBefore(Collector<RowData> out, RowData inputRow, long rank) {
collectUpdateBefore(out, inputRow, rank, rankEnd);
}

protected boolean hasOffset() {
// rank start is 1-based
return rankStart > 1;
Expand Down Expand Up @@ -343,6 +369,10 @@ protected void collectInsert(
topNFunction.collectInsert(out, inputRow, rank, rankEnd);
}

protected void collectInsert(Collector<RowData> out, RowData inputRow, long rank) {
topNFunction.collectInsert(out, inputRow, rank);
}

protected void collectInsert(Collector<RowData> out, RowData inputRow) {
topNFunction.collectInsert(out, inputRow);
}
Expand All @@ -361,6 +391,10 @@ protected void collectUpdateAfter(
topNFunction.collectUpdateAfter(out, inputRow, rank, rankEnd);
}

protected void collectUpdateAfter(Collector<RowData> out, RowData inputRow, long rank) {
topNFunction.collectUpdateAfter(out, inputRow, rank);
}

protected void collectUpdateAfter(Collector<RowData> out, RowData inputRow) {
topNFunction.collectUpdateAfter(out, inputRow);
}
Expand All @@ -370,10 +404,18 @@ protected void collectUpdateBefore(
topNFunction.collectUpdateBefore(out, inputRow, rank, rankEnd);
}

protected void collectUpdateBefore(Collector<RowData> out, RowData inputRow, long rank) {
topNFunction.collectUpdateBefore(out, inputRow, rank);
}

protected void collectUpdateBefore(Collector<RowData> out, RowData inputRow) {
topNFunction.collectUpdateBefore(out, inputRow);
}

protected boolean checkSortKeyInBufferRange(RowData sortKey, TopNBuffer buffer) {
return topNFunction.checkSortKeyInBufferRange(sortKey, buffer);
}

protected boolean isInRankEnd(long rank, long rankEnd) {
return rank <= rankEnd;
}
Expand Down
Loading

0 comments on commit f8c0e01

Please sign in to comment.