Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nomad logs command to stream task logs #1444

Merged
merged 16 commits into from
Jul 25, 2016
Next Next commit
initial log api impl
  • Loading branch information
dadgar committed Jul 25, 2016
commit cc0fec53f1e6b5df709085e791bf051183dfce9a
11 changes: 8 additions & 3 deletions client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type AllocDirFS interface {
List(path string) ([]*AllocFileInfo, error)
Stat(path string) (*AllocFileInfo, error)
ReadAt(path string, offset int64) (io.ReadCloser, error)
BlockUntilExists(path string, t *tomb.Tomb) error
BlockUntilExists(path string, t *tomb.Tomb) chan error
ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error)
}

Expand Down Expand Up @@ -343,11 +343,16 @@ func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {

// BlockUntilExists blocks until the passed file relative the allocation
// directory exists. The block can be cancelled with the passed tomb.
func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) error {
func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) chan error {
// Get the path relative to the alloc directory
p := filepath.Join(d.AllocDir, path)
watcher := getFileWatcher(p)
return watcher.BlockUntilExists(t)
returnCh := make(chan error, 1)
go func() {
returnCh <- watcher.BlockUntilExists(t)
close(returnCh)
}()
return returnCh
}

// ChangeEvents watches for changes to the passed path relative to the
Expand Down
209 changes: 202 additions & 7 deletions command/agent/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"bytes"
"fmt"
"io"
"math"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
Expand All @@ -21,6 +24,8 @@ import (
var (
allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id")
fileNameNotPresentErr = fmt.Errorf("must provide a file name")
taskNotPresentErr = fmt.Errorf("must provide task name")
logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)")
clientNotRunning = fmt.Errorf("node is not running a Nomad Client")
invalidOrigin = fmt.Errorf("origin must be start or end")
)
Expand Down Expand Up @@ -58,6 +63,8 @@ func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (int
return s.FileCatRequest(resp, req)
case strings.HasPrefix(path, "stream/"):
return s.Stream(resp, req)
case strings.HasPrefix(path, "logs/"):
return s.Logs(resp, req)
default:
return nil, CodedError(404, ErrInvalidMethod)
}
Expand Down Expand Up @@ -499,10 +506,18 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf
// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)

return nil, s.stream(offset, path, fs, output)
// Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()

return nil, s.stream(offset, path, fs, framer, nil)
}

func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, output io.WriteCloser) error {
func (s *HTTPServer) stream(offset int64, path string,
fs allocdir.AllocDirFS, framer *StreamFramer,
eofCancelCh chan error) error {

// Get the reader
f, err := fs.ReadAt(path, offset)
if err != nil {
Expand All @@ -517,11 +532,6 @@ func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, o
t.Done()
}()

// Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()

// Create a variable to allow setting the last event
var lastEvent string

Expand Down Expand Up @@ -595,9 +605,194 @@ OUTER:
continue OUTER
case <-framer.ExitCh():
return nil
case err := <-eofCancelCh:
return err
}
}
}

return nil
}

// Logs is the HTTP handler that streams a task's log output, continuing to
// wait for more data once the end of the log is reached. The allocation ID is
// taken from the request path; the remaining inputs are query parameters:
// * task: name of the task whose logs are streamed (required).
// * type: which log to stream, either "stdout" or "stderr" (required).
// * offset: base-10 integer offset at which streaming starts; zero if omitted.
// * origin: "start" or "end", selecting the point the offset is relative to;
// "start" if omitted.
func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
	params := req.URL.Query()

	// Everything after the endpoint prefix is the allocation ID.
	allocID := strings.TrimPrefix(req.URL.Path, "/v1/client/fs/logs/")
	if allocID == "" {
		return nil, allocIDNotPresentErr
	}

	task := params.Get("task")
	if task == "" {
		return nil, taskNotPresentErr
	}

	logType := params.Get("type")
	if logType != "stdout" && logType != "stderr" {
		return nil, logTypeNotPresentErr
	}

	// A missing offset leaves the zero value in place.
	var offset int64
	if raw := params.Get("offset"); raw != "" {
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("error parsing offset: %v", err)
		}
		offset = parsed
	}

	origin := params.Get("origin")
	switch origin {
	case "":
		origin = "start"
	case "start", "end":
		// Already a valid value.
	default:
		return nil, invalidOrigin
	}

	fs, err := s.agent.client.GetAllocFS(allocID)
	if err != nil {
		return nil, err
	}

	// Wrap the response so that every write is flushed to the client.
	flusher := ioutils.NewWriteFlusher(resp)
	return nil, s.logs(offset, origin, task, logType, fs, flusher)
}

// logs streams the log files of the given task and log type to output,
// moving from one rotated file to the next as new files appear. Files are
// looked up in the shared allocation log directory and are expected to be
// named "<task>.<logType>.<index>".
//
// origin must be "start" or "end"; any other value returns invalidOrigin.
// With "start", streaming begins at the file closest to index 0 and offset is
// applied from the beginning of that file. With "end", streaming begins at
// the file with the highest index and offset is negated so that it is applied
// backwards from the end of that file (clamped to the start of the file).
//
// The function only returns when listing, lookup or streaming fails with an
// error other than the current file having disappeared.
func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdir.AllocDirFS, output io.WriteCloser) error {
	// Create the framer
	framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
	framer.Run()
	defer framer.Destroy()

	// Path to the logs
	logPath := filepath.Join(allocdir.SharedAllocName, allocdir.LogDirName)

	// nextIdx is the next index to read logs from
	var nextIdx int64
	switch origin {
	case "start":
		nextIdx = 0
	case "end":
		nextIdx = math.MaxInt64
		// A negative offset marks "relative to the end of the file" below.
		offset *= -1
	default:
		return invalidOrigin
	}

	// Create a tomb to cancel watch events
	t := tomb.Tomb{}
	defer func() {
		t.Kill(nil)
		t.Done()
	}()

	for {
		// Logic for picking next file is:
		// 1) List log files
		// 2) Pick log file closest to desired index
		// 3) Open log file at correct offset
		// 3a) No error, read contents
		// 3b) If file doesn't exist, goto 1 as it may have been rotated out
		entries, err := fs.List(logPath)
		if err != nil {
			return fmt.Errorf("failed to list entries: %v", err)
		}

		logEntry, idx, err := findClosest(entries, nextIdx, task, logType)
		if err != nil {
			return err
		}

		// Apply the offset we should open at. Handling the negative case is
		// only for the first time.
		openOffset := offset
		if openOffset < 0 {
			openOffset = logEntry.Size + openOffset
			if openOffset < 0 {
				openOffset = 0
			}
		}

		// Stream the chosen file; the channel fires once the next rotated
		// file exists, which tells stream to stop waiting at EOF.
		p := filepath.Join(logPath, logEntry.Name)
		nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1))
		nextExists := fs.BlockUntilExists(nextPath, &t)
		err = s.stream(openOffset, p, fs, framer, nextExists)

		// Check if there was an error where the file does not exist. That means
		// it got rotated out from under us.
		if err != nil {
			if os.IsNotExist(err) {
				continue
			}
			return err
		}

		// Since we successfully streamed, update the overall offset/idx.
		// idx is re-declared on every iteration, so the progress has to be
		// recorded in nextIdx; otherwise the same file is selected again.
		offset = 0
		nextIdx = idx + 1
	}
}

// findClosest returns the log file entry for the given task and log type
// whose index is nearest to desiredIdx, together with that index. Candidate
// files are those named "<task>.<logType>.<index>"; directories and entries
// without that prefix are ignored. When several entries are equally near, the
// one appearing first in entries is returned.
//
// An error is returned if entries is empty, if a matching entry's suffix is
// not an integer, or if no entry matches the task and log type.
func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx int64,
	task, logType string) (*allocdir.AllocFileInfo, int64, error) {

	if len(entries) == 0 {
		return nil, 0, fmt.Errorf("no file entries found")
	}

	prefix := fmt.Sprintf("%s.%s.", task, logType)

	var closest *allocdir.AllocFileInfo
	var closestIdx int64
	var closestDist uint64
	for _, entry := range entries {
		if entry.IsDir {
			continue
		}

		idxStr := strings.TrimPrefix(entry.Name, prefix)

		// If nothing was trimmed, then it is not a match
		if idxStr == entry.Name {
			continue
		}

		// Convert to an int
		idx, err := strconv.Atoi(idxStr)
		if err != nil {
			return nil, 0, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err)
		}
		cur := int64(idx)

		// Determine distance to desired. The difference of two int64 values
		// always fits in a uint64, so compute it unsigned to avoid overflow
		// (e.g. desiredIdx == math.MaxInt64 with a negative index).
		var dist uint64
		if desiredIdx >= cur {
			dist = uint64(desiredIdx) - uint64(cur)
		} else {
			dist = uint64(cur) - uint64(desiredIdx)
		}

		// The first match is always accepted. Comparing against a maximum
		// sentinel instead would reject an entry whose distance equals the
		// sentinel, such as index 0 when desiredIdx is math.MaxInt64.
		if closest == nil || dist < closestDist {
			closestDist = dist
			closest = entry
			closestIdx = cur
		}
	}

	if closest == nil {
		return nil, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType)
	}

	return closest, closestIdx, nil
}