From 22de89f41a5842ede1319bd4ca982ae2c7a50ed6 Mon Sep 17 00:00:00 2001
From: Paul Bellamy
Date: Thu, 9 Jul 2015 17:19:29 +0100
Subject: [PATCH] [proxy] rewrote chunked response handler

1) We cannot send "Connection: close", because the fsouza docker client
expects the TCP socket to stay open between requests.

2) Because we cannot force-close the connection, we can't hijack it
(because Go's net/http doesn't let us un-hijack it).

3) Because we need to maintain the individual chunking of messages (for
docker-py), we can't just copy the response body, as Go will remove and
re-add the chunking willy-nilly.

Therefore, we have to read each chunk one by one, and flush the
ResponseWriter after each one.
---
 proxy/chunked.go         | 146 ++++++++++++++++++++++++++++++++++++
 proxy/chunked_test.go    | 170 +++++++++++++++++++++++++++++++++++++++
 proxy/proxy_intercept.go |  39 +++++----
 3 files changed, 335 insertions(+), 20 deletions(-)
 create mode 100644 proxy/chunked.go
 create mode 100644 proxy/chunked_test.go

diff --git a/proxy/chunked.go b/proxy/chunked.go
new file mode 100644
index 0000000000..af5d25d8ce
--- /dev/null
+++ b/proxy/chunked.go
@@ -0,0 +1,146 @@
+// Based on net/http/internal
+package proxy
+
+import (
+	"bufio"
+	"errors"
+	"io"
+	"io/ioutil"
+)
+
+const maxLineLength = 4096 // assumed <= bufio.defaultBufSize
+
+var ErrLineTooLong = errors.New("header line too long")
+
+// Unlike net/http/internal.chunkedReader, this exposes the individual
+// chunks to the caller. The interface is based on database/sql.Rows.
+func NewChunkedReader(r io.Reader) *ChunkedReader {
+	br, ok := r.(*bufio.Reader)
+	if !ok {
+		br = bufio.NewReader(r)
+	}
+	return &ChunkedReader{r: br}
+}
+
+type ChunkedReader struct {
+	r     *bufio.Reader
+	chunk *io.LimitedReader
+	err   error
+	buf   [2]byte
+}
+
+// Next prepares the next chunk for reading. It returns true on success, or
+// false if there is no next chunk or an error happened while preparing
+// it. Err should be consulted to distinguish between the two cases.
+//
+// Every call to Chunk, even the first one, must be preceded by a call to Next.
+//
+// Calls to Next will discard any unread bytes in the current Chunk.
+func (cr *ChunkedReader) Next() bool {
+	if cr.err != nil {
+		return false
+	}
+
+	// Check the termination of the previous chunk
+	if cr.chunk != nil {
+		// Make sure the remainder is drained, in case the caller quit
+		// reading early.
+		if _, cr.err = io.Copy(ioutil.Discard, cr.chunk); cr.err != nil {
+			return false
+		}
+
+		// Check that the next two bytes after the chunk are \r\n
+		if _, cr.err = io.ReadFull(cr.r, cr.buf[:2]); cr.err != nil {
+			return false
+		}
+		if cr.buf[0] != '\r' || cr.buf[1] != '\n' {
+			cr.err = errors.New("malformed chunked encoding")
+			return false
+		}
+	} else {
+		cr.chunk = &io.LimitedReader{R: cr.r}
+	}
+
+	// Set up the next chunk
+	if n := cr.beginChunk(); n > 0 {
+		cr.chunk.N = int64(n)
+	} else {
+		cr.err = io.EOF
+	}
+	return cr.err == nil
+}
+
+// Chunk returns the io.Reader of the current chunk. On each call, this returns
+// the same io.Reader for a given chunk.
+func (cr *ChunkedReader) Chunk() io.Reader {
+	return cr.chunk
+}
+
+// Err returns the error, if any, that was encountered during iteration.
+func (cr *ChunkedReader) Err() error {
+	if cr.err == io.EOF {
+		return nil
+	}
+	return cr.err
+}
+
+func (cr *ChunkedReader) beginChunk() (n uint64) {
+	// chunk-size CRLF
+	var line []byte
+	line, cr.err = readLine(cr.r)
+	if cr.err != nil {
+		return
+	}
+	n, cr.err = parseHexUint(line)
+	return
+}
+
+// Read a line of bytes (up to \n) from b.
+// Give up if the line exceeds maxLineLength.
+// The returned bytes are a pointer into storage in
+// the bufio, so they are only valid until the next bufio read.
+func readLine(b *bufio.Reader) (p []byte, err error) {
+	if p, err = b.ReadSlice('\n'); err != nil {
+		// We always know when EOF is coming.
+		// If the caller asked for a line, there should be a line.
+		if err == io.EOF {
+			err = io.ErrUnexpectedEOF
+		} else if err == bufio.ErrBufferFull {
+			err = ErrLineTooLong
+		}
+		return nil, err
+	}
+	if len(p) >= maxLineLength {
+		return nil, ErrLineTooLong
+	}
+	return trimTrailingWhitespace(p), nil
+}
+
+func trimTrailingWhitespace(b []byte) []byte {
+	for len(b) > 0 && isASCIISpace(b[len(b)-1]) {
+		b = b[:len(b)-1]
+	}
+	return b
+}
+
+func isASCIISpace(b byte) bool {
+	return b == ' ' || b == '\t' || b == '\n' || b == '\r'
+}
+
+func parseHexUint(v []byte) (n uint64, err error) {
+	for _, b := range v {
+		n <<= 4
+		switch {
+		case '0' <= b && b <= '9':
+			b = b - '0'
+		case 'a' <= b && b <= 'f':
+			b = b - 'a' + 10
+		case 'A' <= b && b <= 'F':
+			b = b - 'A' + 10
+		default:
+			return 0, errors.New("invalid byte in chunk length")
+		}
+		n |= uint64(b)
+	}
+	return
+}
diff --git a/proxy/chunked_test.go b/proxy/chunked_test.go
new file mode 100644
index 0000000000..50bf4770be
--- /dev/null
+++ b/proxy/chunked_test.go
@@ -0,0 +1,170 @@
+// Based on net/http/internal
+package proxy
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"testing"
+)
+
+func assertNextChunk(t *testing.T, r *ChunkedReader, expected string) {
+	if !r.Next() {
+		t.Fatalf("Expected chunk, but ran out early: %v", r.Err())
+	}
+	if r.Err() != nil {
+		t.Fatalf("Error reading chunk: %q", r.Err())
+	}
+	data, err := ioutil.ReadAll(r.Chunk())
+	if g := string(data); g != expected {
+		t.Errorf("chunk reader read %q; want %q", g, expected)
+	}
+	if err != nil {
+		t.Logf(`data: "%s"`, data)
+		t.Fatalf("reading chunk: %v", err)
+	}
+}
+
+func assertNoMoreChunks(t *testing.T, r *ChunkedReader) {
+	if r.Next() {
+		t.Errorf("Expected no more chunks, but found another")
+	}
+	if r.Err() != nil {
+		t.Errorf("Expected no error, but found: %q", r.Err())
+	}
+}
+
+func TestChunk(t *testing.T) {
+	var b bytes.Buffer
+
+	w := NewChunkedWriter(&b)
+	const chunk1 = "hello, "
+	const chunk2 = "world! 0123456789abcdef"
+	w.Write([]byte(chunk1))
+	w.Write([]byte(chunk2))
+	w.Close()
+
+	r := NewChunkedReader(&b)
+
+	assertNextChunk(t, r, chunk1)
+	assertNextChunk(t, r, chunk2)
+	assertNoMoreChunks(t, r)
+}
+
+func TestIncompleteReadOfChunk(t *testing.T) {
+	var b bytes.Buffer
+
+	w := NewChunkedWriter(&b)
+	const chunk1 = "hello, "
+	const chunk2 = "world! 0123456789abcdef"
+	w.Write([]byte(chunk1))
+	w.Write([]byte(chunk2))
+	w.Close()
+
+	r := NewChunkedReader(&b)
+
+	// Incomplete read of first chunk
+	{
+		if !r.Next() {
+			t.Fatalf("Expected chunk, but ran out early: %v", r.Err())
+		}
+		if r.Err() != nil {
+			t.Fatalf("Error reading chunk: %q", r.Err())
+		}
+		// Read just 2 bytes
+		buf := make([]byte, 2)
+		if _, err := io.ReadFull(r.Chunk(), buf[:2]); err != nil {
+			t.Fatalf("Error reading first bytes of chunk: %q", err)
+		}
+		if buf[0] != 'h' || buf[1] != 'e' {
+			t.Fatalf("Unexpected first 2 bytes of chunk: %s", string(buf))
+		}
+	}
+
+	// Second chunk still reads ok
+	assertNextChunk(t, r, chunk2)
+
+	assertNoMoreChunks(t, r)
+}
+
+func TestMalformedChunks(t *testing.T) {
+	r := NewChunkedReader(bytes.NewBufferString(
+		"7\r\nhello, GARBAGEBYTES17\r\nworld! 0123456789abcdef\r\n0\r\n",
+	))
+
+	// First chunk is ok
+	assertNextChunk(t, r, "hello, ")
+
+	// Second chunk fails
+	{
+		if r.Next() {
+			t.Errorf("Expected failure when reading chunks, but got a chunk")
+		}
+		e := "malformed chunked encoding"
+		if r.Err() == nil || r.Err().Error() != e {
+			t.Errorf("chunk reader errored %q; want %q", r.Err(), e)
+		}
+		data, err := ioutil.ReadAll(r.Chunk())
+		if len(data) != 0 {
+			t.Errorf("chunk should have been empty. got %q", string(data))
+		}
+		if err != nil {
+			t.Logf(`data: "%s"`, data)
+			t.Errorf("reading chunk: %v", err)
+		}
+	}
+
+	assertNoMoreChunks(t, r)
+}
+
+// Stolen from net/http/internal for testing
+//
+// NewChunkedWriter returns a new chunkedWriter that translates writes into HTTP
+// "chunked" format before writing them to w. Closing the returned chunkedWriter
+// sends the final 0-length chunk that marks the end of the stream.
+//
+// NewChunkedWriter is not needed by normal applications. The http
+// package adds chunking automatically if handlers don't set a
+// Content-Length header. Using NewChunkedWriter inside a handler
+// would result in double chunking or chunking with a Content-Length
+// header, both of which are wrong.
+func NewChunkedWriter(w io.Writer) io.WriteCloser {
+	return &chunkedWriter{w}
+}
+
+// Writing to chunkedWriter translates to writing in HTTP chunked Transfer
+// Encoding wire format to the underlying Wire writer.
+type chunkedWriter struct {
+	Wire io.Writer
+}
+
+// Write the contents of data as one chunk to Wire.
+// NOTE: the corresponding chunk-writing procedure in Conn.Write has a bug
+// since it does not check for success of io.WriteString
+func (cw *chunkedWriter) Write(data []byte) (n int, err error) {
+
+	// Don't send 0-length data. It looks like EOF for chunked encoding.
+ if len(data) == 0 { + return 0, nil + } + + if _, err = fmt.Fprintf(cw.Wire, "%x\r\n", len(data)); err != nil { + return 0, err + } + if n, err = cw.Wire.Write(data); err != nil { + return + } + if n != len(data) { + err = io.ErrShortWrite + return + } + _, err = io.WriteString(cw.Wire, "\r\n") + + return +} + +func (cw *chunkedWriter) Close() error { + _, err := io.WriteString(cw.Wire, "0\r\n") + return err +} diff --git a/proxy/proxy_intercept.go b/proxy/proxy_intercept.go index 69f83c8353..383bb78e90 100644 --- a/proxy/proxy_intercept.go +++ b/proxy/proxy_intercept.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net" "net/http" "net/http/httputil" @@ -117,29 +116,29 @@ func copyStream(dst io.Writer, src io.Reader, done chan struct{}) { } func doChunkedResponse(w http.ResponseWriter, resp *http.Response, client *httputil.ClientConn) { - // Because we can't go back to request/response after we - // hijack the connection, we need to close it and make the - // client open another. - w.Header().Add("Connection", "close") - w.WriteHeader(resp.StatusCode) - - down, _, up, rem, err := hijack(w, client) - if err != nil { - http.Error(w, "Unable to hijack response stream for chunked response", http.StatusInternalServerError) + wf, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Error forwarding chunked response body: flush not available", http.StatusInternalServerError) return } + + w.WriteHeader(resp.StatusCode) + + up, rem := client.Hijack() defer up.Close() - defer down.Close() - // Copy the chunked response body to downstream, - // stopping at the end of the chunked section. - rawResponseBody := io.MultiReader(rem, up) - if _, err := io.Copy(ioutil.Discard, httputil.NewChunkedReader(io.TeeReader(rawResponseBody, down))); err != nil { - http.Error(w, "Error copying chunked response body", http.StatusInternalServerError) - return + + var err error + chunks := NewChunkedReader(io.MultiReader(rem, up)) + for chunks.Next() && err == nil { + _, err = io.Copy(w, chunks.Chunk()) + wf.Flush() + } + if err == nil { + err = chunks.Err() + } + if err != nil { + Error.Printf("Error forwarding chunked response body: %s", err) } - resp.Trailer.Write(down) - // a chunked response ends with a CRLF - down.Write([]byte("\r\n")) } func hijack(w http.ResponseWriter, client *httputil.ClientConn) (down net.Conn, downBuf *bufio.ReadWriter, up net.Conn, rem io.Reader, err error) {
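Usage note for reviewers (illustrative only, not part of the patch): below is a
minimal sketch of how the new ChunkedReader is meant to be driven, written as a
hypothetical Example test that could sit in chunked_test.go. The wire-format
input is hand-written for the example; only NewChunkedReader, Next, Chunk, and
Err from this patch are assumed.

package proxy

import (
	"fmt"
	"io/ioutil"
	"strings"
)

// ExampleChunkedReader mirrors the loop in doChunkedResponse: Next prepares
// a chunk, Chunk exposes its bytes, and Err reports any error once the
// iteration stops.
func ExampleChunkedReader() {
	// Two chunks in HTTP chunked wire format ("hello, " and "world!"),
	// followed by the terminating zero-length chunk.
	body := "7\r\nhello, \r\n6\r\nworld!\r\n0\r\n"

	r := NewChunkedReader(strings.NewReader(body))
	for r.Next() {
		data, err := ioutil.ReadAll(r.Chunk())
		if err != nil {
			fmt.Println("read error:", err)
			return
		}
		fmt.Printf("chunk: %q\n", data)
	}
	if err := r.Err(); err != nil {
		fmt.Println("iteration error:", err)
	}
	// Output:
	// chunk: "hello, "
	// chunk: "world!"
}

In doChunkedResponse the same loop copies each chunk to the ResponseWriter and
flushes after every one, which is what preserves the upstream chunk boundaries
for clients like docker-py.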