diff --git a/backends/__init__.py b/backends/__init__.py new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/backends/__init__.py @@ -0,0 +1 @@ + diff --git a/backends/model_converter/convert_model.py b/backends/model_converter/convert_model.py index a9839a08..02b863c2 100644 --- a/backends/model_converter/convert_model.py +++ b/backends/model_converter/convert_model.py @@ -121,6 +121,7 @@ def convert_model(checkpoint_filename=None, out_filename=None, torch_weights=No raise ValueError("Invalid sd_version "+ sd_version) model_metadata = {"float_type" : cur_dtype , "sd_type" :sd_version, "type" : sd_type } print("__converted_model_data__" , json.dumps(model_metadata)) + return {"output_path": out_filename, "model_metadata": model_metadata} def usage(): @@ -155,4 +156,3 @@ def usage(): convert_model(checkpoint_filename , out_filename ) - diff --git a/backends/stable_diffusion/__init__.py b/backends/stable_diffusion/__init__.py new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/backends/stable_diffusion/__init__.py @@ -0,0 +1 @@ + diff --git a/backends/stable_diffusion/diffusionbee_backend.py b/backends/stable_diffusion/diffusionbee_backend.py index 8bc6f379..b458bd0a 100644 --- a/backends/stable_diffusion/diffusionbee_backend.py +++ b/backends/stable_diffusion/diffusionbee_backend.py @@ -1,75 +1,23 @@ print("starting backend") -import numpy as np -import argparse -from PIL import Image import json -import random import multiprocessing import sys -import copy -import math import time import traceback -import os -from pathlib import Path - - -# b2py t2im {"prompt": "sun glasses" , "img_width":640 , "img_height" : 640 , "num_imgs" : 10 , "input_image":"/Users/divamgupta/Downloads/inn.png" , "mask_image" : "/Users/divamgupta/Downloads/maa.png" , "is_inpaint":true } - -if not ( getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS')): - print("Adding sys paths") - dir_path = os.path.dirname(os.path.realpath(__file__)) - sys.path.append(os.path.join(dir_path , "../model_converter")) - - model_interface_path = os.environ.get('MODEL_INTERFACE_PATH') or "../stable_diffusion_tf_models" - sys.path.append( os.path.join(dir_path , model_interface_path) ) -else: - print("not adding sys paths") - - -from convert_model import convert_model -from stable_diffusion.stable_diffusion import StableDiffusion , ModelContainer -from stable_diffusion.utils.utils import get_sd_run_from_dict - -from applets.applets import register_applet , run_applet -from applets.frame_interpolator import FrameInterpolator -# get the model interface form the environ -USE_DUMMY_INTERFACE = False -if USE_DUMMY_INTERFACE : - from fake_interface import ModelInterface -else: - from interface import ModelInterface - -model_container = ModelContainer() - - - -home_path = Path.home() - -projects_root_path = os.path.join(home_path, ".diffusionbee") - -if not os.path.isdir(projects_root_path): - os.mkdir(projects_root_path) - - - - -if 'DEBUG' in os.environ and str(os.environ['DEBUG']) == '1': - debug_output_path = os.path.join(projects_root_path, "debug_outs") - if not os.path.isdir(debug_output_path): - os.mkdir(debug_output_path) - print("Debug outputs stored at : " , debug_output_path ) -else: - debug_output_path = None - +try: + from .applets.applets import register_applet, run_applet + from .applets.frame_interpolator import FrameInterpolator + from .service import DiffusionBeeService, generation_result_payload +except ImportError: + from applets.applets import register_applet, run_applet + from applets.frame_interpolator import FrameInterpolator + from service import DiffusionBeeService, generation_result_payload -defualt_data_root = os.path.join(projects_root_path, "images") - +# b2py t2im {"prompt": "sun glasses" , "img_width":640 , "img_height" : 640 , "num_imgs" : 10 , "input_image":"/Users/divamgupta/Downloads/inn.png" , "mask_image" : "/Users/divamgupta/Downloads/maa.png" , "is_inpaint":true } -if not os.path.isdir(defualt_data_root): - os.mkdir(defualt_data_root) +service = DiffusionBeeService() @@ -95,47 +43,10 @@ def __getattr__(self, attr): -def process_opt(d, generator): - - batch_size = 1# int(d['batch_size']) - n_imgs = math.ceil(d['num_imgs'] / batch_size) - sd_run = get_sd_run_from_dict(d) - - for i in range(n_imgs): - - sd_run.img_id = i - - print("got" , d ) - - outs = generator.generate(sd_run) - - if outs is None: - return - - img = outs['img'] - - if img is None: - return - - for i in range(len(img)): - s = ''.join(filter(str.isalnum, str(d['prompt'])[:30] )) - fpath = os.path.join(defualt_data_root , "%s_%d.png"%(s , random.randint(0 ,100000000)) ) - - Image.fromarray(img[i]).save(fpath) - ret_dict = {"generated_img_path" : fpath} - - if 'aux_img' in outs: - ret_dict['aux_output_image_path'] = outs['aux_img'] - - print("sdbk nwim %s"%(json.dumps(ret_dict)) ) - - - - def diffusion_bee_main(): time.sleep(2) - register_applet(model_container , FrameInterpolator) + register_applet(service.get_model_container(), FrameInterpolator) print("sdbk mltl Loading Model") @@ -148,8 +59,8 @@ def callback(state="" , progress=-1): if "__stop__" in get_input(): return "stop" - generator = StableDiffusion( model_container , ModelInterface , None , model_name=None, callback=callback, debug_output_path=debug_output_path ) - + service._progress_callback = callback + service._get_generator() print("sdbk mdld") @@ -171,8 +82,9 @@ def callback(state="" , progress=-1): try: d = json.loads(inp_str) print("sdbk inwk") # working on the input - - process_opt(d, generator) + result = service.generate_images(d, progress_callback=callback) + for image in generation_result_payload(result)["images"]: + print("sdbk nwim %s"%(json.dumps(image)) ) except Exception as e: traceback.print_exc() @@ -193,7 +105,7 @@ def callback(state="" , progress=-1): print("sdbk errr %s"%(str(e))) -from stable_diffusion.utils.stdin_input import is_avail, get_input +from stable_diffusion.utils.stdin_input import is_avail, get_input if __name__ == "__main__": @@ -202,8 +114,7 @@ def callback(state="" , progress=-1): if len(sys.argv) > 1 and sys.argv[1] == 'convert_model': checkpoint_filename = sys.argv[2] out_filename = sys.argv[3] - convert_model(checkpoint_filename, out_filename ) + service.convert_model(checkpoint_filename, out_filename) print("model converted ") else: diffusion_bee_main() - diff --git a/backends/stable_diffusion/service.py b/backends/stable_diffusion/service.py new file mode 100644 index 00000000..7b0bd81a --- /dev/null +++ b/backends/stable_diffusion/service.py @@ -0,0 +1,217 @@ +from __future__ import annotations + +import os +import random +import sys +from dataclasses import asdict, dataclass +from importlib.util import module_from_spec, spec_from_file_location +from pathlib import Path +from typing import Callable + +from PIL import Image + + +@dataclass +class GeneratedImage: + generated_img_path: str + aux_output_image_path: str | None = None + + +@dataclass +class GenerationResult: + images: list[GeneratedImage] + + +ProgressCallback = Callable[[str, float], str | None] + + +def _ensure_backend_import_paths() -> Path: + backend_root = Path(__file__).resolve().parent + model_converter_root = backend_root.parent / "model_converter" + model_interface_path = os.environ.get("MODEL_INTERFACE_PATH") or "../stable_diffusion_tf_models" + model_interface_root = (backend_root / model_interface_path).resolve() + + for path in (model_converter_root, model_interface_root): + path_str = str(path) + if path_str not in sys.path: + sys.path.append(path_str) + + return backend_root + + +def get_projects_root() -> Path: + projects_root = Path.home() / ".diffusionbee" + projects_root.mkdir(parents=True, exist_ok=True) + return projects_root + + +def get_default_output_dir() -> Path: + output_dir = get_projects_root() / "images" + output_dir.mkdir(parents=True, exist_ok=True) + return output_dir + + +def get_debug_output_path() -> str | None: + if str(os.environ.get("DEBUG")) != "1": + return None + + debug_output_dir = get_projects_root() / "debug_outs" + debug_output_dir.mkdir(parents=True, exist_ok=True) + return str(debug_output_dir) + + +def _load_convert_model(): + _ensure_backend_import_paths() + from convert_model import convert_model + + return convert_model + + +def _load_model_interface(use_dummy_interface: bool): + backend_root = _ensure_backend_import_paths() + if use_dummy_interface: + module_path = backend_root / "fake_interface" / "interface.py" + module_name = "diffusionbee_fake_interface" + else: + model_interface_path = os.environ.get("MODEL_INTERFACE_PATH") or "../stable_diffusion_tf_models" + module_path = (backend_root / model_interface_path / "interface.py").resolve() + module_name = "diffusionbee_model_interface" + + spec = spec_from_file_location(module_name, module_path) + if spec is None or spec.loader is None: + raise ImportError(f"Unable to load ModelInterface from {module_path}") + + module = module_from_spec(spec) + spec.loader.exec_module(module) + return module.ModelInterface + + +def _load_generator_dependencies(): + _ensure_backend_import_paths() + from stable_diffusion.stable_diffusion import ModelContainer, StableDiffusion + from stable_diffusion.utils.utils import get_sd_run_from_dict + + return ModelContainer, StableDiffusion, get_sd_run_from_dict + + +class DiffusionBeeService: + def __init__( + self, + *, + use_dummy_interface: bool = False, + output_dir: str | Path | None = None, + image_writer: Callable[[object, str], None] | None = None, + random_int: Callable[[int, int], int] | None = None, + generator_factory: Callable[[Callable[[str, float], str | None]], object] | None = None, + sd_run_factory: Callable[[dict], object] | None = None, + ) -> None: + self.use_dummy_interface = use_dummy_interface + self.output_dir = Path(output_dir) if output_dir is not None else get_default_output_dir() + self.output_dir.mkdir(parents=True, exist_ok=True) + self.image_writer = image_writer or self._default_image_writer + self.random_int = random_int or random.randint + self._generator_factory = generator_factory + self._sd_run_factory = sd_run_factory + self._progress_callback: ProgressCallback | None = None + self._generator = None + self._model_container = None + + def _default_image_writer(self, image_array: object, output_path: str) -> None: + Image.fromarray(image_array).save(output_path) + + def _handle_progress(self, state: str = "", progress: float = -1) -> str | None: + if self._progress_callback is None: + return None + return self._progress_callback(state, progress) + + def _get_generator(self): + if self._generator is not None: + return self._generator + + if self._generator_factory is not None: + self._generator = self._generator_factory(self._handle_progress) + return self._generator + + ModelContainer, StableDiffusion, _ = _load_generator_dependencies() + ModelInterface = _load_model_interface(self.use_dummy_interface) + + self._model_container = ModelContainer() + self._generator = StableDiffusion( + self._model_container, + ModelInterface, + None, + model_name=None, + callback=self._handle_progress, + debug_output_path=get_debug_output_path(), + ) + return self._generator + + def get_model_container(self): + if self._model_container is not None: + return self._model_container + + generator = self._get_generator() + model_container = getattr(generator, "model_container", None) + if model_container is None: + raise RuntimeError("Generator does not expose a model_container") + return model_container + + def _build_sd_run(self, request: dict): + if self._sd_run_factory is not None: + return self._sd_run_factory(request) + + _, _, get_sd_run_from_dict = _load_generator_dependencies() + return get_sd_run_from_dict(request) + + def _make_output_path(self, prompt: str) -> Path: + stem = "".join(filter(str.isalnum, prompt[:30])) or "image" + return self.output_dir / f"{stem}_{self.random_int(0, 100000000)}.png" + + def generate_images( + self, + request: dict, + *, + progress_callback: ProgressCallback | None = None, + ) -> GenerationResult: + if "prompt" not in request or not str(request["prompt"]).strip(): + raise ValueError("prompt must not be empty") + + self._progress_callback = progress_callback + generator = self._get_generator() + sd_run = self._build_sd_run(request) + total_images = int(request.get("num_imgs", 1) or 1) + images: list[GeneratedImage] = [] + + for image_index in range(total_images): + sd_run.img_id = image_index + outs = generator.generate(sd_run) + if outs is None or outs.get("img") is None: + break + + for image_array in outs["img"]: + output_path = self._make_output_path(str(request["prompt"])) + self.image_writer(image_array, str(output_path)) + images.append( + GeneratedImage( + generated_img_path=str(output_path), + aux_output_image_path=outs.get("aux_img"), + ) + ) + + return GenerationResult(images=images) + + def convert_model(self, checkpoint_path: str, output_path: str) -> dict: + if not checkpoint_path.strip(): + raise ValueError("checkpoint_path must not be empty") + if not output_path.strip(): + raise ValueError("output_path must not be empty") + + convert_model = _load_convert_model() + result = convert_model(checkpoint_path, output_path) + if result is None: + return {"output_path": output_path} + return result + + +def generation_result_payload(result: GenerationResult) -> dict: + return {"images": [asdict(image) for image in result.images]} diff --git a/mcp_bundle/.gitignore b/mcp_bundle/.gitignore new file mode 100644 index 00000000..ac8bfacc --- /dev/null +++ b/mcp_bundle/.gitignore @@ -0,0 +1,3 @@ +*.mcpb +lib/ +venv/ diff --git a/mcp_bundle/README.md b/mcp_bundle/README.md new file mode 100644 index 00000000..7868e49a --- /dev/null +++ b/mcp_bundle/README.md @@ -0,0 +1,105 @@ +# Diffusion Bee MCP Bundle + +This directory is the starting point for a standalone MCP Bundle target. + +It is intentionally separate from the Electron desktop application in +[`electron_app/`](../electron_app) and the existing UI-to-backend bridge. The +goal is to expose a narrow, headless MCP server surface that can reuse backend +generation logic without depending on Electron process management or the +`b2py`/`sdbk` string protocol. + +## Current Status + +This now contains an experimental FastMCP-based server backed by a shared Python service +module in [`backends/stable_diffusion/service.py`](../backends/stable_diffusion/service.py). + +Included here: + +- `manifest.json`: provisional MCPB manifest for a Python-based local server +- `requirements.txt`: pins the Python MCP SDK to the stable v1 line +- `server/main.py`: FastMCP entry point and tool registration +- `server/service.py`: thin MCP-to-backend adapter +- `tests/`: unit tests for both the MCP adapter and backend service seam + +Not included yet: + +- packaged Python dependencies +- bundle build/validation automation + +## FastMCP + +This bundle is scaffolded around FastMCP rather than a hand-rolled MCP +transport. + +Install the Python dependency from this directory with: + +```bash +pip install -r mcp_bundle/requirements.txt +``` + +The current dependency target is `mcp[cli]<2` so the bundle stays on the +stable v1 SDK line instead of the pre-alpha v2 branch. + +Run the server directly with: + +```bash +python mcp_bundle/server/main.py +``` + +Run the current tests with: + +```bash +python -m unittest discover -s mcp_bundle/tests -p 'test_*.py' +``` + +## V0 Tool Surface + +The initial tool shape follows an Easy Diffusion-style approach: a small number +of high-level, opinionated tools rather than a graph API. + +Current tools: + +- `bundle_status`: working introspection tool for checking scaffold status +- `generate_image_tool`: forwards requests into the extracted backend service +- `convert_model_tool`: forwards model conversion requests into the extracted backend service + +The `generate_image_tool` parameters are intentionally close to the existing +frontend request shape: + +- `prompt` +- `negative_prompt` +- `width` +- `height` +- `steps` +- `guidance_scale` +- `seed` +- `num_images` +- `model_tdict_path` + +## Intended Architecture + +The MCP bundle should depend on extracted backend modules, not on Electron app +code. In particular, it should not import or mimic: + +- `electron_app/src/bridge.js` +- `electron_app/src/py_vue_bridge.js` +- renderer state update messages like `utds` or `sdbk` + +Instead, the extraction path should look like: + +1. Move generation and model-management operations behind a Python service API. +2. Keep CLI / stdio / MCP request handling inside `mcp_bundle/`. +3. Have the MCP server call shared backend functions directly. + +## Extracted Service + +The shared backend service now lives under `backends/stable_diffusion/` with a +small, explicit API centered on: + +- `generate_images(request) -> list[GeneratedImage]` +- `convert_model(checkpoint_path, output_path) -> ConversionResult` + +The Electron stdin backend and the FastMCP server both call into that shared +service. `mcp_bundle/server/main.py` stays transport-only, while +`mcp_bundle/server/service.py` is a narrow adapter that translates MCP tool +arguments into backend service requests. diff --git a/mcp_bundle/manifest.json b/mcp_bundle/manifest.json new file mode 100644 index 00000000..842d967a --- /dev/null +++ b/mcp_bundle/manifest.json @@ -0,0 +1,30 @@ +{ + "manifest_version": "0.1", + "dxt_version": "0.1", + "name": "diffusionbee-local", + "display_name": "Diffusion Bee Local", + "version": "0.1.0", + "description": "Standalone MCP bundle scaffold for Diffusion Bee local image generation.", + "author": { + "name": "Diffusion Bee" + }, + "homepage": "https://github.com/divamgupta/diffusionbee-stable-diffusion-ui", + "server": { + "type": "python", + "entry_point": "server/main.py", + "mcp_config": { + "command": "python", + "args": [ + "${__dirname}/server/main.py" + ] + } + }, + "compatibility": { + "platforms": [ + "darwin" + ], + "runtimes": { + "python": ">=3.9" + } + } +} diff --git a/mcp_bundle/requirements.txt b/mcp_bundle/requirements.txt new file mode 100644 index 00000000..58ff0f4e --- /dev/null +++ b/mcp_bundle/requirements.txt @@ -0,0 +1 @@ +mcp[cli]<2 diff --git a/mcp_bundle/server/main.py b/mcp_bundle/server/main.py new file mode 100644 index 00000000..4ca3959e --- /dev/null +++ b/mcp_bundle/server/main.py @@ -0,0 +1,103 @@ +"""FastMCP entry point for the Diffusion Bee MCP bundle.""" + +from __future__ import annotations + +import sys +from pathlib import Path + +try: + from mcp.server.fastmcp import FastMCP +except ImportError as exc: # pragma: no cover - import guard for local setup + FastMCP = None + FASTMCP_IMPORT_ERROR = exc +else: + FASTMCP_IMPORT_ERROR = None + +try: + from .service import BundleStatus, convert_model, generate_image +except ImportError: + from service import BundleStatus, convert_model, generate_image + + +ROOT_DIR = Path(__file__).resolve().parents[1] + + +def _build_server() -> "FastMCP": + server = FastMCP( + "Diffusion Bee Local", + instructions=( + "High-level local image generation tools for Diffusion Bee. " + "This server is intentionally opinionated and does not expose " + "Electron-specific transport details." + ), + ) + + @server.tool() + def bundle_status() -> BundleStatus: + """Describe the current state of the Diffusion Bee MCP bundle.""" + return BundleStatus( + implementation_status="experimental", + bundle_root=str(ROOT_DIR), + backend_root=str(ROOT_DIR.parent / "backends" / "stable_diffusion"), + notes=[ + "FastMCP server and backend extraction are in place.", + "The MCP layer delegates to extracted backend services.", + "Use this server as a thin wrapper over shared backend services.", + ], + ) + + @server.tool() + def generate_image_tool( + prompt: str, + negative_prompt: str = "", + width: int = 512, + height: int = 512, + steps: int = 25, + guidance_scale: float = 7.5, + seed: int = 0, + num_images: int = 1, + model_tdict_path: str = "", + ) -> dict: + """Generate images from a prompt using an Easy Diffusion-style API.""" + return generate_image( + prompt=prompt, + negative_prompt=negative_prompt, + width=width, + height=height, + steps=steps, + guidance_scale=guidance_scale, + seed=seed, + num_images=num_images, + model_tdict_path=model_tdict_path, + ) + + @server.tool() + def convert_model_tool( + checkpoint_path: str, + output_path: str, + ) -> dict: + """Convert a checkpoint into Diffusion Bee's runtime format.""" + return convert_model( + checkpoint_path=checkpoint_path, + output_path=output_path, + ) + + return server + + +def main() -> int: + if FastMCP is None: + sys.stderr.write( + "FastMCP is not installed. Install dependencies from " + "mcp_bundle/requirements.txt before running this server.\n" + ) + sys.stderr.write(f"Import error: {FASTMCP_IMPORT_ERROR}\n") + return 1 + + server = _build_server() + server.run() + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/mcp_bundle/server/service.py b/mcp_bundle/server/service.py new file mode 100644 index 00000000..3eac49ee --- /dev/null +++ b/mcp_bundle/server/service.py @@ -0,0 +1,81 @@ +"""Thin service layer for the Diffusion Bee MCP bundle. + +These functions keep the FastMCP layer thin by delegating to the extracted +backend service module. +""" + +from __future__ import annotations + +import sys +from dataclasses import asdict, dataclass +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parents[2] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +from backends.stable_diffusion.service import DiffusionBeeService, generation_result_payload + + +@dataclass +class BundleStatus: + implementation_status: str + bundle_root: str + backend_root: str + notes: list[str] + + +_BACKEND_SERVICE: DiffusionBeeService | None = None + + +def _get_backend_service() -> DiffusionBeeService: + global _BACKEND_SERVICE + if _BACKEND_SERVICE is None: + _BACKEND_SERVICE = DiffusionBeeService() + return _BACKEND_SERVICE + + +def generate_image( + *, + prompt: str, + negative_prompt: str, + width: int, + height: int, + steps: int, + guidance_scale: float, + seed: int, + num_images: int, + model_tdict_path: str, +) -> dict: + if not prompt.strip(): + raise ValueError("prompt must not be empty") + + request = { + "prompt": prompt, + "negative_prompt": negative_prompt, + "img_width": width, + "img_height": height, + "num_steps": steps, + "guidance_scale": guidance_scale, + "seed": seed, + "num_imgs": num_images, + "model_tdict_path": model_tdict_path, + } + result = _get_backend_service().generate_images(request) + payload = generation_result_payload(result) + payload["request"] = request + return payload + + +def convert_model(*, checkpoint_path: str, output_path: str) -> dict: + if not checkpoint_path.strip(): + raise ValueError("checkpoint_path must not be empty") + if not output_path.strip(): + raise ValueError("output_path must not be empty") + + return _get_backend_service().convert_model(checkpoint_path, output_path) + + +def bundle_status_payload(status: BundleStatus) -> dict: + return asdict(status) diff --git a/mcp_bundle/tests/test_backend_service.py b/mcp_bundle/tests/test_backend_service.py new file mode 100644 index 00000000..885579c2 --- /dev/null +++ b/mcp_bundle/tests/test_backend_service.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import tempfile +import unittest +from pathlib import Path +from types import SimpleNamespace +from unittest import mock + +from backends.stable_diffusion.service import ( + DiffusionBeeService, + GenerationResult, + GeneratedImage, + generation_result_payload, +) + + +class FakeGenerator: + def __init__(self, callback): + self.callback = callback + self.calls = [] + self.model_container = object() + + def generate(self, sd_run): + self.calls.append(sd_run.img_id) + self.callback("Generating", 50.0) + return {"img": [object()], "aux_img": "/tmp/aux.png"} + + +class BackendServiceTests(unittest.TestCase): + def test_generation_result_payload_serializes_images(self) -> None: + payload = generation_result_payload( + GenerationResult(images=[GeneratedImage(generated_img_path="/tmp/out.png")]) + ) + + self.assertEqual( + payload, + {"images": [{"generated_img_path": "/tmp/out.png", "aux_output_image_path": None}]}, + ) + + def test_generate_images_saves_outputs_and_reports_progress(self) -> None: + written_paths: list[str] = [] + progress_events: list[tuple[str, float]] = [] + + def image_writer(_image, output_path: str) -> None: + written_paths.append(output_path) + + with tempfile.TemporaryDirectory() as temp_dir: + service = DiffusionBeeService( + output_dir=temp_dir, + image_writer=image_writer, + random_int=lambda _a, _b: 42, + generator_factory=lambda callback: FakeGenerator(callback), + sd_run_factory=lambda _request: SimpleNamespace(img_id=None), + ) + + result = service.generate_images( + {"prompt": "hello world", "num_imgs": 2}, + progress_callback=lambda state, progress: progress_events.append((state, progress)), + ) + + self.assertEqual(len(result.images), 2) + self.assertEqual(len(written_paths), 2) + self.assertTrue(all(path.endswith("helloworld_42.png") for path in written_paths)) + self.assertEqual(progress_events, [("Generating", 50.0), ("Generating", 50.0)]) + + @mock.patch("backends.stable_diffusion.service._load_convert_model") + def test_convert_model_delegates_to_converter(self, load_convert_model: mock.Mock) -> None: + load_convert_model.return_value = mock.Mock( + return_value={"output_path": "/tmp/model.tdict", "model_metadata": {"type": "sd_model"}} + ) + service = DiffusionBeeService(output_dir=Path("/tmp")) + + payload = service.convert_model("/tmp/in.ckpt", "/tmp/model.tdict") + + load_convert_model.return_value.assert_called_once_with("/tmp/in.ckpt", "/tmp/model.tdict") + self.assertEqual(payload["model_metadata"]["type"], "sd_model") + + +if __name__ == "__main__": + unittest.main() diff --git a/mcp_bundle/tests/test_service.py b/mcp_bundle/tests/test_service.py new file mode 100644 index 00000000..1fe9f5e9 --- /dev/null +++ b/mcp_bundle/tests/test_service.py @@ -0,0 +1,118 @@ +from __future__ import annotations + +import unittest +from unittest import mock + +from mcp_bundle.server.service import ( + BundleStatus, + bundle_status_payload, + convert_model, + generate_image, +) + + +class BundleStatusPayloadTests(unittest.TestCase): + def test_bundle_status_payload_serializes_dataclass(self) -> None: + status = BundleStatus( + implementation_status="experimental", + bundle_root="/tmp/bundle", + backend_root="/tmp/backend", + notes=["note-1", "note-2"], + ) + + payload = bundle_status_payload(status) + + self.assertEqual( + payload, + { + "implementation_status": "experimental", + "bundle_root": "/tmp/bundle", + "backend_root": "/tmp/backend", + "notes": ["note-1", "note-2"], + }, + ) + + +class GenerateImageTests(unittest.TestCase): + def test_generate_image_rejects_empty_prompt(self) -> None: + with self.assertRaisesRegex(ValueError, "prompt must not be empty"): + generate_image( + prompt=" ", + negative_prompt="", + width=512, + height=512, + steps=25, + guidance_scale=7.5, + seed=0, + num_images=1, + model_tdict_path="", + ) + + @mock.patch("mcp_bundle.server.service._get_backend_service") + def test_generate_image_forwards_request_to_backend_service(self, get_backend_service: mock.Mock) -> None: + backend_service = get_backend_service.return_value + backend_service.generate_images.return_value = mock.Mock( + images=[mock.Mock(generated_img_path="/tmp/out.png", aux_output_image_path=None)] + ) + + with mock.patch( + "mcp_bundle.server.service.generation_result_payload", + return_value={"images": [{"generated_img_path": "/tmp/out.png", "aux_output_image_path": None}]}, + ): + payload = generate_image( + prompt="a lighthouse in fog", + negative_prompt="low quality", + width=640, + height=768, + steps=30, + guidance_scale=8.0, + seed=123, + num_images=2, + model_tdict_path="/models/default.tdict", + ) + + backend_service.generate_images.assert_called_once_with( + { + "prompt": "a lighthouse in fog", + "negative_prompt": "low quality", + "img_width": 640, + "img_height": 768, + "num_steps": 30, + "guidance_scale": 8.0, + "seed": 123, + "num_imgs": 2, + "model_tdict_path": "/models/default.tdict", + } + ) + self.assertEqual(payload["images"][0]["generated_img_path"], "/tmp/out.png") + self.assertEqual(payload["request"]["img_width"], 640) + + +class ConvertModelTests(unittest.TestCase): + def test_convert_model_requires_checkpoint_path(self) -> None: + with self.assertRaisesRegex(ValueError, "checkpoint_path must not be empty"): + convert_model(checkpoint_path=" ", output_path="/tmp/out") + + def test_convert_model_requires_output_path(self) -> None: + with self.assertRaisesRegex(ValueError, "output_path must not be empty"): + convert_model(checkpoint_path="/tmp/in.ckpt", output_path=" ") + + @mock.patch("mcp_bundle.server.service._get_backend_service") + def test_convert_model_delegates_to_backend_service(self, get_backend_service: mock.Mock) -> None: + backend_service = get_backend_service.return_value + backend_service.convert_model.return_value = {"output_path": "/tmp/model.tdict"} + + payload = convert_model( + checkpoint_path="/tmp/model.ckpt", + output_path="/tmp/model.tdict", + ) + + backend_service.convert_model.assert_called_once_with( + "/tmp/model.ckpt", + "/tmp/model.tdict", + ) + self.assertEqual(payload["output_path"], "/tmp/model.tdict") + + +if __name__ == "__main__": + unittest.main()