Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v9] Replace session upload grace period with session tracker #11853

Merged
merged 8 commits into from
Apr 11, 2022
1,511 changes: 912 additions & 599 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

14 changes: 13 additions & 1 deletion api/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,13 @@ message SessionTrackerRemoveParticipant {
string ParticipantID = 2 [ (gogoproto.jsontag) = "participant_id,omitempty" ];
}

// SessionTrackerUpdateExpiry is used to update the session tracker expiration time.
message SessionTrackerUpdateExpiry {
// Expires is when the session tracker will expire.
google.protobuf.Timestamp Expires = 1
[ (gogoproto.stdtime) = true, (gogoproto.jsontag) = "expires" ];
}

// UpdateSessionTrackerRequest is a request to update some state of a session.
message UpdateSessionTrackerRequest {
// SessionID is unique identifier of this session.
Expand All @@ -1615,6 +1622,8 @@ message UpdateSessionTrackerRequest {
[ (gogoproto.jsontag) = "add_participant,omitempty" ];
SessionTrackerRemoveParticipant RemoveParticipant = 4
[ (gogoproto.jsontag) = "remove_participant,omitempty" ];
SessionTrackerUpdateExpiry UpdateExpiry = 5
[ (gogoproto.jsontag) = "update_expiry,omitempty" ];
}
}

Expand Down Expand Up @@ -1642,7 +1651,10 @@ service AuthService {
// CreateSessionTracker creates a new session tracker resource.
rpc CreateSessionTracker(CreateSessionTrackerRequest) returns (types.SessionTrackerV1);

// GetSessionTrackerRequest fetches a session tracker resource.
// UpsertSessionTracker upserts a session tracker resource.
rpc UpsertSessionTracker(types.SessionTrackerV1) returns (google.protobuf.Empty);

// GetSessionTracker fetches a session tracker resource.
rpc GetSessionTracker(GetSessionTrackerRequest) returns (types.SessionTrackerV1);

// GetActiveSessionTrackers returns a list of active sessions.
Expand Down
21 changes: 11 additions & 10 deletions api/types/session_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,16 @@ type SessionTracker interface {
}

func NewSessionTracker(spec SessionTrackerSpecV1) (SessionTracker, error) {
meta := Metadata{
Name: spec.SessionID,
}

session := &SessionTrackerV1{
ResourceHeader: ResourceHeader{
Kind: KindSessionTracker,
Version: V1,
Metadata: meta,
Metadata: Metadata{
Name: spec.SessionID,
},
},
Spec: spec,
}

if err := session.Metadata.CheckAndSetDefaults(); err != nil {
if err := session.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}

Expand Down Expand Up @@ -176,10 +172,15 @@ func (s *SessionTrackerV1) SetSubKind(sk string) {
s.SubKind = sk
}

// CheckAndSetDefaults sets defaults for the session resource.
func (s *SessionTrackerV1) CheckAndSetDefaults() error {
// setStaticFields sets static resource header and metadata fields.
func (s *SessionTrackerV1) setStaticFields() {
s.Kind = KindSessionTracker
s.Version = V1
}

// CheckAndSetDefaults sets defaults for the session resource.
func (s *SessionTrackerV1) CheckAndSetDefaults() error {
s.setStaticFields()

if err := s.Metadata.CheckAndSetDefaults(); err != nil {
return trace.Wrap(err)
Expand Down
15 changes: 11 additions & 4 deletions lib/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,6 @@ const (
// per stream
ConcurrentUploadsPerStream = 1

// UploadGracePeriod is a period after which non-completed
// upload is considered abandoned and will be completed by the reconciler
UploadGracePeriod = 24 * time.Hour

// InactivityFlushPeriod is a period of inactivity
// that triggers upload of the data - flush.
InactivityFlushPeriod = 5 * time.Minute
Expand All @@ -323,6 +319,17 @@ const (
// DefaultRedisUsername is a default username used by Redis when
// no name is provided at connection time.
DefaultRedisUsername = "default"

// SessionTrackerTTL defines the default base ttl of a session tracker.
SessionTrackerTTL = time.Hour

// SessionTrackerExpirationUpdateInterval is the default interval on which an active
// session's expiration will be extended.
SessionTrackerExpirationUpdateInterval = SessionTrackerTTL / 6

// AbandonedUploadPollingRate defines how often to check for
// abandoned uploads which need to be completed.
AbandonedUploadPollingRate = SessionTrackerTTL / 6
)

var (
Expand Down
5 changes: 1 addition & 4 deletions lib/events/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,14 +595,11 @@ type StreamUpload struct {
ID string
// SessionID is a session ID of the upload
SessionID session.ID
// Initiated contains the timestamp of when the upload
// was initiated, not always initialized
Initiated time.Time
}

// String returns user friendly representation of the upload
func (u StreamUpload) String() string {
return fmt.Sprintf("Upload(session=%v, id=%v, initiated=%v)", u.SessionID, u.ID, u.Initiated)
return fmt.Sprintf("Upload(session=%v, id=%v)", u.SessionID, u.ID)
}

// CheckAndSetDefaults checks and sets default values
Expand Down
37 changes: 23 additions & 14 deletions lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/gravitational/teleport/api/types/events"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/utils/interval"

Expand All @@ -41,9 +42,9 @@ type UploadCompleterConfig struct {
AuditLog IAuditLog
// Uploader allows the completer to list and complete uploads
Uploader MultipartUploader
// GracePeriod is the period after which uploads are considered
// abandoned and will be completed
GracePeriod time.Duration
// SessionTracker is used to discover the current state of a
// sesssions with active uploads.
SessionTracker services.SessionTrackerService
// Component is a component used in logging
Component string
// CheckPeriod is a period for checking the upload
Expand All @@ -60,8 +61,8 @@ func (cfg *UploadCompleterConfig) CheckAndSetDefaults() error {
if cfg.Uploader == nil {
return trace.BadParameter("missing parameter Uploader")
}
if cfg.GracePeriod == 0 {
cfg.GracePeriod = defaults.UploadGracePeriod
if cfg.SessionTracker == nil {
return trace.BadParameter("missing parameter SessionTracker")
}
if cfg.Component == "" {
cfg.Component = teleport.ComponentAuth
Expand Down Expand Up @@ -125,19 +126,30 @@ func (u *UploadCompleter) run() {
}
}

// CheckUploads fetches uploads, checks if any uploads exceed grace period
// and completes unfinished uploads
// CheckUploads fetches uploads and completes any abandoned uploads
func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
uploads, err := u.cfg.Uploader.ListUploads(ctx)
if err != nil {
return trace.Wrap(err)
}

completed := 0
defer func() {
if completed > 0 {
u.log.Debugf("Found %v active uploads, completed %v.", len(uploads), completed)
}
}()

for _, upload := range uploads {
gracePoint := upload.Initiated.Add(u.cfg.GracePeriod)
if !gracePoint.Before(u.cfg.Clock.Now()) {
return nil
// Check for an active session tracker for the session upload.
_, err := u.cfg.SessionTracker.GetSessionTracker(ctx, upload.SessionID.String())
if err == nil {
// session appears to be active, don't complete the upload.
continue
} else if !trace.IsNotFound(err) {
return trace.Wrap(err)
}

parts, err := u.cfg.Uploader.ListParts(ctx, upload)
if err != nil {
if trace.IsNotFound(err) {
Expand All @@ -147,7 +159,7 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
return trace.Wrap(err)
}

u.log.Debugf("Upload %v grace period is over. Trying to complete.", upload.ID)
u.log.Debugf("Upload %v was abandoned, trying to complete.", upload.ID)
if err := u.cfg.Uploader.CompleteUpload(ctx, upload, parts); err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -192,9 +204,6 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
return trace.Wrap(err)
}
}
if completed > 0 {
u.log.Debugf("Found %v active uploads, completed %v.", len(uploads), completed)
}
return nil
}

Expand Down
99 changes: 59 additions & 40 deletions lib/events/complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,62 @@ import (
"testing"
"time"

"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/session"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
)

// TestUploadCompleterCompletesAbandonedUploads verifies that the upload completer
// completes uploads that don't have an associated session tracker.
func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
clock := clockwork.NewFakeClock()
mu := NewMemoryUploader()
mu.Clock = clock

log := &mockAuditLog{}

sessionID := session.NewID()
expires := clock.Now().Add(time.Hour * 1)
sessionTracker := &types.SessionTrackerV1{
Spec: types.SessionTrackerSpecV1{
SessionID: string(sessionID),
},
ResourceHeader: types.ResourceHeader{
Metadata: types.Metadata{
Expires: &expires,
},
},
}

sessionTrackerService := &eventstest.MockSessionTrackerService{
Clock: clock,
MockTrackers: []types.SessionTracker{sessionTracker},
}

uc, err := NewUploadCompleter(UploadCompleterConfig{
Unstarted: true,
Uploader: mu,
AuditLog: log,
SessionTracker: sessionTrackerService,
})
require.NoError(t, err)

upload, err := mu.CreateUpload(context.Background(), sessionID)
require.NoError(t, err)

err = uc.CheckUploads(context.Background())
require.NoError(t, err)
require.False(t, mu.uploads[upload.ID].completed)

clock.Advance(1 * time.Hour)

err = uc.CheckUploads(context.Background())
require.NoError(t, err)
require.True(t, mu.uploads[upload.ID].completed)
}

// TestUploadCompleterEmitsSessionEnd verifies that the upload completer
// emits session.end or windows.desktop.session.end events for sessions
// that are completed.
Expand All @@ -44,25 +94,22 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
mu := NewMemoryUploader()
mu.Clock = clock

log := &MockAuditLog{
log := &mockAuditLog{
sessionEvents: []apievents.AuditEvent{test.startEvent},
}

uc, err := NewUploadCompleter(UploadCompleterConfig{
Unstarted: true,
Uploader: mu,
AuditLog: log,
GracePeriod: 2 * time.Hour,
Clock: clock,
Unstarted: true,
Uploader: mu,
AuditLog: log,
Clock: clock,
SessionTracker: &eventstest.MockSessionTrackerService{},
})
require.NoError(t, err)

upload, err := mu.CreateUpload(context.Background(), session.NewID())
require.NoError(t, err)

// advance the clock to force the grace period check to succeed
clock.Advance(3 * time.Hour)

// session end events are only emitted if there's at least one
// part to be uploaded, so create that here
_, err = mu.UploadPart(context.Background(), *upload, 0, strings.NewReader("part"))
Expand All @@ -86,42 +133,14 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
}
}

// TestUploadCompleterCompletesEmptyUploads verifies that the upload completer
// completes uploads that have no parts. This ensures that we don't leave empty
// directories behind.
func TestUploadCompleterCompletesEmptyUploads(t *testing.T) {
clock := clockwork.NewFakeClock()
mu := NewMemoryUploader()
mu.Clock = clock

log := &MockAuditLog{}

uc, err := NewUploadCompleter(UploadCompleterConfig{
Unstarted: true,
Uploader: mu,
AuditLog: log,
GracePeriod: 2 * time.Hour,
})
require.NoError(t, err)

upload, err := mu.CreateUpload(context.Background(), session.NewID())
require.NoError(t, err)
clock.Advance(3 * time.Hour)

err = uc.CheckUploads(context.Background())
require.NoError(t, err)

require.True(t, mu.uploads[upload.ID].completed)
}

type MockAuditLog struct {
type mockAuditLog struct {
DiscardAuditLog

emitter MockEmitter
sessionEvents []apievents.AuditEvent
}

func (m *MockAuditLog) StreamSessionEvents(ctx context.Context, sid session.ID, startIndex int64) (chan apievents.AuditEvent, chan error) {
func (m *mockAuditLog) StreamSessionEvents(ctx context.Context, sid session.ID, startIndex int64) (chan apievents.AuditEvent, chan error) {
errors := make(chan error, 1)
events := make(chan apievents.AuditEvent)

Expand All @@ -140,6 +159,6 @@ func (m *MockAuditLog) StreamSessionEvents(ctx context.Context, sid session.ID,
return events, errors
}

func (m *MockAuditLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
func (m *mockAuditLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
return m.emitter.EmitAuditEvent(ctx, event)
}
10 changes: 6 additions & 4 deletions lib/events/filesessions/fileasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"

Expand Down Expand Up @@ -92,7 +93,7 @@ func (cfg *UploaderConfig) CheckAndSetDefaults() error {
}

// NewUploader creates new disk based session logger
func NewUploader(cfg UploaderConfig) (*Uploader, error) {
func NewUploader(cfg UploaderConfig, sessionTracker services.SessionTrackerService) (*Uploader, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -105,9 +106,10 @@ func NewUploader(cfg UploaderConfig) (*Uploader, error) {
// completer scans for uploads that have been initiated, but not completed
// by the client (aborted or crashed) and completes them
uploadCompleter, err := events.NewUploadCompleter(events.UploadCompleterConfig{
Uploader: handler,
AuditLog: cfg.AuditLog,
Unstarted: true,
Uploader: handler,
AuditLog: cfg.AuditLog,
Unstarted: true,
SessionTracker: sessionTracker,
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down
Loading