Skip to content

Commit

Permalink
Merge pull request #4000 from wubin01/opt_broadcast
Browse files Browse the repository at this point in the history
Optimize the broadcast transaction interface
  • Loading branch information
forfreeday authored Aug 20, 2021
2 parents 67de589 + 53f0ee1 commit 350c601
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ public class CommonParameter {
@Getter
@Setter
public int minEffectiveConnection;

@Getter
@Setter
public boolean trxCacheEnable;
@Getter
@Setter
public long allowMarketTransaction; //committee parameter
Expand Down
2 changes: 2 additions & 0 deletions common/src/main/java/org/tron/core/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ public class Constant {

public static final String NODE_RPC_MIN_EFFECTIVE_CONNECTION = "node.rpc.minEffectiveConnection";

public static final String NODE_RPC_TRX_CACHE_ENABLE = "node.rpc.trxCacheEnable";

public static final String ENERGY_LIMIT_BLOCK_NUM = "enery.limit.block.num";

public static final String VM_TRACE = "vm.vmTrace";
Expand Down
55 changes: 31 additions & 24 deletions framework/src/main/java/org/tron/core/Wallet.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ public class Wallet {
@Autowired
private NodeManager nodeManager;
private int minEffectiveConnection = Args.getInstance().getMinEffectiveConnection();
private boolean trxCacheEnable = Args.getInstance().isTrxCacheEnable();
public static final String CONTRACT_VALIDATE_EXCEPTION = "ContractValidateException: {}";
public static final String CONTRACT_VALIDATE_ERROR = "contract validate error : ";

Expand Down Expand Up @@ -478,12 +479,12 @@ public GrpcAPI.Return broadcastTransaction(Transaction signedTransaction) {
GrpcAPI.Return.Builder builder = GrpcAPI.Return.newBuilder();
TransactionCapsule trx = new TransactionCapsule(signedTransaction);
trx.setTime(System.currentTimeMillis());
Sha256Hash txID = trx.getTransactionId();
try {
Message message = new TransactionMessage(signedTransaction.toByteArray());
TransactionMessage message = new TransactionMessage(signedTransaction.toByteArray());
if (minEffectiveConnection != 0) {
if (tronNetDelegate.getActivePeer().isEmpty()) {
logger
.warn("Broadcast transaction {} has failed, no connection.", trx.getTransactionId());
logger.warn("Broadcast transaction {} has failed, no connection.", txID);
return builder.setResult(false).setCode(response_code.NO_CONNECTION)
.setMessage(ByteString.copyFromUtf8("no connection"))
.build();
Expand All @@ -496,75 +497,81 @@ public GrpcAPI.Return broadcastTransaction(Transaction signedTransaction) {
if (count < minEffectiveConnection) {
String info = "effective connection:" + count + " lt minEffectiveConnection:"
+ minEffectiveConnection;
logger.warn("Broadcast transaction {} has failed, {}.", trx.getTransactionId(), info);
logger.warn("Broadcast transaction {} has failed, {}.", txID, info);
return builder.setResult(false).setCode(response_code.NOT_ENOUGH_EFFECTIVE_CONNECTION)
.setMessage(ByteString.copyFromUtf8(info))
.build();
}
}

if (dbManager.isTooManyPending()) {
logger
.warn("Broadcast transaction {} has failed, too many pending.", trx.getTransactionId());
logger.warn("Broadcast transaction {} has failed, too many pending.", txID);
return builder.setResult(false).setCode(response_code.SERVER_BUSY).build();
}

if (dbManager.getTransactionIdCache().getIfPresent(trx.getTransactionId()) != null) {
logger.warn("Broadcast transaction {} has failed, it already exists.",
trx.getTransactionId());
return builder.setResult(false).setCode(response_code.DUP_TRANSACTION_ERROR).build();
} else {
dbManager.getTransactionIdCache().put(trx.getTransactionId(), true);
if (trxCacheEnable) {
if (dbManager.getTransactionIdCache().getIfPresent(txID) != null) {
logger.warn("Broadcast transaction {} has failed, it already exists.", txID);
return builder.setResult(false).setCode(response_code.DUP_TRANSACTION_ERROR).build();
} else {
dbManager.getTransactionIdCache().put(txID, true);
}
}

if (chainBaseManager.getDynamicPropertiesStore().supportVM()) {
trx.resetResult();
}
dbManager.pushTransaction(trx);
tronNetService.broadcast(message);
logger.info("Broadcast transaction {} successfully.", trx.getTransactionId());
return builder.setResult(true).setCode(response_code.SUCCESS).build();
int num = tronNetService.fastBroadcastTransaction(message);
if (num == 0) {
return builder.setResult(false).setCode(response_code.NOT_ENOUGH_EFFECTIVE_CONNECTION)
.setMessage(ByteString.copyFromUtf8("p2p broadcast failed.")).build();
} else {
logger.info("Broadcast transaction {} to {} peers successfully.", txID, num);
return builder.setResult(true).setCode(response_code.SUCCESS).build();
}
} catch (ValidateSignatureException e) {
logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage());
logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage());
return builder.setResult(false).setCode(response_code.SIGERROR)
.setMessage(ByteString.copyFromUtf8("validate signature error " + e.getMessage()))
.build();
} catch (ContractValidateException e) {
logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage());
logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage());
return builder.setResult(false).setCode(response_code.CONTRACT_VALIDATE_ERROR)
.setMessage(ByteString.copyFromUtf8(CONTRACT_VALIDATE_ERROR + e.getMessage()))
.build();
} catch (ContractExeException e) {
logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage());
logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage());
return builder.setResult(false).setCode(response_code.CONTRACT_EXE_ERROR)
.setMessage(ByteString.copyFromUtf8("contract execute error : " + e.getMessage()))
.build();
} catch (AccountResourceInsufficientException e) {
logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage());
logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage());
return builder.setResult(false).setCode(response_code.BANDWITH_ERROR)
.setMessage(ByteString.copyFromUtf8("AccountResourceInsufficient error"))
.build();
} catch (DupTransactionException e) {
logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage());
logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage());
return builder.setResult(false).setCode(response_code.DUP_TRANSACTION_ERROR)
.setMessage(ByteString.copyFromUtf8("dup transaction"))
.build();
} catch (TaposException e) {
logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage());
logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage());
return builder.setResult(false).setCode(response_code.TAPOS_ERROR)
.setMessage(ByteString.copyFromUtf8("Tapos check error"))
.build();
} catch (TooBigTransactionException e) {
logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage());
logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage());
return builder.setResult(false).setCode(response_code.TOO_BIG_TRANSACTION_ERROR)
.setMessage(ByteString.copyFromUtf8("transaction size is too big"))
.build();
} catch (TransactionExpirationException e) {
logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage());
logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage());
return builder.setResult(false).setCode(response_code.TRANSACTION_EXPIRATION_ERROR)
.setMessage(ByteString.copyFromUtf8("transaction expired"))
.build();
} catch (Exception e) {
logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage());
logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage());
return builder.setResult(false).setCode(response_code.OTHER_ERROR)
.setMessage(ByteString.copyFromUtf8("other error : " + e.getMessage()))
.build();
Expand Down
3 changes: 3 additions & 0 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,9 @@ public static void setParam(final String[] args, final String confFileName) {
PARAMETER.minEffectiveConnection = config.hasPath(Constant.NODE_RPC_MIN_EFFECTIVE_CONNECTION)
? config.getInt(Constant.NODE_RPC_MIN_EFFECTIVE_CONNECTION) : 1;

PARAMETER.trxCacheEnable = config.hasPath(Constant.NODE_RPC_TRX_CACHE_ENABLE)
? config.getBoolean(Constant.NODE_RPC_TRX_CACHE_ENABLE) : false;

PARAMETER.blockNumForEnergyLimit = config.hasPath(Constant.ENERGY_LIMIT_BLOCK_NUM)
? config.getInt(Constant.ENERGY_LIMIT_BLOCK_NUM) : 4727890L;

Expand Down
5 changes: 5 additions & 0 deletions framework/src/main/java/org/tron/core/net/TronNetService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.tron.core.exception.P2pException;
import org.tron.core.exception.P2pException.TypeEnum;
import org.tron.core.net.message.BlockMessage;
import org.tron.core.net.message.TransactionMessage;
import org.tron.core.net.message.TronMessage;
import org.tron.core.net.messagehandler.BlockMsgHandler;
import org.tron.core.net.messagehandler.ChainInventoryMsgHandler;
Expand Down Expand Up @@ -78,6 +79,10 @@ public void stop() {
logger.info("TronNetService closed successfully.");
}

public int fastBroadcastTransaction(TransactionMessage msg) {
return advService.fastBroadcastTransaction(msg);
}

public void broadcast(Message msg) {
advService.broadcast(msg);
}
Expand Down
36 changes: 36 additions & 0 deletions framework/src/main/java/org/tron/core/net/service/AdvService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -141,6 +143,40 @@ public Message getMessage(Item item) {
}
}

public int fastBroadcastTransaction(TransactionMessage msg) {

List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
.collect(Collectors.toList());

if (peers.size() == 0) {
logger.warn("Broadcast transaction {} failed, no connection.", msg.getMessageId());
return 0;
}

Item item = new Item(msg.getMessageId(), InventoryType.TRX);
trxCount.add();
trxCache.put(item, new TransactionMessage(msg.getTransactionCapsule().getInstance()));

List<Sha256Hash> list = new ArrayList<>();
list.add(msg.getMessageId());
InventoryMessage inventoryMessage = new InventoryMessage(list, InventoryType.TRX);

int peersCount = 0;
for (PeerConnection peer: peers) {
if (peer.getAdvInvReceive().getIfPresent(item) == null
&& peer.getAdvInvSpread().getIfPresent(item) == null) {
peersCount++;
peer.getAdvInvSpread().put(item, Time.getCurrentMillis());
peer.fastSend(inventoryMessage);
}
}
if (peersCount == 0) {
logger.warn("Broadcast transaction {} failed, no peers.", msg.getMessageId());
}
return peersCount;
}

public void broadcast(Message msg) {

if (fastForward) {
Expand Down

0 comments on commit 350c601

Please sign in to comment.