Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky test - TestAuditOn #12101

Merged
merged 14 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) {
inForwardAgent: false,
auditSessionsURI: t.TempDir(),
}, {
comment: "recording proxy with upload to file server",
inRecordLocation: types.RecordAtProxy,
inForwardAgent: false,
auditSessionsURI: t.TempDir(),
Expand Down
2 changes: 1 addition & 1 deletion integration/utmp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func newSrvCtx(ctx context.Context, t *testing.T) *SrvCtx {
nodeDir,
"",
utils.NetAddr{},
nil,
s.nodeClient,
regular.SetUUID(s.nodeID),
regular.SetNamespace(apidefaults.Namespace),
regular.SetEmitter(s.nodeClient),
Expand Down
67 changes: 34 additions & 33 deletions lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ type UploadCompleterConfig struct {
CheckPeriod time.Duration
// Clock is used to override clock in tests
Clock clockwork.Clock
// Unstarted does not start automatic goroutine,
// is useful when completer is embedded in another function
Unstarted bool
}

// CheckAndSetDefaults checks and sets default values
Expand All @@ -76,37 +73,41 @@ func (cfg *UploadCompleterConfig) CheckAndSetDefaults() error {
return nil
}

// NewUploadCompleter returns a new instance of the upload completer
// the completer has to be closed to release resources and goroutines
func NewUploadCompleter(cfg UploadCompleterConfig) (*UploadCompleter, error) {
// StartNewUploadCompleter starts an upload completer background process. It can
// be closed by closing the provided context.
func StartNewUploadCompleter(ctx context.Context, cfg UploadCompleterConfig) error {
uc, err := newUploadCompleter(cfg)
if err != nil {
return trace.Wrap(err)
}
go uc.start(ctx)
return nil
}

// newUploadCompleter returns a new instance of the upload completer without
// starting it. Useful in tests.
func newUploadCompleter(cfg UploadCompleterConfig) (*UploadCompleter, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
ctx, cancel := context.WithCancel(context.Background())
u := &UploadCompleter{
cfg: cfg,
log: log.WithFields(log.Fields{
trace.Component: teleport.Component(cfg.Component, "completer"),
}),
cancel: cancel,
closeCtx: ctx,
}
if !cfg.Unstarted {
go u.run()
}
return u, nil
}

// UploadCompleter periodically scans uploads that have not been completed
// and completes them
type UploadCompleter struct {
cfg UploadCompleterConfig
log *log.Entry
cancel context.CancelFunc
closeCtx context.Context
cfg UploadCompleterConfig
log *log.Entry
}

func (u *UploadCompleter) run() {
// start starts a goroutine to periodically check for and complete abandoned uploads
func (u *UploadCompleter) start(ctx context.Context) {
periodic := interval.New(interval.Config{
Duration: u.cfg.CheckPeriod,
FirstDuration: utils.HalfJitter(u.cfg.CheckPeriod),
Expand All @@ -117,17 +118,27 @@ func (u *UploadCompleter) run() {
for {
select {
case <-periodic.Next():
if err := u.CheckUploads(u.closeCtx); err != nil {
if err := u.checkUploads(ctx); err != nil {
u.log.WithError(err).Warningf("Failed to check uploads.")
}
case <-u.closeCtx.Done():
case <-ctx.Done():
return
}
}
}

// CheckUploads fetches uploads and completes any abandoned uploads
func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
// checkUploads fetches uploads and completes any abandoned uploads
func (u *UploadCompleter) checkUploads(ctx context.Context) error {
trackers, err := u.cfg.SessionTracker.GetActiveSessionTrackers(ctx)
if err != nil {
return trace.Wrap(err)
}

var activeSessionIDs []string
for _, st := range trackers {
activeSessionIDs = append(activeSessionIDs, st.GetSessionID())
}

uploads, err := u.cfg.Uploader.ListUploads(ctx)
if err != nil {
return trace.Wrap(err)
Expand All @@ -140,14 +151,10 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
}
}()

// Complete upload for any uploads without an active session tracker
for _, upload := range uploads {
// Check for an active session tracker for the session upload.
_, err := u.cfg.SessionTracker.GetSessionTracker(ctx, upload.SessionID.String())
if err == nil {
// session appears to be active, don't complete the upload.
if apiutils.SliceContainsStr(activeSessionIDs, upload.SessionID.String()) {
continue
} else if !trace.IsNotFound(err) {
return trace.Wrap(err)
}

parts, err := u.cfg.Uploader.ListParts(ctx, upload)
Expand Down Expand Up @@ -207,12 +214,6 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
return nil
}

// Close closes all outstanding operations without waiting
func (u *UploadCompleter) Close() error {
u.cancel()
return nil
}

func (u *UploadCompleter) ensureSessionEndEvent(ctx context.Context, uploadData UploadMetadata) error {
// at this point, we don't know whether we'll need to emit a session.end or a
// windows.desktop.session.end, but as soon as we see the session start we'll
Expand Down
13 changes: 6 additions & 7 deletions lib/events/complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,24 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
MockTrackers: []types.SessionTracker{sessionTracker},
}

uc, err := NewUploadCompleter(UploadCompleterConfig{
Unstarted: true,
uc, err := newUploadCompleter(UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
SessionTracker: sessionTrackerService,
Clock: clock,
})
require.NoError(t, err)

upload, err := mu.CreateUpload(context.Background(), sessionID)
require.NoError(t, err)

err = uc.CheckUploads(context.Background())
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.False(t, mu.uploads[upload.ID].completed)

clock.Advance(1 * time.Hour)

err = uc.CheckUploads(context.Background())
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.True(t, mu.uploads[upload.ID].completed)
}
Expand All @@ -99,8 +99,7 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
sessionEvents: []apievents.AuditEvent{test.startEvent},
}

uc, err := NewUploadCompleter(UploadCompleterConfig{
Unstarted: true,
uc, err := newUploadCompleter(UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
Clock: clock,
Expand All @@ -116,7 +115,7 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
_, err = mu.UploadPart(context.Background(), *upload, 0, strings.NewReader("part"))
require.NoError(t, err)

err = uc.CheckUploads(context.Background())
err = uc.checkUploads(context.Background())
require.NoError(t, err)

// advance the clock to force the asynchronous session end event emission
Expand Down
9 changes: 8 additions & 1 deletion lib/events/eventstest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,14 @@ type MockSessionTrackerService struct {
}

func (m *MockSessionTrackerService) GetActiveSessionTrackers(ctx context.Context) ([]types.SessionTracker, error) {
return nil, nil
var trackers []types.SessionTracker
for _, tracker := range m.MockTrackers {
// mock session tracker expiration
if tracker.Expiry().After(m.Clock.Now()) {
trackers = append(trackers, tracker)
}
}
return trackers, nil
}

func (m *MockSessionTrackerService) GetSessionTracker(ctx context.Context, sessionID string) (types.SessionTracker, error) {
Expand Down
42 changes: 18 additions & 24 deletions lib/events/filesessions/fileasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,21 +102,10 @@ func NewUploader(cfg UploaderConfig, sessionTracker services.SessionTrackerServi
if err != nil {
return nil, trace.Wrap(err)
}
// completer scans for uploads that have been initiated, but not completed
// by the client (aborted or crashed) and completes them
uploadCompleter, err := events.NewUploadCompleter(events.UploadCompleterConfig{
Uploader: handler,
AuditLog: cfg.AuditLog,
Unstarted: true,
SessionTracker: sessionTracker,
})
if err != nil {
return nil, trace.Wrap(err)
}

ctx, cancel := context.WithCancel(cfg.Context)
uploader := &Uploader{
uploadCompleter: uploadCompleter,
cfg: cfg,
cfg: cfg,
log: log.WithFields(log.Fields{
trace.Component: cfg.Component,
}),
Expand All @@ -126,6 +115,19 @@ func NewUploader(cfg UploaderConfig, sessionTracker services.SessionTrackerServi
semaphore: make(chan struct{}, cfg.ConcurrentUploads),
eventsCh: make(chan events.UploadEvent, cfg.ConcurrentUploads),
}

// upload completer scans for uploads that have been initiated, but not completed
// by the client (aborted or crashed) and completes them. It will be closed once
// the uploader context is closed.
err = events.StartNewUploadCompleter(uploader.ctx, events.UploadCompleterConfig{
Uploader: handler,
AuditLog: cfg.AuditLog,
SessionTracker: sessionTracker,
})
if err != nil {
return nil, trace.Wrap(err)
}

return uploader, nil
}

Expand All @@ -145,9 +147,8 @@ func NewUploader(cfg UploaderConfig, sessionTracker services.SessionTrackerServi
type Uploader struct {
semaphore chan struct{}

cfg UploaderConfig
log *log.Entry
uploadCompleter *events.UploadCompleter
cfg UploaderConfig
log *log.Entry

cancel context.CancelFunc
ctx context.Context
Expand Down Expand Up @@ -221,12 +222,6 @@ func (u *Uploader) Serve() error {
// Tick at scan period but slow down (and speeds up) on errors.
case <-backoff.After():
var failed bool
if err := u.uploadCompleter.CheckUploads(u.ctx); err != nil {
if trace.Unwrap(err) != errContext {
failed = true
u.log.WithError(err).Warningf("Completer scan failed.")
}
}
if _, err := u.Scan(); err != nil {
if trace.Unwrap(err) != errContext {
failed = true
Expand Down Expand Up @@ -303,9 +298,8 @@ func (u *Uploader) sessionErrorFilePath(sid session.ID) string {
}

// Close closes all operations
func (u *Uploader) Close() error {
func (u *Uploader) Close() {
u.cancel()
return u.uploadCompleter.Close()
}

type upload struct {
Expand Down
3 changes: 0 additions & 3 deletions lib/events/filesessions/fileasync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,6 @@ type uploaderPack struct {

func (u *uploaderPack) Close(t *testing.T) {
u.cancel()

err := u.uploader.Close()
require.NoError(t, err)
}

func newUploaderPack(t *testing.T, wrapStreamer wrapStreamerFn) uploaderPack {
Expand Down
10 changes: 3 additions & 7 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,10 +1261,9 @@ func (process *TeleportProcess) initAuthService() error {
process.setLocalAuth(authServer)

// Upload completer is responsible for checking for initiated but abandoned
// session uploads and completing them
var uploadCompleter *events.UploadCompleter
// session uploads and completing them. it will be closed once the process exits.
if uploadHandler != nil {
uploadCompleter, err = events.NewUploadCompleter(events.UploadCompleterConfig{
err = events.StartNewUploadCompleter(process.ExitContext(), events.UploadCompleterConfig{
Uploader: uploadHandler,
Component: teleport.ComponentAuth,
AuditLog: process.auditLog,
Expand Down Expand Up @@ -1486,9 +1485,6 @@ func (process *TeleportProcess) initAuthService() error {
// of the auth server basically never exits.
warnOnErr(tlsServer.Close(), log)
}
if uploadCompleter != nil {
warnOnErr(uploadCompleter.Close(), log)
}
log.Info("Exited.")
})
return nil
Expand Down Expand Up @@ -2111,7 +2107,7 @@ func (process *TeleportProcess) initUploaderService(streamer events.Streamer, au

process.OnExit("fileuploader.shutdown", func(payload interface{}) {
log.Infof("File uploader is shutting down.")
warnOnErr(fileUploader.Close(), log)
fileUploader.Close()
log.Infof("File uploader has shut down.")
})

Expand Down
5 changes: 0 additions & 5 deletions lib/services/local/sessiontracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,6 @@ func (s *sessionTracker) UpdateSessionTracker(ctx context.Context, req *proto.Up
switch update := req.Update.(type) {
case *proto.UpdateSessionTrackerRequest_UpdateState:
session.SetState(update.UpdateState.State)
if update.UpdateState.State == types.SessionState_SessionStateTerminated {
// Mark session tracker for deletion.
session.SetExpiry(s.bk.Clock().Now())
}

case *proto.UpdateSessionTrackerRequest_AddParticipant:
session.AddParticipant(*update.AddParticipant.Participant)
case *proto.UpdateSessionTrackerRequest_RemoveParticipant:
Expand Down
2 changes: 1 addition & 1 deletion lib/srv/forward/sshserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func New(c ServerConfig) (*Server, error) {
s.kexAlgorithms = c.KEXAlgorithms
s.macAlgorithms = c.MACAlgorithms

s.sessionRegistry, err = srv.NewSessionRegistry(s, nil)
s.sessionRegistry, err = srv.NewSessionRegistry(s, s.authClient)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
16 changes: 8 additions & 8 deletions lib/srv/regular/sshserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,7 @@ func TestProxyRoundRobin(t *testing.T) {
t.TempDir(),
"",
utils.NetAddr{},
nil,
proxyClient,
SetProxyMode(reverseTunnelServer, proxyClient),
SetSessionServer(proxyClient),
SetEmitter(nodeClient),
Expand Down Expand Up @@ -1280,7 +1280,7 @@ func TestProxyDirectAccess(t *testing.T) {
t.TempDir(),
"",
utils.NetAddr{},
nil,
proxyClient,
SetProxyMode(reverseTunnelServer, proxyClient),
SetSessionServer(proxyClient),
SetEmitter(nodeClient),
Expand Down Expand Up @@ -1412,7 +1412,7 @@ func TestLimiter(t *testing.T) {
nodeStateDir,
"",
utils.NetAddr{},
nil,
nodeClient,
SetLimiter(limiter),
SetShell("/bin/sh"),
SetSessionServer(nodeClient),
Expand Down Expand Up @@ -1586,13 +1586,13 @@ func TestSessionTracker(t *testing.T) {
err = se.Close()
require.NoError(t, err)

// Advance server clock to trigger the session to close (after lingering) and
// update the session tracker to expired. We don't know when the linger sleeper
// will start waiting for clock, so we give it a grace period of 5 seconds.
time.Sleep(time.Second * 5)
f.clock.BlockUntil(3)
f.clock.Advance(defaults.SessionIdlePeriod)

// once the session is closed, the tracker should expire (not found)
// Once the session is closed, the tracker should be termianted.
// Once the last set expiration is up, the tracker should be delted.
f.clock.Advance(defaults.SessionTrackerTTL)

trackerExpired := func() bool {
_, err := f.testSrv.Auth().GetSessionTracker(ctx, tracker.GetSessionID())
return trace.IsNotFound(err)
Expand Down
Loading