From 161974f159afddef16add91fbc8582549fa82a6d Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 5 May 2022 12:07:17 -0700 Subject: [PATCH 1/5] Re-add grace period to Upload completer for backwards compatibility. Provide grace period to upload completer in Proxy and Auth services so that proxy/sync recording works with old nodes. Further decouple uploader from upload completer so that they can be configured as standalone processes more easily. --- lib/defaults/defaults.go | 5 + lib/events/api.go | 5 +- lib/events/complete.go | 60 ++++++++---- lib/events/complete_test.go | 61 +++++++++++- lib/events/filesessions/fileasync.go | 96 +++++++------------ .../filesessions/fileasync_chaos_test.go | 10 +- lib/events/filesessions/fileasync_test.go | 15 ++- lib/events/filesessions/filestream.go | 15 ++- lib/events/gcssessions/gcsstream.go | 5 +- lib/events/s3sessions/s3stream.go | 6 ++ lib/events/stream.go | 20 +++- lib/service/db.go | 10 +- lib/service/kubernetes.go | 10 +- lib/service/service.go | 77 ++++++++++++--- 14 files changed, 281 insertions(+), 114 deletions(-) diff --git a/lib/defaults/defaults.go b/lib/defaults/defaults.go index cc503d88c950c..1f63298b4ae3a 100644 --- a/lib/defaults/defaults.go +++ b/lib/defaults/defaults.go @@ -292,6 +292,11 @@ const ( // AbandonedUploadPollingRate defines how often to check for // abandoned uploads which need to be completed. 
AbandonedUploadPollingRate = SessionTrackerTTL / 6 + + // UploadGracePeriod is a period after which non-completed + // upload is considered abandoned and will be completed by the reconciler + // DELETE IN 11.0.0 + UploadGracePeriod = 24 * time.Hour ) var ( diff --git a/lib/events/api.go b/lib/events/api.go index 0dda1f215cb3b..bc1637620ca8f 100644 --- a/lib/events/api.go +++ b/lib/events/api.go @@ -614,11 +614,14 @@ type StreamUpload struct { ID string // SessionID is a session ID of the upload SessionID session.ID + // Initiated contains the timestamp of when the upload + // was initiated, not always initialized + Initiated time.Time } // String returns user friendly representation of the upload func (u StreamUpload) String() string { - return fmt.Sprintf("Upload(session=%v, id=%v)", u.SessionID, u.ID) + return fmt.Sprintf("Upload(session=%v, id=%v, initiated=%v)", u.SessionID, u.ID, u.Initiated) } // CheckAndSetDefaults checks and sets default values diff --git a/lib/events/complete.go b/lib/events/complete.go index 3ab981c01c883..bd0301b8cdd16 100644 --- a/lib/events/complete.go +++ b/lib/events/complete.go @@ -51,6 +51,10 @@ type UploadCompleterConfig struct { CheckPeriod time.Duration // Clock is used to override clock in tests Clock clockwork.Clock + // DELETE IN 11.0.0 in favor of SessionTrackerService + // GracePeriod is the period after which an upload's session + // tracker will be checked to see if it's an abandoned upload. + GracePeriod time.Duration } // CheckAndSetDefaults checks and sets default values @@ -73,20 +77,8 @@ func (cfg *UploadCompleterConfig) CheckAndSetDefaults() error { return nil } -// StartNewUploadCompleter starts an upload completer background process. It can -// be closed by closing the provided context. 
-func StartNewUploadCompleter(ctx context.Context, cfg UploadCompleterConfig) error { - uc, err := newUploadCompleter(cfg) - if err != nil { - return trace.Wrap(err) - } - go uc.start(ctx) - return nil -} - -// newUploadCompleter returns a new instance of the upload completer without -// starting it. Useful in tests. -func newUploadCompleter(cfg UploadCompleterConfig) (*UploadCompleter, error) { +// NewUploadCompleter returns a new UploadCompleter. +func NewUploadCompleter(cfg UploadCompleterConfig) (*UploadCompleter, error) { if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } @@ -95,19 +87,37 @@ func newUploadCompleter(cfg UploadCompleterConfig) (*UploadCompleter, error) { log: log.WithFields(log.Fields{ trace.Component: teleport.Component(cfg.Component, "completer"), }), + closeC: make(chan struct{}), } return u, nil } +// StartNewUploadCompleter starts an upload completer background process that will +// close once the provided ctx is closed. +func StartNewUploadCompleter(ctx context.Context, cfg UploadCompleterConfig) error { + uc, err := NewUploadCompleter(cfg) + if err != nil { + return trace.Wrap(err) + } + go uc.Serve(ctx) + return nil +} + // UploadCompleter periodically scans uploads that have not been completed // and completes them type UploadCompleter struct { - cfg UploadCompleterConfig - log *log.Entry + cfg UploadCompleterConfig + log *log.Entry + closeC chan struct{} +} + +// Close stops the UploadCompleter +func (u *UploadCompleter) Close() { + close(u.closeC) } -// start starts a goroutine to periodically check for and complete abandoned uploads -func (u *UploadCompleter) start(ctx context.Context) { +// Serve runs the upload completer until closed or until ctx is cancelled. 
+func (u *UploadCompleter) Serve(ctx context.Context) error { periodic := interval.New(interval.Config{ Duration: u.cfg.CheckPeriod, FirstDuration: utils.HalfJitter(u.cfg.CheckPeriod), @@ -121,8 +131,10 @@ func (u *UploadCompleter) start(ctx context.Context) { if err := u.checkUploads(ctx); err != nil { u.log.WithError(err).Warningf("Failed to check uploads.") } + case <-u.closeC: + return nil case <-ctx.Done(): - return + return trace.Wrap(ctx.Err(), "Context canceled") } } } @@ -153,6 +165,16 @@ func (u *UploadCompleter) checkUploads(ctx context.Context) error { // Complete upload for any uploads without an active session tracker for _, upload := range uploads { + // DELETE IN 11.0.0 + // To support v9/v8 versions which do not use SessionTrackerService, + // sessions are only considered abandoned after the provided grace period. + if u.cfg.GracePeriod != 0 { + gracePoint := upload.Initiated.Add(u.cfg.GracePeriod) + if gracePoint.After(u.cfg.Clock.Now()) { + return nil + } + } + if apiutils.SliceContainsStr(activeSessionIDs, upload.SessionID.String()) { continue } diff --git a/lib/events/complete_test.go b/lib/events/complete_test.go index 6f13de20ac6a4..2beca33c85d4c 100644 --- a/lib/events/complete_test.go +++ b/lib/events/complete_test.go @@ -57,7 +57,7 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) { MockTrackers: []types.SessionTracker{sessionTracker}, } - uc, err := newUploadCompleter(UploadCompleterConfig{ + uc, err := NewUploadCompleter(UploadCompleterConfig{ Uploader: mu, AuditLog: log, SessionTracker: sessionTrackerService, @@ -79,6 +79,63 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) { require.True(t, mu.uploads[upload.ID].completed) } +// TestUploadCompleterWithGracePeriod verifies that the upload completer +// completes uploads that have lived past the configured grace period. 
+// DELETE IN 11.0.0 +func TestUploadCompleterWithGracePeriod(t *testing.T) { + clock := clockwork.NewFakeClock() + mu := NewMemoryUploader() + mu.Clock = clock + + log := &mockAuditLog{} + + sessionID := session.NewID() + expires := clock.Now().Add(time.Hour * 1) + sessionTracker := &types.SessionTrackerV1{ + Spec: types.SessionTrackerSpecV1{ + SessionID: string(sessionID), + }, + ResourceHeader: types.ResourceHeader{ + Metadata: types.Metadata{ + Expires: &expires, + }, + }, + } + + sessionTrackerService := &eventstest.MockSessionTrackerService{ + Clock: clock, + MockTrackers: []types.SessionTracker{sessionTracker}, + } + + uc, err := NewUploadCompleter(UploadCompleterConfig{ + Uploader: mu, + AuditLog: log, + SessionTracker: sessionTrackerService, + Clock: clock, + GracePeriod: 2 * time.Hour, + }) + require.NoError(t, err) + + upload, err := mu.CreateUpload(context.Background(), sessionID) + require.NoError(t, err) + + err = uc.checkUploads(context.Background()) + require.NoError(t, err) + require.False(t, mu.uploads[upload.ID].completed) + + // Even if session tracker is expired/not found, the completer + // should wait until the grace period to complete it + clock.Advance(1 * time.Hour) + err = uc.checkUploads(context.Background()) + require.NoError(t, err) + require.False(t, mu.uploads[upload.ID].completed) + + clock.Advance(1 * time.Hour) + err = uc.checkUploads(context.Background()) + require.NoError(t, err) + require.True(t, mu.uploads[upload.ID].completed) +} + // TestUploadCompleterEmitsSessionEnd verifies that the upload completer // emits session.end or windows.desktop.session.end events for sessions // that are completed. 
@@ -99,7 +156,7 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) { sessionEvents: []apievents.AuditEvent{test.startEvent}, } - uc, err := newUploadCompleter(UploadCompleterConfig{ + uc, err := NewUploadCompleter(UploadCompleterConfig{ Uploader: mu, AuditLog: log, Clock: clock, diff --git a/lib/events/filesessions/fileasync.go b/lib/events/filesessions/fileasync.go index 752e17e2fac9a..a1bb4b441a278 100644 --- a/lib/events/filesessions/fileasync.go +++ b/lib/events/filesessions/fileasync.go @@ -30,7 +30,6 @@ import ( apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/events" - "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/utils" @@ -45,8 +44,6 @@ type UploaderConfig struct { ScanDir string // Clock is the clock replacement Clock clockwork.Clock - // Context is an optional context - Context context.Context // ScanPeriod is a uploader dir scan period ScanPeriod time.Duration // ConcurrentUploads sets up how many parallel uploads to schedule @@ -79,9 +76,6 @@ func (cfg *UploaderConfig) CheckAndSetDefaults() error { if cfg.ScanPeriod <= 0 { cfg.ScanPeriod = defaults.UploaderScanPeriod } - if cfg.Context == nil { - cfg.Context = context.Background() - } if cfg.Clock == nil { cfg.Clock = clockwork.NewRealClock() } @@ -92,42 +86,22 @@ func (cfg *UploaderConfig) CheckAndSetDefaults() error { } // NewUploader creates new disk based session logger -func NewUploader(cfg UploaderConfig, sessionTracker services.SessionTrackerService) (*Uploader, error) { +func NewUploader(cfg UploaderConfig) (*Uploader, error) { if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } - handler, err := NewHandler(Config{ - Directory: cfg.ScanDir, - }) - if err != nil { - return nil, trace.Wrap(err) - } - ctx, cancel := context.WithCancel(cfg.Context) uploader := &Uploader{ cfg: 
cfg, log: log.WithFields(log.Fields{ trace.Component: cfg.Component, }), - cancel: cancel, - ctx: ctx, + closeC: make(chan struct{}), auditLog: cfg.AuditLog, semaphore: make(chan struct{}, cfg.ConcurrentUploads), eventsCh: make(chan events.UploadEvent, cfg.ConcurrentUploads), } - // upload completer scans for uploads that have been initiated, but not completed - // by the client (aborted or crashed) and completes them. It will be closed once - // the uploader context is closed. - err = events.StartNewUploadCompleter(uploader.ctx, events.UploadCompleterConfig{ - Uploader: handler, - AuditLog: cfg.AuditLog, - SessionTracker: sessionTracker, - }) - if err != nil { - return nil, trace.Wrap(err) - } - return uploader, nil } @@ -150,10 +124,13 @@ type Uploader struct { cfg UploaderConfig log *log.Entry - cancel context.CancelFunc - ctx context.Context eventsCh chan events.UploadEvent auditLog events.IAuditLog + closeC chan struct{} +} + +func (u *Uploader) Close() { + close(u.closeC) } func (u *Uploader) writeSessionError(sessionID session.ID, err error) error { @@ -180,7 +157,7 @@ func (u *Uploader) checkSessionError(sessionID session.ID) (bool, error) { } // Serve runs the uploader until stopped -func (u *Uploader) Serve() error { +func (u *Uploader) Serve(ctx context.Context) error { backoff, err := utils.NewLinear(utils.LinearConfig{ Step: u.cfg.ScanPeriod, Max: u.cfg.ScanPeriod * 100, @@ -191,11 +168,14 @@ func (u *Uploader) Serve() error { } for { select { - case <-u.ctx.Done(): + case <-u.closeC: return nil + case <-ctx.Done(): + u.Close() + return nil + case event := <-u.eventsCh: // Successful and failed upload events are used to speed up and // slow down the scans and uploads. - case event := <-u.eventsCh: switch { case event.Error == nil: backoff.ResetToDelay() @@ -222,7 +202,7 @@ func (u *Uploader) Serve() error { // Tick at scan period but slow down (and speeds up) on errors. 
case <-backoff.After(): var failed bool - if _, err := u.Scan(); err != nil { + if _, err := u.Scan(ctx); err != nil { if trace.Unwrap(err) != errContext { failed = true u.log.WithError(err).Warningf("Uploader scan failed.") @@ -248,7 +228,7 @@ type ScanStats struct { } // Scan scans the streaming directory and uploads recordings -func (u *Uploader) Scan() (*ScanStats, error) { +func (u *Uploader) Scan(ctx context.Context) (*ScanStats, error) { files, err := os.ReadDir(u.cfg.ScanDir) if err != nil { return nil, trace.ConvertSystemError(err) @@ -264,7 +244,7 @@ func (u *Uploader) Scan() (*ScanStats, error) { continue } stats.Scanned++ - if err := u.startUpload(fi.Name()); err != nil { + if err := u.startUpload(ctx, fi.Name()); err != nil { if trace.IsCompareFailed(err) { u.log.Debugf("Scan is skipping recording %v that is locked by another process.", fi.Name()) continue @@ -297,11 +277,6 @@ func (u *Uploader) sessionErrorFilePath(sid session.ID) string { return filepath.Join(u.cfg.ScanDir, sid.String()+errorExt) } -// Close closes all operations -func (u *Uploader) Close() { - u.cancel() -} - type upload struct { sessionID session.ID reader *events.ProtoReader @@ -370,7 +345,7 @@ func (u *upload) removeFiles() error { return trace.NewAggregate(errs...) 
} -func (u *Uploader) startUpload(fileName string) error { +func (u *Uploader) startUpload(ctx context.Context, fileName string) error { sessionID, err := sessionIDFromPath(fileName) if err != nil { return trace.Wrap(err) @@ -416,7 +391,7 @@ func (u *Uploader) startUpload(fileName string) error { } start := time.Now() - if err := u.takeSemaphore(); err != nil { + if err := u.takeSemaphore(ctx); err != nil { if err := upload.Close(); err != nil { u.log.WithError(err).Warningf("Failed to close upload.") } @@ -426,7 +401,7 @@ func (u *Uploader) startUpload(fileName string) error { u.log.Debugf("Semaphore acquired in %v for upload %v.", time.Since(start), fileName) } go func() { - if err := u.upload(upload); err != nil { + if err := u.upload(ctx, upload); err != nil { u.log.WithError(err).Warningf("Upload failed.") u.emitEvent(events.UploadEvent{ SessionID: string(upload.sessionID), @@ -444,8 +419,8 @@ func (u *Uploader) startUpload(fileName string) error { return nil } -func (u *Uploader) upload(up *upload) error { - defer u.releaseSemaphore() +func (u *Uploader) upload(ctx context.Context, up *upload) error { + defer u.releaseSemaphore(ctx) defer func() { if err := up.Close(); err != nil { u.log.WithError(err).Warningf("Failed to close upload.") @@ -458,12 +433,12 @@ func (u *Uploader) upload(up *upload) error { if !trace.IsNotFound(err) { return trace.Wrap(err) } - stream, err = u.cfg.Streamer.CreateAuditStream(u.ctx, up.sessionID) + stream, err = u.cfg.Streamer.CreateAuditStream(ctx, up.sessionID) if err != nil { return trace.Wrap(err) } } else { - stream, err = u.cfg.Streamer.ResumeAuditStream(u.ctx, up.sessionID, status.UploadID) + stream, err = u.cfg.Streamer.ResumeAuditStream(ctx, up.sessionID, status.UploadID) if err != nil { if !trace.IsNotFound(err) { return trace.Wrap(err) @@ -472,7 +447,7 @@ func (u *Uploader) upload(up *upload) error { "Upload for sesion %v, upload ID %v is not found starting a new upload from scratch.", up.sessionID, status.UploadID) 
status = nil - stream, err = u.cfg.Streamer.CreateAuditStream(u.ctx, up.sessionID) + stream, err = u.cfg.Streamer.CreateAuditStream(ctx, up.sessionID) if err != nil { return trace.Wrap(err) } @@ -480,7 +455,7 @@ func (u *Uploader) upload(up *upload) error { } defer func() { - if err := stream.Close(u.ctx); err != nil { + if err := stream.Close(ctx); err != nil { if trace.Unwrap(err) != io.EOF { u.log.WithError(err).Debugf("Failed to close stream.") } @@ -491,16 +466,19 @@ func (u *Uploader) upload(up *upload) error { // if it was successful get the first status update // sent by the server after create. select { + case <-u.closeC: + return trace.Errorf("operation has been cancelled, uploader is closed") case <-stream.Status(): case <-time.After(defaults.NetworkRetryDuration): return trace.ConnectionProblem(nil, "timeout waiting for stream status update") - case <-u.ctx.Done(): - return trace.ConnectionProblem(u.ctx.Err(), "operation has been cancelled") + case <-ctx.Done(): + return trace.ConnectionProblem(ctx.Err(), "operation has been cancelled") + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go u.monitorStreamStatus(u.ctx, up, stream, cancel) + go u.monitorStreamStatus(ctx, up, stream, cancel) for { event, err := up.reader.Read(ctx) @@ -514,12 +492,12 @@ func (u *Uploader) upload(up *upload) error { if status != nil && event.GetIndex() <= status.LastEventIndex { continue } - if err := stream.EmitAuditEvent(u.ctx, event); err != nil { + if err := stream.EmitAuditEvent(ctx, event); err != nil { return trace.Wrap(err) } } - if err := stream.Complete(u.ctx); err != nil { + if err := stream.Complete(ctx); err != nil { u.log.WithError(err).Error("Failed to complete upload.") return trace.Wrap(err) } @@ -566,20 +544,20 @@ func (u *Uploader) monitorStreamStatus(ctx context.Context, up *upload, stream a var errContext = fmt.Errorf("context has closed") -func (u *Uploader) takeSemaphore() error { +func (u *Uploader) takeSemaphore(ctx 
context.Context) error { select { case u.semaphore <- struct{}{}: return nil - case <-u.ctx.Done(): + case <-ctx.Done(): return errContext } } -func (u *Uploader) releaseSemaphore() error { +func (u *Uploader) releaseSemaphore(ctx context.Context) error { select { case <-u.semaphore: return nil - case <-u.ctx.Done(): + case <-ctx.Done(): return errContext } } diff --git a/lib/events/filesessions/fileasync_chaos_test.go b/lib/events/filesessions/fileasync_chaos_test.go index b2b482718a9e1..5d2e1317fb6dd 100644 --- a/lib/events/filesessions/fileasync_chaos_test.go +++ b/lib/events/filesessions/fileasync_chaos_test.go @@ -35,7 +35,6 @@ import ( apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/events" - "github.com/gravitational/teleport/lib/events/eventstest" "github.com/gravitational/teleport/lib/session" "github.com/gravitational/trace" @@ -118,15 +117,14 @@ func TestChaosUpload(t *testing.T) { scanPeriod := 10 * time.Second uploader, err := NewUploader(UploaderConfig{ - Context: ctx, ScanDir: scanDir, ScanPeriod: scanPeriod, Streamer: faultyStreamer, Clock: clock, AuditLog: &events.DiscardAuditLog{}, - }, &eventstest.MockSessionTrackerService{}) + }) require.NoError(t, err) - go uploader.Serve() + go uploader.Serve(ctx) // wait until uploader blocks on the clock clock.BlockUntil(1) @@ -174,7 +172,7 @@ func TestChaosUpload(t *testing.T) { scansCh := make(chan error, parallelStreams) for i := 0; i < parallelStreams; i++ { go func() { - _, err := uploader.Scan() + _, err := uploader.Scan(ctx) scansCh <- trace.Wrap(err) }() } @@ -203,7 +201,7 @@ func TestChaosUpload(t *testing.T) { for i := 0; i < parallelStreams; i++ { // do scans to catch remaining uploads - _, err = uploader.Scan() + _, err = uploader.Scan(ctx) require.NoError(t, err) // wait for the upload events diff --git a/lib/events/filesessions/fileasync_test.go b/lib/events/filesessions/fileasync_test.go index 038cb48a063af..9e244a6702320 100644 --- 
a/lib/events/filesessions/fileasync_test.go +++ b/lib/events/filesessions/fileasync_test.go @@ -32,7 +32,6 @@ import ( apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/events" - "github.com/gravitational/teleport/lib/events/eventstest" "github.com/gravitational/teleport/lib/session" "github.com/gravitational/trace" @@ -396,6 +395,8 @@ func TestUploadBackoff(t *testing.T) { // TestUploadBadSession creates a corrupted session file // and makes sure the uploader marks it as faulty func TestUploadBadSession(t *testing.T) { + ctx := context.Background() + p := newUploaderPack(t, nil) defer p.Close(t) @@ -423,7 +424,7 @@ func TestUploadBadSession(t *testing.T) { t.Fatalf("Timeout waiting for async upload, try `go test -v` to get more logs for details") } - stats, err := p.uploader.Scan() + stats, err := p.uploader.Scan(ctx) require.NoError(t, err) // Bad records have been scanned, but uploads have not started require.Equal(t, 1, stats.Scanned) @@ -476,17 +477,16 @@ func newUploaderPack(t *testing.T, wrapStreamer wrapStreamerFn) uploaderPack { } uploader, err := NewUploader(UploaderConfig{ - Context: pack.ctx, ScanDir: pack.scanDir, ScanPeriod: pack.scanPeriod, Streamer: pack.streamer, Clock: pack.clock, EventsC: pack.eventsC, AuditLog: &events.DiscardAuditLog{}, - }, &eventstest.MockSessionTrackerService{}) + }) require.NoError(t, err) pack.uploader = uploader - go pack.uploader.Serve() + go pack.uploader.Serve(pack.ctx) return pack } @@ -514,15 +514,14 @@ func runResume(t *testing.T, testCase resumeTestCase) { scanPeriod := 10 * time.Second uploader, err := NewUploader(UploaderConfig{ EventsC: eventsC, - Context: ctx, ScanDir: scanDir, ScanPeriod: scanPeriod, Streamer: test.streamer, Clock: clock, AuditLog: &events.DiscardAuditLog{}, - }, &eventstest.MockSessionTrackerService{}) + }) require.Nil(t, err) - go uploader.Serve() + go uploader.Serve(ctx) // wait until uploader blocks on the clock clock.BlockUntil(1) diff 
--git a/lib/events/filesessions/filestream.go b/lib/events/filesessions/filestream.go index 4cff982a6c72d..81f19a063d116 100644 --- a/lib/events/filesessions/filestream.go +++ b/lib/events/filesessions/filestream.go @@ -194,7 +194,8 @@ func (h *Handler) ListParts(ctx context.Context, upload events.StreamUpload) ([] return parts, nil } -// ListUploads lists uploads that have been initiated but not completed +// ListUploads lists uploads that have been initiated but not completed with +// earlier uploads returned first func (h *Handler) ListUploads(ctx context.Context) ([]events.StreamUpload, error) { var uploads []events.StreamUpload @@ -235,11 +236,23 @@ func (h *Handler) ListUploads(ctx context.Context) ([]events.StreamUpload, error continue } + info, err := dir.Info() + if err != nil { + h.WithError(err).Warningf("Skipping upload %v: cannot read file info", uploadID) + continue + } + uploads = append(uploads, events.StreamUpload{ SessionID: session.ID(filepath.Base(files[0].Name())), ID: uploadID, + Initiated: info.ModTime(), }) } + + sort.Slice(uploads, func(i, j int) bool { + return uploads[i].Initiated.Before(uploads[j].Initiated) + }) + return uploads, nil } diff --git a/lib/events/gcssessions/gcsstream.go b/lib/events/gcssessions/gcsstream.go index 10af9d6c5480e..b1ef99f57892c 100644 --- a/lib/events/gcssessions/gcsstream.go +++ b/lib/events/gcssessions/gcsstream.go @@ -43,6 +43,7 @@ func (h *Handler) CreateUpload(ctx context.Context, sessionID session.ID) (*even upload := events.StreamUpload{ ID: uuid.New().String(), SessionID: sessionID, + Initiated: time.Now().UTC(), } if err := upload.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) @@ -251,7 +252,8 @@ func (h *Handler) ListParts(ctx context.Context, upload events.StreamUpload) ([] return parts, nil } -// ListUploads lists uploads that have been initiated but not completed +// ListUploads lists uploads that have been initiated but not completed with +// earlier uploads returned first func 
(h *Handler) ListUploads(ctx context.Context) ([]events.StreamUpload, error) { i := h.gcsClient.Bucket(h.Config.Bucket).Objects(ctx, &storage.Query{ Prefix: h.uploadsPrefix(), @@ -273,6 +275,7 @@ func (h *Handler) ListUploads(ctx context.Context) ([]events.StreamUpload, error if err != nil { return nil, trace.Wrap(err) } + upload.Initiated = attrs.Created uploads = append(uploads, *upload) } return uploads, nil diff --git a/lib/events/s3sessions/s3stream.go b/lib/events/s3sessions/s3stream.go index 5c313b9f98434..087128367e9e1 100644 --- a/lib/events/s3sessions/s3stream.go +++ b/lib/events/s3sessions/s3stream.go @@ -196,6 +196,7 @@ func (h *Handler) ListUploads(ctx context.Context) ([]events.StreamUpload, error uploads = append(uploads, events.StreamUpload{ ID: *upload.UploadId, SessionID: h.fromPath(*upload.Key), + Initiated: *upload.Initiated, }) } if !*re.IsTruncated { @@ -204,6 +205,11 @@ func (h *Handler) ListUploads(ctx context.Context) ([]events.StreamUpload, error keyMarker = re.KeyMarker uploadIDMarker = re.UploadIdMarker } + + sort.Slice(uploads, func(i, j int) bool { + return uploads[i].Initiated.Before(uploads[j].Initiated) + }) + return uploads, nil } diff --git a/lib/events/stream.go b/lib/events/stream.go index 2e05efe54025c..aad27ef1cff7f 100644 --- a/lib/events/stream.go +++ b/lib/events/stream.go @@ -1087,6 +1087,9 @@ type MemoryUpload struct { sessionID session.ID //completed specifies upload as completed completed bool + // Initiated contains the timestamp of when the upload + // was initiated, not always initialized + Initiated time.Time } func (m *MemoryUploader) trySendEvent(event UploadEvent) { @@ -1115,10 +1118,14 @@ func (m *MemoryUploader) CreateUpload(ctx context.Context, sessionID session.ID) ID: uuid.New().String(), SessionID: sessionID, } + if m.Clock != nil { + upload.Initiated = m.Clock.Now() + } m.uploads[upload.ID] = &MemoryUpload{ id: upload.ID, sessionID: sessionID, parts: make(map[int64][]byte), + Initiated: upload.Initiated, } 
return upload, nil } @@ -1173,18 +1180,23 @@ func (m *MemoryUploader) UploadPart(ctx context.Context, upload StreamUpload, pa return &StreamPart{Number: partNumber}, nil } -// ListUploads lists uploads that have been initiated but not completed +// ListUploads lists uploads that have been initiated but not completed with +// earlier uploads returned first. func (m *MemoryUploader) ListUploads(ctx context.Context) ([]StreamUpload, error) { m.mtx.RLock() defer m.mtx.RUnlock() - out := make([]StreamUpload, 0, len(m.uploads)) + uploads := make([]StreamUpload, 0, len(m.uploads)) for id, upload := range m.uploads { - out = append(out, StreamUpload{ + uploads = append(uploads, StreamUpload{ ID: id, SessionID: upload.sessionID, + Initiated: upload.Initiated, }) } - return out, nil + sort.Slice(uploads, func(i, j int) bool { + return uploads[i].Initiated.Before(uploads[j].Initiated) + }) + return uploads, nil } // GetParts returns upload parts uploaded up to date, sorted by part number diff --git a/lib/service/db.go b/lib/service/db.go index 451a2cd5242bc..9c869a8cec41a 100644 --- a/lib/service/db.go +++ b/lib/service/db.go @@ -21,6 +21,7 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib/auth" "github.com/gravitational/teleport/lib/events" + "github.com/gravitational/teleport/lib/events/filesessions" "github.com/gravitational/teleport/lib/limiter" "github.com/gravitational/teleport/lib/reversetunnel" "github.com/gravitational/teleport/lib/services" @@ -76,7 +77,14 @@ func (process *TeleportProcess) initDatabaseService() (retErr error) { // Start uploader that will scan a path on disk and upload completed // sessions to the auth server. 
- err = process.initUploaderService(accessPoint, conn.Client, conn.Client) + uploaderCfg := filesessions.UploaderConfig{ + Streamer: accessPoint, + AuditLog: conn.Client, + } + completerCfg := events.UploadCompleterConfig{ + SessionTracker: conn.Client, + } + err = process.initUploaderService(uploaderCfg, completerCfg) if err != nil { return trace.Wrap(err) } diff --git a/lib/service/kubernetes.go b/lib/service/kubernetes.go index 1073ad978e65a..314b4325c6050 100644 --- a/lib/service/kubernetes.go +++ b/lib/service/kubernetes.go @@ -28,6 +28,7 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib/auth" "github.com/gravitational/teleport/lib/events" + "github.com/gravitational/teleport/lib/events/filesessions" kubeproxy "github.com/gravitational/teleport/lib/kube/proxy" "github.com/gravitational/teleport/lib/labels" "github.com/gravitational/teleport/lib/reversetunnel" @@ -85,7 +86,14 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C // Start uploader that will scan a path on disk and upload completed // sessions to the Auth Server. 
- if err := process.initUploaderService(accessPoint, conn.Client, conn.Client); err != nil { + uploaderCfg := filesessions.UploaderConfig{ + Streamer: accessPoint, + AuditLog: conn.Client, + } + completerCfg := events.UploadCompleterConfig{ + SessionTracker: conn.Client, + } + if err := process.initUploaderService(uploaderCfg, completerCfg); err != nil { return trace.Wrap(err) } diff --git a/lib/service/service.go b/lib/service/service.go index 00d3ea7413865..940d890838f4f 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -1281,6 +1281,10 @@ func (process *TeleportProcess) initAuthService() error { Component: teleport.ComponentAuth, AuditLog: process.auditLog, SessionTracker: authServer.Services, + // DELETE IN 11.0.0 + // Provide a grace period so that Auth does not prematurely upload + // sessions which don't have a session tracker (v9.2 and earlier) + GracePeriod: defaults.UploadGracePeriod, }) if err != nil { return trace.Wrap(err) @@ -1949,7 +1953,15 @@ func (process *TeleportProcess) initSSH() error { // init uploader service for recording SSH node, if proxy is not // enabled on this node, because proxy stars uploader service as well if !cfg.Proxy.Enabled { - if err := process.initUploaderService(authClient, conn.Client, conn.Client); err != nil { + uploaderCfg := filesessions.UploaderConfig{ + Streamer: authClient, + AuditLog: conn.Client, + } + completerCfg := events.UploadCompleterConfig{ + SessionTracker: conn.Client, + GracePeriod: defaults.UploadGracePeriod, + } + if err := process.initUploaderService(uploaderCfg, completerCfg); err != nil { return trace.Wrap(err) } } @@ -2070,7 +2082,7 @@ func (process *TeleportProcess) registerWithAuthServer(role types.SystemRole, ev // initUploadService starts a file-based uploader that scans the local streaming logs directory // (data/log/upload/streaming/default/) -func (process *TeleportProcess) initUploaderService(streamer events.Streamer, auditLog events.IAuditLog, sessionTracker 
services.SessionTrackerService) error { +func (process *TeleportProcess) initUploaderService(uploaderCfg filesessions.UploaderConfig, completerCfg events.UploadCompleterConfig) error { log := process.log.WithFields(logrus.Fields{ trace.Component: teleport.Component(teleport.ComponentAuditLog, process.id), }) @@ -2101,20 +2113,19 @@ func (process *TeleportProcess) initUploaderService(streamer events.Streamer, au } } - fileUploader, err := filesessions.NewUploader(filesessions.UploaderConfig{ - ScanDir: filepath.Join(path...), - Streamer: streamer, - AuditLog: auditLog, - EventsC: process.Config.UploadEventsC, - }, sessionTracker) + uploaderCfg.ScanDir = filepath.Join(path...) + uploaderCfg.EventsC = process.Config.UploadEventsC + fileUploader, err := filesessions.NewUploader(uploaderCfg) if err != nil { return trace.Wrap(err) } + process.RegisterFunc("fileuploader.service", func() error { - err := fileUploader.Serve() + err := fileUploader.Serve(process.ExitContext()) if err != nil { log.WithError(err).Errorf("File uploader server exited with error.") } + return nil }) @@ -2124,6 +2135,36 @@ func (process *TeleportProcess) initUploaderService(streamer events.Streamer, au log.Infof("File uploader has shut down.") }) + // upload completer scans for uploads that have been initiated, but not completed + // by the client (aborted or crashed) and completes them. It will be closed once + // the uploader context is closed. 
+ handler, err := filesessions.NewHandler(filesessions.Config{ + Directory: filepath.Join(path...), + }) + if err != nil { + return trace.Wrap(err) + } + + completerCfg.Uploader = handler + completerCfg.AuditLog = uploaderCfg.AuditLog + uploadCompleter, err := events.NewUploadCompleter(completerCfg) + if err != nil { + return trace.Wrap(err) + } + + process.RegisterFunc("fileuploadcompleter.service", func() error { + if err := uploadCompleter.Serve(process.ExitContext()); err != nil { + log.WithError(err).Errorf("File uploader server exited with error.") + } + return nil + }) + + process.OnExit("fileuploadcompleter.shutdown", func(payload interface{}) { + log.Infof("File upload completer is shutting down.") + uploadCompleter.Close() + log.Infof("File upload completer has shut down.") + }) + return nil } @@ -3297,7 +3338,14 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { log.Infof("Exited.") }) - if err := process.initUploaderService(accessPoint, conn.Client, conn.Client); err != nil { + uploaderCfg := filesessions.UploaderConfig{ + Streamer: accessPoint, + AuditLog: conn.Client, + } + completerCfg := events.UploadCompleterConfig{ + SessionTracker: conn.Client, + } + if err := process.initUploaderService(uploaderCfg, completerCfg); err != nil { return trace.Wrap(err) } return nil @@ -3551,7 +3599,14 @@ func (process *TeleportProcess) initApps() { // Start uploader that will scan a path on disk and upload completed // sessions to the Auth Server. 
- if err := process.initUploaderService(accessPoint, conn.Client, conn.Client); err != nil { + uploaderCfg := filesessions.UploaderConfig{ + Streamer: accessPoint, + AuditLog: conn.Client, + } + completerCfg := events.UploadCompleterConfig{ + SessionTracker: conn.Client, + } + if err := process.initUploaderService(uploaderCfg, completerCfg); err != nil { return trace.Wrap(err) } From 5717783d8a62c81f867b474e0f57ed828fe4ea37 Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 5 May 2022 13:22:05 -0700 Subject: [PATCH 2/5] Fix double close on channel. --- lib/events/filesessions/fileasync.go | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/events/filesessions/fileasync.go b/lib/events/filesessions/fileasync.go index a1bb4b441a278..6dab5f97b2bb8 100644 --- a/lib/events/filesessions/fileasync.go +++ b/lib/events/filesessions/fileasync.go @@ -171,7 +171,6 @@ func (u *Uploader) Serve(ctx context.Context) error { case <-u.closeC: return nil case <-ctx.Done(): - u.Close() return nil case event := <-u.eventsCh: // Successful and failed upload events are used to speed up and From b70ec422c0510d2823d1c7e441e3b91e3a2ec277 Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 5 May 2022 13:26:30 -0700 Subject: [PATCH 3/5] Move mockSessionTrackerService to completer_test; Use empty tracker service for grace period test. 
--- lib/events/complete_test.go | 64 +++++++++++++++++++++++------------ lib/events/eventstest/mock.go | 46 ------------------------- 2 files changed, 43 insertions(+), 67 deletions(-) diff --git a/lib/events/complete_test.go b/lib/events/complete_test.go index 2beca33c85d4c..e33c4bb9c1762 100644 --- a/lib/events/complete_test.go +++ b/lib/events/complete_test.go @@ -22,10 +22,12 @@ import ( "testing" "time" + "github.com/gravitational/teleport/api/client/proto" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/events/eventstest" "github.com/gravitational/teleport/lib/session" + "github.com/gravitational/trace" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" ) @@ -52,9 +54,9 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) { }, } - sessionTrackerService := &eventstest.MockSessionTrackerService{ - Clock: clock, - MockTrackers: []types.SessionTracker{sessionTracker}, + sessionTrackerService := &mockSessionTrackerService{ + clock: clock, + trackers: []types.SessionTracker{sessionTracker}, } uc, err := NewUploadCompleter(UploadCompleterConfig{ @@ -90,22 +92,7 @@ func TestUploadCompleterWithGracePeriod(t *testing.T) { log := &mockAuditLog{} sessionID := session.NewID() - expires := clock.Now().Add(time.Hour * 1) - sessionTracker := &types.SessionTrackerV1{ - Spec: types.SessionTrackerSpecV1{ - SessionID: string(sessionID), - }, - ResourceHeader: types.ResourceHeader{ - Metadata: types.Metadata{ - Expires: &expires, - }, - }, - } - - sessionTrackerService := &eventstest.MockSessionTrackerService{ - Clock: clock, - MockTrackers: []types.SessionTracker{sessionTracker}, - } + sessionTrackerService := &mockSessionTrackerService{} uc, err := NewUploadCompleter(UploadCompleterConfig{ Uploader: mu, @@ -123,7 +110,7 @@ func TestUploadCompleterWithGracePeriod(t *testing.T) { require.NoError(t, err) require.False(t, 
mu.uploads[upload.ID].completed) - // Even if session tracker is expired/not found, the completer + // Even if session tracker is not found, the completer // should wait until the grace period to complete it clock.Advance(1 * time.Hour) err = uc.checkUploads(context.Background()) @@ -160,7 +147,7 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) { Uploader: mu, AuditLog: log, Clock: clock, - SessionTracker: &eventstest.MockSessionTrackerService{}, + SessionTracker: &mockSessionTrackerService{}, }) require.NoError(t, err) @@ -219,3 +206,38 @@ func (m *mockAuditLog) StreamSessionEvents(ctx context.Context, sid session.ID, func (m *mockAuditLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error { return m.emitter.EmitAuditEvent(ctx, event) } + +type mockSessionTrackerService struct { + clock clockwork.Clock + trackers []types.SessionTracker +} + +func (m *mockSessionTrackerService) GetActiveSessionTrackers(ctx context.Context) ([]types.SessionTracker, error) { + return nil, trace.NotImplemented("") +} + +func (m *mockSessionTrackerService) GetSessionTracker(ctx context.Context, sessionID string) (types.SessionTracker, error) { + for _, tracker := range m.trackers { + // mock session tracker expiration + if tracker.GetSessionID() == sessionID && tracker.Expiry().After(m.clock.Now()) { + return tracker, nil + } + } + return nil, trace.NotFound("tracker not found") +} + +func (m *mockSessionTrackerService) CreateSessionTracker(ctx context.Context, req *proto.CreateSessionTrackerRequest) (types.SessionTracker, error) { + return nil, trace.NotImplemented("") +} + +func (m *mockSessionTrackerService) UpdateSessionTracker(ctx context.Context, req *proto.UpdateSessionTrackerRequest) error { + return trace.NotImplemented("") +} + +func (m *mockSessionTrackerService) RemoveSessionTracker(ctx context.Context, sessionID string) error { + return trace.NotImplemented("") +} + +func (m *mockSessionTrackerService) UpdatePresence(ctx context.Context, 
sessionID, user string) error { + return trace.NotImplemented("") +} diff --git a/lib/events/eventstest/mock.go b/lib/events/eventstest/mock.go index 7411eaa9161ea..d5b254ddbb3fa 100644 --- a/lib/events/eventstest/mock.go +++ b/lib/events/eventstest/mock.go @@ -20,12 +20,8 @@ import ( "context" "sync" - "github.com/gravitational/teleport/api/client/proto" - "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/session" - "github.com/gravitational/trace" - "github.com/jonboulle/clockwork" ) // MockEmitter is an emitter that stores all emitted events. @@ -92,45 +88,3 @@ func (e *MockEmitter) Close(ctx context.Context) error { func (e *MockEmitter) Complete(ctx context.Context) error { return nil } - -type MockSessionTrackerService struct { - Clock clockwork.Clock - MockTrackers []types.SessionTracker -} - -func (m *MockSessionTrackerService) GetActiveSessionTrackers(ctx context.Context) ([]types.SessionTracker, error) { - var trackers []types.SessionTracker - for _, tracker := range m.MockTrackers { - // mock session tracker expiration - if tracker.Expiry().After(m.Clock.Now()) { - trackers = append(trackers, tracker) - } - } - return trackers, nil -} - -func (m *MockSessionTrackerService) GetSessionTracker(ctx context.Context, sessionID string) (types.SessionTracker, error) { - for _, tracker := range m.MockTrackers { - // mock session tracker expiration - if tracker.GetSessionID() == sessionID && tracker.Expiry().After(m.Clock.Now()) { - return tracker, nil - } - } - return nil, trace.NotFound("tracker not found") -} - -func (m *MockSessionTrackerService) CreateSessionTracker(ctx context.Context, req *proto.CreateSessionTrackerRequest) (types.SessionTracker, error) { - return nil, nil -} - -func (m *MockSessionTrackerService) UpdateSessionTracker(ctx context.Context, req *proto.UpdateSessionTrackerRequest) error { - return nil -} - -func (m *MockSessionTrackerService) 
RemoveSessionTracker(ctx context.Context, sessionID string) error { - return nil -} - -func (m *MockSessionTrackerService) UpdatePresence(ctx context.Context, sessionID, user string) error { - return nil -} From 3aacb28a4f6eebaf4a5072b7a296df85cd9b05e5 Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 5 May 2022 13:57:15 -0700 Subject: [PATCH 4/5] Fix mockSessionTrackerService. --- lib/events/complete_test.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/lib/events/complete_test.go b/lib/events/complete_test.go index e33c4bb9c1762..3aeb8f9c42c9e 100644 --- a/lib/events/complete_test.go +++ b/lib/events/complete_test.go @@ -213,7 +213,14 @@ type mockSessionTrackerService struct { } func (m *mockSessionTrackerService) GetActiveSessionTrackers(ctx context.Context) ([]types.SessionTracker, error) { - return nil, trace.NotImplemented("") + var trackers []types.SessionTracker + for _, tracker := range m.trackers { + // mock session tracker expiration + if tracker.Expiry().After(m.clock.Now()) { + trackers = append(trackers, tracker) + } + } + return trackers, nil } func (m *mockSessionTrackerService) GetSessionTracker(ctx context.Context, sessionID string) (types.SessionTracker, error) { @@ -227,17 +234,17 @@ func (m *mockSessionTrackerService) GetSessionTracker(ctx context.Context, sessi } func (m *mockSessionTrackerService) CreateSessionTracker(ctx context.Context, req *proto.CreateSessionTrackerRequest) (types.SessionTracker, error) { - return nil, trace.NotImplemented("") + return nil, trace.NotImplemented("CreateSessionTracker is not implemented") } func (m *mockSessionTrackerService) UpdateSessionTracker(ctx context.Context, req *proto.UpdateSessionTrackerRequest) error { - return trace.NotImplemented("") + return trace.NotImplemented("UpdateSessionTracker is not implemented") } func (m *mockSessionTrackerService) RemoveSessionTracker(ctx context.Context, sessionID string) error { - return trace.NotImplemented("") + return 
trace.NotImplemented("RemoveSessionTracker is not implemented") } func (m *mockSessionTrackerService) UpdatePresence(ctx context.Context, sessionID, user string) error { - return trace.NotImplemented("") + return trace.NotImplemented("UpdatePresence is not implemented") } From 91418f181a1eda2e990f0527ed2302472627bdd2 Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 5 May 2022 17:25:10 -0700 Subject: [PATCH 5/5] Cleanup. --- lib/events/complete.go | 2 ++ lib/events/filesessions/fileasync.go | 3 --- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/events/complete.go b/lib/events/complete.go index bd0301b8cdd16..82b6c7129f468 100644 --- a/lib/events/complete.go +++ b/lib/events/complete.go @@ -171,6 +171,8 @@ func (u *UploadCompleter) checkUploads(ctx context.Context) error { if u.cfg.GracePeriod != 0 { gracePoint := upload.Initiated.Add(u.cfg.GracePeriod) if gracePoint.After(u.cfg.Clock.Now()) { + // uploads are ordered oldest to newest, stop checking + // once an upload doesn't exceed the grace point return nil } } diff --git a/lib/events/filesessions/fileasync.go b/lib/events/filesessions/fileasync.go index 6dab5f97b2bb8..a8a6415f1d48d 100644 --- a/lib/events/filesessions/fileasync.go +++ b/lib/events/filesessions/fileasync.go @@ -113,9 +113,6 @@ func NewUploader(cfg UploaderConfig) (*Uploader, error) { // It keeps checkpoints of the upload state and resumes // the upload that have been aborted. // -// The uploader completes the sessions that have been -// abandoned longer than the grace period. -// // It marks corrupted session files to skip their processing. // type Uploader struct {