diff --git a/README.md b/README.md index 8ab7581..f8b4efb 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,8 @@ Async Python client for [NanoKVM](https://github.com/sipeed/NanoKVM). from nanokvm.client import NanoKVMClient from nanokvm.models import GpioType, MouseButton -async with NanoKVMClient("http://kvm-8b76.local/api/") as client: +# NanoKVM (auto-detects password mode) +async with NanoKVMClient("https://kvm.local/api/") as client: await client.authenticate("username", "password") # Get device information @@ -50,3 +51,66 @@ disk = await ssh.run_command("df -h /") await ssh.disconnect() ``` + +### Password Obfuscation Modes + +By default, the client **auto-detects** the correct password mode. It tries obfuscated password first, and falls back to plain text if authentication fails. You can also force a specific mode: + +```python +# Auto-detect (default) — recommended +async with NanoKVMClient("https://kvm.local/api/") as client: + await client.authenticate("username", "password") + +# Force plain text (newer NanoKVM with HTTPS) +async with NanoKVMClient( + "https://kvm.local/api/", + use_password_obfuscation=False +) as client: + await client.authenticate("username", "password") + +# Force obfuscation (older NanoKVM with HTTP) +async with NanoKVMClient( + "http://kvm.local/api/", + use_password_obfuscation=True +) as client: + await client.authenticate("username", "password") +``` + +## HTTPS/SSL Configuration + +The client supports HTTPS connections with flexible SSL/TLS configuration options. + +### Standard HTTPS (Let's Encrypt, Public CA) + +For modern NanoKVM devices with HTTPS and valid certificates: + +```python +async with NanoKVMClient("https://kvm.local/api/") as client: + await client.authenticate("username", "password") +``` + +### Self-Signed Certificates + +For self-signed certificates, you have two options: + +#### Option 1: Disable verification (testing only) + +**Warning:** This is insecure and should only be used for testing! + +```python +async with NanoKVMClient( + "https://kvm.local/api/", + verify_ssl=False, +) as client: + await client.authenticate("username", "password") +``` + +#### Option 2: Use custom CA certificate (recommended) + +```python +async with NanoKVMClient( + "https://kvm.local/api/", + ssl_ca_cert="/path/to/ca.pem", +) as client: + await client.authenticate("username", "password") +``` diff --git a/nanokvm/client.py b/nanokvm/client.py index 0224252..2eb7475 100644 --- a/nanokvm/client.py +++ b/nanokvm/client.py @@ -8,10 +8,18 @@ import io import json import logging +import ssl from typing import Any, TypeVar, overload import aiohttp -from aiohttp import BodyPartReader, ClientResponse, ClientSession, MultipartReader, hdrs +from aiohttp import ( + BodyPartReader, + ClientResponse, + ClientSession, + MultipartReader, + TCPConnector, + hdrs, +) from PIL import Image from pydantic import BaseModel, ValidationError import yarl @@ -115,20 +123,63 @@ def __init__( *, token: str | None = None, request_timeout: int = 10, + verify_ssl: bool = True, + ssl_ca_cert: str | None = None, + use_password_obfuscation: bool | None = None, ) -> None: """ Initialize the NanoKVM client. Args: - url: Base URL of the NanoKVM API (e.g., "http://192.168.1.1/api/") + url: Base URL of the NanoKVM API (e.g., "https://kvm.local/api/") token: Optional pre-existing authentication token request_timeout: Request timeout in seconds (default: 10) + verify_ssl: Enable SSL certificate verification (default: True). + Set to False to disable verification for self-signed certificates. + ssl_ca_cert: Path to custom CA certificate bundle file for SSL verification. + Useful for self-signed certificates or private CAs. + use_password_obfuscation: Control password obfuscation mode (default: None). + None = auto-detect (try obfuscated first, fall back to plain text). + True = always use obfuscated passwords (older NanoKVM versions). + False = always use plain text passwords (newer HTTPS-enabled versions). """ self.url = yarl.URL(url) self._session: ClientSession | None = None self._token = token self._request_timeout = request_timeout self._ws: aiohttp.ClientWebSocketResponse | None = None + self._verify_ssl = verify_ssl + self._ssl_ca_cert = ssl_ca_cert + self._use_password_obfuscation = use_password_obfuscation + + def _create_ssl_context(self) -> ssl.SSLContext | bool: + """ + Create and configure SSL context based on initialization parameters. + + Returns: + ssl.SSLContext: Configured SSL context for custom certificates + True: Use default SSL verification (aiohttp default) + False: Disable SSL verification + + Raises: + FileNotFoundError: If the CA certificate file is missing. + ssl.SSLError: If the CA certificate is invalid. + """ + + if not self._verify_ssl: + _LOGGER.warning( + "SSL verification is disabled. This is insecure and should only be " + "used for testing with self-signed certificates." + ) + return False + + if not self._ssl_ca_cert: + return True + + ssl_ctx = ssl.create_default_context(cafile=self._ssl_ca_cert) + _LOGGER.debug("Using custom CA certificate: %s", self._ssl_ca_cert) + + return ssl_ctx @property def token(self) -> str | None: @@ -137,7 +188,16 @@ def token(self) -> str | None: async def __aenter__(self) -> NanoKVMClient: """Async context manager entry.""" - self._session = ClientSession() + + ssl_config = await asyncio.to_thread(self._create_ssl_context) + connector = TCPConnector(ssl=ssl_config) + self._session = ClientSession(connector=connector) + + _LOGGER.debug( + "Created client session with SSL verification: %s", + "disabled" if ssl_config is False else "enabled", + ) + return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: @@ -243,9 +303,8 @@ async def _api_request_json( return api_response.data - async def authenticate(self, username: str, password: str) -> None: - """Authenticate and store the session token.""" - _LOGGER.debug("Attempting authentication for user: %s", username) + async def _do_authenticate(self, username: str, password_to_send: str) -> None: + """Perform a single authentication attempt with the given password.""" try: login_response = await self._api_request_json( hdrs.METH_POST, @@ -254,7 +313,7 @@ async def authenticate(self, username: str, password: str) -> None: authenticate=False, data=LoginReq( username=username, - password=obfuscate_password(password), + password=password_to_send, ), ) @@ -272,6 +331,32 @@ async def authenticate(self, username: str, password: str) -> None: else: raise + async def authenticate(self, username: str, password: str) -> None: + """Authenticate and store the session token.""" + _LOGGER.debug("Attempting authentication for user: %s", username) + + if self._use_password_obfuscation is True: + _LOGGER.debug("Using password obfuscation (forced)") + await self._do_authenticate(username, obfuscate_password(password)) + elif self._use_password_obfuscation is False: + _LOGGER.debug("Using plain text password (forced)") + await self._do_authenticate(username, password) + else: + # Auto-detect: try obfuscated first, fall back to plain text + _LOGGER.debug("Auto-detecting password mode") + try: + await self._do_authenticate( + username, obfuscate_password(password) + ) + _LOGGER.info("Auto-detected obfuscated password mode") + except NanoKVMAuthenticationFailure: + _LOGGER.debug( + "Obfuscated authentication failed, " + "trying plain text password" + ) + await self._do_authenticate(username, password) + _LOGGER.info("Auto-detected plain text password mode") + async def logout(self) -> None: """Log out and clear the session token.""" if not self._token or self._token == "disabled": diff --git a/tests/test_ssl.py b/tests/test_ssl.py new file mode 100644 index 0000000..ce66808 --- /dev/null +++ b/tests/test_ssl.py @@ -0,0 +1,253 @@ +"""Tests for SSL/TLS configuration and password obfuscation.""" + +from pathlib import Path +import ssl +from unittest.mock import MagicMock, patch + +from aioresponses import aioresponses +import pytest +import yarl + +from nanokvm.client import NanoKVMAuthenticationFailure, NanoKVMClient + + +async def test_default_ssl_verification_enabled() -> None: + """Test that SSL verification is enabled by default.""" + client = NanoKVMClient("https://kvm.local/api/") + + ssl_config = client._create_ssl_context() + assert ssl_config is True + + +async def test_ssl_verification_disabled() -> None: + """Test SSL verification can be disabled.""" + client = NanoKVMClient("https://kvm.local/api/", verify_ssl=False) + + ssl_config = client._create_ssl_context() + assert ssl_config is False + + +async def test_custom_ca_certificate(tmp_path: Path) -> None: + """Test custom CA certificate configuration.""" + ca_cert_file = tmp_path / "ca.pem" + ca_cert_file.write_text("DUMMY CA CERT") + + with patch("ssl.create_default_context") as mock_ssl_context: + mock_ctx = MagicMock(spec=ssl.SSLContext) + mock_ssl_context.return_value = mock_ctx + + client = NanoKVMClient( + "https://kvm.local/api/", ssl_ca_cert=str(ca_cert_file) + ) + + ssl_config = client._create_ssl_context() + + mock_ssl_context.assert_called_once_with(cafile=str(ca_cert_file)) + assert isinstance(ssl_config, MagicMock) + + +async def test_nonexistent_ca_cert_raises_error() -> None: + """Test that non-existent CA cert file raises FileNotFoundError.""" + client = NanoKVMClient( + "https://kvm.local/api/", ssl_ca_cert="/path/that/does/not/exist.pem" + ) + + with pytest.raises(FileNotFoundError): + client._create_ssl_context() + + +async def test_http_url_works_regardless_of_ssl_config() -> None: + """Test that HTTP URL (not HTTPS) works regardless of SSL config.""" + client = NanoKVMClient("http://kvm.local/api/", verify_ssl=False) + + async with client: + assert client._session is not None + + +async def test_session_created_with_tcp_connector() -> None: + """Test that session is created with TCPConnector.""" + client = NanoKVMClient("https://kvm.local/api/") + + async with client: + assert client._session is not None + assert client._session.connector is not None + + +async def test_password_obfuscation_auto_by_default() -> None: + """Test that password obfuscation defaults to None (auto-detect).""" + client = NanoKVMClient("https://kvm.local/api/") + assert client._use_password_obfuscation is None + + +async def test_password_obfuscation_can_be_disabled() -> None: + """Test that password obfuscation can be disabled.""" + client = NanoKVMClient( + "https://kvm.local/api/", use_password_obfuscation=False + ) + assert client._use_password_obfuscation is False + + +async def test_authenticate_with_plain_text_password() -> None: + """Test authentication with plain text password (newer NanoKVM).""" + async with NanoKVMClient( + "https://kvm.local/api/", use_password_obfuscation=False + ) as client: + with aioresponses() as m: + m.post( + "https://kvm.local/api/auth/login", + payload={"code": 0, "msg": "success", "data": {"token": "abc123"}}, + ) + + await client.authenticate("root", "password123") + + calls = m.requests[("POST", yarl.URL("https://kvm.local/api/auth/login"))] + assert len(calls) == 1 + request_json = calls[0].kwargs.get("json") + assert request_json["username"] == "root" + assert request_json["password"] == "password123" + + +async def test_authenticate_with_obfuscated_password() -> None: + """Test authentication with obfuscated password (older NanoKVM).""" + async with NanoKVMClient( + "https://kvm.local/api/", use_password_obfuscation=True + ) as client: + with aioresponses() as m: + m.post( + "https://kvm.local/api/auth/login", + payload={"code": 0, "msg": "success", "data": {"token": "abc123"}}, + ) + + await client.authenticate("root", "password123") + + calls = m.requests[("POST", yarl.URL("https://kvm.local/api/auth/login"))] + assert len(calls) == 1 + request_json = calls[0].kwargs.get("json") + assert request_json["username"] == "root" + assert request_json["password"].startswith("U2FsdGVkX1") + assert request_json["password"] != "password123" + + +async def test_auto_detect_obfuscation_succeeds() -> None: + """Test auto-detect succeeds with obfuscated password on first attempt.""" + async with NanoKVMClient("https://kvm.local/api/") as client: + with aioresponses() as m: + m.post( + "https://kvm.local/api/auth/login", + payload={ + "code": 0, + "msg": "success", + "data": {"token": "abc123"}, + }, + ) + + await client.authenticate("root", "password123") + + calls = m.requests[ + ("POST", yarl.URL("https://kvm.local/api/auth/login")) + ] + assert len(calls) == 1 + request_json = calls[0].kwargs.get("json") + assert request_json["password"].startswith("U2FsdGVkX1") + assert client.token == "abc123" + + +async def test_auto_detect_fallback_to_plain_text() -> None: + """Test auto-detect falls back to plain text after obfuscated fails.""" + async with NanoKVMClient("https://kvm.local/api/") as client: + with aioresponses() as m: + # First call: obfuscated fails with code -2 + m.post( + "https://kvm.local/api/auth/login", + payload={ + "code": -2, + "msg": "invalid username or password", + "data": None, + }, + ) + # Second call: plain text succeeds + m.post( + "https://kvm.local/api/auth/login", + payload={ + "code": 0, + "msg": "success", + "data": {"token": "abc123"}, + }, + ) + + await client.authenticate("root", "password123") + + calls = m.requests[ + ("POST", yarl.URL("https://kvm.local/api/auth/login")) + ] + assert len(calls) == 2 + # First attempt: obfuscated + assert calls[0].kwargs.get("json")[ + "password" + ].startswith("U2FsdGVkX1") + # Second attempt: plain text + assert ( + calls[1].kwargs.get("json")["password"] + == "password123" + ) + assert client.token == "abc123" + + +async def test_auto_detect_both_fail() -> None: + """Test auto-detect raises NanoKVMAuthenticationFailure when both fail.""" + async with NanoKVMClient("https://kvm.local/api/") as client: + with aioresponses() as m: + # First call: obfuscated fails + m.post( + "https://kvm.local/api/auth/login", + payload={ + "code": -2, + "msg": "invalid username or password", + "data": None, + }, + ) + # Second call: plain text also fails + m.post( + "https://kvm.local/api/auth/login", + payload={ + "code": -2, + "msg": "invalid username or password", + "data": None, + }, + ) + + with pytest.raises(NanoKVMAuthenticationFailure): + await client.authenticate("root", "wrong_password") + + +async def test_full_client_lifecycle_with_ssl() -> None: + """Test full client lifecycle with SSL configuration.""" + async with NanoKVMClient( + "https://kvm.local/api/", token="test-token", verify_ssl=True + ) as client: + with aioresponses() as m: + m.get( + "https://kvm.local/api/vm/info", + payload={ + "code": 0, + "msg": "success", + "data": { + "ips": [ + { + "name": "eth0", + "addr": "192.168.1.100", + "version": "4", + "type": "ethernet", + } + ], + "mdns": "kvm.local", + "image": "v1.0.0", + "application": "v2.1.0", + "deviceKey": "abc123", + }, + }, + ) + + info = await client.get_info() + assert len(info.ips) == 1 + assert info.ips[0].addr == "192.168.1.100"