Skip to content

Commit

Permalink
early data for the client, fix context cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
mmmray committed Jun 20, 2024
1 parent efcd8db commit 7605a4d
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 59 deletions.
133 changes: 76 additions & 57 deletions transport/internet/splithttp/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal/done"
"github.com/xtls/xray-core/common/signal/semaphore"
"github.com/xtls/xray-core/common/uuid"
"github.com/xtls/xray-core/transport/internet"
Expand Down Expand Up @@ -44,18 +45,6 @@ var (
globalDialerAccess sync.Mutex
)

func destroyHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) {
globalDialerAccess.Lock()
defer globalDialerAccess.Unlock()

if globalDialerMap == nil {
globalDialerMap = make(map[dialerConf]reusedClient)
}

delete(globalDialerMap, dialerConf{dest, streamSettings})

}

func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) reusedClient {
globalDialerAccess.Lock()
defer globalDialerAccess.Unlock()
Expand All @@ -77,15 +66,15 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
}

dialContext := func(ctxInner context.Context) (net.Conn, error) {
conn, err := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)
conn, err := internet.DialSystem(ctxInner, dest, streamSettings.SocketSettings)
if err != nil {
return nil, err
}

if gotlsConfig != nil {
if fingerprint := tls.GetFingerprint(tlsConfig.Fingerprint); fingerprint != nil {
conn = tls.UClient(conn, gotlsConfig, fingerprint)
if err := conn.(*tls.UConn).HandshakeContext(ctx); err != nil {
if err := conn.(*tls.UConn).HandshakeContext(ctxInner); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -171,47 +160,71 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me

var remoteAddr gonet.Addr
var localAddr gonet.Addr
// this is done when the TCP/UDP connection to the server was established,
// and we can unblock the Dial function and print correct net addresses in
// logs
gotConn := done.New()

trace := &httptrace.ClientTrace{
GotConn: func(connInfo httptrace.GotConnInfo) {
remoteAddr = connInfo.Conn.RemoteAddr()
localAddr = connInfo.Conn.LocalAddr()
},
}
var downResponse io.ReadCloser
gotDownResponse := done.New()

sessionIdUuid := uuid.New()
sessionId := sessionIdUuid.String()

req, err := http.NewRequestWithContext(
httptrace.WithClientTrace(ctx, trace),
"GET",
requestURL.String()+sessionId,
nil,
)
if err != nil {
return nil, err
}
go func() {
trace := &httptrace.ClientTrace{
GotConn: func(connInfo httptrace.GotConnInfo) {
remoteAddr = connInfo.Conn.RemoteAddr()
localAddr = connInfo.Conn.LocalAddr()
gotConn.Close()
},
}

req.Header = transportConfiguration.GetRequestHeader()

downResponse, err := httpClient.download.Do(req)
if err != nil {
// workaround for various connection pool related issues, mostly around
// HTTP/1.1. if the http client ever fails to send a request, we simply
// delete it entirely.
// in HTTP/1.1, it was observed that pool connections would immediately
// fail with "context canceled" if the previous http response body was
// not explicitly BOTH drained and closed. at the same time, sometimes
// the draining itself takes forever and causes more problems.
// see also https://github.com/golang/go/issues/60240
destroyHTTPClient(ctx, dest, streamSettings)
return nil, newError("failed to send download http request, destroying client").Base(err)
}
// in case we hit an error, we want to unblock this part
defer gotConn.Close()

if downResponse.StatusCode != 200 {
downResponse.Body.Close()
return nil, newError("invalid status code on download:", downResponse.Status)
}
req, err := http.NewRequestWithContext(
httptrace.WithClientTrace(context.WithoutCancel(ctx), trace),
"GET",
requestURL.String()+sessionId,
nil,
)
if err != nil {
newError("failed to construct download http request").Base(err).WriteToLog()
gotDownResponse.Close()
return
}

req.Header = transportConfiguration.GetRequestHeader()

response, err := httpClient.download.Do(req)
gotConn.Close()
if err != nil {
newError("failed to send download http request").Base(err).WriteToLog()
gotDownResponse.Close()
return
}

if response.StatusCode != 200 {
response.Body.Close()
newError("invalid status code on download:", response.Status).WriteToLog()
gotDownResponse.Close()
return
}

// skip "ok" response
trashHeader := []byte{0, 0}
_, err = io.ReadFull(response.Body, trashHeader)
if err != nil {
response.Body.Close()
newError("failed to read initial response").Base(err).WriteToLog()
gotDownResponse.Close()
return
}

downResponse = response.Body
gotDownResponse.Close()
}()

uploadUrl := requestURL.String() + sessionId + "/"

Expand Down Expand Up @@ -266,7 +279,7 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
for i := 0; i < 5; i++ {
uploadConn = httpClient.uploadRawPool.Get()
if uploadConn == nil {
uploadConn, err = httpClient.dialUploadConn(ctx)
uploadConn, err = httpClient.dialUploadConn(context.WithoutCancel(ctx))
if err != nil {
newError("failed to connect upload").Base(err).WriteToLog()
uploadPipeReader.Interrupt()
Expand All @@ -293,21 +306,27 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
}
}()

// skip "ok" response
trashHeader := []byte{0, 0}
_, err = io.ReadFull(downResponse.Body, trashHeader)
if err != nil {
downResponse.Body.Close()
return nil, newError("failed to read initial response")
}
// we want to block Dial until we know the remote address of the server,
// for logging purposes
<-gotConn.Wait()

// necessary in order to send larger chunks in upload
bufferedUploadPipeWriter := buf.NewBufferedWriter(uploadPipeWriter)
bufferedUploadPipeWriter.SetBuffered(false)

lazyDownload := &LazyReader{
CreateReader: func() (io.ReadCloser, error) {
<-gotDownResponse.Wait()
if downResponse == nil {
return nil, newError("downResponse failed")
}
return downResponse, nil
},
}

conn := splitConn{
writer: bufferedUploadPipeWriter,
reader: downResponse.Body,
reader: lazyDownload,
remoteAddr: remoteAddr,
localAddr: localAddr,
}
Expand Down
4 changes: 2 additions & 2 deletions transport/internet/splithttp/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (h *requestHandler) maybeReapSession(isFullyConnected *done.Instance, sessi
shouldReap := done.New()
go func() {
time.Sleep(30 * time.Second)
shouldReap.Done()
shouldReap.Close()
}()

select {
Expand Down Expand Up @@ -154,7 +154,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req

// after GET is done, the connection is finished. disable automatic
// session reaping, and handle it in defer
currentSession.isFullyConnected.Done()
currentSession.isFullyConnected.Close()
defer h.sessions.Delete(sessionId)

// magic header instructs nginx + apache to not buffer response body
Expand Down
57 changes: 57 additions & 0 deletions transport/internet/splithttp/lazy_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package splithttp

import (
"io"
"sync"
)

type LazyReader struct {
readerSync sync.Mutex
CreateReader func() (io.ReadCloser, error)
reader io.ReadCloser
readerError error
}

func (r *LazyReader) getReader() (io.ReadCloser, error) {
r.readerSync.Lock()
defer r.readerSync.Unlock()
if r.reader != nil {
return r.reader, nil
}

if r.readerError != nil {
return nil, r.readerError
}

reader, err := r.CreateReader()
if err != nil {
r.readerError = err
return nil, err
}

r.reader = reader
return reader, nil
}

func (r *LazyReader) Read(b []byte) (int, error) {
reader, err := r.getReader()
if err != nil {
return 0, err
}
n, err := reader.Read(b)
return n, err
}

func (r *LazyReader) Close() error {
r.readerSync.Lock()
defer r.readerSync.Unlock()

var err error
if r.reader != nil {
err = r.reader.Close()
r.reader = nil
r.readerError = newError("closed reader")
}

return err
}

0 comments on commit 7605a4d

Please sign in to comment.