Skip to content

Commit

Permalink
rafthttp: fix race between streamReader.stop() and connection closer
Browse files Browse the repository at this point in the history
  • Loading branch information
Anthony Romano committed Aug 15, 2016
1 parent 7b11c28 commit 56d1226
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
11 changes: 10 additions & 1 deletion rafthttp/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,16 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
default:
plog.Panicf("unhandled stream type %s", t)
}
cr.closer = rc
select {
case <-cr.stopc:
cr.mu.Unlock()
if err := rc.Close(); err != nil {
return err
}
return io.EOF
default:
cr.closer = rc
}
cr.mu.Unlock()

for {
Expand Down
51 changes: 51 additions & 0 deletions rafthttp/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package rafthttp
import (
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"reflect"
Expand Down Expand Up @@ -180,6 +181,56 @@ func TestStreamReaderDialResult(t *testing.T) {
}
}

// TestStreamReaderStopOnConnect tests a stream reader closes the connection on stop.
func TestStreamReaderStopOnConnect(t *testing.T) {
defer testutil.AfterTest(t)
h := http.Header{}
h.Add("X-Server-Version", version.Version)
tr := &respWaitRoundTripper{rrt: &respRoundTripper{code: http.StatusOK, header: h}}
sr := &streamReader{
peerID: types.ID(2),
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
errorc: make(chan error, 1),
typ: streamTypeMessage,
status: newPeerStatus(types.ID(2)),
}
tr.onResp = func() {
go sr.stop()
time.Sleep(10 * time.Millisecond)
}
sr.start()
select {
case <-sr.done:
case <-time.After(time.Second):
t.Fatal("did not close in time")
}
}

type respWaitRoundTripper struct {
rrt *respRoundTripper
onResp func()
}

func (t *respWaitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
resp, err := t.rrt.RoundTrip(req)
resp.Body = newWaitReadCloser()
t.onResp()
return resp, err
}

type waitReadCloser struct{ closec chan struct{} }

func newWaitReadCloser() *waitReadCloser { return &waitReadCloser{make(chan struct{})} }
func (wrc *waitReadCloser) Read(p []byte) (int, error) {
<-wrc.closec
return 0, io.EOF
}
func (wrc *waitReadCloser) Close() error {
close(wrc.closec)
return nil
}

// TestStreamReaderDialDetectUnsupport tests that dial func could find
// out that the stream type is not supported by the remote.
func TestStreamReaderDialDetectUnsupport(t *testing.T) {
Expand Down

0 comments on commit 56d1226

Please sign in to comment.