Skip to content

[WIP] Buffering #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions adapter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package httpcompression // import "github.com/CAFxX/httpcompression"

import (
"bufio"
"compress/gzip"
"fmt"
"net/http"
Expand Down Expand Up @@ -35,6 +36,9 @@ const (
// In general there can be no one-size-fits-all value: you will want to measure if a different
// minimum size improves end-to-end performance for your workloads.
DefaultMinSize = 200

DefaultBufferSize = 4096
minBufferSize = 512
)

// Adapter returns a HTTP handler wrapping function (a.k.a. middleware)
Expand Down Expand Up @@ -64,6 +68,10 @@ func Adapter(opts ...Option) (func(http.Handler) http.Handler, error) {
}, nil
}

if c.bufferSize < minBufferSize {
c.bufferSize = minBufferSize
}

bufPool := &sync.Pool{}
writerPool := &sync.Pool{}

Expand Down Expand Up @@ -92,23 +100,26 @@ func Adapter(opts ...Option) (func(http.Handler) http.Handler, error) {
gw, _ := writerPool.Get().(*compressWriter)
if gw == nil {
gw = &compressWriter{}
gw.bw = *bufio.NewWriterSize(skipBuffer{gw}, c.bufferSize)
}
*gw = compressWriter{
ResponseWriter: w,
config: c,
accept: accept,
common: common,
pool: bufPool,
bw: gw.bw,
}
defer func() {
// Important: gw.Close() must be called *always*, as this will
// in turn Close() the compressor. This is important because
// it is guaranteed by the CompressorProvider interface, and
// because some compressors may be implemented via cgo, and they
// may rely on Close() being called to release memory resources.
// TODO: expose the error
_ = gw.Close() // expose the error
*gw = compressWriter{}
_ = gw.Close() // TODO: expose the error
*gw = compressWriter{
bw: gw.bw,
}
writerPool.Put(gw)
}()

Expand Down Expand Up @@ -145,6 +156,7 @@ func DefaultAdapter(opts ...Option) (func(http.Handler) http.Handler, error) {
BrotliCompressionLevel(brotli.DefaultCompression),
defaultZstandardCompressor(),
MinSize(DefaultMinSize),
BufferSize(DefaultBufferSize),
}
opts = append(defaults, opts...)
return Adapter(opts...)
Expand All @@ -157,6 +169,7 @@ type config struct {
blacklist bool
prefer PreferType
compressor comps
bufferSize int
}

type comps map[string]comp
Expand All @@ -181,6 +194,16 @@ func MinSize(size int) Option {
}
}

func BufferSize(size int) Option {
return func(c *config) error {
if size < 0 {
return fmt.Errorf("buffer size can not be negative: %d", size)
}
c.bufferSize = size
return nil
}
}

// DeflateCompressionLevel is an option that controls the Deflate compression
// level to be used when compressing payloads.
// The default is flate.DefaultCompression.
Expand Down
5 changes: 2 additions & 3 deletions adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,6 @@ func TestGzipHandlerDoubleWriteHeader(t *testing.T) {
}
assert.Empty(t, body)
header := rec.Header()
assert.Equal(t, "gzip", header.Get("Content-Encoding"))
assert.Equal(t, "Accept-Encoding", header.Get("Vary"))
assert.Equal(t, 304, rec.Code)
}
Expand Down Expand Up @@ -1015,8 +1014,8 @@ func TestWriteStringNoCompressionDynamic(t *testing.T) {
t.Run("WriteString", func(t *testing.T) {
w := &discardResponseWriterWithWriteString{}
h.ServeHTTP(w, r)
if w.s != len(testBody) || w.b != int64(len(testBody)) { // first WriteString falls back to Write
t.Fatalf("WriteString not called: %+v", w)
if w.b != int64(len(testBody))*2 {
t.Fatalf("Write not called: %+v", w)
}
})
t.Run("Write", func(t *testing.T) {
Expand Down
39 changes: 18 additions & 21 deletions response_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type compressWriter struct {
enc string
code int // Saves the WriteHeader value.
buf *[]byte // Holds the first part of the write before reaching the minSize or the end of the write.

bw bufio.Writer
}

var (
Expand Down Expand Up @@ -53,7 +55,7 @@ var (
const maxBuf = 1 << 16 // maximum size of recycled buffer

// Write compresses and appends the given byte slice to the underlying ResponseWriter.
func (w *compressWriter) Write(b []byte) (int, error) {
func (w *compressWriter) write(b []byte) (int, error) {
if w.w != nil {
// The responseWriter is already initialized: use it.
return w.w.Write(b)
Expand Down Expand Up @@ -126,25 +128,15 @@ func (w *compressWriter) Write(b []byte) (int, error) {
return len(b), nil
}

// WriteString compresses and appends the given string to the underlying ResponseWriter.
//
// This makes use of an optional method (WriteString) exposed by the compressors, or by
// the underlying ResponseWriter.
func (w *compressWriter) WriteString(s string) (int, error) {
// Since WriteString is an optional interface of the compressor, and the actual compressor
// is chosen only after the first call to Write, we can't statically know whether the interface
// is supported. We therefore have to check dynamically.
if ws, _ := w.w.(io.StringWriter); ws != nil {
// The responseWriter is already initialized and it implements WriteString.
return ws.WriteString(s)
}
// Fallback: the writer has not been initialized yet, or it has been initialized
// and it does not implement WriteString. We could in theory do something unsafe
// here but for now let's keep it simple and fallback to Write.
// TODO: in case the string is large, we should avoid allocating a full copy:
// instead we should copy the string in chunks.
return w.Write([]byte(s))
}
type skipBuffer struct{ cw *compressWriter }

func (w skipBuffer) Write(b []byte) (int, error) { return w.cw.write(b) }

func (w *compressWriter) WriteString(s string) (int, error) { return w.bw.WriteString(s) }
func (w *compressWriter) Write(b []byte) (int, error) { return w.bw.Write(b) }
func (w *compressWriter) WriteRune(r rune) (int, error) { return w.bw.WriteRune(r) }
func (w *compressWriter) WriteByte(c byte) error { return w.bw.WriteByte(c) }
func (w *compressWriter) ReadFrom(r io.Reader) (int64, error) { return w.bw.ReadFrom(r) }

// startCompress initializes a compressing writer and writes the buffer.
func (w *compressWriter) startCompress(enc string, buf []byte) error {
Expand Down Expand Up @@ -229,6 +221,8 @@ func (w *compressWriter) WriteHeader(code int) {

// Close closes the compression Writer.
func (w *compressWriter) Close() error {
w.bw.Flush()

if w.w != nil && w.enc == "" {
return nil
}
Expand Down Expand Up @@ -257,12 +251,15 @@ func (w *compressWriter) Close() error {
// response should be compressed or not (e.g. less than MinSize bytes have
// been written).
func (w *compressWriter) Flush() {
if w.w == nil {
if w.w == nil || (w.w == nil && w.bw.Buffered() < minBufferSize) {
// Flush is thus a no-op until we're certain whether a plain
// or compressed response will be served.
return
}

// Flush the bufio.Writer.
w.bw.Flush()

// Flush the compressor, if supported.
// note: http.ResponseWriter does not implement Flusher (http.Flusher does not return an error),
// so we need to later call ResponseWriter.Flush anyway:
Expand Down
Loading