FEAT text adaptive scenario#1760
Conversation
…ra/text_adaptive_scenario
…ra/text_adaptive_scenario
…ra/text_adaptive_scenario
…-text-adaptive-scenario
- Remove prompt_sending from adaptive pool; enable baseline comparison - Expose max_attempts_per_objective via supported_parameters() (scam.py pattern) - Rename AdaptiveTechniqueSelector -> EpsilonGreedyTechniqueSelector - Extract TechniqueSelector Protocol; accept custom selector via kwarg - Per-decision RNG derivation (SHA-256) for resume reproducibility - Drop uuid.uuid4() fallback for objective IDs - Per-dataset atomic attacks (one AtomicAttack per dataset, not per objective) - AdaptiveDispatchParams with per-call seed_group and compatibility filtering - Context extraction moved to dispatcher - Rehydration uses get_attack_results with attribution_data filtering - Split selector.py into selectors/ folder (protocol.py + epsilon_greedy.py) - Update notebooks for new API patterns Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…-text-adaptive-scenario
- SIM108: use ternary for selector assignment - D101: add docstring to AdaptiveDispatchParams - DOC201/DOC501: add Returns/Raises sections to docstrings - TC003: move Sequence import into TYPE_CHECKING block - Fix trailing newline in epsilon_greedy.py Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
|
||
|
|
||
| @runtime_checkable | ||
| class TechniqueSelector(Protocol): |
There was a problem hiding this comment.
I might reformat this can call it "TechniqueSelector" or technique_selector.py
# returns the technique_id; calling select_async with the same SelectorContext twice should yield the same answer
async def select_async(
*,
technique_identifiers: Sequence[str],
context: SelectorContext,
number_to_get: int = 1,
) -> Sequence[str]:
"""Returns techniques in priority order (try first, try second).
Returns up to number_to_get; fewer if not enough techniques are available.
"""
@dataclass(frozen=True)
class SelectorContext:
scenario_result_id: str
scenario_class_name: str
objective: str
attempt_index: int
harm_category: str | None = None
memory_labels # in case the selector wants to make use
I don't think it needs to record results within the class; they're already in memory. We probably don't want to enforce "one at a time" logic or have custom resume logic. WE want to use all memory available for many of these, and that can vary. If we wanted to scope to a single scenario run, we could use the SelectorContext to limit it.
| with self._lock: | ||
| if decision_key: | ||
| effective_key = decision_key | ||
| else: |
There was a problem hiding this comment.
This algorithm is sound for in-memory bandit-style selection, but the in-memory counts are a weakness in two ways: (a) state is lost between runs, and (b) we only learn from this scenario's own attempts. With thousands of historical AttackResults in memory, an analytics-backed selector can be much more powerful. I would
- Move ASR aggregation to
pyrit/analytics/result_analysis.pyaddcompute_technique_success_rates(memory, *, technique_hashes, scope) -> dict[str, AttackStats]next to the existinganalyze_results. Default scope is all-history filtered by technique identity; pass anAnalyticsScopeto restrict (e.g. to this scenario_class, this scenario_result_id, a harm category, or to exclude in-flight runs). - Identify techniques using AttackTechnique eval hashes; but in the future we may want to identify parts of techniques
- See my comment on "protocol" about how I think AttackTechnique should look. For Epsilon greedy I think pool threshold and epsilon make sense, but we likely don't need count, lock, etc
An example of what result_analytics could look like
Python
# In pyrit/analytics/result_analysis.py (or technique_analytics.py)
async def compute_technique_success_rates(
memory: MemoryInterface,
*,
technique_hashes: Sequence[str],
scope: AnalyticsScope | None = None,
) -> dict[str, AttackStats]:
"""ASR per technique, filtered by AnalyticsScope (scenario_class,
scenario_result_id, harm_categories, since, labels)."""
|
|
||
| return techniques | ||
|
|
||
| def _build_atomic_for_dataset( |
There was a problem hiding this comment.
I like how you group these by dataset initially.
If you take the TechniqueSelector nudge (stateless, queries memory), I think a lot of this simplifies. At construction time per objective, pre-select all K techniques to try:
chosen = await selector.select_async(
technique_identifiers=all_techniques,
context=context,
number_to_get=max_attempts_per_objective,
)
# chosen is a Sequence[str] in priority order
for technique in chosen:
result = await run(technique)
if result.outcome == SUCCESS:
break
No per-attempt re-selection — within one objective the analytics signal barely moves, so K queries collapse to 1. Resume: just call select_async again with number_to_get = K - already_executed; the selector is deterministic-given-memory so the new picks are sensible relative to whatever's already in the DB.
| import logging | ||
| import uuid | ||
| from dataclasses import dataclass, field, replace | ||
| from datetime import datetime, timezone |
There was a problem hiding this comment.
Once the TechniqueSelector redesign lands (stateless, queries memory) and selection moves to batch-at-construction on AdaptiveScenario, this file shrinks and is super simplified — probably to ~50 lines. Sketching what could work
We could have a new attack that executes attacks sequentially like this
# pyrit/executor/attack/compound/strategy_sequence.py
class StopPolicy(StrEnum):
ON_SUCCESS = "on_success"
ON_SUCCESS_OR_ERROR = "on_success_or_error"
RUN_ALL = "run_all"
class StrategySequenceAttack(AttackStrategy):
"""Runs a sequence of attack strategies against one objective,
stopping per ``stop_policy``. Each strategy runs as a real
AtomicAttack so scenario attribution lands correctly."""
def __init__(
self,
*,
strategies: Sequence[AttackStrategy],
stop_policy: StopPolicy = StopPolicy.ON_SUCCESS,
) -> None:
...
Then you can call that with the selected techniques
- Make TechniqueSelector stateless: queries memory instead of internal counts - Identify techniques by AttackTechnique eval hashes instead of names - Pre-select K techniques via num_top_techniques parameter - Add SelectorScope enum (ALL_RUNS / CURRENT_RUN) - Move ASR aggregation to pyrit/analytics/scenario_analysis.py - Rename protocol.py to technique_selector.py - Remove ContextExtractor, SelectorContext, redundant VERSION/BASELINE_ATTACK_POLICY - Add scanner CLI section to notebook - Rename number_to_get to num_top_techniques - Remove label_key from user-facing API (hardcode ADAPTIVE_TECHNIQUE_LABEL) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Add Adaptive Scenario Framework with
TextAdaptiveSummary
Introduces an adaptive scenario framework that picks attack techniques per-objective using an epsilon-greedy bandit informed by observed success rates, rather than running every selected technique against every objective. Concentrates spend on techniques that actually work against the target, and stops early on first success.
Adds:
AdaptiveScenario— modality-agnostic base classTextAdaptive— concrete text-attack subclassTechniqueSelectorProtocol +EpsilonGreedyTechniqueSelector— pluggable selector with Laplace-smoothed estimates and pooled cross-context backoffAdaptiveDispatchAttack— per-dataset dispatch strategy with per-call seed-group routing.pydocMotivation
Static scenarios are
O(techniques × objectives): every technique runs against every objective regardless of whether earlier attempts already succeeded or whether the technique is known to be ineffective against the target. For evaluation runs with many techniques and many objectives, this wastes spend on combinations that aren't informative.Adaptive scenarios reduce this to
O(max_attempts × objectives)by:epsilon) so the table doesn't collapse onto a single technique prematurely,How it works
For each objective the dispatcher loops up to
max_attempts_per_objectivetimes:epsilonpick a random technique, otherwise pick the one with the highest Laplace-smoothed success estimate(s + 1) / (n + 1). Cells with fewer thanpool_thresholdlocal observations fall back to the technique's pooled rate across all contexts (cold-start handling). Each decision derives a per-decision RNG fromSHA-256(random_seed|context|decision_key)for resume-safe reproducibility.AdaptiveDispatchParams.seed_group), merging the technique'sseed_techniqueif it declares one. Techniques incompatible with the current seed group are filtered per-call.(context, technique) → (successes, attempts)table and stop early on success.The selector is shared by reference across all dispatchers in a scenario run, so learning accumulates globally. The per-call
contextkey is derived by aContextExtractor;global_context(default) shares one table across all objectives,harm_category_contextpartitions by harm category.Public API
Adaptive scenarios are also resumable — pass
scenario_result_id="..."to the constructor and prior dispatch trails are replayed into the selector before the remaining objectives run.Notes
BASELINE_ATTACK_POLICY = Enabled—prompt_sendingis excluded from the adaptive technique pool and runs as the baseline comparison instead. This separates "what does the target do unprovoked" (baseline) from "what adversarial moves help" (adaptive techniques).AtomicAttackper dataset carrying all seed groups, with per-call seed-group routing viaAdaptiveDispatchParams. Per-call compatibility filtering happens inside the dispatcher.selector: TechniqueSelector | Noneon the scenario. WhenNone(default), anEpsilonGreedyTechniqueSelectoris created with default settings. Selector-specific params (epsilon,pool_threshold,random_seed) live on the selector, not the scenario.max_attempts_per_objectiveis a scenario parameter viasupported_parameters().get_attack_results(scenario_result_id=...)and filters byattribution_data["parent_collection"]to replay prior dispatch trails viarecord_outcome. Already-completed atomics are skipped by the baseScenarioresume path.AttackResultvia its own post-execute hook; the dispatcher returns areplace-based copy with a freshattack_result_id/timestampand the adaptive trail stamped onto metadata. Both rows shareconversation_id.EpsilonGreedyTechniqueSelectorguards its counts table with athreading.Lockso individualselect/record_outcomeoperations are atomic.Files
pyrit/scenario/scenarios/adaptive/adaptive_scenario.pypyrit/scenario/scenarios/adaptive/text_adaptive.pypyrit/scenario/scenarios/adaptive/selectors/protocol.pypyrit/scenario/scenarios/adaptive/selectors/epsilon_greedy.pypyrit/scenario/scenarios/adaptive/dispatcher.pypyrit/scenario/scenarios/adaptive/__init__.py,pyrit/scenario/scenarios/adaptive/selectors/__init__.pydoc/code/scenarios/3_adaptive_scenarios.py,doc/code/scenarios/3_adaptive_scenarios.ipynbtests/unit/scenario/scenarios/adaptive/test_epsilon_greedy.py,tests/unit/scenario/scenarios/adaptive/test_protocol.py,tests/unit/scenario/scenarios/adaptive/test_dispatcher.py,tests/unit/scenario/scenarios/adaptive/test_text_adaptive.pyTesting
pytest tests/unit/scenario/scenarios/adaptive/— 64 tests pass. Coverage includes:record_outcomesupported_parameters()