Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Fix the issue gRPC notify thread blocked (#965)" #1034

Merged
merged 1 commit into from
Sep 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.grpc.endTimeInterval` | 30s | How often we should check if a subscription has gone past the end time |
| `hedera.mirror.grpc.entityCacheSize` | 50000 | The maximum size of the cache to store entities used for existence check |
| `hedera.mirror.grpc.listener.enabled` | true | Whether to listen for incoming massages or not |
| `hedera.mirror.grpc.listener.maxBufferSize` | 2048 | The maximum number of messages the notifying listener or the shared polling listener buffers before sending an error to a client |
| `hedera.mirror.grpc.listener.maxPageSize` | 5000 | The maximum number of messages the listener can return in a single call to the database |
| `hedera.mirror.grpc.listener.frequency` | 500ms | How often to poll or retry errors (varies by type). Can accept duration units like `50ms`, `10s`, etc. |
| `hedera.mirror.grpc.listener.type` | NOTIFY | The type of listener to use for incoming messages. Accepts either NOTIFY, POLL or SHARED_POLL |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import net.devh.boot.grpc.server.service.GrpcService;
import org.springframework.dao.NonTransientDataAccessResourceException;
import org.springframework.dao.TransientDataAccessException;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -57,7 +56,6 @@
public class ConsensusController extends ReactorConsensusServiceGrpc.ConsensusServiceImplBase {

private static final String DB_ERROR = "Unable to connect to database. Please retry later";
private static final String OVERFLOW_ERROR = "Client lags too much behind. Please retry later";

private final TopicMessageService topicMessageService;

Expand All @@ -72,7 +70,6 @@ public Flux<ConsensusTopicResponse> subscribeTopic(Mono<ConsensusTopicQuery> req
.onErrorMap(TimeoutException.class, e -> error(e, Status.RESOURCE_EXHAUSTED))
.onErrorMap(TopicNotFoundException.class, e -> error(e, Status.NOT_FOUND))
.onErrorMap(TransientDataAccessException.class, e -> error(e, Status.RESOURCE_EXHAUSTED))
.onErrorMap(Exceptions::isOverflow, e -> error(e, Status.DEADLINE_EXCEEDED, OVERFLOW_ERROR))
.onErrorMap(t -> unknownError(t));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
*/

import java.time.Duration;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import lombok.Data;
Expand All @@ -38,10 +37,6 @@ public class ListenerProperties {
@Min(32)
private int maxPageSize = 5000;

@Min(1024)
@Max(8192)
private int maxBufferSize = 2048;

@NotNull
private Duration frequency = Duration.ofMillis(500L);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public class NotifyingTopicListener implements TopicListener {
private final ObjectMapper objectMapper;
private final Flux<TopicMessage> topicMessages;
private final PgChannel channel;
private final ListenerProperties listenerProperties;

public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties listenerProperties) {
this.objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
Expand All @@ -58,7 +57,6 @@ public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties list
.setPort(dbProperties.getPort())
.setUser(dbProperties.getUsername());

this.listenerProperties = listenerProperties;
Duration frequency = listenerProperties.getFrequency();
Vertx vertx = Vertx.vertx();
PgSubscriber subscriber = PgSubscriber.subscriber(vertx, connectOptions)
Expand Down Expand Up @@ -91,8 +89,7 @@ public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties list
@Override
public Flux<TopicMessage> listen(TopicMessageFilter filter) {
return topicMessages.filter(t -> filterMessage(t, filter))
.doOnSubscribe(s -> log.info("Subscribing: {}", filter))
.onBackpressureBuffer(listenerProperties.getMaxBufferSize());
.doOnSubscribe(s -> log.info("Subscribing: {}", filter));
}

private boolean filterMessage(TopicMessage message, TopicMessageFilter filter) {
Expand All @@ -104,7 +101,7 @@ private boolean filterMessage(TopicMessage message, TopicMessageFilter filter) {
private Flux<String> listen() {
EmitterProcessor<String> emitterProcessor = EmitterProcessor.create();
FluxSink<String> sink = emitterProcessor.sink().onDispose(this::unlisten);
channel.handler(sink::next);
channel.handler(json -> sink.next(json));
log.info("Listening for messages");
return emitterProcessor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ public SharedPollingTopicListener(ListenerProperties listenerProperties,
@Override
public Flux<TopicMessage> listen(TopicMessageFilter filter) {
return poller.filter(t -> filterMessage(t, filter))
.doOnSubscribe(s -> log.info("Subscribing: {}", filter))
.onBackpressureBuffer(listenerProperties.getMaxBufferSize());
.doOnSubscribe(s -> log.info("Subscribing: {}", filter));
}

private Flux<TopicMessage> poll(PollingContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

public abstract class AbstractTopicListenerTest extends GrpcIntegrationTest {

protected final Instant future = Instant.now().plusSeconds(30L);
protected final Instant future = Instant.now().plusSeconds(10L);

@Resource
protected DomainBuilder domainBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import com.hedera.mirror.grpc.domain.TopicMessage;
import com.hedera.mirror.grpc.domain.TopicMessageFilter;

public class NotifyingTopicListenerTest extends AbstractSharedTopicListenerTest {
public class NotifyingTopicListenerTest extends AbstractTopicListenerTest {

@Resource
private NotifyingTopicListener topicListener;
Expand Down Expand Up @@ -92,16 +92,13 @@ void json() {

TopicMessageFilter filter = TopicMessageFilter.builder()
.startTime(Instant.EPOCH)
.topicNum(1001)
.build();

topicListener.listen(filter)
.as(StepVerifier::create)
.thenAwait(Duration.ofMillis(50))
.then(() -> jdbcTemplate.execute("NOTIFY topic_message, '" + json + "'"))
.expectNext(topicMessage)
.thenCancel()
.verify(Duration.ofMillis(500L));
.expectNext(topicMessage);
}

@Test
Expand All @@ -115,8 +112,6 @@ void jsonError() {
.as(StepVerifier::create)
.thenAwait(Duration.ofMillis(50))
.then(() -> jdbcTemplate.execute("NOTIFY topic_message, 'invalid'"))
.expectNoEvent(Duration.ofMillis(500L))
.thenCancel()
.verify(Duration.ofMillis(600L));
.expectNoEvent(Duration.ofMillis(500L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import javax.annotation.Resource;

public class SharedPollingTopicListenerTest extends AbstractSharedTopicListenerTest {
public class SharedPollingTopicListenerTest extends AbstractTopicListenerTest {

@Resource
private SharedPollingTopicListener topicListener;
Expand Down