diff --git a/lib/events/complete.go b/lib/events/complete.go index c7594c61521c3..edd671c438d86 100644 --- a/lib/events/complete.go +++ b/lib/events/complete.go @@ -26,6 +26,7 @@ import ( apiutils "github.com/gravitational/teleport/api/utils" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/utils" + "github.com/gravitational/teleport/lib/utils/interval" "github.com/gravitational/trace" @@ -105,11 +106,16 @@ type UploadCompleter struct { } func (u *UploadCompleter) run() { - ticker := u.cfg.Clock.NewTicker(u.cfg.CheckPeriod) - defer ticker.Stop() + periodic := interval.New(interval.Config{ + Duration: u.cfg.CheckPeriod, + FirstDuration: utils.HalfJitter(u.cfg.CheckPeriod), + Jitter: utils.NewSeventhJitter(), + }) + defer periodic.Stop() + for { select { - case <-ticker.Chan(): + case <-periodic.Next(): if err := u.CheckUploads(u.closeCtx); err != nil { u.log.WithError(err).Warningf("Failed to check uploads.") } @@ -134,6 +140,10 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error { } parts, err := u.cfg.Uploader.ListParts(ctx, upload) if err != nil { + if trace.IsNotFound(err) { + u.log.WithError(err).Warnf("Missing parts for upload %v. Moving on to next upload.", upload.ID) + continue + } return trace.Wrap(err) } @@ -161,7 +171,7 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error { case <-u.cfg.Clock.After(2 * time.Minute): u.log.Debugf("checking for session end event for session %v", upload.SessionID) if err := u.ensureSessionEndEvent(ctx, uploadData); err != nil { - u.log.WithError(err).Warningf("failed to ensure session end event") + u.log.WithError(err).Warningf("failed to ensure session end event for session %v", upload.SessionID) } } }() @@ -201,8 +211,6 @@ func (u *UploadCompleter) ensureSessionEndEvent(ctx context.Context, uploadData var sshSessionEnd events.SessionEnd var desktopSessionEnd events.WindowsDesktopSessionEnd - first := true - // We use the streaming events API to search through the session events, because it works // for both Desktop and SSH sessions, where as the GetSessionEvents API relies on downloading // a copy of the session and using the SSH-specific index to iterate through events. @@ -217,11 +225,6 @@ loop: break loop } - if first { - u.log.Infof("got first event %T", evt) - first = false - } - lastEvent = evt switch e := evt.(type) { @@ -271,6 +274,10 @@ loop: } } + if lastEvent == nil { + return trace.Errorf("could not find any events for session %v", uploadData.SessionID) + } + sshSessionEnd.Participants = apiutils.Deduplicate(sshSessionEnd.Participants) sshSessionEnd.EndTime = lastEvent.GetTime() desktopSessionEnd.EndTime = lastEvent.GetTime()