Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reject expired data #4215

Merged
merged 2 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void onAdded(Offer offer) {
.filter(item -> item.getOffer().getId().equals(offer.getId()))
.findAny();
if (candidateWithSameId.isPresent()) {
log.warn("We had an old offer in the list with the same Offer ID. Might be that the state or errorMessage was different. " +
log.warn("We had an old offer in the list with the same Offer ID. We remove the old one. " +
"old offerBookListItem={}, new offerBookListItem={}", candidateWithSameId.get(), offerBookListItem);
offerBookListItems.remove(candidateWithSameId.get());
}
Expand Down
37 changes: 21 additions & 16 deletions p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@
import bisq.network.p2p.storage.messages.RemoveMailboxDataMessage;
import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.network.p2p.storage.payload.DateTolerantPayload;
import bisq.network.p2p.storage.payload.ExpirablePayload;
import bisq.network.p2p.storage.payload.ProcessOncePersistableNetworkPayload;
import bisq.network.p2p.storage.payload.MailboxStoragePayload;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProcessOncePersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProtectedMailboxStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
Expand Down Expand Up @@ -76,7 +75,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;


import java.security.KeyPair;
import java.security.PublicKey;

Expand Down Expand Up @@ -215,7 +213,7 @@ private Set<byte[]> getKnownPayloadHashes() {
// PersistedStoragePayload items don't get removed, so we don't have an issue with the case that
// an object gets removed in between PreliminaryGetDataRequest and the GetUpdatedDataRequest and we would
// miss that event if we do not load the full set or use some delta handling.
Set<byte[]> excludedKeys =this.appendOnlyDataStoreService.getMap().keySet().stream()
Set<byte[]> excludedKeys = this.appendOnlyDataStoreService.getMap().keySet().stream()
.map(e -> e.bytes)
.collect(Collectors.toSet());

Expand Down Expand Up @@ -248,7 +246,7 @@ static private <T extends NetworkPayload> Set<T> filterKnownHashes(
.filter(e -> limit.decrementAndGet() >= 0)
.map(Map.Entry::getValue)
.filter(networkPayload -> shouldTransmitPayloadToPeer(peerCapabilities,
objToPayload.apply(networkPayload)))
objToPayload.apply(networkPayload)))
.collect(Collectors.toSet());

if (limit.get() < 0)
Expand Down Expand Up @@ -351,7 +349,7 @@ public void processGetDataResponse(GetDataResponse getDataResponse, NodeAddress
}
} else {
// We don't broadcast here as we are only connected to the seed node and would be pointless
addPersistableNetworkPayload(e, sender,false, false, false);
addPersistableNetworkPayload(e, sender, false, false, false);
}
});
log.info("Processing {} persistableNetworkPayloads took {} ms.",
Expand All @@ -374,16 +372,15 @@ public void shutDown() {

@VisibleForTesting
void removeExpiredEntries() {
log.trace("removeExpiredEntries");
// The moment when an object becomes expired will not be synchronous in the network and we could
// get add network_messages after the object has expired. To avoid repeated additions of already expired
// object when we get it sent from new peers, we don’t remove the sequence number from the map.
// That way an ADD message for an already expired data will fail because the sequence number
// is equal and not larger as expected.
ArrayList<Map.Entry<ByteArray, ProtectedStorageEntry>> toRemoveList =
map.entrySet().stream()
.filter(entry -> entry.getValue().isExpired(this.clock))
.collect(Collectors.toCollection(ArrayList::new));
.filter(entry -> entry.getValue().isExpired(this.clock))
.collect(Collectors.toCollection(ArrayList::new));

// Batch processing can cause performance issues, so do all of the removes first, then update the listeners
// to let them know about the removes.
Expand Down Expand Up @@ -452,7 +449,7 @@ public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection
map.values().stream()
.filter(protectedStorageEntry -> protectedStorageEntry.getProtectedStoragePayload() instanceof RequiresOwnerIsOnlinePayload)
.filter(protectedStorageEntry -> ((RequiresOwnerIsOnlinePayload) protectedStorageEntry.getProtectedStoragePayload()).getOwnerNodeAddress().equals(peersNodeAddress))
.forEach(protectedStorageEntry -> {
.forEach(protectedStorageEntry -> {
// We only set the data back by half of the TTL and remove the data only if is has
// expired after that back dating.
// We might get connection drops which are not caused by the node going offline, so
Expand Down Expand Up @@ -567,9 +564,9 @@ public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEn
}

private boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry,
@Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener,
boolean allowBroadcast) {
@Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener,
boolean allowBroadcast) {
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);

Expand All @@ -580,6 +577,15 @@ private boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageE
return false;
}

// To avoid that expired data get stored and broadcast we check early for expire date.
if (protectedStorageEntry.isExpired(clock)) {
log.warn("We received an expired protectedStorageEntry from peer {}. getCreationTimeStamp={}, protectedStorageEntry={}",
sender != null ? sender.getFullAddress() : "sender is null",
new Date(protectedStorageEntry.getCreationTimeStamp()),
protectedStorageEntry);
return false;
}

ProtectedStorageEntry storedEntry = map.get(hashOfPayload);

// If we have seen a more recent operation for this payload and we have a payload locally, ignore it
Expand Down Expand Up @@ -652,7 +658,7 @@ public boolean refreshTTL(RefreshOfferMessage refreshTTLMessage,


// If we have seen a more recent operation for this payload, we ignore the current one
if(!hasSequenceNrIncreased(updatedEntry.getSequenceNumber(), hashOfPayload))
if (!hasSequenceNrIncreased(updatedEntry.getSequenceNumber(), hashOfPayload))
return false;

// Verify the updated ProtectedStorageEntry is well formed and valid for update
Expand Down Expand Up @@ -723,7 +729,7 @@ public boolean remove(ProtectedStorageEntry protectedStorageEntry,
}

return true;
}
}


/**
Expand Down Expand Up @@ -1049,4 +1055,3 @@ public static MapValue fromProto(protobuf.MapValue proto) {
}
}
}