Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🔥 Feature (v3): Add buffered streaming support #3131

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package fiber

import (
"bufio"
"bytes"
"context"
"crypto/tls"
Expand Down Expand Up @@ -1671,6 +1672,13 @@ func (c *DefaultCtx) SendStream(stream io.Reader, size ...int) error {
return nil
}

// SendStreamWriter sets response body stream writer
func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error {
Comment on lines +1675 to +1676
Copy link
Contributor

@coderabbitai coderabbitai bot Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add comprehensive documentation for the new public API method.

The method lacks proper documentation. As this is a new public API method for buffered streaming support, it should include:

  • A description of its purpose and use cases (e.g., SSE, large file downloads)
  • Parameter explanation
  • Return value description
  • Usage examples
  • Any important notes about concurrency or memory management

Add documentation like this:

+// SendStreamWriter sets the response body stream writer for sending 
+// chunked data through a callback function.
+//
+// The callback function is called when the response is being written to the client.
+// This allows for efficient memory usage as data can be written in chunks.
+//
+// Perfect for:
+// - Server-Sent Events (SSE)
+// - Large file downloads
+// - Real-time data streaming
+//
+// Example:
+//
+//	app.Get("/", func(c fiber.Ctx) error {
+//		return c.SendStreamWriter(func(w *bufio.Writer) {
+//			fmt.Fprintf(w, "Hello, World!")
+//			w.Flush()
+//		})
+//	})
 // SendStreamWriter sets response body stream writer
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// SendStreamWriter sets response body stream writer
func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error {
// SendStreamWriter sets the response body stream writer for sending
// chunked data through a callback function.
//
// The callback function is called when the response is being written to the client.
// This allows for efficient memory usage as data can be written in chunks.
//
// Perfect for:
// - Server-Sent Events (SSE)
// - Large file downloads
// - Real-time data streaming
//
// Example:
//
// app.Get("/", func(c fiber.Ctx) error {
// return c.SendStreamWriter(func(w *bufio.Writer) {
// fmt.Fprintf(w, "Hello, World!")
// w.Flush()
// })
// })
func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add an example/recipe about this feature after the PR get merged

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@efectn, that's a great idea. Adding an example or recipe about this feature after the PR is merged will be beneficial for users. Would you like me to open a new GitHub issue to track this follow-up action?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Comment on lines +1675 to +1676
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add comprehensive documentation for the new public API method.

As this is a new public API method for buffered streaming support, it should include:

  • A description of its purpose and use cases (e.g., SSE, large file downloads)
  • Parameter explanation
  • Example usage with SSE
  • Notes about concurrency and memory management

Apply this diff to improve the documentation:

-// SendStreamWriter sets response body stream writer
+// SendStreamWriter sets the response body stream writer for sending chunked data through a callback function.
+//
+// The callback function is called when the response is being written to the client.
+// This allows for efficient memory usage as data can be written in chunks.
+//
+// Perfect for:
+// - Server-Sent Events (SSE)
+// - Large file downloads
+// - Real-time data streaming
+//
+// Example:
+//  app.Get("/sse", func(c fiber.Ctx) error {
+//    c.Set("Content-Type", "text/event-stream")
+//    c.Set("Cache-Control", "no-cache")
+//    c.Set("Connection", "keep-alive")
+//    c.Set("Transfer-Encoding", "chunked")
+//
+//    return c.SendStreamWriter(func(w *bufio.Writer) {
+//      fmt.Fprintf(w, "event: message\ndata: Hello SSE\n\n")
+//      w.Flush()
+//    })
+//  })
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// SendStreamWriter sets response body stream writer
func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error {
// SendStreamWriter sets the response body stream writer for sending chunked data through a callback function.
//
// The callback function is called when the response is being written to the client.
// This allows for efficient memory usage as data can be written in chunks.
//
// Perfect for:
// - Server-Sent Events (SSE)
// - Large file downloads
// - Real-time data streaming
//
// Example:
// app.Get("/sse", func(c fiber.Ctx) error {
// c.Set("Content-Type", "text/event-stream")
// c.Set("Cache-Control", "no-cache")
// c.Set("Connection", "keep-alive")
// c.Set("Transfer-Encoding", "chunked")
//
// return c.SendStreamWriter(func(w *bufio.Writer) {
// fmt.Fprintf(w, "event: message\ndata: Hello SSE\n\n")
// w.Flush()
// })
// })
func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error {

c.fasthttp.Response.SetBodyStreamWriter(fasthttp.StreamWriter(streamWriter))

return nil
}

// Set sets the response's HTTP header field to the specified key, value.
func (c *DefaultCtx) Set(key, val string) {
c.fasthttp.Response.Header.Set(key, val)
Expand Down
3 changes: 3 additions & 0 deletions ctx_interface_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 68 additions & 0 deletions ctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
"text/template"
"time"
Expand Down Expand Up @@ -4447,6 +4448,73 @@ func Test_Ctx_SendStream(t *testing.T) {
require.Equal(t, "Hello bufio", string(c.Response().Body()))
}

// go test -run Test_Ctx_SendStreamWriter
func Test_Ctx_SendStreamWriter(t *testing.T) {
t.Parallel()
app := New()
c := app.AcquireCtx(&fasthttp.RequestCtx{})

err := c.SendStreamWriter(func(w *bufio.Writer) {
w.WriteString("Don't crash please") //nolint:errcheck, revive // It is fine to ignore the error
ReneWerner87 marked this conversation as resolved.
Show resolved Hide resolved
})
require.NoError(t, err)
require.Equal(t, "Don't crash please", string(c.Response().Body()))

err = c.SendStreamWriter(func(w *bufio.Writer) {
for lineNum := 1; lineNum <= 5; lineNum++ {
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
ReneWerner87 marked this conversation as resolved.
Show resolved Hide resolved
if err := w.Flush(); err != nil {
t.Errorf("unexpected error: %s", err)
return
}
}
})
require.NoError(t, err)
require.Equal(t, "Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n", string(c.Response().Body()))

err = c.SendStreamWriter(func(_ *bufio.Writer) {})
require.NoError(t, err)
require.Empty(t, c.Response().Body())
}

// go test -run Test_Ctx_SendStreamWriter_Interrupted
func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) {
t.Parallel()
app := New()
c := app.AcquireCtx(&fasthttp.RequestCtx{})

var mutex sync.Mutex
startChan := make(chan bool)
interruptStreamWriter := func() {
<-startChan
time.Sleep(5 * time.Millisecond)
mutex.Lock()
c.Response().CloseBodyStream() //nolint:errcheck // It is fine to ignore the error
ReneWerner87 marked this conversation as resolved.
Show resolved Hide resolved
mutex.Unlock()
}
err := c.SendStreamWriter(func(w *bufio.Writer) {
go interruptStreamWriter()

startChan <- true
for lineNum := 1; lineNum <= 5; lineNum++ {
mutex.Lock()
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
ReneWerner87 marked this conversation as resolved.
Show resolved Hide resolved
mutex.Unlock()

if err := w.Flush(); err != nil {
if lineNum < 3 {
t.Errorf("unexpected error: %s", err)
}
return
}

time.Sleep(1500 * time.Microsecond)
}
})
require.NoError(t, err)
require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(c.Response().Body()))
}

// go test -run Test_Ctx_Set
func Test_Ctx_Set(t *testing.T) {
t.Parallel()
Expand Down
41 changes: 41 additions & 0 deletions docs/api/ctx.md
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,47 @@ app.Get("/", func(c fiber.Ctx) error {
})
```

## SendStreamWriter
grivera64 marked this conversation as resolved.
Show resolved Hide resolved

Sets the response body stream writer.

:::note
The argument `streamWriter` represents a function that populates
the response body using a buffered stream writer.
:::

```go title="Signature"
func (c Ctx) SendStreamWriter(streamWriter func(*bufio.Writer)) error
```

```go title="Example"
app.Get("/", func (c fiber.Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
fmt.Fprintf(w, "Hello, World!\n")
})
// => "Hello, World!"
})
```
gaby marked this conversation as resolved.
Show resolved Hide resolved

:::info
To flush data before the function returns, you can call `w.Flush()`
on the provided writer. Otherwise, the buffered stream flushes after
`streamWriter` returns.
:::

```go title="Example"
app.Get("/wait", func(c fiber.Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
fmt.Fprintf(w, "Waiting for 10 seconds\n")
if err := w.Flush(); err != nil {
log.Print("User quit early")
}
time.Sleep(10 * time.Second)
fmt.Fprintf(w, "Done!\n")
})
})
```
gaby marked this conversation as resolved.
Show resolved Hide resolved

## Set

Sets the response’s HTTP header field to the specified `key`, `value`.
Expand Down
Loading