Skip to content

Commit

Permalink
Fix stream writer flushing to be thread safe.
Browse files Browse the repository at this point in the history
This change fixes the raft streams so that Flush() is not called
asynchronously while the snapshot is being written.
  • Loading branch information
benbjohnson committed Apr 15, 2015
1 parent 817c2e7 commit b54f81f
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- [#2293](https://github.com/influxdb/influxdb/pull/2293): Open cluster listener before starting broker.
- [#2287](https://github.com/influxdb/influxdb/pull/2287): Fix data race during SHOW RETENTION POLICIES.
- [#2288](https://github.com/influxdb/influxdb/pull/2288): Fix expression parsing bug.
- [#2294](https://github.com/influxdb/influxdb/pull/2294): Fix async response flushing (invalid chunked response error).

## Features
- [#2276](https://github.com/influxdb/influxdb/pull/2276): Broker topic truncation.
Expand Down
4 changes: 3 additions & 1 deletion messaging/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ func CopyFlush(dst io.Writer, src io.Reader) (written int64, err error) {
}

// Flush after write.
if dst, ok := dst.(http.Flusher); ok {
if dst, ok := dst.(interface {
Flush()
}); ok {
dst.Flush()
}

Expand Down
25 changes: 11 additions & 14 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,9 +1357,6 @@ func (l *Log) appendToWriters(buf []byte) {
i--
continue
}

// Flush, if possible.
flushWriter(w.Writer)
}
}

Expand Down Expand Up @@ -1808,15 +1805,6 @@ func (l *Log) removeWriter(writer *logWriter) {
return
}

// Flush pushes out buffered data for all open writers.
func (l *Log) Flush() {
l.lock()
defer l.unlock()
for _, w := range l.writers {
flushWriter(w.Writer)
}
}

// ReadFrom continually reads log entries from a reader.
func (l *Log) ReadFrom(r io.ReadCloser) error {
l.tracef("ReadFrom")
Expand Down Expand Up @@ -1961,12 +1949,21 @@ type logWriter struct {
}

// Write writes bytes to the underlying writer.
// The write is ignored if the writer is currently snapshotting.
func (w *logWriter) Write(p []byte) (int, error) {
// Ignore if the writer is currently snapshotting.
if w.snapshotIndex != 0 {
return 0, nil
}
return w.Writer.Write(p)

// Write to underlying writer.
n, err := w.Writer.Write(p)
if err != nil {
return n, err
}

// Flush writer if successful.
flushWriter(w.Writer)
return n, err
}

func (w *logWriter) Close() error {
Expand Down

0 comments on commit b54f81f

Please sign in to comment.