Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of sync & adv module #2045

Merged
merged 55 commits into from
Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
6c2c883
add tron proxy
xxo1shine Oct 25, 2018
510db9b
add message handler
xxo1shine Oct 26, 2018
e3f900f
add SyncBlockChainMsgHadler log
xxo1shine Oct 29, 2018
a6a3576
Merge branch 'Odyssey_v3.2' of https://github.com/tronprotocol/java-t…
xxo1shine Oct 30, 2018
fa2f142
add TransactionsMsgHandler
xxo1shine Oct 30, 2018
0033658
add ChainInventory Msg Handler
xxo1shine Nov 2, 2018
38471b9
add peer adv
xxo1shine Nov 5, 2018
44283b7
add invetory handler
xxo1shine Nov 6, 2018
56ceb8a
add msg handler
xxo1shine Nov 9, 2018
6b68891
remove nodeimpl
xxo1shine Nov 12, 2018
071c803
Resolving merger conflicts
xxo1shine Nov 12, 2018
ecd4514
system integration synchronization module
xxo1shine Nov 12, 2018
4463f7f
add Peer Adv
xxo1shine Nov 17, 2018
cdbc256
modify peer adv
xxo1shine Nov 20, 2018
77bb646
modify sync bolock handle
xxo1shine Nov 21, 2018
310b4cb
modify peer sync
xxo1shine Nov 21, 2018
95c0bca
modify peer sync and adv
xxo1shine Nov 22, 2018
14a76bc
perfect message processing code
xxo1shine Nov 26, 2018
91ae198
modify FetchInvData msg handler
xxo1shine Nov 26, 2018
2b522f5
add peer status check
xxo1shine Nov 27, 2018
c879837
update peer sync
xxo1shine Nov 28, 2018
71e0151
add advBlockCacheSize limitation
xxo1shine Dec 10, 2018
97afe01
resolving conflicts
xxo1shine Dec 11, 2018
c6e2075
modify peer connection
xxo1shine Dec 17, 2018
edec6cd
resolving conflicts
xxo1shine Jan 7, 2019
3bf7784
modify p2p excepiton
xxo1shine Jan 14, 2019
c84f4a1
resolving conflicts
xxo1shine Jan 15, 2019
add9e07
modify test
xxo1shine Jan 17, 2019
8f5c7ee
merge develop
xxo1shine Feb 18, 2019
c5e62ed
Merge branch 'develop' of https://github.com/tronprotocol/java-tron i…
xxo1shine Feb 27, 2019
b534b64
modify SyncService
xxo1shine Feb 27, 2019
c8dca57
merge develop
xxo1shine Feb 28, 2019
2b60e66
Merge branch 'develop' of https://github.com/tronprotocol/java-tron i…
xxo1shine Mar 5, 2019
a2c1a89
modify TronNetService
xxo1shine Mar 7, 2019
c431bc9
modify log
xxo1shine Mar 11, 2019
dfca864
modify tron class name
xxo1shine Mar 11, 2019
f36134e
add unit test
xxo1shine Mar 12, 2019
6a77eec
modify processTransaction
xxo1shine Mar 12, 2019
246aa69
modify AdvService
xxo1shine Mar 12, 2019
9d95e00
merge develop
xxo1shine Mar 12, 2019
8d6287e
modify InventoryMsgHandler
xxo1shine Mar 13, 2019
a833f69
modify peerconnection
xxo1shine Mar 14, 2019
1cbd25e
modify Channel handle exception
xxo1shine Mar 18, 2019
478c4a7
Merge branch 'develop' of https://github.com/tronprotocol/java-tron i…
xxo1shine Mar 18, 2019
00ac4d9
modify AdvService broadcast inf
xxo1shine Mar 19, 2019
76b188a
modify AdvService consumerInvToFetch func
xxo1shine Mar 19, 2019
41ba416
modify MessageQueue sendmsg
xxo1shine Mar 19, 2019
da177dc
modify trx handle
xxo1shine Mar 19, 2019
e19ca89
modify trx handle
xxo1shine Mar 19, 2019
269ce10
modiyf log
xxo1shine Mar 19, 2019
d24a54d
Merge branch 'develop' of https://github.com/tronprotocol/java-tron i…
xxo1shine Mar 20, 2019
b7ca6c4
modify http start func
xxo1shine Mar 20, 2019
43dc293
merge develop
xxo1shine Mar 20, 2019
547764c
Merge branch 'develop' of https://github.com/tronprotocol/java-tron i…
xxo1shine Mar 22, 2019
824ca31
modify tronNet processException
xxo1shine Mar 22, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/main/java/org/tron/common/application/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.tron.core.config.args.Args;
import org.tron.core.db.BlockStore;
import org.tron.core.db.Manager;
import org.tron.core.net.node.Node;

public interface Application {

Expand All @@ -36,8 +35,6 @@ public interface Application {

void shutdownServices();

Node getP2pNode();

BlockStore getBlockStoreS();

void addService(Service service);
Expand Down
41 changes: 6 additions & 35 deletions src/main/java/org/tron/common/application/ApplicationImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,23 @@
import org.tron.core.config.args.Args;
import org.tron.core.db.BlockStore;
import org.tron.core.db.Manager;
import org.tron.core.net.node.Node;
import org.tron.core.net.node.NodeDelegate;
import org.tron.core.net.node.NodeDelegateImpl;
import org.tron.core.net.node.NodeImpl;
import org.tron.core.net.TronNetService;

@Slf4j(topic = "app")
@Component
public class ApplicationImpl implements Application {

@Autowired
private NodeImpl p2pNode;

private BlockStore blockStoreDb;
private ServiceContainer services;
private NodeDelegate nodeDelegate;

@Autowired
private TronNetService tronNetService;

@Autowired
private Manager dbManager;

private boolean isProducer;


private void resetP2PNode() {
p2pNode.listen();
p2pNode.syncFrom(null);
}

@Override
public void setOptions(Args args) {
// not used
Expand All @@ -44,7 +34,6 @@ public void setOptions(Args args) {
public void init(Args args) {
blockStoreDb = dbManager.getBlockStore();
services = new ServiceContainer();
nodeDelegate = new NodeDelegateImpl(dbManager);
}

@Override
Expand All @@ -61,19 +50,17 @@ public void initServices(Args args) {
* start up the app.
*/
public void startup() {
p2pNode.setNodeDelegate(nodeDelegate);
resetP2PNode();
tronNetService.start();
}

@Override
public void shutdown() {
logger.info("******** begin to shutdown ********");
//p2pNode.shutDown();
tronNetService.close();
synchronized (dbManager.getRevokingStore()) {
closeRevokingStore();
closeAllStore();
}
closeConnection();
dbManager.stopRepushThread();
dbManager.stopRepushTriggerThread();
EventPluginLoader.getInstance().stopPlugin();
Expand All @@ -90,11 +77,6 @@ public void shutdownServices() {
services.stop();
}

@Override
public Node getP2pNode() {
return p2pNode;
}

@Override
public BlockStore getBlockStoreS() {
return blockStoreDb;
Expand All @@ -113,17 +95,6 @@ public void setIsProducer(boolean producer) {
isProducer = producer;
}

private void closeConnection() {
logger.info("******** begin to shutdown connection ********");
try {
p2pNode.close();
} catch (Exception e) {
logger.info("failed to close p2pNode. " + e);
} finally {
logger.info("******** end to shutdown connection ********");
}
}

private void closeRevokingStore() {
logger.info("******** begin to closeRevokingStore ********");
dbManager.getRevokingStore().shutdown();
Expand Down
6 changes: 0 additions & 6 deletions src/main/java/org/tron/common/application/CliApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.tron.core.config.args.Args;
import org.tron.core.db.BlockStore;
import org.tron.core.db.Manager;
import org.tron.core.net.node.Node;

public class CliApplication implements Application {

Expand Down Expand Up @@ -56,11 +55,6 @@ public void shutdownServices() {

}

@Override
public Node getP2pNode() {
return null;
}

@Override
public BlockStore getBlockStoreS() {
return null;
Expand Down
11 changes: 2 additions & 9 deletions src/main/java/org/tron/common/overlay/client/PeerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.tron.common.overlay.discover.node.Node;
import org.tron.common.overlay.discover.node.NodeHandler;
import org.tron.common.overlay.server.TronChannelInitializer;
import org.tron.core.config.args.Args;
import org.tron.core.net.node.NodeImpl;
import org.tron.protos.Protocol.ReasonCode;

@Slf4j(topic = "net")
Expand All @@ -29,10 +27,6 @@ public class PeerClient {
@Autowired
private ApplicationContext ctx;

@Autowired
@Lazy
private NodeImpl node;

private EventLoopGroup workerGroup;

public PeerClient() {
Expand Down Expand Up @@ -61,7 +55,7 @@ public ChannelFuture connectAsync(NodeHandler nodeHandler, boolean discoveryMode
return connectAsync(node.getHost(), node.getPort(), node.getHexId(), discoveryMode)
.addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
logger.error("connect to {}:{} fail,cause:{}", node.getHost(), node.getPort(),
logger.warn("connect to {}:{} fail,cause:{}", node.getHost(), node.getPort(),
future.cause().getMessage());
nodeHandler.getNodeStatistics().nodeDisconnectedLocal(ReasonCode.CONNECT_FAIL);
nodeHandler.getNodeStatistics().notifyDisconnect();
Expand All @@ -70,14 +64,13 @@ public ChannelFuture connectAsync(NodeHandler nodeHandler, boolean discoveryMode
});
}

public ChannelFuture connectAsync(String host, int port, String remoteId, boolean discoveryMode) {
private ChannelFuture connectAsync(String host, int port, String remoteId, boolean discoveryMode) {

logger.info("connect peer {} {} {}", host, port, remoteId);

TronChannelInitializer tronChannelInitializer = ctx
.getBean(TronChannelInitializer.class, remoteId);
tronChannelInitializer.setPeerDiscoveryMode(discoveryMode);
tronChannelInitializer.setNodeImpl(node);

Bootstrap b = new Bootstrap();
b.group(workerGroup);
Expand Down
52 changes: 18 additions & 34 deletions src/main/java/org/tron/common/overlay/server/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand All @@ -39,8 +40,7 @@
import org.tron.common.overlay.message.StaticMessages;
import org.tron.core.db.ByteArrayWrapper;
import org.tron.core.exception.P2pException;
import org.tron.core.net.peer.PeerConnectionDelegate;
import org.tron.core.net.peer.TronHandler;
import org.tron.core.net.TronNetHandler;
import org.tron.protos.Protocol.ReasonCode;

@Slf4j(topic = "net")
Expand Down Expand Up @@ -70,7 +70,7 @@ public class Channel {
private P2pHandler p2pHandler;

@Autowired
private TronHandler tronHandler;
private TronNetHandler tronNetHandler;

private ChannelManager channelManager;

Expand All @@ -82,8 +82,6 @@ public class Channel {

private long startTime;

private PeerConnectionDelegate peerDel;

private TronState tronState = TronState.INIT;

protected NodeStatistics nodeStatistics;
Expand All @@ -99,7 +97,7 @@ public class Channel {
private boolean isTrustPeer;

public void init(ChannelPipeline pipeline, String remoteId, boolean discoveryMode,
ChannelManager channelManager, PeerConnectionDelegate peerDel) {
ChannelManager channelManager) {

this.channelManager = channelManager;

Expand All @@ -118,18 +116,14 @@ public void init(ChannelPipeline pipeline, String remoteId, boolean discoveryMod
//handshake first
pipeline.addLast("handshakeHandler", handshakeHandler);

this.peerDel = peerDel;

messageCodec.setChannel(this);
msgQueue.setChannel(this);
handshakeHandler.setChannel(this, remoteId);
p2pHandler.setChannel(this);
tronHandler.setChannel(this);
tronNetHandler.setChannel(this);

p2pHandler.setMsgQueue(msgQueue);
tronHandler.setMsgQueue(msgQueue);
tronHandler.setPeerDel(peerDel);

tronNetHandler.setMsgQueue(msgQueue);
}

public void publicHandshakeFinished(ChannelHandlerContext ctx, HelloMessage msg) {
Expand All @@ -138,7 +132,7 @@ public void publicHandshakeFinished(ChannelHandlerContext ctx, HelloMessage msg)
msgQueue.activate(ctx);
ctx.pipeline().addLast("messageCodec", messageCodec);
ctx.pipeline().addLast("p2p", p2pHandler);
ctx.pipeline().addLast("data", tronHandler);
ctx.pipeline().addLast("data", tronNetHandler);
setStartTime(msg.getTimestamp());
setTronState(TronState.HANDSHAKE_FINISHED);
getNodeStatistics().p2pHandShake.add();
Expand All @@ -158,7 +152,10 @@ public void disconnect(ReasonCode reason) {
this.isDisconnect = true;
channelManager.processDisconnect(this, reason);
DisconnectMessage msg = new DisconnectMessage(reason);
logger.info("Send to {}, {}", ctx.channel().remoteAddress(), msg);
logger.info("Send to {} online-time {}s, {}",
ctx.channel().remoteAddress(),
(System.currentTimeMillis() - startTime) / 1000,
msg);
getNodeStatistics().nodeDisconnectedLocal(reason);
ctx.writeAndFlush(msg.getSendData()).addListener(future -> close());
}
Expand All @@ -168,17 +165,15 @@ public void processException(Throwable throwable) {
while (baseThrowable.getCause() != null) {
baseThrowable = baseThrowable.getCause();
}
String errMsg = throwable.getMessage();
SocketAddress address = ctx.channel().remoteAddress();
if (throwable instanceof ReadTimeoutException) {
logger.error("Read timeout, {}", address);
if (throwable instanceof ReadTimeoutException ||
throwable instanceof IOException) {
logger.warn("Close peer {}, reason: {}", address, throwable.getMessage());
} else if (baseThrowable instanceof P2pException) {
logger.error("type: {}, info: {}, {}", ((P2pException) baseThrowable).getType(),
baseThrowable.getMessage(), address);
} else if (errMsg != null && errMsg.contains("Connection reset by peer")) {
logger.error("{}, {}", errMsg, address);
logger.warn("Close peer {}, type: {}, info: {}",
address,((P2pException) baseThrowable).getType(), baseThrowable.getMessage());
} else {
logger.error("exception caught, {}", address, throwable);
logger.error("Close peer {}, exception caught", address, throwable);
}
close();
}
Expand Down Expand Up @@ -224,10 +219,6 @@ public void setChannelHandlerContext(ChannelHandlerContext ctx) {
this.inetSocketAddress = ctx == null ? null : (InetSocketAddress) ctx.channel().remoteAddress();
}

public ChannelHandlerContext getChannelHandlerContext() {
return this.ctx;
}

public InetAddress getInetAddress() {
return ctx == null ? null : ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress();
}
Expand All @@ -246,10 +237,7 @@ public long getStartTime() {

public void setTronState(TronState tronState) {
this.tronState = tronState;
}

public TronState getTronState() {
return tronState;
logger.info("Peer {} status change to {}.", inetSocketAddress, tronState);
}

public boolean isActive() {
Expand All @@ -260,10 +248,6 @@ public boolean isDisconnect() {
return isDisconnect;
}

public boolean isProtocolsInitialized() {
return tronState.ordinal() > TronState.INIT.ordinal();
}

public boolean isTrustPeer() {
return isTrustPeer;
}
Expand Down
29 changes: 14 additions & 15 deletions src/main/java/org/tron/common/overlay/server/ChannelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@
@Component
public class ChannelManager {

@Autowired
private PeerServer peerServer;

@Autowired
private PeerClient peerClient;

@Autowired
private SyncPool syncPool;

private Args args = Args.getInstance();

private final Map<ByteArrayWrapper, Channel> activePeers = new ConcurrentHashMap<>();

private Cache<InetAddress, ReasonCode> badPeers = CacheBuilder.newBuilder().maximumSize(10000)
Expand All @@ -38,25 +49,10 @@ public class ChannelManager {
@Getter
private Map<InetAddress, Node> trustPeers = new ConcurrentHashMap();

private Args args = Args.getInstance();

private int maxActivePeers = args.getNodeMaxActiveNodes();

private int getMaxActivePeersWithSameIp = args.getNodeMaxActiveNodesWithSameIp();

private PeerServer peerServer;

private PeerClient peerClient;

@Autowired
private SyncPool syncPool;

@Autowired
private ChannelManager(final PeerServer peerServer, final PeerClient peerClient) {
this.peerServer = peerServer;
this.peerClient = peerClient;
}

public void init() {
if (this.args.getNodeListenPort() > 0) {
new Thread(() -> peerServer.start(Args.getInstance().getNodeListenPort()),
Expand All @@ -67,6 +63,8 @@ public void init() {
trustPeers.put(new InetSocketAddress(node.getHost(), node.getPort()).getAddress(), node);
}
logger.info("Trust peer size {}", trustPeers.size());

syncPool.init();
}

public void processDisconnect(Channel channel, ReasonCode reason) {
Expand Down Expand Up @@ -164,5 +162,6 @@ public Cache<InetAddress, ReasonCode> getBadPeers() {
public void close() {
peerServer.close();
peerClient.close();
syncPool.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void activate(ChannelHandlerContext ctx) {
}
Message msg = msgQueue.take();
ctx.writeAndFlush(msg.getSendData()).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
if (!future.isSuccess() && !channel.isDisconnect()) {
logger.error("Fail send to {}, {}", ctx.channel().remoteAddress(), msg);
}
});
Expand Down
Loading