Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug with trade messages pocessed out of order #3230

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,9 @@
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -58,6 +56,7 @@ public class BisqEasyOpenTradeChannelService extends PrivateGroupChatChannelServ
private final BisqEasyOpenTradeChannelStore persistableStore = new BisqEasyOpenTradeChannelStore();
@Getter
private final Persistence<BisqEasyOpenTradeChannelStore> persistence;
private final Set<BisqEasyOpenTradeMessage> pendingMessages = new CopyOnWriteArraySet<>();

public BisqEasyOpenTradeChannelService(PersistenceService persistenceService,
NetworkService networkService,
Expand All @@ -76,6 +75,10 @@ public BisqEasyOpenTradeChannelService(PersistenceService persistenceService,
public void onMessage(EnvelopePayloadMessage envelopePayloadMessage) {
if (envelopePayloadMessage instanceof BisqEasyOpenTradeMessage) {
processMessage((BisqEasyOpenTradeMessage) envelopePayloadMessage);
if (!pendingMessages.isEmpty()) {
log.info("Processing pendingMessages messages");
pendingMessages.forEach(this::processMessage);
}
} else if (envelopePayloadMessage instanceof BisqEasyOpenTradeMessageReaction) {
processMessageReaction((BisqEasyOpenTradeMessageReaction) envelopePayloadMessage);
}
Expand Down Expand Up @@ -285,7 +288,8 @@ protected BisqEasyOpenTradeMessage createAndGetNewPrivateChatMessage(String mess

//todo (refactor, low prio)
@Override
protected BisqEasyOpenTradeChannel createAndGetNewPrivateChatChannel(UserProfile peer, UserIdentity myUserIdentity) {
protected BisqEasyOpenTradeChannel createAndGetNewPrivateChatChannel(UserProfile peer,
UserIdentity myUserIdentity) {
throw new RuntimeException("createNewChannel not supported at PrivateTradeChannelService. " +
"Use mediatorCreatesNewChannel or traderCreatesNewChannel instead.");
}
Expand All @@ -311,6 +315,19 @@ protected BisqEasyOpenTradeMessageReaction createAndGetNewPrivateChatMessageReac
);
}

@Override
protected void addMessageAndProcessQueuedReactions(BisqEasyOpenTradeMessage message,
BisqEasyOpenTradeChannel channel) {
addMessage(message, channel);
// Check if there are any reactions that should be added to existing messages
processQueuedReactions();

if (pendingMessages.contains(message)) {
log.info("Removing message from pendingMessages. message={}", message);
pendingMessages.remove(message);
}
}

@Override
protected Optional<BisqEasyOpenTradeChannel> createNewChannelFromReceivedMessage(BisqEasyOpenTradeMessage message) {
if (message.getBisqEasyOffer().isPresent()) {
Expand All @@ -326,9 +343,10 @@ protected Optional<BisqEasyOpenTradeChannel> createNewChannelFromReceivedMessage
// This is a very unlikely case, so we ignore it.
// It also happens if we left a trade channel and receive a message again.
// We ignore that and do not re-open the channel.
// TODO we could put it into a queue for re-processing once a valid trade message comes.
log.warn("We received the first message for a new channel without an offer. " +
"We drop that message. Message={}", message);
"We add that message to pendingMessages for re-processing when we receive the next message. " +
"Message={}", message);
pendingMessages.add(message);
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -86,6 +88,7 @@ public class BisqEasyTradeService implements PersistenceClient<BisqEasyTradeStor

private Pin authorizedAlertDataSetPin, numDaysAfterRedactingTradeDataPin;
private Scheduler numDaysAfterRedactingTradeDataScheduler;
private final Set<BisqEasyTradeMessage> pendingMessages = new CopyOnWriteArraySet<>();

public BisqEasyTradeService(ServiceProvider serviceProvider) {
this.serviceProvider = serviceProvider;
Expand Down Expand Up @@ -206,15 +209,34 @@ private void handleBisqEasyTakeOfferMessage(BisqEasyTakeOfferRequest message) {
BisqEasyProtocol protocol = createProtocol(bisqEasyContract, message.getSender(), message.getReceiver());
protocol.handle(message);
persist();

if (!pendingMessages.isEmpty()) {
log.info("We have pendingMessages. We try to re-process them now.");
pendingMessages.forEach(this::handleBisqEasyTradeMessage);
}
}

private void handleBisqEasyTradeMessage(BisqEasyTradeMessage message) {
String tradeId = message.getTradeId();
findProtocol(tradeId).ifPresentOrElse(protocol -> {
protocol.handle(message);
persist();

if (pendingMessages.contains(message)) {
log.info("We remove message {} from pendingMessages.", message);
pendingMessages.remove(message);
}

if (!pendingMessages.isEmpty()) {
log.info("We have pendingMessages. We try to re-process them now.");
pendingMessages.forEach(this::handleBisqEasyTradeMessage);
}
},
() -> log.info("Protocol with tradeId {} not found. This is expected if the trade have been closed already", tradeId));
() -> {
log.info("Protocol with tradeId {} not found. We add the message to pendingMessages for " +
"re-processing when the next message arrives. message={}", tradeId, message);
pendingMessages.add(message);
});
}


Expand Down
Loading