Skip to content

Commit

Permalink
[FLINK-36849][table] Introduce AppendOnlyFirstNFunction in Rank with …
Browse files Browse the repository at this point in the history
…Async State API
  • Loading branch information
Au-Miner committed Feb 24, 2025
1 parent 4091e85 commit dcc1605
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction;
import org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction;
import org.apache.flink.table.runtime.operators.rank.async.AbstractAsyncStateTopNFunction;
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyFirstNFunction;
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyTopNFunction;
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateFastTop1Function;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
Expand Down Expand Up @@ -247,16 +248,29 @@ protected Transformation<RowData> translateToPlanInternal(
if (sortFields.length == 1
&& TypeCheckUtils.isProcTime(inputType.getChildren().get(sortFields[0]))
&& sortSpec.getFieldSpec(0).getIsAscendingOrder()) {
processFunction =
new AppendOnlyFirstNFunction(
ttlConfig,
inputRowTypeInfo,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber);
if (isAsyncStateEnabled) {
processFunction =
new AsyncStateAppendOnlyFirstNFunction(
ttlConfig,
inputRowTypeInfo,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber);
} else {
processFunction =
new AppendOnlyFirstNFunction(
ttlConfig,
inputRowTypeInfo,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber);
}
} else if (RankUtil.isTop1(rankRange)) {
if (isAsyncStateEnabled) {
processFunction =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.rank.utils.AppendOnlyFirstNHelper;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
Expand All @@ -45,6 +46,8 @@ public class AppendOnlyFirstNFunction extends AbstractSyncStateTopNFunction {
// state stores a counter to record the occurrence of key.
private ValueState<Integer> state;

private transient AppendOnlyFirstNHelper helper;

public AppendOnlyFirstNFunction(
StateTtlConfig ttlConfig,
InternalTypeInfo<RowData> inputRowType,
Expand Down Expand Up @@ -74,6 +77,8 @@ public void open(OpenContext openContext) throws Exception {
stateDesc.enableTimeToLive(ttlConfig);
}
state = getRuntimeContext().getState(stateDesc);

helper = new SyncStateAppendOnlyFirstNHelper();
}

@Override
Expand All @@ -91,15 +96,17 @@ public void processElement(RowData input, Context context, Collector<RowData> ou
currentRank++;
state.update(currentRank);

if (outputRankNumber || hasOffset()) {
collectInsert(out, input, currentRank);
} else {
collectInsert(out, input);
}
helper.sendData(hasOffset(), out, input, currentRank, rankEnd);
}

/**
 * Reads the per-key counter held in {@code state}.
 *
 * @return the stored counter, or {@code 0} when the state holds no value yet
 * @throws IOException if reading the state fails
 */
private int getCurrentRank() throws IOException {
    final Integer stored = state.value();
    if (stored == null) {
        // Nothing recorded for this key so far: start counting from zero.
        return 0;
    }
    return stored;
}

/**
 * {@link AppendOnlyFirstNHelper} bound to the enclosing {@link AppendOnlyFirstNFunction}
 * instance. It adds no behavior of its own; it only hands the enclosing function to the helper's
 * constructor.
 */
private class SyncStateAppendOnlyFirstNHelper extends AppendOnlyFirstNHelper {
/** Creates a helper that operates on the enclosing function instance. */
public SyncStateAppendOnlyFirstNHelper() {
super(AppendOnlyFirstNFunction.this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators.rank.async;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.rank.AppendOnlyFirstNFunction;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.runtime.operators.rank.utils.AppendOnlyFirstNHelper;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A variant of {@link AppendOnlyFirstNFunction} that handles the first-n case using the async
 * state API ({@link StateFuture} based state access).
 *
 * <p>The input stream should only contain INSERT messages.
 */
public class AsyncStateAppendOnlyFirstNFunction extends AbstractAsyncStateTopNFunction {

    private static final long serialVersionUID = -889227691088906247L;

    // state stores a counter to record the occurrence of key.
    private ValueState<Integer> state;

    // Emits the output rows; created in open() because it is transient.
    private transient AsyncStateAppendOnlyFirstNHelper helper;

    public AsyncStateAppendOnlyFirstNFunction(
            StateTtlConfig ttlConfig,
            InternalTypeInfo<RowData> inputRowType,
            GeneratedRecordComparator sortKeyGeneratedRecordComparator,
            RowDataKeySelector sortKeySelector,
            RankType rankType,
            RankRange rankRange,
            boolean generateUpdateBefore,
            boolean outputRankNumber) {
        super(
                ttlConfig,
                inputRowType,
                sortKeyGeneratedRecordComparator,
                sortKeySelector,
                rankType,
                rankRange,
                generateUpdateBefore,
                outputRankNumber);
    }

    /**
     * Registers the {@code "counterState"} value state (with TTL when enabled in {@code
     * ttlConfig}) and creates the output helper.
     */
    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        ValueStateDescriptor<Integer> stateDesc =
                new ValueStateDescriptor<>("counterState", Types.INT);
        if (ttlConfig.isEnabled()) {
            stateDesc.enableTimeToLive(ttlConfig);
        }
        state = ((StreamingRuntimeContext) getRuntimeContext()).getValueState(stateDesc);

        helper = new AsyncStateAppendOnlyFirstNHelper();
    }

    /**
     * Counts the incoming row against the per-key counter and emits it while the counter is
     * still below the rank end; rows arriving after that are dropped.
     *
     * @param input the incoming row; must have {@link RowKind#INSERT}
     * @param context the process-function context (unused here)
     * @param out collector receiving the emitted rows
     * @throws IllegalArgumentException if {@code input} is not an INSERT row
     */
    @Override
    public void processElement(RowData input, Context context, Collector<RowData> out)
            throws Exception {
        StateFuture<Long> rankEndFuture = initRankEnd(input);

        // Ensure the message is an insert-only operation.
        Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
        AtomicLong currentRank = new AtomicLong(getCurrentRank());

        // thenAccept (not thenCompose) is used on purpose: the resulting future is never
        // consumed, and a thenCompose callback must not return null, which the previous
        // "skip this record" branch did.
        rankEndFuture.thenAccept(
                rankEnd -> {
                    // Ignore record if it does not belong to the first-n rows
                    if (currentRank.get() >= rankEnd) {
                        return;
                    }
                    long newRank = currentRank.incrementAndGet();
                    // Emit only after the counter update has been applied.
                    state.asyncUpdate((int) newRank)
                            .thenAccept(
                                    ignored ->
                                            helper.sendData(
                                                    hasOffset(), out, input, newRank, rankEnd));
                });
    }

    /**
     * Reads the per-key counter held in {@code state}.
     *
     * <p>NOTE(review): this uses the synchronous {@code value()} accessor inside an async-state
     * function; {@code asyncValue()} may be the better fit here - confirm.
     *
     * @return the stored counter, or {@code 0} when the state holds no value yet
     */
    private int getCurrentRank() throws IOException {
        Integer value = state.value();
        return value == null ? 0 : value;
    }

    /** {@link AppendOnlyFirstNHelper} bound to the enclosing function instance. */
    private class AsyncStateAppendOnlyFirstNHelper extends AppendOnlyFirstNHelper {
        public AsyncStateAppendOnlyFirstNHelper() {
            super(AsyncStateAppendOnlyFirstNFunction.this);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators.rank.utils;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction;
import org.apache.flink.table.runtime.operators.rank.AppendOnlyFirstNFunction;
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyFirstNFunction;
import org.apache.flink.util.Collector;

/**
 * A helper holding the output logic shared by the first-n functions {@link
 * AppendOnlyFirstNFunction} and {@link AsyncStateAppendOnlyFirstNFunction}.
 */
public class AppendOnlyFirstNHelper extends AbstractTopNFunction.AbstractTopNHelper {

    /**
     * Creates a helper operating on the given function.
     *
     * @param topNFunction the function this helper is bound to
     */
    public AppendOnlyFirstNHelper(AbstractTopNFunction topNFunction) {
        super(topNFunction);
    }

    /**
     * Emits {@code inputRow} as an insert. The rank-carrying {@code collectInsert} overload is
     * used when the rank number is part of the output or an offset is present; otherwise the
     * plain overload is used.
     *
     * @param hasOffset whether an offset is present, as determined by the caller
     * @param out collector receiving the emitted row
     * @param inputRow the row to emit
     * @param rank the rank assigned to {@code inputRow}
     * @param rankEnd the rank end passed through to the rank-carrying overload
     */
    public void sendData(
            boolean hasOffset, Collector<RowData> out, RowData inputRow, long rank, long rankEnd) {
        final boolean emitWithRank = hasOffset || outputRankNumber;
        if (!emitWithRank) {
            collectInsert(out, inputRow);
            return;
        }
        collectInsert(out, inputRow, rank, rankEnd);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;

import org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyFirstNFunction;

import org.junit.jupiter.api.TestTemplate;

import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;

/** Tests for {@link AppendOnlyFirstNFunction}. */
/** Tests for {@link AppendOnlyFirstNFunction} and {@link AsyncStateAppendOnlyFirstNFunction}. */
class AppendOnlyFirstNFunctionTest extends TopNFunctionTestBase {

@Override
Expand All @@ -38,20 +40,32 @@ AbstractTopNFunction createFunction(
boolean generateUpdateBefore,
boolean outputRankNumber,
boolean enableAsyncState) {
return new AppendOnlyFirstNFunction(
ttlConfig,
inputRowType,
generatedSortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber);
if (enableAsyncState) {
return new AsyncStateAppendOnlyFirstNFunction(
ttlConfig,
inputRowType,
generatedSortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber);
} else {
return new AppendOnlyFirstNFunction(
ttlConfig,
inputRowType,
generatedSortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber);
}
}

@Override
boolean supportedAsyncState() {
return false;
return true;
}

@Override
Expand Down

0 comments on commit dcc1605

Please sign in to comment.