@@ -79,26 +79,32 @@ export class NodeStreamLink implements ILink {
79
79
this . recvBuffer . splice ( 0 , this . recvBuffer . length , rest . at ( - 1 ) ! ) ;
80
80
}
81
81
82
- // Second, process the packets. This involves steps that may throw errors, so we catch
83
- // them all. Also, the stream is paused so that this event handler isn't re-entered despite
84
- // using `await` here .
85
- this . stream . pause ( ) ;
82
+ // Second, convert the packet text to JSON. This can throw errors e.g. if there is foreign
83
+ // data injected between server replies, or the server is malfunctioning. In that case,
84
+ // stop processing input .
85
+ const packets : proto . ServerPacket [ ] = [ ] ;
86
86
for ( const packetText of packetTexts ) {
87
87
try {
88
- const packet = JSON . parse ( packetText ) as proto . ServerPacket ;
88
+ packets . push ( JSON . parse ( packetText ) as proto . ServerPacket ) ;
89
+ } catch ( error ) {
90
+ console . error ( 'malformed JSON: ' , packetText ) ;
91
+ this . stream . pause ( ) ;
92
+ return ;
93
+ }
94
+ }
95
+
96
+ // Finally, run the handler for each of the packets. If the handler blocks, don't wait for
97
+ // its completion, but run the next handler anyway; this is because a handler can send
98
+ // another client packet, causing `onStreamData` to be re-entered, anyway.
99
+ for ( const packet of packets ) {
100
+ ( async ( packet : proto . ServerPacket ) => {
89
101
try {
90
102
await this . onRecv ( packet ) ;
91
- this . stream . resume ( ) ;
92
103
} catch ( error ) {
93
104
console . error ( 'uncaught error in onRecv' , error ) ;
94
- return ; // leave paused
95
105
}
96
- } catch ( error ) {
97
- console . error ( 'malformed JSON: ' , packetText ) ;
98
- return ; // leave paused
99
- }
106
+ } ) ( packet ) ;
100
107
}
101
- this . stream . resume ( ) ;
102
108
}
103
109
104
110
private async onStreamEnd ( ) : Promise < void > {
0 commit comments