Skip to content

Commit

Permalink
runc run/start/exec: fix init log forwarding race
Browse files Browse the repository at this point in the history
Sometimes debug.bats test cases are failing like this:

> not ok 27 global --debug to --log --log-format 'json'
> # (in test file tests/integration/debug.bats, line 77)
> #   `[[ "${output}" == *"child process in init()"* ]]' failed

It happens more when writing to disk.

This issue is caused by the fact that runc spawns a log forwarding goroutine
(ForwardLogs) but does not wait for it to finish, resulting in missing
debug lines from nsexec.

ForwardLogs itself, though, never finishes, because it reads from the
reading side of a pipe whose writing side is not closed. This is
especially true in the case of runc create, which spawns runc init and
exits; meanwhile runc init waits on the exec fifo for an indeterminate,
possibly long time before doing execve.

So, to fix the failure described above, we need to:

 1. Make runc create/run/exec wait for ForwardLogs to finish;

 2. Make runc init close its log pipe file descriptor (i.e.
    the one whose value is passed in the _LIBCONTAINER_LOGPIPE
    environment variable).

This is exactly what this commit does. In addition, it

 - adds logrus debug to late stages of runc init, and amends the
   integration tests to check for those messages;

 - adds runc --debug exec test cases, similar to those in debug.bats
   but for runc exec rather than runc run;

 - refactors libcontainer/logs unit tests to simplify and update for
   the new ForwardLogs.

PS I have to admit I still do not understand why closing log pipe fd
is required in e.g. (*linuxSetnsInit).Init, right before the execve
which (thanks to CLOEXEC) closes the fd anyway.

Signed-off-by: Kir Kolyshkin <[email protected]>
  • Loading branch information
kolyshkin committed Mar 6, 2021
1 parent 7577f2f commit 3a14578
Show file tree
Hide file tree
Showing 12 changed files with 201 additions and 108 deletions.
18 changes: 16 additions & 2 deletions libcontainer/container_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,26 @@ type openResult struct {
err error
}

func (c *linuxContainer) start(process *Process) error {
func (c *linuxContainer) start(process *Process) (retErr error) {
parent, err := c.newParentProcess(process)
if err != nil {
return newSystemErrorWithCause(err, "creating new parent process")
}
parent.forwardChildLogs()

logsDone := parent.forwardChildLogs()
if logsDone != nil {
defer func() {
// Wait for log forwarder to finish. This depends
// on init closing the _LIBCONTAINER_LOGPIPE log fd:
// For standard init, this is done by closeLogFD.
// For setns init, the fd is closed upon execve.
err := <-logsDone
if err != nil && retErr == nil {
retErr = newSystemErrorWithCause(err, "forwarding init logs")
}
}()
}

if err := parent.start(); err != nil {
return newSystemErrorWithCause(err, "starting container process")
}
Expand Down
3 changes: 2 additions & 1 deletion libcontainer/container_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ func (m *mockProcess) externalDescriptors() []string {
func (m *mockProcess) setExternalDescriptors(newFds []string) {
}

func (m *mockProcess) forwardChildLogs() {
func (m *mockProcess) forwardChildLogs() chan error {
return nil
}

func TestGetContainerPids(t *testing.T) {
Expand Down
8 changes: 7 additions & 1 deletion libcontainer/factory_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,12 @@ func (l *LinuxFactory) StartInitialization() (err error) {
defer consoleSocket.Close()
}

logPipeFdStr := os.Getenv("_LIBCONTAINER_LOGPIPE")
logPipeFd, err := strconv.Atoi(logPipeFdStr)
if err != nil {
return fmt.Errorf("unable to convert _LIBCONTAINER_LOGPIPE=%s to int: %s", logPipeFdStr, err)
}

// clear the current process's environment to clean any libcontainer
// specific env vars.
os.Clearenv()
Expand All @@ -387,7 +393,7 @@ func (l *LinuxFactory) StartInitialization() (err error) {
}
}()

i, err := newContainerInit(it, pipe, consoleSocket, fifofd)
i, err := newContainerInit(it, pipe, consoleSocket, fifofd, logPipeFd)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion libcontainer/init_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type initer interface {
Init() error
}

func newContainerInit(t initType, pipe *os.File, consoleSocket *os.File, fifoFd int) (initer, error) {
func newContainerInit(t initType, pipe *os.File, consoleSocket *os.File, fifoFd, logFd int) (initer, error) {
var config *initConfig
if err := json.NewDecoder(pipe).Decode(&config); err != nil {
return nil, err
Expand All @@ -90,6 +90,7 @@ func newContainerInit(t initType, pipe *os.File, consoleSocket *os.File, fifoFd
pipe: pipe,
consoleSocket: consoleSocket,
config: config,
logFd: logFd,
}, nil
case initStandard:
return &linuxStandardInit{
Expand All @@ -98,6 +99,7 @@ func newContainerInit(t initType, pipe *os.File, consoleSocket *os.File, fifoFd
parentPid: unix.Getppid(),
config: config,
fifoFd: fifoFd,
logFd: logFd,
}, nil
}
return nil, fmt.Errorf("unknown init type %q", t)
Expand Down
26 changes: 16 additions & 10 deletions libcontainer/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

var (
configureMutex = sync.Mutex{}
configureMutex sync.Mutex
// loggingConfigured will be set once logging has been configured via invoking `ConfigureLogging`.
// Subsequent invocations of `ConfigureLogging` would be no-op
loggingConfigured = false
Expand All @@ -26,16 +26,22 @@ type Config struct {
LogPipeFd int
}

func ForwardLogs(logPipe io.Reader) {
func ForwardLogs(logPipe io.ReadCloser) chan error {
done := make(chan error, 1)
s := bufio.NewScanner(logPipe)
for s.Scan() {
processEntry(s.Bytes())
}
if err := s.Err(); err != nil {
logrus.Errorf("log pipe read error: %+v", err)
} else {
logrus.Debugf("log pipe closed")
}

go func() {
for s.Scan() {
processEntry(s.Bytes())
}
if err := logPipe.Close(); err != nil {
logrus.Errorf("error closing log source: %v", err)
}
done <- s.Err()
close(done)
}()

return done
}

func processEntry(text []byte) {
Expand Down
175 changes: 90 additions & 85 deletions libcontainer/logs/logs_linux_test.go
Original file line number Diff line number Diff line change
@@ -1,152 +1,120 @@
package logs

import (
"errors"
"bytes"
"io"
"io/ioutil"
"os"
"strings"
"testing"
"time"

"github.com/sirupsen/logrus"
)

func TestLoggingToFile(t *testing.T) {
logW, logFile, _ := runLogForwarding(t)
defer os.Remove(logFile)
defer logW.Close()
l := runLogForwarding(t)

logToLogWriter(t, logW, `{"level": "info","msg":"kitten"}`)

logFileContent := waitForLogContent(t, logFile)
if !strings.Contains(logFileContent, "kitten") {
t.Fatalf("%s does not contain kitten", logFileContent)
}
logToLogWriter(t, l, `{"level": "info","msg":"kitten"}`)
finish(t, l)
check(t, l, "kitten")
}

func TestLogForwardingDoesNotStopOnJsonDecodeErr(t *testing.T) {
logW, logFile, _ := runLogForwarding(t)
defer os.Remove(logFile)
defer logW.Close()

logToLogWriter(t, logW, "invalid-json-with-kitten")
l := runLogForwarding(t)

logFileContent := waitForLogContent(t, logFile)
if !strings.Contains(logFileContent, "failed to decode") {
t.Fatalf("%q does not contain decoding error", logFileContent)
}

truncateLogFile(t, logFile)
logToLogWriter(t, l, "invalid-json-with-kitten")
checkWait(t, l, "failed to decode")

logToLogWriter(t, logW, `{"level": "info","msg":"puppy"}`)
truncateLogFile(t, l.file)

logFileContent = waitForLogContent(t, logFile)
if !strings.Contains(logFileContent, "puppy") {
t.Fatalf("%s does not contain puppy", logFileContent)
}
logToLogWriter(t, l, `{"level": "info","msg":"puppy"}`)
finish(t, l)
check(t, l, "puppy")
}

func TestLogForwardingDoesNotStopOnLogLevelParsingErr(t *testing.T) {
logW, logFile, _ := runLogForwarding(t)
defer os.Remove(logFile)
defer logW.Close()
l := runLogForwarding(t)

logToLogWriter(t, logW, `{"level": "alert","msg":"puppy"}`)
logToLogWriter(t, l, `{"level": "alert","msg":"puppy"}`)
checkWait(t, l, "failed to parse log level")

logFileContent := waitForLogContent(t, logFile)
if !strings.Contains(logFileContent, "failed to parse log level") {
t.Fatalf("%q does not contain log level parsing error", logFileContent)
}

truncateLogFile(t, logFile)

logToLogWriter(t, logW, `{"level": "info","msg":"puppy"}`)
truncateLogFile(t, l.file)

logFileContent = waitForLogContent(t, logFile)
if !strings.Contains(logFileContent, "puppy") {
t.Fatalf("%s does not contain puppy", logFileContent)
}
logToLogWriter(t, l, `{"level": "info","msg":"puppy"}`)
finish(t, l)
check(t, l, "puppy")
}

func TestLogForwardingStopsAfterClosingTheWriter(t *testing.T) {
logW, logFile, doneForwarding := runLogForwarding(t)
defer os.Remove(logFile)
l := runLogForwarding(t)

logToLogWriter(t, logW, `{"level": "info","msg":"sync"}`)
logToLogWriter(t, l, `{"level": "info","msg":"sync"}`)

logFileContent := waitForLogContent(t, logFile)
if !strings.Contains(logFileContent, "sync") {
t.Fatalf("%q does not contain sync message", logFileContent)
}

logW.Close()
// Do not use finish() here as we check done pipe ourselves.
l.w.Close()
select {
case <-doneForwarding:
case <-l.done:
case <-time.After(10 * time.Second):
t.Fatal("log forwarding did not stop after closing the pipe")
}

check(t, l, "sync")
}

func logToLogWriter(t *testing.T, logW *os.File, message string) {
_, err := logW.Write([]byte(message + "\n"))
func logToLogWriter(t *testing.T, l *log, message string) {
t.Helper()
_, err := l.w.Write([]byte(message + "\n"))
if err != nil {
t.Fatalf("failed to write %q to log writer: %v", message, err)
}
}

func runLogForwarding(t *testing.T) (*os.File, string, chan struct{}) {
// log bundles the handles a test needs to drive and inspect one
// log-forwarding session set up by runLogForwarding.
type log struct {
// w is the write end of the pipe whose read end is handed to ForwardLogs.
w io.WriteCloser
// file is the path of the temporary file that logging is configured to write to.
file string
// done is the channel returned by ForwardLogs; the forwarder's final
// error (or nil) is received from it once forwarding stops.
done chan error
}

func runLogForwarding(t *testing.T) *log {
t.Helper()
logR, logW, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
logR.Close()
logW.Close()
})

tempFile, err := ioutil.TempFile("", "")
if err != nil {
t.Fatal(err)
}
defer tempFile.Close()
logFile := tempFile.Name()
t.Cleanup(func() { os.Remove(logFile) })

logConfig := Config{LogLevel: logrus.InfoLevel, LogFormat: "json", LogFilePath: logFile}
return logW, logFile, startLogForwarding(t, logConfig, logR)
}

func startLogForwarding(t *testing.T, logConfig Config, logR *os.File) chan struct{} {
loggingConfigured = false
if err := ConfigureLogging(logConfig); err != nil {
t.Fatal(err)
}
doneForwarding := make(chan struct{})
go func() {
ForwardLogs(logR)
close(doneForwarding)
}()
return doneForwarding
}

func waitForLogContent(t *testing.T, logFile string) string {
startTime := time.Now()
done := ForwardLogs(logR)

for {
if time.Now().After(startTime.Add(10 * time.Second)) {
t.Fatal(errors.New("No content in log file after 10 seconds"))
break
}
return &log{w: logW, done: done, file: logFile}
}

fileContent, err := ioutil.ReadFile(logFile)
if err != nil {
t.Fatal(err)
}
if len(fileContent) == 0 {
continue
}
return string(fileContent)
func finish(t *testing.T, l *log) {
t.Helper()
l.w.Close()
if err := <-l.done; err != nil {
t.Fatalf("ForwardLogs: %v", err)
}

return ""
}

func truncateLogFile(t *testing.T, logFile string) {
file, err := os.OpenFile(logFile, os.O_RDWR, 0666)
file, err := os.OpenFile(logFile, os.O_RDWR, 0o600)
if err != nil {
t.Fatalf("failed to open log file: %v", err)
return
Expand All @@ -158,3 +126,40 @@ func truncateLogFile(t *testing.T, logFile string) {
t.Fatalf("failed to truncate log file: %v", err)
}
}

// check fails the test unless the log file at l.file can be read and
// its contents include txt.
func check(t *testing.T, l *log, txt string) {
	t.Helper()

	data, err := ioutil.ReadFile(l.file)
	if err != nil {
		t.Fatal(err)
	}

	// Happy path first: nothing more to do when the text is present.
	if bytes.Contains(data, []byte(txt)) {
		return
	}
	t.Fatalf("%q does not contain %q", string(data), txt)
}

// checkWait polls l.file until it becomes non-empty and then verifies,
// via check, that it contains txt. The test fails if the file cannot be
// stat'ed or if it is still empty after maxRetries sleeps of
// pollInterval each.
func checkWait(t *testing.T, l *log, txt string) {
	t.Helper()

	const (
		pollInterval = 100 * time.Millisecond
		maxRetries   = 3
	)

	attempt := 0
	for {
		info, err := os.Stat(l.file)
		if err != nil {
			t.Fatal(err)
		}
		if info.Size() > 0 {
			// Something has been written; go inspect it.
			break
		}
		if attempt == maxRetries {
			t.Fatalf("waited %s for file %s to be non-empty but it still is", maxRetries*pollInterval, l.file)
		}
		time.Sleep(pollInterval)
		attempt++
	}

	check(t, l, txt)
}
Loading

0 comments on commit 3a14578

Please sign in to comment.