@@ -12,6 +12,7 @@ import kotlinx.serialization.builtins.ListSerializer
1212import kotlinx.serialization.json.Json
1313import kotlinx.serialization.json.JsonElement
1414import kotlinx.serialization.modules.SerializersModule
15+ import kotlin.coroutines.EmptyCoroutineContext
1516import kotlin.js.JsName
1617import kotlin.jvm.Synchronized
1718import kotlin.properties.ReadWriteProperty
@@ -33,7 +34,8 @@ abstract class PubSub {
3334 val logger = Logger <PubSub >()
3435 }
3536
36- open class Origin (val id : String ) {
37+ open class Origin (val id : String , clientId : String? ) {
38+ val clientId = clientId?.let { RpcClientId (it) }
3739 override fun toString (): String = " Origin($id )"
3840 }
3941
@@ -212,7 +214,9 @@ abstract class PubSub {
212214 val name = commandPort.name
213215 if (hasServerChannel(name)) error(" Command channel $name already exists." )
214216 putServerChannel(name,
215- ServerCommandChannel (commandPort) { command -> callback(command) })
217+ ServerCommandChannel (commandPort) { command ->
218+ callback(command)
219+ })
216220 }
217221
218222 class ServerCommandChannel <C , R >(
@@ -225,7 +229,7 @@ abstract class PubSub {
225229 fromConnection : Connection ,
226230 handlerScope : CoroutineScope
227231 ) {
228- handlerScope.launch {
232+ handlerScope.launch(fromConnection.clientId ? : EmptyCoroutineContext ) {
229233 try {
230234 val command = commandPort.fromJson(commandJson)
231235 val reply = callback.invoke(command)
@@ -271,7 +275,7 @@ abstract class PubSub {
271275 private val topics : Topics ,
272276 private val commandChannels : CommandChannels ,
273277 private val handlerScope : CoroutineScope
274- ) : Origin(" connection $name " ), Network.WebSocketListener {
278+ ) : Origin(" connection $name " , name ), Network.WebSocketListener {
275279 var isConnected: Boolean = false
276280 var everConnected: Boolean = false
277281
@@ -523,14 +527,19 @@ abstract class PubSub {
523527 httpServer : Network .HttpServer ,
524528 private val handlerScope : CoroutineScope
525529 ) : Endpoint(), IServer, RpcEndpoint {
526- private val publisher = Origin (" Server-side publisher" )
530+ private val publisher = Origin (" Server-side publisher" , null )
527531 private val topics: Topics = Topics ()
528532 override val commandChannels: CommandChannels = CommandChannels ()
529533 private val connectionListeners: MutableList < (ConnectionFromClient ) -> Unit > = mutableListOf ()
534+ private val connectionCountByHost = hashMapOf<String , Int >()
530535
531536 init {
532537 httpServer.listenWebSocket(" /sm/ws" ) { incomingConnection ->
533- val name = " server ${incomingConnection.toAddress} to ${incomingConnection.fromAddress} "
538+ val fromAddress = incomingConnection.fromAddress.toString()
539+ val number = connectionCountByHost[fromAddress] ? : 0
540+ connectionCountByHost[fromAddress] = number + 1
541+
542+ val name = " ${incomingConnection.fromAddress}${if (number > 0 ) " -$number " else " " } "
534543 ConnectionFromClient (name, topics, commandChannels, handlerScope)
535544 .also { connection -> connectionListeners.forEach { it.invoke(connection) } }
536545 }
@@ -664,7 +673,7 @@ abstract class PubSub {
664673
665674 @JsName(" subscribe" )
666675 fun <T > subscribe (topic : Topic <T >, onUpdateFn : (T ) -> Unit ): Channel <T > {
667- val subscriber = Origin (" Client-side subscriber at ${link.myAddress} " )
676+ val subscriber = Origin (" Client-side subscriber at ${link.myAddress} " , link.myAddress.toString() )
668677
669678 val topicName = topic.name
670679
0 commit comments