Skip to content

Commit

Permalink
BIG rewrite stream info
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed Jun 16, 2024
1 parent ecfe802 commit 96504e2
Show file tree
Hide file tree
Showing 88 changed files with 1,042 additions and 853 deletions.
27 changes: 23 additions & 4 deletions internal/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/url"
"os"
"os/exec"
"slices"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -80,7 +81,7 @@ func execHandle(rawURL string) (core.Producer, error) {
return handleRTSP(rawURL, cmd, path)
}

func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error) {
func handlePipe(source string, cmd *exec.Cmd, query url.Values) (core.Producer, error) {
if query.Get("backchannel") == "1" {
return stdin.NewClient(cmd)
}
Expand All @@ -104,12 +105,17 @@ func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error
return nil, fmt.Errorf("exec/pipe: %w\n%s", err, cmd.Stderr)
}

if info, ok := prod.(core.Info); ok {
info.SetProtocol("pipe")
setRemoteInfo(info, source, cmd.Args)
}

log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run pipe")

return prod, nil
}

func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
func handleRTSP(source string, cmd *exec.Cmd, path string) (core.Producer, error) {
if log.Trace().Enabled() {
cmd.Stdout = os.Stdout
}
Expand All @@ -131,7 +137,7 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
ts := time.Now()

if err := cmd.Start(); err != nil {
log.Error().Err(err).Str("url", url).Msg("[exec]")
log.Error().Err(err).Str("source", source).Msg("[exec]")
return nil, err
}

Expand All @@ -143,13 +149,14 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
select {
case <-time.After(time.Second * 60):
_ = cmd.Process.Kill()
log.Error().Str("url", url).Msg("[exec] timeout")
log.Error().Str("source", source).Msg("[exec] timeout")
return nil, errors.New("exec: timeout")
case <-done:
// limit message size
return nil, fmt.Errorf("exec/rtsp\n%s", cmd.Stderr)
case prod := <-waiter:
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run rtsp")
setRemoteInfo(prod, source, cmd.Args)
prod.OnClose = func() error {
log.Debug().Msgf("[exec] kill rtsp")
return errors.Join(cmd.Process.Kill(), cmd.Wait())
Expand Down Expand Up @@ -210,3 +217,15 @@ func trimSpace(b []byte) []byte {
}
return b[start:stop]
}

func setRemoteInfo(info core.Info, source string, args []string) {
info.SetSource(source)

if i := slices.Index(args, "-i"); i > 0 && i < len(args)-1 {
rawURL := args[i+1]
if u, err := url.Parse(rawURL); err == nil && u.Host != "" {
info.SetRemoteAddr(u.Host)
info.SetURL(rawURL)
}
}
}
7 changes: 4 additions & 3 deletions internal/ffmpeg/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type Producer struct {
core.SuperProducer
core.Connection
url string
query url.Values
ffmpeg core.Producer
Expand All @@ -31,7 +31,8 @@ func NewProducer(url string) (core.Producer, error) {
return nil, errors.New("ffmpeg: unsupported params: " + url[i:])
}

p.Type = "FFmpeg producer"
p.ID = core.NewID()
p.FormatName = "ffmpeg"
p.Medias = []*core.Media{
{
// we can support only audio, because don't know FmtpLine for H264 and PayloadType for MJPEG
Expand Down Expand Up @@ -81,7 +82,7 @@ func (p *Producer) Stop() error {

func (p *Producer) MarshalJSON() ([]byte, error) {
if p.ffmpeg == nil {
return json.Marshal(p.SuperProducer)
return json.Marshal(p.Connection)
}
return json.Marshal(p.ffmpeg)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/hass/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func apiStream(w http.ResponseWriter, r *http.Request) {
return
}

s, err = webrtc.ExchangeSDP(stream, string(offer), "WebRTC/Hass sync", r.UserAgent())
s, err = webrtc.ExchangeSDP(stream, string(offer), "hass/webrtc", r.UserAgent())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down
11 changes: 4 additions & 7 deletions internal/hls/hls.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog"
)

Expand Down Expand Up @@ -63,15 +62,13 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
medias := mp4.ParseQuery(r.URL.Query())
if medias != nil {
c := mp4.NewConsumer(medias)
c.Type = "HLS/fMP4 consumer"
c.RemoteAddr = tcp.RemoteAddr(r)
c.UserAgent = r.UserAgent()
c.FormatName = "hls/fmp4"
c.WithRequest(r)
cons = c
} else {
c := mpegts.NewConsumer()
c.Type = "HLS/TS consumer"
c.RemoteAddr = tcp.RemoteAddr(r)
c.UserAgent = r.UserAgent()
c.FormatName = "hls/mpegts"
c.WithRequest(r)
cons = c
}

Expand Down
6 changes: 2 additions & 4 deletions internal/hls/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)

func handlerWSHLS(tr *ws.Transport, msg *ws.Message) error {
Expand All @@ -20,9 +19,8 @@ func handlerWSHLS(tr *ws.Transport, msg *ws.Message) error {
codecs := msg.String()
medias := mp4.ParseCodecs(codecs, true)
cons := mp4.NewConsumer(medias)
cons.Type = "HLS/fMP4 consumer"
cons.RemoteAddr = tcp.RemoteAddr(tr.Request)
cons.UserAgent = tr.Request.UserAgent()
cons.FormatName = "hls/fmp4"
cons.WithRequest(tr.Request)

log.Trace().Msgf("[hls] new ws consumer codecs=%s", codecs)

Expand Down
29 changes: 21 additions & 8 deletions internal/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/hls"
"github.com/AlexxIT/go2rtc/pkg/image"
"github.com/AlexxIT/go2rtc/pkg/magic"
"github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/multipart"
"github.com/AlexxIT/go2rtc/pkg/mpjpeg"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)

Expand Down Expand Up @@ -45,6 +45,21 @@ func handleHTTP(rawURL string) (core.Producer, error) {
}
}

prod, err := do(req)
if err != nil {
return nil, err
}

if info, ok := prod.(core.Info); ok {
info.SetProtocol("http")
info.SetRemoteAddr(req.URL.Host) // TODO: rewrite to net.Conn
info.SetURL(rawURL)
}

return prod, nil
}

func do(req *http.Request) (core.Producer, error) {
res, err := tcp.Do(req)
if err != nil {
return nil, err
Expand All @@ -66,14 +81,12 @@ func handleHTTP(rawURL string) (core.Producer, error) {
}

switch {
case ct == "image/jpeg":
return mjpeg.NewClient(res), nil

case ct == "multipart/x-mixed-replace":
return multipart.Open(res.Body)

case ct == "application/vnd.apple.mpegurl" || ext == "m3u8":
return hls.OpenURL(req.URL, res.Body)
case ct == "image/jpeg":
return image.Open(res)
case ct == "multipart/x-mixed-replace":
return mpjpeg.Open(res.Body)
}

return magic.Open(res.Body)
Expand Down
27 changes: 11 additions & 16 deletions internal/mjpeg/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/magic"
"github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/AlexxIT/go2rtc/pkg/mpjpeg"
"github.com/AlexxIT/go2rtc/pkg/y4m"
"github.com/rs/zerolog"
)
Expand All @@ -44,8 +44,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
}

cons := magic.NewKeyframe()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)

if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
Expand Down Expand Up @@ -100,8 +99,7 @@ func outputMjpeg(w http.ResponseWriter, r *http.Request) {
}

cons := mjpeg.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)

if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Msg("[api.mjpeg] add consumer")
Expand All @@ -117,7 +115,7 @@ func outputMjpeg(w http.ResponseWriter, r *http.Request) {
wr := mjpeg.NewWriter(w)
_, _ = cons.WriteTo(wr)
} else {
cons.Type = "ASCII passive consumer "
cons.FormatName = "ascii"

query := r.URL.Query()
wr := ascii.NewWriter(w, query.Get("color"), query.Get("back"), query.Get("text"))
Expand All @@ -135,17 +133,16 @@ func inputMjpeg(w http.ResponseWriter, r *http.Request) {
return
}

res := &http.Response{Body: r.Body, Header: r.Header, Request: r}
res.Header.Set("Content-Type", "multipart/mixed;boundary=")
prod, _ := mpjpeg.Open(r.Body)
prod.WithRequest(r)

client := mjpeg.NewClient(res)
stream.AddProducer(client)
stream.AddProducer(prod)

if err := client.Start(); err != nil && err != io.EOF {
if err := prod.Start(); err != nil && err != io.EOF {
log.Warn().Err(err).Caller().Send()
}

stream.RemoveProducer(client)
stream.RemoveProducer(prod)
}

func handlerWS(tr *ws.Transport, _ *ws.Message) error {
Expand All @@ -155,8 +152,7 @@ func handlerWS(tr *ws.Transport, _ *ws.Message) error {
}

cons := mjpeg.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(tr.Request)
cons.UserAgent = tr.Request.UserAgent()
cons.WithRequest(tr.Request)

if err := stream.AddConsumer(cons); err != nil {
log.Debug().Err(err).Msg("[mjpeg] add consumer")
Expand All @@ -183,8 +179,7 @@ func apiStreamY4M(w http.ResponseWriter, r *http.Request) {
}

cons := y4m.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)

if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
Expand Down
7 changes: 3 additions & 4 deletions internal/mp4/mp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog"
)

Expand Down Expand Up @@ -100,9 +99,9 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {

medias := mp4.ParseQuery(r.URL.Query())
cons := mp4.NewConsumer(medias)
cons.Type = "MP4/HTTP active consumer"
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.FormatName = "mp4"
cons.Protocol = "http"
cons.WithRequest(r)

if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
Expand Down
10 changes: 3 additions & 7 deletions internal/mp4/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)

func handlerWSMSE(tr *ws.Transport, msg *ws.Message) error {
Expand All @@ -24,9 +23,8 @@ func handlerWSMSE(tr *ws.Transport, msg *ws.Message) error {
}

cons := mp4.NewConsumer(medias)
cons.Type = "MSE/WebSocket active consumer"
cons.RemoteAddr = tcp.RemoteAddr(tr.Request)
cons.UserAgent = tr.Request.UserAgent()
cons.FormatName = "mse/fmp4"
cons.WithRequest(tr.Request)

if err := stream.AddConsumer(cons); err != nil {
log.Debug().Err(err).Msg("[mp4] add consumer")
Expand Down Expand Up @@ -57,9 +55,7 @@ func handlerWSMP4(tr *ws.Transport, msg *ws.Message) error {
}

cons := mp4.NewKeyframe(medias)
cons.Type = "MP4/WebSocket active consumer"
cons.RemoteAddr = tcp.RemoteAddr(tr.Request)
cons.UserAgent = tr.Request.UserAgent()
cons.WithRequest(tr.Request)

if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
Expand Down
4 changes: 1 addition & 3 deletions internal/mpegts/aac.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)

func apiStreamAAC(w http.ResponseWriter, r *http.Request) {
Expand All @@ -18,8 +17,7 @@ func apiStreamAAC(w http.ResponseWriter, r *http.Request) {
}

cons := aac.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)

if err := stream.AddConsumer(cons); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
4 changes: 1 addition & 3 deletions internal/mpegts/mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)

func Init() {
Expand All @@ -31,8 +30,7 @@ func outputMpegTS(w http.ResponseWriter, r *http.Request) {
}

cons := mpegts.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)

if err := stream.AddConsumer(cons); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
Loading

0 comments on commit 96504e2

Please sign in to comment.