Skip to content

Commit

Permalink
XHTTP server: Finish stream-up's HTTP POST when its request.Body is c…
Browse files Browse the repository at this point in the history
…losed

#4373 (comment)

Fixes #4373
  • Loading branch information
RPRX authored Feb 10, 2025
1 parent 2d7ca4a commit dcd7e92
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions transport/internet/splithttp/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
}
scMaxEachPostBytes := int(h.ln.config.GetNormalizedScMaxEachPostBytes().To)

if request.Method == "POST" && sessionId != "" {
if request.Method == "POST" && sessionId != "" { // stream-up, packet-up
seq := ""
if len(subpath) > 1 {
seq = subpath[1]
Expand All @@ -173,8 +173,12 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
writer.WriteHeader(http.StatusBadRequest)
return
}
uploadDone := done.New()
err = currentSession.uploadQueue.Push(Packet{
Reader: request.Body,
Reader: &httpRequestBodyReader{
requestReader: request.Body,
uploadDone: uploadDone,
},
})
if err != nil {
errors.LogInfoInner(context.Background(), err, "failed to upload (PushReader)")
Expand All @@ -199,8 +203,12 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
}
}()
}
<-request.Context().Done()
select {
case <-request.Context().Done():
case <-uploadDone.Wait():
}
}
uploadDone.Close()
return
}

Expand Down Expand Up @@ -243,7 +251,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
}

writer.WriteHeader(http.StatusOK)
} else if request.Method == "GET" || sessionId == "" {
} else if request.Method == "GET" || sessionId == "" { // stream-down, stream-one
responseFlusher, ok := writer.(http.Flusher)
if !ok {
panic("expected http.ResponseWriter to be an http.Flusher")
Expand Down Expand Up @@ -283,7 +291,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
reader: request.Body,
remoteAddr: remoteAddr,
}
if sessionId != "" {
if sessionId != "" { // if not stream-one
conn.reader = currentSession.uploadQueue
}

Expand All @@ -302,6 +310,20 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
}
}

type httpRequestBodyReader struct {
requestReader io.ReadCloser
uploadDone *done.Instance
}

func (c *httpRequestBodyReader) Read(b []byte) (int, error) {
return c.requestReader.Read(b)
}

func (c *httpRequestBodyReader) Close() error {
defer c.uploadDone.Close()
return c.requestReader.Close()
}

type httpResponseBodyWriter struct {
sync.Mutex
responseWriter http.ResponseWriter
Expand Down

0 comments on commit dcd7e92

Please sign in to comment.