Skip to content

Commit

Permalink
convert streams to SSE #336
Browse files Browse the repository at this point in the history
  • Loading branch information
umputun committed Jun 18, 2019
1 parent dad51f0 commit e1bec31
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 34 deletions.
10 changes: 5 additions & 5 deletions backend/app/rest/api/rest_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (s *public) infoStreamCtrl(w http.ResponseWriter, r *http.Request) {
fn := func() steamEventFn {
lastTS := time.Time{}
lastCount := 0
return func() (data []byte, upd bool, err error) {
return func() (event string, data []byte, upd bool, err error) {
key := cache.NewKey(locator.SiteID).ID(URLKey(r)).Scopes(locator.SiteID, locator.URL)
data, err = s.cache.Get(key, func() ([]byte, error) {
info, e := s.dataService.Info(locator, s.readOnlyAge)
Expand All @@ -172,10 +172,10 @@ func (s *public) infoStreamCtrl(w http.ResponseWriter, r *http.Request) {
return encodeJSONWithHTML(info)
})
if err != nil {
return data, false, err
return "info", data, false, err
}

return data, upd, nil
return "info", data, upd, nil
}
}

Expand Down Expand Up @@ -234,7 +234,7 @@ func (s *public) lastCommentsStreamCtrl(w http.ResponseWriter, r *http.Request)

fn := func() steamEventFn {
sinceTime := time.Now()
return func() (data []byte, upd bool, err error) {
return func() (event string, data []byte, upd bool, err error) {
key := cache.NewKey(siteID).ID(URLKey(r)).Scopes(lastCommentsScope)
data, err = s.cache.Get(key, func() ([]byte, error) {
comments, e := s.dataService.Last(siteID, 1, sinceTime, rest.GetUserOrEmpty(r))
Expand All @@ -248,7 +248,7 @@ func (s *public) lastCommentsStreamCtrl(w http.ResponseWriter, r *http.Request)
sinceTime = time.Now()
return encodeJSONWithHTML(comments)
})
return data, upd, err
return "last", data, upd, err
}
}

Expand Down
23 changes: 13 additions & 10 deletions backend/app/rest/api/rest_public_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,10 +569,11 @@ func TestRest_InfoStream(t *testing.T) {
assert.Equal(t, 200, code)
wg.Wait()

t.Logf(string(body))
recs := strings.Split(strings.TrimSuffix(string(body), "\n"), "\n")
require.Equal(t, 10, len(recs), "10 records")
assert.True(t, strings.Contains(recs[0], `"count":2`), recs[0])
assert.True(t, strings.Contains(recs[9], `"count":11`), recs[9])
require.Equal(t, 10*3, len(recs), "10 records. each 2 lines +1 emty line")
assert.True(t, strings.Contains(recs[0+1], `"count":2`), recs[0])
assert.True(t, strings.Contains(recs[9*3+1], `"count":11`), recs[9])

_, code = get(t, ts.URL+"/api/v1/stream/info?site=radio-t&url=https://radio-t.com/blah123")
assert.Equal(t, 500, code)
Expand Down Expand Up @@ -659,9 +660,9 @@ func TestRest_InfoStreamCancel(t *testing.T) {
wg.Wait()

recs := strings.Split(strings.TrimSuffix(string(body), "\n"), "\n")
require.Equal(t, 2, len(recs), "should have 2 records")
assert.True(t, strings.Contains(recs[0], `"count":2`), recs[0])
assert.True(t, strings.Contains(recs[1], `"count":3`), recs[1])
require.Equal(t, 2*3, len(recs), "should have 2 events")
assert.True(t, strings.Contains(recs[0*3+1], `"count":2`), recs[0])
assert.True(t, strings.Contains(recs[1*3+1], `"count":3`), recs[1])
}

func TestRest_Robots(t *testing.T) {
Expand Down Expand Up @@ -706,11 +707,13 @@ func TestRest_LastCommentsStream(t *testing.T) {
assert.Equal(t, 200, r.StatusCode)

wg.Wait()
t.Logf("headers: %+v", r.Header)
assert.Equal(t, "text/event-stream", r.Header.Get("content-type"))

recs := strings.Split(strings.TrimSuffix(string(body), "\n"), "\n")
require.Equal(t, 9, len(recs), "9 records")
require.Equal(t, 9*3, len(recs), "9 events")
t.Logf("%v", recs)
assert.True(t, strings.Contains(recs[0], `test 123`), recs[0])
assert.True(t, strings.Contains(recs[1], `test 123`), recs[1])
}

func TestRest_LastCommentsStreamTimeout(t *testing.T) {
Expand Down Expand Up @@ -765,8 +768,8 @@ func TestRest_LastCommentsStreamCancel(t *testing.T) {
wg.Wait()

recs := strings.Split(strings.TrimSuffix(string(body), "\n"), "\n")
require.Equal(t, 2, len(recs), "2 records")
assert.True(t, strings.Contains(recs[0], `test 123`), recs[0])
require.Equal(t, 2*3, len(recs), "2 events")
assert.True(t, strings.Contains(recs[0+1], `test 123`), recs[0+1])
}

func TestRest_LastCommentsStreamTooMany(t *testing.T) {
Expand Down
30 changes: 19 additions & 11 deletions backend/app/rest/api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
"fmt"
"io"
"net/http"
"sync/atomic"
Expand All @@ -13,18 +14,18 @@ import (

// Streamer creates endless stream of \n separated json records send to remote client
type Streamer struct {
TimeOut time.Duration
Refresh time.Duration
MaxActive int32

TimeOut time.Duration
Refresh time.Duration
MaxActive int32
activeCount int32
}

type steamEventFn func() (data []byte, upd bool, err error)
type steamEventFn func() (event string, data []byte, upd bool, err error)

type steamEventResp struct {
data []byte
err error
data []byte
event string
err error
}

// Activate starts blocking function streaming update created by eventFn to ResponseWriter
Expand All @@ -39,6 +40,10 @@ func (s *Streamer) Activate(ctx context.Context, eventFn func() steamEventFn, w
return errors.New("too many streams")
}

if ww, ok := w.(http.ResponseWriter); ok {
ww.Header().Set("Content-Type", "text/event-stream")
}

for {
select {
case <-ctx.Done(): // request closed by remote client
Expand All @@ -54,7 +59,10 @@ func (s *Streamer) Activate(ctx context.Context, eventFn func() steamEventFn, w
if resp.err != nil {
return resp.err
}
if _, e := w.Write(resp.data); e != nil {

// make server-sent event record
// see https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
if _, e := fmt.Fprintf(w, "event: %s\ndata: %s\n", resp.event, string(resp.data)); e != nil {
return errors.Wrap(e, "send to stream failed")
}
if fw, okFlush := w.(http.Flusher); okFlush {
Expand All @@ -78,13 +86,13 @@ func (s *Streamer) eventsCh(ctx context.Context, fn steamEventFn) <-chan steamEv
case <-ctx.Done(): // request closed by remote client
return
case <-tick.C:
resp, upd, err := fn()
event, resp, upd, err := fn()
if err != nil {
ch <- steamEventResp{data: nil, err: errors.Wrap(err, "can't get stream data")}
ch <- steamEventResp{event: event, data: nil, err: errors.Wrap(err, "can't get stream data")}
return
}
if upd {
ch <- steamEventResp{data: resp, err: nil}
ch <- steamEventResp{event: event, data: resp, err: nil}
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions backend/app/rest/api/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ func TestStream_Timeout(t *testing.T) {

eventFn := func() steamEventFn {
n := 0
return func() (data []byte, upd bool, err error) {
return func() (event string, data []byte, upd bool, err error) {
n++
if n%2 == 0 || n > 10 {
return nil, false, nil
return "test", nil, false, nil
}
return []byte(fmt.Sprintf("some data %d\n", n)), true, nil
return "test", []byte(fmt.Sprintf("some data %d\n", n)), true, nil
}
}

buf := bytes.Buffer{}
err := s.Activate(context.Background(), eventFn, &buf)
assert.NoError(t, err)
assert.Equal(t, "some data 1\nsome data 3\nsome data 5\nsome data 7\nsome data 9\n", buf.String())
assert.Equal(t, "event: test\ndata: some data 1\n\nevent: test\ndata: some data 3\n\nevent: test\ndata: some data 5\n\nevent: test\ndata: some data 7\n\nevent: test\ndata: some data 9\n\n", buf.String())
}

func TestStream_Cancel(t *testing.T) {
Expand All @@ -43,12 +43,12 @@ func TestStream_Cancel(t *testing.T) {

eventFn := func() steamEventFn {
n := 0
return func() (data []byte, upd bool, err error) {
return func() (event string, data []byte, upd bool, err error) {
n++
if n%2 == 0 {
return nil, false, nil
return "test", nil, false, nil
}
return []byte(fmt.Sprintf("some data %d\n", n)), true, nil
return "test", []byte(fmt.Sprintf("some data %d\n", n)), true, nil
}
}

Expand All @@ -57,5 +57,5 @@ func TestStream_Cancel(t *testing.T) {
defer cancel()
err := s.Activate(ctx, eventFn, &buf)
assert.NoError(t, err)
assert.Equal(t, "some data 1\nsome data 3\nsome data 5\nsome data 7\nsome data 9\n", buf.String())
assert.Equal(t, "event: test\ndata: some data 1\n\nevent: test\ndata: some data 3\n\nevent: test\ndata: some data 5\n\nevent: test\ndata: some data 7\n\nevent: test\ndata: some data 9\n\n", buf.String())
}

0 comments on commit e1bec31

Please sign in to comment.