Skip to content

Commit 4d26a50

Browse files
committed
add amortized flushing to client, increase client channel buffer size
1 parent b1f2d9a commit 4d26a50

File tree

1 file changed

+28
-10
lines changed

1 file changed

+28
-10
lines changed

client.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,20 @@ import (
44
"io"
55
"net/http"
66
"sync"
7+
"time"
78
)
89

910
// Client wraps an http connection and converts it to an
1011
// event stream.
1112
type Client struct {
12-
flush http.Flusher
13-
write io.Writer
14-
close http.CloseNotifier
15-
events chan *Event
16-
closed bool
17-
waiter sync.WaitGroup
13+
flush http.Flusher
14+
write io.Writer
15+
close http.CloseNotifier
16+
events chan *Event
17+
closed bool
18+
waiter sync.WaitGroup
19+
lastFlush time.Time
20+
lastWrite time.Time
1821
}
1922

2023
// NewClient creates a client wrapping a response writer.
@@ -26,7 +29,7 @@ type Client struct {
2629
// Returns nil on error.
2730
func NewClient(w http.ResponseWriter, req *http.Request) *Client {
2831
c := &Client{
29-
events: make(chan *Event, 1),
32+
events: make(chan *Event, 100),
3033
write: w,
3134
}
3235

@@ -55,6 +58,7 @@ func NewClient(w http.ResponseWriter, req *http.Request) *Client {
5558
// start the sending thread
5659
c.waiter.Add(1)
5760
go c.run()
61+
go c.flusher()
5862
return c
5963
}
6064

@@ -84,7 +88,6 @@ func (c *Client) Wait() {
8488

8589
// Worker thread for the client responsible for writing events
8690
func (c *Client) run() {
87-
8891
for {
8992
select {
9093
case ev, ok := <-c.events:
@@ -97,13 +100,28 @@ func (c *Client) run() {
97100

98101
// send the event
99102
io.Copy(c.write, ev)
100-
c.flush.Flush()
103+
c.lastWrite = time.Now()
101104

102-
case _ = <-c.close.CloseNotify():
105+
case <-c.close.CloseNotify():
103106
c.closed = true
104107
c.waiter.Done()
105108
return
106109
}
107110

108111
}
109112
}
113+
114+
// flusher amortizes flushing costs for high activity SSE channels
115+
func (c *Client) flusher() {
116+
ticker := time.NewTicker(100 * time.Millisecond)
117+
118+
for !c.closed {
119+
<-ticker.C
120+
if c.lastFlush.Before(c.lastWrite) {
121+
c.lastFlush = c.lastWrite
122+
c.flush.Flush()
123+
}
124+
}
125+
126+
ticker.Stop()
127+
}

0 commit comments

Comments
 (0)