This repository was archived by the owner on Jun 20, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 678
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[proxy] rewrote chunked response handler
1) We cannot send "Connection: close", because the fsouza docker client expects the tcp socket to stay open between requests. 2) Because we cannot force-close the connection, we can't hijack the connection (because go's net/http doesn't let use un-hijack it). 3) Because we need to maintain the individual chunking of messages (for docker-py), we can't just copy the response body, as Go will remove and re-add the chunking willy-nilly. Therefore, we have to read each chunk one-by-one, and flush the ResponseWriter after each one.
- Loading branch information
1 parent
b93070f
commit 22de89f
Showing
3 changed files
with
335 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
// Based on net/http/internal | ||
package proxy | ||
|
||
import ( | ||
"bufio" | ||
"errors" | ||
"io" | ||
"io/ioutil" | ||
) | ||
|
||
const maxLineLength = 4096 // assumed <= bufio.defaultBufSize | ||
|
||
var ErrLineTooLong = errors.New("header line too long") | ||
|
||
// Unlike net/http/internal.chunkedReader, this has an interface where we can | ||
// handle individual chunks. The interface is based on database/sql.Rows. | ||
func NewChunkedReader(r io.Reader) *ChunkedReader { | ||
br, ok := r.(*bufio.Reader) | ||
if !ok { | ||
br = bufio.NewReader(r) | ||
} | ||
return &ChunkedReader{r: br} | ||
} | ||
|
||
type ChunkedReader struct { | ||
r *bufio.Reader | ||
chunk *io.LimitedReader | ||
err error | ||
buf [2]byte | ||
} | ||
|
||
// Next prepares the next chunk for reading. It returns true on success, or | ||
// false if there is no next chunk or an error happened while preparing | ||
// it. Err should be consulted to distinguish between the two cases. | ||
// | ||
// Every call to Chunk, even the first one, must be preceded by a call to Next. | ||
// | ||
// Calls to Next will discard any unread bytes in the current Chunk. | ||
func (cr *ChunkedReader) Next() bool { | ||
if cr.err != nil { | ||
return false | ||
} | ||
|
||
// Check the termination of the previous chunk | ||
if cr.chunk != nil { | ||
// Make sure the remainder is drained, in case the user of this quit | ||
// reading early. | ||
if _, cr.err = io.Copy(ioutil.Discard, cr.chunk); cr.err != nil { | ||
return false | ||
} | ||
|
||
// Check the next two bytes after the chunk are \r\n | ||
if _, cr.err = io.ReadFull(cr.r, cr.buf[:2]); cr.err != nil { | ||
return false | ||
} | ||
if cr.buf[0] != '\r' || cr.buf[1] != '\n' { | ||
cr.err = errors.New("malformed chunked encoding") | ||
return false | ||
} | ||
} else { | ||
cr.chunk = &io.LimitedReader{R: cr.r} | ||
} | ||
|
||
// Setup the next chunk | ||
if n := cr.beginChunk(); n > 0 { | ||
cr.chunk.N = int64(n) | ||
} else { | ||
cr.err = io.EOF | ||
} | ||
return cr.err == nil | ||
} | ||
|
||
// Chunk returns the io.Reader of the current chunk. On each call, this returns | ||
// the same io.Reader for a given chunk. | ||
func (cr *ChunkedReader) Chunk() io.Reader { | ||
return cr.chunk | ||
} | ||
|
||
// Err returns the error, if any, that was encountered during iteration. | ||
func (cr *ChunkedReader) Err() error { | ||
if cr.err == io.EOF { | ||
return nil | ||
} | ||
return cr.err | ||
} | ||
|
||
func (cr *ChunkedReader) beginChunk() (n uint64) { | ||
// chunk-size CRLF | ||
var line []byte | ||
line, cr.err = readLine(cr.r) | ||
if cr.err != nil { | ||
return | ||
} | ||
n, cr.err = parseHexUint(line) | ||
return | ||
} | ||
|
||
// Read a line of bytes (up to \n) from b. | ||
// Give up if the line exceeds maxLineLength. | ||
// The returned bytes are a pointer into storage in | ||
// the bufio, so they are only valid until the next bufio read. | ||
func readLine(b *bufio.Reader) (p []byte, err error) { | ||
if p, err = b.ReadSlice('\n'); err != nil { | ||
// We always know when EOF is coming. | ||
// If the caller asked for a line, there should be a line. | ||
if err == io.EOF { | ||
err = io.ErrUnexpectedEOF | ||
} else if err == bufio.ErrBufferFull { | ||
err = ErrLineTooLong | ||
} | ||
return nil, err | ||
} | ||
if len(p) >= maxLineLength { | ||
return nil, ErrLineTooLong | ||
} | ||
return trimTrailingWhitespace(p), nil | ||
} | ||
|
||
func trimTrailingWhitespace(b []byte) []byte { | ||
for len(b) > 0 && isASCIISpace(b[len(b)-1]) { | ||
b = b[:len(b)-1] | ||
} | ||
return b | ||
} | ||
|
||
func isASCIISpace(b byte) bool { | ||
return b == ' ' || b == '\t' || b == '\n' || b == '\r' | ||
} | ||
|
||
func parseHexUint(v []byte) (n uint64, err error) { | ||
for _, b := range v { | ||
n <<= 4 | ||
switch { | ||
case '0' <= b && b <= '9': | ||
b = b - '0' | ||
case 'a' <= b && b <= 'f': | ||
b = b - 'a' + 10 | ||
case 'A' <= b && b <= 'F': | ||
b = b - 'A' + 10 | ||
default: | ||
return 0, errors.New("invalid byte in chunk length") | ||
} | ||
n |= uint64(b) | ||
} | ||
return | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
// Based on net/http/internal | ||
package proxy | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
"testing" | ||
) | ||
|
||
func assertNextChunk(t *testing.T, r *ChunkedReader, expected string) { | ||
if !r.Next() { | ||
t.Fatalf("Expected chunk, but ran out early: %v", r.Err()) | ||
} | ||
if r.Err() != nil { | ||
t.Fatalf("Error reading chunk: %q", r.Err()) | ||
} | ||
data, err := ioutil.ReadAll(r.Chunk()) | ||
if g := string(data); g != expected { | ||
t.Errorf("chunk reader read %q; want %q", g, expected) | ||
} | ||
if err != nil { | ||
t.Logf(`data: "%s"`, data) | ||
t.Fatalf("reading chunk: %v", err) | ||
} | ||
} | ||
|
||
func assertNoMoreChunks(t *testing.T, r *ChunkedReader) { | ||
if r.Next() { | ||
t.Errorf("Expected no more chunks, but found too many") | ||
} | ||
if r.Err() != nil { | ||
t.Errorf("Expected no error, but found: %q", r.Err()) | ||
} | ||
} | ||
|
||
func TestChunk(t *testing.T) { | ||
var b bytes.Buffer | ||
|
||
w := NewChunkedWriter(&b) | ||
const chunk1 = "hello, " | ||
const chunk2 = "world! 0123456789abcdef" | ||
w.Write([]byte(chunk1)) | ||
w.Write([]byte(chunk2)) | ||
w.Close() | ||
|
||
r := NewChunkedReader(&b) | ||
|
||
assertNextChunk(t, r, chunk1) | ||
assertNextChunk(t, r, chunk2) | ||
assertNoMoreChunks(t, r) | ||
} | ||
|
||
func TestIncompleteReadOfChunk(t *testing.T) { | ||
var b bytes.Buffer | ||
|
||
w := NewChunkedWriter(&b) | ||
const chunk1 = "hello, " | ||
const chunk2 = "world! 0123456789abcdef" | ||
w.Write([]byte(chunk1)) | ||
w.Write([]byte(chunk2)) | ||
w.Close() | ||
|
||
r := NewChunkedReader(&b) | ||
|
||
// Incomplete read of first chunk | ||
{ | ||
if !r.Next() { | ||
t.Fatalf("Expected chunk, but ran out early: %v", r.Err()) | ||
} | ||
if r.Err() != nil { | ||
t.Fatalf("Error reading chunk: %q", r.Err()) | ||
} | ||
// Read just 2 bytes | ||
buf := make([]byte, 2) | ||
if _, err := io.ReadFull(r.Chunk(), buf[:2]); err != nil { | ||
t.Fatalf("Error reading first bytes of chunk: %q", err) | ||
} | ||
if buf[0] != 'h' || buf[1] != 'e' { | ||
t.Fatalf("Unexpected first 2 bytes of chunk: %s", string(buf)) | ||
} | ||
} | ||
|
||
// Second chunk still reads ok | ||
assertNextChunk(t, r, chunk2) | ||
|
||
assertNoMoreChunks(t, r) | ||
} | ||
|
||
func TestMalformedChunks(t *testing.T) { | ||
r := NewChunkedReader(bytes.NewBufferString( | ||
"7\r\nhello, GARBAGEBYTES17\r\nworld! 0123456789abcdef\r\n0\r\n", | ||
)) | ||
|
||
// First chunk is ok | ||
assertNextChunk(t, r, "hello, ") | ||
|
||
// Second chunk fails | ||
{ | ||
if r.Next() { | ||
t.Errorf("Expected failure when reading chunks, but got one") | ||
} | ||
e := "malformed chunked encoding" | ||
if r.Err() == nil || r.Err().Error() != e { | ||
t.Errorf("chunk reader errored %q; want %q", r.Err(), e) | ||
} | ||
data, err := ioutil.ReadAll(r.Chunk()) | ||
if len(data) != 0 { | ||
t.Errorf("chunk should have been empty. got %q", string(data)) | ||
} | ||
if err != nil { | ||
t.Logf(`data: "%s"`, data) | ||
t.Errorf("reading chunk: %v", err) | ||
} | ||
} | ||
|
||
assertNoMoreChunks(t, r) | ||
} | ||
|
||
// Stolen from net/http/internal for testing | ||
// | ||
// NewChunkedWriter returns a new chunkedWriter that translates writes into HTTP | ||
// "chunked" format before writing them to w. Closing the returned chunkedWriter | ||
// sends the final 0-length chunk that marks the end of the stream. | ||
// | ||
// NewChunkedWriter is not needed by normal applications. The http | ||
// package adds chunking automatically if handlers don't set a | ||
// Content-Length header. Using newChunkedWriter inside a handler | ||
// would result in double chunking or chunking with a Content-Length | ||
// length, both of which are wrong. | ||
func NewChunkedWriter(w io.Writer) io.WriteCloser { | ||
return &chunkedWriter{w} | ||
} | ||
|
||
// Writing to chunkedWriter translates to writing in HTTP chunked Transfer | ||
// Encoding wire format to the underlying Wire chunkedWriter. | ||
type chunkedWriter struct { | ||
Wire io.Writer | ||
} | ||
|
||
// Write the contents of data as one chunk to Wire. | ||
// NOTE: Note that the corresponding chunk-writing procedure in Conn.Write has | ||
// a bug since it does not check for success of io.WriteString | ||
func (cw *chunkedWriter) Write(data []byte) (n int, err error) { | ||
|
||
// Don't send 0-length data. It looks like EOF for chunked encoding. | ||
if len(data) == 0 { | ||
return 0, nil | ||
} | ||
|
||
if _, err = fmt.Fprintf(cw.Wire, "%x\r\n", len(data)); err != nil { | ||
return 0, err | ||
} | ||
if n, err = cw.Wire.Write(data); err != nil { | ||
return | ||
} | ||
if n != len(data) { | ||
err = io.ErrShortWrite | ||
return | ||
} | ||
_, err = io.WriteString(cw.Wire, "\r\n") | ||
|
||
return | ||
} | ||
|
||
func (cw *chunkedWriter) Close() error { | ||
_, err := io.WriteString(cw.Wire, "0\r\n") | ||
return err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters