From 12708e79acdbfe6070ae814146fb0c5edafffcde Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Fri, 29 May 2026 20:59:14 -0700 Subject: [PATCH 1/3] cli: discoverability & ergonomics improvements Proactive developer-experience improvements found during routine review of the CLI surface. Each change is small and self-contained. - Global projection flags: add --fields, --sort-by, --reverse on the root group, mirroring the existing --filter module-level mechanism in output.py so they flow into every command's render path. - api-key list --name: filter the key-hash-keyed result down to the single matching key while preserving the raw object shape. - start-session: accept the hive://ai_agent/ URI form (the form the D&R 'start ai agent' action uses) in addition to a bare key. - hive validate: emit an explicit positive verdict ("Record is valid." to stderr; {"valid": true} for json/yaml when the API is silent), keeping stdout machine-stable. - hive set: add --tag-add/--tag-rm (additive), --comment, --expiry; metadata-only update when no data is supplied, overrides otherwise. - secret set --value and a 'tag' subcommand (add/rm/set) on hive shortcut groups; --value documents the shell-history exposure. - dr set --detect/--respond/--tag: assemble a rule from component files (mutually exclusive with --input-file). - cloud-adapter/external-adapter list-types: derive supported adapter types from the cloud_sensor schema with a curated fallback; fix stale "...and others" prose that omitted threatlocker. - hive schema: default to a flat field table (resolving $ref/$defs); raw JSON-Schema still available via --output json. - event types: note that an empty result on a fresh org is expected (the schema is observed, not declared). Co-Authored-By: Claude Opus 4.8 (1M context) --- limacharlie/ai_help.py | 2 + limacharlie/cli.py | 18 +- limacharlie/commands/_adapter_types.py | 158 +++++++ limacharlie/commands/_hive_shortcut.py | 138 ++++-- limacharlie/commands/adapter.py | 7 +- limacharlie/commands/ai.py | 6 +- limacharlie/commands/api_key.py | 22 +- limacharlie/commands/cloud_sensor.py | 6 +- limacharlie/commands/dr.py | 88 +++- limacharlie/commands/event.py | 11 + limacharlie/commands/hive.py | 229 ++++++++- limacharlie/output.py | 32 +- limacharlie/sdk/ai.py | 13 +- tests/unit/test_cli_ergonomics.py | 434 ++++++++++++++++++ .../unit/test_cli_lazy_loading_regression.py | 22 +- tests/unit/test_dataclasses.py | 2 +- tests/unit/test_sdk_ai_sessions.py | 31 ++ 17 files changed, 1130 insertions(+), 89 deletions(-) create mode 100644 limacharlie/commands/_adapter_types.py create mode 100644 tests/unit/test_cli_ergonomics.py diff --git a/limacharlie/ai_help.py b/limacharlie/ai_help.py index 4a75ea5f..c9673772 100644 --- a/limacharlie/ai_help.py +++ b/limacharlie/ai_help.py @@ -140,6 +140,8 @@ def _top_level_help(cli: click.Group) -> str: lines.append("--wide Don't truncate table columns") lines.append("--filter JMESPATH Filter/transform output") lines.append("--fields f1,f2 Select specific output fields") + lines.append("--sort-by FIELD Sort list output by a field") + lines.append("--reverse Reverse sorted order (with --sort-by)") lines.append("--quiet Suppress non-data output") lines.append("--debug Print HTTP request details") lines.append("--env NAME Use a named environment from config") diff --git a/limacharlie/cli.py b/limacharlie/cli.py index 21f123cb..d7135d13 100644 --- a/limacharlie/cli.py +++ b/limacharlie/cli.py @@ -53,6 +53,9 @@ class LimaCharlieContext: wide: bool = False no_warnings: bool = False filter_expr: str | None = None + fields: list[str] | None = None + sort_by: str | None = None + reverse: bool = False profile: str | None = None environment: str | None = None @@ -346,11 +349,14 @@ def _find_shadowed_opts( @click.option("--wide", "-W", is_flag=True, default=False, help="Disable table value truncation (show full values).") @click.option("--no-warnings", is_flag=True, default=False, help="Suppress advisory warnings (cost notices, memory hints, checkpoint suggestions).") @click.option("--filter", "filter_expr", default=None, help="JMESPath expression to filter/transform output (e.g. 'user_perms', 'keys(@)').") +@click.option("--fields", "fields", default=None, help="Comma-separated field names to keep in output (e.g. 'sid,hostname'). Applied to list/record output.") +@click.option("--sort-by", "sort_by", default=None, help="Field name to sort list output by.") +@click.option("--reverse", "reverse", is_flag=True, default=False, help="Reverse the order of sorted list output (use with --sort-by).") @click.option("--profile", default=None, help="Named credential profile to use.") @click.option("--env", "environment", default=None, help="Named environment from config file.") @click.version_option(version=__version__, prog_name="limacharlie") @click.pass_context -def cli(ctx: click.Context, oid: str | None, output_format: str | None, debug: bool, debug_full: bool, debug_curl: bool, quiet: bool, wide: bool, no_warnings: bool, filter_expr: str | None, profile: str | None, environment: str | None) -> None: +def cli(ctx: click.Context, oid: str | None, output_format: str | None, debug: bool, debug_full: bool, debug_curl: bool, quiet: bool, wide: bool, no_warnings: bool, filter_expr: str | None, fields: str | None, sort_by: str | None, reverse: bool, profile: str | None, environment: str | None) -> None: """LimaCharlie CLI - Endpoint Detection & Response platform. Manage sensors, detection rules, hive data, and more from the command line. @@ -368,14 +374,22 @@ def cli(ctx: click.Context, oid: str | None, output_format: str | None, debug: b lc_ctx.wide = wide lc_ctx.no_warnings = no_warnings or _config_no_warnings() lc_ctx.filter_expr = filter_expr + # Parse --fields into a clean list of names (drop blanks/whitespace). + field_list = [f.strip() for f in fields.split(",") if f.strip()] if fields else None + lc_ctx.fields = field_list + lc_ctx.sort_by = sort_by + lc_ctx.reverse = reverse lc_ctx.profile = profile lc_ctx.environment = environment # Lazy import: output pulls in jmespath, tabulate, yaml, csv (~14ms). # Deferring to here avoids that cost for fast paths like --help, --version, # and --ai-help that never render command output. - from .output import set_filter_expr, set_wide_mode + from .output import set_filter_expr, set_wide_mode, set_fields, set_sort_by, set_reverse set_wide_mode(wide) set_filter_expr(filter_expr) + set_fields(field_list) + set_sort_by(sort_by) + set_reverse(reverse) # Inject --ai-help on the root cli group itself (subcommands get it lazily diff --git a/limacharlie/commands/_adapter_types.py b/limacharlie/commands/_adapter_types.py new file mode 100644 index 00000000..bfada471 --- /dev/null +++ b/limacharlie/commands/_adapter_types.py @@ -0,0 +1,158 @@ +"""Shared helpers for listing supported adapter/sensor types. + +Both the cloud-adapter and external-adapter command groups expose a +``list-types`` subcommand. The list of supported types is derived at +runtime from the ``cloud_sensor`` hive's JSON-Schema so it cannot go +stale relative to the backend; a curated fallback is used only if the +schema cannot be fetched or does not advertise the per-type sub-structs. +""" + +from __future__ import annotations + +from typing import Any + +import click + +from ..sdk.hive import Hive + +# Fields that appear at the top level of an adapter config but are NOT +# adapter type names (they are shared across every adapter type). These +# are filtered out when deriving the type list from the schema. +_NON_TYPE_FIELDS = { + "sensor_type", + "client_options", + "mapping", + "mappings", + "indexing", +} + +# Curated fallback list. IMPORTANT: this is only used when the live +# cloud_sensor schema cannot be fetched or parsed. It MUST be kept in +# sync with the backend's supported adapter types; prefer the schema- +# derived list, which cannot go stale. +_FALLBACK_ADAPTER_TYPES: dict[str, str] = { + "1password": "1Password audit events", + "azure_event_hub": "Azure Event Hub stream", + "carbon_black": "VMware Carbon Black events", + "cato": "Cato Networks events", + "crowdstrike": "CrowdStrike Falcon Data Replicator", + "duo": "Cisco Duo authentication logs", + "entraid": "Microsoft Entra ID (Azure AD) logs", + "file": "Tail a local file", + "gcs": "Google Cloud Storage objects", + "github": "GitHub audit log", + "google_workspace": "Google Workspace activity", + "guardduty": "AWS GuardDuty findings", + "imap": "IMAP mailbox ingestion", + "itglue": "IT Glue records", + "k8s_pods": "Kubernetes pod logs", + "mac_unified_logging": "macOS unified logging", + "mimecast": "Mimecast email security logs", + "ms_graph": "Microsoft Graph API", + "office365": "Microsoft Office 365 management activity", + "okta": "Okta system log", + "pubsub": "Google Cloud Pub/Sub", + "s3": "AWS S3 objects", + "sentinelone": "SentinelOne events", + "simulation": "Simulated/test events", + "slack": "Slack audit logs", + "sophos": "Sophos Central events", + "sqs": "AWS SQS queue", + "stdin": "Read events from stdin (external adapter only)", + "syslog": "Syslog over TCP/UDP", + "threatlocker": "ThreatLocker unified audit", + "webhook": "Inbound HTTP webhook", + "wel": "Windows Event Log (external adapter only)", + "wiz": "Wiz cloud security findings", +} + + +def _types_from_schema(schema: Any) -> list[str] | None: + """Derive adapter type names from a cloud_sensor JSON-Schema. + + The per-adapter config lives in a sibling sub-struct keyed by the + adapter type name (e.g. ``s3``, ``syslog``), so the type names are + the object properties (minus the shared/non-type fields). Returns + ``None`` if the schema does not expose any usable type names. + """ + if isinstance(schema, dict) and "schema" in schema and isinstance(schema["schema"], dict): + schema = schema["schema"] + if not isinstance(schema, dict): + return None + + names: set[str] = set() + props = schema.get("properties") + if isinstance(props, dict): + names.update(props.keys()) + defs = schema.get("$defs") or schema.get("definitions") + if isinstance(defs, dict): + names.update(defs.keys()) + + names -= _NON_TYPE_FIELDS + cleaned = sorted(n for n in names if n and not n.startswith("_")) + return cleaned or None + + +def adapter_types(org: Any) -> list[dict[str, str]]: + """Return the supported adapter types as name/description rows. + + Prefers the live cloud_sensor hive schema; falls back to the curated + constant when the schema is unavailable or does not advertise types. + """ + derived: list[str] | None = None + try: + schema = Hive(org, "cloud_sensor").get_schema() + derived = _types_from_schema(schema) + except Exception: + derived = None + + if derived: + return [ + {"type": name, "description": _FALLBACK_ADAPTER_TYPES.get(name, "")} + for name in derived + ] + return [ + {"type": name, "description": desc} + for name, desc in sorted(_FALLBACK_ADAPTER_TYPES.items()) + ] + + +_EXPLAIN_LIST_TYPES = """\ +List the supported adapter/sensor type names with a short description. + +The list is derived from the live cloud_sensor hive JSON-Schema when +available (so it tracks the backend), with a curated fallback otherwise. +Use a type name as the top-level sensor_type when calling 'set'. + +Note: a few types are only meaningful for external (on-prem) adapters +(e.g. stdin, wel) versus cloud adapters; consult the set --ai-help for +the per-type config shape. +""" + + +def add_list_types(group: click.Group, command_path: str) -> None: + """Attach a ``list-types`` subcommand to an adapter command group.""" + from ..cli import pass_context + from ..client import Client + from ..sdk.organization import Organization + from ..output import format_output, detect_output_format + from ..discovery import register_explain + + @group.command("list-types", help="List supported adapter/sensor types.") + @pass_context + def list_types_cmd(ctx) -> None: + client = Client( + oid=ctx.obj.oid, + environment=ctx.obj.environment, + print_debug_fn=ctx.obj.debug_fn, + debug_full_response=ctx.obj.debug_full, + debug_curl=ctx.obj.debug_curl, + debug_verbose=ctx.obj.debug_verbose, + ) + org = Organization(client) + data = adapter_types(org) + if not ctx.obj.quiet: + fmt = ctx.obj.output_format or detect_output_format() + click.echo(format_output(data, fmt)) + + register_explain(command_path, _EXPLAIN_LIST_TYPES) diff --git a/limacharlie/commands/_hive_shortcut.py b/limacharlie/commands/_hive_shortcut.py index 55a226f6..0143ba8b 100644 --- a/limacharlie/commands/_hive_shortcut.py +++ b/limacharlie/commands/_hive_shortcut.py @@ -49,7 +49,11 @@ def make_hive_group(group_name: str, hive_name: str, noun_singular: str, noun_pl explain_get = f"Get a specific {noun_singular} by its key name from the '{hive_name}' hive." explain_set = ( f"Create or update {article} {noun_singular} in the '{hive_name}' hive. " - f"Provide data via --input-file (JSON/YAML) or stdin. " + f"Provide data via --input-file (JSON/YAML) or stdin, or use --value to set " + f"the secret value directly (wrapped as {{data: {{secret: }}}}). " + f"WARNING: --value exposes the value on the command line and in shell history; " + f"stdin or --input-file is the recommended path for humans. " + f"--tag (repeatable) and --comment populate usr_mtd. " f"New hive records default to disabled — pass --enabled to create-and-enable in one shot, " f"or include usr_mtd.enabled: true in the input file." ) @@ -82,41 +86,61 @@ def get_cmd(ctx, key) -> None: @grp.command("set", help=f"Create or update {article} {noun_singular}.") @click.option("--key", required=True, help="Record key name.") @click.option("--input-file", type=click.Path(exists=True), default=None, help="JSON or YAML file with record data.") + @click.option( + "--value", default=None, + help="Set the secret value directly (wraps into {data: {secret: }}). " + "WARNING: this exposes the value on the command line and in shell history; " + "stdin or --input-file remains the recommended path for humans.", + ) + @click.option("--tag", "tags", multiple=True, help="Tag to set in usr_mtd (repeatable).") + @click.option("--comment", default=None, help="Set usr_mtd.comment on the record.") @click.option( "--enabled/--disabled", "enabled", default=None, help=f"Set usr_mtd.enabled on the {noun_singular}. Overrides any value in the input file. Records default to disabled if neither this flag nor usr_mtd.enabled is provided.", ) @pass_context - def set_cmd(ctx, key, input_file, enabled) -> None: - if input_file: - with open(input_file, "r") as f: - content = f.read() - elif not sys.stdin.isatty(): - content = sys.stdin.read() + def set_cmd(ctx, key, input_file, value, tags, comment, enabled) -> None: + if value is not None: + if input_file: + raise click.UsageError("--value is mutually exclusive with --input-file/stdin.") + # Convenience wrapper so a secret value can be set without a + # file or stdin. --value is explicit intent, so stdin is ignored + # in this mode rather than consulted. + record = HiveRecord(key, data={"secret": value}) else: - raise click.UsageError("Provide data via --input-file or pipe to stdin.") - - # Parse input as YAML first (YAML is a superset of JSON) - try: - data = yaml.safe_load(content) - except Exception: - data = json.loads(content) - - # Support the same format as 'hive set': if the input has a - # "data" key, use it as the record data and extract usr_mtd. - if isinstance(data, dict) and "data" in data: - # Build a raw dict matching the API format so HiveRecord - # picks up usr_mtd and etag correctly. - raw = { - "data": data["data"], - "usr_mtd": data.get("usr_mtd", {}), - "sys_mtd": {}, - } - if data.get("etag"): - raw["sys_mtd"]["etag"] = data["etag"] - record = HiveRecord.from_raw(key, raw) - else: - record = HiveRecord(key, data=data) + if input_file: + with open(input_file, "r") as f: + content = f.read() + elif not sys.stdin.isatty(): + content = sys.stdin.read() + else: + raise click.UsageError("Provide data via --value, --input-file, or pipe to stdin.") + + # Parse input as YAML first (YAML is a superset of JSON) + try: + data = yaml.safe_load(content) + except Exception: + data = json.loads(content) + + # Support the same format as 'hive set': if the input has a + # "data" key, use it as the record data and extract usr_mtd. + if isinstance(data, dict) and "data" in data: + # Build a raw dict matching the API format so HiveRecord + # picks up usr_mtd and etag correctly. + raw = { + "data": data["data"], + "usr_mtd": data.get("usr_mtd", {}), + "sys_mtd": {}, + } + if data.get("etag"): + raw["sys_mtd"]["etag"] = data["etag"] + record = HiveRecord.from_raw(key, raw) + else: + record = HiveRecord(key, data=data) + if tags: + record.tags = list(tags) + if comment is not None: + record.comment = comment if enabled is not None: record.enabled = enabled org = _get_org(ctx) @@ -158,6 +182,57 @@ def disable_cmd(ctx, key) -> None: result = hive.set(record) _output(ctx, result) + @grp.group("tag", help=f"Manage tags on {noun_plural}.") + def tag_group() -> None: + pass + + def _merge_tags(existing: list[str] | None, changes: tuple[str, ...], remove: bool) -> list[str]: + if remove: + to_remove = {t.lower() for t in changes} + return [t for t in (existing or []) if t.lower() not in to_remove] + seen: dict[str, str] = {} + for tag in (existing or []) + list(changes): + k = tag.lower() + if k not in seen: + seen[k] = tag + return list(seen.values()) + + @tag_group.command("set", help=f"Replace all tags on {article} {noun_singular}.") + @click.option("--key", required=True, help="Record key name.") + @click.option("--tag", "-t", "tags", multiple=True, required=True, help="Tag value (repeatable).") + @pass_context + def tag_set_cmd(ctx, key, tags) -> None: + org = _get_org(ctx) + hive = Hive(org, hive_name) + record = hive.get_metadata(key) + record.tags = list(tags) + result = hive.set(record) + _output(ctx, result) + + @tag_group.command("add", help=f"Add tags to {article} {noun_singular} (merged with existing).") + @click.option("--key", required=True, help="Record key name.") + @click.option("--tag", "-t", "tags", multiple=True, required=True, help="Tag value to add (repeatable).") + @pass_context + def tag_add_cmd(ctx, key, tags) -> None: + org = _get_org(ctx) + hive = Hive(org, hive_name) + record = hive.get_metadata(key) + record.tags = _merge_tags(record.tags, tags, remove=False) + result = hive.set(record) + _output(ctx, result) + + @tag_group.command("rm", help=f"Remove tags from {article} {noun_singular}.") + @click.option("--key", required=True, help="Record key name.") + @click.option("--tag", "-t", "tags", multiple=True, required=True, help="Tag value to remove (repeatable).") + @pass_context + def tag_rm_cmd(ctx, key, tags) -> None: + org = _get_org(ctx) + hive = Hive(org, hive_name) + record = hive.get_metadata(key) + record.tags = _merge_tags(record.tags, tags, remove=True) + result = hive.set(record) + _output(ctx, result) + # Register explain texts. register_explain(f"{group_name}.list", explain_list) register_explain(f"{group_name}.get", explain_get) @@ -165,5 +240,8 @@ def disable_cmd(ctx, key) -> None: register_explain(f"{group_name}.delete", explain_delete) register_explain(f"{group_name}.enable", f"Enable {article} {noun_singular} by key (sets usr_mtd.enabled to true).") register_explain(f"{group_name}.disable", f"Disable {article} {noun_singular} by key (sets usr_mtd.enabled to false).") + register_explain(f"{group_name}.tag.set", f"Replace all tags on {article} {noun_singular} (fetch metadata, set tags).") + register_explain(f"{group_name}.tag.add", f"Add tags to {article} {noun_singular}, merged additively with existing tags.") + register_explain(f"{group_name}.tag.rm", f"Remove tags from {article} {noun_singular}, keeping the rest.") return grp diff --git a/limacharlie/commands/adapter.py b/limacharlie/commands/adapter.py index a24093dc..6dc5c18c 100644 --- a/limacharlie/commands/adapter.py +++ b/limacharlie/commands/adapter.py @@ -3,9 +3,11 @@ from __future__ import annotations from ._hive_shortcut import make_hive_group +from ._adapter_types import add_list_types from ..discovery import register_explain group = make_hive_group("external-adapter", "external_adapter", "external adapter") +add_list_types(group, "external-adapter.list-types") # Override the generic hive explains with adapter-specific documentation. @@ -16,8 +18,9 @@ Each record contains the adapter type and its connection settings. Common adapter types: syslog, file, s3, gcs, pubsub, webhook, stdin, -office365, 1password, crowdstrike, carbon_black, duo, sophos, and many -others. +office365, 1password, crowdstrike, carbon_black, duo, sophos, +threatlocker, and more. Run 'limacharlie external-adapter list-types' +for the full, up-to-date list. Use --output json for the full config including connection details. """) diff --git a/limacharlie/commands/ai.py b/limacharlie/commands/ai.py index 2817c130..13a28878 100644 --- a/limacharlie/commands/ai.py +++ b/limacharlie/commands/ai.py @@ -271,6 +271,10 @@ def summarize_detection(ctx, detection_id) -> None: so on. Any --option flag below overrides the matching field from the template; the rest of the template is used as-is. +--definition accepts BOTH a bare record key (e.g. "my-agent") and the +"hive://ai_agent/" URI form (the form the D&R "start ai agent" +action uses); both resolve to the same ai_agent record. + This lets you reuse one ai_agent definition as a starting point and vary only the bits you need per-run (swap the prompt, cap the budget, change the model, add an env var, etc.). @@ -354,7 +358,7 @@ def _parse_env_kv(items: tuple[str, ...]) -> dict[str, str] | None: @group.command("start-session") @click.option("--definition", required=True, - help="Name of the ai_agent hive record to use as template.") + help="ai_agent hive record to use as template. Accepts a bare key (my-agent) or the hive://ai_agent/ URI form.") @click.option("--prompt", default=None, help="Replace the prompt from the definition.") @click.option("--name", default=None, help="Replace the session name.") @click.option("--idempotent-key", default=None, help="Deduplication key for the session.") diff --git a/limacharlie/commands/api_key.py b/limacharlie/commands/api_key.py index 7b4db46d..dadaf0cf 100644 --- a/limacharlie/commands/api_key.py +++ b/limacharlie/commands/api_key.py @@ -56,6 +56,13 @@ def group() -> None: List all API keys in the organization. Each key entry shows the key name, hash, creation date, and associated permissions. +With --output json the result is an OBJECT keyed by key-hash, whose +values carry the key's name and permissions (it is not a list). + +Use --name to filter the result down to the single +matching key entry; the output keeps the same key-hash-keyed shape +(an object with one entry, or an empty object if no key matches). + The actual secret key value is only returned at creation time and cannot be retrieved later. Use --output json to get the full key metadata for auditing purposes. @@ -63,11 +70,24 @@ def group() -> None: register_explain("api-key.list", _EXPLAIN_LIST) +def _key_name(entry: Any) -> str | None: + """Extract the human name from an API key entry value.""" + if isinstance(entry, dict): + # The API has used both 'name' and 'key_name' over time; accept either. + return entry.get("name") or entry.get("key_name") + return None + + @group.command("list") +@click.option("--name", "name", default=None, help="Filter to the single API key with this name (output keeps the key-hash-keyed object shape).") @pass_context -def list_keys(ctx) -> None: +def list_keys(ctx, name) -> None: org = _get_org(ctx) data = org.get_api_keys() + if name is not None and isinstance(data, dict): + # Keep the raw shape (object keyed by key-hash); just narrow it down + # to the matching entry/entries for back-compat with json consumers. + data = {h: v for h, v in data.items() if _key_name(v) == name} _output(ctx, data) diff --git a/limacharlie/commands/cloud_sensor.py b/limacharlie/commands/cloud_sensor.py index e954a588..721a9f96 100644 --- a/limacharlie/commands/cloud_sensor.py +++ b/limacharlie/commands/cloud_sensor.py @@ -3,9 +3,11 @@ from __future__ import annotations from ._hive_shortcut import make_hive_group +from ._adapter_types import add_list_types from ..discovery import register_explain group = make_hive_group("cloud-adapter", "cloud_sensor", "cloud adapter") +add_list_types(group, "cloud-adapter.list-types") # Override the generic hive explains with cloud adapter documentation. @@ -59,7 +61,9 @@ across all types. Supported cloud adapter types include: webhook, s3, gcs, pubsub, -office365, 1password, crowdstrike, duo, sophos, and others. +office365, 1password, crowdstrike, duo, sophos, threatlocker, okta, +google_workspace, sentinelone, mimecast, and more. Run +'limacharlie cloud-adapter list-types' for the full, up-to-date list. Secrets can be referenced with hive://secret/name syntax to avoid storing credentials inline. diff --git a/limacharlie/commands/dr.py b/limacharlie/commands/dr.py index cf30e612..c82d6807 100644 --- a/limacharlie/commands/dr.py +++ b/limacharlie/commands/dr.py @@ -258,10 +258,17 @@ def get(ctx, key, namespace) -> None: --enabled to create-and-enable in one shot, or set usr_mtd.enabled in the input. +Instead of a full record, you can assemble a rule from separate +component files: --detect and --respond are loaded +and combined into {data: {detect, respond}, usr_mtd: {...}}. They +must be given together and are mutually exclusive with --input-file +(stdin is ignored in this mode). --tag (repeatable) adds usr_mtd tags. + Examples: limacharlie dr set --key my-rule --input-file rule.yaml --enabled cat rule.json | limacharlie dr set --key my-rule --enabled limacharlie dr set --key my-rule --namespace managed --input-file rule.yaml + limacharlie dr set --key my-rule --detect detect.yaml --respond respond.yaml --tag prod --enabled IMPORTANT: Do not write D&R rules from scratch. Use 'limacharlie ai generate-rule --prompt ""' to generate @@ -278,6 +285,15 @@ def get(ctx, key, namespace) -> None: "--input-file", type=click.Path(exists=True), default=None, help="JSON or YAML file with rule data.", ) +@click.option( + "--detect", "detect_path", type=click.Path(exists=True), default=None, + help="Path to the detection component (JSON/YAML). Use with --respond to assemble a rule. Mutually exclusive with --input-file/stdin.", +) +@click.option( + "--respond", "respond_path", type=click.Path(exists=True), default=None, + help="Path to the response component (JSON/YAML). Use with --detect to assemble a rule. Mutually exclusive with --input-file/stdin.", +) +@click.option("--tag", "tags", multiple=True, help="Tag to set in usr_mtd (repeatable).") @click.option( "--namespace", default=None, type=_NS_CHOICES, help="Namespace (default: general).", @@ -287,33 +303,55 @@ def get(ctx, key, namespace) -> None: help="Set usr_mtd.enabled on the rule. Overrides any value in the input file. New rules default to disabled if neither this flag nor usr_mtd.enabled is provided.", ) @pass_context -def set_cmd(ctx, key, input_file, namespace, enabled) -> None: - if input_file: - with open(input_file, "r") as f: - content = f.read() - elif not sys.stdin.isatty(): - content = sys.stdin.read() +def set_cmd(ctx, key, input_file, detect_path, respond_path, tags, namespace, enabled) -> None: + using_components = detect_path is not None or respond_path is not None + + if using_components: + # --detect/--respond assemble a rule in-command and express explicit + # intent, so they cannot be combined with a full record from + # --input-file (which would be ambiguous). Stdin is ignored in this + # mode rather than consulted. + if input_file: + raise click.UsageError( + "--detect/--respond are mutually exclusive with --input-file/stdin." + ) + if detect_path is None or respond_path is None: + raise click.UsageError("--detect and --respond must be provided together.") + detection = _load_file(detect_path) + response = _load_file(respond_path) + record = HiveRecord(key, data={"detect": detection, "respond": response}) else: - raise click.UsageError("Provide data via --input-file or pipe to stdin.") + if input_file: + with open(input_file, "r") as f: + content = f.read() + elif not sys.stdin.isatty(): + content = sys.stdin.read() + else: + raise click.UsageError( + "Provide data via --input-file, stdin, or --detect/--respond." + ) - try: - data = yaml.safe_load(content) - except Exception: - data = json.loads(content) - - # Support the full hive record format (with "data" wrapper) or - # a bare rule dict with detect/respond at the top level. - if isinstance(data, dict) and "data" in data: - raw = { - "data": data["data"], - "usr_mtd": data.get("usr_mtd", {}), - "sys_mtd": {}, - } - if data.get("etag"): - raw["sys_mtd"]["etag"] = data["etag"] - record = HiveRecord.from_raw(key, raw) - else: - record = HiveRecord(key, data=data) + try: + data = yaml.safe_load(content) + except Exception: + data = json.loads(content) + + # Support the full hive record format (with "data" wrapper) or + # a bare rule dict with detect/respond at the top level. + if isinstance(data, dict) and "data" in data: + raw = { + "data": data["data"], + "usr_mtd": data.get("usr_mtd", {}), + "sys_mtd": {}, + } + if data.get("etag"): + raw["sys_mtd"]["etag"] = data["etag"] + record = HiveRecord.from_raw(key, raw) + else: + record = HiveRecord(key, data=data) + + if tags: + record.tags = list(tags) if enabled is not None: record.enabled = enabled diff --git a/limacharlie/commands/event.py b/limacharlie/commands/event.py index dfdad90a..d6bfe3f5 100644 --- a/limacharlie/commands/event.py +++ b/limacharlie/commands/event.py @@ -279,6 +279,11 @@ def timeline(ctx: click.Context, sid: str, start: int, end: int) -> None: Use 'event schema --event-type ' to see the full field list for any given event type. +The schema is OBSERVED, not declared: it is built from events the org +has actually seen. On a fresh org with no telemetry yet, an empty +result is expected — it means no events have been observed, NOT a +misconfiguration. Types appear as sensors start reporting. + Examples: limacharlie event types limacharlie event types --platform windows @@ -290,6 +295,12 @@ def timeline(ctx: click.Context, sid: str, start: int, end: int) -> None: @click.option("--platform", default=None, help="Filter by platform (e.g., windows, linux, macos).") @pass_context def types(ctx: click.Context, platform: str | None) -> None: + """List observed event types and their schemas. + + The schema is observed (built from events the org has seen), not + declared, so an empty result on a fresh org just means no events + have been observed yet — it is not a misconfiguration. + """ org = _get_org(ctx) data = org.get_schemas(platform=platform) _output(ctx, data) diff --git a/limacharlie/commands/hive.py b/limacharlie/commands/hive.py index 7a7cf1f6..03594d48 100644 --- a/limacharlie/commands/hive.py +++ b/limacharlie/commands/hive.py @@ -20,6 +20,7 @@ from ..sdk.hive import Hive, HiveRecord from ..output import format_output, detect_output_format from ..discovery import register_explain +from ._time_validation import validate_epoch_seconds # --------------------------------------------------------------------------- @@ -231,6 +232,19 @@ def get(ctx, hive_name, key) -> None: The --enabled/--disabled flag, when given, overrides any value in the input file's usr_mtd.enabled. +Metadata flags (--tag-add, --tag-rm, --comment, --expiry) can be used +to manage usr_mtd without re-supplying the record data: + + * When NO data is supplied (no --input-file and no piped stdin), a + metadata-only update is performed: the current metadata is fetched, + --tag-add/--tag-rm are applied additively (existing tags are kept), + and --comment/--expiry/--enabled overwrite their fields. + * When data IS supplied, the same flags are applied as overrides on + top of the input's usr_mtd. + +--tag-add and --tag-rm are repeatable and additive (they never clobber +the existing tag set); applying both, a removal of an added tag wins. + Data payload examples per hive type: secret: {secret: "my-api-key"} yara: {rule: "rule MyRule { ... }"} @@ -243,10 +257,30 @@ def get(ctx, hive_name, key) -> None: limacharlie hive set --hive-name lookup --key my-lookup \\ --input-file record.yaml --enabled + + # Metadata-only: add/remove tags and set a comment without touching data. + limacharlie hive set --hive-name lookup --key my-lookup \\ + --tag-add prod --tag-add reviewed --tag-rm draft --comment "ready" """ register_explain("hive.set", _EXPLAIN_SET) +def _merge_tags(existing: list[str] | None, add: tuple[str, ...], rm: tuple[str, ...]) -> list[str]: + """Additively merge tag changes onto an existing tag list. + + Existing tags are preserved; --tag-add entries are appended (case- + insensitive dedup), then --tag-rm entries are removed. A tag both + added and removed in the same call is removed (removal wins). + """ + seen: dict[str, str] = {} + for tag in (existing or []) + list(add): + key = tag.lower() + if key not in seen: + seen[key] = tag + to_remove = {t.lower() for t in rm} + return [t for k, t in seen.items() if k not in to_remove] + + @group.command("set") @click.option("--hive-name", required=True, help="Hive name.") @click.option("--key", required=True, help="Record key.") @@ -255,23 +289,54 @@ def get(ctx, hive_name, key) -> None: "--enabled/--disabled", "enabled", default=None, help="Set usr_mtd.enabled on the record. Overrides any value in the input file. New records default to disabled if neither this flag nor usr_mtd.enabled is provided.", ) +@click.option("--tag-add", "tag_add", multiple=True, help="Tag to add (repeatable, additive; keeps existing tags).") +@click.option("--tag-rm", "tag_rm", multiple=True, help="Tag to remove (repeatable, additive; keeps other existing tags).") +@click.option("--comment", default=None, help="Set usr_mtd.comment on the record.") +@click.option("--expiry", default=None, type=int, help="Set usr_mtd.expiry (Unix epoch seconds, 0 = never).") @pass_context -def set_record(ctx, hive_name, key, input_file, enabled) -> None: +def set_record(ctx, hive_name, key, input_file, enabled, tag_add, tag_rm, comment, expiry) -> None: + if expiry is not None: + validate_epoch_seconds(expiry, "expiry") + data = _load_input(input_file) - if data is None: - click.echo( - "Error: No input data provided.\n" - "Suggestion: Use --input-file or pipe data to stdin.", - err=True, - ) - ctx.exit(4) - return + has_metadata_flags = bool(tag_add or tag_rm or comment is not None or expiry is not None or enabled is not None) org = _get_org(ctx) hive = Hive(org, hive_name) - record = _record_from_input(key, data) - if enabled is not None: - record.enabled = enabled + + if data is None: + if not has_metadata_flags: + click.echo( + "Error: No input data provided.\n" + "Suggestion: Use --input-file or pipe data to stdin, " + "or pass metadata flags (--tag-add/--tag-rm/--comment/--expiry/--enabled).", + err=True, + ) + ctx.exit(4) + return + # Metadata-only update: fetch current metadata so tags/comment/expiry + # that are not being changed are preserved (the API replaces usr_mtd + # wholesale), modeled on the enable/disable commands. + record = hive.get_metadata(key) + if tag_add or tag_rm: + record.tags = _merge_tags(record.tags, tag_add, tag_rm) + if comment is not None: + record.comment = comment + if expiry is not None: + record.expiry = expiry + if enabled is not None: + record.enabled = enabled + else: + record = _record_from_input(key, data) + if tag_add or tag_rm: + record.tags = _merge_tags(record.tags, tag_add, tag_rm) + if comment is not None: + record.comment = comment + if expiry is not None: + record.expiry = expiry + if enabled is not None: + record.enabled = enabled + result = hive.set(record) if not ctx.obj.quiet: click.echo(f"Record '{key}' set in hive '{hive_name}'.") @@ -376,6 +441,11 @@ def disable(ctx, hive_name, key) -> None: Validate a record against a hive's schema without saving it. Useful for checking whether record data is well-formed before pushing changes. The data format is the same as for the 'set' command. + +On success the command exits 0 and prints "Record is valid." to stderr. +When the API returns an empty/no-content response, the structured +formats (json/yaml) emit {"valid": true} so success is machine-readable. +On failure the command exits non-zero with the validation error. """ register_explain("hive.validate", _EXPLAIN_VALIDATE) @@ -399,7 +469,16 @@ def validate(ctx, hive_name, key, input_file) -> None: org = _get_org(ctx) hive = Hive(org, hive_name) record = _record_from_input(key, data) + # A failed validation raises (and exits non-zero) before reaching here, + # so getting this far is an explicit positive verdict. result = hive.validate(record) + # Keep stdout machine-stable: the human-readable confirmation goes to + # stderr, and when the API reports nothing we synthesize {"valid": true} + # so json/yaml consumers still get a definite success signal. + if not ctx.obj.quiet: + click.echo(f"Record '{key}' is valid.", err=True) + if not result: + result = {"valid": True} _output(ctx, result) @@ -417,13 +496,128 @@ def validate(ctx, hive_name, key, input_file) -> None: format (e.g., dr-general, dr-managed, dr-service, fp, extension_config) return an error indicating no schema is available. +By default the schema is rendered as a flat field table (name, type, +required, notes) with $ref/$defs resolved, so the accepted fields are +immediately readable. Use --output json to get the raw JSON Schema +(nothing is lost). + Example: limacharlie hive schema --hive-name secret - limacharlie hive schema --hive-name lookup + limacharlie hive schema --hive-name ai_agent + limacharlie hive schema --hive-name ai_agent --output json """ register_explain("hive.schema", _EXPLAIN_SCHEMA) +def _resolve_ref(ref: str, root: dict[str, Any]) -> dict[str, Any]: + """Resolve a local JSON-Schema $ref (e.g. '#/$defs/Foo') against root.""" + if not ref.startswith("#/"): + return {} + node: Any = root + for part in ref[2:].split("/"): + if isinstance(node, dict) and part in node: + node = node[part] + else: + return {} + return node if isinstance(node, dict) else {} + + +def _schema_type(node: dict[str, Any], root: dict[str, Any]) -> str: + """Best-effort human type string for a JSON-Schema node.""" + if "$ref" in node: + ref = node["$ref"] + name = ref.rsplit("/", 1)[-1] + return name or "object" + t = node.get("type") + if isinstance(t, list): + return "|".join(str(x) for x in t) + if t == "array": + items = node.get("items") + if isinstance(items, dict): + return f"array<{_schema_type(items, root)}>" + return "array" + if t: + return str(t) + if "enum" in node: + return "enum" + if "anyOf" in node or "oneOf" in node: + opts = node.get("anyOf") or node.get("oneOf") or [] + parts = [_schema_type(o, root) for o in opts if isinstance(o, dict)] + return "|".join(p for p in parts if p) or "any" + if "properties" in node: + return "object" + return "any" + + +def _flatten_schema(node: dict[str, Any], root: dict[str, Any], required: set[str] | None = None, + prefix: str = "", seen: set[int] | None = None) -> list[dict[str, str]]: + """Flatten a JSON-Schema object into name/type/required/notes rows.""" + if seen is None: + seen = set() + if required is None: + required = set() + + # Resolve a top-level $ref before descending. + if "$ref" in node: + node = _resolve_ref(node["$ref"], root) + + if id(node) in seen: + return [] + seen = seen | {id(node)} + + rows: list[dict[str, str]] = [] + props = node.get("properties") + if not isinstance(props, dict): + return rows + req = set(node.get("required", [])) + + for name, sub in props.items(): + if not isinstance(sub, dict): + continue + field = f"{prefix}{name}" + resolved = _resolve_ref(sub["$ref"], root) if "$ref" in sub else sub + type_str = _schema_type(sub, root) + + notes_parts: list[str] = [] + if "enum" in resolved: + vals = resolved["enum"] + shown = ", ".join(str(v) for v in vals[:8]) + if len(vals) > 8: + shown += ", ..." + notes_parts.append(f"enum: {shown}") + desc = resolved.get("description") or sub.get("description") + if desc: + notes_parts.append(str(desc).strip().splitlines()[0]) + + rows.append({ + "field": field, + "type": type_str, + "required": "yes" if name in req else "", + "notes": "; ".join(notes_parts), + }) + + # Recurse into nested objects (resolved object with properties). + if isinstance(resolved.get("properties"), dict): + rows.extend(_flatten_schema(resolved, root, prefix=f"{field}.", seen=seen)) + + return rows + + +def _flatten_hive_schema(data: Any) -> list[dict[str, str]] | None: + """Turn a hive get_schema() response into flat field rows. + + Returns None when the response has no resolvable object schema (so the + caller can fall back to printing the raw structure). + """ + if not isinstance(data, dict): + return None + root = data.get("schema", data) + if not isinstance(root, dict): + return None + rows = _flatten_schema(root, root) + return rows or None + + @group.command() @click.option("--hive-name", required=True, help="Hive name (e.g., secret, lookup, yara).") @pass_context @@ -431,6 +625,15 @@ def schema(ctx, hive_name) -> None: org = _get_org(ctx) hive = Hive(org, hive_name) data = hive.get_schema() + + fmt = ctx.obj.output_format or detect_output_format() + # json/jsonl/yaml/toon/csv consumers get the raw JSON Schema untouched. + # The default human view (table) is a flattened field listing. + if fmt == "table": + rows = _flatten_hive_schema(data) + if rows is not None: + _output(ctx, rows) + return _output(ctx, data) diff --git a/limacharlie/output.py b/limacharlie/output.py index 021a4425..71bf111b 100644 --- a/limacharlie/output.py +++ b/limacharlie/output.py @@ -36,6 +36,9 @@ # Module-level flags set by the CLI before any command runs. _wide_mode: bool = False _filter_expr: str | None = None +_fields: list[str] | None = None +_sort_by: str | None = None +_reverse: bool = False @@ -51,6 +54,24 @@ def set_filter_expr(expr: str | None) -> None: _filter_expr = expr +def set_fields(fields: list[str] | None) -> None: + """Set the list of fields to project, applied to all output.""" + global _fields + _fields = fields + + +def set_sort_by(sort_by: str | None) -> None: + """Set the field name to sort list output by, applied to all output.""" + global _sort_by + _sort_by = sort_by + + +def set_reverse(reverse: bool) -> None: + """Set whether sorted list output is reversed, applied to all output.""" + global _reverse + _reverse = reverse + + def detect_output_format() -> str: """Auto-detect the output format based on whether stdout is a TTY. @@ -87,9 +108,18 @@ def format_output( if fmt is None: fmt = detect_output_format() - # Fall back to module-level filter if none passed explicitly. + # Fall back to module-level projection state if none passed explicitly. + # Mirrors the set_filter_expr/_filter_expr mechanism so the global + # --filter/--fields/--sort-by/--reverse flags flow into every command's + # output without each command needing to thread them through. if filter_expr is None: filter_expr = _filter_expr + if fields is None: + fields = _fields + if sort_by is None: + sort_by = _sort_by + if not reverse: + reverse = _reverse # Apply jmespath filter if filter_expr and data is not None: diff --git a/limacharlie/sdk/ai.py b/limacharlie/sdk/ai.py index bb795561..a5855070 100644 --- a/limacharlie/sdk/ai.py +++ b/limacharlie/sdk/ai.py @@ -97,7 +97,11 @@ def start_session(self, definition_name: str, prompt: str | None = None, resolved automatically before the request is sent. Args: - definition_name: Name of the ai_agent hive record to use as template. + definition_name: Name of the ai_agent hive record to use as + template. Accepts either a bare record key (``my-agent``) + or the ``hive://ai_agent/`` URI form used by the + D&R ``start ai agent`` action; the prefix is stripped and + both resolve to the same record. prompt: Replace the prompt from the definition. name: Replace the session name. idempotent_key: Deduplication key. @@ -131,6 +135,13 @@ def start_session(self, definition_name: str, prompt: str | None = None, """ from .hive import Hive + # Accept both a bare record key and the hive://ai_agent/ URI + # form (the form the D&R 'start ai agent' action references), so the + # same identifier works from a rule and from the CLI/SDK. + _HIVE_PREFIX = "hive://ai_agent/" + if definition_name.startswith(_HIVE_PREFIX): + definition_name = definition_name[len(_HIVE_PREFIX):] + # Fetch the ai_agent definition; treat its fields as the template # that overrides stack on top of. record = Hive(self._org, "ai_agent").get(definition_name) diff --git a/tests/unit/test_cli_ergonomics.py b/tests/unit/test_cli_ergonomics.py new file mode 100644 index 00000000..b43718b8 --- /dev/null +++ b/tests/unit/test_cli_ergonomics.py @@ -0,0 +1,434 @@ +"""Tests for CLI discoverability/ergonomics improvements. + +Covers the global projection flags (--fields/--sort-by/--reverse), the +api-key list --name filter, hive validate verdict, hive set metadata +flags, secret set --value / tag subcommand, dr set --detect/--respond, +adapter list-types, and hive schema flat rendering. +""" + +import json +from unittest.mock import patch, MagicMock + +from click.testing import CliRunner + +from limacharlie.cli import cli +from limacharlie import output as output_mod +from limacharlie.sdk.hive import HiveRecord + + +def _extract_json(text): + """Return the JSON document embedded in mixed stdout/stderr output. + + This CliRunner version merges stderr into output, so a stderr status + line may precede the JSON body; slice from the first '{' or '['. + """ + for i, ch in enumerate(text): + if ch in "{[": + return text[i:] + return text + + +# --------------------------------------------------------------------------- +# Global projection state (module-level setters mirror set_filter_expr) +# --------------------------------------------------------------------------- + +class TestProjectionState: + def teardown_method(self): + output_mod.set_fields(None) + output_mod.set_sort_by(None) + output_mod.set_reverse(False) + + def test_fields_module_level_fallback(self): + output_mod.set_fields(["a", "c"]) + out = output_mod.format_output([{"a": 1, "b": 2, "c": 3}], "json") + assert json.loads(out) == [{"a": 1, "c": 3}] + + def test_sort_by_and_reverse(self): + output_mod.set_sort_by("a") + output_mod.set_reverse(True) + data = [{"a": 1}, {"a": 3}, {"a": 2}] + out = output_mod.format_output(data, "json") + assert [r["a"] for r in json.loads(out)] == [3, 2, 1] + + def test_explicit_arg_overrides_module_level(self): + output_mod.set_fields(["a"]) + # Explicitly passing fields should win over module-level state. + out = output_mod.format_output([{"a": 1, "b": 2}], "json", fields=["b"]) + assert json.loads(out) == [{"b": 2}] + + +class TestGlobalProjectionFlags: + @patch("limacharlie.commands.dr.Client") + @patch("limacharlie.commands.dr.Organization") + @patch("limacharlie.commands.dr.Hive") + def test_fields_flag_projects_output(self, mock_hive_cls, _org, _client): + rec = MagicMock() + rec.to_dict.return_value = {"data": {"x": 1}, "usr_mtd": {}, "sys_mtd": {}} + mock_hive = MagicMock() + mock_hive.list.return_value = {"r1": rec} + mock_hive_cls.return_value = mock_hive + + runner = CliRunner() + # The list output is a dict keyed by record name; --fields on a dict + # narrows the keys. Use --fields to keep only "r1". + result = runner.invoke(cli, ["--output", "json", "--fields", "r1", "dr", "list"]) + assert result.exit_code == 0, result.output + parsed = json.loads(result.output) + assert set(parsed.keys()) == {"r1"} + # Reset module-level state so later tests are unaffected. + output_mod.set_fields(None) + + +# --------------------------------------------------------------------------- +# api-key list --name +# --------------------------------------------------------------------------- + +class TestApiKeyListName: + @patch("limacharlie.commands.api_key.Client") + @patch("limacharlie.commands.api_key.Organization") + def test_name_filter_keeps_keyed_shape(self, mock_org_cls, _client): + mock_org = MagicMock() + mock_org.get_api_keys.return_value = { + "hashA": {"name": "ci-key", "perms": ["dr.list"]}, + "hashB": {"name": "readonly", "perms": ["org.get"]}, + } + mock_org_cls.return_value = mock_org + + runner = CliRunner() + result = runner.invoke(cli, ["--output", "json", "api-key", "list", "--name", "ci-key"]) + assert result.exit_code == 0, result.output + parsed = json.loads(result.output) + # Still keyed by key-hash (object), narrowed to the match. + assert parsed == {"hashA": {"name": "ci-key", "perms": ["dr.list"]}} + + @patch("limacharlie.commands.api_key.Client") + @patch("limacharlie.commands.api_key.Organization") + def test_no_match_returns_empty_object(self, mock_org_cls, _client): + mock_org = MagicMock() + mock_org.get_api_keys.return_value = {"hashA": {"name": "ci-key"}} + mock_org_cls.return_value = mock_org + + runner = CliRunner() + result = runner.invoke(cli, ["--output", "json", "api-key", "list", "--name", "nope"]) + assert result.exit_code == 0, result.output + assert json.loads(result.output) == {} + + @patch("limacharlie.commands.api_key.Client") + @patch("limacharlie.commands.api_key.Organization") + def test_without_name_unchanged(self, mock_org_cls, _client): + raw = {"hashA": {"name": "ci-key"}, "hashB": {"name": "readonly"}} + mock_org = MagicMock() + mock_org.get_api_keys.return_value = raw + mock_org_cls.return_value = mock_org + + runner = CliRunner() + result = runner.invoke(cli, ["--output", "json", "api-key", "list"]) + assert result.exit_code == 0, result.output + assert json.loads(result.output) == raw + + +# --------------------------------------------------------------------------- +# hive validate verdict +# --------------------------------------------------------------------------- + +class TestHiveValidateVerdict: + @patch("limacharlie.commands.hive.Client") + @patch("limacharlie.commands.hive.Organization") + @patch("limacharlie.commands.hive.Hive") + def test_empty_response_emits_valid_true(self, mock_hive_cls, _org, _client): + mock_hive = MagicMock() + mock_hive.validate.return_value = {} # API said nothing -> success + mock_hive_cls.return_value = mock_hive + + runner = CliRunner() + result = runner.invoke( + cli, + ["--output", "json", "hive", "validate", "--hive-name", "lookup", "--key", "k"], + input='{"data": {"x": 1}}\n', + ) + assert result.exit_code == 0, result.output + # Human confirmation goes to stderr (mixed into output here); the + # machine-stable JSON verdict is the {"valid": true} object. + assert "is valid." in result.output + assert json.loads(_extract_json(result.output)) == {"valid": True} + + @patch("limacharlie.commands.hive.Client") + @patch("limacharlie.commands.hive.Organization") + @patch("limacharlie.commands.hive.Hive") + def test_nonempty_response_preserved(self, mock_hive_cls, _org, _client): + mock_hive = MagicMock() + mock_hive.validate.return_value = {"detail": "ok", "extra": 1} + mock_hive_cls.return_value = mock_hive + + runner = CliRunner() + result = runner.invoke( + cli, + ["--output", "json", "hive", "validate", "--hive-name", "lookup", "--key", "k"], + input='{"data": {"x": 1}}\n', + ) + assert result.exit_code == 0, result.output + assert json.loads(_extract_json(result.output)) == {"detail": "ok", "extra": 1} + + +# --------------------------------------------------------------------------- +# hive set metadata flags +# --------------------------------------------------------------------------- + +class TestHiveSetMetadataFlags: + @staticmethod + def _existing(name="k"): + return HiveRecord(name=name, data=None, enabled=True, + tags=["keep", "draft"], expiry=10, comment="old") + + @patch("limacharlie.commands.hive.Client") + @patch("limacharlie.commands.hive.Organization") + @patch("limacharlie.commands.hive.Hive") + def test_metadata_only_additive_tags(self, mock_hive_cls, _org, _client): + mock_hive = MagicMock() + mock_hive.get_metadata.return_value = self._existing() + mock_hive.set.return_value = {"etag": "new"} + mock_hive_cls.return_value = mock_hive + + runner = CliRunner() + result = runner.invoke(cli, [ + "hive", "set", "--hive-name", "lookup", "--key", "k", + "--tag-add", "prod", "--tag-rm", "draft", "--comment", "new note", + ]) + assert result.exit_code == 0, result.output + # Metadata-only path fetches current metadata, no data load. + mock_hive.get_metadata.assert_called_once_with("k") + record = mock_hive.set.call_args[0][0] + # 'keep' preserved, 'draft' removed, 'prod' added. + assert set(record.tags) == {"keep", "prod"} + assert record.comment == "new note" + + @patch("limacharlie.commands.hive.Client") + @patch("limacharlie.commands.hive.Organization") + @patch("limacharlie.commands.hive.Hive") + def test_no_data_no_flags_errors(self, mock_hive_cls, _org, _client): + mock_hive_cls.return_value = MagicMock() + runner = CliRunner() + # No stdin, no input-file, no metadata flags -> usage error exit 4. + result = runner.invoke(cli, ["hive", "set", "--hive-name", "lookup", "--key", "k"]) + assert result.exit_code == 4 + + @patch("limacharlie.commands.hive.Client") + @patch("limacharlie.commands.hive.Organization") + @patch("limacharlie.commands.hive.Hive") + def test_data_with_metadata_overrides(self, mock_hive_cls, _org, _client): + mock_hive = MagicMock() + mock_hive.set.return_value = {"etag": "new"} + mock_hive_cls.return_value = mock_hive + + runner = CliRunner() + result = runner.invoke(cli, [ + "hive", "set", "--hive-name", "lookup", "--key", "k", + "--tag-add", "prod", "--comment", "c", + ], input='{"data": {"x": 1}}\n') + assert result.exit_code == 0, result.output + # Data was supplied, so no get_metadata fetch. + mock_hive.get_metadata.assert_not_called() + record = mock_hive.set.call_args[0][0] + assert record.tags == ["prod"] + assert record.comment == "c" + assert record.data == {"x": 1} + + +# --------------------------------------------------------------------------- +# secret set --value and tag subcommand +# --------------------------------------------------------------------------- + +class TestSecretValueAndTag: + @patch("limacharlie.commands._hive_shortcut.Client") + @patch("limacharlie.commands._hive_shortcut.Organization") + @patch("limacharlie.commands._hive_shortcut.Hive") + def test_set_value_wraps_secret(self, mock_hive_cls, _org, _client): + mock_hive = MagicMock() + mock_hive.set.return_value = {"etag": "new"} + mock_hive_cls.return_value = mock_hive + + runner = CliRunner() + result = runner.invoke(cli, [ + "secret", "set", "--key", "my-secret", "--value", "s3cr3t", + "--tag", "prod", "--comment", "note", + ]) + assert result.exit_code == 0, result.output + record = mock_hive.set.call_args[0][0] + assert record.data == {"secret": "s3cr3t"} + assert record.tags == ["prod"] + assert record.comment == "note" + + @patch("limacharlie.commands._hive_shortcut.Client") + @patch("limacharlie.commands._hive_shortcut.Organization") + @patch("limacharlie.commands._hive_shortcut.Hive") + def test_tag_add_merges(self, mock_hive_cls, _org, _client): + mock_hive = MagicMock() + mock_hive.get_metadata.return_value = HiveRecord(name="my-secret", tags=["a"]) + mock_hive.set.return_value = {"etag": "new"} + mock_hive_cls.return_value = mock_hive + + runner = CliRunner() + result = runner.invoke(cli, ["secret", "tag", "add", "--key", "my-secret", "-t", "b"]) + assert result.exit_code == 0, result.output + record = mock_hive.set.call_args[0][0] + assert set(record.tags) == {"a", "b"} + + @patch("limacharlie.commands._hive_shortcut.Client") + @patch("limacharlie.commands._hive_shortcut.Organization") + @patch("limacharlie.commands._hive_shortcut.Hive") + def test_tag_rm_keeps_others(self, mock_hive_cls, _org, _client): + mock_hive = MagicMock() + mock_hive.get_metadata.return_value = HiveRecord(name="my-secret", tags=["a", "b"]) + mock_hive.set.return_value = {"etag": "new"} + mock_hive_cls.return_value = mock_hive + + runner = CliRunner() + result = runner.invoke(cli, ["secret", "tag", "rm", "--key", "my-secret", "-t", "a"]) + assert result.exit_code == 0, result.output + record = mock_hive.set.call_args[0][0] + assert record.tags == ["b"] + + +# --------------------------------------------------------------------------- +# dr set --detect/--respond/--tag +# --------------------------------------------------------------------------- + +class TestDrSetComponents: + @patch("limacharlie.commands.dr.Client") + @patch("limacharlie.commands.dr.Organization") + @patch("limacharlie.commands.dr.Hive") + def test_detect_respond_assembles_rule(self, mock_hive_cls, _org, _client, tmp_path): + detect = tmp_path / "d.yaml" + respond = tmp_path / "r.yaml" + detect.write_text("op: is\npath: event/X\nvalue: y\n") + respond.write_text("- action: report\n name: n\n") + + mock_hive = MagicMock() + mock_hive.set.return_value = {"etag": "new"} + mock_hive_cls.return_value = mock_hive + + runner = CliRunner() + result = runner.invoke(cli, [ + "dr", "set", "--key", "r", "--detect", str(detect), + "--respond", str(respond), "--tag", "prod", "--enabled", + ]) + assert result.exit_code == 0, result.output + record = mock_hive.set.call_args[0][0] + assert record.data == { + "detect": {"op": "is", "path": "event/X", "value": "y"}, + "respond": [{"action": "report", "name": "n"}], + } + assert record.tags == ["prod"] + assert record.enabled is True + + @patch("limacharlie.commands.dr.Client") + @patch("limacharlie.commands.dr.Organization") + @patch("limacharlie.commands.dr.Hive") + def test_detect_with_input_file_errors(self, mock_hive_cls, _org, _client, tmp_path): + detect = tmp_path / "d.yaml" + respond = tmp_path / "r.yaml" + infile = tmp_path / "in.yaml" + for p in (detect, respond, infile): + p.write_text("op: is\n") + mock_hive_cls.return_value = MagicMock() + + runner = CliRunner() + result = runner.invoke(cli, [ + "dr", "set", "--key", "r", "--detect", str(detect), + "--respond", str(respond), "--input-file", str(infile), + ]) + assert result.exit_code != 0 + assert "mutually exclusive" in result.output + + +# --------------------------------------------------------------------------- +# adapter list-types +# --------------------------------------------------------------------------- + +class TestAdapterListTypes: + def test_fallback_includes_threatlocker(self): + from limacharlie.commands._adapter_types import adapter_types + # Passing an org whose schema fetch raises forces the fallback path. + rows = adapter_types(None) + types = {r["type"] for r in rows} + assert "threatlocker" in types + assert "webhook" in types + + def test_derived_from_schema(self): + from limacharlie.commands._adapter_types import _types_from_schema + schema = {"schema": { + "$defs": {"S3Config": {}, "SyslogConfig": {}}, + "properties": {"s3": {}, "syslog": {}, "sensor_type": {}, "client_options": {}}, + }} + names = _types_from_schema(schema) + # Non-type shared fields are excluded. + assert "sensor_type" not in names + assert "client_options" not in names + assert "s3" in names and "syslog" in names + + @patch("limacharlie.commands._adapter_types.Hive") + @patch("limacharlie.commands._adapter_types.Client", create=True) + def test_cli_list_types(self, _client, mock_hive_cls): + # When the schema fetch fails, the command still returns the curated list. + mock_hive = MagicMock() + mock_hive.get_schema.side_effect = Exception("no schema") + mock_hive_cls.return_value = mock_hive + + runner = CliRunner() + with patch("limacharlie.client.Client"): + result = runner.invoke(cli, ["--output", "json", "cloud-adapter", "list-types"]) + assert result.exit_code == 0, result.output + parsed = json.loads(result.output) + assert any(r["type"] == "threatlocker" for r in parsed) + + +# --------------------------------------------------------------------------- +# hive schema flat rendering +# --------------------------------------------------------------------------- + +class TestHiveSchemaFlat: + _SCHEMA = {"schema": { + "$ref": "#/$defs/Rec", + "$defs": { + "Rec": { + "type": "object", + "required": ["prompt"], + "properties": { + "prompt": {"type": "string"}, + "model": {"type": "string", "enum": ["a", "b"]}, + "nested": {"$ref": "#/$defs/Sub"}, + }, + }, + "Sub": {"type": "object", "properties": {"x": {"type": "integer"}}}, + }, + }} + + @patch("limacharlie.commands.hive.Client") + @patch("limacharlie.commands.hive.Organization") + @patch("limacharlie.commands.hive.Hive") + def test_default_table_is_flat(self, mock_hive_cls, _org, _client): + mock_hive = MagicMock() + mock_hive.get_schema.return_value = self._SCHEMA + mock_hive_cls.return_value = mock_hive + + runner = CliRunner() + # Force table (the default human view) explicitly. + result = runner.invoke(cli, ["--output", "table", "hive", "schema", "--hive-name", "ai_agent"]) + assert result.exit_code == 0, result.output + assert "prompt" in result.output + assert "nested.x" in result.output # $ref resolved + flattened + + @patch("limacharlie.commands.hive.Client") + @patch("limacharlie.commands.hive.Organization") + @patch("limacharlie.commands.hive.Hive") + def test_json_keeps_raw(self, mock_hive_cls, _org, _client): + mock_hive = MagicMock() + mock_hive.get_schema.return_value = self._SCHEMA + mock_hive_cls.return_value = mock_hive + + runner = CliRunner() + result = runner.invoke(cli, ["--output", "json", "hive", "schema", "--hive-name", "ai_agent"]) + assert result.exit_code == 0, result.output + parsed = json.loads(result.output) + # Raw JSON Schema preserved, nothing lost. + assert parsed == self._SCHEMA diff --git a/tests/unit/test_cli_lazy_loading_regression.py b/tests/unit/test_cli_lazy_loading_regression.py index 8020d077..3315d909 100644 --- a/tests/unit/test_cli_lazy_loading_regression.py +++ b/tests/unit/test_cli_lazy_loading_regression.py @@ -125,7 +125,7 @@ "ai-memory": frozenset({ "delete", "delete-record", "get", "list", "list-records", "set", }), - "ai-skill": frozenset({"delete", "disable", "enable", "get", "list", "set"}), + "ai-skill": frozenset({"delete", "disable", "enable", "get", "list", "set", "tag"}), "api-key": frozenset({"create", "delete", "list"}), "arl": frozenset({"get"}), "artifact": frozenset({"download", "list", "upload"}), @@ -142,7 +142,7 @@ "entity", "export", "get", "list", "merge", "orgs", "report", "tag", "telemetry", "update", "update-note", }), - "cloud-adapter": frozenset({"delete", "disable", "enable", "get", "list", "set"}), + "cloud-adapter": frozenset({"delete", "disable", "enable", "get", "list", "list-types", "set", "tag"}), "detection": frozenset({"get", "list"}), "download": frozenset({"adapter", "list", "sensor"}), "dr": frozenset({ @@ -160,8 +160,8 @@ "list", "list-available", "rekey", "request", "schema", "subscribe", "unsubscribe", }), - "external-adapter": frozenset({"delete", "disable", "enable", "get", "list", "set"}), - "fp": frozenset({"delete", "disable", "enable", "get", "list", "set"}), + "external-adapter": frozenset({"delete", "disable", "enable", "get", "list", "list-types", "set", "tag"}), + "fp": frozenset({"delete", "disable", "enable", "get", "list", "set", "tag"}), "group": frozenset({ "create", "delete", "get", "list", "logs", "member-add", "member-remove", "org-add", "org-remove", @@ -178,8 +178,8 @@ "ioc": frozenset({"batch-enrich", "batch-search", "enrich", "hosts", "search"}), "job": frozenset({"delete", "get", "list", "wait"}), "logging": frozenset({"create", "delete", "get", "list"}), - "lookup": frozenset({"delete", "disable", "enable", "get", "list", "set"}), - "note": frozenset({"delete", "disable", "enable", "get", "list", "set"}), + "lookup": frozenset({"delete", "disable", "enable", "get", "list", "set", "tag"}), + "note": frozenset({"delete", "disable", "enable", "get", "list", "set", "tag"}), "org": frozenset({ "check-name", "config-get", "config-set", "create", "delete", "dismiss-error", "errors", "info", "list", "mitre", "quota", @@ -187,7 +187,7 @@ }), "output": frozenset({"create", "delete", "list"}), "payload": frozenset({"delete", "download", "list", "upload"}), - "playbook": frozenset({"delete", "disable", "enable", "get", "list", "set"}), + "playbook": frozenset({"delete", "disable", "enable", "get", "list", "set", "tag"}), "replay": frozenset({"run"}), "schema": frozenset({"get", "list", "reset"}), "search": frozenset({ @@ -195,12 +195,12 @@ "saved-create", "saved-delete", "saved-get", "saved-list", "saved-run", "validate", }), - "secret": frozenset({"delete", "disable", "enable", "get", "list", "set"}), + "secret": frozenset({"delete", "disable", "enable", "get", "list", "set", "tag"}), "sensor": frozenset({ "delete", "dump", "export", "get", "list", "set-version", "sweep", "upgrade", "wait-online", }), - "sop": frozenset({"delete", "disable", "enable", "get", "list", "set"}), + "sop": frozenset({"delete", "disable", "enable", "get", "list", "set", "tag"}), "spotcheck": frozenset({"run"}), "stream": frozenset({"audit", "detections", "events", "firehose"}), "sync": frozenset({"pull", "push"}), @@ -223,8 +223,8 @@ # Global options that must be present on the top-level cli group. EXPECTED_GLOBAL_OPTIONS = frozenset({ "oid", "output_format", "debug", "debug_full", "debug_curl", - "quiet", "wide", "no_warnings", "filter_expr", "profile", - "environment", + "quiet", "wide", "no_warnings", "filter_expr", "fields", + "sort_by", "reverse", "profile", "environment", }) diff --git a/tests/unit/test_dataclasses.py b/tests/unit/test_dataclasses.py index 50e6aab1..a846bc67 100644 --- a/tests/unit/test_dataclasses.py +++ b/tests/unit/test_dataclasses.py @@ -22,7 +22,7 @@ def test_default_values(self): def test_field_names(self): names = [f.name for f in fields(LimaCharlieContext)] - assert names == ["oid", "output_format", "debug", "debug_full", "debug_curl", "quiet", "wide", "no_warnings", "filter_expr", "profile", "environment"] + assert names == ["oid", "output_format", "debug", "debug_full", "debug_curl", "quiet", "wide", "no_warnings", "filter_expr", "fields", "sort_by", "reverse", "profile", "environment"] def test_custom_values(self): ctx = LimaCharlieContext(oid="abc", output_format="json", debug=True, quiet=True) diff --git a/tests/unit/test_sdk_ai_sessions.py b/tests/unit/test_sdk_ai_sessions.py index f4cbee9e..cf0aa17d 100644 --- a/tests/unit/test_sdk_ai_sessions.py +++ b/tests/unit/test_sdk_ai_sessions.py @@ -1260,3 +1260,34 @@ def test_history_raw_keeps_everything(self): ]) assert result.exit_code == 0, result.output assert "model_set" in result.output + + +class TestStartSessionHiveUri: + """start_session accepts both a bare key and the hive://ai_agent/ + URI form (the form the D&R 'start ai agent' action references).""" + + def test_strips_hive_ai_agent_prefix(self, ai, mock_org): + defn = {"prompt": "Go", "anthropic_secret": "sk", "lc_api_key_secret": "lc"} + with patch("limacharlie.sdk.hive.Hive") as MockHive: + hive_instance = MagicMock() + MockHive.return_value = hive_instance + hive_instance.get.return_value = _make_hive_record(defn) + mock_org.client.request.return_value = {"session_id": "s", "status": "pending"} + + ai.start_session("hive://ai_agent/my-agent") + + # The URI prefix must be stripped before the hive lookup so it + # resolves to the same record as the bare key. + hive_instance.get.assert_called_with("my-agent") + + def test_bare_key_unchanged(self, ai, mock_org): + defn = {"prompt": "Go", "anthropic_secret": "sk", "lc_api_key_secret": "lc"} + with patch("limacharlie.sdk.hive.Hive") as MockHive: + hive_instance = MagicMock() + MockHive.return_value = hive_instance + hive_instance.get.return_value = _make_hive_record(defn) + mock_org.client.request.return_value = {"session_id": "s", "status": "pending"} + + ai.start_session("my-agent") + + hive_instance.get.assert_called_with("my-agent") From 5b7a6deb952319b218b02d4fe5b9104ba9b15c66 Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Sat, 30 May 2026 08:04:28 -0700 Subject: [PATCH 2/3] cli: generalize hive shortcut --value via per-hive value_key MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The shared hive shortcut set command exposed --value wrapping the input as {data: {secret: }} for every hive, but only the secret hive uses a single "secret" data field — the wrapper was meaningless (and wrong) for the structured-data hives (lookup, fp, playbook, note, sop, adapters, ai-skill). make_hive_group now takes an optional value_key naming the hive's single scalar data field. --value is offered only when value_key is set (the secret group declares value_key="secret") and wraps as {data: {: }}. Structured-data hives no longer advertise --value at all. Co-Authored-By: Claude Opus 4.8 (1M context) --- limacharlie/commands/_hive_shortcut.py | 55 ++++++++++++++++++-------- limacharlie/commands/secret.py | 2 +- tests/unit/test_cli_ergonomics.py | 28 +++++++++++++ 3 files changed, 67 insertions(+), 18 deletions(-) diff --git a/limacharlie/commands/_hive_shortcut.py b/limacharlie/commands/_hive_shortcut.py index 0143ba8b..1696e537 100644 --- a/limacharlie/commands/_hive_shortcut.py +++ b/limacharlie/commands/_hive_shortcut.py @@ -28,7 +28,7 @@ def _output(ctx: click.Context, data: Any) -> None: click.echo(format_output(data, fmt)) -def make_hive_group(group_name: str, hive_name: str, noun_singular: str, noun_plural: str | None = None) -> click.Group: +def make_hive_group(group_name: str, hive_name: str, noun_singular: str, noun_plural: str | None = None, value_key: str | None = None) -> click.Group: """Create a Click group for a specific hive type. Args: @@ -36,6 +36,13 @@ def make_hive_group(group_name: str, hive_name: str, noun_singular: str, noun_pl hive_name: Hive backend name (e.g., "secret"). noun_singular: Human-readable singular (e.g., "secret"). noun_plural: Human-readable plural (defaults to noun_singular + "s"). + value_key: Name of the single scalar field in the record's ``data`` + payload for hives whose value is one scalar (e.g. ``"secret"`` for + the secret hive, whose data is ``{secret: }``). When set, the + 'set' command gains a ``--value`` convenience flag that wraps the + value as ``{data: {: }}``. Leave None for hives + whose data is structured (lookup, fp, playbook, …) — they have no + single value field and do not get ``--value``. Returns: click.Group: The configured group with list, get, set, delete commands. @@ -47,12 +54,17 @@ def make_hive_group(group_name: str, hive_name: str, noun_singular: str, noun_pl explain_list = f"List all {noun_plural} stored in the '{hive_name}' hive." explain_get = f"Get a specific {noun_singular} by its key name from the '{hive_name}' hive." + value_hint = ( + f"Or use --value to set the {noun_singular} value directly " + f"(wrapped as {{data: {{{value_key}: }}}}); --value exposes the value " + f"on the command line and in shell history, so stdin or --input-file is the " + f"recommended path for humans. " + if value_key else "" + ) explain_set = ( f"Create or update {article} {noun_singular} in the '{hive_name}' hive. " - f"Provide data via --input-file (JSON/YAML) or stdin, or use --value to set " - f"the secret value directly (wrapped as {{data: {{secret: }}}}). " - f"WARNING: --value exposes the value on the command line and in shell history; " - f"stdin or --input-file is the recommended path for humans. " + f"Provide data via --input-file (JSON/YAML) or stdin. " + f"{value_hint}" f"--tag (repeatable) and --comment populate usr_mtd. " f"New hive records default to disabled — pass --enabled to create-and-enable in one shot, " f"or include usr_mtd.enabled: true in the input file." @@ -83,15 +95,23 @@ def get_cmd(ctx, key) -> None: record = hive.get(key) _output(ctx, record.to_dict()) + def _value_option(fn): + # Only hives with a single scalar value field (value_key set, e.g. + # the secret hive) expose --value; structured-data hives do not, so + # there is no misleading flag and no hardcoded data-key assumption. + if value_key is None: + return fn + return click.option( + "--value", default=None, + help=f"Set the {noun_singular} value directly (wraps into {{data: {{{value_key}: }}}}). " + "WARNING: this exposes the value on the command line and in shell history; " + "stdin or --input-file remains the recommended path for humans.", + )(fn) + @grp.command("set", help=f"Create or update {article} {noun_singular}.") @click.option("--key", required=True, help="Record key name.") @click.option("--input-file", type=click.Path(exists=True), default=None, help="JSON or YAML file with record data.") - @click.option( - "--value", default=None, - help="Set the secret value directly (wraps into {data: {secret: }}). " - "WARNING: this exposes the value on the command line and in shell history; " - "stdin or --input-file remains the recommended path for humans.", - ) + @_value_option @click.option("--tag", "tags", multiple=True, help="Tag to set in usr_mtd (repeatable).") @click.option("--comment", default=None, help="Set usr_mtd.comment on the record.") @click.option( @@ -99,14 +119,14 @@ def get_cmd(ctx, key) -> None: help=f"Set usr_mtd.enabled on the {noun_singular}. Overrides any value in the input file. Records default to disabled if neither this flag nor usr_mtd.enabled is provided.", ) @pass_context - def set_cmd(ctx, key, input_file, value, tags, comment, enabled) -> None: + def set_cmd(ctx, key, input_file, tags, comment, enabled, value=None) -> None: if value is not None: if input_file: raise click.UsageError("--value is mutually exclusive with --input-file/stdin.") - # Convenience wrapper so a secret value can be set without a - # file or stdin. --value is explicit intent, so stdin is ignored - # in this mode rather than consulted. - record = HiveRecord(key, data={"secret": value}) + # Convenience wrapper so a single-value record (value_key set) can + # be set without a file or stdin. --value is explicit intent, so + # stdin is ignored in this mode rather than consulted. + record = HiveRecord(key, data={value_key: value}) else: if input_file: with open(input_file, "r") as f: @@ -114,7 +134,8 @@ def set_cmd(ctx, key, input_file, value, tags, comment, enabled) -> None: elif not sys.stdin.isatty(): content = sys.stdin.read() else: - raise click.UsageError("Provide data via --value, --input-file, or pipe to stdin.") + hint = "--value, --input-file, or pipe to stdin" if value_key else "--input-file or pipe to stdin" + raise click.UsageError(f"Provide data via {hint}.") # Parse input as YAML first (YAML is a superset of JSON) try: diff --git a/limacharlie/commands/secret.py b/limacharlie/commands/secret.py index 19235f8f..baad22f6 100644 --- a/limacharlie/commands/secret.py +++ b/limacharlie/commands/secret.py @@ -5,7 +5,7 @@ from ._hive_shortcut import make_hive_group from ..discovery import register_explain -group = make_hive_group("secret", "secret", "secret") +group = make_hive_group("secret", "secret", "secret", value_key="secret") # Override the generic hive explains with secret-specific documentation. diff --git a/tests/unit/test_cli_ergonomics.py b/tests/unit/test_cli_ergonomics.py index b43718b8..5f0ad416 100644 --- a/tests/unit/test_cli_ergonomics.py +++ b/tests/unit/test_cli_ergonomics.py @@ -258,6 +258,34 @@ def test_set_value_wraps_secret(self, mock_hive_cls, _org, _client): assert record.tags == ["prod"] assert record.comment == "note" + def test_value_not_offered_for_structured_hive(self): + # A hive without a single scalar value field (no value_key) must NOT + # expose --value — the secret-style {data: {secret: ...}} wrapper would + # be wrong for its data shape. Click rejects the unknown option. + runner = CliRunner() + result = runner.invoke(cli, ["lookup", "set", "--key", "k", "--value", "x"]) + assert result.exit_code != 0 + assert "no such option" in result.output.lower() or "No such option" in result.output + + @patch("limacharlie.commands._hive_shortcut.Client") + @patch("limacharlie.commands._hive_shortcut.Organization") + @patch("limacharlie.commands._hive_shortcut.Hive") + def test_tag_works_for_structured_hive(self, mock_hive_cls, _org, _client, tmp_path): + # --tag/--comment are generic metadata and remain available on every + # hive shortcut, even those without --value. + mock_hive = MagicMock() + mock_hive.set.return_value = {"etag": "new"} + mock_hive_cls.return_value = mock_hive + f = tmp_path / "l.yaml" + f.write_text("data:\n a: 1\n") + runner = CliRunner() + result = runner.invoke(cli, [ + "lookup", "set", "--key", "k", "--input-file", str(f), "--tag", "prod", + ]) + assert result.exit_code == 0, result.output + record = mock_hive.set.call_args[0][0] + assert record.tags == ["prod"] + @patch("limacharlie.commands._hive_shortcut.Client") @patch("limacharlie.commands._hive_shortcut.Organization") @patch("limacharlie.commands._hive_shortcut.Hive") From e217dba173d9eb0d3909c3fe94f27696d0be2126 Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Sat, 30 May 2026 08:12:43 -0700 Subject: [PATCH 3/3] cli: fix adapter list-types to enumerate real adapter types per hive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit list-types derived its list from the bare JSON-Schema $defs keys, which are helper structs (ClientOptions, AckBufferOptions, Dict, …), not adapter types — so it printed garbage. The reflected schema is a root that $refs into the record definition (CloudSensorRecord / ExternalAdapterConfig); the real type names are that record's properties (s3, office365, threatlocker, …) minus the sensor_type discriminator. - Resolve the root $ref into the record and use its properties (fall back to inline root properties); never enumerate raw $defs keys. - Parameterize by hive so cloud-adapter reads cloud_sensor and external-adapter reads external_adapter (their type sets genuinely differ). Co-Authored-By: Claude Opus 4.8 (1M context) --- limacharlie/commands/_adapter_types.py | 93 +++++++++++++++++--------- limacharlie/commands/adapter.py | 2 +- limacharlie/commands/cloud_sensor.py | 2 +- tests/unit/test_cli_ergonomics.py | 45 +++++++++++-- 4 files changed, 104 insertions(+), 38 deletions(-) diff --git a/limacharlie/commands/_adapter_types.py b/limacharlie/commands/_adapter_types.py index bfada471..a5063343 100644 --- a/limacharlie/commands/_adapter_types.py +++ b/limacharlie/commands/_adapter_types.py @@ -67,50 +67,75 @@ } -def _types_from_schema(schema: Any) -> list[str] | None: - """Derive adapter type names from a cloud_sensor JSON-Schema. +def _resolve_ref(root: dict, ref: str) -> dict | None: + """Resolve a local JSON-Schema ``$ref`` (e.g. ``#/$defs/CloudSensorRecord``).""" + if not isinstance(ref, str) or not ref.startswith("#/"): + return None + node: Any = root + for part in ref[2:].split("/"): + if not isinstance(node, dict): + return None + node = node.get(part) + return node if isinstance(node, dict) else None + + +def _describe(name: str) -> str: + """Best-effort human description for a derived type name (blank if unknown).""" + return ( + _FALLBACK_ADAPTER_TYPES.get(name) + or _FALLBACK_ADAPTER_TYPES.get(name.replace("_", "")) + or "" + ) + - The per-adapter config lives in a sibling sub-struct keyed by the - adapter type name (e.g. ``s3``, ``syslog``), so the type names are - the object properties (minus the shared/non-type fields). Returns +def _types_from_schema(schema: Any) -> list[str] | None: + """Derive adapter type names from an adapter hive JSON-Schema. + + The reflected schema is a root that ``$ref``s into ``$defs`` (e.g. + ``CloudSensorRecord`` / ``ExternalAdapterConfig``); the per-adapter config + lives in that record as a property keyed by the adapter type name + (``s3``, ``office365``, ``threatlocker``, …), alongside the ``sensor_type`` + discriminator. The type names are therefore the record's ``properties`` + minus the shared/non-type fields — NOT the bare ``$defs`` keys, which are + helper structs (``ClientOptions``, ``AckBufferOptions``, …). Returns ``None`` if the schema does not expose any usable type names. """ - if isinstance(schema, dict) and "schema" in schema and isinstance(schema["schema"], dict): + if isinstance(schema, dict) and isinstance(schema.get("schema"), dict): schema = schema["schema"] if not isinstance(schema, dict): return None - names: set[str] = set() - props = schema.get("properties") - if isinstance(props, dict): - names.update(props.keys()) - defs = schema.get("$defs") or schema.get("definitions") - if isinstance(defs, dict): - names.update(defs.keys()) + # Follow a top-level $ref into the record definition. + record = schema + ref = schema.get("$ref") + if isinstance(ref, str): + resolved = _resolve_ref(schema, ref) + if resolved is not None: + record = resolved + + props = record.get("properties") + if not isinstance(props, dict): + return None - names -= _NON_TYPE_FIELDS - cleaned = sorted(n for n in names if n and not n.startswith("_")) - return cleaned or None + names = {n for n in props if n and not n.startswith("_")} - _NON_TYPE_FIELDS + return sorted(names) or None -def adapter_types(org: Any) -> list[dict[str, str]]: - """Return the supported adapter types as name/description rows. +def adapter_types(org: Any, hive_name: str = "cloud_sensor") -> list[dict[str, str]]: + """Return the supported adapter types for a hive as name/description rows. - Prefers the live cloud_sensor hive schema; falls back to the curated - constant when the schema is unavailable or does not advertise types. + Prefers the live hive schema (so it tracks the backend); falls back to the + curated constant when the schema is unavailable or does not advertise types. """ derived: list[str] | None = None try: - schema = Hive(org, "cloud_sensor").get_schema() + schema = Hive(org, hive_name).get_schema() derived = _types_from_schema(schema) except Exception: derived = None if derived: - return [ - {"type": name, "description": _FALLBACK_ADAPTER_TYPES.get(name, "")} - for name in derived - ] + return [{"type": name, "description": _describe(name)} for name in derived] return [ {"type": name, "description": desc} for name, desc in sorted(_FALLBACK_ADAPTER_TYPES.items()) @@ -120,18 +145,24 @@ def adapter_types(org: Any) -> list[dict[str, str]]: _EXPLAIN_LIST_TYPES = """\ List the supported adapter/sensor type names with a short description. -The list is derived from the live cloud_sensor hive JSON-Schema when +The list is derived from the live adapter hive JSON-Schema when available (so it tracks the backend), with a curated fallback otherwise. Use a type name as the top-level sensor_type when calling 'set'. -Note: a few types are only meaningful for external (on-prem) adapters -(e.g. stdin, wel) versus cloud adapters; consult the set --ai-help for +The cloud-adapter and external-adapter type sets differ: cloud adapters +run in LimaCharlie's infrastructure, external (on-prem) adapters add +types like syslog, file, stdin and wel. Consult the set --ai-help for the per-type config shape. """ -def add_list_types(group: click.Group, command_path: str) -> None: - """Attach a ``list-types`` subcommand to an adapter command group.""" +def add_list_types(group: click.Group, command_path: str, hive_name: str = "cloud_sensor") -> None: + """Attach a ``list-types`` subcommand to an adapter command group. + + ``hive_name`` selects which adapter hive's schema to enumerate + (``cloud_sensor`` vs ``external_adapter``) so each group lists its own + supported types. + """ from ..cli import pass_context from ..client import Client from ..sdk.organization import Organization @@ -150,7 +181,7 @@ def list_types_cmd(ctx) -> None: debug_verbose=ctx.obj.debug_verbose, ) org = Organization(client) - data = adapter_types(org) + data = adapter_types(org, hive_name) if not ctx.obj.quiet: fmt = ctx.obj.output_format or detect_output_format() click.echo(format_output(data, fmt)) diff --git a/limacharlie/commands/adapter.py b/limacharlie/commands/adapter.py index 6dc5c18c..2ad636e5 100644 --- a/limacharlie/commands/adapter.py +++ b/limacharlie/commands/adapter.py @@ -7,7 +7,7 @@ from ..discovery import register_explain group = make_hive_group("external-adapter", "external_adapter", "external adapter") -add_list_types(group, "external-adapter.list-types") +add_list_types(group, "external-adapter.list-types", "external_adapter") # Override the generic hive explains with adapter-specific documentation. diff --git a/limacharlie/commands/cloud_sensor.py b/limacharlie/commands/cloud_sensor.py index 721a9f96..b35adcca 100644 --- a/limacharlie/commands/cloud_sensor.py +++ b/limacharlie/commands/cloud_sensor.py @@ -7,7 +7,7 @@ from ..discovery import register_explain group = make_hive_group("cloud-adapter", "cloud_sensor", "cloud adapter") -add_list_types(group, "cloud-adapter.list-types") +add_list_types(group, "cloud-adapter.list-types", "cloud_sensor") # Override the generic hive explains with cloud adapter documentation. diff --git a/tests/unit/test_cli_ergonomics.py b/tests/unit/test_cli_ergonomics.py index 5f0ad416..b19df74e 100644 --- a/tests/unit/test_cli_ergonomics.py +++ b/tests/unit/test_cli_ergonomics.py @@ -382,18 +382,53 @@ def test_fallback_includes_threatlocker(self): assert "threatlocker" in types assert "webhook" in types - def test_derived_from_schema(self): + def test_derived_from_ref_rooted_schema(self): + # Mirrors the real reflected shape: a {"schema": {...}} wrapper whose + # root $refs into $defs/; type names are that record's + # properties (minus the discriminator), NOT the bare $defs keys. from limacharlie.commands._adapter_types import _types_from_schema schema = {"schema": { - "$defs": {"S3Config": {}, "SyslogConfig": {}}, - "properties": {"s3": {}, "syslog": {}, "sensor_type": {}, "client_options": {}}, + "$ref": "#/$defs/CloudSensorRecord", + "$defs": { + "CloudSensorRecord": { + "properties": { + "s3": {}, "office365": {}, "threatlocker": {}, "sensor_type": {}, + }, + }, + "ClientOptions": {}, "AckBufferOptions": {}, # helper structs + }, }} names = _types_from_schema(schema) - # Non-type shared fields are excluded. + assert set(names) == {"s3", "office365", "threatlocker"} + # The discriminator and helper-struct $defs names must NOT leak in. assert "sensor_type" not in names - assert "client_options" not in names + assert "ClientOptions" not in names and "AckBufferOptions" not in names + + def test_derived_from_inline_properties_schema(self): + # Fallback shape: no $ref, properties inline on the root. + from limacharlie.commands._adapter_types import _types_from_schema + schema = {"schema": { + "properties": {"s3": {}, "syslog": {}, "sensor_type": {}, "client_options": {}}, + }} + names = _types_from_schema(schema) + assert "sensor_type" not in names and "client_options" not in names assert "s3" in names and "syslog" in names + def test_per_hive_schema_selection(self): + # cloud-adapter and external-adapter must enumerate their OWN hive. + from limacharlie.commands import _adapter_types as at + captured = {} + + class _FakeHive: + def __init__(self, org, hive_name): + captured["hive_name"] = hive_name + def get_schema(self): + return {"schema": {"$ref": "#/$defs/R", "$defs": {"R": {"properties": {"s3": {}}}}}} + + with patch.object(at, "Hive", _FakeHive): + at.adapter_types(None, "external_adapter") + assert captured["hive_name"] == "external_adapter" + @patch("limacharlie.commands._adapter_types.Hive") @patch("limacharlie.commands._adapter_types.Client", create=True) def test_cli_list_types(self, _client, mock_hive_cls):