Skip to content

Commit 4012178

Browse files
author
Michael Lauer
committed
streamWriter
1 parent 6dc69ee commit 4012178

File tree

4 files changed

+116
-3
lines changed

4 files changed

+116
-3
lines changed

protocol.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,11 @@ func readRecord(r io.Reader) (record, error) {
113113
if _, err := r.Read(pad[:1]); err != nil {
114114
return rec, err
115115
}
116-
rec.Content = make([]byte, clength)
117-
if _, err := r.Read(rec.Content); err != nil {
118-
return rec, err
116+
if clength != 0 {
117+
rec.Content = make([]byte, clength)
118+
if _, err := r.Read(rec.Content); err != nil {
119+
return rec, err
120+
}
119121
}
120122
if plength != 0 {
121123
if _, err := r.Read(pad[:plength]); err != nil {

protocol_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,14 @@ func TestWriteRecord(t *testing.T) {
2828
},
2929
[]byte{1, 3, 0, 1, 0, 8, 0, 0},
3030
},
31+
{
32+
record{
33+
Type: 3,
34+
Id: 1,
35+
Content: nil,
36+
},
37+
[]byte{1, 3, 0, 1, 0, 0, 0, 0},
38+
},
3139
}
3240
for _, d := range data {
3341
rec := d.rec

streams.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package gofcgisrv
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"sync"
7+
)
8+
9+
type streamReader struct {
10+
buffer bytes.Buffer
11+
lock sync.Mutex
12+
gotData *sync.Cond
13+
err error
14+
}
15+
16+
func newStreamReader() *streamReader {
17+
s := new(streamReader)
18+
s.gotData = sync.NewCond(&s.lock)
19+
return s
20+
}
21+
22+
func (sr *streamReader) Read(data []byte) (int, error) {
23+
sr.lock.Lock()
24+
defer sr.lock.Unlock()
25+
// Wait for something to show up
26+
for sr.buffer.Len() == 0 && sr.err == nil {
27+
sr.gotData.Wait()
28+
}
29+
if sr.buffer.Len() == 0 {
30+
return 0, sr.err
31+
}
32+
return sr.buffer.Read(data)
33+
}
34+
35+
func (sr *streamReader) Write(data []byte) (int, error) {
36+
sr.lock.Lock()
37+
defer sr.lock.Unlock()
38+
if sr.err == nil {
39+
n, err := sr.buffer.Write(data)
40+
sr.gotData.Signal()
41+
return n, err
42+
}
43+
return 0, sr.err
44+
}
45+
46+
func (sr *streamReader) Close() error {
47+
sr.lock.Lock()
48+
defer sr.lock.Unlock()
49+
sr.err = io.EOF
50+
sr.gotData.Signal()
51+
return nil
52+
}

streams_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package gofcgisrv
2+
3+
import (
4+
"io"
5+
"io/ioutil"
6+
"strings"
7+
"testing"
8+
"time"
9+
)
10+
11+
func TestStreamReader(t *testing.T) {
12+
sr := newStreamReader()
13+
io.WriteString(sr, "abc")
14+
data := make([]byte, 1024)
15+
n, err := sr.Read(data)
16+
if err != nil {
17+
t.Errorf("Read error %v", err)
18+
}
19+
if string(data[:n]) != "abc" {
20+
t.Errorf("Read %s, not %s", data[:n], "abc")
21+
}
22+
23+
ch := make(chan string)
24+
go func() {
25+
n, err := sr.Read(data)
26+
if err != nil {
27+
t.Error(err)
28+
}
29+
ch <- string(data[:n])
30+
}()
31+
// Let the goroutine execute a bit
32+
time.Sleep(time.Millisecond)
33+
go func() {
34+
sr.Write([]byte("ABCD"))
35+
}()
36+
str := <-ch
37+
if str != "ABCD" {
38+
t.Errorf("Read %s, not %s", str, "ABCD")
39+
}
40+
41+
ss := []string{"foo", "bar", strings.Repeat("xyzå", 100)}
42+
for _, s := range ss {
43+
sr.Write([]byte(s))
44+
}
45+
go sr.Close()
46+
ssj := strings.Join(ss, "")
47+
read, err := ioutil.ReadAll(sr)
48+
if err != nil || string(read) != ssj {
49+
t.Errorf("Read %s with error %v", read, err)
50+
}
51+
}

0 commit comments

Comments
 (0)