@@ -6,10 +6,12 @@ import (
66 "encoding/json"
77 "net"
88 "strings"
9+ "sync"
910
1011 "github.com/coder/websocket"
1112 "github.com/coder/websocket/wsjson"
1213 "github.com/gin-gonic/gin"
14+ "github.com/jetkvm/kvm/internal/hidrpc"
1315 "github.com/jetkvm/kvm/internal/logging"
1416 "github.com/pion/webrtc/v4"
1517 "github.com/rs/zerolog"
@@ -23,9 +25,11 @@ type Session struct {
2325 HidChannel * webrtc.DataChannel
2426 shouldUmountVirtualMedia bool
2527
28+ rpcQueue chan webrtc.DataChannelMessage
29+
2630 hidRPCAvailable bool
27- hidQueue chan webrtc. DataChannelMessage
28- rpcQueue chan webrtc.DataChannelMessage
31+ hidQueueLock sync. Mutex
32+ hidQueue [] chan webrtc.DataChannelMessage
2933}
3034
3135type SessionConfig struct {
@@ -70,6 +74,23 @@ func (s *Session) ExchangeOffer(offerStr string) (string, error) {
7074 return base64 .StdEncoding .EncodeToString (localDescription ), nil
7175}
7276
77+ func (s * Session ) initQueues () {
78+ s .hidQueueLock .Lock ()
79+ defer s .hidQueueLock .Unlock ()
80+
81+ s .hidQueue = make ([]chan webrtc.DataChannelMessage , 0 )
82+ for i := 0 ; i < 4 ; i ++ {
83+ q := make (chan webrtc.DataChannelMessage , 256 )
84+ s .hidQueue = append (s .hidQueue , q )
85+ }
86+ }
87+
88+ func (s * Session ) handleQueues (index int ) {
89+ for msg := range s .hidQueue [index ] {
90+ onHidMessage (msg .Data , s )
91+ }
92+ }
93+
7394func newSession (config SessionConfig ) (* Session , error ) {
7495 webrtcSettingEngine := webrtc.SettingEngine {
7596 LoggerFactory : logging .GetPionDefaultLoggerFactory (),
@@ -111,7 +132,7 @@ func newSession(config SessionConfig) (*Session, error) {
111132
112133 session := & Session {peerConnection : peerConnection }
113134 session .rpcQueue = make (chan webrtc.DataChannelMessage , 256 )
114- session .hidQueue = make ( chan webrtc. DataChannelMessage , 256 )
135+ session .initQueues ( )
115136
116137 go func () {
117138 for msg := range session .rpcQueue {
@@ -120,11 +141,9 @@ func newSession(config SessionConfig) (*Session, error) {
120141 }
121142 }()
122143
123- go func () {
124- for msg := range session .hidQueue {
125- onHidMessage (msg .Data , session )
126- }
127- }()
144+ for i := 0 ; i < len (session .hidQueue ); i ++ {
145+ go session .handleQueues (i )
146+ }
128147
129148 peerConnection .OnDataChannel (func (d * webrtc.DataChannel ) {
130149 defer func () {
@@ -154,7 +173,19 @@ func newSession(config SessionConfig) (*Session, error) {
154173 l .Trace ().Msg ("received data in HID RPC message handler" )
155174
156175 // Enqueue to ensure ordered processing
157- session .hidQueue <- msg
176+ queueIndex := hidrpc .GetQueueIndex (hidrpc .MessageType (msg .Data [0 ]))
177+ if queueIndex >= len (session .hidQueue ) || queueIndex < 0 {
178+ l .Warn ().Int ("queueIndex" , queueIndex ).Msg ("received data in HID RPC message handler, but queue index not found" )
179+ queueIndex = 3
180+ }
181+
182+ queue := session .hidQueue [queueIndex ]
183+ if queue != nil {
184+ queue <- msg
185+ } else {
186+ l .Warn ().Int ("queueIndex" , queueIndex ).Msg ("received data in HID RPC message handler, but queue is nil" )
187+ return
188+ }
158189 })
159190 case "rpc" :
160191 session .RPCChannel = d
@@ -238,11 +269,13 @@ func newSession(config SessionConfig) (*Session, error) {
238269 close (session .rpcQueue )
239270 session .rpcQueue = nil
240271 }
272+
241273 // Stop HID RPC processor
242- if session .hidQueue != nil {
243- close (session .hidQueue )
244- session .hidQueue = nil
274+ for i := 0 ; i < len ( session .hidQueue ); i ++ {
275+ close (session .hidQueue [ i ] )
276+ session .hidQueue [ i ] = nil
245277 }
278+
246279 if session .shouldUmountVirtualMedia {
247280 if err := rpcUnmountImage (); err != nil {
248281 scopedLogger .Warn ().Err (err ).Msg ("unmount image failed on connection close" )
0 commit comments