Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Gasch <[email protected]>
  • Loading branch information
Michael Gasch committed Jan 10, 2022
1 parent c4ae14e commit 5a8fa66
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 30 deletions.
4 changes: 2 additions & 2 deletions example_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Example_stream() {
// write some records (offsets 10-14)
for i := 0; i < writeRecords/2; i++ {
d := fmt.Sprintf(`{"id":%d,"message","hello world"}`, i+logStart)
_, err := l.Write(ctx, []byte(d))
_, err = l.Write(ctx, []byte(d))
if err != nil {
fmt.Printf("write: %v", err)
os.Exit(1)
Expand Down Expand Up @@ -74,7 +74,7 @@ func Example_stream() {
}()
for i := writeRecords / 2; i < writeRecords; i++ {
d := fmt.Sprintf(`{"id":%d,"message","hello world"}`, i+logStart)
_, err := l.Write(ctx, []byte(d))
_, err = l.Write(ctx, []byte(d))
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Printf("write: %v", err)
os.Exit(1)
Expand Down
File renamed without changes.
47 changes: 21 additions & 26 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@ import (
)

const (
streamBuffer = 100 // limit before ErrSlowReader
streamBuffer = 100 // limit before blocking on send to receiver
streamBackoffInterval = time.Millisecond * 10
)

// ErrSlowReader is returned by a stream when the stream buffer is full
var ErrSlowReader = errors.New("slow reader blocking stream channel send")

// StreamHeader is metadata associated with a stream record
type StreamHeader struct {
Earliest Offset
Expand All @@ -33,6 +30,7 @@ type StreamRecord struct {
// Both channels are closed when the provided context is cancelled or an
// unrecoverable error, e.g. ErrOutOfRange, occurs.
//
// TODO: fix
// If the caller is not keeping up with the record stream (buffered channel),
// ErrSlowReader will be returned and the stream terminates.
//
Expand All @@ -42,12 +40,12 @@ type StreamRecord struct {
// Safe for concurrent use.
func (l *Log) Stream(ctx context.Context, start Offset) (<-chan StreamRecord, <-chan error) {
var (
// when buffer is full, returns with ErrSlowReader
// blocks when buffer is full
streamCh = make(chan StreamRecord, streamBuffer)

// unbuffered to guarantee delivery before returning. avoids coding
// complexity/bugs on caller side when streamCh is closed (returning invalid
// empty Records to receiver)
// unbuffered to guarantee delivery before returning (channels won't be closed
// unless err delivered) - avoids coding complexity/bugs on caller side when
// streamCh is closed (returning invalid empty Records to receiver)
errCh = make(chan error)
)

Expand All @@ -65,26 +63,16 @@ func (l *Log) Stream(ctx context.Context, start Offset) (<-chan StreamRecord, <-
return

default:
sendOne := func() error {
if len(streamCh) == streamBuffer {
return ErrSlowReader
}

readOne := func() (StreamRecord, error) {
l.mu.RLock()
defer l.mu.RUnlock()

earliest, latest := l.offsetRange()
r, err := l.read(ctx, offset)
if err != nil {
if errors.Is(err, ErrFutureOffset) {
// back off and continue polling
time.Sleep(streamBackoffInterval)
return nil
}

return err
return StreamRecord{}, err
}

earliest, latest := l.offsetRange()
rec := StreamRecord{
Metadata: StreamHeader{
Earliest: earliest,
Expand All @@ -93,16 +81,23 @@ func (l *Log) Stream(ctx context.Context, start Offset) (<-chan StreamRecord, <-
Record: r,
}

streamCh <- rec
offset = r.Metadata.Offset + 1

return nil
return rec, nil
}

if err := sendOne(); err != nil {
rec, err := readOne()
if err != nil {
if errors.Is(err, ErrFutureOffset) {
// back off and continue polling
time.Sleep(streamBackoffInterval)
continue
}

errCh <- err
return
}

streamCh <- rec // blocks when chan is full
offset = rec.Record.Metadata.Offset + 1
}
}
}()
Expand Down
5 changes: 3 additions & 2 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ func TestLog_Stream(t *testing.T) {
}
})

t.Run("returns error when stream reader is too slow", func(t *testing.T) {
// TODO: fix test
/*t.Run("returns error when stream reader is too slow", func(t *testing.T) {
t.Parallel()
const (
Expand Down Expand Up @@ -291,7 +292,7 @@ func TestLog_Stream(t *testing.T) {
streamErr := <-errCh
assert.Assert(t, errors.Is(streamErr, ErrSlowReader))
})
})*/

t.Run("two stream receivers, starting at different offsets until stream cancelled", func(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit 5a8fa66

Please sign in to comment.