diff --git a/lib/kube/proxy/sess.go b/lib/kube/proxy/sess.go index e2fd43ee8f060..a60753e934420 100644 --- a/lib/kube/proxy/sess.go +++ b/lib/kube/proxy/sess.go @@ -445,7 +445,6 @@ func (s *session) launch() error { }() s.log.Debugf("Launching session: %v", s.id) - s.BroadcastMessage("Connecting to %v over K8S", s.podName) q := s.req.URL.Query() request := &remoteCommandRequest{ @@ -463,6 +462,9 @@ func (s *session) launch() error { pingPeriod: s.forwarder.cfg.ConnPingPeriod, } + s.podName = request.podName + s.BroadcastMessage("Connecting to %v over K8S", s.podName) + eventPodMeta := request.eventPodMeta(request.context, s.sess.creds) s.io.OnWriteError = func(idString string, err error) { s.mu.Lock() @@ -541,7 +543,6 @@ func (s *session) launch() error { } }() - s.podName = request.podName err = s.trackerUpdateState(types.SessionState_SessionStateRunning) if err != nil { s.log.Warn("Failed to set tracker state to running") diff --git a/lib/srv/termmanager.go b/lib/srv/termmanager.go index b9bf958ca3c33..33dea82fc944f 100644 --- a/lib/srv/termmanager.go +++ b/lib/srv/termmanager.go @@ -23,7 +23,11 @@ import ( log "github.com/sirupsen/logrus" ) -const maxHistory = 1000 +// maxHistoryBytes is the maximum bytes that are retained as history and broadcasted to new clients. +const maxHistoryBytes = 1000 + +// maxPausedHistoryBytes is maximum bytes that are buffered when a session is paused. +const maxPausedHistoryBytes = 10000 // TermManager handles the streams of terminal-like sessions. // It performs a number of tasks including: @@ -70,18 +74,7 @@ func NewTermManager() *TermManager { func (g *TermManager) writeToClients(p []byte) int { g.lastWasBroadcast = false - truncateFront := func(slice []byte, max int) []byte { - if len(slice) > max { - return slice[len(slice)-max:] - } - - return slice - } - - g.history = append(g.history, truncateFront(p, maxHistory)...) - if len(g.history) > maxHistory { - g.history = g.history[:maxHistory] - } + g.history = truncateFront(append(g.history, p...), maxHistoryBytes) atomic.AddUint64(&g.countWritten, uint64(len(p))) for key, w := range g.writers { @@ -113,7 +106,9 @@ func (g *TermManager) Write(p []byte) (int, error) { if g.on { g.writeToClients(p) } else { - g.buffer = append(g.buffer, p...) + // Only keep the last maxPausedHistoryBytes of stdout/stderr while the session is paused. + // The alternative is flushing to disk but this should be a pretty rare occurrence and shouldn't be an issue in practice. + g.buffer = truncateFront(append(g.buffer, p...), maxPausedHistoryBytes) } return len(p), nil @@ -282,7 +277,15 @@ func (g *TermManager) Close() { func (g *TermManager) GetRecentHistory() []byte { g.mu.Lock() defer g.mu.Unlock() - data := make([]byte, len(g.history)) + data := make([]byte, 0, len(g.history)) data = append(data, g.history...) return data } + +func truncateFront(slice []byte, max int) []byte { + if len(slice) > max { + return slice[len(slice)-max:] + } + + return slice +} diff --git a/lib/srv/termmanager_test.go b/lib/srv/termmanager_test.go index feb5667bab47c..d1cdb1c2e2b10 100644 --- a/lib/srv/termmanager_test.go +++ b/lib/srv/termmanager_test.go @@ -17,6 +17,7 @@ limitations under the License. package srv import ( + "crypto/rand" "io" "testing" "time" @@ -48,3 +49,40 @@ func TestCTRLCCapture(t *testing.T) { t.Fatal("terminateNotifier should've seen an event") } } + +func TestHistoryKept(t *testing.T) { + m := NewTermManager() + m.On() + + data := make([]byte, 10000) + n, err := rand.Read(data) + require.NoError(t, err) + require.Equal(t, len(data), n) + + n, err = m.Write(data[:len(data)/2]) + require.NoError(t, err) + require.Equal(t, len(data)/2, n) + + n, err = m.Write(data[len(data)/2:]) + require.NoError(t, err) + require.Equal(t, len(data)/2, n) + + kept := data[len(data)-maxHistoryBytes:] + require.Equal(t, m.GetRecentHistory(), kept) +} + +func TestBufferedKept(t *testing.T) { + m := NewTermManager() + + data := make([]byte, 20000) + n, err := rand.Read(data) + require.NoError(t, err) + require.Equal(t, len(data), n) + + n, err = m.Write(data) + require.NoError(t, err) + require.Equal(t, len(data), n) + + kept := data[len(data)-maxPausedHistoryBytes:] + require.Equal(t, m.buffer, kept) +} diff --git a/tool/tsh/kube.go b/tool/tsh/kube.go index 73eb9d250179a..59d0a583dc70c 100644 --- a/tool/tsh/kube.go +++ b/tool/tsh/kube.go @@ -23,6 +23,7 @@ import ( "net" "net/url" "os" + "sort" "strconv" "strings" "time" @@ -489,6 +490,10 @@ func (c *kubeSessionsCommand) run(cf *CLIConf) error { } } + sort.Slice(filteredSessions, func(i, j int) bool { + return filteredSessions[i].GetCreated().Before(filteredSessions[j].GetCreated()) + }) + printSessions(filteredSessions) return nil }