Skip to content

Commit

Permalink
Fix the issue gRPC notify thread blocked (#965)
Browse files Browse the repository at this point in the history
Add backpressure buffer with ERROR strategy to notifying and shared polling topic listeners. When overflow happens, the controller will send a gRPC error after all buffered messages and disconnect the client.

Signed-off-by: Xin Li <[email protected]>
  • Loading branch information
xin-hedera authored Aug 19, 2020
1 parent c252dbb commit e301647
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 9 deletions.
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.grpc.endTimeInterval` | 30s | How often we should check if a subscription has gone past the end time |
| `hedera.mirror.grpc.entityCacheSize` | 50000 | The maximum size of the cache to store entities used for existence check |
| `hedera.mirror.grpc.listener.enabled` | true | Whether to listen for incoming messages or not |
| `hedera.mirror.grpc.listener.maxBufferSize` | 2048 | The maximum number of messages the notifying listener or the shared polling listener buffers before sending an error to a client |
| `hedera.mirror.grpc.listener.maxPageSize` | 5000 | The maximum number of messages the listener can return in a single call to the database |
| `hedera.mirror.grpc.listener.frequency` | 500ms | How often to poll or retry errors (varies by type). Can accept duration units like `50ms`, `10s`, etc. |
| `hedera.mirror.grpc.listener.type` | NOTIFY | The type of listener to use for incoming messages. Accepts either NOTIFY, POLL or SHARED_POLL |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import net.devh.boot.grpc.server.service.GrpcService;
import org.springframework.dao.NonTransientDataAccessResourceException;
import org.springframework.dao.TransientDataAccessException;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -56,6 +57,7 @@
public class ConsensusController extends ReactorConsensusServiceGrpc.ConsensusServiceImplBase {

private static final String DB_ERROR = "Unable to connect to database. Please retry later";
private static final String OVERFLOW_ERROR = "Client lags too much behind. Please retry later";

private final TopicMessageService topicMessageService;

Expand All @@ -70,6 +72,7 @@ public Flux<ConsensusTopicResponse> subscribeTopic(Mono<ConsensusTopicQuery> req
.onErrorMap(TimeoutException.class, e -> error(e, Status.RESOURCE_EXHAUSTED))
.onErrorMap(TopicNotFoundException.class, e -> error(e, Status.NOT_FOUND))
.onErrorMap(TransientDataAccessException.class, e -> error(e, Status.RESOURCE_EXHAUSTED))
.onErrorMap(Exceptions::isOverflow, e -> error(e, Status.DEADLINE_EXCEEDED, OVERFLOW_ERROR))
.onErrorMap(t -> unknownError(t));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/

import java.time.Duration;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import lombok.Data;
Expand All @@ -37,6 +38,10 @@ public class ListenerProperties {
@Min(32)
private int maxPageSize = 5000;

@Min(1024)
@Max(8192)
private int maxBufferSize = 2048;

@NotNull
private Duration frequency = Duration.ofMillis(500L);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class NotifyingTopicListener implements TopicListener {
private final ObjectMapper objectMapper;
private final Flux<TopicMessage> topicMessages;
private final PgChannel channel;
private final ListenerProperties listenerProperties;

public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties listenerProperties) {
this.objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
Expand All @@ -57,6 +58,7 @@ public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties list
.setPort(dbProperties.getPort())
.setUser(dbProperties.getUsername());

this.listenerProperties = listenerProperties;
Duration frequency = listenerProperties.getFrequency();
Vertx vertx = Vertx.vertx();
PgSubscriber subscriber = PgSubscriber.subscriber(vertx, connectOptions)
Expand Down Expand Up @@ -89,7 +91,8 @@ public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties list
@Override
public Flux<TopicMessage> listen(TopicMessageFilter filter) {
return topicMessages.filter(t -> filterMessage(t, filter))
.doOnSubscribe(s -> log.info("Subscribing: {}", filter));
.doOnSubscribe(s -> log.info("Subscribing: {}", filter))
.onBackpressureBuffer(listenerProperties.getMaxBufferSize());
}

private boolean filterMessage(TopicMessage message, TopicMessageFilter filter) {
Expand All @@ -101,7 +104,7 @@ private boolean filterMessage(TopicMessage message, TopicMessageFilter filter) {
private Flux<String> listen() {
EmitterProcessor<String> emitterProcessor = EmitterProcessor.create();
FluxSink<String> sink = emitterProcessor.sink().onDispose(this::unlisten);
channel.handler(json -> sink.next(json));
channel.handler(sink::next);
log.info("Listening for messages");
return emitterProcessor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public SharedPollingTopicListener(ListenerProperties listenerProperties,
@Override
public Flux<TopicMessage> listen(TopicMessageFilter filter) {
return poller.filter(t -> filterMessage(t, filter))
.doOnSubscribe(s -> log.info("Subscribing: {}", filter));
.doOnSubscribe(s -> log.info("Subscribing: {}", filter))
.onBackpressureBuffer(listenerProperties.getMaxBufferSize());
}

private Flux<TopicMessage> poll(PollingContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.hedera.mirror.grpc.listener;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.Vector;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import reactor.core.Exceptions;
import reactor.test.StepVerifier;

import com.hedera.mirror.grpc.domain.TopicMessage;
import com.hedera.mirror.grpc.domain.TopicMessageFilter;

public abstract class AbstractSharedTopicListenerTest extends AbstractTopicListenerTest {

    /**
     * Subscribes two clients to the same listener: one that consumes without limit and one that only
     * requests two messages and then stalls. After more messages than the configured buffer can hold
     * are published, the stalled client is expected to receive the buffered messages followed by an
     * overflow error, while the other client keeps its subscription and sees every message.
     */
    @Test
    @DisplayName("slow subscriber receives overflow exception and normal subscriber is not affected")
    void slowSubscriberOverflowException() {
        int bufferCapacity = 16;
        // The slow subscriber requests 2 messages in total before stalling, so 2 + bufferCapacity + 1
        // messages have to be published for its buffer to overflow.
        int publishedCount = bufferCapacity + 3;
        listenerProperties.setMaxBufferSize(bufferCapacity);

        TopicMessageFilter filter = TopicMessageFilter.builder().startTime(Instant.EPOCH).build();

        // Sequence numbers the slow subscriber should still get out of its buffer (3 .. bufferCapacity + 2)
        var bufferedSequenceNumbers = LongStream.range(3, bufferCapacity + 3)
                .boxed()
                .collect(Collectors.toList());
        // Sequence numbers the normal subscriber should have collected (1 .. bufferCapacity + 3)
        var allSequenceNumbers = LongStream.range(1, bufferCapacity + 4)
                .boxed()
                .collect(Collectors.toList());

        // A normal subscriber with unbounded demand; it also keeps the shared flux open.
        Vector<Long> received = new Vector<>();
        var normalSubscription = getTopicListener().listen(filter)
                .map(TopicMessage::getSequenceNumber)
                .subscribe(received::add);

        // The slow subscriber: initial demand of 1, then one more request, then nothing for a while.
        var slowSequenceNumbers = getTopicListener().listen(filter).map(TopicMessage::getSequenceNumber);
        StepVerifier.create(slowSequenceNumbers, 1)
                .thenRequest(1) // trigger subscription
                .thenAwait(Duration.ofMillis(10L))
                .then(() -> domainBuilder.topicMessages(publishedCount, future).blockLast())
                .expectNext(1L, 2L)
                .thenAwait(Duration.ofMillis(500L)) // stall to overrun backpressure buffer
                .thenRequest(Long.MAX_VALUE)
                .expectNextSequence(bufferedSequenceNumbers)
                .expectErrorMatches(Exceptions::isOverflow)
                .verify(Duration.ofMillis(600L));

        // The overflow must only terminate the slow subscriber.
        assertThat(normalSubscription.isDisposed()).isFalse();
        normalSubscription.dispose();
        assertThat(received).isEqualTo(allSequenceNumbers);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

public abstract class AbstractTopicListenerTest extends GrpcIntegrationTest {

protected final Instant future = Instant.now().plusSeconds(10L);
protected final Instant future = Instant.now().plusSeconds(30L);

@Resource
protected DomainBuilder domainBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import com.hedera.mirror.grpc.domain.TopicMessage;
import com.hedera.mirror.grpc.domain.TopicMessageFilter;

public class NotifyingTopicListenerTest extends AbstractTopicListenerTest {
public class NotifyingTopicListenerTest extends AbstractSharedTopicListenerTest {

@Resource
private NotifyingTopicListener topicListener;
Expand Down Expand Up @@ -92,13 +92,16 @@ void json() {

TopicMessageFilter filter = TopicMessageFilter.builder()
.startTime(Instant.EPOCH)
.topicNum(1001)
.build();

topicListener.listen(filter)
.as(StepVerifier::create)
.thenAwait(Duration.ofMillis(50))
.then(() -> jdbcTemplate.execute("NOTIFY topic_message, '" + json + "'"))
.expectNext(topicMessage);
.expectNext(topicMessage)
.thenCancel()
.verify(Duration.ofMillis(500L));
}

@Test
Expand All @@ -112,6 +115,8 @@ void jsonError() {
.as(StepVerifier::create)
.thenAwait(Duration.ofMillis(50))
.then(() -> jdbcTemplate.execute("NOTIFY topic_message, 'invalid'"))
.expectNoEvent(Duration.ofMillis(500L));
.expectNoEvent(Duration.ofMillis(500L))
.thenCancel()
.verify(Duration.ofMillis(600L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import javax.annotation.Resource;

public class SharedPollingTopicListenerTest extends AbstractTopicListenerTest {
public class SharedPollingTopicListenerTest extends AbstractSharedTopicListenerTest {

@Resource
private SharedPollingTopicListener topicListener;
Expand Down

0 comments on commit e301647

Please sign in to comment.