Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Probabilistic queue priorization: do not starve bulk requests on very fast connections #995

Merged
merged 9 commits into from
Nov 29, 2024
4 changes: 3 additions & 1 deletion src/freenet/io/comm/UdpSocketHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ private static int getFd(DatagramSocket s) {
f.setAccessible(true);
ret = f.getInt(fdi);
} catch (Exception e) {
Logger.error(UdpSocketHandler.class, e.getMessage(), e);
if (logMINOR) { // TODO: Known Java 21 problem.
Logger.warning(UdpSocketHandler.class, e.getMessage(), e);
}
}
return ret;
}
Expand Down
2 changes: 1 addition & 1 deletion src/freenet/node/OpennetManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public class OpennetManager {
* (around 20%), and (b) it ensures that nodes with 10 connections still have 3 long links, so
* long links cannot form chains and the routing still scales if the short routing is broken.
*
* See USK@ZLwcSLwqpM1527Tw1YmnSiXgzITU0neHQ11Cyl0iLmk,f6FLo3TvsEijIcJq-X3BTjjtm0ErVZwAPO7AUd9V7lY,AQACAAE/fix-link-length/7/
* See USK@ZLwcSLwqpM1527Tw1YmnSiXgzITU0neHQ11Cyl0iLmk,f6FLo3TvsEijIcJq-X3BTjjtm0ErVZwAPO7AUd9V7lY,AQACAAE/fix-link-length/22/
* (FIXME move to wiki or other permanent storage)
*/
/** Peers with more than this distance are considered "long links". */
Expand Down
23 changes: 12 additions & 11 deletions src/freenet/node/PeerMessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import java.util.Random;

import freenet.io.comm.DMT;
import freenet.support.DoublyLinkedList;
Expand Down Expand Up @@ -710,7 +711,9 @@ public boolean isEmpty() {

}

PeerMessageQueue() {
final private Random fastWeakRandom;
PeerMessageQueue(Random fastWeakRandom) {
this.fastWeakRandom = fastWeakRandom;
queuesByPriority = new PrioQueue[DMT.NUM_PRIORITIES];
for(int i=0;i<queuesByPriority.length;i++) {
if(i == DMT.PRIORITY_BULK_DATA)
Expand Down Expand Up @@ -873,25 +876,23 @@ public synchronized MessageItem grabQueuedMessageItem(int minPriority) {
if(ret != null) return ret;
}

// Include bulk or realtime, whichever is more urgent.

boolean tryRealtimeFirst = true;

// If one is empty, try the other.
// Otherwise try whichever is more urgent, favouring realtime if there is a draw.
// Realtime is supposed to be bursty.


// Include bulk or realtime, whichever is more urgent.
boolean tryRealtimeFirst;
if(queuesByPriority[DMT.PRIORITY_REALTIME_DATA].isEmpty()) {
tryRealtimeFirst = false;
} else if(queuesByPriority[DMT.PRIORITY_BULK_DATA].isEmpty()) {
tryRealtimeFirst = true;
} else if(queuesByPriority[DMT.PRIORITY_BULK_DATA].getNextUrgentTime(Long.MAX_VALUE, 0) >= queuesByPriority[DMT.PRIORITY_REALTIME_DATA].getNextUrgentTime(Long.MAX_VALUE, 0)) {
} else if(queuesByPriority[DMT.PRIORITY_BULK_DATA].getNextUrgentTime(Long.MAX_VALUE, 0)
>= queuesByPriority[DMT.PRIORITY_REALTIME_DATA].getNextUrgentTime(Long.MAX_VALUE, 0)) {
tryRealtimeFirst = true;
} else {
tryRealtimeFirst = false;
// 10% chance to use bulk in case of a draw to avoid starving the bulk queue.
tryRealtimeFirst = this.fastWeakRandom.nextInt(10) > 0;
}

// FIXME token bucket?

if(tryRealtimeFirst) {
// Try realtime first
if(logMINOR) Logger.minor(this, "Trying realtime first");
Expand Down
2 changes: 1 addition & 1 deletion src/freenet/node/PeerNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ public PeerNode(SimpleFieldSet fs, Node node2, NodeCrypto crypto, boolean fromLo
swapRequestsInterval = new SimpleRunningAverage(50, Node.MIN_INTERVAL_BETWEEN_INCOMING_SWAP_REQUESTS);
probeRequestsInterval = new SimpleRunningAverage(50, Node.MIN_INTERVAL_BETWEEN_INCOMING_PROBE_REQUESTS);

messageQueue = new PeerMessageQueue();
messageQueue = new PeerMessageQueue(node.getFastWeakRandom());

decrementHTLAtMaximum = node.getRandom().nextFloat() < Node.DECREMENT_AT_MAX_PROB;
decrementHTLAtMinimum = node.getRandom().nextFloat() < Node.DECREMENT_AT_MIN_PROB;
Expand Down
23 changes: 12 additions & 11 deletions test/freenet/node/NewPacketFormatTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.junit.Test;

import freenet.crypt.BlockCipher;
import freenet.crypt.DummyRandomSource;
import freenet.crypt.ciphers.Rijndael;
import freenet.io.comm.DMT;
import freenet.io.comm.FreenetInetAddress;
Expand All @@ -33,7 +34,7 @@ public void setUp() {
@Test
public void testEmptyCreation() throws BlockedTooLongException {
NewPacketFormat npf = new NewPacketFormat(null, 0, 0);
PeerMessageQueue pmq = new PeerMessageQueue();
PeerMessageQueue pmq = new PeerMessageQueue(new DummyRandomSource(1234));
SessionKey s = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1);

NPFPacket p = npf.createPacket(1400, pmq, s, false);
Expand All @@ -44,7 +45,7 @@ public void testEmptyCreation() throws BlockedTooLongException {
public void testAckOnlyCreation() throws BlockedTooLongException, InterruptedException {
BasePeerNode pn = new NullBasePeerNode();
NewPacketFormat npf = new NewPacketFormat(pn, 0, 0);
PeerMessageQueue pmq = new PeerMessageQueue();
PeerMessageQueue pmq = new PeerMessageQueue(new DummyRandomSource(1234));
SessionKey s = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1);

NPFPacket p = null;
Expand All @@ -65,10 +66,10 @@ public void testAckOnlyCreation() throws BlockedTooLongException, InterruptedExc
public void testLostLastAck() throws BlockedTooLongException, InterruptedException {
NullBasePeerNode senderNode = new NullBasePeerNode();
NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0);
PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));
NullBasePeerNode receiverNode = new NullBasePeerNode();
NewPacketFormat receiver = new NewPacketFormat(receiverNode, 0, 0);
PeerMessageQueue receiverQueue = new PeerMessageQueue();
PeerMessageQueue receiverQueue = new PeerMessageQueue(new DummyRandomSource(1234));
SessionKey senderKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1);
senderNode.currentKey = senderKey;
SessionKey receiverKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1);
Expand Down Expand Up @@ -116,7 +117,7 @@ public void testLostLastAck() throws BlockedTooLongException, InterruptedExcepti
public void testOutOfOrderDelivery() throws BlockedTooLongException {
NullBasePeerNode senderNode = new NullBasePeerNode();
NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0);
PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));
NullBasePeerNode receiverNode = new NullBasePeerNode();
NewPacketFormat receiver = new NewPacketFormat(receiverNode, 0, 0);
SessionKey senderKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1);
Expand All @@ -142,7 +143,7 @@ public void testOutOfOrderDelivery() throws BlockedTooLongException {
public void testReceiveUnknownMessageLength() throws BlockedTooLongException {
NullBasePeerNode senderNode = new NullBasePeerNode();
NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0);
PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));
NullBasePeerNode receiverNode = new NullBasePeerNode();
NewPacketFormat receiver = new NewPacketFormat(receiverNode, 0, 0);
SessionKey senderKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1);
Expand All @@ -166,7 +167,7 @@ public void testReceiveUnknownMessageLength() throws BlockedTooLongException {
public void testResendAlreadyCompleted() throws BlockedTooLongException, InterruptedException {
NullBasePeerNode senderNode = new NullBasePeerNode();
NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0);
PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));
NullBasePeerNode receiverNode = new NullBasePeerNode();
NewPacketFormat receiver = new NewPacketFormat(receiverNode, 0, 0);
SessionKey senderKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1);
Expand Down Expand Up @@ -219,7 +220,7 @@ public SessionKey getCurrentKeyTracker() {

};
NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0);
PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));
NullBasePeerNode receiverNode = new NullBasePeerNode() {

@Override
Expand Down Expand Up @@ -284,7 +285,7 @@ public SessionKey getCurrentKeyTracker() {

};
NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0);
PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));

senderQueue.queueAndEstimateSize(new MessageItem(new byte[128], null, false, null, (short) 0, false, true), 1024);

Expand Down Expand Up @@ -319,7 +320,7 @@ public void handleMessage(Message msg) {

};
NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0);
PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));
NullBasePeerNode receiverNode = new NullBasePeerNode() {

@Override
Expand Down Expand Up @@ -434,7 +435,7 @@ public void testEncryption()
receiverNPF =
new NewPacketFormat(receiverNode, receiverStartSeq, senderStartSeq);

PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));

byte[] message = new byte[1024];
random.nextBytes(message);
Expand Down
10 changes: 6 additions & 4 deletions test/freenet/node/PeerMessageQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@

import org.junit.Test;

import freenet.crypt.DummyRandomSource;

public class PeerMessageQueueTest {
@Test
public void testUrgentTimeEmpty() {
PeerMessageQueue pmq = new PeerMessageQueue();
PeerMessageQueue pmq = new PeerMessageQueue(new DummyRandomSource(1234));
assertEquals(Long.MAX_VALUE, pmq.getNextUrgentTime(Long.MAX_VALUE, System.currentTimeMillis()));
}

@Test
public void testUrgentTime() {
PeerMessageQueue pmq = new PeerMessageQueue();
PeerMessageQueue pmq = new PeerMessageQueue(new DummyRandomSource(1234));

//Constructor might take some time, so grab a range
long start = System.currentTimeMillis();
Expand All @@ -37,7 +39,7 @@ public void testUrgentTime() {
* it. */
@Test
public void testUrgentTimeQueuedWrong() {
PeerMessageQueue pmq = new PeerMessageQueue();
PeerMessageQueue pmq = new PeerMessageQueue(new DummyRandomSource(1234));

//Constructor might take some time, so grab a range
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -67,7 +69,7 @@ public void testUrgentTimeQueuedWrong() {

@Test
public void testGrabQueuedMessageItem() {
PeerMessageQueue pmq = new PeerMessageQueue();
PeerMessageQueue pmq = new PeerMessageQueue(new DummyRandomSource(1234));

MessageItem itemUrgent = new MessageItem(new byte[1024], null, false, null, (short) 0, false, false);

Expand Down