Skip to content

Commit

Permalink
Apply latest origin master commits (#93)
Browse files Browse the repository at this point in the history
* Fix the issue gRPC notify thread blocked (hiero-ledger#965)

Add backpressure buffer with ERROR strategy to notifying and shared polling topic listeners. When overflow happens, the controller will send a gRPC error after all buffered messages and disconnect the client.

Signed-off-by: Xin Li <[email protected]>

* Performance test HighTPS HCS publish improvements  (hiero-ledger#970)

* Performance updates to high tps module

- Adds additional stats
- Allow publish of stats interval to be configured
- Does some reorganizing of code
- Sets JMeter and java flags to allow for improved publish performance and prevent memory exhaustion

Signed-off-by: Nana-EC <[email protected]>

* Bump spring-boot-starter-parent from 2.2.7.RELEASE to 2.3.3.RELEASE (hiero-ledger#961)

Bumps [spring-boot-starter-parent](https://github.com/spring-projects/spring-boot) from 2.2.7.RELEASE to 2.3.3.RELEASE.
- [Release notes](https://github.com/spring-projects/spring-boot/releases)
- [Commits](spring-projects/spring-boot@v2.2.7.RELEASE...v2.3.3.RELEASE)

Signed-off-by: Nana-EC <[email protected]>
Signed-off-by: dependabot[bot] <[email protected]>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

Co-authored-by: Xin Li <[email protected]>
Co-authored-by: Nana-EC <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
  • Loading branch information
4 people authored Aug 24, 2020
1 parent 5216bc0 commit 61620de
Show file tree
Hide file tree
Showing 19 changed files with 329 additions and 116 deletions.
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.grpc.endTimeInterval` | 30s | How often we should check if a subscription has gone past the end time |
| `hedera.mirror.grpc.entityCacheSize` | 50000 | The maximum size of the cache to store entities used for existence check |
| `hedera.mirror.grpc.listener.enabled` | true | Whether to listen for incoming massages or not |
| `hedera.mirror.grpc.listener.maxBufferSize` | 2048 | The maximum number of messages the notifying listener or the shared polling listener buffers before sending an error to a client |
| `hedera.mirror.grpc.listener.maxPageSize` | 5000 | The maximum number of messages the listener can return in a single call to the database |
| `hedera.mirror.grpc.listener.frequency` | 500ms | How often to poll or retry errors (varies by type). Can accept duration units like `50ms`, `10s`, etc. |
| `hedera.mirror.grpc.listener.type` | NOTIFY | The type of listener to use for incoming messages. Accepts either NOTIFY, POLL or SHARED_POLL |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import net.devh.boot.grpc.server.service.GrpcService;
import org.springframework.dao.NonTransientDataAccessResourceException;
import org.springframework.dao.TransientDataAccessException;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -56,6 +57,7 @@
public class ConsensusController extends ReactorConsensusServiceGrpc.ConsensusServiceImplBase {

private static final String DB_ERROR = "Unable to connect to database. Please retry later";
private static final String OVERFLOW_ERROR = "Client lags too much behind. Please retry later";

private final TopicMessageService topicMessageService;

Expand All @@ -70,6 +72,7 @@ public Flux<ConsensusTopicResponse> subscribeTopic(Mono<ConsensusTopicQuery> req
.onErrorMap(TimeoutException.class, e -> error(e, Status.RESOURCE_EXHAUSTED))
.onErrorMap(TopicNotFoundException.class, e -> error(e, Status.NOT_FOUND))
.onErrorMap(TransientDataAccessException.class, e -> error(e, Status.RESOURCE_EXHAUSTED))
.onErrorMap(Exceptions::isOverflow, e -> error(e, Status.DEADLINE_EXCEEDED, OVERFLOW_ERROR))
.onErrorMap(t -> unknownError(t));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/

import java.time.Duration;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import lombok.Data;
Expand All @@ -37,6 +38,10 @@ public class ListenerProperties {
@Min(32)
private int maxPageSize = 5000;

@Min(1024)
@Max(8192)
private int maxBufferSize = 2048;

@NotNull
private Duration frequency = Duration.ofMillis(500L);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class NotifyingTopicListener implements TopicListener {
private final ObjectMapper objectMapper;
private final Flux<TopicMessage> topicMessages;
private final PgChannel channel;
private final ListenerProperties listenerProperties;

public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties listenerProperties) {
this.objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
Expand All @@ -57,6 +58,7 @@ public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties list
.setPort(dbProperties.getPort())
.setUser(dbProperties.getUsername());

this.listenerProperties = listenerProperties;
Duration frequency = listenerProperties.getFrequency();
Vertx vertx = Vertx.vertx();
PgSubscriber subscriber = PgSubscriber.subscriber(vertx, connectOptions)
Expand Down Expand Up @@ -89,7 +91,8 @@ public NotifyingTopicListener(DbProperties dbProperties, ListenerProperties list
@Override
public Flux<TopicMessage> listen(TopicMessageFilter filter) {
return topicMessages.filter(t -> filterMessage(t, filter))
.doOnSubscribe(s -> log.info("Subscribing: {}", filter));
.doOnSubscribe(s -> log.info("Subscribing: {}", filter))
.onBackpressureBuffer(listenerProperties.getMaxBufferSize());
}

private boolean filterMessage(TopicMessage message, TopicMessageFilter filter) {
Expand All @@ -101,7 +104,7 @@ private boolean filterMessage(TopicMessage message, TopicMessageFilter filter) {
private Flux<String> listen() {
EmitterProcessor<String> emitterProcessor = EmitterProcessor.create();
FluxSink<String> sink = emitterProcessor.sink().onDispose(this::unlisten);
channel.handler(json -> sink.next(json));
channel.handler(sink::next);
log.info("Listening for messages");
return emitterProcessor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public SharedPollingTopicListener(ListenerProperties listenerProperties,
@Override
public Flux<TopicMessage> listen(TopicMessageFilter filter) {
return poller.filter(t -> filterMessage(t, filter))
.doOnSubscribe(s -> log.info("Subscribing: {}", filter));
.doOnSubscribe(s -> log.info("Subscribing: {}", filter))
.onBackpressureBuffer(listenerProperties.getMaxBufferSize());
}

private Flux<TopicMessage> poll(PollingContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,22 @@
import com.google.common.base.Stopwatch;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Value;
import lombok.extern.log4j.Log4j2;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;

import com.hedera.hashgraph.sdk.Client;
import com.hedera.hashgraph.sdk.account.AccountId;
import com.hedera.hashgraph.sdk.consensus.ConsensusTopicId;
import com.hedera.hashgraph.sdk.crypto.ed25519.Ed25519PrivateKey;
import com.hedera.mirror.grpc.jmeter.handler.PropertiesHandler;
import com.hedera.mirror.grpc.jmeter.handler.SDKClientHandler;
import com.hedera.mirror.grpc.jmeter.props.TopicMessagePublishRequest;
import com.hedera.mirror.grpc.jmeter.sampler.TopicMessagesPublishSampler;
import com.hedera.mirror.grpc.jmeter.sampler.result.TransactionSubmissionResult;
Expand All @@ -50,7 +47,7 @@
public class TopicMessagePublishClient extends AbstractJavaSamplerClient {

private PropertiesHandler propHandler;
private List<SDKClient> clientList;
private List<SDKClientHandler> clientList;
private Long topicNum;
private int messagesPerBatchCount;
private int messageByteSize;
Expand Down Expand Up @@ -79,7 +76,7 @@ public void setupTest(JavaSamplerContext javaSamplerContext) {
// node info expected in comma separated list of <node_IP>:<node_accountId>:<node_port>
String[] nodeList = propHandler.getTestParam("networkNodes", "localhost:0.0.3:50211").split(",");
clientList = Arrays.asList(nodeList).stream()
.map(x -> new SDKClient(x, operatorId, operatorPrivateKey))
.map(x -> new SDKClientHandler(x, operatorId, operatorPrivateKey))
.collect(Collectors.toList());
}

Expand All @@ -98,7 +95,7 @@ public SampleResult runTest(JavaSamplerContext javaSamplerContext) {

// kick off batched message publish
TopicMessagePublishRequest topicMessagePublishRequest = TopicMessagePublishRequest.builder()
.consensusTopicId(new ConsensusTopicId(0, 0, topicNum))
.consensusTopicId(topicNum == 0L ? null : new ConsensusTopicId(0, 0, topicNum))
.messageByteSize(messageByteSize)
.publishInterval(publishInterval)
.publishTimeout(publishTimeout)
Expand All @@ -123,7 +120,8 @@ public SampleResult runTest(JavaSamplerContext javaSamplerContext) {
() -> {
TopicMessagesPublishSampler topicMessagesPublishSampler =
new TopicMessagesPublishSampler(topicMessagePublishRequest, x, verifyTransactions);
counter.addAndGet(topicMessagesPublishSampler.submitConsensusMessageTransactions());
counter.addAndGet(topicMessagesPublishSampler
.submitConsensusMessageTransactions());
},
0,
publishInterval,
Expand Down Expand Up @@ -176,53 +174,4 @@ private void printStatus(int totalCount, Stopwatch totalStopwatch) {
log.info("Published {} total transactions in {} s ({}/s)", totalCount, totalStopwatch
.elapsed(TimeUnit.SECONDS), rate);
}

@Value
public class NodeInfo {
private final AccountId nodeId;
private final String nodeHost;
private final String nodePort;

public NodeInfo(String nodeInfo) {
String[] nodeParts = nodeInfo.split(":");
nodeHost = nodeParts[0];
nodeId = AccountId.fromString(nodeParts[1]);
nodePort = nodeParts[2];
}

public String getNodeAddress() {
return nodeHost + ":" + nodePort;
}
}

@Value
public class SDKClient {
private final NodeInfo nodeInfo;
private final AccountId operatorId;
private final Ed25519PrivateKey operatorPrivateKey;
private final Client client;

public SDKClient(String nodeParts, AccountId operatorId, Ed25519PrivateKey operatorPrivateKey) {
nodeInfo = new NodeInfo(nodeParts);
this.operatorId = operatorId;
this.operatorPrivateKey = operatorPrivateKey;

client = new Client(Map.of(nodeInfo.nodeId, nodeInfo.getNodeAddress()));
client.setOperator(operatorId, operatorPrivateKey);

log.trace("Created client for {}", nodeInfo);
}

public void close() throws InterruptedException {
log.debug("Closing SDK client, waits up to 10 s for valid close");

try {
if (client != null) {
client.close(5, TimeUnit.SECONDS);
}
} catch (TimeoutException tex) {
log.debug("Exception on client close: {}", tex.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package com.hedera.mirror.grpc.jmeter.handler;

/*-
* ‌
* Hedera Mirror Node
* ​
* Copyright (C) 2019 - 2020 Hedera Hashgraph, LLC
* ​
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ‍
*/

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Value;
import lombok.extern.log4j.Log4j2;

import com.hedera.hashgraph.sdk.Client;
import com.hedera.hashgraph.sdk.HederaStatusException;
import com.hedera.hashgraph.sdk.Status;
import com.hedera.hashgraph.sdk.Transaction;
import com.hedera.hashgraph.sdk.TransactionId;
import com.hedera.hashgraph.sdk.TransactionReceipt;
import com.hedera.hashgraph.sdk.account.AccountId;
import com.hedera.hashgraph.sdk.account.CryptoTransferTransaction;
import com.hedera.hashgraph.sdk.consensus.ConsensusMessageSubmitTransaction;
import com.hedera.hashgraph.sdk.consensus.ConsensusTopicCreateTransaction;
import com.hedera.hashgraph.sdk.consensus.ConsensusTopicId;
import com.hedera.hashgraph.sdk.crypto.ed25519.Ed25519PrivateKey;
import com.hedera.mirror.grpc.jmeter.props.NodeInfo;

@Log4j2
@Value
public class SDKClientHandler {
protected final Client client;
private final NodeInfo nodeInfo;
private final AccountId operatorId;
@Getter(AccessLevel.NONE)
private final Ed25519PrivateKey operatorPrivateKey;

public SDKClientHandler(String nodeParts, AccountId operatorId, Ed25519PrivateKey operatorPrivateKey) {
nodeInfo = new NodeInfo(nodeParts);
this.operatorId = operatorId;
this.operatorPrivateKey = operatorPrivateKey;

client = new Client(Map.of(nodeInfo.getNodeId(), nodeInfo.getNodeAddress()));
client.setOperator(operatorId, operatorPrivateKey);

log.trace("Created client for {}", nodeInfo);
}

public void close() throws InterruptedException {
log.debug("Closing SDK client, waits up to 10 s for valid close");

try {
if (client != null) {
client.close(5, TimeUnit.SECONDS);
}
} catch (TimeoutException tex) {
log.debug("Exception on client close: {}", tex.getMessage());
}
}

public ConsensusTopicId createTopic() throws HederaStatusException {

ConsensusTopicCreateTransaction consensusTopicCreateTransaction = new ConsensusTopicCreateTransaction()
.setAdminKey(operatorPrivateKey.publicKey)
.setAutoRenewAccountId(operatorId)
.setMaxTransactionFee(1_000_000_000)
.setTopicMemo("HCS Topic_" + Instant.now());

return createTopic(consensusTopicCreateTransaction);
}

public ConsensusTopicId createTopic(ConsensusTopicCreateTransaction consensusTopicCreateTransaction) throws HederaStatusException {
TransactionReceipt transactionReceipt = consensusTopicCreateTransaction
.execute(client)
.getReceipt(client);

ConsensusTopicId topicId = transactionReceipt.getConsensusTopicId();
log.info("Created new topic {}, with TransactionReceipt : {}", topicId, transactionReceipt);

return topicId;
}

public TransactionId submitTopicMessage(ConsensusTopicId topicId, String message) throws HederaStatusException {
ConsensusMessageSubmitTransaction consensusMessageSubmitTransaction = new ConsensusMessageSubmitTransaction()
.setTopicId(topicId)
.setMessage(message);

return submitTopicMessage(consensusMessageSubmitTransaction);
}

public TransactionId submitTopicMessage(ConsensusMessageSubmitTransaction consensusMessageSubmitTransaction) throws HederaStatusException {
Transaction transaction = consensusMessageSubmitTransaction.build(client);

return transaction.execute(client, Duration.ofSeconds(2));
}

public TransactionId submitCryptoTransfer(AccountId operatorId, AccountId recipientId, int amount) throws HederaStatusException {
TransactionId transactionId = new CryptoTransferTransaction()
.addSender(operatorId, amount)
.addRecipient(recipientId, amount)
.setTransactionMemo("transfer test")
.execute(client);

return transactionId;
}

public int getValidTransactionsCount(List<TransactionId> transactionIds) {
log.debug("Verify Transactions {}", transactionIds.size());
AtomicInteger counter = new AtomicInteger(0);
transactionIds.forEach(x -> {
TransactionReceipt receipt = null;
try {
receipt = x.getReceipt(client);
} catch (HederaStatusException e) {
log.debug("Error pulling {} receipt {}", x, e.getMessage());
}
if (receipt.status == Status.Success) {
counter.incrementAndGet();
} else {
log.warn("Transaction {} had an unexpected status of {}", x, receipt.status);
}
});

log.debug("{} out of {} transactions returned a Success status", counter.get(), transactionIds.size());
return counter.get();
}
}
Loading

0 comments on commit 61620de

Please sign in to comment.