Skip to content

Commit

Permalink
fix: Upgrade flink connector to version 1.15.0, equivalent to datapro…
Browse files Browse the repository at this point in the history
…c 2.1 (#176)
  • Loading branch information
dpcollins-google authored Jan 20, 2023
1 parent 2d287cb commit b80a462
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
10 changes: 5 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<artifactId>google-cloud-pubsublite-flink</artifactId>
<version>0.1.0-SNAPSHOT</version>
<properties>
<flink.version>1.12.5</flink.version>
<flink.version>1.15.0</flink.version>
<pubsublite.version>${project.parent.version}</pubsublite.version>
</properties>
<build>
Expand Down Expand Up @@ -107,12 +107,12 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.12</artifactId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
Expand Down Expand Up @@ -144,13 +144,13 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.12</artifactId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.12</artifactId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void addReader(int subtaskId) {
}

@Override
public SplitEnumeratorCheckpoint snapshotState() {
public SplitEnumeratorCheckpoint snapshotState(long checkpointId) {
SplitEnumeratorCheckpoint.Builder builder = SplitEnumeratorCheckpoint.newBuilder();
builder.addAllAssignments(assigner.checkpoint());
builder.setDiscovery(discovery.checkpoint());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ static boolean allFinished(Map<Integer, SplitAssignmentState<SubscriptionPartiti
// all tasks which have been scheduled by the executor service and throws any exceptions they
// encountered.
public void throwAnyTaskExceptions() throws Exception {
for (ScheduledFuture<?> task : testContext.getExecutorService().getScheduledTasks()) {
for (ScheduledFuture<?> task : testContext.getExecutorService().getAllScheduledTasks()) {
if (task.isDone()) {
task.get();
}
Expand Down Expand Up @@ -150,7 +150,7 @@ public void testCheckpoint() {
PubsubLiteSplitEnumerator enumerator = createEnumerator();

when(discovery.checkpoint()).thenReturn(Discovery.newBuilder().build());
SplitEnumeratorCheckpoint checkpoint = enumerator.snapshotState();
SplitEnumeratorCheckpoint checkpoint = enumerator.snapshotState(123L);
assertThat(checkpoint.getAssignmentsList()).containsExactlyElementsIn(assigner.checkpoint());
assertThat(checkpoint.getDiscovery()).isEqualTo(discovery.checkpoint());
}
Expand Down

0 comments on commit b80a462

Please sign in to comment.