Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various improvements in GetBlocks handling #4749

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions core/src/main/java/bisq/core/app/WalletAppSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,29 +111,38 @@ void init(@Nullable Consumer<String> chainFileLockedExceptionHandler,

ObjectProperty<Throwable> walletServiceException = new SimpleObjectProperty<>();
btcInfoBinding = EasyBind.combine(walletsSetup.downloadPercentageProperty(),
walletsSetup.chainHeightProperty(),
feeService.feeUpdateCounterProperty(),
walletServiceException,
(downloadPercentage, feeUpdate, exception) -> {
(downloadPercentage, chainHeight, feeUpdate, exception) -> {
String result;
if (exception == null) {
double percentage = (double) downloadPercentage;
btcSyncProgress.set(percentage);
int bestChainHeight = walletsSetup.getChain() != null ?
walletsSetup.getChain().getBestChainHeight() :
0;
String chainHeightAsString = bestChainHeight > 0 ?
String.valueOf(bestChainHeight) :
"";
if (percentage == 1) {
result = Res.get("mainView.footer.btcInfo",
Res.get("mainView.footer.btcInfo.synchronizedWith"),
getBtcNetworkAsString(),
feeService.getFeeTextForDisplay());
String synchronizedWith = Res.get("mainView.footer.btcInfo.synchronizedWith",
getBtcNetworkAsString(), chainHeightAsString);
String info = feeService.isFeeAvailable() ?
Res.get("mainView.footer.btcFeeRate", feeService.getTxFeePerVbyte().value) :
"";
result = Res.get("mainView.footer.btcInfo", synchronizedWith, info);
getBtcSplashSyncIconId().set("image-connection-synced");

downloadCompleteHandler.run();
} else if (percentage > 0.0) {
result = Res.get("mainView.footer.btcInfo",
Res.get("mainView.footer.btcInfo.synchronizingWith"),
getBtcNetworkAsString() + ": " + FormattingUtils.formatToPercentWithSymbol(percentage), "");
String synchronizingWith = Res.get("mainView.footer.btcInfo.synchronizingWith",
getBtcNetworkAsString(), chainHeightAsString,
FormattingUtils.formatToPercentWithSymbol(percentage));
result = Res.get("mainView.footer.btcInfo", synchronizingWith, "");
} else {
result = Res.get("mainView.footer.btcInfo",
Res.get("mainView.footer.btcInfo.connectingTo"),
getBtcNetworkAsString(), "");
getBtcNetworkAsString());
}
} else {
result = Res.get("mainView.footer.btcInfo",
Expand Down Expand Up @@ -259,6 +268,7 @@ void setRejectedTxErrorMessageHandler(Consumer<String> rejectedTxErrorMessageHan
}
});
}

private String getBtcNetworkAsString() {
String postFix;
if (config.ignoreLocalBtcNode)
Expand Down
72 changes: 55 additions & 17 deletions core/src/main/java/bisq/core/btc/setup/WalletConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,66 @@
import bisq.common.config.Config;
import bisq.common.file.FileUtil;

import com.google.common.io.Closeables;
import com.google.common.util.concurrent.*;
import org.bitcoinj.core.listeners.*;
import org.bitcoinj.core.*;
import org.bitcoinj.core.BlockChain;
import org.bitcoinj.core.CheckpointManager;
import org.bitcoinj.core.Context;
import org.bitcoinj.core.NetworkParameters;
import org.bitcoinj.core.PeerAddress;
import org.bitcoinj.core.PeerGroup;
import org.bitcoinj.core.listeners.DownloadProgressTracker;
import org.bitcoinj.crypto.KeyCrypter;
import org.bitcoinj.net.BlockingClientManager;
import org.bitcoinj.net.discovery.*;
import org.bitcoinj.net.discovery.DnsDiscovery;
import org.bitcoinj.net.discovery.PeerDiscovery;
import org.bitcoinj.script.Script;
import org.bitcoinj.store.*;
import org.bitcoinj.wallet.*;
import org.bitcoinj.store.BlockStore;
import org.bitcoinj.store.BlockStoreException;
import org.bitcoinj.store.SPVBlockStore;
import org.bitcoinj.wallet.DeterministicKeyChain;
import org.bitcoinj.wallet.DeterministicSeed;
import org.bitcoinj.wallet.KeyChainGroup;
import org.bitcoinj.wallet.KeyChainGroupStructure;
import org.bitcoinj.wallet.Protos;
import org.bitcoinj.wallet.Wallet;
import org.bitcoinj.wallet.WalletExtension;
import org.bitcoinj.wallet.WalletProtobufSerializer;

import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy;

import com.google.common.io.Closeables;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;

import org.bouncycastle.crypto.params.KeyParameter;

import org.slf4j.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Proxy;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.annotation.*;
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import lombok.Getter;
import lombok.Setter;

import javax.annotation.Nullable;

import static bisq.common.util.Preconditions.checkDir;
import static com.google.common.base.Preconditions.*;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

/**
* <p>Utility class that wraps the boilerplate needed to set up a new SPV bitcoinj app. Instantiate it with a directory
Expand Down Expand Up @@ -98,8 +128,10 @@ public class WalletConfig extends AbstractIdleService {
protected DownloadProgressTracker downloadListener;
protected InputStream checkpoints;
protected String userAgent, version;
@Nullable protected DeterministicSeed restoreFromSeed;
@Nullable protected PeerDiscovery discovery;
@Nullable
protected DeterministicSeed restoreFromSeed;
@Nullable
protected PeerDiscovery discovery;

protected volatile Context context;

Expand Down Expand Up @@ -279,7 +311,7 @@ protected void startUp() throws Exception {
}
vChain = new BlockChain(params, vStore);
vPeerGroup = createPeerGroup();
if (minBroadcastConnections > 0 )
if (minBroadcastConnections > 0)
vPeerGroup.setMinBroadcastConnections(minBroadcastConnections);
if (this.userAgent != null)
vPeerGroup.setUserAgent(userAgent, version);
Expand Down Expand Up @@ -334,7 +366,9 @@ public void onFailure(Throwable t) {
}, MoreExecutors.directExecutor());
}

private Wallet createOrLoadWallet(boolean shouldReplayWallet, File walletFile, boolean isBsqWallet) throws Exception {
private Wallet createOrLoadWallet(boolean shouldReplayWallet,
File walletFile,
boolean isBsqWallet) throws Exception {
Wallet wallet;

maybeMoveOldWalletOutOfTheWay(walletFile);
Expand Down Expand Up @@ -546,4 +580,8 @@ public void maybeAddSegwitKeychain(Wallet wallet, KeyParameter aesKey) {
}
migratedWalletToSegwit.set(true);
}

public boolean stateStartingOrRunning() {
return state() == State.STARTING || state() == State.RUNNING;
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/bisq/core/btc/setup/WalletsSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ public NetworkParameters getParams() {

@Nullable
public BlockChain getChain() {
return walletConfig != null ? walletConfig.chain() : null;
return walletConfig != null && walletConfig.stateStartingOrRunning() ? walletConfig.chain() : null;
}

public PeerGroup getPeerGroup() {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/bisq/core/dao/node/BsqNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ protected int getStartBlockHeight() {
if (chainHeight > genesisBlockHeight)
startBlockHeight = chainHeight + 1;

log.info("Start parse blocks:\n" +
log.info("getStartBlockHeight:\n" +
" Start block height={}\n" +
" Genesis txId={}\n" +
" Genesis block height={}\n" +
Expand Down Expand Up @@ -223,7 +223,7 @@ protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFr
// height we have no block but chainHeight is initially set to genesis height (bad design ;-( but a bit tricky
// to change now as it used in many areas.)
if (daoStateService.getBlockAtHeight(rawBlock.getHeight()).isPresent()) {
log.debug("We have already a block with the height of the new block. Height of new block={}", rawBlock.getHeight());
log.info("We have already a block with the height of the new block. Height of new block={}", rawBlock.getHeight());
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,51 +136,62 @@ public void onAwakeFromStandby() {
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof GetBlocksRequest) {
// We received a GetBlocksRequest from a liteNode
if (!stopped) {
final String uid = connection.getUid();
if (!getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler requestHandler = new GetBlocksRequestHandler(networkNode,
daoStateService,
new GetBlocksRequestHandler.Listener() {
@Override
public void onComplete() {
getBlocksRequestHandlers.remove(uid);
}

@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
getBlocksRequestHandlers.remove(uid);
if (!stopped) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
peerManager.handleConnectionFault(connection);
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}
}
});
getBlocksRequestHandlers.put(uid, requestHandler);
requestHandler.onGetBlocksRequest((GetBlocksRequest) networkEnvelope, connection);
} else {
log.warn("We have already a GetDataRequestHandler for that connection started. " +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.");

UserThread.runAfter(() -> {
if (getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler handler = getBlocksRequestHandlers.get(uid);
handler.stop();
getBlocksRequestHandlers.remove(uid);
}
}, CLEANUP_TIMER);
}
} else {
log.warn("We have stopped already. We ignore that onMessage call.");
}
handleGetBlocksRequest((GetBlocksRequest) networkEnvelope, connection);
} else if (networkEnvelope instanceof RepublishGovernanceDataRequest) {
log.warn("We received a RepublishGovernanceDataRequest and re-published all proposalPayloads and " +
"blindVotePayloads to the P2P network.");
missingDataRequestService.reRepublishAllGovernanceData();
handleRepublishGovernanceDataRequest();
}
}

private void handleGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) {
if (stopped) {
log.warn("We have stopped already. We ignore that onMessage call.");
return;
}

String uid = connection.getUid();
if (getBlocksRequestHandlers.containsKey(uid)) {
log.warn("We have already a GetDataRequestHandler for that connection started. " +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.");

UserThread.runAfter(() -> {
if (getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler handler = getBlocksRequestHandlers.get(uid);
handler.stop();
getBlocksRequestHandlers.remove(uid);
}
}, CLEANUP_TIMER);
return;
}

GetBlocksRequestHandler requestHandler = new GetBlocksRequestHandler(networkNode,
daoStateService,
new GetBlocksRequestHandler.Listener() {
@Override
public void onComplete() {
getBlocksRequestHandlers.remove(uid);
}

@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
getBlocksRequestHandlers.remove(uid);
if (!stopped) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
if (connection != null) {
peerManager.handleConnectionFault(connection);
}
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}
}
});
getBlocksRequestHandlers.put(uid, requestHandler);
requestHandler.onGetBlocksRequest(getBlocksRequest, connection);
}

private void handleRepublishGovernanceDataRequest() {
log.warn("We received a RepublishGovernanceDataRequest and re-published all proposalPayloads and " +
"blindVotePayloads to the P2P network.");
missingDataRequestService.reRepublishAllGovernanceData();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
*/
@Slf4j
class GetBlocksRequestHandler {
private static final long TIMEOUT = 120;
private static final long TIMEOUT_MIN = 3;


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -89,22 +89,28 @@ public GetBlocksRequestHandler(NetworkNode networkNode, DaoStateService daoState
// API
///////////////////////////////////////////////////////////////////////////////////////////

public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, final Connection connection) {
public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) {
long ts = System.currentTimeMillis();
// We limit number of blocks to 6000 which is about 1.5 month.
List<Block> blocks = new LinkedList<>(daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight(), 6000));
List<RawBlock> rawBlocks = blocks.stream().map(RawBlock::fromBlock).collect(Collectors.toList());
GetBlocksResponse getBlocksResponse = new GetBlocksResponse(rawBlocks, getBlocksRequest.getNonce());
log.info("Received GetBlocksRequest from {} for blocks from height {}",
connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight());
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
String errorMessage = "A timeout occurred for getBlocksResponse.requestNonce:" +
getBlocksResponse.getRequestNonce() +
" on connection:" + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
TIMEOUT, TimeUnit.SECONDS);
log.info("Received GetBlocksRequest from {} for blocks from height {}. " +
"Building GetBlocksResponse with {} blocks took {} ms.",
connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight(),
rawBlocks.size(), System.currentTimeMillis() - ts);

if (timeoutTimer != null) {
timeoutTimer.stop();
log.warn("Timeout was already running. We stopped it.");
}
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
String errorMessage = "A timeout occurred for getBlocksResponse.requestNonce:" +
getBlocksResponse.getRequestNonce() +
" on connection: " + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
TIMEOUT_MIN, TimeUnit.MINUTES);

SettableFuture<Connection> future = networkNode.sendMessage(connection, getBlocksResponse);
Futures.addCallback(future, new FutureCallback<>() {
Expand Down Expand Up @@ -145,7 +151,7 @@ public void stop() {

private void handleFault(String errorMessage, CloseConnectionReason closeConnectionReason, Connection connection) {
if (!stopped) {
log.debug(errorMessage + "\n\tcloseConnectionReason=" + closeConnectionReason);
log.warn("{}, closeConnectionReason={}", errorMessage, closeConnectionReason);
cleanup();
listener.onFault(errorMessage, connection);
} else {
Expand Down
Loading