Skip to content
76 changes: 76 additions & 0 deletions token_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"io"
"regexp"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -467,3 +469,77 @@ func TestNextToken_CancelDrainClosedChannelStartsSecondResponse(t *testing.T) {
t.Fatal("expected attention packet to be written")
}
}

// blockingTransport blocks Read until unblock is closed, then returns EOF.
// It signals readEntered when a Read call begins, allowing deterministic
// synchronization without time.Sleep.
Comment thread
dlevy-msft-sql marked this conversation as resolved.
Outdated
type blockingTransport struct {
unblock chan struct{}
readEntered chan struct{}
}

func (b *blockingTransport) Read([]byte) (int, error) {
select {
case b.readEntered <- struct{}{}:
default:
}
<-b.unblock
return 0, io.EOF
}

func (b *blockingTransport) Write(p []byte) (int, error) { return len(p), nil }
func (b *blockingTransport) Close() error { return nil }

// TestStartResponseReaderSerializes verifies that startResponseReader waits for
// the previous goroutine to finish before launching a new one.
func TestStartResponseReaderSerializes(t *testing.T) {
// First reader: transport blocks until we say so.
readEntered := make(chan struct{}, 1)
unblock := make(chan struct{})
var closeOnce sync.Once
closeUnblock := func() { closeOnce.Do(func() { close(unblock) }) }
t.Cleanup(closeUnblock)
sess := &tdsSession{
buf: newTdsBuffer(defaultPacketSize, &blockingTransport{
unblock: unblock,
readEntered: readEntered,
}),
}

ch1 := make(chan tokenStruct, 10)
sess.startResponseReader(context.Background(), ch1, outputs{})

// Wait for the first goroutine to actually enter Read, proving it's blocked.
select {
case <-readEntered:
Comment thread
dlevy-msft-sql marked this conversation as resolved.
case <-time.After(5 * time.Second):
t.Fatal("first reader never entered Read")
}
Comment thread
dlevy-msft-sql marked this conversation as resolved.

// Launch second startResponseReader in a goroutine; it should block on
// <-sess.readDone until the first reader finishes.
var secondStarted atomic.Int32
ch2 := make(chan tokenStruct, 10)
go func() {
sess.startResponseReader(context.Background(), ch2, outputs{})
secondStarted.Store(1)
}()

// The second goroutine cannot proceed while the first reader is blocked,
// so secondStarted must still be 0.
if secondStarted.Load() != 0 {
t.Fatal("second startResponseReader returned before first completed")
}
Comment thread
dlevy-msft-sql marked this conversation as resolved.
Comment thread
dlevy-msft-sql marked this conversation as resolved.

// Unblock the first reader. processSingleResponse will receive EOF from
// BeginRead as an error, send it to ch1, and return, closing readDone.
closeUnblock()

// Second call should now proceed.
Comment thread
dlevy-msft-sql marked this conversation as resolved.
select {
case <-time.After(5 * time.Second):
t.Fatal("second startResponseReader did not return after first completed")
case <-ch2:
// Expected: second reader started and wrote (or closed) ch2.
}
}
Comment thread
dlevy-msft-sql marked this conversation as resolved.