Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-add grace period to Upload completer for backwards compatibility. #12471

Merged
5 changes: 5 additions & 0 deletions lib/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ const (
// AbandonedUploadPollingRate defines how often to check for
// abandoned uploads which need to be completed.
AbandonedUploadPollingRate = SessionTrackerTTL / 6

// UploadGracePeriod is a period after which non-completed
// upload is considered abandoned and will be completed by the reconciler
// DELETE IN 11.0.0
UploadGracePeriod = 24 * time.Hour
)

var (
Expand Down
5 changes: 4 additions & 1 deletion lib/events/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,11 +614,14 @@ type StreamUpload struct {
ID string
// SessionID is a session ID of the upload
SessionID session.ID
// Initiated contains the timestamp of when the upload
// was initiated, not always initialized
Initiated time.Time
}

// String returns user friendly representation of the upload
func (u StreamUpload) String() string {
return fmt.Sprintf("Upload(session=%v, id=%v)", u.SessionID, u.ID)
return fmt.Sprintf("Upload(session=%v, id=%v, initiated=%v)", u.SessionID, u.ID, u.Initiated)
}

// CheckAndSetDefaults checks and sets default values
Expand Down
62 changes: 43 additions & 19 deletions lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type UploadCompleterConfig struct {
CheckPeriod time.Duration
// Clock is used to override clock in tests
Clock clockwork.Clock
// DELETE IN 11.0.0 in favor of SessionTrackerService
// GracePeriod is the period after which an upload's session
// tracker will be check to see if it's an abandoned upload.
GracePeriod time.Duration
}

// CheckAndSetDefaults checks and sets default values
Expand All @@ -73,20 +77,8 @@ func (cfg *UploadCompleterConfig) CheckAndSetDefaults() error {
return nil
}

// StartNewUploadCompleter starts an upload completer background process. It can
// be closed by closing the provided context.
func StartNewUploadCompleter(ctx context.Context, cfg UploadCompleterConfig) error {
uc, err := newUploadCompleter(cfg)
if err != nil {
return trace.Wrap(err)
}
go uc.start(ctx)
return nil
}

// newUploadCompleter returns a new instance of the upload completer without
// starting it. Useful in tests.
func newUploadCompleter(cfg UploadCompleterConfig) (*UploadCompleter, error) {
// NewUploadCompleter returns a new UploadCompleter.
func NewUploadCompleter(cfg UploadCompleterConfig) (*UploadCompleter, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -95,19 +87,37 @@ func newUploadCompleter(cfg UploadCompleterConfig) (*UploadCompleter, error) {
log: log.WithFields(log.Fields{
trace.Component: teleport.Component(cfg.Component, "completer"),
}),
closeC: make(chan struct{}),
}
return u, nil
}

// StartNewUploadCompleter starts an upload completer background process that will
// will close once the provided ctx is closed.
func StartNewUploadCompleter(ctx context.Context, cfg UploadCompleterConfig) error {
uc, err := NewUploadCompleter(cfg)
if err != nil {
return trace.Wrap(err)
}
go uc.Serve(ctx)
return nil
}

// UploadCompleter periodically scans uploads that have not been completed
// and completes them
type UploadCompleter struct {
cfg UploadCompleterConfig
log *log.Entry
cfg UploadCompleterConfig
log *log.Entry
closeC chan struct{}
}

// Close stops the UploadCompleter
func (u *UploadCompleter) Close() {
close(u.closeC)
}

// start starts a goroutine to periodically check for and complete abandoned uploads
func (u *UploadCompleter) start(ctx context.Context) {
// Serve runs the upload completer until closed or until ctx is cancelled.
func (u *UploadCompleter) Serve(ctx context.Context) error {
periodic := interval.New(interval.Config{
Duration: u.cfg.CheckPeriod,
FirstDuration: utils.HalfJitter(u.cfg.CheckPeriod),
Expand All @@ -121,8 +131,10 @@ func (u *UploadCompleter) start(ctx context.Context) {
if err := u.checkUploads(ctx); err != nil {
u.log.WithError(err).Warningf("Failed to check uploads.")
}
case <-u.closeC:
return nil
case <-ctx.Done():
return
return trace.Wrap(ctx.Err(), "Context canceled")
}
}
}
Expand Down Expand Up @@ -153,6 +165,18 @@ func (u *UploadCompleter) checkUploads(ctx context.Context) error {

// Complete upload for any uploads without an active session tracker
for _, upload := range uploads {
// DELETE IN 11.0.0
// To support v9/v8 versions which do not use SessionTrackerService,
// sessions are only considered abandoned after the provided grace period.
if u.cfg.GracePeriod != 0 {
gracePoint := upload.Initiated.Add(u.cfg.GracePeriod)
if gracePoint.After(u.cfg.Clock.Now()) {
// uploads are ordered oldest to newest, stop checking
// once an upload doesn't exceed the grace point
return nil
}
}

if apiutils.SliceContainsStr(activeSessionIDs, upload.SessionID.String()) {
continue
}
Expand Down
98 changes: 92 additions & 6 deletions lib/events/complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"testing"
"time"

"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events/eventstest"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
)
Expand All @@ -52,12 +54,12 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
},
}

sessionTrackerService := &eventstest.MockSessionTrackerService{
Clock: clock,
MockTrackers: []types.SessionTracker{sessionTracker},
sessionTrackerService := &mockSessionTrackerService{
clock: clock,
trackers: []types.SessionTracker{sessionTracker},
}

uc, err := newUploadCompleter(UploadCompleterConfig{
uc, err := NewUploadCompleter(UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
SessionTracker: sessionTrackerService,
Expand All @@ -79,6 +81,48 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
require.True(t, mu.uploads[upload.ID].completed)
}

// TestUploadCompleterWithGracePeriod verifies that the upload completer
// completes uploads that have lived past the configured grace period.
// DELETE IN 11.0.0
func TestUploadCompleterWithGracePeriod(t *testing.T) {
clock := clockwork.NewFakeClock()
mu := NewMemoryUploader()
mu.Clock = clock

log := &mockAuditLog{}

sessionID := session.NewID()
sessionTrackerService := &mockSessionTrackerService{}

uc, err := NewUploadCompleter(UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
SessionTracker: sessionTrackerService,
Clock: clock,
GracePeriod: 2 * time.Hour,
})
require.NoError(t, err)

upload, err := mu.CreateUpload(context.Background(), sessionID)
require.NoError(t, err)

err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.False(t, mu.uploads[upload.ID].completed)

// Even if session tracker is not found, the completer
// should wait until the grace period to complete it
clock.Advance(1 * time.Hour)
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.False(t, mu.uploads[upload.ID].completed)

clock.Advance(1 * time.Hour)
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.True(t, mu.uploads[upload.ID].completed)
}

// TestUploadCompleterEmitsSessionEnd verifies that the upload completer
// emits session.end or windows.desktop.session.end events for sessions
// that are completed.
Expand All @@ -99,11 +143,11 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
sessionEvents: []apievents.AuditEvent{test.startEvent},
}

uc, err := newUploadCompleter(UploadCompleterConfig{
uc, err := NewUploadCompleter(UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
Clock: clock,
SessionTracker: &eventstest.MockSessionTrackerService{},
SessionTracker: &mockSessionTrackerService{},
})
require.NoError(t, err)

Expand Down Expand Up @@ -162,3 +206,45 @@ func (m *mockAuditLog) StreamSessionEvents(ctx context.Context, sid session.ID,
func (m *mockAuditLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
return m.emitter.EmitAuditEvent(ctx, event)
}

type mockSessionTrackerService struct {
clock clockwork.Clock
trackers []types.SessionTracker
}

func (m *mockSessionTrackerService) GetActiveSessionTrackers(ctx context.Context) ([]types.SessionTracker, error) {
var trackers []types.SessionTracker
for _, tracker := range m.trackers {
// mock session tracker expiration
if tracker.Expiry().After(m.clock.Now()) {
trackers = append(trackers, tracker)
}
}
return trackers, nil
}

func (m *mockSessionTrackerService) GetSessionTracker(ctx context.Context, sessionID string) (types.SessionTracker, error) {
for _, tracker := range m.trackers {
// mock session tracker expiration
if tracker.GetSessionID() == sessionID && tracker.Expiry().After(m.clock.Now()) {
return tracker, nil
}
}
return nil, trace.NotFound("tracker not found")
}

func (m *mockSessionTrackerService) CreateSessionTracker(ctx context.Context, req *proto.CreateSessionTrackerRequest) (types.SessionTracker, error) {
return nil, trace.NotImplemented("CreateSessionTracker is not implemented")
}

func (m *mockSessionTrackerService) UpdateSessionTracker(ctx context.Context, req *proto.UpdateSessionTrackerRequest) error {
return trace.NotImplemented("UpdateSessionTracker is not implemented")
}

func (m *mockSessionTrackerService) RemoveSessionTracker(ctx context.Context, sessionID string) error {
return trace.NotImplemented("RemoveSessionTracker is not implemented")
}

func (m *mockSessionTrackerService) UpdatePresence(ctx context.Context, sessionID, user string) error {
return trace.NotImplemented("UpdatePresence is not implemented")
}
46 changes: 0 additions & 46 deletions lib/events/eventstest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@ import (
"context"
"sync"

"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
)

// MockEmitter is an emitter that stores all emitted events.
Expand Down Expand Up @@ -92,45 +88,3 @@ func (e *MockEmitter) Close(ctx context.Context) error {
func (e *MockEmitter) Complete(ctx context.Context) error {
return nil
}

type MockSessionTrackerService struct {
Clock clockwork.Clock
MockTrackers []types.SessionTracker
}

func (m *MockSessionTrackerService) GetActiveSessionTrackers(ctx context.Context) ([]types.SessionTracker, error) {
var trackers []types.SessionTracker
for _, tracker := range m.MockTrackers {
// mock session tracker expiration
if tracker.Expiry().After(m.Clock.Now()) {
trackers = append(trackers, tracker)
}
}
return trackers, nil
}

func (m *MockSessionTrackerService) GetSessionTracker(ctx context.Context, sessionID string) (types.SessionTracker, error) {
for _, tracker := range m.MockTrackers {
// mock session tracker expiration
if tracker.GetSessionID() == sessionID && tracker.Expiry().After(m.Clock.Now()) {
return tracker, nil
}
}
return nil, trace.NotFound("tracker not found")
}

func (m *MockSessionTrackerService) CreateSessionTracker(ctx context.Context, req *proto.CreateSessionTrackerRequest) (types.SessionTracker, error) {
return nil, nil
}

func (m *MockSessionTrackerService) UpdateSessionTracker(ctx context.Context, req *proto.UpdateSessionTrackerRequest) error {
return nil
}

func (m *MockSessionTrackerService) RemoveSessionTracker(ctx context.Context, sessionID string) error {
return nil
}

func (m *MockSessionTrackerService) UpdatePresence(ctx context.Context, sessionID, user string) error {
return nil
}
Loading