diff --git a/lib/include/duckdb/web/webdb.h b/lib/include/duckdb/web/webdb.h index 82428e9e9..18c6cbe74 100644 --- a/lib/include/duckdb/web/webdb.h +++ b/lib/include/duckdb/web/webdb.h @@ -85,7 +85,7 @@ class WebDB { /// Run a query and return the materialized query result arrow::Result> RunQuery(std::string_view text); /// Execute a query as pending query and return the stream schema when finished - arrow::Result> PendingQuery(std::string_view text); + arrow::Result> PendingQuery(std::string_view text, bool allow_stream_result); /// Poll a pending query and return the schema when finished arrow::Result> PollPendingQuery(); /// Cancel a pending query diff --git a/lib/src/webdb.cc b/lib/src/webdb.cc index 0161443aa..447e10ffe 100644 --- a/lib/src/webdb.cc +++ b/lib/src/webdb.cc @@ -164,10 +164,11 @@ arrow::Result> WebDB::Connection::RunQuery(std::s } } -arrow::Result> WebDB::Connection::PendingQuery(std::string_view text) { +arrow::Result> WebDB::Connection::PendingQuery(std::string_view text, + bool allow_stream_result) { try { // Send the query - auto result = connection_.PendingQuery(std::string{text}); + auto result = connection_.PendingQuery(std::string{text}, allow_stream_result); if (result->HasError()) return arrow::Status{arrow::StatusCode::ExecutionError, std::move(result->GetError())}; current_pending_query_result_ = std::move(result); current_pending_query_was_canceled_ = false; diff --git a/lib/src/webdb_api.cc b/lib/src/webdb_api.cc index 6244df6bb..33448a518 100644 --- a/lib/src/webdb_api.cc +++ b/lib/src/webdb_api.cc @@ -197,9 +197,10 @@ void duckdb_web_query_run_buffer(WASMResponse* packed, ConnectionHdl connHdl, co WASMResponseBuffer::Get().Store(*packed, std::move(r)); } /// Start a pending query -void duckdb_web_pending_query_start(WASMResponse* packed, ConnectionHdl connHdl, const char* script) { +void duckdb_web_pending_query_start(WASMResponse* packed, ConnectionHdl connHdl, const char* script, + bool allow_stream_result) { auto c = reinterpret_cast(connHdl); - auto r = c->PendingQuery(script); + auto r = c->PendingQuery(script, allow_stream_result); WASMResponseBuffer::Get().Store(*packed, std::move(r)); } /// Poll a pending query diff --git a/packages/duckdb-wasm/src/bindings/bindings_base.ts b/packages/duckdb-wasm/src/bindings/bindings_base.ts index 0823f0e7a..159e4bd25 100644 --- a/packages/duckdb-wasm/src/bindings/bindings_base.ts +++ b/packages/duckdb-wasm/src/bindings/bindings_base.ts @@ -164,8 +164,8 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings { /** Send a query and return the full result */ public runQuery(conn: number, text: string): Uint8Array { const BUF = TEXT_ENCODER.encode(text); - const bufferPtr = this.mod._malloc(BUF.length ); - const bufferOfs = this.mod.HEAPU8.subarray(bufferPtr, bufferPtr + BUF.length ); + const bufferPtr = this.mod._malloc(BUF.length); + const bufferOfs = this.mod.HEAPU8.subarray(bufferPtr, bufferPtr + BUF.length); bufferOfs.set(BUF); const [s, d, n] = callSRet(this.mod, 'duckdb_web_query_run_buffer', ['number', 'number', 'number'], [conn, bufferPtr, BUF.length]); if (s !== StatusCode.SUCCESS) { @@ -182,8 +182,13 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings { * On null, the query has to be executed using `pollPendingQuery` until that returns != null. * Results can then be fetched using `fetchQueryResults` */ - public startPendingQuery(conn: number, text: string): Uint8Array | null { - const [s, d, n] = callSRet(this.mod, 'duckdb_web_pending_query_start', ['number', 'string'], [conn, text]); + public startPendingQuery(conn: number, text: string, allowStreamResult: boolean = false): Uint8Array | null { + const [s, d, n] = callSRet( + this.mod, + 'duckdb_web_pending_query_start', + ['number', 'string', 'boolean'], + [conn, text, allowStreamResult], + ); if (s !== StatusCode.SUCCESS) { throw new Error(readString(this.mod, d, n)); } diff --git a/packages/duckdb-wasm/src/bindings/bindings_interface.ts b/packages/duckdb-wasm/src/bindings/bindings_interface.ts index 9dcf422db..4d2ad5e17 100644 --- a/packages/duckdb-wasm/src/bindings/bindings_interface.ts +++ b/packages/duckdb-wasm/src/bindings/bindings_interface.ts @@ -16,7 +16,7 @@ export interface DuckDBBindings { connect(): DuckDBConnection; disconnect(conn: number): void; runQuery(conn: number, text: string): Uint8Array; - startPendingQuery(conn: number, text: string): Uint8Array | null; + startPendingQuery(conn: number, text: string, allowStreamResult: boolean): Uint8Array | null; pollPendingQuery(conn: number): Uint8Array | null; cancelPendingQuery(conn: number): boolean; fetchQueryResults(conn: number): Uint8Array; diff --git a/packages/duckdb-wasm/src/bindings/connection.ts b/packages/duckdb-wasm/src/bindings/connection.ts index da30d1551..d9e0996f2 100644 --- a/packages/duckdb-wasm/src/bindings/connection.ts +++ b/packages/duckdb-wasm/src/bindings/connection.ts @@ -37,8 +37,9 @@ export class DuckDBConnection { /** Send a query */ public async send( text: string, + allowStreamResult: boolean = false, ): Promise> { - let header = this._bindings.startPendingQuery(this._conn, text); + let header = this._bindings.startPendingQuery(this._conn, text, allowStreamResult); while (header == null) { header = await new Promise((resolve, reject) => { try { @@ -79,7 +80,7 @@ export class DuckDBConnection { /** Insert an arrow table */ public insertArrowTable(table: arrow.Table, options: ArrowInsertOptions): void { - const buffer = arrow.tableToIPC(table, 'stream'); + const buffer = arrow.tableToIPC(table, 'stream'); this.insertArrowFromIPCStream(buffer, options); } /** Insert an arrow table from an ipc stream */ diff --git a/packages/duckdb-wasm/src/parallel/async_bindings.ts b/packages/duckdb-wasm/src/parallel/async_bindings.ts index d8435a59c..14a03b547 100644 --- a/packages/duckdb-wasm/src/parallel/async_bindings.ts +++ b/packages/duckdb-wasm/src/parallel/async_bindings.ts @@ -401,11 +401,16 @@ export class AsyncDuckDB implements AsyncDuckDBBindings { } /** Start a pending query */ - public async startPendingQuery(conn: ConnectionID, text: string): Promise { - const task = new WorkerTask( + public async startPendingQuery( + conn: ConnectionID, + text: string, + allowStreamResult: boolean = false, + ): Promise { + const task = new WorkerTask< WorkerRequestType.START_PENDING_QUERY, - [conn, text], - ); + [ConnectionID, string, boolean], + Uint8Array | null + >(WorkerRequestType.START_PENDING_QUERY, [conn, text, allowStreamResult]); return await this.postTask(task); } /** Poll a pending query */ diff --git a/packages/duckdb-wasm/src/parallel/async_bindings_interface.ts b/packages/duckdb-wasm/src/parallel/async_bindings_interface.ts index 20692c621..97ba2b191 100644 --- a/packages/duckdb-wasm/src/parallel/async_bindings_interface.ts +++ b/packages/duckdb-wasm/src/parallel/async_bindings_interface.ts @@ -19,7 +19,7 @@ export interface AsyncDuckDBBindings { disconnect(conn: number): Promise; runQuery(conn: number, text: string): Promise; - startPendingQuery(conn: number, text: string): Promise; + startPendingQuery(conn: number, text: string, allowStreamResult: boolean): Promise; pollPendingQuery(conn: number): Promise; cancelPendingQuery(conn: number): Promise; fetchQueryResults(conn: number): Promise; diff --git a/packages/duckdb-wasm/src/parallel/async_connection.ts b/packages/duckdb-wasm/src/parallel/async_connection.ts index 7559ef3d4..52a782267 100644 --- a/packages/duckdb-wasm/src/parallel/async_connection.ts +++ b/packages/duckdb-wasm/src/parallel/async_connection.ts @@ -50,6 +50,7 @@ export class AsyncDuckDBConnection { /** Send a query */ public async send( text: string, + allowStreamResult: boolean = false, ): Promise> { this._bindings.logger.log({ timestamp: new Date(), @@ -59,7 +60,7 @@ export class AsyncDuckDBConnection { event: LogEvent.RUN, value: text, }); - let header = await this._bindings.startPendingQuery(this._conn, text); + let header = await this._bindings.startPendingQuery(this._conn, text, allowStreamResult); while (header == null) { header = await this._bindings.pollPendingQuery(this._conn); } diff --git a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts index 5d1ea6b1c..9e1c9aa41 100644 --- a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts +++ b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts @@ -225,7 +225,7 @@ export abstract class AsyncDuckDBDispatcher implements Logger { break; } case WorkerRequestType.START_PENDING_QUERY: { - const result = this._bindings.startPendingQuery(request.data[0], request.data[1]); + const result = this._bindings.startPendingQuery(request.data[0], request.data[1], request.data[2]); const transfer = []; if (result) { transfer.push(result.buffer); diff --git a/packages/duckdb-wasm/src/parallel/worker_request.ts b/packages/duckdb-wasm/src/parallel/worker_request.ts index f1402da20..cf128cdbb 100644 --- a/packages/duckdb-wasm/src/parallel/worker_request.ts +++ b/packages/duckdb-wasm/src/parallel/worker_request.ts @@ -139,7 +139,7 @@ export type WorkerRequestVariant = | WorkerRequest | WorkerRequest | WorkerRequest - | WorkerRequest + | WorkerRequest | WorkerRequest; export type WorkerResponseVariant = @@ -198,7 +198,7 @@ export type WorkerTaskVariant = | WorkerTask | WorkerTask | WorkerTask - | WorkerTask + | WorkerTask | WorkerTask | WorkerTask | WorkerTask;