diff --git a/token_test.go b/token_test.go index 49c63c5d..2c2fc412 100644 --- a/token_test.go +++ b/token_test.go @@ -8,7 +8,9 @@ import ( "io" "regexp" "sync" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -467,3 +469,83 @@ func TestNextToken_CancelDrainClosedChannelStartsSecondResponse(t *testing.T) { t.Fatal("expected attention packet to be written") } } + +// blockingTransport blocks Read until unblock is closed, then returns EOF. +// It signals readEntered when a Read call begins, allowing deterministic +// synchronization of the start of Read. +type blockingTransport struct { + unblock chan struct{} + readEntered chan struct{} +} + +func (b *blockingTransport) Read([]byte) (int, error) { + select { + case b.readEntered <- struct{}{}: + default: + } + <-b.unblock + return 0, io.EOF +} + +func (b *blockingTransport) Write(p []byte) (int, error) { return len(p), nil } +func (b *blockingTransport) Close() error { return nil } + +// TestStartResponseReaderSerializes verifies that startResponseReader waits for +// the previous goroutine to finish before launching a new one. +func TestStartResponseReaderSerializes(t *testing.T) { + // First reader: transport blocks until we say so. + readEntered := make(chan struct{}, 1) + unblock := make(chan struct{}) + var closeOnce sync.Once + closeUnblock := func() { closeOnce.Do(func() { close(unblock) }) } + t.Cleanup(closeUnblock) + sess := &tdsSession{ + buf: newTdsBuffer(defaultPacketSize, &blockingTransport{ + unblock: unblock, + readEntered: readEntered, + }), + } + + ch1 := make(chan tokenStruct, 10) + sess.startResponseReader(context.Background(), ch1, outputs{}) + + // Wait for the first goroutine to actually enter Read, proving it's blocked. + select { + case <-readEntered: + case <-time.After(5 * time.Second): + t.Fatal("first reader never entered Read") + } + + // Launch second startResponseReader in a goroutine; it should block on + // <-sess.readDone until the first reader finishes. + var secondStarted atomic.Int32 + ch2 := make(chan tokenStruct, 10) + goroutineStarted := make(chan struct{}) + go func() { + close(goroutineStarted) + sess.startResponseReader(context.Background(), ch2, outputs{}) + secondStarted.Store(1) + }() + + // Wait for the goroutine to be scheduled and reach startResponseReader. + <-goroutineStarted + time.Sleep(100 * time.Millisecond) + + // The second goroutine cannot proceed while the first reader is blocked, + // so secondStarted must still be 0. + if secondStarted.Load() != 0 { + t.Fatal("second startResponseReader returned before first completed") + } + + // Unblock the first reader. processSingleResponse will receive EOF from + // BeginRead as an error, send it to ch1, and return, closing readDone. + closeUnblock() + + // Second call should now proceed. + select { + case <-time.After(5 * time.Second): + t.Fatal("second startResponseReader did not return after first completed") + case <-ch2: + // Expected: second reader started and wrote (or closed) ch2. + } +}