From 0ccfcb0ec027824746e93b430949faaa12d2b258 Mon Sep 17 00:00:00 2001 From: Alex X Date: Sun, 26 May 2024 00:18:56 +0300 Subject: [PATCH] Fix timestamps for RTMP client --- pkg/rtmp/README.md | 3 +- pkg/rtmp/client.go | 2 +- pkg/rtmp/conn.go | 130 +++++++++++++++++++++++++-------------------- pkg/rtmp/server.go | 2 +- 4 files changed, 75 insertions(+), 62 deletions(-) diff --git a/pkg/rtmp/README.md b/pkg/rtmp/README.md index 11382210e..4196d5704 100644 --- a/pkg/rtmp/README.md +++ b/pkg/rtmp/README.md @@ -16,4 +16,5 @@ response []interface {}{"onStatus", 0, interface {}(nil), map[string]interface { - https://en.wikipedia.org/wiki/Flash_Video - https://en.wikipedia.org/wiki/Real-Time_Messaging_Protocol -- https://rtmp.veriskope.com/pdf/rtmp_specification_1.0.pdf \ No newline at end of file +- https://rtmp.veriskope.com/pdf/rtmp_specification_1.0.pdf +- https://rtmp.veriskope.com/docs/spec/ diff --git a/pkg/rtmp/client.go b/pkg/rtmp/client.go index 00544d5bf..aff8e23ca 100644 --- a/pkg/rtmp/client.go +++ b/pkg/rtmp/client.go @@ -65,7 +65,7 @@ func NewClient(conn net.Conn, u *url.URL) (*Conn, error) { rd: bufio.NewReaderSize(conn, core.BufferSize), wr: conn, - chunks: map[uint8]*header{}, + chunks: map[uint8]*chunk{}, rdPacketSize: 128, wrPacketSize: 4096, // OBS - 4096, Reolink - 4096 diff --git a/pkg/rtmp/conn.go b/pkg/rtmp/conn.go index 2c7c3ddad..2083a148f 100644 --- a/pkg/rtmp/conn.go +++ b/pkg/rtmp/conn.go @@ -29,7 +29,7 @@ type Conn struct { rdPacketSize uint32 wrPacketSize uint32 - chunks map[byte]*header + chunks map[byte]*chunk streamID byte url string @@ -66,11 +66,59 @@ func (c *Conn) readResponse(transID float64) ([]any, error) { } } -type header struct { - timeMS uint32 +type chunk struct { + conn *Conn + rawTime uint32 dataSize uint32 tagType byte streamID uint32 + timeMS uint32 +} + +func (c *chunk) readHeader(typ byte) error { + switch typ { + case 0: // 12 byte header (full header) + b, err := c.conn.readSize(11) + if err != nil { + return err + } + c.rawTime = Uint24(b) + c.dataSize = Uint24(b[3:]) + c.tagType = b[6] + c.streamID = binary.LittleEndian.Uint32(b[7:]) + c.timeMS = c.readExtendedTime() + + case 1: // 8 bytes - like type b00, not including message ID (4 last bytes) + b, err := c.conn.readSize(7) + if err != nil { + return err + } + c.rawTime = Uint24(b) + c.dataSize = Uint24(b[3:]) // msgdatalen + c.tagType = b[6] // msgtypeid + c.timeMS += c.readExtendedTime() + + case 2: // 4 bytes - Basic Header and timestamp (3 bytes) are included + b, err := c.conn.readSize(3) + if err != nil { + return err + } + c.rawTime = Uint24(b) // timestamp + c.timeMS += c.readExtendedTime() + + case 3: // 1 byte - only the Basic Header is included + // use here hdr from previous msg with same session ID (sid) + } + return nil +} + +func (c *chunk) readExtendedTime() uint32 { + if c.rawTime == 0xFFFFFF { + if b, err := c.conn.readSize(4); err == nil { + return binary.BigEndian.Uint32(b) + } + } + return c.rawTime } //var ErrNotImplemented = errors.New("rtmp: not implemented") @@ -85,93 +133,57 @@ func (c *Conn) readMessage() (byte, uint32, []byte, error) { chunkID := b[0] & 0b111111 // storing header information for support header type 3 - hdr, ok := c.chunks[chunkID] + ch, ok := c.chunks[chunkID] if !ok { - hdr = &header{} - c.chunks[chunkID] = hdr + ch = &chunk{conn: c} + c.chunks[chunkID] = ch } - switch hdrType { - case 0: // 12 byte header (full header) - if b, err = c.readSize(11); err != nil { - return 0, 0, nil, err - } - _ = b[7] - hdr.timeMS = Uint24(b) - hdr.dataSize = Uint24(b[3:]) - hdr.tagType = b[6] - hdr.streamID = binary.LittleEndian.Uint32(b[7:]) - - case 1: // 8 bytes - like type b00, not including message ID (4 last bytes) - if b, err = c.readSize(7); err != nil { - return 0, 0, nil, err - } - _ = b[6] - hdr.timeMS = Uint24(b) // timestamp - hdr.dataSize = Uint24(b[3:]) // msgdatalen - hdr.tagType = b[6] // msgtypeid - - case 2: // 4 bytes - Basic Header and timestamp (3 bytes) are included - if b, err = c.readSize(3); err != nil { - return 0, 0, nil, err - } - hdr.timeMS = Uint24(b) // timestamp - - case 3: // 1 byte - only the Basic Header is included - // use here hdr from previous msg with same session ID (sid) - } - - timeMS := hdr.timeMS - if timeMS == 0xFFFFFF { - if b, err = c.readSize(4); err != nil { - return 0, 0, nil, err - } - timeMS = binary.BigEndian.Uint32(b) + if err = ch.readHeader(hdrType); err != nil { + return 0, 0, nil, err } - //log.Printf("[rtmp] hdr=%d chunkID=%d timeMS=%d size=%d tagType=%d streamID=%d", hdrType, chunkID, hdr.timeMS, hdr.dataSize, hdr.tagType, hdr.streamID) + //log.Printf("[rtmp] hdr=%d chunkID=%d timeMS=%d size=%d tagType=%d streamID=%d", hdrType, chunkID, ch.timeMS, ch.dataSize, ch.tagType, ch.streamID) // 1. Response zero size - if hdr.dataSize == 0 { - return hdr.tagType, timeMS, nil, nil + if ch.dataSize == 0 { + return ch.tagType, ch.timeMS, nil, nil } - b = make([]byte, hdr.dataSize) + data := make([]byte, ch.dataSize) // 2. Response small packet - if hdr.dataSize <= c.rdPacketSize { - if _, err = io.ReadFull(c.rd, b); err != nil { + if ch.dataSize <= c.rdPacketSize { + if _, err = io.ReadFull(c.rd, data); err != nil { return 0, 0, nil, err } - return hdr.tagType, timeMS, b, nil + return ch.tagType, ch.timeMS, data, nil } // 3. Response big packet var i0 uint32 - for i1 := c.rdPacketSize; i1 < hdr.dataSize; i1 += c.rdPacketSize { - if _, err = io.ReadFull(c.rd, b[i0:i1]); err != nil { + for i1 := c.rdPacketSize; i1 < ch.dataSize; i1 += c.rdPacketSize { + if _, err = io.ReadFull(c.rd, data[i0:i1]); err != nil { return 0, 0, nil, err } + // hopefully this will be hdrType=3 with same chunkID if _, err = c.readSize(1); err != nil { return 0, 0, nil, err } - if hdr.timeMS == 0xFFFFFF { - if _, err = c.readSize(4); err != nil { - return 0, 0, nil, err - } - } + _ = ch.readExtendedTime() i0 = i1 } - if _, err = io.ReadFull(c.rd, b[i0:]); err != nil { + if _, err = io.ReadFull(c.rd, data[i0:]); err != nil { return 0, 0, nil, err } - return hdr.tagType, timeMS, b, nil + return ch.tagType, ch.timeMS, data, nil } + func (c *Conn) writeMessage(chunkID, tagType byte, timeMS uint32, payload []byte) error { c.mu.Lock() c.resetBuffer() @@ -324,7 +336,7 @@ func (c *Conn) writePlay() error { func (c *Conn) readSize(n uint32) ([]byte, error) { b := make([]byte, n) - if _, err := io.ReadAtLeast(c.rd, b, int(n)); err != nil { + if _, err := io.ReadFull(c.rd, b); err != nil { return nil, err } return b, nil diff --git a/pkg/rtmp/server.go b/pkg/rtmp/server.go index f5fc96f8a..ed727b986 100644 --- a/pkg/rtmp/server.go +++ b/pkg/rtmp/server.go @@ -17,7 +17,7 @@ func NewServer(conn net.Conn) (*Conn, error) { rd: bufio.NewReaderSize(conn, core.BufferSize), wr: conn, - chunks: map[uint8]*header{}, + chunks: map[uint8]*chunk{}, rdPacketSize: 128, wrPacketSize: 4096,