Skip to content

Commit

Permalink
deliver UDP packets from same connection in receiving order
Browse files Browse the repository at this point in the history
  • Loading branch information
wwqgtxx committed Sep 25, 2024
1 parent e347645 commit 5480b18
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions tunnel/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

var (
tcpQueue = channel.NewInfiniteChannel[C.ConnContext]()
udpQueue = channel.NewInfiniteChannel[C.PacketAdapter]()
udpQueues []*channel.InfiniteChannel[C.PacketAdapter]
natTable = nat.New()
rules []C.Rule
subRules map[string][]C.Rule
Expand Down Expand Up @@ -70,8 +70,12 @@ func (t tunnel) HandleTCPConn(conn net.Conn, metadata *C.Metadata) {

func (t tunnel) HandleUDPPacket(packet C.UDPPacket, metadata *C.Metadata) {
packetAdapter := C.NewPacketAdapter(packet, metadata)

hash := utils.MapHash(metadata.SourceAddress() + "-" + metadata.RemoteAddress())
queueNo := uint(hash) % uint(len(udpQueues))

select {
case udpQueue.In() <- packetAdapter:
case udpQueues[queueNo].In() <- packetAdapter:
default:
}
}
Expand Down Expand Up @@ -141,7 +145,8 @@ func TCPIn() chan<- C.ConnContext {
// UDPIn return fan-in udp queue
// Deprecated: using Tunnel instead
func UDPIn() chan<- C.PacketAdapter {
return udpQueue.In()
// compatibility: first queue is always available for external callers
return udpQueues[0].In()
}

// NatTable return nat table
Expand Down Expand Up @@ -197,8 +202,7 @@ func SetMode(m TunnelMode) {
}

// processUDP starts a loop to handle udp packet
func processUDP() {
queue := udpQueue.Out()
func processUDP(queue chan C.PacketAdapter) {
for conn := range queue {
handleUDPConn(conn)
}
Expand All @@ -209,8 +213,11 @@ func process() {
if num := runtime.GOMAXPROCS(0); num > numUDPWorkers {
numUDPWorkers = num
}
udpQueues = make([]*channel.InfiniteChannel[C.PacketAdapter], numUDPWorkers)
for i := 0; i < numUDPWorkers; i++ {
go processUDP()
queue := channel.NewInfiniteChannel[C.PacketAdapter]()
udpQueues[i] = queue
go processUDP(queue.Out())
}

queue := tcpQueue.Out()
Expand Down

0 comments on commit 5480b18

Please sign in to comment.