Skip to content

Commit

Permalink
Merge pull request #2294 from influxdb/flush-fix
Browse files Browse the repository at this point in the history
Fix stream writer flushing to be thread safe.
  • Loading branch information
toddboom committed Apr 15, 2015
2 parents 8b8ce9d + b54f81f commit b43db19
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- [#2293](https://github.com/influxdb/influxdb/pull/2293): Open cluster listener before starting broker.
- [#2287](https://github.com/influxdb/influxdb/pull/2287): Fix data race during SHOW RETENTION POLICIES.
- [#2288](https://github.com/influxdb/influxdb/pull/2288): Fix expression parsing bug.
- [#2294](https://github.com/influxdb/influxdb/pull/2294): Fix async response flushing (invalid chunked response error).

## Features
- [#2276](https://github.com/influxdb/influxdb/pull/2276): Broker topic truncation.
Expand Down
4 changes: 3 additions & 1 deletion messaging/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ func CopyFlush(dst io.Writer, src io.Reader) (written int64, err error) {
}

// Flush after write.
if dst, ok := dst.(http.Flusher); ok {
if dst, ok := dst.(interface {
Flush()
}); ok {
dst.Flush()
}

Expand Down
25 changes: 11 additions & 14 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,9 +1357,6 @@ func (l *Log) appendToWriters(buf []byte) {
i--
continue
}

// Flush, if possible.
flushWriter(w.Writer)
}
}

Expand Down Expand Up @@ -1808,15 +1805,6 @@ func (l *Log) removeWriter(writer *logWriter) {
return
}

// Flush pushes out buffered data for all open writers.
func (l *Log) Flush() {
l.lock()
defer l.unlock()
for _, w := range l.writers {
flushWriter(w.Writer)
}
}

// ReadFrom continually reads log entries from a reader.
func (l *Log) ReadFrom(r io.ReadCloser) error {
l.tracef("ReadFrom")
Expand Down Expand Up @@ -1961,12 +1949,21 @@ type logWriter struct {
}

// Write writes bytes to the underlying writer.
// The write is ignored if the writer is currently snapshotting.
func (w *logWriter) Write(p []byte) (int, error) {
// Ignore if the writer is currently snapshotting.
if w.snapshotIndex != 0 {
return 0, nil
}
return w.Writer.Write(p)

// Write to underlying writer.
n, err := w.Writer.Write(p)
if err != nil {
return n, err
}

// Flush writer if successful.
flushWriter(w.Writer)
return n, err
}

func (w *logWriter) Close() error {
Expand Down

0 comments on commit b43db19

Please sign in to comment.