Skip to content

Commit

Permalink
Merge pull request #1054 from ably/fix/RTL13-channel-detach
Browse files Browse the repository at this point in the history
[RTL13] Fix existing impl. for server sent DETACH
  • Loading branch information
sacOO7 authored Jan 13, 2025
2 parents 2c0f111 + 8eec5b5 commit 15bc122
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 24 deletions.
36 changes: 16 additions & 20 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void attach(CompletionListener listener) throws AblyException {

void attach(boolean forceReattach, CompletionListener listener) {
clearAttachTimers();
attachWithTimeout(forceReattach, listener);
attachWithTimeout(forceReattach, listener, null);
}

/**
Expand All @@ -217,7 +217,7 @@ synchronized void transferQueuedPresenceMessages(List<QueuedMessage> messagesToT

private boolean attachResume;

private void attachImpl(final boolean forceReattach, final CompletionListener listener) throws AblyException {
private void attachImpl(final boolean forceReattach, final CompletionListener listener, ErrorInfo reattachmentReason) throws AblyException {
Log.v(TAG, "attach(); channel = " + name);
if(!forceReattach) {
/* check preconditions */
Expand All @@ -244,12 +244,12 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
}

// (RTL4i)
if (connectionManager.getConnectionState().state == ConnectionState.connecting
|| connectionManager.getConnectionState().state == ConnectionState.disconnected) {
ConnectionState connState = connectionManager.getConnectionState().state;
if (connState == ConnectionState.connecting || connState == ConnectionState.disconnected) {
if (listener != null) {
on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed));
}
setState(ChannelState.attaching, null);
setState(ChannelState.attaching, reattachmentReason);
return;
}

Expand Down Expand Up @@ -277,7 +277,7 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
attachMessage.setFlag(Flag.attach_resume);
}

setState(ChannelState.attaching, null);
setState(ChannelState.attaching, reattachmentReason);
connectionManager.send(attachMessage, true, null);
} catch(AblyException e) {
throw e;
Expand Down Expand Up @@ -350,8 +350,9 @@ private void detachImpl(CompletionListener listener) throws AblyException {
default:
}
ConnectionManager connectionManager = ably.connection.connectionManager;
if(!connectionManager.isActive())
if(!connectionManager.isActive()) { // RTL5g
throw AblyException.fromErrorInfo(connectionManager.getStateErrorInfo());
}

sendDetachMessage(listener);
}
Expand Down Expand Up @@ -469,14 +470,14 @@ synchronized private void clearAttachTimers() {
}

private void attachWithTimeout(final CompletionListener listener) throws AblyException {
this.attachWithTimeout(false, listener);
this.attachWithTimeout(false, listener, null);
}

/**
* Attach channel, if not attached within timeout set state to suspended and
* set up timer to reattach it later
*/
synchronized private void attachWithTimeout(final boolean forceReattach, final CompletionListener listener) {
synchronized private void attachWithTimeout(final boolean forceReattach, final CompletionListener listener, ErrorInfo reattachmentReason) {
checkChannelIsNotReleased();
Timer currentAttachTimer;
try {
Expand All @@ -501,7 +502,7 @@ public void onError(ErrorInfo reason) {
clearAttachTimers();
callCompletionListenerError(listener, reason);
}
});
}, reattachmentReason);
} catch(AblyException e) {
attachTimer = null;
callCompletionListenerError(listener, e.errorInfo);
Expand Down Expand Up @@ -609,6 +610,7 @@ public void onError(ErrorInfo reason) {
detachImpl(completionListener);
} catch (AblyException e) {
attachTimer = null;
callCompletionListenerError(listener, e.errorInfo); // RTL5g
}

if(attachTimer == null) {
Expand Down Expand Up @@ -1296,18 +1298,12 @@ void onChannelMessage(ProtocolMessage msg) {
case detached:
ChannelState oldState = state;
switch(oldState) {
// RTL13a
case attached:
case suspended: //RTL13a
/* Unexpected detach, reattach when possible */
setDetached((msg.error != null) ? msg.error : REASON_NOT_ATTACHED);
case suspended:
/* Unexpected detach, reattach immediately as per RTL13a */
Log.v(TAG, String.format(Locale.ROOT, "Server initiated detach for channel %s; attempting reattach", name));
try {
attachWithTimeout(null);
} catch (AblyException e) {
/* Send message error */
Log.e(TAG, "Attempting reattach threw exception", e);
setDetached(e.errorInfo);
}
attachWithTimeout(true, null, msg.error);
break;
case attaching:
/* RTL13b says we need to be suspended, but continue to retry */
Expand Down
167 changes: 163 additions & 4 deletions lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.util.Log;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

Expand Down Expand Up @@ -944,6 +945,9 @@ public void attach_success_callback() {
}
}

/**
* Spec: RTL4g
*/
@Test
public void attach_success_callback_for_channel_in_failed_state() {
AblyRealtime ably = null;
Expand Down Expand Up @@ -1036,6 +1040,7 @@ public void attach_fail_callback() {
/**
* When client detaches from a channel successfully after initialized state,
* verify attach {@code CompletionListener#onSuccess()} gets called.
* Spec: RTL5a
*/
@Test
public void detach_success_callback_initialized() {
Expand Down Expand Up @@ -1068,6 +1073,9 @@ public void detach_success_callback_initialized() {
}
}

/**
* Spec: RTL5j
*/
@Test
public void detach_success_callback_on_suspended_state() {
AblyRealtime ably = null;
Expand Down Expand Up @@ -1105,6 +1113,9 @@ public void detach_success_callback_on_suspended_state() {
}
}

/**
* Spec: RTL5b
*/
@Test
public void detach_failure_callback_on_failed_state() {
AblyRealtime ably = null;
Expand Down Expand Up @@ -1146,6 +1157,79 @@ public void detach_failure_callback_on_failed_state() {
}
}

/**
* When connection is in failed or suspended, set error in callback
* Spec: RTL5g
*/
@Test
public void detach_fail_callback_for_connection_invalid_state() {
AblyRealtime ably = null;
try {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
ably = new AblyRealtime(opts);
ConnectionWaiter connWaiter = new ConnectionWaiter(ably.connection);

/* wait until connected */
connWaiter.waitFor(ConnectionState.connected);

/* create a channel and attach */
final Channel channel = ably.channels.get("detach_failure");
ChannelWaiter channelWaiter = new ChannelWaiter(channel);
channel.attach();
channelWaiter.waitFor(ChannelState.attached);

// Simulate connection closing from outside
ably.connection.connectionManager.requestState(new ConnectionManager.StateIndication(
ConnectionState.closing,
new ErrorInfo("Connection is closing", 80001)
));
/* wait until connection closing */
connWaiter.waitFor(ConnectionState.closing);

// channel state is ATTACHED despite closing connection state
assertEquals(ChannelState.attached, channel.state);

/* detach */
Helpers.CompletionWaiter detachWaiter1 = new Helpers.CompletionWaiter();
channel.detach(detachWaiter1);

/* Verify onSuccess callback gets called */
detachWaiter1.waitFor();
assertFalse(detachWaiter1.success);
assertNotNull(detachWaiter1.error);
assertEquals("Connection is closing", detachWaiter1.error.message);
assertEquals(80001, detachWaiter1.error.code);

// Simulate connection failure
ably.connection.connectionManager.requestState(ConnectionState.failed);
/* wait until connection failed */
connWaiter.waitFor(ConnectionState.failed);

// Mock channel state to ATTACHED despite failed connection state
channelWaiter.waitFor(ChannelState.failed);
channel.state = ChannelState.attached;
assertEquals(ChannelState.attached, channel.state);

/* detach */
Helpers.CompletionWaiter detachWaiter2 = new Helpers.CompletionWaiter();
channel.detach(detachWaiter2);

/* Verify onSuccess callback gets called */
detachWaiter2.waitFor();
assertFalse(detachWaiter2.success);
assertNotNull(detachWaiter2.error);
assertEquals("Connection failed", detachWaiter2.error.message);
assertEquals(80000, detachWaiter2.error.code);

} catch (AblyException e) {
e.printStackTrace();
fail("init0: Unexpected exception instantiating library");
} finally {
if(ably != null)
ably.close();
}
}

/**
* When client detaches from a channel successfully after attached state,
* verify attach {@code CompletionListener#onSuccess()} gets called.
Expand Down Expand Up @@ -1183,6 +1267,7 @@ public void detach_success_callback_attached() throws AblyException {
/**
* When client detaches from a channel successfully after detaching state,
* verify attach {@code CompletionListener#onSuccess()} gets called.
* Spec: RTL5i
*/
@Test
public void detach_success_callback_detaching() throws AblyException {
Expand Down Expand Up @@ -1698,15 +1783,15 @@ public void channel_server_initiated_attached() throws AblyException {

/*
* Establish connection, attach channel, simulate sending detached messages
* from the server, test correct behaviour
* from the server for channel in attached state.
*
* Tests RTL13a
*/
@Test
public void channel_server_initiated_detached() throws AblyException {
public void server_initiated_detach_for_attached_channel() throws AblyException {
AblyRealtime ably = null;
long oldRealtimeTimeout = Defaults.realtimeRequestTimeout;
final String channelName = "channel_server_initiated_attach_detach";
final String channelName = "channel_server_initiated_detach_for_attached_channel";

try {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
Expand All @@ -1728,13 +1813,87 @@ public void channel_server_initiated_detached() throws AblyException {
ProtocolMessage detachedMessage = new ProtocolMessage() {{
action = Action.detached;
channel = channelName;
error = new ErrorInfo("Simulated detach", 40000);
}};
ably.connection.connectionManager.onMessage(null, detachedMessage);

/* Channel should transition to attaching, then to attached */
channelWaiter.waitFor(ChannelState.attaching);
ErrorInfo detachErr = channelWaiter.waitFor(ChannelState.attaching);
Assert.assertNotNull(detachErr);
Assert.assertEquals(40000, detachErr.code);
Assert.assertEquals("Simulated detach", detachErr.message);

channelWaiter.waitFor(ChannelState.attached);

List<ChannelState> channelStates = channelWaiter.getRecordedStates();
Assert.assertEquals(4, channelStates.size());
Assert.assertEquals(ChannelState.attaching, channelStates.get(0));
Assert.assertEquals(ChannelState.attached, channelStates.get(1));
Assert.assertEquals(ChannelState.attaching, channelStates.get(2));
Assert.assertEquals(ChannelState.attached, channelStates.get(3));

} finally {
if (ably != null)
ably.close();
Defaults.realtimeRequestTimeout = oldRealtimeTimeout;
}
}

/*
* Establish connection, attach channel, simulate sending detached messages
* from the server for channel in suspended state.
*
* Tests RTL13a
*/
@Test
public void server_initiated_detach_for_suspended_channel() throws AblyException {
AblyRealtime ably = null;
long oldRealtimeTimeout = Defaults.realtimeRequestTimeout;
final String channelName = "channel_server_initiated_detach_for_suspended_channel";

try {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);

/* Make test faster */
Defaults.realtimeRequestTimeout = 1000;
opts.channelRetryTimeout = 1000;

ably = new AblyRealtime(opts);
new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected);

Channel channel = ably.channels.get(channelName);
ChannelWaiter channelWaiter = new ChannelWaiter(channel);

channel.attach();
channelWaiter.waitFor(ChannelState.attached);

channel.setSuspended(new ErrorInfo("Set state to suspended", 400), true);
channelWaiter.waitFor(ChannelState.suspended);

/* Inject detached message as if from the server */
ProtocolMessage detachedMessage = new ProtocolMessage() {{
action = Action.detached;
channel = channelName;
error = new ErrorInfo("Simulated detach", 40000);
}};
ably.connection.connectionManager.onMessage(null, detachedMessage);

/* Channel should transition to attaching, then to attached */
ErrorInfo detachError = channelWaiter.waitFor(ChannelState.attaching);
Assert.assertNotNull(detachError);
Assert.assertEquals(40000, detachError.code);
Assert.assertEquals("Simulated detach", detachError.message);

channelWaiter.waitFor(ChannelState.attached);

List<ChannelState> channelStates = channelWaiter.getRecordedStates();
Assert.assertEquals(5, channelStates.size());
Assert.assertEquals(ChannelState.attaching, channelStates.get(0));
Assert.assertEquals(ChannelState.attached, channelStates.get(1));
Assert.assertEquals(ChannelState.suspended, channelStates.get(2));
Assert.assertEquals(ChannelState.attaching, channelStates.get(3));
Assert.assertEquals(ChannelState.attached, channelStates.get(4));

} finally {
if (ably != null)
ably.close();
Expand Down

0 comments on commit 15bc122

Please sign in to comment.