diff --git a/chat/src/main/java/bisq/chat/bisq_easy/open_trades/BisqEasyOpenTradeChannelService.java b/chat/src/main/java/bisq/chat/bisq_easy/open_trades/BisqEasyOpenTradeChannelService.java index 3db9569be1..392d83eefd 100644 --- a/chat/src/main/java/bisq/chat/bisq_easy/open_trades/BisqEasyOpenTradeChannelService.java +++ b/chat/src/main/java/bisq/chat/bisq_easy/open_trades/BisqEasyOpenTradeChannelService.java @@ -40,11 +40,9 @@ import lombok.extern.slf4j.Slf4j; import javax.annotation.Nullable; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -58,6 +56,7 @@ public class BisqEasyOpenTradeChannelService extends PrivateGroupChatChannelServ private final BisqEasyOpenTradeChannelStore persistableStore = new BisqEasyOpenTradeChannelStore(); @Getter private final Persistence persistence; + private final Set pendingMessages = new CopyOnWriteArraySet<>(); public BisqEasyOpenTradeChannelService(PersistenceService persistenceService, NetworkService networkService, @@ -76,6 +75,10 @@ public BisqEasyOpenTradeChannelService(PersistenceService persistenceService, public void onMessage(EnvelopePayloadMessage envelopePayloadMessage) { if (envelopePayloadMessage instanceof BisqEasyOpenTradeMessage) { processMessage((BisqEasyOpenTradeMessage) envelopePayloadMessage); + if (!pendingMessages.isEmpty()) { + log.info("Processing pendingMessages messages"); + pendingMessages.forEach(this::processMessage); + } } else if (envelopePayloadMessage instanceof BisqEasyOpenTradeMessageReaction) { processMessageReaction((BisqEasyOpenTradeMessageReaction) envelopePayloadMessage); } @@ -285,7 +288,8 @@ protected BisqEasyOpenTradeMessage createAndGetNewPrivateChatMessage(String mess //todo (refactor, low prio) @Override - protected BisqEasyOpenTradeChannel createAndGetNewPrivateChatChannel(UserProfile peer, UserIdentity myUserIdentity) { + protected BisqEasyOpenTradeChannel createAndGetNewPrivateChatChannel(UserProfile peer, + UserIdentity myUserIdentity) { throw new RuntimeException("createNewChannel not supported at PrivateTradeChannelService. " + "Use mediatorCreatesNewChannel or traderCreatesNewChannel instead."); } @@ -311,6 +315,19 @@ protected BisqEasyOpenTradeMessageReaction createAndGetNewPrivateChatMessageReac ); } + @Override + protected void addMessageAndProcessQueuedReactions(BisqEasyOpenTradeMessage message, + BisqEasyOpenTradeChannel channel) { + addMessage(message, channel); + // Check if there are any reactions that should be added to existing messages + processQueuedReactions(); + + if (pendingMessages.contains(message)) { + log.info("Removing message from pendingMessages. message={}", message); + pendingMessages.remove(message); + } + } + @Override protected Optional createNewChannelFromReceivedMessage(BisqEasyOpenTradeMessage message) { if (message.getBisqEasyOffer().isPresent()) { @@ -326,9 +343,10 @@ protected Optional createNewChannelFromReceivedMessage // This is a very unlikely case, so we ignore it. // It also happens if we left a trade channel and receive a message again. // We ignore that and do not re-open the channel. - // TODO we could put it into a queue for re-processing once a valid trade message comes. log.warn("We received the first message for a new channel without an offer. " + - "We drop that message. Message={}", message); + "We add that message to pendingMessages for re-processing when we receive the next message. " + + "Message={}", message); + pendingMessages.add(message); return Optional.empty(); } } diff --git a/trade/src/main/java/bisq/trade/bisq_easy/BisqEasyTradeService.java b/trade/src/main/java/bisq/trade/bisq_easy/BisqEasyTradeService.java index ac88ac065c..59f4c9fe67 100644 --- a/trade/src/main/java/bisq/trade/bisq_easy/BisqEasyTradeService.java +++ b/trade/src/main/java/bisq/trade/bisq_easy/BisqEasyTradeService.java @@ -57,8 +57,10 @@ import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkArgument; @@ -86,6 +88,7 @@ public class BisqEasyTradeService implements PersistenceClient pendingMessages = new CopyOnWriteArraySet<>(); public BisqEasyTradeService(ServiceProvider serviceProvider) { this.serviceProvider = serviceProvider; @@ -206,6 +209,11 @@ private void handleBisqEasyTakeOfferMessage(BisqEasyTakeOfferRequest message) { BisqEasyProtocol protocol = createProtocol(bisqEasyContract, message.getSender(), message.getReceiver()); protocol.handle(message); persist(); + + if (!pendingMessages.isEmpty()) { + log.info("We have pendingMessages. We try to re-process them now."); + pendingMessages.forEach(this::handleBisqEasyTradeMessage); + } } private void handleBisqEasyTradeMessage(BisqEasyTradeMessage message) { @@ -213,8 +221,22 @@ private void handleBisqEasyTradeMessage(BisqEasyTradeMessage message) { findProtocol(tradeId).ifPresentOrElse(protocol -> { protocol.handle(message); persist(); + + if (pendingMessages.contains(message)) { + log.info("We remove message {} from pendingMessages.", message); + pendingMessages.remove(message); + } + + if (!pendingMessages.isEmpty()) { + log.info("We have pendingMessages. We try to re-process them now."); + pendingMessages.forEach(this::handleBisqEasyTradeMessage); + } }, - () -> log.info("Protocol with tradeId {} not found. This is expected if the trade have been closed already", tradeId)); + () -> { + log.info("Protocol with tradeId {} not found. We add the message to pendingMessages for " + + "re-processing when the next message arrives. message={}", tradeId, message); + pendingMessages.add(message); + }); }