From 2597986af4f10d0b2ed1df56df58d7a70bc7da82 Mon Sep 17 00:00:00 2001 From: David Gamero Date: Wed, 25 Mar 2026 22:54:55 -0400 Subject: [PATCH 1/5] refactor: complete node-fetch to undici migration --- examples/raw-example.js | 2 +- package-lock.json | 54 ---------- package.json | 2 - src/config.ts | 24 ----- src/config_test.ts | 45 +------- src/health.ts | 19 ++-- src/health_test.ts | 130 +++++++++++++++++------ src/index.ts | 3 - src/log.ts | 28 +++-- src/log_test.ts | 114 ++++++++++++-------- src/util.ts | 4 +- src/util_test.ts | 2 +- src/watch.ts | 27 ++--- src/watch_test.ts | 223 ++++++++++++---------------------------- 14 files changed, 286 insertions(+), 391 deletions(-) diff --git a/examples/raw-example.js b/examples/raw-example.js index a1e3c8182b..3eef12ecde 100644 --- a/examples/raw-example.js +++ b/examples/raw-example.js @@ -1,5 +1,5 @@ import * as k8s from '@kubernetes/client-node'; -import fetch from 'node-fetch'; +import { fetch } from 'undici'; import https from 'node:https'; const kc = new k8s.KubeConfig(); diff --git a/package-lock.json b/package-lock.json index 5a4de84f72..17f2adae6e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,14 +11,12 @@ "dependencies": { "@types/js-yaml": "^4.0.1", "@types/node": "^25.0.0", - "@types/node-fetch": "^2.6.13", "@types/stream-buffers": "^3.0.3", "form-data": "^4.0.0", "hpagent": "^1.2.0", "isomorphic-ws": "^5.0.0", "js-yaml": "^4.1.0", "jsonpath-plus": "^10.3.0", - "node-fetch": "^2.7.0", "openid-client": "^6.1.3", "rfc4648": "^1.3.0", "socks": "^2.8.4", @@ -936,16 +934,6 @@ "undici-types": "~7.18.0" } }, - "node_modules/@types/node-fetch": { - "version": "2.6.13", - "resolved": "https://registry.npmjs.org/@types/node-fetch/-/node-fetch-2.6.13.tgz", - "integrity": "sha512-QGpRVpzSaUs30JBSGPjOg4Uveu384erbHBoT1zeONvyCfwQxIkUshLAOqN/k9EjGviPRmWTTe6aH2qySWKTVSw==", - "license": "MIT", - "dependencies": { - "@types/node": "*", - "form-data": "^4.0.4" - } - }, "node_modules/@types/stream-buffers": { "version": "3.0.8", "resolved": "https://registry.npmjs.org/@types/stream-buffers/-/stream-buffers-3.0.8.tgz", @@ -2675,26 +2663,6 @@ "node": ">=18.20.0 <20 || >=20.12.1" } }, - "node_modules/node-fetch": { - "version": "2.7.0", - "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.7.0.tgz", - "integrity": "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==", - "license": "MIT", - "dependencies": { - "whatwg-url": "^5.0.0" - }, - "engines": { - "node": "4.x || >=6.0.0" - }, - "peerDependencies": { - "encoding": "^0.1.0" - }, - "peerDependenciesMeta": { - "encoding": { - "optional": true - } - } - }, "node_modules/oauth4webapi": { "version": "3.8.4", "resolved": "https://registry.npmjs.org/oauth4webapi/-/oauth4webapi-3.8.4.tgz", @@ -3178,12 +3146,6 @@ "url": "https://github.com/sponsors/SuperchupuDev" } }, - "node_modules/tr46": { - "version": "0.0.3", - "resolved": "https://registry.npmjs.org/tr46/-/tr46-0.0.3.tgz", - "integrity": "sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==", - "license": "MIT" - }, "node_modules/ts-api-utils": { "version": "2.4.0", "resolved": "https://registry.npmjs.org/ts-api-utils/-/ts-api-utils-2.4.0.tgz", @@ -3356,22 +3318,6 @@ "node": ">=10.12.0" } }, - "node_modules/webidl-conversions": { - "version": "3.0.1", - "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-3.0.1.tgz", - "integrity": "sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==", - "license": "BSD-2-Clause" - }, - "node_modules/whatwg-url": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/whatwg-url/-/whatwg-url-5.0.0.tgz", - "integrity": "sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==", - "license": "MIT", - "dependencies": { - "tr46": "~0.0.3", - "webidl-conversions": "^3.0.0" - } - }, "node_modules/which": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz", diff --git a/package.json b/package.json index 2be5d3f715..c9bcba5629 100644 --- a/package.json +++ b/package.json @@ -58,14 +58,12 @@ "dependencies": { "@types/js-yaml": "^4.0.1", "@types/node": "^25.0.0", - "@types/node-fetch": "^2.6.13", "@types/stream-buffers": "^3.0.3", "form-data": "^4.0.0", "hpagent": "^1.2.0", "isomorphic-ws": "^5.0.0", "js-yaml": "^4.1.0", "jsonpath-plus": "^10.3.0", - "node-fetch": "^2.7.0", "openid-client": "^6.1.3", "rfc4648": "^1.3.0", "socks": "^2.8.4", diff --git a/src/config.ts b/src/config.ts index bfe743c670..2145ebc662 100644 --- a/src/config.ts +++ b/src/config.ts @@ -6,7 +6,6 @@ import yaml from 'js-yaml'; import net from 'node:net'; import path from 'node:path'; -import { Headers, RequestInit } from 'node-fetch'; import { Agent as UndiciAgent, ProxyAgent as UndiciProxyAgent, @@ -229,29 +228,6 @@ export class KubeConfig implements SecurityAuthentication { this.makePathsAbsolute(rootDirectory); } - public async applyToFetchOptions(opts: https.RequestOptions): Promise { - await this.applyToHTTPSOptions(opts); - const headers = new Headers(); - for (const [key, val] of Object.entries(opts.headers || {})) { - if (Array.isArray(val)) { - val.forEach((innerVal) => { - headers.append(key, innerVal); - }); - } else if (typeof val === 'number' || typeof val === 'string') { - headers.set(key, val.toString()); - } - } - if (opts.auth) { - headers.set('Authorization', 'Basic ' + Buffer.from(opts.auth).toString('base64')); - } - return { - agent: opts.agent, - headers, - method: opts.method, - timeout: opts.timeout, - } as RequestInit; - } - public async applyToHTTPSOptions(opts: https.RequestOptions | WebSocket.ClientOptions): Promise { const user = this.getCurrentUser(); const cluster = this.getCurrentCluster(); diff --git a/src/config_test.ts b/src/config_test.ts index 2c8416e215..a1a6611425 100644 --- a/src/config_test.ts +++ b/src/config_test.ts @@ -11,16 +11,15 @@ import child_process from 'node:child_process'; import { readFileSync } from 'node:fs'; import https from 'node:https'; import http from 'node:http'; -import { Agent, RequestOptions } from 'node:https'; +import { RequestOptions } from 'node:https'; import path, { dirname, join } from 'node:path'; import { fileURLToPath } from 'node:url'; import mockfs from 'mock-fs'; import { Authenticator } from './auth.js'; -import fetch, { Headers } from 'node-fetch'; import { HttpMethod } from './index.js'; -import { assertRequestAgentsEqual, assertRequestOptionsEqual } from './test/match-buffer.js'; +import { assertRequestOptionsEqual } from './test/match-buffer.js'; import { CoreV1Api, RequestContext } from './api.js'; import { bufferFromFileOrString, findHomeDir, findObject, KubeConfig, makeAbsolutePath } from './config.js'; import { ActionOnInvalid, Cluster, newClusters, newContexts, newUsers, User } from './config_types.js'; @@ -276,42 +275,6 @@ describe('KubeConfig', () => { }); }); - describe('applytoFetchOptions', () => { - it('should apply cert configs', async () => { - const kc = new KubeConfig(); - kc.loadFromFile(kcFileName); - kc.setCurrentContext('passwd'); - - const opts: https.RequestOptions = { - method: 'POST', - timeout: 5, - headers: { - number: 5, - string: 'str', - empty: undefined, - list: ['a', 'b'], - }, - }; - const requestInit = await kc.applyToFetchOptions(opts); - const expectedCA = Buffer.from('CADATA2', 'utf-8'); - const expectedAgent = new https.Agent({ - ca: expectedCA, - rejectUnauthorized: false, - }); - - strictEqual(requestInit.method, 'POST'); - // timeout has been removed from the spec. - strictEqual((requestInit as any).timeout, 5); - const headers = requestInit.headers as Headers; - strictEqual(Array.from(headers).length, 4); - strictEqual(headers.get('Authorization'), 'Basic Zm9vOmJhcg=='); - strictEqual(headers.get('list'), 'a, b'); - strictEqual(headers.get('number'), '5'); - strictEqual(headers.get('string'), 'str'); - assertRequestAgentsEqual(requestInit.agent as Agent, expectedAgent); - }); - }); - describe('applyHTTPSOptions', () => { it('should apply tls-server-name to https.RequestOptions', async () => { const kc = new KubeConfig(); @@ -625,10 +588,6 @@ describe('KubeConfig', () => { strictEqual(namespaceList.kind, 'NamespaceList'); strictEqual(namespaceList.items.length, 1); strictEqual(namespaceList.items[0].metadata?.name, 'default'); - - const res2 = await fetch(`https://${host}:${port}`, await kc.applyToFetchOptions({})); - strictEqual(res2.status, 200); - strictEqual(await res2.text(), 'ok'); }); }); diff --git a/src/health.ts b/src/health.ts index b26e3557b1..e03da628e9 100644 --- a/src/health.ts +++ b/src/health.ts @@ -1,6 +1,7 @@ -import fetch from 'node-fetch'; +import { fetch } from 'undici'; import { KubeConfig } from './config.js'; import { RequestOptions } from 'node:https'; +import { HttpMethod, RequestContext } from './gen/http/http.js'; export class Health { public config: KubeConfig; @@ -27,15 +28,17 @@ export class Health { throw new Error('No currently active cluster'); } - const requestURL = new URL(cluster.server + path); - const requestInit = await this.config.applyToFetchOptions(opts); - if (opts.signal) { - requestInit.signal = opts.signal; - } - requestInit.method = 'GET'; + const requestURL = cluster.server + path; + const ctx = new RequestContext(requestURL, HttpMethod.GET); + await this.config.applySecurityAuthentication(ctx); try { - const response = await fetch(requestURL.toString(), requestInit); + const response = await fetch(requestURL, { + method: 'GET', + headers: ctx.getHeaders(), + dispatcher: ctx.getDispatcher(), + signal: opts?.signal, + }); const status = response.status; if (status === 200) { return true; diff --git a/src/health_test.ts b/src/health_test.ts index 6e97b1ce4a..034165be8c 100644 --- a/src/health_test.ts +++ b/src/health_test.ts @@ -1,6 +1,6 @@ import { describe, it } from 'node:test'; import { rejects, strictEqual } from 'node:assert'; -import nock from 'nock'; +import { MockAgent, getGlobalDispatcher, setGlobalDispatcher } from 'undici'; import { KubeConfig } from './config.js'; import { Health } from './health.js'; @@ -20,12 +20,26 @@ describe('Health', () => { ].forEach((test) => { describe(test.path, () => { it('should throw an error if no current active cluster', async () => { + const originalDispatcher = getGlobalDispatcher(); + const mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); const kc = new KubeConfig(); const health = new Health(kc); - await rejects(test.method(health, {}), { message: 'No currently active cluster' }); + + try { + await rejects(test.method(health, {}), { message: 'No currently active cluster' }); + } finally { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + } }); it(`should return true if ${test.path} returns with status 200`, async () => { + const originalDispatcher = getGlobalDispatcher(); + const mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); const kc = new KubeConfig(); const cluster = { name: 'foo', @@ -38,15 +52,25 @@ describe('Health', () => { } as User; kc.loadFromClusterAndUser(cluster, user); - const scope = nock('https://server.com').get(test.path).reply(200); + const pool = mockAgent.get('https://server.com'); + pool.intercept({ path: test.path, method: 'GET' }).reply(200); const health = new Health(kc); - const r = await test.method(health, {}); - strictEqual(r, true); - scope.done(); + try { + const r = await test.method(health, {}); + strictEqual(r, true); + mockAgent.assertNoPendingInterceptors(); + } finally { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + } }); it(`should return false if ${test.path} returns with status 500`, async () => { + const originalDispatcher = getGlobalDispatcher(); + const mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); const kc = new KubeConfig(); const cluster = { name: 'foo', @@ -59,15 +83,25 @@ describe('Health', () => { } as User; kc.loadFromClusterAndUser(cluster, user); - const scope = nock('https://server.com').get(test.path).reply(500); + const pool = mockAgent.get('https://server.com'); + pool.intercept({ path: test.path, method: 'GET' }).reply(500); const health = new Health(kc); - const r = await test.method(health, {}); - strictEqual(r, false); - scope.done(); + try { + const r = await test.method(health, {}); + strictEqual(r, false); + mockAgent.assertNoPendingInterceptors(); + } finally { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + } }); it(`should return true if ${test.path} returns status 404 and /healthz returns status 200`, async () => { + const originalDispatcher = getGlobalDispatcher(); + const mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); const kc = new KubeConfig(); const cluster = { name: 'foo', @@ -80,17 +114,26 @@ describe('Health', () => { } as User; kc.loadFromClusterAndUser(cluster, user); - const scope = nock('https://server.com'); - scope.get(test.path).reply(404); - scope.get('/healthz').reply(200); + const pool = mockAgent.get('https://server.com'); + pool.intercept({ path: test.path, method: 'GET' }).reply(404); + pool.intercept({ path: '/healthz', method: 'GET' }).reply(200); const health = new Health(kc); - const r = await test.method(health, {}); - strictEqual(r, true); - scope.done(); + try { + const r = await test.method(health, {}); + strictEqual(r, true); + mockAgent.assertNoPendingInterceptors(); + } finally { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + } }); it(`should return false if ${test.path} returns status 404 and /healthz returns status 500`, async () => { + const originalDispatcher = getGlobalDispatcher(); + const mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); const kc = new KubeConfig(); const cluster = { name: 'foo', @@ -103,17 +146,26 @@ describe('Health', () => { } as User; kc.loadFromClusterAndUser(cluster, user); - const scope = nock('https://server.com'); - scope.get(test.path).reply(404); - scope.get('/healthz').reply(500); + const pool = mockAgent.get('https://server.com'); + pool.intercept({ path: test.path, method: 'GET' }).reply(404); + pool.intercept({ path: '/healthz', method: 'GET' }).reply(500); const health = new Health(kc); - const r = await test.method(health, {}); - strictEqual(r, false); - scope.done(); + try { + const r = await test.method(health, {}); + strictEqual(r, false); + mockAgent.assertNoPendingInterceptors(); + } finally { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + } }); it(`should return true if both ${test.path} and /healthz return status 404`, async () => { + const originalDispatcher = getGlobalDispatcher(); + const mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); const kc = new KubeConfig(); const cluster = { name: 'foo', @@ -126,17 +178,26 @@ describe('Health', () => { } as User; kc.loadFromClusterAndUser(cluster, user); - const scope = nock('https://server.com'); - scope.get(test.path).reply(404); - scope.get('/healthz').reply(404); + const pool = mockAgent.get('https://server.com'); + pool.intercept({ path: test.path, method: 'GET' }).reply(404); + pool.intercept({ path: '/healthz', method: 'GET' }).reply(404); const health = new Health(kc); - const r = await test.method(health, {}); - strictEqual(r, true); - scope.done(); + try { + const r = await test.method(health, {}); + strictEqual(r, true); + mockAgent.assertNoPendingInterceptors(); + } finally { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + } }); it('should throw an error when fetch throws an error', async () => { + const originalDispatcher = getGlobalDispatcher(); + const mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); const kc = new KubeConfig(); const cluster = { name: 'foo', @@ -149,12 +210,17 @@ describe('Health', () => { } as User; kc.loadFromClusterAndUser(cluster, user); - const scope = nock('https://server.com'); - scope.get(test.path).replyWithError(new Error('an error')); + const pool = mockAgent.get('https://server.com'); + pool.intercept({ path: test.path, method: 'GET' }).replyWithError(new Error('an error')); const health = new Health(kc); - await rejects(test.method(health, {}), { message: 'Error occurred in health request' }); - scope.done(); + try { + await rejects(test.method(health, {}), { message: 'Error occurred in health request' }); + mockAgent.assertNoPendingInterceptors(); + } finally { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + } }); }); }); diff --git a/src/index.ts b/src/index.ts index fb6b1bba01..c77543056b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -18,6 +18,3 @@ export * from './health.js'; export * from './middleware.js'; export * from './patch.js'; export { type ConfigOptions, type User, type Cluster, type Context } from './config_types.js'; - -// Export AbortError and FetchError so that instanceof checks in user code will definitely use the same instances -export { AbortError, FetchError } from 'node-fetch'; diff --git a/src/log.ts b/src/log.ts index 419c8b9a0c..36093dc7a5 100644 --- a/src/log.ts +++ b/src/log.ts @@ -1,7 +1,8 @@ -import fetch from 'node-fetch'; -import { Writable } from 'node:stream'; +import { fetch } from 'undici'; +import { Readable, Writable } from 'node:stream'; import { ApiException } from './api.js'; import { KubeConfig } from './config.js'; +import { HttpMethod, RequestContext } from './gen/http/http.js'; import { V1Status } from './gen/index.js'; import { normalizeResponseHeaders } from './util.js'; @@ -129,18 +130,31 @@ export class Log { searchParams.set('container', containerName); AddOptionsToSearchParams(options, searchParams); - const requestInit = await this.config.applyToFetchOptions({}); + const ctx = new RequestContext(requestURL.toString(), HttpMethod.GET); + await this.config.applySecurityAuthentication(ctx); const controller = new AbortController(); - requestInit.signal = controller.signal as AbortSignal; - requestInit.method = 'GET'; try { - const response = await fetch(requestURL.toString(), requestInit); + const response = await fetch(requestURL.toString(), { + method: 'GET', + headers: ctx.getHeaders(), + dispatcher: ctx.getDispatcher(), + signal: controller.signal, + }); const status = response.status; if (status === 200) { // TODO: the follow search param still has the stream close prematurely based on my testing - response.body!.pipe(stream); + if (!response.body) { + throw new ApiException( + status, + 'Error occurred in log request', + undefined, + normalizeResponseHeaders(response), + ); + } + const nodeStream = Readable.fromWeb(response.body as any); + nodeStream.pipe(stream); } else if (status === 500) { const v1status = (await response.json()) as V1Status; const v1code = v1status.code; diff --git a/src/log_test.ts b/src/log_test.ts index f462fbbb5c..c988b25bad 100644 --- a/src/log_test.ts +++ b/src/log_test.ts @@ -1,6 +1,6 @@ -import { afterEach, describe, it } from 'node:test'; +import { describe, it } from 'node:test'; import { strictEqual, rejects, throws } from 'node:assert'; -import nock from 'nock'; +import { MockAgent, getGlobalDispatcher, setGlobalDispatcher } from 'undici'; import { AddOptionsToSearchParams, Log, LogOptions } from './log.js'; import { KubeConfig } from './config.js'; import { Writable } from 'node:stream'; @@ -35,9 +35,14 @@ describe('Log', () => { config.setCurrentContext('foo'); const log = new Log(config); - afterEach(() => { - nock.cleanAll(); - }); + const setupMockAgent = () => { + const originalDispatcher = getGlobalDispatcher(); + const mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); + const pool = mockAgent.get('https://example.com'); + return { originalDispatcher, mockAgent, pool }; + }; it('should make a request with correct parameters', async () => { const namespace = 'default'; @@ -58,22 +63,20 @@ describe('Log', () => { timestamps: true, }; - nock('https://example.com') - .get('/api/v1/namespaces/default/pods/mypod/log') - .query({ - container: 'mycontainer', - follow: 'true', - limitBytes: '100', - pretty: 'true', - previous: 'true', - sinceSeconds: '1', - tailLines: '1', - timestamps: 'true', - }) - .reply(200, 'log data'); - - const controller = await log.log(namespace, podName, containerName, stream, options); - strictEqual(controller instanceof AbortController, true); + const { originalDispatcher, mockAgent, pool } = setupMockAgent(); + pool.intercept({ + method: 'GET', + path: '/api/v1/namespaces/default/pods/mypod/log?container=mycontainer&follow=true&limitBytes=100&pretty=true&previous=true&sinceSeconds=1&tailLines=1×tamps=true', + }).reply(200, 'log data'); + + try { + const controller = await log.log(namespace, podName, containerName, stream, options); + strictEqual(controller instanceof AbortController, true); + mockAgent.assertNoPendingInterceptors(); + } finally { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + } }); it('should throw an error if no active cluster', async () => { @@ -103,14 +106,23 @@ describe('Log', () => { }, }); - nock('https://example.com') - .get('/api/v1/namespaces/default/pods/mypod/log') - .query({ container: 'mycontainer' }) - .reply(501, { message: 'Error occurred in log request' }); + const { originalDispatcher, mockAgent, pool } = setupMockAgent(); + pool.intercept({ + method: 'GET', + path: '/api/v1/namespaces/default/pods/mypod/log?container=mycontainer', + }).reply(501, JSON.stringify({ message: 'Error occurred in log request' }), { + headers: { 'content-type': 'application/json' }, + }); - await rejects(async () => { - await log.log(namespace, podName, containerName, stream); - }, /Error occurred in log request/); + try { + await rejects(async () => { + await log.log(namespace, podName, containerName, stream); + }, /Error occurred in log request/); + mockAgent.assertNoPendingInterceptors(); + } finally { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + } }); it('should handle API exceptions on 500', async () => { @@ -123,14 +135,23 @@ describe('Log', () => { }, }); - nock('https://example.com') - .get('/api/v1/namespaces/default/pods/mypod/log') - .query({ container: 'mycontainer' }) - .reply(500, { message: 'Error occurred in log request' }); + const { originalDispatcher, mockAgent, pool } = setupMockAgent(); + pool.intercept({ + method: 'GET', + path: '/api/v1/namespaces/default/pods/mypod/log?container=mycontainer', + }).reply(500, JSON.stringify({ message: 'Error occurred in log request' }), { + headers: { 'content-type': 'application/json' }, + }); - await rejects(async () => { - await log.log(namespace, podName, containerName, stream); - }, /Error occurred in log request/); + try { + await rejects(async () => { + await log.log(namespace, podName, containerName, stream); + }, /Error occurred in log request/); + mockAgent.assertNoPendingInterceptors(); + } finally { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + } }); it('should handle V1Status with code and message', async () => { @@ -157,14 +178,21 @@ describe('Log', () => { code: 404, }; - nock('https://example.com') - .get('/api/v1/namespaces/default/pods/mypod/log') - .query({ container: 'mycontainer' }) - .reply(500, v1Status); - - await rejects(async () => { - await log.log(namespace, podName, containerName, stream); - }, /Pod not found/); + const { originalDispatcher, mockAgent, pool } = setupMockAgent(); + pool.intercept({ + method: 'GET', + path: '/api/v1/namespaces/default/pods/mypod/log?container=mycontainer', + }).reply(500, JSON.stringify(v1Status), { headers: { 'content-type': 'application/json' } }); + + try { + await rejects(async () => { + await log.log(namespace, podName, containerName, stream); + }, /Pod not found/); + mockAgent.assertNoPendingInterceptors(); + } finally { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + } }); }); describe('AddOptionsToSearchParams', () => { diff --git a/src/util.ts b/src/util.ts index 8d81c54263..d9e8b6e32d 100644 --- a/src/util.ts +++ b/src/util.ts @@ -150,10 +150,8 @@ export function totalForResource(pod: V1Pod, resource: string): ResourceStatus { return new ResourceStatus(reqTotal, limitTotal, resource); } -// There is a disconnect between the ApiException headers and the response headers from fetch +// There is a disconnect between the ApiException headers and the response headers from fetch. // ApiException expects { [key: string]: string } whereas some fetch implementations provide: { [key: string]: string[] } -// https://github.com/node-fetch/node-fetch/issues/783 -// https://github.com/node-fetch/node-fetch/pull/1757 export function normalizeResponseHeaders(response: { headers: { entries(): Iterable<[string, string]> } }): { [key: string]: string; } { diff --git a/src/util_test.ts b/src/util_test.ts index a932b7bbd0..d5ec57ead2 100644 --- a/src/util_test.ts +++ b/src/util_test.ts @@ -1,6 +1,6 @@ import { describe, it } from 'node:test'; import { deepStrictEqual, strictEqual, throws } from 'node:assert'; -import { Response } from 'node-fetch'; +import { Response } from 'undici'; import { CoreV1Api, V1Container, V1Pod } from './api.js'; import { normalizeResponseHeaders, diff --git a/src/watch.ts b/src/watch.ts index 7664b07291..ecec1566a8 100644 --- a/src/watch.ts +++ b/src/watch.ts @@ -1,7 +1,9 @@ import { STATUS_CODES } from 'node:http'; import { createInterface } from 'node:readline'; -import fetch from 'node-fetch'; +import { Readable } from 'node:stream'; +import { fetch } from 'undici'; import { KubeConfig } from './config.js'; +import { HttpMethod, RequestContext } from './gen/http/http.js'; export class Watch { public static SERVER_SIDE_CLOSE: object = { error: 'Connection closed on server' }; @@ -37,12 +39,12 @@ export class Watch { } } - const requestInit = await this.config.applyToFetchOptions({}); - const controller = new AbortController(); const timeoutSignal = AbortSignal.timeout(this.requestTimeoutMs); - requestInit.signal = AbortSignal.any([controller.signal, timeoutSignal]); - requestInit.method = 'GET'; + const signal = AbortSignal.any([controller.signal, timeoutSignal]); + + const ctx = new RequestContext(watchURL.toString(), HttpMethod.GET); + await this.config.applySecurityAuthentication(ctx); let doneCalled: boolean = false; const doneCallOnce = (err: any) => { @@ -54,16 +56,15 @@ export class Watch { }; try { - const response = await fetch(watchURL, requestInit); - - if (requestInit.agent && typeof requestInit.agent === 'object') { - for (const socket of Object.values(requestInit.agent.sockets).flat()) { - socket?.setKeepAlive(true, 30000); - } - } + const response = await fetch(watchURL, { + method: 'GET', + headers: ctx.getHeaders(), + dispatcher: ctx.getDispatcher(), + signal, + }); if (response.status === 200) { - const body = response.body!; + const body = Readable.fromWeb(response.body! as any); body.on('error', doneCallOnce); body.on('close', () => doneCallOnce(null)); diff --git a/src/watch_test.ts b/src/watch_test.ts index 7be5c1f6c0..1106bd4a36 100644 --- a/src/watch_test.ts +++ b/src/watch_test.ts @@ -1,11 +1,10 @@ import { describe, it } from 'node:test'; import { deepStrictEqual, rejects, strictEqual } from 'node:assert'; -import nock from 'nock'; -import { PassThrough } from 'node:stream'; +import { MockAgent, setGlobalDispatcher, getGlobalDispatcher } from 'undici'; import { KubeConfig } from './config.js'; import { Cluster, Context, User } from './config_types.js'; import { Watch } from './watch.js'; -import { ServerResponse, createServer } from 'node:http'; +import { IncomingMessage, ServerResponse, createServer } from 'node:http'; import { AddressInfo } from 'node:net'; const server = 'https://foo.company.com'; @@ -34,9 +33,11 @@ const fakeConfig: { ], }; -const systemUnderTest = (): [nock.Scope] => { - const scope = nock(fakeConfig.clusters[0].server); - return [scope]; +const systemUnderTest = (): [MockAgent] => { + const mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); + return [mockAgent]; }; describe('Watch', () => { @@ -46,11 +47,12 @@ describe('Watch', () => { }); it('should handle error from request stream', async () => { + const originalDispatcher = getGlobalDispatcher(); + const [mockAgent] = systemUnderTest(); const kc = new KubeConfig(); const path = '/some/path/to/object?watch=true'; - - const [scope] = systemUnderTest(); - const s = scope.get(path).reply(500, 'Error: some error'); + const pool = mockAgent.get(fakeConfig.clusters[0].server); + pool.intercept({ path, method: 'GET' }).reply(500, 'Error: some error'); Object.assign(kc, fakeConfig); const watch = new Watch(kc); @@ -58,18 +60,23 @@ describe('Watch', () => { let doneCalled = false; let doneErr: any; - await watch.watch( - path, - {}, - (phase: string, obj: string) => {}, - (err: any) => { - doneCalled = true; - doneErr = err; - }, - ); - strictEqual(doneCalled, true); - strictEqual(doneErr.toString(), 'Error: Internal Server Error'); - s.done(); + try { + await watch.watch( + path, + {}, + (_phase: string, _obj: string) => {}, + (err: any) => { + doneCalled = true; + doneErr = err; + }, + ); + strictEqual(doneCalled, true); + strictEqual(doneErr.toString(), 'Error: Internal Server Error'); + mockAgent.assertNoPendingInterceptors(); + } finally { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + } }); it('should not call watch done callback more than once', async (t) => { @@ -90,7 +97,7 @@ describe('Watch', () => { const path = '/some/path/to/object'; let response: ServerResponse | undefined; - const kc = await setupMockSystem(t, (req, res) => { + const kc = await setupMockSystem(t, (_req, res) => { response = res; res.write(JSON.stringify(obj1) + '\n'); res.write(JSON.stringify(obj2) + '\n'); @@ -140,7 +147,8 @@ describe('Watch', () => { response!.destroy(); await donePromise; strictEqual(doneCalled, 1); - strictEqual(doneErr.code, 'ERR_STREAM_PREMATURE_CLOSE'); + strictEqual(doneErr?.name, 'TypeError'); + strictEqual(doneErr?.message, 'terminated'); }); it('should not call the done callback more than once on unexpected connection loss', async (t) => { @@ -150,7 +158,7 @@ describe('Watch', () => { // // This reproduces a bug where AbortController.abort() inside // doneCallOnce could cause done() to be invoked twice. - const kc = await setupMockSystem(t, (req, res) => { + const kc = await setupMockSystem(t, (_req, res) => { res.writeHead(200, { 'Content-Type': 'application/json', 'Transfer-Encoding': 'chunked', @@ -181,91 +189,6 @@ describe('Watch', () => { strictEqual(doneCalled, 1); }); - it('should call setKeepAlive on the socket to extend the default of 5 mins', async (t) => { - let response: ServerResponse | undefined; - const kc = await setupMockSystem(t, (req, res) => { - response = res; - res.write(JSON.stringify(obj1) + '\n'); - }); - const mockSocket = { - setKeepAlive: function (enable: boolean, timeout: number) { - this.keepAliveEnabled = enable; - this.keepAliveTimeout = timeout; - }, - keepAliveEnabled: false, - keepAliveTimeout: 0, - }; - - (kc as any).applyToFetchOptions = async () => { - return { - agent: { - sockets: { - 'mock-url': [mockSocket], - }, - }, - }; - }; - const watch = new Watch(kc); - - const obj1 = { - type: 'ADDED', - object: { - foo: 'bar', - }, - }; - - const path = '/some/path/to/object'; - const receivedTypes: string[] = []; - const receivedObjects: string[] = []; - let doneCalled = 0; - let doneErr: any; - - let handledAllObjectsResolve: any; - const handledAllObjectsPromise = new Promise((resolve) => { - handledAllObjectsResolve = resolve; - }); - - let doneResolve: any; - const donePromise = new Promise((resolve) => { - doneResolve = resolve; - }); - - await watch.watch( - path, - { - a: 'b', - }, - (phase: string, obj: string) => { - receivedTypes.push(phase); - receivedObjects.push(obj); - if (receivedObjects.length) { - handledAllObjectsResolve(); - } - }, - (err: any) => { - doneCalled += 1; - doneErr = err; - doneResolve(); - }, - ); - - await handledAllObjectsPromise; - - deepStrictEqual(receivedTypes, [obj1.type]); - deepStrictEqual(receivedObjects, [obj1.object]); - - strictEqual(doneCalled, 0); - strictEqual(mockSocket.keepAliveEnabled, true); - strictEqual(mockSocket.keepAliveTimeout, 30000); - - response!.destroy(); - - await donePromise; - - strictEqual(doneCalled, 1); - strictEqual(doneErr.code, 'ERR_STREAM_PREMATURE_CLOSE'); - }); - it('should handle server errors correctly', async (t) => { const obj1 = { type: 'ADDED', @@ -275,7 +198,7 @@ describe('Watch', () => { }; const path = '/some/path/to/object?watch=true'; let response: ServerResponse | undefined; - const kc = await setupMockSystem(t, (req, res) => { + const kc = await setupMockSystem(t, (_req, res) => { response = res; res.write(JSON.stringify(obj1) + '\n'); }); @@ -324,14 +247,11 @@ describe('Watch', () => { await donePromise; strictEqual(doneErr.length, 1); - strictEqual(doneErr[0].code, 'ERR_STREAM_PREMATURE_CLOSE'); + strictEqual(doneErr[0]?.name, 'TypeError'); + strictEqual(doneErr[0]?.message, 'terminated'); }); - it('should handle server side close correctly', async () => { - const kc = new KubeConfig(); - Object.assign(kc, fakeConfig); - const watch = new Watch(kc); - + it('should handle server side close correctly', async (t) => { const obj1 = { type: 'ADDED', object: { @@ -339,17 +259,14 @@ describe('Watch', () => { }, }; - const stream = new PassThrough(); - - const [scope] = systemUnderTest(); + const kc = await setupMockSystem(t, (_req, res) => { + res.write(JSON.stringify(obj1) + '\n'); + res.end(); + }); + const watch = new Watch(kc); const path = '/some/path/to/object?watch=true'; - const s = scope.get(path).reply(200, function (): PassThrough { - stream.push(JSON.stringify(obj1) + '\n'); - return stream; - }); - const receivedTypes: string[] = []; const receivedObjects: string[] = []; const doneErr: any[] = []; @@ -386,23 +303,13 @@ describe('Watch', () => { deepStrictEqual(receivedObjects, [obj1.object]); strictEqual(doneErr.length, 0); - stream.end(); - await donePromise; strictEqual(doneErr.length, 1); strictEqual(doneErr[0], null); - - s.done(); - - stream.destroy(); }); - it('should ignore JSON parse errors', async () => { - const kc = new KubeConfig(); - Object.assign(kc, fakeConfig); - const watch = new Watch(kc); - + it('should ignore JSON parse errors', async (t) => { const obj = { type: 'MODIFIED', object: { @@ -410,18 +317,15 @@ describe('Watch', () => { }, }; - const stream = new PassThrough(); - - const [scope] = systemUnderTest(); + const kc = await setupMockSystem(t, (_req, res) => { + res.write(JSON.stringify(obj) + '\n'); + res.write('{"truncated json\n'); + res.end(); + }); + const watch = new Watch(kc); const path = '/some/path/to/object?watch=true'; - const s = scope.get(path).reply(200, () => { - stream.push(JSON.stringify(obj) + '\n'); - stream.write('{"truncated json\n'); - return stream; - }); - const receivedTypes: string[] = []; const receivedObjects: string[] = []; @@ -442,14 +346,10 @@ describe('Watch', () => { }, ); - stream.end(); - await donePromise; deepStrictEqual(receivedTypes, [obj.type]); deepStrictEqual(receivedObjects, [obj.object]); - - s.done(); }); it('should timeout when server takes too long to respond', async (t) => { @@ -482,7 +382,7 @@ describe('Watch', () => { await donePromise; - strictEqual(doneErr.name, 'AbortError'); + strictEqual(doneErr.name, 'TimeoutError'); }); it('should throw on empty config', async () => { @@ -499,15 +399,24 @@ describe('Watch', () => { }); }); -async function setupMockSystem(ctx, handler) { +type TestContextWithAfter = { + after: (fn: () => void) => void; +}; + +async function setupMockSystem( + ctx: TestContextWithAfter | undefined, + handler: (req: IncomingMessage, res: ServerResponse) => void, +) { const server = createServer(handler); - ctx.after(() => { - try { - server.close(); - } catch { - // Ignore errors during server close. - } - }); + if (ctx?.after) { + ctx.after(() => { + try { + server.close(); + } catch { + // Ignore errors during server close. + } + }); + } const port = await new Promise((resolve) => { server.listen(0, () => { resolve((server.address() as AddressInfo).port); From 300c014c902efda2a50845cf4dd1f41f48d33369 Mon Sep 17 00:00:00 2001 From: David Gamero Date: Wed, 25 Mar 2026 23:10:37 -0400 Subject: [PATCH 2/5] refactor: remove dead TLS code in applySecurityAuthentication, drop nock dependency, migrate cache_test to MockAgent --- package-lock.json | 97 ----------------------------------------------- package.json | 1 - src/cache_test.ts | 79 ++++++++++++++++++++------------------ src/config.ts | 8 ---- 4 files changed, 42 insertions(+), 143 deletions(-) diff --git a/package-lock.json b/package-lock.json index 17f2adae6e..a6e2855e5d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -34,7 +34,6 @@ "eslint-plugin-erasable-syntax-only": "^0.4.0", "husky": "^9.0.6", "mock-fs": "^5.2.0", - "nock": "^14.0.5", "prettier": "^3.0.0", "pretty-quick": "^4.0.0", "ts-mockito": "^2.3.1", @@ -766,49 +765,6 @@ "jsep": "^0.4.0||^1.0.0" } }, - "node_modules/@mswjs/interceptors": { - "version": "0.41.2", - "resolved": "https://registry.npmjs.org/@mswjs/interceptors/-/interceptors-0.41.2.tgz", - "integrity": "sha512-7G0Uf0yK3f2bjElBLGHIQzgRgMESczOMyYVasq1XK8P5HaXtlW4eQhz9MBL+TQILZLaruq+ClGId+hH0w4jvWw==", - "dev": true, - "license": "MIT", - "dependencies": { - "@open-draft/deferred-promise": "^2.2.0", - "@open-draft/logger": "^0.3.0", - "@open-draft/until": "^2.0.0", - "is-node-process": "^1.2.0", - "outvariant": "^1.4.3", - "strict-event-emitter": "^0.5.1" - }, - "engines": { - "node": ">=18" - } - }, - "node_modules/@open-draft/deferred-promise": { - "version": "2.2.0", - "resolved": "https://registry.npmjs.org/@open-draft/deferred-promise/-/deferred-promise-2.2.0.tgz", - "integrity": "sha512-CecwLWx3rhxVQF6V4bAgPS5t+So2sTbPgAzafKkVizyi7tlwpcFpdFqq+wqF2OwNBmqFuu6tOyouTuxgpMfzmA==", - "dev": true, - "license": "MIT" - }, - "node_modules/@open-draft/logger": { - "version": "0.3.0", - "resolved": "https://registry.npmjs.org/@open-draft/logger/-/logger-0.3.0.tgz", - "integrity": "sha512-X2g45fzhxH238HKO4xbSr7+wBS8Fvw6ixhTDuvLd5mqh6bJJCFAPwU9mPDxbcrRtfxv4u5IHCEH77BmxvXmmxQ==", - "dev": true, - "license": "MIT", - "dependencies": { - "is-node-process": "^1.2.0", - "outvariant": "^1.4.0" - } - }, - "node_modules/@open-draft/until": { - "version": "2.1.0", - "resolved": "https://registry.npmjs.org/@open-draft/until/-/until-2.1.0.tgz", - "integrity": "sha512-U69T3ItWHvLwGg5eJ0n3I62nWuE6ilHlmz7zM0npLBRvPRd7e6NYmg54vvRtP5mZG7kZqZCFVdsTWo7BPtBujg==", - "dev": true, - "license": "MIT" - }, "node_modules/@pkgr/core": { "version": "0.2.7", "resolved": "https://registry.npmjs.org/@pkgr/core/-/core-0.2.7.tgz", @@ -2300,13 +2256,6 @@ "node": ">=0.10.0" } }, - "node_modules/is-node-process": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/is-node-process/-/is-node-process-1.2.0.tgz", - "integrity": "sha512-Vg4o6/fqPxIjtxgUH5QLJhwZ7gW5diGCVlXpuUfELC62CuxM1iHcRe51f2W1FDy04Ai4KJkagKjx3XaqyfRKXw==", - "dev": true, - "license": "MIT" - }, "node_modules/isexe": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/isexe/-/isexe-2.0.0.tgz", @@ -2419,13 +2368,6 @@ "dev": true, "license": "MIT" }, - "node_modules/json-stringify-safe": { - "version": "5.0.1", - "resolved": "https://registry.npmjs.org/json-stringify-safe/-/json-stringify-safe-5.0.1.tgz", - "integrity": "sha512-ZClg6AaYvamvYEE82d3Iyd3vSSIjQ+odgjaTzRuO3s7toCdFKczob2i0zCh7JE8kWn17yvAWhUVxvqGwUalsRA==", - "dev": true, - "license": "ISC" - }, "node_modules/jsonpath-plus": { "version": "10.4.0", "resolved": "https://registry.npmjs.org/jsonpath-plus/-/jsonpath-plus-10.4.0.tgz", @@ -2648,21 +2590,6 @@ "dev": true, "license": "MIT" }, - "node_modules/nock": { - "version": "14.0.11", - "resolved": "https://registry.npmjs.org/nock/-/nock-14.0.11.tgz", - "integrity": "sha512-u5xUnYE+UOOBA6SpELJheMCtj2Laqx15Vl70QxKo43Wz/6nMHXS7PrEioXLjXAwhmawdEMNImwKCcPhBJWbKVw==", - "dev": true, - "license": "MIT", - "dependencies": { - "@mswjs/interceptors": "^0.41.0", - "json-stringify-safe": "^5.0.1", - "propagate": "^2.0.0" - }, - "engines": { - "node": ">=18.20.0 <20 || >=20.12.1" - } - }, "node_modules/oauth4webapi": { "version": "3.8.4", "resolved": "https://registry.npmjs.org/oauth4webapi/-/oauth4webapi-3.8.4.tgz", @@ -2712,13 +2639,6 @@ "node": ">= 0.8.0" } }, - "node_modules/outvariant": { - "version": "1.4.3", - "resolved": "https://registry.npmjs.org/outvariant/-/outvariant-1.4.3.tgz", - "integrity": "sha512-+Sl2UErvtsoajRDKCE5/dBz4DIvHXQQnAxtQTF04OJxY0+DyZXSo5P5Bb7XYWOh81syohlYL24hbDwxedPUJCA==", - "dev": true, - "license": "MIT" - }, "node_modules/p-limit": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-3.1.0.tgz", @@ -2872,16 +2792,6 @@ "node": ">= 4" } }, - "node_modules/propagate": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/propagate/-/propagate-2.0.1.tgz", - "integrity": "sha512-vGrhOavPSTz4QVNuBNdcNXePNdNMaO1xj9yBeH1ScQPjk/rhg9sSlCXPhMkFuaNNW/syTvYqsnbIJxMBfRbbag==", - "dev": true, - "license": "MIT", - "engines": { - "node": ">= 8" - } - }, "node_modules/pump": { "version": "3.0.2", "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.2.tgz", @@ -3053,13 +2963,6 @@ "bare-events": "^2.2.0" } }, - "node_modules/strict-event-emitter": { - "version": "0.5.1", - "resolved": "https://registry.npmjs.org/strict-event-emitter/-/strict-event-emitter-0.5.1.tgz", - "integrity": "sha512-vMgjE/GGEPEFnhFub6pa4FmJBRBVOLpIII2hvCZ8Kzb7K0hlHo7mQv6xYrBvCL2LtAIBwFUK8wvuJgTVSQ5MFQ==", - "dev": true, - "license": "MIT" - }, "node_modules/supports-color": { "version": "7.2.0", "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz", diff --git a/package.json b/package.json index c9bcba5629..c25b9268e1 100644 --- a/package.json +++ b/package.json @@ -81,7 +81,6 @@ "eslint-plugin-erasable-syntax-only": "^0.4.0", "husky": "^9.0.6", "mock-fs": "^5.2.0", - "nock": "^14.0.5", "prettier": "^3.0.0", "pretty-quick": "^4.0.0", "ts-mockito": "^2.3.1", diff --git a/src/cache_test.ts b/src/cache_test.ts index 9746425149..177e29c2d5 100644 --- a/src/cache_test.ts +++ b/src/cache_test.ts @@ -8,7 +8,7 @@ import { KubeConfig } from './config.js'; import { Cluster, Context, User } from './config_types.js'; import { ListPromise } from './informer.js'; -import nock from 'nock'; +import { MockAgent, setGlobalDispatcher, getGlobalDispatcher } from 'undici'; import { Watch } from './watch.js'; const server = 'https://foo.company.com'; @@ -1474,51 +1474,56 @@ describe('ListWatchCache', () => { const informer = new ListWatch(path, watch, listFn, false, APP_LABEL_SELECTOR); - const scope = nock(fakeConfig.clusters[0].server); - const s = scope - .get(path) - .query({ - watch: true, - resourceVersion: '12345', - labelSelector: APP_LABEL_SELECTOR, - }) - .reply( + const originalDispatcher = getGlobalDispatcher(); + const mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); + + try { + const pool = mockAgent.get(fakeConfig.clusters[0].server); + pool.intercept({ + path: `${path}?watch=true&resourceVersion=12345&labelSelector=app%3Dfoo`, + method: 'GET', + }).reply( 200, - () => - JSON.stringify({ - type: 'ADDED', - object: { - metadata: { - name: 'name3', - labels: { - app: 'foo3', - }, - } as V1ObjectMeta, - }, - }) + '\n', + JSON.stringify({ + type: 'ADDED', + object: { + metadata: { + name: 'name3', + labels: { + app: 'foo3', + }, + } as V1ObjectMeta, + }, + }) + '\n', ); - await informer.start(); + await informer.start(); - let doneResolve: any; - const donePromise = new Promise((resolve) => { - doneResolve = resolve; - }); + let doneResolve: any; + const donePromise = new Promise((resolve) => { + doneResolve = resolve; + }); - informer.on('add', doneResolve); + informer.on('add', doneResolve); - const value = await donePromise; + const value = await donePromise; - deepStrictEqual(value, { - metadata: { - labels: { - app: 'foo3', + deepStrictEqual(value, { + metadata: { + labels: { + app: 'foo3', + }, + name: 'name3', }, - name: 'name3', - }, - }); + }); - s.done(); + mockAgent.assertNoPendingInterceptors(); + } finally { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + } }); }); diff --git a/src/config.ts b/src/config.ts index 2145ebc662..a2ab4b6bee 100644 --- a/src/config.ts +++ b/src/config.ts @@ -284,14 +284,6 @@ export class KubeConfig implements SecurityAuthentication { await this.applyOptions(httpsOptions); - if (cluster && cluster.skipTLSVerify) { - agentOptions.rejectUnauthorized = false; - } - - if (cluster && cluster.tlsServerName) { - agentOptions.servername = cluster.tlsServerName; - } - if (user && user.username) { const auth = Buffer.from(`${user.username}:${user.password}`).toString('base64'); context.setHeaderParam('Authorization', `Basic ${auth}`); From 2ed869f71724736adc10510308a5611499dab2b0 Mon Sep 17 00:00:00 2001 From: David Gamero Date: Thu, 26 Mar 2026 00:48:00 -0400 Subject: [PATCH 3/5] refactor(tests): deduplicate MockAgent setup into beforeEach/afterEach hooks --- src/cache_test.ts | 78 +-- src/health_test.ts | 117 ++--- src/integration_test.ts | 31 +- src/log_test.ts | 78 ++- src/metrics_test.ts | 264 +++++----- src/object_test.ts | 1067 ++++++++++++++++----------------------- src/top_test.ts | 149 +++--- src/watch_test.ts | 57 ++- 8 files changed, 768 insertions(+), 1073 deletions(-) diff --git a/src/cache_test.ts b/src/cache_test.ts index 177e29c2d5..9e25234b96 100644 --- a/src/cache_test.ts +++ b/src/cache_test.ts @@ -1432,7 +1432,7 @@ describe('ListWatchCache', () => { strictEqual(listCalls, 2); }); - it('should send label selector', async () => { + it('should send label selector', async (t) => { const APP_LABEL_SELECTOR = 'app=foo'; const list: V1Namespace[] = [ @@ -1479,51 +1479,51 @@ describe('ListWatchCache', () => { setGlobalDispatcher(mockAgent); mockAgent.disableNetConnect(); - try { - const pool = mockAgent.get(fakeConfig.clusters[0].server); - pool.intercept({ - path: `${path}?watch=true&resourceVersion=12345&labelSelector=app%3Dfoo`, - method: 'GET', - }).reply( - 200, - JSON.stringify({ - type: 'ADDED', - object: { - metadata: { - name: 'name3', - labels: { - app: 'foo3', - }, - } as V1ObjectMeta, - }, - }) + '\n', - ); + t.after(async () => { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + }); - await informer.start(); + const pool = mockAgent.get(fakeConfig.clusters[0].server); + pool.intercept({ + path: `${path}?watch=true&resourceVersion=12345&labelSelector=app%3Dfoo`, + method: 'GET', + }).reply( + 200, + JSON.stringify({ + type: 'ADDED', + object: { + metadata: { + name: 'name3', + labels: { + app: 'foo3', + }, + } as V1ObjectMeta, + }, + }) + '\n', + ); - let doneResolve: any; - const donePromise = new Promise((resolve) => { - doneResolve = resolve; - }); + await informer.start(); - informer.on('add', doneResolve); + let doneResolve: any; + const donePromise = new Promise((resolve) => { + doneResolve = resolve; + }); - const value = await donePromise; + informer.on('add', doneResolve); - deepStrictEqual(value, { - metadata: { - labels: { - app: 'foo3', - }, - name: 'name3', + const value = await donePromise; + + deepStrictEqual(value, { + metadata: { + labels: { + app: 'foo3', }, - }); + name: 'name3', + }, + }); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + mockAgent.assertNoPendingInterceptors(); }); }); diff --git a/src/health_test.ts b/src/health_test.ts index 034165be8c..c0c4e89fb4 100644 --- a/src/health_test.ts +++ b/src/health_test.ts @@ -1,6 +1,6 @@ -import { describe, it } from 'node:test'; +import { afterEach, beforeEach, describe, it } from 'node:test'; import { rejects, strictEqual } from 'node:assert'; -import { MockAgent, getGlobalDispatcher, setGlobalDispatcher } from 'undici'; +import { MockAgent, getGlobalDispatcher, setGlobalDispatcher, type Dispatcher } from 'undici'; import { KubeConfig } from './config.js'; import { Health } from './health.js'; @@ -19,27 +19,28 @@ describe('Health', () => { }, ].forEach((test) => { describe(test.path, () => { - it('should throw an error if no current active cluster', async () => { - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); + let mockAgent: MockAgent; + let originalDispatcher: Dispatcher; + + beforeEach(() => { + originalDispatcher = getGlobalDispatcher(); + mockAgent = new MockAgent(); setGlobalDispatcher(mockAgent); mockAgent.disableNetConnect(); + }); + + afterEach(async () => { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + }); + + it('should throw an error if no current active cluster', async () => { const kc = new KubeConfig(); const health = new Health(kc); - - try { - await rejects(test.method(health, {}), { message: 'No currently active cluster' }); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + await rejects(test.method(health, {}), { message: 'No currently active cluster' }); }); it(`should return true if ${test.path} returns with status 200`, async () => { - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const kc = new KubeConfig(); const cluster = { name: 'foo', @@ -56,21 +57,12 @@ describe('Health', () => { pool.intercept({ path: test.path, method: 'GET' }).reply(200); const health = new Health(kc); - try { - const r = await test.method(health, {}); - strictEqual(r, true); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + const r = await test.method(health, {}); + strictEqual(r, true); + mockAgent.assertNoPendingInterceptors(); }); it(`should return false if ${test.path} returns with status 500`, async () => { - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const kc = new KubeConfig(); const cluster = { name: 'foo', @@ -87,21 +79,12 @@ describe('Health', () => { pool.intercept({ path: test.path, method: 'GET' }).reply(500); const health = new Health(kc); - try { - const r = await test.method(health, {}); - strictEqual(r, false); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + const r = await test.method(health, {}); + strictEqual(r, false); + mockAgent.assertNoPendingInterceptors(); }); it(`should return true if ${test.path} returns status 404 and /healthz returns status 200`, async () => { - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const kc = new KubeConfig(); const cluster = { name: 'foo', @@ -119,21 +102,12 @@ describe('Health', () => { pool.intercept({ path: '/healthz', method: 'GET' }).reply(200); const health = new Health(kc); - try { - const r = await test.method(health, {}); - strictEqual(r, true); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + const r = await test.method(health, {}); + strictEqual(r, true); + mockAgent.assertNoPendingInterceptors(); }); it(`should return false if ${test.path} returns status 404 and /healthz returns status 500`, async () => { - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const kc = new KubeConfig(); const cluster = { name: 'foo', @@ -151,21 +125,12 @@ describe('Health', () => { pool.intercept({ path: '/healthz', method: 'GET' }).reply(500); const health = new Health(kc); - try { - const r = await test.method(health, {}); - strictEqual(r, false); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + const r = await test.method(health, {}); + strictEqual(r, false); + mockAgent.assertNoPendingInterceptors(); }); it(`should return true if both ${test.path} and /healthz return status 404`, async () => { - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const kc = new KubeConfig(); const cluster = { name: 'foo', @@ -183,21 +148,12 @@ describe('Health', () => { pool.intercept({ path: '/healthz', method: 'GET' }).reply(404); const health = new Health(kc); - try { - const r = await test.method(health, {}); - strictEqual(r, true); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + const r = await test.method(health, {}); + strictEqual(r, true); + mockAgent.assertNoPendingInterceptors(); }); it('should throw an error when fetch throws an error', async () => { - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const kc = new KubeConfig(); const cluster = { name: 'foo', @@ -214,13 +170,8 @@ describe('Health', () => { pool.intercept({ path: test.path, method: 'GET' }).replyWithError(new Error('an error')); const health = new Health(kc); - try { - await rejects(test.method(health, {}), { message: 'Error occurred in health request' }); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + await rejects(test.method(health, {}), { message: 'Error occurred in health request' }); + mockAgent.assertNoPendingInterceptors(); }); }); }); diff --git a/src/integration_test.ts b/src/integration_test.ts index beb46a7ad4..1a8b074dcf 100644 --- a/src/integration_test.ts +++ b/src/integration_test.ts @@ -1,6 +1,6 @@ -import { describe, it } from 'node:test'; +import { afterEach, beforeEach, describe, it } from 'node:test'; import { deepEqual } from 'node:assert'; -import { MockAgent, setGlobalDispatcher, getGlobalDispatcher } from 'undici'; +import { MockAgent, setGlobalDispatcher, getGlobalDispatcher, type Dispatcher } from 'undici'; import { CoreV1Api } from './api.js'; import { KubeConfig } from './config.js'; @@ -8,7 +8,22 @@ import { Cluster, User } from './config_types.js'; describe('FullRequest', () => { describe('getPods', () => { - it('should get pods successfully', async (t) => { + let mockAgent: MockAgent; + let originalDispatcher: Dispatcher; + + beforeEach(() => { + originalDispatcher = getGlobalDispatcher(); + mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); + }); + + afterEach(async () => { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + }); + + it('should get pods successfully', async () => { const kc = new KubeConfig(); const cluster = { name: 'foo', @@ -32,16 +47,6 @@ describe('FullRequest', () => { }; const auth = Buffer.from(`${username}:${password}`).toString('base64'); - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); - - t.after(async () => { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - }); - const pool = mockAgent.get('https://nowhere.foo'); pool.intercept({ path: '/api/v1/namespaces/default/pods', diff --git a/src/log_test.ts b/src/log_test.ts index c988b25bad..4210e5ab3a 100644 --- a/src/log_test.ts +++ b/src/log_test.ts @@ -1,6 +1,6 @@ -import { describe, it } from 'node:test'; +import { afterEach, beforeEach, describe, it } from 'node:test'; import { strictEqual, rejects, throws } from 'node:assert'; -import { MockAgent, getGlobalDispatcher, setGlobalDispatcher } from 'undici'; +import { MockAgent, getGlobalDispatcher, setGlobalDispatcher, type Dispatcher } from 'undici'; import { AddOptionsToSearchParams, Log, LogOptions } from './log.js'; import { KubeConfig } from './config.js'; import { Writable } from 'node:stream'; @@ -35,14 +35,22 @@ describe('Log', () => { config.setCurrentContext('foo'); const log = new Log(config); - const setupMockAgent = () => { - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); + let mockAgent: MockAgent; + let originalDispatcher: Dispatcher; + let pool: ReturnType; + + beforeEach(() => { + originalDispatcher = getGlobalDispatcher(); + mockAgent = new MockAgent(); setGlobalDispatcher(mockAgent); mockAgent.disableNetConnect(); - const pool = mockAgent.get('https://example.com'); - return { originalDispatcher, mockAgent, pool }; - }; + pool = mockAgent.get('https://example.com'); + }); + + afterEach(async () => { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + }); it('should make a request with correct parameters', async () => { const namespace = 'default'; @@ -63,20 +71,14 @@ describe('Log', () => { timestamps: true, }; - const { originalDispatcher, mockAgent, pool } = setupMockAgent(); pool.intercept({ method: 'GET', path: '/api/v1/namespaces/default/pods/mypod/log?container=mycontainer&follow=true&limitBytes=100&pretty=true&previous=true&sinceSeconds=1&tailLines=1×tamps=true', }).reply(200, 'log data'); - try { - const controller = await log.log(namespace, podName, containerName, stream, options); - strictEqual(controller instanceof AbortController, true); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + const controller = await log.log(namespace, podName, containerName, stream, options); + strictEqual(controller instanceof AbortController, true); + mockAgent.assertNoPendingInterceptors(); }); it('should throw an error if no active cluster', async () => { @@ -106,7 +108,6 @@ describe('Log', () => { }, }); - const { originalDispatcher, mockAgent, pool } = setupMockAgent(); pool.intercept({ method: 'GET', path: '/api/v1/namespaces/default/pods/mypod/log?container=mycontainer', @@ -114,15 +115,10 @@ describe('Log', () => { headers: { 'content-type': 'application/json' }, }); - try { - await rejects(async () => { - await log.log(namespace, podName, containerName, stream); - }, /Error occurred in log request/); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + await rejects(async () => { + await log.log(namespace, podName, containerName, stream); + }, /Error occurred in log request/); + mockAgent.assertNoPendingInterceptors(); }); it('should handle API exceptions on 500', async () => { @@ -135,7 +131,6 @@ describe('Log', () => { }, }); - const { originalDispatcher, mockAgent, pool } = setupMockAgent(); pool.intercept({ method: 'GET', path: '/api/v1/namespaces/default/pods/mypod/log?container=mycontainer', @@ -143,15 +138,10 @@ describe('Log', () => { headers: { 'content-type': 'application/json' }, }); - try { - await rejects(async () => { - await log.log(namespace, podName, containerName, stream); - }, /Error occurred in log request/); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + await rejects(async () => { + await log.log(namespace, podName, containerName, stream); + }, /Error occurred in log request/); + mockAgent.assertNoPendingInterceptors(); }); it('should handle V1Status with code and message', async () => { @@ -178,21 +168,15 @@ describe('Log', () => { code: 404, }; - const { originalDispatcher, mockAgent, pool } = setupMockAgent(); pool.intercept({ method: 'GET', path: '/api/v1/namespaces/default/pods/mypod/log?container=mycontainer', }).reply(500, JSON.stringify(v1Status), { headers: { 'content-type': 'application/json' } }); - try { - await rejects(async () => { - await log.log(namespace, podName, containerName, stream); - }, /Pod not found/); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + await rejects(async () => { + await log.log(namespace, podName, containerName, stream); + }, /Pod not found/); + mockAgent.assertNoPendingInterceptors(); }); }); describe('AddOptionsToSearchParams', () => { diff --git a/src/metrics_test.ts b/src/metrics_test.ts index b05138307e..4f4e3529ab 100644 --- a/src/metrics_test.ts +++ b/src/metrics_test.ts @@ -1,6 +1,6 @@ -import { describe, it } from 'node:test'; +import { beforeEach, afterEach, describe, it } from 'node:test'; import { deepStrictEqual, ok, match, rejects, strictEqual } from 'node:assert'; -import { MockAgent, setGlobalDispatcher, getGlobalDispatcher } from 'undici'; +import { MockAgent, setGlobalDispatcher, getGlobalDispatcher, type Dispatcher } from 'undici'; import { KubeConfig } from './config.js'; import { V1Status, ApiException } from './gen/index.js'; import { Metrics, NodeMetricsList, PodMetricsList } from './metrics.js'; @@ -81,59 +81,58 @@ const testConfigOptions: any = { currentContext: 'currentContext', }; -const systemUnderTest = (options: any = testConfigOptions): [Metrics, MockAgent] => { +const systemUnderTest = (options: any = testConfigOptions, mockAgent: MockAgent): Metrics => { const kc = new KubeConfig(); kc.loadFromOptions(options); const metricsClient = new Metrics(kc); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); - - return [metricsClient, mockAgent]; + return metricsClient; }; describe('Metrics', () => { + const realDispatcher = getGlobalDispatcher(); describe('getPodMetrics', () => { + let mockAgent: MockAgent; + let originalDispatcher: Dispatcher; + + beforeEach(() => { + originalDispatcher = getGlobalDispatcher(); + mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); + }); + + afterEach(async () => { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + }); + it('should return cluster scope empty pods list', async () => { - const originalDispatcher = getGlobalDispatcher(); - const [metricsClient, mockAgent] = systemUnderTest(); + const metricsClient = systemUnderTest(testConfigOptions, mockAgent); const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/apis/metrics.k8s.io/v1beta1/pods', method: 'GET' }).reply( 200, JSON.stringify(emptyPodMetrics), { headers: { 'content-type': 'application/json' } }, ); - try { - const response = await metricsClient.getPodMetrics(); - deepStrictEqual(response, emptyPodMetrics); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + const response = await metricsClient.getPodMetrics(); + deepStrictEqual(response, emptyPodMetrics); + mockAgent.assertNoPendingInterceptors(); }); it('should return cluster scope empty pods list when namespace is empty string', async () => { - const originalDispatcher = getGlobalDispatcher(); - const [metricsClient, mockAgent] = systemUnderTest(); + const metricsClient = systemUnderTest(testConfigOptions, mockAgent); const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/apis/metrics.k8s.io/v1beta1/pods', method: 'GET' }).reply( 200, JSON.stringify(emptyPodMetrics), { headers: { 'content-type': 'application/json' } }, ); - try { - const response = await metricsClient.getPodMetrics(''); - deepStrictEqual(response, emptyPodMetrics); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + const response = await metricsClient.getPodMetrics(''); + deepStrictEqual(response, emptyPodMetrics); + mockAgent.assertNoPendingInterceptors(); }); it('should return namespace scope empty pods list', async () => { - const originalDispatcher = getGlobalDispatcher(); - const [metricsClient, mockAgent] = systemUnderTest(); + const metricsClient = systemUnderTest(testConfigOptions, mockAgent); const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: `/apis/metrics.k8s.io/v1beta1/namespaces/${TEST_NAMESPACE}/pods`, @@ -141,36 +140,24 @@ describe('Metrics', () => { }).reply(200, JSON.stringify(emptyPodMetrics), { headers: { 'content-type': 'application/json' }, }); - try { - const response = await metricsClient.getPodMetrics(TEST_NAMESPACE); - deepStrictEqual(response, emptyPodMetrics); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + const response = await metricsClient.getPodMetrics(TEST_NAMESPACE); + deepStrictEqual(response, emptyPodMetrics); + mockAgent.assertNoPendingInterceptors(); }); it('should return cluster scope pods metrics list', async () => { - const originalDispatcher = getGlobalDispatcher(); - const [metricsClient, mockAgent] = systemUnderTest(); + const metricsClient = systemUnderTest(testConfigOptions, mockAgent); const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/apis/metrics.k8s.io/v1beta1/pods', method: 'GET' }).reply( 200, JSON.stringify(mockedPodMetrics), { headers: { 'content-type': 'application/json' } }, ); - try { - const response = await metricsClient.getPodMetrics(); - deepStrictEqual(response, mockedPodMetrics); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + const response = await metricsClient.getPodMetrics(); + deepStrictEqual(response, mockedPodMetrics); + mockAgent.assertNoPendingInterceptors(); }); it('should return namespace scope pods metric list', async () => { - const originalDispatcher = getGlobalDispatcher(); - const [metricsClient, mockAgent] = systemUnderTest(); + const metricsClient = systemUnderTest(testConfigOptions, mockAgent); const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: `/apis/metrics.k8s.io/v1beta1/namespaces/${TEST_NAMESPACE}/pods`, @@ -178,160 +165,143 @@ describe('Metrics', () => { }).reply(200, JSON.stringify(mockedPodMetrics), { headers: { 'content-type': 'application/json' }, }); - try { - const response = await metricsClient.getPodMetrics(TEST_NAMESPACE); - deepStrictEqual(response, mockedPodMetrics); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + const response = await metricsClient.getPodMetrics(TEST_NAMESPACE); + deepStrictEqual(response, mockedPodMetrics); + mockAgent.assertNoPendingInterceptors(); }); it('should when connection refused', async () => { - const kc = new KubeConfig(); - kc.loadFromOptions({ - clusters: [{ name: 'cluster', server: 'https://127.0.0.1:51011' }], - users: [{ name: 'user', password: 'password' }], - contexts: [{ name: 'currentContext', cluster: 'cluster', user: 'user' }], - currentContext: 'currentContext', - }); - const metricsClient = new Metrics(kc); - await rejects(metricsClient.getPodMetrics(), (err) => { - ok(err instanceof ApiException); - match(err.message, /connect ECONNREFUSED 127.0.0.1:51011/); - return true; - }); - }); - it('should throw when no current cluster', async () => { - const originalDispatcher = getGlobalDispatcher(); - const [metricsClient, mockAgent] = systemUnderTest({ - clusters: [{ name: 'cluster', server: 'https://127.0.0.1:51010' }], - users: [{ name: 'user', password: 'password' }], - contexts: [{ name: 'currentContext', cluster: 'cluster', user: 'user' }], - }); + const originalMockDispatcher = getGlobalDispatcher(); try { - await rejects(metricsClient.getPodMetrics(), { - name: 'Error', - message: 'No currently active cluster', + setGlobalDispatcher(realDispatcher); + const kc = new KubeConfig(); + kc.loadFromOptions({ + clusters: [{ name: 'cluster', server: 'https://127.0.0.1:51011' }], + users: [{ name: 'user', password: 'password' }], + contexts: [{ name: 'currentContext', cluster: 'cluster', user: 'user' }], + currentContext: 'currentContext', + }); + const metricsClient = new Metrics(kc); + await rejects(metricsClient.getPodMetrics(), (err) => { + ok(err instanceof ApiException); + match(err.message, /connect ECONNREFUSED 127.0.0.1:51011/); + return true; }); - mockAgent.assertNoPendingInterceptors(); } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); + setGlobalDispatcher(originalMockDispatcher); } }); + it('should throw when no current cluster', async () => { + const metricsClient = systemUnderTest( + { + clusters: [{ name: 'cluster', server: 'https://127.0.0.1:51010' }], + users: [{ name: 'user', password: 'password' }], + contexts: [{ name: 'currentContext', cluster: 'cluster', user: 'user' }], + }, + mockAgent, + ); + await rejects(metricsClient.getPodMetrics(), { + name: 'Error', + message: 'No currently active cluster', + }); + mockAgent.assertNoPendingInterceptors(); + }); it('should resolve to error when 500 - V1 Status', async () => { const response: V1Status = { code: 12345, message: 'some message', }; - const originalDispatcher = getGlobalDispatcher(); - const [metricsClient, mockAgent] = systemUnderTest(); + const metricsClient = systemUnderTest(testConfigOptions, mockAgent); const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/apis/metrics.k8s.io/v1beta1/pods', method: 'GET' }).reply( 500, JSON.stringify(response), { headers: { 'content-type': 'application/json' } }, ); - try { - await rejects(metricsClient.getPodMetrics(), (e) => { - ok(e instanceof ApiException); - strictEqual(e.code, response.code); - strictEqual(e.body.message, response.message); - return true; - }); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + await rejects(metricsClient.getPodMetrics(), (e) => { + ok(e instanceof ApiException); + strictEqual(e.code, response.code); + strictEqual(e.body.message, response.message); + return true; + }); + mockAgent.assertNoPendingInterceptors(); }); it('should resolve to error when 500 - non-V1Status', async () => { const response = 'some other response'; - const originalDispatcher = getGlobalDispatcher(); - const [metricsClient, mockAgent] = systemUnderTest(); + const metricsClient = systemUnderTest(testConfigOptions, mockAgent); const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/apis/metrics.k8s.io/v1beta1/pods', method: 'GET' }).reply( 500, JSON.stringify(response), { headers: { 'content-type': 'application/json' } }, ); - try { - await rejects(metricsClient.getPodMetrics(), (e) => { - ok(e instanceof ApiException); - strictEqual(e.code, 500); - match(e.message, /Error occurred in metrics request/); - return true; - }); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + await rejects(metricsClient.getPodMetrics(), (e) => { + ok(e instanceof ApiException); + strictEqual(e.code, 500); + match(e.message, /Error occurred in metrics request/); + return true; + }); + mockAgent.assertNoPendingInterceptors(); }); }); describe('getNodeMetrics', () => { + let mockAgent: MockAgent; + let originalDispatcher: Dispatcher; + + beforeEach(() => { + originalDispatcher = getGlobalDispatcher(); + mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); + }); + + afterEach(async () => { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + }); + it('should return empty nodes list', async () => { - const originalDispatcher = getGlobalDispatcher(); - const [metricsClient, mockAgent] = systemUnderTest(); + const metricsClient = systemUnderTest(testConfigOptions, mockAgent); const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/apis/metrics.k8s.io/v1beta1/nodes', method: 'GET' }).reply( 200, JSON.stringify(emptyNodeMetrics), { headers: { 'content-type': 'application/json' } }, ); - try { - const response = await metricsClient.getNodeMetrics(); - deepStrictEqual(response, emptyNodeMetrics); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + const response = await metricsClient.getNodeMetrics(); + deepStrictEqual(response, emptyNodeMetrics); + mockAgent.assertNoPendingInterceptors(); }); it('should return nodes metrics list', async () => { - const originalDispatcher = getGlobalDispatcher(); - const [metricsClient, mockAgent] = systemUnderTest(); + const metricsClient = systemUnderTest(testConfigOptions, mockAgent); const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/apis/metrics.k8s.io/v1beta1/nodes', method: 'GET' }).reply( 200, JSON.stringify(mockedNodeMetrics), { headers: { 'content-type': 'application/json' } }, ); - try { - const response = await metricsClient.getNodeMetrics(); - deepStrictEqual(response, mockedNodeMetrics); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + const response = await metricsClient.getNodeMetrics(); + deepStrictEqual(response, mockedNodeMetrics); + mockAgent.assertNoPendingInterceptors(); }); it('should resolve to error when 500', async () => { const response: V1Status = { code: 12345, message: 'some message', }; - const originalDispatcher = getGlobalDispatcher(); - const [metricsClient, mockAgent] = systemUnderTest(); + const metricsClient = systemUnderTest(testConfigOptions, mockAgent); const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/apis/metrics.k8s.io/v1beta1/nodes', method: 'GET' }).reply( 500, JSON.stringify(response), { headers: { 'content-type': 'application/json' } }, ); - try { - await rejects(metricsClient.getNodeMetrics(), (e) => { - ok(e instanceof ApiException); - strictEqual(e.code, response.code); - strictEqual(e.body.message, response.message); - return true; - }); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + await rejects(metricsClient.getNodeMetrics(), (e) => { + ok(e instanceof ApiException); + strictEqual(e.code, response.code); + strictEqual(e.body.message, response.message); + return true; + }); + mockAgent.assertNoPendingInterceptors(); }); }); }); diff --git a/src/object_test.ts b/src/object_test.ts index c34d1f8c0f..927e6ad965 100644 --- a/src/object_test.ts +++ b/src/object_test.ts @@ -1,6 +1,6 @@ -import { before, describe, it } from 'node:test'; +import { afterEach, beforeEach, before, describe, it } from 'node:test'; import { deepStrictEqual, ok, rejects, strictEqual } from 'node:assert'; -import { MockAgent, setGlobalDispatcher, getGlobalDispatcher } from 'undici'; +import { MockAgent, setGlobalDispatcher, getGlobalDispatcher, type Dispatcher } from 'undici'; import { Configuration, V1APIResource, V1APIResourceList, V1Secret } from './api.js'; import { KubeConfig } from './config.js'; import { KubernetesObjectApi } from './object.js'; @@ -475,6 +475,21 @@ describe('KubernetesObject', () => { }; describe('specUriPath', () => { + let mockAgent: MockAgent; + let originalDispatcher: Dispatcher; + + beforeEach(() => { + originalDispatcher = getGlobalDispatcher(); + mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); + }); + + afterEach(async () => { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + }); + it('should return a namespaced path', async () => { const c = KubernetesObjectApiTest.makeApiClient(); const o = { @@ -485,22 +500,13 @@ describe('KubernetesObject', () => { namespace: 'fugazi', }, }; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { - headers: { 'content-type': 'application/json' }, - }); - const r = await c.specUriPath(o, 'patch'); - strictEqual(r, '/api/v1/namespaces/fugazi/services/repeater'); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { + headers: { 'content-type': 'application/json' }, + }); + const r = await c.specUriPath(o, 'patch'); + strictEqual(r, '/api/v1/namespaces/fugazi/services/repeater'); + mockAgent.assertNoPendingInterceptors(); }); it('should default to apiVersion v1', async () => { @@ -512,22 +518,13 @@ describe('KubernetesObject', () => { namespace: 'fugazi', }, }; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { - headers: { 'content-type': 'application/json' }, - }); - const r = await c.specUriPath(o, 'patch'); - strictEqual(r, '/api/v1/namespaces/fugazi/serviceaccounts/repeater'); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { + headers: { 'content-type': 'application/json' }, + }); + const r = await c.specUriPath(o, 'patch'); + strictEqual(r, '/api/v1/namespaces/fugazi/serviceaccounts/repeater'); + mockAgent.assertNoPendingInterceptors(); }); it('should default to context namespace', async () => { @@ -546,22 +543,13 @@ describe('KubernetesObject', () => { name: 'repeater', }, }; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { - headers: { 'content-type': 'application/json' }, - }); - const r = await c.specUriPath(o, 'patch'); - strictEqual(r, '/api/v1/namespaces/straight-edge/pods/repeater'); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { + headers: { 'content-type': 'application/json' }, + }); + const r = await c.specUriPath(o, 'patch'); + strictEqual(r, '/api/v1/namespaces/straight-edge/pods/repeater'); + mockAgent.assertNoPendingInterceptors(); }); it('should default to default namespace', async () => { @@ -580,22 +568,13 @@ describe('KubernetesObject', () => { name: 'repeater', }, }; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { - headers: { 'content-type': 'application/json' }, - }); - const r = await c.specUriPath(o, 'patch'); - strictEqual(r, '/api/v1/namespaces/default/pods/repeater'); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { + headers: { 'content-type': 'application/json' }, + }); + const r = await c.specUriPath(o, 'patch'); + strictEqual(r, '/api/v1/namespaces/default/pods/repeater'); + mockAgent.assertNoPendingInterceptors(); }); it('should return a non-namespaced path', async () => { @@ -607,22 +586,13 @@ describe('KubernetesObject', () => { name: 'repeater', }, }; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { - headers: { 'content-type': 'application/json' }, - }); - const r = await c.specUriPath(o, 'delete'); - strictEqual(r, '/api/v1/namespaces/repeater'); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { + headers: { 'content-type': 'application/json' }, + }); + const r = await c.specUriPath(o, 'delete'); + strictEqual(r, '/api/v1/namespaces/repeater'); + mockAgent.assertNoPendingInterceptors(); }); it('should return a namespaced path without name', async () => { @@ -634,22 +604,13 @@ describe('KubernetesObject', () => { namespace: 'fugazi', }, }; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { - headers: { 'content-type': 'application/json' }, - }); - const r = await c.specUriPath(o, 'create'); - strictEqual(r, '/api/v1/namespaces/fugazi/services'); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { + headers: { 'content-type': 'application/json' }, + }); + const r = await c.specUriPath(o, 'create'); + strictEqual(r, '/api/v1/namespaces/fugazi/services'); + mockAgent.assertNoPendingInterceptors(); }); it('should return a non-namespaced path without name', async () => { @@ -661,22 +622,13 @@ describe('KubernetesObject', () => { name: 'repeater', }, }; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { - headers: { 'content-type': 'application/json' }, - }); - const r = await c.specUriPath(o, 'create'); - strictEqual(r, '/api/v1/namespaces'); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { + headers: { 'content-type': 'application/json' }, + }); + const r = await c.specUriPath(o, 'create'); + strictEqual(r, '/api/v1/namespaces'); + mockAgent.assertNoPendingInterceptors(); }); it('should return a namespaced path for non-core resource', async () => { @@ -689,22 +641,13 @@ describe('KubernetesObject', () => { namespace: 'fugazi', }, }; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/apis/apps/v1', method: 'GET' }).reply(200, resourceBodies.apps, { - headers: { 'content-type': 'application/json' }, - }); - const r = await c.specUriPath(o, 'read'); - strictEqual(r, '/apis/apps/v1/namespaces/fugazi/deployments/repeater'); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + pool.intercept({ path: '/apis/apps/v1', method: 'GET' }).reply(200, resourceBodies.apps, { + headers: { 'content-type': 'application/json' }, + }); + const r = await c.specUriPath(o, 'read'); + strictEqual(r, '/apis/apps/v1/namespaces/fugazi/deployments/repeater'); + mockAgent.assertNoPendingInterceptors(); }); it('should return a non-namespaced path for non-core resource', async () => { @@ -716,24 +659,15 @@ describe('KubernetesObject', () => { name: 'repeater', }, }; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/apis/rbac.authorization.k8s.io/v1', method: 'GET' }).reply( - 200, - resourceBodies.rbac, - { headers: { 'content-type': 'application/json' } }, - ); - const r = await c.specUriPath(o, 'read'); - strictEqual(r, '/apis/rbac.authorization.k8s.io/v1/clusterroles/repeater'); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + pool.intercept({ path: '/apis/rbac.authorization.k8s.io/v1', method: 'GET' }).reply( + 200, + resourceBodies.rbac, + { headers: { 'content-type': 'application/json' } }, + ); + const r = await c.specUriPath(o, 'read'); + strictEqual(r, '/apis/rbac.authorization.k8s.io/v1/clusterroles/repeater'); + mockAgent.assertNoPendingInterceptors(); }); it('should handle a variety of resources', async () => { @@ -835,38 +769,29 @@ describe('KubernetesObject', () => { e: '/apis/storage.k8s.io/v1/storageclasses/repeater', }, ]; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - for (const k of a) { - pool.intercept({ path: k.p, method: 'GET' }).reply(200, k.b, { - headers: { 'content-type': 'application/json' }, - }); - } - for (const k of a) { - const c = KubernetesObjectApiTest.makeApiClient(); - const o: KubernetesObject = { - apiVersion: k.apiVersion, - kind: k.kind, - metadata: { - name: 'repeater', - }, - }; - if (k.ns) { - o.metadata = o.metadata || {}; - o.metadata.namespace = 'fugazi'; - } - const r = await c.specUriPath(o, 'patch'); - strictEqual(r, k.e); + for (const k of a) { + pool.intercept({ path: k.p, method: 'GET' }).reply(200, k.b, { + headers: { 'content-type': 'application/json' }, + }); + } + for (const k of a) { + const c = KubernetesObjectApiTest.makeApiClient(); + const o: KubernetesObject = { + apiVersion: k.apiVersion, + kind: k.kind, + metadata: { + name: 'repeater', + }, + }; + if (k.ns) { + o.metadata = o.metadata || {}; + o.metadata.namespace = 'fugazi'; } - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); + const r = await c.specUriPath(o, 'patch'); + strictEqual(r, k.e); } + mockAgent.assertNoPendingInterceptors(); }); it('should handle a variety of resources without names', async () => { @@ -968,34 +893,25 @@ describe('KubernetesObject', () => { e: '/apis/storage.k8s.io/v1/storageclasses', }, ]; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - for (const k of a) { - pool.intercept({ path: k.p, method: 'GET' }).reply(200, k.b, { - headers: { 'content-type': 'application/json' }, - }); - } - for (const k of a) { - const c = KubernetesObjectApiTest.makeApiClient(); - const o: KubernetesObject = { - apiVersion: k.apiVersion, - kind: k.kind, - }; - if (k.ns) { - o.metadata = { namespace: 'fugazi' }; - } - const r = await c.specUriPath(o, 'create'); - strictEqual(r, k.e); + for (const k of a) { + pool.intercept({ path: k.p, method: 'GET' }).reply(200, k.b, { + headers: { 'content-type': 'application/json' }, + }); + } + for (const k of a) { + const c = KubernetesObjectApiTest.makeApiClient(); + const o: KubernetesObject = { + apiVersion: k.apiVersion, + kind: k.kind, + }; + if (k.ns) { + o.metadata = { namespace: 'fugazi' }; } - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); + const r = await c.specUriPath(o, 'create'); + strictEqual(r, k.e); } + mockAgent.assertNoPendingInterceptors(); }); it('should throw an error if kind missing', async () => { @@ -1022,24 +938,15 @@ describe('KubernetesObject', () => { namespace: 'fugazi', }, }; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { - headers: { 'content-type': 'application/json' }, - }); - await rejects(c.specUriPath(o, 'read'), { - name: 'Error', - message: 'Required spec property name is not set', - }); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { + headers: { 'content-type': 'application/json' }, + }); + await rejects(c.specUriPath(o, 'read'), { + name: 'Error', + message: 'Required spec property name is not set', + }); + mockAgent.assertNoPendingInterceptors(); }); it('should throw an error if resource is not valid', async () => { @@ -1052,33 +959,39 @@ describe('KubernetesObject', () => { namespace: 'fugazi', }, }; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { - headers: { 'content-type': 'application/json' }, - }); - await rejects(c.specUriPath(o, 'create'), { - name: 'Error', - message: 'Unrecognized API version and kind: v1 Ingress', - }); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { + headers: { 'content-type': 'application/json' }, + }); + await rejects(c.specUriPath(o, 'create'), { + name: 'Error', + message: 'Unrecognized API version and kind: v1 Ingress', + }); + mockAgent.assertNoPendingInterceptors(); }); }); describe('resource', () => { let client: KubernetesObjectApiTest; + let mockAgent: MockAgent; + let originalDispatcher: Dispatcher; + before(function () { client = KubernetesObjectApiTest.makeApiClient(); }); + beforeEach(() => { + originalDispatcher = getGlobalDispatcher(); + mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); + }); + + afterEach(async () => { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + }); + it('should throw an error if apiVersion not set', async () => { for (const a of [null, undefined]) { await rejects(client.resource(a as unknown as string, 'Service'), { @@ -1113,71 +1026,53 @@ describe('KubernetesObject', () => { }, }); - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { - headers: { 'content-type': 'application/json' }, - }); - await c.resource('v1', 'Service'); - strictEqual(preMiddlewareCalled, true); - strictEqual(postMiddlewareCalled, true); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { + headers: { 'content-type': 'application/json' }, + }); + await c.resource('v1', 'Service'); + strictEqual(preMiddlewareCalled, true); + strictEqual(postMiddlewareCalled, true); + mockAgent.assertNoPendingInterceptors(); }); it('should cache API response', async () => { const c = KubernetesObjectApiTest.makeApiClient(); - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { - headers: { 'content-type': 'application/json' }, - }); - const s = await c.resource('v1', 'Service'); - if (!s) { - throw new Error('old TypeScript compiler'); - } - strictEqual(s.kind, 'Service'); - strictEqual(s.name, 'services'); - strictEqual(s.namespaced, true); - ok(c.apiVersionResourceCache); - ok(c.apiVersionResourceCache.v1); - const sa = await c.resource('v1', 'ServiceAccount'); - if (!sa) { - throw new Error('old TypeScript compiler'); - } - strictEqual(sa.kind, 'ServiceAccount'); - strictEqual(sa.name, 'serviceaccounts'); - strictEqual(sa.namespaced, true); - const p = await c.resource('v1', 'Pod'); - if (!p) { - throw new Error('old TypeScript compiler'); - } - strictEqual(p.kind, 'Pod'); - strictEqual(p.name, 'pods'); - strictEqual(p.namespaced, true); - const pv = await c.resource('v1', 'PersistentVolume'); - if (!pv) { - throw new Error('old TypeScript compiler'); - } - strictEqual(pv.kind, 'PersistentVolume'); - strictEqual(pv.name, 'persistentvolumes'); - strictEqual(pv.namespaced, false); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); + pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { + headers: { 'content-type': 'application/json' }, + }); + const s = await c.resource('v1', 'Service'); + if (!s) { + throw new Error('old TypeScript compiler'); + } + strictEqual(s.kind, 'Service'); + strictEqual(s.name, 'services'); + strictEqual(s.namespaced, true); + ok(c.apiVersionResourceCache); + ok(c.apiVersionResourceCache.v1); + const sa = await c.resource('v1', 'ServiceAccount'); + if (!sa) { + throw new Error('old TypeScript compiler'); + } + strictEqual(sa.kind, 'ServiceAccount'); + strictEqual(sa.name, 'serviceaccounts'); + strictEqual(sa.namespaced, true); + const p = await c.resource('v1', 'Pod'); + if (!p) { + throw new Error('old TypeScript compiler'); } + strictEqual(p.kind, 'Pod'); + strictEqual(p.name, 'pods'); + strictEqual(p.namespaced, true); + const pv = await c.resource('v1', 'PersistentVolume'); + if (!pv) { + throw new Error('old TypeScript compiler'); + } + strictEqual(pv.kind, 'PersistentVolume'); + strictEqual(pv.name, 'persistentvolumes'); + strictEqual(pv.namespaced, false); + mockAgent.assertNoPendingInterceptors(); }); it('should re-request on cache miss', async () => { @@ -1198,38 +1093,32 @@ describe('KubernetesObject', () => { }, ], } as any; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { - headers: { 'content-type': 'application/json' }, - }); - const s = await c.resource('v1', 'Service'); - if (!s) { - throw new Error('old TypeScript compiler'); - } - strictEqual(s.kind, 'Service'); - strictEqual(s.name, 'services'); - strictEqual(s.namespaced, true); - ok(c.apiVersionResourceCache); - ok(c.apiVersionResourceCache.v1); - strictEqual( - c.apiVersionResourceCache.v1.resources.length, - JSON.parse(resourceBodies.core).resources.length, - ); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); + pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { + headers: { 'content-type': 'application/json' }, + }); + const s = await c.resource('v1', 'Service'); + if (!s) { + throw new Error('old TypeScript compiler'); } + strictEqual(s.kind, 'Service'); + strictEqual(s.name, 'services'); + strictEqual(s.namespaced, true); + ok(c.apiVersionResourceCache); + ok(c.apiVersionResourceCache.v1); + strictEqual( + c.apiVersionResourceCache.v1.resources.length, + JSON.parse(resourceBodies.core).resources.length, + ); + mockAgent.assertNoPendingInterceptors(); }); }); describe('verbs', () => { let client: KubernetesObjectApi; + let mockAgent: MockAgent; + let originalDispatcher: Dispatcher; + before(() => { const kc = new KubeConfig(); kc.loadFromOptions(testConfigOptions); @@ -1240,6 +1129,18 @@ describe('KubernetesObject', () => { ); }); + beforeEach(() => { + originalDispatcher = getGlobalDispatcher(); + mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); + }); + + afterEach(async () => { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + }); + it('should modify resources with defaults', async () => { const s = { apiVersion: 'v1', @@ -1388,27 +1289,17 @@ describe('KubernetesObject', () => { }`, }, ]; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - for (const m of methods) { - pool.intercept({ path: m.p, method: m.v }).reply(m.c, m.b, { - headers: { 'content-type': 'application/json' }, - }); - } - for (const m of methods) { - // TODO: Figure out why Typescript barfs if we do m.call - const hack_m = m.m as any; - await hack_m.call(client, s); - } - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); + for (const m of methods) { + pool.intercept({ path: m.p, method: m.v }).reply(m.c, m.b, { + headers: { 'content-type': 'application/json' }, + }); + } + for (const m of methods) { + const hack_m = m.m as any; + await hack_m.call(client, s); } + mockAgent.assertNoPendingInterceptors(); }); it('should modify resources with pretty set', async () => { @@ -1559,31 +1450,21 @@ describe('KubernetesObject', () => { }`, }, ]; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - for (const p of ['true', 'false']) { - for (const m of methods) { - pool.intercept({ path: `${m.p}?pretty=${p}`, method: m.v }).reply(m.c, m.b, { - headers: { 'content-type': 'application/json' }, - }); - } + for (const p of ['true', 'false']) { + for (const m of methods) { + pool.intercept({ path: `${m.p}?pretty=${p}`, method: m.v }).reply(m.c, m.b, { + headers: { 'content-type': 'application/json' }, + }); } - for (const p of ['true', 'false']) { - for (const m of methods) { - // TODO: Figure out why Typescript barfs if we do m.call - const hack_m = m.m as any; - await hack_m.call(client, s, p); - } + } + for (const p of ['true', 'false']) { + for (const m of methods) { + const hack_m = m.m as any; + await hack_m.call(client, s, p); } - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); } + mockAgent.assertNoPendingInterceptors(); }); it('should set dryRun', async () => { @@ -1698,27 +1579,17 @@ describe('KubernetesObject', () => { }`, }, ]; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - for (const m of methods) { - pool.intercept({ path: `${m.p}?dryRun=All`, method: m.v }).reply(m.c, m.b, { - headers: { 'content-type': 'application/json' }, - }); - } - for (const m of methods) { - // TODO: Figure out why Typescript barfs if we do m.call - const hack_m = m.m as any; - await hack_m.call(client, s, undefined, 'All'); - } - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); + for (const m of methods) { + pool.intercept({ path: `${m.p}?dryRun=All`, method: m.v }).reply(m.c, m.b, { + headers: { 'content-type': 'application/json' }, + }); } + for (const m of methods) { + const hack_m = m.m as any; + await hack_m.call(client, s, undefined, 'All'); + } + mockAgent.assertNoPendingInterceptors(); }); it('should properly serialize resources on modify', async () => { @@ -1803,28 +1674,17 @@ describe('KubernetesObject', () => { b: returnBody, }, ]; - const originalDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - for (const m of methods) { - // Use regex to verify the body contains 'from' (not '_from' - testing TypeScript→JSON serialization) - pool.intercept({ path: m.p, method: m.v, body: /"from":/ }).reply(m.c, m.b, { - headers: { 'content-type': 'application/json' }, - }); - } - for (const m of methods) { - // TODO: Figure out why Typescript barfs if we do m.call - const hack_m = m.m as any; - await hack_m.call(client, netPol); - } - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); + for (const m of methods) { + pool.intercept({ path: m.p, method: m.v, body: /"from":/ }).reply(m.c, m.b, { + headers: { 'content-type': 'application/json' }, + }); + } + for (const m of methods) { + const hack_m = m.m as any; + await hack_m.call(client, netPol); } + mockAgent.assertNoPendingInterceptors(); }); it('should replace a resource', async () => { @@ -1851,18 +1711,13 @@ describe('KubernetesObject', () => { }, }, }; - const origDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ - path: '/api/v1/namespaces/default/services?fieldManager=ManageField', - method: 'POST', - }).reply( - 201, - `{ + pool.intercept({ + path: '/api/v1/namespaces/default/services?fieldManager=ManageField', + method: 'POST', + }).reply( + 201, + `{ "kind": "Service", "apiVersion": "v1", "metadata": { @@ -1895,14 +1750,14 @@ describe('KubernetesObject', () => { "loadBalancer": {} } }`, - { headers: { 'content-type': 'application/json' } }, - ); - pool.intercept({ - path: '/api/v1/namespaces/default/services/k8s-js-client-test?pretty=true', - method: 'PUT', - }).reply( - 200, - `{ + { headers: { 'content-type': 'application/json' } }, + ); + pool.intercept({ + path: '/api/v1/namespaces/default/services/k8s-js-client-test?pretty=true', + method: 'PUT', + }).reply( + 200, + `{ "kind": "Service", "apiVersion": "v1", "metadata": { @@ -1936,14 +1791,14 @@ describe('KubernetesObject', () => { "loadBalancer": {} } }`, - { headers: { 'content-type': 'application/json' } }, - ); - pool.intercept({ - path: '/api/v1/namespaces/default/services/k8s-js-client-test?gracePeriodSeconds=7&propagationPolicy=Foreground', - method: 'DELETE', - }).reply( - 200, - `{ + { headers: { 'content-type': 'application/json' } }, + ); + pool.intercept({ + path: '/api/v1/namespaces/default/services/k8s-js-client-test?gracePeriodSeconds=7&propagationPolicy=Foreground', + method: 'DELETE', + }).reply( + 200, + `{ "apiVersion": "v1", "details": { "kind": "services", @@ -1954,64 +1809,51 @@ describe('KubernetesObject', () => { "metadata": {}, "status": "Success" }`, - { headers: { 'content-type': 'application/json' } }, - ); + { headers: { 'content-type': 'application/json' } }, + ); - const c = await client.create(s, undefined, undefined, 'ManageField'); - (c.metadata.annotations as Record).test = '1'; - const r = await client.replace(c, 'true'); - strictEqual((r.metadata.annotations as Record).test, '1'); - ok( - parseInt((r.metadata as any).resourceVersion, 10) > - parseInt((c.metadata as any).resourceVersion, 10), - ); - await client.delete(s, undefined, undefined, 7, undefined, 'Foreground'); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(origDispatcher); - } + const c = await client.create(s, undefined, undefined, 'ManageField'); + (c.metadata.annotations as Record).test = '1'; + const r = await client.replace(c, 'true'); + strictEqual((r.metadata.annotations as Record).test, '1'); + ok( + parseInt((r.metadata as any).resourceVersion, 10) > + parseInt((c.metadata as any).resourceVersion, 10), + ); + await client.delete(s, undefined, undefined, 7, undefined, 'Foreground'); + mockAgent.assertNoPendingInterceptors(); }); it('should read a resource', async () => { - const origDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ - path: '/api/v1/namespaces/default/secrets/test-secret-1', - method: 'GET', - }).reply( - 200, - JSON.stringify({ - apiVersion: 'v1', - kind: 'Secret', - metadata: { - name: 'test-secret-1', - namespace: 'default', - uid: 'a4fd7a65-2af5-4ef1-a0bc-cb34a308b821', - creationTimestamp: '2022-01-01T00:00:00.000Z', - }, - data: { key: 'value' }, - }), - { headers: { 'content-type': 'application/json' } }, - ); - const secret = await client.read({ + pool.intercept({ + path: '/api/v1/namespaces/default/secrets/test-secret-1', + method: 'GET', + }).reply( + 200, + JSON.stringify({ apiVersion: 'v1', kind: 'Secret', - metadata: { name: 'test-secret-1', namespace: 'default' }, - }); - strictEqual(secret instanceof V1Secret, true); - deepStrictEqual(secret.data, { key: 'value' }); - ok(secret.metadata); - deepStrictEqual(secret.metadata!.creationTimestamp, new Date('2022-01-01T00:00:00.000Z')); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(origDispatcher); - } + metadata: { + name: 'test-secret-1', + namespace: 'default', + uid: 'a4fd7a65-2af5-4ef1-a0bc-cb34a308b821', + creationTimestamp: '2022-01-01T00:00:00.000Z', + }, + data: { key: 'value' }, + }), + { headers: { 'content-type': 'application/json' } }, + ); + const secret = await client.read({ + apiVersion: 'v1', + kind: 'Secret', + metadata: { name: 'test-secret-1', namespace: 'default' }, + }); + strictEqual(secret instanceof V1Secret, true); + deepStrictEqual(secret.data, { key: 'value' }); + ok(secret.metadata); + deepStrictEqual(secret.metadata!.creationTimestamp, new Date('2022-01-01T00:00:00.000Z')); + mockAgent.assertNoPendingInterceptors(); }); it('should read a custom resource', async () => { @@ -2031,137 +1873,125 @@ describe('KubernetesObject', () => { }, ], }; - const origDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ - path: '/apis/example.com/v1/namespaces/default/customtestresources/test-1', - method: 'GET', - }).reply( - 200, - JSON.stringify({ - apiVersion: 'example.com/v1', - kind: 'CustomTestResource', - metadata: { - name: 'test-1', - namespace: 'default', - uid: 'a4fd7a65-2af5-4ef1-a0bc-cb34a308b821', - creationTimestamp: '2022-01-01T00:00:00.000Z', - }, - spec: { key: 'value' }, - }), - { headers: { 'content-type': 'application/json' } }, - ); - const custom = await client.read({ + pool.intercept({ + path: '/apis/example.com/v1/namespaces/default/customtestresources/test-1', + method: 'GET', + }).reply( + 200, + JSON.stringify({ apiVersion: 'example.com/v1', kind: 'CustomTestResource', - metadata: { name: 'test-1', namespace: 'default' }, - }); - deepStrictEqual(custom.spec, { key: 'value' }); - ok(custom.metadata); - deepStrictEqual(custom.metadata!.creationTimestamp, new Date('2022-01-01T00:00:00.000Z')); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(origDispatcher); - } + metadata: { + name: 'test-1', + namespace: 'default', + uid: 'a4fd7a65-2af5-4ef1-a0bc-cb34a308b821', + creationTimestamp: '2022-01-01T00:00:00.000Z', + }, + spec: { key: 'value' }, + }), + { headers: { 'content-type': 'application/json' } }, + ); + const custom = await client.read({ + apiVersion: 'example.com/v1', + kind: 'CustomTestResource', + metadata: { name: 'test-1', namespace: 'default' }, + }); + deepStrictEqual(custom.spec, { key: 'value' }); + ok(custom.metadata); + deepStrictEqual(custom.metadata!.creationTimestamp, new Date('2022-01-01T00:00:00.000Z')); + mockAgent.assertNoPendingInterceptors(); }); it('should list resources in a namespace', async () => { - const origDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/api/v1/namespaces/default/secrets', method: 'GET' }).reply( - 200, - JSON.stringify({ - apiVersion: 'v1', - kind: 'SecretList', - items: [ - { - apiVersion: 'v1', - kind: 'Secret', - metadata: { - name: 'test-secret-1', - uid: 'a4fd7a65-2af5-4ef1-a0bc-cb34a308b821', - }, + pool.intercept({ path: '/api/v1/namespaces/default/secrets', method: 'GET' }).reply( + 200, + JSON.stringify({ + apiVersion: 'v1', + kind: 'SecretList', + items: [ + { + apiVersion: 'v1', + kind: 'Secret', + metadata: { + name: 'test-secret-1', + uid: 'a4fd7a65-2af5-4ef1-a0bc-cb34a308b821', }, - ], - metadata: { resourceVersion: '216532459', continue: 'abc' }, - }), - { headers: { 'content-type': 'application/json' } }, - ); - const lr = await client.list('v1', 'Secret', 'default'); - const items = lr.items; - strictEqual(items.length, 1); - strictEqual(items[0] instanceof V1Secret, true); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(origDispatcher); - } + }, + ], + metadata: { resourceVersion: '216532459', continue: 'abc' }, + }), + { headers: { 'content-type': 'application/json' } }, + ); + const lr = await client.list('v1', 'Secret', 'default'); + const items = lr.items; + strictEqual(items.length, 1); + strictEqual(items[0] instanceof V1Secret, true); + mockAgent.assertNoPendingInterceptors(); }); it('should list resources in all namespaces', async () => { - const origDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ - path: '/api/v1/secrets?fieldSelector=metadata.name%3Dtest-secret1&labelSelector=app%3Dmy-app&limit=5', - method: 'GET', - }).reply( - 200, - JSON.stringify({ - apiVersion: 'v1', - kind: 'SecretList', - items: [ - { - apiVersion: 'v1', - kind: 'Secret', - metadata: { - name: 'test-secret-1', - uid: 'a4fd7a65-2af5-4ef1-a0bc-cb34a308b821', - }, + pool.intercept({ + path: '/api/v1/secrets?fieldSelector=metadata.name%3Dtest-secret1&labelSelector=app%3Dmy-app&limit=5', + method: 'GET', + }).reply( + 200, + JSON.stringify({ + apiVersion: 'v1', + kind: 'SecretList', + items: [ + { + apiVersion: 'v1', + kind: 'Secret', + metadata: { + name: 'test-secret-1', + uid: 'a4fd7a65-2af5-4ef1-a0bc-cb34a308b821', }, - ], - metadata: { resourceVersion: '216532459', continue: 'abc' }, - }), - { headers: { 'content-type': 'application/json' } }, - ); - const lr = await client.list( - 'v1', - 'Secret', - undefined, - undefined, - undefined, - undefined, - 'metadata.name=test-secret1', - 'app=my-app', - 5, - ); - const items = lr.items; - strictEqual(items.length, 1); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(origDispatcher); - } + }, + ], + metadata: { resourceVersion: '216532459', continue: 'abc' }, + }), + { headers: { 'content-type': 'application/json' } }, + ); + const lr = await client.list( + 'v1', + 'Secret', + undefined, + undefined, + undefined, + undefined, + 'metadata.name=test-secret1', + 'app=my-app', + 5, + ); + const items = lr.items; + strictEqual(items.length, 1); + mockAgent.assertNoPendingInterceptors(); }); describe('errors', () => { - let client: KubernetesObjectApi; + let errorClient: KubernetesObjectApi; + let errorMockAgent: MockAgent; + let errorOriginalDispatcher: Dispatcher; + before(() => { const kc = new KubeConfig(); kc.loadFromOptions(testConfigOptions); - client = KubernetesObjectApi.makeApiClient(kc); + errorClient = KubernetesObjectApi.makeApiClient(kc); + }); + + beforeEach(() => { + errorOriginalDispatcher = getGlobalDispatcher(); + errorMockAgent = new MockAgent(); + setGlobalDispatcher(errorMockAgent); + errorMockAgent.disableNetConnect(); + }); + + afterEach(async () => { + await errorMockAgent.close(); + setGlobalDispatcher(errorOriginalDispatcher); }); it('should throw error if no spec', async () => { @@ -2199,17 +2029,8 @@ describe('KubernetesObject', () => { }, }, }; - const origDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); - try { - await rejects(client.read(s)); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(origDispatcher); - } + await rejects(errorClient.read(s)); + errorMockAgent.assertNoPendingInterceptors(); }); it('should throw an error if name not valid', async () => { @@ -2233,18 +2054,13 @@ describe('KubernetesObject', () => { }, }, }; - const origDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); - const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { - headers: { 'content-type': 'application/json' }, - }); - pool.intercept({ path: '/api/v1/namespaces/default/services', method: 'POST' }).reply( - 422, - `{ + const pool = errorMockAgent.get('https://d.i.y'); + pool.intercept({ path: '/api/v1', method: 'GET' }).reply(200, resourceBodies.core, { + headers: { 'content-type': 'application/json' }, + }); + pool.intercept({ path: '/api/v1/namespaces/default/services', method: 'POST' }).reply( + 422, + `{ "kind": "Status", "apiVersion": "v1", "metadata": {}, @@ -2264,18 +2080,14 @@ describe('KubernetesObject', () => { }, "code": 422 }`, - { headers: { 'content-type': 'application/json' } }, - ); + { headers: { 'content-type': 'application/json' } }, + ); - await rejects(client.create(s), { - name: 'Error', - code: 422, - }); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(origDispatcher); - } + await rejects(errorClient.create(s), { + name: 'Error', + code: 422, + }); + errorMockAgent.assertNoPendingInterceptors(); }); it('should throw an error if apiVersion not valid', async () => { @@ -2312,25 +2124,16 @@ describe('KubernetesObject', () => { }, }, }; - const origDispatcher = getGlobalDispatcher(); - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); - const pool = mockAgent.get('https://d.i.y'); - try { - pool.intercept({ path: '/apis/applications/v1', method: 'GET' }).reply(404, '{}', { - headers: { 'content-type': 'application/json' }, - }); - await rejects(client.create(d), { - name: 'Error', - code: 404, - message: /Failed to fetch resource metadata for applications\/v1\/Deployment/, - }); - } finally { - mockAgent.assertNoPendingInterceptors(); - await mockAgent.close(); - setGlobalDispatcher(origDispatcher); - } + const pool = errorMockAgent.get('https://d.i.y'); + pool.intercept({ path: '/apis/applications/v1', method: 'GET' }).reply(404, '{}', { + headers: { 'content-type': 'application/json' }, + }); + await rejects(errorClient.create(d), { + name: 'Error', + code: 404, + message: /Failed to fetch resource metadata for applications\/v1\/Deployment/, + }); + errorMockAgent.assertNoPendingInterceptors(); }); it('should throw error if no apiVersion', async () => { diff --git a/src/top_test.ts b/src/top_test.ts index 70024f2f93..3ed23535ae 100644 --- a/src/top_test.ts +++ b/src/top_test.ts @@ -1,6 +1,6 @@ -import { describe, it } from 'node:test'; +import { afterEach, beforeEach, describe, it } from 'node:test'; import { deepEqual, deepStrictEqual, strictEqual } from 'assert'; -import { MockAgent, setGlobalDispatcher, getGlobalDispatcher } from 'undici'; +import { MockAgent, setGlobalDispatcher, getGlobalDispatcher, type Dispatcher } from 'undici'; import { KubeConfig } from './config.js'; import { Metrics, PodMetricsList } from './metrics.js'; import { CurrentResourceUsage, ResourceUsage, topNodes, topPods } from './top.js'; @@ -173,33 +173,31 @@ const testConfigOptions: any = { currentContext: 'currentContext', }; -const systemUnderTest = ( - namespace?: string, - options: any = testConfigOptions, -): [() => ReturnType, () => ReturnType, MockAgent] => { - const kc = new KubeConfig(); - kc.loadFromOptions(options); - const metricsClient = new Metrics(kc); - const core = kc.makeApiClient(CoreV1Api); - const topPodsFunc = () => topPods(core, metricsClient, namespace); - const topNodesFunc = () => topNodes(core); +describe('Top', () => { + describe('topPods', () => { + let mockAgent: MockAgent; + let originalDispatcher: Dispatcher; + let topPodsFunc: () => ReturnType; - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); + beforeEach(() => { + originalDispatcher = getGlobalDispatcher(); + mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); - return [topPodsFunc, topNodesFunc, mockAgent]; -}; + const kc = new KubeConfig(); + kc.loadFromOptions(testConfigOptions); + const metricsClient = new Metrics(kc); + const core = kc.makeApiClient(CoreV1Api); + topPodsFunc = () => topPods(core, metricsClient); + }); -describe('Top', () => { - describe('topPods', () => { - it('should return empty when no pods', async (t) => { - const originalDispatcher = getGlobalDispatcher(); - const [topPodsFunc, _, mockAgent] = systemUnderTest(); - t.after(async () => { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - }); + afterEach(async () => { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + }); + + it('should return empty when no pods', async () => { const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/apis/metrics.k8s.io/v1beta1/pods', method: 'GET' }).reply( 200, @@ -215,13 +213,12 @@ describe('Top', () => { deepStrictEqual(result, []); mockAgent.assertNoPendingInterceptors(); }); - it('should return use cluster scope when namespace empty string', async (t) => { - const originalDispatcher = getGlobalDispatcher(); - const [topPodsFunc, _, mockAgent] = systemUnderTest(''); - t.after(async () => { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - }); + it('should return use cluster scope when namespace empty string', async () => { + const kc = new KubeConfig(); + kc.loadFromOptions(testConfigOptions); + const metricsClient = new Metrics(kc); + const core = kc.makeApiClient(CoreV1Api); + const topPodsFuncEmpty = () => topPods(core, metricsClient, ''); const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/apis/metrics.k8s.io/v1beta1/pods', method: 'GET' }).reply( 200, @@ -233,17 +230,11 @@ describe('Top', () => { JSON.stringify({ items: [] }), { headers: { 'content-type': 'application/json' } }, ); - const result = await topPodsFunc(); + const result = await topPodsFuncEmpty(); deepStrictEqual(result, []); mockAgent.assertNoPendingInterceptors(); }); - it('should return cluster wide pod metrics', async (t) => { - const originalDispatcher = getGlobalDispatcher(); - const [topPodsFunc, _, mockAgent] = systemUnderTest(); - t.after(async () => { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - }); + it('should return cluster wide pod metrics', async () => { const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/apis/metrics.k8s.io/v1beta1/pods', method: 'GET' }).reply( 200, @@ -312,13 +303,7 @@ describe('Top', () => { ]); mockAgent.assertNoPendingInterceptors(); }); - it('should return best effort pod metrics', async (t) => { - const originalDispatcher = getGlobalDispatcher(); - const [topPodsFunc, _, mockAgent] = systemUnderTest(); - t.after(async () => { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - }); + it('should return best effort pod metrics', async () => { const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/apis/metrics.k8s.io/v1beta1/pods', method: 'GET' }).reply( 200, @@ -351,13 +336,7 @@ describe('Top', () => { ]); mockAgent.assertNoPendingInterceptors(); }); - it('should return 0 when pod metrics missing', async (t) => { - const originalDispatcher = getGlobalDispatcher(); - const [topPodsFunc, _, mockAgent] = systemUnderTest(); - t.after(async () => { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - }); + it('should return 0 when pod metrics missing', async () => { const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/apis/metrics.k8s.io/v1beta1/pods', method: 'GET' }).reply( 200, @@ -385,13 +364,7 @@ describe('Top', () => { deepStrictEqual(result[1].Containers, []); mockAgent.assertNoPendingInterceptors(); }); - it('should return empty array when pods missing', async (t) => { - const originalDispatcher = getGlobalDispatcher(); - const [topPodsFunc, _, mockAgent] = systemUnderTest(); - t.after(async () => { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - }); + it('should return empty array when pods missing', async () => { const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/apis/metrics.k8s.io/v1beta1/pods', method: 'GET' }).reply( 200, @@ -407,13 +380,12 @@ describe('Top', () => { strictEqual(result.length, 0); mockAgent.assertNoPendingInterceptors(); }); - it('should return namespace pod metrics', async (t) => { - const originalDispatcher = getGlobalDispatcher(); - const [topPodsFunc, _, mockAgent] = systemUnderTest(TEST_NAMESPACE); - t.after(async () => { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - }); + it('should return namespace pod metrics', async () => { + const kc = new KubeConfig(); + kc.loadFromOptions(testConfigOptions); + const metricsClient = new Metrics(kc); + const core = kc.makeApiClient(CoreV1Api); + const topPodsFuncNamespace = () => topPods(core, metricsClient, TEST_NAMESPACE); const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: `/apis/metrics.k8s.io/v1beta1/namespaces/${TEST_NAMESPACE}/pods`, @@ -426,7 +398,7 @@ describe('Top', () => { JSON.stringify({ items: podList }), { headers: { 'content-type': 'application/json' } }, ); - const result = await topPodsFunc(); + const result = await topPodsFuncNamespace(); strictEqual(result.length, 2); deepStrictEqual(result[0].CPU, new CurrentResourceUsage(0.05, 0.1, 0.1)); deepStrictEqual( @@ -485,13 +457,28 @@ describe('Top', () => { }); }); describe('topNodes', () => { - it('should return empty when no nodes', async (t) => { - const originalDispatcher = getGlobalDispatcher(); - const [_, topNodesFunc, mockAgent] = systemUnderTest(); - t.after(async () => { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - }); + let mockAgent: MockAgent; + let originalDispatcher: Dispatcher; + let topNodesFunc: () => ReturnType; + + beforeEach(() => { + originalDispatcher = getGlobalDispatcher(); + mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); + + const kc = new KubeConfig(); + kc.loadFromOptions(testConfigOptions); + const core = kc.makeApiClient(CoreV1Api); + topNodesFunc = () => topNodes(core); + }); + + afterEach(async () => { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + }); + + it('should return empty when no nodes', async () => { const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/api/v1/nodes', method: 'GET' }).reply( 200, @@ -503,13 +490,7 @@ describe('Top', () => { mockAgent.assertNoPendingInterceptors(); }); - it('should return cluster wide node metrics', async (t) => { - const originalDispatcher = getGlobalDispatcher(); - const [_, topNodesFunc, mockAgent] = systemUnderTest(); - t.after(async () => { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - }); + it('should return cluster wide node metrics', async () => { const pool = mockAgent.get(testConfigOptions.clusters[0].server); pool.intercept({ path: '/api/v1/pods', method: 'GET' }) .reply(200, JSON.stringify({ items: podList }), { diff --git a/src/watch_test.ts b/src/watch_test.ts index 1106bd4a36..4aad61a05d 100644 --- a/src/watch_test.ts +++ b/src/watch_test.ts @@ -1,6 +1,6 @@ -import { describe, it } from 'node:test'; +import { afterEach, beforeEach, describe, it } from 'node:test'; import { deepStrictEqual, rejects, strictEqual } from 'node:assert'; -import { MockAgent, setGlobalDispatcher, getGlobalDispatcher } from 'undici'; +import { MockAgent, setGlobalDispatcher, getGlobalDispatcher, type Dispatcher } from 'undici'; import { KubeConfig } from './config.js'; import { Cluster, Context, User } from './config_types.js'; import { Watch } from './watch.js'; @@ -33,22 +33,28 @@ const fakeConfig: { ], }; -const systemUnderTest = (): [MockAgent] => { - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - mockAgent.disableNetConnect(); - return [mockAgent]; -}; - describe('Watch', () => { + let mockAgent: MockAgent; + let originalDispatcher: Dispatcher; + + beforeEach(() => { + originalDispatcher = getGlobalDispatcher(); + mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + mockAgent.disableNetConnect(); + }); + + afterEach(async () => { + await mockAgent.close(); + setGlobalDispatcher(originalDispatcher); + }); + it('should construct correctly', () => { const kc = new KubeConfig(); new Watch(kc); }); it('should handle error from request stream', async () => { - const originalDispatcher = getGlobalDispatcher(); - const [mockAgent] = systemUnderTest(); const kc = new KubeConfig(); const path = '/some/path/to/object?watch=true'; const pool = mockAgent.get(fakeConfig.clusters[0].server); @@ -60,23 +66,18 @@ describe('Watch', () => { let doneCalled = false; let doneErr: any; - try { - await watch.watch( - path, - {}, - (_phase: string, _obj: string) => {}, - (err: any) => { - doneCalled = true; - doneErr = err; - }, - ); - strictEqual(doneCalled, true); - strictEqual(doneErr.toString(), 'Error: Internal Server Error'); - mockAgent.assertNoPendingInterceptors(); - } finally { - await mockAgent.close(); - setGlobalDispatcher(originalDispatcher); - } + await watch.watch( + path, + {}, + (_phase: string, _obj: string) => {}, + (err: any) => { + doneCalled = true; + doneErr = err; + }, + ); + strictEqual(doneCalled, true); + strictEqual(doneErr.toString(), 'Error: Internal Server Error'); + mockAgent.assertNoPendingInterceptors(); }); it('should not call watch done callback more than once', async (t) => { From d74afab1d5b041f83d1612db40447a58ed80f182 Mon Sep 17 00:00:00 2001 From: David Gamero Date: Thu, 26 Mar 2026 17:12:28 -0400 Subject: [PATCH 4/5] test: rename misnamed integration_test, add watch streaming integration test --- ...ntegration_test.ts => fullrequest_test.ts} | 0 src/test/integration/index.ts | 2 + src/test/integration/watchPods.ts | 92 +++++++++++++++++++ 3 files changed, 94 insertions(+) rename src/{integration_test.ts => fullrequest_test.ts} (100%) create mode 100644 src/test/integration/watchPods.ts diff --git a/src/integration_test.ts b/src/fullrequest_test.ts similarity index 100% rename from src/integration_test.ts rename to src/fullrequest_test.ts diff --git a/src/test/integration/index.ts b/src/test/integration/index.ts index 176f01bc3b..d3979a39fc 100644 --- a/src/test/integration/index.ts +++ b/src/test/integration/index.ts @@ -1,9 +1,11 @@ import patchNamespace from './patchNamespace.js'; import cpFromPod from './cpFromPod.js'; import portForwardIntegration from './portForward.js'; +import watchPods from './watchPods.js'; console.log('Integration testing'); await patchNamespace(); await cpFromPod(); await portForwardIntegration(); +await watchPods(); diff --git a/src/test/integration/watchPods.ts b/src/test/integration/watchPods.ts new file mode 100644 index 0000000000..6aaa18c8ee --- /dev/null +++ b/src/test/integration/watchPods.ts @@ -0,0 +1,92 @@ +import assert from 'node:assert'; +import { CoreV1Api, KubeConfig, V1Pod } from '../../index.js'; +import { Watch } from '../../watch.js'; +import { generateName } from './name.js'; + +export default async function watchPods() { + const kc = new KubeConfig(); + kc.loadFromDefault(); + + const coreV1Client = kc.makeApiClient(CoreV1Api); + const watch = new Watch(kc); + + const namespace = 'default'; + const labelKey = 'watch-test'; + const labelValue = generateName('run'); + + console.log(`\n=== Watch Integration Test (label=${labelValue}) ===`); + + const receivedEvents: { type: string; name: string }[] = []; + + let addResolve: () => void; + const addPromise = new Promise((resolve) => { + addResolve = resolve; + }); + + let deleteResolve: () => void; + const deletePromise = new Promise((resolve) => { + deleteResolve = resolve; + }); + + const controller = await watch.watch( + `/api/v1/namespaces/${namespace}/pods`, + { labelSelector: `${labelKey}=${labelValue}` }, + (phase: string, obj: V1Pod) => { + const name = obj.metadata?.name ?? 'unknown'; + console.log(`Watch event: ${phase} ${name}`); + receivedEvents.push({ type: phase, name }); + + if (phase === 'ADDED') addResolve(); + if (phase === 'DELETED') deleteResolve(); + }, + (err: any) => { + if (err && err.name !== 'AbortError') console.log('Watch done with error:', err); + }, + ); + + const podName = generateName('watch-test-pod'); + try { + console.log(`Creating pod ${podName}`); + const pod = new V1Pod(); + pod.metadata = { name: podName, labels: { [labelKey]: labelValue } }; + pod.spec = { + containers: [{ name: 'test', image: 'busybox', command: ['sleep', '3600'] }], + restartPolicy: 'Never', + }; + await coreV1Client.createNamespacedPod({ namespace, body: pod }); + + await Promise.race([ + addPromise, + new Promise((_, reject) => + setTimeout(() => reject(new Error('Timed out waiting for ADDED event')), 15000), + ), + ]); + + const addEvent = receivedEvents.find((e) => e.type === 'ADDED' && e.name === podName); + assert.ok(addEvent, 'Should have received ADDED event for pod'); + console.log('✓ Received ADDED event'); + + console.log(`Deleting pod ${podName}`); + await coreV1Client.deleteNamespacedPod({ name: podName, namespace }); + + await Promise.race([ + deletePromise, + new Promise((_, reject) => + setTimeout(() => reject(new Error('Timed out waiting for DELETED event')), 15000), + ), + ]); + + const deleteEvent = receivedEvents.find((e) => e.type === 'DELETED' && e.name === podName); + assert.ok(deleteEvent, 'Should have received DELETED event for pod'); + console.log('✓ Received DELETED event'); + } finally { + controller.abort(); + try { + await coreV1Client.deleteNamespacedPod({ name: podName, namespace }); + } catch { + // already deleted or never created + } + } + + console.log('Watch integration test passed!'); +} From 7a25b4f9b9a286fa0ad9ce93b183ab7f44cfa3c0 Mon Sep 17 00:00:00 2001 From: David Gamero Date: Fri, 27 Mar 2026 05:33:10 -0400 Subject: [PATCH 5/5] fix(informer): treat TimeoutError as reconnectable, normalize timeout errors, add exponential backoff + integration test --- src/cache.ts | 22 ++++ src/cache_test.ts | 127 +++++++++++++++++++++ src/test/integration/index.ts | 2 + src/test/integration/informerReconnect.ts | 129 ++++++++++++++++++++++ src/watch.ts | 6 +- 5 files changed, 285 insertions(+), 1 deletion(-) create mode 100644 src/test/integration/informerReconnect.ts diff --git a/src/cache.ts b/src/cache.ts index b8b1e89818..6d16114307 100644 --- a/src/cache.ts +++ b/src/cache.ts @@ -24,12 +24,19 @@ export interface ObjectCache { export type CacheMap = Map>; export class ListWatch implements ObjectCache, Informer { + private static readonly BASE_RECONNECT_DELAY_MS = 1000; + private static readonly MAX_RECONNECT_DELAY_MS = 30000; + private objects: CacheMap = new Map(); private resourceVersion: string; private readonly indexCache: { [key: string]: T[] } = {}; private readonly callbackCache: { [key: string]: (ObjectCallback | ErrorCallback)[] } = {}; private request: AbortController | undefined; private stopped: boolean = false; + private reconnectDelayMs: number = 0; + private hasConnected: boolean = false; + private delayFn: (ms: number) => Promise = (ms) => + new Promise((resolve) => setTimeout(resolve, ms)); private readonly path: string; private readonly watch: Watch; private readonly listFn: ListPromise; @@ -63,6 +70,8 @@ export class ListWatch implements ObjectCache, In public async start(): Promise { this.stopped = false; + this.reconnectDelayMs = 0; + this.hasConnected = false; await this.doneHandler(null); } @@ -151,6 +160,8 @@ export class ListWatch implements ObjectCache, In ((err as { statusCode?: number }).statusCode === 410 || (err as { code?: number }).code === 410) ) { this.resourceVersion = ''; + } else if (err && (err as { name?: string }).name === 'TimeoutError') { + // Watch client-side timeout — reconnect from last known resourceVersion } else if (err) { this.callbackCache[ERROR].forEach((elt: ErrorCallback) => elt(err)); return; @@ -186,6 +197,16 @@ export class ListWatch implements ObjectCache, In if (this.fieldSelector !== undefined) { queryParams.fieldSelector = ObjectSerializer.serialize(this.fieldSelector, 'string'); } + if (this.reconnectDelayMs > 0 && this.hasConnected) { + await this.delayFn(this.reconnectDelayMs); + } + if (this.hasConnected) { + this.reconnectDelayMs = Math.min( + this.reconnectDelayMs > 0 ? this.reconnectDelayMs * 2 : ListWatch.BASE_RECONNECT_DELAY_MS, + ListWatch.MAX_RECONNECT_DELAY_MS, + ); + } + this.hasConnected = true; this.request = await this.watch.watch( this.path, queryParams, @@ -236,6 +257,7 @@ export class ListWatch implements ObjectCache, In // nothing to do, here for documentation, mostly. break; } + this.reconnectDelayMs = 0; this.resourceVersion = obj.metadata ? obj.metadata!.resourceVersion || '' : ''; } } diff --git a/src/cache_test.ts b/src/cache_test.ts index 9e25234b96..0e1dc8040e 100644 --- a/src/cache_test.ts +++ b/src/cache_test.ts @@ -1525,6 +1525,133 @@ describe('ListWatchCache', () => { mockAgent.assertNoPendingInterceptors(); }); + + it('should apply exponential backoff on repeated reconnects', async () => { + const fakeWatch = mock.mock(Watch); + const listObj = { + metadata: { resourceVersion: '12345' } as V1ListMeta, + items: [] as V1Namespace[], + } as V1NamespaceList; + + const listFn: ListPromise = () => Promise.resolve(listObj); + + let watchCalls = 0; + const delayValues: number[] = []; + const promise = new Promise((resolve) => { + mock.when( + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + ).thenCall(() => { + watchCalls++; + resolve(new AbortController()); + return Promise.resolve(new AbortController()); + }); + }); + + const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn); + (cache as any).delayFn = (ms: number) => { + delayValues.push(ms); + return Promise.resolve(); + }; + await promise; + strictEqual(watchCalls, 1); + + const [, , , doneHandler] = mock.capture(fakeWatch.watch).last(); + + await doneHandler(null); + strictEqual(watchCalls, 2); + deepStrictEqual(delayValues, []); + + await doneHandler(null); + strictEqual(watchCalls, 3); + deepStrictEqual(delayValues, [1000]); + + await doneHandler(null); + strictEqual(watchCalls, 4); + deepStrictEqual(delayValues, [1000, 2000]); + + await doneHandler(null); + strictEqual(watchCalls, 5); + deepStrictEqual(delayValues, [1000, 2000, 4000]); + }); + + it('should reset backoff after receiving a watch event', async () => { + const fakeWatch = mock.mock(Watch); + const listObj = { + metadata: { resourceVersion: '12345' } as V1ListMeta, + items: [] as V1Namespace[], + } as V1NamespaceList; + + const listFn: ListPromise = () => Promise.resolve(listObj); + + const delayValues: number[] = []; + const promise = new Promise((resolve) => { + mock.when( + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + ).thenCall(() => { + resolve(new AbortController()); + return Promise.resolve(new AbortController()); + }); + }); + + const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn); + (cache as any).delayFn = (ms: number) => { + delayValues.push(ms); + return Promise.resolve(); + }; + await promise; + + const [, , watchHandler, doneHandler] = mock.capture(fakeWatch.watch).last(); + + await doneHandler(null); + await doneHandler(null); + deepStrictEqual(delayValues, [1000]); + + watchHandler('ADDED', { + metadata: { name: 'reset', namespace: 'default', resourceVersion: '99' } as V1ObjectMeta, + } as V1Namespace); + + delayValues.length = 0; + await doneHandler(null); + deepStrictEqual(delayValues, []); + + await doneHandler(null); + deepStrictEqual(delayValues, [1000]); + }); + + it('should reconnect with backoff on TimeoutError', async () => { + const fakeWatch = mock.mock(Watch); + const listObj = { + metadata: { resourceVersion: '12345' } as V1ListMeta, + items: [] as V1Namespace[], + } as V1NamespaceList; + + const listFn: ListPromise = () => Promise.resolve(listObj); + + let watchCalls = 0; + let errorEmitted = false; + const promise = new Promise((resolve) => { + mock.when( + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + ).thenCall(() => { + watchCalls++; + resolve(new AbortController()); + return Promise.resolve(new AbortController()); + }); + }); + + const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn); + (cache as any).delayFn = () => Promise.resolve(); + await promise; + cache.on('error', () => (errorEmitted = true)); + strictEqual(watchCalls, 1); + + const [, , , doneHandler] = mock.capture(fakeWatch.watch).last(); + + const timeoutError = new DOMException('The operation was aborted due to timeout', 'TimeoutError'); + await doneHandler(timeoutError); + strictEqual(watchCalls, 2); + strictEqual(errorEmitted, false); + }); }); describe('delete items', () => { diff --git a/src/test/integration/index.ts b/src/test/integration/index.ts index d3979a39fc..4fed1ee3b3 100644 --- a/src/test/integration/index.ts +++ b/src/test/integration/index.ts @@ -2,6 +2,7 @@ import patchNamespace from './patchNamespace.js'; import cpFromPod from './cpFromPod.js'; import portForwardIntegration from './portForward.js'; import watchPods from './watchPods.js'; +import informerReconnect from './informerReconnect.js'; console.log('Integration testing'); @@ -9,3 +10,4 @@ await patchNamespace(); await cpFromPod(); await portForwardIntegration(); await watchPods(); +await informerReconnect(); diff --git a/src/test/integration/informerReconnect.ts b/src/test/integration/informerReconnect.ts new file mode 100644 index 0000000000..12abb8a244 --- /dev/null +++ b/src/test/integration/informerReconnect.ts @@ -0,0 +1,129 @@ +import assert from 'node:assert'; +import { setTimeout } from 'node:timers/promises'; +import { CoreV1Api, KubeConfig, V1ConfigMap, V1ConfigMapList } from '../../index.js'; +import { Watch } from '../../watch.js'; +import { ListWatch } from '../../cache.js'; +import { generateName } from './name.js'; + +export default async function informerReconnect() { + const kc = new KubeConfig(); + kc.loadFromDefault(); + + const coreV1Client = kc.makeApiClient(CoreV1Api); + const namespace = 'default'; + const labelKey = 'informer-test'; + const labelValue = generateName('run'); + const labelSelector = `${labelKey}=${labelValue}`; + + console.log(`\n=== Informer Reconnect Integration Test (label=${labelValue}) ===`); + + const watch = new Watch(kc); + const listFn = async (): Promise => { + return coreV1Client.listNamespacedConfigMap({ + namespace, + labelSelector, + }); + }; + + const informer = new ListWatch( + `/api/v1/namespaces/${namespace}/configmaps`, + watch, + listFn, + false, + labelSelector, + ); + + const addedNames: string[] = []; + let connectCount = 0; + let errorCount = 0; + + informer.on('add', (obj: V1ConfigMap) => { + const name = obj.metadata?.name ?? 'unknown'; + console.log(`Informer event: add ${name}`); + addedNames.push(name); + }); + + informer.on('connect', () => { + connectCount++; + console.log(`Informer event: connect (#${connectCount})`); + }); + + informer.on('error', (err: any) => { + errorCount++; + console.log(`Informer event: error ${err}`); + }); + + const cm1Name = generateName('cm1'); + const cm2Name = generateName('cm2'); + + try { + await informer.start(); + console.log('Informer started'); + + console.log(`Creating configmap ${cm1Name}`); + await coreV1Client.createNamespacedConfigMap({ + namespace, + body: { + metadata: { name: cm1Name, labels: { [labelKey]: labelValue } }, + data: { key: 'value1' }, + }, + }); + + for (let i = 0; i < 30; i++) { + if (addedNames.includes(cm1Name)) break; + await setTimeout(500); + } + assert.ok(addedNames.includes(cm1Name), 'Should have received add event for cm1'); + console.log('✓ Received add event for cm1'); + + const initialConnects = connectCount; + console.log(`Waiting for watch reconnection (up to 45s)...`); + for (let i = 0; i < 90; i++) { + if (connectCount > initialConnects) break; + await setTimeout(500); + } + assert.ok(connectCount > initialConnects, 'Informer should have reconnected'); + console.log(`✓ Informer reconnected (connect count: ${connectCount})`); + + console.log(`Creating configmap ${cm2Name}`); + await coreV1Client.createNamespacedConfigMap({ + namespace, + body: { + metadata: { name: cm2Name, labels: { [labelKey]: labelValue } }, + data: { key: 'value2' }, + }, + }); + + for (let i = 0; i < 30; i++) { + if (addedNames.includes(cm2Name)) break; + await setTimeout(500); + } + assert.ok(addedNames.includes(cm2Name), 'Should have received add event for cm2 after reconnect'); + console.log('✓ Received add event for cm2 after reconnection'); + + const cm1Duplicates = addedNames.filter((n) => n === cm1Name).length; + assert.strictEqual( + cm1Duplicates, + 1, + `cm1 should only appear once in add events, got ${cm1Duplicates}`, + ); + console.log('✓ No duplicate add events for cm1 (delta-only after reconnect)'); + + assert.strictEqual(errorCount, 0, `Expected no errors, got ${errorCount}`); + console.log('✓ No error events'); + } finally { + await informer.stop(); + try { + await coreV1Client.deleteNamespacedConfigMap({ name: cm1Name, namespace }); + } catch { + // already deleted + } + try { + await coreV1Client.deleteNamespacedConfigMap({ name: cm2Name, namespace }); + } catch { + // already deleted + } + } + + console.log('Informer reconnect integration test passed!'); +} diff --git a/src/watch.ts b/src/watch.ts index ecec1566a8..85058117e2 100644 --- a/src/watch.ts +++ b/src/watch.ts @@ -51,7 +51,11 @@ export class Watch { if (!doneCalled) { doneCalled = true; controller.abort(); - done(err); + if (err && timeoutSignal.aborted) { + done(new DOMException('The operation was aborted due to timeout', 'TimeoutError')); + } else { + done(err); + } } };