Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nomad logs command to stream task logs #1444

Merged
merged 16 commits into from
Jul 25, 2016
Prev Previous commit
Next Next commit
Support non-following logs
  • Loading branch information
dadgar committed Jul 25, 2016
commit b65fd2624e0f872636b8cccdebfc2ba007341884
8 changes: 5 additions & 3 deletions api/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,15 +279,16 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
// Logs streams the content of a tasks logs blocking on EOF.
// The parameters are:
// * allocation: the allocation to stream from.
// * follow: Whether the logs should be followed.
// * task: the tasks name to stream logs for.
// * logType: Either "stdout" or "stderr"
// * offset: The offset to start streaming data at.
// * origin: Either "start" or "end" and defines from where the offset is applied.
// * offset: The offset to start streaming data at.
// * cancel: A channel that when closed, streaming will end.
//
// The return value is a channel that will emit StreamFrames as they are read.
func (a *AllocFS) Logs(alloc *Allocation, task, logType, origin string, offset int64,
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) {
func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string,
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) {

node, _, err := a.client.Nodes().Info(alloc.NodeID, q)
if err != nil {
Expand All @@ -303,6 +304,7 @@ func (a *AllocFS) Logs(alloc *Allocation, task, logType, origin string, offset i
Path: fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID),
}
v := url.Values{}
v.Set("follow", strconv.FormatBool(follow))
v.Set("task", task)
v.Set("type", logType)
v.Set("origin", origin)
Expand Down
56 changes: 48 additions & 8 deletions command/agent/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,17 +350,16 @@ OUTER:
}

// Flush any existing frames
s.l.Lock()
select {
case o := <-s.outbound:
// Send the frame and then clear the current working frame
if err = s.enc.Encode(o); err != nil {
s.l.Unlock()
return
}
default:
}

s.l.Lock()
if s.f != nil {
s.f.Data = s.readData()
s.enc.Encode(s.f)
Expand Down Expand Up @@ -622,7 +621,11 @@ OUTER:
continue OUTER
case <-framer.ExitCh():
return nil
case err := <-eofCancelCh:
case err, ok := <-eofCancelCh:
if !ok {
return nil
}

return err
}
}
Expand All @@ -634,11 +637,13 @@ OUTER:
// Logs streams the content of a log blocking on EOF. The parameters are:
// * task: task name to stream logs for.
// * type: stdout/stderr to stream.
// * follow: A boolean of whether to follow the logs.
// * offset: The offset to start streaming data at, defaults to zero.
// * origin: Either "start" or "end" and defines from where the offset is
// applied. Defaults to "start".
func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, task, logType string
var follow bool
var err error

q := req.URL.Query()
Expand All @@ -651,6 +656,10 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
return nil, taskNotPresentErr
}

if follow, err = strconv.ParseBool(q.Get("follow")); err != nil {
return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err)
}

logType = q.Get("type")
switch logType {
case "stdout", "stderr":
Expand Down Expand Up @@ -684,10 +693,13 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)

return nil, s.logs(offset, origin, task, logType, fs, output)
return nil, s.logs(follow, offset, origin, task, logType, fs, output)
}

func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdir.AllocDirFS, output io.WriteCloser) error {
func (s *HTTPServer) logs(follow bool, offset int64,
origin, task, logType string,
fs allocdir.AllocDirFS, output io.WriteCloser) error {

// Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
Expand Down Expand Up @@ -727,15 +739,39 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi
return fmt.Errorf("failed to list entries: %v", err)
}

// If we are not following logs, determine the max index for the logs we are
// interested in so we can stop there.
maxIndex := int64(math.MaxInt64)
if !follow {
_, idx, _, err := findClosest(entries, maxIndex, 0, task, logType)
if err != nil {
return err
}
maxIndex = idx
}

logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType)
if err != nil {
return err
}

var eofCancelCh chan error
exitAfter := false
if !follow && idx > maxIndex {
// Exceeded what was there initially so return
return nil
} else if !follow && idx == maxIndex {
// At the end
eofCancelCh = make(chan error)
close(eofCancelCh)
exitAfter = true
} else {
nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1))
eofCancelCh = fs.BlockUntilExists(nextPath, &t)
}

p := filepath.Join(logPath, logEntry.Name)
nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1))
nextExists := fs.BlockUntilExists(nextPath, &t)
err = s.stream(openOffset, p, fs, framer, nextExists)
err = s.stream(openOffset, p, fs, framer, eofCancelCh)

// Check if there was an error where the file does not exist. That means
// it got rotated out from under us.
Expand All @@ -746,6 +782,10 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi
return err
}

if exitAfter {
return nil
}

//Since we successfully streamed, update the overall offset/idx.
offset = int64(0)
nextIdx = idx + 1
Expand Down
15 changes: 10 additions & 5 deletions command/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ Logs Specific Options:
-job <job-id>
Use a random allocation from a specified job-id.

-f
Causes the output to not stop when the end of the logs are reached, but
rather to wait for additional output.

-tail
Show the files contents with offsets relative to the end of the file. If no
offset is given, -n is defaulted to 10.
Expand All @@ -53,14 +57,15 @@ func (l *LogsCommand) Synopsis() string {
}

func (l *LogsCommand) Run(args []string) int {
var verbose, job, tail, stderr bool
var verbose, job, tail, stderr, follow bool
var numLines, numBytes int64

flags := l.Meta.FlagSet("logs-list", FlagSetClient)
flags.Usage = func() { l.Ui.Output(l.Help()) }
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&job, "job", false, "")
flags.BoolVar(&tail, "tail", false, "")
flags.BoolVar(&follow, "f", false, "")
flags.BoolVar(&stderr, "stderr", false, "")
flags.Int64Var(&numLines, "n", -1, "")
flags.Int64Var(&numBytes, "c", -1, "")
Expand Down Expand Up @@ -187,7 +192,7 @@ func (l *LogsCommand) Run(args []string) int {
var r io.ReadCloser
var readErr error
if !tail {
r, readErr = l.followFile(client, alloc, task, logType, api.OriginStart, 0)
r, readErr = l.followFile(client, alloc, follow, task, logType, api.OriginStart, 0)
if readErr != nil {
readErr = fmt.Errorf("Error reading file: %v", readErr)
}
Expand All @@ -206,7 +211,7 @@ func (l *LogsCommand) Run(args []string) int {
numLines = defaultTailLines
}

r, readErr = l.followFile(client, alloc, task, logType, api.OriginEnd, offset)
r, readErr = l.followFile(client, alloc, follow, task, logType, api.OriginEnd, offset)

// If numLines is set, wrap the reader
if numLines != -1 {
Expand All @@ -231,10 +236,10 @@ func (l *LogsCommand) Run(args []string) int {
// followFile outputs the contents of the file to stdout relative to the end of
// the file.
func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation,
task, logType, origin string, offset int64) (io.ReadCloser, error) {
follow bool, task, logType, origin string, offset int64) (io.ReadCloser, error) {

cancel := make(chan struct{})
frames, _, err := client.AllocFS().Logs(alloc, task, logType, origin, offset, cancel, nil)
frames, _, err := client.AllocFS().Logs(alloc, follow, task, logType, origin, offset, cancel, nil)
if err != nil {
panic(err.Error())
return nil, err
Expand Down