Skip to content

Commit 6d820c5

Browse files
authored
feat(data): Events Lib consistent exception handling (#3028)
1 parent cd3110f commit 6d820c5

File tree

7 files changed

+140
-46
lines changed

7 files changed

+140
-46
lines changed

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/Events.kt

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import com.amplifyframework.aws.appsync.core.LoggerProvider
1919
import com.amplifyframework.aws.appsync.events.data.ChannelAuthorizers
2020
import com.amplifyframework.aws.appsync.events.data.EventsException
2121
import com.amplifyframework.aws.appsync.events.data.PublishResult
22+
import com.amplifyframework.aws.appsync.events.data.toEventsException
2223
import kotlinx.coroutines.coroutineScope
2324
import kotlinx.serialization.json.Json
2425
import kotlinx.serialization.json.JsonElement
@@ -92,7 +93,11 @@ class Events @VisibleForTesting internal constructor(
9293
event: JsonElement,
9394
authorizer: AppSyncAuthorizer = this.defaultChannelAuthorizers.publishAuthorizer
9495
): PublishResult {
95-
return httpClient.post(channelName, authorizer, event)
96+
return try {
97+
httpClient.post(channelName, authorizer, event)
98+
} catch (exception: Exception) {
99+
throw exception.toEventsException()
100+
}
96101
}
97102

98103
/**
@@ -109,7 +114,11 @@ class Events @VisibleForTesting internal constructor(
109114
events: List<JsonElement>,
110115
authorizer: AppSyncAuthorizer = this.defaultChannelAuthorizers.publishAuthorizer
111116
): PublishResult {
112-
return httpClient.post(channelName, authorizer, events)
117+
return try {
118+
httpClient.post(channelName, authorizer, events)
119+
} catch (exception: Exception) {
120+
throw exception.toEventsException()
121+
}
113122
}
114123

115124
/**
@@ -129,8 +138,6 @@ class Events @VisibleForTesting internal constructor(
129138
*
130139
* @param flushEvents set to true (default) to allow all pending publish calls to succeed before disconnecting.
131140
* Setting to false will immediately disconnect, cancelling any in-progress or queued event publishes.
132-
* @param authorizers for the channel to use for subscriptions and publishes.
133-
* @return a channel to manage subscriptions and publishes.
134141
*/
135142
suspend fun disconnect(flushEvents: Boolean = true): Unit = coroutineScope {
136143
eventsWebSocketProvider.existingWebSocket?.disconnect(flushEvents)

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsChannel.kt

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import com.amplifyframework.aws.appsync.events.data.EventsMessage
2222
import com.amplifyframework.aws.appsync.events.data.PublishResult
2323
import com.amplifyframework.aws.appsync.events.data.UserClosedConnectionException
2424
import com.amplifyframework.aws.appsync.events.data.WebSocketMessage
25+
import com.amplifyframework.aws.appsync.events.data.toEventsException
2526
import java.util.UUID
2627
import kotlinx.coroutines.Dispatchers
2728
import kotlinx.coroutines.async
@@ -83,12 +84,16 @@ class EventsChannel internal constructor(
8384
* @param authorizer for the publish call. If not provided, the EventChannel publish authorizer will be used.
8485
* @return result of publish.
8586
*/
86-
@Throws(Exception::class)
87+
@Throws(EventsException::class)
8788
suspend fun publish(
8889
event: JsonElement,
8990
authorizer: AppSyncAuthorizer = this.authorizers.publishAuthorizer
9091
): PublishResult {
91-
return publish(listOf(event), authorizer)
92+
return try {
93+
publishToWebSocket(listOf(event), authorizer)
94+
} catch (exception: Exception) {
95+
throw exception.toEventsException()
96+
}
9297
}
9398

9499
/**
@@ -102,6 +107,17 @@ class EventsChannel internal constructor(
102107
suspend fun publish(
103108
events: List<JsonElement>,
104109
authorizer: AppSyncAuthorizer = this.authorizers.publishAuthorizer
110+
): PublishResult {
111+
return try {
112+
publishToWebSocket(events, authorizer)
113+
} catch (exception: Exception) {
114+
throw exception.toEventsException()
115+
}
116+
}
117+
118+
private suspend fun publishToWebSocket(
119+
events: List<JsonElement>,
120+
authorizer: AppSyncAuthorizer
105121
): PublishResult = coroutineScope {
106122
val publishId = UUID.randomUUID().toString()
107123
val publishMessage = WebSocketMessage.Send.Publish(

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import com.amplifyframework.aws.appsync.core.LoggerProvider
2121
import com.amplifyframework.aws.appsync.events.data.ConnectException
2222
import com.amplifyframework.aws.appsync.events.data.EventsException
2323
import com.amplifyframework.aws.appsync.events.data.WebSocketMessage
24+
import com.amplifyframework.aws.appsync.events.data.toEventsException
2425
import com.amplifyframework.aws.appsync.events.utils.ConnectionTimeoutTimer
2526
import com.amplifyframework.aws.appsync.events.utils.HeaderKeys
2627
import com.amplifyframework.aws.appsync.events.utils.HeaderValues

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/RestClient.kt

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ package com.amplifyframework.aws.appsync.events
1717

1818
import com.amplifyframework.aws.appsync.core.AppSyncAuthorizer
1919
import com.amplifyframework.aws.appsync.core.AppSyncRequest
20+
import com.amplifyframework.aws.appsync.events.data.EventsErrors
21+
import com.amplifyframework.aws.appsync.events.data.EventsException
2022
import com.amplifyframework.aws.appsync.events.data.PublishResult
23+
import com.amplifyframework.aws.appsync.events.data.toEventsException
2124
import com.amplifyframework.aws.appsync.events.utils.HeaderKeys
2225
import com.amplifyframework.aws.appsync.events.utils.HeaderValues
2326
import kotlinx.serialization.json.Json
@@ -74,15 +77,17 @@ internal class RestClient(
7477
}
7578
}.build()
7679

77-
try {
78-
val result = okHttpClient.newCall(authRequest).execute()
79-
return if (result.isSuccessful) {
80-
json.decodeFromString<PublishResult>(result.body.string())
81-
} else {
82-
TODO("Convert to proper exception type")
80+
val result = okHttpClient.newCall(authRequest).execute()
81+
val body = result.body.string()
82+
return if (result.isSuccessful) {
83+
json.decodeFromString<PublishResult>(body)
84+
} else {
85+
throw try {
86+
val errors = json.decodeFromString<EventsErrors>(body)
87+
errors.toEventsException("Failed to post event(s)")
88+
} catch (e: Exception) {
89+
EventsException.unknown("Failed to post event(s)", e)
8390
}
84-
} catch (e: Exception) {
85-
TODO("Convert to proper exception type")
8691
}
8792
}
8893
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
package com.amplifyframework.aws.appsync.events.data
16+
17+
import com.amplifyframework.aws.appsync.events.data.WebSocketMessage.Received
18+
import kotlinx.serialization.Serializable
19+
20+
@Serializable
21+
internal data class EventsErrors(val errors: List<EventsError>) : Received()
22+
23+
internal fun EventsErrors.toEventsException(fallbackMessage: String? = null): EventsException {
24+
return errors.firstOrNull()?.toEventsException(fallbackMessage) ?: EventsException.unknown(fallbackMessage)
25+
}
26+
27+
@Serializable
28+
internal data class EventsError(val errorType: String, val message: String? = null)
29+
30+
// fallback message is only used if WebSocketError didn't provide a message
31+
internal fun EventsError.toEventsException(fallbackMessage: String? = null): EventsException {
32+
val message = this.message ?: fallbackMessage
33+
return when (errorType) {
34+
"UnauthorizedException" -> UnauthorizedException(message)
35+
"BadRequestException" -> BadRequestException(message)
36+
"MaxSubscriptionsReachedError" -> MaxSubscriptionsReachedException(message)
37+
"LimitExceededError" -> RateLimitExceededException(message)
38+
"ResourceNotFound" -> ResourceNotFoundException(message)
39+
"UnsupportedOperation" -> UnsupportedOperationException(message)
40+
"InvalidInputError" -> InvalidInputException(message)
41+
else -> EventsException(message = "$errorType: $message")
42+
}
43+
}

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/data/EventsException.kt

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
*/
1515
package com.amplifyframework.aws.appsync.events.data
1616

17+
import java.net.UnknownHostException
18+
1719
/**
1820
* Base class for exceptions thrown in Events library
1921
*
@@ -28,15 +30,27 @@ open class EventsException internal constructor(
2830
) : Exception(message, cause) {
2931

3032
internal companion object {
31-
internal fun unknown(message: String? = null): EventsException {
33+
internal fun unknown(
34+
message: String? = null,
35+
cause: Throwable? = null
36+
): EventsException {
3237
return EventsException(
3338
message = message ?: "An unknown error occurred",
39+
cause = cause,
3440
recoverySuggestion = "This is not expected to occur. Contact AWS"
3541
)
3642
}
3743
}
3844
}
3945

46+
fun Exception.toEventsException(): EventsException {
47+
return when (this) {
48+
is EventsException -> this
49+
is UnknownHostException -> NetworkException(throwable = this)
50+
else -> EventsException.unknown(cause = this)
51+
}
52+
}
53+
4054
/**
4155
* Thrown when failing to connect to Events WebSocket.
4256
*/
@@ -66,43 +80,64 @@ class ConnectionClosedException internal constructor(cause: Throwable? = null) :
6680
/**
6781
* Thrown when rate limit is exceeded.
6882
*/
69-
internal class RateLimitExceededException internal constructor() : EventsException(
70-
message = "Rate limit exceeded",
83+
class RateLimitExceededException internal constructor(message: String?) : EventsException(
84+
message = message ?: "Rate limit exceeded",
7185
recoverySuggestion = "Try again later"
7286
)
7387

7488
/**
7589
* Thrown when operation is unsupported.
7690
*/
77-
internal class UnsupportedOperationException internal constructor() : EventsException(
78-
message = "WebSocket did not understand the operation",
91+
class UnsupportedOperationException internal constructor(message: String?) : EventsException(
92+
message = message ?: "WebSocket did not understand the operation",
7993
recoverySuggestion = "This is not expected to occur. Contact AWS"
8094
)
8195

8296
/**
8397
* Thrown when resource is not found.
8498
*/
85-
internal class ResourceNotFoundException internal constructor() : EventsException(
86-
message = "Resource not found",
87-
recoverySuggestion = "Check Event configuration values and try again"
99+
class ResourceNotFoundException internal constructor(message: String?) : EventsException(
100+
message = message ?: "Namespace not found",
101+
recoverySuggestion = "Check resource values and try again"
88102
)
89103

90104
/**
91105
* Thrown when hitting max subscription limit.
92106
*/
93-
class MaxSubscriptionsReachedException internal constructor(throwable: Throwable) : EventsException(
94-
message = "Max number of subscriptions reached",
107+
class MaxSubscriptionsReachedException internal constructor(message: String?) : EventsException(
108+
message = message ?: "Max number of subscriptions reached",
95109
recoverySuggestion = "Unsubscribe from existing channels before attempting to subscribe."
96110
)
97111

98112
/**
99113
* Thrown when attempting to send too many events or invalid request.
100114
*/
101-
class BadRequestException internal constructor() : EventsException(
102-
message = "Input exceeded 5 event limit",
103-
recoverySuggestion = "Submit 5 events or less."
115+
class BadRequestException internal constructor(message: String?) : EventsException(
116+
message = message ?: "An unknown error occurred"
117+
)
118+
119+
/**
120+
* Thrown when attempting to send too many events or invalid request over websocket.
121+
*/
122+
class InvalidInputException internal constructor(message: String?) : EventsException(
123+
message = message ?: "An unknown error occurred"
104124
)
105125

126+
/**
127+
* Thrown when we detect a failure in the network.
128+
* See the cause for the underlying error.
129+
*/
130+
class NetworkException internal constructor(throwable: Throwable) : EventsException(
131+
message = "Network error",
132+
cause = throwable,
133+
recoverySuggestion = "Check your internet connection and try again. See the cause for more details."
134+
)
135+
136+
/**
137+
* An internal exception that is not provided to the customer.
138+
* We use this exception so that we can differentiate between a connection being closed unexpectedly or by the user.
139+
* If the connection is closed by the user, we catch this exception and don't propagate it to the customer.
140+
*/
106141
internal class UserClosedConnectionException internal constructor() : EventsException(
107142
message = "The websocket connection was closed normally"
108143
)

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/data/WebSocketMessage.kt

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ internal sealed class WebSocketMessage {
8383
internal data object KeepAlive : Received()
8484

8585
@Serializable @SerialName("connection_error")
86-
internal data class ConnectionError(val errors: List<WebSocketError>) : Received()
86+
internal data class ConnectionError(val errors: List<EventsError>) : Received()
8787

8888
@Serializable
8989
internal sealed class Subscription : Received() {
@@ -101,13 +101,13 @@ internal sealed class WebSocketMessage {
101101
@Serializable @SerialName("subscribe_error")
102102
internal data class SubscribeError(
103103
override val id: String,
104-
override val errors: List<WebSocketError>
104+
override val errors: List<EventsError>
105105
) : Subscription(), ErrorContainer
106106

107107
@Serializable @SerialName("unsubscribe_error")
108108
internal data class UnsubscribeError(
109109
override val id: String,
110-
override val errors: List<WebSocketError>
110+
override val errors: List<EventsError>
111111
) : Subscription(), ErrorContainer
112112
}
113113

@@ -121,13 +121,13 @@ internal sealed class WebSocketMessage {
121121
@Serializable @SerialName("publish_error")
122122
data class PublishError(
123123
override val id: String? = null,
124-
override val errors: List<WebSocketError>
124+
override val errors: List<EventsError>
125125
) : Received(), ErrorContainer
126126

127127
@Serializable @SerialName("error")
128128
data class Error(
129129
override val id: String? = null,
130-
override val errors: List<WebSocketError>
130+
override val errors: List<EventsError>
131131
) : Received(), ErrorContainer
132132
}
133133

@@ -136,19 +136,6 @@ internal sealed class WebSocketMessage {
136136
// All errors contain an id and errors list
137137
internal interface ErrorContainer {
138138
val id: String?
139-
val errors: List<WebSocketError>
140-
}
141-
}
142-
143-
@Serializable
144-
data class WebSocketError(val errorType: String, val message: String? = null) {
145-
146-
// fallback message is only used if WebSocketError didn't provide a message
147-
fun toEventsException(fallbackMessage: String? = null): EventsException {
148-
val message = this.message ?: fallbackMessage
149-
return when (errorType) {
150-
"UnauthorizedException" -> UnauthorizedException(message)
151-
else -> EventsException(message = "$errorType: $message")
152-
}
139+
val errors: List<EventsError>
153140
}
154141
}

0 commit comments

Comments
 (0)