WIP: New unified syntax #313

Draft · wants to merge 7 commits into sync-streams
6 changes: 6 additions & 0 deletions .changeset/gorgeous-adults-confess.md
@@ -0,0 +1,6 @@
---
'@powersync/service-sync-rules': minor
'@powersync/service-image': minor
---

Add support for streams, a new and simpler way to define what data gets synced to clients.
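For context, a minimal sketch of the new syntax (the stream name and query are copied from the schema examples added in this PR):

```yaml
streams:
  user_details:
    # The SQL query defining content to sync in this stream.
    query: select * from users where id = auth.user_id()
```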
63 changes: 61 additions & 2 deletions packages/sync-rules/src/BaseSqlDataQuery.ts
@@ -4,14 +4,29 @@ import { ColumnDefinition } from './ExpressionType.js';
import { SourceTableInterface } from './SourceTableInterface.js';
import { SqlTools } from './sql_filters.js';
import { TablePattern } from './TablePattern.js';
-import { QueryParameters, QuerySchema, SourceSchema, SourceSchemaTable, SqliteJsonRow, SqliteRow } from './types.js';
+import {
+  EvaluationResult,
+  QueryParameters,
+  QuerySchema,
+  SourceSchema,
+  SourceSchemaTable,
+  SqliteJsonRow,
+  SqliteRow
+} from './types.js';
 import { filterJsonRow } from './utils.js';
+import { castAsText } from './sql_functions.js';

export interface RowValueExtractor {
extract(tables: QueryParameters, into: SqliteRow): void;
getTypes(schema: QuerySchema, into: Record<string, ColumnDefinition>): void;
}

export interface EvaluateRowOptions {
table: SourceTableInterface;
row: SqliteRow;
bucketIds: (params: QueryParameters) => string[];
}

export interface BaseSqlDataQueryOptions {
sourceTable: TablePattern;
table: string;
@@ -21,7 +36,6 @@ export interface BaseSqlDataQueryOptions {
descriptorName: string;
bucketParameters: string[];
tools: SqlTools;

errors?: SqlRuleError[];
}

@@ -149,6 +163,51 @@ export class BaseSqlDataQuery {
return result;
}

resolveResultSets(schema: SourceSchema, tables: Record<string, Record<string, ColumnDefinition>>) {
const outTables = this.getColumnOutputs(schema);
for (let table of outTables) {
tables[table.name] ??= {};
for (let column of table.columns) {
if (column.name != 'id') {
tables[table.name][column.name] ??= column;
}
}
}
}

evaluateRowWithOptions(options: EvaluateRowOptions): EvaluationResult[] {
try {
const { table, row, bucketIds } = options;

const tables = { [this.table]: this.addSpecialParameters(table, row) };
const resolvedBucketIds = bucketIds(tables);

const data = this.transformRow(tables);
let id = data.id;
if (typeof id != 'string') {
      // While an explicit cast would be better, this guards against very common
      // issues when initially testing out sync, for example when the id column is an
      // auto-incrementing integer.
      // If there is no id column, we use a blank id. This will result in the user syncing
      // a single arbitrary row for this table - better than not being able to sync
      // anything.
id = castAsText(id) ?? '';
}
const outputTable = this.getOutputName(table.name);

return resolvedBucketIds.map((bucketId) => {
return {
bucket: bucketId,
table: outputTable,
id: id,
data
} as EvaluationResult;
});
} catch (e) {
return [{ error: e.message ?? `Evaluating data query failed` }];
}
}

protected transformRow(tables: QueryParameters): SqliteJsonRow {
let result: SqliteRow = {};
for (let extractor of this.extractors) {
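To illustrate the new hook above: `evaluateRowWithOptions` keeps the shared id normalization and error handling in `BaseSqlDataQuery`, while the caller supplies how bucket ids are derived. A minimal sketch of the contract; the row values and bucket-id string are invented for illustration:

```typescript
// Hypothetical caller; only the shape of the call matters here.
declare const query: BaseSqlDataQuery;
declare const sourceTable: SourceTableInterface;

const results = query.evaluateRowWithOptions({
  table: sourceTable,
  // Assuming bigint is a valid SqliteRow value; the base class casts non-string ids to text.
  row: { id: 1n, name: 'Alice' },
  // Maps the fully-resolved query parameters to zero or more bucket ids.
  bucketIds: () => ['stream|user_details|[]']
});
// Each result is either { bucket, table, id, data } or { error }; errors are returned, not thrown.
```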
12 changes: 2 additions & 10 deletions packages/sync-rules/src/SqlBucketDescriptor.ts
@@ -201,15 +201,7 @@ export class SqlBucketDescriptor implements BucketSource {

  resolveResultSets(schema: SourceSchema, tables: Record<string, Record<string, ColumnDefinition>>) {
    for (let query of this.dataQueries) {
-     const outTables = query.getColumnOutputs(schema);
-     for (let table of outTables) {
-       tables[table.name] ??= {};
-       for (let column of table.columns) {
-         if (column.name != 'id') {
-           tables[table.name][column.name] ??= column;
-         }
-       }
-     }
+     query.resolveResultSets(schema, tables);
    }
  }

@@ -229,7 +221,7 @@
let all_data_queries = [...this.dataQueries.values()].flat();
return {
name: this.name,
-     type: this.type.toString(),
+     type: BucketSourceType[this.type],
bucket_parameters: this.bucketParameters,
global_parameter_queries: this.globalParameterQueries.map((q) => {
return {
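A side note on the `type` change above: `this.type` is presumably a numeric enum member, so `toString()` yields the numeric index, while indexing into the enum object uses TypeScript's reverse mapping to recover the member name. A sketch, with assumed member names:

```typescript
// Assumed shape; the real BucketSourceType members may differ.
enum BucketSourceType {
  standard_bucket,
  stream
}

const type = BucketSourceType.stream;
console.log(type.toString());        // '1': the numeric value
console.log(BucketSourceType[type]); // 'stream': the member name, better suited to debug output
```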
39 changes: 9 additions & 30 deletions packages/sync-rules/src/SqlDataQuery.ts
@@ -186,36 +186,15 @@ export class SqlDataQuery extends BaseSqlDataQuery {
}

  evaluateRow(table: SourceTableInterface, row: SqliteRow): EvaluationResult[] {
-   try {
-     const tables = { [this.table]: this.addSpecialParameters(table, row) };
-     const bucketParameters = this.filter.filterRow(tables);
-     const bucketIds = bucketParameters.map((params) =>
-       getBucketId(this.descriptorName, this.bucketParameters, params)
-     );
-
-     const data = this.transformRow(tables);
-     let id = data.id;
-     if (typeof id != 'string') {
-       // While an explicit cast would be better, this covers against very common
-       // issues when initially testing out sync, for example when the id column is an
-       // auto-incrementing integer.
-       // If there is no id column, we use a blank id. This will result in the user syncing
-       // a single arbitrary row for this table - better than just not being able to sync
-       // anything.
-       id = castAsText(id) ?? '';
-     }
-     const outputTable = this.getOutputName(table.name);
-
-     return bucketIds.map((bucketId) => {
-       return {
-         bucket: bucketId,
-         table: outputTable,
-         id: id,
-         data
-       } as EvaluationResult;
-     });
-   } catch (e) {
-     return [{ error: e.message ?? `Evaluating data query failed` }];
-   }
+   const query = this;
+
+   return this.evaluateRowWithOptions({
+     table,
+     row,
+     bucketIds(tables) {
+       const bucketParameters = query.filter.filterRow(tables);
+       return bucketParameters.map((params) => getBucketId(query.descriptorName, query.bucketParameters, params));
+     }
+   });
}
}
41 changes: 39 additions & 2 deletions packages/sync-rules/src/SqlSyncRules.ts
@@ -22,9 +22,12 @@ import {
SourceSchema,
SqliteJsonRow,
SqliteRow,
StreamParseOptions,
SyncRules
} from './types.js';
import { BucketSource } from './BucketSource.js';
import { SyncStream } from './streams/stream.js';
import { syncStreamFromSql } from './streams/from_sql.js';

const ACCEPT_POTENTIALLY_DANGEROUS_QUERIES = Symbol('ACCEPT_POTENTIALLY_DANGEROUS_QUERIES');

@@ -137,6 +140,7 @@ export class SqlSyncRules implements SyncRules {

// Bucket definitions using explicit parameter and data queries.
const bucketMap = parsed.get('bucket_definitions') as YAMLMap;
const streamMap = parsed.get('streams') as YAMLMap | null;
const definitionNames = new Set<string>();
const checkUniqueName = (name: string, literal: Scalar) => {
if (definitionNames.has(name)) {
@@ -148,8 +152,8 @@
return true;
};

-   if (bucketMap == null) {
-     rules.errors.push(new YamlError(new Error(`'bucket_definitions' is required`)));
+   if (bucketMap == null && streamMap == null) {
+     rules.errors.push(new YamlError(new Error(`'bucket_definitions' or 'streams' is required`)));

if (throwOnError) {
rules.throwOnError();
@@ -209,6 +213,39 @@
rules.bucketSources.push(descriptor);
}

for (const entry of streamMap?.items ?? []) {
const { key: keyScalar, value } = entry as { key: Scalar; value: YAMLMap };
const key = keyScalar.toString();
if (!checkUniqueName(key, keyScalar)) {
continue;
}

const accept_potentially_dangerous_queries =
value.get('accept_potentially_dangerous_queries', true)?.value == true;

const queryOptions: StreamParseOptions = {
...options,
accept_potentially_dangerous_queries,
priority: rules.parsePriority(value),
auto_subscribe: value.get('auto_subscribe', true)?.value == true
};

const data = value.get('query', true) as unknown;
if (data instanceof Scalar) {
rules.withScalar(data, (q) => {
const [parsed, errors] = syncStreamFromSql(key, q, queryOptions);
rules.bucketSources.push(parsed);
return {
parsed: true,
errors
};
});
} else {
rules.errors.push(this.tokenError(data, 'Must be a string.'));
continue;
}
}

const eventMap = parsed.get('event_definitions') as YAMLMap;
for (const event of eventMap?.items ?? []) {
const { key, value } = event as { key: Scalar; value: YAMLSeq };
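For reference when reviewing the parsing loop above, a stream entry exercising every option it reads. The stream name and query are illustrative; the option keys come straight from the code and schema in this PR:

```yaml
streams:
  user_details:
    query: select * from users where id = auth.user_id()
    # Lower values indicate higher priority.
    priority: 1
    # Clients subscribe to this stream by default.
    auto_subscribe: true
    # Disables warnings on potentially dangerous queries.
    accept_potentially_dangerous_queries: false
```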
3 changes: 3 additions & 0 deletions packages/sync-rules/src/index.ts
@@ -12,10 +12,13 @@ export * from './schema-generators/schema-generators.js';
export * from './SourceTableInterface.js';
export * from './sql_filters.js';
export * from './sql_functions.js';
export { SqlBucketDescriptor } from './SqlBucketDescriptor.js';
export * from './SqlDataQuery.js';
export * from './SqlParameterQuery.js';
export * from './SqlSyncRules.js';
export * from './StaticSchema.js';
export { SyncStream } from './streams/stream.js';
export { syncStreamFromSql } from './streams/from_sql.js';
export * from './TablePattern.js';
export * from './types.js';
export * from './utils.js';
32 changes: 31 additions & 1 deletion packages/sync-rules/src/json_schema.ts
@@ -49,6 +49,36 @@ export const syncRulesSchema: ajvModule.Schema = {
}
}
},
streams: {
type: 'object',
description: 'Record of stream definitions',
examples: [{ user_details: { query: 'select * from users where id = auth.user_id()' } }],
patternProperties: {
'.*': {
type: 'object',
required: ['query'],
examples: [{ query: 'select * from mytable' }],
properties: {
accept_potentially_dangerous_queries: {
description: 'If true, disables warnings on potentially dangerous queries',
type: 'boolean'
},
auto_subscribe: {
description: 'Whether clients will subscribe to this stream by default.',
type: 'boolean'
},
priority: {
description: 'Priority for the bucket (lower values indicate higher priority).',
type: 'integer'
},
query: {
description: 'The SQL query defining content to sync in this stream.',
type: 'string'
}
}
}
}
},
event_definitions: {
type: 'object',
description: 'Record of sync replication event definitions',
@@ -79,7 +109,7 @@
}
}
},
-  required: ['bucket_definitions'],
+  required: [],
additionalProperties: false
} as const;
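One way to see the effect of emptying `required`: a streams-only document now validates, where previously `bucket_definitions` was mandatory. A minimal sketch using ajv directly; the import path for `syncRulesSchema` is an assumption:

```typescript
import Ajv from 'ajv';
// Assumption: syncRulesSchema is importable from this path; it is defined in json_schema.ts.
import { syncRulesSchema } from './json_schema.js';

const validate = new Ajv().compile(syncRulesSchema);

// Valid now that `required` is empty; this failed while 'bucket_definitions' was required.
console.log(validate({ streams: { user_details: { query: 'select * from users where id = auth.user_id()' } } }));
// Existing bucket_definitions-only documents continue to validate as before.
```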
