diff --git a/src/watch.ts b/src/watch.ts index ac842180be..702f0f3da3 100644 --- a/src/watch.ts +++ b/src/watch.ts @@ -41,6 +41,7 @@ export class Watch { const controller = new AbortController(); requestInit.signal = controller.signal as AbortSignal; requestInit.method = 'GET'; + requestInit.timeout = 30000; let doneCalled: boolean = false; const doneCallOnce = (err: any) => { @@ -54,6 +55,12 @@ export class Watch { try { const response = await fetch(watchURL, requestInit); + if (requestInit.agent && typeof requestInit.agent === 'object') { + for (const socket of Object.values(requestInit.agent.sockets).flat()) { + socket?.setKeepAlive(true, 30000); + } + } + if (response.status === 200) { const body = response.body!; diff --git a/src/watch_test.ts b/src/watch_test.ts index 0e9c65c0e0..fd2fe60dd8 100644 --- a/src/watch_test.ts +++ b/src/watch_test.ts @@ -167,6 +167,114 @@ describe('Watch', () => { stream.destroy(); }); + it('should call setKeepAlive on the socket to extend the default of 5 mins', async () => { + const kc = new KubeConfig(); + + const mockSocket = { + setKeepAlive: function (enable: boolean, timeout: number) { + this.keepAliveEnabled = enable; + this.keepAliveTimeout = timeout; + }, + keepAliveEnabled: false, + keepAliveTimeout: 0, + }; + Object.assign(kc, { + ...fakeConfig, + applyToFetchOptions: async () => ({ + agent: { + sockets: { + 'mock-url': [mockSocket], + }, + }, + }), + }); + const watch = new Watch(kc); + + const obj1 = { + type: 'ADDED', + object: { + foo: 'bar', + }, + }; + + const path = '/some/path/to/object'; + + const stream = new PassThrough(); + + const [scope] = systemUnderTest(); + + let response: IncomingMessage | undefined; + + const s = scope + .get(path) + .query({ + watch: 'true', + a: 'b', + }) + .reply(200, function (): PassThrough { + this.req.on('response', (r) => { + response = r; + }); + stream.push(JSON.stringify(obj1) + '\n'); + return stream; + }); + + const receivedTypes: string[] = []; + const receivedObjects: string[] = []; + let doneCalled = 0; + let doneErr: any; + + let handledAllObjectsResolve: any; + const handledAllObjectsPromise = new Promise((resolve) => { + handledAllObjectsResolve = resolve; + }); + + let doneResolve: any; + const donePromise = new Promise((resolve) => { + doneResolve = resolve; + }); + + await watch.watch( + path, + { + a: 'b', + }, + (phase: string, obj: string) => { + receivedTypes.push(phase); + receivedObjects.push(obj); + if (receivedObjects.length) { + handledAllObjectsResolve(); + } + }, + (err: any) => { + doneCalled += 1; + doneErr = err; + doneResolve(); + }, + ); + + await handledAllObjectsPromise; + + deepStrictEqual(receivedTypes, [obj1.type]); + deepStrictEqual(receivedObjects, [obj1.object]); + + strictEqual(doneCalled, 0); + strictEqual(mockSocket.keepAliveEnabled, true); + strictEqual(mockSocket.keepAliveTimeout, 30000); + + const errIn = new Error('err'); + (response as IncomingMessage).destroy(errIn); + + await donePromise; + + strictEqual(doneCalled, 1); + deepStrictEqual(doneErr, errIn); + + s.done(); + + stream.destroy(); + }); + it('should handle server errors correctly', async () => { const kc = new KubeConfig(); Object.assign(kc, fakeConfig);