Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use a single TCP outgoing buffer for each client / session #665

Merged
merged 1 commit into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,8 @@
reader *clientReader
timeDecoder *rtptime.GlobalDecoder2
mustClose bool
tcpFrame *base.InterleavedFrame
tcpBuffer []byte

// in
chOptions chan optionsReq
Expand Down Expand Up @@ -856,6 +858,11 @@
cm.start()
}

if *c.effectiveTransport == TransportTCP {
c.tcpFrame = &base.InterleavedFrame{}
c.tcpBuffer = make([]byte, c.MaxPacketSize+4)
}

if c.state == clientStatePlay && c.stdChannelSetupped {
c.keepaliveTimer = time.NewTimer(c.keepalivePeriod)

Expand Down Expand Up @@ -1902,8 +1909,18 @@
}

cm := c.medias[medi]
ct := cm.formats[pkt.PayloadType]
return ct.writePacketRTP(byts, pkt, ntp)
cf := cm.formats[pkt.PayloadType]

cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt))

ok := c.writer.push(func() error {
return cm.writePacketRTPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}

Check warning on line 1921 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L1920-L1921

Added lines #L1920 - L1921 were not covered by tests

return nil
}

// WritePacketRTCP writes a RTCP packet to the server.
Expand All @@ -1920,7 +1937,15 @@
}

cm := c.medias[medi]
return cm.writePacketRTCP(byts)

ok := c.writer.push(func() error {
return cm.writePacketRTCPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}

Check warning on line 1946 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L1945-L1946

Added lines #L1945 - L1946 were not covered by tests

return nil
}

// PacketPTS returns the PTS of an incoming RTP packet.
Expand Down
15 changes: 0 additions & 15 deletions client_format.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package gortsplib

import (
"time"

"github.com/pion/rtcp"
"github.com/pion/rtp"

Expand Down Expand Up @@ -71,19 +69,6 @@ func (cf *clientFormat) stop() {
}
}

func (cf *clientFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error {
cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt))

ok := cf.cm.c.writer.push(func() error {
return cf.cm.writePacketRTPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}

return nil
}

func (cf *clientFormat) readRTPUDP(pkt *rtp.Packet) {
packets, lost := cf.udpReorderer.Process(pkt)
if lost != 0 {
Expand Down
29 changes: 6 additions & 23 deletions client_media.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"

"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
)
Expand All @@ -22,9 +21,6 @@ type clientMedia struct {
tcpChannel int
udpRTPListener *clientUDPListener
udpRTCPListener *clientUDPListener
tcpRTPFrame *base.InterleavedFrame
tcpRTCPFrame *base.InterleavedFrame
tcpBuffer []byte
writePacketRTPInQueue func([]byte) error
writePacketRTCPInQueue func([]byte) error
}
Expand Down Expand Up @@ -115,10 +111,6 @@ func (cm *clientMedia) start() {
cm.c.tcpCallbackByChannel[cm.tcpChannel] = cm.readRTPTCPPlay
cm.c.tcpCallbackByChannel[cm.tcpChannel+1] = cm.readRTCPTCPPlay
}

cm.tcpRTPFrame = &base.InterleavedFrame{Channel: cm.tcpChannel}
cm.tcpRTCPFrame = &base.InterleavedFrame{Channel: cm.tcpChannel + 1}
cm.tcpBuffer = make([]byte, cm.c.MaxPacketSize+4)
}

for _, ct := range cm.formats {
Expand Down Expand Up @@ -161,26 +153,17 @@ func (cm *clientMedia) writePacketRTCPInQueueUDP(payload []byte) error {
}

func (cm *clientMedia) writePacketRTPInQueueTCP(payload []byte) error {
cm.tcpRTPFrame.Payload = payload
cm.c.tcpFrame.Channel = cm.tcpChannel
cm.c.tcpFrame.Payload = payload
cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout))
return cm.c.conn.WriteInterleavedFrame(cm.tcpRTPFrame, cm.tcpBuffer)
return cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer)
}

func (cm *clientMedia) writePacketRTCPInQueueTCP(payload []byte) error {
cm.tcpRTCPFrame.Payload = payload
cm.c.tcpFrame.Channel = cm.tcpChannel + 1
cm.c.tcpFrame.Payload = payload
cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout))
return cm.c.conn.WriteInterleavedFrame(cm.tcpRTCPFrame, cm.tcpBuffer)
}

func (cm *clientMedia) writePacketRTCP(byts []byte) error {
ok := cm.c.writer.push(func() error {
return cm.writePacketRTCPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}

return nil
return cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer)
}

func (cm *clientMedia) readRTPTCPPlay(payload []byte) bool {
Expand Down
8 changes: 4 additions & 4 deletions server_multicast_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ func (h *serverMulticastWriter) ip() net.IP {
return h.rtpl.ip()
}

func (h *serverMulticastWriter) writePacketRTP(payload []byte) error {
func (h *serverMulticastWriter) writePacketRTP(byts []byte) error {
ok := h.writer.push(func() error {
return h.rtpl.write(payload, h.rtpAddr)
return h.rtpl.write(byts, h.rtpAddr)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
Expand All @@ -78,9 +78,9 @@ func (h *serverMulticastWriter) writePacketRTP(payload []byte) error {
return nil
}

func (h *serverMulticastWriter) writePacketRTCP(payload []byte) error {
func (h *serverMulticastWriter) writePacketRTCP(byts []byte) error {
ok := h.writer.push(func() error {
return h.rtcpl.write(payload, h.rtcpAddr)
return h.rtcpl.write(byts, h.rtcpAddr)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
Expand Down
32 changes: 30 additions & 2 deletions server_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@
udpCheckStreamTimer *time.Timer
writer *asyncProcessor
timeDecoder *rtptime.GlobalDecoder2
tcpFrame *base.InterleavedFrame
tcpBuffer []byte

// in
chHandleRequest chan sessionRequestReq
Expand Down Expand Up @@ -978,6 +980,11 @@
sm.start()
}

if *ss.setuppedTransport == TransportTCP {
ss.tcpFrame = &base.InterleavedFrame{}
ss.tcpBuffer = make([]byte, ss.s.MaxPacketSize+4)
}

switch *ss.setuppedTransport {
case TransportUDP:
ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)
Expand Down Expand Up @@ -1067,6 +1074,11 @@
sm.start()
}

if *ss.setuppedTransport == TransportTCP {
ss.tcpFrame = &base.InterleavedFrame{}
ss.tcpBuffer = make([]byte, ss.s.MaxPacketSize+4)
}

switch *ss.setuppedTransport {
case TransportUDP:
ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)
Expand Down Expand Up @@ -1254,7 +1266,15 @@

func (ss *ServerSession) writePacketRTP(medi *description.Media, byts []byte) error {
sm := ss.setuppedMedias[medi]
return sm.writePacketRTP(byts)

ok := sm.ss.writer.push(func() error {
return sm.writePacketRTPInQueue(byts)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}

Check warning on line 1275 in server_session.go

View check run for this annotation

Codecov / codecov/patch

server_session.go#L1274-L1275

Added lines #L1274 - L1275 were not covered by tests

return nil
}

// WritePacketRTP writes a RTP packet to the session.
Expand All @@ -1271,7 +1291,15 @@

func (ss *ServerSession) writePacketRTCP(medi *description.Media, byts []byte) error {
sm := ss.setuppedMedias[medi]
return sm.writePacketRTCP(byts)

ok := ss.writer.push(func() error {
return sm.writePacketRTCPInQueue(byts)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}

Check warning on line 1300 in server_session.go

View check run for this annotation

Codecov / codecov/patch

server_session.go#L1299-L1300

Added lines #L1299 - L1300 were not covered by tests

return nil
}

// WritePacketRTCP writes a RTCP packet to the session.
Expand Down
40 changes: 6 additions & 34 deletions server_session_media.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"

"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
)
Expand All @@ -23,9 +22,6 @@ type serverSessionMedia struct {
udpRTPWriteAddr *net.UDPAddr
udpRTCPReadPort int
udpRTCPWriteAddr *net.UDPAddr
tcpRTPFrame *base.InterleavedFrame
tcpRTCPFrame *base.InterleavedFrame
tcpBuffer []byte
formats map[uint8]*serverSessionFormat // record only
writePacketRTPInQueue func([]byte) error
writePacketRTCPInQueue func([]byte) error
Expand Down Expand Up @@ -87,10 +83,6 @@ func (sm *serverSessionMedia) start() {
sm.ss.tcpCallbackByChannel[sm.tcpChannel] = sm.readRTPTCPRecord
sm.ss.tcpCallbackByChannel[sm.tcpChannel+1] = sm.readRTCPTCPRecord
}

sm.tcpRTPFrame = &base.InterleavedFrame{Channel: sm.tcpChannel}
sm.tcpRTCPFrame = &base.InterleavedFrame{Channel: sm.tcpChannel + 1}
sm.tcpBuffer = make([]byte, sm.ss.s.MaxPacketSize+4)
}
}

Expand Down Expand Up @@ -127,38 +119,18 @@ func (sm *serverSessionMedia) writePacketRTCPInQueueUDP(payload []byte) error {

func (sm *serverSessionMedia) writePacketRTPInQueueTCP(payload []byte) error {
atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload)))
sm.tcpRTPFrame.Payload = payload
sm.ss.tcpFrame.Channel = sm.tcpChannel
sm.ss.tcpFrame.Payload = payload
sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout))
return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.tcpRTPFrame, sm.tcpBuffer)
return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer)
}

func (sm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error {
atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload)))
sm.tcpRTCPFrame.Payload = payload
sm.ss.tcpFrame.Channel = sm.tcpChannel + 1
sm.ss.tcpFrame.Payload = payload
sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout))
return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.tcpRTCPFrame, sm.tcpBuffer)
}

func (sm *serverSessionMedia) writePacketRTP(payload []byte) error {
ok := sm.ss.writer.push(func() error {
return sm.writePacketRTPInQueue(payload)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}

return nil
}

func (sm *serverSessionMedia) writePacketRTCP(payload []byte) error {
ok := sm.ss.writer.push(func() error {
return sm.writePacketRTCPInQueue(payload)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}

return nil
return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer)
}

func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) bool {
Expand Down
5 changes: 2 additions & 3 deletions server_stream_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp t

// send unicast
for r := range sf.sm.st.activeUnicastReaders {
sm, ok := r.setuppedMedias[sf.sm.media]
if ok {
err := sm.writePacketRTP(byts)
if _, ok := r.setuppedMedias[sf.sm.media]; ok {
err := r.writePacketRTP(sf.sm.media, byts)
if err != nil {
r.onStreamWriteError(err)
} else {
Expand Down
5 changes: 2 additions & 3 deletions server_stream_media.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ func (sm *serverStreamMedia) close() {
func (sm *serverStreamMedia) writePacketRTCP(byts []byte) error {
// send unicast
for r := range sm.st.activeUnicastReaders {
sm, ok := r.setuppedMedias[sm.media]
if ok {
err := sm.writePacketRTCP(byts)
if _, ok := r.setuppedMedias[sm.media]; ok {
err := r.writePacketRTCP(sm.media, byts)
if err != nil {
r.onStreamWriteError(err)
}
Expand Down