Skip to content

Commit

Permalink
Info stream (#336)
Browse files Browse the repository at this point in the history
* add /stream/info as a cheap way to subscribe to comment updates #253

* add check for lastTS change to allow proper info streams in no-cache mode

* check write error in info stream and terminate

* flaky info stream test

* add stream info to readme

* separate timeout middleware foe each route's group

* debug info on stream close

* fix test for streams

* stream timeout on inactivity only

* throttle streams to 500

* restore common throttle
  • Loading branch information
umputun authored Jun 3, 2019
1 parent bccf716 commit ca083f4
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 6 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,8 @@ Sort can be `time`, `active` or `score`. Supported sort order with prefix -/+, i
}
```

* `GET /api/v1/info?site=site-idd&url=post-ur` - returns `PostInfo` for site and url
* `GET /api/v1/info?site=site-idd&url=post-url` - returns `PostInfo` for site and url
* `GET /api/v1/stream/info?site=site-idd&url=post-url` - returns stream with `PostInfo` records ("\n" separated) for site and url`

### RSS feeds

Expand Down
2 changes: 2 additions & 0 deletions backend/app/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ func (s *ServerCommand) newServerApp() (*serverApp, error) {
SSLConfig: sslConfig,
UpdateLimiter: s.UpdateLimit,
ImageService: imageService,
StreamTimeOut: time.Minute * 15,
StreamRefresh: time.Second,
}

srv.ScoreThresholds.Low, srv.ScoreThresholds.Critical = s.LowScore, s.CriticalScore
Expand Down
31 changes: 27 additions & 4 deletions backend/app/rest/api/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type Rest struct {
}
UpdateLimiter float64

StreamTimeOut time.Duration
StreamRefresh time.Duration

SSLConfig SSLConfig
httpsServer *http.Server
httpServer *http.Server
Expand Down Expand Up @@ -167,8 +170,7 @@ func (s *Rest) makeHTTPServer(port int, router http.Handler) *http.Server {

func (s *Rest) routes() chi.Router {
router := chi.NewRouter()
router.Use(middleware.RealIP, R.Recoverer(log.Default()))
router.Use(middleware.Throttle(1000), middleware.Timeout(60*time.Second))
router.Use(middleware.Throttle(1000), middleware.RealIP, R.Recoverer(log.Default()))
router.Use(R.AppInfo("remark42", "umputun", s.Version), R.Ping)

s.pubRest, s.privRest, s.adminRest, s.rssRest = s.controllerGroups() // assign controllers for groups
Expand All @@ -189,11 +191,13 @@ func (s *Rest) routes() chi.Router {
authHandler, avatarHandler := s.Authenticator.Handlers()

router.Group(func(r chi.Router) {
r.Use(middleware.Timeout(5 * time.Second))
r.Use(logInfoWithBody, tollbooth_chi.LimitHandler(tollbooth.NewLimiter(5, nil)), middleware.NoCache)
r.Mount("/auth", authHandler)
})

router.Group(func(r chi.Router) {
r.Use(middleware.Timeout(5 * time.Second))
r.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(100, nil)), middleware.NoCache)
r.Mount("/avatar", avatarHandler)
})
Expand All @@ -204,13 +208,15 @@ func (s *Rest) routes() chi.Router {
router.Route("/api/v1", func(rapi chi.Router) {

rapi.Group(func(rava chi.Router) {
rava.Use(middleware.Timeout(5 * time.Second))
rava.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(100, nil)))
rava.Use(middleware.NoCache)
rava.Mount("/avatar", avatarHandler)
})

// open routes
rapi.Group(func(ropen chi.Router) {
ropen.Use(middleware.Timeout(30 * time.Second))
ropen.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(10, nil)))
ropen.Use(authMiddleware.Trace, middleware.NoCache, logInfoWithBody)
ropen.Get("/config", s.configCtrl)
Expand All @@ -230,17 +236,27 @@ func (s *Rest) routes() chi.Router {
rrss.Get("/site", s.rssRest.siteCommentsCtrl)
rrss.Get("/reply", s.rssRest.repliesCtrl)
})

})

// open routes, streams, no send timeout
rapi.Route("/stream", func(rstream chi.Router) {
rstream.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(10, nil)))
rstream.Use(authMiddleware.Trace, middleware.NoCache, logInfoWithBody)
rstream.Get("/info", s.pubRest.infoStreamCtrl)
})

// open routes, cached
rapi.Group(func(ropen chi.Router) {
ropen.Use(middleware.Timeout(30 * time.Second))
ropen.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(10, nil)))
ropen.Use(authMiddleware.Trace, logInfoWithBody)
ropen.Get("/picture/{user}/{id}", s.pubRest.loadPictureCtrl)
})

// protected routes, require auth
rapi.Group(func(rauth chi.Router) {
rauth.Use(middleware.Timeout(30 * time.Second))
rauth.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(10, nil)))
rauth.Use(authMiddleware.Auth, middleware.NoCache, logInfoWithBody)
rauth.Get("/user", s.privRest.userInfoCtrl)
Expand All @@ -249,6 +265,7 @@ func (s *Rest) routes() chi.Router {

// admin routes, require auth and admin users only
rapi.Route("/admin", func(radmin chi.Router) {
radmin.Use(middleware.Timeout(30 * time.Second))
radmin.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(10, nil)))
radmin.Use(authMiddleware.Auth, authMiddleware.AdminOnly)
radmin.Use(middleware.NoCache, logInfoWithBody)
Expand All @@ -273,6 +290,7 @@ func (s *Rest) routes() chi.Router {

// protected routes, throttled to 10/s by default, controlled by external UpdateLimiter param
rapi.Group(func(rauth chi.Router) {
rauth.Use(middleware.Timeout(10 * time.Second))
rauth.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(s.updateLimiter(), nil)))
rauth.Use(authMiddleware.Auth)
rauth.Use(middleware.NoCache)
Expand All @@ -284,7 +302,9 @@ func (s *Rest) routes() chi.Router {
rauth.With(rejectAnonUser).Post("/deleteme", s.privRest.deleteMeCtrl)
})

// protected routes, anonymous rejected
rapi.Group(func(rauth chi.Router) {
rauth.Use(middleware.Timeout(10 * time.Second))
rauth.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(s.updateLimiter(), nil)))
rauth.Use(authMiddleware.Auth, rejectAnonUser)
rauth.Use(logger.New(logger.Log(log.Default()), logger.Prefix("[DEBUG]"), logger.IPfn(ipFn)).Handler)
Expand All @@ -295,7 +315,8 @@ func (s *Rest) routes() chi.Router {

// open routes on root level
router.Group(func(rroot chi.Router) {
tollbooth_chi.LimitHandler(tollbooth.NewLimiter(50, nil))
rroot.Use(middleware.Timeout(10 * time.Second))
rroot.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(50, nil)))
rroot.Get("/index.html", s.pubRest.getStartedCtrl)
rroot.Get("/robots.txt", s.pubRest.robotsCtrl)
})
Expand All @@ -314,6 +335,8 @@ func (s *Rest) controllerGroups() (public, private, admin, rss) {
commentFormatter: s.CommentFormatter,
readOnlyAge: s.ReadOnlyAge,
webRoot: s.WebRoot,
streamTimeOut: s.StreamTimeOut,
streamRefresh: s.StreamRefresh,
}

privGrp := private{
Expand Down Expand Up @@ -416,7 +439,7 @@ func addFileServer(r chi.Router, path string, root http.FileSystem) {
}
path += "*"

r.With(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(20, nil))).
r.With(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(20, nil)), middleware.Timeout(10*time.Second)).
Get(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// don't show dirs, just serve files
if strings.HasSuffix(r.URL.Path, "/") && len(r.URL.Path) > 1 && r.URL.Path != (origPath+"/") {
Expand Down
82 changes: 81 additions & 1 deletion backend/app/rest/api/rest_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
log "github.com/go-pkgz/lgr"
R "github.com/go-pkgz/rest"
"github.com/go-pkgz/rest/cache"

"github.com/umputun/remark/backend/app/rest"
"github.com/umputun/remark/backend/app/store"
"github.com/umputun/remark/backend/app/store/image"
Expand All @@ -30,6 +29,8 @@ type public struct {
commentFormatter *store.CommentFormatter
imageService *image.Service
webRoot string
streamTimeOut time.Duration
streamRefresh time.Duration
}

type pubStore interface {
Expand Down Expand Up @@ -146,6 +147,85 @@ func (s *public) infoCtrl(w http.ResponseWriter, r *http.Request) {
}
}

// GET /stream/info?site=siteID&url=post-url - get info stream about the post
func (s *public) infoStreamCtrl(w http.ResponseWriter, r *http.Request) {
locator := store.Locator{SiteID: r.URL.Query().Get("site"), URL: r.URL.Query().Get("url")}
log.Printf("[DEBUG] start stream for %+v, timeout=%v, refresh=%v", locator, s.streamTimeOut, s.streamRefresh)

key := cache.NewKey(locator.SiteID).ID(URLKey(r)).Scopes(locator.SiteID, locator.URL)
lastTS := time.Time{}
lastCount := 0
info := func() (data []byte, upd bool, err error) {
data, err = s.cache.Get(key, func() ([]byte, error) {
info, e := s.dataService.Info(locator, s.readOnlyAge)
if e != nil {
return nil, e
}
if info.LastTS != lastTS || info.Count != lastCount {
lastTS = info.LastTS
lastCount = info.Count // removal won't update lastTS
upd = true // cache update used as indication of post update. comparing lastTS for no-cache
}
return encodeJSONWithHTML(info)
})
if err != nil {
return data, false, err
}

return data, upd, nil
}

// populate updates to chan, break on remote close
updCh := func() <-chan []byte {
ch := make(chan []byte)
go func() {
tick := time.NewTicker(s.streamRefresh)
defer func() {
close(ch)
tick.Stop()
}()
for {
select {
case <-r.Context().Done(): // request closed by remote client
log.Printf("[DEBUG] info stream closed by remote client, %v", r.Context().Err())
return
case <-tick.C:
resp, upd, err := info()
if err != nil {
rest.SendErrorJSON(w, r, http.StatusBadRequest, err, "can't get post info", rest.ErrPostNotFound)
return
}
if upd {
ch <- resp
}
}
}
}()
return ch
}()

for {
select {
case <-r.Context().Done(): // request closed by remote client
return
case <-time.After(s.streamTimeOut): // request closed by timeout
log.Printf("[DEBUG] info stream closed due to timeout")
return
case resp, ok := <-updCh: // new update
if !ok { // closed
return
}
if _, e := w.Write(resp); e != nil {
log.Printf("[WARN] failed to send stream, %v", e)
return
}
if fw, ok := w.(http.Flusher); ok {
fw.Flush()
}
}
}
}

// GET /last/{limit}?site=siteID&since=unix_ts_msec - last comments for the siteID, across all posts, sorted by time, optionally
// limited with "since" param
func (s *public) lastCommentsCtrl(w http.ResponseWriter, r *http.Request) {
Expand Down
97 changes: 97 additions & 0 deletions backend/app/rest/api/rest_public_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package api

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -524,6 +526,92 @@ func TestRest_Info(t *testing.T) {
assert.Equal(t, 400, code)
}

func TestRest_InfoStream(t *testing.T) {
ts, srv, teardown := startupT(t)
srv.pubRest.readOnlyAge = 10000000 // make sure we don't hit read-only
srv.pubRest.streamRefresh = 1 * time.Millisecond
srv.pubRest.streamTimeOut = 300 * time.Millisecond

postComment(t, ts.URL)

defer teardown()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for i := 1; i < 10; i++ {
time.Sleep(10 * time.Millisecond)
postComment(t, ts.URL)
}
}()

body, code := get(t, ts.URL+"/api/v1/stream/info?site=radio-t&url=https://radio-t.com/blah1")
assert.Equal(t, 200, code)
wg.Wait()

recs := strings.Split(strings.TrimSuffix(string(body), "\n"), "\n")
require.Equal(t, 10, len(recs), "10 records")
assert.True(t, strings.Contains(recs[0], `"count":1`), recs[0])
assert.True(t, strings.Contains(recs[9], `"count":10`), recs[9])
}

func TestRest_InfoStreamTimeout(t *testing.T) {
ts, srv, teardown := startupT(t)
defer teardown()
srv.pubRest.readOnlyAge = 10000000 // make sure we don't hit read-only
srv.pubRest.streamRefresh = 10 * time.Millisecond
srv.pubRest.streamTimeOut = 450 * time.Millisecond

postComment(t, ts.URL)

st := time.Now()
body, code := get(t, ts.URL+"/api/v1/stream/info?site=radio-t&url=https://radio-t.com/blah1")
assert.Equal(t, 200, code)
assert.True(t, time.Since(st) > time.Millisecond*450 && time.Since(st) < time.Millisecond*500, time.Since(st))
recs := strings.Split(strings.TrimSuffix(string(body), "\n"), "\n")
require.True(t, len(recs) < 10, "not all for 10 streamed, only %d", len(recs))
}

func TestRest_InfoStreamCancel(t *testing.T) {
ts, srv, teardown := startupT(t)
srv.pubRest.readOnlyAge = 10000000 // make sure we don't hit read-only
srv.pubRest.streamRefresh = 10 * time.Millisecond
srv.pubRest.streamTimeOut = 500 * time.Millisecond

postComment(t, ts.URL)

defer teardown()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for i := 1; i < 10; i++ {
time.Sleep(100 * time.Millisecond)
postComment(t, ts.URL)
}
}()

client := http.Client{}
req, err := http.NewRequest("GET", ts.URL+"/api/v1/stream/info?site=radio-t&url=https://radio-t.com/blah1", nil)
require.Nil(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
defer cancel()
req = req.WithContext(ctx)
r, err := client.Do(req)
require.Nil(t, err)
defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
require.EqualError(t, err, "context deadline exceeded")
assert.Equal(t, 200, r.StatusCode)

wg.Wait()

recs := strings.Split(strings.TrimSuffix(string(body), "\n"), "\n")
require.Equal(t, 2, len(recs), "2 records")
assert.True(t, strings.Contains(recs[0], `"count":1`), recs[0])
assert.True(t, strings.Contains(recs[1], `"count":2`), recs[1])
}

func TestRest_Robots(t *testing.T) {
ts, _, teardown := startupT(t)
defer teardown()
Expand All @@ -534,3 +622,12 @@ func TestRest_Robots(t *testing.T) {
"Allow: /api/v1/last\nAllow: /api/v1/id\nAllow: /api/v1/count\nAllow: /api/v1/counts\n"+
"Allow: /api/v1/list\nAllow: /api/v1/config\nAllow: /api/v1/img\nAllow: /api/v1/avatar\nAllow: /api/v1/picture\n", string(body))
}

func postComment(t *testing.T, url string) {
resp, e := post(t, url+"/api/v1/comment",
`{"text": "test 123", "locator":{"url": "https://radio-t.com/blah1", "site": "radio-t"}}`)
require.Nil(t, e)
b, e := ioutil.ReadAll(resp.Body)
require.Nil(t, e)
require.Equal(t, http.StatusCreated, resp.StatusCode, string(b))
}

0 comments on commit ca083f4

Please sign in to comment.