Skip to content

Commit

Permalink
Merge pull request #2543 from lucas-clemente/bundle-acks
Browse files Browse the repository at this point in the history
make sure that ACK frames are bundled with data
  • Loading branch information
marten-seemann authored Jun 1, 2020
2 parents f54953f + b91874a commit c8e5bb5
Show file tree
Hide file tree
Showing 17 changed files with 623 additions and 330 deletions.
15 changes: 15 additions & 0 deletions framer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
)

type framer interface {
HasData() bool

QueueControlFrame(wire.Frame)
AppendControlFrames([]ackhandler.Frame, protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount)

Expand Down Expand Up @@ -43,6 +45,19 @@ func newFramer(
}
}

func (f *framerI) HasData() bool {
f.mutex.Lock()
hasData := len(f.streamQueue) > 0
f.mutex.Unlock()
if hasData {
return true
}
f.controlFrameMutex.Lock()
hasData = len(f.controlFrames) > 0
f.controlFrameMutex.Unlock()
return hasData
}

func (f *framerI) QueueControlFrame(frame wire.Frame) {
f.controlFrameMutex.Lock()
f.controlFrames = append(f.controlFrames, frame)
Expand Down
29 changes: 29 additions & 0 deletions framer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ var _ = Describe("Framer", func() {
Expect(length).To(Equal(mdf.Length(version) + msf.Length(version)))
})

It("says if it has data", func() {
Expect(framer.HasData()).To(BeFalse())
f := &wire.MaxDataFrame{ByteOffset: 0x42}
framer.QueueControlFrame(f)
Expect(framer.HasData()).To(BeTrue())
frames, _ := framer.AppendControlFrames(nil, 1000)
Expect(frames).To(HaveLen(1))
Expect(framer.HasData()).To(BeFalse())
})

It("appends to the slice given", func() {
ping := &wire.PingFrame{}
mdf := &wire.MaxDataFrame{ByteOffset: 0x42}
Expand Down Expand Up @@ -99,6 +109,25 @@ var _ = Describe("Framer", func() {
Expect(length).To(Equal(f.Length(version)))
})

It("says if it has data", func() {
streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil).Times(2)
Expect(framer.HasData()).To(BeFalse())
framer.AddActiveStream(id1)
Expect(framer.HasData()).To(BeTrue())
f1 := &wire.StreamFrame{StreamID: id1, Data: []byte("foo")}
f2 := &wire.StreamFrame{StreamID: id1, Data: []byte("bar")}
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f1}, true)
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f2}, false)
frames, _ := framer.AppendStreamFrames(nil, protocol.MaxByteCount)
Expect(frames).To(HaveLen(1))
Expect(frames[0].Frame).To(Equal(f1))
Expect(framer.HasData()).To(BeTrue())
frames, _ = framer.AppendStreamFrames(nil, protocol.MaxByteCount)
Expect(frames).To(HaveLen(1))
Expect(frames[0].Frame).To(Equal(f2))
Expect(framer.HasData()).To(BeFalse())
})

It("appends to a frame slice", func() {
streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil)
f := &wire.StreamFrame{
Expand Down
113 changes: 113 additions & 0 deletions integrationtests/self/packetization_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package self_test

import (
"context"
"fmt"
"net"
"sync/atomic"
"time"

"github.com/lucas-clemente/quic-go"
quicproxy "github.com/lucas-clemente/quic-go/integrationtests/tools/proxy"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("Packetization", func() {
var (
server quic.Listener
proxy *quicproxy.QuicProxy
incoming uint32
outgoing uint32
)

BeforeEach(func() {
incoming = 0
outgoing = 0
var err error
server, err = quic.ListenAddr(
"localhost:0",
getTLSConfig(),
getQuicConfigForServer(&quic.Config{AcceptToken: func(net.Addr, *quic.Token) bool { return true }}),
)
Expect(err).ToNot(HaveOccurred())
serverAddr := fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port)

proxy, err = quicproxy.NewQuicProxy("localhost:0", &quicproxy.Opts{
RemoteAddr: serverAddr,
DelayPacket: func(dir quicproxy.Direction, _ []byte) time.Duration {
switch dir {
case quicproxy.DirectionIncoming:
atomic.AddUint32(&incoming, 1)
case quicproxy.DirectionOutgoing:
atomic.AddUint32(&outgoing, 1)
}
return 5 * time.Millisecond
},
})
Expect(err).ToNot(HaveOccurred())
})

AfterEach(func() {
Expect(proxy.Close()).To(Succeed())
Expect(server.Close()).To(Succeed())
})

// In this test, the client sends 100 small messages. The server echoes these messages.
// This means that every endpoint will send 100 ack-eliciting packets in short succession.
// This test then tests that no more than 110 packets are sent in every direction, making sure that ACK are bundled.
It("bundles ACKs", func() {
const numMsg = 100

sess, err := quic.DialAddr(
fmt.Sprintf("localhost:%d", proxy.LocalPort()),
getTLSClientConfig(),
getQuicConfigForClient(nil),
)
Expect(err).ToNot(HaveOccurred())

go func() {
defer GinkgoRecover()
sess, err := server.Accept(context.Background())
Expect(err).ToNot(HaveOccurred())
str, err := sess.AcceptStream(context.Background())
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 1)
// Echo every byte received from the client.
for {
if _, err := str.Read(b); err != nil {
break
}
_, err = str.Write(b)
Expect(err).ToNot(HaveOccurred())
}
}()

str, err := sess.OpenStreamSync(context.Background())
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 1)
// Send numMsg 1-byte messages.
for i := 0; i < numMsg; i++ {
_, err = str.Write([]byte{uint8(i)})
Expect(err).ToNot(HaveOccurred())
_, err = str.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(b[0]).To(Equal(uint8(i)))
}
Expect(sess.CloseWithError(0, "")).To(Succeed())

numIncoming := atomic.LoadUint32(&incoming)
numOutgoing := atomic.LoadUint32(&outgoing)
fmt.Fprintf(GinkgoWriter, "incoming packets: %d\n", numIncoming)
fmt.Fprintf(GinkgoWriter, "outgoing packets: %d\n", numOutgoing)
Expect(numIncoming).To(And(
BeNumerically(">", numMsg),
BeNumerically("<", numMsg+10),
))
Expect(numOutgoing).To(And(
BeNumerically(">", numMsg),
BeNumerically("<", numMsg+10),
))
})
})
2 changes: 1 addition & 1 deletion internal/ackhandler/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,5 @@ type ReceivedPacketHandler interface {
DropPackets(protocol.EncryptionLevel)

GetAlarmTimeout() time.Time
GetAckFrame(protocol.EncryptionLevel) *wire.AckFrame
GetAckFrame(encLevel protocol.EncryptionLevel, onlyIfQueued bool) *wire.AckFrame
}
8 changes: 4 additions & 4 deletions internal/ackhandler/received_packet_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,20 @@ func (h *receivedPacketHandler) GetAlarmTimeout() time.Time {
return utils.MinNonZeroTime(utils.MinNonZeroTime(initialAlarm, handshakeAlarm), oneRTTAlarm)
}

func (h *receivedPacketHandler) GetAckFrame(encLevel protocol.EncryptionLevel) *wire.AckFrame {
func (h *receivedPacketHandler) GetAckFrame(encLevel protocol.EncryptionLevel, onlyIfQueued bool) *wire.AckFrame {
var ack *wire.AckFrame
switch encLevel {
case protocol.EncryptionInitial:
if h.initialPackets != nil {
ack = h.initialPackets.GetAckFrame()
ack = h.initialPackets.GetAckFrame(onlyIfQueued)
}
case protocol.EncryptionHandshake:
if h.handshakePackets != nil {
ack = h.handshakePackets.GetAckFrame()
ack = h.handshakePackets.GetAckFrame(onlyIfQueued)
}
case protocol.Encryption1RTT:
// 0-RTT packets can't contain ACK frames
return h.appDataPackets.GetAckFrame()
return h.appDataPackets.GetAckFrame(onlyIfQueued)
default:
return nil
}
Expand Down
24 changes: 12 additions & 12 deletions internal/ackhandler/received_packet_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ var _ = Describe("Received Packet Handler", func() {
Expect(handler.ReceivedPacket(3, protocol.EncryptionInitial, sendTime, true)).To(Succeed())
Expect(handler.ReceivedPacket(2, protocol.EncryptionHandshake, sendTime, true)).To(Succeed())
Expect(handler.ReceivedPacket(4, protocol.Encryption1RTT, sendTime, true)).To(Succeed())
initialAck := handler.GetAckFrame(protocol.EncryptionInitial)
initialAck := handler.GetAckFrame(protocol.EncryptionInitial, true)
Expect(initialAck).ToNot(BeNil())
Expect(initialAck.AckRanges).To(HaveLen(1))
Expect(initialAck.AckRanges[0]).To(Equal(wire.AckRange{Smallest: 2, Largest: 3}))
Expect(initialAck.DelayTime).To(BeZero())
handshakeAck := handler.GetAckFrame(protocol.EncryptionHandshake)
handshakeAck := handler.GetAckFrame(protocol.EncryptionHandshake, true)
Expect(handshakeAck).ToNot(BeNil())
Expect(handshakeAck.AckRanges).To(HaveLen(1))
Expect(handshakeAck.AckRanges[0]).To(Equal(wire.AckRange{Smallest: 1, Largest: 2}))
Expect(handshakeAck.DelayTime).To(BeZero())
oneRTTAck := handler.GetAckFrame(protocol.Encryption1RTT)
oneRTTAck := handler.GetAckFrame(protocol.Encryption1RTT, true)
Expect(oneRTTAck).ToNot(BeNil())
Expect(oneRTTAck.AckRanges).To(HaveLen(1))
Expect(oneRTTAck.AckRanges[0]).To(Equal(wire.AckRange{Smallest: 4, Largest: 5}))
Expand All @@ -64,7 +64,7 @@ var _ = Describe("Received Packet Handler", func() {
sendTime := time.Now().Add(-time.Second)
Expect(handler.ReceivedPacket(2, protocol.Encryption0RTT, sendTime, true)).To(Succeed())
Expect(handler.ReceivedPacket(3, protocol.Encryption1RTT, sendTime, true)).To(Succeed())
ack := handler.GetAckFrame(protocol.Encryption1RTT)
ack := handler.GetAckFrame(protocol.Encryption1RTT, true)
Expect(ack).ToNot(BeNil())
Expect(ack.AckRanges).To(HaveLen(1))
Expect(ack.AckRanges[0]).To(Equal(wire.AckRange{Smallest: 2, Largest: 3}))
Expand Down Expand Up @@ -93,10 +93,10 @@ var _ = Describe("Received Packet Handler", func() {
sendTime := time.Now().Add(-time.Second)
Expect(handler.ReceivedPacket(2, protocol.EncryptionInitial, sendTime, true)).To(Succeed())
Expect(handler.ReceivedPacket(1, protocol.EncryptionHandshake, sendTime, true)).To(Succeed())
Expect(handler.GetAckFrame(protocol.EncryptionInitial)).ToNot(BeNil())
Expect(handler.GetAckFrame(protocol.EncryptionInitial, true)).ToNot(BeNil())
handler.DropPackets(protocol.EncryptionInitial)
Expect(handler.GetAckFrame(protocol.EncryptionInitial)).To(BeNil())
Expect(handler.GetAckFrame(protocol.EncryptionHandshake)).ToNot(BeNil())
Expect(handler.GetAckFrame(protocol.EncryptionInitial, true)).To(BeNil())
Expect(handler.GetAckFrame(protocol.EncryptionHandshake, true)).ToNot(BeNil())
})

It("drops Handshake packets", func() {
Expand All @@ -105,10 +105,10 @@ var _ = Describe("Received Packet Handler", func() {
sendTime := time.Now().Add(-time.Second)
Expect(handler.ReceivedPacket(1, protocol.EncryptionHandshake, sendTime, true)).To(Succeed())
Expect(handler.ReceivedPacket(2, protocol.Encryption1RTT, sendTime, true)).To(Succeed())
Expect(handler.GetAckFrame(protocol.EncryptionHandshake)).ToNot(BeNil())
Expect(handler.GetAckFrame(protocol.EncryptionHandshake, true)).ToNot(BeNil())
handler.DropPackets(protocol.EncryptionInitial)
Expect(handler.GetAckFrame(protocol.EncryptionHandshake)).To(BeNil())
Expect(handler.GetAckFrame(protocol.Encryption1RTT)).ToNot(BeNil())
Expect(handler.GetAckFrame(protocol.EncryptionHandshake, true)).To(BeNil())
Expect(handler.GetAckFrame(protocol.Encryption1RTT, true)).ToNot(BeNil())
})

It("does nothing when dropping 0-RTT packets", func() {
Expand All @@ -121,15 +121,15 @@ var _ = Describe("Received Packet Handler", func() {
sentPackets.EXPECT().GetLowestPacketNotConfirmedAcked().Times(2)
Expect(handler.ReceivedPacket(1, protocol.Encryption1RTT, sendTime, true)).To(Succeed())
Expect(handler.ReceivedPacket(2, protocol.Encryption1RTT, sendTime, true)).To(Succeed())
ack := handler.GetAckFrame(protocol.Encryption1RTT)
ack := handler.GetAckFrame(protocol.Encryption1RTT, true)
Expect(ack).ToNot(BeNil())
Expect(ack.LowestAcked()).To(Equal(protocol.PacketNumber(1)))
Expect(ack.LargestAcked()).To(Equal(protocol.PacketNumber(2)))
sentPackets.EXPECT().GetLowestPacketNotConfirmedAcked()
Expect(handler.ReceivedPacket(3, protocol.Encryption1RTT, sendTime, true)).To(Succeed())
sentPackets.EXPECT().GetLowestPacketNotConfirmedAcked().Return(protocol.PacketNumber(2))
Expect(handler.ReceivedPacket(4, protocol.Encryption1RTT, sendTime, true)).To(Succeed())
ack = handler.GetAckFrame(protocol.Encryption1RTT)
ack = handler.GetAckFrame(protocol.Encryption1RTT, true)
Expect(ack).ToNot(BeNil())
Expect(ack.LowestAcked()).To(Equal(protocol.PacketNumber(2)))
Expect(ack.LargestAcked()).To(Equal(protocol.PacketNumber(4)))
Expand Down
19 changes: 10 additions & 9 deletions internal/ackhandler/received_packet_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,26 @@ func newReceivedPacketHistory() *receivedPacketHistory {
}

// ReceivedPacket registers a packet with PacketNumber p and updates the ranges
func (h *receivedPacketHistory) ReceivedPacket(p protocol.PacketNumber) {
func (h *receivedPacketHistory) ReceivedPacket(p protocol.PacketNumber) bool /* is a new packet (and not a duplicate / delayed packet) */ {
// ignore delayed packets, if we already deleted the range
if p < h.deletedBelow {
return
return false
}
h.addToRanges(p)
isNew := h.addToRanges(p)
h.maybeDeleteOldRanges()
return isNew
}

func (h *receivedPacketHistory) addToRanges(p protocol.PacketNumber) {
func (h *receivedPacketHistory) addToRanges(p protocol.PacketNumber) bool /* is a new packet (and not a duplicate / delayed packet) */ {
if h.ranges.Len() == 0 {
h.ranges.PushBack(utils.PacketInterval{Start: p, End: p})
return
return true
}

for el := h.ranges.Back(); el != nil; el = el.Prev() {
// p already included in an existing range. Nothing to do here
if p >= el.Value.Start && p <= el.Value.End {
return
return false
}

var rangeExtended bool
Expand All @@ -58,20 +59,20 @@ func (h *receivedPacketHistory) addToRanges(p protocol.PacketNumber) {
if prev != nil && prev.Value.End+1 == el.Value.Start { // merge two ranges
prev.Value.End = el.Value.End
h.ranges.Remove(el)
return
}
return // if the two ranges were not merge, we're done here
return true // if the two ranges were not merge, we're done here
}

// create a new range at the end
if p > el.Value.End {
h.ranges.InsertAfter(utils.PacketInterval{Start: p, End: p}, el)
return
return true
}
}

// create a new range at the beginning
h.ranges.InsertBefore(utils.PacketInterval{Start: p, End: p}, h.ranges.Front())
return true
}

// Delete old ranges, if we're tracking more than 500 of them.
Expand Down
Loading

0 comments on commit c8e5bb5

Please sign in to comment.