Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions limacharlie/commands/_adapter_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from __future__ import annotations

import re
from typing import Any

import click
Expand Down Expand Up @@ -156,6 +157,176 @@ def adapter_types(org: Any, hive_name: str = "cloud_sensor") -> list[dict[str, s
"""


_UUID_RE = re.compile(r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")


def _record_root(schema: Any) -> tuple[dict | None, dict | None]:
"""Return (root_schema, record_node) for an adapter hive schema.

Unwraps the {"schema": {...}} envelope and follows the root $ref into the
record definition (CloudSensorRecord / ExternalAdapterConfig).
"""
if isinstance(schema, dict) and isinstance(schema.get("schema"), dict):
schema = schema["schema"]
if not isinstance(schema, dict):
return None, None
record = schema
ref = schema.get("$ref")
if isinstance(ref, str):
resolved = _resolve_ref(schema, ref)
if resolved is not None:
record = resolved
return schema, record


def adapter_type_schema(org: Any, hive_name: str, type_name: str) -> tuple[Any, dict | None]:
"""Resolve one adapter type's config sub-schema.

Returns (root_schema, type_node): type_node is the JSON-Schema node for the
per-type config (e.g. the threatlocker sub-struct), or None if the type is
unknown. root_schema is returned so the caller can flatten/render with $ref
resolution against $defs.
"""
schema = Hive(org, hive_name).get_schema()
root, record = _record_root(schema)
if record is None:
return schema, None
props = record.get("properties")
if not isinstance(props, dict) or type_name not in props:
return root, None
return root, props[type_name]


def adapter_sensors(org: Any, hive_name: str, key: str) -> dict[str, Any]:
"""Find the live sensor(s) belonging to an adapter record.

Reads the adapter record, extracts its installation key (the sensor iid),
and returns the sensors whose iid matches. Falls back to matching the
adapter's configured hostname when the installation key isn't a bare iid
(UUID). The sensors list is empty when the adapter hasn't registered a
sensor yet (e.g. it has not delivered any events).
"""
record = Hive(org, hive_name).get(key)
data = getattr(record, "data", None) or {}
sensor_type = data.get("sensor_type")
sub = data.get(sensor_type, {}) if sensor_type else {}
client_options = sub.get("client_options", {}) if isinstance(sub, dict) else {}
identity = client_options.get("identity", {}) if isinstance(client_options, dict) else {}
install_key = identity.get("installation_key")
hostname = client_options.get("hostname")

if isinstance(install_key, str) and _UUID_RE.match(install_key):
match_by, match_val = "iid", install_key
elif hostname:
match_by, match_val = "hostname", hostname
else:
return {"adapter": key, "match_by": None, "match_value": None, "selector": None, "sensors": [],
"note": "Adapter record has no resolvable installation_key (iid) or hostname to match on."}

# Filter server-side with a sensor selector (both iid and hostname are
# supported selector fields), so this scales to large fleets instead of
# paging every sensor and filtering client-side.
selector = f'{match_by} == "{match_val}"'
sensors = [
{"sid": s.get("sid"), "hostname": s.get("hostname"), "iid": s.get("iid"),
"is_online": s.get("is_online"), "last_seen": s.get("alive")}
for s in org.list_sensors(selector=selector)
]
out = {"adapter": key, "match_by": match_by, "match_value": match_val, "selector": selector, "sensors": sensors}
if not sensors:
out["note"] = ("No sensor registered for this adapter yet — it has not delivered any "
"events (a cloud adapter materializes a sensor on first event). Normal "
"for a freshly-created adapter.")
return out


_EXPLAIN_SCHEMA = """\
Show the configuration schema for ONE adapter/sensor type as a flat field
listing (field | type | required | notes), resolved from the live adapter hive
JSON-Schema. Use this before 'set' to learn the exact field set and where each
field lives (e.g. that hostname goes under client_options, not at the top
level). Pass --output json for the raw JSON-Schema node.

Run 'list-types' first to see the valid --type values.
"""

_EXPLAIN_SENSORS = """\
List the live sensor(s) this adapter has produced. Reads the adapter record's
installation key (iid) and returns sensors whose iid matches — the reliable way
to get a cloud/external adapter's SID without decoding the installation key.

An empty result means the adapter has not registered a sensor yet (it has not
delivered any events); that is expected for a freshly-created adapter.
"""


def add_schema(group: click.Group, command_path: str, hive_name: str = "cloud_sensor") -> None:
"""Attach a ``schema --type <t>`` subcommand to an adapter command group."""
from ..cli import pass_context
from ..client import Client
from ..sdk.organization import Organization
from ..output import format_output, detect_output_format
from ..discovery import register_explain
from .hive import _flatten_schema

@group.command("schema", help="Show one adapter type's config schema.")
@click.option("--type", "type_name", required=True, help="Adapter type (see list-types).")
@pass_context
def schema_cmd(ctx, type_name) -> None:
client = Client(
oid=ctx.obj.oid, environment=ctx.obj.environment,
print_debug_fn=ctx.obj.debug_fn, debug_full_response=ctx.obj.debug_full,
debug_curl=ctx.obj.debug_curl, debug_verbose=ctx.obj.debug_verbose,
)
org = Organization(client)
root, node = adapter_type_schema(org, hive_name, type_name)
if node is None:
types = ", ".join(t["type"] for t in adapter_types(org, hive_name))
raise click.UsageError(f"Unknown adapter type '{type_name}'. Valid types: {types}")
if ctx.obj.quiet:
return
fmt = ctx.obj.output_format or detect_output_format()
if fmt == "table" and isinstance(root, dict):
rows = _flatten_schema(node, root)
if rows:
click.echo(format_output(rows, fmt))
return
# Raw JSON-Schema node for machine formats: resolve a top-level $ref and
# carry the root $defs so nested $refs in the node stay resolvable.
resolved = _resolve_ref(root, node["$ref"]) if isinstance(node, dict) and "$ref" in node and isinstance(root, dict) else node
if isinstance(resolved, dict) and isinstance(root, dict) and isinstance(root.get("$defs"), dict):
resolved = {**resolved, "$defs": root["$defs"]}
click.echo(format_output(resolved, fmt))

register_explain(command_path, _EXPLAIN_SCHEMA)


def add_sensors(group: click.Group, command_path: str, hive_name: str = "cloud_sensor") -> None:
"""Attach a ``sensors --key <adapter>`` subcommand to an adapter command group."""
from ..cli import pass_context
from ..client import Client
from ..sdk.organization import Organization
from ..output import format_output, detect_output_format
from ..discovery import register_explain

@group.command("sensors", help="List the live sensor(s) this adapter produced.")
@click.option("--key", required=True, help="Adapter record key.")
@pass_context
def sensors_cmd(ctx, key) -> None:
client = Client(
oid=ctx.obj.oid, environment=ctx.obj.environment,
print_debug_fn=ctx.obj.debug_fn, debug_full_response=ctx.obj.debug_full,
debug_curl=ctx.obj.debug_curl, debug_verbose=ctx.obj.debug_verbose,
)
org = Organization(client)
data = adapter_sensors(org, hive_name, key)
if not ctx.obj.quiet:
fmt = ctx.obj.output_format or detect_output_format()
click.echo(format_output(data, fmt))

register_explain(command_path, _EXPLAIN_SENSORS)


def add_list_types(group: click.Group, command_path: str, hive_name: str = "cloud_sensor") -> None:
"""Attach a ``list-types`` subcommand to an adapter command group.

Expand Down
4 changes: 3 additions & 1 deletion limacharlie/commands/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from __future__ import annotations

from ._hive_shortcut import make_hive_group
from ._adapter_types import add_list_types
from ._adapter_types import add_list_types, add_schema, add_sensors
from ..discovery import register_explain

group = make_hive_group("external-adapter", "external_adapter", "external adapter")
add_list_types(group, "external-adapter.list-types", "external_adapter")
add_schema(group, "external-adapter.schema", "external_adapter")
add_sensors(group, "external-adapter.sensors", "external_adapter")

# Override the generic hive explains with adapter-specific documentation.

Expand Down
44 changes: 43 additions & 1 deletion limacharlie/commands/api_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,19 @@ def list_keys(ctx, name) -> None:
"--permissions", required=True,
help="Comma-separated list of permissions (e.g., 'dr.list,dr.set').",
)
@click.option(
"--store-secret", "store_secret", default=None,
help="Also store the freshly-minted key value into the secret hive under this "
"key name ({data: {secret: <value>}}), so it can be referenced as "
"hive://secret/<name>. The value is written directly without transiting "
"intermediate files — collapses the mint -> store -> reference chain.",
)
@click.option(
"--store-secret-tag", "store_secret_tags", multiple=True,
help="Tag to apply to the stored secret record (repeatable). Only used with --store-secret.",
)
@pass_context
def create(ctx, name, permissions) -> None:
def create(ctx, name, permissions, store_secret, store_secret_tags) -> None:
perm_list = [p.strip() for p in permissions.split(",") if p.strip()]
if not perm_list:
click.echo("Error: At least one permission is required.", err=True)
Expand All @@ -140,6 +151,37 @@ def create(ctx, name, permissions) -> None:
data = org.add_api_key(name, perm_list)
if not ctx.obj.quiet:
click.echo(f"API key '{name}' created.")

if store_secret:
# The key value is only shown once; persist it into the secret hive in
# the same step so callers never have to capture and re-pipe it.
value = data.get("api_key") or data.get("secret") or data.get("key")
if not value:
# The key was already created; surface it so it isn't lost, then fail.
click.echo(
"Error: API key created but no key value was returned to store as a secret.",
err=True,
)
_output(ctx, data)
ctx.exit(4)
return
from ..sdk.hive import Hive, HiveRecord
secret_hive = Hive(org, "secret")
# If a secret of this name already exists, carry its etag so the write is
# a conditional update (no lost update on a concurrent change) and we can
# report create-vs-overwrite instead of silently clobbering it.
existing_etag = None
try:
existing_etag = secret_hive.get_metadata(store_secret).etag
except Exception:
existing_etag = None # not found -> creating a new secret
record = HiveRecord(store_secret, data={"secret": value},
tags=list(store_secret_tags), enabled=True, etag=existing_etag)
secret_hive.set(record)
if not ctx.obj.quiet:
verb = "Updated existing" if existing_etag else "Stored key value in new"
click.echo(f"{verb} secret '{store_secret}' (reference it as hive://secret/{store_secret}).")

_output(ctx, data)


Expand Down
4 changes: 3 additions & 1 deletion limacharlie/commands/cloud_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from __future__ import annotations

from ._hive_shortcut import make_hive_group
from ._adapter_types import add_list_types
from ._adapter_types import add_list_types, add_schema, add_sensors
from ..discovery import register_explain

group = make_hive_group("cloud-adapter", "cloud_sensor", "cloud adapter")
add_list_types(group, "cloud-adapter.list-types", "cloud_sensor")
add_schema(group, "cloud-adapter.schema", "cloud_sensor")
add_sensors(group, "cloud-adapter.sensors", "cloud_sensor")

# Override the generic hive explains with cloud adapter documentation.

Expand Down
Loading