Skip to content

Commit 35e31b4

Browse files
committed
chore: commit built files
1 parent ea1737f commit 35e31b4

File tree

5 files changed

+81
-0
lines changed

5 files changed

+81
-0
lines changed

cf/src/connection.js

+1
Original file line numberDiff line numberDiff line change
@@ -886,6 +886,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
886886
function CopyDone() {
887887
stream && stream.push(null)
888888
stream = null
889+
socket.isPaused() && socket.resume()
889890
}
890891

891892
function NoticeResponse(x) {

cjs/src/connection.js

+1
Original file line numberDiff line numberDiff line change
@@ -884,6 +884,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
884884
function CopyDone() {
885885
stream && stream.push(null)
886886
stream = null
887+
socket.isPaused() && socket.resume()
887888
}
888889

889890
function NoticeResponse(x) {

cjs/tests/index.js

+39
Original file line numberDiff line numberDiff line change
@@ -1914,6 +1914,45 @@ t('Copy read', async() => {
19141914
]
19151915
})
19161916

1917+
t('Copy read with back-pressure', async() => {
1918+
await sql`create table test (x int)`
1919+
1920+
// Make sure there are enough rows in the table to fill the buffer
1921+
// so that `CopyDone` message is handled while the socket is paused
1922+
await sql`insert into test select * from generate_series(1,12774)`
1923+
1924+
let result = 0
1925+
const readable = await sql`copy test to stdout`.readable()
1926+
readable.on('data', _ => result++)
1927+
1928+
// Pause the stream so that the entire buffer fills up
1929+
readable.pause()
1930+
1931+
await Promise.all([
1932+
// Wait until the stream has been consumed
1933+
new Promise(r => readable.on('end', r)),
1934+
(async() => {
1935+
// Wait until the entire buffer fills up,
1936+
await new Promise(r => readable.on('readable', () => {
1937+
if (readable.readableBuffer.length === 12774)
1938+
r()
1939+
}))
1940+
// Switch the stream back to flowing mode (allowing it to be consumed)
1941+
readable.removeAllListeners('readable')
1942+
})()
1943+
])
1944+
1945+
// This is the actual test, the copy stream is done
1946+
// we should be able to run a new query
1947+
await sql`SELECT 1`
1948+
1949+
return [
1950+
result,
1951+
12774,
1952+
await sql`drop table test`
1953+
]
1954+
})
1955+
19171956
t('Copy write', { timeout: 2 }, async() => {
19181957
await sql`create table test (x int)`
19191958
const writable = await sql`copy test from stdin`.writable()

deno/src/connection.js

+1
Original file line numberDiff line numberDiff line change
@@ -887,6 +887,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
887887
function CopyDone() {
888888
stream && stream.push(null)
889889
stream = null
890+
socket.isPaused() && socket.resume()
890891
}
891892

892893
function NoticeResponse(x) {

deno/tests/index.js

+39
Original file line numberDiff line numberDiff line change
@@ -1916,6 +1916,45 @@ t('Copy read', async() => {
19161916
]
19171917
})
19181918

1919+
t('Copy read with back-pressure', async() => {
1920+
await sql`create table test (x int)`
1921+
1922+
// Make sure there are enough rows in the table to fill the buffer
1923+
// so that `CopyDone` message is handled while the socket is paused
1924+
await sql`insert into test select * from generate_series(1,12774)`
1925+
1926+
let result = 0
1927+
const readable = await sql`copy test to stdout`.readable()
1928+
readable.on('data', _ => result++)
1929+
1930+
// Pause the stream so that the entire buffer fills up
1931+
readable.pause()
1932+
1933+
await Promise.all([
1934+
// Wait until the stream has been consumed
1935+
new Promise(r => readable.on('end', r)),
1936+
(async() => {
1937+
// Wait until the entire buffer fills up,
1938+
await new Promise(r => readable.on('readable', () => {
1939+
if (readable.readableBuffer.length === 12774)
1940+
r()
1941+
}))
1942+
// Switch the stream back to flowing mode (allowing it to be consumed)
1943+
readable.removeAllListeners('readable')
1944+
})()
1945+
])
1946+
1947+
// This is the actual test, the copy stream is done
1948+
// we should be able to run a new query
1949+
await sql`SELECT 1`
1950+
1951+
return [
1952+
result,
1953+
12774,
1954+
await sql`drop table test`
1955+
]
1956+
})
1957+
19191958
t('Copy write', { timeout: 2 }, async() => {
19201959
await sql`create table test (x int)`
19211960
const writable = await sql`copy test from stdin`.writable()

0 commit comments

Comments
 (0)