From 7bb450c52a7f7c7a8576f25da90240bd28b7423b Mon Sep 17 00:00:00 2001 From: slvrtrn Date: Mon, 20 Jan 2025 15:11:22 +0100 Subject: [PATCH 1/3] Handle errors that might occur in the middle of a stream --- packages/client-common/src/settings.ts | 1 + .../integration/node_streaming_errors.test.ts | 83 +++++++++++++++++++ packages/client-node/src/result_set.ts | 14 ++++ 3 files changed, 98 insertions(+) create mode 100644 packages/client-node/__tests__/integration/node_streaming_errors.test.ts diff --git a/packages/client-common/src/settings.ts b/packages/client-common/src/settings.ts index 1f116960..61d85256 100644 --- a/packages/client-common/src/settings.ts +++ b/packages/client-common/src/settings.ts @@ -1583,6 +1583,7 @@ interface ClickHouseServerSettings { /** @see https://clickhouse.com/docs/en/interfaces/http */ interface ClickHouseHTTPSettings { + http_write_exception_in_output_format: Bool /** Ensures that the entire response is buffered. * In this case, the data that is not stored in memory will be buffered in a temporary server file. * This could help prevent errors that might occur during the streaming of SELECT queries. diff --git a/packages/client-node/__tests__/integration/node_streaming_errors.test.ts b/packages/client-node/__tests__/integration/node_streaming_errors.test.ts new file mode 100644 index 00000000..b259ce29 --- /dev/null +++ b/packages/client-node/__tests__/integration/node_streaming_errors.test.ts @@ -0,0 +1,83 @@ +import { + ClickHouseError, + type ClickHouseSettings, +} from '@clickhouse/client-common' +import { createTestClient } from '@test/utils' +import { StreamableDataFormat } from '../../src' +import { NodeClickHouseClient } from '../../src/client' + +fdescribe('[Node.js] Errors during streaming', () => { + let client: NodeClickHouseClient + beforeAll(() => { + client = createTestClient() as NodeClickHouseClient + }) + afterAll(async () => { + await client.close() + }) + + it('should work with CSV', async () => { + let err: unknown + let rowsCount = 0 + try { + const rs = await runQuery('CSV') + const stream = rs.stream() + for await (const rows of stream) { + rowsCount += rows.length + } + } catch (e) { + err = e + } + assertStreamFailure(rowsCount, err) + }) + + fit('should work with JSONEachRow', async () => { + let err: unknown + let rowsCount = 0 + try { + const rs = await runQuery('JSONEachRow', { + http_write_exception_in_output_format: 0, + }) + const stream = rs.stream() + for await (const rows of stream) { + for (const row of rows) { + row.json() // should not produce SyntaxError + rowsCount++ + } + } + } catch (e) { + err = e + } + assertStreamFailure(rowsCount, err) + }) + + function runQuery( + format: F, + settings?: ClickHouseSettings, + ) { + return client.query({ + query: ` + SELECT + number, + if(number > 20, throwIf(1, 'Boom'), number) AS value + FROM numbers(1000) + `, + format, + clickhouse_settings: { + // Forces CH to send more chunks of data (1 row each), + // so that we get 200 OK + headers first instead of 500 ISE + max_block_size: '1', + ...(settings ?? 
{}), + }, + }) + } + + function assertStreamFailure(rowsCount: number, err: unknown) { + // rows count may vary, but the last row will always contain the error message + expect(rowsCount).toBeGreaterThanOrEqual(10) + expect(err).toBeInstanceOf(ClickHouseError) + expect((err as ClickHouseError).code).toBe('395') + expect((err as ClickHouseError).type).toBe( + 'FUNCTION_THROW_IF_VALUE_IS_NON_ZERO', + ) + } +}) diff --git a/packages/client-node/src/result_set.ts b/packages/client-node/src/result_set.ts index d05b275d..96db6371 100644 --- a/packages/client-node/src/result_set.ts +++ b/packages/client-node/src/result_set.ts @@ -7,8 +7,10 @@ import type { Row, } from '@clickhouse/client-common' import { + ClickHouseError, isNotStreamableJSONFamily, isStreamableJSONFamily, + parseError, validateStreamFormat, } from '@clickhouse/client-common' import { Buffer } from 'buffer' @@ -111,6 +113,7 @@ export class ResultSet validateStreamFormat(this.format) let incompleteChunks: Buffer[] = [] + let lastRowText = '' const logError = this.log_error const toRows = new Transform({ transform( @@ -134,6 +137,7 @@ export class ResultSet } else { text = chunk.subarray(0, idx).toString() } + lastRowText = text rows.push({ text, json(): T { @@ -147,6 +151,7 @@ export class ResultSet idx = chunk.indexOf(NEWLINE, lastIdx) if (idx !== -1) { const text = chunk.subarray(lastIdx, idx).toString() + lastRowText = text rows.push({ text, json(): T { @@ -165,6 +170,15 @@ export class ResultSet } callback() }, + // Will be triggered if ClickHouse terminates the connection with an error while streaming + destroy(err: Error | null, callback: (error?: Error | null) => void) { + const maybeLastRowErr = parseError(lastRowText) + if (maybeLastRowErr instanceof ClickHouseError) { + callback(maybeLastRowErr) + } else { + callback(err) + } + }, autoDestroy: true, objectMode: true, }) From 5e90e2f581d1d82393109c66e9d76674b1085e9f Mon Sep 17 00:00:00 2001 From: slvrtrn Date: Mon, 20 Jan 2025 16:21:34 +0100 Subject: [PATCH 2/3] Handle stream errors in JSONEachRow --- .../integration/node_streaming_errors.test.ts | 8 ++--- packages/client-node/src/result_set.ts | 36 ++++++++++++++----- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/packages/client-node/__tests__/integration/node_streaming_errors.test.ts b/packages/client-node/__tests__/integration/node_streaming_errors.test.ts index b259ce29..7683deba 100644 --- a/packages/client-node/__tests__/integration/node_streaming_errors.test.ts +++ b/packages/client-node/__tests__/integration/node_streaming_errors.test.ts @@ -3,10 +3,10 @@ import { type ClickHouseSettings, } from '@clickhouse/client-common' import { createTestClient } from '@test/utils' -import { StreamableDataFormat } from '../../src' -import { NodeClickHouseClient } from '../../src/client' +import type { StreamableDataFormat } from '../../src' +import type { NodeClickHouseClient } from '../../src/client' -fdescribe('[Node.js] Errors during streaming', () => { +describe('[Node.js] Errors during streaming', () => { let client: NodeClickHouseClient beforeAll(() => { client = createTestClient() as NodeClickHouseClient @@ -30,7 +30,7 @@ fdescribe('[Node.js] Errors during streaming', () => { assertStreamFailure(rowsCount, err) }) - fit('should work with JSONEachRow', async () => { + it('should work with JSONEachRow', async () => { let err: unknown let rowsCount = 0 try { diff --git a/packages/client-node/src/result_set.ts b/packages/client-node/src/result_set.ts index 96db6371..c5408caa 100644 --- 
a/packages/client-node/src/result_set.ts +++ b/packages/client-node/src/result_set.ts @@ -113,7 +113,7 @@ export class ResultSet validateStreamFormat(this.format) let incompleteChunks: Buffer[] = [] - let lastRowText = '' + let errorRowText: string | undefined const logError = this.log_error const toRows = new Transform({ transform( @@ -137,7 +137,6 @@ export class ResultSet } else { text = chunk.subarray(0, idx).toString() } - lastRowText = text rows.push({ text, json(): T { @@ -151,7 +150,6 @@ export class ResultSet idx = chunk.indexOf(NEWLINE, lastIdx) if (idx !== -1) { const text = chunk.subarray(lastIdx, idx).toString() - lastRowText = text rows.push({ text, json(): T { @@ -161,7 +159,22 @@ export class ResultSet } else { // to be processed during the first pass for the next chunk incompleteChunks.push(chunk.subarray(lastIdx)) - this.push(rows) + // error reporting goes like this: + // __exception__\r\n // - the row before the last one + // Code: X. DB::Exception: ...\n // - the very last row + // we are not going to push these rows downstream + if ( + rows.length > 1 && + rows[rows.length - 2].text === errorHeaderMessage + ) { + errorRowText = rows[rows.length - 1].text + // push the remaining rows before the error + if (rows.length > 2) { + this.push(rows.slice(0, -2)) + } + } else { + this.push(rows) + } } lastIdx = idx + 1 // skipping newline character } while (idx !== -1) @@ -170,13 +183,17 @@ export class ResultSet } callback() }, - // Will be triggered if ClickHouse terminates the connection with an error while streaming + // will be triggered if ClickHouse terminates the connection with an error while streaming destroy(err: Error | null, callback: (error?: Error | null) => void) { - const maybeLastRowErr = parseError(lastRowText) - if (maybeLastRowErr instanceof ClickHouseError) { - callback(maybeLastRowErr) - } else { + if (errorRowText !== undefined) { + const maybeLastRowErr = parseError(errorRowText) + if (maybeLastRowErr instanceof ClickHouseError) { + callback(maybeLastRowErr) + } + } else if (err !== null) { callback(err) + } else { + callback() } }, autoDestroy: true, @@ -217,3 +234,4 @@ export class ResultSet const streamAlreadyConsumedMessage = 'Stream has been already consumed' const resultSetClosedMessage = 'ResultSet has been closed' +const errorHeaderMessage = `__exception__\r` From 9c2ae156056aa0b29af6e4d845bf2630aed735d0 Mon Sep 17 00:00:00 2001 From: slvrtrn Date: Mon, 20 Jan 2025 16:22:44 +0100 Subject: [PATCH 3/3] Update the comment --- packages/client-node/src/result_set.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/client-node/src/result_set.ts b/packages/client-node/src/result_set.ts index c5408caa..fb68474d 100644 --- a/packages/client-node/src/result_set.ts +++ b/packages/client-node/src/result_set.ts @@ -160,8 +160,8 @@ export class ResultSet // to be processed during the first pass for the next chunk incompleteChunks.push(chunk.subarray(lastIdx)) // error reporting goes like this: - // __exception__\r\n // - the row before the last one - // Code: X. DB::Exception: ...\n // - the very last row + // __exception__\r\n // - the row before the last one + // Code: X. DB::Exception: ...\n // - the very last row // we are not going to push these rows downstream if ( rows.length > 1 &&
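
---

For context, a minimal consumer-side sketch of what this series enables, mirroring the integration test above: a query that fails mid-stream (here via `throwIf`) now surfaces as a `ClickHouseError` thrown out of the `for await` loop, instead of a truncated stream or a `SyntaxError` from `row.json()`. The connection URL and the `createClient` options are illustrative assumptions (and `ClickHouseError` is assumed to be re-exported by the Node.js package as it is from `@clickhouse/client-common`); the `query`/`stream()`/`row.json()` usage and the `max_block_size` / `http_write_exception_in_output_format` settings are taken directly from the test.

```ts
import { createClient, ClickHouseError } from '@clickhouse/client'

// Hypothetical connection details - adjust for your environment.
const client = createClient({ url: 'http://localhost:8123' })

async function main(): Promise<void> {
  const rs = await client.query({
    // throwIf fires once `number` exceeds 20, i.e. in the middle of the stream
    query: `
      SELECT number, if(number > 20, throwIf(1, 'Boom'), number) AS value
      FROM numbers(1000)
    `,
    format: 'JSONEachRow',
    clickhouse_settings: {
      // force 1-row chunks so ClickHouse sends 200 OK + headers
      // before the exception occurs (as in the test above)
      max_block_size: '1',
      // keep the exception out of the JSONEachRow output itself,
      // so row.json() never has to parse a half-written error row
      http_write_exception_in_output_format: 0,
    },
  })
  try {
    for await (const rows of rs.stream()) {
      for (const row of rows) {
        console.log(row.json()) // rows emitted before the failure parse normally
      }
    }
  } catch (err) {
    if (err instanceof ClickHouseError) {
      // e.g. code '395', type 'FUNCTION_THROW_IF_VALUE_IS_NON_ZERO'
      console.error(`ClickHouse error ${err.code} (${err.type}): ${err.message}`)
    } else {
      throw err
    }
  } finally {
    await client.close()
  }
}

void main()
```

The design choice worth noting from patch 2: the `Transform` in `result_set.ts` recognizes ClickHouse's in-band error framing (a `__exception__\r` row followed by a `Code: X. DB::Exception: ...` row), withholds those two rows from downstream consumers, and re-parses the saved error text into a `ClickHouseError` in `destroy()` - so callers receive a typed error rather than raw error text masquerading as data rows.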