@@ -13,16 +13,20 @@ import com.intellij.util.application
13
13
import com.jetbrains.rd.platform.codeWithMe.portForwarding.*
14
14
import com.jetbrains.rd.util.URI
15
15
import com.jetbrains.rd.util.lifetime.Lifetime
16
+ import fleet.util.async.throttleLatest
16
17
import io.gitpod.supervisor.api.Status
17
18
import io.gitpod.supervisor.api.Status.PortsStatus
18
19
import io.gitpod.supervisor.api.StatusServiceGrpc
19
20
import io.grpc.stub.ClientCallStreamObserver
20
21
import io.grpc.stub.ClientResponseObserver
21
22
import kotlinx.coroutines.*
22
23
import kotlinx.coroutines.future.asDeferred
24
+ import kotlinx.coroutines.flow.MutableSharedFlow
23
25
import org.apache.http.client.utils.URIBuilder
24
26
import java.util.*
25
27
import java.util.concurrent.CompletableFuture
28
+ import kotlinx.coroutines.Dispatchers
29
+ import kotlinx.coroutines.withContext
26
30
27
31
@Suppress(" UnstableApiUsage" )
28
32
abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService {
@@ -34,8 +38,22 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
34
38
private val perClientPortForwardingManager = service<PerClientPortForwardingManager >()
35
39
private val ignoredPortsForNotificationService = service<GitpodIgnoredPortsForNotificationService >()
36
40
private val lifetime = Lifetime .Eternal .createNested()
41
+ private val portStatusFlow = MutableSharedFlow <Status .PortsStatusResponse >()
42
+
43
+ init {
44
+ // Start collecting port status updates with throttling
45
+ runJob(lifetime) {
46
+ portStatusFlow
47
+ .throttleLatest(1000 ) // Throttle to 1 second
48
+ .collect { response ->
49
+ withContext(Dispatchers .IO ) {
50
+ syncPortsListWithClient(response)
51
+ }
52
+ }
53
+ }
37
54
38
- init { start() }
55
+ start()
56
+ }
39
57
40
58
private fun start () {
41
59
if (application.isHeadlessEnvironment) return
@@ -86,7 +104,11 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
86
104
}
87
105
88
106
override fun onNext (response : Status .PortsStatusResponse ) {
89
- application.invokeLater { syncPortsListWithClient(response) }
107
+ application.invokeLater {
108
+ runJob(lifetime) {
109
+ portStatusFlow.emit(response)
110
+ }
111
+ }
90
112
}
91
113
92
114
override fun onCompleted () {
0 commit comments