Skip to content
43 changes: 43 additions & 0 deletions examples/port-forward-deployment.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import * as k8s from '@kubernetes/client-node';
import net from 'node:net';

const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const forward = new k8s.PortForward(kc);

const namespace = process.argv[2] ?? 'default';
const deploymentName = process.argv[3] ?? 'demo-deployment';
const localPort = parseInt(process.argv[4] ?? '8080', 10);
const remotePort = parseInt(process.argv[5] ?? '8080', 10);

// This creates a local server that forwards traffic to a deployment in Kubernetes
// by resolving the deployment to its first ready pod and port-forwarding to that pod.
// Usage: node port-forward-deployment.js [namespace] [deploymentName] [localPort] [remotePort]
// Example: node port-forward-deployment.js default my-app 8080 3000
// This is equivalent to: kubectl port-forward deployment/my-app 8080:3000 -n default

const server = net.createServer(async (socket) => {
try {
await forward.portForwardDeployment(namespace, deploymentName, [remotePort], socket, null, socket);
} catch (error) {
console.error(`Error port-forwarding to deployment ${namespace}/${deploymentName}:`, error.message);
socket.destroy();
}
});

server.listen(localPort, '127.0.0.1', () => {
console.log(`Port forward server listening on http://127.0.0.1:${localPort}`);
console.log(`Forwarding to deployment: ${namespace}/${deploymentName}:${remotePort}`);
console.log(`Press Ctrl+C to stop`);
});

server.on('error', (error) => {
console.error('Server error:', error);
});

process.on('SIGINT', () => {
console.log('\nShutting down port-forward server...');
server.close();
process.exit(0);
});
43 changes: 43 additions & 0 deletions examples/port-forward-service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import * as k8s from '@kubernetes/client-node';
import net from 'node:net';

const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const forward = new k8s.PortForward(kc);

const namespace = process.argv[2] ?? 'default';
const serviceName = process.argv[3] ?? 'demo-service';
const localPort = parseInt(process.argv[4] ?? '8080', 10);
const remotePort = parseInt(process.argv[5] ?? '8080', 10);

// This creates a local server that forwards traffic to a service in Kubernetes
// by resolving the service to its first ready pod and port-forwarding to that pod.
// Usage: node port-forward-service.js [namespace] [serviceName] [localPort] [remotePort]
// Example: node port-forward-service.js default my-service 8080 80
// This is equivalent to: kubectl port-forward svc/my-service 8080:80 -n default

const server = net.createServer(async (socket) => {
try {
await forward.portForwardService(namespace, serviceName, [remotePort], socket, null, socket);
} catch (error) {
console.error(`Error port-forwarding to service ${namespace}/${serviceName}:`, error.message);
socket.destroy();
}
});

server.listen(localPort, '127.0.0.1', () => {
console.log(`Port forward server listening on http://127.0.0.1:${localPort}`);
console.log(`Forwarding to service: ${namespace}/${serviceName}:${remotePort}`);
console.log(`Press Ctrl+C to stop`);
});

server.on('error', (error) => {
console.error('Server error:', error);
});

process.on('SIGINT', () => {
console.log('\nShutting down port-forward server...');
server.close();
process.exit(0);
});
46 changes: 46 additions & 0 deletions examples/typescript/port-forward/port-forward-deployment.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as k8s from '@kubernetes/client-node';
import net from 'node:net';

const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const forward = new k8s.PortForward(kc);

const namespace = process.argv[2] ?? 'default';
const deploymentName = process.argv[3] ?? 'demo-deployment';
const localPort = parseInt(process.argv[4] ?? '8080', 10);
const remotePort = parseInt(process.argv[5] ?? '8080', 10);

// This creates a local server that forwards traffic to a deployment in Kubernetes
// by resolving the deployment to its first ready pod and port-forwarding to that pod.
// Usage: node port-forward-deployment.ts [namespace] [deploymentName] [localPort] [remotePort]
// Example: node port-forward-deployment.ts default my-app 8080 3000
// This is equivalent to: kubectl port-forward deployment/my-app 8080:3000 -n default

const server = net.createServer(async (socket) => {
try {
await forward.portForwardDeployment(namespace, deploymentName, [remotePort], socket, null, socket);
} catch (error) {
console.error(
`Error port-forwarding to deployment ${namespace}/${deploymentName}:`,
(error as Error).message,
);
socket.destroy();
}
});

server.listen(localPort, '127.0.0.1', () => {
console.log(`Port forward server listening on http://127.0.0.1:${localPort}`);
console.log(`Forwarding to deployment: ${namespace}/${deploymentName}:${remotePort}`);
console.log(`Press Ctrl+C to stop`);
});

server.on('error', (error: NodeJS.ErrnoException) => {
console.error('Server error:', error);
});

process.on('SIGINT', () => {
console.log('\nShutting down port-forward server...');
server.close();
process.exit(0);
});
43 changes: 43 additions & 0 deletions examples/typescript/port-forward/port-forward-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import * as k8s from '@kubernetes/client-node';
import net from 'node:net';

const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const forward = new k8s.PortForward(kc);

const namespace = process.argv[2] ?? 'default';
const serviceName = process.argv[3] ?? 'demo-service';
const localPort = parseInt(process.argv[4] ?? '8080', 10);
const remotePort = parseInt(process.argv[5] ?? '8080', 10);

// This creates a local server that forwards traffic to a service in Kubernetes
// by resolving the service to its first ready pod and port-forwarding to that pod.
// Usage: node port-forward-service.ts [namespace] [serviceName] [localPort] [remotePort]
// Example: node port-forward-service.ts default my-service 8080 80
// This is equivalent to: kubectl port-forward svc/my-service 8080:80 -n default

const server = net.createServer(async (socket) => {
try {
await forward.portForwardService(namespace, serviceName, [remotePort], socket, null, socket);
} catch (error) {
console.error(`Error port-forwarding to service ${namespace}/${serviceName}:`, error.message);
socket.destroy();
}
});

server.listen(localPort, '127.0.0.1', () => {
console.log(`Port forward server listening on http://127.0.0.1:${localPort}`);
console.log(`Forwarding to service: ${namespace}/${serviceName}:${remotePort}`);
console.log(`Press Ctrl+C to stop`);
});

server.on('error', (error: NodeJS.ErrnoException) => {
console.error('Server error:', error);
});

process.on('SIGINT', () => {
console.log('\nShutting down port-forward server...');
server.close();
process.exit(0);
});
64 changes: 32 additions & 32 deletions examples/typescript/watch/watch-example.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,40 @@
import * as k8s from '@kubernetes/client-node';
// import * as k8s from '@kubernetes/client-node';
import * as k8s from '../../../dist/index.js';
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like maybe it was left over from local testing.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed it, good catch.
Prettier autoformatted the file so I would leave it in here.


const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const watch = new k8s.Watch(kc);
const req = await watch
.watch(
'/api/v1/namespaces',
// optional query parameters can go here.
{
allowWatchBookmarks: true,
},
// callback is called for each received object.
(type, apiObj, watchObj) => {
if (type === 'ADDED') {
console.log('new object:');
} else if (type === 'MODIFIED') {
console.log('changed object:');
} else if (type === 'DELETED') {
console.log('deleted object:');
} else if (type === 'BOOKMARK') {
console.log(`bookmark: ${watchObj.metadata.resourceVersion}`);
} else {
console.log('unknown type: ' + type);
}
console.log(apiObj);
},
// done callback is called if the watch terminates either normally or with an error
(err) => {
if (err) {
console.error(err);
} else {
console.log('watch finished normally')
}
},
)
const req = await watch.watch(
'/api/v1/namespaces',
// optional query parameters can go here.
{
allowWatchBookmarks: true,
},
// callback is called for each received object.
(type, apiObj, watchObj) => {
if (type === 'ADDED') {
console.log('new object:');
} else if (type === 'MODIFIED') {
console.log('changed object:');
} else if (type === 'DELETED') {
console.log('deleted object:');
} else if (type === 'BOOKMARK') {
console.log(`bookmark: ${watchObj.metadata.resourceVersion}`);
} else {
console.log('unknown type: ' + type);
}
console.log(apiObj);
},
// done callback is called if the watch terminates either normally or with an error
(err) => {
if (err) {
console.error(err);
} else {
console.log('watch finished normally');
}
},
);
// watch returns a request object which you can use to abort the watch.
setTimeout(() => {
req.abort();
Expand Down
127 changes: 127 additions & 0 deletions src/portforward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ import WebSocket from 'isomorphic-ws';
import querystring from 'node:querystring';
import stream from 'node:stream';

import { AppsV1Api, CoreV1Api, V1Pod } from './gen/index.js';
import { KubeConfig } from './config.js';
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler.js';

export class PortForward {
private readonly config: KubeConfig;
private readonly handler: WebSocketInterface;
private readonly disconnectOnErr: boolean;

// handler is a parameter really only for injecting for testing.
constructor(config: KubeConfig, disconnectOnErr?: boolean, handler?: WebSocketInterface) {
this.config = config;
this.handler = handler || new WebSocketHandler(config);
this.disconnectOnErr = disconnectOnErr === undefined ? true : disconnectOnErr;
}
Expand Down Expand Up @@ -70,4 +73,128 @@ export class PortForward {

return WebSocketHandler.restartableHandleStandardInput(createWebSocket, input, 0, retryCount);
}

/**
* Port forward to a service by resolving to the first ready pod selected by the service's selector.
*
* @param namespace - The namespace of the service
* @param serviceName - The name of the service
* @param targetPorts - The target ports to forward to
* @param output - The writable stream for output
* @param err - The writable stream for error output (can be null)
* @param input - The readable stream for input
* @param retryCount - The number of times to retry the connection
* @throws Will throw an error if the service is not found or has no ready pods
*/
public async portForwardService(
namespace: string,
serviceName: string,
targetPorts: number[],
output: stream.Writable,
err: stream.Writable | null,
input: stream.Readable,
retryCount: number = 0,
): Promise<WebSocket.WebSocket | (() => WebSocket.WebSocket | null)> {
const coreApi = this.config.makeApiClient(CoreV1Api);
const service = await coreApi.readNamespacedService({ name: serviceName, namespace });

if (!service.spec?.selector || Object.keys(service.spec.selector).length === 0) {
throw new Error(`Service ${namespace}/${serviceName} has no selector defined`);
}

const labelSelector = this.buildLabelSelector(service.spec.selector);
const pod = await this.getFirstReadyPod(namespace, labelSelector);

return this.portForward(namespace, pod.metadata!.name!, targetPorts, output, err, input, retryCount);
}

/**
* Port forward to a deployment by resolving to the first ready pod selected by the deployment's selector.
*
* @param namespace - The namespace of the deployment
* @param deploymentName - The name of the deployment
* @param targetPorts - The target ports to forward to
* @param output - The writable stream for output
* @param err - The writable stream for error output (can be null)
* @param input - The readable stream for input
* @param retryCount - The number of times to retry the connection
* @throws Will throw an error if the deployment is not found or has no ready pods
*/
public async portForwardDeployment(
namespace: string,
deploymentName: string,
targetPorts: number[],
output: stream.Writable,
err: stream.Writable | null,
input: stream.Readable,
retryCount: number = 0,
): Promise<WebSocket.WebSocket | (() => WebSocket.WebSocket | null)> {
const appsApi = this.config.makeApiClient(AppsV1Api);
const deployment = await appsApi.readNamespacedDeployment({ name: deploymentName, namespace });

if (
!deployment.spec?.selector?.matchLabels ||
Object.keys(deployment.spec.selector.matchLabels).length === 0
) {
throw new Error(`Deployment ${namespace}/${deploymentName} has no selector defined`);
}

const labelSelector = this.buildLabelSelector(deployment.spec.selector.matchLabels);
const pod = await this.getFirstReadyPod(namespace, labelSelector);

return this.portForward(namespace, pod.metadata!.name!, targetPorts, output, err, input, retryCount);
}

/**
* Get the first ready pod matching the label selector.
*
* @param namespace - The namespace to query
* @param labelSelector - The label selector to filter pods
* @returns The first ready pod
* @throws Will throw an error if no ready pods are found
*/
private async getFirstReadyPod(namespace: string, labelSelector: string): Promise<V1Pod> {
const coreApi = this.config.makeApiClient(CoreV1Api);
const podList = await coreApi.listNamespacedPod({ namespace, labelSelector });

if (!podList.items || podList.items.length === 0) {
throw new Error(`No pods found with selector "${labelSelector}" in namespace ${namespace}`);
}

// Find the first pod with Ready status
for (const pod of podList.items) {
if (this.isPodReady(pod)) {
return pod;
}
}

throw new Error(`No ready pods found with selector "${labelSelector}" in namespace ${namespace}`);
}

/**
* Check if a pod is ready by looking at its status conditions.
*
* @param pod - The pod to check
* @returns True if the pod has a Ready condition with status True
*/
private isPodReady(pod: V1Pod): boolean {
if (!pod.status?.conditions) {
return false;
}
return pod.status.conditions.some(
(condition) => condition.type === 'Ready' && condition.status === 'True',
);
}

/**
* Build a Kubernetes label selector string from a label object.
*
* @param labels - An object of label key-value pairs
* @returns A Kubernetes label selector string
*/
private buildLabelSelector(labels: { [key: string]: string }): string {
return Object.entries(labels)
.map(([key, value]) => `${key}=${value}`)
.join(',');
}
}
Loading