From 5264b619519753f852eb86206bb858394c6eadae Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 10 Jun 2020 19:51:36 +0000 Subject: [PATCH] improvement: stop rendering if Context is cancelled Typically this means the http caller has closed the connection, so no point responding to them. Also check at the point we send a response back, and log to OpenTracing. --- app/api_report.go | 12 ++++++------ app/api_topologies.go | 11 +++++++---- app/api_topology.go | 8 ++++---- app/controls.go | 12 ++++++------ app/pipes.go | 4 ++-- app/router.go | 12 ++++++------ app/server_helpers.go | 11 ++++++++++- render/filters.go | 3 +++ render/render.go | 6 ++++++ 9 files changed, 50 insertions(+), 29 deletions(-) diff --git a/app/api_report.go b/app/api_report.go index 82852a18e0..7ab1625285 100644 --- a/app/api_report.go +++ b/app/api_report.go @@ -15,11 +15,11 @@ func makeRawReportHandler(rep Reporter) CtxHandlerFunc { timestamp := deserializeTimestamp(r.URL.Query().Get("timestamp")) rawReport, err := rep.Report(ctx, timestamp) if err != nil { - respondWith(w, http.StatusInternalServerError, err) + respondWith(ctx, w, http.StatusInternalServerError, err) return } censorCfg := report.GetCensorConfigFromRequest(r) - respondWith(w, http.StatusOK, report.CensorRawReport(rawReport, censorCfg)) + respondWith(ctx, w, http.StatusOK, report.CensorRawReport(rawReport, censorCfg)) } } @@ -38,14 +38,14 @@ func makeProbeHandler(rep Reporter) CtxHandlerFunc { // if we have reports, we must have connected probes hasProbes, err := rep.HasReports(ctx, time.Now()) if err != nil { - respondWith(w, http.StatusInternalServerError, err) + respondWith(ctx, w, http.StatusInternalServerError, err) } - respondWith(w, http.StatusOK, hasProbes) + respondWith(ctx, w, http.StatusOK, hasProbes) return } rpt, err := rep.Report(ctx, time.Now()) if err != nil { - respondWith(w, http.StatusInternalServerError, err) + respondWith(ctx, w, http.StatusInternalServerError, err) return } result := []probeDesc{} @@ -60,6 +60,6 @@ func makeProbeHandler(rep Reporter) CtxHandlerFunc { LastSeen: dt, }) } - respondWith(w, http.StatusOK, result) + respondWith(ctx, w, http.StatusOK, result) } } diff --git a/app/api_topologies.go b/app/api_topologies.go index 136d6960f6..595241b175 100644 --- a/app/api_topologies.go +++ b/app/api_topologies.go @@ -476,10 +476,10 @@ func (r *Registry) makeTopologyList(rep Reporter) CtxHandlerFunc { timestamp := deserializeTimestamp(req.URL.Query().Get("timestamp")) report, err := rep.Report(ctx, timestamp) if err != nil { - respondWith(w, http.StatusInternalServerError, err) + respondWith(ctx, w, http.StatusInternalServerError, err) return } - respondWith(w, http.StatusOK, r.renderTopologies(ctx, report, req)) + respondWith(ctx, w, http.StatusOK, r.renderTopologies(ctx, report, req)) } } @@ -489,6 +489,9 @@ func (r *Registry) renderTopologies(ctx context.Context, rpt report.Report, req topologies := []APITopologyDesc{} req.ParseForm() r.walk(func(desc APITopologyDesc) { + if ctx.Err() != nil { + return + } renderer, filter, _ := r.RendererForTopology(desc.id, req.Form, rpt) desc.Stats = computeStats(ctx, rpt, renderer, filter) for i, sub := range desc.SubTopologies { @@ -573,13 +576,13 @@ func (r *Registry) captureRenderer(rep Reporter, f rendererHandler) CtxHandlerFu } rpt, err := rep.Report(ctx, timestamp) if err != nil { - respondWith(w, http.StatusInternalServerError, err) + respondWith(ctx, w, http.StatusInternalServerError, err) return } req.ParseForm() renderer, filter, err := r.RendererForTopology(topologyID, req.Form, rpt) if err != nil { - respondWith(w, http.StatusInternalServerError, err) + respondWith(ctx, w, http.StatusInternalServerError, err) return } f(ctx, renderer, filter, RenderContextForReporter(rep, rpt), w, req) diff --git a/app/api_topology.go b/app/api_topology.go index 697c3d6ca6..9a811bcfab 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -48,7 +48,7 @@ type rendererHandler func(context.Context, render.Renderer, render.Transformer, func handleTopology(ctx context.Context, renderer render.Renderer, transformer render.Transformer, rc detailed.RenderContext, w http.ResponseWriter, r *http.Request) { censorCfg := report.GetCensorConfigFromRequest(r) nodeSummaries := detailed.Summaries(ctx, rc, render.Render(ctx, rc.Report, renderer, transformer).Nodes) - respondWith(w, http.StatusOK, APITopology{ + respondWith(ctx, w, http.StatusOK, APITopology{ Nodes: detailed.CensorNodeSummaries(nodeSummaries, censorCfg), }) } @@ -80,7 +80,7 @@ func handleNode(ctx context.Context, renderer render.Renderer, transformer rende nodes.Filtered-- } rawNode := detailed.MakeNode(topologyID, rc, nodes.Nodes, node) - respondWith(w, http.StatusOK, APINode{Node: detailed.CensorNode(rawNode, censorCfg)}) + respondWith(ctx, w, http.StatusOK, APINode{Node: detailed.CensorNode(rawNode, censorCfg)}) } // Websocket for the full topology. @@ -91,14 +91,14 @@ func handleWebsocket( r *http.Request, ) { if err := r.ParseForm(); err != nil { - respondWith(w, http.StatusInternalServerError, err) + respondWith(ctx, w, http.StatusInternalServerError, err) return } loop := websocketLoop if t := r.Form.Get("t"); t != "" { var err error if loop, err = time.ParseDuration(t); err != nil { - respondWith(w, http.StatusBadRequest, t) + respondWith(ctx, w, http.StatusBadRequest, t) return } } diff --git a/app/controls.go b/app/controls.go index 6e8318e3f6..644489c04e 100644 --- a/app/controls.go +++ b/app/controls.go @@ -41,7 +41,7 @@ func handleControl(cr ControlRouter) CtxHandlerFunc { err := codec.NewDecoder(r.Body, &codec.JsonHandle{}).Decode(&controlArgs) defer r.Body.Close() if err != nil { - respondWith(w, http.StatusBadRequest, err) + respondWith(ctx, w, http.StatusBadRequest, err) return } } @@ -52,14 +52,14 @@ func handleControl(cr ControlRouter) CtxHandlerFunc { ControlArgs: controlArgs, }) if err != nil { - respondWith(w, http.StatusBadRequest, err.Error()) + respondWith(ctx, w, http.StatusBadRequest, err.Error()) return } if result.Error != "" { - respondWith(w, http.StatusBadRequest, result.Error) + respondWith(ctx, w, http.StatusBadRequest, result.Error) return } - respondWith(w, http.StatusOK, result) + respondWith(ctx, w, http.StatusOK, result) } } @@ -69,7 +69,7 @@ func handleProbeWS(cr ControlRouter) CtxHandlerFunc { return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { probeID := r.Header.Get(xfer.ScopeProbeIDHeader) if probeID == "" { - respondWith(w, http.StatusBadRequest, xfer.ScopeProbeIDHeader) + respondWith(ctx, w, http.StatusBadRequest, xfer.ScopeProbeIDHeader) return } @@ -92,7 +92,7 @@ func handleProbeWS(cr ControlRouter) CtxHandlerFunc { return res }) if err != nil { - respondWith(w, http.StatusBadRequest, err) + respondWith(ctx, w, http.StatusBadRequest, err) return } defer cr.Deregister(ctx, probeID, id) diff --git a/app/pipes.go b/app/pipes.go index 994ebd27aa..c057853c5d 100644 --- a/app/pipes.go +++ b/app/pipes.go @@ -39,7 +39,7 @@ func checkPipe(pr PipeRouter) CtxHandlerFunc { id := mux.Vars(r)["pipeID"] exists, err := pr.Exists(ctx, id) if err != nil { - respondWith(w, http.StatusInternalServerError, err) + respondWith(ctx, w, http.StatusInternalServerError, err) } else if exists { w.WriteHeader(http.StatusNoContent) } else { @@ -81,7 +81,7 @@ func deletePipe(pr PipeRouter) CtxHandlerFunc { pipeID := mux.Vars(r)["pipeID"] log.Debugf("Deleting pipe %s", pipeID) if err := pr.Delete(ctx, pipeID); err != nil { - respondWith(w, http.StatusInternalServerError, err) + respondWith(ctx, w, http.StatusInternalServerError, err) } } } diff --git a/app/router.go b/app/router.go index 24bcf17497..b030b91b0d 100644 --- a/app/router.go +++ b/app/router.go @@ -133,13 +133,13 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) { case strings.HasPrefix(contentType, "application/json"): isMsgpack = false default: - respondWith(w, http.StatusBadRequest, fmt.Errorf("Unsupported Content-Type: %v", contentType)) + respondWith(ctx, w, http.StatusBadRequest, fmt.Errorf("Unsupported Content-Type: %v", contentType)) return } rpt, err := report.MakeFromBinary(ctx, reader, gzipped, isMsgpack) if err != nil { - respondWith(w, http.StatusBadRequest, err) + respondWith(ctx, w, http.StatusBadRequest, err) return } @@ -150,7 +150,7 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) { if err := a.Add(ctx, *rpt, buf.Bytes()); err != nil { log.Errorf("Error Adding report: %v", err) - respondWith(w, http.StatusInternalServerError, err) + respondWith(ctx, w, http.StatusInternalServerError, err) return } w.WriteHeader(http.StatusOK) @@ -163,7 +163,7 @@ func RegisterAdminRoutes(router *mux.Router, reporter Reporter) { get.Handle("/admin/summary", requestContextDecorator(func(ctx context.Context, w http.ResponseWriter, r *http.Request) { summary, err := reporter.AdminSummary(ctx, time.Now()) if err != nil { - respondWith(w, http.StatusBadRequest, err) + respondWith(ctx, w, http.StatusBadRequest, err) } fmt.Fprintln(w, summary) })) @@ -188,12 +188,12 @@ func apiHandler(rep Reporter, capabilities map[string]bool) CtxHandlerFunc { return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { report, err := rep.Report(ctx, time.Now()) if err != nil { - respondWith(w, http.StatusInternalServerError, err) + respondWith(ctx, w, http.StatusInternalServerError, err) return } newVersion.Lock() defer newVersion.Unlock() - respondWith(w, http.StatusOK, xfer.Details{ + respondWith(ctx, w, http.StatusOK, xfer.Details{ ID: UniqueID, Version: Version, Hostname: hostname.Get(), diff --git a/app/server_helpers.go b/app/server_helpers.go index fffee58588..a9970c7041 100644 --- a/app/server_helpers.go +++ b/app/server_helpers.go @@ -1,19 +1,28 @@ package app import ( + "context" "net/http" + opentracing "github.com/opentracing/opentracing-go" "github.com/ugorji/go/codec" log "github.com/sirupsen/logrus" ) -func respondWith(w http.ResponseWriter, code int, response interface{}) { +func respondWith(ctx context.Context, w http.ResponseWriter, code int, response interface{}) { if err, ok := response.(error); ok { log.Errorf("Error %d: %v", code, err) response = err.Error() } else if 500 <= code && code < 600 { log.Errorf("Non-error %d: %v", code, response) + } else if ctx.Err() != nil { + log.Debugf("Context error %v", ctx.Err()) + code = 499 + response = nil + } + if span := opentracing.SpanFromContext(ctx); span != nil { + span.LogKV("response-code", code) } w.Header().Set("Content-Type", "application/json") diff --git a/render/filters.go b/render/filters.go index b4e7b2096f..0daa6fbe79 100644 --- a/render/filters.go +++ b/render/filters.go @@ -23,6 +23,9 @@ type CustomRenderer struct { // Render implements Renderer func (c CustomRenderer) Render(ctx context.Context, rpt report.Report) Nodes { + if ctx.Err() != nil { + return Nodes{} + } return c.RenderFunc(c.Renderer.Render(ctx, rpt)) } diff --git a/render/render.go b/render/render.go index 4327277dec..e0da4de53e 100644 --- a/render/render.go +++ b/render/render.go @@ -70,6 +70,9 @@ func MakeReduce(renderers ...Renderer) Renderer { // Render produces a set of Nodes given a Report. func (r Reduce) Render(ctx context.Context, rpt report.Report) Nodes { + if ctx.Err() != nil { + return Nodes{} + } span, ctx := opentracing.StartSpanFromContext(ctx, "Reduce.Render") defer span.Finish() l := len(r) @@ -110,6 +113,9 @@ func MakeMap(f MapFunc, r Renderer) Renderer { // Render transforms a set of Nodes produces by another Renderer. // using a map function func (m Map) Render(ctx context.Context, rpt report.Report) Nodes { + if ctx.Err() != nil { + return Nodes{} + } span, ctx := opentracing.StartSpanFromContext(ctx, "Map.Render:"+functionName(m.MapFunc)) defer span.Finish() var (