diff --git a/.changeset/gorgeous-adults-confess.md b/.changeset/gorgeous-adults-confess.md new file mode 100644 index 00000000..42b790ae --- /dev/null +++ b/.changeset/gorgeous-adults-confess.md @@ -0,0 +1,6 @@ +--- +'@powersync/service-sync-rules': minor +'@powersync/service-image': minor +--- + +Add support for streams, a new and simpler way to define what data gets synced to clients. diff --git a/packages/sync-rules/src/BaseSqlDataQuery.ts b/packages/sync-rules/src/BaseSqlDataQuery.ts index 75d6fcf2..650494dd 100644 --- a/packages/sync-rules/src/BaseSqlDataQuery.ts +++ b/packages/sync-rules/src/BaseSqlDataQuery.ts @@ -4,14 +4,29 @@ import { ColumnDefinition } from './ExpressionType.js'; import { SourceTableInterface } from './SourceTableInterface.js'; import { SqlTools } from './sql_filters.js'; import { TablePattern } from './TablePattern.js'; -import { QueryParameters, QuerySchema, SourceSchema, SourceSchemaTable, SqliteJsonRow, SqliteRow } from './types.js'; +import { + EvaluationResult, + QueryParameters, + QuerySchema, + SourceSchema, + SourceSchemaTable, + SqliteJsonRow, + SqliteRow +} from './types.js'; import { filterJsonRow } from './utils.js'; +import { castAsText } from './sql_functions.js'; export interface RowValueExtractor { extract(tables: QueryParameters, into: SqliteRow): void; getTypes(schema: QuerySchema, into: Record): void; } +export interface EvaluateRowOptions { + table: SourceTableInterface; + row: SqliteRow; + bucketIds: (params: QueryParameters) => string[]; +} + export interface BaseSqlDataQueryOptions { sourceTable: TablePattern; table: string; @@ -21,7 +36,6 @@ export interface BaseSqlDataQueryOptions { descriptorName: string; bucketParameters: string[]; tools: SqlTools; - errors?: SqlRuleError[]; } @@ -149,6 +163,51 @@ export class BaseSqlDataQuery { return result; } + resolveResultSets(schema: SourceSchema, tables: Record>) { + const outTables = this.getColumnOutputs(schema); + for (let table of outTables) { + tables[table.name] ??= {}; + for (let column of table.columns) { + if (column.name != 'id') { + tables[table.name][column.name] ??= column; + } + } + } + } + + evaluateRowWithOptions(options: EvaluateRowOptions): EvaluationResult[] { + try { + const { table, row, bucketIds } = options; + + const tables = { [this.table]: this.addSpecialParameters(table, row) }; + const resolvedBucketIds = bucketIds(tables); + + const data = this.transformRow(tables); + let id = data.id; + if (typeof id != 'string') { + // While an explicit cast would be better, this covers against very common + // issues when initially testing out sync, for example when the id column is an + // auto-incrementing integer. + // If there is no id column, we use a blank id. This will result in the user syncing + // a single arbitrary row for this table - better than just not being able to sync + // anything. + id = castAsText(id) ?? ''; + } + const outputTable = this.getOutputName(table.name); + + return resolvedBucketIds.map((bucketId) => { + return { + bucket: bucketId, + table: outputTable, + id: id, + data + } as EvaluationResult; + }); + } catch (e) { + return [{ error: e.message ?? 
`Evaluating data query failed` }]; + } + } + protected transformRow(tables: QueryParameters): SqliteJsonRow { let result: SqliteRow = {}; for (let extractor of this.extractors) { diff --git a/packages/sync-rules/src/SqlBucketDescriptor.ts b/packages/sync-rules/src/SqlBucketDescriptor.ts index 66ae8440..bc3e5987 100644 --- a/packages/sync-rules/src/SqlBucketDescriptor.ts +++ b/packages/sync-rules/src/SqlBucketDescriptor.ts @@ -201,15 +201,7 @@ export class SqlBucketDescriptor implements BucketSource { resolveResultSets(schema: SourceSchema, tables: Record>) { for (let query of this.dataQueries) { - const outTables = query.getColumnOutputs(schema); - for (let table of outTables) { - tables[table.name] ??= {}; - for (let column of table.columns) { - if (column.name != 'id') { - tables[table.name][column.name] ??= column; - } - } - } + query.resolveResultSets(schema, tables); } } @@ -229,7 +221,7 @@ export class SqlBucketDescriptor implements BucketSource { let all_data_queries = [...this.dataQueries.values()].flat(); return { name: this.name, - type: this.type.toString(), + type: BucketSourceType[this.type], bucket_parameters: this.bucketParameters, global_parameter_queries: this.globalParameterQueries.map((q) => { return { diff --git a/packages/sync-rules/src/SqlDataQuery.ts b/packages/sync-rules/src/SqlDataQuery.ts index 9f6f75c1..2a80f78e 100644 --- a/packages/sync-rules/src/SqlDataQuery.ts +++ b/packages/sync-rules/src/SqlDataQuery.ts @@ -186,36 +186,15 @@ export class SqlDataQuery extends BaseSqlDataQuery { } evaluateRow(table: SourceTableInterface, row: SqliteRow): EvaluationResult[] { - try { - const tables = { [this.table]: this.addSpecialParameters(table, row) }; - const bucketParameters = this.filter.filterRow(tables); - const bucketIds = bucketParameters.map((params) => - getBucketId(this.descriptorName, this.bucketParameters, params) - ); - - const data = this.transformRow(tables); - let id = data.id; - if (typeof id != 'string') { - // While an explicit cast would be better, this covers against very common - // issues when initially testing out sync, for example when the id column is an - // auto-incrementing integer. - // If there is no id column, we use a blank id. This will result in the user syncing - // a single arbitrary row for this table - better than just not being able to sync - // anything. - id = castAsText(id) ?? ''; + const query = this; + + return this.evaluateRowWithOptions({ + table, + row, + bucketIds(tables) { + const bucketParameters = query.filter.filterRow(tables); + return bucketParameters.map((params) => getBucketId(query.descriptorName, query.bucketParameters, params)); } - const outputTable = this.getOutputName(table.name); - - return bucketIds.map((bucketId) => { - return { - bucket: bucketId, - table: outputTable, - id: id, - data - } as EvaluationResult; - }); - } catch (e) { - return [{ error: e.message ?? 
`Evaluating data query failed` }]; - } + }); } } diff --git a/packages/sync-rules/src/SqlSyncRules.ts b/packages/sync-rules/src/SqlSyncRules.ts index fdf9cc5c..2aea4a34 100644 --- a/packages/sync-rules/src/SqlSyncRules.ts +++ b/packages/sync-rules/src/SqlSyncRules.ts @@ -22,9 +22,12 @@ import { SourceSchema, SqliteJsonRow, SqliteRow, + StreamParseOptions, SyncRules } from './types.js'; import { BucketSource } from './BucketSource.js'; +import { SyncStream } from './streams/stream.js'; +import { syncStreamFromSql } from './streams/from_sql.js'; const ACCEPT_POTENTIALLY_DANGEROUS_QUERIES = Symbol('ACCEPT_POTENTIALLY_DANGEROUS_QUERIES'); @@ -137,6 +140,7 @@ export class SqlSyncRules implements SyncRules { // Bucket definitions using explicit parameter and data queries. const bucketMap = parsed.get('bucket_definitions') as YAMLMap; + const streamMap = parsed.get('streams') as YAMLMap | null; const definitionNames = new Set(); const checkUniqueName = (name: string, literal: Scalar) => { if (definitionNames.has(name)) { @@ -148,8 +152,8 @@ export class SqlSyncRules implements SyncRules { return true; }; - if (bucketMap == null) { - rules.errors.push(new YamlError(new Error(`'bucket_definitions' is required`))); + if (bucketMap == null && streamMap == null) { + rules.errors.push(new YamlError(new Error(`'bucket_definitions' or 'streams' is required`))); if (throwOnError) { rules.throwOnError(); @@ -209,6 +213,39 @@ export class SqlSyncRules implements SyncRules { rules.bucketSources.push(descriptor); } + for (const entry of streamMap?.items ?? []) { + const { key: keyScalar, value } = entry as { key: Scalar; value: YAMLMap }; + const key = keyScalar.toString(); + if (!checkUniqueName(key, keyScalar)) { + continue; + } + + const accept_potentially_dangerous_queries = + value.get('accept_potentially_dangerous_queries', true)?.value == true; + + const queryOptions: StreamParseOptions = { + ...options, + accept_potentially_dangerous_queries, + priority: rules.parsePriority(value), + auto_subscribe: value.get('auto_subscribe', true)?.value == true + }; + + const data = value.get('query', true) as unknown; + if (data instanceof Scalar) { + rules.withScalar(data, (q) => { + const [parsed, errors] = syncStreamFromSql(key, q, queryOptions); + rules.bucketSources.push(parsed); + return { + parsed: true, + errors + }; + }); + } else { + rules.errors.push(this.tokenError(data, 'Must be a string.')); + continue; + } + } + const eventMap = parsed.get('event_definitions') as YAMLMap; for (const event of eventMap?.items ?? 
[]) { const { key, value } = event as { key: Scalar; value: YAMLSeq }; diff --git a/packages/sync-rules/src/index.ts b/packages/sync-rules/src/index.ts index cac81ace..18734ff1 100644 --- a/packages/sync-rules/src/index.ts +++ b/packages/sync-rules/src/index.ts @@ -12,10 +12,13 @@ export * from './schema-generators/schema-generators.js'; export * from './SourceTableInterface.js'; export * from './sql_filters.js'; export * from './sql_functions.js'; +export { SqlBucketDescriptor } from './SqlBucketDescriptor.js'; export * from './SqlDataQuery.js'; export * from './SqlParameterQuery.js'; export * from './SqlSyncRules.js'; export * from './StaticSchema.js'; +export { SyncStream } from './streams/stream.js'; +export { syncStreamFromSql } from './streams/from_sql.js'; export * from './TablePattern.js'; export * from './types.js'; export * from './utils.js'; diff --git a/packages/sync-rules/src/json_schema.ts b/packages/sync-rules/src/json_schema.ts index 5d21169d..446db401 100644 --- a/packages/sync-rules/src/json_schema.ts +++ b/packages/sync-rules/src/json_schema.ts @@ -49,6 +49,36 @@ export const syncRulesSchema: ajvModule.Schema = { } } }, + streams: { + type: 'object', + description: 'List of stream definitions', + examples: [{ user_details: { query: 'select * from users where id = auth.user_id()' } }], + patternProperties: { + '.*': { + type: 'object', + required: ['query'], + examples: [{ query: ['select * from mytable'] }], + properties: { + accept_potentially_dangerous_queries: { + description: 'If true, disables warnings on potentially dangerous queries', + type: 'boolean' + }, + auto_subscribe: { + description: 'Whether clients will subscribe to this stream by default.', + type: 'boolean' + }, + priority: { + description: 'Priority for the bucket (lower values indicate higher priority).', + type: 'integer' + }, + query: { + description: 'The SQL query defining content to sync in this stream.', + type: 'string' + } + } + } + } + }, event_definitions: { type: 'object', description: 'Record of sync replication event definitions', @@ -79,7 +109,7 @@ export const syncRulesSchema: ajvModule.Schema = { } } }, - required: ['bucket_definitions'], + required: [], additionalProperties: false } as const; diff --git a/packages/sync-rules/src/request_functions.ts b/packages/sync-rules/src/request_functions.ts index fc840875..1daa9ed5 100644 --- a/packages/sync-rules/src/request_functions.ts +++ b/packages/sync-rules/src/request_functions.ts @@ -1,10 +1,12 @@ import { ExpressionType } from './ExpressionType.js'; +import { jsonExtract, jsonExtractFromRecord } from './sql_functions.js'; import { ParameterValueSet, SqliteValue } from './types.js'; export interface SqlParameterFunction { readonly debugName: string; - call: (parameters: ParameterValueSet) => SqliteValue; + call: (parameters: ParameterValueSet, ...args: SqliteValue[]) => SqliteValue; getReturnType(): ExpressionType; + parameterCount: number; /** request.user_id(), request.jwt(), token_parameters.* */ usesAuthenticatedRequestParameters: boolean; /** request.parameters(), user_parameters.* */ @@ -13,23 +15,83 @@ export interface SqlParameterFunction { documentation: string; } -const request_parameters: SqlParameterFunction = { - debugName: 'request.parameters', - call(parameters: ParameterValueSet) { - return parameters.rawUserParameters; - }, - getReturnType() { - return ExpressionType.TEXT; - }, - detail: 'Unauthenticated request parameters as JSON', - documentation: - 'Returns parameters passed by the client as a JSON string. 
These parameters are not authenticated - any value can be passed in by the client.', - usesAuthenticatedRequestParameters: false, - usesUnauthenticatedRequestParameters: true -}; +/** + * Defines a `parameters` function and a `parameter` function. + * + * The `parameters` function extracts a JSON object from the {@link ParameterValueSet} while the `parameter` function + * takes a second argument (a JSON path or a single key) to extract. + */ +export function parameterFunctions(options: { + schema: string; + extractJsonString: (v: ParameterValueSet) => string; + extractJsonParsed: (v: ParameterValueSet) => any; + sourceDescription: string; + sourceDocumentation: string; + usesAuthenticatedRequestParameters: boolean; + usesUnauthenticatedRequestParameters: boolean; +}) { + const allParameters: SqlParameterFunction = { + debugName: `${options.schema}.parameters`, + parameterCount: 0, + call(parameters: ParameterValueSet) { + return options.extractJsonString(parameters); + }, + getReturnType() { + return ExpressionType.TEXT; + }, + detail: options.sourceDescription, + documentation: `Returns ${options.sourceDocumentation}`, + usesAuthenticatedRequestParameters: options.usesAuthenticatedRequestParameters, + usesUnauthenticatedRequestParameters: options.usesUnauthenticatedRequestParameters + }; + + const extractParameter: SqlParameterFunction = { + debugName: `${options.schema}.parameter`, + parameterCount: 1, + call(parameters: ParameterValueSet, path) { + const parsed = options.extractJsonParsed(parameters); + if (typeof path == 'string') { + if (path.startsWith('$.')) { + return jsonExtractFromRecord(parsed, path, '->>'); + } else { + return parsed[path]; + } + } + + return null; + }, + getReturnType() { + return ExpressionType.ANY; + }, + detail: `Extract value from ${options.sourceDescription}`, + documentation: `Returns an extracted value (via the key as the second argument) from ${options.sourceDocumentation}`, + usesAuthenticatedRequestParameters: options.usesAuthenticatedRequestParameters, + usesUnauthenticatedRequestParameters: options.usesUnauthenticatedRequestParameters + }; + + return { parameters: allParameters, parameter: extractParameter }; +} + +export function globalRequestParameterFunctions(schema: string) { + return parameterFunctions({ + schema, + extractJsonString: function (v: ParameterValueSet): string { + return v.rawUserParameters; + }, + extractJsonParsed: function (v: ParameterValueSet) { + return v.userParameters; + }, + sourceDescription: 'Unauthenticated request parameters as JSON', + sourceDocumentation: + 'parameters passed by the client as a JSON string. 
These parameters are not authenticated - any value can be passed in by the client.', + usesAuthenticatedRequestParameters: false, + usesUnauthenticatedRequestParameters: true + }); +} -const request_jwt: SqlParameterFunction = { +export const request_jwt: SqlParameterFunction = { debugName: 'request.jwt', + parameterCount: 0, call(parameters: ParameterValueSet) { return parameters.rawTokenPayload; }, @@ -42,8 +104,9 @@ const request_jwt: SqlParameterFunction = { usesUnauthenticatedRequestParameters: false }; -const request_user_id: SqlParameterFunction = { +export const request_user_id: SqlParameterFunction = { debugName: 'request.user_id', + parameterCount: 0, call(parameters: ParameterValueSet) { return parameters.userId; }, @@ -56,8 +119,8 @@ const request_user_id: SqlParameterFunction = { usesUnauthenticatedRequestParameters: false }; -export const REQUEST_FUNCTIONS_NAMED = { - parameters: request_parameters, +const REQUEST_FUNCTIONS_NAMED = { + ...globalRequestParameterFunctions('request'), jwt: request_jwt, user_id: request_user_id }; diff --git a/packages/sync-rules/src/sql_filters.ts b/packages/sync-rules/src/sql_filters.ts index c35ce91c..2120ff3f 100644 --- a/packages/sync-rules/src/sql_filters.ts +++ b/packages/sync-rules/src/sql_filters.ts @@ -4,7 +4,7 @@ import { nil } from 'pgsql-ast-parser/src/utils.js'; import { BucketPriority, isValidPriority } from './BucketDescription.js'; import { ExpressionType } from './ExpressionType.js'; import { SqlRuleError } from './errors.js'; -import { REQUEST_FUNCTIONS } from './request_functions.js'; +import { REQUEST_FUNCTIONS, SqlParameterFunction } from './request_functions.js'; import { BASIC_OPERATORS, OPERATOR_IN, @@ -13,6 +13,7 @@ import { OPERATOR_JSON_EXTRACT_JSON, OPERATOR_JSON_EXTRACT_SQL, OPERATOR_NOT, + OPERATOR_OVERLAP, SQL_FUNCTIONS, SqlFunction, castOperator, @@ -46,6 +47,7 @@ import { TrueIfParametersMatch } from './types.js'; import { isJsonValue } from './utils.js'; +import { STREAM_FUNCTIONS } from './streams/functions.js'; export const MATCH_CONST_FALSE: TrueIfParametersMatch = []; export const MATCH_CONST_TRUE: TrueIfParametersMatch = [{}]; @@ -94,6 +96,11 @@ export interface SqlToolsOptions { */ supportsParameterExpressions?: boolean; + /** + * For each schema, all available parameter functions. + */ + parameterFunctions?: Record>; + /** * Schema for validations. */ @@ -113,6 +120,7 @@ export class SqlTools { readonly supportsExpandingParameters: boolean; readonly supportsParameterExpressions: boolean; + readonly parameterFunctions: Record>; schema?: QuerySchema; @@ -131,6 +139,7 @@ export class SqlTools { this.sql = options.sql; this.supportsExpandingParameters = options.supportsExpandingParameters ?? false; this.supportsParameterExpressions = options.supportsParameterExpressions ?? false; + this.parameterFunctions = options.parameterFunctions ?? { request: REQUEST_FUNCTIONS }; } error(message: string, expr: NodeLocation | Expr | undefined): ClauseError { @@ -270,30 +279,7 @@ export class SqlTools { // 1. row value = row value return compileStaticOperator(op, leftFilter as RowValueClause, rightFilter as RowValueClause); } else if (isParameterValueClause(otherFilter)) { - // 2. 
row value = parameter value - const inputParam = basicInputParameter(otherFilter); - - return { - error: false, - inputParameters: [inputParam], - unbounded: false, - filterRow(tables: QueryParameters): TrueIfParametersMatch { - const value = staticFilter.evaluate(tables); - if (value == null) { - // null never matches on = - // Should technically return null, but "false" is sufficient here - return MATCH_CONST_FALSE; - } - if (!isJsonValue(value)) { - // Cannot persist this, e.g. BLOB - return MATCH_CONST_FALSE; - } - - return [{ [inputParam.key]: value }]; - }, - usesAuthenticatedRequestParameters: otherFilter.usesAuthenticatedRequestParameters, - usesUnauthenticatedRequestParameters: otherFilter.usesUnauthenticatedRequestParameters - } satisfies ParameterMatchClause; + return this.parameterMatchClause(staticFilter, otherFilter); } else if (isParameterMatchClause(otherFilter)) { // 3. row value = parameterMatch // (bucket.param = 'something') = staticValue @@ -303,83 +289,7 @@ export class SqlTools { throw new Error('Unexpected'); } } else if (op == 'IN') { - // Special cases: - // parameterValue IN rowValue - // rowValue IN parameterValue - // All others are handled by standard function composition - - const composeType = this.getComposeType(OPERATOR_IN, [leftFilter, rightFilter], [left, right]); - if (composeType.errorClause != null) { - return composeType.errorClause; - } else if (composeType.argsType != null) { - // This is a standard supported configuration, takes precedence over - // the special cases below. - return this.composeFunction(OPERATOR_IN, [leftFilter, rightFilter], [left, right]); - } else if (isParameterValueClause(leftFilter) && isRowValueClause(rightFilter)) { - // token_parameters.value IN table.some_array - // bucket.param IN table.some_array - const inputParam = basicInputParameter(leftFilter); - - return { - error: false, - inputParameters: [inputParam], - unbounded: true, - filterRow(tables: QueryParameters): TrueIfParametersMatch { - const aValue = rightFilter.evaluate(tables); - if (aValue == null) { - return MATCH_CONST_FALSE; - } - const values = JSON.parse(aValue as string); - if (!Array.isArray(values)) { - throw new Error('Not an array'); - } - return values.map((value) => { - return { [inputParam.key]: value }; - }); - }, - usesAuthenticatedRequestParameters: leftFilter.usesAuthenticatedRequestParameters, - usesUnauthenticatedRequestParameters: leftFilter.usesUnauthenticatedRequestParameters - } satisfies ParameterMatchClause; - } else if ( - this.supportsExpandingParameters && - isRowValueClause(leftFilter) && - isParameterValueClause(rightFilter) - ) { - // table.some_value IN token_parameters.some_array - // This expands into "table_some_value = " for each value of the array. - // We only support one such filter per query - const key = `${rightFilter.key}[*]`; - - const inputParam: InputParameter = { - key: key, - expands: true, - filteredRowToLookupValue: (filterParameters) => { - return filterParameters[key]; - }, - parametersToLookupValue: (parameters) => { - return rightFilter.lookupParameterValue(parameters); - } - }; - - return { - error: false, - inputParameters: [inputParam], - unbounded: false, - filterRow(tables: QueryParameters): TrueIfParametersMatch { - const value = leftFilter.evaluate(tables); - if (!isJsonValue(value)) { - // Cannot persist, e.g. 
BLOB - return MATCH_CONST_FALSE; - } - return [{ [inputParam.key]: value }]; - }, - usesAuthenticatedRequestParameters: rightFilter.usesAuthenticatedRequestParameters, - usesUnauthenticatedRequestParameters: rightFilter.usesUnauthenticatedRequestParameters - } satisfies ParameterMatchClause; - } else { - // Not supported, return the error previously computed - return this.error(composeType.error!, composeType.errorExpr); - } + return this.compileInClause(left, leftFilter, right, rightFilter); } else if (BASIC_OPERATORS.has(op)) { const fnImpl = getOperatorFunction(op); return this.composeFunction(fnImpl, [leftFilter, rightFilter], [left, right]); @@ -402,6 +312,7 @@ export class SqlTools { } else if (expr.type == 'call' && expr.function?.name != null) { const schema = expr.function.schema; // schema.function() const fn = expr.function.name; + if (schema == null) { // Just fn() const fnImpl = SQL_FUNCTIONS[fn]; @@ -412,33 +323,50 @@ export class SqlTools { const argClauses = expr.args.map((arg) => this.compileClause(arg)); const composed = this.composeFunction(fnImpl, argClauses, expr.args); return composed; - } else if (schema == 'request') { - // Special function + } else if (schema in this.parameterFunctions) { if (!this.supportsParameterExpressions) { return this.error(`${schema} schema is not available in data queries`, expr); } - if (expr.args.length > 0) { - return this.error(`Function '${schema}.${fn}' does not take arguments`, expr); - } + const impl = this.parameterFunctions[schema][fn]; - if (fn in REQUEST_FUNCTIONS) { - const fnImpl = REQUEST_FUNCTIONS[fn]; + if (impl) { + if (expr.args.length != impl.parameterCount) { + return this.error(`Function '${schema}.${fn}' takes ${impl.parameterCount} arguments.`, expr); + } + + const compiledArguments = expr.args.map(this.compileClause); + let hasInvalidArgument = false; + for (let i = 0; i < expr.args.length; i++) { + const argument = compiledArguments[i]; + + if (!isParameterValueClause(argument)) { + hasInvalidArgument = true; + if (!isClauseError(argument)) { + this.error('Must only depend on data derived from request.', expr.args[i]); + } + } + } + + if (hasInvalidArgument) { + return { error: true }; + } + + const parameterArguments = compiledArguments as ParameterValueClause[]; return { - key: 'request.parameters()', + key: `${schema}.${fn}(${parameterArguments.map((p) => p.key).join(',')})`, lookupParameterValue(parameters) { - return fnImpl.call(parameters); + const evaluatedArgs = parameterArguments.map((p) => p.lookupParameterValue(parameters)); + return impl.call(parameters, ...evaluatedArgs); }, - usesAuthenticatedRequestParameters: fnImpl.usesAuthenticatedRequestParameters, - usesUnauthenticatedRequestParameters: fnImpl.usesUnauthenticatedRequestParameters + usesAuthenticatedRequestParameters: impl.usesAuthenticatedRequestParameters, + usesUnauthenticatedRequestParameters: impl.usesUnauthenticatedRequestParameters } satisfies ParameterValueClause; - } else { - return this.error(`Function '${schema}.${fn}' is not defined`, expr); } - } else { - // Unknown function with schema - return this.error(`Function '${schema}.${fn}' is not defined`, expr); } + + // Unknown function with schema + return this.error(`Function '${schema}.${fn}' is not defined`, expr); } else if (expr.type == 'member') { const operand = this.compileClause(expr.operand); @@ -466,6 +394,204 @@ export class SqlTools { } } + compileInClause(left: Expr, leftFilter: CompiledClause, right: Expr, rightFilter: CompiledClause): CompiledClause { + // Special 
cases: + // parameterValue IN rowValue + // rowValue IN parameterValue + // All others are handled by standard function composition + + const composeType = this.getComposeType(OPERATOR_IN, [leftFilter, rightFilter], [left, right]); + if (composeType.errorClause != null) { + return composeType.errorClause; + } else if (composeType.argsType != null) { + // This is a standard supported configuration, takes precedence over + // the special cases below. + return this.composeFunction(OPERATOR_IN, [leftFilter, rightFilter], [left, right]); + } else if (isParameterValueClause(leftFilter) && isRowValueClause(rightFilter)) { + // token_parameters.value IN table.some_array + // bucket.param IN table.some_array + const inputParam = basicInputParameter(leftFilter); + + return { + error: false, + inputParameters: [inputParam], + unbounded: true, + filterRow(tables: QueryParameters): TrueIfParametersMatch { + const aValue = rightFilter.evaluate(tables); + if (aValue == null) { + return MATCH_CONST_FALSE; + } + const values = JSON.parse(aValue as string); + if (!Array.isArray(values)) { + throw new Error('Not an array'); + } + return values.map((value) => { + return { [inputParam.key]: value }; + }); + }, + usesAuthenticatedRequestParameters: leftFilter.usesAuthenticatedRequestParameters, + usesUnauthenticatedRequestParameters: leftFilter.usesUnauthenticatedRequestParameters + } satisfies ParameterMatchClause; + } else if ( + this.supportsExpandingParameters && + isRowValueClause(leftFilter) && + isParameterValueClause(rightFilter) + ) { + // table.some_value IN token_parameters.some_array + // This expands into "table_some_value = " for each value of the array. + // We only support one such filter per query + const key = `${rightFilter.key}[*]`; + + const inputParam: InputParameter = { + key: key, + expands: true, + filteredRowToLookupValue: (filterParameters) => { + return filterParameters[key]; + }, + parametersToLookupValue: (parameters) => { + return rightFilter.lookupParameterValue(parameters); + } + }; + + return { + error: false, + inputParameters: [inputParam], + unbounded: false, + filterRow(tables: QueryParameters): TrueIfParametersMatch { + const value = leftFilter.evaluate(tables); + if (!isJsonValue(value)) { + // Cannot persist, e.g. BLOB + return MATCH_CONST_FALSE; + } + return [{ [inputParam.key]: value }]; + }, + usesAuthenticatedRequestParameters: rightFilter.usesAuthenticatedRequestParameters, + usesUnauthenticatedRequestParameters: rightFilter.usesUnauthenticatedRequestParameters + } satisfies ParameterMatchClause; + } else { + // Not supported, return the error previously computed + return this.error(composeType.error!, composeType.errorExpr); + } + } + + compileOverlapClause( + left: Expr, + leftFilter: CompiledClause, + right: Expr, + rightFilter: CompiledClause + ): CompiledClause { + // Special cases: + // parameterValue IN rowValue + // rowValue IN parameterValue + // All others are handled by standard function composition + + const composeType = this.getComposeType(OPERATOR_OVERLAP, [leftFilter, rightFilter], [left, right]); + if (composeType.errorClause != null) { + return composeType.errorClause; + } else if (composeType.argsType != null) { + // This is a standard supported configuration, takes precedence over + // the special cases below. 
+ return this.composeFunction(OPERATOR_OVERLAP, [leftFilter, rightFilter], [left, right]); + } else if (isParameterValueClause(leftFilter) && isRowValueClause(rightFilter)) { + // token_parameters.value IN table.some_array + // bucket.param IN table.some_array + const inputParam = basicInputParameter(leftFilter); + + return { + error: false, + inputParameters: [inputParam], + unbounded: true, + filterRow(tables: QueryParameters): TrueIfParametersMatch { + const aValue = rightFilter.evaluate(tables); + if (aValue == null) { + return MATCH_CONST_FALSE; + } + const values = JSON.parse(aValue as string); + if (!Array.isArray(values)) { + throw new Error('Not an array'); + } + return values.map((value) => { + return { [inputParam.key]: value }; + }); + }, + usesAuthenticatedRequestParameters: leftFilter.usesAuthenticatedRequestParameters, + usesUnauthenticatedRequestParameters: leftFilter.usesUnauthenticatedRequestParameters + } satisfies ParameterMatchClause; + } else if ( + this.supportsExpandingParameters && + isRowValueClause(leftFilter) && + isParameterValueClause(rightFilter) + ) { + // table.some_value && token_parameters.some_array + // This expands into "OR(table_some_value = )" for each value of both arrays. + // We only support one such filter per query + const key = `${rightFilter.key}[*]`; + + const inputParam: InputParameter = { + key: key, + expands: true, + filteredRowToLookupValue: (filterParameters) => { + return filterParameters[key]; + }, + parametersToLookupValue: (parameters) => { + return rightFilter.lookupParameterValue(parameters); + } + }; + + return { + error: false, + inputParameters: [inputParam], + unbounded: false, + filterRow(tables: QueryParameters): TrueIfParametersMatch { + const value = leftFilter.evaluate(tables); + if (!isJsonValue(value)) { + // Cannot persist, e.g. BLOB + return MATCH_CONST_FALSE; + } + + const values = JSON.parse(value as string); + if (!Array.isArray(values)) { + throw new Error('Not an array'); + } + return values.map((value) => { + return { [inputParam.key]: value }; + }); + }, + usesAuthenticatedRequestParameters: rightFilter.usesAuthenticatedRequestParameters, + usesUnauthenticatedRequestParameters: rightFilter.usesUnauthenticatedRequestParameters + } satisfies ParameterMatchClause; + } else { + // Not supported, return the error previously computed + return this.error(composeType.error!, composeType.errorExpr); + } + } + + parameterMatchClause(staticFilter: RowValueClause, otherFilter: ParameterValueClause) { + const inputParam = basicInputParameter(otherFilter); + + return { + error: false, + inputParameters: [inputParam], + unbounded: false, + filterRow(tables: QueryParameters): TrueIfParametersMatch { + const value = staticFilter.evaluate(tables); + if (value == null) { + // null never matches on = + // Should technically return null, but "false" is sufficient here + return MATCH_CONST_FALSE; + } + if (!isJsonValue(value)) { + // Cannot persist this, e.g. BLOB + return MATCH_CONST_FALSE; + } + + return [{ [inputParam.key]: value }]; + }, + usesAuthenticatedRequestParameters: otherFilter.usesAuthenticatedRequestParameters, + usesUnauthenticatedRequestParameters: otherFilter.usesUnauthenticatedRequestParameters + } satisfies ParameterMatchClause; + } + /** * "some_column" => "some_column" * "table.some_column" => "some_column". 
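The changes above and below wire up the new `streams` section of the sync-rules YAML. For reference, a minimal definition accepted by the schema added in json_schema.ts looks roughly like the following sketch (the stream name and query are taken from the schema's own examples; the remaining keys are optional and their values illustrative):

streams:
  user_details:
    query: select * from users where id = auth.user_id()
    auto_subscribe: true
    priority: 1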
diff --git a/packages/sync-rules/src/sql_functions.ts b/packages/sync-rules/src/sql_functions.ts index 340139cd..962e3111 100644 --- a/packages/sync-rules/src/sql_functions.ts +++ b/packages/sync-rules/src/sql_functions.ts @@ -1,6 +1,6 @@ import { JSONBig } from '@powersync/service-jsonbig'; import { SQLITE_FALSE, SQLITE_TRUE, sqliteBool, sqliteNot } from './sql_support.js'; -import { SqliteValue } from './types.js'; +import { SqliteRow, SqliteValue } from './types.js'; import { jsonValueToSqlite } from './utils.js'; // Declares @syncpoint/wkx module // This allows for consumers of this lib to resolve types correctly @@ -743,23 +743,47 @@ export function evaluateOperator(op: string, a: SqliteValue, b: SqliteValue): Sq return sqliteBool(sqliteBool(a) && sqliteBool(b)); case 'OR': return sqliteBool(sqliteBool(a) || sqliteBool(b)); - case 'IN': + case 'IN': { if (a == null || b == null) { return null; } - if (typeof b != 'string') { - throw new Error('IN is only supported on JSON arrays'); + const bParsed = checkJsonArray(b, 'IN is only supported on JSON arrays'); + return sqliteBool(bParsed.includes(a)); + } + case '&&': { + // a && b evaluates to true iff they're both arrays and have a non-empty intersection. + if (a == null || b == null) { + return null; } - const bParsed = JSON.parse(b); - if (!Array.isArray(bParsed)) { - throw new Error('IN is only supported on JSON arrays'); + + const aParsed = checkJsonArray(a, '&& is only supported on JSON arrays'); + const bParsed = checkJsonArray(b, '&& is only supported on JSON arrays'); + + for (const elementInA of aParsed) { + if (bParsed.includes(elementInA)) { + return sqliteBool(true); + } } - return sqliteBool(bParsed.includes(a)); + return sqliteBool(false); + } default: throw new Error(`Operator not supported: ${op}`); } } +export function checkJsonArray(value: SqliteValue, errorMessage: string): any[] { + if (typeof value != 'string') { + throw new Error(errorMessage); + } + + const parsed = JSON.parse(value); + if (!Array.isArray(parsed)) { + throw new Error(errorMessage); + } + + return parsed; +} + export function getOperatorReturnType(op: string, left: ExpressionType, right: ExpressionType) { switch (op) { case '=': @@ -799,6 +823,7 @@ export function getOperatorReturnType(op: string, left: ExpressionType, right: E case 'OR': return ExpressionType.INTEGER; case 'IN': + case '&&': return ExpressionType.INTEGER; default: return ExpressionType.NONE; @@ -845,8 +870,17 @@ function concat(a: SqliteValue, b: SqliteValue): string | null { export function jsonExtract(sourceValue: SqliteValue, path: SqliteValue, operator: string) { const valueText = castAsText(sourceValue); + if (valueText == null || path == null) { + return null; + } + + let value = JSONBig.parse(valueText) as any; + return jsonExtractFromRecord(value, path, operator); +} + +export function jsonExtractFromRecord(value: any, path: SqliteValue, operator: string) { const pathText = castAsText(path); - if (valueText == null || pathText == null) { + if (value == null || pathText == null) { return null; } @@ -857,7 +891,6 @@ export function jsonExtract(sourceValue: SqliteValue, path: SqliteValue, operato throw new Error(`JSON path must start with $.`); } - let value = JSONBig.parse(valueText) as any; for (let c of components) { if (value == null) { break; @@ -928,6 +961,8 @@ export const OPERATOR_NOT: SqlFunction = { export const OPERATOR_IN = getOperatorFunction('IN'); +export const OPERATOR_OVERLAP = getOperatorFunction('&&'); + export function castOperator(castTo: string | undefined):
SqlFunction | null { if (castTo == null || !CAST_TYPES.has(castTo)) { return null; diff --git a/packages/sync-rules/src/sql_support.ts b/packages/sync-rules/src/sql_support.ts index b64aacd4..01e638d8 100644 --- a/packages/sync-rules/src/sql_support.ts +++ b/packages/sync-rules/src/sql_support.ts @@ -1,6 +1,6 @@ import { SelectFromStatement } from 'pgsql-ast-parser'; import { SqlRuleError } from './errors.js'; -import { ExpressionType } from './ExpressionType.js'; +import { ColumnDefinition, ExpressionType } from './ExpressionType.js'; import { MATCH_CONST_FALSE, MATCH_CONST_TRUE } from './sql_filters.js'; import { evaluateOperator, getOperatorReturnType } from './sql_functions.js'; import { @@ -10,7 +10,9 @@ import { InputParameter, ParameterMatchClause, ParameterValueClause, + ParameterValueSet, QueryParameters, + QuerySchema, RowValueClause, SqliteValue, StaticValueClause, @@ -59,6 +61,58 @@ export function sqliteNot(value: SqliteValue | boolean) { return sqliteBool(!sqliteBool(value)); } +/** + * Applies a combinator on row values that itself is also a row value. + */ +export function composeRowValues>(options: { + values: T; + compose: (values: { [K in keyof T]: SqliteValue }) => SqliteValue; + getColumnDefinition: RowValueClause['getColumnDefinition']; +}): RowValueClause { + return { + evaluate: function (tables: QueryParameters): SqliteValue { + const evaluated = Object.fromEntries( + Object.entries(options.values).map((e) => { + const [key, clause] = e; + return [key, clause.evaluate(tables)]; + }) + ) as { [K in keyof T]: SqliteValue }; + + return options.compose(evaluated); + }, + getColumnDefinition: function (schema: QuerySchema): ColumnDefinition | undefined { + return options.getColumnDefinition(schema); + } + }; +} + +/** + * Applies a combinator on parameter values that itself is also a parameter value. + */ +export function composeParameterValues>(options: { + values: T; + key: string; + compose: (values: { [K in keyof T]: SqliteValue }) => SqliteValue; +}): ParameterValueClause { + const entries = Object.entries(options.values); + + return { + usesAuthenticatedRequestParameters: entries.some((e) => e[1].usesAuthenticatedRequestParameters), + usesUnauthenticatedRequestParameters: entries.some((e) => e[1].usesUnauthenticatedRequestParameters), + key: `${options.key}${entries.map((e) => e[1].key).join(',')}`, + lookupParameterValue: function (parameters: ParameterValueSet): SqliteValue { + const evaluated = Object.fromEntries( + Object.entries(options.values).map((e) => { + const [key, clause] = e; + return [key, clause.lookupParameterValue(parameters)]; + }) + ) as { [K in keyof T]: SqliteValue }; + + return options.compose(evaluated); + } + }; +} + export function compileStaticOperator(op: string, left: RowValueClause, right: RowValueClause): RowValueClause { return { evaluate: (tables) => { @@ -79,18 +133,27 @@ export function compileStaticOperator(op: string, left: RowValueClause, right: R } export function andFilters(a: CompiledClause, b: CompiledClause): CompiledClause { + // Optimizations: If the two clauses both only depend on row or parameter data, we can merge them into a single + // clause. 
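+  // If both sides are row clauses they are merged via composeRowValues, and two parameter clauses are merged via composeParameterValues; any other combination falls through to toBooleanParameterSetClause below.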
if (isRowValueClause(a) && isRowValueClause(b)) { - // Optimization - return { - evaluate(tables: QueryParameters): SqliteValue { - const aValue = sqliteBool(a.evaluate(tables)); - const bValue = sqliteBool(b.evaluate(tables)); - return sqliteBool(aValue && bValue); + return composeRowValues({ + values: { a, b }, + compose(values) { + return sqliteBool(sqliteBool(values.a) && sqliteBool(values.b)); }, getColumnDefinition() { return { name: 'and', type: ExpressionType.INTEGER }; } - } satisfies RowValueClause; + }); + } + if (isParameterValueClause(a) && isParameterValueClause(b)) { + return composeParameterValues({ + values: { a, b }, + key: 'and', + compose(values) { + return sqliteBool(sqliteBool(values.a) && sqliteBool(values.b)); + } + }); } const aFilter = toBooleanParameterSetClause(a); @@ -140,18 +203,27 @@ export function andFilters(a: CompiledClause, b: CompiledClause): CompiledClause } export function orFilters(a: CompiledClause, b: CompiledClause): CompiledClause { + // Optimizations: If the two clauses both only depend on row or parameter data, we can merge them into a single + // clause. if (isRowValueClause(a) && isRowValueClause(b)) { - // Optimization - return { - evaluate(tables: QueryParameters): SqliteValue { - const aValue = sqliteBool(a.evaluate(tables)); - const bValue = sqliteBool(b.evaluate(tables)); - return sqliteBool(aValue || bValue); + return composeRowValues({ + values: { a, b }, + compose(values) { + return sqliteBool(sqliteBool(values.a) || sqliteBool(values.b)); }, getColumnDefinition() { return { name: 'or', type: ExpressionType.INTEGER }; } - } satisfies RowValueClause; + }); + } + if (isParameterValueClause(a) && isParameterValueClause(b)) { + return composeParameterValues({ + values: { a, b }, + key: 'or', + compose(values) { + return sqliteBool(sqliteBool(values.a) || sqliteBool(values.b)); + } + }); } const aFilter = toBooleanParameterSetClause(a); diff --git a/packages/sync-rules/src/streams/filter.ts b/packages/sync-rules/src/streams/filter.ts new file mode 100644 index 00000000..05c6e34f --- /dev/null +++ b/packages/sync-rules/src/streams/filter.ts @@ -0,0 +1,512 @@ +import { isParameterValueClause, isRowValueClause, SQLITE_TRUE, sqliteBool } from '../sql_support.js'; +import { TablePattern } from '../TablePattern.js'; +import { ParameterMatchClause, ParameterValueClause, RowValueClause, SqliteJsonValue } from '../types.js'; +import { isJsonValue, normalizeParameterValue } from '../utils.js'; +import { SqlTools } from '../sql_filters.js'; +import { checkJsonArray, OPERATOR_NOT } from '../sql_functions.js'; +import { ParameterLookup } from '../BucketParameterQuerier.js'; + +import { StreamVariant } from './variant.js'; +import { SubqueryEvaluator } from './parameter.js'; +import { cartesianProduct } from './utils.js'; +import { NodeLocation } from 'pgsql-ast-parser'; + +/** + * An intermediate representation of a `WHERE` clause for stream queries. + * + * This representation is only used to compile expressions into the {@link StreamVariant}s representing streams in the + * end. + */ +export abstract class FilterOperator { + /** + * The syntactic source creating this filter operator. 
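+   * Used when reporting errors, for example when {@link isValid} rejects an unsupported negation.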
+ */ + readonly location: NodeLocation | null; + + constructor(location: NodeLocation | null) { + this.location = location; + } + + abstract compile(context: StreamCompilationContext): void; + + compileVariants(streamName: string): StreamVariant[] { + const initialVariant = new StreamVariant(0); + const allVariants: StreamVariant[] = [initialVariant]; + + this.compile({ streamName, currentVariant: initialVariant, allVariants, queryCounter: 0 }); + return allVariants; + } + + /** + * Transforms this filter operator to a DNF representation, a {@link Or} operator consisting of {@link And} operators + * internally. + * + * We need to represent filters as a sum-of-products because that lets us turn each product into a + * {@link StreamVariant}, simplifying the rest of the compilation. + */ + toDisjunctiveNormalForm(tools: SqlTools): FilterOperator { + // https://en.wikipedia.org/wiki/Disjunctive_normal_form#..._by_syntactic_means + if (this instanceof Not) { + const inner = this.operand; + if (inner instanceof Not) { + // !!x => x + return inner.operand.toDisjunctiveNormalForm(tools); + } + + if (inner instanceof EvaluateSimpleCondition) { + // We can push the NOT into the simple condition to eliminate it. + return inner.negate(tools); + } + + let inverse; + if (inner instanceof And) { + inverse = Or; // !(a AND b) => (!a) OR (!b) + } else if (inner instanceof Or) { + inverse = And; // !(a OR B) => (!a) AND (!b) + } else { + return this; + } + + const terms = inner.inner.map((e) => new Not(e.location, e).toDisjunctiveNormalForm(tools)); + return new inverse(inner.location, ...terms); + } else if (this instanceof And) { + const atomarFactors: FilterOperator[] = []; + const orTerms: Or[] = []; + + for (const originalTerm of this.inner) { + const normalized = originalTerm.toDisjunctiveNormalForm(tools); + if (normalized instanceof And) { + atomarFactors.push(...normalized.inner); + } else if (normalized instanceof Or) { + orTerms.push(normalized); + } else { + atomarFactors.push(normalized); + } + } + + if (orTerms.length == 0) { + return new And(this.location, ...atomarFactors); + } + + // If there's an OR term within the AND, apply the distributive law to turn the term into an ANDs within an outer + // OR. + // First, apply distributive law on orTerms to turn e.g. [[a, b], [c, d]] into [ac, ad, bc, bd]. + const multiplied = [...cartesianProduct(...orTerms.map((e) => e.inner))]; + + // Then, combine those with the inner AND to turn e.g `A & (B | C) & D` into `(B & A & D) | (C & A & D)`. + return new Or( + this.location, + ...multiplied.map((distributedTerms) => new And(this.location, ...distributedTerms, ...atomarFactors)) + ); + } else if (this instanceof Or) { + // Already in DNF, but we want to simplify `(A OR B) OR C` into `A OR B OR C` if necessary. + return new Or( + this.location, + ...this.inner.flatMap((e) => { + const normalized = e.toDisjunctiveNormalForm(tools); + return normalized instanceof Or ? normalized.inner : [normalized]; + }) + ); + } + + return this; + } + + /** + * Checks that this filter is valid, meaning that no `NOT` operators appear before subqueries or other operators that + * don't support negations. + */ + isValid(tools: SqlTools): boolean { + let hasError = false; + + for (const literal of this.visitLiterals()) { + if (literal instanceof Not) { + tools.error('Negations are not allowed here', literal.location ?? 
undefined); + hasError = true; + } + } + + return !hasError; + } + + private *visitLiterals(): Generator { + if (this instanceof Or || this instanceof And) { + for (const term of this.inner) { + yield* term.visitLiterals(); + } + } else { + yield this; + } + } +} + +interface StreamCompilationContext { + streamName: string; + allVariants: StreamVariant[]; + currentVariant: StreamVariant; + queryCounter: number; +} + +/** + * A `NOT` operator that inverts an inner operator. + * + * This is only used temporarily when compiling the filter: It is illegal to negate {@link InOperator} and + * {@link CompareRowValueWithStreamParameter} clauses, as those are implemented with special lookups based on bucket + * parameters. + * For {@link EvaluateSimpleCondition}, a wrapping {@link Not} clause is unecessary, because we can simply push the + * negation into the inner evaluator. + */ +export class Not extends FilterOperator { + readonly operand: FilterOperator; + + constructor(location: NodeLocation | null, operand: FilterOperator) { + super(location); + this.operand = operand; + } + + compile(): void { + throw Error('Not operator should have been desugared before compilation.'); + } +} + +export class And extends FilterOperator { + readonly inner: FilterOperator[]; + + constructor(location: NodeLocation | null, ...operators: FilterOperator[]) { + super(location); + this.inner = operators; + } + + compile(context: StreamCompilationContext) { + // Within a variant, added conditions and parameters form a conjunction. + for (const condition of this.inner) { + condition.compile(context); + } + } +} + +export class Or extends FilterOperator { + readonly inner: FilterOperator[]; + + constructor(location: NodeLocation | null, ...operators: FilterOperator[]) { + super(location); + this.inner = operators; + } + + compile(context: StreamCompilationContext): void { + throw Error('An OR operator can only appear at the highest level of the filter chain'); + } + + compileVariants(streamName: string): StreamVariant[] { + const allVariants: StreamVariant[] = []; + const context = { + streamName, + currentVariant: undefined as StreamVariant | undefined, + allVariants, + queryCounter: 0 + }; + + for (const condition of this.inner) { + const variant = new StreamVariant(allVariants.length); + allVariants.push(variant); + + context.currentVariant = variant; + condition.compile(context as StreamCompilationContext); + } + + return allVariants; + } +} + +export class Subquery { + private table: TablePattern; + readonly column: RowValueClause; + private filter: FilterOperator; + + constructor( + table: TablePattern, + column: RowValueClause, + filter: CompareRowValueWithStreamParameter | EvaluateSimpleCondition + ) { + this.table = table; + this.column = column; + this.filter = filter; + } + + addFilter(filter: CompareRowValueWithStreamParameter | EvaluateSimpleCondition) { + this.filter = new And(this.filter.location, this.filter, filter); + } + + compileEvaluator(context: StreamCompilationContext): SubqueryEvaluator { + const innerVariants = this.filter.compileVariants(context.streamName).map((variant) => { + const id = context.queryCounter.toString(); + context.queryCounter++; + + if (variant.parameters.length == 0) { + throw new Error('Unsupported subquery without parameter, must depend on request parameters'); + } + + return [variant, id] satisfies [StreamVariant, string]; + }); + + const column = this.column; + + const evaluator: SubqueryEvaluator = { + parameterTable: this.table, + lookupsForParameterRow(sourceTable, 
row) { + const value = column.evaluate({ [sourceTable.name]: row }); + if (!isJsonValue(value)) { + return null; + } + + const lookups: ParameterLookup[] = []; + for (const [variant, id] of innerVariants) { + for (const instantiation of variant.instantiationsForRow({ sourceTable, record: row })) { + lookups.push(ParameterLookup.normalized(context.streamName, id, instantiation)); + } + } + return { value, lookups }; + }, + lookupsForRequest(parameters) { + const lookups: ParameterLookup[] = []; + + for (const [variant, id] of innerVariants) { + const instantiations = variant.findStaticInstantiations(parameters); + for (const instantiation of instantiations) { + lookups.push(ParameterLookup.normalized(context.streamName, id, instantiation)); + } + } + + return lookups; + } + }; + + context.currentVariant.subqueries.push(evaluator); + return evaluator; + } +} + +export type ScalarExpression = RowValueClause | ParameterValueClause; + +/** + * A filter that matches when row values are contained in a subquery depending on parameter values. + * + * Examples: + * + * - `SELECT * FROM comments WHERE issue_id IN (SELECT id FROM issue WHERE owner_id = request.user())` + * + * An `IN` operator can also be handled without requiring a subquery, those don't create {@link InOperator}s: + * + * - `SELECT * FROM comments WHERE id IN request.params('id')` ({@link CompareRowValueWithStreamParameter}) + * - `SELECT * FROM comments WHERE request.user() IN comments.tagged_users` ({@link CompareRowValueWithStreamParameter}) + * + * Finally, it's also possible to `IN` operators for parameter values: + * + * - `SELECT * FROM comments WHERE request.user_id() IN (SELECT * FROM users WHERE is_admin)`. + * + * However, these are not represented as {@link InOperator}s in the filter graph. Instead, we push the + * `request.user_id()` filter into the subquery and then compile the operator into a {@link ExistsOperator}. + */ +export class InOperator extends FilterOperator { + private left: RowValueClause; + private right: Subquery; + + constructor(location: NodeLocation | null, left: RowValueClause, right: Subquery) { + super(location); + this.left = left; + this.right = right; + } + + compile(context: StreamCompilationContext): void { + const subqueryEvaluator = this.right.compileEvaluator(context); + const filter = this.left; + + // Something like `SELECT * FROM comments WHERE issue_id IN (SELECT id FROM issue WHERE owner_id = request.user())` + // This groups rows into buckets identified by comments.issue_id, which happens in filterRow. + // When a user connects, we need to resolve all the issue ids they own. This happens with an indirection: + // 1. In the subquery evaluator, we create an index from owner_id to issue ids. + // 2. When we have users, we use that index to find issue ids dynamically, with which we can build the buckets + // to sync. + context.currentVariant.parameters.push({ + lookup: { + type: 'in', + subquery: subqueryEvaluator + }, + filterRow(options) { + const tables = { [options.sourceTable.name]: options.record }; + const value = filter.evaluate(tables); + if (isJsonValue(value)) { + return [normalizeParameterValue(value)]; + } else { + return []; + } + } + }); + } +} + +/** + * An operator of the form ` && `, where `right` is a subqery. 
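+ * + * For example (table and column names illustrative): `SELECT * FROM posts WHERE tag_ids && (SELECT tag_id FROM user_tags WHERE user_id = request.user_id())`, where `tag_ids` must evaluate to a JSON array for each row.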
+ */ +export class OverlapOperator extends FilterOperator { + private left: RowValueClause; + private right: Subquery; + + constructor(location: NodeLocation | null, left: RowValueClause, right: Subquery) { + super(location); + this.left = left; + this.right = right; + } + + compile(context: StreamCompilationContext): void { + const subqueryEvaluator = this.right.compileEvaluator(context); + const filter = this.left; + + context.currentVariant.parameters.push({ + lookup: { + type: 'overlap', + subquery: subqueryEvaluator + }, + filterRow(options) { + const tables = { [options.sourceTable.name]: options.record }; + const value = filter.evaluate(tables); + if (value == null) { + return []; + } + + const parsed = checkJsonArray(value, 'Left side of && must evaluate to an array'); + return parsed.map(normalizeParameterValue); + } + }); + } +} + +/** + * A filter that matches when _something_ exists in a subquery. + * + * This is exclusively used to compile `IN` operators where the left operand does not depend on row values, e.g. + * + * - `SELECT * FROM comments WHERE request.user_id() IN (SELECT id FROM users WHERE is_admin)` + * + * These queries are desugared to something that is semantically equivalent to + * `WHERE EXISTS (SELECT _ FROM users WHERE is_admin AND id = request.user_id())`. + */ +export class ExistsOperator extends FilterOperator { + private subquery: Subquery; + + constructor(location: NodeLocation | null, subquery: Subquery) { + super(location); + this.subquery = subquery; + } + + compile(context: StreamCompilationContext): void { + const subqueryEvaluator = this.subquery.compileEvaluator(context); + + context.currentVariant.requestFilters.push({ + type: 'dynamic', + subquery: subqueryEvaluator, + matches(_, results) { + return results.length > 0; + } + }); + } +} + +/** + * A filter that matches if a column of the input table matches some condition that depends on the request parameters. + * + * E.g. `SELECT * FROM issue_id WHERE owner_id = request.user_id()`. This will add the referenced column as a bucket + * parameter, but doesn't require a dynamic lookup. + * + * We only allow a few operators that are efficient to compute here (effectively just an equality operator). + */ +export class CompareRowValueWithStreamParameter extends FilterOperator { + private match: ParameterMatchClause; + + constructor(location: NodeLocation | null, match: ParameterMatchClause) { + super(location); + this.match = match; + } + + compile(context: StreamCompilationContext): void { + const match = this.match; + + for (const filters of match.inputParameters) { + context.currentVariant.parameters.push({ + lookup: { + type: 'static', + fromRequest(parameters) { + const value = filters.parametersToLookupValue(parameters); + if (filters.expands) { + if (typeof value != 'string') { + return []; + } + let values: SqliteJsonValue[] = JSON.parse(value); + if (!Array.isArray(values)) { + return []; + } + + return values; + } else { + return [value]; + } + } + }, + filterRow(options) { + const tables = { [options.sourceTable.name]: options.record }; + const matchingValues: SqliteJsonValue[] = []; + + for (const matchingParameters of match.filterRow(tables)) { + const matchingValue = matchingParameters[filters.key]; + if (matchingValue != null) { + matchingValues.push(normalizeParameterValue(matchingValue)); + } + } + + return matchingValues; + } + }); + } + } +} + +/** + * A simple condition that is either static, only depends on the current row being matched, or only depends on the input + * parameters. 
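+ * + * For example (column name illustrative), `WHERE archived = 0` only depends on the row being evaluated, while `WHERE auth.user_id() IS NOT NULL` only depends on the request.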
+ */ +export class EvaluateSimpleCondition extends FilterOperator { + expression: ScalarExpression; + + constructor(location: NodeLocation | null, expression: ScalarExpression) { + super(location); + this.expression = expression; + } + + compile(context: StreamCompilationContext): void { + if (isRowValueClause(this.expression)) { + const filter = this.expression; + + context.currentVariant.additionalRowFilters.push((row) => { + return sqliteBool(filter.evaluate({ [row.sourceTable.name]: row.record })) == SQLITE_TRUE; + }); + } else if (isParameterValueClause(this.expression)) { + const filter = this.expression; + + context.currentVariant.requestFilters.push({ + type: 'static', + matches(params) { + return sqliteBool(filter.lookupParameterValue(params)) == SQLITE_TRUE; + } + }); + } else { + const _: never = this.expression; // Exhaustive check + } + } + + negate(tools: SqlTools): EvaluateSimpleCondition { + return new EvaluateSimpleCondition( + this.location, + tools.composeFunction(OPERATOR_NOT, [this.expression], []) as ScalarExpression + ); + } +} diff --git a/packages/sync-rules/src/streams/from_sql.ts b/packages/sync-rules/src/streams/from_sql.ts new file mode 100644 index 00000000..bf9717f4 --- /dev/null +++ b/packages/sync-rules/src/streams/from_sql.ts @@ -0,0 +1,460 @@ +import { SqlRuleError } from '../errors.js'; +import { CompiledClause, QuerySchema, StaticValueClause, StreamParseOptions } from '../types.js'; +import { isSelectStatement } from '../utils.js'; +import { + andFilters, + checkUnsupportedFeatures, + isClauseError, + isParameterMatchClause, + isParameterValueClause, + isRowValueClause, + isStaticValueClause, + orFilters +} from '../sql_support.js'; +import { TablePattern } from '../TablePattern.js'; +import { TableQuerySchema } from '../TableQuerySchema.js'; +import { SqlTools } from '../sql_filters.js'; +import { BaseSqlDataQuery, BaseSqlDataQueryOptions, RowValueExtractor } from '../BaseSqlDataQuery.js'; +import { ExpressionType } from '../ExpressionType.js'; +import { SyncStream } from './stream.js'; +import { + And, + CompareRowValueWithStreamParameter, + EvaluateSimpleCondition, + ExistsOperator, + FilterOperator, + InOperator, + Not, + Or, + OverlapOperator, + ScalarExpression, + Subquery +} from './filter.js'; +import { + Expr, + ExprBinary, + nil, + NodeLocation, + parse, + SelectFromStatement, + SelectStatement, + Statement +} from 'pgsql-ast-parser'; +import { STREAM_FUNCTIONS } from './functions.js'; + +export function syncStreamFromSql( + descriptorName: string, + sql: string, + options: StreamParseOptions +): [SyncStream, SqlRuleError[]] { + const compiler = new SyncStreamCompiler(descriptorName, sql, options); + return [compiler.compile(), compiler.errors]; +} + +class SyncStreamCompiler { + descriptorName: string; + sql: string; + options: StreamParseOptions; + + errors: SqlRuleError[]; + + constructor(descriptorName: string, sql: string, options: StreamParseOptions) { + this.descriptorName = descriptorName; + this.sql = sql; + this.options = options; + this.errors = []; + } + + compile(): SyncStream { + const [stmt, ...illegalRest] = parse(this.sql, { locationTracking: true }); + + // TODO: Share more of this code with SqlDataQuery + if (illegalRest.length > 0) { + throw new SqlRuleError('Only a single SELECT statement is supported', this.sql, illegalRest[0]?._location); + } + + const { query, tableRef, alias, querySchema, sourceTable } = this.checkValidSelectStatement(stmt); + + const tools = new SqlTools({ + table: alias, + parameterTables: [], + 
valueTables: [alias], + sql: this.sql, + schema: querySchema, + parameterFunctions: STREAM_FUNCTIONS, + supportsParameterExpressions: true, + supportsExpandingParameters: true // needed for table.column IN (subscription.parameters() -> ...) + }); + tools.checkSpecificNameCase(tableRef); + let filter = this.whereClauseToFilters(tools, query.where); + filter = filter.toDisjunctiveNormalForm(tools); + + const stream = new SyncStream( + this.descriptorName, + new BaseSqlDataQuery(this.compileDataQuery(tools, query, alias, sourceTable)) + ); + stream.subscribedToByDefault = this.options.auto_subscribe ?? false; + if (filter.isValid(tools)) { + stream.variants = filter.compileVariants(this.descriptorName); + } + + this.errors.push(...tools.errors); + return stream; + } + + private compileDataQuery( + tools: SqlTools, + query: SelectFromStatement, + alias: string, + sourceTable: TablePattern + ): BaseSqlDataQueryOptions { + let hasId = false; + let hasWildcard = false; + let extractors: RowValueExtractor[] = []; + const querySchema = tools.schema; + + for (let column of query.columns ?? []) { + const name = tools.getOutputName(column); + if (name != '*') { + const clause = tools.compileRowValueExtractor(column.expr); + if (isClauseError(clause)) { + // Error logged already + continue; + } + extractors.push({ + extract: (tables, output) => { + output[name] = clause.evaluate(tables); + }, + getTypes(schema, into) { + const def = clause.getColumnDefinition(schema); + + into[name] = { name, type: def?.type ?? ExpressionType.NONE, originalType: def?.originalType }; + } + }); + } else { + extractors.push({ + extract: (tables, output) => { + const row = tables[alias]; + for (let key in row) { + if (key.startsWith('_')) { + continue; + } + output[key] ??= row[key]; + } + }, + getTypes(schema, into) { + for (let column of schema.getColumns(alias)) { + into[column.name] ??= column; + } + } + }); + } + if (name == 'id') { + hasId = true; + } else if (name == '*') { + hasWildcard = true; + if (querySchema == null) { + // Not performing schema-based validation - assume there is an id + hasId = true; + } else { + const idType = querySchema.getColumn(alias, 'id')?.type ?? ExpressionType.NONE; + if (!idType.isNone()) { + hasId = true; + } + } + } + } + if (!hasId) { + const error = new SqlRuleError(`Query must return an "id" column`, this.sql, query.columns?.[0]._location); + if (hasWildcard) { + // Schema-based validations are always warnings + error.type = 'warning'; + } + tools.errors.push(error); + } + + return { + sourceTable, + table: alias, + sql: this.sql, + columns: query.columns ?? [], + descriptorName: this.descriptorName, + tools, + extractors, + // Streams don't have traditional parameters, and parameters aren't used in the rest of the stream implementation. + // Instead, we represent parameters as an array in stream variants. + bucketParameters: [] + } satisfies BaseSqlDataQueryOptions; + } + + private checkUnsupportedFeatures(stmt: SelectFromStatement) { + this.errors.push(...checkUnsupportedFeatures(this.sql, stmt)); + } + + private whereClauseToFilters(tools: SqlTools, clause: Expr | nil): FilterOperator { + // We need to handle some functions specially here: + // 1. IN subqueries are not allowed in regular data queries, so we handle them here instead of relying on SqlTools + // 2. Since IN operators can be composed with other operators using AND and OR, we need to handle those operators + // as well. 
+ // Apart from that we can rely on compileClause + if (clause != null) { + if (clause.type == 'binary') { + let operator, scalarCombinator; + + if (clause.op == 'AND') { + operator = And; + scalarCombinator = andFilters; + } else if (clause.op == 'OR') { + operator = Or; + scalarCombinator = orFilters; + } else if (clause.op == 'IN' || clause.op == 'NOT IN') { + const filter = this.compileInOperator(tools, clause); + return clause.op == 'NOT IN' ? new Not(clause._location ?? null, filter) : filter; + } else if (clause.op == '&&') { + return this.compileOverlapOperator(tools, clause); + } + + // Try to combine AND and OR operators on a scalar level first, without introducing more filters. + if (operator && scalarCombinator) { + const left = this.whereClauseToFilters(tools, clause.left); + const right = this.whereClauseToFilters(tools, clause.right); + + if (left instanceof EvaluateSimpleCondition && right instanceof EvaluateSimpleCondition) { + let directCombination; + + try { + directCombination = scalarCombinator(left.expression, right.expression); + } catch (e) { + // Left and right might be a combination of row and parameter values that can't be combined like this. Ok, + // we can represent thas as separate filter instances. + } + + if (directCombination && isScalarExpression(directCombination)) { + return new EvaluateSimpleCondition(clause._location ?? null, directCombination); + } + } + + return new operator(clause._location ?? null, left, right); + } + } else if (clause.type == 'unary') { + if (clause.op == 'NOT') { + const inner = this.whereClauseToFilters(tools, clause.operand); + if (inner instanceof EvaluateSimpleCondition) { + // We can just negate that directly. + return inner.negate(tools); + } else { + return new Not(clause._location ?? null, inner); + } + } + } + } + + const regularClause = tools.compileClause(clause); + return compiledClauseToFilter(tools, clause?._location ?? null, regularClause); + } + + private compileInOperator(tools: SqlTools, clause: ExprBinary): FilterOperator { + // There are different kinds of `IN` operators we support in stream definitions: + // + // 1. Left row clause, right subquery: `WHERE issue_in IN (SELECT id FROM issue WHERE owner_id = request.user())` + // 2. Left parameter clause, right subquery: `WHERE request.user_id() IN (SELECT * FROM user_id FROM users WHERE is_admin)`. + // 3. Left parameter clause, right row data: `WHERE request.user() IN comments.tagged_users`. + // 4. Left row clause, right parameter data: `WHERE id IN subscription_parameters.ids`. + // 5. Left and right both row clauses, both parameter clauses, or mix or static and row/parameter clauses. + const left = tools.compileClause(clause.left); + const location = clause._location ?? 
null; + if (isClauseError(left)) { + return recoverErrorClause(tools); + } + + if (clause.right.type == 'select') { + if (!isScalarExpression(left)) { + if (!isClauseError(left)) { + tools.error( + 'This may contain values derived from the source row to sync or a value derived from stream parameters, but never both.', + clause.left + ); + } + + return recoverErrorClause(tools); + } + + const subqueryResult = this.compileSubquery(clause.right); + if (!subqueryResult) { + return recoverErrorClause(tools); + } + const [subquery, subqueryTools] = subqueryResult; + + if (isStaticValueClause(left)) { + tools.error( + 'For IN subqueries, the left operand must either depend on the row to sync or stream parameters.', + clause.left + ); + return recoverErrorClause(tools); + } + + if (isParameterValueClause(left)) { + // Case 2: We can't implement this as an actual IN operator because we need to use exact parameter lookups (so + // we can't, for instance, resolve `SELECT * FROM users WHERE is_admin` via parameter data sets). Since the + // left clause doesn't depend on row data however, we can push it down into the subquery where it would be + // introduced as a parameter: `EXISTS (SELECT _ FROM users WHERE is_admin AND user_id = request.user_id())`. + const additionalClause = subqueryTools.parameterMatchClause(subquery.column, left); + subquery.addFilter(compiledClauseToFilter(subqueryTools, null, additionalClause)); + return new ExistsOperator(location, subquery); + } else { + // Case 1 + return new InOperator(location, left, subquery); + } + } + + const right = tools.compileClause(clause.right); + + // For cases 3-5, we can actually use SqlTools.compileClause. Case 3 and 4 are handled specially in there and return + // a ParameterMatchClause, which we can translate via CompareRowValueWithStreamParameter. Case 5 is either a row-value + // or a parameter-value clause which we can wrap in EvaluateSimpleCondition. + const combined = tools.compileInClause(clause.left, left, clause.right, right); + return compiledClauseToFilter(tools, location, combined); + } + + private compileOverlapOperator(tools: SqlTools, clause: ExprBinary): FilterOperator { + const left = tools.compileClause(clause.left); + const location = clause._location ?? null; + + if (isClauseError(left)) { + return recoverErrorClause(tools); + } + + if (clause.right.type == 'select') { + if (!isRowValueClause(left)) { + if (!isClauseError(left)) { + tools.error('The left-hand side of an && operator must be derived from the row to sync..', clause.left); + } + + return recoverErrorClause(tools); + } + + const subqueryResult = this.compileSubquery(clause.right); + if (!subqueryResult) { + return recoverErrorClause(tools); + } + const [subquery] = subqueryResult; + return new OverlapOperator(location, left, subquery); + } + + const right = tools.compileClause(clause.right); + + // For cases 3-5, we can actually uses SqlTools.compileClause. Case 3 and 4 are handled specially in there and return + // a ParameterMatchClause, which we can translate via CompareRowValueWithStreamParameter. Case 5 is either a row-value + // or a parameter-value clause which we can wrap in EvaluateSimpleCondition. + const combined = tools.compileOverlapClause(clause.left, left, clause.right, right); + return compiledClauseToFilter(tools, location, combined); + } + + private compileSubquery(stmt: SelectStatement): [Subquery, SqlTools] | undefined { + // A subquery is similar to a data query in legacy sync rules. 
Importantly, despite being an expression, subqueries + // can't reference columns from the outer query! The syntax is always `SELECT FROM WHERE + // `. + let validated; + try { + validated = this.checkValidSelectStatement(stmt); + } catch (e) { + if (e instanceof SqlRuleError) { + this.errors.push(e); + } + return undefined; + } + + const { query, alias, querySchema, tableRef, sourceTable } = validated; + // Create a new tools instance for this - the subquery does not have access to the outer one. + const tools = new SqlTools({ + table: alias, + parameterTables: [], + valueTables: [alias], + sql: this.sql, + schema: querySchema, + supportsParameterExpressions: true, + parameterFunctions: STREAM_FUNCTIONS + }); + tools.checkSpecificNameCase(tableRef); + + if (query.columns?.length != 1) { + tools.error('This subquery must return exactly one column', query); + } + + const column = tools.compileRowValueExtractor(query.columns?.[0]?.expr); + if (isClauseError(column)) { + return; + } + + const where = tools.compileClause(query.where); + + this.errors.push(...tools.errors); + return [new Subquery(sourceTable, column, compiledClauseToFilter(tools, query.where?._location, where)), tools]; + } + + private checkValidSelectStatement(stmt: Statement) { + if (!isSelectStatement(stmt)) { + throw new SqlRuleError('Only SELECT statements are supported', this.sql, stmt._location); + } + + if (stmt.from == null || stmt.from.length != 1 || stmt.from[0].type != 'table') { + throw new SqlRuleError('Must SELECT from a single table', this.sql, stmt); + } + + this.checkUnsupportedFeatures(stmt); + + const tableRef = stmt.from?.[0].name; + if (tableRef?.name == null) { + throw new SqlRuleError('Must SELECT from a single table', this.sql, stmt.from?.[0]._location); + } + const alias: string = tableRef.alias ?? tableRef.name; + + const sourceTable = new TablePattern(tableRef.schema ?? this.options.defaultSchema, tableRef.name); + let querySchema: QuerySchema | undefined = undefined; + const schema = this.options.schema; + if (schema) { + const tables = schema.getTables(sourceTable); + if (tables.length == 0) { + const e = new SqlRuleError( + `Table ${sourceTable.schema}.${sourceTable.tablePattern} not found`, + this.sql, + stmt.from?.[0]?._location + ); + e.type = 'warning'; + + this.errors.push(e); + } else { + querySchema = new TableQuerySchema(tables, alias); + } + } + + return { + query: stmt, + tableRef, + alias, + querySchema, + sourceTable + }; + } +} + +function isScalarExpression(clause: CompiledClause): clause is ScalarExpression { + return isRowValueClause(clause) || isStaticValueClause(clause) || isParameterValueClause(clause); +} + +function recoverErrorClause(tools: SqlTools): EvaluateSimpleCondition { + // An error has already been logged. + return new EvaluateSimpleCondition(null, tools.compileClause(null) as StaticValueClause); +} + +function compiledClauseToFilter(tools: SqlTools, location: NodeLocation | nil, regularClause: CompiledClause) { + if (isScalarExpression(regularClause)) { + return new EvaluateSimpleCondition(location ?? null, regularClause); + } else if (isParameterMatchClause(regularClause)) { + return new CompareRowValueWithStreamParameter(location ?? 
null, regularClause); + } else if (isClauseError(regularClause)) { + return recoverErrorClause(tools); + } else { + throw new Error('Unknown clause type'); + } +} diff --git a/packages/sync-rules/src/streams/functions.ts b/packages/sync-rules/src/streams/functions.ts new file mode 100644 index 00000000..cd6c2bb2 --- /dev/null +++ b/packages/sync-rules/src/streams/functions.ts @@ -0,0 +1,45 @@ +import { + globalRequestParameterFunctions, + parameterFunctions, + request_user_id, + SqlParameterFunction +} from '../request_functions.js'; +import { ParameterValueSet } from '../types.js'; + +export const STREAM_FUNCTIONS: Record> = { + subscription: { + ...parameterFunctions({ + schema: 'subscription', + extractJsonString: function (v: ParameterValueSet): string { + return v.rawStreamParameters ?? '{}'; + }, + extractJsonParsed: function (v: ParameterValueSet) { + return v.streamParameters ?? {}; + }, + sourceDescription: 'Unauthenticated subscription parameters as JSON', + sourceDocumentation: + 'parameters passed by the client for this stream as a JSON string. These parameters are not authenticated - any value can be passed in by the client.', + usesAuthenticatedRequestParameters: false, + usesUnauthenticatedRequestParameters: true + }) + }, + connection: { + ...globalRequestParameterFunctions('connection') + }, + auth: { + user_id: request_user_id, + ...parameterFunctions({ + schema: 'auth', + extractJsonString: function (v: ParameterValueSet): string { + return v.rawTokenPayload; + }, + extractJsonParsed: function (v: ParameterValueSet) { + return v.tokenParameters; + }, + sourceDescription: 'JWT payload as JSON', + sourceDocumentation: 'JWT payload as a JSON string. This is always validated against trusted keys', + usesAuthenticatedRequestParameters: true, + usesUnauthenticatedRequestParameters: false + }) + } +}; diff --git a/packages/sync-rules/src/streams/parameter.ts b/packages/sync-rules/src/streams/parameter.ts new file mode 100644 index 00000000..73dc9f08 --- /dev/null +++ b/packages/sync-rules/src/streams/parameter.ts @@ -0,0 +1,82 @@ +import { ParameterLookup } from '../BucketParameterQuerier.js'; +import { SourceTableInterface } from '../SourceTableInterface.js'; +import { TablePattern } from '../TablePattern.js'; +import { + EvaluateRowOptions, + ParameterValueSet, + RequestParameters, + SqliteJsonValue, + SqliteRow, + SqliteValue +} from '../types.js'; + +/** + * A source of parameterization, causing data from the source table to be distributed into multiple buckets instead of + * a single one. + * + * Parameters are introduced when the select statement defining the stream has a where clause with elements where: + * + * 1. Values in the row to sync are compared against request parameters: {@link CompareRowValueWithStreamParameter}. + * 2. Values in the row to sync are compared against a subquery: {@link InOperator}. + */ +export interface BucketParameter { + lookup: StaticLookup | EqualsRowInSubqueryLookup | OverlapsSubqueryLookup; + /** + * Given a row in the table the stream is selecting from, return all possible instantiations of this parameter that + * would match the row. + * + * This is used to assign rows to buckets. For instance, considering the query + * `SELECT * FROM asset WHERE owner = request.user_id()`, we would introduce a parameter. For that parameter, + * `filterRow(assetRow)` would return `assetRow.owner`. + * When a user connects, {@link StaticLookup.fromRequest} would return the user ID from the token. 
A matching bucket would + * then contain the oplog data for assets with the matching `owner` column. + */ + filterRow(options: EvaluateRowOptions): SqliteJsonValue[]; +} + +export interface SubqueryEvaluator { + parameterTable: TablePattern; + + lookupsForParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): SubqueryLookups | null; + lookupsForRequest(params: RequestParameters): ParameterLookup[]; +} + +export interface SubqueryLookups { + lookups: ParameterLookup[]; + /** + * The value that the single column in the subquery evaluated to. + */ + value: SqliteJsonValue; +} + +/** + * An association of rows to subscription parameters that does not depend on a subquery. + */ +export interface StaticLookup { + type: 'static'; + + /** + * The value this lookup evaluates to for a specific request. + * + * This is typically a singleton array, e.g. the user's id in `WHERE owner_id = token.user_id()`. To desugar `IN` + * queries on parameter data, this can also return multiple values though: `WHERE owner_id IN subscription.parameters() -> 'user_ids'`. + */ + fromRequest(parameters: ParameterValueSet): SqliteValue[]; +} + +/** + * An association of rows that is matched if a value in the source row is contained in the results of a subquery. + */ +export interface EqualsRowInSubqueryLookup { + type: 'in'; + subquery: SubqueryEvaluator; +} + +/** + * An association of rows that is matched if a source-row value (interpreted as a JSON array) overlaps with rows + * contained in the results of a subqery. + */ +export interface OverlapsSubqueryLookup { + type: 'overlap'; + subquery: SubqueryEvaluator; +} diff --git a/packages/sync-rules/src/streams/stream.ts b/packages/sync-rules/src/streams/stream.ts new file mode 100644 index 00000000..7d1cfdb0 --- /dev/null +++ b/packages/sync-rules/src/streams/stream.ts @@ -0,0 +1,181 @@ +import { BaseSqlDataQuery } from '../BaseSqlDataQuery.js'; +import { BucketInclusionReason, BucketPriority, DEFAULT_BUCKET_PRIORITY } from '../BucketDescription.js'; +import { BucketParameterQuerier, PendingQueriers } from '../BucketParameterQuerier.js'; +import { BucketSource, BucketSourceType, ResultSetDescription } from '../BucketSource.js'; +import { ColumnDefinition } from '../ExpressionType.js'; +import { SourceTableInterface } from '../SourceTableInterface.js'; +import { GetQuerierOptions, RequestedStream } from '../SqlSyncRules.js'; +import { TablePattern } from '../TablePattern.js'; +import { + EvaluatedParametersResult, + EvaluateRowOptions, + EvaluationResult, + RequestParameters, + SourceSchema, + SqliteRow +} from '../types.js'; +import { StreamVariant } from './variant.js'; + +export class SyncStream implements BucketSource { + name: string; + subscribedToByDefault: boolean; + priority: BucketPriority; + variants: StreamVariant[]; + data: BaseSqlDataQuery; + + constructor(name: string, data: BaseSqlDataQuery) { + this.name = name; + this.subscribedToByDefault = false; + this.priority = DEFAULT_BUCKET_PRIORITY; + this.variants = []; + this.data = data; + } + + public get type(): BucketSourceType { + return BucketSourceType.SYNC_STREAM; + } + + pushBucketParameterQueriers(result: PendingQueriers, options: GetQuerierOptions): void { + const subscriptions = options.streams[this.name] ?? []; + + if (!this.subscribedToByDefault && !subscriptions.length) { + // The client is not subscribing to this stream, so don't query buckets related to it. 
+ return; + } + + let hasExplicitDefaultSubscription = false; + for (const subscription of subscriptions) { + let subscriptionParams = options.globalParameters; + if (subscription.parameters != null) { + subscriptionParams = subscriptionParams.withAddedStreamParameters(subscription.parameters); + } else { + hasExplicitDefaultSubscription = true; + } + + this.queriersForSubscription(result, subscription, subscriptionParams); + } + + // If the stream is subscribed to by default and there is no explicit subscription that would match the default + // subscription, also include the default querier. + if (this.subscribedToByDefault && !hasExplicitDefaultSubscription) { + this.queriersForSubscription(result, null, options.globalParameters); + } + } + + private queriersForSubscription( + result: PendingQueriers, + subscription: RequestedStream | null, + params: RequestParameters + ) { + const reason: BucketInclusionReason = subscription != null ? { subscription: subscription.opaque_id } : 'default'; + const queriers: BucketParameterQuerier[] = []; + + try { + for (const variant of this.variants) { + const querier = variant.querier(this, reason, params); + if (querier) { + queriers.push(querier); + } + } + + result.queriers.push(...queriers); + } catch (e) { + result.errors.push({ + descriptor: this.name, + message: `Error evaluating bucket ids: ${e.message}`, + subscription: subscription ?? undefined + }); + } + } + + hasDynamicBucketQueries(): boolean { + return this.variants.some((v) => v.hasDynamicBucketQueries); + } + + tableSyncsData(table: SourceTableInterface): boolean { + return this.data.applies(table); + } + + tableSyncsParameters(table: SourceTableInterface): boolean { + for (const variant of this.variants) { + for (const subquery of variant.subqueries) { + if (subquery.parameterTable.matches(table)) { + return true; + } + } + } + + return false; + } + + getSourceTables(): Set { + let result = new Set(); + result.add(this.data.sourceTable); + for (let variant of this.variants) { + for (const subquery of variant.subqueries) { + result.add(subquery.parameterTable); + } + } + + // Note: No physical tables for global_parameter_queries + + return result; + } + + resolveResultSets(schema: SourceSchema, tables: Record>) { + this.data.resolveResultSets(schema, tables); + } + + debugRepresentation() { + return { + name: this.name, + type: BucketSourceType[this.type], + variants: this.variants.map((v) => v.debugRepresentation()), + data: { + table: this.data.sourceTable, + columns: this.data.columnOutputNames() + } + }; + + throw new Error('Method not implemented.'); + } + + debugWriteOutputTables(result: Record): void { + result[this.data.table!] 
??= []; + const r = { + query: this.data.sql + }; + + result[this.data.table!].push(r); + } + + evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[] { + const result: EvaluatedParametersResult[] = []; + + for (const variant of this.variants) { + variant.pushParameterRowEvaluation(result, sourceTable, row); + } + + return result; + } + + evaluateRow(options: EvaluateRowOptions): EvaluationResult[] { + if (!this.data.applies(options.sourceTable)) { + return []; + } + + const stream = this; + return this.data.evaluateRowWithOptions({ + table: options.sourceTable, + row: options.record, + bucketIds() { + const bucketIds: string[] = []; + for (const variant of stream.variants) { + bucketIds.push(...variant.bucketIdsForRow(stream.name, options)); + } + + return bucketIds; + } + }); + } +} diff --git a/packages/sync-rules/src/streams/utils.ts b/packages/sync-rules/src/streams/utils.ts new file mode 100644 index 00000000..caafd0bf --- /dev/null +++ b/packages/sync-rules/src/streams/utils.ts @@ -0,0 +1,12 @@ +export function* cartesianProduct(...sets: T[][]): Generator { + if (sets.length == 0) { + yield []; + return; + } + + const [head, ...tail] = sets; + for (let h of head) { + const remainder = cartesianProduct(...tail); + for (let r of remainder) yield [h, ...r]; + } +} diff --git a/packages/sync-rules/src/streams/variant.ts b/packages/sync-rules/src/streams/variant.ts new file mode 100644 index 00000000..b217de35 --- /dev/null +++ b/packages/sync-rules/src/streams/variant.ts @@ -0,0 +1,347 @@ +import { BucketInclusionReason, ResolvedBucket } from '../BucketDescription.js'; +import { BucketParameterQuerier, ParameterLookup } from '../BucketParameterQuerier.js'; +import { SourceTableInterface } from '../SourceTableInterface.js'; +import { + EvaluatedParametersResult, + EvaluateRowOptions, + RequestParameters, + SqliteJsonValue, + SqliteRow +} from '../types.js'; +import { isJsonValue, JSONBucketNameSerialize, normalizeParameterValue } from '../utils.js'; +import { BucketParameter, SubqueryEvaluator } from './parameter.js'; +import { SyncStream } from './stream.js'; +import { cartesianProduct } from './utils.js'; + +/** + * A variant of a stream. + * + * Variants are introduced on {@link Or} filters, since different sub-filters (with potentially different) bucket + * parameters can both cause a row to be matched. + * + * Consider the query `SELECT * FROM comments WHERE issue_id IN (SELECT id FROM issue WHERE owner_id = request.user()) OR request.is_admin()`. + * Here, the filter is an or clause matching rows where: + * + * - An {@link InOperator} associatates comments in issues owned by the requesting user. This gets implemented with a + * parameter lookup index mapping `issue.owner_id => issue.id`. `comments.issue_id` is a bucket parameter resolved + * dynamically. + * - Or, the user is an admin, in which case all comments are matched. There are no bucket parameters for this + * variant. + * + * The introduction of stream variants allows the `evaluateParameterRow` and `queriersForSubscription` implementations + * to operate independently. + * + * Multiple variants may cause the same row to get synced via different buckets. Depending on the request, users may + * also receive multiple buckets with the same data. This is not an issue! Clients deduplicate rows received across + * buckets, so we don't have to filter for this case in the sync service. 
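+ *
+ * As a concrete sketch (mirroring the tests in streams.test.ts), a stream named `stream` defined as
+ * `SELECT * FROM issues WHERE owner_id = auth.user_id() OR auth.parameter('is_admin')` compiles to two variants:
+ * variant `0` with an `owner_id` bucket parameter and variant `1` with a static `is_admin` request filter. A row
+ * with `owner_id = 'u1'` is placed into the buckets `stream|0["u1"]` and `stream|1[]`, while a non-admin request
+ * for user `u1` only resolves `stream|0["u1"]` and an admin request resolves both buckets.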
+ */ +export class StreamVariant { + id: number; + parameters: BucketParameter[]; + subqueries: SubqueryEvaluator[]; + + /** + * Additional filters that don't introduce bucket parameters, but can exclude rows. + * + * This is introduced for streams like `SELECT * FROM assets WHERE LENGTH(assets.name < 10)`. + */ + additionalRowFilters: ((options: EvaluateRowOptions) => boolean)[]; + + /** + * Additional filters that are evaluated against the request of the stream subscription. + * + * These filters can either only depend on values in the request alone (e.g. `WHERE token_parameters.is_admin`), or + * on results from a subquery (e.g. `WHERE request.user_id() IN (SELECT id FROM user WHERE is_admin)`). + */ + requestFilters: RequestFilter[]; + + constructor(id: number) { + this.id = id; + this.parameters = []; + this.subqueries = []; + this.additionalRowFilters = []; + this.requestFilters = []; + } + + /** + * Given a row in the table this stream selects from, returns all ids of buckets to which that row belongs to. + */ + bucketIdsForRow(streamName: string, options: EvaluateRowOptions): string[] { + return this.instantiationsForRow(options).map((values) => this.buildBucketId(streamName, values)); + } + + /** + * Given a row to evaluate, returns all instantiations of parameters that satisfy conditions. + * + * The inner arrays will have a length equal to the amount of parameters in this variant. + */ + instantiationsForRow(options: EvaluateRowOptions): SqliteJsonValue[][] { + for (const additional of this.additionalRowFilters) { + if (!additional(options)) { + return []; + } + } + + // Contains an array of all values satisfying each parameter. So this array has the same length as the amount of + // parameters, and each nested array has a dynamic length. + const parameterInstantiations: SqliteJsonValue[][] = []; + for (const parameter of this.parameters) { + const matching = parameter.filterRow(options); + if (matching.length == 0) { + // The final list of bucket ids is the cartesian product of all matching parameters. So if there's no parameter + // satisfying this value, we know the final list will be empty. + return []; + } + + parameterInstantiations.push(matching); + } + + // Combine the map of values like {param_1: [foo, bar], param_2: [baz]} into parameter arrays: + // [foo, baz], [bar, baz]. + return this.cartesianProductOfParameterInstantiations(parameterInstantiations); + } + + /** + * Turns an array of values for each parameter into an array of all instantiations by effectively building the + * cartesian product of the parameter sets. + * + * @param instantiations An array containing values for each parameter. + * @returns Each instantiation, with each sub-array having a value for a parameter. 
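+ *
+ * For example, the input `[['foo', 'bar'], ['baz']]` (two candidate values for the first parameter, one for the
+ * second) yields `[['foo', 'baz'], ['bar', 'baz']]`.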
+ */ + private cartesianProductOfParameterInstantiations(instantiations: SqliteJsonValue[][]): SqliteJsonValue[][] { + return [...cartesianProduct(...instantiations)]; + } + + get hasDynamicBucketQueries(): boolean { + return this.requestFilters.some((f) => f.type == 'dynamic'); + } + + querier(stream: SyncStream, reason: BucketInclusionReason, params: RequestParameters): BucketParameterQuerier | null { + const instantiation = this.partiallyEvaluateParameters(params); + if (instantiation == null) { + return null; + } + + interface ResolvedDynamicParameter { + index: number; + subquery: SubqueryEvaluator; + } + + const dynamicRequestFilters: SubqueryRequestFilter[] = this.requestFilters.filter((f) => f.type == 'dynamic'); + const dynamicParameters: ResolvedDynamicParameter[] = []; + const subqueryToLookups = new Map(); + + for (let i = 0; i < this.parameters.length; i++) { + const parameter = this.parameters[i]; + const lookup = parameter.lookup; + + if (lookup.type == 'in' || lookup.type == 'overlap') { + dynamicParameters.push({ + index: i, + subquery: lookup.subquery + }); + } + } + + for (const subquery of this.subqueries) { + subqueryToLookups.set(subquery, subquery.lookupsForRequest(params)); + } + + const staticBuckets: ResolvedBucket[] = []; + if (dynamicParameters.length == 0 && dynamicRequestFilters.length == 0) { + // When we have no dynamic parameters, the partial evaluation is a full instantiation. + const instantiations = this.cartesianProductOfParameterInstantiations(instantiation as SqliteJsonValue[][]); + for (const instantiation of instantiations) { + staticBuckets.push(this.resolveBucket(stream, instantiation, reason)); + } + } + + const variant = this; + return { + staticBuckets: staticBuckets, + hasDynamicBuckets: this.subqueries.length != 0, + parameterQueryLookups: [...subqueryToLookups.values()].flatMap((f) => f), + async queryDynamicBucketDescriptions(source) { + // Evaluate subqueries + const subqueryResults = new Map(); + for (const [subquery, lookups] of subqueryToLookups.entries()) { + const rows = await source.getParameterSets(lookups); + // The result column used in parameter sets is always named result, see pushParameterRowEvaluation + const values = rows.map((r) => r.result); + subqueryResults.set(subquery, values); + } + + // Check if we have a subquery-based request filter rejecting the row. + for (const filter of dynamicRequestFilters) { + if (!filter.matches(params, subqueryResults.get(filter.subquery)!)) { + return []; + } + } + + const perParameterInstantiation: (SqliteJsonValue | BucketParameter)[][] = []; + for (const parameter of instantiation) { + if (Array.isArray(parameter)) { + // Statically-resolved values + perParameterInstantiation.push(parameter); + } else { + // to be instantiated with dynamic lookup + perParameterInstantiation.push([parameter as BucketParameter]); + } + } + + for (const lookup of dynamicParameters) { + perParameterInstantiation[lookup.index] = subqueryResults.get(lookup.subquery)!; + } + + const product = variant.cartesianProductOfParameterInstantiations( + perParameterInstantiation as SqliteJsonValue[][] + ); + + return Promise.resolve(product.map((e) => variant.resolveBucket(stream, e, reason))); + } + }; + } + + findStaticInstantiations(params: RequestParameters): SqliteJsonValue[][] { + if (this.subqueries.length) { + return []; + } + + // This will be an array of values (i.e. a total evaluation) because there are no dynamic parameters. 
+ return this.partiallyEvaluateParameters(params) as SqliteJsonValue[][]; + } + + /** + * Creates lookup indices for dynamically-resolved parameters. + * + * Resolving dynamic parameters is a two-step process: First, for tables referenced in subqueries, we create an index + * to resolve which request parameters would match rows in subqueries. Then, when resolving bucket ids for a request, + * we compute subquery results by looking up results in that index. + * + * This implements the first step of that process. + * + * @param result The array into which evaluation results should be written to. + * @param sourceTable A table we depend on in a subquery. + * @param row Row data to index. + */ + pushParameterRowEvaluation(result: EvaluatedParametersResult[], sourceTable: SourceTableInterface, row: SqliteRow) { + for (const subquery of this.subqueries) { + if (subquery.parameterTable.matches(sourceTable)) { + const lookups = subquery.lookupsForParameterRow(sourceTable, row); + if (lookups == null) { + continue; + } + + // The row of the subquery. Since we only support subqueries with a single column, we unconditionally name the + // column `result` for simplicity. + const resultRow = { result: lookups.value }; + + result.push( + ...lookups.lookups.map((l) => ({ + lookup: l, + bucketParameters: [resultRow] + })) + ); + } + } + } + + debugRepresentation(): any { + return { + id: this.id, + parameters: this.parameters.map((p) => ({ + type: p.lookup.type + })), + subqueries: this.subqueries.map((s) => ({ + table: s.parameterTable + })), + additional_row_filters: this.additionalRowFilters.length, + request_filters: this.requestFilters.map((f) => f.type) + }; + } + + /** + * Replaces {@link StreamVariant.parameters} with static values looked up in request parameters. + * + * Dynamic parameters that depend on subquery results are not replaced. + * This returns null if there's a {@link StaticRequestFilter} that doesn't match the request. + */ + private partiallyEvaluateParameters(params: RequestParameters): (SqliteJsonValue[] | BucketParameter)[] | null { + for (const filter of this.requestFilters) { + if (filter.type == 'static' && !filter.matches(params)) { + return null; + } + } + + const instantiation: (SqliteJsonValue[] | BucketParameter)[] = []; + for (const parameter of this.parameters) { + const lookup = parameter.lookup; + if (lookup.type == 'static') { + const values = lookup.fromRequest(params)?.filter(isJsonValue); + if (values.length == 0) { + // Parameter not instantiable for this request. Since parameters in a single variant form a conjunction, that + // means the whole request won't find anything here. + return null; + } + + instantiation.push(values); + } else { + instantiation.push(parameter); + } + } + + return instantiation; + } + + /** + * Builds a bucket id for an instantiation, like `stream|0[1,2,"foo"]`. + * + * @param streamName The name of the stream, included in the bucket id + * @param instantiation An instantiation for all parameters in this variant. 
+ * @returns The generated bucket id + */ + private buildBucketId(streamName: string, instantiation: SqliteJsonValue[]) { + if (instantiation.length != this.parameters.length) { + throw Error('Internal error, instantiation length mismatch'); + } + + return `${streamName}|${this.id}${JSONBucketNameSerialize.stringify(instantiation)}`; + } + + private resolveBucket( + stream: SyncStream, + instantiation: SqliteJsonValue[], + reason: BucketInclusionReason + ): ResolvedBucket { + return { + definition: stream.name, + inclusion_reasons: [reason], + bucket: this.buildBucketId(stream.name, instantiation), + priority: stream.priority + }; + } +} +/** + * A stateless filter condition that only depends on the request itself, e.g. `WHERE token_parameters.is_admin`. + */ +export interface StaticRequestFilter { + type: 'static'; + matches(params: RequestParameters): boolean; +} + +/** + * A filter condition that depends on parameters and an evaluated subquery, e.g. + * `WHERE request.user_id() IN (SELECT id FROM users WHERE ...)`. + */ +export interface SubqueryRequestFilter { + type: 'dynamic'; + subquery: SubqueryEvaluator; + + /** + * Checks whether the parameter matches values from the subquery. + * + * @param results The values that the subquery evaluates to. + */ + matches(params: RequestParameters, results: SqliteJsonValue[]): boolean; +} +export type RequestFilter = StaticRequestFilter | SubqueryRequestFilter; diff --git a/packages/sync-rules/src/types.ts b/packages/sync-rules/src/types.ts index 91b23a26..430ee2b9 100644 --- a/packages/sync-rules/src/types.ts +++ b/packages/sync-rules/src/types.ts @@ -18,6 +18,10 @@ export interface QueryParseOptions extends SyncRulesOptions { priority?: BucketPriority; } +export interface StreamParseOptions extends QueryParseOptions { + auto_subscribe?: boolean; +} + export interface EvaluatedParameters { lookup: ParameterLookup; @@ -80,16 +84,19 @@ export interface ParameterValueSet { * JSON string of raw request parameters. */ rawUserParameters: string; + userParameters: SqliteJsonRow; /** * For streams, the raw JSON string of stream parameters. */ rawStreamParameters: string | null; + streamParameters: SqliteJsonRow | null; /** * JSON string of raw request parameters. 
*/ rawTokenPayload: string; + tokenParameters: SqliteJsonRow; userId: string; } @@ -103,6 +110,7 @@ export class RequestParameters implements ParameterValueSet { */ rawUserParameters: string; + streamParameters: SqliteJsonRow | null; rawStreamParameters: string | null; /** @@ -112,7 +120,21 @@ export class RequestParameters implements ParameterValueSet { userId: string; - constructor(tokenPayload: RequestJwtPayload, clientParameters: Record) { + constructor(tokenPayload: RequestJwtPayload, clientParameters: Record); + constructor(params: RequestParameters); + + constructor(tokenPayload: RequestJwtPayload | RequestParameters, clientParameters?: Record) { + if (tokenPayload instanceof RequestParameters) { + this.tokenParameters = tokenPayload.tokenParameters; + this.userParameters = tokenPayload.userParameters; + this.rawUserParameters = tokenPayload.rawUserParameters; + this.rawTokenPayload = tokenPayload.rawTokenPayload; + this.streamParameters = tokenPayload.streamParameters; + this.rawStreamParameters = tokenPayload.rawStreamParameters; + this.userId = tokenPayload.userId; + return; + } + // This type is verified when we verify the token const legacyParameters = tokenPayload.parameters as Record | undefined; @@ -127,7 +149,8 @@ export class RequestParameters implements ParameterValueSet { this.rawTokenPayload = JSONBig.stringify(tokenPayload); this.rawUserParameters = JSONBig.stringify(clientParameters); - this.userParameters = toSyncRulesParameters(clientParameters); + this.userParameters = toSyncRulesParameters(clientParameters!); + this.streamParameters = null; this.rawStreamParameters = null; } @@ -136,12 +159,15 @@ export class RequestParameters implements ParameterValueSet { return this.tokenParameters[column]; } else if (table == 'user_parameters') { return this.userParameters[column]; + } else if (table == 'subscription_parameters' && this.streamParameters != null) { + return this.streamParameters[column]; } throw new Error(`Unknown table: ${table}`); } withAddedStreamParameters(params: Record): RequestParameters { - const clone = structuredClone(this); + const clone = new RequestParameters(this); + clone.streamParameters = params; clone.rawStreamParameters = JSONBig.stringify(params); return clone; @@ -249,7 +275,7 @@ export interface EvaluateRowOptions { } /** - * This is a clause that matches row and parameter values. + * This is a clause that matches row and parameter values for equality. * * Example: * [WHERE] users.org_id = bucket.org_id @@ -265,6 +291,9 @@ export interface ParameterMatchClause { * * ['token_parameters.user_id'] for a parameter query * * These parameters are always matched by this clause, and no additional parameters are matched. + * + * For a single match clause, this array will have a single element. When match clauses are combined with `AND`, + * the result is represented as a {@link ParameterMatchClause} with multiple input parameters. 
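+ *
+ * E.g. a (hypothetical) combined filter `[WHERE] users.org_id = bucket.org_id AND users.owner_id = bucket.user_id`
+ * would be represented by a single clause with the two input parameters `bucket.org_id` and `bucket.user_id`.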
*/ inputParameters: InputParameter[]; diff --git a/packages/sync-rules/test/matchers.d.ts b/packages/sync-rules/test/matchers.d.ts new file mode 100644 index 00000000..fc152746 --- /dev/null +++ b/packages/sync-rules/test/matchers.d.ts @@ -0,0 +1,10 @@ +import 'vitest'; + +interface CustomMatchers { + toBeSqlRuleError: (message: string, location: string) => R; +} + +declare module 'vitest' { + interface Assertion extends CustomMatchers {} + interface AsymmetricMatchersContaining extends CustomMatchers {} +} diff --git a/packages/sync-rules/test/matchers.ts b/packages/sync-rules/test/matchers.ts new file mode 100644 index 00000000..f613b67e --- /dev/null +++ b/packages/sync-rules/test/matchers.ts @@ -0,0 +1,33 @@ +import { beforeAll, expect } from 'vitest'; +import { SqlRuleError } from '../src/index.js'; + +beforeAll(() => { + expect.extend({ + toBeSqlRuleError(received, expectedMessage, expectedLocation) { + const { isNot } = this; + + const message = () => { + return `expected ${received} ${isNot ? ' not' : ''} to be SQL error with ${expectedMessage} at ${expectedLocation}`; + }; + + if (received instanceof SqlRuleError) { + const actualLocation = + received.location && received.sql.substring(received.location.start, received.location.end); + + return { + pass: received.message == expectedMessage && actualLocation == expectedLocation, + actual: { + message: received.message, + location: actualLocation + }, + message + }; + } else { + return { + pass: false, + message + }; + } + } + }); +}); diff --git a/packages/sync-rules/test/src/streams.test.ts b/packages/sync-rules/test/src/streams.test.ts new file mode 100644 index 00000000..ab70ff42 --- /dev/null +++ b/packages/sync-rules/test/src/streams.test.ts @@ -0,0 +1,692 @@ +/// +import { describe, expect, test } from 'vitest'; +import { + BucketParameterQuerier, + DEFAULT_TAG, + GetBucketParameterQuerierResult, + mergeBucketParameterQueriers, + ParameterLookup, + QuerierError, + SourceTableInterface, + SqliteJsonRow, + SqliteRow, + StaticSchema, + SyncStream, + syncStreamFromSql +} from '../../src/index.js'; +import { normalizeQuerierOptions, PARSE_OPTIONS, TestSourceTable } from './util.js'; + +describe('streams', () => { + test('without filter', () => { + const desc = parseStream('SELECT * FROM comments'); + + expect(desc.variants).toHaveLength(1); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo' })).toStrictEqual(['stream|0[]']); + expect(desc.evaluateRow({ sourceTable: USERS, record: { id: 'foo' } })).toHaveLength(0); + }); + + test('row condition', () => { + const desc = parseStream('SELECT * FROM comments WHERE length(content) > 5'); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'a' })).toStrictEqual([]); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'aaaaaa' })).toStrictEqual(['stream|0[]']); + }); + + test('stream parameter', () => { + const desc = parseStream("SELECT * FROM comments WHERE issue_id = subscription.parameter('id')"); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', issue_id: 'a' })).toStrictEqual(['stream|0["a"]']); + }); + + test('row filter and stream parameter', async () => { + const desc = parseStream( + "SELECT * FROM comments WHERE length(content) > 5 AND issue_id = subscription.parameter('id')" + ); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'a', issue_id: 'a' })).toStrictEqual([]); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'aaaaaa', issue_id: 'i' })).toStrictEqual([ + 'stream|0["i"]' + ]); + + expect(await 
queryBucketIds(desc, { parameters: { id: 'subscribed_issue' } })).toStrictEqual([ + 'stream|0["subscribed_issue"]' + ]); + }); + + describe('or', () => { + test('parameter match or request condition', async () => { + const desc = parseStream("SELECT * FROM issues WHERE owner_id = auth.user_id() OR auth.parameter('is_admin')"); + + expect(evaluateBucketIds(desc, ISSUES, { id: 'foo', owner_id: 'u1' })).toStrictEqual([ + 'stream|0["u1"]', + 'stream|1[]' + ]); + + expect( + await queryBucketIds(desc, { + token_parameters: { + user_id: 'u1', + is_admin: false + } + }) + ).toStrictEqual(['stream|0["u1"]']); + + expect( + await queryBucketIds(desc, { + token_parameters: { + user_id: 'u1', + is_admin: true + } + }) + ).toStrictEqual(['stream|0["u1"]', 'stream|1[]']); + }); + + test('parameter match or row condition', async () => { + const desc = parseStream('SELECT * FROM issues WHERE owner_id = auth.user_id() OR LENGTH(name) = 3'); + expect(evaluateBucketIds(desc, ISSUES, { id: 'foo', owner_id: 'a', name: '' })).toStrictEqual(['stream|0["a"]']); + expect(evaluateBucketIds(desc, ISSUES, { id: 'foo', owner_id: 'a', name: 'aaa' })).toStrictEqual([ + 'stream|0["a"]', + 'stream|1[]' + ]); + + expect( + await queryBucketIds(desc, { + token_parameters: { + user_id: 'u1' + } + }) + ).toStrictEqual(['stream|0["u1"]', 'stream|1[]']); + }); + + test('row condition or parameter condition', async () => { + const desc = parseStream("SELECT * FROM comments WHERE LENGTH(content) > 5 OR auth.parameter('is_admin')"); + + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'short' })).toStrictEqual(['stream|1[]']); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'longer' })).toStrictEqual([ + 'stream|0[]', + 'stream|1[]' + ]); + + expect( + await queryBucketIds(desc, { + token_parameters: { + is_admin: false + } + }) + ).toStrictEqual(['stream|0[]']); + expect( + await queryBucketIds(desc, { + token_parameters: { + is_admin: true + } + }) + ).toStrictEqual(['stream|0[]', 'stream|1[]']); + }); + + test('row condition or row condition', () => { + const desc = parseStream( + 'SELECT * FROM comments WHERE LENGTH(content) > 5 OR json_array_length(tagged_users) > 1' + ); + // Complex conditions that only operate on row data don't need variants. + expect(desc.variants).toHaveLength(1); + + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'short', tagged_users: '[]' })).toStrictEqual([]); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'longer', tagged_users: '[]' })).toStrictEqual([ + 'stream|0[]' + ]); + expect( + evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'longer', tagged_users: '["a","b"]' }) + ).toStrictEqual(['stream|0[]']); + expect( + evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'short', tagged_users: '["a","b"]' }) + ).toStrictEqual(['stream|0[]']); + }); + + test('request condition or request condition', async () => { + const desc = parseStream("SELECT * FROM comments WHERE auth.parameter('a') OR auth.parameters() ->> 'b'"); + // Complex conditions that only operate on request data don't need variants. 
+ expect(desc.variants).toHaveLength(1); + + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'whatever]' })).toStrictEqual(['stream|0[]']); + expect( + await queryBucketIds(desc, { + token_parameters: { + a: false, + b: false + } + }) + ).toStrictEqual([]); + expect( + await queryBucketIds(desc, { + token_parameters: { + a: true, + b: false + } + }) + ).toStrictEqual(['stream|0[]']); + }); + + test('subquery or token parameter', async () => { + const desc = parseStream( + "SELECT * FROM comments WHERE issue_id IN (SELECT id FROM issues WHERE owner_id = auth.user_id()) OR auth.parameter('is_admin')" + ); + + expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'i1' })).toStrictEqual([ + 'stream|0["i1"]', + 'stream|1[]' + ]); + + expect(desc.evaluateParameterRow(ISSUES, { id: 'i1', owner_id: 'u1' })).toStrictEqual([ + { + lookup: ParameterLookup.normalized('stream', '0', ['u1']), + bucketParameters: [ + { + result: 'i1' + } + ] + } + ]); + + function getParameterSets(lookups: ParameterLookup[]) { + expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['u1'])]); + + return [{ result: 'i1' }]; + } + expect( + await queryBucketIds(desc, { token_parameters: { user_id: 'u1', is_admin: false }, getParameterSets }) + ).toStrictEqual(['stream|0["i1"]']); + expect( + await queryBucketIds(desc, { token_parameters: { user_id: 'u1', is_admin: true }, getParameterSets }) + ).toStrictEqual(['stream|1[]', 'stream|0["i1"]']); + }); + }); + + describe('in', () => { + test('row value in subquery', async () => { + const desc = parseStream( + 'SELECT * FROM comments WHERE issue_id IN (SELECT id FROM issues WHERE owner_id = auth.user_id())' + ); + + expect(desc.tableSyncsParameters(ISSUES)).toBe(true); + expect(desc.evaluateParameterRow(ISSUES, { id: 'issue_id', owner_id: 'user1', name: 'name' })).toStrictEqual([ + { + lookup: ParameterLookup.normalized('stream', '0', ['user1']), + bucketParameters: [ + { + result: 'issue_id' + } + ] + } + ]); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id' })).toStrictEqual([ + 'stream|0["issue_id"]' + ]); + + expect( + await queryBucketIds(desc, { + token_parameters: { user_id: 'user1' }, + getParameterSets(lookups) { + expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]); + + return [{ result: 'issue_id' }]; + } + }) + ).toStrictEqual(['stream|0["issue_id"]']); + }); + + test('parameter value in subquery', async () => { + const desc = parseStream('SELECT * FROM issues WHERE auth.user_id() IN (SELECT id FROM users WHERE is_admin)'); + + expect(desc.tableSyncsParameters(ISSUES)).toBe(false); + expect(desc.tableSyncsParameters(USERS)).toBe(true); + + expect(desc.evaluateParameterRow(USERS, { id: 'u', is_admin: 1n })).toStrictEqual([ + { + lookup: ParameterLookup.normalized('stream', '0', ['u']), + bucketParameters: [ + { + result: 'u' + } + ] + } + ]); + expect(desc.evaluateParameterRow(USERS, { id: 'u', is_admin: 0n })).toStrictEqual([]); + + // Should return bucket id for admin users + expect( + await queryBucketIds(desc, { + token_parameters: { user_id: 'u' }, + getParameterSets: (lookups: ParameterLookup[]) => { + expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['u'])]); + return [{ result: 'u' }]; + } + }) + ).toStrictEqual(['stream|0[]']); + + // And not for others + expect( + await queryBucketIds(desc, { + token_parameters: { user_id: 'u2' }, + getParameterSets: (lookups: ParameterLookup[]) => { + 
expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['u2'])]); + return []; + } + }) + ).toStrictEqual([]); + }); + + test('two subqueries', async () => { + const desc = parseStream(`SELECT * FROM users WHERE + id IN (SELECT user_a FROM friends WHERE user_b = auth.user_id()) OR + id IN (SELECT user_b FROM friends WHERE user_a = auth.user_id()) + `); + + expect(evaluateBucketIds(desc, USERS, { id: 'a', name: 'a' })).toStrictEqual(['stream|0["a"]', 'stream|1["a"]']); + + expect(desc.evaluateParameterRow(FRIENDS, { user_a: 'a', user_b: 'b' })).toStrictEqual([ + { + lookup: ParameterLookup.normalized('stream', '0', ['b']), + bucketParameters: [ + { + result: 'a' + } + ] + }, + { + lookup: ParameterLookup.normalized('stream', '1', ['a']), + bucketParameters: [ + { + result: 'b' + } + ] + } + ]); + + function getParameterSets(lookups: ParameterLookup[]) { + expect(lookups).toHaveLength(1); + const [lookup] = lookups; + if (lookup.values[1] == '0') { + expect(lookup).toStrictEqual(ParameterLookup.normalized('stream', '0', ['a'])); + return []; + } else { + expect(lookup).toStrictEqual(ParameterLookup.normalized('stream', '1', ['a'])); + return [{ result: 'b' }]; + } + } + + expect( + await queryBucketIds(desc, { + token_parameters: { user_id: 'a' }, + getParameterSets + }) + ).toStrictEqual(['stream|1["b"]']); + }); + + test('on parameter data', async () => { + const desc = parseStream("SELECT * FROM comments WHERE issue_id IN (subscription.parameters() -> 'issue_id')"); + + expect(evaluateBucketIds(desc, COMMENTS, { id: 'a', issue_id: 'i' })).toStrictEqual(['stream|0["i"]']); + expect( + await queryBucketIds(desc, { + token_parameters: { user_id: 'a' }, + parameters: { issue_id: ['i1', 'i2'] } + }) + ).toStrictEqual(['stream|0["i1"]', 'stream|0["i2"]']); + }); + + test('on parameter data and table', async () => { + const desc = parseStream( + "SELECT * FROM comments WHERE issue_id IN (SELECT id FROM issues WHERE owner_id = auth.user_id()) AND label IN (subscription.parameters() -> 'labels')" + ); + + expect(evaluateBucketIds(desc, COMMENTS, { id: 'a', issue_id: 'i', label: 'l' })).toStrictEqual([ + 'stream|0["i","l"]' + ]); + expect( + await queryBucketIds(desc, { + token_parameters: { user_id: 'a' }, + parameters: { labels: ['l1', 'l2'] }, + getParameterSets(lookups) { + expect(lookups).toHaveLength(1); + const [lookup] = lookups; + expect(lookup).toStrictEqual(ParameterLookup.normalized('stream', '0', ['a'])); + return [{ result: 'i1' }, { result: 'i2' }]; + } + }) + ).toStrictEqual(['stream|0["i1","l1"]', 'stream|0["i1","l2"]', 'stream|0["i2","l1"]', 'stream|0["i2","l2"]']); + }); + }); + + describe('overlap', () => { + test('row value in subquery', async () => { + const desc = parseStream( + 'SELECT * FROM comments WHERE tagged_users && (SELECT user_a FROM friends WHERE user_b = auth.user_id())' + ); + + expect(desc.tableSyncsParameters(FRIENDS)).toBe(true); + expect(desc.evaluateParameterRow(FRIENDS, { user_a: 'a', user_b: 'b' })).toStrictEqual([ + { + lookup: ParameterLookup.normalized('stream', '0', ['b']), + bucketParameters: [ + { + result: 'a' + } + ] + } + ]); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', tagged_users: '["a", "b"]' })).toStrictEqual([ + 'stream|0["a"]', + 'stream|0["b"]' + ]); + + expect( + await queryBucketIds(desc, { + token_parameters: { user_id: 'user1' }, + getParameterSets(lookups) { + expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]); + + return [{ result: 'issue_id' }]; + } + }) + 
).toStrictEqual(['stream|0["issue_id"]']); + }); + }); + + describe('errors', () => { + test('IN operator with static left clause', () => { + const [_, errors] = syncStreamFromSql( + 's', + "SELECT * FROM issues WHERE 'static' IN (SELECT id FROM users WHERE is_admin)", + options + ); + + expect(errors).toMatchObject([ + expect.toBeSqlRuleError( + 'For IN subqueries, the left operand must either depend on the row to sync or stream parameters.', + "'static'" + ) + ]); + }); + + test('negated subquery', () => { + const [_, errors] = syncStreamFromSql( + 's', + 'select * from comments where issue_id not in (select id from issues where owner_id = auth.user_id())', + options + ); + + expect(errors).toMatchObject([ + expect.toBeSqlRuleError( + 'Negations are not allowed here', + 'issue_id not in (select id from issues where owner_id = auth.user_id()' + ) + ]); + }); + + test('negated subquery from outer not operator', () => { + const [_, errors] = syncStreamFromSql( + 's', + 'select * from comments where not (issue_id in (select id from issues where owner_id = auth.user_id()))', + options + ); + + expect(errors).toMatchObject([ + expect.toBeSqlRuleError( + 'Negations are not allowed here', + 'not (issue_id in (select id from issues where owner_id = auth.user_id()' + ) + ]); + }); + + test('subquery with two columns', () => { + const [_, errors] = syncStreamFromSql( + 's', + 'select * from comments where issue_id in (select id, owner_id from issues where owner_id = auth.user_id())', + options + ); + + expect(errors).toMatchObject([ + expect.toBeSqlRuleError( + 'This subquery must return exactly one column', + 'select id, owner_id from issues where owner_id = auth.user_id()' + ) + ]); + }); + + test('legacy request function', () => { + const [_, errors] = syncStreamFromSql('s', 'select * from issues where owner_id = request.user_id()', options); + + expect(errors).toMatchObject([ + expect.toBeSqlRuleError("Function 'request.user_id' is not defined", 'request.user_id()') + ]); + }); + }); + + describe('normalization', () => { + test('double negation', async () => { + const desc = parseStream( + 'select * from comments where NOT (issue_id not in (select id from issues where owner_id = auth.user_id()))' + ); + + expect(desc.evaluateParameterRow(ISSUES, { id: 'issue_id', owner_id: 'user1', name: 'name' })).toStrictEqual([ + { + lookup: ParameterLookup.normalized('stream', '0', ['user1']), + bucketParameters: [ + { + result: 'issue_id' + } + ] + } + ]); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id' })).toStrictEqual([ + 'stream|0["issue_id"]' + ]); + + expect( + await queryBucketIds(desc, { + token_parameters: { user_id: 'user1' }, + getParameterSets(lookups) { + expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]); + + return [{ result: 'issue_id' }]; + } + }) + ).toStrictEqual(['stream|0["issue_id"]']); + }); + + test('negated or', () => { + const desc = parseStream( + 'select * from comments where not (length(content) = 5 OR issue_id not in (select id from issues where owner_id = auth.user_id()))' + ); + + expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id', content: 'foo' })).toStrictEqual([ + 'stream|0["issue_id"]' + ]); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id', content: 'foooo' })).toStrictEqual([]); + }); + + test('negated and', () => { + const desc = parseStream( + 'select * from comments where not (length(content) = 5 AND issue_id not in (select id from issues where owner_id = 
auth.user_id()))' + ); + expect(desc.variants).toHaveLength(2); + + expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id', content: 'foo' })).toStrictEqual([ + 'stream|0[]', + 'stream|1["issue_id"]' + ]); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id', content: 'foooo' })).toStrictEqual([ + 'stream|1["issue_id"]' + ]); + }); + + test('distribute and', async () => { + const desc = parseStream( + `select * from comments where + (issue_id in (select id from issues where owner_id = auth.user_id()) + OR auth.parameter('is_admin')) + AND + LENGTH(content) > 2 + ` + ); + expect(desc.variants).toHaveLength(2); + + expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id', content: 'a' })).toStrictEqual([]); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id', content: 'aaa' })).toStrictEqual([ + 'stream|0["issue_id"]', + 'stream|1[]' + ]); + + expect( + await queryBucketIds(desc, { + token_parameters: { user_id: 'user1' }, + getParameterSets(lookups) { + expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]); + + return [{ result: 'issue_id' }]; + } + }) + ).toStrictEqual(['stream|0["issue_id"]']); + expect( + await queryBucketIds(desc, { + token_parameters: { user_id: 'user1', is_admin: true }, + getParameterSets(lookups) { + expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]); + + return [{ result: 'issue_id' }]; + } + }) + ).toStrictEqual(['stream|1[]', 'stream|0["issue_id"]']); + }); + }); +}); + +const USERS = new TestSourceTable('users'); +const ISSUES = new TestSourceTable('issues'); +const COMMENTS = new TestSourceTable('comments'); +const FRIENDS = new TestSourceTable('friends'); + +const schema = new StaticSchema([ + { + tag: DEFAULT_TAG, + schemas: [ + { + name: 'test_schema', + tables: [ + { + name: 'users', + columns: [ + { name: 'id', pg_type: 'uuid' }, + { name: 'name', pg_type: 'text' }, + { name: 'is_admin', pg_type: 'bool' } + ] + }, + { + name: 'issues', + columns: [ + { name: 'id', pg_type: 'uuid' }, + { name: 'owner_id', pg_type: 'uuid' }, + { name: 'name', pg_type: 'text' } + ] + }, + { + name: 'comments', + columns: [ + { name: 'id', pg_type: 'uuid' }, + { name: 'issue_id', pg_type: 'uuid' }, + { name: 'content', pg_type: 'text' }, + { name: 'tagged_users', pg_type: 'text' }, + { name: 'label', pg_type: 'text' } + ] + }, + { + name: 'friends', + columns: [ + { name: 'id', pg_type: 'uuid' }, + { name: 'user_a', pg_type: 'uuid' }, + { name: 'user_b', pg_type: 'uuid' } + ] + } + ] + } + ] + } +]); + +const options = { schema: schema, ...PARSE_OPTIONS }; + +function evaluateBucketIds(stream: SyncStream, sourceTable: SourceTableInterface, record: SqliteRow) { + return stream.evaluateRow({ sourceTable, record }).map((r) => { + if ('error' in r) { + throw new Error(`Unexpected error evaluating row: ${r.error}`); + } + + return r.bucket; + }); +} + +async function createQueriers( + stream: SyncStream, + options?: { + token_parameters?: Record<string, any>; + parameters?: Record<string, any>; + getParameterSets?: (lookups: ParameterLookup[]) => SqliteJsonRow[]; + } +): Promise<{ querier: BucketParameterQuerier; errors: QuerierError[] }> { + const queriers: BucketParameterQuerier[] = []; + const errors: QuerierError[] = []; + const pending = { queriers, errors }; + + stream.pushBucketParameterQueriers( + pending, + normalizeQuerierOptions( + options?.token_parameters ?? {}, + {}, + { stream: [{ opaque_id: 0, parameters: options?.parameters ??
null }] } + ) + ); + + return { querier: mergeBucketParameterQueriers(queriers), errors }; +} + +async function queryBucketIds( + stream: SyncStream, + options?: { + token_parameters?: Record<string, any>; + parameters?: Record<string, any>; + getParameterSets?: (lookups: ParameterLookup[]) => SqliteJsonRow[]; + } +) { + const { querier, errors } = await createQueriers(stream, options); + expect(errors).toHaveLength(0); + + async function getParameterSets(lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> { + const provided = options?.getParameterSets; + if (provided) { + return provided(lookups); + } else { + throw 'unexpected dynamic lookup'; + } + } + + const buckets: string[] = []; + + buckets.push(...querier.staticBuckets.map((b) => b.bucket)); + if (querier.hasDynamicBuckets) { + buckets.push( + ...( + await querier.queryDynamicBucketDescriptions({ + getParameterSets + }) + ).map((e) => e.bucket) + ); + } + + return buckets; +} + +function parseStream(sql: string, name = 'stream') { + const [stream, errors] = syncStreamFromSql(name, sql, options); + if (errors.length) { + throw new Error(`Unexpected errors when parsing stream ${sql}: ${errors}`); + } + + return stream; +} diff --git a/packages/sync-rules/test/src/util.ts b/packages/sync-rules/test/src/util.ts index 3c098a9b..42fe8ac2 100644 --- a/packages/sync-rules/test/src/util.ts +++ b/packages/sync-rules/test/src/util.ts @@ -1,6 +1,7 @@ import { DEFAULT_TAG, GetQuerierOptions, + RequestedStream, RequestJwtPayload, RequestParameters, SourceTableInterface, @@ -64,12 +65,13 @@ export function normalizeTokenParameters( export function normalizeQuerierOptions( - token_parameters: Record<string, any>, - user_parameters?: Record<string, any> + token_parameters: Record<string, any>, + user_parameters?: Record<string, any>, + streams?: Record<string, RequestedStream[]> +): GetQuerierOptions { const globalParameters = normalizeTokenParameters(token_parameters, user_parameters); return { globalParameters, hasDefaultStreams: true, - streams: {} + streams: streams ?? {} }; } diff --git a/packages/sync-rules/vitest.config.ts b/packages/sync-rules/vitest.config.ts index 727bf774..852cbb00 100644 --- a/packages/sync-rules/vitest.config.ts +++ b/packages/sync-rules/vitest.config.ts @@ -2,5 +2,7 @@ import { defineConfig } from 'vitest/config'; export default defineConfig({ plugins: [], - test: {} + test: { + setupFiles: ['test/matchers.ts'] + } });
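Note: the new `setupFiles` entry above registers `test/matchers.ts`, which provides the custom `toBeSqlRuleError` matcher used by the error tests. That file is not included in this diff; the sketch below shows a minimal, hypothetical version of such a matcher. The `message` and `sql` properties assumed on the error object, and the substring comparison, are illustrative assumptions rather than the actual implementation.

// test/matchers.ts (hypothetical sketch, not part of this diff)
import { expect } from 'vitest';

interface SqlRuleErrorLike {
  message: string;
  // Assumed: the parser attaches the offending SQL fragment to the error.
  sql?: string;
}

expect.extend({
  // Used asymmetrically in tests: expect(errors).toMatchObject([expect.toBeSqlRuleError(message, sql)])
  toBeSqlRuleError(received: SqlRuleErrorLike, message: string, sql?: string) {
    const messageMatches = typeof received?.message == 'string' && received.message.includes(message);
    const sqlMatches = sql == null || (typeof received?.sql == 'string' && received.sql.includes(sql));
    return {
      pass: messageMatches && sqlMatches,
      message: () =>
        `expected ${JSON.stringify(received)} to be a SqlRuleError matching ${JSON.stringify(message)}` +
        (sql != null ? ` at ${JSON.stringify(sql)}` : '')
    };
  }
});

// Optional typing so expect.toBeSqlRuleError(...) type-checks in the test files.
declare module 'vitest' {
  interface AsymmetricMatchersContaining {
    toBeSqlRuleError(message: string, sql?: string): any;
  }
}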