Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RTL13] Fix existing impl. for server sent DETACH #1054

Merged
merged 3 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 16 additions & 20 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void attach(CompletionListener listener) throws AblyException {

void attach(boolean forceReattach, CompletionListener listener) {
clearAttachTimers();
attachWithTimeout(forceReattach, listener);
attachWithTimeout(forceReattach, listener, null);
}

/**
Expand All @@ -217,7 +217,7 @@ synchronized void transferQueuedPresenceMessages(List<QueuedMessage> messagesToT

private boolean attachResume;

private void attachImpl(final boolean forceReattach, final CompletionListener listener) throws AblyException {
private void attachImpl(final boolean forceReattach, final CompletionListener listener, ErrorInfo reattachmentReason) throws AblyException {
Log.v(TAG, "attach(); channel = " + name);
if(!forceReattach) {
/* check preconditions */
Expand All @@ -244,12 +244,12 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
}

// (RTL4i)
if (connectionManager.getConnectionState().state == ConnectionState.connecting
|| connectionManager.getConnectionState().state == ConnectionState.disconnected) {
ConnectionState connState = connectionManager.getConnectionState().state;
if (connState == ConnectionState.connecting || connState == ConnectionState.disconnected) {
if (listener != null) {
on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed));
}
setState(ChannelState.attaching, null);
setState(ChannelState.attaching, reattachmentReason);
return;
}

Expand Down Expand Up @@ -277,7 +277,7 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
attachMessage.setFlag(Flag.attach_resume);
}

setState(ChannelState.attaching, null);
setState(ChannelState.attaching, reattachmentReason);
connectionManager.send(attachMessage, true, null);
} catch(AblyException e) {
throw e;
Expand Down Expand Up @@ -350,8 +350,9 @@ private void detachImpl(CompletionListener listener) throws AblyException {
default:
}
ConnectionManager connectionManager = ably.connection.connectionManager;
if(!connectionManager.isActive())
if(!connectionManager.isActive()) { // RTL5g
throw AblyException.fromErrorInfo(connectionManager.getStateErrorInfo());
}

sendDetachMessage(listener);
}
Expand Down Expand Up @@ -469,14 +470,14 @@ synchronized private void clearAttachTimers() {
}

private void attachWithTimeout(final CompletionListener listener) throws AblyException {
this.attachWithTimeout(false, listener);
this.attachWithTimeout(false, listener, null);
}

/**
* Attach channel, if not attached within timeout set state to suspended and
* set up timer to reattach it later
*/
synchronized private void attachWithTimeout(final boolean forceReattach, final CompletionListener listener) {
synchronized private void attachWithTimeout(final boolean forceReattach, final CompletionListener listener, ErrorInfo reattachmentReason) {
checkChannelIsNotReleased();
Timer currentAttachTimer;
try {
Expand All @@ -501,7 +502,7 @@ public void onError(ErrorInfo reason) {
clearAttachTimers();
callCompletionListenerError(listener, reason);
}
});
}, reattachmentReason);
} catch(AblyException e) {
attachTimer = null;
callCompletionListenerError(listener, e.errorInfo);
Expand Down Expand Up @@ -609,6 +610,7 @@ public void onError(ErrorInfo reason) {
detachImpl(completionListener);
} catch (AblyException e) {
attachTimer = null;
callCompletionListenerError(listener, e.errorInfo); // RTL5g
}

if(attachTimer == null) {
Expand Down Expand Up @@ -1296,18 +1298,12 @@ void onChannelMessage(ProtocolMessage msg) {
case detached:
ChannelState oldState = state;
switch(oldState) {
// RTL13a
case attached:
case suspended: //RTL13a
/* Unexpected detach, reattach when possible */
setDetached((msg.error != null) ? msg.error : REASON_NOT_ATTACHED);
case suspended:
/* Unexpected detach, reattach immediately as per RTL13a */
Log.v(TAG, String.format(Locale.ROOT, "Server initiated detach for channel %s; attempting reattach", name));
try {
attachWithTimeout(null);
} catch (AblyException e) {
/* Send message error */
Log.e(TAG, "Attempting reattach threw exception", e);
setDetached(e.errorInfo);
}
attachWithTimeout(true, null, msg.error);
break;
case attaching:
/* RTL13b says we need to be suspended, but continue to retry */
Expand Down
167 changes: 163 additions & 4 deletions lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.util.Log;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

Expand Down Expand Up @@ -944,6 +945,9 @@ public void attach_success_callback() {
}
}

/**
* Spec: RTL4g
*/
@Test
public void attach_success_callback_for_channel_in_failed_state() {
AblyRealtime ably = null;
Expand Down Expand Up @@ -1036,6 +1040,7 @@ public void attach_fail_callback() {
/**
* When client detaches from a channel successfully after initialized state,
* verify attach {@code CompletionListener#onSuccess()} gets called.
* Spec: RTL5a
*/
@Test
public void detach_success_callback_initialized() {
Expand Down Expand Up @@ -1068,6 +1073,9 @@ public void detach_success_callback_initialized() {
}
}

/**
* Spec: RTL5j
*/
@Test
public void detach_success_callback_on_suspended_state() {
AblyRealtime ably = null;
Expand Down Expand Up @@ -1105,6 +1113,9 @@ public void detach_success_callback_on_suspended_state() {
}
}

/**
* Spec: RTL5b
*/
@Test
public void detach_failure_callback_on_failed_state() {
AblyRealtime ably = null;
Expand Down Expand Up @@ -1146,6 +1157,79 @@ public void detach_failure_callback_on_failed_state() {
}
}

/**
* When connection is in failed or suspended, set error in callback
* Spec: RTL5g
*/
@Test
public void detach_fail_callback_for_connection_invalid_state() {
AblyRealtime ably = null;
try {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
ably = new AblyRealtime(opts);
ConnectionWaiter connWaiter = new ConnectionWaiter(ably.connection);

/* wait until connected */
connWaiter.waitFor(ConnectionState.connected);

/* create a channel and attach */
final Channel channel = ably.channels.get("detach_failure");
ChannelWaiter channelWaiter = new ChannelWaiter(channel);
channel.attach();
channelWaiter.waitFor(ChannelState.attached);

// Simulate connection closing from outside
ably.connection.connectionManager.requestState(new ConnectionManager.StateIndication(
ConnectionState.closing,
new ErrorInfo("Connection is closing", 80001)
));
/* wait until connection closing */
connWaiter.waitFor(ConnectionState.closing);

// channel state is ATTACHED despite closing connection state
assertEquals(ChannelState.attached, channel.state);

/* detach */
Helpers.CompletionWaiter detachWaiter1 = new Helpers.CompletionWaiter();
channel.detach(detachWaiter1);

/* Verify onSuccess callback gets called */
detachWaiter1.waitFor();
assertFalse(detachWaiter1.success);
assertNotNull(detachWaiter1.error);
assertEquals("Connection is closing", detachWaiter1.error.message);
assertEquals(80001, detachWaiter1.error.code);

// Simulate connection failure
ably.connection.connectionManager.requestState(ConnectionState.failed);
/* wait until connection failed */
connWaiter.waitFor(ConnectionState.failed);

// Mock channel state to ATTACHED despite failed connection state
channelWaiter.waitFor(ChannelState.failed);
channel.state = ChannelState.attached;
assertEquals(ChannelState.attached, channel.state);

/* detach */
Helpers.CompletionWaiter detachWaiter2 = new Helpers.CompletionWaiter();
channel.detach(detachWaiter2);

/* Verify onSuccess callback gets called */
detachWaiter2.waitFor();
assertFalse(detachWaiter2.success);
assertNotNull(detachWaiter2.error);
assertEquals("Connection failed", detachWaiter2.error.message);
assertEquals(80000, detachWaiter2.error.code);

} catch (AblyException e) {
e.printStackTrace();
fail("init0: Unexpected exception instantiating library");
} finally {
if(ably != null)
ably.close();
}
}

/**
* When client detaches from a channel successfully after attached state,
* verify attach {@code CompletionListener#onSuccess()} gets called.
Expand Down Expand Up @@ -1183,6 +1267,7 @@ public void detach_success_callback_attached() throws AblyException {
/**
* When client detaches from a channel successfully after detaching state,
* verify attach {@code CompletionListener#onSuccess()} gets called.
* Spec: RTL5i
*/
@Test
public void detach_success_callback_detaching() throws AblyException {
Expand Down Expand Up @@ -1698,15 +1783,15 @@ public void channel_server_initiated_attached() throws AblyException {

/*
* Establish connection, attach channel, simulate sending detached messages
* from the server, test correct behaviour
* from the server for channel in attached state.
*
* Tests RTL13a
*/
@Test
public void channel_server_initiated_detached() throws AblyException {
public void server_initiated_detach_for_attached_channel() throws AblyException {
AblyRealtime ably = null;
long oldRealtimeTimeout = Defaults.realtimeRequestTimeout;
final String channelName = "channel_server_initiated_attach_detach";
final String channelName = "channel_server_initiated_detach_for_attached_channel";

try {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
Expand All @@ -1728,13 +1813,87 @@ public void channel_server_initiated_detached() throws AblyException {
ProtocolMessage detachedMessage = new ProtocolMessage() {{
action = Action.detached;
channel = channelName;
error = new ErrorInfo("Simulated detach", 40000);
}};
ably.connection.connectionManager.onMessage(null, detachedMessage);

/* Channel should transition to attaching, then to attached */
channelWaiter.waitFor(ChannelState.attaching);
ErrorInfo detachErr = channelWaiter.waitFor(ChannelState.attaching);
Assert.assertNotNull(detachErr);
Assert.assertEquals(40000, detachErr.code);
Assert.assertEquals("Simulated detach", detachErr.message);

channelWaiter.waitFor(ChannelState.attached);

List<ChannelState> channelStates = channelWaiter.getRecordedStates();
Assert.assertEquals(4, channelStates.size());
Assert.assertEquals(ChannelState.attaching, channelStates.get(0));
Assert.assertEquals(ChannelState.attached, channelStates.get(1));
Assert.assertEquals(ChannelState.attaching, channelStates.get(2));
Assert.assertEquals(ChannelState.attached, channelStates.get(3));

} finally {
if (ably != null)
ably.close();
Defaults.realtimeRequestTimeout = oldRealtimeTimeout;
}
}

/*
* Establish connection, attach channel, simulate sending detached messages
* from the server for channel in suspended state.
*
* Tests RTL13a
*/
@Test
public void server_initiated_detach_for_suspended_channel() throws AblyException {
AblyRealtime ably = null;
long oldRealtimeTimeout = Defaults.realtimeRequestTimeout;
final String channelName = "channel_server_initiated_detach_for_suspended_channel";

try {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);

/* Make test faster */
Defaults.realtimeRequestTimeout = 1000;
opts.channelRetryTimeout = 1000;

ably = new AblyRealtime(opts);
new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected);

Channel channel = ably.channels.get(channelName);
ChannelWaiter channelWaiter = new ChannelWaiter(channel);

channel.attach();
channelWaiter.waitFor(ChannelState.attached);

channel.setSuspended(new ErrorInfo("Set state to suspended", 400), true);
channelWaiter.waitFor(ChannelState.suspended);

/* Inject detached message as if from the server */
ProtocolMessage detachedMessage = new ProtocolMessage() {{
action = Action.detached;
channel = channelName;
error = new ErrorInfo("Simulated detach", 40000);
}};
ably.connection.connectionManager.onMessage(null, detachedMessage);

/* Channel should transition to attaching, then to attached */
ErrorInfo detachError = channelWaiter.waitFor(ChannelState.attaching);
Assert.assertNotNull(detachError);
Assert.assertEquals(40000, detachError.code);
Assert.assertEquals("Simulated detach", detachError.message);

channelWaiter.waitFor(ChannelState.attached);

List<ChannelState> channelStates = channelWaiter.getRecordedStates();
Assert.assertEquals(5, channelStates.size());
Assert.assertEquals(ChannelState.attaching, channelStates.get(0));
Assert.assertEquals(ChannelState.attached, channelStates.get(1));
Assert.assertEquals(ChannelState.suspended, channelStates.get(2));
Assert.assertEquals(ChannelState.attaching, channelStates.get(3));
Assert.assertEquals(ChannelState.attached, channelStates.get(4));

} finally {
if (ably != null)
ably.close();
Expand Down
Loading