Skip to content

Commit 8ca8f79

Browse files
authored
fix: exit code docker runs (#365)
* fix: Emit exit-code of docker runs Signed-off-by: oliver könig <okoenig@nvidia.com> * fix test Signed-off-by: oliver könig <okoenig@nvidia.com> * fixes Signed-off-by: oliver könig <okoenig@nvidia.com> * refactor Signed-off-by: oliver könig <okoenig@nvidia.com> * cleanup Signed-off-by: oliver könig <okoenig@nvidia.com> * add scheduler test Signed-off-by: oliver könig <okoenig@nvidia.com> * more scheduler tests Signed-off-by: oliver könig <okoenig@nvidia.com> * test executor Signed-off-by: oliver könig <okoenig@nvidia.com> * formatting Signed-off-by: oliver könig <okoenig@nvidia.com> --------- Signed-off-by: oliver könig <okoenig@nvidia.com>
1 parent 927e3df commit 8ca8f79

4 files changed

Lines changed: 152 additions & 18 deletions

File tree

nemo_run/core/execution/docker.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,9 +276,12 @@ def run(self, client: "DockerClient", id: str) -> "Container":
276276

277277
container_kwargs.update(self.executor.additional_kwargs)
278278
assert self.executor.experiment_id
279-
tee_cmd = f" 2>&1 | tee -a /{RUNDIR_NAME}/log_{self.name}.out"
279+
tee_cmd = f" 2>&1 | tee -a /{RUNDIR_NAME}/log_{self.name}.out; "
280+
save_status_cmd = r"export EXIT_CODE=${PIPESTATUS[0]}; "
281+
save_status_cmd += f'printf \'{{\\"id\\": \\"{id}\\", \\"exit_code\\": \\"%s\\"}}\\n\' "$EXIT_CODE" > /{RUNDIR_NAME}/status_{self.name}.out; exit $EXIT_CODE;'
282+
280283
command = " ".join(self.command)
281-
command = f'bash -c "{command}{tee_cmd}"'
284+
command = f'bash -c "{command}{tee_cmd}{save_status_cmd}"'
282285

283286
ensure_network(client=client, network=self.executor.network)
284287
return client.containers.run(

nemo_run/run/torchx_backend/schedulers/docker.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# limitations under the License.
1515

1616
import glob
17+
import json
1718
import logging
1819
import os
1920
from datetime import datetime
@@ -157,25 +158,43 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
157158
roles[role].num_replicas += 1
158159

159160
c = container.get_container(client=self._docker_client, id=app_id)
160-
_state = self._get_app_state(c) if c else AppState.SUCCEEDED
161-
162-
roles_statuses[role].replicas.append(
163-
ReplicaStatus(
164-
id=0,
165-
role=role,
166-
state=_state,
167-
hostname=container.name,
161+
_state = self._get_app_state(c) if c is not None else None
162+
163+
if _state is not None:
164+
roles_statuses[role].replicas.append(
165+
ReplicaStatus(
166+
id=0,
167+
role=role,
168+
state=_state,
169+
hostname=container.name,
170+
)
168171
)
169-
)
170-
states.append(_state)
172+
states.append(_state)
173+
else:
174+
status_file = os.path.join(req.executor.job_dir, f"status_{role}.out")
175+
if os.path.exists(status_file):
176+
with open(status_file, "r") as f:
177+
status = json.load(f)
178+
roles_statuses[role].replicas.append(
179+
ReplicaStatus(
180+
id=0,
181+
role=role,
182+
state=int(status["exit_code"]),
183+
hostname=container.name,
184+
)
185+
)
186+
state = (
187+
AppState.FAILED if int(status["exit_code"]) != 0 else AppState.SUCCEEDED
188+
)
189+
states.append(state)
171190

172191
state = AppState.UNKNOWN
173192
if any(is_terminal(state) for state in states):
174193
if any(state == AppState.SUCCEEDED for state in states):
175194
state = AppState.SUCCEEDED
176195
else:
177196
state = AppState.FAILED
178-
else:
197+
elif len(states) > 0:
179198
state = next(state for state in states if not is_terminal(state))
180199

181200
return DescribeAppResponse(

test/core/execution/test_docker.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -285,15 +285,22 @@ def test_init(self):
285285
assert container.executor == executor
286286
assert container.extra_env == {"EXTRA": "value"}
287287

288-
@patch("nemo_run.core.execution.docker.DockerContainer.run")
289-
def test_run(self, mock_run, mock_docker_client, mock_container):
288+
@patch("docker.DockerClient")
289+
@patch("nemo_run.core.execution.docker.ensure_network")
290+
def test_run(
291+
self,
292+
mock_client,
293+
mock_ensure_network,
294+
mock_docker_client,
295+
mock_container,
296+
):
290297
"""Test run method of DockerContainer."""
291298
executor = DockerExecutor(
292299
container_image="test:latest",
293300
runtime="nvidia",
294301
num_gpus=2,
295302
shm_size="8g",
296-
ulimits=["memlock:unlimited:unlimited"],
303+
ulimits=["memlock:0:123"],
297304
ipc_mode="host",
298305
privileged=True,
299306
volumes=["/host:/container"],
@@ -308,11 +315,21 @@ def test_run(self, mock_run, mock_docker_client, mock_container):
308315
extra_env={"EXTRA": "value"},
309316
)
310317

311-
mock_run.return_value = mock_container
318+
mock_ensure_network.return_value = None
319+
320+
def mocked_run(*args, **kwargs):
321+
detach = kwargs.pop("detach", None)
322+
remove = kwargs.pop("remove", None)
323+
assert detach is True
324+
assert remove is True
325+
326+
mock_client.containers.run = mocked_run
327+
328+
container.run(mock_client, "job123")
312329

313330
# Instead of actually calling run which would fail with the "unlimited" value,
314331
# we'll check that the container is properly set up
315-
assert container.executor.ulimits == ["memlock:unlimited:unlimited"]
332+
assert container.executor.ulimits == ["memlock:0:123"]
316333
assert container.extra_env == {"EXTRA": "value"}
317334
assert container.executor.experiment_id == "exp123"
318335

test/run/torchx_backend/schedulers/test_docker.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,15 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
import json
17+
import os
1618
import tempfile
1719
from unittest import mock
1820

1921
import pytest
2022
from torchx.schedulers.api import AppDryRunInfo
2123
from torchx.specs import AppDef, Role
24+
from torchx.specs.api import AppState
2225

2326
from nemo_run.core.execution.docker import DockerExecutor
2427
from nemo_run.run.torchx_backend.schedulers.docker import (
@@ -121,6 +124,98 @@ def test_describe(docker_scheduler, docker_executor):
121124
response = docker_scheduler.describe("test_session___test_role___test_container_id")
122125
assert response is not None
123126
assert response.app_id == "test_session___test_role___test_container_id"
127+
assert "UNKNOWN" in str(response.state)
128+
assert len(response.roles) == 1
129+
130+
131+
def test_describe_running(docker_scheduler, docker_executor):
132+
with (
133+
mock.patch.object(DockerJobRequest, "load") as mock_load,
134+
mock.patch.object(DockerContainer, "get_container") as mock_get_container,
135+
mock.patch.object(PersistentDockerScheduler, "_get_app_state") as mock_get_app_state,
136+
):
137+
container = DockerContainer(
138+
name="test_role",
139+
command=["test"],
140+
executor=docker_executor,
141+
extra_env={},
142+
)
143+
mock_load.return_value = DockerJobRequest(
144+
id="test_session___test_role___test_container_id",
145+
executor=docker_executor,
146+
containers=[container],
147+
)
148+
mock_get_container.return_value = container
149+
mock_get_app_state.return_value = AppState.RUNNING
150+
151+
response = docker_scheduler.describe("test_session___test_role___test_container_id")
152+
assert response is not None
153+
assert response.app_id == "test_session___test_role___test_container_id"
154+
assert "RUNNING" in str(response.state)
155+
assert len(response.roles) == 1
156+
157+
158+
def test_describe_failed(docker_scheduler, docker_executor):
159+
with (
160+
mock.patch.object(DockerJobRequest, "load") as mock_load,
161+
mock.patch.object(DockerContainer, "get_container") as mock_get_container,
162+
mock.patch.object(PersistentDockerScheduler, "_get_app_state") as mock_get_app_state,
163+
):
164+
container = DockerContainer(
165+
name="test_role",
166+
command=["test"],
167+
executor=docker_executor,
168+
extra_env={},
169+
)
170+
req = DockerJobRequest(
171+
id="test_session___test_role___test_container_id",
172+
executor=docker_executor,
173+
containers=[container],
174+
)
175+
mock_load.return_value = req
176+
mock_get_container.return_value = container
177+
mock_get_app_state.return_value = None
178+
status_file = os.path.join(req.executor.job_dir, f"status_{req.containers[0].name}.out")
179+
180+
with open(status_file, "w") as f:
181+
f.write(json.dumps({"exit_code": 1}))
182+
183+
response = docker_scheduler.describe(req.id)
184+
assert response is not None
185+
assert response.app_id == req.id
186+
assert "FAILED" in str(response.state)
187+
assert len(response.roles) == 1
188+
189+
190+
@pytest.mark.xfail
191+
def test_describe_failure_not_detected(docker_scheduler, docker_executor):
192+
with (
193+
mock.patch.object(DockerJobRequest, "load") as mock_load,
194+
mock.patch.object(DockerContainer, "get_container") as mock_get_container,
195+
mock.patch.object(PersistentDockerScheduler, "_get_app_state") as mock_get_app_state,
196+
):
197+
container = DockerContainer(
198+
name="test_role",
199+
command=["test"],
200+
executor=docker_executor,
201+
extra_env={},
202+
)
203+
req = DockerJobRequest(
204+
id="test_session___test_role___test_container_id",
205+
executor=docker_executor,
206+
containers=[container],
207+
)
208+
mock_load.return_value = req
209+
mock_get_container.return_value = container
210+
mock_get_app_state.return_value = None
211+
status_file = os.path.join(req.executor.job_dir, f"status_{req.containers[0].name}.out")
212+
213+
with open(status_file, "w") as f:
214+
f.write(json.dumps({"exit_code": 1}))
215+
216+
response = docker_scheduler.describe(req.id)
217+
assert response is not None
218+
assert response.app_id == req.id
124219
assert "SUCCEEDED" in str(response.state)
125220
assert len(response.roles) == 1
126221

0 commit comments

Comments
 (0)