Skip to content
43 changes: 43 additions & 0 deletions examples/port-forward-deployment.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import * as k8s from '@kubernetes/client-node';
import net from 'node:net';

const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const forward = new k8s.PortForward(kc);

const namespace = process.argv[2] ?? 'default';
const deploymentName = process.argv[3] ?? 'demo-deployment';
const localPort = parseInt(process.argv[4] ?? '8080', 10);
const remotePort = parseInt(process.argv[5] ?? '8080', 10);

// This creates a local server that forwards traffic to a deployment in Kubernetes
// by resolving the deployment to its first ready pod and port-forwarding to that pod.
// Usage: node port-forward-deployment.js [namespace] [deploymentName] [localPort] [remotePort]
// Example: node port-forward-deployment.js default my-app 8080 3000
// This is equivalent to: kubectl port-forward deployment/my-app 8080:3000 -n default

const server = net.createServer(async (socket) => {
try {
await forward.portForwardDeployment(namespace, deploymentName, [remotePort], socket, null, socket);
} catch (error) {
console.error(`Error port-forwarding to deployment ${namespace}/${deploymentName}:`, error.message);
socket.destroy();
}
});

server.listen(localPort, '127.0.0.1', () => {
console.log(`Port forward server listening on http://127.0.0.1:${localPort}`);
console.log(`Forwarding to deployment: ${namespace}/${deploymentName}:${remotePort}`);
console.log(`Press Ctrl+C to stop`);
});

server.on('error', (error) => {
console.error('Server error:', error);
});

process.on('SIGINT', () => {
console.log('\nShutting down port-forward server...');
server.close();
process.exit(0);
});
43 changes: 43 additions & 0 deletions examples/port-forward-service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import * as k8s from '@kubernetes/client-node';
import net from 'node:net';

const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const forward = new k8s.PortForward(kc);

const namespace = process.argv[2] ?? 'default';
const serviceName = process.argv[3] ?? 'demo-service';
const localPort = parseInt(process.argv[4] ?? '8080', 10);
const remotePort = parseInt(process.argv[5] ?? '8080', 10);

// This creates a local server that forwards traffic to a service in Kubernetes
// by resolving the service to its first ready pod and port-forwarding to that pod.
// Usage: node port-forward-service.js [namespace] [serviceName] [localPort] [remotePort]
// Example: node port-forward-service.js default my-service 8080 80
// This is equivalent to: kubectl port-forward svc/my-service 8080:80 -n default

const server = net.createServer(async (socket) => {
try {
await forward.portForwardService(namespace, serviceName, [remotePort], socket, null, socket);
} catch (error) {
console.error(`Error port-forwarding to service ${namespace}/${serviceName}:`, error.message);
socket.destroy();
}
});

server.listen(localPort, '127.0.0.1', () => {
console.log(`Port forward server listening on http://127.0.0.1:${localPort}`);
console.log(`Forwarding to service: ${namespace}/${serviceName}:${remotePort}`);
console.log(`Press Ctrl+C to stop`);
});

server.on('error', (error) => {
console.error('Server error:', error);
});

process.on('SIGINT', () => {
console.log('\nShutting down port-forward server...');
server.close();
process.exit(0);
});
46 changes: 46 additions & 0 deletions examples/typescript/port-forward/port-forward-deployment.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as k8s from '@kubernetes/client-node';
import net from 'node:net';

const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const forward = new k8s.PortForward(kc);

const namespace = process.argv[2] ?? 'default';
const deploymentName = process.argv[3] ?? 'demo-deployment';
const localPort = parseInt(process.argv[4] ?? '8080', 10);
const remotePort = parseInt(process.argv[5] ?? '8080', 10);

// This creates a local server that forwards traffic to a deployment in Kubernetes
// by resolving the deployment to its first ready pod and port-forwarding to that pod.
// Usage: node port-forward-deployment.ts [namespace] [deploymentName] [localPort] [remotePort]
// Example: node port-forward-deployment.ts default my-app 8080 3000
// This is equivalent to: kubectl port-forward deployment/my-app 8080:3000 -n default

const server = net.createServer(async (socket) => {
try {
await forward.portForwardDeployment(namespace, deploymentName, [remotePort], socket, null, socket);
} catch (error) {
console.error(
`Error port-forwarding to deployment ${namespace}/${deploymentName}:`,
(error as Error).message,
);
socket.destroy();
}
});

server.listen(localPort, '127.0.0.1', () => {
console.log(`Port forward server listening on http://127.0.0.1:${localPort}`);
console.log(`Forwarding to deployment: ${namespace}/${deploymentName}:${remotePort}`);
console.log(`Press Ctrl+C to stop`);
});

server.on('error', (error: NodeJS.ErrnoException) => {
console.error('Server error:', error);
});

process.on('SIGINT', () => {
console.log('\nShutting down port-forward server...');
server.close();
process.exit(0);
});
43 changes: 43 additions & 0 deletions examples/typescript/port-forward/port-forward-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import * as k8s from '@kubernetes/client-node';
import net from 'node:net';

const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const forward = new k8s.PortForward(kc);

const namespace = process.argv[2] ?? 'default';
const serviceName = process.argv[3] ?? 'demo-service';
const localPort = parseInt(process.argv[4] ?? '8080', 10);
const remotePort = parseInt(process.argv[5] ?? '8080', 10);

// This creates a local server that forwards traffic to a service in Kubernetes
// by resolving the service to its first ready pod and port-forwarding to that pod.
// Usage: node port-forward-service.ts [namespace] [serviceName] [localPort] [remotePort]
// Example: node port-forward-service.ts default my-service 8080 80
// This is equivalent to: kubectl port-forward svc/my-service 8080:80 -n default

const server = net.createServer(async (socket) => {
try {
await forward.portForwardService(namespace, serviceName, [remotePort], socket, null, socket);
} catch (error) {
console.error(`Error port-forwarding to service ${namespace}/${serviceName}:`, error.message);
socket.destroy();
}
});

server.listen(localPort, '127.0.0.1', () => {
console.log(`Port forward server listening on http://127.0.0.1:${localPort}`);
console.log(`Forwarding to service: ${namespace}/${serviceName}:${remotePort}`);
console.log(`Press Ctrl+C to stop`);
});

server.on('error', (error: NodeJS.ErrnoException) => {
console.error('Server error:', error);
});

process.on('SIGINT', () => {
console.log('\nShutting down port-forward server...');
server.close();
process.exit(0);
});
61 changes: 30 additions & 31 deletions examples/typescript/watch/watch-example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,36 @@ const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const watch = new k8s.Watch(kc);
const req = await watch
.watch(
'/api/v1/namespaces',
// optional query parameters can go here.
{
allowWatchBookmarks: true,
},
// callback is called for each received object.
(type, apiObj, watchObj) => {
if (type === 'ADDED') {
console.log('new object:');
} else if (type === 'MODIFIED') {
console.log('changed object:');
} else if (type === 'DELETED') {
console.log('deleted object:');
} else if (type === 'BOOKMARK') {
console.log(`bookmark: ${watchObj.metadata.resourceVersion}`);
} else {
console.log('unknown type: ' + type);
}
console.log(apiObj);
},
// done callback is called if the watch terminates either normally or with an error
(err) => {
if (err) {
console.error(err);
} else {
console.log('watch finished normally')
}
},
)
const req = await watch.watch(
'/api/v1/namespaces',
// optional query parameters can go here.
{
allowWatchBookmarks: true,
},
// callback is called for each received object.
(type, apiObj, watchObj) => {
if (type === 'ADDED') {
console.log('new object:');
} else if (type === 'MODIFIED') {
console.log('changed object:');
} else if (type === 'DELETED') {
console.log('deleted object:');
} else if (type === 'BOOKMARK') {
console.log(`bookmark: ${watchObj.metadata.resourceVersion}`);
} else {
console.log('unknown type: ' + type);
}
console.log(apiObj);
},
// done callback is called if the watch terminates either normally or with an error
(err) => {
if (err) {
console.error(err);
} else {
console.log('watch finished normally');
}
},
);
// watch returns a request object which you can use to abort the watch.
setTimeout(() => {
req.abort();
Expand Down
127 changes: 127 additions & 0 deletions src/portforward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ import WebSocket from 'isomorphic-ws';
import querystring from 'node:querystring';
import stream from 'node:stream';

import { AppsV1Api, CoreV1Api, V1Pod } from './gen/index.js';
import { KubeConfig } from './config.js';
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler.js';

export class PortForward {
private readonly config: KubeConfig;
private readonly handler: WebSocketInterface;
private readonly disconnectOnErr: boolean;

// handler is a parameter really only for injecting for testing.
constructor(config: KubeConfig, disconnectOnErr?: boolean, handler?: WebSocketInterface) {
this.config = config;
this.handler = handler || new WebSocketHandler(config);
this.disconnectOnErr = disconnectOnErr === undefined ? true : disconnectOnErr;
}
Expand Down Expand Up @@ -70,4 +73,128 @@ export class PortForward {

return WebSocketHandler.restartableHandleStandardInput(createWebSocket, input, 0, retryCount);
}

/**
* Port forward to a service by resolving to the first ready pod selected by the service's selector.
*
* @param namespace - The namespace of the service
* @param serviceName - The name of the service
* @param targetPorts - The target ports to forward to
* @param output - The writable stream for output
* @param err - The writable stream for error output (can be null)
* @param input - The readable stream for input
* @param retryCount - The number of times to retry the connection
* @throws Will throw an error if the service is not found or has no ready pods
*/
public async portForwardService(
namespace: string,
serviceName: string,
targetPorts: number[],
output: stream.Writable,
err: stream.Writable | null,
input: stream.Readable,
retryCount: number = 0,
): Promise<WebSocket.WebSocket | (() => WebSocket.WebSocket | null)> {
const coreApi = this.config.makeApiClient(CoreV1Api);
const service = await coreApi.readNamespacedService({ name: serviceName, namespace });

if (!service.spec?.selector || Object.keys(service.spec.selector).length === 0) {
throw new Error(`Service ${namespace}/${serviceName} has no selector defined`);
}

const labelSelector = this.buildLabelSelector(service.spec.selector);
const pod = await this.getFirstReadyPod(namespace, labelSelector);

return this.portForward(namespace, pod.metadata!.name!, targetPorts, output, err, input, retryCount);
}

/**
* Port forward to a deployment by resolving to the first ready pod selected by the deployment's selector.
*
* @param namespace - The namespace of the deployment
* @param deploymentName - The name of the deployment
* @param targetPorts - The target ports to forward to
* @param output - The writable stream for output
* @param err - The writable stream for error output (can be null)
* @param input - The readable stream for input
* @param retryCount - The number of times to retry the connection
* @throws Will throw an error if the deployment is not found or has no ready pods
*/
public async portForwardDeployment(
namespace: string,
deploymentName: string,
targetPorts: number[],
output: stream.Writable,
err: stream.Writable | null,
input: stream.Readable,
retryCount: number = 0,
): Promise<WebSocket.WebSocket | (() => WebSocket.WebSocket | null)> {
const appsApi = this.config.makeApiClient(AppsV1Api);
const deployment = await appsApi.readNamespacedDeployment({ name: deploymentName, namespace });

if (
!deployment.spec?.selector?.matchLabels ||
Object.keys(deployment.spec.selector.matchLabels).length === 0
) {
throw new Error(`Deployment ${namespace}/${deploymentName} has no selector defined`);
}

const labelSelector = this.buildLabelSelector(deployment.spec.selector.matchLabels);
const pod = await this.getFirstReadyPod(namespace, labelSelector);

return this.portForward(namespace, pod.metadata!.name!, targetPorts, output, err, input, retryCount);
}

/**
* Get the first ready pod matching the label selector.
*
* @param namespace - The namespace to query
* @param labelSelector - The label selector to filter pods
* @returns The first ready pod
* @throws Will throw an error if no ready pods are found
*/
private async getFirstReadyPod(namespace: string, labelSelector: string): Promise<V1Pod> {
const coreApi = this.config.makeApiClient(CoreV1Api);
const podList = await coreApi.listNamespacedPod({ namespace, labelSelector });

if (!podList.items || podList.items.length === 0) {
throw new Error(`No pods found with selector "${labelSelector}" in namespace ${namespace}`);
}

// Find the first pod with Ready status
for (const pod of podList.items) {
if (this.isPodReady(pod)) {
return pod;
}
}

throw new Error(`No ready pods found with selector "${labelSelector}" in namespace ${namespace}`);
}

/**
* Check if a pod is ready by looking at its status conditions.
*
* @param pod - The pod to check
* @returns True if the pod has a Ready condition with status True
*/
private isPodReady(pod: V1Pod): boolean {
if (!pod.status?.conditions) {
return false;
}
return pod.status.conditions.some(
(condition) => condition.type === 'Ready' && condition.status === 'True',
);
}

/**
* Build a Kubernetes label selector string from a label object.
*
* @param labels - An object of label key-value pairs
* @returns A Kubernetes label selector string
*/
private buildLabelSelector(labels: { [key: string]: string }): string {
return Object.entries(labels)
.map(([key, value]) => `${key}=${value}`)
.join(',');
}
}
Loading