diff --git a/README.md b/README.md index 4134ff4a63..19b8b937c3 100644 --- a/README.md +++ b/README.md @@ -624,7 +624,8 @@ Sort can be `time`, `active` or `score`. Supported sort order with prefix -/+, i } ``` -* `GET /api/v1/info?site=site-idd&url=post-ur` - returns `PostInfo` for site and url +* `GET /api/v1/info?site=site-idd&url=post-url` - returns `PostInfo` for site and url +* `GET /api/v1/stream/info?site=site-idd&url=post-url` - returns stream with `PostInfo` records ("\n" separated) for site and url` ### RSS feeds diff --git a/backend/app/cmd/server.go b/backend/app/cmd/server.go index 6b3f345f6b..7e8ad8709e 100644 --- a/backend/app/cmd/server.go +++ b/backend/app/cmd/server.go @@ -304,6 +304,8 @@ func (s *ServerCommand) newServerApp() (*serverApp, error) { SSLConfig: sslConfig, UpdateLimiter: s.UpdateLimit, ImageService: imageService, + StreamTimeOut: time.Minute * 15, + StreamRefresh: time.Second, } srv.ScoreThresholds.Low, srv.ScoreThresholds.Critical = s.LowScore, s.CriticalScore diff --git a/backend/app/rest/api/rest.go b/backend/app/rest/api/rest.go index 769719fa01..85955a4d00 100644 --- a/backend/app/rest/api/rest.go +++ b/backend/app/rest/api/rest.go @@ -55,6 +55,9 @@ type Rest struct { } UpdateLimiter float64 + StreamTimeOut time.Duration + StreamRefresh time.Duration + SSLConfig SSLConfig httpsServer *http.Server httpServer *http.Server @@ -167,8 +170,7 @@ func (s *Rest) makeHTTPServer(port int, router http.Handler) *http.Server { func (s *Rest) routes() chi.Router { router := chi.NewRouter() - router.Use(middleware.RealIP, R.Recoverer(log.Default())) - router.Use(middleware.Throttle(1000), middleware.Timeout(60*time.Second)) + router.Use(middleware.Throttle(1000), middleware.RealIP, R.Recoverer(log.Default())) router.Use(R.AppInfo("remark42", "umputun", s.Version), R.Ping) s.pubRest, s.privRest, s.adminRest, s.rssRest = s.controllerGroups() // assign controllers for groups @@ -189,11 +191,13 @@ func (s *Rest) routes() chi.Router { authHandler, avatarHandler := s.Authenticator.Handlers() router.Group(func(r chi.Router) { + r.Use(middleware.Timeout(5 * time.Second)) r.Use(logInfoWithBody, tollbooth_chi.LimitHandler(tollbooth.NewLimiter(5, nil)), middleware.NoCache) r.Mount("/auth", authHandler) }) router.Group(func(r chi.Router) { + r.Use(middleware.Timeout(5 * time.Second)) r.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(100, nil)), middleware.NoCache) r.Mount("/avatar", avatarHandler) }) @@ -204,6 +208,7 @@ func (s *Rest) routes() chi.Router { router.Route("/api/v1", func(rapi chi.Router) { rapi.Group(func(rava chi.Router) { + rava.Use(middleware.Timeout(5 * time.Second)) rava.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(100, nil))) rava.Use(middleware.NoCache) rava.Mount("/avatar", avatarHandler) @@ -211,6 +216,7 @@ func (s *Rest) routes() chi.Router { // open routes rapi.Group(func(ropen chi.Router) { + ropen.Use(middleware.Timeout(30 * time.Second)) ropen.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(10, nil))) ropen.Use(authMiddleware.Trace, middleware.NoCache, logInfoWithBody) ropen.Get("/config", s.configCtrl) @@ -230,10 +236,19 @@ func (s *Rest) routes() chi.Router { rrss.Get("/site", s.rssRest.siteCommentsCtrl) rrss.Get("/reply", s.rssRest.repliesCtrl) }) + + }) + + // open routes, streams, no send timeout + rapi.Route("/stream", func(rstream chi.Router) { + rstream.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(10, nil))) + rstream.Use(authMiddleware.Trace, middleware.NoCache, logInfoWithBody) + rstream.Get("/info", s.pubRest.infoStreamCtrl) }) // open routes, cached rapi.Group(func(ropen chi.Router) { + ropen.Use(middleware.Timeout(30 * time.Second)) ropen.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(10, nil))) ropen.Use(authMiddleware.Trace, logInfoWithBody) ropen.Get("/picture/{user}/{id}", s.pubRest.loadPictureCtrl) @@ -241,6 +256,7 @@ func (s *Rest) routes() chi.Router { // protected routes, require auth rapi.Group(func(rauth chi.Router) { + rauth.Use(middleware.Timeout(30 * time.Second)) rauth.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(10, nil))) rauth.Use(authMiddleware.Auth, middleware.NoCache, logInfoWithBody) rauth.Get("/user", s.privRest.userInfoCtrl) @@ -249,6 +265,7 @@ func (s *Rest) routes() chi.Router { // admin routes, require auth and admin users only rapi.Route("/admin", func(radmin chi.Router) { + radmin.Use(middleware.Timeout(30 * time.Second)) radmin.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(10, nil))) radmin.Use(authMiddleware.Auth, authMiddleware.AdminOnly) radmin.Use(middleware.NoCache, logInfoWithBody) @@ -273,6 +290,7 @@ func (s *Rest) routes() chi.Router { // protected routes, throttled to 10/s by default, controlled by external UpdateLimiter param rapi.Group(func(rauth chi.Router) { + rauth.Use(middleware.Timeout(10 * time.Second)) rauth.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(s.updateLimiter(), nil))) rauth.Use(authMiddleware.Auth) rauth.Use(middleware.NoCache) @@ -284,7 +302,9 @@ func (s *Rest) routes() chi.Router { rauth.With(rejectAnonUser).Post("/deleteme", s.privRest.deleteMeCtrl) }) + // protected routes, anonymous rejected rapi.Group(func(rauth chi.Router) { + rauth.Use(middleware.Timeout(10 * time.Second)) rauth.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(s.updateLimiter(), nil))) rauth.Use(authMiddleware.Auth, rejectAnonUser) rauth.Use(logger.New(logger.Log(log.Default()), logger.Prefix("[DEBUG]"), logger.IPfn(ipFn)).Handler) @@ -295,7 +315,8 @@ func (s *Rest) routes() chi.Router { // open routes on root level router.Group(func(rroot chi.Router) { - tollbooth_chi.LimitHandler(tollbooth.NewLimiter(50, nil)) + rroot.Use(middleware.Timeout(10 * time.Second)) + rroot.Use(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(50, nil))) rroot.Get("/index.html", s.pubRest.getStartedCtrl) rroot.Get("/robots.txt", s.pubRest.robotsCtrl) }) @@ -314,6 +335,8 @@ func (s *Rest) controllerGroups() (public, private, admin, rss) { commentFormatter: s.CommentFormatter, readOnlyAge: s.ReadOnlyAge, webRoot: s.WebRoot, + streamTimeOut: s.StreamTimeOut, + streamRefresh: s.StreamRefresh, } privGrp := private{ @@ -416,7 +439,7 @@ func addFileServer(r chi.Router, path string, root http.FileSystem) { } path += "*" - r.With(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(20, nil))). + r.With(tollbooth_chi.LimitHandler(tollbooth.NewLimiter(20, nil)), middleware.Timeout(10*time.Second)). Get(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // don't show dirs, just serve files if strings.HasSuffix(r.URL.Path, "/") && len(r.URL.Path) > 1 && r.URL.Path != (origPath+"/") { diff --git a/backend/app/rest/api/rest_public.go b/backend/app/rest/api/rest_public.go index 3200ca0711..00902cf75d 100644 --- a/backend/app/rest/api/rest_public.go +++ b/backend/app/rest/api/rest_public.go @@ -16,7 +16,6 @@ import ( log "github.com/go-pkgz/lgr" R "github.com/go-pkgz/rest" "github.com/go-pkgz/rest/cache" - "github.com/umputun/remark/backend/app/rest" "github.com/umputun/remark/backend/app/store" "github.com/umputun/remark/backend/app/store/image" @@ -30,6 +29,8 @@ type public struct { commentFormatter *store.CommentFormatter imageService *image.Service webRoot string + streamTimeOut time.Duration + streamRefresh time.Duration } type pubStore interface { @@ -146,6 +147,85 @@ func (s *public) infoCtrl(w http.ResponseWriter, r *http.Request) { } } +// GET /stream/info?site=siteID&url=post-url - get info stream about the post +func (s *public) infoStreamCtrl(w http.ResponseWriter, r *http.Request) { + locator := store.Locator{SiteID: r.URL.Query().Get("site"), URL: r.URL.Query().Get("url")} + log.Printf("[DEBUG] start stream for %+v, timeout=%v, refresh=%v", locator, s.streamTimeOut, s.streamRefresh) + + key := cache.NewKey(locator.SiteID).ID(URLKey(r)).Scopes(locator.SiteID, locator.URL) + lastTS := time.Time{} + lastCount := 0 + info := func() (data []byte, upd bool, err error) { + data, err = s.cache.Get(key, func() ([]byte, error) { + info, e := s.dataService.Info(locator, s.readOnlyAge) + if e != nil { + return nil, e + } + if info.LastTS != lastTS || info.Count != lastCount { + lastTS = info.LastTS + lastCount = info.Count // removal won't update lastTS + upd = true // cache update used as indication of post update. comparing lastTS for no-cache + } + return encodeJSONWithHTML(info) + }) + if err != nil { + return data, false, err + } + + return data, upd, nil + } + + // populate updates to chan, break on remote close + updCh := func() <-chan []byte { + ch := make(chan []byte) + go func() { + tick := time.NewTicker(s.streamRefresh) + defer func() { + close(ch) + tick.Stop() + }() + for { + select { + case <-r.Context().Done(): // request closed by remote client + log.Printf("[DEBUG] info stream closed by remote client, %v", r.Context().Err()) + return + case <-tick.C: + resp, upd, err := info() + if err != nil { + rest.SendErrorJSON(w, r, http.StatusBadRequest, err, "can't get post info", rest.ErrPostNotFound) + return + } + if upd { + ch <- resp + } + } + } + }() + return ch + }() + + for { + select { + case <-r.Context().Done(): // request closed by remote client + return + case <-time.After(s.streamTimeOut): // request closed by timeout + log.Printf("[DEBUG] info stream closed due to timeout") + return + case resp, ok := <-updCh: // new update + if !ok { // closed + return + } + if _, e := w.Write(resp); e != nil { + log.Printf("[WARN] failed to send stream, %v", e) + return + } + if fw, ok := w.(http.Flusher); ok { + fw.Flush() + } + } + } +} + // GET /last/{limit}?site=siteID&since=unix_ts_msec - last comments for the siteID, across all posts, sorted by time, optionally // limited with "since" param func (s *public) lastCommentsCtrl(w http.ResponseWriter, r *http.Request) { diff --git a/backend/app/rest/api/rest_public_test.go b/backend/app/rest/api/rest_public_test.go index b0f23ce70b..0e3f00134a 100644 --- a/backend/app/rest/api/rest_public_test.go +++ b/backend/app/rest/api/rest_public_test.go @@ -1,11 +1,13 @@ package api import ( + "context" "encoding/json" "fmt" "io/ioutil" "net/http" "strings" + "sync" "testing" "time" @@ -524,6 +526,92 @@ func TestRest_Info(t *testing.T) { assert.Equal(t, 400, code) } +func TestRest_InfoStream(t *testing.T) { + ts, srv, teardown := startupT(t) + srv.pubRest.readOnlyAge = 10000000 // make sure we don't hit read-only + srv.pubRest.streamRefresh = 1 * time.Millisecond + srv.pubRest.streamTimeOut = 300 * time.Millisecond + + postComment(t, ts.URL) + + defer teardown() + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for i := 1; i < 10; i++ { + time.Sleep(10 * time.Millisecond) + postComment(t, ts.URL) + } + }() + + body, code := get(t, ts.URL+"/api/v1/stream/info?site=radio-t&url=https://radio-t.com/blah1") + assert.Equal(t, 200, code) + wg.Wait() + + recs := strings.Split(strings.TrimSuffix(string(body), "\n"), "\n") + require.Equal(t, 10, len(recs), "10 records") + assert.True(t, strings.Contains(recs[0], `"count":1`), recs[0]) + assert.True(t, strings.Contains(recs[9], `"count":10`), recs[9]) +} + +func TestRest_InfoStreamTimeout(t *testing.T) { + ts, srv, teardown := startupT(t) + defer teardown() + srv.pubRest.readOnlyAge = 10000000 // make sure we don't hit read-only + srv.pubRest.streamRefresh = 10 * time.Millisecond + srv.pubRest.streamTimeOut = 450 * time.Millisecond + + postComment(t, ts.URL) + + st := time.Now() + body, code := get(t, ts.URL+"/api/v1/stream/info?site=radio-t&url=https://radio-t.com/blah1") + assert.Equal(t, 200, code) + assert.True(t, time.Since(st) > time.Millisecond*450 && time.Since(st) < time.Millisecond*500, time.Since(st)) + recs := strings.Split(strings.TrimSuffix(string(body), "\n"), "\n") + require.True(t, len(recs) < 10, "not all for 10 streamed, only %d", len(recs)) +} + +func TestRest_InfoStreamCancel(t *testing.T) { + ts, srv, teardown := startupT(t) + srv.pubRest.readOnlyAge = 10000000 // make sure we don't hit read-only + srv.pubRest.streamRefresh = 10 * time.Millisecond + srv.pubRest.streamTimeOut = 500 * time.Millisecond + + postComment(t, ts.URL) + + defer teardown() + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for i := 1; i < 10; i++ { + time.Sleep(100 * time.Millisecond) + postComment(t, ts.URL) + } + }() + + client := http.Client{} + req, err := http.NewRequest("GET", ts.URL+"/api/v1/stream/info?site=radio-t&url=https://radio-t.com/blah1", nil) + require.Nil(t, err) + ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond) + defer cancel() + req = req.WithContext(ctx) + r, err := client.Do(req) + require.Nil(t, err) + defer r.Body.Close() + body, err := ioutil.ReadAll(r.Body) + require.EqualError(t, err, "context deadline exceeded") + assert.Equal(t, 200, r.StatusCode) + + wg.Wait() + + recs := strings.Split(strings.TrimSuffix(string(body), "\n"), "\n") + require.Equal(t, 2, len(recs), "2 records") + assert.True(t, strings.Contains(recs[0], `"count":1`), recs[0]) + assert.True(t, strings.Contains(recs[1], `"count":2`), recs[1]) +} + func TestRest_Robots(t *testing.T) { ts, _, teardown := startupT(t) defer teardown() @@ -534,3 +622,12 @@ func TestRest_Robots(t *testing.T) { "Allow: /api/v1/last\nAllow: /api/v1/id\nAllow: /api/v1/count\nAllow: /api/v1/counts\n"+ "Allow: /api/v1/list\nAllow: /api/v1/config\nAllow: /api/v1/img\nAllow: /api/v1/avatar\nAllow: /api/v1/picture\n", string(body)) } + +func postComment(t *testing.T, url string) { + resp, e := post(t, url+"/api/v1/comment", + `{"text": "test 123", "locator":{"url": "https://radio-t.com/blah1", "site": "radio-t"}}`) + require.Nil(t, e) + b, e := ioutil.ReadAll(resp.Body) + require.Nil(t, e) + require.Equal(t, http.StatusCreated, resp.StatusCode, string(b)) +}