Skip to content

Commit

Permalink
Restructure flink connector for new internal APIs and reduce API surf…
Browse files Browse the repository at this point in the history
…ace.
  • Loading branch information
dpcollins-google committed Dec 25, 2022
1 parent 801a9d8 commit b7fe040
Show file tree
Hide file tree
Showing 16 changed files with 36 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,17 @@

public interface MessageTimestampExtractor extends Serializable {
static MessageTimestampExtractor publishTimeExtractor() {
return (MessageTimestampExtractor)
m -> Timestamp.fromProto(m.publishTime()).toDate().toInstant();
return m -> Timestamp.fromProto(m.publishTime()).toDate().toInstant();
}

static MessageTimestampExtractor eventTimeExtractor() {
return (MessageTimestampExtractor)
m -> {
if (m.message().eventTime().isPresent()) {
return Timestamp.fromProto(m.message().eventTime().get()).toDate().toInstant();
}
return Timestamp.fromProto(m.publishTime()).toDate().toInstant();
};
return m -> {
if (m.message().eventTime().isPresent()) {
return Timestamp.fromProto(m.message().eventTime().get()).toDate().toInstant();
}
return Timestamp.fromProto(m.publishTime()).toDate().toInstant();
};
}

Instant timestamp(SequencedMessage m) throws Exception;
Instant timestamp(SequencedMessage m);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.pubsublite.flink.internal.sink.SerializingPublisher;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.time.Instant;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
Expand All @@ -41,8 +42,7 @@ public PubsubLiteSink(PubsubLiteSinkSettings<T> settings) {
}

@Override
public void initializeState(FunctionInitializationContext functionInitializationContext)
throws Exception {}
public void initializeState(FunctionInitializationContext functionInitializationContext) {}

@Override
public synchronized void snapshotState(FunctionSnapshotContext functionSnapshotContext)
Expand All @@ -62,6 +62,11 @@ public synchronized void invoke(T value, Context context) throws Exception {
@Override
public synchronized void open(Configuration parameters) throws Exception {
super.open(parameters);
settings
.serializationSchema()
.open(
RuntimeContextInitializationContextAdapters.serializationAdapter(
getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
publisher =
new SerializingPublisher<>(
new MessagePublisher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public UserCodeClassLoader getUserCodeClassLoader() {
});
SourceAssembler<OutputT> assembler = new SourceAssembler<>(settings);
return new PubsubLiteSourceReader<>(
new PubsubLiteRecordEmitter<OutputT>(),
new PubsubLiteRecordEmitter<>(),
assembler.getCursorClientRemoveThis(),
assembler.getSplitReaderSupplier(),
new Configuration(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package com.google.cloud.pubsublite.flink.internal.sink;

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.flink.PubsubLiteSinkSettings;
import com.google.cloud.pubsublite.internal.Publisher;
Expand All @@ -27,10 +24,6 @@ public class PerServerPublisherCache {
private static final PublisherCache<PubsubLiteSinkSettings<?>> cache =
new PublisherCache<>(PerServerPublisherCache::newPublisher);

private static AdminClient getAdminClient(CloudRegion region) {
return AdminClient.create(AdminClientSettings.newBuilder().setRegion(region).build());
}

private static Publisher<MessageMetadata> newPublisher(PubsubLiteSinkSettings<?> options) {
SinkAssembler<?> assembler = new SinkAssembler<>(options);
return assembler.newPublisher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public int getVersion() {
}

@Override
public byte[] serialize(SplitEnumeratorCheckpoint message) throws IOException {
public byte[] serialize(SplitEnumeratorCheckpoint message) {
return message.toByteArray();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public synchronized void notifySplitFinished(Collection<SubscriptionPartitionSpl
}

@Override
public void close() throws Exception {
public void close() {
cursorCommitter.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ public int getVersion() {
}

@Override
public byte[] serialize(SubscriptionPartitionSplit subscriptionPartitionSplit)
throws IOException {
public byte[] serialize(SubscriptionPartitionSplit subscriptionPartitionSplit) {
return subscriptionPartitionSplit.toProto().toByteArray();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.google.cloud.pubsublite.Offset;

public final class SubscriptionPartitionSplitState {
SubscriptionPartitionSplit split;
final SubscriptionPartitionSplit split;
Offset current;

public SubscriptionPartitionSplitState(SubscriptionPartitionSplit split) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ public void createResources() throws Exception {
TopicPath.newBuilder()
.setLocation(ZONE)
.setProject(PROJECT)
.setName(TopicName.of("flink-integration-test-topic-" + uuid.toString()))
.setName(TopicName.of("flink-integration-test-topic-" + uuid))
.build();
subscriptionPath =
SubscriptionPath.newBuilder()
.setLocation(ZONE)
.setProject(PROJECT)
.setName(SubscriptionName.of("flink-test-sub-" + uuid.toString()))
.setName(SubscriptionName.of("flink-test-sub-" + uuid))
.build();

Topic topic =
Expand Down Expand Up @@ -308,11 +308,12 @@ public void testSinkWithFailure() throws Exception {
// A testing sink which stores messages in a static map to prevent them from being lost when
// the sink is serialized.
private static class CollectSink implements SinkFunction<String>, Serializable {
private static final long serialVersionUID = 32840981723478L;
// Note: doesn't store duplicates.
private static final Set<String> collector = Collections.synchronizedSet(new HashSet<>());

@Override
public void invoke(String value) throws Exception {
public void invoke(String value, Context context) {
collector.add(value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ public void testSinglePublishFailure() throws Exception {

assertThrows(
CheckedApiException.class,
() -> {
messagePublisher.waitUntilNoOutstandingPublishes();
});
() -> messagePublisher.waitUntilNoOutstandingPublishes());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@

@RunWith(MockitoJUnitRunner.class)
public class PubsubLiteSplitEnumeratorTest {
TestingSplitEnumeratorContext<SubscriptionPartitionSplit> testContext =
private final TestingSplitEnumeratorContext<SubscriptionPartitionSplit> testContext =
new TestingSplitEnumeratorContext<>(2);
@Mock SplitDiscovery discovery;
PartitionAssigner assigner = UniformPartitionAssigner.create();
private final PartitionAssigner assigner = UniformPartitionAssigner.create();

static SubscriptionPartitionSplit makeSplit(Partition partition) {
return SubscriptionPartitionSplit.create(exampleSubscriptionPath(), partition, exampleOffset());
Expand All @@ -64,9 +64,7 @@ static Multimap<Integer, SubscriptionPartitionSplit> toMap(
ImmutableListMultimap.Builder<Integer, SubscriptionPartitionSplit> builder =
ImmutableListMultimap.builder();
state.forEach(
(k, v) -> {
builder.putAll(k, v.getAssignedSplits());
});
(k, v) -> builder.putAll(k, v.getAssignedSplits()));
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,8 @@ public void testCheckpointRestore_SubscriptionMismatch() {
Offset.of(4)));
assertThrows(
IllegalStateException.class,
() -> {
SingleSubscriptionSplitDiscovery.fromCheckpoint(
proto, splits, mockAdminClient, mockCursorClient);
});
() -> SingleSubscriptionSplitDiscovery.fromCheckpoint(
proto, splits, mockAdminClient, mockCursorClient));
}

@Test
Expand All @@ -163,10 +161,8 @@ public void testCheckpointRestore_NonContinuousPartitions() {
exampleSubscriptionPath(), Partition.of(1), Offset.of(4)));
assertThrows(
IllegalStateException.class,
() -> {
SingleSubscriptionSplitDiscovery.fromCheckpoint(
proto, splits, mockAdminClient, mockCursorClient);
});
() -> SingleSubscriptionSplitDiscovery.fromCheckpoint(
proto, splits, mockAdminClient, mockCursorClient));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ public void testListSplits() {
public void testInvalidParallelism() {
assertThrows(
IllegalArgumentException.class,
() -> {
assigner.assignSplitsForTasks(ImmutableList.of(), 0);
});
() -> assigner.assignSplitsForTasks(ImmutableList.of(), 0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void testIntermediateCheckpointSkipped() {
}

@Test
public void testClose() throws Exception {
public void testClose() {
cursorCommitter.close();
verify(mockCursorClient).close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void testTimestampFailure() throws Exception {
SequencedMessage message1 = messageFromOffset(Offset.of(10));

when(mockDeserializationSchema.deserialize(message1)).thenReturn("one");
when(mockTimestampExtractor.timestamp(message1)).thenThrow(new Exception(""));
when(mockTimestampExtractor.timestamp(message1)).thenThrow(new IllegalStateException(""));

RecordsBySplits.Builder<SequencedMessage> records = new RecordsBySplits.Builder<>();
records.add("1", message1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class PubsubLiteSourceReaderTest {
@Mock(answer = RETURNS_DEEP_STUBS)
SourceReaderContext mockContext;

TestingReaderOutput<String> output = new TestingReaderOutput<>();
private final TestingReaderOutput<String> output = new TestingReaderOutput<>();
SourceReader<String, SubscriptionPartitionSplit> reader;

public static BlockingPullSubscriber subscriberFromIntegers(int... messages) {
Expand Down

0 comments on commit b7fe040

Please sign in to comment.