Skip to content

Commit

Permalink
Restructure flink connector for new internal APIs and reduce API surface.
Browse files Browse the repository at this point in the history
  • Loading branch information
dpcollins-google committed Dec 26, 2022
1 parent b7fe040 commit bc07b9a
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 120 deletions.
10 changes: 5 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<artifactId>google-cloud-pubsublite-flink</artifactId>
<version>0.1.0-SNAPSHOT</version>
<properties>
<flink.version>1.15.0</flink.version>
<flink.version>1.12.5</flink.version>
<pubsublite.version>${project.parent.version}</pubsublite.version>
</properties>
<build>
Expand Down Expand Up @@ -107,12 +107,12 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<artifactId>flink-runtime_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
Expand Down Expand Up @@ -144,13 +144,13 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<artifactId>flink-test-utils_2.12</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<artifactId>flink-runtime_2.12</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public PubsubLiteSource(PubsubLiteSourceSettings<OutputT> settings) {

@Override
public Boundedness getBoundedness() {
return settings.boundedness();
return Boundedness.CONTINUOUS_UNBOUNDED;
}

@Override
Expand All @@ -68,7 +68,7 @@ public MetricGroup getMetricGroup() {

@Override
public UserCodeClassLoader getUserCodeClassLoader() {
return readerContext.getUserCodeClassLoader();
return null;
}
});
SourceAssembler<OutputT> assembler = new SourceAssembler<>(settings);
Expand All @@ -91,8 +91,7 @@ public SplitEnumerator<SubscriptionPartitionSplit, SplitEnumeratorCheckpoint> cr
assembler.newAdminClient(),
assembler.getCursorClientRemoveThis(),
assembler.getTopicPath(),
settings.subscriptionPath()),
settings.boundedness());
settings.subscriptionPath()));
}

@Override
Expand All @@ -108,7 +107,7 @@ public SplitEnumerator<SubscriptionPartitionSplit, SplitEnumeratorCheckpoint> re
assigner.listSplits(),
assembler.newAdminClient(),
assembler.getCursorClientRemoveThis());
return new PubsubLiteSplitEnumerator(enumContext, assigner, discovery, settings.boundedness());
return new PubsubLiteSplitEnumerator(enumContext, assigner, discovery);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import java.io.Serializable;
import org.apache.flink.api.connector.source.Boundedness;

@AutoValue
public abstract class PubsubLiteSourceSettings<OutputT> implements Serializable {
Expand All @@ -30,7 +29,6 @@ public static <OutputT> Builder<OutputT> builder(
PubsubLiteDeserializationSchema<OutputT> schema) {
return new AutoValue_PubsubLiteSourceSettings.Builder<OutputT>()
.setDeserializationSchema(schema)
.setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED)
.setTimestampSelector(MessageTimestampExtractor.publishTimeExtractor());
}

Expand All @@ -44,9 +42,6 @@ public static Builder<SequencedMessage> messagesBuilder() {
// Required
public abstract FlowControlSettings flowControlSettings();

// Optional
public abstract Boundedness boundedness();

// Optional
public abstract MessageTimestampExtractor timestampSelector();

Expand All @@ -61,9 +56,6 @@ public abstract static class Builder<OutputT> {
// Required
public abstract Builder<OutputT> setFlowControlSettings(FlowControlSettings settings);

// Optional
public abstract Builder<OutputT> setBoundedness(Boundedness value);

// Optional
public abstract Builder<OutputT> setTimestampSelector(MessageTimestampExtractor value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Map.Entry;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
Expand All @@ -39,35 +38,23 @@ public class PubsubLiteSplitEnumerator
private final SplitEnumeratorContext<SubscriptionPartitionSplit> context;
private final PartitionAssigner assigner;
private final SplitDiscovery discovery;
private final Boundedness boundedness;

public PubsubLiteSplitEnumerator(
SplitEnumeratorContext<SubscriptionPartitionSplit> context,
PartitionAssigner assigner,
SplitDiscovery discovery,
Boundedness boundedness) {
SplitDiscovery discovery) {
this.context = context;
this.boundedness = boundedness;
this.assigner = assigner;
this.discovery = discovery;
}

@Override
public void start() {
switch (boundedness) {
case CONTINUOUS_UNBOUNDED:
this.context.callAsync(
this::discoverNewSplits,
this::handlePartitionSplitDiscovery,
0,
PARTITION_DISCOVERY_INTERVAL.toMillis());
break;
case BOUNDED:
if (assigner.listSplits().isEmpty()) {
this.context.callAsync(this::discoverNewSplits, this::handlePartitionSplitDiscovery);
}
break;
}
this.context.callAsync(
this::discoverNewSplits,
this::handlePartitionSplitDiscovery,
0,
PARTITION_DISCOVERY_INTERVAL.toMillis());
}

@Override
Expand All @@ -88,7 +75,7 @@ public void addReader(int subtaskId) {
}

@Override
public SplitEnumeratorCheckpoint snapshotState(long l) {
public SplitEnumeratorCheckpoint snapshotState() {
SplitEnumeratorCheckpoint.Builder builder = SplitEnumeratorCheckpoint.newBuilder();
builder.addAllAssignments(assigner.checkpoint());
builder.setDiscovery(discovery.checkpoint());
Expand Down Expand Up @@ -134,13 +121,5 @@ private void updateAssignmentsForRegisteredReaders() {
new SplitsAssignment<>(
assignment.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().value(), Entry::getValue))));

// If this is a bounded split enumerator, and we have discovered splits, inform any task which
// received an assignment that this assignment will be the last.
if (boundedness == Boundedness.BOUNDED && !assigner.listSplits().isEmpty()) {
for (TaskId task : assignment.keySet()) {
context.signalNoMoreSplits(task.value());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ public void testSinglePublishFailure() throws Exception {
verify(fakeInnerPublisher).publish(message1);

assertThrows(
CheckedApiException.class,
() -> messagePublisher.waitUntilNoOutstandingPublishes());
CheckedApiException.class, () -> messagePublisher.waitUntilNoOutstandingPublishes());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext.SplitAssignmentState;
import org.junit.Test;
Expand All @@ -55,16 +54,15 @@ static SubscriptionPartitionSplit makeSplit(Partition partition) {
return SubscriptionPartitionSplit.create(exampleSubscriptionPath(), partition, exampleOffset());
}

public PubsubLiteSplitEnumerator createEnumerator(Boundedness boundedness) {
return new PubsubLiteSplitEnumerator(testContext, assigner, discovery, boundedness);
public PubsubLiteSplitEnumerator createEnumerator() {
return new PubsubLiteSplitEnumerator(testContext, assigner, discovery);
}

static Multimap<Integer, SubscriptionPartitionSplit> toMap(
Map<Integer, SplitAssignmentState<SubscriptionPartitionSplit>> state) {
ImmutableListMultimap.Builder<Integer, SubscriptionPartitionSplit> builder =
ImmutableListMultimap.builder();
state.forEach(
(k, v) -> builder.putAll(k, v.getAssignedSplits()));
state.forEach((k, v) -> builder.putAll(k, v.getAssignedSplits()));
return builder.build();
}

Expand All @@ -86,7 +84,7 @@ static boolean allFinished(Map<Integer, SplitAssignmentState<SubscriptionPartiti
// all tasks which have been scheduled by the executor service and throws any exceptions they
// encountered.
public void throwAnyTaskExceptions() throws Exception {
for (ScheduledFuture<?> task : testContext.getExecutorService().getAllScheduledTasks()) {
for (ScheduledFuture<?> task : testContext.getExecutorService().getScheduledTasks()) {
if (task.isDone()) {
task.get();
}
Expand All @@ -95,21 +93,21 @@ public void throwAnyTaskExceptions() throws Exception {

@Test
public void testClose() throws Exception {
PubsubLiteSplitEnumerator enumerator = createEnumerator(Boundedness.CONTINUOUS_UNBOUNDED);
PubsubLiteSplitEnumerator enumerator = createEnumerator();
enumerator.close();
verify(discovery).close();
}

@Test
public void testClose_Exception() throws Exception {
PubsubLiteSplitEnumerator enumerator = createEnumerator(Boundedness.CONTINUOUS_UNBOUNDED);
PubsubLiteSplitEnumerator enumerator = createEnumerator();
doThrow(new Exception("error")).when(discovery).close();
assertThrows(IOException.class, enumerator::close);
}

@Test
public void testSplitDiscovery_EmptyPoll() {
PubsubLiteSplitEnumerator enumerator = createEnumerator(Boundedness.CONTINUOUS_UNBOUNDED);
PubsubLiteSplitEnumerator enumerator = createEnumerator();
enumerator.start();

when(discovery.discoverNewSplits()).thenReturn(ImmutableList.of());
Expand All @@ -120,7 +118,7 @@ public void testSplitDiscovery_EmptyPoll() {

@Test
public void testSplitDiscovery_InitialError() {
PubsubLiteSplitEnumerator enumerator = createEnumerator(Boundedness.CONTINUOUS_UNBOUNDED);
PubsubLiteSplitEnumerator enumerator = createEnumerator();
enumerator.start();

when(discovery.discoverNewSplits()).thenThrow(new RuntimeException("error"));
Expand All @@ -133,7 +131,7 @@ public void testSplitDiscovery_InitialError() {

@Test
public void testSplitDiscovery_ErrorAfterInitialAssignment() throws Exception {
PubsubLiteSplitEnumerator enumerator = createEnumerator(Boundedness.CONTINUOUS_UNBOUNDED);
PubsubLiteSplitEnumerator enumerator = createEnumerator();
enumerator.start();

SubscriptionPartitionSplit s0 = makeSplit(Partition.of(0));
Expand All @@ -151,17 +149,17 @@ public void testSplitDiscovery_ErrorAfterInitialAssignment() throws Exception {

@Test
public void testCheckpoint() {
PubsubLiteSplitEnumerator enumerator = createEnumerator(Boundedness.CONTINUOUS_UNBOUNDED);
PubsubLiteSplitEnumerator enumerator = createEnumerator();

when(discovery.checkpoint()).thenReturn(Discovery.newBuilder().build());
SplitEnumeratorCheckpoint checkpoint = enumerator.snapshotState(0L);
SplitEnumeratorCheckpoint checkpoint = enumerator.snapshotState();
assertThat(checkpoint.getAssignmentsList()).containsExactlyElementsIn(assigner.checkpoint());
assertThat(checkpoint.getDiscovery()).isEqualTo(discovery.checkpoint());
}

@Test
public void testContinuousSplitDiscovery() {
PubsubLiteSplitEnumerator enumerator = createEnumerator(Boundedness.CONTINUOUS_UNBOUNDED);
PubsubLiteSplitEnumerator enumerator = createEnumerator();
enumerator.start();

SubscriptionPartitionSplit s0 = makeSplit(Partition.of(0));
Expand All @@ -184,7 +182,7 @@ public void testContinuousSplitDiscovery() {

@Test
public void testContinuousSplitDiscovery_DelayedReaderRegistration() {
PubsubLiteSplitEnumerator enumerator = createEnumerator(Boundedness.CONTINUOUS_UNBOUNDED);
PubsubLiteSplitEnumerator enumerator = createEnumerator();
enumerator.start();

SubscriptionPartitionSplit s0 = makeSplit(Partition.of(0));
Expand All @@ -206,7 +204,7 @@ public void testContinuousSplitDiscovery_DelayedReaderRegistration() {

@Test
public void testContinuousReaderRegistration() {
PubsubLiteSplitEnumerator enumerator = createEnumerator(Boundedness.CONTINUOUS_UNBOUNDED);
PubsubLiteSplitEnumerator enumerator = createEnumerator();
enumerator.start();

SubscriptionPartitionSplit s0 = makeSplit(Partition.of(0));
Expand All @@ -223,7 +221,7 @@ public void testContinuousReaderRegistration() {

@Test
public void testContinuousAddSplitBack() {
PubsubLiteSplitEnumerator enumerator = createEnumerator(Boundedness.CONTINUOUS_UNBOUNDED);
PubsubLiteSplitEnumerator enumerator = createEnumerator();

SubscriptionPartitionSplit s0 = makeSplit(Partition.of(0));

Expand All @@ -233,53 +231,4 @@ public void testContinuousAddSplitBack() {
assertThat(toMap(testContext.getSplitAssignments())).containsExactly(0, s0);
assertThat(anyFinished(testContext.getSplitAssignments())).isFalse();
}

@Test
public void testBoundedSplitDiscovery() {
PubsubLiteSplitEnumerator enumerator = createEnumerator(Boundedness.BOUNDED);
enumerator.start();

SubscriptionPartitionSplit s0 = makeSplit(Partition.of(0));

testContext.registerReader(0, "h0");
when(discovery.discoverNewSplits()).thenReturn(ImmutableList.of(s0));
testContext.triggerAllActions();

reset(discovery);
testContext.triggerAllActions();
verifyNoInteractions(discovery);

assertThat(toMap(testContext.getSplitAssignments())).containsExactly(0, s0);
assertThat(allFinished(testContext.getSplitAssignments())).isTrue();
}

@Test
public void testBoundedReaderRegistration() {
PubsubLiteSplitEnumerator enumerator = createEnumerator(Boundedness.BOUNDED);
enumerator.start();

SubscriptionPartitionSplit s0 = makeSplit(Partition.of(0));

when(discovery.discoverNewSplits()).thenReturn(ImmutableList.of(s0));
testContext.triggerAllActions();
assertThat(testContext.getSplitAssignments()).isEmpty();

testContext.registerReader(0, "h0");
enumerator.addReader(0);
assertThat(toMap(testContext.getSplitAssignments())).containsExactly(0, s0);
assertThat(allFinished(testContext.getSplitAssignments())).isTrue();
}

@Test
public void testBoundedAddSplitBack() {
PubsubLiteSplitEnumerator enumerator = createEnumerator(Boundedness.BOUNDED);

SubscriptionPartitionSplit s0 = makeSplit(Partition.of(0));

testContext.registerReader(0, "h0");
enumerator.addSplitsBack(ImmutableList.of(s0), 1);

assertThat(toMap(testContext.getSplitAssignments())).containsExactly(0, s0);
assertThat(allFinished(testContext.getSplitAssignments())).isTrue();
}
}
Loading

0 comments on commit bc07b9a

Please sign in to comment.