From 885f807f0d42dfc63fee1a3379756c48dd19521d Mon Sep 17 00:00:00 2001 From: Giovanni Rivera Date: Thu, 12 Sep 2024 15:26:49 -0700 Subject: [PATCH 01/10] =?UTF-8?q?=F0=9F=94=A5=20Feature:=20Add=20SendStrea?= =?UTF-8?q?mWriter=20to=20Ctx?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Create a new `*DefaultCtx` method called `SendStreamWriter()` that maps to fasthttp's `Response.SetBodyStreamWriter()` --- ctx.go | 9 ++++++++- ctx_interface_gen.go | 3 +++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/ctx.go b/ctx.go index 4d7417ee2a..b17959374b 100644 --- a/ctx.go +++ b/ctx.go @@ -5,6 +5,7 @@ package fiber import ( + "bufio" "bytes" "context" "crypto/tls" @@ -45,7 +46,6 @@ const userContextKey contextKey = 0 // __local_user_context__ // DefaultCtx is the default implementation of the Ctx interface // generation tool `go install github.com/vburenin/ifacemaker@975a95966976eeb2d4365a7fb236e274c54da64c` // https://github.com/vburenin/ifacemaker/blob/975a95966976eeb2d4365a7fb236e274c54da64c/ifacemaker.go#L14-L30 -// //go:generate ifacemaker --file ctx.go --struct DefaultCtx --iface Ctx --pkg fiber --output ctx_interface_gen.go --not-exported true --iface-comment "Ctx represents the Context which hold the HTTP request and response.\nIt has methods for the request query string, parameters, body, HTTP headers and so on." type DefaultCtx struct { app *App // Reference to *App @@ -1669,6 +1669,13 @@ func (c *DefaultCtx) SendStream(stream io.Reader, size ...int) error { return nil } +// SendStreamWriter sets response body stream writer +func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error { + c.fasthttp.Response.SetBodyStreamWriter(fasthttp.StreamWriter(streamWriter)) + + return nil +} + // Set sets the response's HTTP header field to the specified key, value. func (c *DefaultCtx) Set(key, val string) { c.fasthttp.Response.Header.Set(key, val) diff --git a/ctx_interface_gen.go b/ctx_interface_gen.go index 7709f7c929..35b9de1e1c 100644 --- a/ctx_interface_gen.go +++ b/ctx_interface_gen.go @@ -3,6 +3,7 @@ package fiber import ( + "bufio" "context" "crypto/tls" "io" @@ -282,6 +283,8 @@ type Ctx interface { SendString(body string) error // SendStream sets response body stream and optional body size. SendStream(stream io.Reader, size ...int) error + // SendStreamWriter sets response body stream writer + SendStreamWriter(streamWriter func(*bufio.Writer)) error // Set sets the response's HTTP header field to the specified key, value. Set(key, val string) setCanonical(key, val string) From 1ff06dcbc997a8895c599625ad5355e4d4cccbbc Mon Sep 17 00:00:00 2001 From: Giovanni Rivera Date: Thu, 12 Sep 2024 17:00:34 -0700 Subject: [PATCH 02/10] =?UTF-8?q?=F0=9F=9A=A8=20Test:=20Validate=20regular?= =?UTF-8?q?=20use=20of=20c.SendStreamWriter()?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Adds Test_Ctx_SendStreamWriter to ctx_test.go --- ctx_test.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/ctx_test.go b/ctx_test.go index 6dcd74109e..45b094b13c 100644 --- a/ctx_test.go +++ b/ctx_test.go @@ -4327,6 +4327,35 @@ func Test_Ctx_SendStream(t *testing.T) { require.Equal(t, "Hello bufio", string(c.Response().Body())) } +// go test -run Test_Ctx_SendStreamWriter +func Test_Ctx_SendStreamWriter(t *testing.T) { + t.Parallel() + app := New() + c := app.AcquireCtx(&fasthttp.RequestCtx{}) + + err := c.SendStreamWriter(func(w *bufio.Writer) { + w.WriteString("Don't crash please") //nolint:errcheck, revive // It is fine to ignore the error + }) + require.NoError(t, err) + require.Equal(t, "Don't crash please", string(c.Response().Body())) + + err = c.SendStreamWriter(func(w *bufio.Writer) { + for lineNum := 1; lineNum <= 5; lineNum++ { + fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error + if err := w.Flush(); err != nil { + t.Errorf("unexpected error: %s", err) + return + } + } + }) + require.NoError(t, err) + require.Equal(t, "Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n", string(c.Response().Body())) + + err = c.SendStreamWriter(func(_ *bufio.Writer) {}) + require.NoError(t, err) + require.Empty(t, c.Response().Body()) +} + // go test -run Test_Ctx_Set func Test_Ctx_Set(t *testing.T) { t.Parallel() From c977b38ff1d7a31452bf3a39650d2d8c790780d5 Mon Sep 17 00:00:00 2001 From: Giovanni Rivera Date: Thu, 12 Sep 2024 17:42:04 -0700 Subject: [PATCH 03/10] =?UTF-8?q?=F0=9F=9A=A8=20Test:=20(WIP)=20Validate?= =?UTF-8?q?=20interrupted=20use=20of=20c.SendStreamWriter()?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Adds Test_Ctx_SendStreamWriter_Interrupted to ctx_test.go - (Work-In-Progress) This test verifies that some data is still sent before a client disconnects when using the method `c.SendStreamWriter()`. **Note:** Running this test reports a race condition when using the `-race` flag or running `make test`. The test uses a channel and mutex to prevent race conditions, but still triggers a warning. --- ctx.go | 1 + ctx_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/ctx.go b/ctx.go index b17959374b..2628f5121d 100644 --- a/ctx.go +++ b/ctx.go @@ -46,6 +46,7 @@ const userContextKey contextKey = 0 // __local_user_context__ // DefaultCtx is the default implementation of the Ctx interface // generation tool `go install github.com/vburenin/ifacemaker@975a95966976eeb2d4365a7fb236e274c54da64c` // https://github.com/vburenin/ifacemaker/blob/975a95966976eeb2d4365a7fb236e274c54da64c/ifacemaker.go#L14-L30 +// //go:generate ifacemaker --file ctx.go --struct DefaultCtx --iface Ctx --pkg fiber --output ctx_interface_gen.go --not-exported true --iface-comment "Ctx represents the Context which hold the HTTP request and response.\nIt has methods for the request query string, parameters, body, HTTP headers and so on." type DefaultCtx struct { app *App // Reference to *App diff --git a/ctx_test.go b/ctx_test.go index 45b094b13c..937ff3d018 100644 --- a/ctx_test.go +++ b/ctx_test.go @@ -24,6 +24,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "testing" "text/template" "time" @@ -4356,6 +4357,44 @@ func Test_Ctx_SendStreamWriter(t *testing.T) { require.Empty(t, c.Response().Body()) } +// go test -run Test_Ctx_SendStreamWriter_Interrupted +func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) { + t.Parallel() + app := New() + c := app.AcquireCtx(&fasthttp.RequestCtx{}) + + var mutex sync.Mutex + startChan := make(chan bool) + interruptStreamWriter := func() { + <-startChan + time.Sleep(5 * time.Millisecond) + mutex.Lock() + c.Response().CloseBodyStream() //nolint:errcheck // It is fine to ignore the error + mutex.Unlock() + } + err := c.SendStreamWriter(func(w *bufio.Writer) { + go interruptStreamWriter() + + startChan <- true + for lineNum := 1; lineNum <= 5; lineNum++ { + mutex.Lock() + fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error + mutex.Unlock() + + if err := w.Flush(); err != nil { + if lineNum < 3 { + t.Errorf("unexpected error: %s", err) + } + return + } + + time.Sleep(1500 * time.Microsecond) + } + }) + require.NoError(t, err) + require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(c.Response().Body())) +} + // go test -run Test_Ctx_Set func Test_Ctx_Set(t *testing.T) { t.Parallel() From 024ac5e8313fb5eff640419006361f333d596ee6 Mon Sep 17 00:00:00 2001 From: Giovanni Rivera Date: Fri, 13 Sep 2024 17:36:10 -0700 Subject: [PATCH 04/10] =?UTF-8?q?=F0=9F=93=9A=20Doc:=20Add=20`SendStreamWr?= =?UTF-8?q?iter`=20to=20docs/api/ctx.md?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/api/ctx.md | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/docs/api/ctx.md b/docs/api/ctx.md index 2640512a3e..9007137266 100644 --- a/docs/api/ctx.md +++ b/docs/api/ctx.md @@ -1871,6 +1871,47 @@ app.Get("/", func(c fiber.Ctx) error { }) ``` +## SendStreamWriter + +Sets the response body stream writer. + +:::note +The argument `streamWriter` represents a function that populates +the response body using a buffered stream writer. +::: + +```go title="Signature" +func (c Ctx) SendStreamWriter(streamWriter func(*bufio.Writer)) error +``` + +```go title="Example" +app.Get("/", func (c fiber.Ctx) error { + return c.SendStreamWriter(func(w *bufio.Writer) { + fmt.Fprintf(w, "Hello, World!\n") + }) + // => "Hello, World!" +}) +``` + +:::info +To flush data before the function returns, you can call `w.Flush()` +on the provided writer. Otherwise, the buffered stream flushes after +`streamWriter` returns. +::: + +```go title="Example" +app.Get("/wait", func(c fiber.Ctx) error { + return c.SendStreamWriter(func(w *bufio.Writer) { + fmt.Fprintf(w, "Waiting for 10 seconds\n") + if err := w.Flush(); err != nil { + log.Print("User quit early") + } + time.Sleep(10 * time.Second) + fmt.Fprintf(w, "Done!\n") + }) +}) +``` + ## Set Sets the response’s HTTP header field to the specified `key`, `value`. From 02b7ce2b6647a3686817db6c6960b0d0f8e8f0bf Mon Sep 17 00:00:00 2001 From: Giovanni Rivera Date: Sun, 24 Nov 2024 21:39:50 -0800 Subject: [PATCH 05/10] =?UTF-8?q?=F0=9F=A9=B9=20Fix:=20Remove=20race=20con?= =?UTF-8?q?dition=20in=20Test=5FCtx=5FSendStreamWriter=5FInterrupted?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ctx_test.go | 53 +++++++++++++++++++++++++---------------------------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/ctx_test.go b/ctx_test.go index b8cebd7a4a..dcc44c9b0f 100644 --- a/ctx_test.go +++ b/ctx_test.go @@ -24,7 +24,6 @@ import ( "path/filepath" "strconv" "strings" - "sync" "testing" "text/template" "time" @@ -4481,38 +4480,36 @@ func Test_Ctx_SendStreamWriter(t *testing.T) { func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) { t.Parallel() app := New() - c := app.AcquireCtx(&fasthttp.RequestCtx{}) - - var mutex sync.Mutex - startChan := make(chan bool) - interruptStreamWriter := func() { - <-startChan - time.Sleep(5 * time.Millisecond) - mutex.Lock() - c.Response().CloseBodyStream() //nolint:errcheck // It is fine to ignore the error - mutex.Unlock() - } - err := c.SendStreamWriter(func(w *bufio.Writer) { - go interruptStreamWriter() - - startChan <- true - for lineNum := 1; lineNum <= 5; lineNum++ { - mutex.Lock() - fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error - mutex.Unlock() + app.Get("/", func(c Ctx) error { + return c.SendStreamWriter(func(w *bufio.Writer) { + for lineNum := 1; lineNum <= 5; lineNum++ { + fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error - if err := w.Flush(); err != nil { - if lineNum < 3 { - t.Errorf("unexpected error: %s", err) + if err := w.Flush(); err != nil { + if lineNum < 3 { + t.Errorf("unexpected error: %s", err) + } + return } - return - } - time.Sleep(1500 * time.Microsecond) - } + time.Sleep(400 * time.Millisecond) + } + }) }) + + req := httptest.NewRequest(MethodGet, "/", nil) + testConfig := TestConfig{ + Timeout: 1 * time.Second, + FailOnTimeout: false, + } + resp, err := app.Test(req, testConfig) require.NoError(t, err) - require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(c.Response().Body())) + + body, err := io.ReadAll(resp.Body) + t.Logf("%v", err) + require.EqualError(t, err, "unexpected EOF") + + require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(body)) } // go test -run Test_Ctx_Set From d82e3bcdf8ec7871079e66900bcb50d68c472c21 Mon Sep 17 00:00:00 2001 From: Giovanni Rivera Date: Sun, 24 Nov 2024 21:51:46 -0800 Subject: [PATCH 06/10] =?UTF-8?q?=F0=9F=8E=A8=20Styles:=20Update=20ctx=5Ft?= =?UTF-8?q?est.go=20to=20respect=20golangci-lint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ctx_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ctx_test.go b/ctx_test.go index dcc44c9b0f..71b9e86221 100644 --- a/ctx_test.go +++ b/ctx_test.go @@ -4483,7 +4483,7 @@ func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) { app.Get("/", func(c Ctx) error { return c.SendStreamWriter(func(w *bufio.Writer) { for lineNum := 1; lineNum <= 5; lineNum++ { - fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error + fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck // It is fine to ignore the error if err := w.Flush(); err != nil { if lineNum < 3 { From 17241211c6f26f4d004d49b652218f2240ed8d67 Mon Sep 17 00:00:00 2001 From: Giovanni Rivera Date: Mon, 25 Nov 2024 22:45:54 -0800 Subject: [PATCH 07/10] =?UTF-8?q?=F0=9F=93=9A=20Doc:=20Update=20/docs/api/?= =?UTF-8?q?ctx.md=20to=20show=20proper=20`w.Flush()`=20error=20handling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/api/ctx.md | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/docs/api/ctx.md b/docs/api/ctx.md index 071543e590..88e6c142fe 100644 --- a/docs/api/ctx.md +++ b/docs/api/ctx.md @@ -1875,19 +1875,38 @@ app.Get("/", func (c fiber.Ctx) error { ``` :::info -To flush data before the function returns, you can call `w.Flush()` +To send data before `streamWriter` returns, you can call `w.Flush()` on the provided writer. Otherwise, the buffered stream flushes after `streamWriter` returns. ::: +:::note +`w.Flush()` will return an error if the client disconnects before `streamWriter` finishes writing a response. +::: + ```go title="Example" app.Get("/wait", func(c fiber.Ctx) error { return c.SendStreamWriter(func(w *bufio.Writer) { - fmt.Fprintf(w, "Waiting for 10 seconds\n") + // Begin Work + fmt.Fprintf(w, "Please wait for 10 seconds\n") if err := w.Flush(); err != nil { - log.Print("User quit early") + log.Print("Client disconnected!") + return } - time.Sleep(10 * time.Second) + + // Send progress over time + time.Sleep(time.Second) + for i := 0; i < 9; i++ { + fmt.Fprintf("Still waiting...\n") + if err := w.Flush(); err != nil { + // If client disconnected, cancel work and finish + log.Print("Client disconnected!") + return + } + time.Sleep(time.Second) + } + + // Finish fmt.Fprintf(w, "Done!\n") }) }) From 0b0729532a912a3127c100373aa7556eddd96f3c Mon Sep 17 00:00:00 2001 From: Giovanni Rivera Date: Mon, 25 Nov 2024 23:28:02 -0800 Subject: [PATCH 08/10] =?UTF-8?q?=F0=9F=93=9A=20Doc:=20Add=20SendStreamWri?= =?UTF-8?q?ter=20details=20to=20docs/whats=5Fnew.md?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/whats_new.md | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/docs/whats_new.md b/docs/whats_new.md index 8677a0080f..5f545831c2 100644 --- a/docs/whats_new.md +++ b/docs/whats_new.md @@ -268,6 +268,7 @@ DRAFT section - Reset - Schema -> ExpressJs like - SendStream -> ExpressJs like +- SendStreamWriter - SendString -> ExpressJs like - String -> ExpressJs like - ViewBind -> instead of Bind @@ -296,6 +297,42 @@ DRAFT section - UserContext has been renamed to Context which returns a context.Context object. - SetUserContext has been renamed to SetContext. +### SendStreamWriter + +In v3, we added support for buffered streaming by providing the new method `SendStreamWriter()`. + +```go +func (c Ctx) SendStreamWriter(streamWriter func(w *bufio.Writer)) +``` + +With this new method, you can implement: +- Server-Side Events (SSE) +- Large file downloads +- Live data streaming + +```go +app.Get("/sse", func(c fiber.Ctx) { + c.Set("Content-Type", "text/event-stream") + c.Set("Cache-Control", "no-cache") + c.Set("Connection", "keep-alive") + c.Set("Transfer-Encoding", "chunked") + + return c.SendStreamWriter(func(w *bufio.Writer) { + for { + fmt.Fprintf(w, "event: my-event\n") + fmt.Fprintf(w, "data: Hello SSE\n\n") + + if err := w.Flush(); err != nil { + log.Print("Client disconnected!") + return + } + } + }) +}) +``` + +You can find more details about this feature in [/docs/api/ctx.md](./api/ctx.md). + --- ## 🌎 Client package From c59764e7beedbe0eb8b60b7ecb8db79aa2623e04 Mon Sep 17 00:00:00 2001 From: Giovanni Rivera Date: Mon, 25 Nov 2024 23:32:57 -0800 Subject: [PATCH 09/10] =?UTF-8?q?=F0=9F=8E=A8=20Styles:=20Update=20/docs/w?= =?UTF-8?q?hats=5Fnew.md=20to=20respect=20markdownlint-cli2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/whats_new.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/whats_new.md b/docs/whats_new.md index 5f545831c2..a222a65ce5 100644 --- a/docs/whats_new.md +++ b/docs/whats_new.md @@ -306,6 +306,7 @@ func (c Ctx) SendStreamWriter(streamWriter func(w *bufio.Writer)) ``` With this new method, you can implement: + - Server-Side Events (SSE) - Large file downloads - Live data streaming From 76dee5702119bebd6c7fc41aefbbd808cd9656c3 Mon Sep 17 00:00:00 2001 From: Giovanni Rivera Date: Tue, 26 Nov 2024 00:28:40 -0800 Subject: [PATCH 10/10] =?UTF-8?q?=F0=9F=A9=B9=20Fix:=20Fix=20Fprintf=20syn?= =?UTF-8?q?tax=20error=20in=20docs/whats=5Fnew.md?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/api/ctx.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/api/ctx.md b/docs/api/ctx.md index 88e6c142fe..2171a0ba80 100644 --- a/docs/api/ctx.md +++ b/docs/api/ctx.md @@ -1897,7 +1897,7 @@ app.Get("/wait", func(c fiber.Ctx) error { // Send progress over time time.Sleep(time.Second) for i := 0; i < 9; i++ { - fmt.Fprintf("Still waiting...\n") + fmt.Fprintf(w, "Still waiting...\n") if err := w.Flush(); err != nil { // If client disconnected, cancel work and finish log.Print("Client disconnected!")