Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improvement: stop rendering if Context is cancelled #3801

Merged
merged 1 commit into from
Jun 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions app/api_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ func makeRawReportHandler(rep Reporter) CtxHandlerFunc {
timestamp := deserializeTimestamp(r.URL.Query().Get("timestamp"))
rawReport, err := rep.Report(ctx, timestamp)
if err != nil {
respondWith(w, http.StatusInternalServerError, err)
respondWith(ctx, w, http.StatusInternalServerError, err)
return
}
censorCfg := report.GetCensorConfigFromRequest(r)
respondWith(w, http.StatusOK, report.CensorRawReport(rawReport, censorCfg))
respondWith(ctx, w, http.StatusOK, report.CensorRawReport(rawReport, censorCfg))
}
}

Expand All @@ -38,14 +38,14 @@ func makeProbeHandler(rep Reporter) CtxHandlerFunc {
// if we have reports, we must have connected probes
hasProbes, err := rep.HasReports(ctx, time.Now())
if err != nil {
respondWith(w, http.StatusInternalServerError, err)
respondWith(ctx, w, http.StatusInternalServerError, err)
}
respondWith(w, http.StatusOK, hasProbes)
respondWith(ctx, w, http.StatusOK, hasProbes)
return
}
rpt, err := rep.Report(ctx, time.Now())
if err != nil {
respondWith(w, http.StatusInternalServerError, err)
respondWith(ctx, w, http.StatusInternalServerError, err)
return
}
result := []probeDesc{}
Expand All @@ -60,6 +60,6 @@ func makeProbeHandler(rep Reporter) CtxHandlerFunc {
LastSeen: dt,
})
}
respondWith(w, http.StatusOK, result)
respondWith(ctx, w, http.StatusOK, result)
}
}
11 changes: 7 additions & 4 deletions app/api_topologies.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,10 +476,10 @@ func (r *Registry) makeTopologyList(rep Reporter) CtxHandlerFunc {
timestamp := deserializeTimestamp(req.URL.Query().Get("timestamp"))
report, err := rep.Report(ctx, timestamp)
if err != nil {
respondWith(w, http.StatusInternalServerError, err)
respondWith(ctx, w, http.StatusInternalServerError, err)
return
}
respondWith(w, http.StatusOK, r.renderTopologies(ctx, report, req))
respondWith(ctx, w, http.StatusOK, r.renderTopologies(ctx, report, req))
}
}

Expand All @@ -489,6 +489,9 @@ func (r *Registry) renderTopologies(ctx context.Context, rpt report.Report, req
topologies := []APITopologyDesc{}
req.ParseForm()
r.walk(func(desc APITopologyDesc) {
if ctx.Err() != nil {
return
}
renderer, filter, _ := r.RendererForTopology(desc.id, req.Form, rpt)
desc.Stats = computeStats(ctx, rpt, renderer, filter)
for i, sub := range desc.SubTopologies {
Expand Down Expand Up @@ -573,13 +576,13 @@ func (r *Registry) captureRenderer(rep Reporter, f rendererHandler) CtxHandlerFu
}
rpt, err := rep.Report(ctx, timestamp)
if err != nil {
respondWith(w, http.StatusInternalServerError, err)
respondWith(ctx, w, http.StatusInternalServerError, err)
return
}
req.ParseForm()
renderer, filter, err := r.RendererForTopology(topologyID, req.Form, rpt)
if err != nil {
respondWith(w, http.StatusInternalServerError, err)
respondWith(ctx, w, http.StatusInternalServerError, err)
return
}
f(ctx, renderer, filter, RenderContextForReporter(rep, rpt), w, req)
Expand Down
8 changes: 4 additions & 4 deletions app/api_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type rendererHandler func(context.Context, render.Renderer, render.Transformer,
func handleTopology(ctx context.Context, renderer render.Renderer, transformer render.Transformer, rc detailed.RenderContext, w http.ResponseWriter, r *http.Request) {
censorCfg := report.GetCensorConfigFromRequest(r)
nodeSummaries := detailed.Summaries(ctx, rc, render.Render(ctx, rc.Report, renderer, transformer).Nodes)
respondWith(w, http.StatusOK, APITopology{
respondWith(ctx, w, http.StatusOK, APITopology{
Nodes: detailed.CensorNodeSummaries(nodeSummaries, censorCfg),
})
}
Expand Down Expand Up @@ -80,7 +80,7 @@ func handleNode(ctx context.Context, renderer render.Renderer, transformer rende
nodes.Filtered--
}
rawNode := detailed.MakeNode(topologyID, rc, nodes.Nodes, node)
respondWith(w, http.StatusOK, APINode{Node: detailed.CensorNode(rawNode, censorCfg)})
respondWith(ctx, w, http.StatusOK, APINode{Node: detailed.CensorNode(rawNode, censorCfg)})
}

// Websocket for the full topology.
Expand All @@ -91,14 +91,14 @@ func handleWebsocket(
r *http.Request,
) {
if err := r.ParseForm(); err != nil {
respondWith(w, http.StatusInternalServerError, err)
respondWith(ctx, w, http.StatusInternalServerError, err)
return
}
loop := websocketLoop
if t := r.Form.Get("t"); t != "" {
var err error
if loop, err = time.ParseDuration(t); err != nil {
respondWith(w, http.StatusBadRequest, t)
respondWith(ctx, w, http.StatusBadRequest, t)
return
}
}
Expand Down
12 changes: 6 additions & 6 deletions app/controls.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func handleControl(cr ControlRouter) CtxHandlerFunc {
err := codec.NewDecoder(r.Body, &codec.JsonHandle{}).Decode(&controlArgs)
defer r.Body.Close()
if err != nil {
respondWith(w, http.StatusBadRequest, err)
respondWith(ctx, w, http.StatusBadRequest, err)
return
}
}
Expand All @@ -52,14 +52,14 @@ func handleControl(cr ControlRouter) CtxHandlerFunc {
ControlArgs: controlArgs,
})
if err != nil {
respondWith(w, http.StatusBadRequest, err.Error())
respondWith(ctx, w, http.StatusBadRequest, err.Error())
return
}
if result.Error != "" {
respondWith(w, http.StatusBadRequest, result.Error)
respondWith(ctx, w, http.StatusBadRequest, result.Error)
return
}
respondWith(w, http.StatusOK, result)
respondWith(ctx, w, http.StatusOK, result)
}
}

Expand All @@ -69,7 +69,7 @@ func handleProbeWS(cr ControlRouter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
probeID := r.Header.Get(xfer.ScopeProbeIDHeader)
if probeID == "" {
respondWith(w, http.StatusBadRequest, xfer.ScopeProbeIDHeader)
respondWith(ctx, w, http.StatusBadRequest, xfer.ScopeProbeIDHeader)
return
}

Expand All @@ -92,7 +92,7 @@ func handleProbeWS(cr ControlRouter) CtxHandlerFunc {
return res
})
if err != nil {
respondWith(w, http.StatusBadRequest, err)
respondWith(ctx, w, http.StatusBadRequest, err)
return
}
defer cr.Deregister(ctx, probeID, id)
Expand Down
4 changes: 2 additions & 2 deletions app/pipes.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func checkPipe(pr PipeRouter) CtxHandlerFunc {
id := mux.Vars(r)["pipeID"]
exists, err := pr.Exists(ctx, id)
if err != nil {
respondWith(w, http.StatusInternalServerError, err)
respondWith(ctx, w, http.StatusInternalServerError, err)
} else if exists {
w.WriteHeader(http.StatusNoContent)
} else {
Expand Down Expand Up @@ -81,7 +81,7 @@ func deletePipe(pr PipeRouter) CtxHandlerFunc {
pipeID := mux.Vars(r)["pipeID"]
log.Debugf("Deleting pipe %s", pipeID)
if err := pr.Delete(ctx, pipeID); err != nil {
respondWith(w, http.StatusInternalServerError, err)
respondWith(ctx, w, http.StatusInternalServerError, err)
}
}
}
12 changes: 6 additions & 6 deletions app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,13 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) {
case strings.HasPrefix(contentType, "application/json"):
isMsgpack = false
default:
respondWith(w, http.StatusBadRequest, fmt.Errorf("Unsupported Content-Type: %v", contentType))
respondWith(ctx, w, http.StatusBadRequest, fmt.Errorf("Unsupported Content-Type: %v", contentType))
return
}

rpt, err := report.MakeFromBinary(ctx, reader, gzipped, isMsgpack)
if err != nil {
respondWith(w, http.StatusBadRequest, err)
respondWith(ctx, w, http.StatusBadRequest, err)
return
}

Expand All @@ -150,7 +150,7 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) {

if err := a.Add(ctx, *rpt, buf.Bytes()); err != nil {
log.Errorf("Error Adding report: %v", err)
respondWith(w, http.StatusInternalServerError, err)
respondWith(ctx, w, http.StatusInternalServerError, err)
return
}
w.WriteHeader(http.StatusOK)
Expand All @@ -163,7 +163,7 @@ func RegisterAdminRoutes(router *mux.Router, reporter Reporter) {
get.Handle("/admin/summary", requestContextDecorator(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
summary, err := reporter.AdminSummary(ctx, time.Now())
if err != nil {
respondWith(w, http.StatusBadRequest, err)
respondWith(ctx, w, http.StatusBadRequest, err)
}
fmt.Fprintln(w, summary)
}))
Expand All @@ -188,12 +188,12 @@ func apiHandler(rep Reporter, capabilities map[string]bool) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
report, err := rep.Report(ctx, time.Now())
if err != nil {
respondWith(w, http.StatusInternalServerError, err)
respondWith(ctx, w, http.StatusInternalServerError, err)
return
}
newVersion.Lock()
defer newVersion.Unlock()
respondWith(w, http.StatusOK, xfer.Details{
respondWith(ctx, w, http.StatusOK, xfer.Details{
ID: UniqueID,
Version: Version,
Hostname: hostname.Get(),
Expand Down
11 changes: 10 additions & 1 deletion app/server_helpers.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
package app

import (
"context"
"net/http"

opentracing "github.com/opentracing/opentracing-go"
"github.com/ugorji/go/codec"

log "github.com/sirupsen/logrus"
)

func respondWith(w http.ResponseWriter, code int, response interface{}) {
func respondWith(ctx context.Context, w http.ResponseWriter, code int, response interface{}) {
if err, ok := response.(error); ok {
log.Errorf("Error %d: %v", code, err)
response = err.Error()
} else if 500 <= code && code < 600 {
log.Errorf("Non-error %d: %v", code, response)
} else if ctx.Err() != nil {
log.Debugf("Context error %v", ctx.Err())
code = 499
response = nil
}
if span := opentracing.SpanFromContext(ctx); span != nil {
span.LogKV("response-code", code)
}

w.Header().Set("Content-Type", "application/json")
Expand Down
3 changes: 3 additions & 0 deletions render/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type CustomRenderer struct {

// Render implements Renderer
func (c CustomRenderer) Render(ctx context.Context, rpt report.Report) Nodes {
if ctx.Err() != nil {
return Nodes{}
}
return c.RenderFunc(c.Renderer.Render(ctx, rpt))
}

Expand Down
6 changes: 6 additions & 0 deletions render/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func MakeReduce(renderers ...Renderer) Renderer {

// Render produces a set of Nodes given a Report.
func (r Reduce) Render(ctx context.Context, rpt report.Report) Nodes {
if ctx.Err() != nil {
return Nodes{}
}
span, ctx := opentracing.StartSpanFromContext(ctx, "Reduce.Render")
defer span.Finish()
l := len(r)
Expand Down Expand Up @@ -110,6 +113,9 @@ func MakeMap(f MapFunc, r Renderer) Renderer {
// Render transforms a set of Nodes produces by another Renderer.
// using a map function
func (m Map) Render(ctx context.Context, rpt report.Report) Nodes {
if ctx.Err() != nil {
return Nodes{}
}
span, ctx := opentracing.StartSpanFromContext(ctx, "Map.Render:"+functionName(m.MapFunc))
defer span.Finish()
var (
Expand Down