
kRPC: Endpoints do not terminate when the transport does #100

Open
ShayOinif opened this issue Jun 13, 2024 · 17 comments

Comments

ShayOinif commented Jun 13, 2024

Hello.
I don't know if this is an issue or not, but I definitely find it weird.
It seems like there is no way of telling when the client is disconnected, such as a status flow or an on-disconnection lambda to register.
Right now, when I consume a flow and the server goes away, the flow is not completed.
As a workaround I created my own little helper flows (also to maintain a single client for my app), like so:

    internal val clientCoroutineScope = CoroutineScope(Job())

    // A shared HttpClient: created on the first subscription, closed
    // 3 seconds after the last subscriber leaves.
    internal val httpClientSharedFlow = channelFlow {
        val client = HttpClient { installRPC() }
        send(client)
        awaitClose {
            client.close()
        }
    }.shareIn(clientCoroutineScope, SharingStarted.WhileSubscribed(3.seconds), 1)

    internal sealed interface ClientState {
        data class Disconnected(val message: String) : ClientState
        data class Connected(val client: RPCClient) : ClientState
    }

    @OptIn(ExperimentalCoroutinesApi::class)
    internal val myServiceClientStateFlow = httpClientSharedFlow.flatMapLatest<HttpClient, ClientState> {
        val client = it.rpc {
            url("ws://0.0.0.0:3024/firstRoute")
            rpcConfig { serialization { json() } }
        }

        flow {
            emit(ClientState.Connected(client))
            // Suspend until the RPC client's job completes, i.e. the connection is gone.
            client.coroutineContext.job.join()
        }.onCompletion {
            client.cancel()
            throw Throwable("Remote disconnected")
        }
    }.retryWhen { cause, _ ->
        // Emit a countdown of Disconnected states, then retry.
        repeat(3) {
            emit(ClientState.Disconnected("Error: $cause, retrying in ${3 - it} seconds..."))
            delay(1.seconds)
        }
        true
    }.stateIn(
        clientCoroutineScope,
        SharingStarted.WhileSubscribed(3.seconds),
        ClientState.Disconnected("Initial")
    )

    val uiClientStateFlow = myServiceClientStateFlow.map {
        when (it) {
            is ClientState.Connected -> "Connected"
            is ClientState.Disconnected -> "Disconnected - ${it.message}"
        }
    }

    @OptIn(ExperimentalCoroutinesApi::class, InternalRPCApi::class)
    val uiRpcFlow = myServiceClientStateFlow.transformLatest {
        if (it is ClientState.Connected) {
            streamScoped {
                emitAll(
                    it.client.withService<MyService>().coldFlow(getPlatform().name)
                )
            }
        }
    }.retryWhen { cause, attempt ->
        emit("Error - $attempt: $cause")
        delay(1.seconds)
        true
    }

I wonder if this is the intended use, if I am missing something, or if the interface or behavior should change?
Thanks,
Shay Oinif

Mr3zee (Collaborator) commented Jun 28, 2024

Hi, thank you for the question!
I'm not sure what you want to do. If a client or a service disconnects, its corresponding CoroutineScope will be cancelled. If there was an error with a flow request, it should throw.
There is no retry mechanism right now, that is true.

ShayOinif (Author) commented Jun 28, 2024

I simply expected that if a service goes away, the flow I am consuming will end, no matter which coroutine scope I collect it in.

In a normal shutdown, the kRPC protocol sends an end message for all clients, but in the case of an unexpected crash the coroutine scope of the backing transport (the WebSocket, with Ktor kRPC) ends while the corresponding client flow doesn't, unless it is collected in the client's coroutine scope.
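
Roughly what I expected to work (a sketch reusing names from my snippet above; uiScope stands in for any scope not tied to the RPC client):

    // Any scope unrelated to the RPC client's own coroutine scope.
    val uiScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    uiScope.launch {
        streamScoped {
            client.withService<MyService>()
                .coldFlow(getPlatform().name)
                .onCompletion { println("done") } // expected on a server crash as well
                .collect { println(it) }
        }
    }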

Mr3zee (Collaborator) commented Jul 15, 2024

Hm, that doesn't sound right. Would you be able to provide a reproducer for that behaviour, please?

ShayOinif (Author) commented Jul 16, 2024

Clone this:
https://github.com/ShayOinif/MyExample

It builds on your samples, but with only a JVM server and Compose Desktop.

On completion of the flow it should print "done".

If the server is around until the end, it prints "done".

If I stop the server process while the client is collecting, the flow simply gets stuck and never finishes.

ShayOinif (Author) commented:

The same applies to single-shot requests.
I can create a client and connect to a service through a WebSocket with Ktor kRPC.

Once that succeeds, all operations simply get stuck if the server goes away.
They don't even throw.

Tell me if you want me to add a button to this repo that makes a single-shot request, so you can see what happens when the initial communication with the server succeeds but it then goes away.

Mr3zee (Collaborator) commented Jul 22, 2024

I can verify that this is a bug. I'll rename the ticket.

Mr3zee changed the title from "Consuming flows from an RPC" to "kRPC: Endpoints do not terminate when the transport does" on Jul 22, 2024
Mr3zee (Collaborator) commented Jul 22, 2024

ShayOinif (Author) commented:

I will verify.
I think it was the first thing I looked into, and it worked as expected.
I think I used Ktor 2.3.11.

ShayOinif (Author) commented Jul 22, 2024

Hmm, it seems like it depends on API usage.

This server:

    routing {
        webSocket("/") {
            repeat(10) {
                delay(1000)
                send("test $it")
            }
        }
    }

And this client:

    val client = HttpClient(CIO) {
        install(WebSockets)
    }
    runBlocking {
        client.webSocket(method = HttpMethod.Get, host = "127.0.0.1", port = 8080, path = "/") {
            while (true) {
                // receive() throws once the channel is closed, so this block
                // ends when the server goes away.
                val othersMessage = incoming.receive() as? Frame.Text
                println(othersMessage?.readText())
            }
        }
    }
    client.close()

It doesn't reproduce the same issue.

Mr3zee (Collaborator) commented Jul 22, 2024

I guess it's specific to webSocketSession then.
Anyway, we also have the bug with flow cancellation, which I verified without Ktor, so it's two bugs in total.

Mr3zee (Collaborator) commented Jul 22, 2024

Oh, and also: your repro has no invokeOnCompletion, which is the bug indicator. The channel completes OK, but the job does not; that's why I used invokeOnCompletion in the ticket.
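
In other words, the check is roughly this on the session's job (a sketch, assuming a Ktor webSocketSession stored in session):

    // The bug indicator: the incoming channel closes fine, but this
    // callback on the session's job never fires after an abrupt closure.
    session.coroutineContext.job.invokeOnCompletion { cause ->
        println("session job completed: $cause")
    }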

ShayOinif (Author) commented:

Any update on this?
It seems like 0.2.2/0.2.3 got better in many areas, so kudos.

But from my last attempts it seems that new requests simply get a cancellation if the WebSocket session has ended, while an existing flow still doesn't end.

ShayOinif (Author) commented Aug 20, 2024

And in general, I think exposing some API to know when the backing WebSocket is connected would be great.
Right now I poll the service myself with a designated ping RPC and check that I didn't get a CancellationException from it (awkward).
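
Roughly like this (a sketch: MyPingService and ping() stand in for my real service and its designated ping method; none of this is library API):

    import kotlinx.coroutines.CancellationException
    import kotlinx.coroutines.currentCoroutineContext
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.ensureActive
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.flow
    import kotlin.time.Duration.Companion.seconds

    // Simplified stand-in for the real RPC service interface.
    interface MyPingService {
        suspend fun ping() // my own designated no-op liveness probe
    }

    fun connectionAliveFlow(service: MyPingService): Flow<Boolean> = flow {
        while (true) {
            val alive = try {
                service.ping()
                true
            } catch (e: CancellationException) {
                // If this collector itself was cancelled, rethrow; otherwise
                // the cancellation came from the dead WebSocket session.
                currentCoroutineContext().ensureActive()
                false
            }
            emit(alive)
            delay(3.seconds)
        }
    }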

Mr3zee (Collaborator) commented Aug 20, 2024

Hi! We fixed the issue on our side, but the bug persists in Ktor.

UPD: it may be Ktor-specific, see youtrack.jetbrains.com/issue/KTOR-7234/WebSocketSessions-job-does-not-complete-on-abrupt-closure

ShayOinif (Author) commented:

OK, I see.
I guess it is somewhat related then, but for a single-shot RPC, if the WebSocket session has ended, you throw a CancellationException.
Any reason for that?
Inadvertently, it makes the calling code get cancelled as well.
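
For context, this is roughly how I have to guard single-shot calls today (a sketch; rpcOrNull and echo are my own names, not library API):

    import kotlinx.coroutines.CancellationException
    import kotlinx.coroutines.currentCoroutineContext
    import kotlinx.coroutines.ensureActive

    // Turns the session-death cancellation into a normal null result so the
    // caller's coroutine is not silently cancelled together with the RPC.
    suspend fun <T> rpcOrNull(call: suspend () -> T): T? = try {
        call()
    } catch (e: CancellationException) {
        currentCoroutineContext().ensureActive() // the caller itself was cancelled: rethrow
        null // otherwise: the call was cancelled because the transport went away
    }

    // Usage, with a hypothetical service method:
    // val result = rpcOrNull { service.echo("hello") }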

Mr3zee (Collaborator) commented Aug 20, 2024

Probably just the current implementation approach. It may change later.

ShayOinif (Author) commented Aug 21, 2024

And now I saw that you opened the issue for Ktor.
It is not quite accurate.

The client should be like this (and then it works):

    val ktorClient = HttpClient {
        install(WebSockets)
    }

    val session = ktorClient.webSocketSession(host = "localhost", port = 8080, path = "/test")

    // This is the completion we are testing for.
    session.coroutineContext.job.invokeOnCompletion {
        println("completed")
    }

    session.incoming.consumeEach {
        println((it as Frame.Text).readText())
    }

    // Wait for the session job to complete before closing the client.
    session.coroutineContext.job.join()
    ktorClient.close()

The server is easier to debug with this, but it's not a must:

    install(WebSockets)

    routing {
        webSocket("/test") {
            var i = 1
            while (true) {
                delay(1.seconds)
                outgoing.send(Frame.Text("test ${i++}"))
            }
        }
    }

But the main takeaway on the server side is to install the WebSockets plugin, not the RPC plugin (that is, your kotlinx-rpc plugin).
Also, in your snippet you didn't wait for the WebSocket session to end, so frames that are not control frames stop coming, and then you immediately close the client instead of waiting for the session job to complete.
If you let it complete, the completion handler prints "completed".
