diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cc11231a92..634ae7956ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - [#2293](https://github.com/influxdb/influxdb/pull/2293): Open cluster listener before starting broker. - [#2287](https://github.com/influxdb/influxdb/pull/2287): Fix data race during SHOW RETENTION POLICIES. - [#2288](https://github.com/influxdb/influxdb/pull/2288): Fix expression parsing bug. +- [#2294](https://github.com/influxdb/influxdb/pull/2294): Fix async response flushing (invalid chunked response error). ## Features - [#2276](https://github.com/influxdb/influxdb/pull/2276): Broker topic truncation. diff --git a/messaging/handler.go b/messaging/handler.go index c5c2390a81f..bbba9141301 100644 --- a/messaging/handler.go +++ b/messaging/handler.go @@ -231,7 +231,9 @@ func CopyFlush(dst io.Writer, src io.Reader) (written int64, err error) { } // Flush after write. - if dst, ok := dst.(http.Flusher); ok { + if dst, ok := dst.(interface { + Flush() + }); ok { dst.Flush() } diff --git a/raft/log.go b/raft/log.go index 26be8cec0d1..08d4b24d162 100644 --- a/raft/log.go +++ b/raft/log.go @@ -1357,9 +1357,6 @@ func (l *Log) appendToWriters(buf []byte) { i-- continue } - - // Flush, if possible. - flushWriter(w.Writer) } } @@ -1808,15 +1805,6 @@ func (l *Log) removeWriter(writer *logWriter) { return } -// Flush pushes out buffered data for all open writers. -func (l *Log) Flush() { - l.lock() - defer l.unlock() - for _, w := range l.writers { - flushWriter(w.Writer) - } -} - // ReadFrom continually reads log entries from a reader. func (l *Log) ReadFrom(r io.ReadCloser) error { l.tracef("ReadFrom") @@ -1961,12 +1949,21 @@ type logWriter struct { } // Write writes bytes to the underlying writer. -// The write is ignored if the writer is currently snapshotting. func (w *logWriter) Write(p []byte) (int, error) { + // Ignore if the writer is currently snapshotting. if w.snapshotIndex != 0 { return 0, nil } - return w.Writer.Write(p) + + // Write to underlying writer. + n, err := w.Writer.Write(p) + if err != nil { + return n, err + } + + // Flush writer if successful. + flushWriter(w.Writer) + return n, err } func (w *logWriter) Close() error {