Skip to content

Commit

Permalink
Merge pull request #3230 from HenrikJannsen/fix-bug-with-trade-messag…
Browse files Browse the repository at this point in the history
…es-pocessed-out-of-order

Fix bug with trade messages pocessed out of order
  • Loading branch information
HenrikJannsen authored Feb 23, 2025
2 parents 74b17ff + 4d13f01 commit 5d02d6c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,9 @@
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -58,6 +56,7 @@ public class BisqEasyOpenTradeChannelService extends PrivateGroupChatChannelServ
private final BisqEasyOpenTradeChannelStore persistableStore = new BisqEasyOpenTradeChannelStore();
@Getter
private final Persistence<BisqEasyOpenTradeChannelStore> persistence;
private final Set<BisqEasyOpenTradeMessage> pendingMessages = new CopyOnWriteArraySet<>();

public BisqEasyOpenTradeChannelService(PersistenceService persistenceService,
NetworkService networkService,
Expand All @@ -76,6 +75,10 @@ public BisqEasyOpenTradeChannelService(PersistenceService persistenceService,
public void onMessage(EnvelopePayloadMessage envelopePayloadMessage) {
if (envelopePayloadMessage instanceof BisqEasyOpenTradeMessage) {
processMessage((BisqEasyOpenTradeMessage) envelopePayloadMessage);
if (!pendingMessages.isEmpty()) {
log.info("Processing pendingMessages messages");
pendingMessages.forEach(this::processMessage);
}
} else if (envelopePayloadMessage instanceof BisqEasyOpenTradeMessageReaction) {
processMessageReaction((BisqEasyOpenTradeMessageReaction) envelopePayloadMessage);
}
Expand Down Expand Up @@ -285,7 +288,8 @@ protected BisqEasyOpenTradeMessage createAndGetNewPrivateChatMessage(String mess

//todo (refactor, low prio)
@Override
protected BisqEasyOpenTradeChannel createAndGetNewPrivateChatChannel(UserProfile peer, UserIdentity myUserIdentity) {
protected BisqEasyOpenTradeChannel createAndGetNewPrivateChatChannel(UserProfile peer,
UserIdentity myUserIdentity) {
throw new RuntimeException("createNewChannel not supported at PrivateTradeChannelService. " +
"Use mediatorCreatesNewChannel or traderCreatesNewChannel instead.");
}
Expand All @@ -311,6 +315,19 @@ protected BisqEasyOpenTradeMessageReaction createAndGetNewPrivateChatMessageReac
);
}

@Override
protected void addMessageAndProcessQueuedReactions(BisqEasyOpenTradeMessage message,
BisqEasyOpenTradeChannel channel) {
addMessage(message, channel);
// Check if there are any reactions that should be added to existing messages
processQueuedReactions();

if (pendingMessages.contains(message)) {
log.info("Removing message from pendingMessages. message={}", message);
pendingMessages.remove(message);
}
}

@Override
protected Optional<BisqEasyOpenTradeChannel> createNewChannelFromReceivedMessage(BisqEasyOpenTradeMessage message) {
if (message.getBisqEasyOffer().isPresent()) {
Expand All @@ -326,9 +343,10 @@ protected Optional<BisqEasyOpenTradeChannel> createNewChannelFromReceivedMessage
// This is a very unlikely case, so we ignore it.
// It also happens if we left a trade channel and receive a message again.
// We ignore that and do not re-open the channel.
// TODO we could put it into a queue for re-processing once a valid trade message comes.
log.warn("We received the first message for a new channel without an offer. " +
"We drop that message. Message={}", message);
"We add that message to pendingMessages for re-processing when we receive the next message. " +
"Message={}", message);
pendingMessages.add(message);
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -86,6 +88,7 @@ public class BisqEasyTradeService implements PersistenceClient<BisqEasyTradeStor

private Pin authorizedAlertDataSetPin, numDaysAfterRedactingTradeDataPin;
private Scheduler numDaysAfterRedactingTradeDataScheduler;
private final Set<BisqEasyTradeMessage> pendingMessages = new CopyOnWriteArraySet<>();

public BisqEasyTradeService(ServiceProvider serviceProvider) {
this.serviceProvider = serviceProvider;
Expand Down Expand Up @@ -206,15 +209,34 @@ private void handleBisqEasyTakeOfferMessage(BisqEasyTakeOfferRequest message) {
BisqEasyProtocol protocol = createProtocol(bisqEasyContract, message.getSender(), message.getReceiver());
protocol.handle(message);
persist();

if (!pendingMessages.isEmpty()) {
log.info("We have pendingMessages. We try to re-process them now.");
pendingMessages.forEach(this::handleBisqEasyTradeMessage);
}
}

private void handleBisqEasyTradeMessage(BisqEasyTradeMessage message) {
String tradeId = message.getTradeId();
findProtocol(tradeId).ifPresentOrElse(protocol -> {
protocol.handle(message);
persist();

if (pendingMessages.contains(message)) {
log.info("We remove message {} from pendingMessages.", message);
pendingMessages.remove(message);
}

if (!pendingMessages.isEmpty()) {
log.info("We have pendingMessages. We try to re-process them now.");
pendingMessages.forEach(this::handleBisqEasyTradeMessage);
}
},
() -> log.info("Protocol with tradeId {} not found. This is expected if the trade have been closed already", tradeId));
() -> {
log.info("Protocol with tradeId {} not found. We add the message to pendingMessages for " +
"re-processing when the next message arrives. message={}", tradeId, message);
pendingMessages.add(message);
});
}


Expand Down

0 comments on commit 5d02d6c

Please sign in to comment.