From ba913c1f8c6b0eb233c41bc20772e71dc4cf02f9 Mon Sep 17 00:00:00 2001
From: rosstimothy <39066650+rosstimothy@users.noreply.github.com>
Date: Fri, 1 Apr 2022 14:21:53 -0400
Subject: [PATCH] Spread out `UploadCompleter` load (#11590)

* Spread out UploadCompleter load

Replaces the `Ticker` in `UploadCompleter.run` with a jittered
`Interval` to reduce the chance of the `run` loops on different auth
servers synchronizing. Without the jitter, auth servers essentially
race against each other to complete any unfinished uploads, which was
causing a spike in network utilization every 10 minutes.

This also prevents an upload with missing parts from prematurely
ending the unfinished-upload check: returning early only builds up a
backlog of uploads that will never be completed by any subsequent call
to `CheckUploads`.

(cherry picked from commit d3de6c489dd93da8ddc0baca2b37042a2d98b65c)
---
 lib/events/complete.go | 29 ++++++++++++++++++-----------
 1 file changed, 18 insertions(+), 11 deletions(-)

diff --git a/lib/events/complete.go b/lib/events/complete.go
index c7594c61521c3..edd671c438d86 100644
--- a/lib/events/complete.go
+++ b/lib/events/complete.go
@@ -26,6 +26,7 @@ import (
 	apiutils "github.com/gravitational/teleport/api/utils"
 	"github.com/gravitational/teleport/lib/defaults"
 	"github.com/gravitational/teleport/lib/utils"
+	"github.com/gravitational/teleport/lib/utils/interval"
 
 	"github.com/gravitational/trace"
 
@@ -105,11 +106,16 @@ type UploadCompleter struct {
 }
 
 func (u *UploadCompleter) run() {
-	ticker := u.cfg.Clock.NewTicker(u.cfg.CheckPeriod)
-	defer ticker.Stop()
+	periodic := interval.New(interval.Config{
+		Duration:      u.cfg.CheckPeriod,
+		FirstDuration: utils.HalfJitter(u.cfg.CheckPeriod),
+		Jitter:        utils.NewSeventhJitter(),
+	})
+	defer periodic.Stop()
+
 	for {
 		select {
-		case <-ticker.Chan():
+		case <-periodic.Next():
 			if err := u.CheckUploads(u.closeCtx); err != nil {
 				u.log.WithError(err).Warningf("Failed to check uploads.")
 			}
@@ -134,6 +140,10 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
 		}
 		parts, err := u.cfg.Uploader.ListParts(ctx, upload)
 		if err != nil {
+			if trace.IsNotFound(err) {
+				u.log.WithError(err).Warnf("Missing parts for upload %v. Moving on to next upload.", upload.ID)
+				continue
+			}
 			return trace.Wrap(err)
 		}
 
@@ -161,7 +171,7 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
 			case <-u.cfg.Clock.After(2 * time.Minute):
 				u.log.Debugf("checking for session end event for session %v", upload.SessionID)
 				if err := u.ensureSessionEndEvent(ctx, uploadData); err != nil {
-					u.log.WithError(err).Warningf("failed to ensure session end event")
+					u.log.WithError(err).Warningf("failed to ensure session end event for session %v", upload.SessionID)
 				}
 			}
 		}()
@@ -201,8 +211,6 @@ func (u *UploadCompleter) ensureSessionEndEvent(ctx context.Context, uploadData
 	var sshSessionEnd events.SessionEnd
 	var desktopSessionEnd events.WindowsDesktopSessionEnd
 
-	first := true
-
 	// We use the streaming events API to search through the session events, because it works
 	// for both Desktop and SSH sessions, where as the GetSessionEvents API relies on downloading
 	// a copy of the session and using the SSH-specific index to iterate through events.
@@ -217,11 +225,6 @@ loop:
 			break loop
 		}
 
-		if first {
-			u.log.Infof("got first event %T", evt)
-			first = false
-		}
-
 		lastEvent = evt
 
 		switch e := evt.(type) {
@@ -271,6 +274,10 @@ loop:
 		}
 	}
 
+	if lastEvent == nil {
+		return trace.Errorf("could not find any events for session %v", uploadData.SessionID)
+	}
+
 	sshSessionEnd.Participants = apiutils.Deduplicate(sshSessionEnd.Participants)
 	sshSessionEnd.EndTime = lastEvent.GetTime()
 	desktopSessionEnd.EndTime = lastEvent.GetTime()
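
A note on the jitter strategy above: with the `Interval`, the first check
fires after a randomized fraction of `CheckPeriod` rather than a full fixed
period, and every later gap is itself jittered, so `run` loops that start
together drift apart instead of checking uploads in lockstep. The standalone
Go program below is a minimal sketch of the same load-spreading idea using
only the standard library; `halfJitter`, `seventhJitter`, and
`jitteredInterval` are hypothetical stand-ins for Teleport's
`utils.HalfJitter`, `utils.NewSeventhJitter`, and `interval.New`, not their
actual implementations.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// halfJitter returns a random duration in [d/2, d), so processes that
// start at the same moment fire their first check at different times.
// (Hypothetical stand-in for Teleport's utils.HalfJitter.)
func halfJitter(d time.Duration) time.Duration {
	return d/2 + time.Duration(rand.Int63n(int64(d/2)))
}

// seventhJitter returns a random duration in (6d/7, d], keeping the
// steady-state period close to d while preventing loops from ever
// re-synchronizing. (Hypothetical stand-in for utils.NewSeventhJitter.)
func seventhJitter(d time.Duration) time.Duration {
	return d - time.Duration(rand.Int63n(int64(d/7)))
}

// jitteredInterval emits ticks roughly every period, but each gap is
// jittered independently, unlike time.Ticker's fixed cadence.
// (Hypothetical stand-in for interval.New; Go 1.20+ seeds math/rand
// automatically, so each process gets its own jitter sequence.)
func jitteredInterval(period time.Duration, done <-chan struct{}) <-chan time.Time {
	ch := make(chan time.Time)
	go func() {
		next := halfJitter(period) // spread out the first run
		for {
			select {
			case t := <-time.After(next):
				select {
				case ch <- t:
					next = seventhJitter(period)
				case <-done:
					return
				}
			case <-done:
				return
			}
		}
	}()
	return ch
}

func main() {
	done := make(chan struct{})
	defer close(done)

	// Each run prints ticks at a different offset, and the gaps never
	// realign: the property that spreads CheckUploads load across auth
	// servers.
	ticks := jitteredInterval(time.Second, done)
	for i := 0; i < 3; i++ {
		fmt.Println("check uploads at", (<-ticks).Format("15:04:05.000"))
	}
}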