From 96504e2fb0c89870a6cd18e08d27af8e1cd1b0e8 Mon Sep 17 00:00:00 2001 From: Alex X Date: Sat, 15 Jun 2024 16:46:03 +0300 Subject: [PATCH] BIG rewrite stream info --- internal/exec/exec.go | 27 ++++- internal/ffmpeg/producer.go | 7 +- internal/hass/api.go | 2 +- internal/hls/hls.go | 11 +- internal/hls/ws.go | 6 +- internal/http/http.go | 29 +++-- internal/mjpeg/init.go | 27 ++--- internal/mp4/mp4.go | 7 +- internal/mp4/ws.go | 10 +- internal/mpegts/aac.go | 4 +- internal/mpegts/mpegts.go | 4 +- internal/rtmp/rtmp.go | 11 +- internal/streams/api.go | 4 +- internal/streams/handlers.go | 2 +- internal/webrtc/client.go | 8 +- internal/webrtc/kinesis.go | 8 +- internal/webrtc/milestone.go | 4 +- internal/webrtc/openipc.go | 4 +- internal/webrtc/server.go | 8 +- internal/webrtc/webrtc.go | 5 +- internal/webtorrent/init.go | 2 +- pkg/README.md | 82 +++++++++++++ pkg/aac/consumer.go | 23 ++-- pkg/aac/producer.go | 26 ++-- pkg/bubble/client.go | 1 + pkg/bubble/producer.go | 15 ++- pkg/core/codec.go | 2 +- pkg/core/connection.go | 139 ++++++++++++++++++++++ pkg/core/core.go | 89 +------------- pkg/core/media.go | 2 +- pkg/core/node.go | 7 +- pkg/core/track.go | 45 ++++++- pkg/dvrip/{consumer.go => backchannel.go} | 15 +-- pkg/dvrip/dvrip.go | 17 ++- pkg/dvrip/producer.go | 6 +- pkg/flv/consumer.go | 25 ++-- pkg/flv/producer.go | 19 +-- pkg/gopro/{gopro.go => producer.go} | 13 +- pkg/hass/client.go | 4 +- pkg/hls/producer.go | 11 +- pkg/homekit/consumer.go | 46 +++---- pkg/homekit/{client.go => producer.go} | 17 +-- pkg/image/producer.go | 92 ++++++++++++++ pkg/isapi/{consumer.go => backchannel.go} | 14 ++- pkg/isapi/client.go | 1 + pkg/ivideon/client.go | 1 + pkg/ivideon/producer.go | 16 ++- pkg/kasa/producer.go | 24 ++-- pkg/magic/bitstream/producer.go | 24 ++-- pkg/magic/keyframe.go | 39 +++--- pkg/magic/mjpeg/producer.go | 21 ++-- pkg/magic/producer.go | 34 +++--- pkg/mjpeg/client.go | 75 ------------ pkg/mjpeg/consumer.go | 37 +++--- pkg/mjpeg/producer.go | 61 ---------- pkg/mp4/consumer.go | 20 ++-- pkg/mp4/keyframe.go | 16 +-- pkg/mpegts/consumer.go | 37 +++--- pkg/mpegts/producer.go | 20 ++-- pkg/{multipart => mpjpeg}/multipart.go | 2 +- pkg/mpjpeg/producer.go | 65 ++++++++++ pkg/multipart/producer.go | 68 ----------- pkg/nest/client.go | 4 +- pkg/probe/{probe.go => producer.go} | 20 ++-- pkg/roborock/client.go | 5 +- pkg/rtmp/client.go | 9 +- pkg/rtmp/flv.go | 15 ++- pkg/rtsp/client.go | 16 ++- pkg/rtsp/conn.go | 20 +--- pkg/rtsp/consumer.go | 17 +-- pkg/rtsp/producer.go | 32 ++--- pkg/rtsp/server.go | 30 +++-- pkg/stdin/{consumer.go => backchannel.go} | 10 +- pkg/stdin/client.go | 1 + pkg/tapo/{consumer.go => backchannel.go} | 0 pkg/tapo/client.go | 1 + pkg/tapo/producer.go | 18 ++- pkg/tcp/helpers.go | 12 -- pkg/wav/{wav.go => producer.go} | 22 ++-- pkg/webrtc/client.go | 2 +- pkg/webrtc/conn.go | 51 +++++--- pkg/webrtc/consumer.go | 23 +--- pkg/webrtc/producer.go | 14 +-- pkg/webrtc/server.go | 4 +- pkg/webtorrent/client.go | 8 +- pkg/y4m/consumer.go | 16 ++- pkg/y4m/producer.go | 55 +++------ pkg/y4m/y4m.go | 29 +++++ 88 files changed, 1042 insertions(+), 853 deletions(-) create mode 100644 pkg/core/connection.go rename pkg/dvrip/{consumer.go => backchannel.go} (78%) rename pkg/gopro/{gopro.go => producer.go} (90%) rename pkg/homekit/{client.go => producer.go} (95%) create mode 100644 pkg/image/producer.go rename pkg/isapi/{consumer.go => backchannel.go} (83%) delete mode 100644 pkg/mjpeg/client.go delete mode 100644 pkg/mjpeg/producer.go rename pkg/{multipart => mpjpeg}/multipart.go (98%) create mode 100644 pkg/mpjpeg/producer.go delete mode 100644 pkg/multipart/producer.go rename pkg/probe/{probe.go => producer.go} (72%) rename pkg/stdin/{consumer.go => backchannel.go} (88%) rename pkg/tapo/{consumer.go => backchannel.go} (100%) delete mode 100644 pkg/tcp/helpers.go rename pkg/wav/{wav.go => producer.go} (89%) diff --git a/internal/exec/exec.go b/internal/exec/exec.go index 6c41fe9eb..ac1691d3c 100644 --- a/internal/exec/exec.go +++ b/internal/exec/exec.go @@ -8,6 +8,7 @@ import ( "net/url" "os" "os/exec" + "slices" "strings" "sync" "time" @@ -80,7 +81,7 @@ func execHandle(rawURL string) (core.Producer, error) { return handleRTSP(rawURL, cmd, path) } -func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error) { +func handlePipe(source string, cmd *exec.Cmd, query url.Values) (core.Producer, error) { if query.Get("backchannel") == "1" { return stdin.NewClient(cmd) } @@ -104,12 +105,17 @@ func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error return nil, fmt.Errorf("exec/pipe: %w\n%s", err, cmd.Stderr) } + if info, ok := prod.(core.Info); ok { + info.SetProtocol("pipe") + setRemoteInfo(info, source, cmd.Args) + } + log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run pipe") return prod, nil } -func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) { +func handleRTSP(source string, cmd *exec.Cmd, path string) (core.Producer, error) { if log.Trace().Enabled() { cmd.Stdout = os.Stdout } @@ -131,7 +137,7 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) { ts := time.Now() if err := cmd.Start(); err != nil { - log.Error().Err(err).Str("url", url).Msg("[exec]") + log.Error().Err(err).Str("source", source).Msg("[exec]") return nil, err } @@ -143,13 +149,14 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) { select { case <-time.After(time.Second * 60): _ = cmd.Process.Kill() - log.Error().Str("url", url).Msg("[exec] timeout") + log.Error().Str("source", source).Msg("[exec] timeout") return nil, errors.New("exec: timeout") case <-done: // limit message size return nil, fmt.Errorf("exec/rtsp\n%s", cmd.Stderr) case prod := <-waiter: log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run rtsp") + setRemoteInfo(prod, source, cmd.Args) prod.OnClose = func() error { log.Debug().Msgf("[exec] kill rtsp") return errors.Join(cmd.Process.Kill(), cmd.Wait()) @@ -210,3 +217,15 @@ func trimSpace(b []byte) []byte { } return b[start:stop] } + +func setRemoteInfo(info core.Info, source string, args []string) { + info.SetSource(source) + + if i := slices.Index(args, "-i"); i > 0 && i < len(args)-1 { + rawURL := args[i+1] + if u, err := url.Parse(rawURL); err == nil && u.Host != "" { + info.SetRemoteAddr(u.Host) + info.SetURL(rawURL) + } + } +} diff --git a/internal/ffmpeg/producer.go b/internal/ffmpeg/producer.go index 05df69e32..d132d2536 100644 --- a/internal/ffmpeg/producer.go +++ b/internal/ffmpeg/producer.go @@ -13,7 +13,7 @@ import ( ) type Producer struct { - core.SuperProducer + core.Connection url string query url.Values ffmpeg core.Producer @@ -31,7 +31,8 @@ func NewProducer(url string) (core.Producer, error) { return nil, errors.New("ffmpeg: unsupported params: " + url[i:]) } - p.Type = "FFmpeg producer" + p.ID = core.NewID() + p.FormatName = "ffmpeg" p.Medias = []*core.Media{ { // we can support only audio, because don't know FmtpLine for H264 and PayloadType for MJPEG @@ -81,7 +82,7 @@ func (p *Producer) Stop() error { func (p *Producer) MarshalJSON() ([]byte, error) { if p.ffmpeg == nil { - return json.Marshal(p.SuperProducer) + return json.Marshal(p.Connection) } return json.Marshal(p.ffmpeg) } diff --git a/internal/hass/api.go b/internal/hass/api.go index 4628cc118..e3de23b3e 100644 --- a/internal/hass/api.go +++ b/internal/hass/api.go @@ -63,7 +63,7 @@ func apiStream(w http.ResponseWriter, r *http.Request) { return } - s, err = webrtc.ExchangeSDP(stream, string(offer), "WebRTC/Hass sync", r.UserAgent()) + s, err = webrtc.ExchangeSDP(stream, string(offer), "hass/webrtc", r.UserAgent()) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/internal/hls/hls.go b/internal/hls/hls.go index 5d3cd918f..5c136450d 100644 --- a/internal/hls/hls.go +++ b/internal/hls/hls.go @@ -12,7 +12,6 @@ import ( "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/mp4" "github.com/AlexxIT/go2rtc/pkg/mpegts" - "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/rs/zerolog" ) @@ -63,15 +62,13 @@ func handlerStream(w http.ResponseWriter, r *http.Request) { medias := mp4.ParseQuery(r.URL.Query()) if medias != nil { c := mp4.NewConsumer(medias) - c.Type = "HLS/fMP4 consumer" - c.RemoteAddr = tcp.RemoteAddr(r) - c.UserAgent = r.UserAgent() + c.FormatName = "hls/fmp4" + c.WithRequest(r) cons = c } else { c := mpegts.NewConsumer() - c.Type = "HLS/TS consumer" - c.RemoteAddr = tcp.RemoteAddr(r) - c.UserAgent = r.UserAgent() + c.FormatName = "hls/mpegts" + c.WithRequest(r) cons = c } diff --git a/internal/hls/ws.go b/internal/hls/ws.go index ea1f5a3a2..608f515ff 100644 --- a/internal/hls/ws.go +++ b/internal/hls/ws.go @@ -8,7 +8,6 @@ import ( "github.com/AlexxIT/go2rtc/internal/api/ws" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/mp4" - "github.com/AlexxIT/go2rtc/pkg/tcp" ) func handlerWSHLS(tr *ws.Transport, msg *ws.Message) error { @@ -20,9 +19,8 @@ func handlerWSHLS(tr *ws.Transport, msg *ws.Message) error { codecs := msg.String() medias := mp4.ParseCodecs(codecs, true) cons := mp4.NewConsumer(medias) - cons.Type = "HLS/fMP4 consumer" - cons.RemoteAddr = tcp.RemoteAddr(tr.Request) - cons.UserAgent = tr.Request.UserAgent() + cons.FormatName = "hls/fmp4" + cons.WithRequest(tr.Request) log.Trace().Msgf("[hls] new ws consumer codecs=%s", codecs) diff --git a/internal/http/http.go b/internal/http/http.go index 8b1903f38..a35439d50 100644 --- a/internal/http/http.go +++ b/internal/http/http.go @@ -11,9 +11,9 @@ import ( "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/hls" + "github.com/AlexxIT/go2rtc/pkg/image" "github.com/AlexxIT/go2rtc/pkg/magic" - "github.com/AlexxIT/go2rtc/pkg/mjpeg" - "github.com/AlexxIT/go2rtc/pkg/multipart" + "github.com/AlexxIT/go2rtc/pkg/mpjpeg" "github.com/AlexxIT/go2rtc/pkg/tcp" ) @@ -45,6 +45,21 @@ func handleHTTP(rawURL string) (core.Producer, error) { } } + prod, err := do(req) + if err != nil { + return nil, err + } + + if info, ok := prod.(core.Info); ok { + info.SetProtocol("http") + info.SetRemoteAddr(req.URL.Host) // TODO: rewrite to net.Conn + info.SetURL(rawURL) + } + + return prod, nil +} + +func do(req *http.Request) (core.Producer, error) { res, err := tcp.Do(req) if err != nil { return nil, err @@ -66,14 +81,12 @@ func handleHTTP(rawURL string) (core.Producer, error) { } switch { - case ct == "image/jpeg": - return mjpeg.NewClient(res), nil - - case ct == "multipart/x-mixed-replace": - return multipart.Open(res.Body) - case ct == "application/vnd.apple.mpegurl" || ext == "m3u8": return hls.OpenURL(req.URL, res.Body) + case ct == "image/jpeg": + return image.Open(res) + case ct == "multipart/x-mixed-replace": + return mpjpeg.Open(res.Body) } return magic.Open(res.Body) diff --git a/internal/mjpeg/init.go b/internal/mjpeg/init.go index 0bed95c60..2bb7093a1 100644 --- a/internal/mjpeg/init.go +++ b/internal/mjpeg/init.go @@ -17,7 +17,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/magic" "github.com/AlexxIT/go2rtc/pkg/mjpeg" - "github.com/AlexxIT/go2rtc/pkg/tcp" + "github.com/AlexxIT/go2rtc/pkg/mpjpeg" "github.com/AlexxIT/go2rtc/pkg/y4m" "github.com/rs/zerolog" ) @@ -44,8 +44,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { } cons := magic.NewKeyframe() - cons.RemoteAddr = tcp.RemoteAddr(r) - cons.UserAgent = r.UserAgent() + cons.WithRequest(r) if err := stream.AddConsumer(cons); err != nil { log.Error().Err(err).Caller().Send() @@ -100,8 +99,7 @@ func outputMjpeg(w http.ResponseWriter, r *http.Request) { } cons := mjpeg.NewConsumer() - cons.RemoteAddr = tcp.RemoteAddr(r) - cons.UserAgent = r.UserAgent() + cons.WithRequest(r) if err := stream.AddConsumer(cons); err != nil { log.Error().Err(err).Msg("[api.mjpeg] add consumer") @@ -117,7 +115,7 @@ func outputMjpeg(w http.ResponseWriter, r *http.Request) { wr := mjpeg.NewWriter(w) _, _ = cons.WriteTo(wr) } else { - cons.Type = "ASCII passive consumer " + cons.FormatName = "ascii" query := r.URL.Query() wr := ascii.NewWriter(w, query.Get("color"), query.Get("back"), query.Get("text")) @@ -135,17 +133,16 @@ func inputMjpeg(w http.ResponseWriter, r *http.Request) { return } - res := &http.Response{Body: r.Body, Header: r.Header, Request: r} - res.Header.Set("Content-Type", "multipart/mixed;boundary=") + prod, _ := mpjpeg.Open(r.Body) + prod.WithRequest(r) - client := mjpeg.NewClient(res) - stream.AddProducer(client) + stream.AddProducer(prod) - if err := client.Start(); err != nil && err != io.EOF { + if err := prod.Start(); err != nil && err != io.EOF { log.Warn().Err(err).Caller().Send() } - stream.RemoveProducer(client) + stream.RemoveProducer(prod) } func handlerWS(tr *ws.Transport, _ *ws.Message) error { @@ -155,8 +152,7 @@ func handlerWS(tr *ws.Transport, _ *ws.Message) error { } cons := mjpeg.NewConsumer() - cons.RemoteAddr = tcp.RemoteAddr(tr.Request) - cons.UserAgent = tr.Request.UserAgent() + cons.WithRequest(tr.Request) if err := stream.AddConsumer(cons); err != nil { log.Debug().Err(err).Msg("[mjpeg] add consumer") @@ -183,8 +179,7 @@ func apiStreamY4M(w http.ResponseWriter, r *http.Request) { } cons := y4m.NewConsumer() - cons.RemoteAddr = tcp.RemoteAddr(r) - cons.UserAgent = r.UserAgent() + cons.WithRequest(r) if err := stream.AddConsumer(cons); err != nil { log.Error().Err(err).Caller().Send() diff --git a/internal/mp4/mp4.go b/internal/mp4/mp4.go index 2f59ba041..cca5220c7 100644 --- a/internal/mp4/mp4.go +++ b/internal/mp4/mp4.go @@ -13,7 +13,6 @@ import ( "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/mp4" - "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/rs/zerolog" ) @@ -100,9 +99,9 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) { medias := mp4.ParseQuery(r.URL.Query()) cons := mp4.NewConsumer(medias) - cons.Type = "MP4/HTTP active consumer" - cons.RemoteAddr = tcp.RemoteAddr(r) - cons.UserAgent = r.UserAgent() + cons.FormatName = "mp4" + cons.Protocol = "http" + cons.WithRequest(r) if err := stream.AddConsumer(cons); err != nil { log.Error().Err(err).Caller().Send() diff --git a/internal/mp4/ws.go b/internal/mp4/ws.go index 060ff5f68..c880fb584 100644 --- a/internal/mp4/ws.go +++ b/internal/mp4/ws.go @@ -8,7 +8,6 @@ import ( "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/mp4" - "github.com/AlexxIT/go2rtc/pkg/tcp" ) func handlerWSMSE(tr *ws.Transport, msg *ws.Message) error { @@ -24,9 +23,8 @@ func handlerWSMSE(tr *ws.Transport, msg *ws.Message) error { } cons := mp4.NewConsumer(medias) - cons.Type = "MSE/WebSocket active consumer" - cons.RemoteAddr = tcp.RemoteAddr(tr.Request) - cons.UserAgent = tr.Request.UserAgent() + cons.FormatName = "mse/fmp4" + cons.WithRequest(tr.Request) if err := stream.AddConsumer(cons); err != nil { log.Debug().Err(err).Msg("[mp4] add consumer") @@ -57,9 +55,7 @@ func handlerWSMP4(tr *ws.Transport, msg *ws.Message) error { } cons := mp4.NewKeyframe(medias) - cons.Type = "MP4/WebSocket active consumer" - cons.RemoteAddr = tcp.RemoteAddr(tr.Request) - cons.UserAgent = tr.Request.UserAgent() + cons.WithRequest(tr.Request) if err := stream.AddConsumer(cons); err != nil { log.Error().Err(err).Caller().Send() diff --git a/internal/mpegts/aac.go b/internal/mpegts/aac.go index 867dc971b..3b1522fe9 100644 --- a/internal/mpegts/aac.go +++ b/internal/mpegts/aac.go @@ -6,7 +6,6 @@ import ( "github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/aac" - "github.com/AlexxIT/go2rtc/pkg/tcp" ) func apiStreamAAC(w http.ResponseWriter, r *http.Request) { @@ -18,8 +17,7 @@ func apiStreamAAC(w http.ResponseWriter, r *http.Request) { } cons := aac.NewConsumer() - cons.RemoteAddr = tcp.RemoteAddr(r) - cons.UserAgent = r.UserAgent() + cons.WithRequest(r) if err := stream.AddConsumer(cons); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/internal/mpegts/mpegts.go b/internal/mpegts/mpegts.go index 6ef00ba12..d5f7752b0 100644 --- a/internal/mpegts/mpegts.go +++ b/internal/mpegts/mpegts.go @@ -6,7 +6,6 @@ import ( "github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/mpegts" - "github.com/AlexxIT/go2rtc/pkg/tcp" ) func Init() { @@ -31,8 +30,7 @@ func outputMpegTS(w http.ResponseWriter, r *http.Request) { } cons := mpegts.NewConsumer() - cons.RemoteAddr = tcp.RemoteAddr(r) - cons.UserAgent = r.UserAgent() + cons.WithRequest(r) if err := stream.AddConsumer(cons); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/internal/rtmp/rtmp.go b/internal/rtmp/rtmp.go index 07aa5f716..afc363a97 100644 --- a/internal/rtmp/rtmp.go +++ b/internal/rtmp/rtmp.go @@ -12,7 +12,6 @@ import ( "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/flv" "github.com/AlexxIT/go2rtc/pkg/rtmp" - "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/rs/zerolog" ) @@ -128,11 +127,7 @@ func tcpHandle(netConn net.Conn) error { var log zerolog.Logger func streamsHandle(url string) (core.Producer, error) { - client, err := rtmp.DialPlay(url) - if err != nil { - return nil, err - } - return client, nil + return rtmp.DialPlay(url) } func streamsConsumerHandle(url string) (core.Consumer, func(), error) { @@ -165,9 +160,7 @@ func outputFLV(w http.ResponseWriter, r *http.Request) { } cons := flv.NewConsumer() - cons.Type = "HTTP-FLV consumer" - cons.RemoteAddr = tcp.RemoteAddr(r) - cons.UserAgent = r.UserAgent() + cons.WithRequest(r) if err := stream.AddConsumer(cons); err != nil { log.Error().Err(err).Caller().Send() diff --git a/internal/streams/api.go b/internal/streams/api.go index 720994253..69d2276a6 100644 --- a/internal/streams/api.go +++ b/internal/streams/api.go @@ -6,7 +6,6 @@ import ( "github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/app" "github.com/AlexxIT/go2rtc/pkg/probe" - "github.com/AlexxIT/go2rtc/pkg/tcp" ) func apiStreams(w http.ResponseWriter, r *http.Request) { @@ -30,8 +29,7 @@ func apiStreams(w http.ResponseWriter, r *http.Request) { cons := probe.NewProbe(query) if len(cons.Medias) != 0 { - cons.RemoteAddr = tcp.RemoteAddr(r) - cons.UserAgent = r.UserAgent() + cons.WithRequest(r) if err := stream.AddConsumer(cons); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/internal/streams/handlers.go b/internal/streams/handlers.go index 3009dd661..3240abb57 100644 --- a/internal/streams/handlers.go +++ b/internal/streams/handlers.go @@ -7,7 +7,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/core" ) -type Handler func(url string) (core.Producer, error) +type Handler func(source string) (core.Producer, error) var handlers = map[string]Handler{} diff --git a/internal/webrtc/client.go b/internal/webrtc/client.go index ae1a455b8..4b8b1b9ad 100644 --- a/internal/webrtc/client.go +++ b/internal/webrtc/client.go @@ -41,7 +41,7 @@ func streamsHandler(rawURL string) (core.Producer, error) { // https://aws.amazon.com/kinesis/video-streams/ // https://docs.aws.amazon.com/kinesisvideostreams-webrtc-dg/latest/devguide/what-is-kvswebrtc.html // https://github.com/orgs/awslabs/repositories?q=kinesis+webrtc - return kinesisClient(rawURL, query, "WebRTC/Kinesis") + return kinesisClient(rawURL, query, "webrtc/kinesis") } else if format == "openipc" { return openIPCClient(rawURL, query) } else { @@ -86,8 +86,9 @@ func go2rtcClient(url string) (core.Producer, error) { var connMu sync.Mutex prod := webrtc.NewConn(pc) - prod.Desc = "WebRTC/WebSocket async" prod.Mode = core.ModeActiveProducer + prod.Protocol = "ws" + prod.URL = url prod.Listen(func(msg any) { switch msg := msg.(type) { case *pion.ICECandidate: @@ -180,8 +181,9 @@ func whepClient(url string) (core.Producer, error) { } prod := webrtc.NewConn(pc) - prod.Desc = "WebRTC/WHEP sync" prod.Mode = core.ModeActiveProducer + prod.Protocol = "http" + prod.URL = url medias := []*core.Media{ {Kind: core.KindVideo, Direction: core.DirectionRecvonly}, diff --git a/internal/webrtc/kinesis.go b/internal/webrtc/kinesis.go index 7ef9d9bb3..2ea1cf7a3 100644 --- a/internal/webrtc/kinesis.go +++ b/internal/webrtc/kinesis.go @@ -34,7 +34,7 @@ func (k kinesisResponse) String() string { return fmt.Sprintf("type=%s, payload=%s", k.Type, k.Payload) } -func kinesisClient(rawURL string, query url.Values, desc string) (core.Producer, error) { +func kinesisClient(rawURL string, query url.Values, format string) (core.Producer, error) { // 1. Connect to signalign server conn, _, err := websocket.DefaultDialer.Dial(rawURL, nil) if err != nil { @@ -79,8 +79,10 @@ func kinesisClient(rawURL string, query url.Values, desc string) (core.Producer, } prod := webrtc.NewConn(pc) - prod.Desc = desc + prod.FormatName = format prod.Mode = core.ModeActiveProducer + prod.Protocol = "ws" + prod.URL = rawURL prod.Listen(func(msg any) { switch msg := msg.(type) { case *pion.ICECandidate: @@ -216,5 +218,5 @@ func wyzeClient(rawURL string) (core.Producer, error) { "ice_servers": []string{string(kvs.Servers)}, } - return kinesisClient(kvs.URL, query, "WebRTC/Wyze") + return kinesisClient(kvs.URL, query, "webrtc/wyze") } diff --git a/internal/webrtc/milestone.go b/internal/webrtc/milestone.go index b4e695c97..6a696cb07 100644 --- a/internal/webrtc/milestone.go +++ b/internal/webrtc/milestone.go @@ -193,8 +193,10 @@ func milestoneClient(rawURL string, query url.Values) (core.Producer, error) { } prod := webrtc.NewConn(pc) - prod.Desc = "WebRTC/Milestone" + prod.FormatName = "webrtc/milestone" prod.Mode = core.ModeActiveProducer + prod.Protocol = "http" + prod.URL = rawURL offer, err := mc.GetOffer() if err != nil { diff --git a/internal/webrtc/openipc.go b/internal/webrtc/openipc.go index 8055ea916..8a951d040 100644 --- a/internal/webrtc/openipc.go +++ b/internal/webrtc/openipc.go @@ -53,8 +53,10 @@ func openIPCClient(rawURL string, query url.Values) (core.Producer, error) { var connState core.Waiter prod := webrtc.NewConn(pc) - prod.Desc = "WebRTC/OpenIPC" + prod.FormatName = "webrtc/openipc" prod.Mode = core.ModeActiveProducer + prod.Protocol = "ws" + prod.URL = rawURL prod.Listen(func(msg any) { switch msg := msg.(type) { case *pion.ICECandidate: diff --git a/internal/webrtc/server.go b/internal/webrtc/server.go index fcb72b856..91a237dbc 100644 --- a/internal/webrtc/server.go +++ b/internal/webrtc/server.go @@ -100,11 +100,11 @@ func outputWebRTC(w http.ResponseWriter, r *http.Request) { switch mediaType { case "application/json": - desc = "WebRTC/JSON sync" + desc = "webrtc/json" case MimeSDP: - desc = "WebRTC/WHEP sync" + desc = "webrtc/whep" default: - desc = "WebRTC/HTTP sync" + desc = "webrtc/post" } answer, err := ExchangeSDP(stream, offer, desc, r.UserAgent()) @@ -168,8 +168,8 @@ func inputWebRTC(w http.ResponseWriter, r *http.Request) { // create new webrtc instance prod := webrtc.NewConn(pc) - prod.Desc = "WebRTC/WHIP sync" prod.Mode = core.ModePassiveProducer + prod.Protocol = "http" prod.UserAgent = r.UserAgent() if err = prod.SetOffer(string(offer)); err != nil { diff --git a/internal/webrtc/webrtc.go b/internal/webrtc/webrtc.go index cabd88b76..8b4943c3f 100644 --- a/internal/webrtc/webrtc.go +++ b/internal/webrtc/webrtc.go @@ -117,8 +117,8 @@ func asyncHandler(tr *ws.Transport, msg *ws.Message) error { defer sendAnswer.Done(nil) conn := webrtc.NewConn(pc) - conn.Desc = "WebRTC/WebSocket async" conn.Mode = mode + conn.Protocol = "ws" conn.UserAgent = tr.Request.UserAgent() conn.Listen(func(msg any) { switch msg := msg.(type) { @@ -207,8 +207,9 @@ func ExchangeSDP(stream *streams.Stream, offer, desc, userAgent string) (answer // create new webrtc instance conn := webrtc.NewConn(pc) - conn.Desc = desc + conn.FormatName = desc conn.UserAgent = userAgent + conn.Protocol = "http" conn.Listen(func(msg any) { switch msg := msg.(type) { case pion.PeerConnectionState: diff --git a/internal/webtorrent/init.go b/internal/webtorrent/init.go index 25b7ef9ba..b1c25c767 100644 --- a/internal/webtorrent/init.go +++ b/internal/webtorrent/init.go @@ -47,7 +47,7 @@ func Init() { if stream == nil { return "", errors.New(api.StreamNotFound) } - return webrtc.ExchangeSDP(stream, offer, "WebRTC/WebTorrent sync", "") + return webrtc.ExchangeSDP(stream, offer, "webtorrent", "") }, } diff --git a/pkg/README.md b/pkg/README.md index c875dc35e..b12f0a704 100644 --- a/pkg/README.md +++ b/pkg/README.md @@ -1,3 +1,85 @@ +# Notes + +go2rtc tries to name formats, protocols and codecs the same way they are named in FFmpeg. +Some formats and protocols go2rtc supports exclusively. They have no equivalent in FFmpeg. + +## Producers (input) + +- The initiator of the connection can be go2rtc - **Source protocols** +- The initiator of the connection can be an external program - **Ingress protocols** +- Codecs can be incoming - **Recevers codecs** +- Codecs can be outgoing (two way audio) - **Senders codecs** + +| Format | Source protocols | Ingress protocols | Recevers codecs | Senders codecs | Example | +|--------------|------------------|-------------------|------------------------------|--------------------|---------------| +| adts | http,tcp,pipe | http | aac | | `http:` | +| bubble | http | | h264,hevc,pcm_alaw | | `bubble:` | +| dvrip | tcp | | h264,hevc,pcm_alaw,pcm_mulaw | pcm_alaw | `dvrip:` | +| flv | http,tcp,pipe | http | h264,aac | | `http:` | +| gopro | http+udp | | TODO | | `gopro:` | +| hass/webrtc | ws+udp,tcp | | TODO | | `hass:` | +| hls/mpegts | http | | h264,h265,aac,opus | | `http:` | +| homekit | homekit+udp | | h264,eld* | | `homekit:` | +| isapi | http | | | pcm_alaw,pcm_mulaw | `isapi:` | +| ivideon | ws | | h264 | | `ivideon:` | +| kasa | http | | h264,pcm_mulaw | | `kasa:` | +| h264 | http,tcp,pipe | http | h264 | | `http:` | +| hevc | http,tcp,pipe | http | hevc | | `http:` | +| mjpeg | http,tcp,pipe | http | mjpeg | | `http:` | +| mpjpeg | http,tcp,pipe | http | mjpeg | | `http:` | +| mpegts | http,tcp,pipe | http | h264,hevc,aac,opus | | `http:` | +| nest/webrtc | http+udp | | TODO | | `nest:` | +| roborock | mqtt+udp | | h264,opus | opus | `roborock:` | +| rtmp | rtmp | rtmp | h264,aac | | `rtmp:` | +| rtsp | rtsp+tcp,ws | rtsp+tcp | h264,hevc,aac,pcm*,opus | pcm*,opus | `rtsp:` | +| stdin | pipe | | | pcm_alaw,pcm_mulaw | `stdin:` | +| tapo | http | | h264,pcma | pcm_alaw | `tapo:` | +| wav | http,tcp,pipe | http | pcm_alaw,pcm_mulaw | | `http:` | +| webrtc* | TODO | TODO | h264,pcm_alaw,pcm_mulaw,opus | pcm_alaw,pcm_mulaw | `webrtc:` | +| webtorrent | TODO | TODO | TODO | TODO | `webtorrent:` | +| yuv4mpegpipe | http,tcp,pipe | http | rawvideo | | `http:` | + +- **eld** - rare variant of aac codec +- **pcm** - pcm_alaw pcm_mulaw pcm_s16be pcm_s16le +- **webrtc** - webrtc/kinesis, webrtc/openipc, webrtc/milestone, webrtc/wyze, webrtc/whep + +## Consumers (output) + +| Format | Protocol | Send codecs | Recv codecs | Example | +|--------------|-------------|------------------------------|-------------------------|---------------------------------------| +| adts | http | aac | | `GET /api/stream.adts` | +| ascii | http | mjpeg | | `GET /api/stream.ascii` | +| flv | http | h264,aac | | `GET /api/stream.flv` | +| hls/mpegts | http | h264,hevc,aac | | `GET /api/stream.m3u8` | +| hls/fmp4 | http | h264,hevc,aac,pcm*,opus | | `GET /api/stream.m3u8?mp4` | +| homekit | homekit+udp | h264,opus | | Apple HomeKit app | +| mjpeg | ws | mjpeg | | `{"type":"mjpeg"}` -> `/api/ws` | +| mpjpeg | http | mjpeg | | `GET /api/stream.mjpeg` | +| mp4 | http | h264,hevc,aac,pcm*,opus | | `GET /api/stream.mp4` | +| mse/fmp4 | ws | h264,hevc,aac,pcm*,opus | | `{"type":"mse"}` -> `/api/ws` | +| mpegts | http | h264,hevc,aac | | `GET /api/stream.ts` | +| rtmp | rtmp | h264,aac | | `rtmp://localhost:1935/{stream_name}` | +| rtsp | rtsp+tcp | h264,hevc,aac,pcm*,opus | | `rtsp://localhost:8554/{stream_name}` | +| webrtc | TODO | h264,pcm_alaw,pcm_mulaw,opus | pcm_alaw,pcm_mulaw,opus | `{"type":"webrtc"}` -> `/api/ws` | +| yuv4mpegpipe | http | rawvideo | | `GET /api/stream.y4m` | + +- **pcm** - pcm_alaw pcm_mulaw pcm_s16be pcm_s16le + +## Snapshots + +| Format | Protocol | Send codecs | Example | +|--------|----------|-------------|-----------------------| +| jpeg | http | mjpeg | `GET /api/frame.jpeg` | +| mp4 | http | h264,hevc | `GET /api/frame.mp4` | + +## Developers + +File naming: + +- `pkg/{format}/producer.go` - producer for this format (also if support backchannel) +- `pkg/{format}/consumer.go` - consumer for this format +- `pkg/{format}/backchanel.go` - producer with only backchannel func + ## Useful links - https://www.wowza.com/blog/streaming-protocols diff --git a/pkg/aac/consumer.go b/pkg/aac/consumer.go index e785adc52..fc67d2a40 100644 --- a/pkg/aac/consumer.go +++ b/pkg/aac/consumer.go @@ -8,15 +8,12 @@ import ( ) type Consumer struct { - core.SuperConsumer + core.Connection wr *core.WriteBuffer } func NewConsumer() *Consumer { - cons := &Consumer{ - wr: core.NewWriteBuffer(nil), - } - cons.Medias = []*core.Media{ + medias := []*core.Media{ { Kind: core.KindAudio, Direction: core.DirectionSendonly, @@ -25,7 +22,16 @@ func NewConsumer() *Consumer { }, }, } - return cons + wr := core.NewWriteBuffer(nil) + return &Consumer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "adts", + Medias: medias, + Transport: wr, + }, + wr: wr, + } } func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { @@ -51,8 +57,3 @@ func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Re func (c *Consumer) WriteTo(wr io.Writer) (int64, error) { return c.wr.WriteTo(wr) } - -func (c *Consumer) Stop() error { - _ = c.SuperConsumer.Close() - return c.wr.Close() -} diff --git a/pkg/aac/producer.go b/pkg/aac/producer.go index e9be71fda..efd2d1754 100644 --- a/pkg/aac/producer.go +++ b/pkg/aac/producer.go @@ -10,9 +10,8 @@ import ( ) type Producer struct { - core.SuperProducer + core.Connection rd *bufio.Reader - cl io.Closer } func Open(r io.Reader) (*Producer, error) { @@ -23,18 +22,22 @@ func Open(r io.Reader) (*Producer, error) { return nil, err } - codec := ADTSToCodec(b) - - prod := &Producer{rd: rd, cl: r.(io.Closer)} - prod.Type = "ADTS producer" - prod.Medias = []*core.Media{ + medias := []*core.Media{ { Kind: core.KindAudio, Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{codec}, + Codecs: []*core.Codec{ADTSToCodec(b)}, }, } - return prod, nil + return &Producer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "adts", + Medias: medias, + Transport: r, + }, + rd: rd, + }, nil } func (c *Producer) Start() error { @@ -66,8 +69,3 @@ func (c *Producer) Start() error { c.Receivers[0].WriteRTP(pkt) } } - -func (c *Producer) Stop() error { - _ = c.SuperProducer.Close() - return c.cl.Close() -} diff --git a/pkg/bubble/client.go b/pkg/bubble/client.go index c0a797011..5afba779a 100644 --- a/pkg/bubble/client.go +++ b/pkg/bubble/client.go @@ -22,6 +22,7 @@ import ( "github.com/pion/rtp" ) +// Deprecated: should be rewritten to core.Connection type Client struct { core.Listener diff --git a/pkg/bubble/producer.go b/pkg/bubble/producer.go index a7aaa56e3..9fa18f252 100644 --- a/pkg/bubble/producer.go +++ b/pkg/bubble/producer.go @@ -65,11 +65,16 @@ func (c *Client) Stop() error { } func (c *Client) MarshalJSON() ([]byte, error) { - info := &core.Info{ - Type: "Bubble active producer", - Medias: c.medias, - Recv: c.recv, - Receivers: c.receivers, + info := &core.Connection{ + ID: core.ID(c), + FormatName: "bubble", + Protocol: "http", + Medias: c.medias, + Recv: c.recv, + Receivers: c.receivers, + } + if c.conn != nil { + info.RemoteAddr = c.conn.RemoteAddr().String() } return json.Marshal(info) } diff --git a/pkg/core/codec.go b/pkg/core/codec.go index 91f6fddcb..d07b8b74f 100644 --- a/pkg/core/codec.go +++ b/pkg/core/codec.go @@ -46,7 +46,7 @@ func FFmpegCodecName(name string) string { case CodecH264: return "h264" case CodecH265: - return "h265" + return "hevc" case CodecJPEG: return "mjpeg" case CodecRAW: diff --git a/pkg/core/connection.go b/pkg/core/connection.go new file mode 100644 index 000000000..1055c3819 --- /dev/null +++ b/pkg/core/connection.go @@ -0,0 +1,139 @@ +package core + +import ( + "io" + "net/http" + "reflect" + "sync/atomic" +) + +func NewID() uint32 { + return id.Add(1) +} + +// Deprecated: use NewID instead +func ID(v any) uint32 { + p := uintptr(reflect.ValueOf(v).UnsafePointer()) + return 0x8000_0000 | uint32(p) +} + +var id atomic.Uint32 + +type Info interface { + SetProtocol(string) + SetRemoteAddr(string) + SetSource(string) + SetURL(string) + WithRequest(*http.Request) +} + +// Connection just like webrtc.PeerConnection +// - ID and RemoteAddr used for building Connection(s) graph +// - FormatName, Protocol, RemoteAddr, Source, URL, SDP, UserAgent used for info about Connection +// - FormatName and Protocol has FFmpeg compatible names +// - Transport used for auto closing on Stop +type Connection struct { + ID uint32 `json:"id,omitempty"` + FormatName string `json:"format_name,omitempty"` // rtsp, webrtc, mp4, mjpeg, mpjpeg... + Protocol string `json:"protocol,omitempty"` // tcp, udp, http, ws, pipe... + RemoteAddr string `json:"remote_addr,omitempty"` // host:port other info + Source string `json:"source,omitempty"` + URL string `json:"url,omitempty"` + SDP string `json:"sdp,omitempty"` + UserAgent string `json:"user_agent,omitempty"` + + Medias []*Media `json:"medias,omitempty"` + Receivers []*Receiver `json:"receivers,omitempty"` + Senders []*Sender `json:"senders,omitempty"` + Recv int `json:"bytes_recv,omitempty"` + Send int `json:"bytes_send,omitempty"` + + Transport any `json:"-"` +} + +func (c *Connection) GetMedias() []*Media { + return c.Medias +} + +func (c *Connection) GetTrack(media *Media, codec *Codec) (*Receiver, error) { + for _, receiver := range c.Receivers { + if receiver.Codec == codec { + return receiver, nil + } + } + receiver := NewReceiver(media, codec) + c.Receivers = append(c.Receivers, receiver) + return receiver, nil +} + +func (c *Connection) Stop() error { + for _, receiver := range c.Receivers { + receiver.Close() + } + for _, sender := range c.Senders { + sender.Close() + } + if closer, ok := c.Transport.(io.Closer); ok { + return closer.Close() + } + return nil +} + +// Deprecated: +func (c *Connection) Codecs() []*Codec { + codecs := make([]*Codec, len(c.Senders)) + for i, sender := range c.Senders { + codecs[i] = sender.Codec + } + return codecs +} + +func (c *Connection) SetProtocol(s string) { + c.Protocol = s +} + +func (c *Connection) SetRemoteAddr(s string) { + if c.RemoteAddr == "" { + c.RemoteAddr = s + } else { + c.RemoteAddr += " forward " + c.RemoteAddr + } +} + +func (c *Connection) SetSource(s string) { + c.Source = s +} + +func (c *Connection) SetURL(s string) { + c.URL = s +} + +func (c *Connection) WithRequest(r *http.Request) { + if r.Header.Get("Upgrade") == "websocket" { + c.Protocol = "ws" + } else { + c.Protocol = "http" + } + + c.RemoteAddr = r.RemoteAddr + if remote := r.Header.Get("X-Forwarded-For"); remote != "" { + c.RemoteAddr += " forwarded " + remote + } + + c.UserAgent = r.UserAgent() +} + +// Create like os.Create, init Consumer with existing Transport +func Create(w io.Writer) (*Connection, error) { + return &Connection{Transport: w}, nil +} + +// Open like os.Open, init Producer from existing Transport +func Open(r io.Reader) (*Connection, error) { + return &Connection{Transport: r}, nil +} + +// Dial like net.Dial, init Producer via Dialing +func Dial(rawURL string) (*Connection, error) { + return &Connection{}, nil +} diff --git a/pkg/core/core.go b/pkg/core/core.go index bc855ccc2..9555ecfa0 100644 --- a/pkg/core/core.go +++ b/pkg/core/core.go @@ -1,5 +1,7 @@ package core +import "encoding/json" + const ( DirectionRecvonly = "recvonly" DirectionSendonly = "sendonly" @@ -90,89 +92,6 @@ func (m Mode) String() string { return "unknown" } -type Info struct { - Type string `json:"type,omitempty"` - URL string `json:"url,omitempty"` - RemoteAddr string `json:"remote_addr,omitempty"` - UserAgent string `json:"user_agent,omitempty"` - SDP string `json:"sdp,omitempty"` - Medias []*Media `json:"medias,omitempty"` - Receivers []*Receiver `json:"receivers,omitempty"` - Senders []*Sender `json:"senders,omitempty"` - Recv int `json:"recv,omitempty"` - Send int `json:"send,omitempty"` -} - -const ( - UnsupportedCodec = "unsupported codec" - WrongMediaDirection = "wrong media direction" -) - -type SuperProducer struct { - Type string `json:"type,omitempty"` - URL string `json:"url,omitempty"` - SDP string `json:"sdp,omitempty"` - Medias []*Media `json:"medias,omitempty"` - Receivers []*Receiver `json:"receivers,omitempty"` - Recv int `json:"recv,omitempty"` -} - -func (s *SuperProducer) GetMedias() []*Media { - return s.Medias -} - -func (s *SuperProducer) GetTrack(media *Media, codec *Codec) (*Receiver, error) { - for _, receiver := range s.Receivers { - if receiver.Codec == codec { - return receiver, nil - } - } - receiver := NewReceiver(media, codec) - s.Receivers = append(s.Receivers, receiver) - return receiver, nil -} - -func (s *SuperProducer) Close() error { - for _, receiver := range s.Receivers { - receiver.Close() - } - return nil -} - -type SuperConsumer struct { - Type string `json:"type,omitempty"` - URL string `json:"url,omitempty"` - RemoteAddr string `json:"remote_addr,omitempty"` - UserAgent string `json:"user_agent,omitempty"` - SDP string `json:"sdp,omitempty"` - Medias []*Media `json:"medias,omitempty"` - Senders []*Sender `json:"senders,omitempty"` - Send int `json:"send,omitempty"` -} - -func (s *SuperConsumer) GetMedias() []*Media { - return s.Medias -} - -func (s *SuperConsumer) AddTrack(media *Media, codec *Codec, track *Receiver) error { - return nil -} - -//func (b *SuperConsumer) WriteTo(w io.Writer) (n int64, err error) { -// return 0, nil -//} - -func (s *SuperConsumer) Close() error { - for _, sender := range s.Senders { - sender.Close() - } - return nil -} - -func (s *SuperConsumer) Codecs() []*Codec { - codecs := make([]*Codec, len(s.Senders)) - for i, sender := range s.Senders { - codecs[i] = sender.Codec - } - return codecs +func (m Mode) MarshalJSON() ([]byte, error) { + return json.Marshal(m.String()) } diff --git a/pkg/core/media.go b/pkg/core/media.go index ef9ef74b6..2284d0cdd 100644 --- a/pkg/core/media.go +++ b/pkg/core/media.go @@ -92,7 +92,7 @@ func (m *Media) Equal(media *Media) bool { func GetKind(name string) string { switch name { - case CodecH264, CodecH265, CodecVP8, CodecVP9, CodecAV1, CodecJPEG: + case CodecH264, CodecH265, CodecVP8, CodecVP9, CodecAV1, CodecJPEG, CodecRAW: return KindVideo case CodecPCMU, CodecPCMA, CodecAAC, CodecOpus, CodecG722, CodecMP3, CodecPCM, CodecPCML, CodecELD, CodecFLAC: return KindAudio diff --git a/pkg/core/node.go b/pkg/core/node.go index fd58f2d72..a9959c3de 100644 --- a/pkg/core/node.go +++ b/pkg/core/node.go @@ -23,10 +23,11 @@ type Filter func(handler HandlerFunc) HandlerFunc // Node - Receiver or Sender or Filter (transform) type Node struct { - Codec *Codec `json:"codec"` - Input HandlerFunc `json:"-"` - Output HandlerFunc `json:"-"` + Codec *Codec + Input HandlerFunc + Output HandlerFunc + id uint32 childs []*Node parent *Node diff --git a/pkg/core/track.go b/pkg/core/track.go index 83c39e011..8bc653749 100644 --- a/pkg/core/track.go +++ b/pkg/core/track.go @@ -1,6 +1,7 @@ package core import ( + "encoding/json" "errors" "github.com/pion/rtp" @@ -22,7 +23,7 @@ type Receiver struct { func NewReceiver(media *Media, codec *Codec) *Receiver { r := &Receiver{ - Node: Node{Codec: codec}, + Node: Node{id: NewID(), Codec: codec}, Media: media, } r.Input = func(packet *Packet) { @@ -91,7 +92,7 @@ func NewSender(media *Media, codec *Codec) *Sender { buf := make(chan *Packet, bufSize) s := &Sender{ - Node: Node{Codec: codec}, + Node: Node{id: NewID(), Codec: codec}, Media: media, buf: buf, } @@ -171,3 +172,43 @@ func (s *Sender) Close() { s.Node.Close() } + +func (r *Receiver) MarshalJSON() ([]byte, error) { + v := struct { + ID uint32 `json:"id"` + Codec *Codec `json:"codec"` + Childs []uint32 `json:"childs,omitempty"` + Bytes int `json:"bytes,omitempty"` + Packets int `json:"packets,omitempty"` + }{ + ID: r.Node.id, + Codec: r.Node.Codec, + Bytes: r.Bytes, + Packets: r.Packets, + } + for _, child := range r.childs { + v.Childs = append(v.Childs, child.id) + } + return json.Marshal(v) +} + +func (s *Sender) MarshalJSON() ([]byte, error) { + v := struct { + ID uint32 `json:"id"` + Codec *Codec `json:"codec"` + Parent uint32 `json:"parent,omitempty"` + Bytes int `json:"bytes,omitempty"` + Packets int `json:"packets,omitempty"` + Drops int `json:"drops,omitempty"` + }{ + ID: s.Node.id, + Codec: s.Node.Codec, + Bytes: s.Bytes, + Packets: s.Packets, + Drops: s.Drops, + } + if s.parent != nil { + v.Parent = s.parent.id + } + return json.Marshal(v) +} diff --git a/pkg/dvrip/consumer.go b/pkg/dvrip/backchannel.go similarity index 78% rename from pkg/dvrip/consumer.go rename to pkg/dvrip/backchannel.go index 7652c079d..0424e9650 100644 --- a/pkg/dvrip/consumer.go +++ b/pkg/dvrip/backchannel.go @@ -8,16 +8,16 @@ import ( "github.com/pion/rtp" ) -type Consumer struct { - core.SuperConsumer +type Backchannel struct { + core.Connection client *Client } -func (c *Consumer) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { +func (c *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { return nil, core.ErrCantGetTrack } -func (c *Consumer) Start() error { +func (c *Backchannel) Start() error { if err := c.client.conn.SetReadDeadline(time.Time{}); err != nil { return err } @@ -30,12 +30,7 @@ func (c *Consumer) Start() error { } } -func (c *Consumer) Stop() error { - _ = c.SuperConsumer.Close() - return c.client.Close() -} - -func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error { +func (c *Backchannel) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error { if err := c.client.Talk(); err != nil { return err } diff --git a/pkg/dvrip/dvrip.go b/pkg/dvrip/dvrip.go index 0f914640f..c4980a80d 100644 --- a/pkg/dvrip/dvrip.go +++ b/pkg/dvrip/dvrip.go @@ -8,17 +8,22 @@ func Dial(url string) (core.Producer, error) { return nil, err } + conn := core.Connection{ + ID: core.NewID(), + FormatName: "dvrip", + Protocol: "tcp", + RemoteAddr: client.conn.RemoteAddr().String(), + Transport: client.conn, + } + if client.stream != "" { - prod := &Producer{client: client} - prod.Type = "DVRIP active producer" + prod := &Producer{Connection: conn, client: client} if err := prod.probe(); err != nil { return nil, err } return prod, nil } else { - cons := &Consumer{client: client} - cons.Type = "DVRIP active consumer" - cons.Medias = []*core.Media{ + conn.Medias = []*core.Media{ { Kind: core.KindAudio, Direction: core.DirectionSendonly, @@ -29,6 +34,6 @@ func Dial(url string) (core.Producer, error) { }, }, } - return cons, nil + return &Backchannel{Connection: conn, client: client}, nil } } diff --git a/pkg/dvrip/producer.go b/pkg/dvrip/producer.go index 412dd0a39..c87017b4d 100644 --- a/pkg/dvrip/producer.go +++ b/pkg/dvrip/producer.go @@ -15,7 +15,7 @@ import ( ) type Producer struct { - core.SuperProducer + core.Connection client *Client @@ -92,10 +92,6 @@ func (c *Producer) Start() error { } } -func (c *Producer) Stop() error { - return c.client.Close() -} - func (c *Producer) probe() error { if err := c.client.Play(); err != nil { return err diff --git a/pkg/flv/consumer.go b/pkg/flv/consumer.go index 59e65d9c9..fe966bfc2 100644 --- a/pkg/flv/consumer.go +++ b/pkg/flv/consumer.go @@ -10,17 +10,13 @@ import ( ) type Consumer struct { - core.SuperConsumer + core.Connection wr *core.WriteBuffer muxer *Muxer } func NewConsumer() *Consumer { - c := &Consumer{ - wr: core.NewWriteBuffer(nil), - muxer: &Muxer{}, - } - c.Medias = []*core.Media{ + medias := []*core.Media{ { Kind: core.KindVideo, Direction: core.DirectionSendonly, @@ -36,7 +32,17 @@ func NewConsumer() *Consumer { }, }, } - return c + wr := core.NewWriteBuffer(nil) + return &Consumer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "flv", + Medias: medias, + Transport: wr, + }, + wr: wr, + muxer: &Muxer{}, + } } func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { @@ -86,8 +92,3 @@ func (c *Consumer) WriteTo(wr io.Writer) (int64, error) { } return c.wr.WriteTo(wr) } - -func (c *Consumer) Stop() error { - _ = c.SuperConsumer.Close() - return c.wr.Close() -} diff --git a/pkg/flv/producer.go b/pkg/flv/producer.go index 3972e6669..667552173 100644 --- a/pkg/flv/producer.go +++ b/pkg/flv/producer.go @@ -15,18 +15,24 @@ import ( ) type Producer struct { - core.SuperProducer + core.Connection rd *core.ReadBuffer video, audio *core.Receiver } func Open(rd io.Reader) (*Producer, error) { - prod := &Producer{rd: core.NewReadBuffer(rd)} + prod := &Producer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "flv", + Transport: rd, + }, + rd: core.NewReadBuffer(rd), + } if err := prod.probe(); err != nil { return nil, err } - prod.Type = "FLV producer" return prod, nil } @@ -57,7 +63,7 @@ const ( ) func (c *Producer) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - receiver, _ := c.SuperProducer.GetTrack(media, codec) + receiver, _ := c.Connection.GetTrack(media, codec) if media.Kind == core.KindVideo { c.video = receiver } else { @@ -117,11 +123,6 @@ func (c *Producer) Start() error { } } -func (c *Producer) Stop() error { - _ = c.SuperProducer.Close() - return c.rd.Close() -} - func (c *Producer) probe() error { if err := c.readHeader(); err != nil { return err diff --git a/pkg/gopro/gopro.go b/pkg/gopro/producer.go similarity index 90% rename from pkg/gopro/gopro.go rename to pkg/gopro/producer.go index 2d6a098b6..1873159fa 100644 --- a/pkg/gopro/gopro.go +++ b/pkg/gopro/producer.go @@ -8,11 +8,10 @@ import ( "net/url" "time" - "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/mpegts" ) -func Dial(rawURL string) (core.Producer, error) { +func Dial(rawURL string) (*mpegts.Producer, error) { u, err := url.Parse(rawURL) if err != nil { return nil, err @@ -32,7 +31,15 @@ func Dial(rawURL string) (core.Producer, error) { return nil, err } - return mpegts.Open(r) + prod, err := mpegts.Open(r) + if err != nil { + return nil, err + } + + prod.FormatName = "gopro" + prod.RemoteAddr = u.Host + + return prod, nil } type listener struct { diff --git a/pkg/hass/client.go b/pkg/hass/client.go index c1ed5b4b0..5b2360515 100644 --- a/pkg/hass/client.go +++ b/pkg/hass/client.go @@ -61,8 +61,10 @@ func NewClient(rawURL string) (*Client, error) { } conn := webrtc.NewConn(pc) - conn.Desc = "Hass" + conn.FormatName = "hass/webrtc" conn.Mode = core.ModeActiveProducer + conn.Protocol = "ws" + conn.URL = rawURL // https://developers.google.com/nest/device-access/traits/device/camera-live-stream#generatewebrtcstream-request-fields medias := []*core.Media{ diff --git a/pkg/hls/producer.go b/pkg/hls/producer.go index 410e771ad..e1c3ed431 100644 --- a/pkg/hls/producer.go +++ b/pkg/hls/producer.go @@ -4,14 +4,19 @@ import ( "io" "net/url" - "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/mpegts" ) -func OpenURL(u *url.URL, body io.ReadCloser) (core.Producer, error) { +func OpenURL(u *url.URL, body io.ReadCloser) (*mpegts.Producer, error) { rd, err := NewReader(u, body) if err != nil { return nil, err } - return mpegts.Open(rd) + prod, err := mpegts.Open(rd) + if err != nil { + return nil, err + } + prod.FormatName = "hls/mpegts" + prod.RemoteAddr = u.Host + return prod, nil } diff --git a/pkg/homekit/consumer.go b/pkg/homekit/consumer.go index 05ea24274..1c6652332 100644 --- a/pkg/homekit/consumer.go +++ b/pkg/homekit/consumer.go @@ -16,7 +16,7 @@ import ( ) type Consumer struct { - core.SuperConsumer + core.Connection conn net.Conn srtp *srtp.Server @@ -29,28 +29,31 @@ type Consumer struct { } func NewConsumer(conn net.Conn, server *srtp.Server) *Consumer { + medias := []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + {Name: core.CodecH264}, + }, + }, + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + {Name: core.CodecOpus}, + }, + }, + } return &Consumer{ - SuperConsumer: core.SuperConsumer{ - Type: "HomeKit passive consumer", + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "homekit", + Protocol: "udp", RemoteAddr: conn.RemoteAddr().String(), - Medias: []*core.Media{ - { - Kind: core.KindVideo, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{ - {Name: core.CodecH264}, - }, - }, - { - Kind: core.KindAudio, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{ - {Name: core.CodecOpus}, - }, - }, - }, + Medias: medias, + Transport: conn, }, - conn: conn, srtp: server, } @@ -175,11 +178,10 @@ func (c *Consumer) WriteTo(io.Writer) (int64, error) { } func (c *Consumer) Stop() error { - _ = c.SuperConsumer.Close() if c.deadline != nil { c.deadline.Reset(0) } - return c.SuperConsumer.Close() + return c.Connection.Stop() } func (c *Consumer) srtpEndpoint() *srtp.Endpoint { diff --git a/pkg/homekit/client.go b/pkg/homekit/producer.go similarity index 95% rename from pkg/homekit/client.go rename to pkg/homekit/producer.go index 133499d33..c2781e277 100644 --- a/pkg/homekit/client.go +++ b/pkg/homekit/producer.go @@ -15,8 +15,9 @@ import ( "github.com/pion/rtp" ) +// Deprecated: rename to Producer type Client struct { - core.SuperProducer + core.Connection hap *hap.Client srtp *srtp.Server @@ -52,9 +53,12 @@ func Dial(rawURL string, server *srtp.Server) (*Client, error) { } client := &Client{ - SuperProducer: core.SuperProducer{ - Type: "HomeKit active producer", - URL: conn.URL(), + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "homekit", + Protocol: "udp", + Source: conn.URL(), + Transport: conn, }, hap: conn, srtp: server, @@ -93,7 +97,6 @@ func (c *Client) GetMedias() []*core.Media { return nil } - c.URL = c.hap.URL() c.SDP = fmt.Sprintf("%+v\n%+v", c.videoConfig, c.audioConfig) c.Medias = []*core.Media{ @@ -175,8 +178,6 @@ func (c *Client) Start() error { } func (c *Client) Stop() error { - _ = c.SuperProducer.Close() - if c.videoSession != nil && c.videoSession.Remote != nil { c.srtp.DelSession(c.videoSession) } @@ -184,7 +185,7 @@ func (c *Client) Stop() error { c.srtp.DelSession(c.audioSession) } - return c.hap.Close() + return c.Connection.Stop() } func (c *Client) trackByKind(kind string) *core.Receiver { diff --git a/pkg/image/producer.go b/pkg/image/producer.go new file mode 100644 index 000000000..2081c048f --- /dev/null +++ b/pkg/image/producer.go @@ -0,0 +1,92 @@ +package image + +import ( + "errors" + "io" + "net/http" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/tcp" + "github.com/pion/rtp" +) + +type Producer struct { + core.Connection + + closed bool + res *http.Response +} + +func Open(res *http.Response) (*Producer, error) { + return &Producer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "image", + Protocol: "http", + RemoteAddr: res.Request.URL.Host, + Transport: res.Body, + Medias: []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + { + Name: core.CodecJPEG, + ClockRate: 90000, + PayloadType: core.PayloadTypeRAW, + }, + }, + }, + }, + }, + res: res, + }, nil +} + +func (c *Producer) Start() error { + body, err := io.ReadAll(c.res.Body) + if err != nil { + return err + } + + pkt := &rtp.Packet{ + Header: rtp.Header{Timestamp: core.Now90000()}, + Payload: body, + } + c.Receivers[0].WriteRTP(pkt) + + c.Recv += len(body) + + req := c.res.Request + + for !c.closed { + res, err := tcp.Do(req) + if err != nil { + return err + } + + if res.StatusCode != http.StatusOK { + return errors.New("wrong status: " + res.Status) + } + + body, err = io.ReadAll(res.Body) + if err != nil { + return err + } + + c.Recv += len(body) + + pkt = &rtp.Packet{ + Header: rtp.Header{Timestamp: core.Now90000()}, + Payload: body, + } + c.Receivers[0].WriteRTP(pkt) + } + + return nil +} + +func (c *Producer) Stop() error { + c.closed = true + return c.Connection.Stop() +} diff --git a/pkg/isapi/consumer.go b/pkg/isapi/backchannel.go similarity index 83% rename from pkg/isapi/consumer.go rename to pkg/isapi/backchannel.go index c7b51c9d0..ade16255b 100644 --- a/pkg/isapi/consumer.go +++ b/pkg/isapi/backchannel.go @@ -2,6 +2,7 @@ package isapi import ( "encoding/json" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/pion/rtp" ) @@ -51,10 +52,15 @@ func (c *Client) Stop() (err error) { } func (c *Client) MarshalJSON() ([]byte, error) { - info := &core.Info{ - Type: "ISAPI active consumer", - Medias: c.medias, - Send: c.send, + info := &core.Connection{ + ID: core.ID(c), + FormatName: "isapi", + Protocol: "http", + Medias: c.medias, + Send: c.send, + } + if c.conn != nil { + info.RemoteAddr = c.conn.RemoteAddr().String() } if c.sender != nil { info.Senders = []*core.Sender{c.sender} diff --git a/pkg/isapi/client.go b/pkg/isapi/client.go index 83dd90269..ba3e68874 100644 --- a/pkg/isapi/client.go +++ b/pkg/isapi/client.go @@ -11,6 +11,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/tcp" ) +// Deprecated: should be rewritten to core.Connection type Client struct { core.Listener diff --git a/pkg/ivideon/client.go b/pkg/ivideon/client.go index 7cbf0b386..ef79010ec 100644 --- a/pkg/ivideon/client.go +++ b/pkg/ivideon/client.go @@ -26,6 +26,7 @@ const ( StateHandle ) +// Deprecated: should be rewritten to core.Connection type Client struct { core.Listener diff --git a/pkg/ivideon/producer.go b/pkg/ivideon/producer.go index d0a8fcbaf..780841233 100644 --- a/pkg/ivideon/producer.go +++ b/pkg/ivideon/producer.go @@ -2,6 +2,7 @@ package ivideon import ( "encoding/json" + "github.com/AlexxIT/go2rtc/pkg/core" ) @@ -32,11 +33,16 @@ func (c *Client) Stop() error { } func (c *Client) MarshalJSON() ([]byte, error) { - info := &core.Info{ - Type: "Ivideon active producer", - URL: c.ID, - Medias: c.medias, - Recv: c.recv, + info := &core.Connection{ + ID: core.ID(c), + FormatName: "ivideon", + Protocol: "ws", + URL: c.ID, + Medias: c.medias, + Recv: c.recv, + } + if c.conn != nil { + info.RemoteAddr = c.conn.RemoteAddr().String() } if c.receiver != nil { info.Receivers = []*core.Receiver{c.receiver} diff --git a/pkg/kasa/producer.go b/pkg/kasa/producer.go index d138cb686..22d102163 100644 --- a/pkg/kasa/producer.go +++ b/pkg/kasa/producer.go @@ -12,13 +12,13 @@ import ( "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h264/annexb" - "github.com/AlexxIT/go2rtc/pkg/multipart" + "github.com/AlexxIT/go2rtc/pkg/mpjpeg" "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/pion/rtp" ) type Producer struct { - core.SuperProducer + core.Connection rd *core.ReadBuffer reader *bufio.Reader @@ -65,11 +65,18 @@ func Dial(url string) (*Producer, error) { rd.Reader = httputil.NewChunkedReader(buf) } - prod := &Producer{rd: core.NewReadBuffer(rd)} + prod := &Producer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "kasa", + Protocol: "http", + Transport: rd, + }, + rd: core.NewReadBuffer(rd), + } if err = prod.probe(); err != nil { return nil, err } - prod.Type = "Kasa producer" return prod, nil } @@ -90,7 +97,7 @@ func (c *Producer) Start() error { } for { - header, body, err := multipart.Next(c.reader) + header, body, err := mpjpeg.Next(c.reader) if err != nil { return err } @@ -128,11 +135,6 @@ func (c *Producer) Start() error { } } -func (c *Producer) Stop() error { - _ = c.SuperProducer.Close() - return c.rd.Close() -} - const ( MimeVideo = "video/x-h264" MimeG711U = "audio/g711u" @@ -151,7 +153,7 @@ func (c *Producer) probe() error { timeout := time.Now().Add(core.ProbeTimeout) for (waitVideo || waitAudio) && time.Now().Before(timeout) { - header, body, err := multipart.Next(c.reader) + header, body, err := mpjpeg.Next(c.reader) if err != nil { return err } diff --git a/pkg/magic/bitstream/producer.go b/pkg/magic/bitstream/producer.go index 2ffa964e8..b84f049b3 100644 --- a/pkg/magic/bitstream/producer.go +++ b/pkg/magic/bitstream/producer.go @@ -13,7 +13,7 @@ import ( ) type Producer struct { - core.SuperProducer + core.Connection rd *core.ReadBuffer } @@ -28,26 +28,35 @@ func Open(r io.Reader) (*Producer, error) { buf = annexb.EncodeToAVCC(buf, false) // won't break original buffer var codec *core.Codec + var format string switch { case h264.NALUType(buf) == h264.NALUTypeSPS: codec = h264.AVCCToCodec(buf) + format = "h264" case h265.NALUType(buf) == h265.NALUTypeVPS: codec = h265.AVCCToCodec(buf) + format = "hevc" default: return nil, errors.New("bitstream: unsupported header: " + hex.EncodeToString(buf[:8])) } - prod := &Producer{rd: rd} - prod.Type = "Bitstream producer" - prod.Medias = []*core.Media{ + medias := []*core.Media{ { Kind: core.KindVideo, Direction: core.DirectionRecvonly, Codecs: []*core.Codec{codec}, }, } - return prod, nil + return &Producer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: format, + Medias: medias, + Transport: r, + }, + rd: rd, + }, nil } func (c *Producer) Start() error { @@ -84,8 +93,3 @@ func (c *Producer) Start() error { } } } - -func (c *Producer) Stop() error { - _ = c.SuperProducer.Close() - return c.rd.Close() -} diff --git a/pkg/magic/keyframe.go b/pkg/magic/keyframe.go index d2ae80bd1..8f70eec6e 100644 --- a/pkg/magic/keyframe.go +++ b/pkg/magic/keyframe.go @@ -12,26 +12,32 @@ import ( ) type Keyframe struct { - core.SuperConsumer + core.Connection wr *core.WriteBuffer } +// Deprecated: should be rewritten func NewKeyframe() *Keyframe { - return &Keyframe{ - core.SuperConsumer{ - Medias: []*core.Media{ - { - Kind: core.KindVideo, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{ - {Name: core.CodecJPEG}, - {Name: core.CodecH264}, - {Name: core.CodecH265}, - }, - }, + medias := []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + {Name: core.CodecJPEG}, + {Name: core.CodecH264}, + {Name: core.CodecH265}, }, }, - core.NewWriteBuffer(nil), + } + wr := core.NewWriteBuffer(nil) + return &Keyframe{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "keyframe", + Medias: medias, + Transport: wr, + }, + wr: wr, } } @@ -98,8 +104,3 @@ func (k *Keyframe) CodecName() string { func (k *Keyframe) WriteTo(wr io.Writer) (int64, error) { return k.wr.WriteTo(wr) } - -func (k *Keyframe) Stop() error { - _ = k.SuperConsumer.Close() - return k.wr.Close() -} diff --git a/pkg/magic/mjpeg/producer.go b/pkg/magic/mjpeg/producer.go index e5627fd72..e47c168d7 100644 --- a/pkg/magic/mjpeg/producer.go +++ b/pkg/magic/mjpeg/producer.go @@ -9,14 +9,12 @@ import ( ) type Producer struct { - core.SuperProducer + core.Connection rd *core.ReadBuffer } func Open(rd io.Reader) (*Producer, error) { - prod := &Producer{rd: core.NewReadBuffer(rd)} - prod.Type = "MJPEG producer" - prod.Medias = []*core.Media{ + medias := []*core.Media{ { Kind: core.KindVideo, Direction: core.DirectionRecvonly, @@ -29,7 +27,15 @@ func Open(rd io.Reader) (*Producer, error) { }, }, } - return prod, nil + return &Producer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "mjpeg", + Medias: medias, + Transport: rd, + }, + rd: core.NewReadBuffer(rd), + }, nil } func (c *Producer) Start() error { @@ -70,8 +76,3 @@ func (c *Producer) Start() error { buf = buf[i:] } } - -func (c *Producer) Stop() error { - _ = c.SuperProducer.Close() - return c.rd.Close() -} diff --git a/pkg/magic/producer.go b/pkg/magic/producer.go index 9bde508d0..3742ccf95 100644 --- a/pkg/magic/producer.go +++ b/pkg/magic/producer.go @@ -13,7 +13,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/magic/bitstream" "github.com/AlexxIT/go2rtc/pkg/magic/mjpeg" "github.com/AlexxIT/go2rtc/pkg/mpegts" - "github.com/AlexxIT/go2rtc/pkg/multipart" + "github.com/AlexxIT/go2rtc/pkg/mpjpeg" "github.com/AlexxIT/go2rtc/pkg/wav" "github.com/AlexxIT/go2rtc/pkg/y4m" ) @@ -26,29 +26,31 @@ func Open(r io.Reader) (core.Producer, error) { return nil, err } - switch { - case string(b) == annexb.StartCode: + switch string(b) { + case annexb.StartCode: return bitstream.Open(rd) - - case string(b) == wav.FourCC: + case wav.FourCC: return wav.Open(rd) - - case string(b) == y4m.FourCC: + case y4m.FourCC: return y4m.Open(rd) + } - case bytes.HasPrefix(b, []byte{0xFF, 0xD8}): - return mjpeg.Open(rd) - - case bytes.HasPrefix(b, []byte(flv.Signature)): + switch string(b[:3]) { + case flv.Signature: return flv.Open(rd) + } - case bytes.HasPrefix(b, []byte("--")): - return multipart.Open(rd) - - case b[0] == 0xFF && (b[1] == 0xF1 || b[1] == 0xF9): + switch string(b[:2]) { + case "\xFF\xD8": + return mjpeg.Open(rd) + case "\xFF\xF1", "\xFF\xF9": return aac.Open(rd) + case "--": + return mpjpeg.Open(rd) + } - case b[0] == mpegts.SyncByte: + switch b[0] { + case mpegts.SyncByte: return mpegts.Open(rd) } diff --git a/pkg/mjpeg/client.go b/pkg/mjpeg/client.go deleted file mode 100644 index f16c42cd5..000000000 --- a/pkg/mjpeg/client.go +++ /dev/null @@ -1,75 +0,0 @@ -package mjpeg - -import ( - "errors" - "io" - "net/http" - - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/tcp" - "github.com/pion/rtp" -) - -type Client struct { - core.Listener - - UserAgent string - RemoteAddr string - - closed bool - res *http.Response - - medias []*core.Media - receiver *core.Receiver - - recv int -} - -func NewClient(res *http.Response) *Client { - return &Client{res: res} -} - -func (c *Client) Handle() error { - body, err := io.ReadAll(c.res.Body) - if err != nil { - return err - } - - pkt := &rtp.Packet{ - Header: rtp.Header{Timestamp: core.Now90000()}, - Payload: body, - } - c.receiver.WriteRTP(pkt) - - c.recv += len(body) - - req := c.res.Request - - for !c.closed { - res, err := tcp.Do(req) - if err != nil { - return err - } - - if res.StatusCode != http.StatusOK { - return errors.New("wrong status: " + res.Status) - } - - body, err = io.ReadAll(res.Body) - if err != nil { - return err - } - - c.recv += len(body) - - if c.receiver != nil { - pkt = &rtp.Packet{ - Header: rtp.Header{Timestamp: core.Now90000()}, - Payload: body, - } - c.receiver.WriteRTP(pkt) - } - } - - return nil -} diff --git a/pkg/mjpeg/consumer.go b/pkg/mjpeg/consumer.go index d5fb0d51b..16edc8950 100644 --- a/pkg/mjpeg/consumer.go +++ b/pkg/mjpeg/consumer.go @@ -8,26 +8,30 @@ import ( ) type Consumer struct { - core.SuperConsumer + core.Connection wr *core.WriteBuffer } func NewConsumer() *Consumer { - return &Consumer{ - core.SuperConsumer{ - Type: "MJPEG passive consumer", - Medias: []*core.Media{ - { - Kind: core.KindVideo, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{ - {Name: core.CodecJPEG}, - {Name: core.CodecRAW}, - }, - }, + medias := []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + {Name: core.CodecJPEG}, + {Name: core.CodecRAW}, }, }, - core.NewWriteBuffer(nil), + } + wr := core.NewWriteBuffer(nil) + return &Consumer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "mjpeg", + Medias: medias, + Transport: wr, + }, + wr: wr, } } @@ -53,8 +57,3 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv func (c *Consumer) WriteTo(wr io.Writer) (int64, error) { return c.wr.WriteTo(wr) } - -func (c *Consumer) Stop() error { - _ = c.SuperConsumer.Close() - return c.wr.Close() -} diff --git a/pkg/mjpeg/producer.go b/pkg/mjpeg/producer.go deleted file mode 100644 index 5b3522528..000000000 --- a/pkg/mjpeg/producer.go +++ /dev/null @@ -1,61 +0,0 @@ -package mjpeg - -import ( - "encoding/json" - - "github.com/AlexxIT/go2rtc/pkg/core" -) - -func (c *Client) GetMedias() []*core.Media { - if c.medias == nil { - c.medias = []*core.Media{{ - Kind: core.KindVideo, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{ - { - Name: core.CodecJPEG, - ClockRate: 90000, - PayloadType: core.PayloadTypeRAW, - }, - }, - }} - } - return c.medias -} - -func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - if c.receiver == nil { - c.receiver = core.NewReceiver(media, codec) - } - return c.receiver, nil -} - -func (c *Client) Start() error { - // https://github.com/AlexxIT/go2rtc/issues/278 - return c.Handle() -} - -func (c *Client) Stop() error { - if c.receiver != nil { - c.receiver.Close() - } - // important for close reader/writer gorutines - _ = c.res.Body.Close() - c.closed = true - return nil -} - -func (c *Client) MarshalJSON() ([]byte, error) { - info := &core.Info{ - Type: "JPEG active producer", - URL: c.res.Request.URL.String(), - RemoteAddr: c.RemoteAddr, - UserAgent: c.UserAgent, - Medias: c.medias, - Recv: c.recv, - } - if c.receiver != nil { - info.Receivers = []*core.Receiver{c.receiver} - } - return json.Marshal(info) -} diff --git a/pkg/mp4/consumer.go b/pkg/mp4/consumer.go index 83b2d2e30..348498638 100644 --- a/pkg/mp4/consumer.go +++ b/pkg/mp4/consumer.go @@ -14,7 +14,7 @@ import ( ) type Consumer struct { - core.SuperConsumer + core.Connection wr *core.WriteBuffer muxer *Muxer mu sync.Mutex @@ -47,12 +47,17 @@ func NewConsumer(medias []*core.Media) *Consumer { } } - cons := &Consumer{ + wr := core.NewWriteBuffer(nil) + return &Consumer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "mp4", + Medias: medias, + Transport: wr, + }, muxer: &Muxer{}, - wr: core.NewWriteBuffer(nil), + wr: wr, } - cons.Medias = medias - return cons } func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error { @@ -182,8 +187,3 @@ func (c *Consumer) WriteTo(wr io.Writer) (int64, error) { return c.wr.WriteTo(wr) } - -func (c *Consumer) Stop() error { - _ = c.SuperConsumer.Close() - return c.wr.Close() -} diff --git a/pkg/mp4/keyframe.go b/pkg/mp4/keyframe.go index 25a6983d6..399f95e7e 100644 --- a/pkg/mp4/keyframe.go +++ b/pkg/mp4/keyframe.go @@ -10,11 +10,12 @@ import ( ) type Keyframe struct { - core.SuperConsumer + core.Connection wr *core.WriteBuffer muxer *Muxer } +// Deprecated: should be rewritten func NewKeyframe(medias []*core.Media) *Keyframe { if medias == nil { medias = []*core.Media{ @@ -29,9 +30,15 @@ func NewKeyframe(medias []*core.Media) *Keyframe { } } + wr := core.NewWriteBuffer(nil) cons := &Keyframe{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "mp4", + Transport: wr, + }, muxer: &Muxer{}, - wr: core.NewWriteBuffer(nil), + wr: wr, } cons.Medias = medias return cons @@ -95,8 +102,3 @@ func (c *Keyframe) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv func (c *Keyframe) WriteTo(wr io.Writer) (int64, error) { return c.wr.WriteTo(wr) } - -func (c *Keyframe) Stop() error { - _ = c.SuperConsumer.Close() - return c.wr.Close() -} diff --git a/pkg/mpegts/consumer.go b/pkg/mpegts/consumer.go index eb0902fc9..fcb57c747 100644 --- a/pkg/mpegts/consumer.go +++ b/pkg/mpegts/consumer.go @@ -11,17 +11,13 @@ import ( ) type Consumer struct { - core.SuperConsumer + core.Connection muxer *Muxer wr *core.WriteBuffer } func NewConsumer() *Consumer { - c := &Consumer{ - muxer: NewMuxer(), - wr: core.NewWriteBuffer(nil), - } - c.Medias = []*core.Media{ + medias := []*core.Media{ { Kind: core.KindVideo, Direction: core.DirectionSendonly, @@ -38,7 +34,17 @@ func NewConsumer() *Consumer { }, }, } - return c + wr := core.NewWriteBuffer(nil) + return &Consumer{ + core.Connection{ + ID: core.NewID(), + FormatName: "mpegts", + Medias: medias, + Transport: wr, + }, + NewMuxer(), + wr, + } } func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { @@ -110,14 +116,9 @@ func (c *Consumer) WriteTo(wr io.Writer) (int64, error) { return c.wr.WriteTo(wr) } -func (c *Consumer) Stop() error { - _ = c.SuperConsumer.Close() - return c.wr.Close() -} - -func TimestampFromRTP(rtp *rtp.Packet, codec *core.Codec) { - if codec.ClockRate == ClockRate { - return - } - rtp.Timestamp = uint32(float64(rtp.Timestamp) / float64(codec.ClockRate) * ClockRate) -} +//func TimestampFromRTP(rtp *rtp.Packet, codec *core.Codec) { +// if codec.ClockRate == ClockRate { +// return +// } +// rtp.Timestamp = uint32(float64(rtp.Timestamp) / float64(codec.ClockRate) * ClockRate) +//} diff --git a/pkg/mpegts/producer.go b/pkg/mpegts/producer.go index 78f320a28..2c72d8aaf 100644 --- a/pkg/mpegts/producer.go +++ b/pkg/mpegts/producer.go @@ -13,12 +13,19 @@ import ( ) type Producer struct { - core.SuperProducer + core.Connection rd *core.ReadBuffer } func Open(rd io.Reader) (*Producer, error) { - prod := &Producer{rd: core.NewReadBuffer(rd)} + prod := &Producer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "mpegts", + Transport: rd, + }, + rd: core.NewReadBuffer(rd), + } if err := prod.probe(); err != nil { return nil, err } @@ -26,7 +33,7 @@ func Open(rd io.Reader) (*Producer, error) { } func (c *Producer) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - receiver, _ := c.SuperProducer.GetTrack(media, codec) + receiver, _ := c.Connection.GetTrack(media, codec) receiver.ID = StreamType(codec) return receiver, nil } @@ -40,6 +47,8 @@ func (c *Producer) Start() error { return err } + c.Recv += len(pkt.Payload) + //log.Printf("[mpegts] size: %6d, muxer: %10d, pt: %2d", len(pkt.Payload), pkt.Timestamp, pkt.PayloadType) for _, receiver := range c.Receivers { @@ -52,11 +61,6 @@ func (c *Producer) Start() error { } } -func (c *Producer) Stop() error { - _ = c.SuperProducer.Close() - return c.rd.Close() -} - func (c *Producer) probe() error { c.rd.BufferSize = core.ProbeSize defer c.rd.Reset() diff --git a/pkg/multipart/multipart.go b/pkg/mpjpeg/multipart.go similarity index 98% rename from pkg/multipart/multipart.go rename to pkg/mpjpeg/multipart.go index aea1b8281..abceea437 100644 --- a/pkg/multipart/multipart.go +++ b/pkg/mpjpeg/multipart.go @@ -1,4 +1,4 @@ -package multipart +package mpjpeg import ( "bufio" diff --git a/pkg/mpjpeg/producer.go b/pkg/mpjpeg/producer.go new file mode 100644 index 000000000..a8d5e16aa --- /dev/null +++ b/pkg/mpjpeg/producer.go @@ -0,0 +1,65 @@ +package mpjpeg + +import ( + "bufio" + "errors" + "io" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" +) + +type Producer struct { + core.Connection + rd *bufio.Reader +} + +func Open(rd io.Reader) (*Producer, error) { + return &Producer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "mpjpeg", // Multipart JPEG + Transport: rd, + Medias: []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + { + Name: core.CodecJPEG, + ClockRate: 90000, + PayloadType: core.PayloadTypeRAW, + }, + }, + }, + }, + }, + }, nil +} + +func (c *Producer) Start() error { + if len(c.Receivers) != 1 { + return errors.New("mjpeg: no receivers") + } + + rd := bufio.NewReader(c.Transport.(io.Reader)) + + mjpeg := c.Receivers[0] + + for { + _, body, err := Next(rd) + if err != nil { + return err + } + + c.Recv += len(body) + + if mjpeg != nil { + packet := &rtp.Packet{ + Header: rtp.Header{Timestamp: core.Now90000()}, + Payload: body, + } + mjpeg.WriteRTP(packet) + } + } +} diff --git a/pkg/multipart/producer.go b/pkg/multipart/producer.go deleted file mode 100644 index 70a2c5475..000000000 --- a/pkg/multipart/producer.go +++ /dev/null @@ -1,68 +0,0 @@ -package multipart - -import ( - "bufio" - "errors" - "io" - - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/pion/rtp" -) - -type Producer struct { - core.SuperProducer - closer io.Closer - reader *bufio.Reader -} - -func Open(rd io.Reader) (*Producer, error) { - prod := &Producer{ - closer: rd.(io.Closer), - reader: bufio.NewReader(rd), - } - prod.Medias = []*core.Media{ - { - Kind: core.KindVideo, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{ - { - Name: core.CodecJPEG, - ClockRate: 90000, - PayloadType: core.PayloadTypeRAW, - }, - }, - }, - } - prod.Type = "Multipart producer" - return prod, nil -} - -func (c *Producer) Start() error { - if len(c.Receivers) != 1 { - return errors.New("mjpeg: no receivers") - } - - mjpeg := c.Receivers[0] - - for { - _, body, err := Next(c.reader) - if err != nil { - return err - } - - c.Recv += len(body) - - if mjpeg != nil { - packet := &rtp.Packet{ - Header: rtp.Header{Timestamp: core.Now90000()}, - Payload: body, - } - mjpeg.WriteRTP(packet) - } - } -} - -func (c *Producer) Stop() error { - _ = c.SuperProducer.Close() - return c.closer.Close() -} diff --git a/pkg/nest/client.go b/pkg/nest/client.go index 2169773b3..0b243384b 100644 --- a/pkg/nest/client.go +++ b/pkg/nest/client.go @@ -48,8 +48,10 @@ func Dial(rawURL string) (*Client, error) { } conn := webrtc.NewConn(pc) - conn.Desc = "Nest" + conn.FormatName = "nest/webrtc" conn.Mode = core.ModeActiveProducer + conn.Protocol = "http" + conn.URL = rawURL // https://developers.google.com/nest/device-access/traits/device/camera-live-stream#generatewebrtcstream-request-fields medias := []*core.Media{ diff --git a/pkg/probe/probe.go b/pkg/probe/producer.go similarity index 72% rename from pkg/probe/probe.go rename to pkg/probe/producer.go index 61a2b3616..1fbd3efb4 100644 --- a/pkg/probe/probe.go +++ b/pkg/probe/producer.go @@ -8,17 +8,11 @@ import ( ) type Probe struct { - Type string `json:"type,omitempty"` - RemoteAddr string `json:"remote_addr,omitempty"` - UserAgent string `json:"user_agent,omitempty"` - Medias []*core.Media `json:"medias,omitempty"` - Receivers []*core.Receiver `json:"receivers,omitempty"` - Senders []*core.Sender `json:"senders,omitempty"` + core.Connection } func NewProbe(query url.Values) *Probe { - c := &Probe{Type: "probe"} - c.Medias = core.ParseQuery(query) + medias := core.ParseQuery(query) for _, value := range query["microphone"] { media := &core.Media{Kind: core.KindAudio, Direction: core.DirectionRecvonly} @@ -32,10 +26,16 @@ func NewProbe(query url.Values) *Probe { media.Codecs = append(media.Codecs, &core.Codec{Name: name}) } - c.Medias = append(c.Medias, media) + medias = append(medias, media) } - return c + return &Probe{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "probe", + Medias: medias, + }, + } } func (p *Probe) GetMedias() []*core.Media { diff --git a/pkg/roborock/client.go b/pkg/roborock/client.go index 522b0e13a..ef221e659 100644 --- a/pkg/roborock/client.go +++ b/pkg/roborock/client.go @@ -18,6 +18,7 @@ import ( pion "github.com/pion/webrtc/v3" ) +// Deprecated: should be rewritten to core.Connection type Client struct { core.Listener @@ -110,8 +111,10 @@ func (c *Client) Connect() error { var sendOffer sync.WaitGroup c.conn = webrtc.NewConn(pc) - c.conn.Desc = "Roborock" + c.conn.FormatName = "roborock" c.conn.Mode = core.ModeActiveProducer + c.conn.Protocol = "mqtt" + c.conn.URL = c.url c.conn.Listen(func(msg any) { switch msg := msg.(type) { case *pion.ICECandidate: diff --git a/pkg/rtmp/client.go b/pkg/rtmp/client.go index aff8e23ca..138d727dc 100644 --- a/pkg/rtmp/client.go +++ b/pkg/rtmp/client.go @@ -8,10 +8,11 @@ import ( "strings" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/flv" "github.com/AlexxIT/go2rtc/pkg/tcp" ) -func DialPlay(rawURL string) (core.Producer, error) { +func DialPlay(rawURL string) (*flv.Producer, error) { u, err := url.Parse(rawURL) if err != nil { return nil, err @@ -22,16 +23,16 @@ func DialPlay(rawURL string) (core.Producer, error) { return nil, err } - rtmpConn, err := NewClient(conn, u) + client, err := NewClient(conn, u) if err != nil { return nil, err } - if err = rtmpConn.play(); err != nil { + if err = client.play(); err != nil { return nil, err } - return rtmpConn.Producer() + return client.Producer() } func DialPublish(rawURL string) (io.Writer, error) { diff --git a/pkg/rtmp/flv.go b/pkg/rtmp/flv.go index 87bef0a87..350f4c3c0 100644 --- a/pkg/rtmp/flv.go +++ b/pkg/rtmp/flv.go @@ -1,11 +1,10 @@ package rtmp import ( - "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/flv" ) -func (c *Conn) Producer() (core.Producer, error) { +func (c *Conn) Producer() (*flv.Producer, error) { c.rdBuf = []byte{ 'F', 'L', 'V', // signature 1, // version @@ -13,7 +12,17 @@ func (c *Conn) Producer() (core.Producer, error) { 0, 0, 0, 9, // header size } - return flv.Open(c) + prod, err := flv.Open(c) + if err != nil { + return nil, err + } + + prod.FormatName = "rtmp" + prod.Protocol = "rtmp" + prod.RemoteAddr = c.conn.RemoteAddr().String() + prod.URL = c.url + + return prod, nil } // Read - convert RTMP to FLV format diff --git a/pkg/rtsp/client.go b/pkg/rtsp/client.go index 9002d0a1e..352c00a19 100644 --- a/pkg/rtsp/client.go +++ b/pkg/rtsp/client.go @@ -20,7 +20,13 @@ import ( var Timeout = time.Second * 5 func NewClient(uri string) *Conn { - return &Conn{uri: uri} + return &Conn{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "rtsp", + }, + uri: uri, + } } func (c *Conn) Dial() (err error) { @@ -36,8 +42,10 @@ func (c *Conn) Dial() (err error) { timeout = time.Second * time.Duration(c.Timeout) } conn, err = tcp.Dial(c.URL, timeout) + c.Protocol = "rtsp+tcp" } else { conn, err = websocket.Dial(c.Transport) + c.Protocol = "ws" } if err != nil { return @@ -53,6 +61,10 @@ func (c *Conn) Dial() (err error) { c.sequence = 0 c.state = StateConn + c.Connection.RemoteAddr = conn.RemoteAddr().String() + c.Connection.Transport = conn + c.Connection.URL = c.uri + return nil } @@ -143,7 +155,7 @@ func (c *Conn) Describe() error { } } - c.sdp = string(res.Body) // for info + c.SDP = string(res.Body) // for info medias, err := UnmarshalSDP(res.Body) if err != nil { diff --git a/pkg/rtsp/conn.go b/pkg/rtsp/conn.go index 1d9edf063..0c2009d7f 100644 --- a/pkg/rtsp/conn.go +++ b/pkg/rtsp/conn.go @@ -18,6 +18,7 @@ import ( ) type Conn struct { + core.Connection core.Listener // public @@ -30,9 +31,7 @@ type Conn struct { Timeout int Transport string // custom transport support, ex. RTSP over WebSocket - Medias []*core.Media - UserAgent string - URL *url.URL + URL *url.URL // internal @@ -44,19 +43,10 @@ type Conn struct { reader *bufio.Reader sequence int session string - sdp string uri string state State stateMu sync.Mutex - - receivers []*core.Receiver - senders []*core.Sender - - // stats - - recv int - send int } const ( @@ -114,7 +104,7 @@ func (c *Conn) Handle() (err error) { // polling frames from remote RTSP Server (ex Camera) timeout = time.Second * 5 - if len(c.receivers) == 0 { + if len(c.Receivers) == 0 { // if we only send audio to camera // https://github.com/AlexxIT/go2rtc/issues/659 timeout += keepaliveDT @@ -239,7 +229,7 @@ func (c *Conn) Handle() (err error) { return } - c.recv += int(size) + c.Recv += int(size) if channelID&1 == 0 { packet := &rtp.Packet{} @@ -247,7 +237,7 @@ func (c *Conn) Handle() (err error) { return } - for _, receiver := range c.receivers { + for _, receiver := range c.Receivers { if receiver.ID == channelID { receiver.WriteRTP(packet) break diff --git a/pkg/rtsp/consumer.go b/pkg/rtsp/consumer.go index 79e2b3485..b6df188fa 100644 --- a/pkg/rtsp/consumer.go +++ b/pkg/rtsp/consumer.go @@ -18,15 +18,6 @@ func (c *Conn) GetMedias() []*core.Media { } func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) (err error) { - core.Assert(media.Direction == core.DirectionSendonly) - - for _, sender := range c.senders { - if sender.Codec == codec { - sender.HandleRTP(track) - return - } - } - var channel byte switch c.mode { @@ -47,12 +38,12 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv c.state = StateSetup case core.ModePassiveConsumer: - channel = byte(len(c.senders)) * 2 + channel = byte(len(c.Senders)) * 2 // for consumer is better to use original track codec codec = track.Codec.Clone() // generate new payload type, starting from 96 - codec.PayloadType = byte(96 + len(c.senders)) + codec.PayloadType = byte(96 + len(c.Senders)) default: panic(core.Caller()) @@ -70,7 +61,7 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv sender.HandleRTP(track) - c.senders = append(c.senders, sender) + c.Senders = append(c.Senders, sender) return nil } @@ -99,7 +90,7 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core. } //log.Printf("[rtsp] channel:%2d write_size:%6d buffer_size:%6d", channel, n, len(buf)) if _, err := c.conn.Write(buf[:n]); err == nil { - c.send += n + c.Send += n } n = 0 } diff --git a/pkg/rtsp/producer.go b/pkg/rtsp/producer.go index d0f36a1c6..de1158085 100644 --- a/pkg/rtsp/producer.go +++ b/pkg/rtsp/producer.go @@ -10,7 +10,7 @@ import ( func (c *Conn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { core.Assert(media.Direction == core.DirectionRecvonly) - for _, track := range c.receivers { + for _, track := range c.Receivers { if track.Codec == codec { return track, nil } @@ -34,7 +34,7 @@ func (c *Conn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, e track := core.NewReceiver(media, codec) track.ID = channel - c.receivers = append(c.receivers, track) + c.Receivers = append(c.Receivers, track) return track, nil } @@ -81,10 +81,10 @@ func (c *Conn) Start() (err error) { } func (c *Conn) Stop() (err error) { - for _, receiver := range c.receivers { + for _, receiver := range c.Receivers { receiver.Close() } - for _, sender := range c.senders { + for _, sender := range c.Senders { sender.Close() } @@ -99,25 +99,7 @@ func (c *Conn) Stop() (err error) { } func (c *Conn) MarshalJSON() ([]byte, error) { - info := &core.Info{ - Type: "RTSP " + c.mode.String(), - SDP: c.sdp, - UserAgent: c.UserAgent, - Medias: c.Medias, - Receivers: c.receivers, - Senders: c.senders, - Recv: c.recv, - Send: c.send, - } - - if c.URL != nil { - info.URL = c.URL.String() - } - if c.conn != nil { - info.RemoteAddr = c.conn.RemoteAddr().String() - } - - return json.Marshal(info) + return json.Marshal(c.Connection) } func (c *Conn) Reconnect() error { @@ -135,12 +117,12 @@ func (c *Conn) Reconnect() error { } // restore previous medias - for _, receiver := range c.receivers { + for _, receiver := range c.Receivers { if _, err := c.SetupMedia(receiver.Media); err != nil { return err } } - for _, sender := range c.senders { + for _, sender := range c.Senders { if _, err := c.SetupMedia(sender.Media); err != nil { return err } diff --git a/pkg/rtsp/server.go b/pkg/rtsp/server.go index 8e0d31341..7953b0dc8 100644 --- a/pkg/rtsp/server.go +++ b/pkg/rtsp/server.go @@ -14,10 +14,16 @@ import ( ) func NewServer(conn net.Conn) *Conn { - c := new(Conn) - c.conn = conn - c.reader = bufio.NewReader(conn) - return c + return &Conn{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "rtsp", + Protocol: "rtsp+tcp", + RemoteAddr: conn.RemoteAddr().String(), + }, + conn: conn, + reader: bufio.NewReader(conn), + } } func (c *Conn) Auth(username, password string) { @@ -70,7 +76,7 @@ func (c *Conn) Accept() error { return errors.New("wrong content type") } - c.sdp = string(req.Body) // for info + c.SDP = string(req.Body) // for info c.Medias, err = UnmarshalSDP(req.Body) if err != nil { @@ -81,7 +87,7 @@ func (c *Conn) Accept() error { for i, media := range c.Medias { track := core.NewReceiver(media, media.Codecs[0]) track.ID = byte(i * 2) - c.receivers = append(c.receivers, track) + c.Receivers = append(c.Receivers, track) } c.mode = core.ModePassiveProducer @@ -96,7 +102,7 @@ func (c *Conn) Accept() error { c.mode = core.ModePassiveConsumer c.Fire(MethodDescribe) - if c.senders == nil { + if c.Senders == nil { res := &tcp.Response{ Status: "404 Not Found", Request: req, @@ -113,7 +119,7 @@ func (c *Conn) Accept() error { // convert tracks to real output medias medias var medias []*core.Media - for i, track := range c.senders { + for i, track := range c.Senders { media := &core.Media{ Kind: core.GetKind(track.Codec.Name), Direction: core.DirectionRecvonly, @@ -128,7 +134,7 @@ func (c *Conn) Accept() error { return err } - c.sdp = string(res.Body) // for info + c.SDP = string(res.Body) // for info if err = c.WriteResponse(res); err != nil { return err @@ -148,9 +154,9 @@ func (c *Conn) Accept() error { c.state = StateSetup if c.mode == core.ModePassiveConsumer { - if i := reqTrackID(req); i >= 0 && i < len(c.senders) { + if i := reqTrackID(req); i >= 0 && i < len(c.Senders) { // mark sender as SETUP - c.senders[i].Media.ID = MethodSetup + c.Senders[i].Media.ID = MethodSetup tr = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1) res.Header.Set("Transport", tr) } else { @@ -170,7 +176,7 @@ func (c *Conn) Accept() error { case MethodRecord, MethodPlay: if c.mode == core.ModePassiveConsumer { // stop unconfigured senders - for _, track := range c.senders { + for _, track := range c.Senders { if track.Media.ID != MethodSetup { track.Close() } diff --git a/pkg/stdin/consumer.go b/pkg/stdin/backchannel.go similarity index 88% rename from pkg/stdin/consumer.go rename to pkg/stdin/backchannel.go index a1284948d..b9a4a6d45 100644 --- a/pkg/stdin/consumer.go +++ b/pkg/stdin/backchannel.go @@ -49,10 +49,12 @@ func (c *Client) Stop() (err error) { } func (c *Client) MarshalJSON() ([]byte, error) { - info := &core.Info{ - Type: "Exec active consumer", - Medias: c.medias, - Send: c.send, + info := &core.Connection{ + ID: core.ID(c), + FormatName: "exec", + Protocol: "pipe", + Medias: c.medias, + Send: c.send, } if c.sender != nil { info.Senders = []*core.Sender{c.sender} diff --git a/pkg/stdin/client.go b/pkg/stdin/client.go index 51db30eed..09e525ad1 100644 --- a/pkg/stdin/client.go +++ b/pkg/stdin/client.go @@ -6,6 +6,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/core" ) +// Deprecated: should be rewritten to core.Connection type Client struct { cmd *exec.Cmd diff --git a/pkg/tapo/consumer.go b/pkg/tapo/backchannel.go similarity index 100% rename from pkg/tapo/consumer.go rename to pkg/tapo/backchannel.go diff --git a/pkg/tapo/client.go b/pkg/tapo/client.go index ed79e500f..3585011c4 100644 --- a/pkg/tapo/client.go +++ b/pkg/tapo/client.go @@ -23,6 +23,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/tcp" ) +// Deprecated: should be rewritten to core.Connection type Client struct { core.Listener diff --git a/pkg/tapo/producer.go b/pkg/tapo/producer.go index ac213e159..7d66d9070 100644 --- a/pkg/tapo/producer.go +++ b/pkg/tapo/producer.go @@ -2,6 +2,7 @@ package tapo import ( "encoding/json" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/mpegts" ) @@ -74,15 +75,20 @@ func (c *Client) Stop() error { } func (c *Client) MarshalJSON() ([]byte, error) { - info := &core.Info{ - Type: "Tapo active producer", - Medias: c.medias, - Recv: c.recv, - Receivers: c.receivers, - Send: c.send, + info := &core.Connection{ + ID: core.ID(c), + FormatName: "tapo", + Protocol: "http", + Medias: c.medias, + Recv: c.recv, + Receivers: c.receivers, + Send: c.send, } if c.sender != nil { info.Senders = []*core.Sender{c.sender} } + if c.conn1 != nil { + info.RemoteAddr = c.conn1.RemoteAddr().String() + } return json.Marshal(info) } diff --git a/pkg/tcp/helpers.go b/pkg/tcp/helpers.go deleted file mode 100644 index 9db42a896..000000000 --- a/pkg/tcp/helpers.go +++ /dev/null @@ -1,12 +0,0 @@ -package tcp - -import ( - "net/http" -) - -func RemoteAddr(r *http.Request) string { - if remote := r.Header.Get("X-Forwarded-For"); remote != "" { - return remote + ", " + r.RemoteAddr - } - return r.RemoteAddr -} diff --git a/pkg/wav/wav.go b/pkg/wav/producer.go similarity index 89% rename from pkg/wav/wav.go rename to pkg/wav/producer.go index 5f572bd64..63f6d01af 100644 --- a/pkg/wav/wav.go +++ b/pkg/wav/producer.go @@ -54,22 +54,27 @@ func Open(r io.Reader) (*Producer, error) { return nil, errors.New("waw: unsupported codec") } - prod := &Producer{rd: rd, cl: r.(io.Closer)} - prod.Type = "WAV producer" - prod.Medias = []*core.Media{ + medias := []*core.Media{ { Kind: core.KindAudio, Direction: core.DirectionRecvonly, Codecs: []*core.Codec{codec}, }, } - return prod, nil + return &Producer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "wav", + Medias: medias, + Transport: r, + }, + rd: rd, + }, nil } type Producer struct { - core.SuperProducer + core.Connection rd *bufio.Reader - cl io.Closer } func (c *Producer) Start() error { @@ -106,11 +111,6 @@ func (c *Producer) Start() error { } } -func (c *Producer) Stop() error { - _ = c.SuperProducer.Close() - return c.cl.Close() -} - func readChunk(r io.Reader) (chunkID string, data []byte, err error) { b := make([]byte, 8) if _, err = io.ReadFull(r, b); err != nil { diff --git a/pkg/webrtc/client.go b/pkg/webrtc/client.go index 50c7773df..9a7a7b2f8 100644 --- a/pkg/webrtc/client.go +++ b/pkg/webrtc/client.go @@ -71,7 +71,7 @@ func (c *Conn) SetAnswer(answer string) (err error) { return } - c.medias = UnmarshalMedias(sd.MediaDescriptions) + c.Medias = UnmarshalMedias(sd.MediaDescriptions) return nil } diff --git a/pkg/webrtc/conn.go b/pkg/webrtc/conn.go index 0e10874eb..3e3ecc4f1 100644 --- a/pkg/webrtc/conn.go +++ b/pkg/webrtc/conn.go @@ -1,6 +1,9 @@ package webrtc import ( + "encoding/json" + "fmt" + "strings" "time" "github.com/AlexxIT/go2rtc/pkg/core" @@ -10,28 +13,25 @@ import ( ) type Conn struct { + core.Connection core.Listener - UserAgent string - Desc string - Mode core.Mode + Mode core.Mode `json:"mode"` pc *webrtc.PeerConnection - medias []*core.Media - receivers []*core.Receiver - senders []*core.Sender - - recv int - send int - offer string - remote string closed core.Waiter } func NewConn(pc *webrtc.PeerConnection) *Conn { - c := &Conn{pc: pc} + c := &Conn{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "webrtc", + }, + pc: pc, + } pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { // last candidate will be empty @@ -50,7 +50,15 @@ func NewConn(pc *webrtc.PeerConnection) *Conn { } pc.SCTP().Transport().ICETransport().OnSelectedCandidatePairChange( func(pair *webrtc.ICECandidatePair) { - c.remote = pair.Remote.String() + c.Protocol += "+" + pair.Remote.Protocol.String() + c.RemoteAddr = fmt.Sprintf( + "%s:%d %s", sanitizeIP6(pair.Remote.Address), pair.Remote.Port, pair.Remote.Typ, + ) + if pair.Remote.RelatedAddress != "" { + c.RemoteAddr += fmt.Sprintf( + " %s:%d", sanitizeIP6(pair.Remote.RelatedAddress), pair.Remote.RelatedPort, + ) + } }, ) }) @@ -92,7 +100,7 @@ func NewConn(pc *webrtc.PeerConnection) *Conn { return } - c.recv += n + c.Recv += n packet := &rtp.Packet{} if err := packet.Unmarshal(b[:n]); err != nil { @@ -121,7 +129,7 @@ func NewConn(pc *webrtc.PeerConnection) *Conn { switch state { case webrtc.PeerConnectionStateConnected: - for _, sender := range c.senders { + for _, sender := range c.Senders { sender.Start() } case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed: @@ -134,6 +142,10 @@ func NewConn(pc *webrtc.PeerConnection) *Conn { return c } +func (c *Conn) MarshalJSON() ([]byte, error) { + return json.Marshal(c.Connection) +} + func (c *Conn) Close() error { c.closed.Done(nil) return c.pc.Close() @@ -172,7 +184,7 @@ func (c *Conn) getMediaCodec(remote *webrtc.TrackRemote) (*core.Media, *core.Cod } // search Media for this MID - for _, media := range c.medias { + for _, media := range c.Medias { if media.ID != tr.Mid() || media.Direction != core.DirectionRecvonly { continue } @@ -194,3 +206,10 @@ func (c *Conn) getMediaCodec(remote *webrtc.TrackRemote) (*core.Media, *core.Cod return nil, nil } + +func sanitizeIP6(host string) string { + if strings.IndexByte(host, ':') > 0 { + return "[" + host + "]" + } + return host +} diff --git a/pkg/webrtc/consumer.go b/pkg/webrtc/consumer.go index 3bcaf49af..2dcab4362 100644 --- a/pkg/webrtc/consumer.go +++ b/pkg/webrtc/consumer.go @@ -1,7 +1,6 @@ package webrtc import ( - "encoding/json" "errors" "github.com/AlexxIT/go2rtc/pkg/core" @@ -12,13 +11,13 @@ import ( ) func (c *Conn) GetMedias() []*core.Media { - return WithResampling(c.medias) + return WithResampling(c.Medias) } func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { core.Assert(media.Direction == core.DirectionSendonly) - for _, sender := range c.senders { + for _, sender := range c.Senders { if sender.Codec == codec { sender.Bind(track) return nil @@ -42,7 +41,7 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv sender := core.NewSender(media, codec) sender.Handler = func(packet *rtp.Packet) { - c.send += packet.MarshalSize() + c.Send += packet.MarshalSize() //important to send with remote PayloadType _ = localTrack.WriteRTP(payloadType, packet) } @@ -85,20 +84,6 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv sender.HandleRTP(track) } - c.senders = append(c.senders, sender) + c.Senders = append(c.Senders, sender) return nil } - -func (c *Conn) MarshalJSON() ([]byte, error) { - info := &core.Info{ - Type: c.Desc + " " + c.Mode.String(), - RemoteAddr: c.remote, - UserAgent: c.UserAgent, - Medias: c.medias, - Receivers: c.receivers, - Senders: c.senders, - Recv: c.recv, - Send: c.send, - } - return json.Marshal(info) -} diff --git a/pkg/webrtc/producer.go b/pkg/webrtc/producer.go index d4136a5c8..a0910c39e 100644 --- a/pkg/webrtc/producer.go +++ b/pkg/webrtc/producer.go @@ -8,7 +8,7 @@ import ( func (c *Conn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { core.Assert(media.Direction == core.DirectionRecvonly) - for _, track := range c.receivers { + for _, track := range c.Receivers { if track.Codec == codec { return track, nil } @@ -39,7 +39,7 @@ func (c *Conn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, e } track := core.NewReceiver(media, codec) - c.receivers = append(c.receivers, track) + c.Receivers = append(c.Receivers, track) return track, nil } @@ -47,13 +47,3 @@ func (c *Conn) Start() error { c.closed.Wait() return nil } - -func (c *Conn) Stop() error { - for _, receiver := range c.receivers { - receiver.Close() - } - for _, sender := range c.senders { - sender.Close() - } - return c.pc.Close() -} diff --git a/pkg/webrtc/server.go b/pkg/webrtc/server.go index ce462e453..9cc897785 100644 --- a/pkg/webrtc/server.go +++ b/pkg/webrtc/server.go @@ -42,7 +42,7 @@ func (c *Conn) SetOffer(offer string) (err error) { } } - c.medias = UnmarshalMedias(sd.MediaDescriptions) + c.Medias = UnmarshalMedias(sd.MediaDescriptions) return } @@ -57,7 +57,7 @@ func (c *Conn) GetAnswer() (answer string, err error) { // disable transceivers if we don't have track, make direction=inactive transeivers: for _, tr := range c.pc.GetTransceivers() { - for _, sender := range c.senders { + for _, sender := range c.Senders { if sender.Media.ID == tr.Mid() { continue transeivers } diff --git a/pkg/webtorrent/client.go b/pkg/webtorrent/client.go index de6b21c7c..3594679dc 100644 --- a/pkg/webtorrent/client.go +++ b/pkg/webtorrent/client.go @@ -3,19 +3,21 @@ package webtorrent import ( "encoding/base64" "fmt" + "strconv" + "time" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/webrtc" "github.com/gorilla/websocket" pion "github.com/pion/webrtc/v3" - "strconv" - "time" ) func NewClient(tracker, share, pwd string, pc *pion.PeerConnection) (*webrtc.Conn, error) { // 1. Create WebRTC producer prod := webrtc.NewConn(pc) - prod.Desc = "WebRTC/WebTorrent sync" + prod.FormatName = "webtorrent" prod.Mode = core.ModeActiveProducer + prod.Protocol = "ws" medias := []*core.Media{ {Kind: core.KindVideo, Direction: core.DirectionRecvonly}, diff --git a/pkg/y4m/consumer.go b/pkg/y4m/consumer.go index 01bece31c..dd9b46e94 100644 --- a/pkg/y4m/consumer.go +++ b/pkg/y4m/consumer.go @@ -9,14 +9,17 @@ import ( ) type Consumer struct { - core.SuperConsumer + core.Connection wr *core.WriteBuffer } func NewConsumer() *Consumer { + wr := core.NewWriteBuffer(nil) return &Consumer{ - core.SuperConsumer{ - Type: "YUV4MPEG2 passive consumer", + core.Connection{ + ID: core.NewID(), + Transport: wr, + FormatName: "yuv4mpegpipe", Medias: []*core.Media{ { Kind: core.KindVideo, @@ -27,7 +30,7 @@ func NewConsumer() *Consumer { }, }, }, - core.NewWriteBuffer(nil), + wr, } } @@ -60,8 +63,3 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv func (c *Consumer) WriteTo(wr io.Writer) (int64, error) { return c.wr.WriteTo(wr) } - -func (c *Consumer) Stop() error { - _ = c.SuperConsumer.Close() - return c.wr.Close() -} diff --git a/pkg/y4m/producer.go b/pkg/y4m/producer.go index 05f98a6f6..ee2dd731f 100644 --- a/pkg/y4m/producer.go +++ b/pkg/y4m/producer.go @@ -2,7 +2,6 @@ package y4m import ( "bufio" - "bytes" "errors" "io" @@ -19,41 +18,13 @@ func Open(r io.Reader) (*Producer, error) { b = b[:len(b)-1] // remove \n - sdp := string(b) - var fmtp string - - for b != nil { - // YUV4MPEG2 W1280 H720 F24:1 Ip A1:1 C420mpeg2 XYSCSS=420MPEG2 - // https://manned.org/yuv4mpeg.5 - // https://github.com/FFmpeg/FFmpeg/blob/master/libavformat/yuv4mpegenc.c - key := b[0] - var value string - if i := bytes.IndexByte(b, ' '); i > 0 { - value = string(b[1:i]) - b = b[i+1:] - } else { - value = string(b[1:]) - b = nil - } - - switch key { - case 'W': - fmtp = "width=" + value - case 'H': - fmtp += ";height=" + value - case 'C': - fmtp += ";colorspace=" + value - } - } + fmtp := ParseHeader(b) if GetSize(fmtp) == 0 { - return nil, errors.New("y4m: unsupported format: " + sdp) + return nil, errors.New("y4m: unsupported format: " + string(b)) } - prod := &Producer{rd: rd, cl: r.(io.Closer)} - prod.Type = "YUV4MPEG2 producer" - prod.SDP = sdp - prod.Medias = []*core.Media{ + medias := []*core.Media{ { Kind: core.KindVideo, Direction: core.DirectionRecvonly, @@ -67,14 +38,21 @@ func Open(r io.Reader) (*Producer, error) { }, }, } - - return prod, nil + return &Producer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "yuv4mpegpipe", + Medias: medias, + SDP: string(b), + Transport: r, + }, + rd: rd, + }, nil } type Producer struct { - core.SuperProducer + core.Connection rd *bufio.Reader - cl io.Closer } func (c *Producer) Start() error { @@ -103,8 +81,3 @@ func (c *Producer) Start() error { c.Receivers[0].WriteRTP(pkt) } } - -func (c *Producer) Stop() error { - _ = c.SuperProducer.Close() - return c.cl.Close() -} diff --git a/pkg/y4m/y4m.go b/pkg/y4m/y4m.go index 8184ea97d..4ac54da6a 100644 --- a/pkg/y4m/y4m.go +++ b/pkg/y4m/y4m.go @@ -1,6 +1,7 @@ package y4m import ( + "bytes" "image" "github.com/AlexxIT/go2rtc/pkg/core" @@ -10,6 +11,34 @@ const FourCC = "YUV4" const frameHdr = "FRAME\n" +func ParseHeader(b []byte) (fmtp string) { + for b != nil { + // YUV4MPEG2 W1280 H720 F24:1 Ip A1:1 C420mpeg2 XYSCSS=420MPEG2 + // https://manned.org/yuv4mpeg.5 + // https://github.com/FFmpeg/FFmpeg/blob/master/libavformat/yuv4mpegenc.c + key := b[0] + + var value string + if i := bytes.IndexByte(b, ' '); i > 0 { + value = string(b[1:i]) + b = b[i+1:] + } else { + value = string(b[1:]) + b = nil + } + + switch key { + case 'W': + fmtp = "width=" + value + case 'H': + fmtp += ";height=" + value + case 'C': + fmtp += ";colorspace=" + value + } + } + return +} + func GetSize(fmtp string) int { w := core.Atoi(core.Between(fmtp, "width=", ";")) h := core.Atoi(core.Between(fmtp, "height=", ";"))