Skip to content

Commit a1d444f

Browse files
author
Ross Anthony
committed
Reintroduce timeout and keep-alive for watch requests to match client-go
1 parent adf37cc commit a1d444f

File tree

2 files changed

+115
-0
lines changed

2 files changed

+115
-0
lines changed

src/watch.ts

+7
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export class Watch {
4141
const controller = new AbortController();
4242
requestInit.signal = controller.signal as AbortSignal;
4343
requestInit.method = 'GET';
44+
requestInit.timeout = 30000;
4445

4546
let doneCalled: boolean = false;
4647
const doneCallOnce = (err: any) => {
@@ -54,6 +55,12 @@ export class Watch {
5455
try {
5556
const response = await fetch(watchURL, requestInit);
5657

58+
if (requestInit.agent && typeof requestInit.agent === 'object') {
59+
for (const socket of Object.values(requestInit.agent.sockets).flat()) {
60+
socket?.setKeepAlive(true, 30000);
61+
}
62+
}
63+
5764
if (response.status === 200) {
5865
const body = response.body!;
5966

src/watch_test.ts

+108
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,114 @@ describe('Watch', () => {
167167
stream.destroy();
168168
});
169169

170+
it('should call setKeepAlive on the socket to extend the default of 5 mins', async () => {
171+
const kc = new KubeConfig();
172+
173+
const mockSocket = {
174+
setKeepAlive: function (enable: boolean, timeout: number) {
175+
this.keepAliveEnabled = enable;
176+
this.keepAliveTimeout = timeout;
177+
},
178+
keepAliveEnabled: false,
179+
keepAliveTimeout: 0,
180+
};
181+
Object.assign(kc, {
182+
...fakeConfig,
183+
applyToFetchOptions: async () => ({
184+
agent: {
185+
sockets: {
186+
'mock-url': [mockSocket],
187+
},
188+
},
189+
}),
190+
});
191+
const watch = new Watch(kc);
192+
193+
const obj1 = {
194+
type: 'ADDED',
195+
object: {
196+
foo: 'bar',
197+
},
198+
};
199+
200+
const path = '/some/path/to/object';
201+
202+
const stream = new PassThrough();
203+
204+
const [scope] = systemUnderTest();
205+
206+
let response: IncomingMessage | undefined;
207+
208+
const s = scope
209+
.get(path)
210+
.query({
211+
watch: 'true',
212+
a: 'b',
213+
})
214+
.reply(200, function (): PassThrough {
215+
this.req.on('response', (r) => {
216+
response = r;
217+
});
218+
stream.push(JSON.stringify(obj1) + '\n');
219+
return stream;
220+
});
221+
222+
const receivedTypes: string[] = [];
223+
const receivedObjects: string[] = [];
224+
let doneCalled = 0;
225+
let doneErr: any;
226+
227+
let handledAllObjectsResolve: any;
228+
const handledAllObjectsPromise = new Promise((resolve) => {
229+
handledAllObjectsResolve = resolve;
230+
});
231+
232+
let doneResolve: any;
233+
const donePromise = new Promise((resolve) => {
234+
doneResolve = resolve;
235+
});
236+
237+
await watch.watch(
238+
path,
239+
{
240+
a: 'b',
241+
},
242+
(phase: string, obj: string) => {
243+
receivedTypes.push(phase);
244+
receivedObjects.push(obj);
245+
if (receivedObjects.length) {
246+
handledAllObjectsResolve();
247+
}
248+
},
249+
(err: any) => {
250+
doneCalled += 1;
251+
doneErr = err;
252+
doneResolve();
253+
},
254+
);
255+
256+
await handledAllObjectsPromise;
257+
258+
deepStrictEqual(receivedTypes, [obj1.type]);
259+
deepStrictEqual(receivedObjects, [obj1.object]);
260+
261+
strictEqual(doneCalled, 0);
262+
strictEqual(mockSocket.keepAliveEnabled, true);
263+
strictEqual(mockSocket.keepAliveTimeout, 30000);
264+
265+
const errIn = new Error('err');
266+
(response as IncomingMessage).destroy(errIn);
267+
268+
await donePromise;
269+
270+
strictEqual(doneCalled, 1);
271+
deepStrictEqual(doneErr, errIn);
272+
273+
s.done();
274+
275+
stream.destroy();
276+
});
277+
170278
it('should handle server errors correctly', async () => {
171279
const kc = new KubeConfig();
172280
Object.assign(kc, fakeConfig);

0 commit comments

Comments
 (0)