Skip to content

Commit cd3110f

Browse files
authored
feat(data): Events Lib WS Publishing (#3027)
1 parent d28f9e9 commit cd3110f

File tree

7 files changed

+227
-95
lines changed

7 files changed

+227
-95
lines changed

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/Events.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ class Events @VisibleForTesting internal constructor(
122122
fun channel(
123123
channelName: String,
124124
authorizers: ChannelAuthorizers = this.defaultChannelAuthorizers,
125-
) = EventsChannel(channelName, authorizers, endpoints, eventsWebSocketProvider)
125+
) = EventsChannel(channelName, authorizers, eventsWebSocketProvider)
126126

127127
/**
128128
* Method to disconnect from all channels.

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsChannel.kt

Lines changed: 71 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package com.amplifyframework.aws.appsync.events
1616

1717
import com.amplifyframework.aws.appsync.core.AppSyncAuthorizer
18-
import com.amplifyframework.aws.appsync.core.AppSyncRequest
1918
import com.amplifyframework.aws.appsync.events.data.ChannelAuthorizers
2019
import com.amplifyframework.aws.appsync.events.data.ConnectionClosedException
2120
import com.amplifyframework.aws.appsync.events.data.EventsException
@@ -34,7 +33,9 @@ import kotlinx.coroutines.flow.flow
3433
import kotlinx.coroutines.flow.flowOn
3534
import kotlinx.coroutines.flow.onCompletion
3635
import kotlinx.coroutines.flow.onStart
36+
import kotlinx.serialization.json.JsonArray
3737
import kotlinx.serialization.json.JsonElement
38+
import kotlinx.serialization.json.JsonPrimitive
3839

3940
/**
4041
* A class to manage channel subscriptions and publishes
@@ -45,7 +46,6 @@ import kotlinx.serialization.json.JsonElement
4546
class EventsChannel internal constructor(
4647
val name: String,
4748
val authorizers: ChannelAuthorizers,
48-
private val endpoints: EventsEndpoints,
4949
private val eventsWebSocketProvider: EventsWebSocketProvider
5050
) {
5151

@@ -83,12 +83,12 @@ class EventsChannel internal constructor(
8383
* @param authorizer for the publish call. If not provided, the EventChannel publish authorizer will be used.
8484
* @return result of publish.
8585
*/
86-
@Throws(EventsException::class)
86+
@Throws(Exception::class)
8787
suspend fun publish(
8888
event: JsonElement,
8989
authorizer: AppSyncAuthorizer = this.authorizers.publishAuthorizer
9090
): PublishResult {
91-
TODO("Need to implement")
91+
return publish(listOf(event), authorizer)
9292
}
9393

9494
/**
@@ -98,12 +98,43 @@ class EventsChannel internal constructor(
9898
* @param authorizer for the publish call. If not provided, the EventChannel publish authorizer will be used.
9999
* @return result of publish.
100100
*/
101-
@Throws(EventsException::class)
101+
@Throws(Exception::class)
102102
suspend fun publish(
103103
events: List<JsonElement>,
104104
authorizer: AppSyncAuthorizer = this.authorizers.publishAuthorizer
105-
): PublishResult {
106-
TODO("Need to implement")
105+
): PublishResult = coroutineScope {
106+
val publishId = UUID.randomUUID().toString()
107+
val publishMessage = WebSocketMessage.Send.Publish(
108+
id = publishId,
109+
channel = name,
110+
events = JsonArray(events.map { JsonPrimitive(it.toString()) }),
111+
)
112+
113+
val webSocket = eventsWebSocketProvider.getConnectedWebSocket()
114+
val deferredResponse = async { getPublishResponse(webSocket, publishId) }
115+
116+
val queued = webSocket.sendWithAuthorizer(publishMessage, authorizer)
117+
if (!queued) {
118+
throw webSocket.disconnectReason?.toCloseException() ?: ConnectionClosedException()
119+
}
120+
121+
return@coroutineScope when (val response = deferredResponse.await()) {
122+
is WebSocketMessage.Received.PublishSuccess -> {
123+
PublishResult(response.successfulEvents, response.failedEvents)
124+
}
125+
126+
is WebSocketMessage.ErrorContainer -> {
127+
val fallbackMessage = "Failed to publish event(s)"
128+
throw response.errors.firstOrNull()?.toEventsException(fallbackMessage)
129+
?: EventsException(fallbackMessage)
130+
}
131+
132+
is WebSocketMessage.Closed -> {
133+
throw response.reason.toCloseException()
134+
}
135+
136+
else -> throw EventsException("Received unexpected publish response of type: ${response::class}")
137+
}
107138
}
108139

109140
private fun createSubscriptionEventDataFlow(subscriptionHolder: SubscriptionHolder): Flow<EventsMessage> {
@@ -116,11 +147,12 @@ class EventsChannel internal constructor(
116147
emit(EventsMessage(it.event))
117148
}
118149
it is WebSocketMessage.Closed -> {
119-
if (it.reason is DisconnectReason.UserInitiated) {
120-
throw UserClosedConnectionException()
121-
} else {
122-
throw ConnectionClosedException(it.reason.throwable)
123-
}
150+
throw it.reason.toCloseException()
151+
}
152+
it is WebSocketMessage.ErrorContainer && it.id == subscriptionHolder.id -> {
153+
val exceptionMessage = "Received error for subscription"
154+
throw it.errors.firstOrNull()?.toEventsException(exceptionMessage)
155+
?: EventsException(exceptionMessage)
124156
}
125157
else -> Unit
126158
}
@@ -133,24 +165,31 @@ class EventsChannel internal constructor(
133165
subscriptionId: String,
134166
authorizer: AppSyncAuthorizer
135167
): Boolean = coroutineScope {
168+
// create a deferred holder for subscription response
136169
val deferredSubscriptionResponse = async { getSubscriptionResponse(webSocket, subscriptionId) }
137170

138171
// Publish subscription to websocket
139-
publishSubscription(webSocket, subscriptionId, authorizer)
172+
val queued = webSocket.sendWithAuthorizer(
173+
webSocketMessage = WebSocketMessage.Send.Subscription.Subscribe(id = subscriptionId, channel = name),
174+
authorizer = authorizer
175+
)
176+
if (!queued) {
177+
throw webSocket.disconnectReason?.toCloseException() ?: ConnectionClosedException()
178+
}
140179

141180
// Wait for subscription result to return
142181
when (val response = deferredSubscriptionResponse.await()) {
143182
is WebSocketMessage.Received.Subscription.SubscribeSuccess -> {
144183
return@coroutineScope true
145184
}
146-
is WebSocketMessage.Received.Subscription.SubscribeError -> {
185+
is WebSocketMessage.ErrorContainer -> {
147186
val exceptionMessage = "Subscribe failed for channel: $name"
148187
throw response.errors.firstOrNull()
149188
?.toEventsException(exceptionMessage)
150189
?: EventsException(exceptionMessage)
151190
}
152-
is WebSocketMessage.Received.ConnectionClosed -> {
153-
throw ConnectionClosedException()
191+
is WebSocketMessage.Closed -> {
192+
throw response.reason.toCloseException()
154193
}
155194
else -> throw EventsException("Received unexpected subscription response of type: ${response::class}")
156195
}
@@ -160,34 +199,22 @@ class EventsChannel internal constructor(
160199
return webSocket.events.first {
161200
when {
162201
it is WebSocketMessage.Received.Subscription && it.id == subscriptionId -> true
163-
it is WebSocketMessage.Received.ConnectionClosed -> true
202+
it is WebSocketMessage.ErrorContainer && it.id == subscriptionId -> true
203+
it is WebSocketMessage.Closed -> true
164204
else -> false
165205
}
166206
}
167207
}
168208

169-
private suspend fun publishSubscription(
170-
webSocket: EventsWebSocket,
171-
subscriptionId: String,
172-
authorizer: AppSyncAuthorizer
173-
) {
174-
val subscribeMessage = WebSocketMessage.Send.Subscription.Subscribe(
175-
id = subscriptionId,
176-
channel = name,
177-
authorization = authorizer.getAuthorizationHeaders(
178-
object : AppSyncRequest {
179-
override val url: String
180-
get() = endpoints.restEndpoint.toString()
181-
override val body: String?
182-
get() = null
183-
override val headers: Map<String, String>
184-
get() = emptyMap()
185-
override val method: AppSyncRequest.HttpMethod
186-
get() = AppSyncRequest.HttpMethod.GET
187-
}
188-
)
189-
)
190-
webSocket.send(subscribeMessage)
209+
private suspend fun getPublishResponse(webSocket: EventsWebSocket, publishId: String): WebSocketMessage {
210+
return webSocket.events.first {
211+
when {
212+
it is WebSocketMessage.Received.PublishSuccess && it.id == publishId -> true
213+
it is WebSocketMessage.ErrorContainer && it.id == publishId -> true
214+
it is WebSocketMessage.Closed -> true
215+
else -> false
216+
}
217+
}
191218
}
192219

193220
private fun completeSubscription(subscriptionHolder: SubscriptionHolder, throwable: Throwable?) {
@@ -198,7 +225,11 @@ class EventsChannel internal constructor(
198225

199226
if (currentWebSocket != null && isSubscribed && !isDisconnected) {
200227
// Unsubscribe from channel when flow is completed
201-
currentWebSocket.send(WebSocketMessage.Send.Subscription.Unsubscribe(subscriptionHolder.id))
228+
try {
229+
currentWebSocket.send(WebSocketMessage.Send.Subscription.Unsubscribe(subscriptionHolder.id))
230+
} catch (e: Exception) {
231+
// do nothing with a failed unsubscribe post
232+
}
202233
}
203234
}
204235
}

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt

Lines changed: 66 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ import kotlinx.coroutines.flow.asSharedFlow
3131
import kotlinx.coroutines.flow.first
3232
import kotlinx.serialization.encodeToString
3333
import kotlinx.serialization.json.Json
34+
import kotlinx.serialization.json.JsonObject
35+
import kotlinx.serialization.json.JsonPrimitive
36+
import kotlinx.serialization.json.encodeToJsonElement
37+
import kotlinx.serialization.json.jsonObject
3438
import okhttp3.OkHttpClient
3539
import okhttp3.Request
3640
import okhttp3.Response
@@ -50,8 +54,9 @@ internal class EventsWebSocket(
5054

5155
private lateinit var webSocket: WebSocket
5256
@Volatile internal var isClosed = false
53-
private var disconnectReason: DisconnectReason? = null
57+
internal var disconnectReason: WebSocketDisconnectReason? = null
5458
private val connectionTimeoutTimer = ConnectionTimeoutTimer(onTimeout = ::onTimeout)
59+
val preAuthPublishHeaders: Map<String, String> by lazy { mapOf(HeaderKeys.HOST to eventsEndpoints.host) }
5560
private val logger = loggerProvider?.getLogger(TAG)
5661

5762
@Throws(ConnectException::class)
@@ -75,7 +80,7 @@ internal class EventsWebSocket(
7580
webSocket.cancel()
7681
throw ConnectException(connectionResponse.reason.throwable)
7782
}
78-
is WebSocketMessage.Received.ConnectionError -> {
83+
is WebSocketMessage.ErrorContainer -> {
7984
webSocket.cancel()
8085
throw ConnectException(
8186
connectionResponse.errors.firstOrNull()?.toEventsException()
@@ -91,7 +96,7 @@ internal class EventsWebSocket(
9196
}
9297

9398
suspend fun disconnect(flushEvents: Boolean) = coroutineScope {
94-
disconnectReason = DisconnectReason.UserInitiated
99+
disconnectReason = WebSocketDisconnectReason.UserInitiated
95100
val deferredClosedResponse = async { getClosedResponse() }
96101
when (flushEvents) {
97102
true -> webSocket.close(NORMAL_CLOSE_CODE, "User initiated disconnect")
@@ -101,16 +106,20 @@ internal class EventsWebSocket(
101106
}
102107

103108
override fun onOpen(webSocket: WebSocket, response: Response) {
104-
logger?.debug ("onOpen: sending connection init")
105-
send(WebSocketMessage.Send.ConnectionInit())
109+
logger?.debug("onOpen: sending connection init")
110+
try {
111+
send(WebSocketMessage.Send.ConnectionInit())
112+
} catch (e: Exception) {
113+
logger?.error("onOpen: exception encountered", e) // do nothing. closure will handle error
114+
}
106115
}
107116

108117
override fun onMessage(webSocket: WebSocket, text: String) {
109118
connectionTimeoutTimer.resetTimeoutTimer()
110119
logger?.debug { "Websocket onMessage: $text" }
111120
try {
112121
val eventMessage = json.decodeFromString<WebSocketMessage.Received>(text)
113-
_events.tryEmit(eventMessage)
122+
emitEvent(eventMessage)
114123
} catch (e: Exception) {
115124
logger?.error(e) { "Websocket onMessage: exception encountered" }
116125
}
@@ -128,27 +137,69 @@ internal class EventsWebSocket(
128137
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
129138
// Events api sends normal close code even in failure
130139
// so inspecting code/reason isn't helpful as it should be
131-
logger?.debug {"onClosed: reason = $disconnectReason" }
140+
logger?.debug { "onClosed: reason = $disconnectReason" }
132141
handleClosed()
133142
}
134143

135144
private fun onTimeout() {
136-
disconnectReason = DisconnectReason.Timeout
145+
disconnectReason = WebSocketDisconnectReason.Timeout
137146
webSocket.cancel()
138147
}
139148

140149
private fun handleClosed() {
141150
connectionTimeoutTimer.stop()
142-
_events.tryEmit(
143-
WebSocketMessage.Closed(reason = disconnectReason ?: DisconnectReason.Service())
144-
)
151+
emitEvent(WebSocketMessage.Closed(reason = disconnectReason ?: WebSocketDisconnectReason.Service()))
145152
isClosed = true
146153
}
147154

148-
inline fun <reified T : WebSocketMessage> send(webSocketMessage: T) {
149-
val message = json.encodeToString(webSocketMessage)
150-
logger?.debug { "send: ${webSocketMessage::class.java}" }
151-
webSocket.send(message)
155+
// returns true if websocket queued up event. false if failed
156+
internal suspend inline fun <reified T : WebSocketMessage.Send> sendWithAuthorizer(
157+
webSocketMessage: T,
158+
authorizer: AppSyncAuthorizer
159+
): Boolean {
160+
// The base message will not include id, type, or authorization fields
161+
val baseMessageJson = json.encodeToJsonElement(webSocketMessage).jsonObject
162+
163+
// Create the authorization headers first
164+
val authHeaders = authorizer.getAuthorizationHeaders(object : AppSyncRequest {
165+
override val method = AppSyncRequest.HttpMethod.POST
166+
override val url = eventsEndpoints.restEndpoint.toString()
167+
override val headers = preAuthPublishHeaders
168+
override val body = json.encodeToString(baseMessageJson)
169+
})
170+
171+
// We reconstruct the message, adding in the id, type, and authorization fields
172+
val message = json.encodeToString(
173+
JsonObject(
174+
buildMap {
175+
putAll(baseMessageJson)
176+
put("id", JsonPrimitive(webSocketMessage.id))
177+
put("type", JsonPrimitive(webSocketMessage.type))
178+
put(
179+
"authorization",
180+
JsonObject((preAuthPublishHeaders + authHeaders).mapValues { JsonPrimitive(it.value) })
181+
)
182+
}
183+
)
184+
)
185+
186+
return send(message)
187+
}
188+
189+
// returns true if websocket queued up event. false if failed
190+
inline fun <reified T : WebSocketMessage> send(webSocketMessage: T): Boolean {
191+
return send(json.encodeToString(webSocketMessage))
192+
}
193+
194+
// returns true if websocket queued up event. false if failed
195+
private fun send(eventJson: String): Boolean {
196+
logger?.debug { "send: $eventJson" }
197+
return webSocket.send(eventJson)
198+
}
199+
200+
private fun emitEvent(event: WebSocketMessage) {
201+
logger?.debug { "emit ${event::class.java}" }
202+
_events.tryEmit(event)
152203
}
153204

154205
companion object {
@@ -206,9 +257,3 @@ private class ConnectAppSyncRequest(
206257
override val body: String
207258
get() = "{}"
208259
}
209-
210-
internal sealed class DisconnectReason(val throwable: Throwable?) {
211-
data object UserInitiated : DisconnectReason(null)
212-
data object Timeout : DisconnectReason(EventsException("Connection timed out."))
213-
class Service(throwable: Throwable? = null) : DisconnectReason(throwable)
214-
}

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ internal class EventsWebSocketProvider(
4040
val existingWebSocket: EventsWebSocket?
4141
get() = connectionResultReference.get()?.getOrNull()
4242

43-
4443
suspend fun getConnectedWebSocket(): EventsWebSocket = getConnectedWebSocketResult().getOrThrow()
4544

4645
private suspend fun getConnectedWebSocketResult(): Result<EventsWebSocket> = coroutineScope {

0 commit comments

Comments
 (0)