From 69fb64a42e89d2c85d36064494948f3fd8676d9a Mon Sep 17 00:00:00 2001 From: palmere-google <68394592+palmere-google@users.noreply.github.com> Date: Tue, 27 Jul 2021 13:00:00 -0400 Subject: [PATCH] feat: Add a publisher which can wait for outstanding messages (#19) * message publisher add test remove weird import * tests * headers * address comments * assorted improvements * simpler * respond to comments * formatting * comments --- .../flink/sink/BulkWaitPublisher.java | 26 +++++ .../flink/sink/MessagePublisher.java | 53 +++++++++ .../flink/sink/MessagePublisherTest.java | 108 ++++++++++++++++++ 3 files changed, 187 insertions(+) create mode 100644 src/main/java/com/google/cloud/pubsublite/flink/sink/BulkWaitPublisher.java create mode 100644 src/main/java/com/google/cloud/pubsublite/flink/sink/MessagePublisher.java create mode 100644 src/test/java/com/google/cloud/pubsublite/flink/sink/MessagePublisherTest.java diff --git a/src/main/java/com/google/cloud/pubsublite/flink/sink/BulkWaitPublisher.java b/src/main/java/com/google/cloud/pubsublite/flink/sink/BulkWaitPublisher.java new file mode 100644 index 00000000..8c07f9ce --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/flink/sink/BulkWaitPublisher.java @@ -0,0 +1,26 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.pubsublite.flink.sink; + +import com.google.cloud.pubsublite.internal.CheckedApiException; + +// Thread-compatible. +public interface BulkWaitPublisher { + + void publish(T message); + + void waitUntilNoOutstandingPublishes() throws CheckedApiException; +} diff --git a/src/main/java/com/google/cloud/pubsublite/flink/sink/MessagePublisher.java b/src/main/java/com/google/cloud/pubsublite/flink/sink/MessagePublisher.java new file mode 100644 index 00000000..f6c8cc20 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/flink/sink/MessagePublisher.java @@ -0,0 +1,53 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.pubsublite.flink.sink; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.MessageMetadata; +import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.ExtractStatus; +import com.google.cloud.pubsublite.internal.Publisher; +import java.util.ArrayList; +import java.util.List; + +public class MessagePublisher implements BulkWaitPublisher { + private final Publisher publisher; + private final List> publishes; + + public MessagePublisher(Publisher publisher) { + this.publisher = publisher; + this.publishes = new ArrayList<>(); + this.publisher.startAsync(); + this.publisher.awaitRunning(); + } + + @Override + public void publish(Message message) { + publishes.add(publisher.publish(message)); + } + + @Override + public void waitUntilNoOutstandingPublishes() throws CheckedApiException { + try { + ApiFutures.allAsList(publishes).get(); + publishes.clear(); + } catch (Exception e) { + throw ExtractStatus.toCanonical(e); + } + } +} diff --git a/src/test/java/com/google/cloud/pubsublite/flink/sink/MessagePublisherTest.java b/src/test/java/com/google/cloud/pubsublite/flink/sink/MessagePublisherTest.java new file mode 100644 index 00000000..04e4094a --- /dev/null +++ b/src/test/java/com/google/cloud/pubsublite/flink/sink/MessagePublisherTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.pubsublite.flink.sink; + +import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.exampleOffset; +import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.examplePartition; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.StatusCode.Code; +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.MessageMetadata; +import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class MessagePublisherTest { + abstract static class FakePublisher extends FakeApiService + implements Publisher {} + + @Spy FakePublisher fakeInnerPublisher; + MessagePublisher messagePublisher; + + @Before + public void setUp() { + messagePublisher = new MessagePublisher(fakeInnerPublisher); + } + + @Test + public void testPublish() throws Exception { + Message message1 = Message.builder().build(); + when(fakeInnerPublisher.publish(message1)) + .thenReturn( + ApiFutures.immediateFuture(MessageMetadata.of(examplePartition(), exampleOffset()))); + + messagePublisher.publish(message1); + + messagePublisher.waitUntilNoOutstandingPublishes(); + + verify(fakeInnerPublisher).publish(message1); + } + + @Test + public void testSinglePublishFailure() throws Exception { + Message message1 = Message.builder().build(); + when(fakeInnerPublisher.publish(message1)) + .thenReturn( + ApiFutures.immediateFailedFuture(new CheckedApiException(Code.INTERNAL).underlying)); + messagePublisher.publish(message1); + verify(fakeInnerPublisher).publish(message1); + + assertThrows( + CheckedApiException.class, + () -> { + messagePublisher.waitUntilNoOutstandingPublishes(); + }); + } + + @Test + public void testCheckpointWithOutstandingPublish() throws Exception { + Message message1 = Message.builder().build(); + SettableApiFuture future = SettableApiFuture.create(); + when(fakeInnerPublisher.publish(message1)).thenReturn(future); + messagePublisher.publish(message1); + verify(fakeInnerPublisher).publish(message1); + + Future checkpointFuture = + Executors.newSingleThreadExecutor() + .submit( + () -> { + try { + messagePublisher.waitUntilNoOutstandingPublishes(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + // Sleep for a short time so that the checkpoint could complete if it wasn't properly waiting. + Thread.sleep(50); + assertThat(checkpointFuture.isDone()).isFalse(); + future.set(MessageMetadata.of(examplePartition(), exampleOffset())); + checkpointFuture.get(); + } +}