From 5a740e60bf52d69853813f87b4b2d498d68c6dd1 Mon Sep 17 00:00:00 2001 From: Daniel Dao Date: Thu, 29 Jun 2017 23:37:13 +0100 Subject: [PATCH] add a console implemetation based on epoll This adds a console implementation for linux based on epoll and its epoll manager (EpollConsole and Epoller). We need this in cases the slave side repeatedly open then closes the slave console when doing I/O. For more information, refers to: - https://github.com/systemd/systemd/pull/4262 - https://github.com/moby/moby/issues/27202 - https://github.com/opencontainers/runc/pull/1446 Signed-off-by: Daniel Dao --- console.go | 2 + console_linux.go | 255 +++++++++++++++++++++++++++++++++++++++++++++++ console_test.go | 61 ++++++++++++ console_unix.go | 4 + 4 files changed, 322 insertions(+) create mode 100644 console_linux.go diff --git a/console.go b/console.go index 08d255d..c702a60 100644 --- a/console.go +++ b/console.go @@ -26,6 +26,8 @@ type Console interface { Reset() error // Size returns the window size of the console Size() (WinSize, error) + // Fd returns the console's file descriptor + Fd() uintptr } // WinSize specifies the window size of the console diff --git a/console_linux.go b/console_linux.go new file mode 100644 index 0000000..7eada59 --- /dev/null +++ b/console_linux.go @@ -0,0 +1,255 @@ +// +build linux +package console + +import ( + "io" + "os" + "sync" + + "golang.org/x/sys/unix" +) + +const ( + maxEvents = 128 +) + +// Epoller manages multiple epoll consoles using edge-triggered epoll api so we +// dont have to deal with repeated wake-up of EPOLLER or EPOLLHUP. +// For more details, see: +// - https://github.com/systemd/systemd/pull/4262 +// - https://github.com/moby/moby/issues/27202 +// +// Example usage of Epoller and EpollConsole can be as follow: +// +// epoller, _ := NewEpoller() +// epollConsole, _ := epoller.Add(console) +// go epoller.Wait() +// var ( +// b bytes.Buffer +// wg sync.WaitGroup +// ) +// wg.Add(1) +// go func() { +// io.Copy(&b, epollConsole) +// wg.Done() +// }() +// // perform I/O on the console +// epollConsole.Shutdown(epoller.CloseConsole) +// wg.Wait() +// epollConsole.Close() +type Epoller struct { + efd int + mu sync.Mutex + fdMapping map[int]*EpollConsole +} + +// NewEpoller returns an instance of epoller with a valid epoll fd. +func NewEpoller() (*Epoller, error) { + efd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC) + if err != nil { + return nil, err + } + return &Epoller{ + efd: efd, + fdMapping: make(map[int]*EpollConsole), + }, nil +} + +// Add creates a epoll console based on the provided console. The console will +// be registered with EPOLLET (i.e. using edge-triggered notification) and its +// file descriptor will be set to non-blocking mode. After this, user should use +// the return console to perform I/O. +func (e *Epoller) Add(console Console) (*EpollConsole, error) { + sysfd := int(console.Fd()) + // Set sysfd to non-blocking mode + if err := unix.SetNonblock(sysfd, true); err != nil { + return nil, err + } + + ev := unix.EpollEvent{ + Events: unix.EPOLLIN | unix.EPOLLOUT | unix.EPOLLRDHUP | unix.EPOLLET, + Fd: int32(sysfd), + } + if err := unix.EpollCtl(e.efd, unix.EPOLL_CTL_ADD, sysfd, &ev); err != nil { + return nil, err + } + ef := &EpollConsole{ + Console: console, + sysfd: sysfd, + readc: sync.NewCond(&sync.Mutex{}), + writec: sync.NewCond(&sync.Mutex{}), + } + e.mu.Lock() + e.fdMapping[sysfd] = ef + e.mu.Unlock() + return ef, nil +} + +// Wait starts the loop to wait for its consoles' notifications and signal +// appropriate console that it can perform I/O. +func (e *Epoller) Wait() error { + events := make([]unix.EpollEvent, maxEvents) + for { + n, err := unix.EpollWait(e.efd, events, -1) + if err != nil { + // EINTR: The call was interrupted by a signal handler before either + // any of the requested events occurred or the timeout expired + if err == unix.EINTR { + continue + } + return err + } + for i := 0; i < n; i++ { + ev := &events[i] + // the console is ready to be read from + if ev.Events&(unix.EPOLLIN|unix.EPOLLHUP|unix.EPOLLERR) != 0 { + if epfile := e.getConsole(int(ev.Fd)); epfile != nil { + epfile.signalRead() + } + } + // the console is ready to be written to + if ev.Events&(unix.EPOLLOUT|unix.EPOLLHUP|unix.EPOLLERR) != 0 { + if epfile := e.getConsole(int(ev.Fd)); epfile != nil { + epfile.signalWrite() + } + } + } + } +} + +// Close unregister the console's file descriptor from epoll interface +func (e *Epoller) CloseConsole(fd int) error { + e.mu.Lock() + defer e.mu.Unlock() + delete(e.fdMapping, fd) + return unix.EpollCtl(e.efd, unix.EPOLL_CTL_DEL, fd, &unix.EpollEvent{}) +} + +func (e *Epoller) getConsole(sysfd int) *EpollConsole { + e.mu.Lock() + f := e.fdMapping[sysfd] + e.mu.Unlock() + return f +} + +// EpollConsole acts like a console but register its file descriptor with a +// epoll fd and uses epoll API to perform I/O. +type EpollConsole struct { + Console + sysfd int + readc *sync.Cond + writec *sync.Cond + + mu sync.Mutex + closed bool +} + +// Read reads up to len(p) bytes into p. It returns the number of bytes read +// (0 <= n <= len(p)) and any error encountered. +// +// If the console's read returns EAGAIN or EIO, we assumes that its a +// temporary error because the other side went away and wait for the signal +// generated by epoll event to continue. +func (ec *EpollConsole) Read(p []byte) (n int, err error) { + var read int + ec.readc.L.Lock() + defer ec.readc.L.Unlock() + for { + read, err = ec.Console.Read(p[n:]) + n += read + if err != nil { + var hangup bool + if perr, ok := err.(*os.PathError); ok { + hangup = (perr.Err == unix.EAGAIN || perr.Err == unix.EIO) + } else { + hangup = (err == unix.EAGAIN || err == unix.EIO) + } + // if the other end disappear, assume this is temporary and wait for the + // signal to continue again. Unless we didnt read anything and the + // console is already marked as closed then we should exit + if hangup && !(n == 0 && len(p) > 0 && ec.closed) { + ec.readc.Wait() + continue + } + } + break + } + // if we didnt read anything then return io.EOF to end gracefully + if n == 0 && len(p) > 0 && err == nil { + err = io.EOF + } + // signal for others that we finished the read + ec.readc.Signal() + return n, err +} + +// Writes len(p) bytes from p to the console. It returns the number of bytes +// written from p (0 <= n <= len(p)) and any error encountered that caused +// the write to stop early. +// +// If writes to the console returns EAGAIN or EIO, we assumes that its a +// temporary error because the other side went away and wait for the signal +// generated by epoll event to continue. +func (ec *EpollConsole) Write(p []byte) (n int, err error) { + var written int + ec.writec.L.Lock() + defer ec.writec.L.Unlock() + for { + written, err = ec.Console.Write(p[n:]) + n += written + if err != nil { + var hangup bool + if perr, ok := err.(*os.PathError); ok { + hangup = (perr.Err == unix.EAGAIN || perr.Err == unix.EIO) + } else { + hangup = (err == unix.EAGAIN || err == unix.EIO) + } + // if the other end disappear, assume this is temporary and wait for the + // signal to continue again. + if hangup { + ec.writec.Wait() + continue + } + } + // unrecoverable error, break the loop and return the error + break + } + if n < len(p) && err == nil { + err = io.ErrShortWrite + } + // signal for others that we finished the write + ec.writec.Signal() + return n, err +} + +// Close closed the file descriptor and signal call waiters for this fd. +// It accepts a callback which will be called with the console's fd. The +// callback typically will be used to do further cleanup such as unregister the +// console's fd from the epoll interface. +// User should call Shutdown and wait for all I/O operation to be finished +// before closing the console. +func (ec *EpollConsole) Shutdown(close func(int) error) error { + ec.mu.Lock() + defer ec.mu.Unlock() + + ec.readc.Broadcast() + ec.writec.Broadcast() + ec.closed = true + return close(ec.sysfd) +} + +// signalRead signals that the console is readable. It doesnt do anything if +// the console is closed +func (ec *EpollConsole) signalRead() { + ec.mu.Lock() + defer ec.mu.Unlock() + ec.readc.Signal() +} + +// signalWrite signals that the console is writable. It doesnt do anything if +// the console is closed +func (ec *EpollConsole) signalWrite() { + ec.mu.Lock() + defer ec.mu.Unlock() + ec.writec.Signal() +} diff --git a/console_test.go b/console_test.go index ddcd92b..65e92b4 100644 --- a/console_test.go +++ b/console_test.go @@ -82,3 +82,64 @@ func TestConsolePty(t *testing.T) { t.Errorf("unexpected output %q", out) } } + +func TestEpollConsole(t *testing.T) { + console, slavePath, err := NewPty() + if err != nil { + t.Fatal(err) + } + defer console.Close() + + slave, err := os.OpenFile(slavePath, os.O_RDWR, 0) + if err != nil { + t.Fatal(err) + } + defer slave.Close() + + iteration := 10 + + cmd := exec.Command("sh", "-c", fmt.Sprintf("for x in `seq 1 %d`; do echo -n test; done", iteration)) + cmd.Stdin = slave + cmd.Stdout = slave + cmd.Stderr = slave + + epoller, err := NewEpoller() + if err != nil { + t.Fatal(err) + } + epollConsole, err := epoller.Add(console) + if err != nil { + t.Fatal(err) + } + go epoller.Wait() + + var ( + b bytes.Buffer + wg sync.WaitGroup + ) + wg.Add(1) + go func() { + io.Copy(&b, epollConsole) + wg.Done() + }() + + if err := cmd.Run(); err != nil { + t.Fatal(err) + } + slave.Close() + if err := epollConsole.Shutdown(epoller.CloseConsole); err != nil { + t.Fatal(err) + } + wg.Wait() + if err := epollConsole.Close(); err != nil { + t.Fatal(err) + } + + expectedOutput := "" + for i := 0; i < iteration; i++ { + expectedOutput += "test" + } + if out := b.String(); out != expectedOutput { + t.Errorf("unexpected output %q", out) + } +} diff --git a/console_unix.go b/console_unix.go index b3b268d..baf390b 100644 --- a/console_unix.go +++ b/console_unix.go @@ -114,6 +114,10 @@ func (m *master) Size() (WinSize, error) { return ws, nil } +func (m *master) Fd() uintptr { + return m.f.Fd() +} + // checkConsole checks if the provided file is a console func checkConsole(f *os.File) error { var termios unix.Termios