234 changes: 232 additions & 2 deletions src/strands/vended_plugins/skills/skill.py
@@ -2,26 +2,31 @@

This module defines the Skill dataclass and provides classmethods for
discovering, parsing, and loading skills from the filesystem, raw content,
-or HTTPS URLs. Skills are directories containing a SKILL.md file with YAML
-frontmatter metadata and markdown instructions.
+HTTPS URLs, or Amazon S3. Skills are directories containing a SKILL.md file
+with YAML frontmatter metadata and markdown instructions.
"""

from __future__ import annotations

import logging
import re
import tempfile
import urllib.error
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import boto3
import yaml

logger = logging.getLogger(__name__)

_SKILL_NAME_PATTERN = re.compile(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
_MAX_SKILL_NAME_LENGTH = 64
_S3_MIRROR_MAX_WORKERS = 4
_S3_MIRROR_CACHE: dict[tuple[str, str | None], Path] = {}
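# Note: the cache key omits s3_client and local_dir, so a repeat call for the same
# (bucket, prefix) returns the first mirrored path even if those arguments differ.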


def _find_skill_md(skill_dir: Path) -> Path:
@@ -204,6 +209,172 @@ def _build_skill_from_frontmatter(
)


def _mirror_skills_from_s3(
bucket: str,
prefix: str | None = None,
*,
s3_client: Any = None,
local_dir: str | Path | None = None,
) -> Path | None:
"""Mirror skill directories from S3 to a local filesystem path.

Discovers all SKILL.md files under the given prefix, then downloads each
skill directory (SKILL.md + subdirectories like scripts/, references/,
assets/) in parallel using a thread pool.

Results are cached per (bucket, prefix) pair for the lifetime of the
process. Subsequent calls with the same arguments return the previously
mirrored directory without re-downloading.
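
Example (bucket and prefix names are illustrative)::

    dest = _mirror_skills_from_s3("my-skills-bucket", prefix="agents/")
    # dest is None when no SKILL.md was found under the prefix.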

Args:
bucket: S3 bucket name.
prefix: Optional key prefix to scope the scan (e.g. "agents/orchestrator/").
If None, scans the entire bucket.
s3_client: Optional pre-configured boto3 S3 client. If None, a default
client is created via ``boto3.client("s3")``.
local_dir: Optional local directory to mirror skills into. If None, a
temporary directory is created.

Returns:
Path to the local directory containing mirrored skill subdirectories,
or None if no skills were found.
"""
cache_key = (bucket, prefix)
if cache_key in _S3_MIRROR_CACHE:
logger.debug("bucket=<%s>, prefix=<%s> | s3 mirror cache hit", bucket, prefix or "")
return _S3_MIRROR_CACHE[cache_key]

if s3_client is None:
s3_client = boto3.client("s3")

# Normalize prefix
normalized_prefix = ""
if prefix:
normalized_prefix = prefix.rstrip("/") + "/"

# Discover all objects under the prefix
objects = _s3_list_all_objects(s3_client, bucket, normalized_prefix)

if not objects:
logger.warning("bucket=<%s>, prefix=<%s> | no objects found", bucket, normalized_prefix)
return None

# Find skill directories (those containing SKILL.md)
skill_dirs = _s3_find_skill_directories(objects, normalized_prefix)

if not skill_dirs:
logger.warning("bucket=<%s>, prefix=<%s> | no SKILL.md files found", bucket, normalized_prefix)
return None

logger.info(
"bucket=<%s>, prefix=<%s>, count=<%d> | found skills: %s",
bucket,
normalized_prefix,
len(skill_dirs),
", ".join(skill_dirs),
)

dest = _s3_resolve_local_dir(local_dir)
download_tasks = _s3_build_download_tasks(objects, skill_dirs, normalized_prefix)
_s3_download_parallel(s3_client, bucket, download_tasks, dest)

_S3_MIRROR_CACHE[cache_key] = dest
return dest


def _s3_resolve_local_dir(local_dir: str | Path | None) -> Path:
"""Resolve or create the local directory for S3 mirroring."""
if local_dir is None:
path = Path(tempfile.mkdtemp(prefix="strands-s3-skills-"))
else:
path = Path(local_dir)
path.mkdir(parents=True, exist_ok=True)
return path


def _s3_list_all_objects(s3_client: Any, bucket: str, prefix: str) -> list[str]:
"""List all object keys under a prefix, handling pagination."""
keys: list[str] = []
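# list_objects_v2 returns at most 1,000 keys per page; the paginator follows continuation tokens.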
paginator = s3_client.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=bucket, Prefix=prefix)

for page in pages:
for obj in page.get("Contents", []):
keys.append(obj["Key"])

return keys


def _s3_find_skill_directories(object_keys: list[str], prefix: str) -> list[str]:
"""Find skill directory names (relative to prefix) that contain SKILL.md.

Returns a sorted list of skill directory names (e.g. ["code-review", "pdf-processing"]).
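
Example (illustrative keys)::

    >>> keys = [
    ...     "agents/code-review/SKILL.md",
    ...     "agents/code-review/scripts/run.py",
    ...     "agents/pdf-processing/SKILL.md",
    ...     "agents/readme.txt",
    ... ]
    >>> _s3_find_skill_directories(keys, "agents/")
    ['code-review', 'pdf-processing']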
"""
skill_dirs: list[str] = []

for key in object_keys:
relative = key[len(prefix) :]
parts = relative.split("/")

# We expect: <skill-name>/SKILL.md (or skill.md)
if len(parts) == 2 and parts[1].lower() == "skill.md":
skill_dirs.append(parts[0])

return sorted(set(skill_dirs))


def _s3_build_download_tasks(
object_keys: list[str],
skill_dirs: list[str],
prefix: str,
) -> list[tuple[str, str]]:
"""Build a list of (s3_key, relative_local_path) tuples to download.

Only includes objects that belong to a discovered skill directory.
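
Example (illustrative keys; trailing-slash placeholder objects are excluded)::

    >>> keys = [
    ...     "agents/code-review/SKILL.md",
    ...     "agents/code-review/scripts/",
    ...     "agents/notes.txt",
    ... ]
    >>> _s3_build_download_tasks(keys, ["code-review"], "agents/")
    [('agents/code-review/SKILL.md', 'code-review/SKILL.md')]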
"""
skill_dir_set = set(skill_dirs)
tasks: list[tuple[str, str]] = []

for key in object_keys:
relative = key[len(prefix) :]
parts = relative.split("/")

if parts[0] in skill_dir_set and len(parts) >= 2:
if not key.endswith("/"):
tasks.append((key, relative))

return tasks


def _s3_download_parallel(
s3_client: Any,
bucket: str,
tasks: list[tuple[str, str]],
dest: Path,
) -> None:
"""Download files from S3 in parallel using a thread pool."""
if not tasks:
return

def _download_one(task: tuple[str, str]) -> str:
s3_key, relative_path = task
local_path = dest / relative_path
local_path.parent.mkdir(parents=True, exist_ok=True)
s3_client.download_file(bucket, s3_key, str(local_path))
return relative_path

with ThreadPoolExecutor(max_workers=_S3_MIRROR_MAX_WORKERS) as executor:
futures = {executor.submit(_download_one, t): t for t in tasks}
for future in as_completed(futures):
try:
path = future.result()
logger.debug("s3_path=<%s> | downloaded", path)
except Exception:
s3_key, _ = futures[future]
logger.warning("bucket=<%s>, key=<%s> | failed to download", bucket, s3_key)
raise


@dataclass
class Skill:
r"""Represents an agent skill with metadata and instructions.
@@ -422,3 +593,62 @@ def from_directory(cls, skills_dir: str | Path, *, strict: bool = False) -> list

logger.debug("path=<%s>, count=<%d> | loaded skills from directory", skills_dir, len(skills))
return skills

@classmethod
def from_s3(
cls,
bucket: str,
prefix: str | None = None,
*,
s3_client: Any = None,
local_dir: str | Path | None = None,
strict: bool = False,
) -> list[Skill]:
"""Load skills from an Amazon S3 bucket.

Scans ``bucket`` (under ``prefix`` if given) for directories containing
a SKILL.md file, mirrors each skill directory (including subdirectories
like scripts/, references/, assets/) to a local path, then delegates to
:meth:`from_directory` for parsing.

Results are cached per (bucket, prefix) pair for the lifetime of the
process — subsequent calls with the same arguments return the cached
list without re-downloading.

Example::

from strands.vended_plugins.skills import Skill

# Load all skills under a prefix
skills = Skill.from_s3("my-bucket", prefix="agents/director/")

# Load from bucket root with a custom S3 client
import boto3
s3 = boto3.client("s3", region_name="eu-west-1")
skills = Skill.from_s3("my-bucket", s3_client=s3)

Args:
bucket: S3 bucket name.
prefix: Optional key prefix to scope the scan (e.g. "agents/director/").
If None, scans the entire bucket.
s3_client: Optional pre-configured boto3 S3 client. If None, a default
client is created via ``boto3.client("s3")``.
local_dir: Optional local directory to mirror skills into. If None, a
temporary directory is created.
strict: If True, raise on skill validation issues. If False (default),
warn and load anyway. Passed through to :meth:`from_directory`.

Returns:
List of Skill instances loaded from S3.
"""
local_path = _mirror_skills_from_s3(
bucket=bucket,
prefix=prefix,
s3_client=s3_client,
local_dir=local_dir,
)

if local_path is None:
return []

return cls.from_directory(local_path, strict=strict)