Skip to content

Commit

Permalink
Merge pull request #3682 from weaveworks/websocket-tracing
Browse files Browse the repository at this point in the history
Websocket tracing spans
  • Loading branch information
bboreham authored Sep 17, 2019
2 parents a811afd + 4e8000c commit 938d594
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 43 deletions.
114 changes: 71 additions & 43 deletions app/api_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package app

import (
"net/http"
"net/url"
"time"

"context"

"github.com/gorilla/mux"
ot "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/weaveworks/scope/common/xfer"
Expand Down Expand Up @@ -119,54 +123,24 @@ func handleWebsocket(
}
}(conn)

var (
previousTopo detailed.NodeSummaries
tick = time.Tick(loop)
wait = make(chan struct{}, 1)
topologyID = mux.Vars(r)["topology"]
startReportingAt = deserializeTimestamp(r.Form.Get("timestamp"))
censorCfg = report.GetCensorConfigFromRequest(r)
channelOpenedAt = time.Now()
)
wc := websocketState{
rep: rep,
values: r.Form,
conn: conn,
topologyID: mux.Vars(r)["topology"],
startReportingAt: deserializeTimestamp(r.Form.Get("timestamp")),
censorCfg: report.GetCensorConfigFromRequest(r),
channelOpenedAt: time.Now(),
}

wait := make(chan struct{}, 1)
rep.WaitOn(ctx, wait)
defer rep.UnWait(ctx, wait)

tick := time.Tick(loop)
for {
// We measure how much time has passed since the channel was opened
// and add it to the initial report timestamp to get the timestamp
// of the snapshot we want to report right now.
// NOTE: Multiplying `timestampDelta` by a constant factor here
// would have an effect of fast-forward, which is something we
// might be interested in implementing in the future.
timestampDelta := time.Since(channelOpenedAt)
reportTimestamp := startReportingAt.Add(timestampDelta)
re, err := rep.Report(ctx, reportTimestamp)
if err != nil {
log.Errorf("Error generating report: %v", err)
return
}
renderer, filter, err := topologyRegistry.RendererForTopology(topologyID, r.Form, re)
if err != nil {
log.Errorf("Error generating report: %v", err)
return
}

newTopo := detailed.CensorNodeSummaries(
detailed.Summaries(
ctx,
RenderContextForReporter(rep, re),
render.Render(ctx, re, renderer, filter).Nodes,
),
censorCfg,
)
diff := detailed.TopoDiff(previousTopo, newTopo)
previousTopo = newTopo

if err := conn.WriteJSON(diff); err != nil {
if !xfer.IsExpectedWSCloseError(err) {
log.Errorf("cannot serialize topology diff: %s", err)
}
if err := wc.update(ctx); err != nil {
log.Errorf("%v", err)
return
}

Expand All @@ -178,3 +152,57 @@ func handleWebsocket(
}
}
}

type websocketState struct {
rep Reporter
values url.Values
conn xfer.Websocket
previousTopo detailed.NodeSummaries
topologyID string
startReportingAt time.Time
reportTimestamp time.Time
censorCfg report.CensorConfig
channelOpenedAt time.Time
}

func (wc *websocketState) update(ctx context.Context) error {
span := ot.StartSpan("websocket.Render", ot.Tag{"topology", wc.topologyID})
defer span.Finish()
ctx = ot.ContextWithSpan(ctx, span)
// We measure how much time has passed since the channel was opened
// and add it to the initial report timestamp to get the timestamp
// of the snapshot we want to report right now.
// NOTE: Multiplying `timestampDelta` by a constant factor here
// would have an effect of fast-forward, which is something we
// might be interested in implementing in the future.
timestampDelta := time.Since(wc.channelOpenedAt)
reportTimestamp := wc.startReportingAt.Add(timestampDelta)
span.LogFields(otlog.String("opened-at", wc.channelOpenedAt.String()),
otlog.String("timestamp", reportTimestamp.String()))
re, err := wc.rep.Report(ctx, reportTimestamp)
if err != nil {
return errors.Wrapf(err, "Error generating report: %v")
}
renderer, filter, err := topologyRegistry.RendererForTopology(wc.topologyID, wc.values, re)
if err != nil {
return errors.Wrapf(err, "Error generating report: %v")
}

newTopo := detailed.CensorNodeSummaries(
detailed.Summaries(
ctx,
RenderContextForReporter(wc.rep, re),
render.Render(ctx, re, renderer, filter).Nodes,
),
wc.censorCfg,
)
diff := detailed.TopoDiff(wc.previousTopo, newTopo)
wc.previousTopo = newTopo

if err := wc.conn.WriteJSON(diff); err != nil {
if !xfer.IsExpectedWSCloseError(err) {
return errors.Wrapf(err, "cannot serialize topology diff: %s")
}
}
return nil
}
1 change: 1 addition & 0 deletions app/multitenant/aws_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.
if err != nil {
return report.MakeReport(), err
}
span.SetTag("userid", userid)
end := timestamp
start := end.Add(-c.cfg.Window)
reportKeys, err := c.getReportKeys(ctx, userid, start, end)
Expand Down

0 comments on commit 938d594

Please sign in to comment.