Skip to content

Commit e70a3f5

Browse files
committed
fix(DataStore): dataStore cannot connect to model's sync subscriptions (AWS_LAMBDA auth type) #3549
1 parent b96fbda commit e70a3f5

File tree

3 files changed

+124
-19
lines changed

3 files changed

+124
-19
lines changed

Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift

+46-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ public protocol AnyGraphQLOperation {
1212
associatedtype Success
1313
associatedtype Failure: Error
1414
typealias ResultListener = (Result<Success, Failure>) -> Void
15+
typealias ErrorListener = (Failure) -> Void
1516
}
1617

1718
/// Abastraction for a retryable GraphQLOperation.
@@ -24,6 +25,7 @@ public protocol RetryableGraphQLOperationBehavior: Operation, DefaultLogger {
2425
typealias RequestFactory = () async -> GraphQLRequest<Payload>
2526
typealias OperationFactory = (GraphQLRequest<Payload>, @escaping OperationResultListener) -> OperationType
2627
typealias OperationResultListener = OperationType.ResultListener
28+
typealias OperationErrorListener = OperationType.ErrorListener
2729

2830
/// Operation unique identifier
2931
var id: UUID { get }
@@ -45,9 +47,12 @@ public protocol RetryableGraphQLOperationBehavior: Operation, DefaultLogger {
4547
var operationFactory: OperationFactory { get }
4648

4749
var resultListener: OperationResultListener { get }
50+
51+
var errorListener: OperationErrorListener { get }
4852

4953
init(requestFactory: @escaping RequestFactory,
5054
maxRetries: Int,
55+
errorListener: @escaping OperationErrorListener,
5156
resultListener: @escaping OperationResultListener,
5257
_ operationFactory: @escaping OperationFactory)
5358

@@ -71,6 +76,11 @@ extension RetryableGraphQLOperationBehavior {
7176
attempts += 1
7277
log.debug("[\(id)] - Try [\(attempts)/\(maxRetries)]")
7378
let wrappedResultListener: OperationResultListener = { result in
79+
if case let .failure(error) = result {
80+
// Give an operation a chance to prepare itself for a retry after a failure
81+
self.errorListener(error)
82+
}
83+
7484
if case let .failure(error) = result, self.shouldRetry(error: error as? APIError) {
7585
self.log.debug("\(error)")
7686
Task {
@@ -103,17 +113,20 @@ public final class RetryableGraphQLOperation<Payload: Decodable>: Operation, Ret
103113
public var attempts: Int = 0
104114
public var requestFactory: RequestFactory
105115
public var underlyingOperation: AtomicValue<GraphQLOperation<Payload>?> = AtomicValue(initialValue: nil)
116+
public var errorListener: OperationErrorListener
106117
public var resultListener: OperationResultListener
107118
public var operationFactory: OperationFactory
108119

109120
public init(requestFactory: @escaping RequestFactory,
110121
maxRetries: Int,
122+
errorListener: @escaping OperationErrorListener,
111123
resultListener: @escaping OperationResultListener,
112124
_ operationFactory: @escaping OperationFactory) {
113125
self.id = UUID()
114126
self.maxRetries = max(1, maxRetries)
115127
self.requestFactory = requestFactory
116128
self.operationFactory = operationFactory
129+
self.errorListener = errorListener
117130
self.resultListener = resultListener
118131
}
119132

@@ -154,17 +167,21 @@ public final class RetryableGraphQLSubscriptionOperation<Payload: Decodable>: Op
154167
public var attempts: Int = 0
155168
public var underlyingOperation: AtomicValue<GraphQLSubscriptionOperation<Payload>?> = AtomicValue(initialValue: nil)
156169
public var requestFactory: RequestFactory
170+
public var errorListener: OperationErrorListener
157171
public var resultListener: OperationResultListener
158172
public var operationFactory: OperationFactory
159-
173+
private var filterLimitRetried: Bool = false
174+
160175
public init(requestFactory: @escaping RequestFactory,
161176
maxRetries: Int,
177+
errorListener: @escaping OperationErrorListener,
162178
resultListener: @escaping OperationResultListener,
163179
_ operationFactory: @escaping OperationFactory) {
164180
self.id = UUID()
165181
self.maxRetries = max(1, maxRetries)
166182
self.requestFactory = requestFactory
167183
self.operationFactory = operationFactory
184+
self.errorListener = errorListener
168185
self.resultListener = resultListener
169186
}
170187
public override func main() {
@@ -178,9 +195,35 @@ public final class RetryableGraphQLSubscriptionOperation<Payload: Decodable>: Op
178195
}
179196

180197
public func shouldRetry(error: APIError?) -> Bool {
181-
return attempts < maxRetries
198+
// return attempts < maxRetries
199+
200+
guard case let .operationError(errorDescription, recoverySuggestion, underlyingError) = error else {
201+
return false
202+
}
203+
204+
if let authError = underlyingError as? AuthError {
205+
switch authError {
206+
case .signedOut, .notAuthorized:
207+
return attempts < maxRetries
208+
default:
209+
return false
210+
}
211+
}
212+
213+
// TODO: - How to distinguish errors?
214+
// TODO: - Handle other errors
215+
if error.debugDescription.contains("Filters combination exceed maximum limit 10 for subscription.") &&
216+
filterLimitRetried == false {
217+
218+
// Just to be sure that endless retry won't happen
219+
filterLimitRetried = true
220+
maxRetries += 1
221+
222+
return true
223+
}
224+
225+
return false
182226
}
183-
184227
}
185228

186229
// MARK: GraphQLOperation - GraphQLSubscriptionOperation + AnyGraphQLOperation

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift

+2-1
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@ final class InitialSyncOperation: AsynchronousOperation {
194194
lastSync: lastSyncTime,
195195
authType: authTypes.next())
196196
},
197-
maxRetries: authTypes.count,
197+
maxRetries: authTypes.count,
198+
errorListener: { _ in },
198199
resultListener: completionListener) { nextRequest, wrappedCompletionListener in
199200
api.query(request: nextRequest, listener: wrappedCompletionListener)
200201
}.main()

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift

+76-15
Original file line numberDiff line numberDiff line change
@@ -73,19 +73,39 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
7373

7474
// onCreate operation
7575
let onCreateValueListener = onCreateValueListenerHandler(event:)
76-
let onCreateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
76+
var onCreateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
7777
operations: [.create, .read])
78+
var onCreateAuthType: AWSAuthorizationType? = onCreateAuthTypeProvider.next()
79+
var onCreateModelPredicate = modelPredicate
80+
7881
self.onCreateValueListener = onCreateValueListener
7982
self.onCreateOperation = RetryableGraphQLSubscriptionOperation(
8083
requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor(
8184
for: modelSchema,
82-
where: modelPredicate,
85+
where: { onCreateModelPredicate },
8386
subscriptionType: .onCreate,
8487
api: api,
8588
auth: auth,
8689
awsAuthService: self.awsAuthService,
87-
authTypeProvider: onCreateAuthTypeProvider),
90+
authTypeProvider: { onCreateAuthType }),
8891
maxRetries: onCreateAuthTypeProvider.count,
92+
errorListener: { error in
93+
// TODO: - How to distinguish errors?
94+
// TODO: - Handle other errors
95+
if error.debugDescription.contains("Filters combination exceed maximum limit 10 for subscription.") {
96+
onCreateModelPredicate = nil
97+
98+
} else if case let .operationError(errorDescription, recoverySuggestion, underlyingError) = error,
99+
let authError = underlyingError as? AuthError {
100+
101+
switch authError {
102+
case .signedOut, .notAuthorized:
103+
onCreateAuthType = onCreateAuthTypeProvider.next()
104+
default:
105+
return
106+
}
107+
}
108+
},
89109
resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
90110
api.subscribe(request: nextRequest,
91111
valueListener: onCreateValueListener,
@@ -95,19 +115,39 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
95115

96116
// onUpdate operation
97117
let onUpdateValueListener = onUpdateValueListenerHandler(event:)
98-
let onUpdateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
118+
var onUpdateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
99119
operations: [.update, .read])
120+
var onUpdateAuthType: AWSAuthorizationType? = onUpdateAuthTypeProvider.next()
121+
var onUpdateModelPredicate = modelPredicate
122+
100123
self.onUpdateValueListener = onUpdateValueListener
101124
self.onUpdateOperation = RetryableGraphQLSubscriptionOperation(
102125
requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor(
103126
for: modelSchema,
104-
where: modelPredicate,
127+
where: { onUpdateModelPredicate },
105128
subscriptionType: .onUpdate,
106129
api: api,
107130
auth: auth,
108131
awsAuthService: self.awsAuthService,
109-
authTypeProvider: onUpdateAuthTypeProvider),
132+
authTypeProvider: { onUpdateAuthType }),
110133
maxRetries: onUpdateAuthTypeProvider.count,
134+
errorListener: { error in
135+
// TODO: - How to distinguish errors?
136+
// TODO: - Handle other errors
137+
if error.debugDescription.contains("Filters combination exceed maximum limit 10 for subscription.") {
138+
onUpdateModelPredicate = nil
139+
140+
} else if case let .operationError(errorDescription, recoverySuggestion, underlyingError) = error,
141+
let authError = underlyingError as? AuthError {
142+
143+
switch authError {
144+
case .signedOut, .notAuthorized:
145+
onUpdateAuthType = onUpdateAuthTypeProvider.next()
146+
default:
147+
return
148+
}
149+
}
150+
},
111151
resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
112152
api.subscribe(request: nextRequest,
113153
valueListener: onUpdateValueListener,
@@ -117,19 +157,39 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
117157

118158
// onDelete operation
119159
let onDeleteValueListener = onDeleteValueListenerHandler(event:)
120-
let onDeleteAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
160+
var onDeleteAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
121161
operations: [.delete, .read])
162+
var onDeleteAuthType: AWSAuthorizationType? = onDeleteAuthTypeProvider.next()
163+
var onDeleteModelPredicate = modelPredicate
164+
122165
self.onDeleteValueListener = onDeleteValueListener
123166
self.onDeleteOperation = RetryableGraphQLSubscriptionOperation(
124167
requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor(
125168
for: modelSchema,
126-
where: modelPredicate,
169+
where: { onDeleteModelPredicate },
127170
subscriptionType: .onDelete,
128171
api: api,
129172
auth: auth,
130173
awsAuthService: self.awsAuthService,
131-
authTypeProvider: onDeleteAuthTypeProvider),
174+
authTypeProvider: { onDeleteAuthType }),
132175
maxRetries: onUpdateAuthTypeProvider.count,
176+
errorListener: { error in
177+
// TODO: - How to distinguish errors?
178+
// TODO: - Handle other errors
179+
if error.debugDescription.contains("Filters combination exceed maximum limit 10 for subscription.") {
180+
onDeleteModelPredicate = nil
181+
182+
} else if case let .operationError(errorDescription, recoverySuggestion, underlyingError) = error,
183+
let authError = underlyingError as? AuthError {
184+
185+
switch authError {
186+
case .signedOut, .notAuthorized:
187+
onDeleteAuthType = onDeleteAuthTypeProvider.next()
188+
default:
189+
return
190+
}
191+
}
192+
},
133193
resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
134194
api.subscribe(request: nextRequest,
135195
valueListener: onDeleteValueListener,
@@ -204,6 +264,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
204264
auth: AuthCategoryBehavior?,
205265
authType: AWSAuthorizationType?,
206266
awsAuthService: AWSAuthServiceBehavior) async -> GraphQLRequest<Payload> {
267+
207268
let request: GraphQLRequest<Payload>
208269
if modelSchema.hasAuthenticationRules,
209270
let _ = auth,
@@ -303,20 +364,20 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
303364
// MARK: - IncomingAsyncSubscriptionEventPublisher + API request factory
304365
extension IncomingAsyncSubscriptionEventPublisher {
305366
static func apiRequestFactoryFor(for modelSchema: ModelSchema,
306-
where predicate: QueryPredicate?,
367+
where predicate: @escaping () -> QueryPredicate?,
307368
subscriptionType: GraphQLSubscriptionType,
308369
api: APICategoryGraphQLBehaviorExtended,
309370
auth: AuthCategoryBehavior?,
310371
awsAuthService: AWSAuthServiceBehavior,
311-
authTypeProvider: AWSAuthorizationTypeIterator) -> RetryableGraphQLOperation<Payload>.RequestFactory {
312-
var authTypes = authTypeProvider
372+
authTypeProvider: @escaping () -> AWSAuthorizationType?) -> RetryableGraphQLOperation<Payload>.RequestFactory {
373+
313374
return {
314-
return await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(for: modelSchema,
315-
where: predicate,
375+
await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(for: modelSchema,
376+
where: predicate(),
316377
subscriptionType: subscriptionType,
317378
api: api,
318379
auth: auth,
319-
authType: authTypes.next(),
380+
authType: authTypeProvider(),
320381
awsAuthService: awsAuthService)
321382
}
322383
}

0 commit comments

Comments
 (0)