From 06fc931c9d08ea5b5b9bb6605a4a44bb31d03079 Mon Sep 17 00:00:00 2001 From: Maksim An Date: Tue, 17 Nov 2020 22:20:06 -0800 Subject: [PATCH] Add support for logging binary Signed-off-by: Maksim An --- .gitignore | 1 + cmd/containerd-shim-runhcs-v1/log.go | 52 +++++ cmd/containerd-shim-runhcs-v1/task_hcs.go | 45 +++- cmd/logging/logging.go | 87 ++++++++ internal/cmd/io_binary.go | 260 ++++++++++++++++++++++ 5 files changed, 441 insertions(+), 4 deletions(-) create mode 100644 cmd/containerd-shim-runhcs-v1/log.go create mode 100644 cmd/logging/logging.go create mode 100644 internal/cmd/io_binary.go diff --git a/.gitignore b/.gitignore index b883f1fdc6..3b877a1458 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.exe +.idea diff --git a/cmd/containerd-shim-runhcs-v1/log.go b/cmd/containerd-shim-runhcs-v1/log.go new file mode 100644 index 0000000000..083715ec73 --- /dev/null +++ b/cmd/containerd-shim-runhcs-v1/log.go @@ -0,0 +1,52 @@ +/* +Example logger, which writes to 2 different files +*/ + +package main + +import ( + "context" + "io" + "os" + "sync" + + "github.com/Microsoft/hcsshim/cmd/logging" +) + +func _main() { + logging.Run(logger) +} + +func logger(_ context.Context, config *logging.Config, ready func() error) error { + var wg sync.WaitGroup + wg.Add(2) + + fileOut, err := os.Create("C:/Users/Administrator/LCOW/container-stdout.txt") + defer fileOut.Close() + if err != nil { + return err + } + + fileErr, err := os.Create("C:/Users/Administrator/LCOW/container-stderr.txt") + defer fileErr.Close() + if err != nil { + return err + } + + go func() { + defer wg.Done() + io.Copy(fileOut, config.Stdout) + }() + + go func() { + defer wg.Done() + io.Copy(fileErr, config.Stderr) + }() + + if err := ready(); err != nil { + return err + } + + wg.Wait() + return nil +} diff --git a/cmd/containerd-shim-runhcs-v1/task_hcs.go b/cmd/containerd-shim-runhcs-v1/task_hcs.go index 825864eeea..1944586d81 100644 --- a/cmd/containerd-shim-runhcs-v1/task_hcs.go +++ b/cmd/containerd-shim-runhcs-v1/task_hcs.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "net/url" "os" "path/filepath" "sync" @@ -124,7 +125,18 @@ func newHcsTask( owner := filepath.Base(os.Args[0]) - io, err := cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal) + var ( + io cmd.UpstreamIO + ) + + u, err := url.Parse(req.Stdout) + + if u.Scheme != "binary" || err != nil { + io, err = cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal) + } else { + io, err = cmd.NewBinaryIO(ctx, req.ID, u) + } + if err != nil { return nil, err } @@ -177,7 +189,8 @@ func newHcsTask( req.Bundle, ht.isWCOW, s.Process, - io) + io, + ) if parent != nil { // We have a parent UVM. Listen for its exit and forcibly close this @@ -284,11 +297,35 @@ func (ht *hcsTask) CreateExec(ctx context.Context, req *task.ExecProcessRequest, return errors.Wrapf(errdefs.ErrFailedPrecondition, "exec: '' in task: '%s' must be running to create additional execs", ht.id) } - io, err := cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal) + var ( + io cmd.UpstreamIO + err error + ) + + u, err := url.Parse(req.Stdout) + + if err != nil || u.Scheme != "binary" { + io, err = cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal) + } else { + io, err = cmd.NewBinaryIO(ctx, req.ID, u) + } + if err != nil { return err } - he := newHcsExec(ctx, ht.events, ht.id, ht.host, ht.c, req.ExecID, ht.init.Status().Bundle, ht.isWCOW, spec, io) + he := newHcsExec( + ctx, + ht.events, + ht.id, + ht.host, + ht.c, + req.ExecID, + ht.init.Status().Bundle, + ht.isWCOW, + spec, + io, + ) + ht.execs.Store(req.ExecID, he) // Publish the created event diff --git a/cmd/logging/logging.go b/cmd/logging/logging.go new file mode 100644 index 0000000000..fa5895ec9d --- /dev/null +++ b/cmd/logging/logging.go @@ -0,0 +1,87 @@ +package logging + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + "time" + + "github.com/Microsoft/go-winio" +) + +// Config is passed to the binary logging function +type Config struct { + ID string + Namespace string + Stdout io.Reader + Stderr io.Reader +} + +// LoggerFunc is a binary logging function signature +type LoggerFunc func(context.Context, *Config, func() error) error + +// Run runs LoggerFunc +func Run(fn LoggerFunc) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var errCh = make(chan error, 0) + + sout, _ := winio.DialPipeContext(ctx, os.Getenv("CONTAINER_STDOUT")) + serr, _ := winio.DialPipeContext(ctx, os.Getenv("CONTAINER_STDERR")) + wait, _ := winio.DialPipeContext(ctx, os.Getenv("CONTAINER_WAIT")) + + config := &Config{ + ID: os.Getenv("CONTAINER_ID"), + Namespace: os.Getenv("CONTAINER_NAMESPACE"), + Stdout: sout, + Stderr: serr, + } + + // Write to wait pipe + ready := func() error { + wait.Write([]byte("#")) + return wait.Close() + } + + f, _ := os.Create("C:/Users/Administrator/LCOW/binary-results.txt") + defer f.Close() + + w := bufio.NewWriter(f) + w.WriteString("Starting logging goroutine\n") + w.Flush() + + go func() { + if err := fn(ctx, config, ready); err != nil { + w.WriteString("Binary exited with error. sending error via channel\n") + w.Flush() + errCh <- err + return + } + w.WriteString("Binary exited normally.\n") + w.Flush() + errCh <- nil + }() + + w.WriteString("Started logging goroutine\n") + w.Flush() + + for { + select { + case err := <-errCh: + w.WriteString(fmt.Sprintf("Received from error channel: %s\n", err)) + w.Flush() + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + os.Exit(0) + default: + w.WriteString("Nothing received, sleeping for 500ms\n") + w.Flush() + time.Sleep(500 * time.Millisecond) + } + } +} diff --git a/internal/cmd/io_binary.go b/internal/cmd/io_binary.go new file mode 100644 index 0000000000..3f33e8cc61 --- /dev/null +++ b/internal/cmd/io_binary.go @@ -0,0 +1,260 @@ +package cmd + +import ( + "context" + "fmt" + "io" + "net" + "net/url" + "os/exec" + "sync" + "time" + + "github.com/Microsoft/go-winio" + "github.com/Microsoft/hcsshim/internal/log" + "github.com/containerd/containerd/namespaces" + "github.com/pkg/errors" +) + +const pipeRootBinary = `\\.\pipe` +const binaryCmdWaitTimeout = 5 * time.Second + +func newBinaryCmd(ctx context.Context, uri *url.URL, id string, ns string) *exec.Cmd { + var args []string + for k, vs := range uri.Query() { + args = append(args, k) + if len(vs) > 0 { + args = append(args, vs[0]) + } + } + + execPath := uri.Path + + cmd := exec.CommandContext(ctx, execPath, args...) + cmd.Env = append(cmd.Env, + "CONTAINER_ID="+id, + "CONTAINER_NAMESPACE="+ns, + ) + + return cmd +} + +type binaryIO struct { + cmd *exec.Cmd + + binaryCloser sync.Once + + stdin, stdout, stderr string + + sout, serr, wait io.ReadWriteCloser + soutCloser sync.Once +} + +// NewBinaryIO starts a binary logger +func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ UpstreamIO, err error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + var ( + sout, serr, w io.ReadWriteCloser = nil, nil, nil + ) + + bio := &binaryIO{} + + stdoutPipe := fmt.Sprintf(`%s\binary-%s-stdout`, pipeRootBinary, id) + sout, err = openNPipe(stdoutPipe) + if err != nil { + return nil, err + } + bio.stdout = stdoutPipe + bio.sout = sout + + stderrPipe := fmt.Sprintf(`%s\binary-%s-stderr`, pipeRootBinary, id) + serr, err = openNPipe(stderrPipe) + if err != nil { + return nil, err + } + bio.stderr = stderrPipe + bio.serr = serr + + waitPipe := fmt.Sprintf(`%s\binary-%s-wait`, pipeRootBinary, id) + w, err = openNPipe(waitPipe) + if err != nil { + return nil, err + } + bio.wait = w + + cmd := newBinaryCmd(ctx, uri, id, ns) + cmd.Env = append(cmd.Env, + "CONTAINER_STDOUT="+stdoutPipe, + "CONTAINER_STDERR="+stderrPipe, + "CONTAINER_WAIT="+waitPipe, + ) + + bio.cmd = cmd + + started := make(chan bool, 1) + if err := cmd.Start(); err != nil { + return nil, err + } + + go func() { + defer w.Close() + t := 0 + for { + b := make([]byte, 1) + v, err := w.Read(b) + if (v == 0) || err != nil && err != io.EOF { + log.G(ctx).Debugf("Failed to read from wait pipe. Sleeping") + time.Sleep(1 * time.Second) + t++ + if t > 10 { + break + } + } else { + log.G(ctx).Debugf("Read from wait pipe. Binary started: %s", b) + started <- true + return + } + } + started <- false + }() + + if !<-started { + return nil, errors.Errorf("Failed to started binary") + } + + return bio, nil +} + +var _ UpstreamIO = &binaryIO{} + +func (b *binaryIO) Close(ctx context.Context) { + b.soutCloser.Do(func() { + if b.sout != nil { + err := b.sout.Close() + if err != nil { + log.G(ctx).WithError(err).Errorf("Error while closing stdout npipe") + } + } + if b.serr != nil { + err := b.serr.Close() + if err != nil { + log.G(ctx).WithError(err).Errorf("Error while closing stderr npipe") + } + } + }) + b.binaryCloser.Do(func() { + // Borrowed this from Ming's PR + log.G(ctx).Debugf("Waiting for binaryIO to exit: %d", b.cmd.Process.Pid) + done := make(chan error, 1) + go func() { + done <- b.cmd.Wait() + }() + + select { + case err := <-done: + if err != nil { + log.G(ctx).WithError(err).Errorf("Error while waiting for cmd to finish") + } else { + log.G(ctx).Debugf("binary_io::b.cmd.Wait() finished normally") + } + + case <-time.After(binaryCmdWaitTimeout): + log.G(ctx).Errorf("Timeout while waiting for binaryIO process to finish") + err := b.cmd.Process.Kill() + if err != nil { + log.G(ctx).WithError(err).Errorf("Error while killing binaryIO process") + } + log.G(ctx).Debugln("BinaryIO process killed") + } + }) +} + +func (b *binaryIO) CloseStdin(ctx context.Context) { + +} + +func (b *binaryIO) Stdin() io.Reader { + return nil +} + +func (b *binaryIO) StdinPath() string { + return "" +} + +func (b *binaryIO) Stdout() io.Writer { + return b.sout +} + +func (b *binaryIO) StdoutPath() string { + return b.stdout +} + +func (b *binaryIO) Stderr() io.Writer { + return b.serr +} + +func (b *binaryIO) StderrPath() string { + return b.stderr +} + +func (b *binaryIO) Terminal() bool { + return false +} + +type pipe struct { + l net.Listener + con net.Conn + conErr error + conWg sync.WaitGroup +} + +func openNPipe(path string) (io.ReadWriteCloser, error) { + l, err := winio.ListenPipe(path, nil) + if err != nil { + return nil, err + } + + p := &pipe{l: l} + p.conWg.Add(1) + + go func() { + defer p.conWg.Done() + c, err := l.Accept() + if err != nil { + p.conErr = err + return + } + p.con = c + }() + return p, nil +} + +func (p *pipe) Write(b []byte) (int, error) { + p.conWg.Wait() + if p.conErr != nil { + return 0, errors.Wrap(p.conErr, "connection error") + } + return p.con.Write(b) +} + +func (p *pipe) Read(b []byte) (int, error) { + p.conWg.Wait() + if p.conErr != nil { + return 0, errors.Wrap(p.conErr, "connection error") + } + return p.con.Read(b) +} + +func (p *pipe) Close() error { + p.l.Close() + p.conWg.Wait() + if p.con != nil { + err := p.con.Close() + return err + } + return p.conErr +}