-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathruntime_environment.py
More file actions
382 lines (319 loc) · 15.2 KB
/
runtime_environment.py
File metadata and controls
382 lines (319 loc) · 15.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
import abc
import asyncio
import shlex
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any, ContextManager, Protocol, runtime_checkable

from common.utils import tempfiles
from hackbot.utils.docker.docker_runtime import DockerWrapper
from hackbot.utils.git.clone import clone_repository
from hackbot.utils.misc import simple_cache
from common.utils.logging_utils import log
@dataclass
class ExecutionResults:
    """Outcome of running a single command in some runtime environment.

    Every environment implementation returns this same record, so callers can
    inspect results without caring whether the command ran in Docker or on the
    local disk.
    """

    # Captured standard output (after any requested string substitutions).
    stdout: str
    # Captured standard error (after any requested string substitutions).
    stderr: str
    # Exit status of the command; callers treat 0 as success.
    exit_code: int
    # The command words joined with single spaces, for logging/debugging.
    cmd_full: str  # noqa
@runtime_checkable
class RuntimeEnvironment(Protocol):
    """Interface shared by the places commands can be run (Docker, local disk).

    An environment works on a private copy of a directory handed over when it
    is entered. The file helpers below take paths relative to the root of that
    copy unless stated otherwise.
    """

    @abc.abstractmethod
    async def __aenter__(self, with_dir: Path) -> "RuntimeEnvironment":
        """Start the environment, seeded with a copy(!) of ``with_dir``.

        Because this takes an argument it has to be awaited directly
        (``await env.__aenter__(path)``); a bare ``async with env:`` would call
        it without ``with_dir``.
        """

    @abc.abstractmethod
    async def __aexit__(
        self,
        _exc_type: type[BaseException] | None,
        _exc_val: BaseException | None,
        _exc_tb: Any,
    ) -> None:
        """Stop the environment and release whatever was acquired on entry."""

    @abc.abstractmethod
    async def execute_command(
        self,
        cmd: list[str],
        cwd: Path | None = None,
        remove_strings: list[tuple[str, str]] | None = None,
    ) -> ExecutionResults:
        """Run ``cmd`` inside the environment and collect its output.

        Args:
            cmd: The command as a list of words. The list form is intended to
                allow proper escaping of arguments.
                NOTE(review): DockerEnvironment joins the words with spaces
                inside ``bash -c`` without quoting them — confirm callers
                quote arguments that need it.
            cwd: Directory to run in, relative to the environment root.
            remove_strings: ``(old, new)`` pairs substituted in stdout/stderr,
                e.g. to scrub temporary directory names that should not affect
                caching.

        Returns:
            The captured stdout, stderr, exit code and command string.
        """

    @abc.abstractmethod
    async def write_file(self, file_path: Path, content: str) -> None:
        """Store ``content`` as text at ``file_path`` (relative to the environment root)."""

    @abc.abstractmethod
    async def read_file(self, file_path: Path) -> str:
        """Return the text stored at ``file_path`` (relative to the environment root)."""

    @abc.abstractmethod
    async def extract_dir(self, runtime_dir: Path, local_dir: Path) -> None:
        """Copy ``runtime_dir`` out of the environment into ``local_dir`` on the real disk."""

    @abc.abstractmethod
    async def find_files(
        self, pattern: str, exclude_patterns: list[str] | None = None
    ) -> list[Path]:
        """List files whose name matches ``pattern``, skipping ``exclude_patterns``.

        The returned paths are relative to the environment root.
        """

    @abc.abstractmethod
    async def path_exists(self, path: Path) -> bool:
        """Report whether ``path`` exists inside the environment."""

    @abc.abstractmethod
    async def create_dir(self, path: Path) -> None:
        """Make sure directory ``path`` exists inside the environment."""
class DockerEnvironment(RuntimeEnvironment):
    """Docker implementation of RuntimeEnvironment.

    Commands run in a container managed by ``DockerWrapper``. The directory
    given on entry is copied to ``/workspace`` in the container, and relative
    paths passed to the methods below are resolved against ``/workspace``.
    Blocking Docker calls are pushed to a worker thread by the async wrappers.
    """

    def __init__(self):
        self.docker_instance: DockerWrapper | None = None
        self.dir_ctx: ContextManager[Path] | None = None
        # Host-side temp dir; only set/cleared by enter/exit in this class.
        self.dir: Path | None = None

    def _release_resources(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: Any,
    ) -> None:
        """Clean up whatever entering acquired; safe to call more than once.

        The attributes are cleared first so a repeated call is a no-op, and the
        backing temp dir is released even if container cleanup raises.
        """
        docker_instance, self.docker_instance = self.docker_instance, None
        dir_ctx, self.dir_ctx = self.dir_ctx, None
        self.dir = None
        try:
            if docker_instance is not None:
                docker_instance._cleanup()  # type: ignore
        finally:
            if dir_ctx is not None:
                dir_ctx.__exit__(exc_type, exc_val, exc_tb)

    def __enter__(self, with_dir: Path) -> "DockerEnvironment":
        """Enter/start the runtime context (blocking).

        Starts with a copy(!) of with_dir in the runtime environment, placed at
        /workspace in the container. If any step fails, everything acquired so
        far is released before the exception propagates: a failed enter is not
        guaranteed to be followed by an exit call, so it would otherwise leak.
        """
        self.dir_ctx = tempfiles.TemporaryDirectory(prefix="hb_docker_env_")
        self.dir = self.dir_ctx.__enter__()
        self.docker_instance = None
        try:
            wrapper = DockerWrapper()
            wrapper.__enter__()
            # Track the wrapper only once its __enter__ succeeded, so the
            # failure path never calls _cleanup() on a half-initialised wrapper.
            # NOTE(review): assumes DockerWrapper.__enter__ cleans up after
            # itself when it raises — confirm.
            self.docker_instance = wrapper
            container = wrapper.container
            assert container is not None, "Container not initialized"
            log.info(f"Copying repo into container {container.id}: {with_dir}")
            # NB we replace the workspace dir with the repo dir
            wrapper.put_dir(with_dir, Path("/workspace/"))
            log.info(f"Copied repo into container {container.id}: {with_dir}")
        except BaseException as exc:
            self._release_resources(type(exc), exc, exc.__traceback__)
            raise
        return self

    async def __aenter__(self, with_dir: Path) -> "DockerEnvironment":
        # The underlying enter is blocking, so we move it to a thread
        return await asyncio.to_thread(self.__enter__, with_dir)

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: Any,
    ) -> None:
        """Stop the container and delete the backing temp dir (blocking).

        Idempotent: a second call, or a call after a failed enter (which has
        already cleaned up), does nothing.
        """
        self._release_resources(exc_type, exc_val, exc_tb)

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: Any,
    ) -> None:
        # The underlying exit is blocking, so we move it to a thread
        return await asyncio.to_thread(self.__exit__, exc_type, exc_val, exc_tb)

    def sync_execute_command(
        self,
        cmd: list[str],
        cwd: Path | None = None,
        remove_strings: list[tuple[str, str]] | None = None,
    ) -> ExecutionResults:
        """Run ``cmd`` through ``bash -c`` inside the container (blocking).

        The elements of ``cmd`` are joined with single spaces WITHOUT quoting,
        so an element may itself be a shell snippet (pipes, ``&&``), as
        ``find_files`` relies on. Callers must quote arguments that need it
        (e.g. with ``shlex.quote``).

        Args:
            cmd: Command words / shell fragments to run.
            cwd: Directory to ``cd`` into first; relative paths resolve against
                /workspace because the shell starts by ``cd``-ing there.
            remove_strings: ``(old, new)`` pairs replaced in stdout/stderr.

        Returns:
            ExecutionResults whose ``cmd_full`` is the space-joined command,
            including the ``cd`` prefix when ``cwd`` was given.
        """
        assert self.docker_instance is not None, "Docker instance not initialized"
        if cwd:
            # cwd is a path, not a shell fragment, so quote it to survive
            # spaces and other shell metacharacters.
            cmd = ["cd", shlex.quote(str(cwd)), "&&", *cmd]
        # Note that it's the docker container from hackrepo_dockerfile that we're in. The sourcing has nothing to do with the materialize_build_vars.sh script.
        full_cmd = [
            "bash",
            "-c",
            "source /root/.env && cd /workspace && " + " ".join(cmd),
        ]
        res = self.docker_instance.run_command(full_cmd)
        # Work on copies rather than mutating the wrapper's result object.
        stdout, stderr = res.stdout_logs, res.stderr_logs
        for old_string, new_string in remove_strings or []:
            stdout = stdout.replace(old_string, new_string)
            stderr = stderr.replace(old_string, new_string)
        return ExecutionResults(
            stdout=stdout,
            stderr=stderr,
            exit_code=res.exit_code,
            cmd_full=" ".join(cmd),
        )

    async def execute_command(
        self,
        cmd: list[str],
        cwd: Path | None = None,
        remove_strings: list[tuple[str, str]] | None = None,
    ) -> ExecutionResults:
        # The underlying execute command is blocking, so we move it to a thread
        return await asyncio.to_thread(self.sync_execute_command, cmd, cwd, remove_strings)

    def sync_write_file(self, file_path: Path, content: str) -> None:
        """Write ``content`` to /workspace/``file_path`` in the container (blocking)."""
        assert self.docker_instance is not None, "Docker instance not initialized"
        return self.docker_instance.put_text_file(content, str(Path("/workspace") / file_path))

    async def write_file(self, file_path: Path, content: str) -> None:
        # The underlying write file is blocking, so we move it to a thread
        return await asyncio.to_thread(self.sync_write_file, file_path, content)

    def sync_read_file(self, file_path: Path) -> str:
        """Read /workspace/``file_path`` from the container as text (blocking)."""
        assert self.docker_instance is not None, "Docker instance not initialized"
        return self.docker_instance.get_text_file(str(Path("/workspace") / file_path))

    async def read_file(self, file_path: Path) -> str:
        # The underlying read file is blocking, so we move it to a thread
        return await asyncio.to_thread(self.sync_read_file, file_path)

    async def find_files(
        self, pattern: str, exclude_patterns: list[str] | None = None
    ) -> list[Path]:
        """Find regular files under /workspace whose name matches ``pattern``.

        Args:
            pattern: Glob matched against the file name (``find -name``).
            exclude_patterns: Plain substrings. Any result line (of the form
                ``./<relative path>``) containing one of them is dropped.

        Returns:
            Paths relative to /workspace.
        """
        assert self.docker_instance is not None, "Docker instance not initialized"
        # execute_command cds into /workspace, so find prints "./<rel>" lines.
        shell_cmd = f"find . -type f -name {shlex.quote(pattern)}"
        for excluded in exclude_patterns or []:
            # -F: fixed-string match (the same substring semantics as
            # LocalEnvironment); -e: safe for patterns starting with "-".
            shell_cmd += f" | grep -v -F -e {shlex.quote(excluded)}"
        # Passed as a single element on purpose: the pipeline must reach the
        # shell unquoted.
        result = await self.execute_command([shell_cmd])
        # Path() drops the leading "./", leaving workspace-relative paths.
        return [Path(line) for line in result.stdout.strip().split("\n") if line]

    async def path_exists(self, path: Path) -> bool:
        """Return True if /workspace/``path`` exists (file or directory) in the container."""
        assert self.docker_instance is not None, "Docker instance not initialized"
        # `test -e` exits 0 iff the path exists; quote it since the command is
        # run through bash -c.
        target = Path("/workspace") / path
        result = await self.execute_command(["test", "-e", shlex.quote(str(target))])
        return result.exit_code == 0

    def sync_extract_dir(self, runtime_dir: Path, local_dir: Path) -> None:
        """Copy /workspace/``runtime_dir`` out of the container into ``local_dir`` (blocking)."""
        assert self.docker_instance is not None, "Docker instance not initialized"
        # Uses the docker tar extraction from container underneath
        self.docker_instance.get_dir(Path("/workspace") / runtime_dir, local_dir)

    async def extract_dir(self, runtime_dir: Path, local_dir: Path) -> None:
        # The underlying extract dir is blocking, so we move it to a thread
        return await asyncio.to_thread(self.sync_extract_dir, runtime_dir, local_dir)

    async def create_dir(self, path: Path) -> None:
        """Create a directory in the Docker environment.

        Uses ``mkdir -p``, so missing parents are created and an existing
        directory is not an error. Failures are logged, not raised.
        """
        assert self.docker_instance is not None, "Container not running"
        full_path_in_container = Path("/workspace") / path
        result = await self.execute_command(
            ["mkdir", "-p", shlex.quote(str(full_path_in_container))]
        )
        if result.exit_code != 0:
            log.error(
                f"Failed to create directory '{full_path_in_container}' in container: {result.stderr}"
            )
        else:
            log.debug(f"Ensured directory '{full_path_in_container}' exists in container")
class LocalEnvironment(RuntimeEnvironment):
    """Local implementation of RuntimeEnvironment.

    Works on a clone of the input directory inside a host temp dir; relative
    paths passed to the methods below are resolved against that temp dir.
    """

    def __init__(self):
        self.dir_ctx: ContextManager[Path] | None = None
        self.dir: Path | None = None

    def _release_dir(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: Any,
    ) -> None:
        """Release the backing temp dir if one is held; safe to call more than once."""
        dir_ctx, self.dir_ctx = self.dir_ctx, None
        self.dir = None
        if dir_ctx is not None:
            dir_ctx.__exit__(exc_type, exc_val, exc_tb)

    def __enter__(self, with_dir: Path) -> "LocalEnvironment":
        """Create a temp dir and clone ``with_dir`` into it (blocking).

        If cloning fails, the temp dir is released before the exception
        propagates, because a failed enter is not guaranteed to be followed by
        an exit call.
        """
        self.dir_ctx = tempfiles.TemporaryDirectory(prefix="hb_local_env_")
        self.dir = self.dir_ctx.__enter__()
        try:
            clone_repository(with_dir, target_dir=self.dir)
        except BaseException as exc:
            self._release_dir(type(exc), exc, exc.__traceback__)
            raise
        return self

    async def __aenter__(self, with_dir: Path) -> "LocalEnvironment":
        # The underlying enter is blocking, so we move it to a thread
        return await asyncio.to_thread(self.__enter__, with_dir)

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: Any,
    ) -> None:
        """Delete the backing temp dir (blocking). Idempotent."""
        self._release_dir(exc_type, exc_val, exc_tb)

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: Any,
    ) -> None:
        # The underlying exit is blocking, so we move it to a thread
        return await asyncio.to_thread(self.__exit__, exc_type, exc_val, exc_tb)

    def sync_execute_command(
        self,
        cmd: list[str],
        cwd: Path | None = None,
        remove_strings: list[tuple[str, str]] | None = None,
    ) -> ExecutionResults:
        """Execute a command on the local filesystem (blocking, no shell).

        Args:
            cmd: Argument vector passed to ``subprocess.run`` with
                ``shell=False``; must be a non-empty list of strings.
            cwd: Working directory relative to the environment root.
            remove_strings: ``(old, new)`` pairs replaced in stdout/stderr.
                When falsy (None or empty), the environment root path is
                replaced by ``"."`` instead.
        """
        assert self.dir is not None, "Local environment not initialized"
        working_dir = self.dir / cwd if cwd else self.dir
        assert isinstance(cmd, list), "Command must be a list"
        assert len(cmd) > 0, "Command must not be empty"
        assert isinstance(cmd[0], str), "Command must start with a string"
        result = subprocess.run(cmd, shell=False, cwd=working_dir, capture_output=True, text=True)
        # Replace strings in the output if requested
        stdout, stderr = result.stdout, result.stderr
        for old_string, new_string in remove_strings or [(self.dir.as_posix(), ".")]:
            stdout = stdout.replace(old_string, new_string)
            stderr = stderr.replace(old_string, new_string)
        return ExecutionResults(
            stdout=stdout,
            stderr=stderr,
            exit_code=result.returncode,
            cmd_full=" ".join(cmd),
        )

    async def execute_command(
        self,
        cmd: list[str],
        cwd: Path | None = None,
        remove_strings: list[tuple[str, str]] | None = None,
    ) -> ExecutionResults:
        # The underlying execute command is blocking, so we move it to a thread
        return await asyncio.to_thread(self.sync_execute_command, cmd, cwd, remove_strings)

    async def write_file(self, file_path: Path, content: str) -> None:
        """Write content to a file on the local filesystem, creating parent dirs."""
        assert self.dir is not None, "Local environment not initialized"
        full_path = self.dir / file_path
        full_path.parent.mkdir(parents=True, exist_ok=True)
        full_path.write_text(content)

    async def read_file(self, file_path: Path) -> str:
        """Read content from a file on the local filesystem."""
        assert self.dir is not None, "Local environment not initialized"
        return (self.dir / file_path).read_text()

    async def find_files(
        self, pattern: str, exclude_patterns: list[str] | None = None
    ) -> list[Path]:
        """Find regular files whose name matches glob ``pattern`` anywhere under the root.

        Only regular files are returned, whether or not excludes are given.
        Each exclude is a plain substring tested against ``./<relative path>``,
        the same form DockerEnvironment's ``find .`` emits. As a result, the
        random host temp-dir path never takes part in the match.

        Returns:
            Paths relative to the environment root.
        """
        assert self.dir is not None, "Local environment not initialized"
        excludes = exclude_patterns or []
        found: list[Path] = []
        for match in self.dir.rglob(pattern):
            if not match.is_file():
                continue
            rel = match.relative_to(self.dir)
            display = f"./{rel.as_posix()}"
            if any(excluded in display for excluded in excludes):
                continue
            found.append(rel)
        return found

    async def path_exists(self, path: Path) -> bool:
        """Return True if ``path`` (relative to the root) exists."""
        assert self.dir is not None, "Local environment not initialized"
        return (self.dir / path).exists()

    def sync_extract_dir(self, runtime_dir: Path, local_dir: Path) -> None:
        """Extract a directory from the runtime environment to the local filesystem.

        ``local_dir`` is created if missing; if it exists it must be empty.
        """
        assert self.dir is not None, "Local environment not initialized"
        assert isinstance(runtime_dir, Path), "Runtime directory must be a Path"
        assert isinstance(local_dir, Path), "Local directory must be a Path"
        # Make dest (local_dir), but ensure it's empty first
        if local_dir.exists():
            assert not any(local_dir.iterdir()), "Local directory must be empty"
        else:
            local_dir.mkdir(parents=True)
        simple_cache.copytree(self.dir / runtime_dir, local_dir, dirs_exist_ok=True)

    async def extract_dir(self, runtime_dir: Path, local_dir: Path) -> None:
        # The underlying extract dir is blocking, so we move it to a thread
        return await asyncio.to_thread(self.sync_extract_dir, runtime_dir, local_dir)

    async def create_dir(self, path: Path) -> None:
        """Create a directory in the local environment (parents included; existing is fine)."""
        assert self.dir is not None, "Local environment not initialized"
        full_path = self.dir / path
        full_path.mkdir(parents=True, exist_ok=True)
        log.debug(f"Created local directory: {full_path}")