diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 554cce7ae..0ebca67ac 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -20,6 +20,8 @@ lombok = "8.10"
okhttp = "4.12.0"
test-retry = "1.6.0"
kotlin = "2.1.10"
+coroutine = "1.9.0"
+turbine = "1.2.0"
[libraries]
gson = { group = "com.google.code.gson", name = "gson", version.ref = "gson" }
@@ -41,11 +43,14 @@ dexmaker = { group = "com.crittercism.dexmaker", name = "dexmaker", version.ref
dexmaker-dx = { group = "com.crittercism.dexmaker", name = "dexmaker-dx", version.ref = "dexmaker" }
dexmaker-mockito = { group = "com.crittercism.dexmaker", name = "dexmaker-mockito", version.ref = "dexmaker" }
android-retrostreams = { group = "net.sourceforge.streamsupport", name = "android-retrostreams", version.ref = "android-retrostreams" }
-okhttp = { group ="com.squareup.okhttp3", name = "okhttp", version.ref = "okhttp" }
+okhttp = { group = "com.squareup.okhttp3", name = "okhttp", version.ref = "okhttp" }
+coroutine-core = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version.ref = "coroutine" }
+coroutine-test = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-test", version.ref = "coroutine" }
+turbine = { group = "app.cash.turbine", name = "turbine", version.ref = "turbine" }
[bundles]
common = ["msgpack", "vcdiff-core"]
-tests = ["junit","hamcrest-all", "nanohttpd", "nanohttpd-nanolets", "nanohttpd-websocket", "mockito-core", "concurrentunit", "slf4j-simple"]
+tests = ["junit", "hamcrest-all", "nanohttpd", "nanohttpd-nanolets", "nanohttpd-websocket", "mockito-core", "concurrentunit", "slf4j-simple"]
instrumental-android = ["android-test-runner", "android-test-rules", "dexmaker", "dexmaker-dx", "dexmaker-mockito", "android-retrostreams"]
[plugins]
diff --git a/lib/src/main/java/io/ably/lib/http/AsyncHttpScheduler.java b/lib/src/main/java/io/ably/lib/http/AsyncHttpScheduler.java
index 598b9337f..285cd856c 100644
--- a/lib/src/main/java/io/ably/lib/http/AsyncHttpScheduler.java
+++ b/lib/src/main/java/io/ably/lib/http/AsyncHttpScheduler.java
@@ -16,10 +16,23 @@ public AsyncHttpScheduler(HttpCore httpCore, ClientOptions options) {
super(httpCore, new CloseableThreadPoolExecutor(options));
}
+ private AsyncHttpScheduler(HttpCore httpCore, CloseableExecutor executor) {
+ super(httpCore, executor);
+ }
+
private static final long KEEP_ALIVE_TIME = 2000L;
protected static final String TAG = AsyncHttpScheduler.class.getName();
+ /**
+ * [Internal Method]
+ *
+ * We use this method to implement proxy Realtime / Rest clients that add additional data to the underlying client.
+ */
+ public AsyncHttpScheduler exchangeHttpCore(HttpCore httpCore) {
+ return new AsyncHttpScheduler(httpCore, this.executor);
+ }
+
private static class CloseableThreadPoolExecutor implements CloseableExecutor {
private final ThreadPoolExecutor executor;
diff --git a/lib/src/main/java/io/ably/lib/http/Http.java b/lib/src/main/java/io/ably/lib/http/Http.java
index cf40b7c0b..708ccf13b 100644
--- a/lib/src/main/java/io/ably/lib/http/Http.java
+++ b/lib/src/main/java/io/ably/lib/http/Http.java
@@ -21,6 +21,15 @@ public void close() throws Exception {
asyncHttp.close();
}
+ /**
+ * [Internal Method]
+ *
+ * We use this method to implement proxy Realtime / Rest clients that add additional data to the underlying client.
+ */
+ public Http exchangeHttpCore(HttpCore httpCore) {
+ return new Http(asyncHttp.exchangeHttpCore(httpCore), new SyncHttpScheduler(httpCore));
+ }
+
public class Request {
private final Execute execute;
diff --git a/lib/src/main/java/io/ably/lib/http/HttpCore.java b/lib/src/main/java/io/ably/lib/http/HttpCore.java
index 470562b26..b2701241b 100644
--- a/lib/src/main/java/io/ably/lib/http/HttpCore.java
+++ b/lib/src/main/java/io/ably/lib/http/HttpCore.java
@@ -68,6 +68,16 @@ public class HttpCore {
private final HttpEngine engine;
private HttpAuth proxyAuth;
+ /**
+ * This field is used for analytics purposes.
+ *
+ * It holds additional agents that should be added after the Realtime/Rest client is initialized.
+ * - **Static agents** are set in `ClientOptions`.
+ * - **Dynamic agents** are added later by higher-level SDKs like Chat or Asset Tracking
+ * and are provided in the `createWrapperSdkProxy` call.
+ */
+ private Map dynamicAgents;
+
/*************************
* Public API
*************************/
@@ -103,6 +113,18 @@ public HttpCore(ClientOptions options, Auth auth, PlatformAgentProvider platform
this.engine = engineFactory.create(new HttpEngineConfig(ClientOptionsUtils.convertToProxyConfig(options)));
}
+ private HttpCore(HttpCore underlyingHttpCore, Map dynamicAgents) {
+ this.options = underlyingHttpCore.options;
+ this.auth = underlyingHttpCore.auth;
+ this.platformAgentProvider = underlyingHttpCore.platformAgentProvider;
+ this.scheme = underlyingHttpCore.scheme;
+ this.port = underlyingHttpCore.port;
+ this.hosts = underlyingHttpCore.hosts;
+ this.proxyAuth = underlyingHttpCore.proxyAuth;
+ this.engine = underlyingHttpCore.engine;
+ this.dynamicAgents = dynamicAgents;
+ }
+
/**
* Make a synchronous HTTP request specified by URL and proxy, retrying if necessary on WWW-Authenticate
*
@@ -307,7 +329,10 @@ private Map collectRequestHeaders(URL url, String method, Param[
/* pass required headers */
requestHeaders.put(Defaults.ABLY_PROTOCOL_VERSION_HEADER, Defaults.ABLY_PROTOCOL_VERSION); // RSC7a
- requestHeaders.put(Defaults.ABLY_AGENT_HEADER, AgentHeaderCreator.create(options.agents, platformAgentProvider));
+ Map additionalAgents = new HashMap<>();
+ if (options.agents != null) additionalAgents.putAll(options.agents);
+ if (dynamicAgents != null) additionalAgents.putAll(dynamicAgents);
+ requestHeaders.put(Defaults.ABLY_AGENT_HEADER, AgentHeaderCreator.create(additionalAgents, platformAgentProvider));
if (options.clientId != null)
requestHeaders.put(Defaults.ABLY_CLIENT_ID_HEADER, Base64Coder.encodeString(options.clientId));
@@ -455,6 +480,15 @@ private Response executeRequest(HttpRequest request) {
return response;
}
+ /**
+ * [Internal Method]
+ *
+ * We use this method to implement proxy Realtime / Rest clients that add additional agents to the underlying client.
+ */
+ public HttpCore injectDynamicAgents(Map wrapperSDKAgents) {
+ return new HttpCore(this, wrapperSDKAgents);
+ }
+
/**
* Interface for an entity that supplies an httpCore request body
*/
diff --git a/lib/src/main/java/io/ably/lib/http/HttpScheduler.java b/lib/src/main/java/io/ably/lib/http/HttpScheduler.java
index 343a7f728..1da80c526 100644
--- a/lib/src/main/java/io/ably/lib/http/HttpScheduler.java
+++ b/lib/src/main/java/io/ably/lib/http/HttpScheduler.java
@@ -440,7 +440,7 @@ public Future ablyHttpExecuteWithRetry(
return request;
}
- private final CloseableExecutor executor;
+ protected final CloseableExecutor executor;
private final HttpCore httpCore;
protected static final String TAG = HttpScheduler.class.getName();
diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
index 6805b4614..800d7342e 100644
--- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
+++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
@@ -10,6 +10,7 @@
import java.util.TimerTask;
import io.ably.lib.http.BasePaginatedQuery;
+import io.ably.lib.http.Http;
import io.ably.lib.http.HttpCore;
import io.ably.lib.http.HttpUtils;
import io.ably.lib.transport.ConnectionManager;
@@ -1148,7 +1149,11 @@ else if(!"false".equalsIgnoreCase(param.value)) {
* @throws AblyException
*/
public PaginatedResult history(Param[] params) throws AblyException {
- return historyImpl(params).sync();
+ return historyImpl(ably.http, params).sync();
+ }
+
+ PaginatedResult history(Http http, Param[] params) throws AblyException {
+ return historyImpl(http, params).sync();
}
/**
@@ -1177,10 +1182,14 @@ public PaginatedResult history(Param[] params) throws AblyException {
* @throws AblyException
*/
public void historyAsync(Param[] params, Callback> callback) {
- historyImpl(params).async(callback);
+ historyAsync(ably.http, params, callback);
+ }
+
+ void historyAsync(Http http, Param[] params, Callback> callback) {
+ historyImpl(http, params).async(callback);
}
- private BasePaginatedQuery.ResultRequest historyImpl(Param[] params) {
+ private BasePaginatedQuery.ResultRequest historyImpl(Http http, Param[] params) {
try {
params = replacePlaceholderParams((Channel) this, params);
} catch (AblyException e) {
@@ -1188,7 +1197,7 @@ private BasePaginatedQuery.ResultRequest historyImpl(Param[] params) {
}
HttpCore.BodyHandler bodyHandler = MessageSerializer.getMessageResponseHandler(options);
- return new BasePaginatedQuery(ably.http, basePath + "/history", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler).get();
+ return new BasePaginatedQuery(http, basePath + "/history", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler).get();
}
/************************************
diff --git a/lib/src/main/java/io/ably/lib/realtime/Presence.java b/lib/src/main/java/io/ably/lib/realtime/Presence.java
index 940b1d077..a74d3bf50 100644
--- a/lib/src/main/java/io/ably/lib/realtime/Presence.java
+++ b/lib/src/main/java/io/ably/lib/realtime/Presence.java
@@ -1,6 +1,7 @@
package io.ably.lib.realtime;
import io.ably.lib.http.BasePaginatedQuery;
+import io.ably.lib.http.Http;
import io.ably.lib.http.HttpCore;
import io.ably.lib.http.HttpUtils;
import io.ably.lib.transport.ConnectionManager;
@@ -794,7 +795,11 @@ public void updatePresence(PresenceMessage msg, CompletionListener listener) thr
* @throws AblyException
*/
public PaginatedResult history(Param[] params) throws AblyException {
- return historyImpl(params).sync();
+ return history(channel.ably.http, params);
+ }
+
+ PaginatedResult history(Http http, Param[] params) throws AblyException {
+ return historyImpl(http, params).sync();
}
/**
@@ -821,10 +826,14 @@ public PaginatedResult history(Param[] params) throws AblyExcep
* @throws AblyException
*/
public void historyAsync(Param[] params, Callback> callback) {
- historyImpl(params).async(callback);
+ historyImpl(channel.ably.http, params).async(callback);
+ }
+
+ void historyAsync(Http http, Param[] params, Callback> callback) {
+ historyImpl(http, params).async(callback);
}
- private BasePaginatedQuery.ResultRequest historyImpl(Param[] params) {
+ private BasePaginatedQuery.ResultRequest historyImpl(Http http, Param[] params) {
try {
params = Channel.replacePlaceholderParams(channel, params);
} catch (AblyException e) {
@@ -833,7 +842,7 @@ private BasePaginatedQuery.ResultRequest historyImpl(Param[] pa
AblyRealtime ably = channel.ably;
HttpCore.BodyHandler bodyHandler = PresenceSerializer.getPresenceResponseHandler(channel.options);
- return new BasePaginatedQuery(ably.http, channel.basePath + "/presence/history", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler).get();
+ return new BasePaginatedQuery(http, channel.basePath + "/presence/history", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler).get();
}
/**
diff --git a/lib/src/main/java/io/ably/lib/rest/AblyBase.java b/lib/src/main/java/io/ably/lib/rest/AblyBase.java
index c0c745a52..8782b9290 100644
--- a/lib/src/main/java/io/ably/lib/rest/AblyBase.java
+++ b/lib/src/main/java/io/ably/lib/rest/AblyBase.java
@@ -179,7 +179,11 @@ public void release(String channelName) {
* @throws AblyException
*/
public long time() throws AblyException {
- return timeImpl().sync().longValue();
+ return time(http);
+ }
+
+ long time(Http http) throws AblyException {
+ return timeImpl(http).sync();
}
/**
@@ -196,10 +200,14 @@ public long time() throws AblyException {
* This callback is invoked on a background thread
*/
public void timeAsync(Callback callback) {
- timeImpl().async(callback);
+ timeAsync(http, callback);
+ }
+
+ void timeAsync(Http http, Callback callback) {
+ timeImpl(http).async(callback);
}
- private Http.Request timeImpl() {
+ private Http.Request timeImpl(Http http) {
final Param[] params = this.options.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; // RSC7c
return http.request(new Http.Execute() {
@Override
@@ -237,7 +245,11 @@ public Long handleResponse(HttpCore.Response response, ErrorInfo error) throws A
* @throws AblyException
*/
public PaginatedResult stats(Param[] params) throws AblyException {
- return new PaginatedQuery(http, "/stats", HttpUtils.defaultAcceptHeaders(false), params, StatsReader.statsResponseHandler).get();
+ return stats(http, params);
+ }
+
+ PaginatedResult stats(Http http, Param[] params) throws AblyException {
+ return new PaginatedQuery<>(http, "/stats", HttpUtils.defaultAcceptHeaders(false), params, StatsReader.statsResponseHandler).get();
}
/**
@@ -261,6 +273,10 @@ public PaginatedResult stats(Param[] params) throws AblyException {
* This callback is invoked on a background thread
*/
public void statsAsync(Param[] params, Callback> callback) {
+ statsAsync(http, params, callback);
+ }
+
+ void statsAsync(Http http, Param[] params, Callback> callback) {
(new AsyncPaginatedQuery(http, "/stats", HttpUtils.defaultAcceptHeaders(false), params, StatsReader.statsResponseHandler)).get(callback);
}
@@ -284,6 +300,10 @@ public void statsAsync(Param[] params, Callback> cal
* @throws AblyException if it was not possible to complete the request, or an error response was received
*/
public HttpPaginatedResponse request(String method, String path, Param[] params, HttpCore.RequestBody body, Param[] headers) throws AblyException {
+ return request(http, method, path, params, body, headers);
+ }
+
+ HttpPaginatedResponse request(Http http, String method, String path, Param[] params, HttpCore.RequestBody body, Param[] headers) throws AblyException {
headers = HttpUtils.mergeHeaders(HttpUtils.defaultAcceptHeaders(false), headers);
return new HttpPaginatedQuery(http, method, path, headers, params, body).exec();
}
@@ -311,6 +331,10 @@ public HttpPaginatedResponse request(String method, String path, Param[] params,
* This callback is invoked on a background thread
*/
public void requestAsync(String method, String path, Param[] params, HttpCore.RequestBody body, Param[] headers, final AsyncHttpPaginatedResponse.Callback callback) {
+ requestAsync(http, method, path, params, body, headers, callback);
+ }
+
+ void requestAsync(Http http, String method, String path, Param[] params, HttpCore.RequestBody body, Param[] headers, final AsyncHttpPaginatedResponse.Callback callback) {
headers = HttpUtils.mergeHeaders(HttpUtils.defaultAcceptHeaders(false), headers);
(new AsyncHttpPaginatedQuery(http, method, path, headers, params, body)).exec(callback);
}
diff --git a/lib/src/main/java/io/ably/lib/rest/ChannelBase.java b/lib/src/main/java/io/ably/lib/rest/ChannelBase.java
index 4ce2591ac..a4c81a34d 100644
--- a/lib/src/main/java/io/ably/lib/rest/ChannelBase.java
+++ b/lib/src/main/java/io/ably/lib/rest/ChannelBase.java
@@ -48,7 +48,11 @@ public class ChannelBase {
* @throws AblyException
*/
public void publish(String name, Object data) throws AblyException {
- publishImpl(name, data).sync();
+ publish(ably.http, name, data);
+ }
+
+ void publish(Http http, String name, Object data) throws AblyException {
+ publishImpl(http, name, data).sync();
}
/**
@@ -63,11 +67,15 @@ public void publish(String name, Object data) throws AblyException {
* This listener is invoked on a background thread.
*/
public void publishAsync(String name, Object data, CompletionListener listener) {
- publishImpl(name, data).async(new CompletionListener.ToCallback(listener));
+ publishAsync(ably.http, name, data, listener);
}
- private Http.Request publishImpl(String name, Object data) {
- return publishImpl(new Message[] {new Message(name, data)});
+ void publishAsync(Http http, String name, Object data, CompletionListener listener) {
+ publishImpl(http, name, data).async(new CompletionListener.ToCallback(listener));
+ }
+
+ private Http.Request publishImpl(Http http, String name, Object data) {
+ return publishImpl(http, new Message[] {new Message(name, data)});
}
/**
@@ -79,7 +87,11 @@ private Http.Request publishImpl(String name, Object data) {
* @throws AblyException
*/
public void publish(final Message[] messages) throws AblyException {
- publishImpl(messages).sync();
+ publish(ably.http, messages);
+ }
+
+ void publish(Http http, final Message[] messages) throws AblyException {
+ publishImpl(http, messages).sync();
}
/**
@@ -91,11 +103,15 @@ public void publish(final Message[] messages) throws AblyException {
* This listener is invoked on a background thread.
*/
public void publishAsync(final Message[] messages, final CompletionListener listener) {
- publishImpl(messages).async(new CompletionListener.ToCallback(listener));
+ publishAsync(ably.http, messages, listener);
+ }
+
+ void publishAsync(Http http, final Message[] messages, final CompletionListener listener) {
+ publishImpl(http, messages).async(new CompletionListener.ToCallback(listener));
}
- private Http.Request publishImpl(final Message[] messages) {
- return ably.http.request(new Http.Execute() {
+ private Http.Request publishImpl(Http http, final Message[] messages) {
+ return http.request(new Http.Execute() {
@Override
public void execute(HttpScheduler http, final Callback callback) throws AblyException {
/* handle message ids */
@@ -133,7 +149,11 @@ public void execute(HttpScheduler http, final Callback callback) throws Ab
* @throws AblyException
*/
public PaginatedResult history(Param[] params) throws AblyException {
- return historyImpl(params).sync();
+ return history(ably.http, params);
+ }
+
+ PaginatedResult history(Http http, Param[] params) throws AblyException {
+ return historyImpl(http, params).sync();
}
/**
@@ -143,13 +163,17 @@ public PaginatedResult history(Param[] params) throws AblyException {
* @return
*/
public void historyAsync(Param[] params, Callback> callback) {
- historyImpl(params).async(callback);
+ historyAsync(ably.http, params, callback);
}
- private BasePaginatedQuery.ResultRequest historyImpl(Param[] initialParams) {
+ void historyAsync(Http http, Param[] params, Callback> callback) {
+ historyImpl(http, params).async(callback);
+ }
+
+ private BasePaginatedQuery.ResultRequest historyImpl(Http http, Param[] initialParams) {
HttpCore.BodyHandler bodyHandler = MessageSerializer.getMessageResponseHandler(options);
final Param[] params = ably.options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams; // RSC7c
- return (new BasePaginatedQuery(ably.http, basePath + "/messages", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler)).get();
+ return (new BasePaginatedQuery(http, basePath + "/messages", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler)).get();
}
/**
@@ -174,7 +198,11 @@ public class Presence {
* @throws AblyException
*/
public PaginatedResult get(Param[] params) throws AblyException {
- return getImpl(params).sync();
+ return get(ably.http, params);
+ }
+
+ PaginatedResult get(Http http, Param[] params) throws AblyException {
+ return getImpl(http, params).sync();
}
/**
@@ -195,13 +223,17 @@ public PaginatedResult get(Param[] params) throws AblyException
* This callback is invoked on a background thread.
*/
public void getAsync(Param[] params, Callback> callback) {
- getImpl(params).async(callback);
+ getAsync(ably.http, params, callback);
+ }
+
+ void getAsync(Http http, Param[] params, Callback> callback) {
+ getImpl(http, params).async(callback);
}
- private BasePaginatedQuery.ResultRequest getImpl(Param[] initialParams) {
+ private BasePaginatedQuery.ResultRequest getImpl(Http http, Param[] initialParams) {
HttpCore.BodyHandler bodyHandler = PresenceSerializer.getPresenceResponseHandler(options);
final Param[] params = ably.options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams; // RSC7c
- return (new BasePaginatedQuery(ably.http, basePath + "/presence", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler)).get();
+ return (new BasePaginatedQuery(http, basePath + "/presence", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler)).get();
}
/**
@@ -226,7 +258,11 @@ private BasePaginatedQuery.ResultRequest getImpl(Param[] initia
* @throws AblyException
*/
public PaginatedResult history(Param[] params) throws AblyException {
- return historyImpl(params).sync();
+ return history(ably.http, params);
+ }
+
+ PaginatedResult history(Http http, Param[] params) throws AblyException {
+ return historyImpl(http, params).sync();
}
/**
@@ -253,13 +289,17 @@ public PaginatedResult history(Param[] params) throws AblyExcep
* @throws AblyException
*/
public void historyAsync(Param[] params, Callback> callback) {
- historyImpl(params).async(callback);
+ historyAsync(ably.http, params, callback);
+ }
+
+ void historyAsync(Http http, Param[] params, Callback> callback) {
+ historyImpl(http, params).async(callback);
}
- private BasePaginatedQuery.ResultRequest historyImpl(Param[] initialParams) {
+ private BasePaginatedQuery.ResultRequest historyImpl(Http http, Param[] initialParams) {
HttpCore.BodyHandler bodyHandler = PresenceSerializer.getPresenceResponseHandler(options);
final Param[] params = ably.options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams; // RSC7c
- return (new BasePaginatedQuery(ably.http, basePath + "/presence/history", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler)).get();
+ return (new BasePaginatedQuery(http, basePath + "/presence/history", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler)).get();
}
}
diff --git a/lib/src/main/java/io/ably/lib/util/AgentHeaderCreator.java b/lib/src/main/java/io/ably/lib/util/AgentHeaderCreator.java
index be13caef9..8999e94f5 100644
--- a/lib/src/main/java/io/ably/lib/util/AgentHeaderCreator.java
+++ b/lib/src/main/java/io/ably/lib/util/AgentHeaderCreator.java
@@ -27,6 +27,7 @@ public static String create(Map additionalAgents, PlatformAgentP
agentStringBuilder.append(AGENT_ENTRY_SEPARATOR);
agentStringBuilder.append(platformAgent);
}
+
return agentStringBuilder.toString();
}
diff --git a/pubsub-adapter/build.gradle.kts b/pubsub-adapter/build.gradle.kts
index a0959e9db..e4ed66412 100644
--- a/pubsub-adapter/build.gradle.kts
+++ b/pubsub-adapter/build.gradle.kts
@@ -6,5 +6,18 @@ plugins {
dependencies {
compileOnly(project(":java"))
+ testImplementation(kotlin("test"))
testImplementation(project(":java"))
+ testImplementation(libs.nanohttpd)
+ testImplementation(libs.coroutine.core)
+ testImplementation(libs.coroutine.test)
+ testImplementation(libs.turbine)
+}
+
+tasks.withType {
+ useJUnitPlatform()
+}
+
+tasks.register("runUnitTests") {
+ beforeTest(closureOf { logger.lifecycle("-> $this") })
}
diff --git a/pubsub-adapter/src/main/kotlin/com/ably/pubsub/WrapperSdkProxy.kt b/pubsub-adapter/src/main/kotlin/com/ably/pubsub/WrapperSdkProxy.kt
new file mode 100644
index 000000000..d39b3b09b
--- /dev/null
+++ b/pubsub-adapter/src/main/kotlin/com/ably/pubsub/WrapperSdkProxy.kt
@@ -0,0 +1,20 @@
+package com.ably.pubsub
+
+data class WrapperSdkProxyOptions(val agents: Map)
+
+interface SdkWrapperCompatible {
+
+ /**
+ * Creates a proxy client to be used to supply analytics information for Ably-authored SDKs.
+ * The proxy client shares the state of the `RealtimeClient` or `RestClient` instance on which this method is called.
+ * This method should only be called by Ably-authored SDKs.
+ */
+ fun createWrapperSdkProxy(options: WrapperSdkProxyOptions): T
+}
+
+fun RealtimeClient.createWrapperSdkProxy(options: WrapperSdkProxyOptions): RealtimeClient =
+ (this as SdkWrapperCompatible<*>).createWrapperSdkProxy(options) as RealtimeClient
+
+fun RestClient.createWrapperSdkProxy(options: WrapperSdkProxyOptions): RestClient =
+ (this as SdkWrapperCompatible<*>).createWrapperSdkProxy(options) as RestClient
+
diff --git a/pubsub-adapter/src/main/kotlin/io/ably/lib/realtime/RealtimeClientAdapter.kt b/pubsub-adapter/src/main/kotlin/io/ably/lib/realtime/RealtimeClientAdapter.kt
index a4e69dc48..aef5e4e52 100644
--- a/pubsub-adapter/src/main/kotlin/io/ably/lib/realtime/RealtimeClientAdapter.kt
+++ b/pubsub-adapter/src/main/kotlin/io/ably/lib/realtime/RealtimeClientAdapter.kt
@@ -1,9 +1,7 @@
package io.ably.lib.realtime
import com.ably.http.HttpMethod
-import com.ably.pubsub.Channels
-import com.ably.pubsub.RealtimeChannel
-import com.ably.pubsub.RealtimeClient
+import com.ably.pubsub.*
import com.ably.query.OrderBy
import com.ably.query.TimeUnit
import io.ably.lib.buildStatsParams
@@ -17,7 +15,7 @@ import io.ably.lib.types.*
*/
fun RealtimeClient(javaClient: AblyRealtime): RealtimeClient = RealtimeClientAdapter(javaClient)
-internal class RealtimeClientAdapter(private val javaClient: AblyRealtime) : RealtimeClient {
+internal class RealtimeClientAdapter(private val javaClient: AblyRealtime) : RealtimeClient, SdkWrapperCompatible {
override val channels: Channels
get() = RealtimeChannelsAdapter(javaClient.channels)
override val connection: Connection
@@ -68,4 +66,10 @@ internal class RealtimeClientAdapter(private val javaClient: AblyRealtime) : Rea
) = javaClient.requestAsync(method.toString(), path, params.toTypedArray(), body, headers.toTypedArray(), callback)
override fun close() = javaClient.close()
+
+ override fun createWrapperSdkProxy(options: WrapperSdkProxyOptions): RealtimeClient {
+ val httpCoreWithAgents = javaClient.httpCore.injectDynamicAgents(options.agents)
+ val httpModule = javaClient.http.exchangeHttpCore(httpCoreWithAgents)
+ return WrapperRealtimeClient(javaClient, this, httpModule, options.agents)
+ }
}
diff --git a/pubsub-adapter/src/main/kotlin/io/ably/lib/realtime/WrapperRealtimeClient.kt b/pubsub-adapter/src/main/kotlin/io/ably/lib/realtime/WrapperRealtimeClient.kt
new file mode 100644
index 000000000..efe51d90a
--- /dev/null
+++ b/pubsub-adapter/src/main/kotlin/io/ably/lib/realtime/WrapperRealtimeClient.kt
@@ -0,0 +1,141 @@
+package io.ably.lib.realtime
+
+import com.ably.http.HttpMethod
+import com.ably.pubsub.*
+import com.ably.query.OrderBy
+import com.ably.query.TimeUnit
+import io.ably.lib.buildHistoryParams
+import io.ably.lib.buildStatsParams
+import io.ably.lib.http.Http
+import io.ably.lib.http.HttpCore
+import io.ably.lib.rest.*
+import io.ably.lib.types.*
+
+internal class WrapperRealtimeClient(
+ private val javaClient: AblyRealtime,
+ private val adapter: RealtimeClientAdapter,
+ private val httpModule: Http,
+ private val agents: Map,
+) : SdkWrapperCompatible, RealtimeClient by adapter {
+
+ override val channels: Channels
+ get() = WrapperRealtimeChannels(javaClient.channels, httpModule, agents)
+
+ override fun createWrapperSdkProxy(options: WrapperSdkProxyOptions): RealtimeClient =
+ adapter.createWrapperSdkProxy(options.copy(agents = options.agents + agents))
+
+ override fun time(): Long = javaClient.time(httpModule)
+
+ override fun timeAsync(callback: Callback) = javaClient.timeAsync(httpModule, callback)
+
+ override fun stats(
+ start: Long?,
+ end: Long?,
+ limit: Int,
+ orderBy: OrderBy,
+ unit: TimeUnit
+ ): PaginatedResult =
+ javaClient.stats(httpModule, buildStatsParams(start, end, limit, orderBy, unit).toTypedArray())
+
+ override fun statsAsync(
+ callback: Callback>,
+ start: Long?,
+ end: Long?,
+ limit: Int,
+ orderBy: OrderBy,
+ unit: TimeUnit
+ ) = javaClient.statsAsync(httpModule, buildStatsParams(start, end, limit, orderBy, unit).toTypedArray(), callback)
+
+ override fun request(
+ path: String,
+ method: HttpMethod,
+ params: List,
+ body: HttpCore.RequestBody?,
+ headers: List,
+ ) = javaClient.request(httpModule, method.toString(), path, params.toTypedArray(), body, headers.toTypedArray())!!
+
+ override fun requestAsync(
+ path: String,
+ callback: AsyncHttpPaginatedResponse.Callback,
+ method: HttpMethod,
+ params: List,
+ body: HttpCore.RequestBody?,
+ headers: List,
+ ) = javaClient.requestAsync(
+ httpModule,
+ method.toString(),
+ path,
+ params.toTypedArray(),
+ body,
+ headers.toTypedArray(),
+ callback
+ )
+}
+
+internal class WrapperRealtimeChannels(
+ private val javaChannels: AblyRealtime.Channels,
+ private val httpModule: Http,
+ private val agents: Map,
+) :
+ Channels by RealtimeChannelsAdapter(javaChannels) {
+
+ override fun get(name: String): RealtimeChannel {
+ if (javaChannels.containsKey(name)) return WrapperRealtimeChannel(javaChannels.get(name), httpModule)
+ return try {
+ WrapperRealtimeChannel(javaChannels.get(name, ChannelOptions().injectAgents(agents)), httpModule)
+ } catch (e: AblyException) {
+ WrapperRealtimeChannel(javaChannels.get(name), httpModule)
+ }
+
+ }
+
+ override fun get(name: String, options: ChannelOptions): RealtimeChannel =
+ WrapperRealtimeChannel(javaChannels.get(name, options.injectAgents(agents)), httpModule)
+}
+
+internal class WrapperRealtimeChannel(private val javaChannel: Channel, private val httpModule: Http) :
+ RealtimeChannel by RealtimeChannelAdapter(javaChannel) {
+
+ override val presence: RealtimePresence
+ get() = WrapperRealtimePresence(javaChannel.presence, httpModule)
+
+ override fun history(start: Long?, end: Long?, limit: Int, orderBy: OrderBy): PaginatedResult =
+ javaChannel.history(httpModule, buildHistoryParams(start, end, limit, orderBy).toTypedArray())
+
+ override fun historyAsync(
+ callback: Callback>,
+ start: Long?,
+ end: Long?,
+ limit: Int,
+ orderBy: OrderBy,
+ ) =
+ javaChannel.historyAsync(httpModule, buildHistoryParams(start, end, limit, orderBy).toTypedArray(), callback)
+}
+
+internal class WrapperRealtimePresence(private val javaPresence: Presence, private val httpModule: Http) :
+ RealtimePresence by RealtimePresenceAdapter(javaPresence) {
+
+ override fun history(start: Long?, end: Long?, limit: Int, orderBy: OrderBy): PaginatedResult =
+ javaPresence.history(httpModule, buildHistoryParams(start, end, limit, orderBy).toTypedArray())
+
+ override fun historyAsync(
+ callback: Callback>,
+ start: Long?,
+ end: Long?,
+ limit: Int,
+ orderBy: OrderBy,
+ ) =
+ javaPresence.historyAsync(httpModule, buildHistoryParams(start, end, limit, orderBy).toTypedArray(), callback)
+}
+
+private fun ChannelOptions.injectAgents(agents: Map): ChannelOptions {
+ val options = ChannelOptions()
+ options.params = (this.params ?: mapOf()) + mapOf(
+ "agent" to agents.map { "${it.key}/${it.value}" }.joinToString(" "),
+ )
+ options.modes = modes
+ options.cipherParams = cipherParams
+ options.attachOnSubscribe = attachOnSubscribe
+ options.encrypted = encrypted
+ return options
+}
diff --git a/pubsub-adapter/src/main/kotlin/io/ably/lib/rest/RestClientAdapter.kt b/pubsub-adapter/src/main/kotlin/io/ably/lib/rest/RestClientAdapter.kt
index c7a163554..b45efc31e 100644
--- a/pubsub-adapter/src/main/kotlin/io/ably/lib/rest/RestClientAdapter.kt
+++ b/pubsub-adapter/src/main/kotlin/io/ably/lib/rest/RestClientAdapter.kt
@@ -1,9 +1,7 @@
package io.ably.lib.rest
import com.ably.http.HttpMethod
-import com.ably.pubsub.Channels
-import com.ably.pubsub.RestChannel
-import com.ably.pubsub.RestClient
+import com.ably.pubsub.*
import com.ably.query.OrderBy
import com.ably.query.TimeUnit
import io.ably.lib.buildStatsParams
@@ -16,7 +14,7 @@ import io.ably.lib.types.*
*/
fun RestClient(javaClient: AblyRest): RestClient = RestClientAdapter(javaClient)
-internal class RestClientAdapter(private val javaClient: AblyRest) : RestClient {
+internal class RestClientAdapter(private val javaClient: AblyRest) : RestClient, SdkWrapperCompatible {
override val channels: Channels
get() = RestChannelsAdapter(javaClient.channels)
override val auth: Auth
@@ -65,4 +63,10 @@ internal class RestClientAdapter(private val javaClient: AblyRest) : RestClient
) = javaClient.requestAsync(method.toString(), path, params.toTypedArray(), body, headers.toTypedArray(), callback)
override fun close() = javaClient.close()
+
+ override fun createWrapperSdkProxy(options: WrapperSdkProxyOptions): RestClient {
+ val httpCoreWithAgents = javaClient.httpCore.injectDynamicAgents(options.agents)
+ val httpModule = javaClient.http.exchangeHttpCore(httpCoreWithAgents)
+ return WrapperRestClient(javaClient, this, httpModule, options.agents)
+ }
}
diff --git a/pubsub-adapter/src/main/kotlin/io/ably/lib/rest/RestClientUtils.kt b/pubsub-adapter/src/main/kotlin/io/ably/lib/rest/RestClientUtils.kt
new file mode 100644
index 000000000..49d1c52e1
--- /dev/null
+++ b/pubsub-adapter/src/main/kotlin/io/ably/lib/rest/RestClientUtils.kt
@@ -0,0 +1,28 @@
+package io.ably.lib.rest
+
+import io.ably.lib.http.Http
+import io.ably.lib.http.HttpCore
+import io.ably.lib.types.*
+
+fun AblyBase.time(http: Http): Long = time(http)
+fun AblyBase.timeAsync(http: Http, callback: Callback): Unit = timeAsync(http, callback)
+fun AblyBase.stats(http: Http, params: Array): PaginatedResult = stats(http, params)
+fun AblyBase.statsAsync(http: Http, params: Array, callback: Callback>): Unit =
+ this.statsAsync(http, params, callback)
+fun AblyBase.request(
+ http: Http,
+ method: String,
+ path: String,
+ params: Array?,
+ body: HttpCore.RequestBody?,
+ headers: Array?
+): HttpPaginatedResponse = this.request(http, method, path, params, body, headers)
+fun AblyBase.requestAsync(
+ http: Http,
+ method: String?,
+ path: String?,
+ params: Array?,
+ body: HttpCore.RequestBody?,
+ headers: Array?,
+ callback: AsyncHttpPaginatedResponse.Callback?
+): Unit = this.requestAsync(http, method, path, params, body, headers, callback)
diff --git a/pubsub-adapter/src/main/kotlin/io/ably/lib/rest/WrapperRestClient.kt b/pubsub-adapter/src/main/kotlin/io/ably/lib/rest/WrapperRestClient.kt
new file mode 100644
index 000000000..6cf3c2dc0
--- /dev/null
+++ b/pubsub-adapter/src/main/kotlin/io/ably/lib/rest/WrapperRestClient.kt
@@ -0,0 +1,138 @@
+package io.ably.lib.rest
+
+import com.ably.http.HttpMethod
+import com.ably.pubsub.*
+import com.ably.query.OrderBy
+import com.ably.query.TimeUnit
+import io.ably.lib.buildHistoryParams
+import io.ably.lib.buildRestPresenceParams
+import io.ably.lib.buildStatsParams
+import io.ably.lib.http.Http
+import io.ably.lib.http.HttpCore
+import io.ably.lib.realtime.CompletionListener
+import io.ably.lib.rest.ChannelBase.Presence
+import io.ably.lib.types.*
+
+internal class WrapperRestClient(
+ private val javaClient: AblyRest,
+ private val adapter: RestClientAdapter,
+ private val httpModule: Http,
+ private val agents: Map,
+) : SdkWrapperCompatible, RestClient by adapter {
+ override val channels: Channels
+ get() = WrapperRestChannels(javaClient.channels, httpModule)
+
+ override fun createWrapperSdkProxy(options: WrapperSdkProxyOptions): RestClient =
+ adapter.createWrapperSdkProxy(options.copy(agents = options.agents + agents))
+
+ override fun time(): Long = javaClient.time(httpModule)
+
+ override fun timeAsync(callback: Callback) = javaClient.timeAsync(httpModule, callback)
+
+ override fun stats(
+ start: Long?,
+ end: Long?,
+ limit: Int,
+ orderBy: OrderBy,
+ unit: TimeUnit
+ ): PaginatedResult =
+ javaClient.stats(httpModule, buildStatsParams(start, end, limit, orderBy, unit).toTypedArray())
+
+ override fun statsAsync(
+ callback: Callback>,
+ start: Long?,
+ end: Long?,
+ limit: Int,
+ orderBy: OrderBy,
+ unit: TimeUnit
+ ) = javaClient.statsAsync(httpModule, buildStatsParams(start, end, limit, orderBy, unit).toTypedArray(), callback)
+
+ override fun request(
+ path: String,
+ method: HttpMethod,
+ params: List,
+ body: HttpCore.RequestBody?,
+ headers: List,
+ ) = javaClient.request(httpModule, method.toString(), path, params.toTypedArray(), body, headers.toTypedArray())!!
+
+ override fun requestAsync(
+ path: String,
+ callback: AsyncHttpPaginatedResponse.Callback,
+ method: HttpMethod,
+ params: List,
+ body: HttpCore.RequestBody?,
+ headers: List,
+ ) = javaClient.requestAsync(
+ httpModule,
+ method.toString(),
+ path,
+ params.toTypedArray(),
+ body,
+ headers.toTypedArray(),
+ callback
+ )
+
+}
+
+internal class WrapperRestChannels(private val javaChannels: AblyBase.Channels, private val httpModule: Http) :
+ Channels by RestChannelsAdapter(javaChannels) {
+ override fun get(name: String): RestChannel = WrapperRestChannel(javaChannels.get(name), httpModule)
+
+ override fun get(name: String, options: ChannelOptions): RestChannel =
+ WrapperRestChannel(javaChannels.get(name, options), httpModule)
+}
+
+internal class WrapperRestChannel(private val javaChannel: Channel, private val httpModule: Http) :
+ RestChannel by RestChannelAdapter(javaChannel) {
+
+ override val presence: RestPresence
+ get() = WrapperRestPresence(javaChannel.presence, httpModule)
+
+ override fun publish(name: String?, data: Any?) = javaChannel.publish(httpModule, name, data)
+
+ override fun publish(messages: List) = javaChannel.publish(httpModule, messages.toTypedArray())
+
+ override fun publishAsync(name: String?, data: Any?, listener: CompletionListener) =
+ javaChannel.publishAsync(httpModule, name, data, listener)
+
+ override fun publishAsync(messages: List, listener: CompletionListener) =
+ javaChannel.publishAsync(httpModule, messages.toTypedArray(), listener)
+
+ override fun history(start: Long?, end: Long?, limit: Int, orderBy: OrderBy): PaginatedResult =
+ javaChannel.history(httpModule, buildHistoryParams(start, end, limit, orderBy).toTypedArray())
+
+ override fun historyAsync(
+ callback: Callback>,
+ start: Long?,
+ end: Long?,
+ limit: Int,
+ orderBy: OrderBy,
+ ) =
+ javaChannel.historyAsync(httpModule, buildHistoryParams(start, end, limit, orderBy).toTypedArray(), callback)
+}
+
+internal class WrapperRestPresence(private val javaPresence: Presence, private val httpModule: Http) :
+ RestPresence by RestPresenceAdapter(javaPresence) {
+ override fun get(limit: Int, clientId: String?, connectionId: String?): PaginatedResult =
+ javaPresence.get(buildRestPresenceParams(limit, clientId, connectionId).toTypedArray())
+
+ override fun getAsync(
+ callback: Callback>,
+ limit: Int,
+ clientId: String?,
+ connectionId: String?
+ ) =
+ javaPresence.getAsync(buildRestPresenceParams(limit, clientId, connectionId).toTypedArray(), callback)
+
+ override fun history(start: Long?, end: Long?, limit: Int, orderBy: OrderBy): PaginatedResult =
+ javaPresence.history(httpModule, buildHistoryParams(start, end, limit, orderBy).toTypedArray())
+
+ override fun historyAsync(
+ callback: Callback>,
+ start: Long?,
+ end: Long?,
+ limit: Int,
+ orderBy: OrderBy,
+ ) =
+ javaPresence.historyAsync(httpModule, buildHistoryParams(start, end, limit, orderBy).toTypedArray(), callback)
+}
diff --git a/pubsub-adapter/src/test/kotlin/com/ably/EmbeddedServer.kt b/pubsub-adapter/src/test/kotlin/com/ably/EmbeddedServer.kt
new file mode 100644
index 000000000..94033ace0
--- /dev/null
+++ b/pubsub-adapter/src/test/kotlin/com/ably/EmbeddedServer.kt
@@ -0,0 +1,54 @@
+package com.ably
+
+import fi.iki.elonen.NanoHTTPD
+import kotlinx.coroutines.channels.BufferOverflow
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.MutableSharedFlow
+import java.io.ByteArrayInputStream
+
+data class Request(
+ val path: String,
+ val params: Map = emptyMap(),
+ val headers: Map = emptyMap(),
+)
+
+data class Response(
+ val mimeType: String,
+ val data: ByteArray,
+)
+
+fun json(json: String): Response = Response(
+ mimeType = "application/json",
+ data = json.toByteArray(),
+)
+
+fun interface RequestHandler {
+ fun handle(request: Request): Response
+}
+
+class EmbeddedServer(port: Int, private val requestHandler: RequestHandler? = null) : NanoHTTPD(port) {
+ private val _servedRequests = MutableSharedFlow(
+ extraBufferCapacity = 1,
+ onBufferOverflow = BufferOverflow.DROP_OLDEST,
+ )
+
+ val servedRequests: Flow = _servedRequests
+
+ override fun serve(session: IHTTPSession): Response {
+ val request = Request(
+ path = session.uri,
+ params = session.parms,
+ headers = session.headers,
+ )
+ _servedRequests.tryEmit(request)
+ val response = requestHandler?.handle(request)
+ return response?.toNanoHttp() ?: newFixedLengthResponse("404")
+ }
+}
+
+private fun Response.toNanoHttp(): NanoHTTPD.Response = NanoHTTPD.newFixedLengthResponse(
+ NanoHTTPD.Response.Status.OK,
+ mimeType,
+ ByteArrayInputStream(data),
+ data.size.toLong(),
+)
diff --git a/pubsub-adapter/src/test/kotlin/com/ably/Utils.kt b/pubsub-adapter/src/test/kotlin/com/ably/Utils.kt
new file mode 100644
index 000000000..46dc1e384
--- /dev/null
+++ b/pubsub-adapter/src/test/kotlin/com/ably/Utils.kt
@@ -0,0 +1,17 @@
+package com.ably
+
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.withContext
+import kotlinx.coroutines.withTimeout
+
+suspend fun waitFor(timeoutInMs: Long = 10_000, block: suspend () -> Boolean) {
+ withContext(Dispatchers.Default) {
+ withTimeout(timeoutInMs) {
+ do {
+ val success = block()
+ delay(100)
+ } while (!success)
+ }
+ }
+}
diff --git a/pubsub-adapter/src/test/kotlin/com/ably/pubsub/SdkWrapperAgentChannelParamTest.kt b/pubsub-adapter/src/test/kotlin/com/ably/pubsub/SdkWrapperAgentChannelParamTest.kt
new file mode 100644
index 000000000..dbf4ca9c6
--- /dev/null
+++ b/pubsub-adapter/src/test/kotlin/com/ably/pubsub/SdkWrapperAgentChannelParamTest.kt
@@ -0,0 +1,89 @@
+package com.ably.pubsub
+
+import io.ably.lib.realtime.AblyRealtime
+import io.ably.lib.realtime.RealtimeClient
+import io.ably.lib.realtime.RealtimeClientAdapter
+import io.ably.lib.realtime.channelOptions
+import io.ably.lib.types.ChannelMode
+import io.ably.lib.types.ChannelOptions
+import io.ably.lib.types.ClientOptions
+import kotlinx.coroutines.test.runTest
+import kotlin.test.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertNull
+import kotlin.test.assertTrue
+
+class SdkWrapperAgentChannelParamTest {
+
+ @Test
+ fun `should add agent information to Realtime channels params`() = runTest {
+ val javaRealtimeClient = createAblyRealtime()
+ val realtimeClient = RealtimeClientAdapter(javaRealtimeClient)
+ val wrapperSdkClient =
+ realtimeClient.createWrapperSdkProxy(WrapperSdkProxyOptions(agents = mapOf("chat-android" to "0.1.0")))
+
+ // create channel from sdk proxy wrapper
+ wrapperSdkClient.channels.get("chat-channel")
+
+ // create channel without sdk proxy wrapper
+ realtimeClient.channels.get("regular-channel")
+
+ assertEquals(
+ "chat-android/0.1.0",
+ javaRealtimeClient.channels.get("chat-channel").channelOptions?.params?.get("agent")
+ )
+
+ assertNull(
+ javaRealtimeClient.channels.get("regular-channel").channelOptions?.params?.get("agent")
+ )
+ }
+
+ @Test
+ fun `should add agent information to Realtime channels params when channel created with custom options`() = runTest {
+ val javaRealtimeClient = createAblyRealtime()
+ val realtimeClient = RealtimeClient(javaRealtimeClient)
+ val wrapperSdkClient =
+ realtimeClient.createWrapperSdkProxy(WrapperSdkProxyOptions(agents = mapOf("chat-android" to "0.1.0")))
+
+ // create channel from sdk proxy wrapper
+ wrapperSdkClient.channels.get("chat-channel", ChannelOptions().apply {
+ params = mapOf("foo" to "bar")
+ modes = arrayOf(ChannelMode.presence)
+ })
+
+ // create channel without sdk proxy wrapper
+ realtimeClient.channels.get("regular-channel", ChannelOptions().apply {
+ encrypted = true
+ })
+
+ assertEquals(
+ "chat-android/0.1.0",
+ javaRealtimeClient.channels.get("chat-channel").channelOptions?.params?.get("agent")
+ )
+
+ assertEquals(
+ "bar",
+ javaRealtimeClient.channels.get("chat-channel").channelOptions?.params?.get("foo")
+ )
+
+ assertEquals(
+ ChannelMode.presence,
+ javaRealtimeClient.channels.get("chat-channel").channelOptions?.modes?.get(0)
+ )
+
+ assertNull(
+ javaRealtimeClient.channels.get("regular-channel").channelOptions?.params?.get("agent")
+ )
+
+ assertTrue(
+ javaRealtimeClient.channels.get("regular-channel").channelOptions?.encrypted ?: false
+ )
+ }
+}
+
+private fun createAblyRealtime(): AblyRealtime {
+ val options = ClientOptions("xxxxx:yyyyyyy").apply {
+ autoConnect = false
+ }
+ return AblyRealtime(options)
+}
diff --git a/pubsub-adapter/src/test/kotlin/com/ably/pubsub/SdkWrapperAgentHeaderTest.kt b/pubsub-adapter/src/test/kotlin/com/ably/pubsub/SdkWrapperAgentHeaderTest.kt
new file mode 100644
index 000000000..4ab7dda2c
--- /dev/null
+++ b/pubsub-adapter/src/test/kotlin/com/ably/pubsub/SdkWrapperAgentHeaderTest.kt
@@ -0,0 +1,201 @@
+package com.ably.pubsub
+
+import app.cash.turbine.test
+import com.ably.EmbeddedServer
+import com.ably.json
+import com.ably.pubsub.SdkWrapperAgentHeaderTest.Companion.PORT
+import com.ably.waitFor
+import fi.iki.elonen.NanoHTTPD
+import io.ably.lib.BuildConfig
+import io.ably.lib.realtime.AblyRealtime
+import io.ably.lib.realtime.RealtimeClient
+import io.ably.lib.rest.AblyRest
+import io.ably.lib.rest.RestClient
+import io.ably.lib.types.ClientOptions
+import kotlinx.coroutines.test.runTest
+import org.junit.jupiter.api.AfterAll
+import org.junit.jupiter.api.BeforeAll
+import kotlin.test.Test
+import kotlin.test.assertEquals
+
+class SdkWrapperAgentHeaderTest {
+
+ @Test
+ fun `should use additional agents in Realtime wrapper SDK client calls`() = runTest {
+ val realtimeClient = createRealtimeClient()
+
+ val wrapperSdkClient =
+ realtimeClient.createWrapperSdkProxy(WrapperSdkProxyOptions(agents = mapOf("chat-android" to "0.1.0")))
+
+ server.servedRequests.test {
+ wrapperSdkClient.time()
+ assertEquals(
+ setOf("ably-java/${BuildConfig.VERSION}", "jre/${System.getProperty("java.version")}", "chat-android/0.1.0"),
+ awaitItem().headers["ably-agent"]?.split(" ")?.toSet(),
+ )
+ }
+
+ server.servedRequests.test {
+ realtimeClient.time()
+ assertEquals(
+ setOf("ably-java/${BuildConfig.VERSION}", "jre/${System.getProperty("java.version")}"),
+ awaitItem().headers["ably-agent"]?.split(" ")?.toSet(),
+ )
+ }
+
+ server.servedRequests.test {
+ wrapperSdkClient.request("/time")
+ assertEquals(
+ setOf("ably-java/${BuildConfig.VERSION}", "jre/${System.getProperty("java.version")}", "chat-android/0.1.0"),
+ awaitItem().headers["ably-agent"]?.split(" ")?.toSet(),
+ )
+ }
+ }
+
+ @Test
+ fun `should use additional agents in Rest wrapper SDK client calls`() = runTest {
+ val restClient = createRealtimeClient()
+
+ val wrapperSdkClient =
+ restClient.createWrapperSdkProxy(WrapperSdkProxyOptions(agents = mapOf("chat-android" to "0.1.0")))
+
+ server.servedRequests.test {
+ wrapperSdkClient.time()
+ assertEquals(
+ setOf("ably-java/${BuildConfig.VERSION}", "jre/${System.getProperty("java.version")}", "chat-android/0.1.0"),
+ awaitItem().headers["ably-agent"]?.split(" ")?.toSet(),
+ )
+ }
+
+ server.servedRequests.test {
+ restClient.time()
+ assertEquals(
+ setOf("ably-java/${BuildConfig.VERSION}", "jre/${System.getProperty("java.version")}"),
+ awaitItem().headers["ably-agent"]?.split(" ")?.toSet(),
+ )
+ }
+
+ server.servedRequests.test {
+ wrapperSdkClient.request("/time")
+ assertEquals(
+ setOf("ably-java/${BuildConfig.VERSION}", "jre/${System.getProperty("java.version")}", "chat-android/0.1.0"),
+ awaitItem().headers["ably-agent"]?.split(" ")?.toSet(),
+ )
+ }
+ }
+
+ @Test
+ fun `should use additional agents in Rest wrapper SDK channel calls`() = runTest {
+ val restClient = createRestClient()
+
+ val wrapperSdkClient =
+ restClient.createWrapperSdkProxy(WrapperSdkProxyOptions(agents = mapOf("chat-android" to "0.1.0")))
+
+ server.servedRequests.test {
+ wrapperSdkClient.channels.get("test").history()
+ assertEquals(
+ setOf("ably-java/${BuildConfig.VERSION}", "jre/${System.getProperty("java.version")}", "chat-android/0.1.0"),
+ awaitItem().headers["ably-agent"]?.split(" ")?.toSet(),
+ )
+ }
+
+ server.servedRequests.test {
+ restClient.channels.get("test").history()
+ assertEquals(
+ setOf("ably-java/${BuildConfig.VERSION}", "jre/${System.getProperty("java.version")}"),
+ awaitItem().headers["ably-agent"]?.split(" ")?.toSet(),
+ )
+ }
+
+ server.servedRequests.test {
+ wrapperSdkClient.channels.get("test").presence.history()
+ assertEquals(
+ setOf("ably-java/${BuildConfig.VERSION}", "jre/${System.getProperty("java.version")}", "chat-android/0.1.0"),
+ awaitItem().headers["ably-agent"]?.split(" ")?.toSet(),
+ )
+ }
+ }
+
+ @Test
+ fun `should use additional agents in Realtime wrapper SDK channel calls`() = runTest {
+ val realtimeClient = createRealtimeClient()
+
+ val wrapperSdkClient =
+ realtimeClient.createWrapperSdkProxy(WrapperSdkProxyOptions(agents = mapOf("chat-android" to "0.1.0")))
+
+ server.servedRequests.test {
+ wrapperSdkClient.channels.get("test").history()
+ assertEquals(
+ setOf("ably-java/${BuildConfig.VERSION}", "jre/${System.getProperty("java.version")}", "chat-android/0.1.0"),
+ awaitItem().headers["ably-agent"]?.split(" ")?.toSet(),
+ )
+ }
+
+ server.servedRequests.test {
+ realtimeClient.channels.get("test").history()
+ assertEquals(
+ setOf("ably-java/${BuildConfig.VERSION}", "jre/${System.getProperty("java.version")}"),
+ awaitItem().headers["ably-agent"]?.split(" ")?.toSet(),
+ )
+ }
+
+ server.servedRequests.test {
+ wrapperSdkClient.channels.get("test").presence.history()
+ assertEquals(
+ setOf("ably-java/${BuildConfig.VERSION}", "jre/${System.getProperty("java.version")}", "chat-android/0.1.0"),
+ awaitItem().headers["ably-agent"]?.split(" ")?.toSet(),
+ )
+ }
+ }
+
+ companion object {
+
+ const val PORT = 27332
+ lateinit var server: EmbeddedServer
+
+ @JvmStatic
+ @BeforeAll
+ fun setUp() = runTest {
+ server = EmbeddedServer(PORT) {
+ when (it.path) {
+ "/time" -> json("[1739551931167]")
+ else -> json("[]")
+ }
+ }
+ server.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true)
+ waitFor { server.wasStarted() }
+ }
+
+ @JvmStatic
+ @AfterAll
+ fun tearDown() {
+ server.stop()
+ }
+ }
+}
+
+private fun createRealtimeClient(): RealtimeClient {
+ val options = ClientOptions("xxxxx:yyyyyyy").apply {
+ port = PORT
+ useBinaryProtocol = false
+ realtimeHost = "localhost"
+ restHost = "localhost"
+ tls = false
+ autoConnect = false
+ }
+
+ return RealtimeClient(AblyRealtime(options))
+}
+
+private fun createRestClient(): RestClient {
+ val options = ClientOptions("xxxxx:yyyyyyy").apply {
+ port = PORT
+ useBinaryProtocol = false
+ realtimeHost = "localhost"
+ restHost = "localhost"
+ tls = false
+ autoConnect = false
+ }
+
+ return RestClient(AblyRest(options))
+}
diff --git a/pubsub-adapter/src/test/kotlin/io/ably/lib/realtime/ChannelUtils.kt b/pubsub-adapter/src/test/kotlin/io/ably/lib/realtime/ChannelUtils.kt
new file mode 100644
index 000000000..a99e5a67f
--- /dev/null
+++ b/pubsub-adapter/src/test/kotlin/io/ably/lib/realtime/ChannelUtils.kt
@@ -0,0 +1,6 @@
+package io.ably.lib.realtime
+
+import io.ably.lib.types.ChannelOptions
+
+val ChannelBase.channelOptions: ChannelOptions?
+ get() = options