Skip to content

Commit

Permalink
Add messages which cannot be used to create a new protocol to a pendi…
Browse files Browse the repository at this point in the history
…ngMessages set and reprocess it at next message.

This covers the case when the maker is offline while the taker/buyer takes the offer and sends the btc address. The maker once going online receives both as mailbox message without guaranteed order, and if the btc address message was processed first, it is dropped as no protocol was yet created.
  • Loading branch information
HenrikJannsen committed Feb 22, 2025
1 parent f23e7e7 commit 4d13f01
Showing 1 changed file with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -86,6 +88,7 @@ public class BisqEasyTradeService implements PersistenceClient<BisqEasyTradeStor

private Pin authorizedAlertDataSetPin, numDaysAfterRedactingTradeDataPin;
private Scheduler numDaysAfterRedactingTradeDataScheduler;
private final Set<BisqEasyTradeMessage> pendingMessages = new CopyOnWriteArraySet<>();

public BisqEasyTradeService(ServiceProvider serviceProvider) {
this.serviceProvider = serviceProvider;
Expand Down Expand Up @@ -206,15 +209,34 @@ private void handleBisqEasyTakeOfferMessage(BisqEasyTakeOfferRequest message) {
BisqEasyProtocol protocol = createProtocol(bisqEasyContract, message.getSender(), message.getReceiver());
protocol.handle(message);
persist();

if (!pendingMessages.isEmpty()) {
log.info("We have pendingMessages. We try to re-process them now.");
pendingMessages.forEach(this::handleBisqEasyTradeMessage);
}
}

private void handleBisqEasyTradeMessage(BisqEasyTradeMessage message) {
String tradeId = message.getTradeId();
findProtocol(tradeId).ifPresentOrElse(protocol -> {
protocol.handle(message);
persist();

if (pendingMessages.contains(message)) {
log.info("We remove message {} from pendingMessages.", message);
pendingMessages.remove(message);
}

if (!pendingMessages.isEmpty()) {
log.info("We have pendingMessages. We try to re-process them now.");
pendingMessages.forEach(this::handleBisqEasyTradeMessage);
}
},
() -> log.info("Protocol with tradeId {} not found. This is expected if the trade have been closed already", tradeId));
() -> {
log.info("Protocol with tradeId {} not found. We add the message to pendingMessages for " +
"re-processing when the next message arrives. message={}", tradeId, message);
pendingMessages.add(message);
});
}


Expand Down

0 comments on commit 4d13f01

Please sign in to comment.