diff --git a/packages/firestore/.idea/runConfigurations/Integration_Tests__Emulator_w__Mock_Persistence_.xml b/packages/firestore/.idea/runConfigurations/Integration_Tests__Emulator_w__Mock_Persistence_.xml
deleted file mode 100644
index 6ee36820d2e..00000000000
--- a/packages/firestore/.idea/runConfigurations/Integration_Tests__Emulator_w__Mock_Persistence_.xml
+++ /dev/null
@@ -1,19 +0,0 @@
-
-
- project
-
- $PROJECT_DIR$/../../node_modules/mocha
- $PROJECT_DIR$
- true
-
-
-
-
-
- bdd
- --require babel-register.js --require test/register.ts --require test/util/node_persistence.ts --timeout 5000
- PATTERN
- test/integration/{,!(browser|lite)/**/}*.test.ts
-
-
-
diff --git a/packages/firestore/src/api/database.ts b/packages/firestore/src/api/database.ts
index 812811f41ed..04bfda5ed2c 100644
--- a/packages/firestore/src/api/database.ts
+++ b/packages/firestore/src/api/database.ts
@@ -46,7 +46,7 @@ import {
connectFirestoreEmulator,
Firestore as LiteFirestore
} from '../lite-api/database';
-import { PipelineSource } from '../lite-api/pipeline-source';
+import { PipelineSource } from './pipeline_source';
import { DocumentReference, Query } from '../lite-api/reference';
import { newUserDataReader } from '../lite-api/user_data_reader';
import {
diff --git a/packages/firestore/src/api/pipeline.ts b/packages/firestore/src/api/pipeline.ts
index 047731b40e5..1b7e1c609e8 100644
--- a/packages/firestore/src/api/pipeline.ts
+++ b/packages/firestore/src/api/pipeline.ts
@@ -1,13 +1,22 @@
-import { firestoreClientExecutePipeline } from '../core/firestore_client';
+import {
+ firestoreClientExecutePipeline,
+ firestoreClientListenPipeline
+} from '../core/firestore_client';
import { Pipeline as LitePipeline } from '../lite-api/pipeline';
import { PipelineResult } from '../lite-api/pipeline-result';
import { DocumentData, DocumentReference } from '../lite-api/reference';
-import { Stage } from '../lite-api/stage';
+import { AddFields, Sort, Stage, Where } from '../lite-api/stage';
import { UserDataReader } from '../lite-api/user_data_reader';
import { AbstractUserDataWriter } from '../lite-api/user_data_writer';
import { DocumentKey } from '../model/document_key';
import { ensureFirestoreConfigured, Firestore } from './database';
+import { DocumentSnapshot, PipelineSnapshot } from './snapshot';
+import { FirestoreError } from '../util/error';
+import { Unsubscribe } from './reference_impl';
+import { cast } from '../util/input_validation';
+import { Field, FilterCondition } from '../api';
+import { Expr } from '../lite-api/expressions';
export class Pipeline<
AppModelType = DocumentData
@@ -23,7 +32,7 @@ export class Pipeline<
* @param converter
*/
constructor(
- private db: Firestore,
+ readonly db: Firestore,
userDataReader: UserDataReader,
userDataWriter: AbstractUserDataWriter,
documentReferenceFactory: (id: DocumentKey) => DocumentReference,
@@ -42,6 +51,20 @@ export class Pipeline<
);
}
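+ /**
+ * Filters the results from previous stages to only include those matching
+ * the specified condition, returning a new Pipeline with the `Where` stage
+ * appended; the original Pipeline's stage list is not modified.
+ */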
+ where(condition: FilterCondition & Expr): Pipeline {
+ const copy = this.stages.map(s => s);
+ super.readUserData('where', condition);
+ copy.push(new Where(condition));
+ return new Pipeline(
+ this.db,
+ this.userDataReader,
+ this.userDataWriter,
+ this.documentReferenceFactory,
+ copy,
+ this.converter
+ );
+ }
+
/**
* Executes this pipeline and returns a Promise to represent the asynchronous operation.
*
@@ -94,4 +117,31 @@ export class Pipeline<
return docs;
});
}
+
+ /**
+ * @internal
+ * @private
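+ *
+ * Listens to this pipeline's results, delivering a {@link PipelineSnapshot}
+ * to `next` whenever the result set changes. A sort on `__name__` is appended
+ * before listening, and the returned unsubscribe function is currently a
+ * no-op.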
+ */
+ _onSnapshot(
+ next: (snapshot: PipelineSnapshot) => void,
+ error?: (error: FirestoreError) => void,
+ complete?: () => void
+ ): Unsubscribe {
+ // this.stages.push(
+ // new AddFields(
+ // this.selectablesToMap([
+ // '__name__',
+ // '__create_time__',
+ // '__update_time__'
+ // ])
+ // )
+ // );
+
+ this.stages.push(new Sort([Field.of('__name__').ascending()]));
+
+ const client = ensureFirestoreConfigured(this.db);
+ firestoreClientListenPipeline(client, this, { next, error, complete });
+
+ return () => {};
+ }
}
diff --git a/packages/firestore/src/api/pipeline_source.ts b/packages/firestore/src/api/pipeline_source.ts
new file mode 100644
index 00000000000..915564767e4
--- /dev/null
+++ b/packages/firestore/src/api/pipeline_source.ts
@@ -0,0 +1,91 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+import { DocumentKey } from '../model/document_key';
+
+import { Firestore } from './database';
+import { Pipeline } from './pipeline';
+import { DocumentReference } from './reference';
+import {
+ CollectionGroupSource,
+ CollectionSource,
+ DatabaseSource,
+ DocumentsSource
+} from '../lite-api/stage';
+import { PipelineSource as LitePipelineSource } from '../lite-api/pipeline-source';
+import { UserDataReader } from '../lite-api/user_data_reader';
+import { AbstractUserDataWriter } from '../lite-api/user_data_writer';
+
+/**
+ * Represents the source of a Firestore {@link Pipeline}.
+ * @beta
+ */
+export class PipelineSource extends LitePipelineSource {
+ /**
+ * @internal
+ * @private
+ * @param db
+ * @param userDataReader
+ * @param userDataWriter
+ * @param documentReferenceFactory
+ */
+ constructor(
+ db: Firestore,
+ userDataReader: UserDataReader,
+ userDataWriter: AbstractUserDataWriter,
+ documentReferenceFactory: (id: DocumentKey) => DocumentReference
+ ) {
+ super(db, userDataReader, userDataWriter, documentReferenceFactory);
+ }
+
+ collection(collectionPath: string): Pipeline {
+ return new Pipeline(
+ this.db as Firestore,
+ this.userDataReader,
+ this.userDataWriter,
+ this.documentReferenceFactory,
+ [new CollectionSource(collectionPath)]
+ );
+ }
+
+ collectionGroup(collectionId: string): Pipeline {
+ return new Pipeline(
+ this.db as Firestore,
+ this.userDataReader,
+ this.userDataWriter,
+ this.documentReferenceFactory,
+ [new CollectionGroupSource(collectionId)]
+ );
+ }
+
+ database(): Pipeline {
+ return new Pipeline(
+ this.db as Firestore,
+ this.userDataReader,
+ this.userDataWriter,
+ this.documentReferenceFactory,
+ [new DatabaseSource()]
+ );
+ }
+
+ documents(docs: DocumentReference[]): Pipeline {
+ return new Pipeline(
+ this.db as Firestore,
+ this.userDataReader,
+ this.userDataWriter,
+ this.documentReferenceFactory,
+ [DocumentsSource.of(docs)]
+ );
+ }
+}
diff --git a/packages/firestore/src/api/snapshot.ts b/packages/firestore/src/api/snapshot.ts
index 29e1616b61c..0489572317c 100644
--- a/packages/firestore/src/api/snapshot.ts
+++ b/packages/firestore/src/api/snapshot.ts
@@ -40,6 +40,8 @@ import { Code, FirestoreError } from '../util/error';
import { Firestore } from './database';
import { SnapshotListenOptions } from './reference_impl';
+import { Pipeline } from './pipeline';
+import { PipelineResult } from '../lite-api/pipeline-result';
/**
* Converter used by `withConverter()` to transform user objects of type
@@ -790,3 +792,24 @@ export function snapshotEqual(
return false;
}
+
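+/**
+ * A `PipelineSnapshot` contains the results of a pipeline listen. It exposes
+ * the originating {@link Pipeline} and the current array of
+ * {@link PipelineResult}s.
+ */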
+export class PipelineSnapshot {
+ /**
+ * The pipeline on which you called `_onSnapshot` in order to get this
+ * `PipelineSnapshot`.
+ */
+ readonly pipeline: Pipeline;
+
+ /** @hideconstructor */
+ constructor(
+ pipeline: Pipeline,
+ readonly _snapshot: PipelineResult[]
+ ) {
+ this.pipeline = pipeline;
+ }
+
+ /** An array of all the results in this `PipelineSnapshot`. */
+ get results(): Array<PipelineResult> {
+ return this._snapshot;
+ }
+}
diff --git a/packages/firestore/src/core/event_manager.ts b/packages/firestore/src/core/event_manager.ts
index 72d801f3934..c7af2425114 100644
--- a/packages/firestore/src/core/event_manager.ts
+++ b/packages/firestore/src/core/event_manager.ts
@@ -24,6 +24,9 @@ import { ObjectMap } from '../util/obj_map';
import { canonifyQuery, Query, queryEquals, stringifyQuery } from './query';
import { OnlineState } from './types';
import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot';
+import { Pipeline } from '../api/pipeline';
+import { PipelineSnapshot } from '../api/snapshot';
+import { PipelineResultView } from './sync_engine_impl';
/**
* Holds the listeners and the last received ViewSnapshot for a query being
@@ -64,6 +67,8 @@ export interface EventManager {
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;
onFirstRemoteStoreListen?: (query: Query) => Promise<void>;
onLastRemoteStoreUnlisten?: (query: Query) => Promise<void>;
+ // TODO(pipeline): consolidate query and pipeline
+ onListenPipeline?: (pipeline: PipelineListener) => Promise<void>;
terminate(): void;
}
@@ -85,6 +90,7 @@ export class EventManagerImpl implements EventManager {
) => Promise<void>;
/** Callback invoked once all listeners to a Query are removed. */
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;
+ onListenPipeline?: (pipeline: PipelineListener) => Promise<void>;
/**
* Callback invoked when a Query starts listening to the remote store, while
@@ -123,6 +129,7 @@ function validateEventManager(eventManagerImpl: EventManagerImpl): void {
!!eventManagerImpl.onLastRemoteStoreUnlisten,
'onLastRemoteStoreUnlisten not set'
);
+ debugAssert(!!eventManagerImpl.onListenPipeline, 'onListenPipeline not set');
}
const enum ListenerSetupAction {
@@ -213,6 +220,25 @@ export async function eventManagerListen(
}
}
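+/**
+ * Registers a PipelineListener with the event manager, forwarding it to the
+ * sync engine via `onListenPipeline`. Errors raised during initialization are
+ * converted to FirestoreErrors and reported through the listener.
+ */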
+export async function eventManagerListenPipeline(
+ eventManager: EventManager,
+ listener: PipelineListener
+): Promise<void> {
+ const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
+ validateEventManager(eventManagerImpl);
+
+ try {
+ await eventManagerImpl.onListenPipeline!(listener);
+ } catch (e) {
+ const firestoreError = wrapInUserErrorIfRecoverable(
+ e as Error,
+ `Initialization of pipeline '${listener.pipeline}' failed`
+ );
+ listener.onError(firestoreError);
+ return;
+ }
+}
+
export async function eventManagerUnlisten(
eventManager: EventManager,
listener: QueryListener
@@ -286,6 +312,13 @@ export function eventManagerOnWatchChange(
}
}
+export function eventManagerOnPipelineWatchChange(
+ eventManager: EventManager,
+ viewSnaps: PipelineResultView[]
+): void {
+ const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
+}
+
export function eventManagerOnWatchError(
eventManager: EventManager,
query: Query,
@@ -567,3 +600,22 @@ export class QueryListener {
return this.options.source !== ListenerDataSource.Cache;
}
}
+
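+/**
+ * PipelineListener pairs a {@link Pipeline} with an observer and converts each
+ * PipelineResultView raised by the sync engine into a PipelineSnapshot for
+ * that observer.
+ */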
+export class PipelineListener {
+ private view: PipelineResultView | null = null;
+
+ constructor(
+ readonly pipeline: Pipeline,
+ private queryObserver: Observer<PipelineSnapshot>
+ ) {}
+
+ onViewSnapshot(view: PipelineResultView): boolean {
+ this.view = view;
+ this.queryObserver.next(view.toPipelineSnapshot());
+ return true;
+ }
+
+ onError(error: FirestoreError): void {
+ this.queryObserver.error(error);
+ }
+}
diff --git a/packages/firestore/src/core/firestore_client.ts b/packages/firestore/src/core/firestore_client.ts
index 57aa99869da..6b09a4c92c7 100644
--- a/packages/firestore/src/core/firestore_client.ts
+++ b/packages/firestore/src/core/firestore_client.ts
@@ -23,7 +23,8 @@ import {
CredentialsProvider
} from '../api/credentials';
import { User } from '../auth/user';
-import { Pipeline } from '../lite-api/pipeline';
+import { Pipeline as LitePipeline } from '../lite-api/pipeline';
+import { Pipeline } from '../api/pipeline';
import { LocalStore } from '../local/local_store';
import {
localStoreConfigureFieldIndexes,
@@ -79,9 +80,11 @@ import {
addSnapshotsInSyncListener,
EventManager,
eventManagerListen,
+ eventManagerListenPipeline,
eventManagerUnlisten,
ListenOptions,
Observer,
+ PipelineListener,
QueryListener,
removeSnapshotsInSyncListener
} from './event_manager';
@@ -89,6 +92,7 @@ import { newQueryForPath, Query } from './query';
import { SyncEngine } from './sync_engine';
import {
syncEngineListen,
+ syncEngineListenPipeline,
syncEngineLoadBundle,
syncEngineRegisterPendingWritesCallback,
syncEngineUnlisten,
@@ -101,6 +105,8 @@ import { TransactionOptions } from './transaction_options';
import { TransactionRunner } from './transaction_runner';
import { View } from './view';
import { ViewSnapshot } from './view_snapshot';
+import { Unsubscribe } from '../api/reference_impl';
+import { PipelineSnapshot } from '../api/snapshot';
const LOG_TAG = 'FirestoreClient';
export const MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100;
@@ -404,6 +410,10 @@ export async function getEventManager(
null,
onlineComponentProvider.syncEngine
);
+ eventManager.onListenPipeline = syncEngineListenPipeline.bind(
+ null,
+ onlineComponentProvider.syncEngine
+ );
return eventManager;
}
@@ -556,7 +566,7 @@ export function firestoreClientRunAggregateQuery(
export function firestoreClientExecutePipeline(
client: FirestoreClient,
- pipeline: Pipeline
+ pipeline: LitePipeline
): Promise {
const deferred = new Deferred();
@@ -571,6 +581,27 @@ export function firestoreClientExecutePipeline(
return deferred.promise;
}
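+/**
+ * Starts a pipeline listen on the async queue and returns an unsubscribe
+ * function that currently only mutes the observer; the server-side listen is
+ * not yet torn down (see TODO below).
+ */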
+export function firestoreClientListenPipeline(
+ client: FirestoreClient,
+ pipeline: Pipeline,
+ observer: {
+ next?: (snapshot: PipelineSnapshot) => void;
+ error?: (error: FirestoreError) => void;
+ complete?: () => void;
+ }
+): Unsubscribe {
+ const wrappedObserver = new AsyncObserver(observer);
+ const listener = new PipelineListener(pipeline, wrappedObserver);
+ client.asyncQueue.enqueueAndForget(async () => {
+ const eventManager = await getEventManager(client);
+ return eventManagerListenPipeline(eventManager, listener);
+ });
+ return () => {
+ wrappedObserver.mute();
+ // TODO(pipeline): actually unlisten
+ };
+}
+
export function firestoreClientWrite(
client: FirestoreClient,
mutations: Mutation[]
diff --git a/packages/firestore/src/core/query.ts b/packages/firestore/src/core/query.ts
index b13296ad7ee..87e7e6ce5a6 100644
--- a/packages/firestore/src/core/query.ts
+++ b/packages/firestore/src/core/query.ts
@@ -35,6 +35,7 @@ import {
Target,
targetEquals
} from './target';
+import { Pipeline } from '../api/pipeline';
export const enum LimitType {
First = 'F',
diff --git a/packages/firestore/src/core/sync_engine_impl.ts b/packages/firestore/src/core/sync_engine_impl.ts
index f96cbea0f00..0bf4558a2a3 100644
--- a/packages/firestore/src/core/sync_engine_impl.ts
+++ b/packages/firestore/src/core/sync_engine_impl.ts
@@ -45,7 +45,8 @@ import { TargetData, TargetPurpose } from '../local/target_data';
import {
DocumentKeySet,
documentKeySet,
- DocumentMap
+ DocumentMap,
+ mutableDocumentMap
} from '../model/collections';
import { MutableDocument } from '../model/document';
import { DocumentKey } from '../model/document_key';
@@ -81,8 +82,10 @@ import {
import {
EventManager,
eventManagerOnOnlineStateChange,
+ eventManagerOnPipelineWatchChange,
eventManagerOnWatchChange,
- eventManagerOnWatchError
+ eventManagerOnWatchError,
+ PipelineListener
} from './event_manager';
import { ListenSequence } from './listen_sequence';
import {
@@ -115,6 +118,10 @@ import {
ViewChange
} from './view';
import { ViewSnapshot } from './view_snapshot';
+import { Pipeline } from '../api/pipeline';
+import { PipelineSnapshot } from '../api/snapshot';
+import { PipelineResult } from '../lite-api/pipeline-result';
+import { doc } from '../lite-api/reference';
const LOG_TAG = 'SyncEngine';
@@ -143,6 +150,76 @@ class QueryView {
) {}
}
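+/**
+ * Maintains the current result set for a pipeline target as an array of
+ * documents plus a key-to-index map, supporting add, update and removal of
+ * individual results and conversion into a PipelineSnapshot.
+ */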
+export class PipelineResultView {
+ private keyToIndexMap: ObjectMap<DocumentKey, number>;
+ constructor(public pipeline: Pipeline, public view: Array<MutableDocument>) {
+ this.keyToIndexMap = new ObjectMap<DocumentKey, number>(
+ key => key.toString(),
+ (a, b) => a.isEqual(b)
+ );
+ this.buildKeyToIndexMap();
+ }
+
+ private buildKeyToIndexMap(): void {
+ this.view.forEach((doc, index) => {
+ this.keyToIndexMap.set(doc.key, index);
+ });
+ }
+
+ addResult(key: DocumentKey, doc: MutableDocument) {
+ if (this.keyToIndexMap.has(key)) {
+ throw new Error(`Result with key ${key} already exists.`);
+ }
+ this.view.push(doc);
+ this.keyToIndexMap.set(key, this.view.length - 1);
+ }
+
+ removeResult(key: DocumentKey) {
+ const index = this.keyToIndexMap.get(key);
+ if (index === undefined) {
+ return; // Result not found, nothing to remove
+ }
+
+ // Remove from the array efficiently by swapping with the last element and popping
+ const lastIndex = this.view.length - 1;
+ if (index !== lastIndex) {
+ [this.view[index], this.view[lastIndex]] = [
+ this.view[lastIndex],
+ this.view[index]
+ ];
+ // Update the keyToIndexMap for the swapped element
+ this.keyToIndexMap.set(this.view[index].key, index);
+ }
+ this.view.pop();
+ this.keyToIndexMap.delete(key);
+ }
+
+ updateResult(key: DocumentKey, doc: MutableDocument) {
+ const index = this.keyToIndexMap.get(key);
+ if (index === undefined) {
+ throw new Error(`Result with key ${key} not found.`);
+ }
+ this.view[index] = doc;
+ }
+
+ toPipelineSnapshot(): PipelineSnapshot {
+ return new PipelineSnapshot(
+ this.pipeline,
+ this.view.map(
+ d =>
+ new PipelineResult(
+ this.pipeline.userDataWriter,
+ doc(this.pipeline.db, d.key.toString()),
+ d.data,
+ d.readTime.toTimestamp(),
+ d.createTime.toTimestamp(),
+ d.version.toTimestamp()
+ )
+ )
+ );
+ }
+}
+
/** Tracks a limbo resolution. */
class LimboResolution {
constructor(public key: DocumentKey) {}
@@ -208,6 +285,9 @@ class SyncEngineImpl implements SyncEngine {
queryEquals
);
queriesByTarget = new Map<TargetId, Query[]>();
+ // TODO(pipeline): below is a hack for the lack of canonical id for pipelines
+ pipelineByTarget = new Map<TargetId, PipelineListener>();
+ pipelineViewByTarget = new Map<TargetId, PipelineResultView>();
/**
* The keys of documents that are in limbo for which we haven't yet started a
* limbo resolution query. The strings in this set are the result of calling
@@ -285,6 +365,24 @@ export function newSyncEngine(
return syncEngine;
}
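+/**
+ * Allocates a target for the pipeline, registers the listener and an empty
+ * PipelineResultView for that target id, and starts the remote listen.
+ */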
+export async function syncEngineListenPipeline(
+ syncEngine: SyncEngine,
+ pipeline: PipelineListener
+): Promise<void> {
+ const syncEngineImpl = ensureWatchCallbacks(syncEngine);
+ const targetData = await localStoreAllocateTarget(
+ syncEngineImpl.localStore,
+ pipeline.pipeline
+ );
+ syncEngineImpl.pipelineByTarget.set(targetData.targetId, pipeline);
+ syncEngineImpl.pipelineViewByTarget.set(
+ targetData.targetId,
+ new PipelineResultView(pipeline.pipeline, [])
+ );
+
+ remoteStoreListen(syncEngineImpl.remoteStore, targetData);
+}
+
/**
* Initiates the new listen, resolves promise when listen enqueued to the
* server. All the subsequent view snapshots or errors are sent to the
@@ -708,6 +806,7 @@ export async function syncEngineRejectListen(
primitiveComparator
),
documentUpdates,
+ mutableDocumentMap(),
resolvedLimboDocuments
);
@@ -921,6 +1020,14 @@ function removeAndCleanupTarget(
): void {
syncEngineImpl.sharedClientState.removeLocalQueryTarget(targetId);
+ // TODO(pipeline): REMOVE this hack.
+ if (
+ !syncEngineImpl.queriesByTarget.has(targetId) ||
+ syncEngineImpl.queriesByTarget.get(targetId)!.length !== 0
+ ) {
+ return;
+ }
+
debugAssert(
syncEngineImpl.queriesByTarget.has(targetId) &&
syncEngineImpl.queriesByTarget.get(targetId)!.length !== 0,
@@ -1079,11 +1186,34 @@ export async function syncEngineEmitNewSnapsAndNotifyLocalStore(
const docChangesInAllViews: LocalViewChanges[] = [];
const queriesProcessed: Array<Promise<void>> = [];
- if (syncEngineImpl.queryViewsByQuery.isEmpty()) {
+ if (
+ syncEngineImpl.queryViewsByQuery.isEmpty() &&
+ syncEngineImpl.pipelineViewByTarget.size === 0
+ ) {
// Return early since `onWatchChange()` might not have been assigned yet.
return;
}
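+ // Apply the remote event's added/modified/removed documents to each pipeline
+ // result view and notify the corresponding pipeline listener.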
+ syncEngineImpl.pipelineViewByTarget.forEach((results, targetId) => {
+ const change = remoteEvent?.targetChanges.get(targetId);
+ if (!!change) {
+ change.modifiedDocuments.forEach(key => {
+ results.updateResult(
+ key,
+ remoteEvent?.augmentedDocumentUpdates.get(key)!
+ );
+ });
+ change.addedDocuments.forEach(key => {
+ results.addResult(key, remoteEvent?.augmentedDocumentUpdates.get(key)!);
+ });
+ change.removedDocuments.forEach(key => {
+ results.removeResult(key);
+ });
+
+ syncEngineImpl.pipelineByTarget.get(targetId)?.onViewSnapshot(results);
+ }
+ });
+
syncEngineImpl.queryViewsByQuery.forEach((_, queryView) => {
debugAssert(
!!syncEngineImpl.applyDocChanges,
@@ -1216,10 +1346,11 @@ export function syncEngineGetRemoteKeysForTarget(
} else {
let keySet = documentKeySet();
const queries = syncEngineImpl.queriesByTarget.get(targetId);
- if (!queries) {
+ const pipelineView = syncEngineImpl.pipelineViewByTarget.get(targetId);
+ if (!queries && !pipelineView) {
return keySet;
}
- for (const query of queries) {
+ for (const query of queries ?? []) {
const queryView = syncEngineImpl.queryViewsByQuery.get(query);
debugAssert(
!!queryView,
@@ -1227,6 +1358,9 @@ export function syncEngineGetRemoteKeysForTarget(
);
keySet = keySet.unionWith(queryView.view.syncedDocuments);
}
+ for (const doc of pipelineView?.view ?? []) {
+ keySet = keySet.add(doc.key);
+ }
return keySet;
}
}
diff --git a/packages/firestore/src/core/target.ts b/packages/firestore/src/core/target.ts
index 4b12857fc2a..8ad61ad01a0 100644
--- a/packages/firestore/src/core/target.ts
+++ b/packages/firestore/src/core/target.ts
@@ -52,6 +52,7 @@ import {
orderByEquals,
stringifyOrderBy
} from './order_by';
+import { Pipeline } from '../api/pipeline';
/**
* A Target represents the WatchTarget representation of a Query, which is used
@@ -215,6 +216,12 @@ export function targetEquals(left: Target, right: Target): boolean {
return boundEquals(left.endAt, right.endAt);
}
+export function targetIsPipelineTarget(
+ target: Target | Pipeline
+): target is Pipeline {
+ return target instanceof Pipeline;
+}
+
export function targetIsDocumentTarget(target: Target): boolean {
return (
DocumentKey.isDocumentKey(target.path) &&
diff --git a/packages/firestore/src/lite-api/expressions.ts b/packages/firestore/src/lite-api/expressions.ts
index d12c17bdeda..8db799784b5 100644
--- a/packages/firestore/src/lite-api/expressions.ts
+++ b/packages/firestore/src/lite-api/expressions.ts
@@ -2336,7 +2336,7 @@ export class Mod extends FirestoreFunction {
*/
export class Eq extends FirestoreFunction implements FilterCondition {
constructor(private left: Expr, private right: Expr) {
- super('eq', [left, right]);
+ super('equals', [left, right]);
}
filterable = true as const;
}
diff --git a/packages/firestore/src/lite-api/pipeline-source.ts b/packages/firestore/src/lite-api/pipeline-source.ts
index 4b913e26ce7..b3069ec3319 100644
--- a/packages/firestore/src/lite-api/pipeline-source.ts
+++ b/packages/firestore/src/lite-api/pipeline-source.ts
@@ -40,10 +40,10 @@ export class PipelineSource {
* @param documentReferenceFactory
*/
constructor(
- private db: Firestore,
- private userDataReader: UserDataReader,
- private userDataWriter: AbstractUserDataWriter,
- private documentReferenceFactory: (id: DocumentKey) => DocumentReference
+ protected db: Firestore,
+ protected userDataReader: UserDataReader,
+ protected userDataWriter: AbstractUserDataWriter,
+ protected documentReferenceFactory: (id: DocumentKey) => DocumentReference
) {}
collection(collectionPath: string): Pipeline {
diff --git a/packages/firestore/src/lite-api/pipeline.ts b/packages/firestore/src/lite-api/pipeline.ts
index 4998692f0d0..1ab8800738b 100644
--- a/packages/firestore/src/lite-api/pipeline.ts
+++ b/packages/firestore/src/lite-api/pipeline.ts
@@ -24,7 +24,8 @@ import {
import { invokeExecutePipeline } from '../remote/datastore';
import {
getEncodedDatabaseId,
- JsonProtoSerializer, ProtoSerializable
+ JsonProtoSerializer,
+ ProtoSerializable
} from '../remote/serializer';
import { getDatastore } from './components';
@@ -117,7 +118,9 @@ function isReadableUserData(value: any): value is ReadableUserData {
/**
* Base-class implementation
*/
-export class Pipeline implements ProtoSerializable<ExecutePipelineRequest>{
+export class Pipeline
+ implements ProtoSerializable<ExecutePipelineRequest>
+{
/**
* @internal
* @private
@@ -130,21 +133,21 @@ export class Pipeline implements ProtoSerializable<
*/
constructor(
private liteDb: Firestore,
- private userDataReader: UserDataReader,
+ protected userDataReader: UserDataReader,
/**
* @internal
* @private
*/
- protected userDataWriter: AbstractUserDataWriter,
+ readonly userDataWriter: AbstractUserDataWriter,
/**
* @internal
* @private
*/
protected documentReferenceFactory: (id: DocumentKey) => DocumentReference,
- private stages: Stage[],
+ protected stages: Stage[],
// TODO(pipeline) support converter
//private converter: FirestorePipelineConverter = defaultPipelineConverter()
- private converter: unknown = {}
+ protected converter: unknown = {}
) {}
/**
@@ -236,7 +239,7 @@ export class Pipeline implements ProtoSerializable<
);
}
- private selectablesToMap(
+ protected selectablesToMap(
selectables: Array<Selectable | string>
): Map<string, Expr> {
const result = new Map<string, Expr>();
@@ -265,7 +268,7 @@ export class Pipeline implements ProtoSerializable<
* @return the expressionMap argument.
* @private
*/
- private readUserData<
+ protected readUserData<
T extends
| Map<string, ReadableUserData>
| ReadableUserData[]
@@ -812,13 +815,31 @@ export class Pipeline implements ProtoSerializable<
* @private
*/
_toProto(jsonProtoSerializer: JsonProtoSerializer): ExecutePipelineRequest {
- const stages: ProtoStage[] = this.stages.map(stage =>
- stage._toProto(jsonProtoSerializer)
- );
- const structuredPipeline: StructuredPipeline = { pipeline: { stages } };
return {
database: getEncodedDatabaseId(jsonProtoSerializer),
- structuredPipeline
+ structuredPipeline: this._toStructuredPipeline(jsonProtoSerializer)
};
}
+
+ /**
+ * @internal
+ * @private
+ */
+ _toStructuredPipeline(
+ jsonProtoSerializer: JsonProtoSerializer
+ ): StructuredPipeline {
+ const stages: ProtoStage[] = this.stages.map(stage =>
+ stage._toProto(jsonProtoSerializer)
+ );
+ return { pipeline: { stages } };
+ }
+
+ /**
+ * @internal
+ * @private
+ */
+ // TODO(pipeline): do better than this
+ _toCanonicalId(jsonProtoSerializer: JsonProtoSerializer): string {
+ return JSON.stringify(this._toStructuredPipeline(jsonProtoSerializer));
+ }
}
diff --git a/packages/firestore/src/lite-api/stage.ts b/packages/firestore/src/lite-api/stage.ts
index 0a9ce8b7e35..9f4c7735f03 100644
--- a/packages/firestore/src/lite-api/stage.ts
+++ b/packages/firestore/src/lite-api/stage.ts
@@ -19,7 +19,8 @@ import {
} from '../protos/firestore_proto_api';
import { toNumber } from '../remote/number_serializer';
import {
- JsonProtoSerializer, ProtoSerializable,
+ JsonProtoSerializer,
+ ProtoSerializable,
toMapValue,
toStringValue
} from '../remote/serializer';
@@ -37,7 +38,7 @@ import { VectorValue } from './vector_value';
/**
* @beta
*/
-export interface Stage extends ProtoSerializable<ProtoStage>{
+export interface Stage extends ProtoSerializable<ProtoStage> {
name: string;
}
diff --git a/packages/firestore/src/local/indexeddb_schema.ts b/packages/firestore/src/local/indexeddb_schema.ts
index 0395756ab96..3c607a836f1 100644
--- a/packages/firestore/src/local/indexeddb_schema.ts
+++ b/packages/firestore/src/local/indexeddb_schema.ts
@@ -22,6 +22,7 @@ import {
Document as ProtoDocument,
DocumentsTarget as ProtoDocumentsTarget,
QueryTarget as ProtoQueryTarget,
+ PipelineQueryTarget as ProtoPipelineQueryTarget,
Write as ProtoWrite
} from '../protos/firestore_proto_api';
@@ -253,7 +254,10 @@ export interface DbRemoteDocumentGlobal {
* IndexedDb. We use the proto definitions for these two kinds of queries in
* order to avoid writing extra serialization logic.
*/
-export type DbQuery = ProtoQueryTarget | ProtoDocumentsTarget;
+export type DbQuery =
+ | ProtoQueryTarget
+ | ProtoDocumentsTarget
+ | ProtoPipelineQueryTarget;
/**
* An object to be stored in the 'targets' store in IndexedDb.
diff --git a/packages/firestore/src/local/indexeddb_target_cache.ts b/packages/firestore/src/local/indexeddb_target_cache.ts
index 9e93cc68838..770d10c2a83 100644
--- a/packages/firestore/src/local/indexeddb_target_cache.ts
+++ b/packages/firestore/src/local/indexeddb_target_cache.ts
@@ -268,7 +268,8 @@ export class IndexedDbTargetCache implements TargetCache {
const found = fromDbTarget(value);
// After finding a potential match, check that the target is
// actually equal to the requested target.
- if (targetEquals(target, found.target)) {
+ // TODO(pipeline): This needs to handle pipeline properly.
+ if (targetEquals(target, found.target as Target)) {
result = found;
control.done();
}
diff --git a/packages/firestore/src/local/local_serializer.ts b/packages/firestore/src/local/local_serializer.ts
index b8916608711..19b9dd83baa 100644
--- a/packages/firestore/src/local/local_serializer.ts
+++ b/packages/firestore/src/local/local_serializer.ts
@@ -19,7 +19,12 @@ import { Timestamp } from '../api/timestamp';
import { BundleMetadata, NamedQuery } from '../core/bundle';
import { LimitType, Query, queryWithLimit } from '../core/query';
import { SnapshotVersion } from '../core/snapshot_version';
-import { canonifyTarget, Target, targetIsDocumentTarget } from '../core/target';
+import {
+ canonifyTarget,
+ Target,
+ targetIsDocumentTarget,
+ targetIsPipelineTarget
+} from '../core/target';
import { MutableDocument } from '../model/document';
import { DocumentKey } from '../model/document_key';
import {
@@ -36,18 +41,23 @@ import {
BundleMetadata as ProtoBundleMetadata,
NamedQuery as ProtoNamedQuery
} from '../protos/firestore_bundle_proto';
-import { DocumentsTarget as PublicDocumentsTarget } from '../protos/firestore_proto_api';
+import {
+ DocumentsTarget as PublicDocumentsTarget,
+ PipelineQueryTarget as PublicPipelineQueryTarget
+} from '../protos/firestore_proto_api';
import {
convertQueryTargetToQuery,
fromDocument,
fromDocumentsTarget,
fromMutation,
+ fromPipelineTarget,
fromQueryTarget,
fromVersion,
JsonProtoSerializer,
toDocument,
toDocumentsTarget,
toMutation,
+ toPipelineTarget,
toQueryTarget
} from '../remote/serializer';
import { debugAssert, fail } from '../util/assert';
@@ -71,6 +81,7 @@ import {
} from './indexeddb_schema';
import { DbDocumentOverlayKey, DbTimestampKey } from './indexeddb_sentinels';
import { TargetData, TargetPurpose } from './target_data';
+import { Pipeline } from '../api/pipeline';
/** Serializer for values stored in the LocalStore. */
export class LocalSerializer {
@@ -241,8 +252,10 @@ export function fromDbTarget(dbTarget: DbTarget): TargetData {
? fromDbTimestamp(dbTarget.lastLimboFreeSnapshotVersion)
: SnapshotVersion.min();
- let target: Target;
- if (isDocumentQuery(dbTarget.query)) {
+ let target: Target | Pipeline;
+ if (isPipelineQueryTarget(dbTarget.query)) {
+ target = fromPipelineTarget(dbTarget.query);
+ } else if (isDocumentQuery(dbTarget.query)) {
target = fromDocumentsTarget(dbTarget.query);
} else {
target = fromQueryTarget(dbTarget.query);
@@ -275,7 +288,21 @@ export function toDbTarget(
targetData.lastLimboFreeSnapshotVersion
);
let queryProto: DbQuery;
- if (targetIsDocumentTarget(targetData.target)) {
+ if (targetIsPipelineTarget(targetData.target)) {
+ queryProto = toPipelineTarget(
+ localSerializer.remoteSerializer,
+ targetData.target
+ );
+ return {
+ targetId: targetData.targetId,
+ canonicalId: '',
+ readTime: dbTimestamp,
+ resumeToken: '',
+ lastListenSequenceNumber: targetData.sequenceNumber,
+ lastLimboFreeSnapshotVersion: dbLastLimboFreeTimestamp,
+ query: queryProto
+ };
+ } else if (targetIsDocumentTarget(targetData.target)) {
queryProto = toDocumentsTarget(
localSerializer.remoteSerializer,
targetData.target
@@ -303,6 +330,12 @@ export function toDbTarget(
};
}
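+/**
+ * A helper to detect whether a stored query is a pipeline target, based on the
+ * presence of the `pipeline` field.
+ */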
+function isPipelineQueryTarget(
+ dbQuery: DbQuery
+): dbQuery is PublicPipelineQueryTarget {
+ return (dbQuery as PublicPipelineQueryTarget).pipeline !== undefined;
+}
+
/**
* A helper function for figuring out what kind of query has been stored.
*/
diff --git a/packages/firestore/src/local/local_store_impl.ts b/packages/firestore/src/local/local_store_impl.ts
index 56f2b96f8d1..b0188f7a699 100644
--- a/packages/firestore/src/local/local_store_impl.ts
+++ b/packages/firestore/src/local/local_store_impl.ts
@@ -24,7 +24,12 @@ import {
queryToTarget
} from '../core/query';
import { SnapshotVersion } from '../core/snapshot_version';
-import { canonifyTarget, Target, targetEquals } from '../core/target';
+import {
+ canonifyTarget,
+ Target,
+ targetEquals,
+ targetIsPipelineTarget
+} from '../core/target';
import { BatchId, TargetId } from '../core/types';
import { Timestamp } from '../lite-api/timestamp';
import {
@@ -90,6 +95,7 @@ import { ClientId } from './shared_client_state';
import { isIndexedDbTransactionError } from './simple_db';
import { TargetCache } from './target_cache';
import { TargetData, TargetPurpose } from './target_data';
+import { Pipeline } from '../api/pipeline';
export const LOG_TAG = 'LocalStore';
@@ -935,9 +941,28 @@ export function localStoreReadDocument(
*/
export function localStoreAllocateTarget(
localStore: LocalStore,
- target: Target
+ target: Target | Pipeline
): Promise<TargetData> {
const localStoreImpl = debugCast(localStore, LocalStoreImpl);
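+ // Pipeline targets are not looked up in the target cache yet; a fresh
+ // target id is allocated on every call.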
+ if (targetIsPipelineTarget(target)) {
+ return localStoreImpl.persistence.runTransaction(
+ 'Allocate pipeline target',
+ 'readwrite',
+ txn => {
+ return localStoreImpl.targetCache
+ .allocateTargetId(txn)
+ .next(targetId => {
+ return new TargetData(
+ target,
+ targetId,
+ TargetPurpose.Listen,
+ txn.currentSequenceNumber
+ );
+ });
+ }
+ );
+ }
+
return localStoreImpl.persistence
.runTransaction('Allocate target', 'readwrite', txn => {
let targetData: TargetData;
@@ -1025,6 +1050,13 @@ export async function localStoreReleaseTarget(
): Promise<void> {
const localStoreImpl = debugCast(localStore, LocalStoreImpl);
const targetData = localStoreImpl.targetDataByTarget.get(targetId);
+
+ // TODO(pipeline): this is a hack that only works because pipelines are the only ones returning nulls here.
+ // REMOVE ASAP.
+ if (targetData === null) {
+ return;
+ }
+
debugAssert(
targetData !== null,
`Tried to release nonexistent target: ${targetId}`
@@ -1063,7 +1095,8 @@ export async function localStoreReleaseTarget(
localStoreImpl.targetDataByTarget =
localStoreImpl.targetDataByTarget.remove(targetId);
- localStoreImpl.targetIdByTarget.delete(targetData!.target);
+ // TODO(pipeline): This needs to handle pipeline properly.
+ localStoreImpl.targetIdByTarget.delete(targetData!.target as Target);
}
/**
@@ -1220,15 +1253,21 @@ export function localStoreGetCachedTarget(
);
const cachedTargetData = localStoreImpl.targetDataByTarget.get(targetId);
if (cachedTargetData) {
- return Promise.resolve(cachedTargetData.target);
+ // TODO(pipeline): This needs to handle pipeline properly.
+ return Promise.resolve(cachedTargetData.target as Target);
} else {
return localStoreImpl.persistence.runTransaction(
'Get target data',
'readonly',
txn => {
- return targetCacheImpl
- .getTargetDataForTarget(txn, targetId)
- .next(targetData => (targetData ? targetData.target : null));
+ return (
+ targetCacheImpl
+ .getTargetDataForTarget(txn, targetId)
+ // TODO(pipeline): This needs to handle pipeline properly.
+ .next(targetData =>
+ targetData ? (targetData.target as Target) : null
+ )
+ );
}
);
}
diff --git a/packages/firestore/src/local/memory_target_cache.ts b/packages/firestore/src/local/memory_target_cache.ts
index 4d2a01d5651..f4a11ae4f66 100644
--- a/packages/firestore/src/local/memory_target_cache.ts
+++ b/packages/firestore/src/local/memory_target_cache.ts
@@ -101,7 +101,8 @@ export class MemoryTargetCache implements TargetCache {
}
private saveTargetData(targetData: TargetData): void {
- this.targets.set(targetData.target, targetData);
+ // TODO(pipeline): This needs to handle pipeline properly.
+ this.targets.set(targetData.target as Target, targetData);
const targetId = targetData.targetId;
if (targetId > this.highestTargetId) {
this.targetIdGenerator = new TargetIdGenerator(targetId);
@@ -117,7 +118,8 @@ export class MemoryTargetCache implements TargetCache {
targetData: TargetData
): PersistencePromise<void> {
debugAssert(
- !this.targets.has(targetData.target),
+ // TODO(pipeline): This needs to handle pipeline properly.
+ !this.targets.has(targetData.target as Target),
'Adding a target that already exists'
);
this.saveTargetData(targetData);
@@ -130,7 +132,8 @@ export class MemoryTargetCache implements TargetCache {
targetData: TargetData
): PersistencePromise<void> {
debugAssert(
- this.targets.has(targetData.target),
+ // TODO(pipeline): This needs to handle pipeline properly.
+ this.targets.has(targetData.target as Target),
'Updating a nonexistent target'
);
this.saveTargetData(targetData);
@@ -143,10 +146,11 @@ export class MemoryTargetCache implements TargetCache {
): PersistencePromise<void> {
debugAssert(this.targetCount > 0, 'Removing a target from an empty cache');
debugAssert(
- this.targets.has(targetData.target),
+ // TODO(pipeline): This needs to handle pipeline properly.
+ this.targets.has(targetData.target as Target),
'Removing a nonexistent target from the cache'
);
- this.targets.delete(targetData.target);
+ this.targets.delete(targetData.target as Target);
this.references.removeReferencesForId(targetData.targetId);
this.targetCount -= 1;
return PersistencePromise.resolve();
diff --git a/packages/firestore/src/local/target_data.ts b/packages/firestore/src/local/target_data.ts
index a912c21d498..10b36f357b5 100644
--- a/packages/firestore/src/local/target_data.ts
+++ b/packages/firestore/src/local/target_data.ts
@@ -19,6 +19,7 @@ import { SnapshotVersion } from '../core/snapshot_version';
import { Target } from '../core/target';
import { ListenSequenceNumber, TargetId } from '../core/types';
import { ByteString } from '../util/byte_string';
+import { Pipeline } from '../api/pipeline';
/** An enumeration of the different purposes we have for targets. */
export const enum TargetPurpose {
@@ -47,7 +48,7 @@ export const enum TargetPurpose {
export class TargetData {
constructor(
/** The target being listened to. */
- readonly target: Target,
+ readonly target: Target | Pipeline,
/**
* The target ID to which the target corresponds; Assigned by the
* LocalStore for user listens and by the SyncEngine for limbo watches.
diff --git a/packages/firestore/src/protos/firestore_proto_api.ts b/packages/firestore/src/protos/firestore_proto_api.ts
index cc1c57259f5..926c4422cc5 100644
--- a/packages/firestore/src/protos/firestore_proto_api.ts
+++ b/packages/firestore/src/protos/firestore_proto_api.ts
@@ -356,6 +356,9 @@ export declare namespace firestoreV1ApiClientInterfaces {
parent?: string;
structuredQuery?: StructuredQuery;
}
+ interface PipelineQueryTarget {
+ pipeline?: StructuredPipeline;
+ }
interface ReadOnly {
readTime?: string;
}
@@ -424,6 +427,7 @@ export declare namespace firestoreV1ApiClientInterfaces {
interface Target {
query?: QueryTarget;
documents?: DocumentsTarget;
+ pipelineQuery?: PipelineQueryTarget;
resumeToken?: string | Uint8Array;
readTime?: Timestamp;
targetId?: number;
@@ -555,6 +559,8 @@ export declare type Pipeline = firestoreV1ApiClientInterfaces.Pipeline;
export declare type Precondition = firestoreV1ApiClientInterfaces.Precondition;
export declare type Projection = firestoreV1ApiClientInterfaces.Projection;
export declare type QueryTarget = firestoreV1ApiClientInterfaces.QueryTarget;
+export declare type PipelineQueryTarget =
+ firestoreV1ApiClientInterfaces.PipelineQueryTarget;
export declare type ReadOnly = firestoreV1ApiClientInterfaces.ReadOnly;
export declare type ReadWrite = firestoreV1ApiClientInterfaces.ReadWrite;
export declare type RollbackRequest =
diff --git a/packages/firestore/src/protos/google/firestore/v1/firestore.proto b/packages/firestore/src/protos/google/firestore/v1/firestore.proto
index 3e7b62e0609..09605a1b708 100644
--- a/packages/firestore/src/protos/google/firestore/v1/firestore.proto
+++ b/packages/firestore/src/protos/google/firestore/v1/firestore.proto
@@ -913,6 +913,15 @@ message Target {
}
}
+ // A target specified by a pipeline query.
+ message PipelineQueryTarget {
+ // The pipeline to run.
+ oneof pipeline_type {
+ // A pipelined operation in structured format.
+ StructuredPipeline pipeline = 1;
+ }
+ }
+
// The type of target to listen to.
oneof target_type {
// A target specified by a query.
@@ -920,6 +929,9 @@ message Target {
// A target specified by a set of document names.
DocumentsTarget documents = 3;
+
+ // A target specified by a pipeline query.
+ PipelineQueryTarget pipeline_query = 13;
}
// When to start listening.
diff --git a/packages/firestore/src/protos/google/firestore/v1/write.proto b/packages/firestore/src/protos/google/firestore/v1/write.proto
index d8465955b67..f1d1bbb9ec1 100644
--- a/packages/firestore/src/protos/google/firestore/v1/write.proto
+++ b/packages/firestore/src/protos/google/firestore/v1/write.proto
@@ -198,6 +198,12 @@ message WriteResult {
//
// Multiple [DocumentChange][google.firestore.v1.DocumentChange] messages may be returned for the same logical
// change, if multiple targets are affected.
+//
+// For PipelineQueryTargets, `document` will be in the new pipeline format,
+// (-- TODO(b/330735468): Insert link to spec. --)
+// For a Listen stream with both QueryTargets and PipelineQueryTargets present,
+// if a document matches both types of queries, then separate DocumentChange
+// messages will be sent out, one for each set.
message DocumentChange {
// The new state of the [Document][google.firestore.v1.Document].
//
diff --git a/packages/firestore/src/protos/protos.json b/packages/firestore/src/protos/protos.json
index 5b73c4647f8..c489388e1be 100644
--- a/packages/firestore/src/protos/protos.json
+++ b/packages/firestore/src/protos/protos.json
@@ -2343,7 +2343,8 @@
"targetType": {
"oneof": [
"query",
- "documents"
+ "documents",
+ "pipeline_query"
]
},
"resumeType": {
@@ -2362,6 +2363,10 @@
"type": "DocumentsTarget",
"id": 3
},
+ "pipelineQuery": {
+ "type": "PipelineQueryTarget",
+ "id": 13
+ },
"resumeToken": {
"type": "bytes",
"id": 4
@@ -2411,6 +2416,21 @@
"id": 2
}
}
+ },
+ "PipelineQueryTarget": {
+ "oneofs": {
+ "pipelineType": {
+ "oneof": [
+ "pipeline"
+ ]
+ }
+ },
+ "fields": {
+ "pipeline": {
+ "type": "StructuredPipeline",
+ "id": 1
+ }
+ }
}
}
},
@@ -3266,4 +3286,4 @@
}
}
}
-}
\ No newline at end of file
+}
diff --git a/packages/firestore/src/remote/remote_event.ts b/packages/firestore/src/remote/remote_event.ts
index 49b2ef56a97..6af7861ee96 100644
--- a/packages/firestore/src/remote/remote_event.ts
+++ b/packages/firestore/src/remote/remote_event.ts
@@ -54,6 +54,11 @@ export class RemoteEvent {
* doc's new values (if not deleted).
*/
readonly documentUpdates: MutableDocumentMap,
+ /**
+ * A set of which augmented documents (pipeline) have changed or been deleted, along with the
+ * doc's new values (if not deleted).
+ */
+ readonly augmentedDocumentUpdates: MutableDocumentMap,
/**
* A set of which document updates are due only to limbo resolution targets.
*/
@@ -86,6 +91,7 @@ export class RemoteEvent {
targetChanges,
new SortedMap<TargetId, TargetPurpose>(primitiveComparator),
mutableDocumentMap(),
+ mutableDocumentMap(),
documentKeySet()
);
}
diff --git a/packages/firestore/src/remote/serializer.ts b/packages/firestore/src/remote/serializer.ts
index 4759571b4a5..e0d54482629 100644
--- a/packages/firestore/src/remote/serializer.ts
+++ b/packages/firestore/src/remote/serializer.ts
@@ -35,7 +35,11 @@ import {
queryToTarget
} from '../core/query';
import { SnapshotVersion } from '../core/snapshot_version';
-import { targetIsDocumentTarget, Target } from '../core/target';
+import {
+ targetIsDocumentTarget,
+ Target,
+ targetIsPipelineTarget
+} from '../core/target';
import { TargetId } from '../core/types';
import { Bytes } from '../lite-api/bytes';
import { GeoPoint } from '../lite-api/geo_point';
@@ -84,6 +88,7 @@ import {
OrderDirection as ProtoOrderDirection,
Precondition as ProtoPrecondition,
QueryTarget as ProtoQueryTarget,
+ PipelineQueryTarget as ProtoPipelineQueryTarget,
RunAggregationQueryRequest as ProtoRunAggregationQueryRequest,
Aggregation as ProtoAggregation,
Status as ProtoStatus,
@@ -111,6 +116,7 @@ import {
WatchTargetChange,
WatchTargetChangeState
} from './watch_change';
+import { Pipeline } from '../api/pipeline';
const DIRECTIONS = (() => {
const dirs: { [dir: string]: ProtoOrderDirection } = {};
@@ -1087,14 +1093,28 @@ export function toLabel(purpose: TargetPurpose): string | null {
}
}
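+/**
+ * Converts a pipeline target proto back into a Pipeline. Deserialization is
+ * not implemented yet; this returns an empty placeholder.
+ */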
+export function fromPipelineTarget(target: ProtoPipelineQueryTarget): Pipeline {
+ return {} as Pipeline;
+}
+
+export function toPipelineTarget(
+ serializer: JsonProtoSerializer,
+ target: Pipeline
+): ProtoPipelineQueryTarget {
+ return {
+ pipeline: target._toStructuredPipeline(serializer)
+ };
+}
+
export function toTarget(
serializer: JsonProtoSerializer,
targetData: TargetData
): ProtoTarget {
let result: ProtoTarget;
const target = targetData.target;
-
- if (targetIsDocumentTarget(target)) {
+ if (targetIsPipelineTarget(target)) {
+ result = { pipelineQuery: toPipelineTarget(serializer, target) };
+ } else if (targetIsDocumentTarget(target)) {
result = { documents: toDocumentsTarget(serializer, target) };
} else {
result = { query: toQueryTarget(serializer, target).queryTarget };
diff --git a/packages/firestore/src/remote/watch_change.ts b/packages/firestore/src/remote/watch_change.ts
index 38e10a23e35..dd595c9863d 100644
--- a/packages/firestore/src/remote/watch_change.ts
+++ b/packages/firestore/src/remote/watch_change.ts
@@ -17,7 +17,7 @@
import { DatabaseId } from '../core/database_info';
import { SnapshotVersion } from '../core/snapshot_version';
-import { targetIsDocumentTarget } from '../core/target';
+import { targetIsDocumentTarget, targetIsPipelineTarget } from '../core/target';
import { TargetId } from '../core/types';
import { ChangeType } from '../core/view_snapshot';
import { TargetData, TargetPurpose } from '../local/target_data';
@@ -292,6 +292,9 @@ export class WatchChangeAggregator {
/** Keeps track of the documents to update since the last raised snapshot. */
private pendingDocumentUpdates = mutableDocumentMap();
+ /** Keeps track of the augmented documents to update since the last raised snapshot. */
+ private pendingAugmentedDocumentUpdates = mutableDocumentMap();
+
/** A mapping of document keys to their set of target IDs. */
private pendingDocumentTargetMapping = documentTargetMap();
@@ -414,7 +417,9 @@ export class WatchChangeAggregator {
const targetData = this.targetDataForActiveTarget(targetId);
if (targetData) {
const target = targetData.target;
- if (targetIsDocumentTarget(target)) {
+ if (targetIsPipelineTarget(target)) {
+ //TODO(pipeline): handle existence filter correctly for pipelines
+ } else if (targetIsDocumentTarget(target)) {
if (expectedCount === 0) {
// The existence filter told us the document does not exist. We deduce
// that this document does not exist and apply a deleted document to
@@ -584,7 +589,11 @@ export class WatchChangeAggregator {
this.targetStates.forEach((targetState, targetId) => {
const targetData = this.targetDataForActiveTarget(targetId);
if (targetData) {
- if (targetState.current && targetIsDocumentTarget(targetData.target)) {
+ if (
+ targetState.current &&
+ !targetIsPipelineTarget(targetData.target) &&
+ targetIsDocumentTarget(targetData.target)
+ ) {
// Document queries for document that don't exist can produce an empty
// result set. To update our local cache, we synthesize a document
// delete if we have not previously received the document. This
@@ -645,16 +654,21 @@ export class WatchChangeAggregator {
this.pendingDocumentUpdates.forEach((_, doc) =>
doc.setReadTime(snapshotVersion)
);
+ this.pendingAugmentedDocumentUpdates.forEach((_, doc) =>
+ doc.setReadTime(snapshotVersion)
+ );
const remoteEvent = new RemoteEvent(
snapshotVersion,
targetChanges,
this.pendingTargetResets,
this.pendingDocumentUpdates,
+ this.pendingAugmentedDocumentUpdates,
resolvedLimboDocuments
);
this.pendingDocumentUpdates = mutableDocumentMap();
+ this.pendingAugmentedDocumentUpdates = mutableDocumentMap();
this.pendingDocumentTargetMapping = documentTargetMap();
this.pendingTargetResets = new SortedMap<TargetId, TargetPurpose>(
primitiveComparator
@@ -680,10 +694,17 @@ export class WatchChangeAggregator {
const targetState = this.ensureTargetState(targetId);
targetState.addDocumentChange(document.key, changeType);
- this.pendingDocumentUpdates = this.pendingDocumentUpdates.insert(
- document.key,
- document
- );
+ if (
+ targetIsPipelineTarget(this.targetDataForActiveTarget(targetId)!.target)
+ ) {
+ this.pendingAugmentedDocumentUpdates =
+ this.pendingAugmentedDocumentUpdates.insert(document.key, document);
+ } else {
+ this.pendingDocumentUpdates = this.pendingDocumentUpdates.insert(
+ document.key,
+ document
+ );
+ }
this.pendingDocumentTargetMapping =
this.pendingDocumentTargetMapping.insert(
@@ -725,10 +746,17 @@ export class WatchChangeAggregator {
);
if (updatedDocument) {
- this.pendingDocumentUpdates = this.pendingDocumentUpdates.insert(
- key,
- updatedDocument
- );
+ if (
+ targetIsPipelineTarget(this.targetDataForActiveTarget(targetId)!.target)
+ ) {
+ this.pendingAugmentedDocumentUpdates =
+ this.pendingAugmentedDocumentUpdates.insert(key, updatedDocument);
+ } else {
+ this.pendingDocumentUpdates = this.pendingDocumentUpdates.insert(
+ key,
+ updatedDocument
+ );
+ }
}
}
diff --git a/packages/firestore/test/integration/api/pipeline.listen.test.ts b/packages/firestore/test/integration/api/pipeline.listen.test.ts
new file mode 100644
index 00000000000..4752654b4ad
--- /dev/null
+++ b/packages/firestore/test/integration/api/pipeline.listen.test.ts
@@ -0,0 +1,340 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+import { expect, use } from 'chai';
+import chaiAsPromised from 'chai-as-promised';
+
+import { addEqualityMatcher } from '../../util/equality_matcher';
+import { Deferred } from '../../util/promise';
+import {
+ add,
+ andExpression,
+ arrayContains,
+ arrayContainsAny,
+ avg,
+ CollectionReference,
+ Constant,
+ cosineDistance,
+ countAll,
+ doc,
+ DocumentData,
+ dotProduct,
+ endsWith,
+ eq,
+ euclideanDistance,
+ Field,
+ Firestore,
+ gt,
+ like,
+ limitToLast,
+ lt,
+ lte,
+ mapGet,
+ neq,
+ not,
+ onSnapshot,
+ orderBy,
+ orExpression,
+ PipelineResult,
+ query,
+ QuerySnapshot,
+ regexContains,
+ regexMatch,
+ setDoc,
+ setLogLevel,
+ startsWith,
+ strConcat,
+ subtract,
+ updateDoc
+} from '../util/firebase_export';
+import { apiDescribe, toDataArray, withTestCollection } from '../util/helpers';
+import { EventsAccumulator } from '../util/events_accumulator';
+import { PipelineSnapshot } from '../../../src/api/snapshot';
+
+use(chaiAsPromised);
+
+apiDescribe('Pipelines', persistence => {
+ addEqualityMatcher();
+ let firestore: Firestore;
+ let randomCol: CollectionReference;
+
+ async function testCollectionWithDocs(docs: {
+ [id: string]: DocumentData;
+ }): Promise<CollectionReference<DocumentData>> {
+ for (const id in docs) {
+ if (docs.hasOwnProperty(id)) {
+ const ref = doc(randomCol, id);
+ await setDoc(ref, docs[id]);
+ }
+ }
+ return randomCol;
+ }
+
+ function expectResults(
+ result: Array<PipelineResult>,
+ ...docs: string[]
+ ): void;
+ function expectResults(
+ result: Array<PipelineResult>,
+ ...data: DocumentData[]
+ ): void;
+
+ function expectResults(
+ result: Array<PipelineResult>,
+ ...data: DocumentData[] | string[]
+ ): void {
+ expect(result.length).to.equal(data.length);
+
+ if (data.length > 0) {
+ if (typeof data[0] === 'string') {
+ const actualIds = result.map(result => result.ref?.id);
+ expect(actualIds).to.deep.equal(data);
+ } else {
+ result.forEach(r => {
+ expect(r.data()).to.deep.equal(data.shift());
+ });
+ }
+ }
+ }
+
+ // async function compareQueryAndPipeline(query: Query): Promise<QuerySnapshot> {
+ // const queryResults = await getDocs(query);
+ // const pipeline = query.pipeline();
+ // const pipelineResults = await pipeline.execute();
+ //
+ // expect(queryResults.docs.map(s => s._fieldsProto)).to.deep.equal(
+ // pipelineResults.map(r => r._fieldsProto)
+ // );
+ // return queryResults;
+ // }
+
+ // TODO(pipeline): move this to a util file
+ async function setupBookDocs(): Promise<CollectionReference<DocumentData>> {
+ const bookDocs: { [id: string]: DocumentData } = {
+ book1: {
+ title: "The Hitchhiker's Guide to the Galaxy",
+ author: 'Douglas Adams',
+ genre: 'Science Fiction',
+ published: 1979,
+ rating: 4.2,
+ tags: ['comedy', 'space', 'adventure'],
+ awards: {
+ hugo: true,
+ nebula: false,
+ others: { unknown: { year: 1980 } }
+ },
+ nestedField: { 'level.1': { 'level.2': true } }
+ },
+ book2: {
+ title: 'Pride and Prejudice',
+ author: 'Jane Austen',
+ genre: 'Romance',
+ published: 1813,
+ rating: 4.5,
+ tags: ['classic', 'social commentary', 'love'],
+ awards: { none: true }
+ },
+ book3: {
+ title: 'One Hundred Years of Solitude',
+ author: 'Gabriel García Márquez',
+ genre: 'Magical Realism',
+ published: 1967,
+ rating: 4.3,
+ tags: ['family', 'history', 'fantasy'],
+ awards: { nobel: true, nebula: false }
+ },
+ book4: {
+ title: 'The Lord of the Rings',
+ author: 'J.R.R. Tolkien',
+ genre: 'Fantasy',
+ published: 1954,
+ rating: 4.7,
+ tags: ['adventure', 'magic', 'epic'],
+ awards: { hugo: false, nebula: false }
+ },
+ book5: {
+ title: "The Handmaid's Tale",
+ author: 'Margaret Atwood',
+ genre: 'Dystopian',
+ published: 1985,
+ rating: 4.1,
+ tags: ['feminism', 'totalitarianism', 'resistance'],
+ awards: { 'arthur c. clarke': true, 'booker prize': false }
+ },
+ book6: {
+ title: 'Crime and Punishment',
+ author: 'Fyodor Dostoevsky',
+ genre: 'Psychological Thriller',
+ published: 1866,
+ rating: 4.3,
+ tags: ['philosophy', 'crime', 'redemption'],
+ awards: { none: true }
+ },
+ book7: {
+ title: 'To Kill a Mockingbird',
+ author: 'Harper Lee',
+ genre: 'Southern Gothic',
+ published: 1960,
+ rating: 4.2,
+ tags: ['racism', 'injustice', 'coming-of-age'],
+ awards: { pulitzer: true }
+ },
+ book8: {
+ title: '1984',
+ author: 'George Orwell',
+ genre: 'Dystopian',
+ published: 1949,
+ rating: 4.2,
+ tags: ['surveillance', 'totalitarianism', 'propaganda'],
+ awards: { prometheus: true }
+ },
+ book9: {
+ title: 'The Great Gatsby',
+ author: 'F. Scott Fitzgerald',
+ genre: 'Modernist',
+ published: 1925,
+ rating: 4.0,
+ tags: ['wealth', 'american dream', 'love'],
+ awards: { none: true }
+ },
+ book10: {
+ title: 'Dune',
+ author: 'Frank Herbert',
+ genre: 'Science Fiction',
+ published: 1965,
+ rating: 4.6,
+ tags: ['politics', 'desert', 'ecology'],
+ awards: { hugo: true, nebula: true }
+ }
+ };
+ return testCollectionWithDocs(bookDocs);
+ }
+
+ let testDeferred: Deferred<void> | undefined;
+ let withTestCollectionPromise: Promise<void> | undefined;
+
+ beforeEach(async () => {
+ const setupDeferred = new Deferred<void>();
+ testDeferred = new Deferred<void>();
+ withTestCollectionPromise = withTestCollection(
+ persistence,
+ {},
+ async (collectionRef, firestoreInstance) => {
+ randomCol = collectionRef;
+ firestore = firestoreInstance;
+ await setupBookDocs();
+ setupDeferred.resolve();
+
+ return testDeferred?.promise;
+ }
+ );
+
+ await setupDeferred.promise;
+ setLogLevel('debug');
+ });
+
+ afterEach(async () => {
+ testDeferred?.resolve();
+ await withTestCollectionPromise;
+ setLogLevel('info');
+ });
+
+ it('basic listen works', async () => {
+ const storeEvent = new EventsAccumulator<QuerySnapshot>();
+
+ let result = onSnapshot(randomCol, storeEvent.storeEvent);
+ let snapshot = await storeEvent.awaitEvent();
+
+ expect(toDataArray(snapshot)).to.deep.equal([
+ { k: 'b', sort: 1 },
+ { k: 'a', sort: 0 }
+ ]);
+ });
+
+ it.only('basic listen works', async () => {
+ const storeEvent = new EventsAccumulator<PipelineSnapshot>();
+
+ let result = firestore
+ .pipeline()
+ .collection(randomCol.path)
+ .where(eq('author', 'Douglas Adams'))
+ ._onSnapshot(storeEvent.storeEvent);
+ let snapshot = await storeEvent.awaitEvent();
+
+ expect(toDataArray(snapshot)).to.deep.equal([
+ {
+ title: "The Hitchhiker's Guide to the Galaxy",
+ author: 'Douglas Adams',
+ genre: 'Science Fiction',
+ published: 1979,
+ rating: 4.2,
+ tags: ['comedy', 'space', 'adventure'],
+ awards: {
+ hugo: true,
+ nebula: false,
+ others: { unknown: { year: 1980 } }
+ },
+ nestedField: { 'level.1': { 'level.2': true } }
+ }
+ ]);
+
+ await updateDoc(doc(randomCol, 'book1'), { rating: 4.3 });
+ snapshot = await storeEvent.awaitEvent();
+ snapshot = await storeEvent.awaitEvent();
+ expect(toDataArray(snapshot)).to.deep.equal([
+ {
+ title: "The Hitchhiker's Guide to the Galaxy",
+ author: 'Douglas Adams',
+ genre: 'Science Fiction',
+ published: 1979,
+ rating: 4.3,
+ tags: ['comedy', 'space', 'adventure'],
+ awards: {
+ hugo: true,
+ nebula: false,
+ others: { unknown: { year: 1980 } }
+ },
+ nestedField: { 'level.1': { 'level.2': true } }
+ }
+ ]);
+
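+    // Changing book2's author makes it match the where() filter, so the next
+    // snapshot should contain both books.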
+ await updateDoc(doc(randomCol, 'book2'), { author: 'Douglas Adams' });
+ snapshot = await storeEvent.awaitEvent();
+ expect(toDataArray(snapshot)).to.deep.equal([
+ {
+ title: "The Hitchhiker's Guide to the Galaxy",
+ author: 'Douglas Adams',
+ genre: 'Science Fiction',
+ published: 1979,
+ rating: 4.3,
+ tags: ['comedy', 'space', 'adventure'],
+ awards: {
+ hugo: true,
+ nebula: false,
+ others: { unknown: { year: 1980 } }
+ },
+ nestedField: { 'level.1': { 'level.2': true } }
+ },
+ {
+ title: 'Pride and Prejudice',
+ author: 'Douglas Adams', //'Jane Austen',
+ genre: 'Romance',
+ published: 1813,
+ rating: 4.5,
+ tags: ['classic', 'social commentary', 'love'],
+ awards: { none: true }
+ }
+ ]);
+
+    // Tear down the pipeline listener before the test completes.
+    result();
+  });
+});
diff --git a/packages/firestore/test/integration/api/pipeline.test.ts b/packages/firestore/test/integration/api/pipeline.test.ts
index 48e4e3a4c1b..e5dcfa5aa86 100644
--- a/packages/firestore/test/integration/api/pipeline.test.ts
+++ b/packages/firestore/test/integration/api/pipeline.test.ts
@@ -55,7 +55,7 @@ import { apiDescribe, withTestCollection } from '../util/helpers';
use(chaiAsPromised);
-apiDescribe.only('Pipelines', persistence => {
+apiDescribe('Pipelines', persistence => {
addEqualityMatcher();
let firestore: Firestore;
let randomCol: CollectionReference;
diff --git a/packages/firestore/test/integration/prime_backend.test.ts b/packages/firestore/test/integration/prime_backend.test.ts
index c1c121e9a0f..54d57b5fabc 100644
--- a/packages/firestore/test/integration/prime_backend.test.ts
+++ b/packages/firestore/test/integration/prime_backend.test.ts
@@ -36,22 +36,22 @@ before(
this.timeout(PRIMING_TIMEOUT_MS);
return withTestDoc(new MemoryEagerPersistenceMode(), async (doc, db) => {
-      const accumulator = new EventsAccumulator<DocumentSnapshot>();
- const unsubscribe = onSnapshot(doc, accumulator.storeEvent);
-
- // Wait for watch to initialize and deliver first event.
- await accumulator.awaitRemoteEvent();
-
- // Use a transaction to perform a write without triggering any local events.
- await runTransaction(db, async txn => {
- txn.set(doc, { value: 'done' });
- });
-
- // Wait to see the write on the watch stream.
- const docSnap = await accumulator.awaitRemoteEvent();
- expect(docSnap.get('value')).to.equal('done');
-
- unsubscribe();
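+      // TODO(pipeline): backend priming is temporarily disabled here,
+      // presumably while pipeline listen support is being developed.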
+      // const accumulator = new EventsAccumulator<DocumentSnapshot>();
+ // const unsubscribe = onSnapshot(doc, accumulator.storeEvent);
+ //
+ // // Wait for watch to initialize and deliver first event.
+ // await accumulator.awaitRemoteEvent();
+ //
+ // // Use a transaction to perform a write without triggering any local events.
+ // await runTransaction(db, async txn => {
+ // txn.set(doc, { value: 'done' });
+ // });
+ //
+ // // Wait to see the write on the watch stream.
+ // const docSnap = await accumulator.awaitRemoteEvent();
+ // expect(docSnap.get('value')).to.equal('done');
+ //
+ // unsubscribe();
});
}
);
diff --git a/packages/firestore/test/integration/util/events_accumulator.ts b/packages/firestore/test/integration/util/events_accumulator.ts
index 02f3ae65495..65d8fb3e1ee 100644
--- a/packages/firestore/test/integration/util/events_accumulator.ts
+++ b/packages/firestore/test/integration/util/events_accumulator.ts
@@ -20,12 +20,15 @@ import { expect } from 'chai';
import { Deferred } from '../../util/promise';
import { DocumentSnapshot, QuerySnapshot } from './firebase_export';
+import { PipelineSnapshot } from '../../../src/api/snapshot';
/**
* A helper object that can accumulate an arbitrary amount of events and resolve
* a promise when expected number has been emitted.
*/
-export class EventsAccumulator<T extends DocumentSnapshot | QuerySnapshot> {
+export class EventsAccumulator<
+ T extends DocumentSnapshot | QuerySnapshot | PipelineSnapshot
+> {
private events: T[] = [];
private waitingFor: number = 0;
   private deferred: Deferred<T> | null = null;
diff --git a/packages/firestore/test/integration/util/helpers.ts b/packages/firestore/test/integration/util/helpers.ts
index 647360db463..55a437eb6f8 100644
--- a/packages/firestore/test/integration/util/helpers.ts
+++ b/packages/firestore/test/integration/util/helpers.ts
@@ -53,6 +53,7 @@ import {
TARGET_DB_ID,
USE_EMULATOR
} from './settings';
+import { PipelineSnapshot } from '../../../src/api/snapshot';
/* eslint-disable no-restricted-globals */
@@ -218,8 +219,14 @@ apiDescribe.skip = apiDescribeInternal.bind(null, describe.skip);
apiDescribe.only = apiDescribeInternal.bind(null, describe.only);
-/** Converts the documents in a QuerySnapshot to an array with the data of each document. */
-export function toDataArray(docSet: QuerySnapshot): DocumentData[] {
-  return docSet.docs.map(d => d.data());
+/** Converts the documents in a QuerySnapshot or PipelineSnapshot to an array with the data of each document. */
+export function toDataArray(
+  docSet: QuerySnapshot | PipelineSnapshot
+): DocumentData[] {
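+  // QuerySnapshot exposes documents via `docs`; PipelineSnapshot exposes them
+  // via `results`.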
+ if (docSet instanceof QuerySnapshot) {
+ return docSet.docs.map(d => d.data());
+ } else {
+ return docSet.results.map(d => d.data()!);
+ }
}
/** Converts the changes in a QuerySnapshot to an array with the data of each document. */
diff --git a/packages/firestore/test/unit/local/indexeddb_persistence.test.ts b/packages/firestore/test/unit/local/indexeddb_persistence.test.ts
index e44bb73e47b..965af19043f 100644
--- a/packages/firestore/test/unit/local/indexeddb_persistence.test.ts
+++ b/packages/firestore/test/unit/local/indexeddb_persistence.test.ts
@@ -21,7 +21,7 @@ import { Context } from 'mocha';
import { queryToTarget } from '../../../src/core/query';
import { SnapshotVersion } from '../../../src/core/snapshot_version';
-import { canonifyTarget } from '../../../src/core/target';
+import { canonifyTarget, Target } from '../../../src/core/target';
import {
decodeResourcePath,
encodeResourcePath
@@ -911,7 +911,8 @@ describe('IndexedDbSchema: createOrUpgradeDb', () => {
       const targetsStore = txn.store<DbTargetKey, DbTarget>(DbTargetStore);
return targetsStore.iterate((key, value) => {
const targetData = fromDbTarget(value).target;
- const expectedCanonicalId = canonifyTarget(targetData);
+ // TODO(pipeline): This needs to handle pipeline properly.
+ const expectedCanonicalId = canonifyTarget(targetData as Target);
const actualCanonicalId = value.canonicalId;
expect(actualCanonicalId).to.equal(expectedCanonicalId);
diff --git a/packages/firestore/test/unit/local/target_cache.test.ts b/packages/firestore/test/unit/local/target_cache.test.ts
index 8928bbcdde1..00f21719103 100644
--- a/packages/firestore/test/unit/local/target_cache.test.ts
+++ b/packages/firestore/test/unit/local/target_cache.test.ts
@@ -168,7 +168,8 @@ function genericTargetCacheTests(
it('can set and read a target', async () => {
const targetData = testTargetData(QUERY_ROOMS, 1, 1);
await cache.addTargetData(targetData);
- const read = await cache.getTargetData(targetData.target);
+ // TODO(pipeline): This needs to handle pipeline properly.
+ const read = await cache.getTargetData(targetData.target as Target);
expect(read).to.deep.equal(targetData);
});
@@ -210,7 +211,8 @@ function genericTargetCacheTests(
await cache.addTargetData(testTargetData(QUERY_ROOMS, 1, 1));
const updated = testTargetData(QUERY_ROOMS, 1, 2);
await cache.updateTargetData(updated);
- const retrieved = await cache.getTargetData(updated.target);
+ // TODO(pipeline): This needs to handle pipeline properly.
+ const retrieved = await cache.getTargetData(updated.target as Target);
expect(retrieved).to.deep.equal(updated);
});