|
8 | 8 | "io" |
9 | 9 | "regexp" |
10 | 10 | "sync" |
| 11 | + "sync/atomic" |
11 | 12 | "testing" |
| 13 | + "time" |
12 | 14 |
|
13 | 15 | "github.com/stretchr/testify/assert" |
14 | 16 | ) |
@@ -468,46 +470,56 @@ func TestNextToken_CancelDrainClosedChannelStartsSecondResponse(t *testing.T) { |
468 | 470 | } |
469 | 471 | } |
470 | 472 |
|
| 473 | +// blockingTransport blocks Read until unblock is closed, then returns EOF. |
| 474 | +type blockingTransport struct { |
| 475 | + unblock chan struct{} |
| 476 | +} |
| 477 | + |
| 478 | +func (b *blockingTransport) Read([]byte) (int, error) { |
| 479 | + <-b.unblock |
| 480 | + return 0, io.EOF |
| 481 | +} |
| 482 | + |
| 483 | +func (b *blockingTransport) Write(p []byte) (int, error) { return len(p), nil } |
| 484 | +func (b *blockingTransport) Close() error { return nil } |
| 485 | + |
471 | 486 | // TestStartResponseReaderSerializes verifies that startResponseReader waits for |
472 | 487 | // the previous goroutine to finish before launching a new one. |
473 | 488 | func TestStartResponseReaderSerializes(t *testing.T) { |
474 | | - // Build a minimal TDS reply with just a DONE(final) token. |
475 | | - buildDonePacket := func() []byte { |
476 | | - tokenStream := []byte{ |
477 | | - byte(tokenDone), 0x00, 0x00, 0x00, 0x00, |
478 | | - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, |
479 | | - } |
480 | | - totalSize := 8 + len(tokenStream) |
481 | | - packet := make([]byte, totalSize) |
482 | | - packet[0] = byte(packReply) |
483 | | - packet[1] = 0x01 |
484 | | - binary.BigEndian.PutUint16(packet[2:4], uint16(totalSize)) |
485 | | - packet[6] = 0x01 |
486 | | - copy(packet[8:], tokenStream) |
487 | | - return packet |
488 | | - } |
489 | | - |
490 | | - // First call: launch a reader and let it finish. |
491 | | - pkt1 := buildDonePacket() |
| 489 | + // First reader: transport blocks until we say so. |
| 490 | + unblock := make(chan struct{}) |
492 | 491 | sess := &tdsSession{ |
493 | | - buf: newTdsBuffer(defaultPacketSize, closableBuffer{bytes.NewBuffer(pkt1)}), |
| 492 | + buf: newTdsBuffer(defaultPacketSize, &blockingTransport{unblock: unblock}), |
494 | 493 | } |
| 494 | + |
495 | 495 | ch1 := make(chan tokenStruct, 10) |
496 | 496 | sess.startResponseReader(context.Background(), ch1, outputs{}) |
497 | | - for range ch1 { |
498 | | - } |
| 497 | + // First goroutine is now blocked inside processSingleResponse on Read. |
499 | 498 |
|
500 | | - // Second call: replace the transport and launch another reader. |
501 | | - // startResponseReader must wait for the first goroutine (already done) |
502 | | - // before starting the second. If it doesn't serialize, the second read |
503 | | - // would race on sess.buf. |
504 | | - pkt2 := buildDonePacket() |
505 | | - sess.buf = newTdsBuffer(defaultPacketSize, closableBuffer{bytes.NewBuffer(pkt2)}) |
| 499 | + // Launch second startResponseReader in a goroutine; it should block on |
| 500 | + // <-sess.readDone until the first reader finishes. |
| 501 | + var secondStarted atomic.Int32 |
506 | 502 | ch2 := make(chan tokenStruct, 10) |
507 | | - sess.startResponseReader(context.Background(), ch2, outputs{}) |
508 | | - for range ch2 { |
| 503 | + go func() { |
| 504 | + sess.startResponseReader(context.Background(), ch2, outputs{}) |
| 505 | + secondStarted.Store(1) |
| 506 | + }() |
| 507 | + |
| 508 | + // Give the goroutine time to reach the <-sess.readDone wait. |
| 509 | + time.Sleep(50 * time.Millisecond) |
| 510 | + if secondStarted.Load() != 0 { |
| 511 | + t.Fatal("second startResponseReader returned before first completed") |
509 | 512 | } |
510 | 513 |
|
511 | | - // If we reach here without a panic or race, serialization is working. |
512 | | -} |
| 514 | + // Unblock the first reader. processSingleResponse will hit EOF, recover |
| 515 | + // from the panic, and close ch1 + readDone. |
| 516 | + close(unblock) |
| 517 | + |
| 518 | + // Second call should now proceed. |
| 519 | + select { |
| 520 | + case <-time.After(5 * time.Second): |
| 521 | + t.Fatal("second startResponseReader did not return after first completed") |
| 522 | + case <-ch2: |
| 523 | + // Expected: second reader started and wrote (or closed) ch2. |
| 524 | + } |
513 | 525 | } |
0 commit comments