Skip to content

Commit

Permalink
feat(db): add checkpoint v2 in case db inconsistent when ungraceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
tomatoishealthy committed Sep 1, 2022
1 parent 626e31e commit 70769fa
Show file tree
Hide file tree
Showing 17 changed files with 619 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,11 @@ private void openDatabase(Options dbOptions) throws IOException {
}
try {
database = factory.open(dbPath.toFile(), dbOptions);
logger.info("DB {} open success with : writeBufferSize {}M,cacheSize {}M,maxOpenFiles {}.",
this.getDBName(), dbOptions.writeBufferSize() / 1024 / 1024,
dbOptions.cacheSize() / 1024 / 1024, dbOptions.maxOpenFiles());
if (!this.getDBName().startsWith("checkpoint")) {
logger.info("DB {} open success with : writeBufferSize {}M,cacheSize {}M,maxOpenFiles {}.",
this.getDBName(), dbOptions.writeBufferSize() / 1024 / 1024,
dbOptions.cacheSize() / 1024 / 1024, dbOptions.maxOpenFiles());
}
} catch (IOException e) {
if (e.getMessage().contains("Corruption:")) {
factory.repair(dbPath.toFile(), dbOptions);
Expand Down
17 changes: 11 additions & 6 deletions chainbase/src/main/java/org/tron/core/db/TronDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

import com.google.protobuf.InvalidProtocolBufferException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import javax.annotation.PostConstruct;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.iq80.leveldb.WriteOptions;
import org.rocksdb.DirectComparator;
import org.springframework.beans.factory.annotation.Autowired;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.storage.WriteOptionsWrapper;
Expand All @@ -18,11 +18,8 @@
import org.tron.common.storage.rocksdb.RocksDbDataSourceImpl;
import org.tron.common.utils.StorageUtils;
import org.tron.core.db.common.DbSourceInter;
import org.tron.core.db2.common.LevelDB;
import org.tron.core.db2.common.RocksDB;
import org.tron.core.db2.common.WrappedByteArray;
import org.tron.core.db2.core.ITronChainBase;
import org.tron.core.db2.core.SnapshotRoot;
import org.tron.core.exception.BadItemException;
import org.tron.core.exception.ItemNotFoundException;

Expand All @@ -46,7 +43,7 @@ protected TronDatabase(String dbName) {
dbSource =
new LevelDbDataSourceImpl(StorageUtils.getOutputDirectoryByDbName(dbName),
dbName,
StorageUtils.getOptionsByDbName(dbName),
getOptionsByDbNameForLevelDB(dbName),
new WriteOptions().sync(CommonParameter.getInstance()
.getStorage().isDbSync()));
} else if ("ROCKSDB".equals(CommonParameter.getInstance()
Expand All @@ -55,7 +52,7 @@ protected TronDatabase(String dbName) {
CommonParameter.getInstance().getStorage().getDbDirectory()).toString();
dbSource =
new RocksDbDataSourceImpl(parentName, dbName, CommonParameter.getInstance()
.getRocksDBCustomSettings());
.getRocksDBCustomSettings(), getDirectComparator());
}

dbSource.initDB();
Expand All @@ -69,6 +66,14 @@ private void init() {
protected TronDatabase() {
}

/**
 * Supplies the LevelDB options used when opening {@code dbName}.
 * The base implementation delegates to the shared storage-options lookup;
 * subclasses override this to tune options for a specific database.
 */
protected org.iq80.leveldb.Options getOptionsByDbNameForLevelDB(String dbName) {
  final org.iq80.leveldb.Options options = StorageUtils.getOptionsByDbName(dbName);
  return options;
}

/**
 * Comparator handed to RocksDB when this database is opened.
 *
 * @return {@code null} by default (RocksDB then uses its built-in bytewise
 *     ordering); subclasses override to supply a custom key ordering
 */
protected DirectComparator getDirectComparator() {
  // No custom key ordering at the base-class level.
  final DirectComparator comparator = null;
  return comparator;
}

/** Exposes the underlying key/value source backing this database. */
public DbSourceInter<byte[]> getDbSource() {
  return this.dbSource;
}
Expand Down
218 changes: 206 additions & 12 deletions chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.io.File;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -15,17 +18,26 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;

import io.prometheus.client.Histogram;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.tron.common.error.TronDBException;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.prometheus.MetricKeys;
import org.tron.common.prometheus.Metrics;
import org.tron.common.storage.WriteOptionsWrapper;
import org.tron.common.utils.FileUtil;
import org.tron.common.utils.StorageUtils;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.db.RevokingDatabase;
import org.tron.core.db2.ISession;
import org.tron.core.db2.common.DB;
Expand All @@ -34,14 +46,17 @@
import org.tron.core.db2.common.Value;
import org.tron.core.db2.common.WrappedByteArray;
import org.tron.core.exception.RevokingStoreIllegalStateException;
import org.tron.core.store.CheckPointV2Store;
import org.tron.core.store.CheckTmpStore;

@Slf4j(topic = "DB")
public class SnapshotManager implements RevokingDatabase {

public static final int DEFAULT_MAX_FLUSH_COUNT = 500;
public static final int DEFAULT_MAX_FLUSH_COUNT = 200;
public static final int DEFAULT_MIN_FLUSH_COUNT = 1;
private static final int DEFAULT_STACK_MAX_SIZE = 256;
private static final long ONE_MINUTE_MILLS = 60*1000L;
private static final String CHECKPOINT_V2_DIR = "checkpoint";
@Getter
private List<Chainbase> dbs = new ArrayList<>();
@Getter
Expand All @@ -63,6 +78,8 @@ public class SnapshotManager implements RevokingDatabase {

private Map<String, ListeningExecutorService> flushServices = new HashMap<>();

private ScheduledExecutorService pruneCheckpointThread = Executors.newSingleThreadScheduledExecutor();

@Autowired
@Setter
@Getter
Expand All @@ -71,11 +88,28 @@ public class SnapshotManager implements RevokingDatabase {
@Setter
private volatile int maxFlushCount = DEFAULT_MIN_FLUSH_COUNT;

private long currentBlockNum = -1;

private int checkpointVersion = 1; // default v1

public SnapshotManager(String checkpointPath) {
}

@PostConstruct
public void init() {
checkpointVersion = CommonParameter.getInstance().getStorage().getCheckpointVersion();
// prune checkpoint
pruneCheckpointThread.scheduleWithFixedDelay(() -> {
try {
if (isV2Open() && !unChecked) {
pruneCheckpoint();
}
} catch (Throwable t) {
logger.error("Exception in prune checkpoint", t);
}
}, 10000, 3600, TimeUnit.MILLISECONDS);


exitThread = new Thread(() -> {
LockSupport.park();
// to Guarantee Some other thread invokes unpark with the current thread as the target
Expand Down Expand Up @@ -244,6 +278,7 @@ public void shutdown() {
System.err.println("******** before revokingDb size:" + size);
checkTmpStore.close();
System.err.println("******** end to pop revokingDb ********");
pruneCheckpointThread.shutdown();
}

public void updateSolidity(int hops) {
Expand Down Expand Up @@ -307,8 +342,22 @@ public void flush() {
if (shouldBeRefreshed()) {
try {
long start = System.currentTimeMillis();
deleteCheckpoint();
createCheckpoint();
if (!isV2Open()) {
Histogram.Timer requestTimer = Metrics.histogramStartTimer(
MetricKeys.Histogram.DB_FLUSH, "delete");
deleteCheckpoint();
Metrics.histogramObserve(requestTimer);

Histogram.Timer createTimer = Metrics.histogramStartTimer(
MetricKeys.Histogram.DB_FLUSH, "create");
createCheckpoint();
Metrics.histogramObserve(createTimer);
} else {
Histogram.Timer createV2 = Metrics.histogramStartTimer(
MetricKeys.Histogram.DB_FLUSH, "create2");
createCheckpointV2();
Metrics.histogramObserve(createV2);
}
long checkPointEnd = System.currentTimeMillis();
refresh();
flushCount = 0;
Expand Down Expand Up @@ -360,25 +409,120 @@ private void createCheckpoint() {
}
}

private void deleteCheckpoint() {
private void createCheckpointV2() {
CheckPointV2Store checkPointV2Store = null;
try {
Map<byte[], byte[]> hmap = new HashMap<>();
if (!checkTmpStore.getDbSource().allKeys().isEmpty()) {
for (Map.Entry<byte[], byte[]> e : checkTmpStore.getDbSource()) {
hmap.put(e.getKey(), null);
Map<WrappedByteArray, WrappedByteArray> batch = new HashMap<>();
for (Chainbase db : dbs) {
Snapshot head = db.getHead();
if (Snapshot.isRoot(head)) {
return;
}

String dbName = db.getDbName();
Snapshot next = head.getRoot();
for (int i = 0; i < flushCount; ++i) {
next = next.getNext();
SnapshotImpl snapshot = (SnapshotImpl) next;
DB<Key, Value> keyValueDB = snapshot.getDb();
for (Map.Entry<Key, Value> e : keyValueDB) {
Key k = e.getKey();
Value v = e.getValue();
batch.put(WrappedByteArray.of(Bytes.concat(simpleEncode(dbName), k.getBytes())),
WrappedByteArray.of(v.encode()));
if ("block".equals(db.getDbName())) {
currentBlockNum = new BlockCapsule(v.getBytes()).getNum();
}
}
}
}

checkTmpStore.getDbSource().updateByBatch(hmap);
} catch (Exception e) {
if (currentBlockNum == -1) {
logger.error("create checkpoint failed, currentBlockNum: {}", currentBlockNum);
System.exit(-1);
}
String dbName = System.currentTimeMillis() + "_" + currentBlockNum;
checkPointV2Store = getCheckpointDB(dbName);
checkPointV2Store.getDbSource().updateByBatch(batch.entrySet().stream()
.map(e -> Maps.immutableEntry(e.getKey().getBytes(), e.getValue().getBytes()))
.collect(HashMap::new, (m, k) -> m.put(k.getKey(), k.getValue()), HashMap::putAll),
WriteOptionsWrapper.getInstance().sync(CommonParameter
.getInstance().getStorage().isCheckpointSync()));

} catch ( Exception e) {
throw new TronDBException(e);
} finally {
if (checkPointV2Store != null) {
checkPointV2Store.close();
}
}
}

/**
 * Opens the v2 checkpoint store living under the "checkpoint" directory.
 *
 * @param dbName checkpoint sub-directory name ("timestampMillis_blockNum")
 */
private CheckPointV2Store getCheckpointDB(String dbName) {
  String storePath = CHECKPOINT_V2_DIR + "/" + dbName;
  return new CheckPointV2Store(storePath);
}

/**
 * Lists the v2 checkpoint sub-directories sorted lexicographically, which is
 * oldest-first because every name starts with a millisecond timestamp.
 *
 * @return sorted checkpoint names; an empty list when the checkpoint
 *     directory is missing, is not a directory, or cannot be listed
 */
private List<String> getCheckpointList() {
  String dbPath = Paths.get(StorageUtils.getOutputDirectoryByDbName(CHECKPOINT_V2_DIR),
      CommonParameter.getInstance().getStorage().getDbDirectory()).toString();
  File file = new File(Paths.get(dbPath, CHECKPOINT_V2_DIR).toString());
  if (file.exists() && file.isDirectory()) {
    String[] subDirs = file.list();
    if (subDirs != null) {
      return Arrays.stream(subDirs).sorted().collect(Collectors.toList());
    }
  }
  // Return an empty list rather than null; callers treat "no checkpoints"
  // and "missing directory" identically, so this stays backward compatible.
  return new ArrayList<>();
}

/**
 * Clears the v1 checkpoint (tmp store) so it cannot be replayed again.
 */
private void deleteCheckpoint() {
  // reset() drops every key recorded by the previous v1 checkpoint.
  checkTmpStore.reset();
}

/**
 * Deletes the oldest v2 checkpoint directories from disk.
 *
 * <p>Invoked from the scheduled prune thread. At most the three oldest
 * entries are examined per run, and the scan stops at the first checkpoint
 * younger than two minutes (the list is sorted oldest-first).
 */
private void pruneCheckpoint() {
  if (unChecked) {
    // Recovery has not completed; pruning now could remove data still needed.
    return;
  }
  List<String> cpList = getCheckpointList();
  if (cpList == null || cpList.size() < 3) {
    return;
  }
  // The checkpoint root never changes inside the loop; compute it once.
  String checkpointRoot = Paths.get(StorageUtils.getOutputDirectoryByDbName(CHECKPOINT_V2_DIR),
      CommonParameter.getInstance().getStorage().getDbDirectory(), CHECKPOINT_V2_DIR).toString();
  // NOTE(review): subList(0, 3) can delete every checkpoint when exactly three
  // exist and all are older than two minutes — confirm the retention policy.
  for (String cp : cpList.subList(0, 3)) {
    String[] parts = cp.split("_");
    long timestamp = Long.parseLong(parts[0]);
    long blockNumber = Long.parseLong(parts[1]);
    if (System.currentTimeMillis() - timestamp < ONE_MINUTE_MILLS * 2) {
      break;
    }
    if (!FileUtil.recursiveDelete(Paths.get(checkpointRoot, cp).toString())) {
      logger.error("checkpoint prune failed, number: {}", blockNumber);
      return;
    }
    logger.debug("checkpoint prune success, number: {}", blockNumber);
  }
}

// ensure run this method first after process start.
@Override
public void check() {
for (Chainbase db : dbs) {
if (!isV2Open()) {
List<String> cpList = getCheckpointList();
if (cpList != null && cpList.size() != 0) {
logger.error("checkpoint check failed, can't convert checkpoint from v2 to v1");
System.exit(-1);
}
checkV1();
} else {
checkV2();
}
}

private void checkV1() {
for (Chainbase db: dbs) {
if (!Snapshot.isRoot(db.getHead())) {
throw new IllegalStateException("first check.");
}
Expand All @@ -389,7 +533,7 @@ public void check() {
.map(db -> Maps.immutableEntry(db.getDbName(), db))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
advance();
for (Map.Entry<byte[], byte[]> e : checkTmpStore.getDbSource()) {
for (Map.Entry<byte[], byte[]> e: checkTmpStore.getDbSource()) {
byte[] key = e.getKey();
byte[] value = e.getValue();
String db = simpleDecode(key);
Expand All @@ -414,6 +558,56 @@ public void check() {
unChecked = false;
}

/**
 * Recovers all databases from the on-disk v2 checkpoints after an
 * ungraceful shutdown.
 *
 * <p>When no v2 checkpoint exists, falls back to the v1 tmp-store recovery
 * and then clears the v1 checkpoint. Otherwise each checkpoint is replayed
 * in list order (sorted, hence oldest-first by timestamp prefix): its
 * entries are applied to a fresh session (advance), merged into each db's
 * root, and the session is dropped (retreat). The advance/put/merge/retreat
 * ordering must not be changed. Marks recovery complete by clearing
 * {@code unChecked}.
 */
private void checkV2() {
  logger.info("checkpoint version: {}", CommonParameter.getInstance().getStorage().getCheckpointVersion());
  logger.info("checkpoint sync: {}", CommonParameter.getInstance().getStorage().isCheckpointSync());
  List<String> cpList = getCheckpointList();
  if (cpList == null || cpList.size() == 0) {
    // Nothing to replay: recover via the v1 path, then reset the v1 store.
    logger.info("checkpoint size is 0, using v1 recover");
    checkV1();
    deleteCheckpoint();
    return;
  }

  Map<String, Chainbase> dbMap = dbs.stream()
      .map(db -> Maps.immutableEntry(db.getDbName(), db))
      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  for (String cp: cpList) {
    // Directory names are "<timestampMillis>_<blockNum>".
    long blockNumber = Long.parseLong(cp.split("_")[1]);
    if (blockNumber < 0) {
      logger.error("checkpoint illegal, block number: {}", blockNumber);
      System.exit(-1);
    }
    CheckPointV2Store checkPointV2Store = getCheckpointDB(cp);
    advance();
    for (Map.Entry<byte[], byte[]> e: checkPointV2Store.getDbSource()) {
      byte[] key = e.getKey();
      byte[] value = e.getValue();
      String db = simpleDecode(key);
      // Skip entries for databases not registered with this manager.
      if (dbMap.get(db) == null) {
        continue;
      }
      // Key layout: db name plus a 4-byte length prefix precede the real key
      // (counterpart of simpleEncode).
      byte[] realKey = Arrays.copyOfRange(key, db.getBytes().length + 4, key.length);
      // A 1-byte value marks a deletion; otherwise the first byte is skipped
      // (presumably an operation tag — confirm against Value.encode()).
      byte[] realValue = value.length == 1 ? null : Arrays.copyOfRange(value, 1, value.length);
      if (realValue != null) {
        dbMap.get(db).getHead().put(realKey, realValue);
      } else {
        dbMap.get(db).getHead().remove(realKey);
      }
    }

    // Fold the replayed session into each db's root, then drop the session.
    dbs.forEach(db -> db.getHead().getRoot().merge(db.getHead()));
    retreat();
    // NOTE(review): close() is not in a finally block, so the store handle
    // leaks if replay throws — consider try/finally here.
    checkPointV2Store.close();
    logger.info("checkpoint recover success, block number{}", blockNumber);
  }
  unChecked = false;
}

/** @return {@code true} when the configured checkpoint version is v2. */
private boolean isV2Open() {
  return 2 == checkpointVersion;
}

private byte[] simpleEncode(String s) {
byte[] bytes = s.getBytes();
byte[] length = Ints.toByteArray(bytes.length);
Expand Down
Loading

0 comments on commit 70769fa

Please sign in to comment.