Skip to content

Commit

Permalink
fix: log streaming resource leaks (#2026)
Browse files Browse the repository at this point in the history
## Description:
This PR fixes resources leaks during reading and streaming of service
log files. Two goroutines (one in the central logs and one in the
websocket processing) were not being closed after the streaming
connection would be closed. Also the file reader used to taif -f the log
file wasn't being close, leading the multiple processes reading the same
file piling-up when the reconnections happened.

## Is this change user facing?
NO
  • Loading branch information
lostbean authored Jan 8, 2024
1 parent fa08707 commit 7f8db9b
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"strconv"
"strings"
"time"

"github.com/hpcloud/tail"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/enclave"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service"
Expand All @@ -16,11 +22,6 @@ import (
"github.com/kurtosis-tech/stacktrace"
"github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
"io"
"os"
"strconv"
"strings"
"time"
)

const (
Expand Down Expand Up @@ -99,7 +100,7 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(

if shouldFollowLogs {
latestLogFile := paths[len(paths)-1]
if err := strategy.followLogs(latestLogFile, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
if err := strategy.followLogs(ctx, latestLogFile, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred creating following logs for service '%v' in enclave '%v'", serviceUuid, enclaveUuid)
return
}
Expand Down Expand Up @@ -353,6 +354,7 @@ func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine *logl

// Continue streaming log lines as they are written to log file (tail -f [filepath])
func (strategy *PerWeekStreamLogsStrategy) followLogs(
ctx context.Context,
filepath string,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
serviceUuid service.ServiceUUID,
Expand All @@ -374,22 +376,33 @@ func (strategy *PerWeekStreamLogsStrategy) followLogs(
if err != nil {
return stacktrace.Propagate(err, "An error occurred while attempting to tail the log file.")
}

for logLine := range logTail.Lines {
if logLine.Err != nil {
return stacktrace.Propagate(logLine.Err, "hpcloud/tail encountered an error with the following log line: %v", logLine.Text)
}
jsonLog, err := convertStringToJson(logLine.Text)
if err != nil {
// if tail package fails to parse a valid new line, fail fast
return stacktrace.NewError("hpcloud/tail returned the following line: '%v' that was not valid json.\nThis is potentially a bug in tailing package.", logLine.Text)
defer func() {
if err := logTail.Stop(); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{filepath: filepath}).Error("Failed to stop reading log file")
}
err = strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex)
if err != nil {
return stacktrace.Propagate(err, "An error occurred sending json log line '%v'.", logLine.Text)
logTail.Cleanup()
}()

for {
select {
case <-ctx.Done():
logrus.Debugf("Context was canceled, stopping streaming service logs for service '%v'", serviceUuid)
return nil
case logLine := <-logTail.Lines:
if logLine.Err != nil {
return stacktrace.Propagate(logLine.Err, "hpcloud/tail encountered an error with the following log line: %v", logLine.Text)
}
jsonLog, err := convertStringToJson(logLine.Text)
if err != nil {
// if tail package fails to parse a valid new line, fail fast
return stacktrace.NewError("hpcloud/tail returned the following line: '%v' that was not valid json.\nThis is potentially a bug in tailing package.", logLine.Text)
}
err = strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex)
if err != nil {
return stacktrace.Propagate(err, "An error occurred sending json log line '%v'.", logLine.Text)
}
}
}
return nil
}

func convertStringToJson(line string) (JsonLog, error) {
Expand Down
5 changes: 3 additions & 2 deletions engine/server/engine/server/engine_rest_api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@ type EngineRuntime struct {
// Delete Enclaves
// (DELETE /enclaves)
func (engine EngineRuntime) DeleteEnclaves(ctx context.Context, request api.DeleteEnclavesRequestObject) (api.DeleteEnclavesResponseObject, error) {
removedEnclaveUuidsAndNames, err := engine.EnclaveManager.Clean(ctx, *request.Params.RemoveAll)
removeAll := utils.DerefWith(request.Params.RemoveAll, false)
removedEnclaveUuidsAndNames, err := engine.EnclaveManager.Clean(ctx, removeAll)
if err != nil {
response := internalErrorResponseInfof(err, "An error occurred while cleaning enclaves")
return api.DeleteEnclavesdefaultJSONResponse{
Body: response,
StatusCode: int(response.Code),
}, nil
}
if *request.Params.RemoveAll {
if removeAll {
if err = engine.LogFileManager.RemoveAllLogs(); err != nil {
response := internalErrorResponseInfof(err, "An error occurred removing all logs")
return api.DeleteEnclavesdefaultJSONResponse{
Expand Down
5 changes: 3 additions & 2 deletions engine/server/engine/server/websocket_api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func streamStarlarkLogsWithWebsocket[T any](ctx echo.Context, cors cors.Cors, st
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"streamerUUID": streamerUUID,
}).Error("Failed to stream all data")
}).Warn("Failed to stream all data")
}
}

Expand Down Expand Up @@ -263,6 +263,7 @@ func streamServiceLogsWithWebsocket(ctx echo.Context, cors cors.Cors, streamer s
return
}
defer wsPump.Close()
wsPump.OnClose(func() { streamer.Close() })

err = streamer.Consume(func(logline *api_type.ServiceLogs) error {
return wsPump.PumpMessage(logline)
Expand All @@ -271,7 +272,7 @@ func streamServiceLogsWithWebsocket(ctx echo.Context, cors cors.Cors, streamer s
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"services": streamer.GetRequestedServiceUuids(),
}).Error("Failed to stream all data")
}).Warn("Failed to stream all data")
}
}

Expand Down
75 changes: 52 additions & 23 deletions engine/server/engine/streaming/websocket_pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type WebsocketPump[T interface{}] struct {
cancelFunc context.CancelFunc
closed bool
connectionError *error
onCloseCallback func()
}

func NewWebsocketPump[T interface{}](ctx echo.Context, cors cors.Cors) (*WebsocketPump[T], error) {
Expand All @@ -52,12 +53,13 @@ func NewWebsocketPump[T interface{}](ctx echo.Context, cors cors.Cors) (*Websock
ctxWithCancel, cancelFunc := context.WithCancel(context.Background())

pump := &WebsocketPump[T]{
websocket: conn,
inputChan: make(chan *T),
infoChan: make(chan *api_type.ResponseInfo),
ctx: ctxWithCancel,
cancelFunc: cancelFunc,
closed: false,
websocket: conn,
inputChan: make(chan *T),
infoChan: make(chan *api_type.ResponseInfo),
ctx: ctxWithCancel,
cancelFunc: cancelFunc,
closed: false,
onCloseCallback: func() {},
}

go pump.startPumping()
Expand All @@ -77,6 +79,7 @@ func (pump *WebsocketPump[T]) readLoop() {
func (pump *WebsocketPump[T]) startPumping() {
ticker := time.NewTicker(pingPeriod)
defer func() {
pump.onCloseCallback()
ticker.Stop()
pump.websocket.Close()
close(pump.inputChan)
Expand Down Expand Up @@ -112,44 +115,45 @@ func (pump *WebsocketPump[T]) startPumping() {
// we also need a dummy reader loop.
go pump.readLoop()

WRITE_LOOP:
for {
select {
case <-ticker.C:
if err := pump.websocket.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
logrus.Debug("Websocket connection did not meet the write deadline")
logrus.Error("Websocket connection did not meet the write deadline")
pump.connectionError = &err
return
break WRITE_LOOP
}
if err := pump.websocket.WriteMessage(websocket.PingMessage, nil); err != nil {
logrus.Debug("Websocket connection is likely closed, exiting keep alive process")
logrus.Error("Websocket connection is likely closed, exiting keep alive process")
pump.connectionError = &err
return
break WRITE_LOOP
}
case msg := <-pump.inputChan:
if err := pump.websocket.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
logrus.Debug("Websocket connection did not meet the write deadline")
logrus.Error("Websocket connection did not meet the write deadline")
pump.connectionError = &err
return
break WRITE_LOOP
}
if err := pump.websocket.WriteJSON(msg); err != nil {
logrus.WithError(stacktrace.Propagate(err, "Failed to send value of type `%T` via websocket", msg)).Errorf("Failed to write message to websocket, closing it.")
logrus.WithError(err).Warnf("Failed to send value of type `%T` via websocket", msg)
pump.connectionError = &err
return
break WRITE_LOOP
}
case msg := <-pump.infoChan:
if err := pump.websocket.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
logrus.Debug("Websocket connection did not meet the write deadline")
logrus.Error("Websocket connection did not meet the write deadline")
pump.connectionError = &err
return
break WRITE_LOOP
}
if err := pump.websocket.WriteJSON(msg); err != nil {
logrus.WithError(stacktrace.Propagate(err, "Failed to send value of type `%T` via websocket", msg)).Errorf("Failed to write message to websocket, closing it.")
logrus.WithError(err).Warnf("Failed to send value of type `%T` via websocket", msg)
pump.connectionError = &err
return
break WRITE_LOOP
}
case <-pump.ctx.Done():
logrus.Debug("Websocket pump has been asked to close, closing it.")
return
break WRITE_LOOP
}
}
}
Expand All @@ -158,30 +162,55 @@ func (pump *WebsocketPump[T]) PumpResponseInfo(msg *api_type.ResponseInfo) error
if pump.closed {
if pump.connectionError != nil {
return stacktrace.Propagate(*pump.connectionError, "Websocket has been closed due connection error")
}
return nil
}

select {
case _, ok := <-pump.infoChan:
if !ok {
logrus.Debug("Worker channel closed, cannot send message")
}
if pump.connectionError != nil {
return stacktrace.Propagate(*pump.connectionError, "Websocket has been closed due connection error")
}
return stacktrace.NewError("Websocket has been closed due connection error")
case pump.infoChan <- msg:
return nil
}
pump.infoChan <- msg
return nil
}

func (pump *WebsocketPump[T]) PumpMessage(msg *T) error {
if pump.closed {
if pump.connectionError != nil {
return stacktrace.Propagate(*pump.connectionError, "Websocket has been closed due connection error")
}
return nil
}

select {
case _, ok := <-pump.inputChan:
if !ok {
logrus.Debug("Worker channel closed, cannot send message")
}
if pump.connectionError != nil {
return stacktrace.Propagate(*pump.connectionError, "Websocket has been closed due connection error")
}
return stacktrace.NewError("Websocket has been closed due connection error")
case pump.inputChan <- msg:
return nil
}
pump.inputChan <- msg
return nil

}

func (pump *WebsocketPump[T]) Close() {
pump.cancelFunc()
}

func (pump *WebsocketPump[T]) OnClose(callback func()) {
pump.onCloseCallback = callback
}

func (pump *WebsocketPump[T]) IsClosed() (bool, *error) {
return pump.closed, pump.connectionError
}

0 comments on commit 7f8db9b

Please sign in to comment.