From 94d37423bea823b89a32f357a71fd032e067f4ad Mon Sep 17 00:00:00 2001 From: Xin Li Date: Fri, 11 Sep 2020 17:07:26 -0500 Subject: [PATCH] Revert "Fix the issue gRPC notify thread blocked (#965)" This reverts commit e3016470c11ca77dbbf2889bfee29273720986d9. Signed-off-by: Xin Li --- docs/configuration.md | 1 - .../grpc/controller/ConsensusController.java | 3 - .../grpc/listener/ListenerProperties.java | 5 -- .../grpc/listener/NotifyingTopicListener.java | 7 +-- .../listener/SharedPollingTopicListener.java | 3 +- .../controller/ConsensusControllerTest.java | 1 + .../AbstractSharedTopicListenerTest.java | 58 ------------------- .../listener/AbstractTopicListenerTest.java | 2 +- .../listener/NotifyingTopicListenerTest.java | 11 +--- .../SharedPollingTopicListenerTest.java | 2 +- 10 files changed, 9 insertions(+), 84 deletions(-) delete mode 100644 hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java diff --git a/docs/configuration.md b/docs/configuration.md index 42c5dee30a3..df9ae99ceb2 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -153,7 +153,6 @@ value, it is recommended to only populate overridden properties in the custom `a | `hedera.mirror.grpc.endTimeInterval` | 30s | How often we should check if a subscription has gone past the end time | | `hedera.mirror.grpc.entityCacheSize` | 50000 | The maximum size of the cache to store entities used for existence check | | `hedera.mirror.grpc.listener.enabled` | true | Whether to listen for incoming massages or not | -| `hedera.mirror.grpc.listener.maxBufferSize` | 2048 | The maximum number of messages the notifying listener or the shared polling listener buffers before sending an error to a client | | `hedera.mirror.grpc.listener.maxPageSize` | 5000 | The maximum number of messages the listener can return in a single call to the database | | `hedera.mirror.grpc.listener.frequency` | 500ms | How often to poll or retry errors (varies by type). Can accept duration units like `50ms`, `10s`, etc. | | `hedera.mirror.grpc.listener.type` | NOTIFY | The type of listener to use for incoming messages. Accepts either NOTIFY, POLL or SHARED_POLL | diff --git a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/controller/ConsensusController.java b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/controller/ConsensusController.java index e34b9dd4860..0bce5669581 100644 --- a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/controller/ConsensusController.java +++ b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/controller/ConsensusController.java @@ -31,7 +31,6 @@ import net.devh.boot.grpc.server.service.GrpcService; import org.springframework.dao.NonTransientDataAccessResourceException; import org.springframework.dao.TransientDataAccessException; -import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -57,7 +56,6 @@ public class ConsensusController extends ReactorConsensusServiceGrpc.ConsensusServiceImplBase { private static final String DB_ERROR = "Unable to connect to database. Please retry later"; - private static final String OVERFLOW_ERROR = "Client lags too much behind. Please retry later"; private final TopicMessageService topicMessageService; @@ -72,7 +70,6 @@ public Flux subscribeTopic(Mono req .onErrorMap(TimeoutException.class, e -> error(e, Status.RESOURCE_EXHAUSTED)) .onErrorMap(TopicNotFoundException.class, e -> error(e, Status.NOT_FOUND)) .onErrorMap(TransientDataAccessException.class, e -> error(e, Status.RESOURCE_EXHAUSTED)) - .onErrorMap(Exceptions::isOverflow, e -> error(e, Status.DEADLINE_EXCEEDED, OVERFLOW_ERROR)) .onErrorMap(t -> unknownError(t)); } diff --git a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/ListenerProperties.java b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/ListenerProperties.java index 534c4389f94..6bf4dd3960c 100644 --- a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/ListenerProperties.java +++ b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/ListenerProperties.java @@ -21,7 +21,6 @@ */ import java.time.Duration; -import javax.validation.constraints.Max; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; import lombok.Data; @@ -38,10 +37,6 @@ public class ListenerProperties { @Min(32) private int maxPageSize = 5000; - @Min(1024) - @Max(8192) - private int maxBufferSize = 2048; - @NotNull private Duration frequency = Duration.ofMillis(500L); diff --git a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/NotifyingTopicListener.java b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/NotifyingTopicListener.java index af176678f20..3767b0d0fcf 100644 --- a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/NotifyingTopicListener.java +++ b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/NotifyingTopicListener.java @@ -47,7 +47,6 @@ public class NotifyingTopicListener implements TopicListener { private final ObjectMapper objectMapper; private final Flux topicMessages; private final PgChannel channel; - private final ListenerProperties listenerProperties; public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties listenerProperties) { this.objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); @@ -58,7 +57,6 @@ public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties list .setPort(dbProperties.getPort()) .setUser(dbProperties.getUsername()); - this.listenerProperties = listenerProperties; Duration frequency = listenerProperties.getFrequency(); Vertx vertx = Vertx.vertx(); PgSubscriber subscriber = PgSubscriber.subscriber(vertx, connectOptions) @@ -91,8 +89,7 @@ public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties list @Override public Flux listen(TopicMessageFilter filter) { return topicMessages.filter(t -> filterMessage(t, filter)) - .doOnSubscribe(s -> log.info("Subscribing: {}", filter)) - .onBackpressureBuffer(listenerProperties.getMaxBufferSize()); + .doOnSubscribe(s -> log.info("Subscribing: {}", filter)); } private boolean filterMessage(TopicMessage message, TopicMessageFilter filter) { @@ -104,7 +101,7 @@ private boolean filterMessage(TopicMessage message, TopicMessageFilter filter) { private Flux listen() { EmitterProcessor emitterProcessor = EmitterProcessor.create(); FluxSink sink = emitterProcessor.sink().onDispose(this::unlisten); - channel.handler(sink::next); + channel.handler(json -> sink.next(json)); log.info("Listening for messages"); return emitterProcessor; } diff --git a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedPollingTopicListener.java b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedPollingTopicListener.java index 81e24a24fcd..8801fc895f8 100644 --- a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedPollingTopicListener.java +++ b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedPollingTopicListener.java @@ -81,8 +81,7 @@ public SharedPollingTopicListener(ListenerProperties listenerProperties, @Override public Flux listen(TopicMessageFilter filter) { return poller.filter(t -> filterMessage(t, filter)) - .doOnSubscribe(s -> log.info("Subscribing: {}", filter)) - .onBackpressureBuffer(listenerProperties.getMaxBufferSize()); + .doOnSubscribe(s -> log.info("Subscribing: {}", filter)); } private Flux poll(PollingContext context) { diff --git a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/controller/ConsensusControllerTest.java b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/controller/ConsensusControllerTest.java index e8adbc8717d..69b915c971a 100644 --- a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/controller/ConsensusControllerTest.java +++ b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/controller/ConsensusControllerTest.java @@ -33,6 +33,7 @@ import net.devh.boot.grpc.client.inject.GrpcClient; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java deleted file mode 100644 index c707339df5d..00000000000 --- a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.hedera.mirror.grpc.listener; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.time.Duration; -import java.time.Instant; -import java.util.Vector; -import java.util.stream.Collectors; -import java.util.stream.LongStream; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import reactor.core.Exceptions; -import reactor.test.StepVerifier; - -import com.hedera.mirror.grpc.domain.TopicMessage; -import com.hedera.mirror.grpc.domain.TopicMessageFilter; - -public abstract class AbstractSharedTopicListenerTest extends AbstractTopicListenerTest { - - @Test - @DisplayName("slow subscriber receives overflow exception and normal subscriber is not affected") - void slowSubscriberOverflowException() { - int maxBufferSize = 16; - listenerProperties.setMaxBufferSize(maxBufferSize); - - TopicMessageFilter filter = TopicMessageFilter.builder() - .startTime(Instant.EPOCH) - .build(); - - // create a normal subscriber to keep the shared flux open - Vector sequenceNumbers = new Vector<>(); - var subscription = getTopicListener().listen(filter) - .map(TopicMessage::getSequenceNumber) - .subscribe(sequenceNumbers::add); - - // the slow subscriber - getTopicListener().listen(filter) - .map(TopicMessage::getSequenceNumber) - .as(p -> StepVerifier.create(p, 1)) // initial request amount - 1 - .thenRequest(1) // trigger subscription - .thenAwait(Duration.ofMillis(10L)) - .then(() -> { - // upon subscription, step verifier will request 2, so we need 2 + maxBufferSize + 1 to trigger - // overflow error - domainBuilder.topicMessages(maxBufferSize + 3, future).blockLast(); - }) - .expectNext(1L, 2L) - .thenAwait(Duration.ofMillis(500L)) // stall to overrun backpressure buffer - .thenRequest(Long.MAX_VALUE) - .expectNextSequence(LongStream.range(3, maxBufferSize + 3).boxed().collect(Collectors.toList())) - .expectErrorMatches(Exceptions::isOverflow) - .verify(Duration.ofMillis(600L)); - - assertThat(subscription.isDisposed()).isFalse(); - subscription.dispose(); - assertThat(sequenceNumbers).isEqualTo(LongStream.range(1, maxBufferSize + 4).boxed().collect(Collectors.toList())); - } -} diff --git a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractTopicListenerTest.java b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractTopicListenerTest.java index b40c2b32b1a..983c2b2642f 100644 --- a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractTopicListenerTest.java +++ b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractTopicListenerTest.java @@ -39,7 +39,7 @@ public abstract class AbstractTopicListenerTest extends GrpcIntegrationTest { - protected final Instant future = Instant.now().plusSeconds(30L); + protected final Instant future = Instant.now().plusSeconds(10L); @Resource protected DomainBuilder domainBuilder; diff --git a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/NotifyingTopicListenerTest.java b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/NotifyingTopicListenerTest.java index 342a0562aae..b3f769beb55 100644 --- a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/NotifyingTopicListenerTest.java +++ b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/NotifyingTopicListenerTest.java @@ -32,7 +32,7 @@ import com.hedera.mirror.grpc.domain.TopicMessage; import com.hedera.mirror.grpc.domain.TopicMessageFilter; -public class NotifyingTopicListenerTest extends AbstractSharedTopicListenerTest { +public class NotifyingTopicListenerTest extends AbstractTopicListenerTest { @Resource private NotifyingTopicListener topicListener; @@ -92,16 +92,13 @@ void json() { TopicMessageFilter filter = TopicMessageFilter.builder() .startTime(Instant.EPOCH) - .topicNum(1001) .build(); topicListener.listen(filter) .as(StepVerifier::create) .thenAwait(Duration.ofMillis(50)) .then(() -> jdbcTemplate.execute("NOTIFY topic_message, '" + json + "'")) - .expectNext(topicMessage) - .thenCancel() - .verify(Duration.ofMillis(500L)); + .expectNext(topicMessage); } @Test @@ -115,8 +112,6 @@ void jsonError() { .as(StepVerifier::create) .thenAwait(Duration.ofMillis(50)) .then(() -> jdbcTemplate.execute("NOTIFY topic_message, 'invalid'")) - .expectNoEvent(Duration.ofMillis(500L)) - .thenCancel() - .verify(Duration.ofMillis(600L)); + .expectNoEvent(Duration.ofMillis(500L)); } } diff --git a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/SharedPollingTopicListenerTest.java b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/SharedPollingTopicListenerTest.java index 8922c69295e..41f9ef7f418 100644 --- a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/SharedPollingTopicListenerTest.java +++ b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/SharedPollingTopicListenerTest.java @@ -22,7 +22,7 @@ import javax.annotation.Resource; -public class SharedPollingTopicListenerTest extends AbstractSharedTopicListenerTest { +public class SharedPollingTopicListenerTest extends AbstractTopicListenerTest { @Resource private SharedPollingTopicListener topicListener;