@@ -167,6 +167,114 @@ describe('Watch', () => {
167
167
stream . destroy ( ) ;
168
168
} ) ;
169
169
170
+ it ( 'should call setKeepAlive on the socket to extend the default of 5 mins' , async ( ) => {
171
+ const kc = new KubeConfig ( ) ;
172
+
173
+ const mockSocket = {
174
+ setKeepAlive : function ( enable : boolean , timeout : number ) {
175
+ this . keepAliveEnabled = enable ;
176
+ this . keepAliveTimeout = timeout ;
177
+ } ,
178
+ keepAliveEnabled : false ,
179
+ keepAliveTimeout : 0 ,
180
+ } ;
181
+ Object . assign ( kc , {
182
+ ...fakeConfig ,
183
+ applyToFetchOptions : async ( ) => ( {
184
+ agent : {
185
+ sockets : {
186
+ 'mock-url' : [ mockSocket ] ,
187
+ } ,
188
+ } ,
189
+ } ) ,
190
+ } ) ;
191
+ const watch = new Watch ( kc ) ;
192
+
193
+ const obj1 = {
194
+ type : 'ADDED' ,
195
+ object : {
196
+ foo : 'bar' ,
197
+ } ,
198
+ } ;
199
+
200
+ const path = '/some/path/to/object' ;
201
+
202
+ const stream = new PassThrough ( ) ;
203
+
204
+ const [ scope ] = systemUnderTest ( ) ;
205
+
206
+ let response : IncomingMessage | undefined ;
207
+
208
+ const s = scope
209
+ . get ( path )
210
+ . query ( {
211
+ watch : 'true' ,
212
+ a : 'b' ,
213
+ } )
214
+ . reply ( 200 , function ( ) : PassThrough {
215
+ this . req . on ( 'response' , ( r ) => {
216
+ response = r ;
217
+ } ) ;
218
+ stream . push ( JSON . stringify ( obj1 ) + '\n' ) ;
219
+ return stream ;
220
+ } ) ;
221
+
222
+ const receivedTypes : string [ ] = [ ] ;
223
+ const receivedObjects : string [ ] = [ ] ;
224
+ let doneCalled = 0 ;
225
+ let doneErr : any ;
226
+
227
+ let handledAllObjectsResolve : any ;
228
+ const handledAllObjectsPromise = new Promise ( ( resolve ) => {
229
+ handledAllObjectsResolve = resolve ;
230
+ } ) ;
231
+
232
+ let doneResolve : any ;
233
+ const donePromise = new Promise ( ( resolve ) => {
234
+ doneResolve = resolve ;
235
+ } ) ;
236
+
237
+ await watch . watch (
238
+ path ,
239
+ {
240
+ a : 'b' ,
241
+ } ,
242
+ ( phase : string , obj : string ) => {
243
+ receivedTypes . push ( phase ) ;
244
+ receivedObjects . push ( obj ) ;
245
+ if ( receivedObjects . length ) {
246
+ handledAllObjectsResolve ( ) ;
247
+ }
248
+ } ,
249
+ ( err : any ) => {
250
+ doneCalled += 1 ;
251
+ doneErr = err ;
252
+ doneResolve ( ) ;
253
+ } ,
254
+ ) ;
255
+
256
+ await handledAllObjectsPromise ;
257
+
258
+ deepStrictEqual ( receivedTypes , [ obj1 . type ] ) ;
259
+ deepStrictEqual ( receivedObjects , [ obj1 . object ] ) ;
260
+
261
+ strictEqual ( doneCalled , 0 ) ;
262
+ strictEqual ( mockSocket . keepAliveEnabled , true ) ;
263
+ strictEqual ( mockSocket . keepAliveTimeout , 30000 ) ;
264
+
265
+ const errIn = new Error ( 'err' ) ;
266
+ ( response as IncomingMessage ) . destroy ( errIn ) ;
267
+
268
+ await donePromise ;
269
+
270
+ strictEqual ( doneCalled , 1 ) ;
271
+ deepStrictEqual ( doneErr , errIn ) ;
272
+
273
+ s . done ( ) ;
274
+
275
+ stream . destroy ( ) ;
276
+ } ) ;
277
+
170
278
it ( 'should handle server errors correctly' , async ( ) => {
171
279
const kc = new KubeConfig ( ) ;
172
280
Object . assign ( kc , fakeConfig ) ;
0 commit comments