From b68d2b445de3762a9ffb4f8db3155ad80b440161 Mon Sep 17 00:00:00 2001
From: palmere-google <68394592+palmere-google@users.noreply.github.com>
Date: Fri, 23 Jul 2021 11:43:52 -0400
Subject: [PATCH] feat: Add SplitDiscovery for use in split enumerator (#14)

* SplitDiscovery

updated

* respond to comments

* test fix and formattiong

* fix comments
---
 .../SingleSubscriptionSplitDiscovery.java     | 121 ++++++++++++
 .../flink/enumerator/SplitDiscovery.java      |  27 +++
 .../SingleSubscriptionSplitDiscoveryTest.java | 178 ++++++++++++++++++
 3 files changed, 326 insertions(+)
 create mode 100644 src/main/java/com/google/cloud/pubsublite/flink/enumerator/SingleSubscriptionSplitDiscovery.java
 create mode 100644 src/main/java/com/google/cloud/pubsublite/flink/enumerator/SplitDiscovery.java
 create mode 100644 src/test/java/com/google/cloud/pubsublite/flink/enumerator/SingleSubscriptionSplitDiscoveryTest.java

diff --git a/src/main/java/com/google/cloud/pubsublite/flink/enumerator/SingleSubscriptionSplitDiscovery.java b/src/main/java/com/google/cloud/pubsublite/flink/enumerator/SingleSubscriptionSplitDiscovery.java
new file mode 100644
index 00000000..0fa3a6ac
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/flink/enumerator/SingleSubscriptionSplitDiscovery.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.pubsublite.flink.enumerator;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.*;
+import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
+import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
+import com.google.cloud.pubsublite.internal.CursorClient;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class SingleSubscriptionSplitDiscovery implements SplitDiscovery {
+  private final AdminClient adminClient;
+  private final CursorClient cursorClient;
+  private final TopicPath topicPath;
+  private final SubscriptionPath subscriptionPath;
+  private long partitionCount;
+
+  private SingleSubscriptionSplitDiscovery(
+      AdminClient adminClient,
+      CursorClient cursorClient,
+      TopicPath topicPath,
+      SubscriptionPath subscriptionPath,
+      long partitionCount) {
+    this.adminClient = adminClient;
+    this.cursorClient = cursorClient;
+    this.topicPath = topicPath;
+    this.subscriptionPath = subscriptionPath;
+    this.partitionCount = partitionCount;
+  }
+
+  static SingleSubscriptionSplitDiscovery create(
+      AdminClient adminClient,
+      CursorClient cursorClient,
+      TopicPath topicPath,
+      SubscriptionPath subscriptionPath) {
+    return new SingleSubscriptionSplitDiscovery(
+        adminClient, cursorClient, topicPath, subscriptionPath, 0L);
+  }
+
+  static SingleSubscriptionSplitDiscovery fromCheckpoint(
+      SplitEnumeratorCheckpoint.Discovery proto,
+      Collection<SubscriptionPartitionSplit> currentSplits,
+      AdminClient adminClient,
+      CursorClient cursorClient) {
+    SubscriptionPath subscriptionPath = SubscriptionPath.parse(proto.getSubscription());
+    TopicPath topicPath = TopicPath.parse(proto.getTopic());
+    Set<Long> partitions = new TreeSet<>();
+    for (SubscriptionPartitionSplit s : currentSplits) {
+      if (!s.subscriptionPath().equals(subscriptionPath)) {
+        throw new IllegalStateException(
+            "Split discovery configured with subscription "
+                + subscriptionPath
+                + " but current splits contains a split from subscription "
+                + s);
+      }
+      partitions.add(s.partition().value());
+    }
+    long partitionCount = partitions.size();
+    for (long p = 0; p < partitions.size(); p++) {
+      if (!partitions.contains(p)) {
+        throw new IllegalStateException(
+            "Split set is not continuous, missing split for partition " + p + " " + currentSplits);
+      }
+    }
+    return new SingleSubscriptionSplitDiscovery(
+        adminClient, cursorClient, topicPath, subscriptionPath, partitionCount);
+  }
+
+  public synchronized List<SubscriptionPartitionSplit> discoverNewSplits() throws ApiException {
+    try {
+      List<SubscriptionPartitionSplit> newSplits = new ArrayList<>();
+      long newPartitionCount = adminClient.getTopicPartitionCount(topicPath).get();
+      if (newPartitionCount == partitionCount) {
+        return newSplits;
+      }
+      Map<Partition, Offset> cursorMap = cursorClient.listPartitionCursors(subscriptionPath).get();
+      for (long p = partitionCount; p < newPartitionCount; p++) {
+        Partition partition = Partition.of(p);
+        Offset offset = cursorMap.getOrDefault(partition, Offset.of(0));
+        newSplits.add(SubscriptionPartitionSplit.create(subscriptionPath, partition, offset));
+      }
+      partitionCount = newPartitionCount;
+      return newSplits;
+    } catch (Throwable t) {
+      throw ExtractStatus.toCanonical(t).underlying;
+    }
+  }
+
+  public synchronized SplitEnumeratorCheckpoint.Discovery checkpoint() {
+    return SplitEnumeratorCheckpoint.Discovery.newBuilder()
+        .setSubscription(subscriptionPath.toString())
+        .setTopic(topicPath.toString())
+        .build();
+  }
+
+  @Override
+  public synchronized void close() {
+    try (AdminClient a = adminClient;
+        CursorClient c = cursorClient) {}
+  }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/flink/enumerator/SplitDiscovery.java b/src/main/java/com/google/cloud/pubsublite/flink/enumerator/SplitDiscovery.java
new file mode 100644
index 00000000..e6fd8d4e
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/flink/enumerator/SplitDiscovery.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.pubsublite.flink.enumerator;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
+import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
+import java.util.List;
+
+interface SplitDiscovery extends AutoCloseable {
+  List<SubscriptionPartitionSplit> discoverNewSplits() throws ApiException;
+
+  SplitEnumeratorCheckpoint.Discovery checkpoint();
+}
diff --git a/src/test/java/com/google/cloud/pubsublite/flink/enumerator/SingleSubscriptionSplitDiscoveryTest.java b/src/test/java/com/google/cloud/pubsublite/flink/enumerator/SingleSubscriptionSplitDiscoveryTest.java
new file mode 100644
index 00000000..635b81a9
--- /dev/null
+++ b/src/test/java/com/google/cloud/pubsublite/flink/enumerator/SingleSubscriptionSplitDiscoveryTest.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.pubsublite.flink.enumerator;
+
+import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.exampleSubscriptionPath;
+import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.exampleTopicPath;
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
+import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.CursorClient;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SingleSubscriptionSplitDiscoveryTest {
+
+  @Mock CursorClient mockCursorClient;
+  @Mock AdminClient mockAdminClient;
+
+  SplitDiscovery discovery;
+
+  @Before
+  public void setUp() {
+    discovery =
+        SingleSubscriptionSplitDiscovery.create(
+            mockAdminClient, mockCursorClient, exampleTopicPath(), exampleSubscriptionPath());
+  }
+
+  @Test
+  public void testDiscovery() {
+    when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
+        .thenReturn(ApiFutures.immediateFuture(2L));
+    when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
+        .thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(1), Offset.of(2))));
+    List<SubscriptionPartitionSplit> splits = discovery.discoverNewSplits();
+    assertThat(splits)
+        .containsExactly(
+            SubscriptionPartitionSplit.create(
+                exampleSubscriptionPath(), Partition.of(0), Offset.of(0)),
+            SubscriptionPartitionSplit.create(
+                exampleSubscriptionPath(), Partition.of(1), Offset.of(2)));
+  }
+
+  @Test
+  public void testDiscovery_Incremental() {
+    when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
+        .thenReturn(ApiFutures.immediateFuture(2L))
+        .thenReturn(ApiFutures.immediateFuture(3L))
+        .thenReturn(ApiFutures.immediateFuture(3L));
+    when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
+        .thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(1), Offset.of(2))));
+    assertThat(discovery.discoverNewSplits()).hasSize(2);
+    assertThat(discovery.discoverNewSplits()).hasSize(1);
+    assertThat(discovery.discoverNewSplits()).hasSize(0);
+  }
+
+  @Test
+  public void testDiscovery_AdminFailure() {
+    when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
+        .thenReturn(
+            ApiFutures.immediateFailedFuture(
+                new CheckedApiException("", StatusCode.Code.INTERNAL)));
+    assertThrows(ApiException.class, () -> discovery.discoverNewSplits());
+  }
+
+  @Test
+  public void testDiscovery_CursorFailure() {
+    when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
+        .thenReturn(ApiFutures.immediateFuture(2L));
+    when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
+        .thenReturn(
+            ApiFutures.immediateFailedFuture(
+                new CheckedApiException("", StatusCode.Code.INTERNAL)));
+    assertThrows(ApiException.class, () -> discovery.discoverNewSplits());
+  }
+
+  @Test
+  public void testCheckpoint() {
+    SplitEnumeratorCheckpoint.Discovery proto = discovery.checkpoint();
+    assertThat(proto.getSubscription()).isEqualTo(exampleSubscriptionPath().toString());
+    assertThat(proto.getTopic()).isEqualTo(exampleTopicPath().toString());
+  }
+
+  @Test
+  public void testCheckpointRestore() {
+    SplitEnumeratorCheckpoint.Discovery proto = discovery.checkpoint();
+
+    List<SubscriptionPartitionSplit> splits =
+        ImmutableList.of(
+            SubscriptionPartitionSplit.create(
+                exampleSubscriptionPath(), Partition.of(0), Offset.of(4)),
+            SubscriptionPartitionSplit.create(
+                exampleSubscriptionPath(), Partition.of(1), Offset.of(4)),
+            SubscriptionPartitionSplit.create(
+                exampleSubscriptionPath(), Partition.of(2), Offset.of(4)));
+    SplitDiscovery restored =
+        SingleSubscriptionSplitDiscovery.fromCheckpoint(
+            proto, splits, mockAdminClient, mockCursorClient);
+
+    when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
+        .thenReturn(ApiFutures.immediateFuture(4L));
+    when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
+        .thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(3), Offset.of(2))));
+    assertThat(restored.discoverNewSplits()).hasSize(1);
+  }
+
+  @Test
+  public void testCheckpointRestore_SubscriptionMismatch() {
+    SplitEnumeratorCheckpoint.Discovery proto = discovery.checkpoint();
+
+    List<SubscriptionPartitionSplit> splits =
+        ImmutableList.of(
+            SubscriptionPartitionSplit.create(
+                SubscriptionPath.parse(exampleSubscriptionPath().toString() + "-other"),
+                Partition.of(0),
+                Offset.of(4)));
+    assertThrows(
+        IllegalStateException.class,
+        () -> {
+          SingleSubscriptionSplitDiscovery.fromCheckpoint(
+              proto, splits, mockAdminClient, mockCursorClient);
+        });
+  }
+
+  @Test
+  public void testCheckpointRestore_NonContinuousPartitions() {
+    SplitEnumeratorCheckpoint.Discovery proto = discovery.checkpoint();
+
+    List<SubscriptionPartitionSplit> splits =
+        ImmutableList.of(
+            SubscriptionPartitionSplit.create(
+                exampleSubscriptionPath(), Partition.of(1), Offset.of(4)));
+    assertThrows(
+        IllegalStateException.class,
+        () -> {
+          SingleSubscriptionSplitDiscovery.fromCheckpoint(
+              proto, splits, mockAdminClient, mockCursorClient);
+        });
+  }
+
+  @Test
+  public void testClose() throws Exception {
+    discovery.close();
+    verify(mockAdminClient).close();
+    verify(mockCursorClient).close();
+  }
+}