1- import { Readable , type Transform } from "node:stream " ;
1+ import { serve } from "bun " ;
22import split2 from "split2" ;
33import { dbFunctions } from "~/core/database" ;
44import { getDockerClient } from "~/core/docker/client" ;
@@ -9,134 +9,119 @@ import {
99import { logger } from "~/core/utils/logger" ;
1010import type { DockerStatsEvent } from "~/typings/docker" ;
1111
12- export function createDockerStatsStream ( ) : Readable {
13- const stream = new Readable ( {
14- objectMode : true ,
15- read ( ) { } ,
16- } ) ;
12+ // Track all connected WebSocket clients
13+ const clients = new Set < Bun . ServerWebSocket < unknown > > ( ) ;
1714
18- const substreams : Array < {
19- statsStream : Readable ;
20- splitStream : Transform ;
21- } > = [ ] ;
22-
23- const cleanup = ( ) => {
24- for ( const { statsStream, splitStream } of substreams ) {
25- try {
26- statsStream . unpipe ( splitStream ) ;
27- statsStream . destroy ( ) ;
28- splitStream . destroy ( ) ;
29- } catch ( error ) {
30- logger . error ( `Cleanup error: ${ error } ` ) ;
31- }
15+ // Broadcast a DockerStatsEvent to every connected client
16+ function broadcast ( event : DockerStatsEvent ) {
17+ const message = JSON . stringify ( event ) ;
18+ for ( const ws of clients ) {
19+ if ( ws . readyState === 1 ) {
20+ ws . send ( message ) ;
3221 }
33- substreams . length = 0 ;
34- } ;
35-
36- stream . on ( "close" , cleanup ) ;
37- stream . on ( "error" , cleanup ) ;
38-
39- ( async ( ) => {
40- try {
41- const hosts = dbFunctions . getDockerHosts ( ) ;
42- logger . debug ( `Retrieved ${ hosts . length } docker host(s)` ) ;
43-
44- for ( const host of hosts ) {
45- if ( stream . destroyed ) break ;
22+ }
23+ }
4624
47- try {
48- const docker = getDockerClient ( host ) ;
49- await docker . ping ( ) ;
50- const containers = await docker . listContainers ( {
51- all : true ,
52- } ) ;
25+ // Start Docker stats polling and broadcasting
26+ export async function startDockerStatsBroadcast ( ) {
27+ logger . debug ( "Starting Docker stats broadcast..." ) ;
5328
54- logger . debug (
55- `Found ${ containers . length } containers on ${ host . name } (id: ${ host . id } )` ,
56- ) ;
29+ try {
30+ const hosts = dbFunctions . getDockerHosts ( ) ;
31+ logger . debug ( `Retrieved ${ hosts . length } Docker host(s)` ) ;
5732
58- for ( const containerInfo of containers ) {
59- if ( stream . destroyed ) break ;
33+ for ( const host of hosts ) {
34+ try {
35+ const docker = getDockerClient ( host ) ;
36+ await docker . ping ( ) ;
37+ const containers = await docker . listContainers ( { all : true } ) ;
38+ logger . debug (
39+ `Host ${ host . name } contains ${ containers . length } containers` ,
40+ ) ;
6041
42+ for ( const info of containers ) {
43+ // Kick off one independent async task per container
44+ ( async ( ) => {
6145 try {
62- const container = docker . getContainer ( containerInfo . Id ) ;
63- const statsStream = ( await container . stats ( {
64- stream : true ,
65- } ) ) as Readable ;
66- const splitStream = split2 ( ) ;
67-
68- substreams . push ( { statsStream, splitStream } ) ;
46+ const statsStream = await docker
47+ . getContainer ( info . Id )
48+ . stats ( { stream : true } ) ;
49+ const splitter = split2 ( ) ;
50+ statsStream . pipe ( splitter ) ;
6951
70- statsStream
71- . on ( "close" , ( ) => splitStream . destroy ( ) )
72- . pipe ( splitStream )
73- . on ( "data" , ( line : string ) => {
74- if ( stream . destroyed || ! line ) return ;
75-
76- try {
77- const stats = JSON . parse ( line ) ;
78- const event : DockerStatsEvent = {
79- type : "stats" ,
80- id : containerInfo . Id ,
81- hostId : host . id ,
82- name : containerInfo . Names [ 0 ] . replace ( / ^ \/ / , "" ) ,
83- image : containerInfo . Image ,
84- status : containerInfo . Status ,
85- state : containerInfo . State ,
86- cpuUsage : calculateCpuPercent ( stats ) ?? 0 ,
87- memoryUsage : calculateMemoryUsage ( stats ) ?? 0 ,
88- } ;
89- stream . push ( event ) ;
90- } catch ( error ) {
91- stream . push ( {
92- type : "error" ,
93- hostId : host . id ,
94- containerId : containerInfo . Id ,
95- error : `Parse error: ${
96- error instanceof Error ? error . message : String ( error )
97- } `,
98- } ) ;
99- }
100- } )
101- . on ( "error" , ( error : Error ) => {
102- stream . push ( {
52+ for await ( const line of splitter ) {
53+ if ( ! line ) continue ;
54+ try {
55+ const stats = JSON . parse ( line ) ;
56+ broadcast ( {
57+ type : "stats" ,
58+ id : info . Id ,
59+ hostId : host . id ,
60+ name : info . Names [ 0 ] . replace ( / ^ \/ / , "" ) ,
61+ image : info . Image ,
62+ status : info . Status ,
63+ state : stats . state || info . State ,
64+ cpuUsage : calculateCpuPercent ( stats ) ?? 0 ,
65+ memoryUsage : calculateMemoryUsage ( stats ) ?? 0 ,
66+ } ) ;
67+ } catch ( err ) {
68+ broadcast ( {
10369 type : "error" ,
10470 hostId : host . id ,
105- containerId : containerInfo . Id ,
106- error : `Stream error: ${ error . message } ` ,
71+ containerId : info . Id ,
72+ error : `Parse error: ${ ( err as Error ) . message } ` ,
10773 } ) ;
108- } ) ;
109- } catch ( error ) {
110- stream . push ( {
74+ }
75+ }
76+ } catch ( err ) {
77+ broadcast ( {
11178 type : "error" ,
11279 hostId : host . id ,
113- containerId : containerInfo . Id ,
114- error : `Container error: ${
115- error instanceof Error ? error . message : String ( error )
116- } `,
80+ containerId : info . Id ,
81+ error : `Stats stream error: ${ ( err as Error ) . message } ` ,
11782 } ) ;
11883 }
119- }
120- } catch ( error ) {
121- stream . push ( {
122- type : "error" ,
123- hostId : host . id ,
124- error : `Host connection error: ${
125- error instanceof Error ? error . message : String ( error )
126- } `,
127- } ) ;
84+ } ) ( ) ;
12885 }
86+ } catch ( err ) {
87+ broadcast ( {
88+ type : "error" ,
89+ hostId : host . id ,
90+ error : `Host connection error: ${ ( err as Error ) . message } ` ,
91+ } ) ;
12992 }
130- } catch ( error ) {
131- stream . push ( {
132- type : "error" ,
133- error : `Initialization error: ${
134- error instanceof Error ? error . message : String ( error )
135- } `,
136- } ) ;
137- stream . destroy ( ) ;
13893 }
139- } ) ( ) ;
140-
141- return stream ;
94+ } catch ( err ) {
95+ broadcast ( {
96+ type : "error" ,
97+ hostId : 0 ,
98+ error : `Initialization error: ${ ( err as Error ) . message } ` ,
99+ } ) ;
100+ }
142101}
102+
103+ serve ( {
104+ port : 4837 ,
105+ reusePort : true ,
106+ fetch ( req , server ) {
107+ // Upgrade requests to WebSocket
108+ if ( req . url . endsWith ( "/ws/docker" ) ) {
109+ if ( server . upgrade ( req ) ) {
110+ return ; // auto 101 Switching Protocols
111+ }
112+ }
113+ return new Response ( "Expected WebSocket upgrade" , { status : 426 } ) ;
114+ } ,
115+
116+ websocket : {
117+ open ( ws ) {
118+ logger . debug ( "Client connected via WebSocket" ) ;
119+ clients . add ( ws ) ;
120+ } ,
121+ close ( ws , code , reason ) {
122+ logger . debug ( `Client disconnected (${ code } ): ${ reason } ` ) ;
123+ clients . delete ( ws ) ;
124+ } ,
125+ message ( ) { } ,
126+ } ,
127+ } ) ;
0 commit comments