From 05add307eb23b1835cb95e44da0b107f5403fb43 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 10 May 2019 12:12:44 -0700 Subject: [PATCH] read as much as we can in one go --- stream.go | 39 +++++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/stream.go b/stream.go index dfd70d7..0175a16 100644 --- a/stream.go +++ b/stream.go @@ -57,6 +57,19 @@ func (s *Stream) Name() string { return s.name } +// tries to preload pending data +func (s *Stream) preloadData() { + select { + case read, ok := <-s.dataIn: + if !ok { + return + } + s.extra = read + s.exbuf = read + default: + } +} + func (s *Stream) waitForData(ctx context.Context) error { s.deadlineLock.Lock() if !s.rDeadline.IsZero() { @@ -106,21 +119,31 @@ func (s *Stream) returnBuffers() { } func (s *Stream) Read(b []byte) (int, error) { + select { + case <-s.reset: + return 0, streammux.ErrReset + default: + } if s.extra == nil { err := s.waitForData(context.Background()) if err != nil { return 0, err } } - n := copy(b, s.extra) - if n < len(s.extra) { - s.extra = s.extra[n:] - } else { - if s.exbuf != nil { - pool.Put(s.exbuf) + n := 0 + for s.extra != nil && n < len(b) { + read := copy(b[n:], s.extra) + n += read + if read < len(s.extra) { + s.extra = s.extra[read:] + } else { + if s.exbuf != nil { + pool.Put(s.exbuf) + } + s.extra = nil + s.exbuf = nil + s.preloadData() } - s.extra = nil - s.exbuf = nil } return n, nil }