From 0819148e530f5f9fc7670ed6d4827cbcdee6b961 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 4 Jul 2025 14:11:45 -0400 Subject: [PATCH 1/6] feat(bigquery/v2): query client based on autogen package --- package.json | 3 + src/index.ts | 1 + src/query/builder.ts | 36 ++++ src/query/client.ts | 155 +++++++++++++++++ src/query/format.ts | 47 +++++ src/query/index.ts | 20 +++ src/query/iterator.ts | 60 +++++++ src/query/job.ts | 157 +++++++++++++++++ src/query/reader.ts | 72 ++++++++ src/query/row.ts | 69 ++++++++ src/query/schema.ts | 31 ++++ src/query/value.ts | 140 +++++++++++++++ system-test/fixtures/query.ts | 313 ++++++++++++++++++++++++++++++++++ system-test/query/query.ts | 79 +++++++++ system-test/query/value.ts | 68 ++++++++ tsconfig.json | 5 +- 16 files changed, 1255 insertions(+), 1 deletion(-) create mode 100644 src/query/builder.ts create mode 100644 src/query/client.ts create mode 100644 src/query/format.ts create mode 100644 src/query/index.ts create mode 100644 src/query/iterator.ts create mode 100644 src/query/job.ts create mode 100644 src/query/reader.ts create mode 100644 src/query/row.ts create mode 100644 src/query/schema.ts create mode 100644 src/query/value.ts create mode 100644 system-test/fixtures/query.ts create mode 100644 system-test/query/query.ts create mode 100644 system-test/query/value.ts diff --git a/package.json b/package.json index 9d66c6ae4..7756d43f5 100644 --- a/package.json +++ b/package.json @@ -43,13 +43,16 @@ "test": "c8 mocha build/test" }, "dependencies": { + "big.js": "^7.0.1", "google-gax": "^5.0.0" }, "devDependencies": { + "@types/big.js": "^6.2.2", "@types/mocha": "^10.0.10", "@types/node": "^22.15.21", "@types/sinon": "^17.0.4", "c8": "^10.1.3", + "eslint-plugin-prettier": "^5.5.1", "gapic-tools": "^1.0.2", "gts": "^6.0.2", "jsdoc": "^4.0.4", diff --git a/src/index.ts b/src/index.ts index f86616dca..f92513e49 100644 --- a/src/index.ts +++ b/src/index.ts @@ -46,3 +46,4 @@ export { }; import * as protos from '../protos/protos'; export {protos}; +export * as query from './query'; diff --git a/src/query/builder.ts b/src/query/builder.ts new file mode 100644 index 000000000..c3c378f4f --- /dev/null +++ b/src/query/builder.ts @@ -0,0 +1,36 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {protos} from '../'; + +/** + * QueryFromSQL creates a query configuration from a SQL string. + * @param {string} sql The SQL query. + * @returns {protos.google.cloud.bigquery.v2.IPostQueryRequest} + */ +export function queryFromSQL( + projectId: string, + sql: string, +): protos.google.cloud.bigquery.v2.IPostQueryRequest { + return { + queryRequest: { + query: sql, + useLegacySql: {value: false}, + formatOptions: { + useInt64Timestamp: true, + }, + }, + projectId, + }; +} diff --git a/src/query/client.ts b/src/query/client.ts new file mode 100644 index 000000000..7bbbae127 --- /dev/null +++ b/src/query/client.ts @@ -0,0 +1,155 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { + BigQueryClient, + BigQueryClientOptions, + SubClientOptions, +} from '../bigquery'; +import {QueryJob} from './job'; +import {CallOptions} from 'google-gax'; +import {protos} from '../'; +import {queryFromSQL as builderQueryFromSQL} from './builder'; +import {QueryReader} from './reader'; + +/** + * QueryClient is a client for running queries in BigQuery. + */ +export class QueryClient { + private client: BigQueryClient; + private projectID: string; + private billingProjectID: string; + + /** + * @param {BigQueryClientOptions} options - The configuration object. + */ + constructor( + options?: BigQueryClientOptions, + subClientOptions?: SubClientOptions, + ) { + this.client = new BigQueryClient(options, subClientOptions); + this.projectID = ''; + this.billingProjectID = ''; + void this.client.jobClient.getProjectId().then(projectId => { + this.projectID = projectId; + if (this.billingProjectID !== '') { + this.billingProjectID = projectId; + } + }); + } + + setBillingProjectID(projectID: string) { + this.billingProjectID = projectID; + } + + /** + * QueryFromSQL creates a query configuration from a SQL string. + * @param {string} sql The SQL query. + * @returns {protos.google.cloud.bigquery.v2.IPostQueryRequest} + */ + queryFromSQL(sql: string): protos.google.cloud.bigquery.v2.IPostQueryRequest { + const req = builderQueryFromSQL(this.projectID, sql); + return req; + } + + /** + * NewQueryReader creates a new QueryReader. + * @returns {QueryReader} + */ + newQueryReader(): QueryReader { + return new QueryReader(this); + } + + /** + * Runs a query and returns a QueryJob handle. + * + * @param {protos.google.cloud.bigquery.v2.IPostQueryRequest} request + * The request object that will be sent. + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + * @returns {Promise} + */ + async startQuery( + request: protos.google.cloud.bigquery.v2.IPostQueryRequest, + options?: CallOptions, + ): Promise { + const [response] = await this.client.jobClient.query(request, options); + return new QueryJob(this, response); + } + + /** + * Runs a query and returns a QueryJob handle. + * + * @param {protos.google.cloud.bigquery.v2.IQueryRequest} request + * The request object that will be sent. + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + * @returns {Promise} + */ + async startQueryRequest( + request: protos.google.cloud.bigquery.v2.IQueryRequest, + options?: CallOptions, + ): Promise { + return this.startQuery( + { + queryRequest: request, + projectId: this.projectID, + }, + options, + ); + } + + /** + * Starts a new asynchronous job. + * + * @param {protos.google.cloud.bigquery.v2.IJob} job + * A job resource to insert + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + * @returns {Promise} + */ + async startQueryJob( + job: protos.google.cloud.bigquery.v2.IJob, + options?: CallOptions, + ): Promise { + const [response] = await this.client.jobClient.insertJob( + { + projectId: this.projectID, + job, + }, + options, + ); + return new QueryJob(this, {jobReference: response.jobReference}); + } + + /** + * Gets the results of a query job. + * + * @param {protos.google.cloud.bigquery.v2.IGetQueryResultsRequest} request + * The request object that will be sent. + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + * @returns {Promise} + */ + async _getQueryResults( + request: protos.google.cloud.bigquery.v2.IGetQueryResultsRequest, + options?: CallOptions, + ): Promise<[protos.google.cloud.bigquery.v2.IGetQueryResultsResponse]> { + const [response] = await this.client.jobClient.getQueryResults( + request, + options, + ); + return [response]; + } +} diff --git a/src/query/format.ts b/src/query/format.ts new file mode 100644 index 000000000..bd600d50e --- /dev/null +++ b/src/query/format.ts @@ -0,0 +1,47 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +export function civilDateString(d: Date): string { + return d.toISOString().slice(0, 10); +} + +export function civilTimeString(value: string | Date): string { + if (value instanceof Date) { + const h = `${value.getHours()}`.padStart(2, "0"); + const m = `${value.getMinutes()}`.padStart(2, "0"); + const s = `${value.getSeconds()}`.padStart(2, "0"); + const f = `${value.getMilliseconds()}`.padStart(3, "0"); + return `${h}:${m}:${s}.${f}`; + } + return value; +} + +export function civilDateTimeString(value: Date | string): string { + if (value instanceof Date) { + let time; + if (value.getHours()) { + time = civilTimeString(value); + } + const y = `${value.getFullYear()}`.padStart(2, "0"); + const m = `${value.getMonth()+1}`.padStart(2, "0"); + const d = `${value.getDate()}`.padStart(2, "0"); + time = time ? ' ' + time : ''; + return `${y}-${m}-${d}${time}`; + } + return value.replace(/^(.*)T(.*)Z$/, '$1 $2'); +} + +export function timestampString(ts: Date): string { + return ts.toISOString().replace('T', ' ').replace('Z', ''); +} diff --git a/src/query/index.ts b/src/query/index.ts new file mode 100644 index 000000000..6ef842ee3 --- /dev/null +++ b/src/query/index.ts @@ -0,0 +1,20 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +export {QueryClient} from './client'; +export {QueryJob} from './job'; +export {Row} from './row'; +export {RowIterator} from './iterator'; +export {queryFromSQL} from './builder'; +export {QueryReader, withPageToken} from './reader'; diff --git a/src/query/iterator.ts b/src/query/iterator.ts new file mode 100644 index 000000000..76049a3cc --- /dev/null +++ b/src/query/iterator.ts @@ -0,0 +1,60 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {QueryJob} from './job'; +import {Row} from './row'; + +/** + * RowIterator iterates over the results of a query. + */ +export class RowIterator { + private job: QueryJob; + private pageToken?: string; + private rows: Row[] = []; + + constructor( + job: QueryJob, + opts?: { + rows?: Row[]; + pageToken?: string; + }, + ) { + this.job = job; + this.pageToken = opts?.pageToken; + this.rows = opts?.rows ?? []; + } + + async fetchRows() { + const [rows, _, pageToken] = await this.job.getRows(this.pageToken); + this.rows = rows; + this.pageToken = pageToken || undefined; + } + + /** + * Asynchronously iterates over the rows in the query result. + */ + async *[Symbol.asyncIterator](): AsyncGenerator { + if (this.rows.length > 0) { + for (const row of this.rows) { + yield row; + } + } + while (this.pageToken) { + await this.fetchRows(); + for (const row of this.rows) { + yield row; + } + } + } +} diff --git a/src/query/job.ts b/src/query/job.ts new file mode 100644 index 000000000..6f6d3a79d --- /dev/null +++ b/src/query/job.ts @@ -0,0 +1,157 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {QueryClient} from './client'; +import {protos} from '../'; +import {RowIterator} from './iterator'; +import {Row} from './row'; +import {Schema} from './schema'; +import {convertRows} from './value'; +import {CallOptions} from 'google-gax'; + +/** + * Query represents a query job. + */ +export class QueryJob { + private client: QueryClient; + private complete: boolean; + private projectID: string; + private jobID: string; + private location: string; + + private cachedRows: Row[]; + private cachedSchema: protos.google.cloud.bigquery.v2.ITableSchema; + private cachedPageToken: string; + private cachedTotalRows: number; + + constructor( + client: QueryClient, + response: protos.google.cloud.bigquery.v2.IQueryResponse, + ) { + this.client = client; + + this.cachedRows = []; + this.cachedSchema = {}; + this.cachedPageToken = ''; + this.cachedTotalRows = 0; + this.complete = false; + + this.consumeQueryResponse({ + jobComplete: response.jobComplete, + schema: response.schema, + pageToken: response.pageToken, + totalRows: response.totalRows, + rows: response.rows, + }); + + if (response.jobReference) { + this.projectID = response.jobReference.projectId!; + this.jobID = response.jobReference.jobId!; + this.location = response.jobReference.location?.value || ''; + } else { + // This should not happen, but we need to initialize the properties. + this.projectID = ''; + this.jobID = ''; + this.location = ''; + } + } + + jobReference(): protos.google.cloud.bigquery.v2.IJobReference { + return { + jobId: this.jobID, + projectId: this.projectID, + location: {value: this.location}, + }; + } + + get schema(): protos.google.cloud.bigquery.v2.ITableSchema { + return this.cachedSchema; + } + + /** + * Waits for the query to complete. + * + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + */ + async wait(options?: CallOptions): Promise { + while (!this.complete) { + await this.checkStatus(options); + if (!this.complete) { + await new Promise(resolve => setTimeout(resolve, 1000)); // TODO: exponential backoff + } + } + } + + /** + * Returns a RowIterator for the query results. + * + * @returns {RowIterator} + */ + read(): RowIterator { + const it = new RowIterator(this, { + pageToken: this.cachedPageToken, + rows: this.cachedRows, + }); + return it; + } + + async getRows( + pageToken?: string, + ): Promise< + [Row[], protos.google.cloud.bigquery.v2.ITableSchema | null, string | null] + > { + const [response] = await this.client._getQueryResults({ + projectId: this.projectID, + jobId: this.jobID, + location: this.location, + pageToken, + formatOptions: { + useInt64Timestamp: true, + }, + }); + + const rows = convertRows(response.rows || [], new Schema(response.schema!)); + return [rows, response.schema || null, response.pageToken || null]; + } + + private consumeQueryResponse( + response: protos.google.cloud.bigquery.v2.IGetQueryResultsResponse, + ) { + this.complete = response.jobComplete?.value ?? false; + this.cachedSchema = response.schema!; + this.cachedPageToken = response.pageToken!; + this.cachedTotalRows = Number(response.totalRows); + this.cachedRows = convertRows( + response.rows || [], + new Schema(this.cachedSchema), + ); + } + + private async checkStatus(options?: CallOptions): Promise { + const [response] = await this.client._getQueryResults( + { + projectId: this.projectID, + jobId: this.jobID, + location: this.location, + maxResults: {value: 0}, + formatOptions: { + useInt64Timestamp: true, + }, + }, + options, + ); + this.consumeQueryResponse(response); + } +} diff --git a/src/query/reader.ts b/src/query/reader.ts new file mode 100644 index 000000000..7d567f9cf --- /dev/null +++ b/src/query/reader.ts @@ -0,0 +1,72 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {QueryClient} from './client'; +import {RowIterator} from './iterator'; +import {QueryJob} from './job'; +import {protos} from '../'; + +export interface ReadState { + pageToken?: string; +} + +export type ReadOption = (state: ReadState) => void; + +export function withPageToken(token: string): ReadOption { + return (state: ReadState) => { + state.pageToken = token; + }; +} + +/** + * QueryReader is used to read the results of a query. + */ +export class QueryReader { + private client: QueryClient; + + constructor(client: QueryClient) { + this.client = client; + } + + /** + * Read reads the results of a query job. + * @param {protos.google.cloud.bigquery.v2.IJobReference} jobRef The job reference. + * @param {protos.google.cloud.bigquery.v2.ITableSchema} schema The schema of the results. + * @param {ReadOption[]} opts The options for reading the results. + * @returns {Promise} + */ + async read( + jobRef: protos.google.cloud.bigquery.v2.IJobReference, + schema: protos.google.cloud.bigquery.v2.ITableSchema, + ...opts: ReadOption[] + ): Promise { + const query = new QueryJob(this.client, { + jobReference: jobRef, + schema, + }); + + const initState: ReadState = {}; + for (const opt of opts) { + opt(initState); + } + + const itOpts: {pageToken?: string} = {}; + if (initState.pageToken) { + itOpts.pageToken = initState.pageToken; + } + const it = new RowIterator(query, itOpts); + await it.fetchRows(); + return it; + } +} diff --git a/src/query/row.ts b/src/query/row.ts new file mode 100644 index 000000000..ba7f023d1 --- /dev/null +++ b/src/query/row.ts @@ -0,0 +1,69 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {Schema} from './schema'; +import {Value} from './value'; + +/** + * Represents a row in a query result. + */ +export class Row { + private schema: Schema; + private value: {[key: string]: Value}; + + constructor(schema: Schema) { + this.schema = schema; + this.value = {}; + } + + set(columnName: string, value: Value) { + this.value[columnName] = value; + } + + /** + * toJSON returns the row as a JSON object. + */ + toJSON(): {[key: string]: Value} { + const values: {[key: string]: Value} = {}; + for (const field of this.schema.pb.fields!) { + let fval = this.value[field.name!]; + if (fval instanceof Row) { + fval = fval.toJSON(); + } + if (Array.isArray(fval)) { + fval = fval.map(v => (v instanceof Row ? v.toJSON() : v)); + } + values[field.name!] = fval; + } + return values; + } + + /** + * toValues encodes the row into an array of Value. + */ + toValues(): Value[] { + const values: Value[] = []; + for (const field of this.schema.pb.fields!) { + let v = this.value[field.name!]; + if (v instanceof Row) { + v = v.toValues(); + } + if (Array.isArray(v)) { + v = v.map(r => (r instanceof Row ? r.toValues() : r)); + } + values.push(v); + } + return values; + } +} diff --git a/src/query/schema.ts b/src/query/schema.ts new file mode 100644 index 000000000..718efd37e --- /dev/null +++ b/src/query/schema.ts @@ -0,0 +1,31 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {protos} from '../'; + +export class Schema { + pb: protos.google.cloud.bigquery.v2.ITableSchema; + + constructor(pb: protos.google.cloud.bigquery.v2.ITableSchema) { + this.pb = pb; + } + + static fromField(field: protos.google.cloud.bigquery.v2.ITableFieldSchema) { + return new Schema({fields: field.fields}); + } + + get length() { + return this.pb.fields?.length || 0; + } +} diff --git a/src/query/value.ts b/src/query/value.ts new file mode 100644 index 000000000..00d6c7af0 --- /dev/null +++ b/src/query/value.ts @@ -0,0 +1,140 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {protos} from '../'; +import {Row} from './row'; +import {Schema} from './schema'; +import * as Big from 'big.js'; + +export type Value = any; + +export function convertRows( + rows: protos.google.protobuf.IStruct[], + schema: Schema, +): Row[] { + return rows.map(row => convertRow(row, schema)); +} + +function convertRow(row: protos.google.protobuf.IStruct, schema: Schema): Row { + const fields = getFieldList(row); + if (schema.length !== fields.length) { + throw new Error('Schema length does not match row length'); + } + const newRow = new Row(schema); + for (let i = 0; i < fields.length; i++) { + const cell = fields[i]; + const cellValue = getFieldValue(cell); + const fs = schema.pb.fields![i]; + const value = convertValue( + cellValue, + fs.type! as any, + Schema.fromField(fs), + ); + newRow.set(fs.name!, value); + } + return newRow; +} + +function convertValue( + val: protos.google.protobuf.IValue, + typ: string, + schema: Schema, +): Value { + if (val.nullValue !== undefined && val.nullValue !== null) { + return null; + } + if (val.listValue) { + return convertRepeatedRecord(val.listValue, typ, schema); + } + if (val.structValue) { + return convertNestedRecord(val.structValue, schema); + } + if (val.stringValue) { + return convertBasicType(val.stringValue, typ); + } + throw new Error(`Got value ${val}; expected a value of type ${typ}`); +} + +function convertRepeatedRecord( + vals: protos.google.protobuf.IListValue, + typ: string, + schema: Schema, +): Value[] { + return vals.values!.map(cell => { + const val = getFieldValue(cell); + return convertValue(val, typ, schema); + }); +} + +function convertNestedRecord( + val: protos.google.protobuf.IStruct, + schema: Schema, +): Row { + return convertRow(val, schema); +} + +function convertBasicType(val: string, typ: string): Value { + switch (typ) { + case 'STRING': + case 'GEOGRAPHY': + case 'JSON': + return val; + case 'BYTES': + return Buffer.from(val, 'base64'); + case 'INTEGER': + return parseInt(val, 10); + case 'FLOAT': + return parseFloat(val); + case 'BOOLEAN': + return val.toLowerCase() === 'true'; + case 'TIMESTAMP': + return new Date(parseInt(val, 10) / 1000); + case 'DATE': + return new Date(val); + case 'TIME': + return val; + case 'DATETIME': + return new Date(val); + case 'NUMERIC': + case 'BIGNUMERIC': + return Big(val); + case 'INTERVAL': + default: + throw new Error(`Unsupported type: ${typ}`); + } +} + +function getFieldList(row: protos.google.protobuf.IStruct): any[] { + const fieldValue = row.fields?.f; + if (!fieldValue) { + throw new Error('Missing fields in row'); + } + const fields = fieldValue.listValue; + if (!fields) { + throw new Error('Missing fields in row'); + } + return fields.values!; +} + +function getFieldValue(val: protos.google.protobuf.IValue): any { + const s = val.structValue; + if (!s) { + throw new Error('Missing value in a field row'); + } + const fieldValue = s.fields?.v; + if (!fieldValue) { + throw new Error('Missing value in a field row'); + } + return fieldValue; +} diff --git a/system-test/fixtures/query.ts b/system-test/fixtures/query.ts new file mode 100644 index 000000000..259262746 --- /dev/null +++ b/system-test/fixtures/query.ts @@ -0,0 +1,313 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { + civilDateString, + civilDateTimeString, + civilTimeString, + timestampString, +} from '../../src/query/format'; +import {protos} from '../../src'; + +export interface QueryParameterTestCase { + name: string; + query: string; + parameters: protos.google.cloud.bigquery.v2.IQueryParameter[]; + wantRowJSON: {[key: string]: any}; + wantRowValues: any[]; +} + +export function queryParameterTestCases(): QueryParameterTestCase[] { + const d = new Date('2016-03-20'); + const tm = '15:04:05.003000'; // TODO use civil time type + const dtm = new Date('2016-03-20T15:04:05.003Z'); + const ts = new Date('2016-03-20T15:04:05Z'); + + return [ + { + name: 'Int64Param', + query: 'SELECT @val', + parameters: [ + { + name: 'val', + parameterType: {type: 'INT64'}, + parameterValue: { + value: {value: '1'}, + }, + }, + ], + wantRowJSON: {f0_: 1}, + wantRowValues: [1], + }, + { + name: 'FloatParam', + query: 'SELECT @val', + parameters: [ + { + name: 'val', + parameterType: {type: 'FLOAT64'}, + parameterValue: { + value: {value: '1.3'}, + }, + }, + ], + wantRowJSON: {f0_: 1.3}, + wantRowValues: [1.3], + }, + { + name: 'BoolParam', + query: 'SELECT @val', + parameters: [ + { + name: 'val', + parameterType: {type: 'BOOL'}, + parameterValue: {value: {value: 'true'}}, + }, + ], + wantRowJSON: {f0_: true}, + wantRowValues: [true], + }, + { + name: 'StringParam', + query: 'SELECT @val', + parameters: [ + { + name: 'val', + parameterType: {type: 'STRING'}, + parameterValue: {value: {value:'ABC'}}, + }, + ], + wantRowJSON: {f0_: 'ABC'}, + wantRowValues: ['ABC'], + }, + { + name: 'ByteParam', + query: 'SELECT @val', + parameters: [ + { + name: 'val', + parameterType: {type: 'BYTES'}, + parameterValue: { + value: {value: Buffer.from('foo').toString('base64')}, + }, + }, + ], + wantRowJSON: {f0_: Buffer.from('foo')}, + wantRowValues: [Buffer.from('foo')], + }, + { + name: 'TimestampParam', + query: 'SELECT @val', + parameters: [ + { + name: 'val', + parameterType: {type: 'TIMESTAMP'}, + parameterValue: {value: {value:timestampString(ts)}}, + }, + ], + wantRowJSON: {f0_: ts}, + wantRowValues: [ts], + }, + { + name: 'TimestampArrayParam', + query: 'SELECT @val', + parameters: [ + { + name: 'val', + parameterType: { + type: 'ARRAY', + arrayType: { + type: 'TIMESTAMP', + }, + }, + parameterValue: { + arrayValues: [{value: {value: timestampString(ts)}}, {value: {value: timestampString(ts)}}], + }, + }, + ], + wantRowJSON: {f0_: [ts, ts]}, + wantRowValues: [[ts, ts]], + }, + { + name: 'DatetimeParam', + query: 'SELECT @val', + parameters: [ + { + name: 'val', + parameterType: {type: 'DATETIME'}, + parameterValue: {value: {value: civilDateTimeString(dtm)}}, + }, + ], + wantRowJSON: {f0_: dtm}, + wantRowValues: [dtm], + }, + { + name: 'DateParam', + query: 'SELECT @val', + parameters: [ + { + name: 'val', + parameterType: {type: 'DATE'}, + parameterValue: {value: {value:civilDateString(d)}}, + }, + ], + wantRowJSON: {f0_: d}, + wantRowValues: [d], + }, + { + name: 'TimeParam', + query: 'SELECT @val', + parameters: [ + { + name: 'val', + parameterType: {type: 'TIME'}, + parameterValue: {value: {value:civilTimeString(tm)}}, + }, + ], + wantRowJSON: {f0_: tm}, + wantRowValues: [tm], + }, + { + name: 'JsonParam', + query: 'SELECT @val', + parameters: [ + { + name: 'val', + parameterType: {type: 'JSON'}, + parameterValue: { + value: {value:'{"alpha":"beta"}'}, + }, + }, + ], + wantRowJSON: {f0_: '{"alpha":"beta"}'}, + wantRowValues: ['{"alpha":"beta"}'], + }, + { + name: 'NestedStructParam', + query: 'SELECT @val', + parameters: [ + { + name: 'val', + parameterType: { + type: 'STRUCT', + structTypes: [ + { + name: 'Datetime', + type: { + type: 'DATETIME', + }, + }, + { + name: 'StringArray', + type: { + type: 'ARRAY', + arrayType: { + type: 'STRING', + }, + }, + }, + { + name: 'SubStruct', + type: { + type: 'STRUCT', + structTypes: [ + { + name: 'String', + type: { + type: 'STRING', + }, + }, + ], + }, + }, + { + name: 'SubStructArray', + type: { + type: 'ARRAY', + arrayType: { + type: 'STRUCT', + structTypes: [ + { + name: 'String', + type: { + type: 'STRING', + }, + }, + ], + }, + }, + }, + ], + }, + parameterValue: { + structValues: { + Datetime: { + value: {value: civilDateTimeString(dtm)}, + }, + StringArray: { + arrayValues: [{value: {value: 'a'}}, {value: {value: 'b'}}], + }, + SubStruct: { + structValues: { + String: { + value: {value: 'c'}, + }, + }, + }, + SubStructArray: { + arrayValues: [ + { + structValues: { + String: { + value: {value: 'd'}, + }, + }, + }, + { + structValues: { + String: { + value: {value: 'e'}, + }, + }, + }, + ], + }, + }, + }, + }, + ], + wantRowJSON: { + f0_: { + Datetime: dtm, + StringArray: ['a', 'b'], + SubStruct: { + String: 'c', + }, + SubStructArray: [ + { + String: 'd', + }, + { + String: 'e', + }, + ], + }, + }, + wantRowValues: [ + [dtm, ['a', 'b'], ['c'], [['d'], ['e']]], + ], + }, + ]; +} + diff --git a/system-test/query/query.ts b/system-test/query/query.ts new file mode 100644 index 000000000..291a5def5 --- /dev/null +++ b/system-test/query/query.ts @@ -0,0 +1,79 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as assert from 'assert'; +import {describe, it} from 'mocha'; +import {query, protos} from '../../src'; + +describe('Run Query', () => { + const client = new query.QueryClient(); + + it('should run a stateless query', async () => { + const req = client.queryFromSQL( + 'SELECT CURRENT_TIMESTAMP() as foo, SESSION_USER() as bar', + ); + req.queryRequest!.jobCreationMode = + protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_OPTIONAL; + + const q = await client.startQuery(req); + await q.wait(); + + const it = q.read(); + const rows = []; + for await (const row of it) { + rows.push(row.toJSON()); + } + assert.strictEqual(rows.length, 1); + }); + + it('should read a query job without cache', async () => { + const req = client.queryFromSQL( + 'SELECT CURRENT_TIMESTAMP() as foo, SESSION_USER() as bar', + ); + req.queryRequest!.jobCreationMode = + protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_REQUIRED; + + const q = await client.startQuery(req); + await q.wait(); + + const reader = client.newQueryReader(); + const jobRef = q.jobReference(); + + const it = await reader.read(jobRef, q.schema); + const rows = []; + for await (const row of it) { + rows.push(row.toJSON()); + } + assert.strictEqual(rows.length, 1); + }); + + it('should insert a query job', async () => { + const q = await client.startQueryJob({ + configuration: { + query: { + query: 'SELECT CURRENT_DATETIME() as foo, SESSION_USER() as bar', + useLegacySql: {value: false}, + }, + }, + }); + await q.wait(); + + const it = q.read(); + const rows = []; + for await (const row of it) { + rows.push(row); + } + assert.strictEqual(rows.length, 1); + }); +}); diff --git a/system-test/query/value.ts b/system-test/query/value.ts new file mode 100644 index 000000000..0e4e1a563 --- /dev/null +++ b/system-test/query/value.ts @@ -0,0 +1,68 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as assert from 'assert'; +import {describe, it} from 'mocha'; +import {query} from '../../src'; + +import {queryParameterTestCases} from '../fixtures/query'; + +describe('Read Query Values', () => { + const client = new query.QueryClient(); + + describe('types', () => { + for (const tc of queryParameterTestCases()) { + it(tc.name, async () => { + const req = client.queryFromSQL(tc.query); + req.queryRequest!.queryParameters = tc.parameters; + + const q = await client.startQuery(req); + await q.wait(); + + const it = q.read(); + const rows = []; + for await (const row of it) { + rows.push(row); + } + assert.deepStrictEqual(rows[0].toJSON(), tc.wantRowJSON); + assert.deepStrictEqual(rows[0].toValues(), tc.wantRowValues); + }); + } + }); + + it('should read nested objects', async () => { + const req = client.queryFromSQL( + "SELECT 40 as age, [STRUCT(STRUCT('1' as a, '2' as b) as object)] as nested", + ); + const q = await client.startQuery(req); + await q.wait(); + + const it = q.read(); + const rows = []; + for await (const row of it) { + rows.push(row); + } + assert.deepStrictEqual(rows[0].toJSON(), { + age: 40, + nested: [ + { + object: { + a: '1', + b: '2', + }, + }, + ], + }); + }); +}); diff --git a/tsconfig.json b/tsconfig.json index 6822c5504..568b1b68f 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -15,11 +15,14 @@ "test/*.ts", "test/**/*.ts", "system-test/*.ts", + "system-test/**/*.ts", "src/**/*.json", "protos/protos.json", "benchmark/*.ts", "scripts/*.ts", "samples/**/*.json" - + ], + "exclude": [ + "system-test/fixtures/sample/**/*.ts" ] } From 4d959433a37743b0bf53bdd791289918b09d2b1e Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 4 Jul 2025 17:02:13 -0400 Subject: [PATCH 2/6] feat: allow query wait cancellation --- src/query/client.ts | 47 ++++++++++++-------------------------- src/query/format.ts | 14 ++++++------ src/query/iterator.ts | 2 +- src/query/job.ts | 45 ++++++++++++++++++++++++++++-------- system-test/query/query.ts | 43 ++++++++++++++++++++++++++++++++-- 5 files changed, 99 insertions(+), 52 deletions(-) diff --git a/src/query/client.ts b/src/query/client.ts index 7bbbae127..785c02fdb 100644 --- a/src/query/client.ts +++ b/src/query/client.ts @@ -17,8 +17,7 @@ import { BigQueryClientOptions, SubClientOptions, } from '../bigquery'; -import {QueryJob} from './job'; -import {CallOptions} from 'google-gax'; +import {QueryJob, CallOptions} from './job'; import {protos} from '../'; import {queryFromSQL as builderQueryFromSQL} from './builder'; import {QueryReader} from './reader'; @@ -28,8 +27,8 @@ import {QueryReader} from './reader'; */ export class QueryClient { private client: BigQueryClient; - private projectID: string; - private billingProjectID: string; + projectId: string; + private billingProjectId: string; /** * @param {BigQueryClientOptions} options - The configuration object. @@ -39,18 +38,18 @@ export class QueryClient { subClientOptions?: SubClientOptions, ) { this.client = new BigQueryClient(options, subClientOptions); - this.projectID = ''; - this.billingProjectID = ''; + this.projectId = ''; + this.billingProjectId = ''; void this.client.jobClient.getProjectId().then(projectId => { - this.projectID = projectId; - if (this.billingProjectID !== '') { - this.billingProjectID = projectId; + this.projectId = projectId; + if (this.billingProjectId !== '') { + this.billingProjectId = projectId; } }); } - setBillingProjectID(projectID: string) { - this.billingProjectID = projectID; + setBillingProjectId(projectId: string) { + this.billingProjectId = projectId; } /** @@ -59,7 +58,7 @@ export class QueryClient { * @returns {protos.google.cloud.bigquery.v2.IPostQueryRequest} */ queryFromSQL(sql: string): protos.google.cloud.bigquery.v2.IPostQueryRequest { - const req = builderQueryFromSQL(this.projectID, sql); + const req = builderQueryFromSQL(this.projectId, sql); return req; } @@ -104,7 +103,7 @@ export class QueryClient { return this.startQuery( { queryRequest: request, - projectId: this.projectID, + projectId: this.projectId, }, options, ); @@ -125,7 +124,7 @@ export class QueryClient { ): Promise { const [response] = await this.client.jobClient.insertJob( { - projectId: this.projectID, + projectId: this.projectId, job, }, options, @@ -133,23 +132,7 @@ export class QueryClient { return new QueryJob(this, {jobReference: response.jobReference}); } - /** - * Gets the results of a query job. - * - * @param {protos.google.cloud.bigquery.v2.IGetQueryResultsRequest} request - * The request object that will be sent. - * @param {CallOptions} [options] - * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. - * @returns {Promise} - */ - async _getQueryResults( - request: protos.google.cloud.bigquery.v2.IGetQueryResultsRequest, - options?: CallOptions, - ): Promise<[protos.google.cloud.bigquery.v2.IGetQueryResultsResponse]> { - const [response] = await this.client.jobClient.getQueryResults( - request, - options, - ); - return [response]; + getBigQueryClient(): BigQueryClient { + return this.client; } } diff --git a/src/query/format.ts b/src/query/format.ts index bd600d50e..109476323 100644 --- a/src/query/format.ts +++ b/src/query/format.ts @@ -18,10 +18,10 @@ export function civilDateString(d: Date): string { export function civilTimeString(value: string | Date): string { if (value instanceof Date) { - const h = `${value.getHours()}`.padStart(2, "0"); - const m = `${value.getMinutes()}`.padStart(2, "0"); - const s = `${value.getSeconds()}`.padStart(2, "0"); - const f = `${value.getMilliseconds()}`.padStart(3, "0"); + const h = `${value.getHours()}`.padStart(2, '0'); + const m = `${value.getMinutes()}`.padStart(2, '0'); + const s = `${value.getSeconds()}`.padStart(2, '0'); + const f = `${value.getMilliseconds()}`.padStart(3, '0'); return `${h}:${m}:${s}.${f}`; } return value; @@ -33,9 +33,9 @@ export function civilDateTimeString(value: Date | string): string { if (value.getHours()) { time = civilTimeString(value); } - const y = `${value.getFullYear()}`.padStart(2, "0"); - const m = `${value.getMonth()+1}`.padStart(2, "0"); - const d = `${value.getDate()}`.padStart(2, "0"); + const y = `${value.getFullYear()}`.padStart(2, '0'); + const m = `${value.getMonth() + 1}`.padStart(2, '0'); + const d = `${value.getDate()}`.padStart(2, '0'); time = time ? ' ' + time : ''; return `${y}-${m}-${d}${time}`; } diff --git a/src/query/iterator.ts b/src/query/iterator.ts index 76049a3cc..d94230e29 100644 --- a/src/query/iterator.ts +++ b/src/query/iterator.ts @@ -36,7 +36,7 @@ export class RowIterator { } async fetchRows() { - const [rows, _, pageToken] = await this.job.getRows(this.pageToken); + const [rows, _, pageToken] = await this.job._getRows(this.pageToken); this.rows = rows; this.pageToken = pageToken || undefined; } diff --git a/src/query/job.ts b/src/query/job.ts index 6f6d3a79d..5dd88febc 100644 --- a/src/query/job.ts +++ b/src/query/job.ts @@ -18,7 +18,11 @@ import {RowIterator} from './iterator'; import {Row} from './row'; import {Schema} from './schema'; import {convertRows} from './value'; -import {CallOptions} from 'google-gax'; +import {CallOptions as GaxCallOptions} from 'google-gax'; + +export interface CallOptions extends GaxCallOptions { + signal?: AbortSignal; +} /** * Query represents a query job. @@ -55,15 +59,16 @@ export class QueryJob { rows: response.rows, }); + this.jobID = ''; + this.location = response.location ?? ''; + this.projectID = ''; if (response.jobReference) { this.projectID = response.jobReference.projectId!; this.jobID = response.jobReference.jobId!; this.location = response.jobReference.location?.value || ''; - } else { - // This should not happen, but we need to initialize the properties. - this.projectID = ''; - this.jobID = ''; - this.location = ''; + } + if (response.queryId) { + this.jobID = response.queryId; } } @@ -86,14 +91,29 @@ export class QueryJob { * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. */ async wait(options?: CallOptions): Promise { + const signal = options?.signal; while (!this.complete) { + if (signal?.aborted) { + throw new Error('The operation was aborted.'); + } await this.checkStatus(options); if (!this.complete) { - await new Promise(resolve => setTimeout(resolve, 1000)); // TODO: exponential backoff + await this.waitFor(signal); } } } + private async waitFor(signal?: AbortSignal): Promise { + const delay = 1000; // TODO: backoff settings + return new Promise((resolve, reject) => { + const timeout = setTimeout(resolve, delay); + signal?.addEventListener('abort', () => { + clearTimeout(timeout); + reject(new Error('The operation was aborted.')); + }); + }); + } + /** * Returns a RowIterator for the query results. * @@ -107,12 +127,16 @@ export class QueryJob { return it; } - async getRows( + /** + * @internal + */ + async _getRows( pageToken?: string, ): Promise< [Row[], protos.google.cloud.bigquery.v2.ITableSchema | null, string | null] > { - const [response] = await this.client._getQueryResults({ + const {jobClient} = this.client.getBigQueryClient(); + const [response] = await jobClient.getQueryResults({ projectId: this.projectID, jobId: this.jobID, location: this.location, @@ -140,7 +164,8 @@ export class QueryJob { } private async checkStatus(options?: CallOptions): Promise { - const [response] = await this.client._getQueryResults( + const {jobClient} = this.client.getBigQueryClient(); + const [response] = await jobClient.getQueryResults( { projectId: this.projectID, jobId: this.jobID, diff --git a/system-test/query/query.ts b/system-test/query/query.ts index 291a5def5..33b6a9028 100644 --- a/system-test/query/query.ts +++ b/system-test/query/query.ts @@ -13,18 +13,34 @@ // limitations under the License. import * as assert from 'assert'; +import * as sinon from 'sinon'; import {describe, it} from 'mocha'; import {query, protos} from '../../src'; +const sleep = (ms: number) => + new Promise(resolve => { + setTimeout(resolve, ms); + }); + describe('Run Query', () => { const client = new query.QueryClient(); + let getQueryResultsSpy: sinon.SinonSpy; + + beforeEach(() => { + const {jobClient} = client.getBigQueryClient(); + getQueryResultsSpy = sinon.spy(jobClient, 'getQueryResults'); + }); + + afterEach(() => { + getQueryResultsSpy.restore(); + }); it('should run a stateless query', async () => { const req = client.queryFromSQL( 'SELECT CURRENT_TIMESTAMP() as foo, SESSION_USER() as bar', ); req.queryRequest!.jobCreationMode = - protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_OPTIONAL; + protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_OPTIONAL; const q = await client.startQuery(req); await q.wait(); @@ -37,12 +53,35 @@ describe('Run Query', () => { assert.strictEqual(rows.length, 1); }); + it('should stop waiting for query to complete', async () => { + const req = client.queryFromSQL( + 'SELECT num FROM UNNEST(GENERATE_ARRAY(1,1000000)) as num', + ); + req.queryRequest!.useQueryCache = {value: false}; + req.queryRequest!.jobCreationMode = + protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_REQUIRED; + req.queryRequest!.timeoutMs = {value: 500}; + + const abortCtrl = new AbortController(); + const q = await client.startQuery(req); + q.wait({ + signal: abortCtrl.signal, + }).catch(err => { + assert(err, 'aborted'); + }); + await sleep(1000); + abortCtrl.abort(); + + assert(getQueryResultsSpy.callCount >= 1); + assert(getQueryResultsSpy.callCount <= 2); + }).timeout(5000); + it('should read a query job without cache', async () => { const req = client.queryFromSQL( 'SELECT CURRENT_TIMESTAMP() as foo, SESSION_USER() as bar', ); req.queryRequest!.jobCreationMode = - protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_REQUIRED; + protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_REQUIRED; const q = await client.startQuery(req); await q.wait(); From 8bafda6578c4ff2dd796532a6d64e108cb429ff0 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 8 Jul 2025 17:14:26 -0400 Subject: [PATCH 3/6] feat: run tests with both rest and grpc transports --- src/query/client.ts | 40 +++++++++++++++++++++++----- src/query/job.ts | 26 +++++++++--------- system-test/fixtures/transport.ts | 44 +++++++++++++++++++++++++++++++ system-test/query/query.ts | 9 ++++--- system-test/query/value.ts | 6 ++--- 5 files changed, 98 insertions(+), 27 deletions(-) create mode 100644 system-test/fixtures/transport.ts diff --git a/src/query/client.ts b/src/query/client.ts index 785c02fdb..db995a068 100644 --- a/src/query/client.ts +++ b/src/query/client.ts @@ -40,14 +40,42 @@ export class QueryClient { this.client = new BigQueryClient(options, subClientOptions); this.projectId = ''; this.billingProjectId = ''; - void this.client.jobClient.getProjectId().then(projectId => { - this.projectId = projectId; - if (this.billingProjectId !== '') { - this.billingProjectId = projectId; - } - }); + void this.initialize(); } + async getProjectId(): Promise { + if (this.projectId) { + return this.projectId; + } + const {jobClient} = this.getBigQueryClient(); + const projectId = await jobClient.getProjectId(); + this.projectId = projectId; + return projectId; + } + /** + * Initialize the client. + * Performs asynchronous operations (such as authentication) and prepares the client. + * This function will be called automatically when any class method is called for the + * first time, but if you need to initialize it before calling an actual method, + * feel free to call initialize() directly. + * + * You can await on this method if you want to make sure the client is initialized. + * + * @returns {Promise} A promise that resolves when auth is complete. + */ + initialize = async (): Promise => { + if (this.projectId) { + return; + } + const {jobClient} = this.getBigQueryClient(); + await jobClient.initialize(); + const projectId = await this.getProjectId(); + this.projectId = projectId; + if (this.billingProjectId !== '') { + this.billingProjectId = projectId; + } + }; + setBillingProjectId(projectId: string) { this.billingProjectId = projectId; } diff --git a/src/query/job.ts b/src/query/job.ts index 5dd88febc..437d50b69 100644 --- a/src/query/job.ts +++ b/src/query/job.ts @@ -30,8 +30,8 @@ export interface CallOptions extends GaxCallOptions { export class QueryJob { private client: QueryClient; private complete: boolean; - private projectID: string; - private jobID: string; + private projectId: string; + private jobId: string; private location: string; private cachedRows: Row[]; @@ -59,23 +59,23 @@ export class QueryJob { rows: response.rows, }); - this.jobID = ''; + this.jobId = ''; this.location = response.location ?? ''; - this.projectID = ''; + this.projectId = ''; if (response.jobReference) { - this.projectID = response.jobReference.projectId!; - this.jobID = response.jobReference.jobId!; + this.projectId = response.jobReference.projectId!; + this.jobId = response.jobReference.jobId!; this.location = response.jobReference.location?.value || ''; } if (response.queryId) { - this.jobID = response.queryId; + this.jobId = response.queryId; } } jobReference(): protos.google.cloud.bigquery.v2.IJobReference { return { - jobId: this.jobID, - projectId: this.projectID, + jobId: this.jobId, + projectId: this.projectId, location: {value: this.location}, }; } @@ -137,8 +137,8 @@ export class QueryJob { > { const {jobClient} = this.client.getBigQueryClient(); const [response] = await jobClient.getQueryResults({ - projectId: this.projectID, - jobId: this.jobID, + projectId: this.projectId, + jobId: this.jobId, location: this.location, pageToken, formatOptions: { @@ -167,8 +167,8 @@ export class QueryJob { const {jobClient} = this.client.getBigQueryClient(); const [response] = await jobClient.getQueryResults( { - projectId: this.projectID, - jobId: this.jobID, + projectId: this.projectId, + jobId: this.jobId, location: this.location, maxResults: {value: 0}, formatOptions: { diff --git a/system-test/fixtures/transport.ts b/system-test/fixtures/transport.ts new file mode 100644 index 000000000..4b1ceb246 --- /dev/null +++ b/system-test/fixtures/transport.ts @@ -0,0 +1,44 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {describe} from 'mocha'; +import {query} from '../../src'; +import { QueryClient } from '../../src/query'; + +export const describeWithBothTransports = (title: string, fn: (client: QueryClient) => void) => { + describe(title, () => { + describe("REST", () => { + const client = new query.QueryClient({}, { + opts: { + fallback: true, + } + }); + before(async () => { + await client.initialize(); + }); + fn(client); + }); + describe("GRPC", () => { + const client = new query.QueryClient({}, { + opts: { + fallback: false, + } + }); + before(async () => { + await client.initialize(); + }); + fn(client); + }); + }); +} \ No newline at end of file diff --git a/system-test/query/query.ts b/system-test/query/query.ts index 33b6a9028..34524e30e 100644 --- a/system-test/query/query.ts +++ b/system-test/query/query.ts @@ -14,16 +14,17 @@ import * as assert from 'assert'; import * as sinon from 'sinon'; -import {describe, it} from 'mocha'; -import {query, protos} from '../../src'; +import {it} from 'mocha'; + +import {protos} from '../../src'; +import {describeWithBothTransports} from '../fixtures/transport'; const sleep = (ms: number) => new Promise(resolve => { setTimeout(resolve, ms); }); -describe('Run Query', () => { - const client = new query.QueryClient(); +describeWithBothTransports('Run Query', client => { let getQueryResultsSpy: sinon.SinonSpy; beforeEach(() => { diff --git a/system-test/query/value.ts b/system-test/query/value.ts index 0e4e1a563..0f672b262 100644 --- a/system-test/query/value.ts +++ b/system-test/query/value.ts @@ -14,13 +14,11 @@ import * as assert from 'assert'; import {describe, it} from 'mocha'; -import {query} from '../../src'; import {queryParameterTestCases} from '../fixtures/query'; +import {describeWithBothTransports} from '../fixtures/transport'; -describe('Read Query Values', () => { - const client = new query.QueryClient(); - +describeWithBothTransports('Read Query Values', client => { describe('types', () => { for (const tc of queryParameterTestCases()) { it(tc.name, async () => { From 46edd398ff62f3657240b52d986cb7d03c8bfb82 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Mon, 21 Jul 2025 16:52:24 -0400 Subject: [PATCH 4/6] feat: use structpb more directly --- src/query/format.ts | 4 +- src/query/row.ts | 103 ++++++++++++++++++++++++++-------- src/query/value.ts | 51 +++++++++-------- system-test/fixtures/query.ts | 24 ++++---- 4 files changed, 121 insertions(+), 61 deletions(-) diff --git a/src/query/format.ts b/src/query/format.ts index 109476323..420f15fc5 100644 --- a/src/query/format.ts +++ b/src/query/format.ts @@ -21,7 +21,7 @@ export function civilTimeString(value: string | Date): string { const h = `${value.getHours()}`.padStart(2, '0'); const m = `${value.getMinutes()}`.padStart(2, '0'); const s = `${value.getSeconds()}`.padStart(2, '0'); - const f = `${value.getMilliseconds()}`.padStart(3, '0'); + const f = `${value.getMilliseconds() * 1000}`.padStart(6, '0'); return `${h}:${m}:${s}.${f}`; } return value; @@ -36,7 +36,7 @@ export function civilDateTimeString(value: Date | string): string { const y = `${value.getFullYear()}`.padStart(2, '0'); const m = `${value.getMonth() + 1}`.padStart(2, '0'); const d = `${value.getDate()}`.padStart(2, '0'); - time = time ? ' ' + time : ''; + time = time ? 'T' + time : ''; return `${y}-${m}-${d}${time}`; } return value.replace(/^(.*)T(.*)Z$/, '$1 $2'); diff --git a/src/query/row.ts b/src/query/row.ts index ba7f023d1..fc0568e05 100644 --- a/src/query/row.ts +++ b/src/query/row.ts @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +import {protos} from '../'; import {Schema} from './schema'; import {Value} from './value'; @@ -20,50 +21,106 @@ import {Value} from './value'; */ export class Row { private schema: Schema; - private value: {[key: string]: Value}; + private value: protos.google.protobuf.Struct; constructor(schema: Schema) { this.schema = schema; - this.value = {}; + this.value = protos.google.protobuf.Struct.create({ + fields: this.schema.pb.fields?.reduce( + (fields, f) => { + fields[f.name!] = {}; + return fields; + }, + {} as {[key: string]: protos.google.protobuf.IValue}, + ), + }); } set(columnName: string, value: Value) { - this.value[columnName] = value; + if (this.value.fields) { + this.value.fields[columnName] = value; + } } /** * toJSON returns the row as a JSON object. */ - toJSON(): {[key: string]: Value} { - const values: {[key: string]: Value} = {}; + toJSON(): {[key: string]: any} { + const value: {[key: string]: any} = {}; for (const field of this.schema.pb.fields!) { - let fval = this.value[field.name!]; - if (fval instanceof Row) { - fval = fval.toJSON(); - } - if (Array.isArray(fval)) { - fval = fval.map(v => (v instanceof Row ? v.toJSON() : v)); + const fieldValue = this.value.fields[field.name!]; + value[field.name!] = this.fieldValueToJSON(field, fieldValue); + } + return value; + } + + private fieldValueToJSON( + field: protos.google.cloud.bigquery.v2.ITableFieldSchema, + value: protos.google.protobuf.IValue, + ): any { + if (value.structValue) { + const subrow = new Row(Schema.fromField(field)); + subrow.value = protos.google.protobuf.Struct.create(value.structValue); + return subrow.toJSON(); + } else if (value.listValue) { + const arr: any[] = []; + for (const row of value.listValue.values ?? []) { + const subvalue = this.fieldValueToJSON(field, row); + arr.push(subvalue); } - values[field.name!] = fval; + return arr; + } else if (value.nullValue) { + return null; + } else if (value.numberValue) { + return value.numberValue; + } else if (value.boolValue) { + return value.boolValue; } - return values; + return value.stringValue; + } + + /** + * toStruct returns the row as a protobuf Struct object. + */ + toStruct(): protos.google.protobuf.IStruct { + return this.value; } /** * toValues encodes the row into an array of Value. */ - toValues(): Value[] { - const values: Value[] = []; + toValues(): any[] { + const values: any[] = []; for (const field of this.schema.pb.fields!) { - let v = this.value[field.name!]; - if (v instanceof Row) { - v = v.toValues(); - } - if (Array.isArray(v)) { - v = v.map(r => (r instanceof Row ? r.toValues() : r)); - } - values.push(v); + const fieldValue = this.value.fields[field.name!]; + const value = this.fieldValueToValues(field, fieldValue); + values.push(value); } return values; } + + private fieldValueToValues( + field: protos.google.cloud.bigquery.v2.ITableFieldSchema, + value: protos.google.protobuf.IValue, + ): any { + if (value.structValue) { + const subrow = new Row(Schema.fromField(field)); + subrow.value = protos.google.protobuf.Struct.create(value.structValue); + return subrow.toValues(); + } else if (value.listValue) { + const arr: any[] = []; + for (const row of value.listValue.values ?? []) { + const subvalue = this.fieldValueToValues(field, row); + arr.push(subvalue); + } + return arr; + } else if (value.nullValue) { + return null; + } else if (value.numberValue) { + return value.numberValue; + } else if (value.boolValue) { + return value.boolValue; + } + return value.stringValue; + } } diff --git a/src/query/value.ts b/src/query/value.ts index 00d6c7af0..80a3d899e 100644 --- a/src/query/value.ts +++ b/src/query/value.ts @@ -15,9 +15,8 @@ import {protos} from '../'; import {Row} from './row'; import {Schema} from './schema'; -import * as Big from 'big.js'; -export type Value = any; +export type Value = protos.google.protobuf.IValue; export function convertRows( rows: protos.google.protobuf.IStruct[], @@ -52,7 +51,9 @@ function convertValue( schema: Schema, ): Value { if (val.nullValue !== undefined && val.nullValue !== null) { - return null; + return { + nullValue: 'NULL_VALUE', + }; } if (val.listValue) { return convertRepeatedRecord(val.listValue, typ, schema); @@ -70,18 +71,25 @@ function convertRepeatedRecord( vals: protos.google.protobuf.IListValue, typ: string, schema: Schema, -): Value[] { - return vals.values!.map(cell => { - const val = getFieldValue(cell); - return convertValue(val, typ, schema); - }); +): Value { + return { + listValue: { + values: vals.values!.map(cell => { + const val = getFieldValue(cell); + return convertValue(val, typ, schema); + }), + }, + }; } function convertNestedRecord( val: protos.google.protobuf.IStruct, schema: Schema, -): Row { - return convertRow(val, schema); +): Value { + const row = convertRow(val, schema); + return { + structValue: row.toStruct(), + }; } function convertBasicType(val: string, typ: string): Value { @@ -89,27 +97,22 @@ function convertBasicType(val: string, typ: string): Value { case 'STRING': case 'GEOGRAPHY': case 'JSON': - return val; - case 'BYTES': - return Buffer.from(val, 'base64'); - case 'INTEGER': - return parseInt(val, 10); - case 'FLOAT': - return parseFloat(val); - case 'BOOLEAN': - return val.toLowerCase() === 'true'; case 'TIMESTAMP': - return new Date(parseInt(val, 10) / 1000); case 'DATE': - return new Date(val); case 'TIME': - return val; case 'DATETIME': - return new Date(val); case 'NUMERIC': case 'BIGNUMERIC': - return Big(val); case 'INTERVAL': + return {stringValue: val}; + case 'BYTES': + return {stringValue: val}; + case 'INTEGER': + return {numberValue: parseInt(val, 10)}; + case 'FLOAT': + return {numberValue: parseFloat(val)}; + case 'BOOLEAN': + return {boolValue: val.toLowerCase() === 'true'}; default: throw new Error(`Unsupported type: ${typ}`); } diff --git a/system-test/fixtures/query.ts b/system-test/fixtures/query.ts index 259262746..a25a04795 100644 --- a/system-test/fixtures/query.ts +++ b/system-test/fixtures/query.ts @@ -103,8 +103,8 @@ export function queryParameterTestCases(): QueryParameterTestCase[] { }, }, ], - wantRowJSON: {f0_: Buffer.from('foo')}, - wantRowValues: [Buffer.from('foo')], + wantRowJSON: {f0_: Buffer.from('foo').toString('base64')}, + wantRowValues: [Buffer.from('foo').toString('base64')], }, { name: 'TimestampParam', @@ -116,8 +116,8 @@ export function queryParameterTestCases(): QueryParameterTestCase[] { parameterValue: {value: {value:timestampString(ts)}}, }, ], - wantRowJSON: {f0_: ts}, - wantRowValues: [ts], + wantRowJSON: {f0_: String(ts.valueOf() * 1000)}, + wantRowValues: [String(ts.valueOf() * 1000)], }, { name: 'TimestampArrayParam', @@ -136,8 +136,8 @@ export function queryParameterTestCases(): QueryParameterTestCase[] { }, }, ], - wantRowJSON: {f0_: [ts, ts]}, - wantRowValues: [[ts, ts]], + wantRowJSON: {f0_: [String(ts.valueOf() * 1000), String(ts.valueOf() * 1000)]}, + wantRowValues: [[String(ts.valueOf() * 1000), String(ts.valueOf() * 1000)]], }, { name: 'DatetimeParam', @@ -149,8 +149,8 @@ export function queryParameterTestCases(): QueryParameterTestCase[] { parameterValue: {value: {value: civilDateTimeString(dtm)}}, }, ], - wantRowJSON: {f0_: dtm}, - wantRowValues: [dtm], + wantRowJSON: {f0_: civilDateTimeString(dtm)}, + wantRowValues: [civilDateTimeString(dtm)], }, { name: 'DateParam', @@ -162,8 +162,8 @@ export function queryParameterTestCases(): QueryParameterTestCase[] { parameterValue: {value: {value:civilDateString(d)}}, }, ], - wantRowJSON: {f0_: d}, - wantRowValues: [d], + wantRowJSON: {f0_: civilDateString(d)}, + wantRowValues: [civilDateString(d)], }, { name: 'TimeParam', @@ -289,7 +289,7 @@ export function queryParameterTestCases(): QueryParameterTestCase[] { ], wantRowJSON: { f0_: { - Datetime: dtm, + Datetime: civilDateTimeString(dtm), StringArray: ['a', 'b'], SubStruct: { String: 'c', @@ -305,7 +305,7 @@ export function queryParameterTestCases(): QueryParameterTestCase[] { }, }, wantRowValues: [ - [dtm, ['a', 'b'], ['c'], [['d'], ['e']]], + [civilDateTimeString(dtm), ['a', 'b'], ['c'], [['d'], ['e']]], ], }, ]; From 8c6287e171b0d850e7540b0829b44758ff0c0133 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Wed, 13 Aug 2025 14:14:44 -0400 Subject: [PATCH 5/6] feat: remove Reader and make .read async --- src/query/client.ts | 26 +++++++++----- src/query/index.ts | 1 - src/query/job.ts | 5 ++- src/query/reader.ts | 72 -------------------------------------- system-test/query/query.ts | 12 +++---- system-test/query/value.ts | 4 +-- 6 files changed, 29 insertions(+), 91 deletions(-) delete mode 100644 src/query/reader.ts diff --git a/src/query/client.ts b/src/query/client.ts index db995a068..fcf4fdb92 100644 --- a/src/query/client.ts +++ b/src/query/client.ts @@ -20,7 +20,6 @@ import { import {QueryJob, CallOptions} from './job'; import {protos} from '../'; import {queryFromSQL as builderQueryFromSQL} from './builder'; -import {QueryReader} from './reader'; /** * QueryClient is a client for running queries in BigQuery. @@ -90,14 +89,6 @@ export class QueryClient { return req; } - /** - * NewQueryReader creates a new QueryReader. - * @returns {QueryReader} - */ - newQueryReader(): QueryReader { - return new QueryReader(this); - } - /** * Runs a query and returns a QueryJob handle. * @@ -160,6 +151,23 @@ export class QueryClient { return new QueryJob(this, {jobReference: response.jobReference}); } + /** + * Create a managed QueryJob from a job reference. + * + * @param {protos.google.cloud.bigquery.v2.IJob} job + * A job resource to insert + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + * @returns {Promise} + */ + async attachJob( + jobRef: protos.google.cloud.bigquery.v2.IJobReference, + ): Promise { + return new QueryJob(this, { + jobReference: jobRef, + }); + } + getBigQueryClient(): BigQueryClient { return this.client; } diff --git a/src/query/index.ts b/src/query/index.ts index 6ef842ee3..0c14c10d1 100644 --- a/src/query/index.ts +++ b/src/query/index.ts @@ -17,4 +17,3 @@ export {QueryJob} from './job'; export {Row} from './row'; export {RowIterator} from './iterator'; export {queryFromSQL} from './builder'; -export {QueryReader, withPageToken} from './reader'; diff --git a/src/query/job.ts b/src/query/job.ts index 437d50b69..ff2a1f90a 100644 --- a/src/query/job.ts +++ b/src/query/job.ts @@ -119,11 +119,14 @@ export class QueryJob { * * @returns {RowIterator} */ - read(): RowIterator { + async read(): Promise { const it = new RowIterator(this, { pageToken: this.cachedPageToken, rows: this.cachedRows, }); + if (this.cachedRows.length === 0) { + await it.fetchRows(); + } return it; } diff --git a/src/query/reader.ts b/src/query/reader.ts deleted file mode 100644 index 7d567f9cf..000000000 --- a/src/query/reader.ts +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright 2025 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import {QueryClient} from './client'; -import {RowIterator} from './iterator'; -import {QueryJob} from './job'; -import {protos} from '../'; - -export interface ReadState { - pageToken?: string; -} - -export type ReadOption = (state: ReadState) => void; - -export function withPageToken(token: string): ReadOption { - return (state: ReadState) => { - state.pageToken = token; - }; -} - -/** - * QueryReader is used to read the results of a query. - */ -export class QueryReader { - private client: QueryClient; - - constructor(client: QueryClient) { - this.client = client; - } - - /** - * Read reads the results of a query job. - * @param {protos.google.cloud.bigquery.v2.IJobReference} jobRef The job reference. - * @param {protos.google.cloud.bigquery.v2.ITableSchema} schema The schema of the results. - * @param {ReadOption[]} opts The options for reading the results. - * @returns {Promise} - */ - async read( - jobRef: protos.google.cloud.bigquery.v2.IJobReference, - schema: protos.google.cloud.bigquery.v2.ITableSchema, - ...opts: ReadOption[] - ): Promise { - const query = new QueryJob(this.client, { - jobReference: jobRef, - schema, - }); - - const initState: ReadState = {}; - for (const opt of opts) { - opt(initState); - } - - const itOpts: {pageToken?: string} = {}; - if (initState.pageToken) { - itOpts.pageToken = initState.pageToken; - } - const it = new RowIterator(query, itOpts); - await it.fetchRows(); - return it; - } -} diff --git a/system-test/query/query.ts b/system-test/query/query.ts index 34524e30e..271815e39 100644 --- a/system-test/query/query.ts +++ b/system-test/query/query.ts @@ -46,7 +46,7 @@ describeWithBothTransports('Run Query', client => { const q = await client.startQuery(req); await q.wait(); - const it = q.read(); + const it = await q.read(); const rows = []; for await (const row of it) { rows.push(row.toJSON()); @@ -63,8 +63,8 @@ describeWithBothTransports('Run Query', client => { protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_REQUIRED; req.queryRequest!.timeoutMs = {value: 500}; - const abortCtrl = new AbortController(); const q = await client.startQuery(req); + const abortCtrl = new AbortController(); q.wait({ signal: abortCtrl.signal, }).catch(err => { @@ -84,13 +84,13 @@ describeWithBothTransports('Run Query', client => { req.queryRequest!.jobCreationMode = protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_REQUIRED; - const q = await client.startQuery(req); + let q = await client.startQuery(req); await q.wait(); - const reader = client.newQueryReader(); const jobRef = q.jobReference(); + q = await client.attachJob(jobRef); - const it = await reader.read(jobRef, q.schema); + const it = await q.read(); const rows = []; for await (const row of it) { rows.push(row.toJSON()); @@ -109,7 +109,7 @@ describeWithBothTransports('Run Query', client => { }); await q.wait(); - const it = q.read(); + const it = await q.read(); const rows = []; for await (const row of it) { rows.push(row); diff --git a/system-test/query/value.ts b/system-test/query/value.ts index 0f672b262..bf74ee6a3 100644 --- a/system-test/query/value.ts +++ b/system-test/query/value.ts @@ -28,7 +28,7 @@ describeWithBothTransports('Read Query Values', client => { const q = await client.startQuery(req); await q.wait(); - const it = q.read(); + const it = await q.read(); const rows = []; for await (const row of it) { rows.push(row); @@ -46,7 +46,7 @@ describeWithBothTransports('Read Query Values', client => { const q = await client.startQuery(req); await q.wait(); - const it = q.read(); + const it = await q.read(); const rows = []; for await (const row of it) { rows.push(row); From 123f39bf41442ad5fe02ff829029906784a8c24c Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 12 Sep 2025 11:33:12 -0400 Subject: [PATCH 6/6] feat: update constructors and add cancel method --- src/query/builder.ts | 4 ++-- src/query/client.ts | 12 +++++------- src/query/index.ts | 2 +- src/query/job.ts | 26 +++++++++++++++++++++++--- system-test/fixtures/transport.ts | 12 ++---------- system-test/query/query.ts | 6 +++--- system-test/query/value.ts | 4 ++-- 7 files changed, 38 insertions(+), 28 deletions(-) diff --git a/src/query/builder.ts b/src/query/builder.ts index c3c378f4f..d753fcf57 100644 --- a/src/query/builder.ts +++ b/src/query/builder.ts @@ -15,11 +15,11 @@ import {protos} from '../'; /** - * QueryFromSQL creates a query configuration from a SQL string. + * fromSQL creates a query configuration from a SQL string. * @param {string} sql The SQL query. * @returns {protos.google.cloud.bigquery.v2.IPostQueryRequest} */ -export function queryFromSQL( +export function fromSQL( projectId: string, sql: string, ): protos.google.cloud.bigquery.v2.IPostQueryRequest { diff --git a/src/query/client.ts b/src/query/client.ts index fcf4fdb92..e658263ab 100644 --- a/src/query/client.ts +++ b/src/query/client.ts @@ -15,11 +15,10 @@ import { BigQueryClient, BigQueryClientOptions, - SubClientOptions, } from '../bigquery'; import {QueryJob, CallOptions} from './job'; import {protos} from '../'; -import {queryFromSQL as builderQueryFromSQL} from './builder'; +import {fromSQL as builderFromSQL} from './builder'; /** * QueryClient is a client for running queries in BigQuery. @@ -34,9 +33,8 @@ export class QueryClient { */ constructor( options?: BigQueryClientOptions, - subClientOptions?: SubClientOptions, ) { - this.client = new BigQueryClient(options, subClientOptions); + this.client = new BigQueryClient(options); this.projectId = ''; this.billingProjectId = ''; void this.initialize(); @@ -80,12 +78,12 @@ export class QueryClient { } /** - * QueryFromSQL creates a query configuration from a SQL string. + * fromSQL creates a query configuration from a SQL string. * @param {string} sql The SQL query. * @returns {protos.google.cloud.bigquery.v2.IPostQueryRequest} */ - queryFromSQL(sql: string): protos.google.cloud.bigquery.v2.IPostQueryRequest { - const req = builderQueryFromSQL(this.projectId, sql); + fromSQL(sql: string): protos.google.cloud.bigquery.v2.IPostQueryRequest { + const req = builderFromSQL(this.projectId, sql); return req; } diff --git a/src/query/index.ts b/src/query/index.ts index 0c14c10d1..d63621472 100644 --- a/src/query/index.ts +++ b/src/query/index.ts @@ -16,4 +16,4 @@ export {QueryClient} from './client'; export {QueryJob} from './job'; export {Row} from './row'; export {RowIterator} from './iterator'; -export {queryFromSQL} from './builder'; +export {fromSQL} from './builder'; diff --git a/src/query/job.ts b/src/query/job.ts index ff2a1f90a..7ca175c6a 100644 --- a/src/query/job.ts +++ b/src/query/job.ts @@ -29,7 +29,7 @@ export interface CallOptions extends GaxCallOptions { */ export class QueryJob { private client: QueryClient; - private complete: boolean; + private jobComplete: boolean; private projectId: string; private jobId: string; private location: string; @@ -49,7 +49,7 @@ export class QueryJob { this.cachedSchema = {}; this.cachedPageToken = ''; this.cachedTotalRows = 0; - this.complete = false; + this.jobComplete = false; this.consumeQueryResponse({ jobComplete: response.jobComplete, @@ -84,6 +84,10 @@ export class QueryJob { return this.cachedSchema; } + get complete(): boolean { + return this.jobComplete + } + /** * Waits for the query to complete. * @@ -103,6 +107,22 @@ export class QueryJob { } } + /** + * Cancel a running query. + * + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + */ + async cancel(options?: CallOptions): Promise { + const {jobClient} = this.client.getBigQueryClient(); + const [response] = await jobClient.cancelJob({ + projectId: this.projectId, + jobId: this.jobId, + location: this.location, + }, options); + return response; + } + private async waitFor(signal?: AbortSignal): Promise { const delay = 1000; // TODO: backoff settings return new Promise((resolve, reject) => { @@ -156,7 +176,7 @@ export class QueryJob { private consumeQueryResponse( response: protos.google.cloud.bigquery.v2.IGetQueryResultsResponse, ) { - this.complete = response.jobComplete?.value ?? false; + this.jobComplete = response.jobComplete?.value ?? false; this.cachedSchema = response.schema!; this.cachedPageToken = response.pageToken!; this.cachedTotalRows = Number(response.totalRows); diff --git a/system-test/fixtures/transport.ts b/system-test/fixtures/transport.ts index 4b1ceb246..bc3f84257 100644 --- a/system-test/fixtures/transport.ts +++ b/system-test/fixtures/transport.ts @@ -19,22 +19,14 @@ import { QueryClient } from '../../src/query'; export const describeWithBothTransports = (title: string, fn: (client: QueryClient) => void) => { describe(title, () => { describe("REST", () => { - const client = new query.QueryClient({}, { - opts: { - fallback: true, - } - }); + const client = new query.QueryClient({ fallback: true }); before(async () => { await client.initialize(); }); fn(client); }); describe("GRPC", () => { - const client = new query.QueryClient({}, { - opts: { - fallback: false, - } - }); + const client = new query.QueryClient({ fallback: false }); before(async () => { await client.initialize(); }); diff --git a/system-test/query/query.ts b/system-test/query/query.ts index 271815e39..89e442017 100644 --- a/system-test/query/query.ts +++ b/system-test/query/query.ts @@ -37,7 +37,7 @@ describeWithBothTransports('Run Query', client => { }); it('should run a stateless query', async () => { - const req = client.queryFromSQL( + const req = client.fromSQL( 'SELECT CURRENT_TIMESTAMP() as foo, SESSION_USER() as bar', ); req.queryRequest!.jobCreationMode = @@ -55,7 +55,7 @@ describeWithBothTransports('Run Query', client => { }); it('should stop waiting for query to complete', async () => { - const req = client.queryFromSQL( + const req = client.fromSQL( 'SELECT num FROM UNNEST(GENERATE_ARRAY(1,1000000)) as num', ); req.queryRequest!.useQueryCache = {value: false}; @@ -78,7 +78,7 @@ describeWithBothTransports('Run Query', client => { }).timeout(5000); it('should read a query job without cache', async () => { - const req = client.queryFromSQL( + const req = client.fromSQL( 'SELECT CURRENT_TIMESTAMP() as foo, SESSION_USER() as bar', ); req.queryRequest!.jobCreationMode = diff --git a/system-test/query/value.ts b/system-test/query/value.ts index bf74ee6a3..57aa43f51 100644 --- a/system-test/query/value.ts +++ b/system-test/query/value.ts @@ -22,7 +22,7 @@ describeWithBothTransports('Read Query Values', client => { describe('types', () => { for (const tc of queryParameterTestCases()) { it(tc.name, async () => { - const req = client.queryFromSQL(tc.query); + const req = client.fromSQL(tc.query); req.queryRequest!.queryParameters = tc.parameters; const q = await client.startQuery(req); @@ -40,7 +40,7 @@ describeWithBothTransports('Read Query Values', client => { }); it('should read nested objects', async () => { - const req = client.queryFromSQL( + const req = client.fromSQL( "SELECT 40 as age, [STRUCT(STRUCT('1' as a, '2' as b) as object)] as nested", ); const q = await client.startQuery(req);