Skip to content

Commit

Permalink
Merge pull request #4607 from halibobo1205/feature/liteTool_opt
Browse files Browse the repository at this point in the history
feat(liteFullNodeTool): optimize liteFullNodeTool
  • Loading branch information
halibobo1205 authored Aug 31, 2022
2 parents a0874d4 + 046eab5 commit 626e31e
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 168 deletions.
222 changes: 106 additions & 116 deletions framework/src/main/java/org/tron/tool/litefullnode/LiteFullNodeTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.internal.Lists;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.primitives.Bytes;
Expand All @@ -16,9 +17,9 @@
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.LongStream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.rocksdb.RocksDBException;
Expand Down Expand Up @@ -47,50 +48,23 @@ public class LiteFullNodeTool {
private static final String INFO_FILE_NAME = "info.properties";
private static final String BACKUP_DIR_PREFIX = ".bak_";
private static final String CHECKPOINT_DB = "tmp";
private static final long VM_NEED_RECENT_BLKS = 256;
private static long RECENT_BLKS = 65536;

private static final String BLOCK_DB_NAME = "block";
private static final String BLOCK_INDEX_DB_NAME = "block-index";
private static final String TRANS_CACHE_DB_NAME = "trans-cache";
private static final String TRANS_DB_NAME = "trans";
private static final String COMMON_DB_NAME = "common";
private static final String TRANSACTION_RET_DB_NAME = "transactionRetStore";
private static final String TRANSACTION_HISTORY_DB_NAME = "transactionHistoryStore";

private static final String DIR_FORMAT_STRING = "%s%s%s";

private static List<String> archiveDbs = Arrays.asList(
BLOCK_DB_NAME,
BLOCK_INDEX_DB_NAME,
"trans",
"transactionRetStore",
"transactionHistoryStore");
private static List<String> minimumDbsForLiteNode = Arrays.asList(
"DelegatedResource",
"DelegatedResourceAccountIndex",
"IncrementalMerkleTree",
"account",
"account-index",
"accountTrie",
"accountid-index",
"asset-issue",
"asset-issue-v2",
//"block_KDB",
"code",
//"common",
"contract",
"delegation",
"exchange",
"exchange-v2",
//"nullifier",
"properties",
"proposal",
"recent-block",
//"recent-transaction",
"storage-row",
//TRANS_CACHE_DB_NAME,
//"tree-block-index",
"votes",
"witness",
"witness_schedule"
);
TRANS_DB_NAME,
TRANSACTION_RET_DB_NAME,
TRANSACTION_HISTORY_DB_NAME);

/**
* Create the snapshot dataset.
Expand All @@ -104,13 +78,12 @@ public void generateSnapshot(String sourceDir, String snapshotDir) {
long start = System.currentTimeMillis();
snapshotDir = Paths.get(snapshotDir, SNAPSHOT_DIR_NAME).toString();
try {
hasEnoughBlock(sourceDir);
List<String> snapshotDbs = getSnapshotDbs(sourceDir);
split(sourceDir, snapshotDir, snapshotDbs);
mergeCheckpoint2Snapshot(sourceDir, snapshotDir);
// write genesisBlock and latestBlock
fillSnapshotBlockDb(sourceDir, snapshotDir);
// create tran-cache if not exist, for compatible
checkTranCacheStore(sourceDir, snapshotDir);
// write genesisBlock , latest recent blocks and trans
fillSnapshotBlockAndTransDb(sourceDir, snapshotDir);
generateInfoProperties(Paths.get(snapshotDir, INFO_FILE_NAME).toString(), sourceDir);
} catch (IOException | RocksDBException e) {
logger.error("create snapshot failed, " + e.getMessage());
Expand All @@ -132,6 +105,11 @@ public void generateHistory(String sourceDir, String historyDir) {
long start = System.currentTimeMillis();
historyDir = Paths.get(historyDir, HISTORY_DIR_NAME).toString();
try {
if (isLite(sourceDir)) {
throw new IllegalStateException(
String.format("Unavailable sourceDir: %s is not fullNode data.", sourceDir));
}
hasEnoughBlock(sourceDir);
split(sourceDir, historyDir, archiveDbs);
mergeCheckpoint2History(sourceDir, historyDir);
generateInfoProperties(Paths.get(historyDir, INFO_FILE_NAME).toString(), sourceDir);
Expand All @@ -155,6 +133,12 @@ public void completeHistoryData(String historyDir, String databaseDir) {
long start = System.currentTimeMillis();
BlockNumInfo blockNumInfo = null;
try {
// check historyDir is from lite data
if (isLite(historyDir)) {
throw new IllegalStateException(
String.format("Unavailable history: %s is not generated by fullNode data.",
historyDir));
}
// 1. check block number and genesis block are compatible,
// and return the block numbers of snapshot and history
blockNumInfo = checkAndGetBlockNumInfo(historyDir, databaseDir);
Expand Down Expand Up @@ -183,11 +167,6 @@ private List<String> getSnapshotDbs(String sourceDir) {
.filter(File::isDirectory)
.filter(dir -> !archiveDbs.contains(dir.getName()))
.forEach(dir -> snapshotDbs.add(dir.getName()));
for (String dir : minimumDbsForLiteNode) {
if (!snapshotDbs.contains(dir)) {
throw new RuntimeException("databaseDir does not contain all the necessary databases");
}
}
return snapshotDbs;
}

Expand Down Expand Up @@ -270,88 +249,50 @@ private long getLatestBlockHeaderNum(String databaseDir) throws IOException, Roc
}

/**
* Syncing block from peer that needs latest block and genesis block,
* also VM need recent blocks.
* recent blocks, trans and genesis block.
*/
private void fillSnapshotBlockDb(String sourceDir, String snapshotDir)
private void fillSnapshotBlockAndTransDb(String sourceDir, String snapshotDir)
throws IOException, RocksDBException {
logger.info("-- begin to fill latest block and genesis block to snapshot");
logger.info("-- begin to fill {} block , genesis block and trans to snapshot", RECENT_BLKS);
DBInterface sourceBlockIndexDb = DbTool.getDB(sourceDir, BLOCK_INDEX_DB_NAME);
DBInterface sourceBlockDb = DbTool.getDB(sourceDir, BLOCK_DB_NAME);
DBInterface destBlockDb = DbTool.getDB(snapshotDir, BLOCK_DB_NAME);
DBInterface destBlockIndexDb = DbTool.getDB(snapshotDir, BLOCK_INDEX_DB_NAME);
DBInterface destTransDb = DbTool.getDB(snapshotDir, TRANS_DB_NAME);
// put genesis block and block-index into snapshot
long genesisBlockNum = 0L;
byte[] genesisBlockID = sourceBlockIndexDb.get(ByteArray.fromLong(genesisBlockNum));
destBlockIndexDb.put(ByteArray.fromLong(genesisBlockNum), genesisBlockID);
destBlockDb.put(genesisBlockID, sourceBlockDb.get(genesisBlockID));

long latestBlockNum = getLatestBlockHeaderNum(sourceDir);
long startIndex = latestBlockNum > VM_NEED_RECENT_BLKS
? latestBlockNum - VM_NEED_RECENT_BLKS : 0;
// put the recent blocks in snapshot, VM needs recent 256 blocks.
LongStream.rangeClosed(startIndex, latestBlockNum).forEach(
blockNum -> {
byte[] blockId = null;
byte[] block = null;
try {
blockId = getDataFromSourceDB(sourceDir, BLOCK_INDEX_DB_NAME,
Longs.toByteArray(blockNum));
block = getDataFromSourceDB(sourceDir, BLOCK_DB_NAME, blockId);
} catch (IOException | RocksDBException e) {
throw new RuntimeException(e.getMessage());
}
// put recent blocks index into snapshot
destBlockIndexDb.put(ByteArray.fromLong(blockNum), blockId);
// put latest blocks into snapshot
destBlockDb.put(blockId, block);
});
long startIndex = latestBlockNum - RECENT_BLKS + 1;
// put the recent blocks and trans in snapshot
for (long blockNum = startIndex; blockNum <= latestBlockNum; blockNum++) {
try {
byte[] blockId = getDataFromSourceDB(sourceDir, BLOCK_INDEX_DB_NAME,
Longs.toByteArray(blockNum));
byte[] block = getDataFromSourceDB(sourceDir, BLOCK_DB_NAME, blockId);
// put block
destBlockDb.put(blockId, block);
// put block index
destBlockIndexDb.put(ByteArray.fromLong(blockNum), blockId);
// put trans
long finalBlockNum = blockNum;
new BlockCapsule(block).getTransactions().stream().map(
tc -> tc.getTransactionId().getBytes())
.map(bytes -> Maps.immutableEntry(bytes, Longs.toByteArray(finalBlockNum)))
.forEach(e -> destTransDb.put(e.getKey(), e.getValue()));
} catch (IOException | RocksDBException | BadItemException e) {
throw new RuntimeException(e.getMessage());
}
}

DBInterface destCommonDb = DbTool.getDB(snapshotDir, COMMON_DB_NAME);
destCommonDb.put(DB_KEY_NODE_TYPE, ByteArray.fromInt(Constant.NODE_TYPE_LIGHT_NODE));
destCommonDb.put(DB_KEY_LOWEST_BLOCK_NUM, ByteArray.fromLong(startIndex));
}

private void checkTranCacheStore(String sourceDir, String snapshotDir)
throws IOException, RocksDBException {
logger.info("-- create trans-cache db if not exists.");
if (FileUtil.isExists(String.format(DIR_FORMAT_STRING, snapshotDir,
File.separator, TRANS_CACHE_DB_NAME))) {
return;
}
// fullnode is old version, create trans-cache database
DBInterface recentBlockDb = DbTool.getDB(snapshotDir, "recent-block");
DBInterface transCacheDb = DbTool.getDB(snapshotDir, TRANS_CACHE_DB_NAME);
long headNum = getLatestBlockHeaderNum(sourceDir);
long recentBlockCount = recentBlockDb.size();

LongStream.rangeClosed(headNum - recentBlockCount + 1, headNum).forEach(
blockNum -> {
byte[] blockId = null;
byte[] block = null;
try {
blockId = getDataFromSourceDB(sourceDir, BLOCK_INDEX_DB_NAME,
Longs.toByteArray(blockNum));
block = getDataFromSourceDB(sourceDir, BLOCK_DB_NAME, blockId);
} catch (IOException | RocksDBException e) {
throw new RuntimeException(e.getMessage());
}
BlockCapsule blockCapsule = null;
try {
blockCapsule = new BlockCapsule(block);
} catch (BadItemException e) {
throw new RuntimeException("construct block failed, num: " + blockNum);
}
if (blockCapsule.getTransactions().isEmpty()) {
return;
}
blockCapsule.getTransactions().stream()
.map(tc -> tc.getTransactionId().getBytes())
.map(bytes -> Maps.immutableEntry(bytes, Longs.toByteArray(blockNum)))
.forEach(e -> transCacheDb.put(e.getKey(), e.getValue()));
});
}

private byte[] getGenesisBlockHash(String parentDir) throws IOException, RocksDBException {
long genesisBlockNum = 0L;
DBInterface blockIndexDb = DbTool.getDB(parentDir, BLOCK_INDEX_DB_NAME);
Expand Down Expand Up @@ -424,8 +365,8 @@ private void trimHistory(String databaseDir, BlockNumInfo blockNumInfo)
logger.info("-- begin to trim the history data.");
DBInterface blockIndexDb = DbTool.getDB(databaseDir, BLOCK_INDEX_DB_NAME);
DBInterface blockDb = DbTool.getDB(databaseDir, BLOCK_DB_NAME);
DBInterface transDb = DbTool.getDB(databaseDir, "trans");
DBInterface tranRetDb = DbTool.getDB(databaseDir, "transactionRetStore");
DBInterface transDb = DbTool.getDB(databaseDir, TRANS_DB_NAME);
DBInterface tranRetDb = DbTool.getDB(databaseDir, TRANSACTION_RET_DB_NAME);
for (long n = blockNumInfo.getHistoryBlkNum(); n > blockNumInfo.getSnapshotBlkNum(); n--) {
byte[] blockIdHash = blockIndexDb.get(ByteArray.fromLong(n));
BlockCapsule block = new BlockCapsule(blockDb.get(blockIdHash));
Expand Down Expand Up @@ -460,12 +401,15 @@ private void mergeBak2Database(String databaseDir) throws IOException, RocksDBEx
private byte[] getDataFromSourceDB(String sourceDir, String dbName, byte[] key)
throws IOException, RocksDBException {
DBInterface sourceDb = DbTool.getDB(sourceDir, dbName);
DBInterface checkpointDb = DbTool.getDB(sourceDir, "tmp");
byte[] value = sourceDb.get(key);
if (isEmptyBytes(value)) {
byte[] valueFromTmp = checkpointDb.get(Bytes.concat(simpleEncode(dbName), key));
DBInterface checkpointDb = DbTool.getDB(sourceDir, CHECKPOINT_DB);
// get data from tmp first.
byte[] valueFromTmp = checkpointDb.get(Bytes.concat(simpleEncode(dbName), key));
byte[] value;
if (isEmptyBytes(valueFromTmp)) {
value = sourceDb.get(key);
} else {
value = valueFromTmp.length == 1
? null : Arrays.copyOfRange(valueFromTmp, 1, valueFromTmp.length);
? null : Arrays.copyOfRange(valueFromTmp, 1, valueFromTmp.length);
}
if (isEmptyBytes(value)) {
throw new RuntimeException(String.format("data not found in store, dbName: %s, key: %s",
Expand All @@ -489,8 +433,7 @@ private static boolean isEmptyBytes(byte[] b) {
private void deleteSnapshotFlag(String databaseDir) throws IOException, RocksDBException {
logger.info("-- delete the info file.");
Files.delete(Paths.get(databaseDir, INFO_FILE_NAME));
DBInterface destBlockIndexDb = DbTool.getDB(databaseDir, BLOCK_INDEX_DB_NAME);
if (destBlockIndexDb.get(ByteArray.fromLong(1)) != null) {
if (!isLite(databaseDir)) {
DBInterface destCommonDb = DbTool.getDB(databaseDir, COMMON_DB_NAME);
destCommonDb.delete(DB_KEY_NODE_TYPE);
destCommonDb.delete(DB_KEY_LOWEST_BLOCK_NUM);
Expand All @@ -500,6 +443,53 @@ private void deleteSnapshotFlag(String databaseDir) throws IOException, RocksDBE

}

private void hasEnoughBlock(String sourceDir) throws RocksDBException, IOException {
// check latest
long latest = getLatestBlockHeaderNum(sourceDir);
// check first, not 0;
long first = 0;
DBInterface sourceBlockIndexDb = DbTool.getDB(sourceDir, BLOCK_INDEX_DB_NAME);
DBIterator iterator = sourceBlockIndexDb.iterator();
iterator.seekToFirst();
if (iterator.hasNext()) {
iterator.next();
if (iterator.hasNext()) {
first = Longs.fromByteArray(iterator.getKey());
}
}

if (latest - first + 1 < RECENT_BLKS) {
throw new NoSuchElementException(
String.format("At least %d blocks in block store, actual latestBlock:%d, firstBlock:%d.",
RECENT_BLKS, latest, first));
}
}

private boolean isLite(String databaseDir) throws RocksDBException, IOException {
DBInterface sourceDb = DbTool.getDB(databaseDir, BLOCK_INDEX_DB_NAME);
DBInterface checkpointDb = DbTool.getDB(databaseDir, CHECKPOINT_DB);
byte[] key = ByteArray.fromLong(1);
byte[] valueFromTmp = checkpointDb.get(Bytes.concat(simpleEncode(BLOCK_INDEX_DB_NAME), key));
byte[] value;
if (isEmptyBytes(valueFromTmp)) {
value = sourceDb.get(key);
} else {
value = valueFromTmp.length == 1
? null : Arrays.copyOfRange(valueFromTmp, 1, valueFromTmp.length);
}
return isEmptyBytes(value);
}

@VisibleForTesting
public static void setRecentBlks(long recentBlks) {
RECENT_BLKS = recentBlks;
}

@VisibleForTesting
public static void reSetRecentBlks() {
RECENT_BLKS = 65536;
}

private void run(Args argv) {
if (StringUtils.isBlank(argv.fnDataPath) || StringUtils.isBlank(argv.datasetPath)) {
throw new ParameterException("fnDataPath or datasetPath can't be null");
Expand Down
Loading

0 comments on commit 626e31e

Please sign in to comment.