|
15 | 15 | //===----------------------------------------------------------------------===// |
16 | 16 |
|
17 | 17 | #if os(macOS) |
| 18 | +import ContainerizationArchive |
18 | 19 | import ContainerizationError |
19 | 20 | import ContainerizationExtras |
20 | 21 | import ContainerizationOCI |
@@ -123,6 +124,9 @@ public final class LinuxContainer: Container, Sendable { |
123 | 124 | // the host. |
124 | 125 | private let guestVsockPorts: Atomic<UInt32> |
125 | 126 |
|
| 127 | + // Queue for copy IO. |
| 128 | + private let copyQueue = DispatchQueue(label: "com.apple.containerization.copy") |
| 129 | + |
126 | 130 | private enum State: Sendable { |
127 | 131 | /// The container class has been created but no live resources are running. |
128 | 132 | case initialized |
@@ -1043,52 +1047,219 @@ extension LinuxContainer { |
1043 | 1047 | /// Default chunk size for file transfers (1MiB). |
1044 | 1048 | public static let defaultCopyChunkSize = 1024 * 1024 |
1045 | 1049 |
|
1046 | | - /// Copy a file from the host into the container. |
| 1050 | + /// Copy a file or directory from the host into the container. |
| 1051 | + /// |
| 1052 | + /// Data transfer happens over a dedicated vsock connection. For directories, |
| 1053 | + /// the source is archived as tar+gzip and streamed directly through vsock |
| 1054 | + /// without intermediate temp files. |
1047 | 1055 | public func copyIn( |
1048 | 1056 | from source: URL, |
1049 | 1057 | to destination: URL, |
1050 | 1058 | mode: UInt32 = 0o644, |
1051 | 1059 | createParents: Bool = true, |
1052 | | - chunkSize: Int = defaultCopyChunkSize, |
1053 | | - progress: ProgressHandler? = nil |
| 1060 | + chunkSize: Int = defaultCopyChunkSize |
1054 | 1061 | ) async throws { |
1055 | 1062 | try await self.state.withLock { |
1056 | 1063 | let state = try $0.startedState("copyIn") |
1057 | 1064 |
|
| 1065 | + var isDirectory: ObjCBool = false |
| 1066 | + guard FileManager.default.fileExists(atPath: source.path, isDirectory: &isDirectory) else { |
| 1067 | + throw ContainerizationError(.notFound, message: "copyIn: source not found '\(source.path)'") |
| 1068 | + } |
| 1069 | + let isArchive = isDirectory.boolValue |
| 1070 | + |
1058 | 1071 | let guestPath = URL(filePath: self.root).appending(path: destination.path) |
1059 | | - try await state.vm.withAgent { agent in |
1060 | | - try await agent.copyIn( |
1061 | | - from: source, |
1062 | | - to: guestPath, |
1063 | | - mode: mode, |
1064 | | - createParents: createParents, |
1065 | | - chunkSize: chunkSize, |
1066 | | - progress: progress |
1067 | | - ) |
| 1072 | + let port = self.hostVsockPorts.wrappingAdd(1, ordering: .relaxed).oldValue |
| 1073 | + let listener = try state.vm.listen(port) |
| 1074 | + |
| 1075 | + try await withThrowingTaskGroup(of: Void.self) { group in |
| 1076 | + group.addTask { |
| 1077 | + try await state.vm.withAgent { agent in |
| 1078 | + guard let vminitd = agent as? Vminitd else { |
| 1079 | + throw ContainerizationError(.unsupported, message: "copyIn requires Vminitd agent") |
| 1080 | + } |
| 1081 | + try await vminitd.copy( |
| 1082 | + direction: .copyIn, |
| 1083 | + guestPath: guestPath, |
| 1084 | + vsockPort: port, |
| 1085 | + mode: mode, |
| 1086 | + createParents: createParents, |
| 1087 | + isArchive: isArchive |
| 1088 | + ) |
| 1089 | + } |
| 1090 | + } |
| 1091 | + |
| 1092 | + group.addTask { |
| 1093 | + guard let conn = await listener.first(where: { _ in true }) else { |
| 1094 | + throw ContainerizationError(.internalError, message: "copyIn: vsock connection not established") |
| 1095 | + } |
| 1096 | + try listener.finish() |
| 1097 | + |
| 1098 | + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, any Error>) in |
| 1099 | + self.copyQueue.async { |
| 1100 | + do { |
| 1101 | + defer { conn.closeFile() } |
| 1102 | + |
| 1103 | + if isArchive { |
| 1104 | + let writer = try ArchiveWriter(configuration: .init(format: .pax, filter: .gzip)) |
| 1105 | + try writer.open(fileDescriptor: conn.fileDescriptor) |
| 1106 | + try writer.archiveDirectory(source) |
| 1107 | + try writer.finishEncoding() |
| 1108 | + } else { |
| 1109 | + let srcFd = open(source.path, O_RDONLY) |
| 1110 | + guard srcFd != -1 else { |
| 1111 | + throw ContainerizationError( |
| 1112 | + .internalError, |
| 1113 | + message: "copyIn: failed to open '\(source.path)': \(String(cString: strerror(errno)))" |
| 1114 | + ) |
| 1115 | + } |
| 1116 | + defer { close(srcFd) } |
| 1117 | + |
| 1118 | + var buf = [UInt8](repeating: 0, count: chunkSize) |
| 1119 | + while true { |
| 1120 | + let n = read(srcFd, &buf, buf.count) |
| 1121 | + if n == 0 { break } |
| 1122 | + guard n > 0 else { |
| 1123 | + throw ContainerizationError( |
| 1124 | + .internalError, |
| 1125 | + message: "copyIn: read error: \(String(cString: strerror(errno)))" |
| 1126 | + ) |
| 1127 | + } |
| 1128 | + var written = 0 |
| 1129 | + while written < n { |
| 1130 | + let w = buf.withUnsafeBytes { ptr in |
| 1131 | + write(conn.fileDescriptor, ptr.baseAddress! + written, n - written) |
| 1132 | + } |
| 1133 | + guard w > 0 else { |
| 1134 | + throw ContainerizationError( |
| 1135 | + .internalError, |
| 1136 | + message: "copyIn: vsock write error: \(String(cString: strerror(errno)))" |
| 1137 | + ) |
| 1138 | + } |
| 1139 | + written += w |
| 1140 | + } |
| 1141 | + } |
| 1142 | + } |
| 1143 | + continuation.resume() |
| 1144 | + } catch { |
| 1145 | + continuation.resume(throwing: error) |
| 1146 | + } |
| 1147 | + } |
| 1148 | + } |
| 1149 | + } |
| 1150 | + |
| 1151 | + try await group.waitForAll() |
1068 | 1152 | } |
1069 | 1153 | } |
1070 | 1154 | } |
1071 | 1155 |
|
1072 | | - /// Copy a file from the container to the host. |
| 1156 | + /// Copy a file or directory from the container to the host. |
| 1157 | + /// |
| 1158 | + /// Data transfer happens over a dedicated vsock connection. For directories, |
| 1159 | + /// the guest archives the source as tar+gzip and streams it directly through |
| 1160 | + /// vsock. The host extracts the archive without intermediate temp files. |
1073 | 1161 | public func copyOut( |
1074 | 1162 | from source: URL, |
1075 | 1163 | to destination: URL, |
1076 | 1164 | createParents: Bool = true, |
1077 | | - chunkSize: Int = defaultCopyChunkSize, |
1078 | | - progress: ProgressHandler? = nil |
| 1165 | + chunkSize: Int = defaultCopyChunkSize |
1079 | 1166 | ) async throws { |
1080 | 1167 | try await self.state.withLock { |
1081 | 1168 | let state = try $0.startedState("copyOut") |
1082 | 1169 |
|
| 1170 | + if createParents { |
| 1171 | + let parentDir = destination.deletingLastPathComponent() |
| 1172 | + try FileManager.default.createDirectory(at: parentDir, withIntermediateDirectories: true) |
| 1173 | + } |
| 1174 | + |
1083 | 1175 | let guestPath = URL(filePath: self.root).appending(path: source.path) |
1084 | | - try await state.vm.withAgent { agent in |
1085 | | - try await agent.copyOut( |
1086 | | - from: guestPath, |
1087 | | - to: destination, |
1088 | | - createParents: createParents, |
1089 | | - chunkSize: chunkSize, |
1090 | | - progress: progress |
1091 | | - ) |
| 1176 | + let port = self.hostVsockPorts.wrappingAdd(1, ordering: .relaxed).oldValue |
| 1177 | + let listener = try state.vm.listen(port) |
| 1178 | + |
| 1179 | + let (metadataStream, metadataCont) = AsyncStream.makeStream(of: Vminitd.CopyMetadata.self) |
| 1180 | + |
| 1181 | + try await withThrowingTaskGroup(of: Void.self) { group in |
| 1182 | + group.addTask { |
| 1183 | + try await state.vm.withAgent { agent in |
| 1184 | + guard let vminitd = agent as? Vminitd else { |
| 1185 | + throw ContainerizationError(.unsupported, message: "copyOut requires Vminitd agent") |
| 1186 | + } |
| 1187 | + try await vminitd.copy( |
| 1188 | + direction: .copyOut, |
| 1189 | + guestPath: guestPath, |
| 1190 | + vsockPort: port, |
| 1191 | + onMetadata: { meta in |
| 1192 | + metadataCont.yield(meta) |
| 1193 | + metadataCont.finish() |
| 1194 | + } |
| 1195 | + ) |
| 1196 | + } |
| 1197 | + } |
| 1198 | + |
| 1199 | + group.addTask { |
| 1200 | + guard let metadata = await metadataStream.first(where: { _ in true }) else { |
| 1201 | + throw ContainerizationError(.internalError, message: "copyOut: no metadata received") |
| 1202 | + } |
| 1203 | + |
| 1204 | + guard let conn = await listener.first(where: { _ in true }) else { |
| 1205 | + throw ContainerizationError(.internalError, message: "copyOut: vsock connection not established") |
| 1206 | + } |
| 1207 | + try listener.finish() |
| 1208 | + |
| 1209 | + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, any Error>) in |
| 1210 | + self.copyQueue.async { |
| 1211 | + do { |
| 1212 | + defer { conn.closeFile() } |
| 1213 | + |
| 1214 | + if metadata.isArchive { |
| 1215 | + try FileManager.default.createDirectory(at: destination, withIntermediateDirectories: true) |
| 1216 | + let fh = FileHandle(fileDescriptor: dup(conn.fileDescriptor), closeOnDealloc: true) |
| 1217 | + let reader = try ArchiveReader(format: .pax, filter: .gzip, fileHandle: fh) |
| 1218 | + _ = try reader.extractContents(to: destination) |
| 1219 | + } else { |
| 1220 | + let destFd = open(destination.path, O_WRONLY | O_CREAT | O_TRUNC, 0o644) |
| 1221 | + guard destFd != -1 else { |
| 1222 | + throw ContainerizationError( |
| 1223 | + .internalError, |
| 1224 | + message: "copyOut: failed to open '\(destination.path)': \(String(cString: strerror(errno)))" |
| 1225 | + ) |
| 1226 | + } |
| 1227 | + defer { close(destFd) } |
| 1228 | + |
| 1229 | + var buf = [UInt8](repeating: 0, count: chunkSize) |
| 1230 | + while true { |
| 1231 | + let n = read(conn.fileDescriptor, &buf, buf.count) |
| 1232 | + if n == 0 { break } |
| 1233 | + guard n > 0 else { |
| 1234 | + throw ContainerizationError( |
| 1235 | + .internalError, |
| 1236 | + message: "copyOut: vsock read error: \(String(cString: strerror(errno)))" |
| 1237 | + ) |
| 1238 | + } |
| 1239 | + var written = 0 |
| 1240 | + while written < n { |
| 1241 | + let w = buf.withUnsafeBytes { ptr in |
| 1242 | + write(destFd, ptr.baseAddress! + written, n - written) |
| 1243 | + } |
| 1244 | + guard w > 0 else { |
| 1245 | + throw ContainerizationError( |
| 1246 | + .internalError, |
| 1247 | + message: "copyOut: write error: \(String(cString: strerror(errno)))" |
| 1248 | + ) |
| 1249 | + } |
| 1250 | + written += w |
| 1251 | + } |
| 1252 | + } |
| 1253 | + } |
| 1254 | + continuation.resume() |
| 1255 | + } catch { |
| 1256 | + continuation.resume(throwing: error) |
| 1257 | + } |
| 1258 | + } |
| 1259 | + } |
| 1260 | + } |
| 1261 | + |
| 1262 | + try await group.waitForAll() |
1092 | 1263 | } |
1093 | 1264 | } |
1094 | 1265 | } |
|
0 commit comments