feat: Add SplitDiscovery for use in split enumerator #14

Merged 4 commits on Jul 23, 2021
Changes from 1 commit
@@ -0,0 +1,114 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink.enumerator;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class SingleSubscriptionSplitDiscovery implements SplitDiscovery {
  private final AdminClient adminClient;
  private final CursorClient cursorClient;
  private final TopicPath topicPath;
  private final SubscriptionPath subscriptionPath;
  private long partitionCount;

  private SingleSubscriptionSplitDiscovery(
      AdminClient adminClient,
      CursorClient cursorClient,
      TopicPath topicPath,
      SubscriptionPath subscriptionPath,
      long partitionCount) {
    this.adminClient = adminClient;
    this.cursorClient = cursorClient;
    this.topicPath = topicPath;
    this.subscriptionPath = subscriptionPath;
    this.partitionCount = partitionCount;
  }

  static SingleSubscriptionSplitDiscovery create(
      AdminClient adminClient,
      CursorClient cursorClient,
      TopicPath topicPath,
      SubscriptionPath subscriptionPath) {
    return new SingleSubscriptionSplitDiscovery(
        adminClient, cursorClient, topicPath, subscriptionPath, 0L);
  }

  static SingleSubscriptionSplitDiscovery fromCheckpoint(
      SplitEnumeratorCheckpoint.Discovery proto,

Contributor: Can you just put the saved partition count in this proto directly?

Contributor Author: Unfortunately we can't. Checkpointing can be interleaved between split discovery and the point where the splits are actually turned over to the assigner. If we checkpoint the partition count, we might lose some splits when a checkpoint is taken after the splits are discovered but before they're given to the assigner.
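
The interleaving described in the reply above can be illustrated with a small simulation. This is a hedged sketch rather than code from the PR: `CheckpointInterleaving`, `discover`, `rediscoveredAfterRestore`, and the use of plain `long` partition numbers are all hypothetical stand-ins for the real discovery and assigner classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; none of these names come from the PR.
final class CheckpointInterleaving {
  private CheckpointInterleaving() {}

  // Returns the partitions in [knownCount, topicCount), i.e. those not yet discovered.
  static List<Long> discover(long knownCount, long topicCount) {
    List<Long> found = new ArrayList<>();
    for (long p = knownCount; p < topicCount; p++) {
      found.add(p);
    }
    return found;
  }

  // Partitions rediscovered after restoring from a checkpoint that was taken after
  // discovery saw topicCount partitions but before any split reached the assigner.
  static List<Long> rediscoveredAfterRestore(boolean countStoredInCheckpoint, long topicCount) {
    // If the count is checkpointed, the restored discovery believes every
    // partition was already handed out. If the count is derived from the
    // assigner's splits (none were handed over yet), it starts again from zero.
    long restoredCount = countStoredInCheckpoint ? topicCount : 0L;
    return discover(restoredCount, topicCount);
  }
}
```

With three partitions, `rediscoveredAfterRestore(true, 3)` returns an empty list, so partitions 0 through 2 would never be assigned, while `rediscoveredAfterRestore(false, 3)` returns all three.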

      Collection<SubscriptionPartitionSplit> currentSplits,
      AdminClient adminClient,
      CursorClient cursorClient) {
    SubscriptionPath subscriptionPath = SubscriptionPath.parse(proto.getSubscription());
    TopicPath topicPath = TopicPath.parse(proto.getTopic());
    long partitionCount =
        currentSplits.stream()
            .filter(s -> s.subscriptionPath().equals(subscriptionPath))

Contributor: I'd argue you should fail if any of the subscription paths are wrong, but this is a nit.

Contributor Author: I'm happy with that. Done.
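
A minimal sketch of the stricter behavior suggested above, assuming the restore path should reject mismatched splits rather than filter them out. The follow-up commit is not shown in this diff, so `SplitValidation`, its nested `Split` stand-in for `SubscriptionPartitionSplit`, and the choice of `IllegalArgumentException` are all assumptions.

```java
import java.util.Collection;

// Hypothetical sketch; Split is a stand-in for SubscriptionPartitionSplit.
final class SplitValidation {
  private SplitValidation() {}

  static final class Split {
    final String subscriptionPath;
    final long partition;

    Split(String subscriptionPath, long partition) {
      this.subscriptionPath = subscriptionPath;
      this.partition = partition;
    }
  }

  // Throws if any restored split belongs to a different subscription,
  // instead of silently dropping it from the partition count calculation.
  static void requireSameSubscription(String expected, Collection<Split> splits) {
    for (Split s : splits) {
      if (!s.subscriptionPath.equals(expected)) {
        throw new IllegalArgumentException(
            "Split for " + s.subscriptionPath + " does not match " + expected);
      }
    }
  }
}
```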

            .mapToLong(a -> a.partition().value())
            .max()

Contributor: This is wrong. The max existing partition number != the count, because partitions are 0-indexed.

Contributor Author: Oh, good catch. Fixed this and the test.
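
The off-by-one raised above can be sketched in isolation. This is an illustration under stated assumptions, not the merged fix: `PartitionCounts` and `countFromPartitions` are hypothetical names, and plain `long` values stand in for `Partition.value()`.

```java
import java.util.Collection;

// Hypothetical helper; plain long values stand in for Partition.value().
final class PartitionCounts {
  private PartitionCounts() {}

  // Partitions are 0-indexed, so the count implied by a set of partition
  // numbers is the largest number plus one. No partitions means a count of 0.
  static long countFromPartitions(Collection<Long> partitionNumbers) {
    return partitionNumbers.stream()
        .mapToLong(Long::longValue)
        .map(p -> p + 1)
        .max()
        .orElse(0L);
  }
}
```

Under this calculation, a single restored split on partition 2 implies three known partitions, so a restore test that mocks a topic partition count of 2 would presumably need to mock 3 to still expect no new splits.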

            .orElse(0);
    return new SingleSubscriptionSplitDiscovery(
        adminClient, cursorClient, topicPath, subscriptionPath, partitionCount);
  }

  public List<SubscriptionPartitionSplit> discoverSplits() throws ApiException {

Contributor: Do methods on this class need to be synchronized?

Contributor Author: I don't think so. discoverSplits is the only method that modifies state, and it's never called concurrently. That said, there's no performance concern here, so I just made them synchronized.
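
A rough sketch of what making the stateful methods synchronized could look like. `SynchronizedDiscovery` is a hypothetical stand-in that tracks only the partition count; the `Math.max` guard is an addition for this sketch and is not taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the discovery class; only the locking is the point.
final class SynchronizedDiscovery {
  private long partitionCount = 0L;

  // synchronized makes the read, compare, and update of partitionCount atomic,
  // so two callers can never both claim the same new partition.
  synchronized List<Long> discoverNewPartitions(long latestCount) {
    List<Long> added = new ArrayList<>();
    for (long p = partitionCount; p < latestCount; p++) {
      added.add(p);
    }
    // Sketch-only guard: never move the known count backwards.
    partitionCount = Math.max(partitionCount, latestCount);
    return added;
  }

  synchronized long knownPartitionCount() {
    return partitionCount;
  }
}
```

Since the enumerator is said never to call discovery concurrently, the lock is uncontended in practice and mainly guards against future misuse.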

    List<SubscriptionPartitionSplit> newSplits = new ArrayList<>();
    long newPartitionCount;
    try {
      newPartitionCount = adminClient.getTopicPartitionCount(topicPath).get();
    } catch (ExecutionException | InterruptedException e) {
palmere-google marked this conversation as resolved.
      throw ExtractStatus.toCanonical(e).underlying;
    }

Contributor: Nit: I'd just use a single try { } block around the whole function.

Contributor Author: Done.

    if (newPartitionCount == partitionCount) {
      return newSplits;
    }
    Map<Partition, Offset> cursorMap;
    try {
      cursorMap = cursorClient.listPartitionCursors(subscriptionPath).get();
    } catch (ExecutionException | InterruptedException e) {

Contributor: Catch `Throwable t` here.

Contributor Author: Done.
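
Taken together with the earlier nit about a single try block, the two suggestions amount to one try around both remote calls with one broad catch. The sketch below shows that shape only. `SingleTryExample`, `toUnchecked`, and `describe` are hypothetical; `CompletableFuture` stands in for the clients' futures, and `toUnchecked` stands in for the role `ExtractStatus.toCanonical(e).underlying` plays in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch; CompletableFuture stands in for the clients' futures.
final class SingleTryExample {
  private SingleTryExample() {}

  // Stand-in for the status extraction in the diff: unwrap the future's
  // wrapper exception and surface an unchecked exception to the caller.
  static RuntimeException toUnchecked(Throwable t) {
    Throwable cause =
        (t instanceof ExecutionException && t.getCause() != null) ? t.getCause() : t;
    return (cause instanceof RuntimeException)
        ? (RuntimeException) cause
        : new RuntimeException(cause);
  }

  // Both remote calls share one try block, and one catch handles every failure.
  static String describe(CompletableFuture<Long> partitionCount, CompletableFuture<Long> firstOffset) {
    try {
      long count = partitionCount.get();
      long offset = firstOffset.get();
      return count + " partitions, first offset " + offset;
    } catch (Throwable t) {
      throw toUnchecked(t);
    }
  }
}
```

A broad catch also intercepts `InterruptedException`; whether the interrupt flag should be restored before rethrowing is left open in this sketch.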

      throw ExtractStatus.toCanonical(e).underlying;
    }
    for (long p = partitionCount; p < newPartitionCount; p++) {
      Partition partition = Partition.of(p);
      Offset offset = cursorMap.getOrDefault(partition, Offset.of(0));
      newSplits.add(SubscriptionPartitionSplit.create(subscriptionPath, partition, offset));
    }
    partitionCount = newPartitionCount;
    return newSplits;
  }

  public SplitEnumeratorCheckpoint.Discovery checkpoint() {
    return SplitEnumeratorCheckpoint.Discovery.newBuilder()
        .setSubscription(subscriptionPath.toString())
        .setTopic(topicPath.toString())
        .build();
  }

  @Override
  public void close() {
    try (AdminClient a = adminClient;
        CursorClient c = cursorClient) {}
  }
}
@@ -0,0 +1,27 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink.enumerator;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
import java.util.List;

interface SplitDiscovery extends AutoCloseable {
  List<SubscriptionPartitionSplit> discoverSplits() throws ApiException;

Contributor: Nit: rename this to discoverNewSplits.

Contributor Author: Done.


  SplitEnumeratorCheckpoint.Discovery checkpoint();
}
@@ -0,0 +1,137 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink.enumerator;

import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.exampleSubscriptionPath;
import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.exampleTopicPath;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class SingleSubscriptionSplitDiscoveryTest {

  @Mock CursorClient mockCursorClient;
  @Mock AdminClient mockAdminClient;

  SplitDiscovery discovery;

  @Before
  public void setUp() {
    discovery =
        SingleSubscriptionSplitDiscovery.create(
            mockAdminClient, mockCursorClient, exampleTopicPath(), exampleSubscriptionPath());
  }

  @Test
  public void testDiscovery() {
    when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
        .thenReturn(ApiFutures.immediateFuture(2L));
    when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
        .thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(1), Offset.of(2))));
    List<SubscriptionPartitionSplit> splits = discovery.discoverSplits();
    assertThat(splits)
        .containsExactly(
            SubscriptionPartitionSplit.create(
                exampleSubscriptionPath(), Partition.of(0), Offset.of(0)),
            SubscriptionPartitionSplit.create(
                exampleSubscriptionPath(), Partition.of(1), Offset.of(2)));
  }

  @Test
  public void testDiscovery_Incremental() {
    when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
        .thenReturn(ApiFutures.immediateFuture(2L))
        .thenReturn(ApiFutures.immediateFuture(3L))
        .thenReturn(ApiFutures.immediateFuture(3L));
    when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
        .thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(1), Offset.of(2))));
    assertThat(discovery.discoverSplits()).hasSize(2);
    assertThat(discovery.discoverSplits()).hasSize(1);
    assertThat(discovery.discoverSplits()).hasSize(0);
  }

  @Test
  public void testDiscovery_AdminFailure() {
    when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
        .thenReturn(
            ApiFutures.immediateFailedFuture(
                new CheckedApiException("", StatusCode.Code.INTERNAL)));
    assertThrows(ApiException.class, () -> discovery.discoverSplits());
  }

  @Test
  public void testDiscovery_CursorFailure() {
    when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
        .thenReturn(ApiFutures.immediateFuture(2L));
    when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
        .thenReturn(
            ApiFutures.immediateFailedFuture(
                new CheckedApiException("", StatusCode.Code.INTERNAL)));
    assertThrows(ApiException.class, () -> discovery.discoverSplits());
  }

  @Test
  public void testCheckpoint() {
    SplitEnumeratorCheckpoint.Discovery proto = discovery.checkpoint();
    assertThat(proto.getSubscription()).isEqualTo(exampleSubscriptionPath().toString());
    assertThat(proto.getTopic()).isEqualTo(exampleTopicPath().toString());
  }

  @Test
  public void testCheckpointRestore() {
    SplitEnumeratorCheckpoint.Discovery proto = discovery.checkpoint();

    List<SubscriptionPartitionSplit> splits =
        ImmutableList.of(
            SubscriptionPartitionSplit.create(
                exampleSubscriptionPath(), Partition.of(2), Offset.of(4)));
    SplitDiscovery restored =
        SingleSubscriptionSplitDiscovery.fromCheckpoint(
            proto, splits, mockAdminClient, mockCursorClient);

    when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
        .thenReturn(ApiFutures.immediateFuture(2L));
    assertThat(restored.discoverSplits()).isEmpty();
  }

  @Test
  public void testClose() throws Exception {
    discovery.close();
    verify(mockAdminClient).close();
    verify(mockCursorClient).close();
  }
}