Skip to content

Commit 8fa7fed

Browse files
committed
fix(DataStore): dataStore cannot connect to model's sync subscriptions (AWS_LAMBDA auth type) #3549
1 parent b96fbda commit 8fa7fed

11 files changed

+114
-16
lines changed

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Configuration/DataStoreConfiguration.swift

+7
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ public struct DataStoreConfiguration {
6666

6767
/// Selective sync expressions
6868
public let syncExpressions: [DataStoreSyncExpression]
69+
70+
/// Use syncExpressions as subscriptions filter
71+
public let subscriptionFiltering: Bool
6972

7073
/// Authorization mode strategy
7174
public var authModeStrategyType: AuthModeStrategyType
@@ -79,6 +82,7 @@ public struct DataStoreConfiguration {
7982
syncMaxRecords: UInt,
8083
syncPageSize: UInt,
8184
syncExpressions: [DataStoreSyncExpression],
85+
subscriptionFiltering: Bool = false,
8286
authModeStrategy: AuthModeStrategyType = .default,
8387
disableSubscriptions: @escaping () -> Bool) {
8488
self.errorHandler = errorHandler
@@ -87,6 +91,7 @@ public struct DataStoreConfiguration {
8791
self.syncMaxRecords = syncMaxRecords
8892
self.syncPageSize = syncPageSize
8993
self.syncExpressions = syncExpressions
94+
self.subscriptionFiltering = subscriptionFiltering
9095
self.authModeStrategyType = authModeStrategy
9196
self.disableSubscriptions = disableSubscriptions
9297
}
@@ -97,13 +102,15 @@ public struct DataStoreConfiguration {
97102
syncMaxRecords: UInt,
98103
syncPageSize: UInt,
99104
syncExpressions: [DataStoreSyncExpression],
105+
subscriptionFiltering: Bool = false,
100106
authModeStrategy: AuthModeStrategyType = .default) {
101107
self.errorHandler = errorHandler
102108
self.conflictHandler = conflictHandler
103109
self.syncInterval = syncInterval
104110
self.syncMaxRecords = syncMaxRecords
105111
self.syncPageSize = syncPageSize
106112
self.syncExpressions = syncExpressions
113+
self.subscriptionFiltering = subscriptionFiltering
107114
self.authModeStrategyType = authModeStrategy
108115
self.disableSubscriptions = { false }
109116
}

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift

+2-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
8787
authModeStrategy: resolvedAuthStrategy)
8888
// swiftlint:disable line_length
8989
let reconciliationQueueFactory = reconciliationQueueFactory ??
90-
AWSIncomingEventReconciliationQueue.init(modelSchemas:api:storageAdapter:syncExpressions:auth:authModeStrategy:modelReconciliationQueueFactory:disableSubscriptions:)
90+
AWSIncomingEventReconciliationQueue.init(modelSchemas:api:storageAdapter:syncExpressions:auth:authModeStrategy:modelReconciliationQueueFactory:subscriptionFiltering:disableSubscriptions:)
9191
// swiftlint:enable line_length
9292
let initialSyncOrchestratorFactory = initialSyncOrchestratorFactory ??
9393
AWSInitialSyncOrchestrator.init(dataStoreConfiguration:authModeStrategy:api:reconciliationQueue:storageAdapter:)
@@ -291,6 +291,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
291291
auth,
292292
authModeStrategy,
293293
nil,
294+
dataStoreConfiguration.subscriptionFiltering,
294295
dataStoreConfiguration.disableSubscriptions)
295296
reconciliationQueueSink = reconciliationQueue?
296297
.publisher

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift

+6-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import Combine
1111
import Foundation
1212

1313
typealias DisableSubscriptions = () -> Bool
14+
typealias SubscriptionFiltering = Bool
1415

1516
// Used for testing:
1617
typealias IncomingEventReconciliationQueueFactory =
@@ -21,6 +22,7 @@ typealias IncomingEventReconciliationQueueFactory =
2122
AuthCategoryBehavior?,
2223
AuthModeStrategy,
2324
ModelReconciliationQueueFactory?,
25+
SubscriptionFiltering,
2426
@escaping DisableSubscriptions
2527
) async -> IncomingEventReconciliationQueue
2628

@@ -52,6 +54,7 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
5254
auth: AuthCategoryBehavior? = nil,
5355
authModeStrategy: AuthModeStrategy,
5456
modelReconciliationQueueFactory: ModelReconciliationQueueFactory? = nil,
57+
subscriptionFiltering: Bool,
5558
disableSubscriptions: @escaping () -> Bool = { false }) async {
5659
self.modelSchemasCount = modelSchemas.count
5760
self.modelReconciliationQueueSinks.set([:])
@@ -101,6 +104,7 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
101104
modelPredicate,
102105
auth,
103106
authModeStrategy,
107+
subscriptionFiltering,
104108
subscriptionsDisabled ? OperationDisabledIncomingSubscriptionEventPublisher() : nil)
105109

106110
reconciliationQueues.with { reconciliationQueues in
@@ -208,14 +212,15 @@ extension AWSIncomingEventReconciliationQueue: DefaultLogger {
208212
// MARK: - Static factory
209213
extension AWSIncomingEventReconciliationQueue {
210214
static let factory: IncomingEventReconciliationQueueFactory = {
211-
modelSchemas, api, storageAdapter, syncExpressions, auth, authModeStrategy, _, disableSubscriptions in
215+
modelSchemas, api, storageAdapter, syncExpressions, auth, authModeStrategy, _, subscriptionFiltering, disableSubscriptions in
212216
await AWSIncomingEventReconciliationQueue(modelSchemas: modelSchemas,
213217
api: api,
214218
storageAdapter: storageAdapter,
215219
syncExpressions: syncExpressions,
216220
auth: auth,
217221
authModeStrategy: authModeStrategy,
218222
modelReconciliationQueueFactory: nil,
223+
subscriptionFiltering: subscriptionFiltering,
219224
disableSubscriptions: disableSubscriptions)
220225
}
221226
}

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingSubscriptionEventPublisher.swift

+4-2
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@ final class AWSIncomingSubscriptionEventPublisher: IncomingSubscriptionEventPubl
2626
api: APICategoryGraphQLBehaviorExtended,
2727
modelPredicate: QueryPredicate?,
2828
auth: AuthCategoryBehavior?,
29-
authModeStrategy: AuthModeStrategy) async {
29+
authModeStrategy: AuthModeStrategy,
30+
subscriptionFiltering: Bool) async {
3031
self.subscriptionEventSubject = PassthroughSubject<IncomingSubscriptionEventPublisherEvent, DataStoreError>()
3132
self.asyncEvents = await IncomingAsyncSubscriptionEventPublisher(modelSchema: modelSchema,
3233
api: api,
3334
modelPredicate: modelPredicate,
3435
auth: auth,
35-
authModeStrategy: authModeStrategy)
36+
authModeStrategy: authModeStrategy,
37+
subscriptionFiltering: subscriptionFiltering)
3638

3739
self.mapper = IncomingAsyncSubscriptionEventToAnyModelMapper()
3840
asyncEvents.subscribe(subscriber: mapper)

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift

+13-7
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
5151
modelPredicate: QueryPredicate?,
5252
auth: AuthCategoryBehavior?,
5353
authModeStrategy: AuthModeStrategy,
54+
subscriptionFiltering: Bool,
5455
awsAuthService: AWSAuthServiceBehavior? = nil) async {
5556
self.onCreateConnected = false
5657
self.onUpdateConnected = false
@@ -84,6 +85,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
8485
api: api,
8586
auth: auth,
8687
awsAuthService: self.awsAuthService,
88+
subscriptionFiltering: subscriptionFiltering,
8789
authTypeProvider: onCreateAuthTypeProvider),
8890
maxRetries: onCreateAuthTypeProvider.count,
8991
resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
@@ -106,6 +108,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
106108
api: api,
107109
auth: auth,
108110
awsAuthService: self.awsAuthService,
111+
subscriptionFiltering: subscriptionFiltering,
109112
authTypeProvider: onUpdateAuthTypeProvider),
110113
maxRetries: onUpdateAuthTypeProvider.count,
111114
resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
@@ -128,6 +131,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
128131
api: api,
129132
auth: auth,
130133
awsAuthService: self.awsAuthService,
134+
subscriptionFiltering: subscriptionFiltering,
131135
authTypeProvider: onDeleteAuthTypeProvider),
132136
maxRetries: onUpdateAuthTypeProvider.count,
133137
resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
@@ -308,16 +312,18 @@ extension IncomingAsyncSubscriptionEventPublisher {
308312
api: APICategoryGraphQLBehaviorExtended,
309313
auth: AuthCategoryBehavior?,
310314
awsAuthService: AWSAuthServiceBehavior,
315+
subscriptionFiltering: Bool,
311316
authTypeProvider: AWSAuthorizationTypeIterator) -> RetryableGraphQLOperation<Payload>.RequestFactory {
312317
var authTypes = authTypeProvider
318+
313319
return {
314-
return await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(for: modelSchema,
315-
where: predicate,
316-
subscriptionType: subscriptionType,
317-
api: api,
318-
auth: auth,
319-
authType: authTypes.next(),
320-
awsAuthService: awsAuthService)
320+
return await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(for: modelSchema,
321+
where: subscriptionFiltering ? predicate : nil,
322+
subscriptionType: subscriptionType,
323+
api: api,
324+
auth: auth,
325+
authType: authTypes.next(),
326+
awsAuthService: awsAuthService)
321327
}
322328
}
323329
}

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift

+4-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ typealias ModelReconciliationQueueFactory = (
1919
QueryPredicate?,
2020
AuthCategoryBehavior?,
2121
AuthModeStrategy,
22+
SubscriptionFiltering,
2223
IncomingSubscriptionEventPublisher?
2324
) async -> ModelReconciliationQueue
2425

@@ -83,6 +84,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
8384
modelPredicate: QueryPredicate?,
8485
auth: AuthCategoryBehavior?,
8586
authModeStrategy: AuthModeStrategy,
87+
subscriptionFiltering: Bool,
8688
incomingSubscriptionEvents: IncomingSubscriptionEventPublisher? = nil) async {
8789

8890
self.modelSchema = modelSchema
@@ -108,7 +110,8 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
108110
api: api,
109111
modelPredicate: modelPredicate,
110112
auth: auth,
111-
authModeStrategy: authModeStrategy
113+
authModeStrategy: authModeStrategy,
114+
subscriptionFiltering: subscriptionFiltering
112115
)
113116
}
114117

AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueueTests.swift

+2-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ class AWSIncomingEventReconciliationQueueTests: XCTestCase {
4242
storageAdapter: storageAdapter,
4343
syncExpressions: [],
4444
authModeStrategy: AWSDefaultAuthModeStrategy(),
45-
modelReconciliationQueueFactory: modelReconciliationQueueFactory)
45+
modelReconciliationQueueFactory: modelReconciliationQueueFactory,
46+
subscriptionFiltering: false)
4647
}
4748

4849
// This test case attempts to hit a race condition, and may be required to execute multiple times

AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisherTests.swift

+66-2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ final class IncomingAsyncSubscriptionEventPublisherTests: XCTestCase {
4141
modelPredicate: nil,
4242
auth: nil,
4343
authModeStrategy: AWSDefaultAuthModeStrategy(),
44+
subscriptionFiltering: false,
4445
awsAuthService: nil)
4546
let mapper = IncomingAsyncSubscriptionEventToAnyModelMapper()
4647
asyncEvents.subscribe(subscriber: mapper)
@@ -73,6 +74,7 @@ final class IncomingAsyncSubscriptionEventPublisherTests: XCTestCase {
7374
modelPredicate: nil,
7475
auth: nil,
7576
authModeStrategy: AWSDefaultAuthModeStrategy(),
77+
subscriptionFiltering: false,
7678
awsAuthService: nil)
7779
let mapper = IncomingAsyncSubscriptionEventToAnyModelMapper()
7880
asyncEvents.subscribe(subscriber: mapper)
@@ -107,10 +109,10 @@ final class IncomingAsyncSubscriptionEventPublisherTests: XCTestCase {
107109
sink.cancel()
108110
}
109111

110-
/// Given: IncomingAsyncSubscriptionEventPublisher initilized with modelPredicate
112+
/// Given: IncomingAsyncSubscriptionEventPublisher initilized with modelPredicate and subscriptionFiltering enabled
111113
/// When: IncomingAsyncSubscriptionEventPublisher subscribes to onCreate, onUpdate, onDelete events
112114
/// Then: IncomingAsyncSubscriptionEventPublisher provides correct filters in subscriptions request
113-
func testModelPredicateAsSubscribtionsFilter() async throws {
115+
func testSubscriptionFilteringEnabledModelPredicateAsSubscribtionsFilter() async throws {
114116

115117
let id1 = UUID().uuidString
116118
let id2 = UUID().uuidString
@@ -174,8 +176,70 @@ final class IncomingAsyncSubscriptionEventPublisherTests: XCTestCase {
174176
]),
175177
auth: nil,
176178
authModeStrategy: AWSDefaultAuthModeStrategy(),
179+
subscriptionFiltering: true,
177180
awsAuthService: nil)
178181

179182
await fulfillment(of: [correctFilterOnCreate, correctFilterOnUpdate, correctFilterOnDelete], timeout: 1)
180183
}
184+
185+
/// Given: IncomingAsyncSubscriptionEventPublisher initilized with modelPredicate and subscriptionFiltering disabled
186+
/// When: IncomingAsyncSubscriptionEventPublisher subscribes to onCreate, onUpdate, onDelete events
187+
/// Then: IncomingAsyncSubscriptionEventPublisher has no filters in subscriptions request
188+
func testSubscriptionFilteringDisabledModelPredicateIgnoredInSubscribtions() async throws {
189+
190+
let id1 = UUID().uuidString
191+
let id2 = UUID().uuidString
192+
193+
let noFilterOnCreate = expectation(description: "Correct no filter in onCreate request")
194+
let noFilterOnUpdate = expectation(description: "Correct no filter in onUpdate request")
195+
let noFilterOnDelete = expectation(description: "Correct no filter in onDelete request")
196+
197+
func validateVariables(_ variables: [String: Any]?) -> Bool {
198+
guard variables == nil else {
199+
XCTFail("The request contains variables with subscriptionFiltering disabled")
200+
return false
201+
}
202+
203+
return true
204+
}
205+
206+
let responder = SubscribeRequestListenerResponder<MutationSync<AnyModel>> { request, _, _ in
207+
if request.document.contains("onCreatePost") {
208+
if validateVariables(request.variables) {
209+
noFilterOnCreate.fulfill()
210+
}
211+
212+
} else if request.document.contains("onUpdatePost") {
213+
if validateVariables(request.variables) {
214+
noFilterOnUpdate.fulfill()
215+
}
216+
217+
} else if request.document.contains("onDeletePost") {
218+
if validateVariables(request.variables) {
219+
noFilterOnDelete.fulfill()
220+
}
221+
222+
} else {
223+
XCTFail("Unexpected request: \(request.document)")
224+
}
225+
226+
return nil
227+
}
228+
229+
apiPlugin.responders[.subscribeRequestListener] = responder
230+
231+
_ = await IncomingAsyncSubscriptionEventPublisher(
232+
modelSchema: Post.schema,
233+
api: apiPlugin,
234+
modelPredicate: QueryPredicateGroup(type: .or, predicates: [
235+
Post.keys.id.eq(id1),
236+
Post.keys.id.eq(id2)
237+
]),
238+
auth: nil,
239+
authModeStrategy: AWSDefaultAuthModeStrategy(),
240+
subscriptionFiltering: false,
241+
awsAuthService: nil)
242+
243+
await fulfillment(of: [noFilterOnCreate, noFilterOnUpdate, noFilterOnDelete], timeout: 1)
244+
}
181245
}

0 commit comments

Comments
 (0)