88//! uploaded into the gateway container as a tar file via the Docker
99//! `put_archive` API, and then imported into containerd via `ctr images import`.
1010//!
11+ //! To avoid unbounded memory usage with large images, the export is streamed
12+ //! to a temporary file on disk, then streamed back through a tar wrapper into
13+ //! the Docker upload API. Peak memory usage is `O(chunk_size)` regardless of
14+ //! image size.
15+ //!
1116//! The standalone `ctr` binary is used (not `k3s ctr` which may not work in
1217//! all k3s versions) with the k3s containerd socket. The default containerd
1318//! namespace in k3s is already `k8s.io`, which is what kubelet uses.
1419
20+ use std:: pin:: Pin ;
21+
1522use bollard:: Docker ;
1623use bollard:: query_parameters:: UploadToContainerOptionsBuilder ;
1724use bytes:: Bytes ;
18- use futures:: StreamExt ;
25+ use futures:: { Stream , StreamExt } ;
1926use miette:: { IntoDiagnostic , Result , WrapErr } ;
27+ use tokio:: io:: { AsyncReadExt , AsyncWriteExt } ;
2028
2129use crate :: runtime:: exec_capture_with_exit;
2230
@@ -26,11 +34,19 @@ const CONTAINERD_SOCK: &str = "/run/k3s/containerd/containerd.sock";
2634/// Path inside the container where the image tar is staged.
2735const IMPORT_TAR_PATH : & str = "/tmp/openshell-images.tar" ;
2836
37+ /// Size of chunks read from the temp file during streaming upload (8 MiB).
38+ const UPLOAD_CHUNK_SIZE : usize = 8 * 1024 * 1024 ;
39+
40+ /// Report export progress every N bytes (100 MiB).
41+ const PROGRESS_INTERVAL_BYTES : u64 = 100 * 1024 * 1024 ;
42+
2943/// Push a list of images from the local Docker daemon into a k3s gateway's
3044/// containerd runtime.
3145///
3246/// All images are exported as a single tar (shared layers are deduplicated),
33- /// uploaded to the container filesystem, and imported into containerd.
47+ /// streamed to a temporary file, then uploaded to the container filesystem
48+ /// and imported into containerd. Memory usage is bounded to `O(chunk_size)`
49+ /// regardless of image size.
3450pub async fn push_local_images (
3551 local_docker : & Docker ,
3652 gateway_docker : & Docker ,
@@ -42,17 +58,30 @@ pub async fn push_local_images(
4258 return Ok ( ( ) ) ;
4359 }
4460
45- // 1. Export all images from the local Docker daemon as a single tar .
46- let image_tar = collect_export ( local_docker, images) . await ?;
61+ // 1. Export all images from the local Docker daemon to a temp file .
62+ let ( tmp_file , file_size ) = export_to_tempfile ( local_docker, images, on_log ) . await ?;
4763 on_log ( format ! (
4864 "[progress] Exported {} MiB" ,
49- image_tar . len ( ) / ( 1024 * 1024 )
65+ file_size / ( 1024 * 1024 )
5066 ) ) ;
5167
52- // 2. Wrap the image tar as a file inside an outer tar archive and upload
53- // it into the container filesystem via the Docker put_archive API.
54- let outer_tar = wrap_in_tar ( IMPORT_TAR_PATH , & image_tar) ?;
55- upload_archive ( gateway_docker, container_name, & outer_tar) . await ?;
68+ // 2. Stream the image tar wrapped in an outer tar archive into the
69+ // container filesystem via the Docker put_archive API.
70+ let parent_dir = IMPORT_TAR_PATH . rsplit_once ( '/' ) . map_or ( "/" , |( dir, _) | dir) ;
71+ let options = UploadToContainerOptionsBuilder :: default ( )
72+ . path ( parent_dir)
73+ . build ( ) ;
74+
75+ let upload_stream = streaming_tar_upload ( IMPORT_TAR_PATH , tmp_file, file_size) ;
76+ gateway_docker
77+ . upload_to_container (
78+ container_name,
79+ Some ( options) ,
80+ bollard:: body_try_stream ( upload_stream) ,
81+ )
82+ . await
83+ . into_diagnostic ( )
84+ . wrap_err ( "failed to upload image tar into container" ) ?;
5685 on_log ( "[progress] Uploaded to gateway" . to_string ( ) ) ;
5786
5887 // 3. Import the tar into containerd via ctr.
@@ -93,59 +122,115 @@ pub async fn push_local_images(
93122 Ok ( ( ) )
94123}
95124
96- /// Collect the full export tar from `docker.export_images()` into memory.
97- async fn collect_export ( docker : & Docker , images : & [ & str ] ) -> Result < Vec < u8 > > {
125+ /// Stream the Docker image export directly to a temporary file.
126+ ///
127+ /// Returns the temp file handle and the total number of bytes written.
128+ /// Memory usage is `O(chunk_size)` — only one chunk is held at a time.
129+ /// Progress is reported every [`PROGRESS_INTERVAL_BYTES`] bytes.
130+ async fn export_to_tempfile (
131+ docker : & Docker ,
132+ images : & [ & str ] ,
133+ on_log : & mut impl FnMut ( String ) ,
134+ ) -> Result < ( tempfile:: NamedTempFile , u64 ) > {
135+ let tmp = tempfile:: NamedTempFile :: new ( )
136+ . into_diagnostic ( )
137+ . wrap_err ( "failed to create temp file for image export" ) ?;
138+
139+ // Open a second handle for async writing; the NamedTempFile retains
140+ // ownership and ensures cleanup on drop.
141+ let std_file = tmp
142+ . reopen ( )
143+ . into_diagnostic ( )
144+ . wrap_err ( "failed to reopen temp file for writing" ) ?;
145+ let mut async_file = tokio:: fs:: File :: from_std ( std_file) ;
146+
98147 let mut stream = docker. export_images ( images) ;
99- let mut buf = Vec :: new ( ) ;
148+ let mut total_bytes: u64 = 0 ;
149+ let mut last_reported: u64 = 0 ;
150+
100151 while let Some ( chunk) = stream. next ( ) . await {
101152 let bytes = chunk
102153 . into_diagnostic ( )
103154 . wrap_err ( "failed to read image export stream" ) ?;
104- buf. extend_from_slice ( & bytes) ;
155+ async_file
156+ . write_all ( & bytes)
157+ . await
158+ . into_diagnostic ( )
159+ . wrap_err ( "failed to write image data to temp file" ) ?;
160+ total_bytes += bytes. len ( ) as u64 ;
161+
162+ // Report progress periodically.
163+ if total_bytes >= last_reported + PROGRESS_INTERVAL_BYTES {
164+ let mb = total_bytes / ( 1024 * 1024 ) ;
165+ on_log ( format ! ( "[progress] Exported {mb} MiB" ) ) ;
166+ last_reported = total_bytes;
167+ }
105168 }
106- Ok ( buf)
107- }
108169
109- /// Wrap raw bytes as a single file inside a tar archive.
110- ///
111- /// The Docker `put_archive` API expects a tar that is extracted at a target
112- /// directory. We create a tar containing one entry whose name is the basename
113- /// of `file_path`, and upload it to the parent directory.
114- fn wrap_in_tar ( file_path : & str , data : & [ u8 ] ) -> Result < Vec < u8 > > {
115- let file_name = file_path. rsplit ( '/' ) . next ( ) . unwrap_or ( file_path) ;
116-
117- let mut builder = tar:: Builder :: new ( Vec :: new ( ) ) ;
118- let mut header = tar:: Header :: new_gnu ( ) ;
119- header. set_path ( file_name) . into_diagnostic ( ) ?;
120- header. set_size ( data. len ( ) as u64 ) ;
121- header. set_mode ( 0o644 ) ;
122- header. set_cksum ( ) ;
123- builder
124- . append ( & header, data)
125- . into_diagnostic ( )
126- . wrap_err ( "failed to build tar archive for image upload" ) ?;
127- builder
128- . into_inner ( )
170+ async_file
171+ . flush ( )
172+ . await
129173 . into_diagnostic ( )
130- . wrap_err ( "failed to finalize tar archive" )
131- }
132-
133- /// Upload a tar archive into the container at the parent directory of
134- /// [`IMPORT_TAR_PATH`].
135- async fn upload_archive ( docker : & Docker , container_name : & str , archive : & [ u8 ] ) -> Result < ( ) > {
136- let parent_dir = IMPORT_TAR_PATH . rsplit_once ( '/' ) . map_or ( "/" , |( dir, _) | dir) ;
174+ . wrap_err ( "failed to flush temp file" ) ?;
137175
138- let options = UploadToContainerOptionsBuilder :: default ( )
139- . path ( parent_dir)
140- . build ( ) ;
176+ Ok ( ( tmp, total_bytes) )
177+ }
141178
142- docker
143- . upload_to_container (
144- container_name,
145- Some ( options) ,
146- bollard:: body_full ( Bytes :: copy_from_slice ( archive) ) ,
147- )
148- . await
149- . into_diagnostic ( )
150- . wrap_err ( "failed to upload image tar into container" )
179+ /// Create a stream that yields an outer tar archive containing the image tar
180+ /// as a single entry, reading the image data from the temp file in chunks.
181+ ///
182+ /// The Docker `put_archive` API expects a tar that is extracted at a target
183+ /// directory. We construct a tar with one entry whose name is the basename
184+ /// of `file_path`. The stream yields:
185+ /// 1. A 512-byte GNU tar header
186+ /// 2. The file content in [`UPLOAD_CHUNK_SIZE`] chunks
187+ /// 3. Padding to a 512-byte boundary + two 512-byte zero EOF blocks
188+ ///
189+ /// Memory usage is O([`UPLOAD_CHUNK_SIZE`]) regardless of file size.
190+ fn streaming_tar_upload (
191+ file_path : & str ,
192+ tmp_file : tempfile:: NamedTempFile ,
193+ file_size : u64 ,
194+ ) -> Pin < Box < dyn Stream < Item = std:: result:: Result < Bytes , std:: io:: Error > > + Send > > {
195+ let file_name = file_path
196+ . rsplit ( '/' )
197+ . next ( )
198+ . unwrap_or ( file_path)
199+ . to_string ( ) ;
200+
201+ Box :: pin ( async_stream:: try_stream! {
202+ // 1. Build and yield the tar header.
203+ let mut header = tar:: Header :: new_gnu( ) ;
204+ header. set_path( & file_name) ?;
205+ header. set_size( file_size) ;
206+ header. set_mode( 0o644 ) ;
207+ header. set_cksum( ) ;
208+ yield Bytes :: copy_from_slice( header. as_bytes( ) ) ;
209+
210+ // 2. Stream the temp file content in chunks.
211+ let std_file = tmp_file. reopen( ) ?;
212+ let mut async_file = tokio:: fs:: File :: from_std( std_file) ;
213+ let mut buf = vec![ 0u8 ; UPLOAD_CHUNK_SIZE ] ;
214+ loop {
215+ let n = async_file. read( & mut buf) . await ?;
216+ if n == 0 {
217+ break ;
218+ }
219+ yield Bytes :: copy_from_slice( & buf[ ..n] ) ;
220+ }
221+
222+ // 3. Yield tar padding and EOF blocks.
223+ // Tar entries must be padded to a 512-byte boundary, followed by
224+ // two 512-byte zero blocks to signal end-of-archive.
225+ let padding_len = if file_size. is_multiple_of( 512 ) {
226+ 0
227+ } else {
228+ 512 - ( file_size % 512 ) as usize
229+ } ;
230+ let footer = vec![ 0u8 ; padding_len + 1024 ] ;
231+ yield Bytes :: from( footer) ;
232+
233+ // The NamedTempFile is dropped here, cleaning up the temp file.
234+ drop( tmp_file) ;
235+ } )
151236}
0 commit comments