@@ -89,13 +89,17 @@ public class Server<
8989                    try await  self . error ( . invalidRequestFormat( messageType:  . complete) ) 
9090                    return 
9191                } 
92-                 try await  self . onOperationComplete ( completeRequest. id ) 
92+                 try await  self . onOperationComplete ( completeRequest) 
9393            case  . unknown: 
9494                try await  self . error ( . invalidType( ) ) 
9595            } 
9696        } 
9797    } 
9898
99+     deinit  { 
100+         subscriptionTasks. values. forEach  {  $0. cancel ( )  } 
101+     } 
102+ 
99103    /// Define a custom callback run during `connection_init` resolution that allows authorization using the `payload`.
100104    /// Throw from this closure to indicate that authorization has failed.
101105    /// - Parameter callback: The callback to assign
@@ -171,18 +175,15 @@ public class Server<
171175                    let  stream  =  try await  onSubscribe ( graphQLRequest) 
172176                    for  try await  event  in  stream { 
173177                        try Task . checkCancellation ( ) 
174-                         do  { 
175-                             try await  self . sendNext ( event,  id:  id) 
176-                         }  catch  { 
177-                             try await  self . sendError ( error,  id:  id) 
178-                             throw  error
179-                         } 
178+                         try await  self . sendNext ( event,  id:  id) 
180179                    } 
181180                }  catch  { 
182181                    try await  sendError ( error,  id:  id) 
182+                     subscriptionTasks. removeValue ( forKey:  id) 
183183                    throw  error
184184                } 
185185                try await  self . sendComplete ( id:  id) 
186+                 subscriptionTasks. removeValue ( forKey:  id) 
186187            } 
187188        }  else  { 
188189            do  { 
@@ -196,6 +197,20 @@ public class Server<
196197        } 
197198    } 
198199
200+     private  func  onOperationComplete( _ completeRequest:  CompleteRequest )  async  throws  { 
201+         guard  initialized else  { 
202+             try await  error ( . notInitialized( ) ) 
203+             return 
204+         } 
205+ 
206+         let  id  =  completeRequest. id
207+         if  let  task =  subscriptionTasks [ id]  { 
208+             task. cancel ( ) 
209+             subscriptionTasks. removeValue ( forKey:  id) 
210+         } 
211+         try await  self . onOperationComplete ( id) 
212+     } 
213+ 
199214    /// Send a `connection_ack` response through the messenger
200215    private  func  sendConnectionAck( _ payload:  [ String :  Map ] ? =  nil )  async  throws  { 
201216        guard  let  messenger =  messenger else  {  return  } 
0 commit comments