Skip to content

Commit

Permalink
Revert "Fix the issue gRPC notify thread blocked (#965)" (#1034)
Browse files Browse the repository at this point in the history
This reverts commit e301647.

Signed-off-by: Xin Li <[email protected]>
  • Loading branch information
xin-hedera authored Sep 12, 2020
1 parent 556b374 commit 5d51fec
Show file tree
Hide file tree
Showing 10 changed files with 9 additions and 84 deletions.
1 change: 0 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.grpc.endTimeInterval` | 30s | How often we should check if a subscription has gone past the end time |
| `hedera.mirror.grpc.entityCacheSize` | 50000 | The maximum size of the cache to store entities used for existence check |
| `hedera.mirror.grpc.listener.enabled` | true | Whether to listen for incoming massages or not |
| `hedera.mirror.grpc.listener.maxBufferSize` | 2048 | The maximum number of messages the notifying listener or the shared polling listener buffers before sending an error to a client |
| `hedera.mirror.grpc.listener.maxPageSize` | 5000 | The maximum number of messages the listener can return in a single call to the database |
| `hedera.mirror.grpc.listener.frequency` | 500ms | How often to poll or retry errors (varies by type). Can accept duration units like `50ms`, `10s`, etc. |
| `hedera.mirror.grpc.listener.type` | NOTIFY | The type of listener to use for incoming messages. Accepts either NOTIFY, POLL or SHARED_POLL |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import net.devh.boot.grpc.server.service.GrpcService;
import org.springframework.dao.NonTransientDataAccessResourceException;
import org.springframework.dao.TransientDataAccessException;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -57,7 +56,6 @@
public class ConsensusController extends ReactorConsensusServiceGrpc.ConsensusServiceImplBase {

private static final String DB_ERROR = "Unable to connect to database. Please retry later";
private static final String OVERFLOW_ERROR = "Client lags too much behind. Please retry later";

private final TopicMessageService topicMessageService;

Expand All @@ -72,7 +70,6 @@ public Flux<ConsensusTopicResponse> subscribeTopic(Mono<ConsensusTopicQuery> req
.onErrorMap(TimeoutException.class, e -> error(e, Status.RESOURCE_EXHAUSTED))
.onErrorMap(TopicNotFoundException.class, e -> error(e, Status.NOT_FOUND))
.onErrorMap(TransientDataAccessException.class, e -> error(e, Status.RESOURCE_EXHAUSTED))
.onErrorMap(Exceptions::isOverflow, e -> error(e, Status.DEADLINE_EXCEEDED, OVERFLOW_ERROR))
.onErrorMap(t -> unknownError(t));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
*/

import java.time.Duration;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import lombok.Data;
Expand All @@ -38,10 +37,6 @@ public class ListenerProperties {
@Min(32)
private int maxPageSize = 5000;

@Min(1024)
@Max(8192)
private int maxBufferSize = 2048;

@NotNull
private Duration frequency = Duration.ofMillis(500L);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public class NotifyingTopicListener implements TopicListener {
private final ObjectMapper objectMapper;
private final Flux<TopicMessage> topicMessages;
private final PgChannel channel;
private final ListenerProperties listenerProperties;

public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties listenerProperties) {
this.objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
Expand All @@ -58,7 +57,6 @@ public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties list
.setPort(dbProperties.getPort())
.setUser(dbProperties.getUsername());

this.listenerProperties = listenerProperties;
Duration frequency = listenerProperties.getFrequency();
Vertx vertx = Vertx.vertx();
PgSubscriber subscriber = PgSubscriber.subscriber(vertx, connectOptions)
Expand Down Expand Up @@ -91,8 +89,7 @@ public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties list
@Override
public Flux<TopicMessage> listen(TopicMessageFilter filter) {
return topicMessages.filter(t -> filterMessage(t, filter))
.doOnSubscribe(s -> log.info("Subscribing: {}", filter))
.onBackpressureBuffer(listenerProperties.getMaxBufferSize());
.doOnSubscribe(s -> log.info("Subscribing: {}", filter));
}

private boolean filterMessage(TopicMessage message, TopicMessageFilter filter) {
Expand All @@ -104,7 +101,7 @@ private boolean filterMessage(TopicMessage message, TopicMessageFilter filter) {
private Flux<String> listen() {
EmitterProcessor<String> emitterProcessor = EmitterProcessor.create();
FluxSink<String> sink = emitterProcessor.sink().onDispose(this::unlisten);
channel.handler(sink::next);
channel.handler(json -> sink.next(json));
log.info("Listening for messages");
return emitterProcessor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ public SharedPollingTopicListener(ListenerProperties listenerProperties,
@Override
public Flux<TopicMessage> listen(TopicMessageFilter filter) {
return poller.filter(t -> filterMessage(t, filter))
.doOnSubscribe(s -> log.info("Subscribing: {}", filter))
.onBackpressureBuffer(listenerProperties.getMaxBufferSize());
.doOnSubscribe(s -> log.info("Subscribing: {}", filter));
}

private Flux<TopicMessage> poll(PollingContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

public abstract class AbstractTopicListenerTest extends GrpcIntegrationTest {

protected final Instant future = Instant.now().plusSeconds(30L);
protected final Instant future = Instant.now().plusSeconds(10L);

@Resource
protected DomainBuilder domainBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import com.hedera.mirror.grpc.domain.TopicMessage;
import com.hedera.mirror.grpc.domain.TopicMessageFilter;

public class NotifyingTopicListenerTest extends AbstractSharedTopicListenerTest {
public class NotifyingTopicListenerTest extends AbstractTopicListenerTest {

@Resource
private NotifyingTopicListener topicListener;
Expand Down Expand Up @@ -92,16 +92,13 @@ void json() {

TopicMessageFilter filter = TopicMessageFilter.builder()
.startTime(Instant.EPOCH)
.topicNum(1001)
.build();

topicListener.listen(filter)
.as(StepVerifier::create)
.thenAwait(Duration.ofMillis(50))
.then(() -> jdbcTemplate.execute("NOTIFY topic_message, '" + json + "'"))
.expectNext(topicMessage)
.thenCancel()
.verify(Duration.ofMillis(500L));
.expectNext(topicMessage);
}

@Test
Expand All @@ -115,8 +112,6 @@ void jsonError() {
.as(StepVerifier::create)
.thenAwait(Duration.ofMillis(50))
.then(() -> jdbcTemplate.execute("NOTIFY topic_message, 'invalid'"))
.expectNoEvent(Duration.ofMillis(500L))
.thenCancel()
.verify(Duration.ofMillis(600L));
.expectNoEvent(Duration.ofMillis(500L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import javax.annotation.Resource;

public class SharedPollingTopicListenerTest extends AbstractSharedTopicListenerTest {
public class SharedPollingTopicListenerTest extends AbstractTopicListenerTest {

@Resource
private SharedPollingTopicListener topicListener;
Expand Down

0 comments on commit 5d51fec

Please sign in to comment.