
Reponse body io.Reader #411

Closed · qJkee opened this issue Sep 11, 2018 · 40 comments

qJkee commented Sep 11, 2018

Hey guys. How do I set a custom response body writer?
I've tried to use SetBodyStream, but I don't see any body reader on the response that I could pass as an io.Reader.
P.S. I'm trying to implement a throttler.

qJkee (Author) commented Sep 11, 2018

The problem is that I can't get/reach Response.bodyStream.

erikdubbelboer (Collaborator) commented:

Here is a simple example that throttles the response to 2 bytes per second:

package main

import (
	"bufio"
	"io"
	"net/http"
	"time"

	"github.com/valyala/fasthttp"
)

func rateLimited(ctx *fasthttp.RequestCtx) {
	body := []byte("this is a test")

	ctx.SetBodyStreamWriter(func(w *bufio.Writer) {
		i := 0
		for i < len(body) {
			end := i + 2
			if end > len(body) {
				end = len(body)
			}

			w.Write(body[i:end])
			w.Flush()

			i += 2

			time.Sleep(time.Second)
		}
	})
}

func main() {
	go func() {
		if err := fasthttp.ListenAndServe(":8080", rateLimited); err != nil {
			panic(err)
		}
	}()

	// Give the server a moment to start listening.
	time.Sleep(time.Millisecond * 100)

	res, err := http.Get("http://localhost:8080")
	if err != nil {
		panic(err)
	}

	for {
		var b [2]byte

		n, err := res.Body.Read(b[:])
		if n > 0 {
			println(string(b[:n]))
		}
		if err == io.EOF {
			break
		} else if err != nil {
			panic(err)
		}
	}
}

qJkee (Author) commented Sep 11, 2018

That's correct, but I need to read the body from a fasthttp.Response (I've made a request to another website), not from a static one.

qJkee (Author) commented Sep 11, 2018

@erikdubbelboer
?

erikdubbelboer (Collaborator) commented:

@qJkee Just use Response.Body()

qJkee (Author) commented Sep 12, 2018

@erikdubbelboer
That returns a []byte, not an io.Reader.

erikdubbelboer (Collaborator) commented:

@qJkee look at my example! It's not using any io.Reader.

dgrr (Contributor) commented Sep 12, 2018

I think @qJkee wants to make a client request and read the response from an io.Reader, and this is not possible using fasthttp.

erikdubbelboer (Collaborator) commented:

@qJkee if that is what you want I'm afraid @dgrr is correct that it's not possible. Fasthttp always reads the whole response body before returning.

qJkee (Author) commented Sep 13, 2018

Why can't you provide some getter/setter for that?
It would be good to be able to replace the io.Reader with a custom one. For example, for throttling downloads of huge files it would be a perfect solution.

erikdubbelboer (Collaborator) commented:

@qJkee because this requires huge internal changes. The way things work internally in fasthttp wasn't designed for this at all.

To demonstrate this I actually wrote a version that would add support for this with backwards compatibility and keeping it possible to work with 0 heap allocations.

You can find my changes here: erikdubbelboer@6951527

It's quite ugly and complicated code. It introduces 2 more sync.Pool uses with each request. And it increases the coupling between the different components which I don't like that much either.

In the current state the API could also use some improvements. Right now you have to set Response.ReturnBodyReader to true in which case after the request Response.BodyReader contains an io.ReadCloser which allows you to read the body.

Just like net/http, it requires you to read the whole body and call Response.BodyReader.Close() for the connection to be reused for future requests. On top of that, right now it also requires calling ReleaseBodyReader(Response.BodyReader), but I think this can be merged into the Response.BodyReader.Close call.

A full example can be found in the benchmarks that I added to make sure I introduced no new heap allocations.

When I have time in the future I might improve this and at some point merge it into the master branch.

carlonelong commented Dec 14, 2018

Hi @erikdubbelboer, I tried your code. The "Do" call would not block, but I could not read anything from resp.BodyReader until the "Do" call timed out.
My code is like this:

	client := &fasthttp.Client{
		DialDualStack: true,
		// ReadTimeout:                   time.Second,
		DisableHeaderNamesNormalizing: true,
	}
	var req fasthttp.Request
	req.Header.SetMethod("POST")
	// set some header
	req.SetRequestURI(url)
	req.SetBody(data)

	resp := fasthttp.AcquireResponse()
	resp.ReturnBodyReader = true
	err := client.Do(&req, resp)
	// check err && return resp

Another goroutine reading resp.BodyReader would block until timeout(if the client's ReadTimeout is set).

Did I miss something?

erikdubbelboer (Collaborator) commented:

I would have to see more code. I just wrote this simple test program which works fine:

package main
  
import (
  "fmt"
  "io/ioutil"
  "time"

  "github.com/erikdubbelboer/fasthttp"
)

func readBody(resp *fasthttp.Response) {
  body, err := ioutil.ReadAll(resp.BodyReader)
  if err != nil {
    panic(err)
  }
  fmt.Printf("read %d bytes of body\n", len(body))
}

func main() {
  client := &fasthttp.Client{
    DialDualStack:                 true,
    DisableHeaderNamesNormalizing: true,
  }

  req := fasthttp.AcquireRequest()
  req.Header.SetMethod("GET")
  req.SetRequestURI("https://www.google.com/")

  resp := fasthttp.AcquireResponse()
  resp.ReturnBodyReader = true

  err := client.Do(req, resp)
  if err != nil {
    panic(err)
  }

  go readBody(resp)

  fmt.Println("after go readBody")

  // give readBody 10 seconds to read the body before we exit.
  time.Sleep(time.Second * 10)
}

carlonelong commented Dec 17, 2018

@erikdubbelboer Hi. My case is that the client/server will hold the connection for a very long time (like forever). The server will send messages at uncertain intervals on this connection, and ReadAll will block. As far as I can see, the BodyReader will only be filled after the connection is closed. Am I right?

The server's code is like this

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

func main() {
	http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			panic("expected http.ResponseWriter to be an http.Flusher")
		}
		for i := 1; ; i++ {
			fmt.Fprintf(w, "Chunk #%d\n", i)
			flusher.Flush() // Trigger "chunked" encoding and send a chunk...
			time.Sleep(500 * time.Millisecond)
		}
	})

	log.Print("Listening on localhost:8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

erikdubbelboer (Collaborator) commented:

I think I know what is happening. You are trying to read a buffer that is bigger than the current chunk and it hangs trying to read those extra bytes (which only arrive at the next chunk).

I have changed my implementation a bit so Read() now returns as soon as a chunk is finished. This way the following example works fine for me:

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/erikdubbelboer/fasthttp"
)

func readBody(resp *fasthttp.Response) {
	// 40 bytes is bigger than the chunk of 9 we're about to receive.
	b := make([]byte, 40)

	for {
		if n, err := resp.BodyReader.Read(b); err != nil {
			break
		} else {
			log.Printf("body: %q", b[:n])
		}
	}
}

func doRequest() {
	client := &fasthttp.Client{
		DialDualStack:                 true,
		DisableHeaderNamesNormalizing: true,
	}

	req := fasthttp.AcquireRequest()
	req.Header.SetMethod("GET")
	req.SetRequestURI("http://localhost:8080/test")

	resp := fasthttp.AcquireResponse()
	resp.ReturnBodyReader = true

	err := client.Do(req, resp)
	if err != nil {
		panic(err)
	}

	go readBody(resp)

	log.Println("after go readBody")
}

func main() {
	http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			panic("expected http.ResponseWriter to be an http.Flusher")
		}
		for i := 1; ; i++ {
			fmt.Fprintf(w, "Chunk #%d\n", i)
			flusher.Flush() // Trigger "chunked" encoding and send a chunk...
			time.Sleep(500 * time.Millisecond)
		}
	})

	go func() {
		log.Print("Listening on localhost:8080")
		log.Fatal(http.ListenAndServe(":8080", nil))
	}()

	// Give the server 100ms to start.
	time.Sleep(time.Millisecond * 100)

	doRequest()

	// Exit the program after 10 seconds.
	time.Sleep(time.Second * 10)
}

carlonelong commented:

OK, it works. And here comes another question: how can I set the proper buffer size? I just want to use some code like this:

reader := bufio.NewReader(data) // I have to use bufio.NewReaderSize(data, 100) instead
for {
	fmt.Printf("decoding %#v\n", reader)
	lengthStr, err := reader.ReadString('\n')
	fmt.Printf("find a ln\n")
	if err != nil {
		return err
	}
	// do something.
}

For background: this bufio.NewReader used an http.Response before, and I want to change it to a fasthttp.Response.
Thanks!

erikdubbelboer (Collaborator) commented:

You don't really need to set an initial buffer size. Let the reader figure that out itself. Your code works fine. Try replacing the function in my example with this:

func readBody(resp *fasthttp.Response) {
  reader := bufio.NewReader(resp.BodyReader)

  for {
    s, err := reader.ReadString('\n')
    if err != nil {
      break
    } else {
      log.Printf("body: %q", s)
    }
  }
}

carlonelong commented:

Not specifying a size would bring back the first problem I mentioned above... I can never read the '\n' out of resp.BodyReader.

erikdubbelboer (Collaborator) commented:

The problem you had was fixed when I introduced the commit that makes Read return when a chunk is finished.

carlonelong commented:

Sorry. Missed this.
It now works fine. Thanks!

Arnold1 commented Apr 11, 2019

Fasthttp always reads the whole response body before returning

@erikdubbelboer is that still true, or is there a way to read the response body with an io.Reader?

erikdubbelboer (Collaborator) commented:

@Arnold1 I'm afraid this is still true. The code I wrote above to add support for this isn't fit to be merged, and I'm not sure I could ever get it good enough. Reading the whole body is just how fasthttp was designed, I'm afraid.

Arnold1 commented Apr 13, 2019

@erikdubbelboer what's the best way to achieve high throughput + low latency with fasthttp? My server will receive a high number of concurrent requests and needs to perform a small calculation for each request (~5-15ms).

I have code like this:

router := fasthttprouter.New()
router.POST("/calcResp", calcResp)
log.Fatal(fasthttp.ListenAndServe(":8080", router.Handler))

func calcResp(ctx *fasthttp.RequestCtx) error {
	var request = Request{}

	// check content length

	body := ctx.PostBody()
	err := json.Unmarshal(body, &request)
	// do error handling

	modelData := readModelData(someURL)

	timeOutChan := time.After(30 * time.Millisecond)

	var allResp []*response
	for _, model := range modelData {
		select {
		default:
			// if calc takes longer than 30ms send back http.StatusRequestTimeout
		case <-timeOutChan:
			ctx.SetStatusCode(fasthttp.StatusRequestTimeout)
			respBody, _ := json.Marshal(errMessage)
			ctx.Write(respBody)
		}

		// do calculation ... 5-7ms, but can be called multiple times, bc i range over modelData...
		response := calcResponse(jsonData)
		allResp = append(allResp, response)
	}

	// json marshal
	ctx.Write(allResponseJSON)
	return nil
}

Also, where can I set fasthttp.Server Concurrency?

erikdubbelboer (Collaborator) commented:

To set server options use:

	s := &fasthttp.Server{
		Handler:               router.Handler,
		ReadTimeout:           time.Hour,
		WriteTimeout:          time.Hour,
		ReadBufferSize:        4096 * 6,
		WriteBufferSize:       4096 * 6,
		NoDefaultServerHeader: true,
		// Concurrency: fasthttp.DefaultConcurrency, // 256 * 1024 by default
	}
	log.Fatal(s.ListenAndServe(":8080"))

You probably don't have to change the concurrency as the default is 262144 concurrent connections which should always be enough.

If you really want to have the best performance I would suggest not using fasthttprouter but doing your own simple routing. Unless of course your routing is complex in which case fasthttprouter is the better option.

For JSON I would suggest using https://github.com/mailru/easyjson for best performance.

Your timeout code doesn't make sense this way, it needs to be something like this:

	done := make(chan struct{})

	// Preallocate the correct slice capacity already for optimal performance.
	allResp := make([]*response, 0, len(modelData))
	
	// Do the processing in a goroutine so we can abandon it if it takes too long.
	go func() {
		defer close(done)

		// In theory you could also do each calcResponse in parallel goroutines
		// to speed it up even more if its slow.
		for _, model := range modelData {
			// do calculation ... 5-7ms, but can be called multiple times, bc i range over modelData...
			response := calcResponse(jsonData)
			allResp = append(allResp, response)
		}
	}()

	// Either wait until the processing is done, or the timeout.
	select {
	case <-done:
	case <-time.After(30 * time.Millisecond):
		ctx.SetStatusCode(fasthttp.StatusRequestTimeout)
		respBody, _ := json.Marshal(errMessage)
		ctx.Write(respBody)
		return // If we timeout we should return here and not send back the normal response.
	}
	
	ctx.Write(allResponseJSON)

Arnold1 commented Apr 14, 2019

Hi, can you do the following without fasthttprouter?

router := fasthttprouter.New()
router.POST("/getResp", logWrapper(HttpErrWrapper(calcResp)))

erikdubbelboer (Collaborator) commented:

Just check the paths in your one handler:

var (
	strSlashGetResp = []byte("/getResp")
)

func handler(ctx *fasthttp.RequestCtx) error {
	if bytes.Equal(ctx.Path(), strSlashGetResp) {
		// ...
	}
}

Arnold1 commented Apr 15, 2019

@erikdubbelboer how can I run the `for _, model := range modelData` loop in parallel?

erikdubbelboer (Collaborator) commented:

Something like:

	done := make(chan struct{}, len(modelData))

	// Preallocate the correct slice capacity already for optimal performance.
	allResp := make([]*response, 0, len(modelData))
	var allRespMu sync.Mutex

	// In theory you could also do each calcResponse in parallel goroutines
	// to speed it up even more if its slow.
	for _, model := range modelData {
		go func(model *ModelData) {
			defer func() {
				done <- struct{}{}
			}()

			// do calculation ... 5-7ms, but can be called multiple times, bc i range over modelData...
			response := calcResponse(jsonData)

			allRespMu.Lock()
			allResp = append(allResp, response)
			allRespMu.Unlock()
		}(model)
	}

	// Either wait until the processing is done, or the timeout.
	for range modelData {
		select {
		case <-done:
		case <-time.After(30 * time.Millisecond):
			ctx.SetStatusCode(fasthttp.StatusRequestTimeout)
			respBody, _ := json.Marshal(errMessage)
			ctx.Write(respBody)
			return // If we timeout we should return here and not send back the normal response.
		}
	}
	
	allRespMu.Lock()
	respBody, _ := json.Marshal(allResp)
	allRespMu.Unlock()
	ctx.Write(respBody)

Or you could even use the channel to pass down the results instead of using a Mutex. That would be better but I don't have the time to write that now.

Arnold1 commented May 3, 2019

@erikdubbelboer
is there any issue if I do the following?

// ctx from fasthttp ... ctx *fasthttp.RequestCtx
cancelFunc, cancel := context.WithTimeout(ctx, time.Duration(30)*time.Millisecond)
defer cancel()
resultsCh := make(chan *Response, len(modelData))
results := make([]*Response, len(modelData))

for _, model := range modelData {
	go func(model *ModelData) {
		// do calculation
		response := calcResponse(jsonData)

		resultsCh <- &Response{
			Results:           response,
			MetaData:       model.ModelID,
			// and other things...
		}

	}(model)
}

i := 0
for {
	select {
	case <-cancelFunc.Done():
		status = fasthttp.StatusRequestTimeout // 408

                // i might want to return partial data...
		/*
                ctx.Write(Results{Results: results[0:i]})
                */
		return nil
	case r := <-resultsCh:
		results[i] = r
		i++
		if i == len(modelData) {
			ctx.Write(Results{Results: results})
			return nil
		}
	}
}

erikdubbelboer (Collaborator) commented:

@Arnold1 Yes, that's perfect.

Arnold1 commented May 3, 2019

For some reason the code works slower... hm...

Arnold1 commented May 3, 2019

@erikdubbelboer
what is the diff between 1 and 2?

1.)

func calcResponse(ctx *fasthttp.RequestCtx) {
	cancelFunc, cancel := context.WithTimeout(ctx, time.Duration(30)*time.Millisecond)
	//...
}

2.)

func calcResponse(ctx *fasthttp.RequestCtx) {
	cancelFunc, cancel := context.WithTimeout(context.Background(), time.Duration(30)*time.Millisecond)
	//...
}

erikdubbelboer (Collaborator) commented:

In case of 1 your context will also be cancelled if you shut down the server using server.Shutdown().

geraldstanje commented:

@erikdubbelboer is there any obvious reason why the mutex version works faster than the context version I posted before?

erikdubbelboer (Collaborator) commented:

In theory the channel version should be slightly faster as channels can be slightly faster for the scheduler. But in practice they are probably the same speed.

Arnold1 commented May 3, 2019

@erikdubbelboer

type ErrorHandlerFunc func(ctx *fasthttp.RequestCtx) error

func startApp() {
    router := fasthttprouter.New()
    router.POST("/Response", LogWrapper(HTTPErrorWrapper(ResponseRate)))
}

func LogWrapper(f fasthttp.RequestHandler) fasthttp.RequestHandler {
}

func HTTPErrorWrapper(errorHandlerFunc ErrorHandlerFunc) fasthttp.RequestHandler {
}

func ResponseRate(ctx *fasthttp.RequestCtx) error {
	//....
	for range modelData {
		// Either wait until the processing is done, or the timeout.
		select {
		case <-done:
		case <-time.After(30 * time.Millisecond):
			ctx.SetStatusCode(fasthttp.StatusRequestTimeout)
			respBody, _ := json.Marshal(errMessage)
			ctx.Write(respBody)
			return nil // If we time out we should return here and not send back the normal response.
		}
	}
}

Is there a way I can benchmark the code? I want to figure out why the version with context is slower...

@erikdubbelboer
case <-time.After(30 * time.Millisecond): ... I think that's not what I want... Is it 30 ms per item? I want the total max time to be 30 ms.

erikdubbelboer (Collaborator) commented:

Use: https://golang.org/pkg/testing/#hdr-Benchmarks

In that case just assign the time.After(30 * time.Millisecond) to a variable outside the loop so it is only started once.

Arnold1 commented May 4, 2019

@erikdubbelboer cool, do you have a sample where you benchmark fasthttp code? Should I send a request to the /Response endpoint and benchmark that?

erikdubbelboer (Collaborator) commented:

Fenny (Contributor) commented Aug 16, 2020

I think his question has been answered with examples and links to specific resources, can we close this? @erikdubbelboer
