Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nomad logs command to stream task logs #1444

Merged
merged 16 commits into from
Jul 25, 2016
Prev Previous commit
Next Next commit
unblock the readers to add liveness when using -n
  • Loading branch information
dadgar committed Jul 25, 2016
commit ac1cfd182153296c75df1e1bcc52d2802d33d402
29 changes: 23 additions & 6 deletions api/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ type FrameReader struct {
cancelCh chan struct{}
closed bool

unblockTime time.Duration

frame *StreamFrame
frameOffset int

Expand All @@ -381,6 +383,12 @@ func NewFrameReader(frames <-chan *StreamFrame, cancelCh chan struct{}) *FrameRe
}
}

// SetUnblockTime sets the duration after which a blocked Read gives up and
// returns zero bytes read with a nil error. If the duration is unset, zero,
// or negative, Read blocks until a frame arrives or the frame channel closes.
func (f *FrameReader) SetUnblockTime(d time.Duration) {
	f.unblockTime = d
}

// Offset returns the offset into the stream.
func (f *FrameReader) Offset() int {
return f.byteOffset
Expand All @@ -390,14 +398,23 @@ func (f *FrameReader) Offset() int {
// when there are no more frames.
func (f *FrameReader) Read(p []byte) (n int, err error) {
if f.frame == nil {
frame, ok := <-f.frames
if !ok {
return 0, io.EOF
var unblock <-chan time.Time
if f.unblockTime.Nanoseconds() > 0 {
unblock = time.After(f.unblockTime)
}
f.frame = frame

// Store the total offset into the file
f.byteOffset = int(f.frame.Offset)
select {
case frame, ok := <-f.frames:
if !ok {
return 0, io.EOF
}
f.frame = frame

// Store the total offset into the file
f.byteOffset = int(f.frame.Offset)
case <-unblock:
return 0, nil
}
}

if f.frame.FileEvent != "" && len(f.fileEvent) == 0 {
Expand Down
37 changes: 37 additions & 0 deletions api/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"
"reflect"
"testing"
"time"
)

func TestFS_FrameReader(t *testing.T) {
Expand Down Expand Up @@ -73,3 +74,39 @@ func TestFS_FrameReader(t *testing.T) {
t.Fatalf("offset %d, wanted %d", r.Offset(), len(expected))
}
}

// TestFS_FrameReader_Unblock verifies that a FrameReader with an unblock time
// returns zero bytes read when no frame arrives in time, and that clearing
// the unblock time makes Read block again.
func TestFS_FrameReader_Unblock(t *testing.T) {
	// Build the frame and cancel channels backing the reader.
	framesCh := make(chan *StreamFrame, 3)
	cancelCh := make(chan struct{})

	r := NewFrameReader(framesCh, cancelCh)
	r.SetUnblockTime(10 * time.Millisecond)

	// With no frames queued, the read should unblock with zero bytes.
	buf := make([]byte, 12)

	n, err := r.Read(buf)
	if err != nil {
		t.Fatalf("Read failed: %v", err)
	}
	if n != 0 {
		t.Fatalf("should have unblocked")
	}

	// Clear the unblock time; Read must now block until data arrives.
	r.SetUnblockTime(0)

	done := make(chan struct{})
	go func() {
		r.Read(buf)
		close(done)
	}()

	select {
	case <-done:
		t.Fatalf("shouldn't have unblocked")
	case <-time.After(300 * time.Millisecond):
	}
}
4 changes: 2 additions & 2 deletions command/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (f *FSCommand) Run(args []string) int {

// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0)
}
}

Expand Down Expand Up @@ -321,7 +321,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,

// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0)
}

go func() {
Expand Down
57 changes: 52 additions & 5 deletions command/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,26 @@ type LineLimitReader struct {
lines int
searchLimit int

timeLimit time.Duration
lastRead time.Time

buffer *bytes.Buffer
bufFiled bool
foundLines bool
}

// NewLineLimitReader takes the ReadCloser to wrap, the number of lines to find
// searching backwards in the first searchLimit bytes.
func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReader {
// searching backwards in the first searchLimit bytes. timeLimit can optionally
// be specified by passing a non-zero duration. When set, the search for the
// last n lines is aborted if no data has been read in the duration. This
// can be used to flush what has been read so far if no additional data is being received. When
// used, the underlying reader must not block forever and must periodically
// unblock even when no data has been read.
func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int, timeLimit time.Duration) *LineLimitReader {
return &LineLimitReader{
ReadCloser: r,
searchLimit: searchLimit,
timeLimit: timeLimit,
lines: lines,
buffer: bytes.NewBuffer(make([]byte, 0, searchLimit)),
}
Expand All @@ -124,14 +133,52 @@ func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReade
func (l *LineLimitReader) Read(p []byte) (n int, err error) {
// Fill up the buffer so we can find the correct number of lines.
if !l.bufFiled {
_, err := l.buffer.ReadFrom(io.LimitReader(l.ReadCloser, int64(l.searchLimit)))
b := make([]byte, len(p))
n, err := l.ReadCloser.Read(b)
if n > 0 {
if _, err := l.buffer.Write(b[:n]); err != nil {
return 0, err
}
}

if err != nil {
return 0, err
if err != io.EOF {
return 0, err
}

l.bufFiled = true
goto READ
}

if l.buffer.Len() >= l.searchLimit {
l.bufFiled = true
goto READ
}

if l.timeLimit.Nanoseconds() > 0 {
if l.lastRead.IsZero() {
l.lastRead = time.Now()
return 0, nil
}

now := time.Now()
if n == 0 {
// We hit the limit
if l.lastRead.Add(l.timeLimit).Before(now) {
l.bufFiled = true
goto READ
} else {
return 0, nil
}
} else {
l.lastRead = now
}
}

l.bufFiled = true
return 0, nil
}

READ:
if l.bufFiled && l.buffer.Len() != 0 {
b := l.buffer.Bytes()

Expand Down
63 changes: 61 additions & 2 deletions command/helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package command

import (
"io"
"io/ioutil"
"reflect"
"strings"
"testing"
"time"

"github.com/mitchellh/cli"
)
Expand Down Expand Up @@ -48,7 +51,7 @@ func TestHelpers_NodeID(t *testing.T) {
}
}

func TestHelpers_LineLimitReader(t *testing.T) {
func TestHelpers_LineLimitReader_NoTimeLimit(t *testing.T) {
helloString := `hello
world
this
Expand Down Expand Up @@ -114,7 +117,7 @@ test`,

for i, c := range cases {
in := ioutil.NopCloser(strings.NewReader(c.Input))
limit := NewLineLimitReader(in, c.Lines, c.SearchLimit)
limit := NewLineLimitReader(in, c.Lines, c.SearchLimit, 0)
outBytes, err := ioutil.ReadAll(limit)
if err != nil {
t.Fatalf("case %d failed: %v", i, err)
Expand All @@ -126,3 +129,59 @@ test`,
}
}
}

// testReadCloser is an io.ReadCloser backed by a channel of byte slices. A
// Read that finds no pending data returns (0, nil) after a short delay so
// callers that rely on periodic unblocking can make progress.
type testReadCloser struct {
	data chan []byte
}

// Read copies the next queued slice into p. Once the channel is closed it
// reports io.EOF; if no data arrives within 10ms it returns (0, nil).
func (t *testReadCloser) Read(p []byte) (n int, err error) {
	timeout := time.After(10 * time.Millisecond)
	select {
	case <-timeout:
		return 0, nil
	case b, ok := <-t.data:
		if ok {
			return copy(p, b), nil
		}
		return 0, io.EOF
	}
}

// Close closes the data channel, causing subsequent Reads to return io.EOF.
func (t *testReadCloser) Close() error {
	close(t.data)
	return nil
}

// TestHelpers_LineLimitReader_TimeLimit verifies that a LineLimitReader with
// a time limit flushes what it has buffered once no new data arrives within
// the limit, instead of blocking until the line/search limits are reached.
func TestHelpers_LineLimitReader_TimeLimit(t *testing.T) {
	// Create the test reader
	in := &testReadCloser{data: make(chan []byte)}

	// Set up the reader such that it won't hit the line/buffer limit and could
	// only terminate if it hits the time limit
	limit := NewLineLimitReader(in, 1000, 1000, 100*time.Millisecond)

	expected := []byte("hello world")

	// Collect the result back in the test goroutine: testing.T methods such
	// as Fatalf must not be called from other goroutines, and exiting the
	// goroutine silently on a mismatch would surface as a bogus timeout.
	type result struct {
		out []byte
		err error
	}
	resultCh := make(chan result, 1)
	go func() {
		outBytes, err := ioutil.ReadAll(limit)
		resultCh <- result{out: outBytes, err: err}
	}()

	// Send the data and close the stream so the reader sees EOF.
	in.data <- expected
	in.Close()

	select {
	case res := <-resultCh:
		if res.err != nil {
			t.Fatalf("ReadAll failed: %v", res.err)
		}
		if !reflect.DeepEqual(res.out, expected) {
			t.Fatalf("got %q; want %q", res.out, expected)
		}
	case <-time.After(1 * time.Second):
		t.Fatalf("did not exit by time limit")
	}
}
6 changes: 4 additions & 2 deletions command/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/signal"
"strings"
"syscall"
"time"

"github.com/hashicorp/nomad/api"
)
Expand All @@ -33,7 +34,7 @@ Logs Specific Options:
-job <job-id>
Use a random allocation from a specified job-id.

-tail
-tail
Show the files contents with offsets relative to the end of the file. If no
offset is given, -n is defaulted to 10.

Expand Down Expand Up @@ -182,7 +183,7 @@ func (l *LogsCommand) Run(args []string) int {

// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second)
}

if readErr != nil {
Expand Down Expand Up @@ -216,6 +217,7 @@ func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation,
// Create a reader
var r io.ReadCloser
frameReader := api.NewFrameReader(frames, cancel)
frameReader.SetUnblockTime(500 * time.Millisecond)
r = frameReader

go func() {
Expand Down