Skip to content

Commit

Permalink
add a console implemetation based on epoll
Browse files Browse the repository at this point in the history
This adds a console implementation for linux based on epoll and its
epoll manager (EpollConsole and Epoller). We need this in cases the
slave side repeatedly open then closes the slave console when doing I/O.

For more information, refers to:
- systemd/systemd#4262
- moby/moby#27202
- opencontainers/runc#1446

Signed-off-by: Daniel Dao <[email protected]>
  • Loading branch information
dqminh committed Jun 30, 2017
1 parent f6ecb30 commit 5a740e6
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 0 deletions.
2 changes: 2 additions & 0 deletions console.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type Console interface {
Reset() error
// Size returns the window size of the console
Size() (WinSize, error)
// Fd returns the console's file descriptor
Fd() uintptr
}

// WinSize specifies the window size of the console
Expand Down
255 changes: 255 additions & 0 deletions console_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
// +build linux
package console

import (
"io"
"os"
"sync"

"golang.org/x/sys/unix"
)

const (
maxEvents = 128
)

// Epoller manages multiple epoll consoles using edge-triggered epoll api so we
// dont have to deal with repeated wake-up of EPOLLER or EPOLLHUP.
// For more details, see:
// - https://github.com/systemd/systemd/pull/4262
// - https://github.com/moby/moby/issues/27202
//
// Example usage of Epoller and EpollConsole can be as follow:
//
// epoller, _ := NewEpoller()
// epollConsole, _ := epoller.Add(console)
// go epoller.Wait()
// var (
// b bytes.Buffer
// wg sync.WaitGroup
// )
// wg.Add(1)
// go func() {
// io.Copy(&b, epollConsole)
// wg.Done()
// }()
// // perform I/O on the console
// epollConsole.Shutdown(epoller.CloseConsole)
// wg.Wait()
// epollConsole.Close()
type Epoller struct {
efd int
mu sync.Mutex
fdMapping map[int]*EpollConsole
}

// NewEpoller returns an instance of epoller with a valid epoll fd.
func NewEpoller() (*Epoller, error) {
efd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if err != nil {
return nil, err
}
return &Epoller{
efd: efd,
fdMapping: make(map[int]*EpollConsole),
}, nil
}

// Add creates a epoll console based on the provided console. The console will
// be registered with EPOLLET (i.e. using edge-triggered notification) and its
// file descriptor will be set to non-blocking mode. After this, user should use
// the return console to perform I/O.
func (e *Epoller) Add(console Console) (*EpollConsole, error) {
sysfd := int(console.Fd())
// Set sysfd to non-blocking mode
if err := unix.SetNonblock(sysfd, true); err != nil {
return nil, err
}

ev := unix.EpollEvent{
Events: unix.EPOLLIN | unix.EPOLLOUT | unix.EPOLLRDHUP | unix.EPOLLET,
Fd: int32(sysfd),
}
if err := unix.EpollCtl(e.efd, unix.EPOLL_CTL_ADD, sysfd, &ev); err != nil {
return nil, err
}
ef := &EpollConsole{
Console: console,
sysfd: sysfd,
readc: sync.NewCond(&sync.Mutex{}),
writec: sync.NewCond(&sync.Mutex{}),
}
e.mu.Lock()
e.fdMapping[sysfd] = ef
e.mu.Unlock()
return ef, nil
}

// Wait starts the loop to wait for its consoles' notifications and signal
// appropriate console that it can perform I/O.
func (e *Epoller) Wait() error {
events := make([]unix.EpollEvent, maxEvents)
for {
n, err := unix.EpollWait(e.efd, events, -1)
if err != nil {
// EINTR: The call was interrupted by a signal handler before either
// any of the requested events occurred or the timeout expired
if err == unix.EINTR {
continue
}
return err
}
for i := 0; i < n; i++ {
ev := &events[i]
// the console is ready to be read from
if ev.Events&(unix.EPOLLIN|unix.EPOLLHUP|unix.EPOLLERR) != 0 {
if epfile := e.getConsole(int(ev.Fd)); epfile != nil {
epfile.signalRead()
}
}
// the console is ready to be written to
if ev.Events&(unix.EPOLLOUT|unix.EPOLLHUP|unix.EPOLLERR) != 0 {
if epfile := e.getConsole(int(ev.Fd)); epfile != nil {
epfile.signalWrite()
}
}
}
}
}

// Close unregister the console's file descriptor from epoll interface
func (e *Epoller) CloseConsole(fd int) error {
e.mu.Lock()
defer e.mu.Unlock()
delete(e.fdMapping, fd)
return unix.EpollCtl(e.efd, unix.EPOLL_CTL_DEL, fd, &unix.EpollEvent{})
}

func (e *Epoller) getConsole(sysfd int) *EpollConsole {
e.mu.Lock()
f := e.fdMapping[sysfd]
e.mu.Unlock()
return f
}

// EpollConsole acts like a console but register its file descriptor with a
// epoll fd and uses epoll API to perform I/O.
type EpollConsole struct {
Console
sysfd int
readc *sync.Cond
writec *sync.Cond

mu sync.Mutex
closed bool
}

// Read reads up to len(p) bytes into p. It returns the number of bytes read
// (0 <= n <= len(p)) and any error encountered.
//
// If the console's read returns EAGAIN or EIO, we assumes that its a
// temporary error because the other side went away and wait for the signal
// generated by epoll event to continue.
func (ec *EpollConsole) Read(p []byte) (n int, err error) {
var read int
ec.readc.L.Lock()
defer ec.readc.L.Unlock()
for {
read, err = ec.Console.Read(p[n:])
n += read
if err != nil {
var hangup bool
if perr, ok := err.(*os.PathError); ok {
hangup = (perr.Err == unix.EAGAIN || perr.Err == unix.EIO)
} else {
hangup = (err == unix.EAGAIN || err == unix.EIO)
}
// if the other end disappear, assume this is temporary and wait for the
// signal to continue again. Unless we didnt read anything and the
// console is already marked as closed then we should exit
if hangup && !(n == 0 && len(p) > 0 && ec.closed) {
ec.readc.Wait()
continue
}
}
break
}
// if we didnt read anything then return io.EOF to end gracefully
if n == 0 && len(p) > 0 && err == nil {
err = io.EOF
}
// signal for others that we finished the read
ec.readc.Signal()
return n, err
}

// Writes len(p) bytes from p to the console. It returns the number of bytes
// written from p (0 <= n <= len(p)) and any error encountered that caused
// the write to stop early.
//
// If writes to the console returns EAGAIN or EIO, we assumes that its a
// temporary error because the other side went away and wait for the signal
// generated by epoll event to continue.
func (ec *EpollConsole) Write(p []byte) (n int, err error) {
var written int
ec.writec.L.Lock()
defer ec.writec.L.Unlock()
for {
written, err = ec.Console.Write(p[n:])
n += written
if err != nil {
var hangup bool
if perr, ok := err.(*os.PathError); ok {
hangup = (perr.Err == unix.EAGAIN || perr.Err == unix.EIO)
} else {
hangup = (err == unix.EAGAIN || err == unix.EIO)
}
// if the other end disappear, assume this is temporary and wait for the
// signal to continue again.
if hangup {
ec.writec.Wait()
continue
}
}
// unrecoverable error, break the loop and return the error
break
}
if n < len(p) && err == nil {
err = io.ErrShortWrite
}
// signal for others that we finished the write
ec.writec.Signal()
return n, err
}

// Close closed the file descriptor and signal call waiters for this fd.
// It accepts a callback which will be called with the console's fd. The
// callback typically will be used to do further cleanup such as unregister the
// console's fd from the epoll interface.
// User should call Shutdown and wait for all I/O operation to be finished
// before closing the console.
func (ec *EpollConsole) Shutdown(close func(int) error) error {
ec.mu.Lock()
defer ec.mu.Unlock()

ec.readc.Broadcast()
ec.writec.Broadcast()
ec.closed = true
return close(ec.sysfd)
}

// signalRead signals that the console is readable. It doesnt do anything if
// the console is closed
func (ec *EpollConsole) signalRead() {
ec.mu.Lock()
defer ec.mu.Unlock()
ec.readc.Signal()
}

// signalWrite signals that the console is writable. It doesnt do anything if
// the console is closed
func (ec *EpollConsole) signalWrite() {
ec.mu.Lock()
defer ec.mu.Unlock()
ec.writec.Signal()
}
61 changes: 61 additions & 0 deletions console_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,64 @@ func TestConsolePty(t *testing.T) {
t.Errorf("unexpected output %q", out)
}
}

func TestEpollConsole(t *testing.T) {
console, slavePath, err := NewPty()
if err != nil {
t.Fatal(err)
}
defer console.Close()

slave, err := os.OpenFile(slavePath, os.O_RDWR, 0)
if err != nil {
t.Fatal(err)
}
defer slave.Close()

iteration := 10

cmd := exec.Command("sh", "-c", fmt.Sprintf("for x in `seq 1 %d`; do echo -n test; done", iteration))
cmd.Stdin = slave
cmd.Stdout = slave
cmd.Stderr = slave

epoller, err := NewEpoller()
if err != nil {
t.Fatal(err)
}
epollConsole, err := epoller.Add(console)
if err != nil {
t.Fatal(err)
}
go epoller.Wait()

var (
b bytes.Buffer
wg sync.WaitGroup
)
wg.Add(1)
go func() {
io.Copy(&b, epollConsole)
wg.Done()
}()

if err := cmd.Run(); err != nil {
t.Fatal(err)
}
slave.Close()
if err := epollConsole.Shutdown(epoller.CloseConsole); err != nil {
t.Fatal(err)
}
wg.Wait()
if err := epollConsole.Close(); err != nil {
t.Fatal(err)
}

expectedOutput := ""
for i := 0; i < iteration; i++ {
expectedOutput += "test"
}
if out := b.String(); out != expectedOutput {
t.Errorf("unexpected output %q", out)
}
}
4 changes: 4 additions & 0 deletions console_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func (m *master) Size() (WinSize, error) {
return ws, nil
}

func (m *master) Fd() uintptr {
return m.f.Fd()
}

// checkConsole checks if the provided file is a console
func checkConsole(f *os.File) error {
var termios unix.Termios
Expand Down

0 comments on commit 5a740e6

Please sign in to comment.