@@ -1914,6 +1914,45 @@ t('Copy read', async() => {
1914
1914
]
1915
1915
} )
1916
1916
1917
+ t ( 'Copy read with back-pressure' , async ( ) => {
1918
+ await sql `create table test (x int)`
1919
+
1920
+ // Make sure there are enough rows in the table to fill the buffer
1921
+ // so that `CopyDone` message is handled while the socket is paused
1922
+ await sql `insert into test select * from generate_series(1,12774)`
1923
+
1924
+ let result = 0
1925
+ const readable = await sql `copy test to stdout` . readable ( )
1926
+ readable . on ( 'data' , _ => result ++ )
1927
+
1928
+ // Pause the stream so that the entire buffer fills up
1929
+ readable . pause ( )
1930
+
1931
+ await Promise . all ( [
1932
+ // Wait until the stream has been consumed
1933
+ new Promise ( r => readable . on ( 'end' , r ) ) ,
1934
+ ( async ( ) => {
1935
+ // Wait until the entire buffer fills up,
1936
+ await new Promise ( r => readable . on ( 'readable' , ( ) => {
1937
+ if ( readable . readableBuffer . length === 12774 )
1938
+ r ( )
1939
+ } ) )
1940
+ // Switch the stream back to flowing mode (allowing it to be consumed)
1941
+ readable . removeAllListeners ( 'readable' )
1942
+ } ) ( )
1943
+ ] )
1944
+
1945
+ // This is the actual test, the copy stream is done
1946
+ // we should be able to run a new query
1947
+ await sql `SELECT 1`
1948
+
1949
+ return [
1950
+ result ,
1951
+ 12774 ,
1952
+ await sql `drop table test`
1953
+ ]
1954
+ } )
1955
+
1917
1956
t ( 'Copy write' , { timeout : 2 } , async ( ) => {
1918
1957
await sql `create table test (x int)`
1919
1958
const writable = await sql `copy test from stdin` . writable ( )
0 commit comments