Skip to content

Commit

Permalink
Fix BurningManAccountingStore data races
Browse files Browse the repository at this point in the history
Multiple threads read and write to the accounting blocks list causing
data races. Luckily, the LinkedList threw a ConcurrentModificationException
to limit damage. Now, a ReadWriteLock protects the LinkedList against
data races. Multiple threads can read the list at the same time but only
one thread can write to it. Other writing threads wait until it's their
turn.

Fixes #6545
  • Loading branch information
alvasw committed Feb 3, 2023
1 parent 8dbdecd commit 4779c82
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

import java.util.Arrays;
import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
Expand Down Expand Up @@ -113,7 +112,7 @@ public void start() {
CompletableFuture.runAsync(() -> {
Map<String, BalanceModel> map = new HashMap<>();
// addAccountingBlockToBalanceModel takes about 500ms for 100k items, so we run it in a non UI thread.
getBlocks().forEach(block -> addAccountingBlockToBalanceModel(map, block));
burningManAccountingStoreService.forEachBlock(block -> addAccountingBlockToBalanceModel(map, block));
UserThread.execute(() -> balanceModelByBurningManName.putAll(map));
});
}
Expand All @@ -125,7 +124,7 @@ public void start() {

public void onInitialBlockRequestsComplete() {
updateBalanceModelByAddress();
getBlocks().forEach(this::addAccountingBlockToBalanceModel);
burningManAccountingStoreService.forEachBlock(this::addAccountingBlockToBalanceModel);
}

public void onNewBlockReceived(AccountingBlock accountingBlock) {
Expand All @@ -134,37 +133,19 @@ public void onNewBlockReceived(AccountingBlock accountingBlock) {
}

public void addBlock(AccountingBlock block) throws BlockHashNotConnectingException, BlockHeightNotConnectingException {
if (!getBlocks().contains(block)) {
Optional<AccountingBlock> optionalLastBlock = getLastBlock();
if (optionalLastBlock.isPresent()) {
AccountingBlock lastBlock = optionalLastBlock.get();
if (block.getHeight() != lastBlock.getHeight() + 1) {
throw new BlockHeightNotConnectingException();
}
if (!Arrays.equals(block.getTruncatedPreviousBlockHash(), lastBlock.getTruncatedHash())) {
throw new BlockHashNotConnectingException();
}
} else if (block.getHeight() != EARLIEST_BLOCK_HEIGHT) {
throw new BlockHeightNotConnectingException();
}
log.info("Add new accountingBlock at height {} at {} with {} txs", block.getHeight(),
new Date(block.getDate()), block.getTxs().size());
burningManAccountingStoreService.addBlock(block);
} else {
log.info("We have that block already. Height: {}", block.getHeight());
}
burningManAccountingStoreService.addIfNewBlock(block);
}

public int getBlockHeightOfLastBlock() {
return getLastBlock().map(AccountingBlock::getHeight).orElse(BurningManAccountingService.EARLIEST_BLOCK_HEIGHT - 1);
}

public Optional<AccountingBlock> getLastBlock() {
return getBlocks().stream().max(Comparator.comparing(AccountingBlock::getHeight));
return burningManAccountingStoreService.getLastBlock();
}

public Optional<AccountingBlock> getBlockAtHeight(int height) {
return getBlocks().stream().filter(block -> block.getHeight() == height).findAny();
return burningManAccountingStoreService.getBlockAtHeight(height);
}

public Map<Date, Price> getAverageBsqPriceByMonth() {
Expand Down Expand Up @@ -213,8 +194,8 @@ private Stream<ReceivedBtcBalanceEntry> getReceivedBtcBalanceEntryStreamExcludin
// Delegates
///////////////////////////////////////////////////////////////////////////////////////////

public List<AccountingBlock> getBlocks() {
return burningManAccountingStoreService.getBlocks();
public List<AccountingBlock> getBlocksAtLeastWithHeight(int minHeight) {
return burningManAccountingStoreService.getBlocksAtLeastWithHeight(minHeight);
}

public Map<String, String> getBurningManNameByAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -92,9 +91,7 @@ public interface Listener {

public void onGetBlocksRequest(GetAccountingBlocksRequest request, Connection connection) {
long ts = System.currentTimeMillis();
List<AccountingBlock> blocks = burningManAccountingService.getBlocks().stream()
.filter(block -> block.getHeight() >= request.getFromBlockHeight())
.collect(Collectors.toList());
List<AccountingBlock> blocks = burningManAccountingService.getBlocksAtLeastWithHeight(request.getFromBlockHeight());
byte[] signature = AccountingNode.getSignature(AccountingNode.getSha256Hash(blocks), bmOracleNodePrivKey);
GetAccountingBlocksResponse getBlocksResponse = new GetAccountingBlocksResponse(blocks, request.getNonce(), bmOracleNodePubKey, signature);
log.info("Received GetAccountingBlocksRequest from {} for blocks from height {}. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,135 @@


import bisq.core.dao.burningman.accounting.blockchain.AccountingBlock;
import bisq.core.dao.burningman.accounting.exceptions.BlockHashNotConnectingException;
import bisq.core.dao.burningman.accounting.exceptions.BlockHeightNotConnectingException;

import bisq.common.proto.persistable.PersistableEnvelope;

import com.google.protobuf.Message;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import static bisq.core.dao.burningman.accounting.BurningManAccountingService.EARLIEST_BLOCK_HEIGHT;

@Slf4j
@Getter
public class BurningManAccountingStore implements PersistableEnvelope {
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final LinkedList<AccountingBlock> blocks = new LinkedList<>();

public BurningManAccountingStore(List<AccountingBlock> blocks) {
this.blocks.addAll(blocks);
}

public void addIfNewBlock(AccountingBlock newBlock) throws BlockHeightNotConnectingException, BlockHashNotConnectingException {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
tryToAddNewBlock(newBlock);
} finally {
writeLock.unlock();
}
}

public void forEachBlock(Consumer<AccountingBlock> consumer) {
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
blocks.forEach(consumer);
} finally {
readLock.unlock();
}
}

public void purgeLastTenBlocks() {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
purgeLast10Blocks();
} finally {
writeLock.unlock();
}
}

public Optional<AccountingBlock> getLastBlock() {
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
return blocks.stream()
.max(Comparator.comparing(AccountingBlock::getHeight));
} finally {
readLock.unlock();
}
}

public Optional<AccountingBlock> getBlockAtHeight(int height) {
Lock readLock = readWriteLock.readLock();
try {
return blocks.stream()
.filter(block -> block.getHeight() == height)
.findAny();
} finally {
readLock.unlock();
}
}

public List<AccountingBlock> getBlocksAtLeastWithHeight(int minHeight) {
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
return blocks.stream()
.filter(block -> block.getHeight() >= minHeight)
.collect(Collectors.toList());
} finally {
readLock.unlock();
}
}

private void tryToAddNewBlock(AccountingBlock newBlock) throws BlockHeightNotConnectingException, BlockHashNotConnectingException {
if (!blocks.contains(newBlock)) {
Optional<AccountingBlock> optionalLastBlock = getLastBlock();
if (optionalLastBlock.isPresent()) {
AccountingBlock lastBlock = optionalLastBlock.get();
if (newBlock.getHeight() != lastBlock.getHeight() + 1) {
throw new BlockHeightNotConnectingException();
}
if (!Arrays.equals(newBlock.getTruncatedPreviousBlockHash(), lastBlock.getTruncatedHash())) {
throw new BlockHashNotConnectingException();
}
} else if (newBlock.getHeight() != EARLIEST_BLOCK_HEIGHT) {
throw new BlockHeightNotConnectingException();
}
log.info("Add new accountingBlock at height {} at {} with {} txs", newBlock.getHeight(),
new Date(newBlock.getDate()), newBlock.getTxs().size());
blocks.add(newBlock);
} else {
log.info("We have that block already. Height: {}", newBlock.getHeight());
}
}

private void purgeLast10Blocks() {
if (blocks.size() <= 10) {
blocks.clear();
return;
}

List<AccountingBlock> purged = new ArrayList<>(blocks.subList(0, blocks.size() - 10));
blocks.clear();
blocks.addAll(purged);
}

public Message toProtoMessage() {
return protobuf.PersistableEnvelope.newBuilder()
.setBurningManAccountingStore(protobuf.BurningManAccountingStore.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package bisq.core.dao.burningman.accounting.storage;

import bisq.core.dao.burningman.accounting.blockchain.AccountingBlock;
import bisq.core.dao.burningman.accounting.exceptions.BlockHashNotConnectingException;
import bisq.core.dao.burningman.accounting.exceptions.BlockHeightNotConnectingException;

import bisq.network.p2p.storage.persistence.ResourceDataStoreService;
import bisq.network.p2p.storage.persistence.StoreService;
Expand All @@ -32,8 +34,9 @@
import java.io.File;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -60,29 +63,31 @@ public void requestPersistence() {
persistenceManager.requestPersistence();
}

public List<AccountingBlock> getBlocks() {
return Collections.unmodifiableList(store.getBlocks());
public void addIfNewBlock(AccountingBlock block) throws BlockHashNotConnectingException, BlockHeightNotConnectingException {
store.addIfNewBlock(block);
requestPersistence();
}

public void addBlock(AccountingBlock block) {
store.getBlocks().add(block);
requestPersistence();
public void forEachBlock(Consumer<AccountingBlock> consumer) {
store.forEachBlock(consumer);
}

public void purgeLastTenBlocks() {
List<AccountingBlock> blocks = store.getBlocks();
if (blocks.size() <= 10) {
blocks.clear();
requestPersistence();
return;
}

List<AccountingBlock> purged = new ArrayList<>(blocks.subList(0, blocks.size() - 10));
blocks.clear();
blocks.addAll(purged);
store.purgeLastTenBlocks();
requestPersistence();
}

public Optional<AccountingBlock> getLastBlock() {
return store.getLastBlock();
}

public Optional<AccountingBlock> getBlockAtHeight(int height) {
return store.getBlockAtHeight(height);
}

public List<AccountingBlock> getBlocksAtLeastWithHeight(int minHeight) {
return store.getBlocksAtLeastWithHeight(minHeight);
}

///////////////////////////////////////////////////////////////////////////////////////////
// Protected
Expand Down

0 comments on commit 4779c82

Please sign in to comment.