diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2b40b37..06f242d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -59,12 +59,11 @@ repos: hooks: - id: pytest name: pytest - entry: pytest -m "not integration_test" - language: python + entry: uv run pytest -m "not integration_test" + language: system types: [python] pass_filenames: false always_run: true - additional_dependencies: [pytest] ci: autofix_commit_msg: | diff --git a/README.md b/README.md index 3a21104..dd4af4a 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ vector-aixpert/ │ ├── src/aixpert/ │ ├── controlled_images/ # Baseline vs fairness-aware image generation +│ ├── deepfake_detection/ # Curated multimodal deepfake data preparation │ ├── data_generation/ │ │ ├── synthetic_data_generation/ │ │ │ ├── images/ # Domain/risk-specific image + VQA generation @@ -94,6 +95,9 @@ uv run mkdocs serve - **Controlled Images** — Matched baseline vs fairness-aware images across professions. ➜ [`src/aixpert/controlled_images/README.md`](src/aixpert/controlled_images/README.md) +- **Deepfake Detection** — FACT-HO bundle preparation for LAV-DF, FakeAVCeleb, and VCapAV. + ➜ [`src/aixpert/deepfake_detection/README.md`](src/aixpert/deepfake_detection/README.md) + - **Agent Pipeline (CrewAI)** — Single-agent orchestration for prompt/image/metadata generation. ➜ [`src/aixpert/data_generation/agent_pipeline/README.md`](src/aixpert/data_generation/agent_pipeline/README.md) @@ -149,4 +153,4 @@ Resources used in preparing this research were provided, in part, by the Provinc This work is part of the AIXpert project, funded by the **European Union's Horizon Europe Research and Innovation Programme** under Grant Agreement No. **101214389**, and the **Swiss State Secretariat for Education, Research and Innovation (SERI)**. Views expressed are those of the authors and do not necessarily reflect those of the European Union or funding authorities. 
-🌐 [Project Website](https://aixpert-project.eu/) · [LinkedIn](https://www.linkedin.com/company/aixpert-project/) · [X/Twitter](https://x.com/AIXPERT_project) · [YouTube](https://www.youtube.com/@AIXPERT_project) \ No newline at end of file +🌐 [Project Website](https://aixpert-project.eu/) · [LinkedIn](https://www.linkedin.com/company/aixpert-project/) · [X/Twitter](https://x.com/AIXPERT_project) · [YouTube](https://www.youtube.com/@AIXPERT_project) diff --git a/_typos.toml b/_typos.toml index d17a573..497ee6e 100644 --- a/_typos.toml +++ b/_typos.toml @@ -11,3 +11,4 @@ LLM = "LLM" # Large Language Model LLMs = "LLMs" # Large Language Models (plural) VQA = "VQA" # Visual Question Answering IG = "IG" # Integrated Gradients +HumAIne = "HumAIne" # EU project name diff --git a/docs/projects.md b/docs/projects.md index 3850090..5517f69 100644 --- a/docs/projects.md +++ b/docs/projects.md @@ -78,4 +78,4 @@ Statistical metrics (e.g. Statistical Parity, Equal Opportunity), zero-shot expl - See [CONTRIBUTING.md](https://github.com/VectorInstitute/vector-aixpert/blob/main/CONTRIBUTING.md) for coding standards (PEP8, Google docstrings), pre-commit hooks (`ruff`, `mypy`, `typos`, `nbQA`), branching, and tests. - **Run docs locally:** `uv sync --no-group docs` then `mkdocs serve` → [http://127.0.0.1:8000](http://127.0.0.1:8000) -- **CI:** GitHub Actions (`code_checks.yml`, `unit_tests.yml`, `integration_tests.yml`) \ No newline at end of file +- **CI:** GitHub Actions (`code_checks.yml`, `unit_tests.yml`, `integration_tests.yml`) diff --git a/pyproject.toml b/pyproject.toml index 8f9fe90..610416b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -132,6 +132,7 @@ ignore = [ # Ignore import violations in all `__init__.py` files. 
[tool.ruff.lint.per-file-ignores] "__init__.py" = ["E402", "F401", "F403", "F811"] +"src/aixpert/deepfake_detection/builders.py" = ["PLR0912"] # Ignoring undocumented public functions, public init, magic method, and magic numbers in tests folder "tests/*" = ["D103", "D105", "D107"] diff --git a/src/aixpert/deepfake_detection/README.md b/src/aixpert/deepfake_detection/README.md new file mode 100644 index 0000000..654d9a5 --- /dev/null +++ b/src/aixpert/deepfake_detection/README.md @@ -0,0 +1,38 @@ +# Deepfake Detection + +This module packages the most reviewable and reusable parts of the current +multimodal deepfake work into the `vector-aixpert` monorepo. + +## Scope + +The first version focuses on data preparation rather than full training: + +- FACT-HO sample and bundle domain objects +- deterministic grouping and split helpers +- dataset builders for `LAV-DF`, `FakeAVCeleb`, and manifest-first `VCapAV` +- a small CLI for one-sample smoke summaries + +## Why this is curated + +The original working directory contains many experiment scripts, environment +fixes, and cluster-specific launchers. For a first monorepo integration, this +module keeps only the parts that are easiest to review, test, and scale. 
+ +That means this initial contribution intentionally excludes: + +- repeated training variants +- plotting and monitoring helpers +- local outputs and checkpoints +- user-specific absolute paths + +## Example + +From the repository root: + +```bash +uv run python -m aixpert.deepfake_detection.cli summarize \ + --dataset vcapav \ + --data-root /path/to/data \ + --metadata-path /path/to/vcapav_manifest.jsonl \ + --vcapav-split-strategy metadata +``` diff --git a/src/aixpert/deepfake_detection/__init__.py b/src/aixpert/deepfake_detection/__init__.py new file mode 100644 index 0000000..dbdd68f --- /dev/null +++ b/src/aixpert/deepfake_detection/__init__.py @@ -0,0 +1,36 @@ +"""Curated utilities for multimodal deepfake dataset preparation.""" + +from aixpert.deepfake_detection.builders import ( + DatasetPartitions, + FakeAVCelebBuilder, + FakeAVCelebConfig, + LAVDFBuilder, + LAVDFConfig, + SelectionLimits, + SplitConfig, + VCapAVBuilder, + VCapAVConfig, +) +from aixpert.deepfake_detection.core import ( + FactHOBundle, + FactHOSample, + assign_group_indices, + build_bundles_from_samples, +) + + +__all__ = [ + "DatasetPartitions", + "FactHOBundle", + "FactHOSample", + "FakeAVCelebBuilder", + "FakeAVCelebConfig", + "LAVDFBuilder", + "LAVDFConfig", + "SelectionLimits", + "SplitConfig", + "VCapAVBuilder", + "VCapAVConfig", + "assign_group_indices", + "build_bundles_from_samples", +] diff --git a/src/aixpert/deepfake_detection/builders.py b/src/aixpert/deepfake_detection/builders.py new file mode 100644 index 0000000..86d904b --- /dev/null +++ b/src/aixpert/deepfake_detection/builders.py @@ -0,0 +1,715 @@ +"""Dataset builders for a curated, testable FACT-HO preparation workflow.""" + +from __future__ import annotations + +import csv +import json +from abc import ABC, abstractmethod +from collections import defaultdict +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +from aixpert.deepfake_detection.core import ( + FactHOBundle, + 
@dataclass
class DatasetPartitions:
    """Hold the bundles of every split plus a free-form inspection summary.

    Attributes:
        dataset_name: Short name of the dataset the bundles belong to.
        train_bundles: Bundles selected for the train split.
        eval_bundles: Bundles selected for the eval split.
        test_bundles: Bundles selected for the test split.
        summary: Extra summary entries merged into ``to_summary_dict``.
    """

    dataset_name: str
    train_bundles: list[FactHOBundle]
    eval_bundles: list[FactHOBundle]
    test_bundles: list[FactHOBundle]
    summary: dict[str, Any] = field(default_factory=dict)

    def to_summary_dict(self) -> dict[str, Any]:
        """Return the summary payload for this partitioning.

        Returns:
            A new dict with the dataset name and per-split bundle counts,
            followed by every entry of ``summary``. Entries in ``summary``
            win over the computed counts when keys clash.
        """
        payload: dict[str, Any] = {"dataset_name": self.dataset_name}
        payload["num_train_bundles"] = len(self.train_bundles)
        payload["num_eval_bundles"] = len(self.eval_bundles)
        payload["num_test_bundles"] = len(self.test_bundles)
        # Merged last so caller-provided summary values take precedence.
        payload.update(self.summary)
        return payload
def _unique_bundle_id(base_id: str, sample_id: str, seen_ids: set[str]) -> str:
    """Return ``base_id``, extended with ``sample_id`` if it was already issued.

    Args:
        base_id: Preferred bundle id.
        sample_id: Id of the sample the bundle is named after; used as a
            disambiguating suffix on collision.
        seen_ids: Ids already handed out for the current group. The returned
            id is added to this set.

    Returns:
        ``base_id`` when it is still free, otherwise a suffixed variant that
        does not collide with any id in ``seen_ids``.
    """
    candidate = base_id
    if candidate in seen_ids:
        candidate = f"{base_id}::{sample_id}"
    attempt = 1
    while candidate in seen_ids:
        # Only reachable when the same sample id occurs more than once.
        attempt += 1
        candidate = f"{base_id}::{sample_id}::{attempt}"
    seen_ids.add(candidate)
    return candidate


def _pair_lavdf_samples_with_anchors(
    key: tuple[str, str, str, str, str],
    anchors: list[FactHOSample],
    paired_specs: list[tuple[str, list[FactHOSample]]],
    leftover_suffix: str,
    seen_ids: set[str],
) -> list[FactHOBundle]:
    """Pair each sample in ``paired_specs`` with an anchor, round-robin.

    Args:
        key: ``(bundle_id, dataset_name, split, content_family, source)`` of
            the group being expanded.
        anchors: Samples used as the first element of every pair.
        paired_specs: ``(suffix, samples)`` entries; each sample becomes one
            two-sample bundle ``[anchor, sample]``. The anchor index restarts
            at zero for every entry.
        leftover_suffix: Suffix for singleton bundles built from anchors that
            were not used by any pair.
        seen_ids: Bundle ids already issued for this group.

    Returns:
        An empty list when there are no anchors or nothing to pair. Otherwise
        the pair bundles in spec order, followed by one singleton bundle per
        unused anchor.
    """
    if not anchors or not any(group for _, group in paired_specs):
        return []

    bundle_id, dataset_name, split, content_family, source = key
    # (suffix, sample the bundle is named after, bundle members)
    planned: list[tuple[str, FactHOSample, list[FactHOSample]]] = []
    used_anchor_indices: set[int] = set()
    for suffix, group in paired_specs:
        for sample_idx, sample in enumerate(group):
            anchor_idx = sample_idx % len(anchors)
            used_anchor_indices.add(anchor_idx)
            planned.append((suffix, sample, [anchors[anchor_idx], sample]))
    for anchor_idx, anchor in enumerate(anchors):
        if anchor_idx not in used_anchor_indices:
            planned.append((leftover_suffix, anchor, [anchor]))

    return [
        FactHOBundle(
            bundle_id=_unique_bundle_id(
                f"{bundle_id}::{suffix}::{named.method or named.sample_id}",
                named.sample_id,
                seen_ids,
            ),
            dataset_name=dataset_name,
            split=split,
            content_family=content_family,
            source=source,
            samples=members,
        )
        for suffix, named, members in planned
    ]


def build_lavdf_counterfactual_bundles(
    samples: list[FactHOSample],
) -> list[FactHOBundle]:
    """Expand LAV-DF train samples into pairwise counterfactual bundles.

    Samples are grouped by ``(bundle_id, dataset_name, split, content_family,
    source)``. Each group is expanded with the first strategy that applies:

    1. Real anchors (``label_a == 0 and label_v == 0``) paired with every
       fake sample (suffixes ``af``, ``vf``, ``ff``); unused real anchors
       become ``rr`` singleton bundles.
    2. Fully fake anchors (``label_a == 1 and label_v == 1``) paired with the
       single-modality fakes (suffixes ``ffv``, ``ffa``); unused anchors
       become ``ff`` singleton bundles.
    3. One bundle holding the whole group under the original ``bundle_id``.

    Singleton bundles are only emitted when their strategy produced at least
    one pair. Otherwise the group falls through to the next strategy, so no
    sample is emitted both as a singleton and inside the fallback bundle.
    Derived bundle ids are made unique within a group: when the usual
    ``{bundle_id}::{suffix}::{method or sample_id}`` id is already taken,
    the sample id is appended.

    Args:
        samples: Flat samples, typically all from the train split.

    Returns:
        Bundles sorted by ``(dataset_name, split, bundle_id)``.
    """
    grouped: dict[tuple[str, str, str, str, str], list[FactHOSample]] = defaultdict(
        list
    )
    for sample in samples:
        key = (
            sample.bundle_id,
            sample.dataset_name,
            sample.split,
            sample.content_family,
            sample.source,
        )
        grouped[key].append(sample)

    bundles: list[FactHOBundle] = []
    for key, group_samples in grouped.items():
        bundle_id, dataset_name, split, content_family, source = key
        ordered_samples = sorted(
            group_samples, key=lambda item: (item.pattern, item.method, item.sample_id)
        )
        by_pattern: dict[tuple[int, int], list[FactHOSample]] = defaultdict(list)
        for sample in ordered_samples:
            by_pattern[(sample.label_a, sample.label_v)].append(sample)

        # ``.get`` is used so that probing does not insert empty lists.
        audio_fakes = by_pattern.get((1, 0), [])
        video_fakes = by_pattern.get((0, 1), [])
        full_fakes = by_pattern.get((1, 1), [])
        seen_ids: set[str] = set()

        expanded = _pair_lavdf_samples_with_anchors(
            key,
            by_pattern.get((0, 0), []),
            [("af", audio_fakes), ("vf", video_fakes), ("ff", full_fakes)],
            "rr",
            seen_ids,
        )
        if not expanded:
            # No real anchor (or nothing to pair with it): anchor on the
            # fully fake samples instead.
            expanded = _pair_lavdf_samples_with_anchors(
                key,
                full_fakes,
                [("ffv", audio_fakes), ("ffa", video_fakes)],
                "ff",
                seen_ids,
            )
        if expanded:
            bundles.extend(expanded)
            continue

        bundles.append(
            FactHOBundle(
                bundle_id=bundle_id,
                dataset_name=dataset_name,
                split=split,
                content_family=content_family,
                source=source,
                samples=ordered_samples,
            )
        )

    bundles.sort(key=lambda item: (item.dataset_name, item.split, item.bundle_id))
    return bundles
train_bundle_builder(self, samples: list[FactHOSample]) -> list[FactHOBundle]: + """Build train bundles from train samples.""" + return build_bundles_from_samples(samples) + + def eval_bundle_builder(self, samples: list[FactHOSample]) -> list[FactHOBundle]: + """Build eval bundles from eval samples.""" + return build_bundles_from_samples(samples) + + def test_bundle_builder(self, samples: list[FactHOSample]) -> list[FactHOBundle]: + """Build test bundles from test samples.""" + return build_bundles_from_samples(samples) + + def adjust_train_samples( + self, train_samples: list[FactHOSample] + ) -> list[FactHOSample]: + """Optionally rebalance or prune train samples before bundling.""" + return train_samples + + def _summary( + self, + all_samples: list[FactHOSample], + train_samples: list[FactHOSample], + eval_samples: list[FactHOSample], + test_samples: list[FactHOSample], + train_bundles: list[FactHOBundle], + eval_bundles: list[FactHOBundle], + test_bundles: list[FactHOBundle], + train_bundle_mode: str, + ) -> dict[str, Any]: + assign_group_indices(train_bundles) + return { + "dataset_name": self.dataset_name, + "num_all_samples": len(all_samples), + "num_train_samples": len(train_samples), + "num_eval_samples": len(eval_samples), + "num_test_samples": len(test_samples), + "num_train_bundles": len(train_bundles), + "num_eval_bundles": len(eval_bundles), + "num_test_bundles": len(test_bundles), + "train_bundle_mode": train_bundle_mode, + "train_bundle_patterns": count_bundle_patterns(train_bundles), + "eval_bundle_patterns": count_bundle_patterns(eval_bundles), + "test_bundle_patterns": count_bundle_patterns(test_bundles), + "train_bundle_groups": count_bundle_groups(train_bundles), + "train_dataset_bundles": count_dataset_bundles(train_bundles), + "train_dataset_samples": count_dataset_samples(train_samples), + "eval_dataset_samples": count_dataset_samples(eval_samples), + "test_dataset_samples": count_dataset_samples(test_samples), + } + + def partition(self, 
limits: SelectionLimits | None = None) -> DatasetPartitions: + """Build train/eval/test partitions together with a compact summary.""" + limits = limits or SelectionLimits() + all_samples = self.load_samples() + + train_samples = select_samples_by_split( + samples=all_samples, + split=self.split_config.train, + max_samples=limits.max_train_samples, + seed=self.seed, + ) + eval_samples = select_samples_by_split( + samples=all_samples, + split=self.split_config.eval, + max_samples=limits.max_eval_samples, + seed=self.seed, + ) + test_samples = select_samples_by_split( + samples=all_samples, + split=self.split_config.test, + max_samples=limits.max_test_samples, + seed=self.seed, + ) + + train_samples = self.adjust_train_samples(train_samples) + train_bundles = select_bundles( + self.train_bundle_builder(train_samples), + limits.max_train_bundles, + self.seed, + ) + eval_bundles = select_bundles( + self.eval_bundle_builder(eval_samples), limits.max_eval_bundles, self.seed + ) + test_bundles = select_bundles( + self.test_bundle_builder(test_samples), limits.max_test_bundles, self.seed + ) + + train_bundle_mode = "full" + if type(self).train_bundle_builder is not DatasetBuilder.train_bundle_builder: + train_bundle_mode = "pairwise_counterfactual" + + return DatasetPartitions( + dataset_name=self.dataset_name, + train_bundles=train_bundles, + eval_bundles=eval_bundles, + test_bundles=test_bundles, + summary=self._summary( + all_samples=all_samples, + train_samples=train_samples, + eval_samples=eval_samples, + test_samples=test_samples, + train_bundles=train_bundles, + eval_bundles=eval_bundles, + test_bundles=test_bundles, + train_bundle_mode=train_bundle_mode, + ), + ) + + +class LAVDFBuilder(DatasetBuilder): + """Load and bundle LAV-DF metadata.""" + + dataset_name = "lavdf" + + def __init__( + self, + config: LAVDFConfig, + split_config: SplitConfig | None = None, + seed: int = 42, + ) -> None: + super().__init__(split_config=split_config, seed=seed) + self.config = 
config + + def load_samples(self) -> list[FactHOSample]: + """Load LAV-DF metadata into flat FACT-HO samples.""" + metadata_path = self.config.metadata_path or ( + self.config.data_root / "metadata.json" + ) + if not metadata_path.exists(): + raise FileNotFoundError(f"LAV-DF metadata not found: {metadata_path}") + + rows = json.loads(metadata_path.read_text(encoding="utf-8")) + samples: list[FactHOSample] = [] + for row in rows: + rel_path = row.get("file") + if not rel_path: + continue + abs_path = self.config.data_root / rel_path + if not abs_path.exists(): + continue + n_fakes = int(row.get("n_fakes", 0) or 0) + audio_label = 1 if bool(row.get("modify_audio", False)) else 0 + video_label = 1 if bool(row.get("modify_video", False)) else 0 + bundle_id = str(row.get("original") or rel_path) + duration = float(row.get("duration", 0.0) or 0.0) + samples.append( + FactHOSample( + sample_id=f"lavdf::{rel_path}", + dataset_name=self.dataset_name, + split=str(row.get("split", "unknown")), + bundle_id=bundle_id, + content_family=duration_bucket( + duration=duration, + short_threshold=self.config.short_threshold, + medium_threshold=self.config.medium_threshold, + ), + video_path=str(abs_path), + audio_path=str(abs_path), + label_y=1 if n_fakes > 0 else 0, + label_a=audio_label, + label_v=video_label, + method="lavdf", + source=bundle_id, + metadata={ + "duration": duration, + "n_fakes": n_fakes, + "original": row.get("original"), + }, + ) + ) + return samples + + def train_bundle_builder(self, samples: list[FactHOSample]) -> list[FactHOBundle]: + """Build memory-friendlier pairwise train bundles for LAV-DF.""" + return build_lavdf_counterfactual_bundles(samples) + + +class FakeAVCelebBuilder(DatasetBuilder): + """Load and bundle FakeAVCeleb metadata.""" + + dataset_name = "fakeavceleb" + + def __init__( + self, + config: FakeAVCelebConfig, + split_config: SplitConfig | None = None, + seed: int = 42, + ) -> None: + super().__init__(split_config=split_config, seed=seed) + 
self.config = config + + def load_samples(self) -> list[FactHOSample]: + """Load FakeAVCeleb metadata into flat FACT-HO samples.""" + metadata_path = self.config.metadata_path or ( + self.config.data_root / "meta_data.csv" + ) + if not metadata_path.exists(): + raise FileNotFoundError(f"FakeAVCeleb metadata not found: {metadata_path}") + + samples: list[FactHOSample] = [] + with metadata_path.open("r", encoding="utf-8") as handle: + reader = csv.DictReader(handle) + for idx, row in enumerate(reader): + raw_filename = (row.get("filename") or "").strip() + raw_path = (row.get("path") or "").strip() + raw_dir_extra = (row.get("path_dir") or row.get("") or "").strip() + + if raw_filename: + filename = raw_filename + rel_dir = ( + raw_path + if raw_path and not raw_path.lower().endswith(".mp4") + else raw_dir_extra + ) + else: + filename = raw_path + rel_dir = raw_dir_extra + + if rel_dir.startswith("FakeAVCeleb/"): + rel_dir = rel_dir[len("FakeAVCeleb/") :] + + if not filename: + continue + + full_path = self.config.data_root / rel_dir / filename + if not full_path.exists(): + continue + + sample_type = (row.get("type") or "").strip() + category = (row.get("category") or "").strip() + method = (row.get("method") or "").strip() + source = (row.get("source") or "unknown").strip() + race = sanitize_slug(str(row.get("race", "")), "unknownrace") + gender = sanitize_slug(str(row.get("gender", "")), "unknowngender") + audio_label, video_label = infer_fakeav_modality_labels( + sample_type, category, method + ) + label_y = 1 if (audio_label == 1 or video_label == 1) else 0 + + target1 = (row.get("target1") or "").strip() + target2 = (row.get("target2") or "").strip() + anchor_target = target1 if target1 and target1 != "-" else target2 + if not anchor_target or anchor_target == "-": + anchor_target = filename + bundle_id = f"{source}::{anchor_target}" + content_family = f"face_{race}_{gender}" + + samples.append( + FactHOSample( + 
class VCapAVBuilder(DatasetBuilder):
    """Load and bundle VCapAV samples from a curated manifest file.

    The manifest is read as JSON Lines: one JSON object per non-empty line.
    """

    dataset_name = "vcapav"

    def __init__(
        self,
        config: VCapAVConfig,
        split_config: SplitConfig | None = None,
        seed: int = 42,
    ) -> None:
        """Store the VCapAV configuration next to the shared builder state.

        Args:
            config: Manifest location, split strategy and rebalancing options.
            split_config: Names of the train/eval/test splits.
            seed: Seed forwarded to split assignment and rebalancing.
        """
        super().__init__(split_config=split_config, seed=seed)
        self.config = config

    def _resolve_path(self, raw_path: str) -> str:
        """Return ``raw_path`` as a string, anchoring relative paths at ``data_root``."""
        path = Path(raw_path)
        if not path.is_absolute():
            path = self.config.data_root / path
        return str(path)

    @staticmethod
    def _first_identifier(row: dict[str, Any], *keys: str) -> str | None:
        """Return the first truthy ``row[key]`` as a string, or ``None`` if there is none."""
        for key in keys:
            value = row.get(key)
            if value:
                return str(value)
        return None

    def load_samples(self) -> list[FactHOSample]:
        """Load a manifest-first VCapAV view without cluster-specific zip extraction.

        Returns:
            One ``FactHOSample`` per non-empty manifest line. When the split
            strategy is ``"source"``, ``assign_source_disjoint_splits`` is
            called on the samples before they are returned.

        Raises:
            FileNotFoundError: If the manifest file does not exist.
            ValueError: If a row carries no usable sample or bundle
                identifier. Previously such rows were loaded with the
                literal id ``"None"`` and merged with each other.
            KeyError: If a row lacks ``video_path`` or ``audio_path``.
        """
        manifest_path = self.config.metadata_path
        if not manifest_path.exists():
            raise FileNotFoundError(
                f"VCapAV metadata not found: {self.config.metadata_path}"
            )

        samples: list[FactHOSample] = []
        with manifest_path.open("r", encoding="utf-8") as handle:
            for line_number, raw_line in enumerate(handle, start=1):
                stripped_line = raw_line.strip()
                if not stripped_line:
                    continue
                row = json.loads(stripped_line)

                sample_id = self._first_identifier(row, "sample_id", "uid", "clip_id")
                bundle_id = self._first_identifier(
                    row, "bundle_id", "group_key", "clip_id"
                )
                if sample_id is None or bundle_id is None:
                    raise ValueError(
                        f"{manifest_path}:{line_number}: VCapAV manifest row has "
                        "no usable sample or bundle identifier"
                    )
                # The source fallbacks are a superset of the bundle id
                # fallbacks, so this lookup cannot fail once bundle_id is set.
                source = (
                    self._first_identifier(
                        row, "source", "clip_id", "bundle_id", "group_key"
                    )
                    or bundle_id
                )

                content_family = row.get(
                    "content_family"
                ) or infer_vcapav_content_bucket(
                    source=source,
                    num_buckets=self.config.content_buckets,
                )
                samples.append(
                    FactHOSample(
                        sample_id=sample_id,
                        dataset_name=self.dataset_name,
                        # ``or`` also covers an explicit JSON null.
                        split=str(row.get("split") or "unknown"),
                        bundle_id=bundle_id,
                        content_family=str(content_family),
                        video_path=self._resolve_path(str(row["video_path"])),
                        audio_path=self._resolve_path(str(row["audio_path"])),
                        label_y=int(row.get("label_y", row.get("label", 0))),
                        label_a=int(row.get("label_a", row.get("audio_label", 0))),
                        label_v=int(row.get("label_v", row.get("video_label", 0))),
                        method=str(row.get("method", "unknown")),
                        source=source,
                        metadata={"scenario": row.get("scenario", "")},
                    )
                )

        if self.config.split_strategy == "source":
            assign_source_disjoint_splits(
                samples=samples,
                train_split_name=self.split_config.train,
                eval_split_name=self.split_config.eval,
                test_split_name=self.split_config.test,
                train_ratio=self.config.train_ratio,
                eval_ratio=self.config.eval_ratio,
                test_ratio=self.config.test_ratio,
                seed=self.seed,
            )
        return samples

    def train_bundle_builder(self, samples: list[FactHOSample]) -> list[FactHOBundle]:
        """Build pairwise counterfactual train bundles for VCapAV."""
        return build_vcapav_counterfactual_bundles(samples)

    def adjust_train_samples(
        self, train_samples: list[FactHOSample]
    ) -> list[FactHOSample]:
        """Optionally rebalance fake methods in the train split.

        Args:
            train_samples: Samples selected for the train split.

        Returns:
            ``train_samples`` unchanged when ``rebalance_train`` is off,
            otherwise the result of ``rebalance_train_samples_by_method``.
        """
        if not self.config.rebalance_train:
            return train_samples
        return rebalance_train_samples_by_method(
            train_samples=train_samples,
            max_fake_real_ratio=self.config.max_fake_real_ratio,
            seed=self.seed,
        )
+ ) + summarize.add_argument( + "--dataset", choices=["lavdf", "fakeavceleb", "vcapav"], required=True + ) + summarize.add_argument("--data-root", type=Path, required=True) + summarize.add_argument("--metadata-path", type=Path, default=None) + summarize.add_argument("--output-path", type=Path, default=None) + summarize.add_argument("--seed", type=int, default=42) + summarize.add_argument("--train-split", default="train") + summarize.add_argument("--eval-split", default="dev") + summarize.add_argument("--test-split", default="test") + summarize.add_argument("--max-train-bundles", type=int, default=0) + summarize.add_argument("--max-eval-bundles", type=int, default=0) + summarize.add_argument("--max-test-bundles", type=int, default=0) + summarize.add_argument("--max-train-samples", type=int, default=0) + summarize.add_argument("--max-eval-samples", type=int, default=0) + summarize.add_argument("--max-test-samples", type=int, default=0) + summarize.add_argument("--lavdf-short-threshold", type=float, default=5.0) + summarize.add_argument("--lavdf-medium-threshold", type=float, default=10.0) + summarize.add_argument( + "--fakeav-split-strategy", choices=["source", "metadata"], default="source" + ) + summarize.add_argument("--fakeav-train-ratio", type=float, default=0.8) + summarize.add_argument("--fakeav-eval-ratio", type=float, default=0.1) + summarize.add_argument("--fakeav-test-ratio", type=float, default=0.1) + summarize.add_argument("--fakeav-max-fake-real-ratio", type=float, default=12.0) + summarize.add_argument("--fakeav-no-rebalance-train", action="store_true") + summarize.add_argument( + "--vcapav-split-strategy", choices=["source", "metadata"], default="source" + ) + summarize.add_argument("--vcapav-train-ratio", type=float, default=0.8) + summarize.add_argument("--vcapav-eval-ratio", type=float, default=0.1) + summarize.add_argument("--vcapav-test-ratio", type=float, default=0.1) + summarize.add_argument("--vcapav-max-fake-real-ratio", type=float, 
def make_builder(args: argparse.Namespace) -> Any:
    """Create the builder matching ``args.dataset``.

    Args:
        args: Parsed ``summarize`` arguments.

    Returns:
        A ``LAVDFBuilder`` for ``"lavdf"``, a ``FakeAVCelebBuilder`` for
        ``"fakeavceleb"``, and a ``VCapAVBuilder`` for any other value.
    """
    dataset = args.dataset
    splits = SplitConfig(
        train=args.train_split, eval=args.eval_split, test=args.test_split
    )

    if dataset == "lavdf":
        lavdf_config = LAVDFConfig(
            data_root=args.data_root,
            metadata_path=args.metadata_path,
            short_threshold=args.lavdf_short_threshold,
            medium_threshold=args.lavdf_medium_threshold,
        )
        return LAVDFBuilder(config=lavdf_config, split_config=splits, seed=args.seed)

    if dataset == "fakeavceleb":
        fakeav_config = FakeAVCelebConfig(
            data_root=args.data_root,
            metadata_path=args.metadata_path,
            split_strategy=args.fakeav_split_strategy,
            train_ratio=args.fakeav_train_ratio,
            eval_ratio=args.fakeav_eval_ratio,
            test_ratio=args.fakeav_test_ratio,
            rebalance_train=not args.fakeav_no_rebalance_train,
            max_fake_real_ratio=args.fakeav_max_fake_real_ratio,
        )
        return FakeAVCelebBuilder(
            config=fakeav_config, split_config=splits, seed=args.seed
        )

    # VCapAV needs a manifest; default to one inside the data root.
    manifest_path = args.metadata_path or args.data_root / "vcapav_manifest.jsonl"
    vcapav_config = VCapAVConfig(
        data_root=args.data_root,
        metadata_path=manifest_path,
        split_strategy=args.vcapav_split_strategy,
        train_ratio=args.vcapav_train_ratio,
        eval_ratio=args.vcapav_eval_ratio,
        test_ratio=args.vcapav_test_ratio,
        rebalance_train=not args.vcapav_no_rebalance_train,
        max_fake_real_ratio=args.vcapav_max_fake_real_ratio,
        content_buckets=args.vcapav_content_buckets,
    )
    return VCapAVBuilder(config=vcapav_config, split_config=splits, seed=args.seed)
@dataclass
class FactHOSample:
    """Describe one audio-visual sample with overall and per-modality labels.

    ``label_a`` and ``label_v`` are the audio and video labels that
    ``pattern`` reads. ``method``, ``source`` and ``metadata`` are optional.
    """

    sample_id: str
    dataset_name: str
    split: str
    bundle_id: str
    content_family: str
    video_path: str
    audio_path: str
    label_y: int
    label_a: int
    label_v: int
    method: str = ""
    source: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def pattern(self) -> str:
        """Return a two-letter audio/video signature for this sample.

        The first letter reflects ``label_a`` and the second ``label_v``:
        ``"F"`` when the label is truthy, ``"R"`` otherwise.
        """
        audio_flag = "F" if self.label_a else "R"
        video_flag = "F" if self.label_v else "R"
        return audio_flag + video_flag
group_index: int = -1 + + @property + def pattern_signature(self) -> str: + """Summarize which audio/video label combinations appear in the bundle.""" + values = sorted({sample.pattern for sample in self.samples}) + return "+".join(values) + + +def count_binary(values: list[int]) -> tuple[int, int]: + """Count positive and negative labels in a binary sequence.""" + pos = int(sum(int(value) for value in values)) + neg = len(values) - pos + return pos, neg + + +def build_content_group_name(bundle: FactHOBundle) -> str: + """Build the stable group identifier used for robust bundle grouping.""" + signature = bundle.pattern_signature or "unknown" + return f"{bundle.dataset_name}|{bundle.content_family}|{signature}" + + +def assign_group_indices(bundles: list[FactHOBundle]) -> list[str]: + """Assign stable integer group ids to bundles based on dataset/content signature.""" + group_names = sorted({build_content_group_name(bundle) for bundle in bundles}) + mapping = {name: idx for idx, name in enumerate(group_names)} + for bundle in bundles: + bundle.group_name = build_content_group_name(bundle) + bundle.group_index = mapping[bundle.group_name] + return group_names + + +def build_bundles_from_samples(samples: list[FactHOSample]) -> list[FactHOBundle]: + """Group flat samples into deterministic FACT-HO bundles.""" + grouped: dict[tuple[str, str, str, str, str], list[FactHOSample]] = defaultdict( + list + ) + for sample in samples: + key = ( + sample.bundle_id, + sample.dataset_name, + sample.split, + sample.content_family, + sample.source, + ) + grouped[key].append(sample) + + bundles: list[FactHOBundle] = [] + for key, bundle_samples in grouped.items(): + bundle_id, dataset_name, split, content_family, source = key + ordered_samples = sorted( + bundle_samples, key=lambda item: (item.pattern, item.method, item.sample_id) + ) + bundles.append( + FactHOBundle( + bundle_id=bundle_id, + dataset_name=dataset_name, + split=split, + content_family=content_family, + source=source, 
+ samples=ordered_samples, + ) + ) + + bundles.sort(key=lambda item: (item.dataset_name, item.split, item.bundle_id)) + return bundles + + +def count_bundle_patterns(bundles: list[FactHOBundle]) -> dict[str, int]: + """Count bundle pattern signatures for one partition.""" + counts: dict[str, int] = defaultdict(int) + for bundle in bundles: + counts[bundle.pattern_signature] += 1 + return dict(sorted(counts.items())) + + +def count_bundle_groups(bundles: list[FactHOBundle]) -> dict[str, int]: + """Count bundles per robust-group identifier.""" + counts: dict[str, int] = defaultdict(int) + for bundle in bundles: + key = bundle.group_name or build_content_group_name(bundle) + counts[key] += 1 + return dict(sorted(counts.items())) + + +def count_dataset_bundles(bundles: list[FactHOBundle]) -> dict[str, int]: + """Count bundles per dataset.""" + counts: dict[str, int] = defaultdict(int) + for bundle in bundles: + counts[bundle.dataset_name] += 1 + return dict(sorted(counts.items())) + + +def count_dataset_samples(samples: list[FactHOSample]) -> dict[str, int]: + """Count samples per dataset.""" + counts: dict[str, int] = defaultdict(int) + for sample in samples: + counts[sample.dataset_name] += 1 + return dict(sorted(counts.items())) + + +def normalize_ratio( + train_ratio: float, eval_ratio: float, test_ratio: float +) -> tuple[float, float, float]: + """Normalize split ratios so they sum to one.""" + total = train_ratio + eval_ratio + test_ratio + if total <= 0: + raise ValueError("train/eval/test ratios must sum to a positive number.") + return train_ratio / total, eval_ratio / total, test_ratio / total + + +def assign_source_disjoint_splits( + samples: list[FactHOSample], + train_split_name: str, + eval_split_name: str, + test_split_name: str, + train_ratio: float, + eval_ratio: float, + test_ratio: float, + seed: int, +) -> None: + """Assign source-disjoint train/eval/test splits in place.""" + train_ratio, eval_ratio, test_ratio = normalize_ratio( + train_ratio, 
eval_ratio, test_ratio + ) + source_ids = sorted({sample.source for sample in samples}) + if len(source_ids) < 3: + raise RuntimeError( + f"Need at least 3 unique source IDs for source-disjoint split, got {len(source_ids)}." + ) + + rng = random.Random(seed) + rng.shuffle(source_ids) + n_sources = len(source_ids) + + n_train = max(1, int(round(n_sources * train_ratio))) + n_eval = max(1, int(round(n_sources * eval_ratio))) + n_test = max(1, n_sources - n_train - n_eval) + if n_train + n_eval + n_test > n_sources: + overflow = n_train + n_eval + n_test - n_sources + n_train = max(1, n_train - overflow) + + if n_sources - (n_train + n_eval) <= 0: + n_test = 1 + if n_train > n_eval: + n_train -= 1 + else: + n_eval -= 1 + + train_sources = set(source_ids[:n_train]) + eval_sources = set(source_ids[n_train : n_train + n_eval]) + test_sources = set(source_ids[n_train + n_eval :]) + if not test_sources: + test_sources.add(source_ids[-1]) + train_sources.discard(source_ids[-1]) + + for sample in samples: + if sample.source in train_sources: + sample.split = train_split_name + elif sample.source in eval_sources: + sample.split = eval_split_name + else: + sample.split = test_split_name + + +def sanitize_slug(raw: str, default: str) -> str: + """Normalize free-form text into a lowercase slug.""" + text = re.sub(r"[^a-z0-9]+", "_", (raw or "").strip().lower()).strip("_") + return text or default + + +def duration_bucket( + duration: float, short_threshold: float, medium_threshold: float +) -> str: + """Map clip duration to a stable content bucket label.""" + if duration <= short_threshold: + return "face_short" + if duration <= medium_threshold: + return "face_medium" + return "face_long" + + +def select_samples_by_split( + samples: list[FactHOSample], split: str, max_samples: int, seed: int +) -> list[FactHOSample]: + """Filter one split and optionally cap the sample count.""" + subset = [sample for sample in samples if sample.split == split] + rng = random.Random(seed) + 
rng.shuffle(subset) + if max_samples > 0: + subset = subset[:max_samples] + return subset + + +def select_bundles( + bundles: list[FactHOBundle], max_bundles: int, seed: int +) -> list[FactHOBundle]: + """Optionally subsample bundles with a deterministic shuffle.""" + rng = random.Random(seed) + shuffled = list(bundles) + rng.shuffle(shuffled) + if max_bundles > 0: + shuffled = shuffled[:max_bundles] + shuffled.sort(key=lambda bundle: (bundle.dataset_name, bundle.bundle_id)) + return shuffled + + +def rebalance_train_samples_by_method( + train_samples: list[FactHOSample], + max_fake_real_ratio: float, + seed: int, +) -> list[FactHOSample]: + """Downsample fake methods so train skew stays within a target ratio.""" + if max_fake_real_ratio <= 0: + return train_samples + + real_samples = [sample for sample in train_samples if sample.label_y == 0] + fake_samples = [sample for sample in train_samples if sample.label_y == 1] + if not real_samples or not fake_samples: + return train_samples + + target_fake = int(round(len(real_samples) * max_fake_real_ratio)) + target_fake = min(target_fake, len(fake_samples)) + if target_fake >= len(fake_samples): + return train_samples + + rng = random.Random(seed) + by_method: dict[str, list[FactHOSample]] = defaultdict(list) + for sample in fake_samples: + by_method[sample.method or "unknown"].append(sample) + for values in by_method.values(): + rng.shuffle(values) + + keys = list(by_method.keys()) + rng.shuffle(keys) + selected_fake: list[FactHOSample] = [] + cursor = 0 + while len(selected_fake) < target_fake and keys: + key = keys[cursor % len(keys)] + bucket = by_method[key] + if bucket: + selected_fake.append(bucket.pop()) + cursor += 1 + continue + keys = [item for item in keys if by_method[item]] + if not keys: + break + + merged = list(real_samples) + selected_fake + rng.shuffle(merged) + return merged + + +def infer_fakeav_modality_labels( + sample_type: str, category: str, method: str +) -> tuple[int, int]: + """Infer 
audio/video fake labels from FakeAVCeleb metadata fields.""" + sample_type_l = (sample_type or "").strip().lower() + category_u = (category or "").strip().upper() + method_l = (method or "").strip().lower() + + if sample_type_l == "realvideo-realaudio" or category_u == "A": + return 0, 0 + if sample_type_l == "realvideo-fakeaudio" or category_u == "B": + return 1, 0 + if sample_type_l == "fakevideo-realaudio" or category_u == "C": + return 0, 1 + if sample_type_l == "fakevideo-fakeaudio" or category_u == "D": + return 1, 1 + if method_l == "real": + return 0, 0 + return 1, 1 + + +def infer_vcapav_content_bucket(source: str, num_buckets: int) -> str: + """Bucket VCapAV content sources into stable pseudo-content families.""" + if num_buckets <= 1: + return "environment_bucket_0" + digest = hashlib.md5(str(source).encode("utf-8")).hexdigest() + value = int(digest[:8], 16) % num_buckets + return f"environment_bucket_{value}" diff --git a/src/aixpert/deepfake_detection/py.typed b/src/aixpert/deepfake_detection/py.typed new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/aixpert/deepfake_detection/py.typed @@ -0,0 +1 @@ + diff --git a/tests/aixpert/deepfake_detection/__init__.py b/tests/aixpert/deepfake_detection/__init__.py new file mode 100644 index 0000000..5d3a897 --- /dev/null +++ b/tests/aixpert/deepfake_detection/__init__.py @@ -0,0 +1 @@ +"""Tests for the deepfake_detection module.""" diff --git a/tests/aixpert/deepfake_detection/test_builders.py b/tests/aixpert/deepfake_detection/test_builders.py new file mode 100644 index 0000000..de4f963 --- /dev/null +++ b/tests/aixpert/deepfake_detection/test_builders.py @@ -0,0 +1,175 @@ +"""Unit tests for the curated dataset builders.""" + +from __future__ import annotations + +import csv +import json +from pathlib import Path + +from aixpert.deepfake_detection.builders import ( + FakeAVCelebBuilder, + FakeAVCelebConfig, + LAVDFBuilder, + LAVDFConfig, + SplitConfig, + VCapAVBuilder, + VCapAVConfig, +) + + 
+def test_lavdf_builder_loads_counterfactual_pairs(media_root: Path) -> None: + """LAV-DF builder should create pairwise train bundles when possible.""" + video_dir = media_root / "lavdf" + video_dir.mkdir() + (video_dir / "clip_real.mp4").write_text("", encoding="utf-8") + (video_dir / "clip_fake.mp4").write_text("", encoding="utf-8") + + metadata_path = media_root / "lavdf_metadata.json" + metadata_path.write_text( + json.dumps( + [ + { + "file": "lavdf/clip_real.mp4", + "split": "train", + "original": "source_1", + "n_fakes": 0, + "modify_audio": False, + "modify_video": False, + "duration": 3.0, + }, + { + "file": "lavdf/clip_fake.mp4", + "split": "train", + "original": "source_1", + "n_fakes": 1, + "modify_audio": True, + "modify_video": False, + "duration": 3.0, + }, + ] + ), + encoding="utf-8", + ) + + builder = LAVDFBuilder( + LAVDFConfig(data_root=media_root, metadata_path=metadata_path) + ) + partitions = builder.partition() + + assert partitions.summary["train_bundle_mode"] == "pairwise_counterfactual" + assert len(partitions.train_bundles) == 1 + assert partitions.train_bundles[0].pattern_signature == "FR+RR" + + +def test_fakeav_builder_respects_metadata_split_strategy(media_root: Path) -> None: + """FakeAVCeleb builder should load explicit split labels from metadata.""" + data_dir = media_root / "fakeav" + data_dir.mkdir() + (data_dir / "real.mp4").write_text("", encoding="utf-8") + (data_dir / "fake.mp4").write_text("", encoding="utf-8") + + metadata_path = media_root / "fakeav.csv" + with metadata_path.open("w", encoding="utf-8", newline="") as handle: + writer = csv.DictWriter( + handle, + fieldnames=[ + "filename", + "path", + "path_dir", + "type", + "category", + "method", + "source", + "race", + "gender", + "target1", + "target2", + "split", + ], + ) + writer.writeheader() + writer.writerow( + { + "filename": "real.mp4", + "path": "", + "path_dir": "fakeav", + "type": "RealVideo-RealAudio", + "category": "A", + "method": "real", + "source": 
"src_a", + "race": "Asian", + "gender": "Female", + "target1": "anchor_a", + "target2": "-", + "split": "train", + } + ) + writer.writerow( + { + "filename": "fake.mp4", + "path": "", + "path_dir": "fakeav", + "type": "RealVideo-FakeAudio", + "category": "B", + "method": "tts_a", + "source": "src_a", + "race": "Asian", + "gender": "Female", + "target1": "anchor_a", + "target2": "-", + "split": "dev", + } + ) + + builder = FakeAVCelebBuilder( + FakeAVCelebConfig( + data_root=media_root, metadata_path=metadata_path, split_strategy="metadata" + ) + ) + partitions = builder.partition() + + assert partitions.summary["num_all_samples"] == 2 + assert len(partitions.train_bundles) == 1 + assert len(partitions.eval_bundles) == 1 + + +def test_vcapav_builder_loads_manifest_first_metadata(media_root: Path) -> None: + """VCapAV builder should support a single-sample manifest smoke test.""" + media_dir = media_root / "vcapav" + media_dir.mkdir() + (media_dir / "clip_001.mp4").write_text("", encoding="utf-8") + (media_dir / "clip_001.wav").write_text("", encoding="utf-8") + + metadata_path = media_root / "vcapav_manifest.jsonl" + metadata_path.write_text( + json.dumps( + { + "sample_id": "clip_001", + "clip_id": "clip_001", + "bundle_id": "clip_001", + "split": "train", + "video_path": "vcapav/clip_001.mp4", + "audio_path": "vcapav/clip_001.wav", + "label_y": 0, + "label_a": 0, + "label_v": 0, + "method": "real", + "source": "scene_001", + } + ) + + "\n", + encoding="utf-8", + ) + + builder = VCapAVBuilder( + VCapAVConfig( + data_root=media_root, + metadata_path=metadata_path, + split_strategy="metadata", + ), + split_config=SplitConfig(train="train", eval="dev", test="test"), + ) + partitions = builder.partition() + + assert partitions.summary["num_all_samples"] == 1 + assert len(partitions.train_bundles) == 1 diff --git a/tests/aixpert/deepfake_detection/test_cli.py b/tests/aixpert/deepfake_detection/test_cli.py new file mode 100644 index 0000000..6b26fd5 --- /dev/null +++ 
b/tests/aixpert/deepfake_detection/test_cli.py @@ -0,0 +1,60 @@ +"""Smoke tests for the command-line interface.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from aixpert.deepfake_detection.cli import build_parser, summarize_command + + +@pytest.mark.integration_test +def test_cli_summarize_supports_one_sample_vcapav_manifest(media_root: Path) -> None: + """The CLI should produce a JSON summary from a one-sample manifest.""" + media_dir = media_root / "vcapav" + media_dir.mkdir() + (media_dir / "clip_001.mp4").write_text("", encoding="utf-8") + (media_dir / "clip_001.wav").write_text("", encoding="utf-8") + + metadata_path = media_root / "smoke_manifest.jsonl" + metadata_path.write_text( + json.dumps( + { + "sample_id": "clip_001", + "clip_id": "clip_001", + "bundle_id": "clip_001", + "split": "train", + "video_path": "vcapav/clip_001.mp4", + "audio_path": "vcapav/clip_001.wav", + "label_y": 0, + "label_a": 0, + "label_v": 0, + "method": "real", + "source": "scene_001", + } + ) + + "\n", + encoding="utf-8", + ) + + parser = build_parser() + args = parser.parse_args( + [ + "summarize", + "--dataset", + "vcapav", + "--data-root", + str(media_root), + "--metadata-path", + str(metadata_path), + "--vcapav-split-strategy", + "metadata", + ] + ) + payload = summarize_command(args) + + assert payload["dataset_name"] == "vcapav" + assert payload["num_all_samples"] == 1 + assert payload["num_train_bundles"] == 1 diff --git a/tests/aixpert/deepfake_detection/test_core.py b/tests/aixpert/deepfake_detection/test_core.py new file mode 100644 index 0000000..aee09ea --- /dev/null +++ b/tests/aixpert/deepfake_detection/test_core.py @@ -0,0 +1,105 @@ +"""Unit tests for the lightweight FACT-HO domain helpers.""" + +from __future__ import annotations + +from aixpert.deepfake_detection.core import ( + FactHOSample, + assign_group_indices, + assign_source_disjoint_splits, + build_bundles_from_samples, + 
infer_fakeav_modality_labels, + infer_vcapav_content_bucket, + rebalance_train_samples_by_method, +) + + +def make_sample( + sample_id: str, source: str, split: str, label_a: int, label_v: int +) -> FactHOSample: + """Build a compact sample fixture.""" + return FactHOSample( + sample_id=sample_id, + dataset_name="fakeavceleb", + split=split, + bundle_id=source, + content_family="face_demo", + video_path=f"/tmp/{sample_id}.mp4", + audio_path=f"/tmp/{sample_id}.wav", + label_y=1 if (label_a or label_v) else 0, + label_a=label_a, + label_v=label_v, + method="demo", + source=source, + ) + + +def test_build_bundles_and_group_indices() -> None: + """Bundles should be deterministic and receive stable group indices.""" + samples = [ + make_sample("one", "clip_a", "train", 0, 0), + make_sample("two", "clip_a", "train", 1, 0), + make_sample("three", "clip_b", "train", 0, 0), + ] + bundles = build_bundles_from_samples(samples) + + assert [bundle.bundle_id for bundle in bundles] == ["clip_a", "clip_b"] + assert bundles[0].pattern_signature == "FR+RR" + + group_names = assign_group_indices(bundles) + assert len(group_names) == 2 + assert bundles[0].group_index >= 0 + assert bundles[1].group_index >= 0 + + +def test_assign_source_disjoint_splits() -> None: + """Source-disjoint assignment should cover all requested split names.""" + samples = [ + make_sample("a1", "source_a", "unknown", 0, 0), + make_sample("b1", "source_b", "unknown", 0, 0), + make_sample("c1", "source_c", "unknown", 1, 0), + ] + + assign_source_disjoint_splits( + samples, "train", "dev", "test", 0.6, 0.2, 0.2, seed=7 + ) + + splits = {sample.split for sample in samples} + assert splits == {"train", "dev", "test"} + + +def test_rebalance_train_samples_by_method() -> None: + """Method-aware rebalancing should reduce excessive fake skew.""" + train_samples = [ + make_sample("real", "r", "train", 0, 0), + make_sample("fake_1", "f1", "train", 1, 0), + make_sample("fake_2", "f2", "train", 1, 0), + 
make_sample("fake_3", "f3", "train", 0, 1), + ] + train_samples[1].method = "audio_a" + train_samples[2].method = "audio_b" + train_samples[3].method = "video_a" + + balanced = rebalance_train_samples_by_method( + train_samples, max_fake_real_ratio=1.0, seed=3 + ) + + num_fake = sum(sample.label_y for sample in balanced) + assert len(balanced) == 2 + assert num_fake == 1 + + +def test_infer_fakeav_modality_labels() -> None: + """Metadata labels should map to the expected modality supervision.""" + assert infer_fakeav_modality_labels("RealVideo-RealAudio", "", "") == (0, 0) + assert infer_fakeav_modality_labels("", "B", "") == (1, 0) + assert infer_fakeav_modality_labels("", "C", "") == (0, 1) + assert infer_fakeav_modality_labels("", "D", "") == (1, 1) + + +def test_infer_vcapav_content_bucket_is_deterministic() -> None: + """Content buckets should stay stable for a fixed source string.""" + bucket_a = infer_vcapav_content_bucket("clip_001", num_buckets=8) + bucket_b = infer_vcapav_content_bucket("clip_001", num_buckets=8) + + assert bucket_a == bucket_b + assert bucket_a.startswith("environment_bucket_") diff --git a/tests/aixpert/test_sample.py b/tests/aixpert/test_sample.py deleted file mode 100644 index dc366a2..0000000 --- a/tests/aixpert/test_sample.py +++ /dev/null @@ -1,16 +0,0 @@ -"""Sample test.""" - -import pytest - -import src.aixpert - - -@pytest.mark.integration_test() -def test_import(): - assert hasattr(src.aixpert, "__name__") - - -# TODO: Replace this -def test_samplefn(my_test_number: int) -> None: - """Test function.""" - assert my_test_number == 42 diff --git a/tests/conftest.py b/tests/conftest.py index 8b603f5..cc84d2f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,15 +1,15 @@ -"""Conftest.""" +"""Shared pytest fixtures.""" + +from __future__ import annotations + +from pathlib import Path import pytest -# TODO: Replace this @pytest.fixture -def my_test_number() -> int: - """My test number. 
- - Returns - ------- - int: A really awesome number. - """ - return 42 +def media_root(tmp_path: Path) -> Path: + """Create a temporary media root for metadata-driven tests.""" + root = tmp_path / "media" + root.mkdir() + return root