Skip to content

Commit

Permalink
Merge pull request #1066 from ably/CHAT-5063/add-sdk-wrapper-kotlin
Browse files Browse the repository at this point in the history
[CHAT-5063] feat: add `createWrapperSdkProxy` implementation for kotlin adapter of ably-java
  • Loading branch information
ttypic authored Feb 24, 2025
2 parents c588af3 + 86027f5 commit 950ea19
Show file tree
Hide file tree
Showing 22 changed files with 903 additions and 44 deletions.
9 changes: 7 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ lombok = "8.10"
okhttp = "4.12.0"
test-retry = "1.6.0"
kotlin = "2.1.10"
coroutine = "1.9.0"
turbine = "1.2.0"

[libraries]
gson = { group = "com.google.code.gson", name = "gson", version.ref = "gson" }
Expand All @@ -41,11 +43,14 @@ dexmaker = { group = "com.crittercism.dexmaker", name = "dexmaker", version.ref
dexmaker-dx = { group = "com.crittercism.dexmaker", name = "dexmaker-dx", version.ref = "dexmaker" }
dexmaker-mockito = { group = "com.crittercism.dexmaker", name = "dexmaker-mockito", version.ref = "dexmaker" }
android-retrostreams = { group = "net.sourceforge.streamsupport", name = "android-retrostreams", version.ref = "android-retrostreams" }
okhttp = { group ="com.squareup.okhttp3", name = "okhttp", version.ref = "okhttp" }
okhttp = { group = "com.squareup.okhttp3", name = "okhttp", version.ref = "okhttp" }
coroutine-core = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version.ref = "coroutine" }
coroutine-test = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-test", version.ref = "coroutine" }
turbine = { group = "app.cash.turbine", name = "turbine", version.ref = "turbine" }

[bundles]
common = ["msgpack", "vcdiff-core"]
tests = ["junit","hamcrest-all", "nanohttpd", "nanohttpd-nanolets", "nanohttpd-websocket", "mockito-core", "concurrentunit", "slf4j-simple"]
tests = ["junit", "hamcrest-all", "nanohttpd", "nanohttpd-nanolets", "nanohttpd-websocket", "mockito-core", "concurrentunit", "slf4j-simple"]
instrumental-android = ["android-test-runner", "android-test-rules", "dexmaker", "dexmaker-dx", "dexmaker-mockito", "android-retrostreams"]

[plugins]
Expand Down
13 changes: 13 additions & 0 deletions lib/src/main/java/io/ably/lib/http/AsyncHttpScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,23 @@ public AsyncHttpScheduler(HttpCore httpCore, ClientOptions options) {
super(httpCore, new CloseableThreadPoolExecutor(options));
}

private AsyncHttpScheduler(HttpCore httpCore, CloseableExecutor executor) {
super(httpCore, executor);
}

private static final long KEEP_ALIVE_TIME = 2000L;

protected static final String TAG = AsyncHttpScheduler.class.getName();

/**
* [Internal Method]
* <p>
* We use this method to implement proxy Realtime / Rest clients that add additional data to the underlying client.
*/
public AsyncHttpScheduler exchangeHttpCore(HttpCore httpCore) {
return new AsyncHttpScheduler(httpCore, this.executor);
}

private static class CloseableThreadPoolExecutor implements CloseableExecutor {
private final ThreadPoolExecutor executor;

Expand Down
9 changes: 9 additions & 0 deletions lib/src/main/java/io/ably/lib/http/Http.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ public void close() throws Exception {
asyncHttp.close();
}

/**
* [Internal Method]
* <p>
* We use this method to implement proxy Realtime / Rest clients that add additional data to the underlying client.
*/
public Http exchangeHttpCore(HttpCore httpCore) {
return new Http(asyncHttp.exchangeHttpCore(httpCore), new SyncHttpScheduler(httpCore));
}

public class Request<Result> {
private final Execute<Result> execute;

Expand Down
36 changes: 35 additions & 1 deletion lib/src/main/java/io/ably/lib/http/HttpCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ public class HttpCore {
private final HttpEngine engine;
private HttpAuth proxyAuth;

/**
* This field is used for analytics purposes.
* <p>
* It holds additional agents that should be added after the Realtime/Rest client is initialized.
* - **Static agents** are set in `ClientOptions`.
* - **Dynamic agents** are added later by higher-level SDKs like Chat or Asset Tracking
* and are provided in the `createWrapperSdkProxy` call.
*/
private Map<String, String> dynamicAgents;

/*************************
* Public API
*************************/
Expand Down Expand Up @@ -103,6 +113,18 @@ public HttpCore(ClientOptions options, Auth auth, PlatformAgentProvider platform
this.engine = engineFactory.create(new HttpEngineConfig(ClientOptionsUtils.convertToProxyConfig(options)));
}

private HttpCore(HttpCore underlyingHttpCore, Map<String, String> dynamicAgents) {
this.options = underlyingHttpCore.options;
this.auth = underlyingHttpCore.auth;
this.platformAgentProvider = underlyingHttpCore.platformAgentProvider;
this.scheme = underlyingHttpCore.scheme;
this.port = underlyingHttpCore.port;
this.hosts = underlyingHttpCore.hosts;
this.proxyAuth = underlyingHttpCore.proxyAuth;
this.engine = underlyingHttpCore.engine;
this.dynamicAgents = dynamicAgents;
}

/**
* Make a synchronous HTTP request specified by URL and proxy, retrying if necessary on WWW-Authenticate
*
Expand Down Expand Up @@ -307,7 +329,10 @@ private Map<String, String> collectRequestHeaders(URL url, String method, Param[

/* pass required headers */
requestHeaders.put(Defaults.ABLY_PROTOCOL_VERSION_HEADER, Defaults.ABLY_PROTOCOL_VERSION); // RSC7a
requestHeaders.put(Defaults.ABLY_AGENT_HEADER, AgentHeaderCreator.create(options.agents, platformAgentProvider));
Map<String, String> additionalAgents = new HashMap<>();
if (options.agents != null) additionalAgents.putAll(options.agents);
if (dynamicAgents != null) additionalAgents.putAll(dynamicAgents);
requestHeaders.put(Defaults.ABLY_AGENT_HEADER, AgentHeaderCreator.create(additionalAgents, platformAgentProvider));
if (options.clientId != null)
requestHeaders.put(Defaults.ABLY_CLIENT_ID_HEADER, Base64Coder.encodeString(options.clientId));

Expand Down Expand Up @@ -455,6 +480,15 @@ private Response executeRequest(HttpRequest request) {
return response;
}

/**
* [Internal Method]
* <p>
* We use this method to implement proxy Realtime / Rest clients that add additional agents to the underlying client.
*/
public HttpCore injectDynamicAgents(Map<String, String> wrapperSDKAgents) {
return new HttpCore(this, wrapperSDKAgents);
}

/**
* Interface for an entity that supplies an httpCore request body
*/
Expand Down
2 changes: 1 addition & 1 deletion lib/src/main/java/io/ably/lib/http/HttpScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ public <T> Future<T> ablyHttpExecuteWithRetry(
return request;
}

private final CloseableExecutor executor;
protected final CloseableExecutor executor;
private final HttpCore httpCore;

protected static final String TAG = HttpScheduler.class.getName();
Expand Down
17 changes: 13 additions & 4 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.TimerTask;

import io.ably.lib.http.BasePaginatedQuery;
import io.ably.lib.http.Http;
import io.ably.lib.http.HttpCore;
import io.ably.lib.http.HttpUtils;
import io.ably.lib.transport.ConnectionManager;
Expand Down Expand Up @@ -1148,7 +1149,11 @@ else if(!"false".equalsIgnoreCase(param.value)) {
* @throws AblyException
*/
public PaginatedResult<Message> history(Param[] params) throws AblyException {
return historyImpl(params).sync();
return historyImpl(ably.http, params).sync();
}

PaginatedResult<Message> history(Http http, Param[] params) throws AblyException {
return historyImpl(http, params).sync();
}

/**
Expand Down Expand Up @@ -1177,18 +1182,22 @@ public PaginatedResult<Message> history(Param[] params) throws AblyException {
* @throws AblyException
*/
public void historyAsync(Param[] params, Callback<AsyncPaginatedResult<Message>> callback) {
historyImpl(params).async(callback);
historyAsync(ably.http, params, callback);
}

void historyAsync(Http http, Param[] params, Callback<AsyncPaginatedResult<Message>> callback) {
historyImpl(http, params).async(callback);
}

private BasePaginatedQuery.ResultRequest<Message> historyImpl(Param[] params) {
private BasePaginatedQuery.ResultRequest<Message> historyImpl(Http http, Param[] params) {
try {
params = replacePlaceholderParams((Channel) this, params);
} catch (AblyException e) {
return new BasePaginatedQuery.ResultRequest.Failed<Message>(e);
}

HttpCore.BodyHandler<Message> bodyHandler = MessageSerializer.getMessageResponseHandler(options);
return new BasePaginatedQuery<Message>(ably.http, basePath + "/history", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler).get();
return new BasePaginatedQuery<Message>(http, basePath + "/history", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler).get();
}

/************************************
Expand Down
17 changes: 13 additions & 4 deletions lib/src/main/java/io/ably/lib/realtime/Presence.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.ably.lib.realtime;

import io.ably.lib.http.BasePaginatedQuery;
import io.ably.lib.http.Http;
import io.ably.lib.http.HttpCore;
import io.ably.lib.http.HttpUtils;
import io.ably.lib.transport.ConnectionManager;
Expand Down Expand Up @@ -794,7 +795,11 @@ public void updatePresence(PresenceMessage msg, CompletionListener listener) thr
* @throws AblyException
*/
public PaginatedResult<PresenceMessage> history(Param[] params) throws AblyException {
return historyImpl(params).sync();
return history(channel.ably.http, params);
}

PaginatedResult<PresenceMessage> history(Http http, Param[] params) throws AblyException {
return historyImpl(http, params).sync();
}

/**
Expand All @@ -821,10 +826,14 @@ public PaginatedResult<PresenceMessage> history(Param[] params) throws AblyExcep
* @throws AblyException
*/
public void historyAsync(Param[] params, Callback<AsyncPaginatedResult<PresenceMessage>> callback) {
historyImpl(params).async(callback);
historyImpl(channel.ably.http, params).async(callback);
}

void historyAsync(Http http, Param[] params, Callback<AsyncPaginatedResult<PresenceMessage>> callback) {
historyImpl(http, params).async(callback);
}

private BasePaginatedQuery.ResultRequest<PresenceMessage> historyImpl(Param[] params) {
private BasePaginatedQuery.ResultRequest<PresenceMessage> historyImpl(Http http, Param[] params) {
try {
params = Channel.replacePlaceholderParams(channel, params);
} catch (AblyException e) {
Expand All @@ -833,7 +842,7 @@ private BasePaginatedQuery.ResultRequest<PresenceMessage> historyImpl(Param[] pa

AblyRealtime ably = channel.ably;
HttpCore.BodyHandler<PresenceMessage> bodyHandler = PresenceSerializer.getPresenceResponseHandler(channel.options);
return new BasePaginatedQuery<PresenceMessage>(ably.http, channel.basePath + "/presence/history", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler).get();
return new BasePaginatedQuery<PresenceMessage>(http, channel.basePath + "/presence/history", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler).get();
}

/**
Expand Down
32 changes: 28 additions & 4 deletions lib/src/main/java/io/ably/lib/rest/AblyBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ public void release(String channelName) {
* @throws AblyException
*/
public long time() throws AblyException {
return timeImpl().sync().longValue();
return time(http);
}

long time(Http http) throws AblyException {
return timeImpl(http).sync();
}

/**
Expand All @@ -196,10 +200,14 @@ public long time() throws AblyException {
* This callback is invoked on a background thread
*/
public void timeAsync(Callback<Long> callback) {
timeImpl().async(callback);
timeAsync(http, callback);
}

void timeAsync(Http http, Callback<Long> callback) {
timeImpl(http).async(callback);
}

private Http.Request<Long> timeImpl() {
private Http.Request<Long> timeImpl(Http http) {
final Param[] params = this.options.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; // RSC7c
return http.request(new Http.Execute<Long>() {
@Override
Expand Down Expand Up @@ -237,7 +245,11 @@ public Long handleResponse(HttpCore.Response response, ErrorInfo error) throws A
* @throws AblyException
*/
public PaginatedResult<Stats> stats(Param[] params) throws AblyException {
return new PaginatedQuery<Stats>(http, "/stats", HttpUtils.defaultAcceptHeaders(false), params, StatsReader.statsResponseHandler).get();
return stats(http, params);
}

PaginatedResult<Stats> stats(Http http, Param[] params) throws AblyException {
return new PaginatedQuery<>(http, "/stats", HttpUtils.defaultAcceptHeaders(false), params, StatsReader.statsResponseHandler).get();
}

/**
Expand All @@ -261,6 +273,10 @@ public PaginatedResult<Stats> stats(Param[] params) throws AblyException {
* This callback is invoked on a background thread
*/
public void statsAsync(Param[] params, Callback<AsyncPaginatedResult<Stats>> callback) {
statsAsync(http, params, callback);
}

void statsAsync(Http http, Param[] params, Callback<AsyncPaginatedResult<Stats>> callback) {
(new AsyncPaginatedQuery<Stats>(http, "/stats", HttpUtils.defaultAcceptHeaders(false), params, StatsReader.statsResponseHandler)).get(callback);
}

Expand All @@ -284,6 +300,10 @@ public void statsAsync(Param[] params, Callback<AsyncPaginatedResult<Stats>> cal
* @throws AblyException if it was not possible to complete the request, or an error response was received
*/
public HttpPaginatedResponse request(String method, String path, Param[] params, HttpCore.RequestBody body, Param[] headers) throws AblyException {
return request(http, method, path, params, body, headers);
}

HttpPaginatedResponse request(Http http, String method, String path, Param[] params, HttpCore.RequestBody body, Param[] headers) throws AblyException {
headers = HttpUtils.mergeHeaders(HttpUtils.defaultAcceptHeaders(false), headers);
return new HttpPaginatedQuery(http, method, path, headers, params, body).exec();
}
Expand Down Expand Up @@ -311,6 +331,10 @@ public HttpPaginatedResponse request(String method, String path, Param[] params,
* This callback is invoked on a background thread
*/
public void requestAsync(String method, String path, Param[] params, HttpCore.RequestBody body, Param[] headers, final AsyncHttpPaginatedResponse.Callback callback) {
requestAsync(http, method, path, params, body, headers, callback);
}

void requestAsync(Http http, String method, String path, Param[] params, HttpCore.RequestBody body, Param[] headers, final AsyncHttpPaginatedResponse.Callback callback) {
headers = HttpUtils.mergeHeaders(HttpUtils.defaultAcceptHeaders(false), headers);
(new AsyncHttpPaginatedQuery(http, method, path, headers, params, body)).exec(callback);
}
Expand Down
Loading

0 comments on commit 950ea19

Please sign in to comment.