diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index afdddf306e0..968f06e8872 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -296,7 +296,9 @@ public class CommonParameter { @Getter @Setter public int minEffectiveConnection; - + @Getter + @Setter + public boolean trxCacheEnable; @Getter @Setter public long allowMarketTransaction; //committee parameter diff --git a/common/src/main/java/org/tron/core/Constant.java b/common/src/main/java/org/tron/core/Constant.java index 890be85e2a0..c149cf9042d 100644 --- a/common/src/main/java/org/tron/core/Constant.java +++ b/common/src/main/java/org/tron/core/Constant.java @@ -169,6 +169,8 @@ public class Constant { public static final String NODE_RPC_MIN_EFFECTIVE_CONNECTION = "node.rpc.minEffectiveConnection"; + public static final String NODE_RPC_TRX_CACHE_ENABLE = "node.rpc.trxCacheEnable"; + public static final String ENERGY_LIMIT_BLOCK_NUM = "enery.limit.block.num"; public static final String VM_TRACE = "vm.vmTrace"; diff --git a/framework/src/main/java/org/tron/core/Wallet.java b/framework/src/main/java/org/tron/core/Wallet.java index 2d6a2febd93..67005828400 100755 --- a/framework/src/main/java/org/tron/core/Wallet.java +++ b/framework/src/main/java/org/tron/core/Wallet.java @@ -263,6 +263,7 @@ public class Wallet { @Autowired private NodeManager nodeManager; private int minEffectiveConnection = Args.getInstance().getMinEffectiveConnection(); + private boolean trxCacheEnable = Args.getInstance().isTrxCacheEnable(); public static final String CONTRACT_VALIDATE_EXCEPTION = "ContractValidateException: {}"; public static final String CONTRACT_VALIDATE_ERROR = "contract validate error : "; @@ -478,12 +479,12 @@ public GrpcAPI.Return broadcastTransaction(Transaction signedTransaction) { GrpcAPI.Return.Builder builder = GrpcAPI.Return.newBuilder(); TransactionCapsule trx = new TransactionCapsule(signedTransaction); trx.setTime(System.currentTimeMillis()); + Sha256Hash txID = trx.getTransactionId(); try { - Message message = new TransactionMessage(signedTransaction.toByteArray()); + TransactionMessage message = new TransactionMessage(signedTransaction.toByteArray()); if (minEffectiveConnection != 0) { if (tronNetDelegate.getActivePeer().isEmpty()) { - logger - .warn("Broadcast transaction {} has failed, no connection.", trx.getTransactionId()); + logger.warn("Broadcast transaction {} has failed, no connection.", txID); return builder.setResult(false).setCode(response_code.NO_CONNECTION) .setMessage(ByteString.copyFromUtf8("no connection")) .build(); @@ -496,7 +497,7 @@ public GrpcAPI.Return broadcastTransaction(Transaction signedTransaction) { if (count < minEffectiveConnection) { String info = "effective connection:" + count + " lt minEffectiveConnection:" + minEffectiveConnection; - logger.warn("Broadcast transaction {} has failed, {}.", trx.getTransactionId(), info); + logger.warn("Broadcast transaction {} has failed, {}.", txID, info); return builder.setResult(false).setCode(response_code.NOT_ENOUGH_EFFECTIVE_CONNECTION) .setMessage(ByteString.copyFromUtf8(info)) .build(); @@ -504,67 +505,73 @@ public GrpcAPI.Return broadcastTransaction(Transaction signedTransaction) { } if (dbManager.isTooManyPending()) { - logger - .warn("Broadcast transaction {} has failed, too many pending.", trx.getTransactionId()); + logger.warn("Broadcast transaction {} has failed, too many pending.", txID); return builder.setResult(false).setCode(response_code.SERVER_BUSY).build(); } - if (dbManager.getTransactionIdCache().getIfPresent(trx.getTransactionId()) != null) { - logger.warn("Broadcast transaction {} has failed, it already exists.", - trx.getTransactionId()); - return builder.setResult(false).setCode(response_code.DUP_TRANSACTION_ERROR).build(); - } else { - dbManager.getTransactionIdCache().put(trx.getTransactionId(), true); + if (trxCacheEnable) { + if (dbManager.getTransactionIdCache().getIfPresent(txID) != null) { + logger.warn("Broadcast transaction {} has failed, it already exists.", txID); + return builder.setResult(false).setCode(response_code.DUP_TRANSACTION_ERROR).build(); + } else { + dbManager.getTransactionIdCache().put(txID, true); + } } + if (chainBaseManager.getDynamicPropertiesStore().supportVM()) { trx.resetResult(); } dbManager.pushTransaction(trx); - tronNetService.broadcast(message); - logger.info("Broadcast transaction {} successfully.", trx.getTransactionId()); - return builder.setResult(true).setCode(response_code.SUCCESS).build(); + int num = tronNetService.fastBroadcastTransaction(message); + if (num == 0) { + return builder.setResult(false).setCode(response_code.NOT_ENOUGH_EFFECTIVE_CONNECTION) + .setMessage(ByteString.copyFromUtf8("p2p broadcast failed.")).build(); + } else { + logger.info("Broadcast transaction {} to {} peers successfully.", txID, num); + return builder.setResult(true).setCode(response_code.SUCCESS).build(); + } } catch (ValidateSignatureException e) { - logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage()); + logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage()); return builder.setResult(false).setCode(response_code.SIGERROR) .setMessage(ByteString.copyFromUtf8("validate signature error " + e.getMessage())) .build(); } catch (ContractValidateException e) { - logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage()); + logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage()); return builder.setResult(false).setCode(response_code.CONTRACT_VALIDATE_ERROR) .setMessage(ByteString.copyFromUtf8(CONTRACT_VALIDATE_ERROR + e.getMessage())) .build(); } catch (ContractExeException e) { - logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage()); + logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage()); return builder.setResult(false).setCode(response_code.CONTRACT_EXE_ERROR) .setMessage(ByteString.copyFromUtf8("contract execute error : " + e.getMessage())) .build(); } catch (AccountResourceInsufficientException e) { - logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage()); + logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage()); return builder.setResult(false).setCode(response_code.BANDWITH_ERROR) .setMessage(ByteString.copyFromUtf8("AccountResourceInsufficient error")) .build(); } catch (DupTransactionException e) { - logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage()); + logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage()); return builder.setResult(false).setCode(response_code.DUP_TRANSACTION_ERROR) .setMessage(ByteString.copyFromUtf8("dup transaction")) .build(); } catch (TaposException e) { - logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage()); + logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage()); return builder.setResult(false).setCode(response_code.TAPOS_ERROR) .setMessage(ByteString.copyFromUtf8("Tapos check error")) .build(); } catch (TooBigTransactionException e) { - logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage()); + logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage()); return builder.setResult(false).setCode(response_code.TOO_BIG_TRANSACTION_ERROR) .setMessage(ByteString.copyFromUtf8("transaction size is too big")) .build(); } catch (TransactionExpirationException e) { - logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage()); + logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage()); return builder.setResult(false).setCode(response_code.TRANSACTION_EXPIRATION_ERROR) .setMessage(ByteString.copyFromUtf8("transaction expired")) .build(); } catch (Exception e) { - logger.error(BROADCAST_TRANS_FAILED, trx.getTransactionId(), e.getMessage()); + logger.error(BROADCAST_TRANS_FAILED, txID, e.getMessage()); return builder.setResult(false).setCode(response_code.OTHER_ERROR) .setMessage(ByteString.copyFromUtf8("other error : " + e.getMessage())) .build(); diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index d159f1ad243..2892172d0af 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -610,6 +610,9 @@ public static void setParam(final String[] args, final String confFileName) { PARAMETER.minEffectiveConnection = config.hasPath(Constant.NODE_RPC_MIN_EFFECTIVE_CONNECTION) ? config.getInt(Constant.NODE_RPC_MIN_EFFECTIVE_CONNECTION) : 1; + PARAMETER.trxCacheEnable = config.hasPath(Constant.NODE_RPC_TRX_CACHE_ENABLE) + ? config.getBoolean(Constant.NODE_RPC_TRX_CACHE_ENABLE) : false; + PARAMETER.blockNumForEnergyLimit = config.hasPath(Constant.ENERGY_LIMIT_BLOCK_NUM) ? config.getInt(Constant.ENERGY_LIMIT_BLOCK_NUM) : 4727890L; diff --git a/framework/src/main/java/org/tron/core/net/TronNetService.java b/framework/src/main/java/org/tron/core/net/TronNetService.java index 094e452f444..15d5173774a 100644 --- a/framework/src/main/java/org/tron/core/net/TronNetService.java +++ b/framework/src/main/java/org/tron/core/net/TronNetService.java @@ -8,6 +8,7 @@ import org.tron.core.exception.P2pException; import org.tron.core.exception.P2pException.TypeEnum; import org.tron.core.net.message.BlockMessage; +import org.tron.core.net.message.TransactionMessage; import org.tron.core.net.message.TronMessage; import org.tron.core.net.messagehandler.BlockMsgHandler; import org.tron.core.net.messagehandler.ChainInventoryMsgHandler; @@ -78,6 +79,10 @@ public void stop() { logger.info("TronNetService closed successfully."); } + public int fastBroadcastTransaction(TransactionMessage msg) { + return advService.fastBroadcastTransaction(msg); + } + public void broadcast(Message msg) { advService.broadcast(msg); } diff --git a/framework/src/main/java/org/tron/core/net/service/AdvService.java b/framework/src/main/java/org/tron/core/net/service/AdvService.java index 46c0f4c883a..64c9ec79e99 100644 --- a/framework/src/main/java/org/tron/core/net/service/AdvService.java +++ b/framework/src/main/java/org/tron/core/net/service/AdvService.java @@ -6,6 +6,8 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; + +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; @@ -141,6 +143,40 @@ public Message getMessage(Item item) { } } + public int fastBroadcastTransaction(TransactionMessage msg) { + + List peers = tronNetDelegate.getActivePeer().stream() + .filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs()) + .collect(Collectors.toList()); + + if (peers.size() == 0) { + logger.warn("Broadcast transaction {} failed, no connection.", msg.getMessageId()); + return 0; + } + + Item item = new Item(msg.getMessageId(), InventoryType.TRX); + trxCount.add(); + trxCache.put(item, new TransactionMessage(msg.getTransactionCapsule().getInstance())); + + List list = new ArrayList<>(); + list.add(msg.getMessageId()); + InventoryMessage inventoryMessage = new InventoryMessage(list, InventoryType.TRX); + + int peersCount = 0; + for (PeerConnection peer: peers) { + if (peer.getAdvInvReceive().getIfPresent(item) == null + && peer.getAdvInvSpread().getIfPresent(item) == null) { + peersCount++; + peer.getAdvInvSpread().put(item, Time.getCurrentMillis()); + peer.fastSend(inventoryMessage); + } + } + if (peersCount == 0) { + logger.warn("Broadcast transaction {} failed, no peers.", msg.getMessageId()); + } + return peersCount; + } + public void broadcast(Message msg) { if (fastForward) {