From 93fc009b71983156e565f954e3f781d191e2bbe5 Mon Sep 17 00:00:00 2001 From: SSobol77 Date: Fri, 26 Jun 2026 04:35:56 +0200 Subject: [PATCH] feat: add F4 diagnostics linter panel --- docs/extensions/diagnostics-model.md | 44 +- src/ecli/core/Ecli.py | 276 ++++- src/ecli/diagnostics/__init__.py | 22 + src/ecli/diagnostics/display.py | 96 ++ src/ecli/diagnostics/models.py | 131 +++ src/ecli/diagnostics/ruff_provider.py | 322 ++++++ src/ecli/diagnostics/service.py | 223 ++++ src/ecli/integrations/LinterBridge.py | 103 +- src/ecli/ui/DrawScreen.py | 48 +- src/ecli/ui/KeyBinder.py | 2 +- src/ecli/ui/PanelManager.py | 2 + src/ecli/ui/panels.py | 544 +++++++++ .../test_existing_keybindings.py | 5 +- tests/core/test_diagnostics_service.py | 284 +++++ tests/ui/test_diagnostics_panel.py | 1011 +++++++++++++++++ 15 files changed, 3099 insertions(+), 14 deletions(-) create mode 100644 src/ecli/diagnostics/__init__.py create mode 100644 src/ecli/diagnostics/display.py create mode 100644 src/ecli/diagnostics/models.py create mode 100644 src/ecli/diagnostics/ruff_provider.py create mode 100644 src/ecli/diagnostics/service.py create mode 100644 tests/core/test_diagnostics_service.py create mode 100644 tests/ui/test_diagnostics_panel.py diff --git a/docs/extensions/diagnostics-model.md b/docs/extensions/diagnostics-model.md index 58254d0..0ef187d 100644 --- a/docs/extensions/diagnostics-model.md +++ b/docs/extensions/diagnostics-model.md @@ -23,20 +23,24 @@ See the LICENSE file in the project root for full license text. | Field | Type | Required | Meaning | Notes | |---|---|---:|---|---| +| `file_path` | string | Yes | file containing the diagnostic | absolute or provider-reported path | | `source` | string | Yes | origin system (`ruff`, `devops-linter`, etc.) | normalized identifier | -| `severity` | enum | Yes | issue severity | `error` / `warning` / `info` | +| `severity` | enum | Yes | issue severity | `error` / `warning` / `info` / `hint` | | `message` | string | Yes | diagnostic text | user-visible | | `line` | int | Yes | 1-based line | normalize from source if needed | -| `column` | int | No | 1-based column | optional if unavailable | +| `column` | int | Yes | 1-based column | set to `1` when unavailable | | `end_line` | int | No | range end line | optional | | `end_column` | int | No | range end column | optional | -| `code` | string | No | provider code/id | optional | +| `code` | string/null | No | provider code/id | optional; null provider codes must not drop the diagnostic | +| `fix_hint` | string | No | provider fix description | text only; ECLI must not apply edits automatically in the F4 milestone | +| `suggested_code` | string | No | optional provider-proposed code shape | preview-only; never applied by the F4 milestone | ## Severity Taxonomy - `error`: actionable issue likely blocking correctness. - `warning`: non-fatal issue requiring review. - `info`: advisory. +- `hint`: lowest-severity guidance or optional improvement. ## External Source Mapping Table @@ -48,9 +52,41 @@ See the LICENSE file in the project root for full license text. ## Normalization Rules - normalize numeric positions to 1-based display coordinates. -- fill absent optional fields with null/omission. +- fill absent line/column values with 1 for visible, navigable output. +- deterministic UI ordering is severity, file path, line, column, source, code. +- fix data is display-only text; automatic code modification is out of scope for + the F4 Diagnostics Panel milestone. - reject malformed mandatory fields and emit fallback diagnostic. +## F4 Panel Display Contract + +- main-list rows use short severity, source, project-relative/basename path, + line, column, and message: + ` :: `. +- absolute paths may remain in the normalized diagnostic model, but the panel + must not render absolute paths before the message. +- constrained-width rows must preserve severity/source, line/column, and visible + message text before spending remaining space on path context. +- the right-side panel is the only authoritative diagnostics list. +- `F4` opens/closes the panel and must not start Ruff automatically. +- `r` runs diagnostics for the current file; `R` runs workspace diagnostics. +- a completed clean run displays `Diagnostics: PASS` and `No issues found.`; + the status bar message is `Diagnostics: PASS — no issues found.` and the + PASS label uses the success colour role when colour is available. +- `Enter` is navigation-only and must report + `Jumped to ::` on success. +- `d` or `Space` opens a centered `Diagnostic details` popup for the currently + selected diagnostic only. The popup may show path, line/column, source, code, + full message, fix hint, and suggested code shape, and must include + `Preview only. No changes were applied.` +- the centered popup must never render a second full diagnostics list. +- while the panel is open, the selected diagnostic marks only the matching + editor line-number/gutter using severity colour (`error`, `warning`, + `info`/`hint`). Selection changes update the marker without moving the + cursor; `Enter` remains the only jump action. The marker clears on panel + close, refresh start, clean/replaced snapshots, stale selections, and file + switches away from the diagnostic file. + ## Malformed Diagnostics Handling - malformed item: log warning/error, skip item, continue pipeline. diff --git a/src/ecli/core/Ecli.py b/src/ecli/core/Ecli.py index 81e05d4..7c4a698 100755 --- a/src/ecli/core/Ecli.py +++ b/src/ecli/core/Ecli.py @@ -77,6 +77,7 @@ from ecli.core.AsyncEngine import AsyncEngine from ecli.core.CodeCommenter import CodeCommenter +from ecli.diagnostics.display import diagnostic_display_path # Imports from ecli package from ecli.core.History import History @@ -94,7 +95,7 @@ hit_test, ) from ecli.ui.PanelManager import PanelManager -from ecli.ui.panels import FileBrowserPanel, GitPanel +from ecli.ui.panels import DiagnosticsPanel, FileBrowserPanel, GitPanel from ecli.ui.pysh_console_panel import PySHConsolePanel from ecli.ui.textops import normalize_paste_text, selection_to_text from ecli.utils.logging_config import logger, log_record_to_file_handlers @@ -129,6 +130,8 @@ def process_queues(self) -> bool: class _LightweightLinterBridge: """No-op linter bridge used by lightweight editor instances.""" + diagnostics_snapshot: Any = None + def shutdown(self) -> None: return None @@ -141,6 +144,10 @@ def run_linter(self, _code: str) -> list[Any]: def process_lsp_queue(self) -> bool: return False + def request_diagnostics_refresh(self, scope: str = "buffer") -> bool: + _ = scope + return False + class _LightweightPanelManager: """No-op panel manager used by lightweight editor instances.""" @@ -202,6 +209,22 @@ def draw_active_panel(self) -> None: } +def _hex_color_looks_green(value: str) -> bool: + """Return True when a hex colour is visibly green-dominant.""" + if not isinstance(value, str): + return False + text = value.strip().lstrip("#") + if len(text) != 6: + return False + try: + red = int(text[0:2], 16) + green = int(text[2:4], 16) + blue = int(text[4:6], 16) + except ValueError: + return False + return green > red and green >= blue and green >= 80 + + ## ==================== Ecli Class ==================== class Ecli: """Class Ecli @@ -410,6 +433,14 @@ def _handle_lint_status( self.lint_panel_message = new_panel_message_str panel_state_or_content_changed = True + if self._diagnostics_panel_is_visible(): + if self.lint_panel_active: + self.lint_panel_active = False + panel_state_or_content_changed = True + if panel_state_or_content_changed: + self._force_full_redraw = True + return + if activate_lint_panel_if_issues and self.lint_panel_message: no_issues_substrings = ["no issues found", "no linting issues"] panel_message_lower = self.lint_panel_message.strip().lower() @@ -424,6 +455,10 @@ def _handle_lint_status( if panel_state_or_content_changed and self.lint_panel_active: self._force_full_redraw = True + def _diagnostics_panel_is_visible(self) -> bool: + panel = getattr(self, "diagnostics_panel_instance", None) + return bool(panel is not None and getattr(panel, "visible", False)) + # -- Initialization and Setup --- def __init__( self, @@ -468,6 +503,7 @@ def __init__( self.current_file_path: str | None = None self.file_path: str | None = None self.filename: str | None = None + self.diagnostic_line_highlight: dict[str, Any] | None = None # --- initialize state & subsystems with clear failure boundaries --- try: @@ -552,6 +588,7 @@ def _initialize_state(self) -> None: self.search_matches: list[tuple[int, int, int]] = [] self.current_match_idx: int = -1 self.highlighted_matches: list[tuple[int, int, int]] = [] + self.diagnostic_line_highlight: dict[str, Any] | None = None self.current_language: Optional[str] = None self._lexer: Optional[TextLexer] = None self.custom_syntax_patterns: list[tuple[re.Pattern, str]] = [] @@ -570,6 +607,56 @@ def _initialize_state(self) -> None: self.internal_clipboard: str = "" self.service_registry: Optional[ServiceRegistry] = None + def set_diagnostic_line_highlight( + self, + diagnostic: Any, + *, + generation: int | None = None, + ) -> None: + """Expose the selected diagnostics-panel line to the renderer.""" + file_path = str(getattr(diagnostic, "file_path", "") or "") + if not file_path: + self.clear_diagnostic_line_highlight() + return + try: + line = max(1, int(getattr(diagnostic, "line", 1))) + except (TypeError, ValueError): + self.clear_diagnostic_line_highlight() + return + severity = str(getattr(diagnostic, "severity", "info") or "info") + if severity not in {"error", "warning", "info", "hint"}: + severity = "info" + next_highlight = { + "file_path": os.path.abspath(file_path), + "line": line, + "severity": severity, + "generation": generation, + } + if getattr(self, "diagnostic_line_highlight", None) != next_highlight: + self.diagnostic_line_highlight = next_highlight + self._force_full_redraw = True + + def clear_diagnostic_line_highlight(self) -> None: + """Clear the selected diagnostics-panel source-line marker.""" + highlight = getattr(self, "diagnostic_line_highlight", None) + self.diagnostic_line_highlight = None + if highlight is not None: + self._force_full_redraw = True + + def _clear_diagnostic_line_highlight_if_file_mismatch(self) -> None: + """Drop stale diagnostic highlighting after file switches.""" + highlight = getattr(self, "diagnostic_line_highlight", None) + if highlight is None: + self.diagnostic_line_highlight = None + return + if not isinstance(highlight, dict): + return + highlighted_path = str(highlight.get("file_path") or "") + current_path = os.path.abspath(self.filename) if self.filename else "" + if highlighted_path and current_path == highlighted_path: + return + self.clear_diagnostic_line_highlight() + # --- Component Initialization --- def _initialize_components(self) -> None: """Initializes all editor components like History, Drawer, Git, etc.""" @@ -588,6 +675,7 @@ def _initialize_components(self) -> None: self.panel_manager: Optional[PanelManager] = None self.git_panel_instance: Optional[GitPanel] = None self.file_browser_instance: Optional[FileBrowserPanel] = None + self.diagnostics_panel_instance: Optional[DiagnosticsPanel] = None self.pysh_console_panel_instance: Optional[PySHConsolePanel] = None if not self.is_lightweight: @@ -600,6 +688,7 @@ def _initialize_components(self) -> None: self.panel_manager = PanelManager(self) self.git_panel_instance = GitPanel(self.stdscr, self) self.file_browser_instance = FileBrowserPanel(self.stdscr, self) + self.diagnostics_panel_instance = DiagnosticsPanel(self.stdscr, self) self.pysh_console_panel_instance = PySHConsolePanel(self.stdscr, self) if self.file_browser_instance and self.git_panel_instance: @@ -972,6 +1061,14 @@ def toggle_file_browser(self) -> bool: # so a redraw is always required. return True + def toggle_diagnostics_panel(self) -> bool: + """Open or close the non-blocking Diagnostics panel.""" + if self.diagnostics_panel_instance and self.panel_manager: + self.panel_manager.show_panel_instance(self.diagnostics_panel_instance) + else: + self._set_status_message("Diagnostics panel not available.") + return True + def toggle_terminal_panel(self) -> bool: """Open or focus the ECLI-owned PySH Console Panel.""" panel_manager = self.panel_manager @@ -5243,6 +5340,7 @@ def open_file(self, filename_to_open: Optional[str] = None) -> bool: # noqa: py if not os.path.exists(actual_filename_to_open): self.text = [""] self.filename = None + self.clear_diagnostic_line_highlight() self.modified = False self.encoding = "utf-8" self.history.clear() @@ -5307,6 +5405,7 @@ def open_file(self, filename_to_open: Optional[str] = None) -> bool: # noqa: py self.text = lines self.filename = actual_filename_to_open + self._clear_diagnostic_line_highlight_if_file_mismatch() self.modified = False self._file_loaded_from_disk = True self.encoding = final_encoding_used @@ -5340,6 +5439,101 @@ def open_file(self, filename_to_open: Optional[str] = None) -> bool: # noqa: py # A full redraw with the error message is the main goal. return True + def goto_diagnostic(self, diagnostic: Any) -> bool: + """Navigate to a normalized diagnostic location.""" + file_path = str(getattr(diagnostic, "file_path", "") or "") + try: + target_line = max(1, int(getattr(diagnostic, "line", 1))) + target_column = max(1, int(getattr(diagnostic, "column", 1))) + except (TypeError, ValueError): + self._set_status_message("Diagnostics: invalid diagnostic location.") + return True + + if not file_path: + self._set_status_message("Diagnostics: selected item has no file path.") + return True + + label = self._diagnostic_display_label(file_path) + current_path = os.path.abspath(self.filename) if self.filename else None + target_path = os.path.abspath(file_path) + if current_path != target_path: + if not os.path.exists(target_path): + self._set_status_message( + f"Diagnostics: file not available: {label}" + ) + logging.warning( + "Diagnostic navigation failed: file not available: %s", + target_path, + ) + return True + self.open_file(target_path) + current_path = os.path.abspath(self.filename) if self.filename else None + if current_path != target_path: + self._set_status_message(f"Diagnostics: could not open {label}") + logging.warning( + "Diagnostic navigation failed: open_file did not load target: %s", + target_path, + ) + return True + + line_count = len(self.text) + if target_line > line_count: + self._set_status_message( + f"Diagnostics: line out of range for {label}:{target_line}:{target_column}" + ) + logging.warning( + "Diagnostic navigation failed: line out of range: file=%s line=%s line_count=%s", + target_path, + target_line, + line_count, + ) + return True + + line_text = self.text[target_line - 1] if self.text else "" + max_column = len(line_text) + 1 + if target_column > max_column: + self._set_status_message( + f"Diagnostics: column out of range for {label}:{target_line}:{target_column}" + ) + logging.warning( + "Diagnostic navigation failed: column out of range: file=%s line=%s column=%s max_column=%s", + target_path, + target_line, + target_column, + max_column, + ) + return True + + self.cursor_y = target_line - 1 + self.cursor_x = target_column - 1 + self._ensure_cursor_in_bounds() + self._clamp_scroll() + panel_manager = getattr(self, "panel_manager", None) + if not (panel_manager and panel_manager.is_panel_active()): + self.focus = "editor" + self._force_full_redraw = True + self._set_status_message(f"Jumped to {label}:{target_line}:{target_column}") + return True + + def _diagnostic_display_label(self, file_path: str) -> str: + """Return the path label used in diagnostics navigation status.""" + project_root = None + current_filename = getattr(self, "filename", None) + if current_filename: + current_path = Path(str(current_filename)).expanduser() + project_root = str(current_path.parent if current_path.suffix else current_path) + else: + registry = getattr(self, "service_registry", None) + project_service = getattr(registry, "project_service", None) + root = getattr(project_service, "root", None) + if root: + project_root = str(root) + return diagnostic_display_path( + file_path, + project_root=project_root, + cwd=os.getcwd(), + ) + # --- save file ------------------ def save_file(self) -> bool: """Saves the current document to its existing filename. @@ -6096,6 +6290,7 @@ def _reset_state_for_new_file(self) -> None: self.text = [""] self.filename = None + self.clear_diagnostic_line_highlight() self.encoding = "UTF-8" self.modified = False self._lexer = None @@ -7462,6 +7657,57 @@ def _init_chrome_color_pairs( self.colors[name] = curses.A_REVERSE return pair_id + def _success_foreground_hex( + self, palette: ThemePalette, theme_hex: dict[str, str] + ) -> str: + """Return a green success foreground for PASS/success UI states.""" + candidates = ( + palette.success, + theme_hex.get("git_info", ""), + palette.tag, + palette.comment, + "#2EA043", + ) + for value in candidates: + if _hex_color_looks_green(value): + return value + return palette.success or palette.foreground + + def _init_success_color_pairs( + self, + palette: ThemePalette, + theme_hex: dict[str, str], + start_pair_id: int, + can_use_256_colors: bool, + ) -> int: + """Allocate success attributes for editor, panel, and status surfaces.""" + if not can_use_256_colors: + fallback = curses.A_BOLD + self.colors["ui_success"] = fallback + self.colors["ui_panel_success"] = fallback + self.colors["ui_status_success"] = curses.A_REVERSE | curses.A_BOLD + return start_pair_id + + pair_id = start_pair_id + success_fg_hex = self._success_foreground_hex(palette, theme_hex) + surface_bgs = { + "ui_success": palette.background, + "ui_panel_success": palette.header_bg, + "ui_status_success": palette.status_bg, + } + for name, bg_hex in surface_bgs.items(): + if pair_id >= curses.COLOR_PAIRS: + self.colors[name] = self.colors.get("ui_success", curses.A_BOLD) + continue + try: + curses.init_pair(pair_id, hex_to_xterm(success_fg_hex), hex_to_xterm(bg_hex)) + self.colors[name] = curses.color_pair(pair_id) | curses.A_BOLD + pair_id += 1 + except (curses.error, Exception) as exc: # noqa: BLE001 + logging.debug("Success pair '%s' fell back to ui_success: %s", name, exc) + self.colors[name] = self.colors.get("ui_success", curses.A_BOLD) + return pair_id + def _init_current_line_variants( self, palette: ThemePalette, @@ -7555,6 +7801,8 @@ def init_colors(self) -> None: "ui_selection": curses.A_REVERSE, "ui_info": curses.A_NORMAL, "ui_success": curses.A_NORMAL, + "ui_panel_success": curses.A_NORMAL, + "ui_status_success": curses.A_REVERSE | curses.A_BOLD, "ui_warning": curses.A_BOLD, "ui_error": curses.A_BOLD, "ui_dim": curses.A_DIM, @@ -7639,7 +7887,12 @@ def init_colors(self) -> None: ) # Allocate UI-chrome pairs (header/status/footer/border/current-line). - self._init_chrome_color_pairs(palette, pair_id_counter, can_use_256_colors) + pair_id_counter = self._init_chrome_color_pairs( + palette, pair_id_counter, can_use_256_colors + ) + self._init_success_color_pairs( + palette, theme_hex, pair_id_counter, can_use_256_colors + ) # Paint the editor surface with the theme background so cleared regions # (erase / clrtoeol) match the palette instead of the terminal default. @@ -8583,6 +8836,12 @@ def _process_events_and_input(self) -> bool: # Then, attempt to read a key press from the user. key_input = self.keybinder.get_key_input() + # A worker can publish results while get_key_input() is waiting for the + # next key. Drain once more before dispatch so the key acts on current + # UI-side state, e.g. Enter on a diagnostics result that just arrived. + if self._process_all_queues(): + redraw_needed = True + # Proceed only if a valid key was received (not an error or timeout). if key_input != curses.ERR and key_input != -1: # Bracketed paste: insert the whole payload as one transaction and @@ -8617,10 +8876,9 @@ def _handle_input_dispatch(self, key_input: Any) -> bool: """Dispatches a key press to the correct handler (panel or editor). Routing order: - 1. Global Help shortcut. - 2. Global AI Code Assistant shortcut. - 3. Focused active panel. - 4. Main editor keybinder. + 1. Global Help, Quit, Diagnostics, and tool-panel shortcuts. + 2. Focused active panel. + 3. Main editor keybinder. Args: key_input: The key event received from curses. @@ -8632,6 +8890,12 @@ def _handle_input_dispatch(self, key_input: Any) -> bool: if self.keybinder.is_key_for_action(key_input, "help"): return self.handle_input(key_input) + if self.keybinder.is_key_for_action(key_input, "quit"): + return self.handle_input(key_input) + + if self.keybinder.is_key_for_action(key_input, "lint"): + return self.handle_input(key_input) + if self.keybinder.is_key_for_action(key_input, "toggle_widget_panel"): return self.handle_input(key_input) diff --git a/src/ecli/diagnostics/__init__.py b/src/ecli/diagnostics/__init__.py new file mode 100644 index 0000000..fab5619 --- /dev/null +++ b/src/ecli/diagnostics/__init__.py @@ -0,0 +1,22 @@ +"""Unified diagnostics service and provider interfaces for ECLI.""" + +from ecli.diagnostics.display import diagnostic_display_path +from ecli.diagnostics.models import ( + Diagnostic, + DiagnosticRequest, + DiagnosticResult, + DiagnosticsSnapshot, + ProviderState, +) +from ecli.diagnostics.service import DiagnosticsService + + +__all__ = [ + "Diagnostic", + "DiagnosticRequest", + "DiagnosticResult", + "DiagnosticsService", + "DiagnosticsSnapshot", + "ProviderState", + "diagnostic_display_path", +] diff --git a/src/ecli/diagnostics/display.py b/src/ecli/diagnostics/display.py new file mode 100644 index 0000000..d59666b --- /dev/null +++ b/src/ecli/diagnostics/display.py @@ -0,0 +1,96 @@ +"""Display helpers for diagnostics UI surfaces.""" + +from __future__ import annotations + +import os +from pathlib import Path + + +def diagnostic_display_path( + file_path: str | None, + *, + project_root: str | os.PathLike[str] | None = None, + cwd: str | os.PathLike[str] | None = None, +) -> str: + """Return a stable user-facing path for diagnostics. + + Absolute provider paths are reduced to a project-relative path when possible. + If the file is outside the known roots, the basename is used so diagnostics + rows never start with a terminal-width-consuming absolute path. + """ + if not file_path: + return "" + + raw_path = str(file_path) + path = Path(raw_path).expanduser() + if not path.is_absolute(): + return _normalize_separator(raw_path) + + for root in _candidate_roots(project_root=project_root, cwd=cwd): + relative = _relative_under_root(path, root) + if relative is not None: + return _normalize_separator(relative) + + return path.name or raw_path + + +def truncate_middle(text: str, max_width: int) -> str: + """Truncate text in the middle using ASCII dots.""" + if max_width <= 0: + return "" + if len(text) <= max_width: + return text + if max_width <= 3: + return text[:max_width] + keep = max_width - 3 + left = max(1, keep // 2) + right = max(1, keep - left) + return f"{text[:left]}...{text[-right:]}" + + +def truncate_end(text: str, max_width: int) -> str: + """Truncate text at the end using ASCII dots.""" + if max_width <= 0: + return "" + if len(text) <= max_width: + return text + if max_width <= 3: + return text[:max_width] + return f"{text[: max_width - 3]}..." + + +def _candidate_roots( + *, + project_root: str | os.PathLike[str] | None, + cwd: str | os.PathLike[str] | None, +) -> tuple[Path, ...]: + roots: list[Path] = [] + if project_root: + roots.append(Path(project_root).expanduser()) + roots.append(Path(cwd).expanduser() if cwd else Path.cwd()) + + unique: list[Path] = [] + seen: set[str] = set() + for root in roots: + root_abs = Path(os.path.abspath(os.fspath(root))) + key = os.fspath(root_abs) + if key not in seen: + seen.add(key) + unique.append(root_abs) + return tuple(unique) + + +def _relative_under_root(path: Path, root: Path) -> str | None: + try: + relative = os.path.relpath(os.fspath(path), os.fspath(root)) + except ValueError: + return None + if relative == ".": + return path.name + if relative == ".." or relative.startswith(f"..{os.sep}"): + return None + return relative + + +def _normalize_separator(path: str) -> str: + return path.replace(os.sep, "/").replace("\\", "/") diff --git a/src/ecli/diagnostics/models.py b/src/ecli/diagnostics/models.py new file mode 100644 index 0000000..4009521 --- /dev/null +++ b/src/ecli/diagnostics/models.py @@ -0,0 +1,131 @@ +"""Data contracts for normalized editor diagnostics.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Literal + + +DiagnosticSeverity = Literal["error", "warning", "info", "hint"] +DiagnosticScope = Literal["buffer", "workspace"] +DiagnosticStatus = Literal["idle", "running", "ready", "skipped", "error"] + +_SEVERITY_ORDER: dict[DiagnosticSeverity, int] = { + "error": 0, + "warning": 1, + "info": 2, + "hint": 3, +} + + +@dataclass(frozen=True, order=True) +class Diagnostic: + """One normalized diagnostic item shown by ECLI.""" + + file_path: str + line: int + column: int + severity: DiagnosticSeverity + code: str | None + message: str + source: str + fix_hint: str | None = None + suggested_code: str | None = None + + def sort_key(self) -> tuple[int, str, int, int, str, str, str]: + """Return the deterministic ordering key for diagnostics.""" + return ( + _SEVERITY_ORDER[self.severity], + self.file_path, + self.line, + self.column, + self.source, + self.code or "", + self.message, + ) + + +@dataclass(frozen=True) +class ProviderState: + """Visible enabled/disabled state for a diagnostics provider.""" + + name: str + enabled: bool + + +@dataclass(frozen=True) +class DiagnosticRequest: + """Immutable request handed to diagnostics providers.""" + + generation: int + scope: DiagnosticScope + file_path: str | None + text: str | None + project_root: str + language: str | None = None + + +@dataclass(frozen=True) +class DiagnosticResult: + """Diagnostics result produced by a background worker.""" + + generation: int + diagnostics: tuple[Diagnostic, ...] + status: DiagnosticStatus + message: str + provider_states: tuple[ProviderState, ...] = field(default_factory=tuple) + + +@dataclass(frozen=True) +class DiagnosticsSnapshot: + """UI-side snapshot of the last accepted diagnostics state.""" + + generation: int = 0 + diagnostics: tuple[Diagnostic, ...] = () + status: DiagnosticStatus = "idle" + message: str = "Diagnostics not run yet." + provider_states: tuple[ProviderState, ...] = () + running_generation: int | None = None + pending_generation: int | None = None + + def with_refresh_state( + self, + *, + generation: int, + pending_generation: int | None, + provider_states: tuple[ProviderState, ...], + message: str, + ) -> DiagnosticsSnapshot: + """Return a snapshot representing an active or pending refresh.""" + return DiagnosticsSnapshot( + generation=self.generation, + diagnostics=(), + status="running", + message=message, + provider_states=provider_states, + running_generation=generation, + pending_generation=pending_generation, + ) + + def with_result( + self, + result: DiagnosticResult, + *, + running_generation: int | None, + pending_generation: int | None, + ) -> DiagnosticsSnapshot: + """Return a snapshot updated with an accepted worker result.""" + return DiagnosticsSnapshot( + generation=result.generation, + diagnostics=result.diagnostics, + status=result.status, + message=result.message, + provider_states=result.provider_states, + running_generation=running_generation, + pending_generation=pending_generation, + ) + + +def sort_diagnostics(diagnostics: list[Diagnostic]) -> tuple[Diagnostic, ...]: + """Sort diagnostics according to the required deterministic contract.""" + return tuple(sorted(diagnostics, key=lambda diagnostic: diagnostic.sort_key())) diff --git a/src/ecli/diagnostics/ruff_provider.py b/src/ecli/diagnostics/ruff_provider.py new file mode 100644 index 0000000..fa5bb96 --- /dev/null +++ b/src/ecli/diagnostics/ruff_provider.py @@ -0,0 +1,322 @@ +"""Ruff diagnostics provider.""" + +from __future__ import annotations + +import json +import logging +import re +import shutil +import subprocess +from collections.abc import Callable, Sequence +from pathlib import Path +from typing import Any + +from ecli.diagnostics.models import ( + Diagnostic, + DiagnosticRequest, + DiagnosticResult, + DiagnosticSeverity, + DiagnosticStatus, + ProviderState, + sort_diagnostics, +) + + +logger = logging.getLogger(__name__) + +Runner = Callable[..., subprocess.CompletedProcess[str]] + +_TEXT_DIAGNOSTIC_RE = re.compile( + r"^(?P.+?):(?P\d+):(?P\d+):\s+" + r"(?P[A-Z]+[0-9]+[A-Z0-9]*)\s+(?P.+)$" +) +_PYTHON_SUFFIXES = frozenset({".py", ".pyi"}) +_ERROR_CODES = frozenset({"E999", "F821", "F822", "F823", "F831", "F901"}) + + +class RuffDiagnosticProvider: + """Diagnostics provider backed by ``ruff check``.""" + + name = "ruff" + + def __init__( + self, + *, + enabled: bool = True, + executable: str = "ruff", + timeout_seconds: float = 15.0, + runner: Runner = subprocess.run, + ) -> None: + """Initialize the provider with explicit subprocess dependencies.""" + self.enabled = enabled + self.executable = executable + self.timeout_seconds = timeout_seconds + self._runner = runner + + def run(self, request: DiagnosticRequest) -> DiagnosticResult: + """Run Ruff for a buffer or workspace request.""" + if request.scope == "buffer": + return self._run_buffer(request) + return self._run_workspace(request) + + def _run_buffer(self, request: DiagnosticRequest) -> DiagnosticResult: + if not _is_python_file(request.file_path, request.language): + return self._status_result( + request, + status="skipped", + message="Ruff diagnostics are only available for Python files.", + ) + if request.file_path is None: + return self._status_result( + request, + status="skipped", + message="Ruff diagnostics require a saved Python file.", + ) + executable = shutil.which(self.executable) + if executable is None: + return self._missing_ruff(request) + + command = [ + executable, + "check", + "--output-format=json", + "--stdin-filename", + request.file_path, + "-", + ] + text = request.text if request.text is not None else "" + return self._run_command(request, command, input_text=text) + + def _run_workspace(self, request: DiagnosticRequest) -> DiagnosticResult: + executable = shutil.which(self.executable) + if executable is None: + return self._missing_ruff(request) + command = [executable, "check", "--output-format=json", "."] + return self._run_command(request, command, input_text=None) + + def _run_command( + self, + request: DiagnosticRequest, + command: Sequence[str], + *, + input_text: str | None, + ) -> DiagnosticResult: + cwd = request.project_root or "." + try: + completed = self._runner( + list(command), + input=input_text, + capture_output=True, + text=True, + cwd=cwd, + timeout=self.timeout_seconds, + check=False, + ) + except subprocess.TimeoutExpired: + return self._status_result( + request, + status="error", + message=f"Ruff timed out after {self.timeout_seconds:.1f}s.", + ) + except OSError as exc: + logger.warning("Ruff execution failed: %s", exc) + return self._status_result( + request, + status="error", + message=f"Ruff execution failed: {exc}", + ) + + diagnostics = parse_ruff_output( + completed.stdout, + fallback_text=completed.stderr, + default_file=request.file_path or request.project_root, + ) + if diagnostics: + return DiagnosticResult( + generation=request.generation, + diagnostics=diagnostics, + status="ready" if completed.returncode in (0, 1) else "error", + message=f"Ruff: {len(diagnostics)} diagnostic(s).", + provider_states=(ProviderState(name=self.name, enabled=self.enabled),), + ) + + if completed.returncode in (0, 1): + return DiagnosticResult( + generation=request.generation, + diagnostics=(), + status="ready", + message="Diagnostics: PASS — no issues found.", + provider_states=(ProviderState(name=self.name, enabled=self.enabled),), + ) + + message = _first_non_empty_line(completed.stderr) or "Ruff failed." + return self._status_result( + request, + status="error", + message=message, + ) + + def _missing_ruff(self, request: DiagnosticRequest) -> DiagnosticResult: + return self._status_result( + request, + status="error", + message="Ruff executable not found in PATH.", + ) + + def _status_result( + self, + request: DiagnosticRequest, + *, + status: DiagnosticStatus, + message: str, + ) -> DiagnosticResult: + return DiagnosticResult( + generation=request.generation, + diagnostics=(), + status=status, + message=message, + provider_states=(ProviderState(name=self.name, enabled=self.enabled),), + ) + + +def parse_ruff_output( + stdout: str, + *, + fallback_text: str = "", + default_file: str = "", +) -> tuple[Diagnostic, ...]: + """Parse Ruff JSON output, with deterministic text-output fallback.""" + stripped = stdout.strip() + if stripped: + try: + raw = json.loads(stripped) + except json.JSONDecodeError: + logger.debug("Ruff JSON parse failed; falling back to text parser.") + else: + if isinstance(raw, list): + return _parse_json_items(raw, default_file=default_file) + + return _parse_text_output( + "\n".join(part for part in (stdout, fallback_text) if part), + default_file=default_file, + ) + + +def _parse_json_items(raw: list[Any], *, default_file: str) -> tuple[Diagnostic, ...]: + diagnostics: list[Diagnostic] = [] + for item in raw: + if not isinstance(item, dict): + logger.warning("Dropping malformed Ruff diagnostic item: %r", item) + continue + try: + code = _optional_code(item.get("code")) + location = item.get("location") + if not isinstance(location, dict): + location = {} + file_path = str(item.get("filename") or default_file) + fix_hint = _fix_hint(item.get("fix")) + suggested_code = _suggested_code(item.get("fix")) + diagnostics.append( + Diagnostic( + file_path=file_path, + line=_positive_int(location.get("row")), + column=_positive_int(location.get("column")), + severity=_severity_for_code(code), + code=code, + message=str(item.get("message") or "Ruff diagnostic"), + source="ruff", + fix_hint=fix_hint, + suggested_code=suggested_code, + ) + ) + except (TypeError, ValueError) as exc: + logger.warning("Dropping malformed Ruff diagnostic: %s", exc) + return sort_diagnostics(diagnostics) + + +def _parse_text_output(text: str, *, default_file: str) -> tuple[Diagnostic, ...]: + diagnostics: list[Diagnostic] = [] + for line in text.splitlines(): + match = _TEXT_DIAGNOSTIC_RE.match(line.strip()) + if not match: + continue + code = match.group("code") + diagnostics.append( + Diagnostic( + file_path=match.group("file") or default_file, + line=int(match.group("line")), + column=int(match.group("column")), + severity=_severity_for_code(code), + code=code, + message=match.group("message").strip(), + source="ruff", + ) + ) + return sort_diagnostics(diagnostics) + + +def _is_python_file(file_path: str | None, language: str | None) -> bool: + if language == "python": + return True + if file_path is None: + return False + return Path(file_path).suffix.lower() in _PYTHON_SUFFIXES + + +def _severity_for_code(code: str | None) -> DiagnosticSeverity: + if code and (code in _ERROR_CODES or code.startswith("E9")): + return "error" + return "warning" + + +def _optional_code(raw_code: Any) -> str | None: + if raw_code is None: + return None + if isinstance(raw_code, str): + stripped = raw_code.strip() + return stripped or None + text = str(raw_code).strip() + return text or None + + +def _positive_int(value: Any) -> int: + try: + parsed = int(value) + except (TypeError, ValueError): + return 1 + return max(1, parsed) + + +def _fix_hint(raw_fix: Any) -> str | None: + if not isinstance(raw_fix, dict): + return None + message = raw_fix.get("message") + if isinstance(message, str) and message: + return message + return "Fix available" + + +def _suggested_code(raw_fix: Any) -> str | None: + if not isinstance(raw_fix, dict): + return None + edits = raw_fix.get("edits") + if not isinstance(edits, list): + return None + snippets: list[str] = [] + for edit in edits: + if not isinstance(edit, dict): + continue + content = edit.get("content") + if isinstance(content, str) and content: + snippets.append(content) + if not snippets: + return None + return "\n".join(snippets) + + +def _first_non_empty_line(text: str) -> str | None: + for line in text.splitlines(): + stripped = line.strip() + if stripped: + return stripped + return None diff --git a/src/ecli/diagnostics/service.py b/src/ecli/diagnostics/service.py new file mode 100644 index 0000000..157e6f0 --- /dev/null +++ b/src/ecli/diagnostics/service.py @@ -0,0 +1,223 @@ +"""Provider registry and bounded async diagnostics execution.""" + +from __future__ import annotations + +import logging +import queue +import threading +from typing import Protocol + +from ecli.diagnostics.models import ( + Diagnostic, + DiagnosticRequest, + DiagnosticResult, + DiagnosticsSnapshot, + ProviderState, + sort_diagnostics, +) + + +logger = logging.getLogger(__name__) + + +class DiagnosticProvider(Protocol): + """Protocol implemented by diagnostics providers.""" + + name: str + enabled: bool + + def run(self, request: DiagnosticRequest) -> DiagnosticResult: + """Run diagnostics for the given request.""" + + +class DiagnosticsService: + """Registry plus one-in-flight diagnostics worker scheduler.""" + + def __init__(self) -> None: + """Initialize an empty provider registry and worker state.""" + self._providers: dict[str, DiagnosticProvider] = {} + self._lock = threading.RLock() + self._generation = 0 + self._in_flight = False + self._running_generation: int | None = None + self._pending_request: DiagnosticRequest | None = None + self._results: queue.Queue[DiagnosticResult] = queue.Queue() + + def register_provider(self, provider: DiagnosticProvider) -> None: + """Register or replace a provider by name.""" + with self._lock: + self._providers[provider.name] = provider + + def set_provider_enabled(self, provider_name: str, enabled: bool) -> bool: + """Set a provider enabled flag; return False if it is unknown.""" + with self._lock: + provider = self._providers.get(provider_name) + if provider is None: + return False + provider.enabled = enabled + return True + + def provider_states(self) -> tuple[ProviderState, ...]: + """Return deterministic provider enabled/disabled state.""" + with self._lock: + return tuple( + ProviderState(name=name, enabled=provider.enabled) + for name, provider in sorted(self._providers.items()) + ) + + def request_refresh( + self, + *, + scope: str, + file_path: str | None, + text: str | None, + project_root: str, + language: str | None, + ) -> tuple[int, bool, int | None]: + """Schedule diagnostics refresh and return generation/coalescing state. + + Returns: + A tuple ``(generation, started_now, pending_generation)``. At most + one worker runs at a time. While a worker is active, the newest + request replaces any older pending request. + """ + if scope not in ("buffer", "workspace"): + raise ValueError(f"unsupported diagnostics scope: {scope}") + + with self._lock: + self._generation += 1 + request = DiagnosticRequest( + generation=self._generation, + scope=scope, # type: ignore[arg-type] + file_path=file_path, + text=text, + project_root=project_root, + language=language, + ) + if self._in_flight: + self._pending_request = request + return request.generation, False, request.generation + + self._start_worker_locked(request) + return request.generation, True, None + + def drain_results(self) -> list[DiagnosticResult]: + """Drain completed worker results without blocking the caller.""" + results: list[DiagnosticResult] = [] + while True: + try: + results.append(self._results.get_nowait()) + except queue.Empty: + return results + + def worker_state(self) -> tuple[int | None, int | None]: + """Return running and pending generations.""" + with self._lock: + pending = ( + self._pending_request.generation + if self._pending_request is not None + else None + ) + return self._running_generation, pending + + def _start_worker_locked(self, request: DiagnosticRequest) -> None: + self._in_flight = True + self._running_generation = request.generation + worker = threading.Thread( + target=self._run_worker, + args=(request,), + name=f"DiagnosticsWorker-{request.generation}", + daemon=True, + ) + worker.start() + + def _run_worker(self, request: DiagnosticRequest) -> None: + try: + result = self._run_providers(request) + except Exception as exc: + logger.exception( + "Diagnostics worker crashed for generation %s", + request.generation, + ) + result = DiagnosticResult( + generation=request.generation, + diagnostics=( + Diagnostic( + file_path=request.file_path or request.project_root, + line=1, + column=1, + severity="error", + code="DIAGNOSTICS-WORKER", + message=f"Diagnostics worker failed: {exc}", + source="ecli", + ), + ), + status="error", + message="Diagnostics worker failed. See logs/editor.log.", + provider_states=self.provider_states(), + ) + self._results.put(result) + with self._lock: + next_request = self._pending_request + self._pending_request = None + if next_request is not None: + self._start_worker_locked(next_request) + else: + self._in_flight = False + self._running_generation = None + + def _run_providers(self, request: DiagnosticRequest) -> DiagnosticResult: + enabled = self._enabled_providers() + if not enabled: + return DiagnosticResult( + generation=request.generation, + diagnostics=(), + status="skipped", + message="Diagnostics providers are disabled.", + provider_states=self.provider_states(), + ) + + diagnostics: list[Diagnostic] = [] + messages: list[str] = [] + status = "ready" + saw_skipped = False + for provider in enabled: + result = provider.run(request) + diagnostics.extend(result.diagnostics) + if result.status == "error": + status = "error" + elif result.status == "skipped" and status != "error": + saw_skipped = True + if result.message: + messages.append(result.message) + + sorted_diagnostics = sort_diagnostics(diagnostics) + if status == "error": + message = messages[-1] if messages else "Diagnostics failed." + elif sorted_diagnostics: + message = f"Diagnostics: {len(sorted_diagnostics)} issue(s)." + elif saw_skipped: + status = "skipped" + message = messages[-1] if messages else "Diagnostics skipped." + else: + message = "Diagnostics: PASS — no issues found." + return DiagnosticResult( + generation=request.generation, + diagnostics=sorted_diagnostics, + status=status, + message=message, + provider_states=self.provider_states(), + ) + + def _enabled_providers(self) -> tuple[DiagnosticProvider, ...]: + with self._lock: + return tuple( + provider + for _name, provider in sorted(self._providers.items()) + if provider.enabled + ) + + +def initial_snapshot(provider_states: tuple[ProviderState, ...]) -> DiagnosticsSnapshot: + """Return the initial diagnostics UI snapshot.""" + return DiagnosticsSnapshot(provider_states=provider_states) diff --git a/src/ecli/integrations/LinterBridge.py b/src/ecli/integrations/LinterBridge.py index d17a919..0a9af56 100755 --- a/src/ecli/integrations/LinterBridge.py +++ b/src/ecli/integrations/LinterBridge.py @@ -36,8 +36,13 @@ import threading import traceback import types +from pathlib import Path from typing import TYPE_CHECKING, Any, Optional +from ecli.diagnostics.models import DiagnosticsSnapshot +from ecli.diagnostics.ruff_provider import RuffDiagnosticProvider +from ecli.diagnostics.service import DiagnosticsService, initial_snapshot + if TYPE_CHECKING: from ecli.core.Ecli import Ecli @@ -63,6 +68,16 @@ def __init__(self, editor: "Ecli") -> None: self._shutting_down: bool = False self.lsp_seq_id: int = 0 self.lsp_doc_versions: dict[str, int] = {} + linter_config = self.editor.config.get("linter", {}) + ruff_enabled = bool(linter_config.get("enabled", True)) + self.diagnostics_service = DiagnosticsService() + self.diagnostics_service.register_provider( + RuffDiagnosticProvider(enabled=ruff_enabled) + ) + self.diagnostics_snapshot: DiagnosticsSnapshot = initial_snapshot( + self.diagnostics_service.provider_states() + ) + self._latest_diagnostics_request_generation = 0 # --- State for DevOps linters --- self.HAS_DEVOPS_LINTERS: bool = ( @@ -77,6 +92,92 @@ def __init__(self, editor: "Ecli") -> None: logging.error("Found 'lint_devops' but failed to import it: %s", e) self.HAS_DEVOPS_LINTERS = False + def request_diagnostics_refresh(self, scope: str = "buffer") -> bool: + """Schedule a bounded background diagnostics refresh.""" + file_path = self._current_file_path() + language = self._current_language(file_path) + text = os.linesep.join(self.editor.text) + project_root = str(self._project_root(file_path)) + generation, started, pending_generation = ( + self.diagnostics_service.request_refresh( + scope=scope, + file_path=file_path, + text=text, + project_root=project_root, + language=language, + ) + ) + self._latest_diagnostics_request_generation = generation + provider_states = self.diagnostics_service.provider_states() + if started: + msg = ( + "Diagnostics refresh started." + if scope == "buffer" + else "Workspace diagnostics refresh started." + ) + else: + msg = "Diagnostics refresh coalesced; newest request will run next." + self.diagnostics_snapshot = self.diagnostics_snapshot.with_refresh_state( + generation=generation, + pending_generation=pending_generation, + provider_states=provider_states, + message=msg, + ) + self.editor._set_status_message(msg) + self.editor._force_full_redraw = True + logging.info( + "Diagnostics refresh requested: generation=%s scope=%s started=%s pending=%s", + generation, + scope, + started, + pending_generation, + ) + return True + + def process_diagnostics_queue(self) -> bool: + """Apply completed diagnostics worker results on the UI thread.""" + changed = False + for result in self.diagnostics_service.drain_results(): + if result.generation < self._latest_diagnostics_request_generation: + logging.info( + "Dropping stale diagnostics result generation=%s latest=%s", + result.generation, + self._latest_diagnostics_request_generation, + ) + changed = True + continue + running, pending = self.diagnostics_service.worker_state() + self.diagnostics_snapshot = self.diagnostics_snapshot.with_result( + result, + running_generation=running, + pending_generation=pending, + ) + self.editor._set_status_message(result.message) + self.editor._force_full_redraw = True + changed = True + return changed + + def _current_file_path(self) -> str | None: + for attribute in ("filename", "current_file_path", "file_path"): + value = getattr(self.editor, attribute, None) + if value: + return str(value) + return None + + def _current_language(self, file_path: str | None) -> str | None: + language = getattr(self.editor, "current_language", None) + if language: + return str(language) + if file_path and Path(file_path).suffix.lower() in {".py", ".pyi"}: + return "python" + return None + + def _project_root(self, file_path: str | None) -> Path: + if file_path: + path = Path(file_path).expanduser() + return path.parent if path.suffix else path + return Path.cwd() + def run_linter(self, code: Optional[str] = None) -> bool: """Acts as the primary dispatcher for all linting operations. @@ -308,7 +409,7 @@ def _lsp_reader_loop(self) -> None: def process_lsp_queue(self) -> bool: """Processes all pending messages from the internal LSP server queue.""" - changed = False + changed = self.process_diagnostics_queue() while not self.lsp_message_q.empty(): try: message = self.lsp_message_q.get_nowait() diff --git a/src/ecli/ui/DrawScreen.py b/src/ecli/ui/DrawScreen.py index fc9a9aa..b8090ee 100755 --- a/src/ecli/ui/DrawScreen.py +++ b/src/ecli/ui/DrawScreen.py @@ -706,6 +706,7 @@ def _draw_line_numbers(self) -> None: current_num_color = self.colors.get( "ui_current_line_number", line_num_color | curses.A_BOLD ) + diagnostic_line = self._active_diagnostic_highlight_line() cursor_y = self.editor.cursor_y # Iterating over visible lines on the screen for screen_row in range(self.editor.visible_lines): @@ -715,7 +716,10 @@ def _draw_line_numbers(self) -> None: # Checking if this line exists in self.text if line_idx < len(self.editor.text): is_current = line_idx == cursor_y - color = current_num_color if is_current else line_num_color + if diagnostic_line is not None and line_idx == diagnostic_line: + color = self._diagnostic_line_number_attr(line_num_color) + else: + color = current_num_color if is_current else line_num_color # Formatting the line number (1-based) line_num_str = ( f"{line_idx + 1:>{max_line_num_digits}} " # Right-aligning + space @@ -738,6 +742,41 @@ def _draw_line_numbers(self) -> None: f"Curses error drawing empty line number background at ({draw_y}, {gutter_x}): {e}" ) + def _active_diagnostic_highlight_line(self) -> int | None: + """Return the selected diagnostic line index for the current file.""" + highlight = getattr(self.editor, "diagnostic_line_highlight", None) + if not isinstance(highlight, dict): + return None + highlighted_path = str(highlight.get("file_path") or "") + if not highlighted_path: + return None + current_filename = getattr(self.editor, "filename", None) + if not current_filename: + return None + if os.path.abspath(str(current_filename)) != highlighted_path: + return None + try: + line_number = int(highlight.get("line", 0)) + except (TypeError, ValueError): + return None + if line_number < 1: + return None + return line_number - 1 + + def _diagnostic_line_number_attr(self, fallback: int) -> int: + """Return the severity-specific gutter attribute for selected diagnostic.""" + highlight = getattr(self.editor, "diagnostic_line_highlight", None) + severity = "" + if isinstance(highlight, dict): + severity = str(highlight.get("severity") or "") + if severity == "error": + return self.colors.get("ui_error", fallback | curses.A_BOLD) + if severity == "warning": + return self.colors.get("ui_warning", fallback | curses.A_BOLD) + if severity in {"info", "hint"}: + return self.colors.get("ui_info", fallback | curses.A_BOLD) + return fallback | curses.A_BOLD + def _draw_lint_panel(self) -> None: """Draws a popup panel with the linter's results.""" if not getattr(self.editor, "lint_panel_active", False): @@ -1237,6 +1276,10 @@ def _draw_status_bar(self) -> None: "ui_error", self.colors.get("status_error", curses.A_REVERSE | curses.A_BOLD), ) + c_success = self.colors.get( + "ui_status_success", + self.colors.get("ui_success", c_norm | curses.A_BOLD), + ) c_git = self.colors.get("ui_success", c_norm) c_dirty = self.colors.get("ui_warning", c_norm | curses.A_BOLD) @@ -1306,6 +1349,9 @@ def _draw_status_bar(self) -> None: if "error" in msg.lower(): err_x = left_w + pad_left self.stdscr.chgat(y, err_x, msg_w, c_err) + elif msg.startswith("Diagnostics: PASS"): + pass_x = left_w + pad_left + self.stdscr.chgat(y, pass_x, msg_w, c_success) except curses.error: pass # drawing outside screen diff --git a/src/ecli/ui/KeyBinder.py b/src/ecli/ui/KeyBinder.py index d363bba..8223cd6 100755 --- a/src/ecli/ui/KeyBinder.py +++ b/src/ecli/ui/KeyBinder.py @@ -867,7 +867,7 @@ def _setup_action_map(self) -> dict[int | str, Callable[..., Any]]: if not self.editor.is_lightweight: if self.editor.linter_bridge: - action_to_method_map["lint"] = self.editor.run_lint_async + action_to_method_map["lint"] = self.editor.toggle_diagnostics_panel action_to_method_map["show_lint_panel"] = self.editor.show_lint_panel if self.editor.async_engine: diff --git a/src/ecli/ui/PanelManager.py b/src/ecli/ui/PanelManager.py index dc2cef7..91b1dce 100755 --- a/src/ecli/ui/PanelManager.py +++ b/src/ecli/ui/PanelManager.py @@ -31,6 +31,7 @@ AiResponsePanel, BasePanel, CommandPlanPanel, + DiagnosticsPanel, FileBrowserPanel, ServicesPanel, SystemDoctorPanel, @@ -87,6 +88,7 @@ def __init__(self, editor_instance: "Ecli"): self.registered_panels: dict[str, type[BasePanel]] = { "ai_response": AiResponsePanel, "command_plan": CommandPlanPanel, + "diagnostics": DiagnosticsPanel, "file_browser": FileBrowserPanel, "pysh_console": PySHConsolePanel, "services_status": ServicesPanel, diff --git a/src/ecli/ui/panels.py b/src/ecli/ui/panels.py index 49d2de7..eff7ac3 100755 --- a/src/ecli/ui/panels.py +++ b/src/ecli/ui/panels.py @@ -56,6 +56,12 @@ import pyperclip from wcwidth import wcswidth +from ecli.diagnostics.display import ( + diagnostic_display_path, + truncate_end, + truncate_middle, +) +from ecli.diagnostics.models import Diagnostic, DiagnosticsSnapshot from ecli.integrations.GitBridge import GitBridge, GitCommandResult from ecli.ui.design import TuiDesign from ecli.ui.geometry import centered_modal_rect, compute_layout @@ -87,6 +93,7 @@ "system_doctor", "ai_provider", "command_plan", + "diagnostics", "services", "terminal", # PySH Console Panel (F11) } @@ -1740,6 +1747,10 @@ def __init__( ) self.attr_warning = self.editor.colors.get("ui_panel_warning", curses.A_BOLD) self.attr_error = self.editor.colors.get("ui_panel_error", curses.A_BOLD) + self.attr_success = self.editor.colors.get( + "ui_panel_success", + self.editor.colors.get("ui_success", curses.A_BOLD), + ) def resize(self) -> None: """Handle terminal resize by recreating the backing panel window.""" @@ -2106,6 +2117,539 @@ def _service_lines(self) -> list[str]: return lines +class DiagnosticsPanel(_ReadOnlyRightPanel): + """Right-side diagnostics list panel backed by the diagnostics service.""" + + title = " Diagnostics " + panel_kind = "diagnostics" + close_key = getattr(curses, "KEY_F4", 268) + _SEVERITY_LABELS = { + "error": "E", + "warning": "W", + "info": "I", + "hint": "H", + } + + def __init__( + self, stdscr: CursesWindow, main_editor_instance: Ecli, **kwargs: Any + ) -> None: + super().__init__(stdscr, main_editor_instance, **kwargs) + self.selected_idx = 0 + self._last_navigation_message: str | None = None + self._details_generation: int | None = None + self._details_key: tuple[object, ...] | None = None + self._details_diagnostic: Diagnostic | None = None + + def open(self) -> None: + """Open without starting a linter run.""" + super().open() + self._clear_legacy_lint_popup() + self._sync_selected_diagnostic_highlight(self._snapshot(), self._diagnostics()) + self.editor._set_status_message( + "Diagnostics: r run file, R run workspace, Enter jump, d details" + ) + + def close(self) -> None: + """Close the panel and any selected-diagnostic details popup.""" + self._clear_details_popup() + self._clear_diagnostic_line_highlight() + self._clear_legacy_lint_popup() + super().close() + + def handle_key(self, key: Any) -> bool: + """Handle diagnostics panel navigation and explicit refresh actions.""" + if self._details_is_open() and key == 27: + self._clear_details_popup() + self.editor._set_status_message("Diagnostic details closed.") + return True + if self._handle_close_or_focus_key(key): + return True + if self._matches_key(key, "r"): + return self._request_refresh("buffer") + if self._matches_key(key, "R"): + return self._request_refresh("workspace") + if self._matches_key(key, "d") or key in (ord(" "), " "): + self._open_details_popup() + return True + if key in (curses.KEY_UP, ord("k"), "k"): + self._move_selection(-1) + return True + if key in (curses.KEY_DOWN, ord("j"), "j"): + self._move_selection(1) + return True + if key == curses.KEY_PPAGE: + self._move_selection(-max(1, self.height - 4)) + return True + if key == curses.KEY_NPAGE: + self._move_selection(max(1, self.height - 4)) + return True + if key in (curses.KEY_ENTER, 10, 13, "\n"): + self._jump_to_selected() + return True + return False + + def draw(self) -> None: + """Render diagnostics state and current selection.""" + if not self.visible or self.win is None: + return + if self.height < 5 or self.width < 25: + return + self.win.erase() + self._draw_frame(self._footer()) + + snapshot = self._snapshot() + diagnostics = self._diagnostics() + self._invalidate_stale_details(snapshot, diagnostics) + self.selected_idx = min(self.selected_idx, max(0, len(diagnostics) - 1)) + self._sync_selected_diagnostic_highlight(snapshot, diagnostics) + status_lines, viewport_height = self._status_lines_with_viewport( + snapshot, diagnostics + ) + if diagnostics: + self.scroll = min( + max(0, self.scroll), + max(0, len(diagnostics) - viewport_height), + ) + if self.selected_idx < self.scroll: + self.scroll = self.selected_idx + elif self.selected_idx >= self.scroll + viewport_height: + self.scroll = self.selected_idx - viewport_height + 1 + else: + self.scroll = 0 + + for row_offset, line in enumerate(status_lines): + try: + self.win.addnstr( + row_offset + 1, + 1, + line, + max(1, self.width - 2), + self._line_attr(line), + ) + except curses.error: + pass + + start_row = 1 + len(status_lines) + rows = [self._format_diagnostic(diagnostic) for diagnostic in diagnostics] + for row_offset, line in enumerate(rows[self.scroll : self.scroll + viewport_height]): + row = start_row + row_offset + absolute_idx = self.scroll + row_offset + attr = self._line_attr(line) + if diagnostics and absolute_idx == self.selected_idx: + attr |= self.attr_selected + try: + self.win.addnstr(row, 1, line, max(1, self.width - 2), attr) + except curses.error: + pass + self._refresh_window() + self._draw_details_popup() + + def _handle_close_or_focus_key(self, key: Any) -> bool: + if key in (self.close_key, 27, ord("q"), "q"): + self._clear_details_popup() + self._clear_diagnostic_line_highlight() + self._clear_legacy_lint_popup() + if hasattr(self.editor, "panel_manager") and self.editor.panel_manager: + self.editor.panel_manager.close_active_panel() + else: + self.close() + return True + if key == getattr(curses, "KEY_F12", 276): + if hasattr(self.editor, "toggle_focus"): + self.editor.toggle_focus() + return True + return False + + def _request_refresh(self, scope: str) -> bool: + bridge = getattr(self.editor, "linter_bridge", None) + requester = getattr(bridge, "request_diagnostics_refresh", None) + if not callable(requester): + self.editor._set_status_message("Diagnostics service is not available.") + return True + self._clear_details_popup() + self._clear_diagnostic_line_highlight() + self._clear_legacy_lint_popup() + self._last_navigation_message = None + return bool(requester(scope=scope)) + + def _move_selection(self, delta: int) -> None: + diagnostics = self._diagnostics() + if not diagnostics: + self.selected_idx = 0 + return + new_idx = max(0, min(len(diagnostics) - 1, self.selected_idx + delta)) + if new_idx != self.selected_idx: + self._last_navigation_message = None + self._clear_details_popup() + self.selected_idx = new_idx + self._sync_selected_diagnostic_highlight(self._snapshot(), diagnostics) + + def _jump_to_selected(self) -> None: + diagnostic = self._selected_diagnostic() + if diagnostic is None: + self.editor._set_status_message("Diagnostics: no diagnostic selected.") + return + navigator = getattr(self.editor, "goto_diagnostic", None) + if callable(navigator): + before = getattr(self.editor, "status_message", "") + navigator(diagnostic) + after = getattr(self.editor, "status_message", "") + if isinstance(after, str) and after and after != before: + self._last_navigation_message = after + return + self.editor._set_status_message("Diagnostics navigation is not available.") + + def _selected_diagnostic(self) -> Diagnostic | None: + diagnostics = self._diagnostics() + if not diagnostics: + return None + self.selected_idx = max(0, min(len(diagnostics) - 1, self.selected_idx)) + return diagnostics[self.selected_idx] + + def _status_lines_with_viewport( + self, + snapshot: DiagnosticsSnapshot | None, + diagnostics: tuple[Diagnostic, ...], + ) -> tuple[list[str], int]: + showing: tuple[int, int] | None = None + status_lines = self._status_lines(snapshot, diagnostics, showing) + while True: + viewport_height = max(0, self.height - 2 - len(status_lines)) + visible_count = min(len(diagnostics), viewport_height) + next_showing = ( + (visible_count, len(diagnostics)) + if diagnostics and visible_count < len(diagnostics) + else None + ) + next_status_lines = self._status_lines( + snapshot, diagnostics, next_showing + ) + if next_showing == showing and len(next_status_lines) == len(status_lines): + return next_status_lines, viewport_height + showing = next_showing + status_lines = next_status_lines + + def _status_lines( + self, + snapshot: DiagnosticsSnapshot | None, + diagnostics: tuple[Diagnostic, ...], + showing: tuple[int, int] | None, + ) -> list[str]: + if snapshot is None or snapshot.status == "idle": + return [ + "Diagnostics not run yet.", + "Press r to run diagnostics for this file.", + ] + if snapshot.status == "running": + return ["Running diagnostics..."] + if snapshot.status == "skipped": + return [f"Diagnostics skipped: {self._status_reason(snapshot)}"] + if snapshot.status == "error": + return [f"Diagnostics failed: {self._status_reason(snapshot)}"] + if diagnostics: + lines = [f"Diagnostics: {len(diagnostics)} issue(s)."] + if showing is not None: + visible_count, total_count = showing + lines.append(f"Showing {visible_count}/{total_count} diagnostics") + return lines + return ["Diagnostics: PASS", "No issues found."] + + def _status_reason(self, snapshot: DiagnosticsSnapshot) -> str: + message = " ".join(str(snapshot.message or "").split()) + for prefix in ( + "Diagnostics skipped:", + "Diagnostics failed:", + "Diagnostics error:", + ): + if message.startswith(prefix): + message = message[len(prefix) :].strip() + break + return message or "unknown reason" + + def _line_attr(self, line: str) -> int: + if line.startswith("E "): + return self.attr_error + if line.startswith("W "): + return self.attr_warning + if line.startswith(("I ", "H ")): + return self.attr_dim + if line.startswith("Diagnostics failed:"): + return self.attr_error + if line.startswith("Diagnostics skipped:"): + return self.attr_warning + if line.startswith("Diagnostics: PASS"): + return self.attr_success + if line.startswith(("Press ", "Showing ")): + return self.attr_dim + return super()._line_attr(line) + + def _format_diagnostic(self, diagnostic: Diagnostic) -> str: + severity = self._SEVERITY_LABELS.get(diagnostic.severity, "?") + source = truncate_end(str(diagnostic.source or "?"), 12) + prefix = f"{severity} {source} " + path = self._display_path(diagnostic.file_path) + location_suffix = f":{diagnostic.line}:{diagnostic.column}" + message = self._row_message(diagnostic) + content_width = max(1, self.width - 2) + + full_row = f"{prefix}{path}{location_suffix} {message}" + if len(full_row) <= content_width: + return full_row + return self._fit_diagnostic_row( + prefix=prefix, + path=path, + location_suffix=location_suffix, + message=message, + content_width=content_width, + ) + + def _fit_diagnostic_row( + self, + *, + prefix: str, + path: str, + location_suffix: str, + message: str, + content_width: int, + ) -> str: + if content_width <= len(prefix): + return truncate_end(prefix, content_width) + + usable = content_width - len(prefix) + full_location = f"{path}{location_suffix}" + if usable <= len(location_suffix) + 2: + return truncate_end(f"{prefix}{full_location}", content_width) + + min_message_width = min(len(message), max(7, content_width // 4)) + location_width = min( + len(full_location), + max(len(location_suffix) + 1, usable - min_message_width - 1), + ) + message_width = max(1, usable - location_width - 1) + if message_width < min_message_width and location_width > len(location_suffix) + 1: + deficit = min_message_width - message_width + shrink_by = min(deficit, location_width - len(location_suffix) - 1) + location_width -= shrink_by + message_width += shrink_by + + location = self._fit_location(path, location_suffix, location_width) + message_width = max(1, usable - len(location) - 1) + row = f"{prefix}{location} {truncate_end(message, message_width)}" + return truncate_end(row, content_width) + + def _fit_location(self, path: str, suffix: str, max_width: int) -> str: + location = f"{path}{suffix}" + if len(location) <= max_width: + return location + basename = Path(path).name or path + basename_location = f"{basename}{suffix}" + if len(basename_location) <= max_width: + return basename_location + path_width = max(1, max_width - len(suffix)) + return f"{truncate_middle(basename, path_width)}{suffix}" + + def _row_message(self, diagnostic: Diagnostic) -> str: + message = " ".join(str(diagnostic.message or "Diagnostic").split()) + return message or "Diagnostic" + + def _display_path(self, file_path: str | None) -> str: + return diagnostic_display_path( + file_path, + project_root=self._diagnostics_project_root(), + cwd=os.getcwd(), + ) + + def _diagnostics_project_root(self) -> str | None: + filename = getattr(self.editor, "filename", None) + if filename: + path = Path(str(filename)).expanduser() + return str(path.parent if path.suffix else path) + registry = getattr(self.editor, "service_registry", None) + project_service = getattr(registry, "project_service", None) + root = getattr(project_service, "root", None) + return str(root) if root else None + + def _diagnostics(self) -> tuple[Diagnostic, ...]: + snapshot = self._snapshot() + if snapshot is None: + return () + return snapshot.diagnostics + + def _snapshot(self) -> DiagnosticsSnapshot | None: + bridge = getattr(self.editor, "linter_bridge", None) + snapshot = getattr(bridge, "diagnostics_snapshot", None) + if isinstance(snapshot, DiagnosticsSnapshot): + return snapshot + return None + + def _footer(self) -> str: + full = " r:Run file R:Run workspace Enter:Jump d:Details Esc/F4/q:Close " + compact = " r:Run R:Workspace Enter:Jump d:Details Esc/F4/q:Close " + return full if len(full) <= self.width - 4 else compact + + def _open_details_popup(self) -> None: + diagnostic = self._selected_diagnostic() + snapshot = self._snapshot() + if diagnostic is None or snapshot is None: + self.editor._set_status_message("Diagnostics: no diagnostic selected.") + return + self._details_generation = snapshot.generation + self._details_key = self._diagnostic_key(diagnostic) + self._details_diagnostic = diagnostic + self._clear_legacy_lint_popup() + self.editor._set_status_message("Diagnostic details opened.") + if hasattr(self.editor, "_force_full_redraw"): + self.editor._force_full_redraw = True + + def _draw_details_popup(self) -> None: + if not self._details_is_open(): + return + diagnostic = self._details_diagnostic + if diagnostic is None: + return + modal_width = min(max(52, self.term_width // 2), max(20, self.term_width - 4)) + content_width = max(1, modal_width - 4) + popup_lines = self._details_lines(diagnostic, content_width) + modal_height = min( + max(9, len(popup_lines) + 4), + max(5, self.term_height - 2), + ) + rect = centered_modal_rect( + self.term_width, + self.term_height, + modal_width, + modal_height, + ) + try: + win = curses.newwin(rect.height, rect.width, rect.y, rect.x) + win.keypad(True) + self._make_opaque(win) + win.border() + title = "Diagnostic details" + win.addnstr(0, 2, f" {title} ", max(1, rect.width - 4), self.attr_title) + for row_offset, line in enumerate(popup_lines[: rect.height - 3]): + win.addnstr( + row_offset + 1, + 2, + line, + content_width, + self.attr_text, + ) + win.addnstr( + rect.height - 2, + 2, + "Esc: close", + content_width, + self.attr_dim, + ) + self._present(win) + except curses.error: + pass + + def _details_lines( + self, diagnostic: Diagnostic, content_width: int + ) -> list[str]: + path = self._display_path(diagnostic.file_path) + lines = [ + f"File: {path}", + f"Location: {diagnostic.line}:{diagnostic.column}", + f"Source: {diagnostic.source or '?'}", + ] + if diagnostic.code: + lines.append(f"Code: {diagnostic.code}") + message = self._row_message(diagnostic) + lines.extend(self._wrapped_detail("Message: ", message, content_width)) + if diagnostic.fix_hint: + lines.extend( + self._wrapped_detail("Fix hint: ", diagnostic.fix_hint, content_width) + ) + if diagnostic.suggested_code: + code_shape = " ".join(str(diagnostic.suggested_code).split()) + lines.extend(self._wrapped_detail("Suggested: ", code_shape, content_width)) + lines.append("Preview only. No changes were applied.") + return lines + + def _wrapped_detail(self, prefix: str, value: str, width: int) -> list[str]: + width = max(20, min(width, 96)) + text = " ".join(str(value).split()) + wrapped = textwrap.wrap(text, width=max(1, width - len(prefix))) or [""] + lines = [f"{prefix}{wrapped[0]}"] + indent = " " * len(prefix) + lines.extend(f"{indent}{line}" for line in wrapped[1:]) + return lines + + def _details_is_open(self) -> bool: + return self._details_generation is not None + + def _clear_details_popup(self) -> None: + self._details_generation = None + self._details_key = None + self._details_diagnostic = None + if hasattr(self.editor, "_force_full_redraw"): + self.editor._force_full_redraw = True + + def _invalidate_stale_details( + self, + snapshot: DiagnosticsSnapshot | None, + diagnostics: tuple[Diagnostic, ...], + ) -> None: + if not self._details_is_open(): + return + if snapshot is None or snapshot.generation != self._details_generation: + self._clear_details_popup() + return + if not diagnostics: + self._clear_details_popup() + return + selected = self._selected_diagnostic() + if selected is None or self._diagnostic_key(selected) != self._details_key: + self._clear_details_popup() + + def _sync_selected_diagnostic_highlight( + self, + snapshot: DiagnosticsSnapshot | None, + diagnostics: tuple[Diagnostic, ...], + ) -> None: + if snapshot is None or snapshot.status != "ready" or not diagnostics: + self._clear_diagnostic_line_highlight() + return + diagnostic = self._selected_diagnostic() + if diagnostic is None: + self._clear_diagnostic_line_highlight() + return + setter = getattr(self.editor, "set_diagnostic_line_highlight", None) + if callable(setter): + setter(diagnostic, generation=snapshot.generation) + + def _clear_diagnostic_line_highlight(self) -> None: + clearer = getattr(self.editor, "clear_diagnostic_line_highlight", None) + if callable(clearer): + clearer() + + def _diagnostic_key(self, diagnostic: Diagnostic) -> tuple[object, ...]: + return ( + diagnostic.file_path, + diagnostic.line, + diagnostic.column, + diagnostic.severity, + diagnostic.source, + diagnostic.code, + diagnostic.message, + diagnostic.fix_hint, + diagnostic.suggested_code, + ) + + def _clear_legacy_lint_popup(self) -> None: + if hasattr(self.editor, "lint_panel_active"): + self.editor.lint_panel_active = False + drawer = getattr(self.editor, "drawer", None) + if drawer is not None and hasattr(drawer, "_next_lint_panel_hide_ts"): + drawer._next_lint_panel_hide_ts = 0 + + def _matches_key(self, key: Any, char: str) -> bool: + return key == char or key == ord(char) + + # ==================== GitPanel Class ==================== class GitPanel(BasePanel): """Class GitPanel diff --git a/tests/characterization/test_existing_keybindings.py b/tests/characterization/test_existing_keybindings.py index cf96e59..7d05013 100644 --- a/tests/characterization/test_existing_keybindings.py +++ b/tests/characterization/test_existing_keybindings.py @@ -187,6 +187,9 @@ def toggle_system_doctor_panel(self) -> bool: def run_lint_async(self) -> bool: return self._record("run_lint_async") + def toggle_diagnostics_panel(self) -> bool: + return self._record("toggle_diagnostics_panel") + def show_lint_panel(self) -> bool: return self._record("show_lint_panel") @@ -231,7 +234,7 @@ def test_action_map_keeps_existing_help_ai_file_manager_and_git_entrypoints() -> def test_current_f4_behavior_remains_diagnostics_not_git_panel() -> None: binder = make_keybinder() - assert method_name_for_key(binder, curses.KEY_F4) == "run_lint_async" + assert method_name_for_key(binder, curses.KEY_F4) == "toggle_diagnostics_panel" assert method_name_for_key(binder, curses.KEY_F4) != "show_git_panel" diff --git a/tests/core/test_diagnostics_service.py b/tests/core/test_diagnostics_service.py new file mode 100644 index 0000000..f03b54d --- /dev/null +++ b/tests/core/test_diagnostics_service.py @@ -0,0 +1,284 @@ +"""Tests for normalized diagnostics providers and scheduler behavior.""" + +from __future__ import annotations + +import json +import subprocess +import threading +import time +from pathlib import Path +from typing import Any + +import pytest + +from ecli.diagnostics.models import ( + Diagnostic, + DiagnosticRequest, + DiagnosticResult, + DiagnosticsSnapshot, +) +from ecli.diagnostics.ruff_provider import RuffDiagnosticProvider, parse_ruff_output +from ecli.diagnostics.service import DiagnosticsService + + +def request(tmp_path: Path, generation: int = 1) -> DiagnosticRequest: + return DiagnosticRequest( + generation=generation, + scope="buffer", + file_path=str(tmp_path / "sample.py"), + text="import os\n", + project_root=str(tmp_path), + language="python", + ) + + +def test_diagnostics_sorting_is_deterministic() -> None: + diagnostics = [ + Diagnostic("b.py", 2, 1, "warning", "B001", "b", "ruff"), + Diagnostic("a.py", 9, 1, "hint", "H001", "h", "ruff"), + Diagnostic("a.py", 1, 9, "error", "F821", "e2", "ruff"), + Diagnostic("a.py", 1, 1, "error", "F401", "e1", "ruff"), + ] + + ordered = sorted(diagnostics, key=lambda item: item.sort_key()) + + assert [ + (item.severity, item.file_path, item.line, item.column) for item in ordered + ] == [ + ("error", "a.py", 1, 1), + ("error", "a.py", 1, 9), + ("warning", "b.py", 2, 1), + ("hint", "a.py", 9, 1), + ] + + +def test_parse_ruff_json_output_normalizes_fix_hint_and_order() -> None: + raw = json.dumps( + [ + { + "filename": "b.py", + "location": {"row": 3, "column": 2}, + "code": "B006", + "message": "mutable default", + }, + { + "filename": "a.py", + "location": {"row": 1, "column": 5}, + "code": "F821", + "message": "undefined name", + "fix": {"message": "Replace with known_name"}, + }, + ] + ) + + diagnostics = parse_ruff_output(raw, default_file="fallback.py") + + assert [item.code for item in diagnostics] == ["F821", "B006"] + assert diagnostics[0].severity == "error" + assert diagnostics[0].fix_hint == "Replace with known_name" + assert diagnostics[1].severity == "warning" + + +def test_parse_ruff_json_preserves_syntax_null_and_invalid_codes() -> None: + raw = json.dumps( + [ + { + "filename": "sample.py", + "location": {"row": 4, "column": 1}, + "code": "invalid-syntax", + "message": "Expected an indented block", + }, + { + "filename": "sample.py", + "location": {"row": 5, "column": 1}, + "code": None, + "message": "null code diagnostic", + }, + { + "filename": "sample.py", + "location": {"row": 6, "column": 1}, + "code": {"unexpected": "shape"}, + "message": "invalid code diagnostic", + }, + ] + ) + + diagnostics = parse_ruff_output(raw, default_file="fallback.py") + + assert len(diagnostics) == 3 + assert [item.message for item in diagnostics] == [ + "Expected an indented block", + "null code diagnostic", + "invalid code diagnostic", + ] + assert diagnostics[0].code == "invalid-syntax" + assert diagnostics[1].code is None + assert diagnostics[2].code == "{'unexpected': 'shape'}" + + +def test_parse_ruff_text_output_is_deterministic() -> None: + diagnostics = parse_ruff_output( + "b.py:4:2: B006 mutable default\na.py:1:5: F821 undefined name\n", + default_file="fallback.py", + ) + + assert [ + (item.file_path, item.line, item.column, item.code) for item in diagnostics + ] == [ + ("a.py", 1, 5, "F821"), + ("b.py", 4, 2, "B006"), + ] + + +def test_missing_ruff_produces_controlled_diagnostic( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr("ecli.diagnostics.ruff_provider.shutil.which", lambda _: None) + provider = RuffDiagnosticProvider() + + result = provider.run(request(tmp_path)) + + assert result.status == "error" + assert result.diagnostics == () + assert "not found" in result.message + + +def test_non_python_buffer_does_not_run_ruff( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + calls: list[Any] = [] + monkeypatch.setattr("ecli.diagnostics.ruff_provider.shutil.which", lambda _: "ruff") + provider = RuffDiagnosticProvider(runner=lambda *args, **kwargs: calls.append(args)) + non_python = DiagnosticRequest( + generation=1, + scope="buffer", + file_path=str(tmp_path / "README.md"), + text="# readme\n", + project_root=str(tmp_path), + language="markdown", + ) + + result = provider.run(non_python) + + assert result.status == "skipped" + assert result.diagnostics == () + assert "only available for Python files" in result.message + assert calls == [] + + +def test_buffer_refresh_uses_stdin_filename_and_project_cwd( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + calls: list[dict[str, Any]] = [] + + def runner(command: list[str], **kwargs: Any) -> subprocess.CompletedProcess[str]: + calls.append({"command": command, **kwargs}) + return subprocess.CompletedProcess(command, 0, stdout="[]", stderr="") + + monkeypatch.setattr("ecli.diagnostics.ruff_provider.shutil.which", lambda _: "ruff") + provider = RuffDiagnosticProvider(runner=runner) + + result = provider.run(request(tmp_path)) + + assert result.status == "ready" + assert result.message == "Diagnostics: PASS — no issues found." + assert calls[0]["command"] == [ + "ruff", + "check", + "--output-format=json", + "--stdin-filename", + str(tmp_path / "sample.py"), + "-", + ] + assert calls[0]["cwd"] == str(tmp_path) + assert calls[0]["input"] == "import os\n" + + +def test_diagnostics_snapshot_result_replaces_previous_diagnostics() -> None: + old = Diagnostic("old.py", 1, 1, "warning", "F401", "old", "ruff") + new = Diagnostic("new.py", 2, 1, "warning", "F821", "new", "ruff") + snapshot = DiagnosticsSnapshot( + generation=1, + diagnostics=(old,), + status="ready", + message="Diagnostics: 1 issue(s).", + ) + result = DiagnosticResult( + generation=2, + diagnostics=(new,), + status="ready", + message="Diagnostics: 1 issue(s).", + ) + + updated = snapshot.with_result( + result, + running_generation=None, + pending_generation=None, + ) + + assert updated.generation == 2 + assert updated.diagnostics == (new,) + + +class BlockingProvider: + name = "blocking" + enabled = True + + def __init__(self) -> None: + """Initialize blocking provider state.""" + self.calls: list[int] = [] + self.release_first = threading.Event() + + def run(self, diagnostic_request: DiagnosticRequest) -> DiagnosticResult: + self.calls.append(diagnostic_request.generation) + if diagnostic_request.generation == 1: + assert self.release_first.wait(3) + return DiagnosticResult( + generation=diagnostic_request.generation, + diagnostics=(), + status="ready", + message="Diagnostics: PASS — no issues found.", + ) + + +def test_repeated_refresh_is_coalesced_to_latest_pending(tmp_path: Path) -> None: + provider = BlockingProvider() + service = DiagnosticsService() + service.register_provider(provider) + + gen1, started1, pending1 = service.request_refresh( + scope="buffer", + file_path=str(tmp_path / "a.py"), + text="", + project_root=str(tmp_path), + language="python", + ) + gen2, started2, pending2 = service.request_refresh( + scope="buffer", + file_path=str(tmp_path / "b.py"), + text="", + project_root=str(tmp_path), + language="python", + ) + gen3, started3, pending3 = service.request_refresh( + scope="buffer", + file_path=str(tmp_path / "c.py"), + text="", + project_root=str(tmp_path), + language="python", + ) + + assert (gen1, started1, pending1) == (1, True, None) + assert (gen2, started2, pending2) == (2, False, 2) + assert (gen3, started3, pending3) == (3, False, 3) + + provider.release_first.set() + deadline = time.monotonic() + 3 + while time.monotonic() < deadline and provider.calls != [1, 3]: + time.sleep(0.01) + + assert provider.calls == [1, 3] + assert all(result.generation != 2 for result in service.drain_results()) diff --git a/tests/ui/test_diagnostics_panel.py b/tests/ui/test_diagnostics_panel.py new file mode 100644 index 0000000..58911f7 --- /dev/null +++ b/tests/ui/test_diagnostics_panel.py @@ -0,0 +1,1011 @@ +"""Tests for the F4 Diagnostics panel integration.""" + +from __future__ import annotations + +import curses +import logging +import os +import threading +from pathlib import Path +from typing import Any, cast + +import pytest + +from ecli.core.Ecli import Ecli +from ecli.diagnostics.models import Diagnostic, DiagnosticsSnapshot +from ecli.ui.DrawScreen import DrawScreen +from ecli.ui.KeyBinder import KeyBinder +from ecli.ui.panels import DiagnosticsPanel + + +FAKE_WINDOWS: list[FakeWindow] = [] + + +class FakeWindow: + def __init__(self) -> None: + """Initialize captured fake window state.""" + self.drawn: list[str] = [] + self.drawn_attrs: list[tuple[str, int | None]] = [] + self.addstr_calls: list[tuple[int, int, str, int | None]] = [] + self.chgat_calls: list[tuple[int, int, int, int]] = [] + self.keypad_values: list[bool] = [] + FAKE_WINDOWS.append(self) + + def getmaxyx(self) -> tuple[int, int]: + return (30, 120) + + def keypad(self, value: bool) -> None: + self.keypad_values.append(value) + + def bkgd(self, *_args: object) -> None: + return None + + def erase(self) -> None: + self.drawn.clear() + self.drawn_attrs.clear() + + def border(self) -> None: + return None + + def addstr(self, *_args: Any) -> None: + if len(_args) >= 3: + y = int(_args[0]) + x = int(_args[1]) + text = str(_args[2]) + attr = ( + int(_args[3]) if len(_args) >= 4 and isinstance(_args[3], int) else None + ) + self.addstr_calls.append((y, x, text, attr)) + + def addnstr(self, *_args: Any) -> None: + if len(_args) >= 4: + text = str(_args[2])[: int(_args[3])] + attr = ( + int(_args[4]) if len(_args) >= 5 and isinstance(_args[4], int) else None + ) + self.drawn.append(text) + self.drawn_attrs.append((text, attr)) + + def chgat(self, *_args: Any) -> None: + if len(_args) >= 4: + self.chgat_calls.append( + (int(_args[0]), int(_args[1]), int(_args[2]), int(_args[3])) + ) + + def touchwin(self) -> None: + return None + + def noutrefresh(self) -> None: + return None + + def refresh(self) -> None: + return None + + +class FakePanelManager: + def __init__(self) -> None: + """Initialize panel lifecycle call capture.""" + self.active_panel: object | None = None + self.close_calls = 0 + self.show_calls: list[object] = [] + + def is_panel_active(self) -> bool: + return bool(getattr(self.active_panel, "visible", False)) + + def show_panel_instance(self, panel: object) -> None: + self.show_calls.append(panel) + if self.active_panel is panel and getattr(panel, "visible", False): + self.close_active_panel() + return + self.active_panel = panel + panel.visible = True # type: ignore[attr-defined] + + def close_active_panel(self) -> None: + self.close_calls += 1 + if self.active_panel is not None: + self.active_panel.visible = False # type: ignore[attr-defined] + + +class FakeBridge: + def __init__(self, snapshot: DiagnosticsSnapshot | None = None) -> None: + """Initialize diagnostics bridge call capture.""" + self.requests: list[str] = [] + self.diagnostics_snapshot = snapshot or DiagnosticsSnapshot() + + def request_diagnostics_refresh(self, scope: str = "buffer") -> bool: + self.requests.append(scope) + return True + + +class FakeEditor: + def __init__(self, snapshot: DiagnosticsSnapshot | None = None) -> None: + """Initialize a diagnostics-panel-compatible editor double.""" + self.stdscr = FakeWindow() + self.colors: dict[str, int] = { + "ui_success": 101, + "ui_panel_success": 105, + "ui_status_success": 106, + "ui_panel_warning": 102, + "ui_panel_error": 103, + "ui_info": 104, + } + self.focus = "panel" + self._force_full_redraw = False + self.status_message = "Ready" + self.status_messages: list[str] = [] + self.filename: str | None = None + self.modified = False + self._lexer = type("Lexer", (), {"name": "Python"})() + self.encoding = "utf-8" + self.cursor_y = 0 + self.cursor_x = 0 + self.text = [""] + self.insert_mode = True + self.git = None + self.config: dict[str, Any] = {} + self.is_lightweight = False + self.service_registry = None + self.panel_manager = FakePanelManager() + self.linter_bridge = FakeBridge(snapshot) + self.navigated: list[Diagnostic] = [] + self.lint_panel_active = False + self.lint_panel_message = "" + self.drawer = type("Drawer", (), {"_next_lint_panel_hide_ts": 0})() + self.diagnostic_line_highlight: dict[str, Any] | None = None + + def _set_status_message(self, message: str) -> None: + self.status_message = message + self.status_messages.append(message) + + def toggle_focus(self) -> bool: + self.focus = "editor" if self.focus == "panel" else "panel" + return True + + def goto_diagnostic(self, diagnostic: Diagnostic) -> bool: + self.navigated.append(diagnostic) + path = Path(diagnostic.file_path).name + self._set_status_message( + f"Jumped to {path}:{diagnostic.line}:{diagnostic.column}" + ) + return True + + def set_diagnostic_line_highlight( + self, + diagnostic: Diagnostic, + *, + generation: int | None = None, + ) -> None: + self.diagnostic_line_highlight = { + "file_path": os.path.abspath(diagnostic.file_path), + "line": diagnostic.line, + "severity": diagnostic.severity, + "generation": generation, + } + + def clear_diagnostic_line_highlight(self) -> None: + self.diagnostic_line_highlight = None + + def get_string_width(self, text: str) -> int: + return len(text) + + +class BinderEditor: + def __init__(self) -> None: + """Initialize a KeyBinder-compatible editor double.""" + self.history = type("History", (), {"undo": self._ok, "redo": self._ok})() + self.config: dict[str, Any] = {} + self.stdscr = FakeWindow() + self.is_lightweight = False + self.linter_bridge = object() + self.async_engine = None + self.status_message = "Ready" + self._lexer = None + self._state_lock = threading.RLock() + self.toggle_calls = 0 + + def __getattr__(self, _name: str) -> Any: + """Return a truthy no-op for unrelated editor actions.""" + return self._ok + + def _ok(self, *_args: Any, **_kwargs: Any) -> bool: + return True + + def _set_status_message(self, message: str) -> None: + self.status_message = message + + def insert_text(self, text: str) -> bool: + self.status_message = text + return True + + def toggle_diagnostics_panel(self) -> bool: + self.toggle_calls += 1 + return True + + +@pytest.fixture(autouse=True) +def fake_curses(monkeypatch: pytest.MonkeyPatch) -> None: + FAKE_WINDOWS.clear() + monkeypatch.setattr("ecli.ui.panels.curses.newwin", lambda *args: FakeWindow()) + monkeypatch.setattr("ecli.ui.panels.curses.curs_set", lambda value: None) + + +def make_panel( + snapshot: DiagnosticsSnapshot | None = None, +) -> tuple[DiagnosticsPanel, FakeEditor]: + editor = FakeEditor(snapshot) + panel = DiagnosticsPanel(editor.stdscr, editor) # type: ignore[arg-type] + editor.panel_manager.active_panel = panel + return panel, editor + + +def rendered(panel: DiagnosticsPanel) -> str: + return "\n".join(cast(FakeWindow, panel.win).drawn) + + +def rendered_popup() -> str: + return "\n".join(FAKE_WINDOWS[-1].drawn) + + +def diagnostics_rows(panel: DiagnosticsPanel) -> list[str]: + return [ + line + for line in cast(FakeWindow, panel.win).drawn + if line.startswith(("E ", "W ", "I ", "H ")) + ] + + +def drawn_attr(panel: DiagnosticsPanel, text: str) -> int | None: + for line, attr in cast(FakeWindow, panel.win).drawn_attrs: + if line == text: + return attr + return None + + +def test_f4_key_opens_diagnostics_panel() -> None: + editor = BinderEditor() + binder = KeyBinder(editor) # type: ignore[arg-type] + + assert binder.handle_input(curses.KEY_F4) is True + + assert editor.toggle_calls == 1 + + +def test_f4_and_escape_close_diagnostics_panel() -> None: + editor = cast(Any, Ecli.__new__(Ecli)) + manager = FakePanelManager() + panel = type("Panel", (), {"visible": False})() + editor.panel_manager = manager + editor.diagnostics_panel_instance = panel + editor._set_status_message = lambda _message: None + + assert editor.toggle_diagnostics_panel() is True + assert panel.visible is True + assert editor.toggle_diagnostics_panel() is True + assert panel.visible is False + + diagnostics_panel, diagnostics_editor = make_panel() + diagnostics_panel.visible = True + assert diagnostics_panel.handle_key(27) is True + assert diagnostics_editor.panel_manager.close_calls == 1 + + +def test_ctrl_q_exits_while_diagnostics_panel_is_active() -> None: + class FakeKeyBinder: + def is_key_for_action(self, key: int | str, action_name: str) -> bool: + return action_name == "quit" and key == 17 + + editor = cast(Any, Ecli.__new__(Ecli)) + editor.focus = "panel" + editor.keybinder = FakeKeyBinder() + editor.panel_manager = FakePanelManager() + editor.panel_manager.active_panel = type("Panel", (), {"visible": True})() + editor.handled: list[int | str] = [] + editor.handle_input = lambda key: editor.handled.append(key) or True + + assert editor._handle_input_dispatch(17) is True + + assert editor.handled == [17] + + +def test_event_queues_are_drained_after_waiting_for_key_before_dispatch() -> None: + class FakeKeyReader: + PASTE_EVENT = KeyBinder.PASTE_EVENT + last_paste = "" + + def get_key_input(self) -> int: + return curses.KEY_ENTER + + editor = cast(Any, Ecli.__new__(Ecli)) + editor.keybinder = FakeKeyReader() + editor.events: list[str] = [] + editor.queue_drains = 0 + editor.diagnostics_ready = False + + def drain_queues() -> bool: + editor.queue_drains += 1 + editor.events.append(f"drain-{editor.queue_drains}") + if editor.queue_drains == 2: + editor.diagnostics_ready = True + return True + return False + + def dispatch(key: int) -> bool: + assert key == curses.KEY_ENTER + editor.events.append(f"dispatch-ready-{editor.diagnostics_ready}") + return True + + editor._process_all_queues = drain_queues + editor._handle_input_dispatch = dispatch + + assert editor._process_events_and_input() is True + + assert editor.events == ["drain-1", "drain-2", "dispatch-ready-True"] + + +def test_panel_open_does_not_schedule_ruff() -> None: + panel, editor = make_panel() + + panel.open() + + assert editor.linter_bridge.requests == [] + + +def test_initial_panel_state_and_action_bar_are_explicit() -> None: + panel, _editor = make_panel() + panel.width = 90 + panel.visible = True + panel.draw() + + text = rendered(panel) + assert "Diagnostics not run yet." in text + assert "Press r to run diagnostics for this file." in text + assert "r:Run file" in text + assert "d:Details" in text + + +def test_refresh_keys_schedule_background_diagnostics() -> None: + panel, editor = make_panel() + panel.visible = True + + assert panel.handle_key(ord("r")) is True + assert panel.handle_key(ord("R")) is True + + assert editor.linter_bridge.requests == ["buffer", "workspace"] + + +def test_clean_skipped_and_error_states_are_rendered() -> None: + running_panel, _editor = make_panel(DiagnosticsSnapshot(status="running")) + running_panel.visible = True + running_panel.draw() + + assert "Running diagnostics..." in rendered(running_panel) + + empty_panel, _editor = make_panel(DiagnosticsSnapshot(status="ready")) + empty_panel.visible = True + empty_panel.draw() + + text = rendered(empty_panel) + assert "Diagnostics: PASS" in text + assert "No issues found." in text + assert "No diagnostics" not in text + assert drawn_attr(empty_panel, "Diagnostics: PASS") == 105 + + skipped_panel, _editor = make_panel( + DiagnosticsSnapshot(status="skipped", message="not a Python file") + ) + skipped_panel.visible = True + skipped_panel.draw() + + assert "Diagnostics skipped: not a Python file" in rendered(skipped_panel) + + error_panel, _editor = make_panel( + DiagnosticsSnapshot(status="error", message="Ruff executable not found") + ) + error_panel.visible = True + error_panel.draw() + + assert "Diagnostics failed: Ruff executable not found" in rendered(error_panel) + + +def test_pass_status_message_uses_success_style_hook() -> None: + editor = FakeEditor() + editor.status_message = "Diagnostics: PASS — no issues found." + editor.filename = "/tmp/f4_bad.py" + window = FakeWindow() + drawer = cast(Any, DrawScreen.__new__(DrawScreen)) + drawer.editor = editor + drawer.stdscr = window + drawer.colors = editor.colors + drawer.config = {} + + drawer._draw_status_bar() + + assert any(call[3] == 106 for call in window.chgat_calls) + + +def test_enter_navigation_uses_editor_diagnostic_path() -> None: + diagnostic = Diagnostic( + file_path="/tmp/example.py", + line=7, + column=3, + severity="warning", + code="F401", + message="unused import", + source="ruff", + ) + panel, editor = make_panel( + DiagnosticsSnapshot( + generation=1, + diagnostics=(diagnostic,), + status="ready", + message="Diagnostics: 1 issue(s).", + ) + ) + panel.visible = True + panel.draw() + cursor_before = getattr(editor, "cursor_y", None) + + assert panel.handle_key(curses.KEY_ENTER) is True + + assert editor.navigated == [diagnostic] + assert editor.diagnostic_line_highlight == { + "file_path": os.path.abspath("/tmp/example.py"), + "line": 7, + "severity": "warning", + "generation": 1, + } + assert getattr(editor, "cursor_y", None) == cursor_before + assert editor.lint_panel_active is False + assert panel._details_is_open() is False + + +def test_selected_diagnostic_sets_and_updates_editor_highlight(tmp_path: Path) -> None: + first = Diagnostic( + file_path=str(tmp_path / "first.py"), + line=4, + column=2, + severity="error", + code="E999", + message="first issue", + source="ruff", + ) + second = Diagnostic( + file_path=str(tmp_path / "second.py"), + line=9, + column=1, + severity="warning", + code="F821", + message="second issue", + source="ruff", + ) + panel, editor = make_panel( + DiagnosticsSnapshot( + generation=7, + diagnostics=(first, second), + status="ready", + message="Diagnostics: 2 issue(s).", + ) + ) + panel.visible = True + panel.draw() + + assert editor.diagnostic_line_highlight == { + "file_path": os.path.abspath(str(tmp_path / "first.py")), + "line": 4, + "severity": "error", + "generation": 7, + } + + assert panel.handle_key(curses.KEY_DOWN) is True + + assert editor.diagnostic_line_highlight == { + "file_path": os.path.abspath(str(tmp_path / "second.py")), + "line": 9, + "severity": "warning", + "generation": 7, + } + + +def test_selected_diagnostic_highlight_clears_on_close_refresh_and_clean_result( + tmp_path: Path, +) -> None: + diagnostic = Diagnostic( + file_path=str(tmp_path / "bad.py"), + line=3, + column=1, + severity="warning", + code="F401", + message="issue", + source="ruff", + ) + panel, editor = make_panel( + DiagnosticsSnapshot( + generation=1, + diagnostics=(diagnostic,), + status="ready", + message="Diagnostics: 1 issue(s).", + ) + ) + panel.visible = True + panel.draw() + assert editor.diagnostic_line_highlight is not None + + assert panel.handle_key(ord("r")) is True + assert editor.diagnostic_line_highlight is None + + editor.linter_bridge.diagnostics_snapshot = DiagnosticsSnapshot( + generation=2, + diagnostics=(diagnostic,), + status="ready", + message="Diagnostics: 1 issue(s).", + ) + panel.draw() + assert editor.diagnostic_line_highlight is not None + + editor.linter_bridge.diagnostics_snapshot = DiagnosticsSnapshot( + generation=3, + diagnostics=(), + status="ready", + message="Diagnostics: PASS — no issues found.", + ) + panel.draw() + assert editor.diagnostic_line_highlight is None + + editor.linter_bridge.diagnostics_snapshot = DiagnosticsSnapshot( + generation=4, + diagnostics=(diagnostic,), + status="ready", + message="Diagnostics: 1 issue(s).", + ) + panel.draw() + assert editor.diagnostic_line_highlight is not None + + assert panel.handle_key(27) is True + assert editor.diagnostic_line_highlight is None + + +def test_drawscreen_maps_selected_diagnostic_highlight_to_current_file( + tmp_path: Path, +) -> None: + editor = type("Editor", (), {})() + current_file = tmp_path / "current.py" + other_file = tmp_path / "other.py" + editor.filename = str(current_file) + editor.diagnostic_line_highlight = { + "file_path": os.path.abspath(str(current_file)), + "line": 5, + "severity": "warning", + "generation": 3, + } + drawer = cast(Any, DrawScreen.__new__(DrawScreen)) + drawer.editor = editor + drawer.colors = {"ui_warning": 700} + + assert drawer._active_diagnostic_highlight_line() == 4 + assert drawer._diagnostic_line_number_attr(9) == 700 + + editor.filename = str(other_file) + + assert drawer._active_diagnostic_highlight_line() is None + + +def test_editor_diagnostic_highlight_file_mismatch_clear_is_safe_when_absent( + tmp_path: Path, +) -> None: + editor = cast(Any, Ecli.__new__(Ecli)) + editor.filename = str(tmp_path / "current.py") + + editor._clear_diagnostic_line_highlight_if_file_mismatch() + + assert editor.diagnostic_line_highlight is None + + +def test_editor_clears_selected_diagnostic_highlight_when_file_switches( + tmp_path: Path, +) -> None: + current_file = tmp_path / "current.py" + other_file = tmp_path / "other.py" + editor = cast(Any, Ecli.__new__(Ecli)) + editor.filename = str(current_file) + editor._force_full_redraw = False + editor.diagnostic_line_highlight = { + "file_path": os.path.abspath(str(current_file)), + "line": 2, + "severity": "warning", + "generation": 1, + } + + editor._clear_diagnostic_line_highlight_if_file_mismatch() + assert editor.diagnostic_line_highlight is not None + + editor.filename = str(other_file) + editor._clear_diagnostic_line_highlight_if_file_mismatch() + + assert editor.diagnostic_line_highlight is None + assert editor._force_full_redraw is True + + +def test_details_key_opens_selected_diagnostic_popup_only(tmp_path: Path) -> None: + selected = Diagnostic( + file_path=str(tmp_path / "f4_bad.py"), + line=1, + column=11, + severity="warning", + code="invalid-syntax", + message="Expected `:`, found newline", + source="ruff", + fix_hint="Insert a colon", + suggested_code="class Askold:\n pass\n", + ) + other = Diagnostic( + file_path=str(tmp_path / "other.py"), + line=3, + column=1, + severity="warning", + code="F401", + message="Unused import os", + source="ruff", + ) + panel, editor = make_panel( + DiagnosticsSnapshot( + generation=1, + diagnostics=(selected, other), + status="ready", + message="Diagnostics: 2 issue(s).", + ) + ) + editor.filename = str(tmp_path / "f4_bad.py") + panel.visible = True + + assert panel.handle_key(ord("d")) is True + panel.draw() + + text = rendered_popup() + assert "Diagnostic details" in text + assert "File: f4_bad.py" in text + assert "Location: 1:11" in text + assert "Source: ruff" in text + assert "Code: invalid-syntax" in text + assert "Message: Expected `:`, found newline" in text + assert "Fix hint: Insert a colon" in text + assert "Suggested: class Askold: pass" in text + assert "Preview only. No changes were applied." in text + assert "Unused import os" not in text + + assert panel.handle_key(27) is True + assert panel._details_is_open() is False + + +def test_space_opens_details_popup(tmp_path: Path) -> None: + diagnostic = Diagnostic( + file_path=str(tmp_path / "f4_bad.py"), + line=2, + column=4, + severity="warning", + code=None, + message="Syntax detail", + source="ruff", + ) + panel, _editor = make_panel( + DiagnosticsSnapshot( + generation=1, + diagnostics=(diagnostic,), + status="ready", + message="Diagnostics: 1 issue(s).", + ) + ) + panel.visible = True + + assert panel.handle_key(ord(" ")) is True + panel.draw() + + text = rendered_popup() + assert "Diagnostic details" in text + assert "Code:" not in text + + +def test_refresh_and_selection_movement_close_details_popup(tmp_path: Path) -> None: + first = Diagnostic( + file_path=str(tmp_path / "first.py"), + line=1, + column=1, + severity="warning", + code="F401", + message="first issue", + source="ruff", + ) + second = Diagnostic( + file_path=str(tmp_path / "second.py"), + line=2, + column=1, + severity="warning", + code="F821", + message="second issue", + source="ruff", + ) + panel, editor = make_panel( + DiagnosticsSnapshot( + generation=1, + diagnostics=(first, second), + status="ready", + message="Diagnostics: 2 issue(s).", + ) + ) + panel.visible = True + + assert panel.handle_key(ord("d")) is True + assert panel._details_is_open() is True + + assert panel.handle_key(curses.KEY_DOWN) is True + assert panel._details_is_open() is False + + assert panel.handle_key(ord("d")) is True + assert panel._details_is_open() is True + + assert panel.handle_key(ord("r")) is True + assert panel._details_is_open() is False + assert editor.linter_bridge.requests == ["buffer"] + + +def test_generation_change_invalidates_existing_details_popup(tmp_path: Path) -> None: + old = Diagnostic( + file_path=str(tmp_path / "old.py"), + line=1, + column=1, + severity="warning", + code="F401", + message="old issue", + source="ruff", + ) + new = Diagnostic( + file_path=str(tmp_path / "new.py"), + line=1, + column=1, + severity="warning", + code="F821", + message="new issue", + source="ruff", + ) + snapshot = DiagnosticsSnapshot( + generation=1, + diagnostics=(old,), + status="ready", + message="Diagnostics: 1 issue(s).", + ) + panel, editor = make_panel(snapshot) + panel.visible = True + + assert panel.handle_key(ord("d")) is True + editor.linter_bridge.diagnostics_snapshot = DiagnosticsSnapshot( + generation=2, + diagnostics=(new,), + status="ready", + message="Diagnostics: 1 issue(s).", + ) + panel.draw() + + assert panel._details_is_open() is False + + +def test_diagnostics_row_uses_relative_path_not_absolute_path(tmp_path: Path) -> None: + diagnostic_path = tmp_path / "pkg" / "module" / "bad.py" + diagnostic = Diagnostic( + file_path=str(diagnostic_path), + line=12, + column=4, + severity="warning", + code="F401", + message="Unused import os", + source="ruff", + ) + panel, editor = make_panel( + DiagnosticsSnapshot( + generation=1, + diagnostics=(diagnostic,), + status="ready", + message="Diagnostics: 1 issue(s).", + ) + ) + editor.filename = str(tmp_path / "open.py") + panel.visible = True + panel.draw() + + row = diagnostics_rows(panel)[0] + assert str(tmp_path) not in row + assert "pkg/module/bad.py:12:4" in row + assert row.startswith("W ruff ") + + +def test_panel_renders_diagnostics_count_and_visible_count(tmp_path: Path) -> None: + diagnostics = tuple( + Diagnostic( + file_path=str(tmp_path / f"bad_{index}.py"), + line=index + 1, + column=1, + severity="warning", + code="F401", + message=f"issue {index}", + source="ruff", + ) + for index in range(10) + ) + panel, editor = make_panel( + DiagnosticsSnapshot( + generation=1, + diagnostics=diagnostics, + status="ready", + message="Diagnostics: 10 issue(s).", + ) + ) + editor.filename = str(tmp_path / "open.py") + panel.height = 8 + panel.visible = True + panel.draw() + + text = rendered(panel) + assert "Diagnostics: 10 issue(s)." in text + assert "Showing 4/10 diagnostics" in text + assert len(diagnostics_rows(panel)) == 4 + + +def test_diagnostic_message_remains_visible_when_width_is_constrained( + tmp_path: Path, +) -> None: + diagnostic = Diagnostic( + file_path=str(tmp_path / "a" / "very" / "deep" / "package" / "bad.py"), + line=1, + column=1, + severity="error", + code="E999", + message="Expected `:`, found newline", + source="ruff", + ) + panel, editor = make_panel( + DiagnosticsSnapshot( + generation=1, + diagnostics=(diagnostic,), + status="ready", + message="Diagnostics: 1 issue(s).", + ) + ) + editor.filename = str(tmp_path / "open.py") + panel.width = 42 + panel.visible = True + panel.draw() + + row = diagnostics_rows(panel)[0] + assert ":1:1" in row + assert "Expected" in row + assert str(tmp_path) not in row + + +def test_long_path_truncation_preserves_location_and_message(tmp_path: Path) -> None: + diagnostic = Diagnostic( + file_path=str( + tmp_path / "alpha" / "beta" / "gamma" / "delta" / "epsilon" / "bad.py" + ), + line=123, + column=45, + severity="warning", + code="F821", + message="Undefined name target_value", + source="ruff", + ) + panel, editor = make_panel( + DiagnosticsSnapshot( + generation=1, + diagnostics=(diagnostic,), + status="ready", + message="Diagnostics: 1 issue(s).", + ) + ) + editor.filename = str(tmp_path / "open.py") + panel.width = 38 + panel.visible = True + panel.draw() + + row = diagnostics_rows(panel)[0] + assert ":123:45" in row + assert "Undefined" in row + assert str(tmp_path) not in row + assert len(row) <= panel.width - 2 + + +def make_goto_editor(file_path: Path, lines: list[str]) -> Any: + editor = cast(Any, Ecli.__new__(Ecli)) + editor.filename = str(file_path) + editor.text = list(lines) + editor.status_message = "Ready" + editor.status_messages = [] + editor.cursor_y = 0 + editor.cursor_x = 0 + editor.scroll_top = 0 + editor.scroll_left = 0 + editor.stdscr = FakeWindow() + editor.drawer = type("Drawer", (), {"_text_start_x": 0})() + editor.panel_manager = FakePanelManager() + editor.focus = "panel" + editor._force_full_redraw = False + editor.service_registry = None + + def set_status(message: str, *_args: Any, **_kwargs: Any) -> None: + editor.status_message = message + editor.status_messages.append(message) + + editor._set_status_message = set_status + return editor + + +def test_goto_diagnostic_reports_jump_status_with_relative_location( + tmp_path: Path, +) -> None: + file_path = tmp_path / "f4_bad.py" + file_path.write_text("def ok():\n return 1\n", encoding="utf-8") + editor = make_goto_editor(file_path, ["def ok():", " return 1"]) + diagnostic = Diagnostic( + file_path=str(file_path), + line=2, + column=5, + severity="warning", + code="F401", + message="Unused import os", + source="ruff", + ) + + assert editor.goto_diagnostic(diagnostic) is True + + assert editor.cursor_y == 1 + assert editor.cursor_x == 4 + assert editor.status_messages[-1] == "Jumped to f4_bad.py:2:5" + + +def test_goto_diagnostic_missing_file_reports_controlled_status( + tmp_path: Path, + caplog: pytest.LogCaptureFixture, +) -> None: + current_file = tmp_path / "current.py" + current_file.write_text("x = 1\n", encoding="utf-8") + missing_file = tmp_path / "missing.py" + editor = make_goto_editor(current_file, ["x = 1"]) + diagnostic = Diagnostic( + file_path=str(missing_file), + line=1, + column=1, + severity="warning", + code="F401", + message="Unused import os", + source="ruff", + ) + + with caplog.at_level(logging.WARNING): + assert editor.goto_diagnostic(diagnostic) is True + + assert editor.status_messages[-1] == "Diagnostics: file not available: missing.py" + assert "file not available" in caplog.text + + +def test_goto_diagnostic_out_of_range_location_reports_controlled_status( + tmp_path: Path, + caplog: pytest.LogCaptureFixture, +) -> None: + file_path = tmp_path / "current.py" + file_path.write_text("x = 1\n", encoding="utf-8") + editor = make_goto_editor(file_path, ["x = 1"]) + diagnostic = Diagnostic( + file_path=str(file_path), + line=1, + column=80, + severity="warning", + code="F401", + message="Unused import os", + source="ruff", + ) + + with caplog.at_level(logging.WARNING): + assert editor.goto_diagnostic(diagnostic) is True + + assert ( + editor.status_messages[-1] + == "Diagnostics: column out of range for current.py:1:80" + ) + assert "column out of range" in caplog.text