From 116e8e686198e3c96f99769c58ade33093967cb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Gierwia=C5=82o?= Date: Fri, 13 Feb 2026 17:29:11 +0100 Subject: [PATCH 1/3] Add HTTPS/SSL support and password obfuscation control --- README.md | 63 ++++++++++++++++- nanokvm/client.py | 85 ++++++++++++++++++++-- tests/test_ssl.py | 175 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 318 insertions(+), 5 deletions(-) create mode 100644 tests/test_ssl.py diff --git a/README.md b/README.md index 8ab7581..733bb58 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,11 @@ Async Python client for [NanoKVM](https://github.com/sipeed/NanoKVM). from nanokvm.client import NanoKVMClient from nanokvm.models import GpioType, MouseButton -async with NanoKVMClient("http://kvm-8b76.local/api/") as client: +#NanoKVM with HTTPS (recommended) +async with NanoKVMClient( + "https://kvm.local/api/", + use_password_obfuscation=False # Newer versions use plain text +) as client: await client.authenticate("username", "password") # Get device information @@ -50,3 +54,60 @@ disk = await ssh.run_command("df -h /") await ssh.disconnect() ``` + +### For Legacy NanoKVM (HTTP with password obfuscation) + +```python +# Older NanoKVM versions with HTTP (backward compatibility) +async with NanoKVMClient( + "http://kvm.local/api/", + use_password_obfuscation=True # Default: True for backward compatibility +) as client: + await client.authenticate("username", "password") +``` + +**Note:** Password obfuscation is enabled by default for backward compatibility with older NanoKVM versions. For newer HTTPS-enabled devices, set `use_password_obfuscation=False`. + +## HTTPS/SSL Configuration + +The client supports HTTPS connections with flexible SSL/TLS configuration options. + +### Standard HTTPS (Let's Encrypt, Public CA) + +For modern NanoKVM devices with HTTPS and valid certificates: + +```python +async with NanoKVMClient( + "https://kvm.local/api/", + use_password_obfuscation=False +) as client: + await client.authenticate("username", "password") +``` + +### Self-Signed Certificates + +For self-signed certificates, you have two options: + +#### Option 1: Disable verification (testing only) + +**Warning:** This is insecure and should only be used for testing! + +```python +async with NanoKVMClient( + "https://kvm.local/api/", + verify_ssl=False, + use_password_obfuscation=False +) as client: + await client.authenticate("username", "password") +``` + +#### Option 2: Use custom CA certificate (recommended) + +```python +async with NanoKVMClient( + "https://kvm.local/api/", + ssl_ca_cert="/path/to/ca.pem", + use_password_obfuscation=False +) as client: + await client.authenticate("username", "password") +``` diff --git a/nanokvm/client.py b/nanokvm/client.py index 0224252..87d4463 100644 --- a/nanokvm/client.py +++ b/nanokvm/client.py @@ -8,10 +8,12 @@ import io import json import logging +from pathlib import Path +import ssl from typing import Any, TypeVar, overload import aiohttp -from aiohttp import BodyPartReader, ClientResponse, ClientSession, MultipartReader, hdrs +from aiohttp import BodyPartReader, ClientResponse, ClientSession, MultipartReader, TCPConnector, hdrs from PIL import Image from pydantic import BaseModel, ValidationError import yarl @@ -106,6 +108,10 @@ class NanoKVMInvalidResponseError(NanoKVMError): """Exception for unexpected or unparsable responses.""" +class NanoKVMSSLError(NanoKVMError): + """Exception for SSL/TLS configuration errors.""" + + class NanoKVMClient: """Async API client for the NanoKVM.""" @@ -115,20 +121,74 @@ def __init__( *, token: str | None = None, request_timeout: int = 10, + verify_ssl: bool = True, + ssl_ca_cert: str | None = None, + use_password_obfuscation: bool = True, ) -> None: """ Initialize the NanoKVM client. Args: - url: Base URL of the NanoKVM API (e.g., "http://192.168.1.1/api/") + url: Base URL of the NanoKVM API (e.g., "https://kvm.local/api/") token: Optional pre-existing authentication token request_timeout: Request timeout in seconds (default: 10) + verify_ssl: Enable SSL certificate verification (default: True). + Set to False to disable verification for self-signed certificates. + ssl_ca_cert: Path to custom CA certificate bundle file for SSL verification. + Useful for self-signed certificates or private CAs. + use_password_obfuscation: Use password obfuscation/encryption (default: True). + Older NanoKVM versions require obfuscated passwords. + Newer versions (with HTTPS) accept plain text passwords. + Set to False for newer NanoKVM devices with HTTPS. """ self.url = yarl.URL(url) self._session: ClientSession | None = None self._token = token self._request_timeout = request_timeout self._ws: aiohttp.ClientWebSocketResponse | None = None + self._verify_ssl = verify_ssl + self._ssl_ca_cert = ssl_ca_cert + self._use_password_obfuscation = use_password_obfuscation + + def _create_ssl_context(self) -> ssl.SSLContext | bool: + """ + Create and configure SSL context based on initialization parameters. + + Returns: + ssl.SSLContext: Configured SSL context for custom certificates + True: Use default SSL verification (aiohttp default) + False: Disable SSL verification + + Raises: + NanoKVMSSLError: If SSL configuration is invalid + """ + + if not self._verify_ssl: + _LOGGER.warning( + "SSL verification is disabled. This is insecure and should only be " + "used for testing with self-signed certificates." + ) + return False + + if not self._ssl_ca_cert: + return True + + try: + ca_path = Path(self._ssl_ca_cert) + if not ca_path.is_file(): + raise NanoKVMSSLError( + f"CA certificate not found: {self._ssl_ca_cert}" + ) + + ssl_ctx = ssl.create_default_context(cafile=self._ssl_ca_cert) + _LOGGER.debug("Using custom CA certificate: %s", self._ssl_ca_cert) + + return ssl_ctx + + except ssl.SSLError as err: + raise NanoKVMSSLError(f"Failed to create SSL context: {err}") from err + except OSError as err: + raise NanoKVMSSLError(f"Failed to load SSL certificates: {err}") from err @property def token(self) -> str | None: @@ -137,7 +197,16 @@ def token(self) -> str | None: async def __aenter__(self) -> NanoKVMClient: """Async context manager entry.""" - self._session = ClientSession() + + ssl_config = self._create_ssl_context() + connector = TCPConnector(ssl=ssl_config) + self._session = ClientSession(connector=connector) + + _LOGGER.debug( + "Created client session with SSL verification: %s", + "disabled" if ssl_config is False else "enabled", + ) + return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: @@ -246,6 +315,14 @@ async def _api_request_json( async def authenticate(self, username: str, password: str) -> None: """Authenticate and store the session token.""" _LOGGER.debug("Attempting authentication for user: %s", username) + + if self._use_password_obfuscation: + _LOGGER.debug("Using password obfuscation") + password_to_send = obfuscate_password(password) + else: + _LOGGER.debug("Using plain text password") + password_to_send = password + try: login_response = await self._api_request_json( hdrs.METH_POST, @@ -254,7 +331,7 @@ async def authenticate(self, username: str, password: str) -> None: authenticate=False, data=LoginReq( username=username, - password=obfuscate_password(password), + password=password_to_send, ), ) diff --git a/tests/test_ssl.py b/tests/test_ssl.py new file mode 100644 index 0000000..c2f50c9 --- /dev/null +++ b/tests/test_ssl.py @@ -0,0 +1,175 @@ +"""Tests for SSL/TLS configuration.""" + +from pathlib import Path +from unittest.mock import MagicMock, patch +import ssl + +from aiohttp import TCPConnector +import pytest +import yarl + +from nanokvm.client import NanoKVMClient, NanoKVMSSLError + + +class TestSSLConfiguration: + """Test SSL/TLS configuration scenarios.""" + + async def test_default_ssl_verification_enabled(self) -> None: + """Test that SSL verification is enabled by default.""" + client = NanoKVMClient("https://kvm.local/api/") + + ssl_config = client._create_ssl_context() + assert ssl_config is True + + async def test_ssl_verification_disabled(self) -> None: + """Test SSL verification can be disabled.""" + client = NanoKVMClient("https://kvm.local/api/", verify_ssl=False) + + ssl_config = client._create_ssl_context() + assert ssl_config is False + + async def test_custom_ca_certificate(self, tmp_path: Path) -> None: + """Test custom CA certificate configuration.""" + # Create dummy CA cert file + ca_cert_file = tmp_path / "ca.pem" + ca_cert_file.write_text("DUMMY CA CERT") + + with patch("ssl.create_default_context") as mock_ssl_context: + mock_ctx = MagicMock(spec=ssl.SSLContext) + mock_ssl_context.return_value = mock_ctx + + client = NanoKVMClient( + "https://kvm.local/api/", ssl_ca_cert=str(ca_cert_file) + ) + + ssl_config = client._create_ssl_context() + + # Verify create_default_context was called with cafile + mock_ssl_context.assert_called_once_with(cafile=str(ca_cert_file)) + assert isinstance(ssl_config, MagicMock) + + async def test_nonexistent_ca_cert_raises_error(self) -> None: + """Test that non-existent CA cert file raises error.""" + client = NanoKVMClient( + "https://kvm.local/api/", ssl_ca_cert="/path/that/does/not/exist.pem" + ) + + with pytest.raises(NanoKVMSSLError, match="CA certificate not found"): + client._create_ssl_context() + + async def test_http_url_works_regardless_of_ssl_config(self) -> None: + """Test that HTTP URL (not HTTPS) works regardless of SSL config.""" + client = NanoKVMClient("http://kvm.local/api/", verify_ssl=False) + + async with client: + assert client._session is not None + + async def test_session_created_with_tcp_connector(self) -> None: + """Test that session is created with TCPConnector.""" + client = NanoKVMClient("https://kvm.local/api/") + + async with client: + assert client._session is not None + assert client._session.connector is not None + + +class TestPasswordObfuscation: + """Test password obfuscation modes.""" + + async def test_password_obfuscation_enabled_by_default(self) -> None: + """Test that password obfuscation is enabled by default (backward compatibility).""" + client = NanoKVMClient("https://kvm.local/api/") + assert client._use_password_obfuscation is True + + async def test_password_obfuscation_can_be_disabled(self) -> None: + """Test that password obfuscation can be disabled.""" + client = NanoKVMClient( + "https://kvm.local/api/", use_password_obfuscation=False + ) + assert client._use_password_obfuscation is False + + async def test_authenticate_with_plain_text_password(self) -> None: + """Test authentication with plain text password (newer NanoKVM).""" + from aioresponses import aioresponses + + async with NanoKVMClient( + "https://kvm.local/api/", use_password_obfuscation=False + ) as client: + with aioresponses() as m: + # Mock login endpoint + m.post( + "https://kvm.local/api/auth/login", + payload={"code": 0, "msg": "success", "data": {"token": "abc123"}}, + ) + + await client.authenticate("root", "password123") + + # Verify the request was made with plain text password + calls = m.requests[("POST", yarl.URL("https://kvm.local/api/auth/login"))] + assert len(calls) == 1 + request_json = calls[0].kwargs.get("json") + assert request_json["username"] == "root" + assert request_json["password"] == "password123" # Plain text! + + async def test_authenticate_with_obfuscated_password(self) -> None: + """Test authentication with obfuscated password (older NanoKVM).""" + from aioresponses import aioresponses + + async with NanoKVMClient( + "https://kvm.local/api/", use_password_obfuscation=True + ) as client: + with aioresponses() as m: + # Mock login endpoint + m.post( + "https://kvm.local/api/auth/login", + payload={"code": 0, "msg": "success", "data": {"token": "abc123"}}, + ) + + await client.authenticate("root", "password123") + + # Verify the request was made with obfuscated password + calls = m.requests[("POST", yarl.URL("https://kvm.local/api/auth/login"))] + assert len(calls) == 1 + request_json = calls[0].kwargs.get("json") + assert request_json["username"] == "root" + # Should be obfuscated (starts with "U2FsdGVkX1") + assert request_json["password"].startswith("U2FsdGVkX1") + assert request_json["password"] != "password123" # Not plain text + + +class TestSSLIntegration: + """Integration tests for SSL scenarios.""" + + async def test_full_client_lifecycle_with_ssl(self) -> None: + """Test full client lifecycle with SSL configuration.""" + from aioresponses import aioresponses + + async with NanoKVMClient( + "https://kvm.local/api/", token="test-token", verify_ssl=True + ) as client: + with aioresponses() as m: + m.get( + "https://kvm.local/api/vm/info", + payload={ + "code": 0, + "msg": "success", + "data": { + "ips": [ + { + "name": "eth0", + "addr": "192.168.1.100", + "version": "4", + "type": "ethernet", + } + ], + "mdns": "kvm.local", + "image": "v1.0.0", + "application": "v2.1.0", + "deviceKey": "abc123", + }, + }, + ) + + info = await client.get_info() + assert len(info.ips) == 1 + assert info.ips[0].addr == "192.168.1.100" From d6afc0c29a5ccf00b05128d61b32cc22909776c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Gierwia=C5=82o?= Date: Sun, 15 Feb 2026 16:54:17 +0100 Subject: [PATCH 2/3] Simplify SSL error handling and flatten test structure --- nanokvm/client.py | 28 +---- tests/test_ssl.py | 272 ++++++++++++++++++++++------------------------ 2 files changed, 135 insertions(+), 165 deletions(-) diff --git a/nanokvm/client.py b/nanokvm/client.py index 87d4463..17fcac5 100644 --- a/nanokvm/client.py +++ b/nanokvm/client.py @@ -8,7 +8,6 @@ import io import json import logging -from pathlib import Path import ssl from typing import Any, TypeVar, overload @@ -108,10 +107,6 @@ class NanoKVMInvalidResponseError(NanoKVMError): """Exception for unexpected or unparsable responses.""" -class NanoKVMSSLError(NanoKVMError): - """Exception for SSL/TLS configuration errors.""" - - class NanoKVMClient: """Async API client for the NanoKVM.""" @@ -160,7 +155,8 @@ def _create_ssl_context(self) -> ssl.SSLContext | bool: False: Disable SSL verification Raises: - NanoKVMSSLError: If SSL configuration is invalid + FileNotFoundError: If the CA certificate file is missing. + ssl.SSLError: If the CA certificate is invalid. """ if not self._verify_ssl: @@ -173,22 +169,10 @@ def _create_ssl_context(self) -> ssl.SSLContext | bool: if not self._ssl_ca_cert: return True - try: - ca_path = Path(self._ssl_ca_cert) - if not ca_path.is_file(): - raise NanoKVMSSLError( - f"CA certificate not found: {self._ssl_ca_cert}" - ) - - ssl_ctx = ssl.create_default_context(cafile=self._ssl_ca_cert) - _LOGGER.debug("Using custom CA certificate: %s", self._ssl_ca_cert) - - return ssl_ctx + ssl_ctx = ssl.create_default_context(cafile=self._ssl_ca_cert) + _LOGGER.debug("Using custom CA certificate: %s", self._ssl_ca_cert) - except ssl.SSLError as err: - raise NanoKVMSSLError(f"Failed to create SSL context: {err}") from err - except OSError as err: - raise NanoKVMSSLError(f"Failed to load SSL certificates: {err}") from err + return ssl_ctx @property def token(self) -> str | None: @@ -198,7 +182,7 @@ def token(self) -> str | None: async def __aenter__(self) -> NanoKVMClient: """Async context manager entry.""" - ssl_config = self._create_ssl_context() + ssl_config = await asyncio.to_thread(self._create_ssl_context) connector = TCPConnector(ssl=ssl_config) self._session = ClientSession(connector=connector) diff --git a/tests/test_ssl.py b/tests/test_ssl.py index c2f50c9..f7d89e3 100644 --- a/tests/test_ssl.py +++ b/tests/test_ssl.py @@ -1,175 +1,161 @@ -"""Tests for SSL/TLS configuration.""" +"""Tests for SSL/TLS configuration and password obfuscation.""" from pathlib import Path from unittest.mock import MagicMock, patch import ssl -from aiohttp import TCPConnector +from aioresponses import aioresponses import pytest import yarl -from nanokvm.client import NanoKVMClient, NanoKVMSSLError +from nanokvm.client import NanoKVMClient -class TestSSLConfiguration: - """Test SSL/TLS configuration scenarios.""" +async def test_default_ssl_verification_enabled() -> None: + """Test that SSL verification is enabled by default.""" + client = NanoKVMClient("https://kvm.local/api/") - async def test_default_ssl_verification_enabled(self) -> None: - """Test that SSL verification is enabled by default.""" - client = NanoKVMClient("https://kvm.local/api/") + ssl_config = client._create_ssl_context() + assert ssl_config is True - ssl_config = client._create_ssl_context() - assert ssl_config is True - async def test_ssl_verification_disabled(self) -> None: - """Test SSL verification can be disabled.""" - client = NanoKVMClient("https://kvm.local/api/", verify_ssl=False) +async def test_ssl_verification_disabled() -> None: + """Test SSL verification can be disabled.""" + client = NanoKVMClient("https://kvm.local/api/", verify_ssl=False) + + ssl_config = client._create_ssl_context() + assert ssl_config is False + + +async def test_custom_ca_certificate(tmp_path: Path) -> None: + """Test custom CA certificate configuration.""" + ca_cert_file = tmp_path / "ca.pem" + ca_cert_file.write_text("DUMMY CA CERT") + + with patch("ssl.create_default_context") as mock_ssl_context: + mock_ctx = MagicMock(spec=ssl.SSLContext) + mock_ssl_context.return_value = mock_ctx + + client = NanoKVMClient( + "https://kvm.local/api/", ssl_ca_cert=str(ca_cert_file) + ) ssl_config = client._create_ssl_context() - assert ssl_config is False - async def test_custom_ca_certificate(self, tmp_path: Path) -> None: - """Test custom CA certificate configuration.""" - # Create dummy CA cert file - ca_cert_file = tmp_path / "ca.pem" - ca_cert_file.write_text("DUMMY CA CERT") + mock_ssl_context.assert_called_once_with(cafile=str(ca_cert_file)) + assert isinstance(ssl_config, MagicMock) - with patch("ssl.create_default_context") as mock_ssl_context: - mock_ctx = MagicMock(spec=ssl.SSLContext) - mock_ssl_context.return_value = mock_ctx - client = NanoKVMClient( - "https://kvm.local/api/", ssl_ca_cert=str(ca_cert_file) - ) +async def test_nonexistent_ca_cert_raises_error() -> None: + """Test that non-existent CA cert file raises FileNotFoundError.""" + client = NanoKVMClient( + "https://kvm.local/api/", ssl_ca_cert="/path/that/does/not/exist.pem" + ) - ssl_config = client._create_ssl_context() + with pytest.raises(FileNotFoundError): + client._create_ssl_context() - # Verify create_default_context was called with cafile - mock_ssl_context.assert_called_once_with(cafile=str(ca_cert_file)) - assert isinstance(ssl_config, MagicMock) - async def test_nonexistent_ca_cert_raises_error(self) -> None: - """Test that non-existent CA cert file raises error.""" - client = NanoKVMClient( - "https://kvm.local/api/", ssl_ca_cert="/path/that/does/not/exist.pem" - ) +async def test_http_url_works_regardless_of_ssl_config() -> None: + """Test that HTTP URL (not HTTPS) works regardless of SSL config.""" + client = NanoKVMClient("http://kvm.local/api/", verify_ssl=False) - with pytest.raises(NanoKVMSSLError, match="CA certificate not found"): - client._create_ssl_context() + async with client: + assert client._session is not None - async def test_http_url_works_regardless_of_ssl_config(self) -> None: - """Test that HTTP URL (not HTTPS) works regardless of SSL config.""" - client = NanoKVMClient("http://kvm.local/api/", verify_ssl=False) - async with client: - assert client._session is not None +async def test_session_created_with_tcp_connector() -> None: + """Test that session is created with TCPConnector.""" + client = NanoKVMClient("https://kvm.local/api/") - async def test_session_created_with_tcp_connector(self) -> None: - """Test that session is created with TCPConnector.""" - client = NanoKVMClient("https://kvm.local/api/") + async with client: + assert client._session is not None + assert client._session.connector is not None - async with client: - assert client._session is not None - assert client._session.connector is not None +async def test_password_obfuscation_enabled_by_default() -> None: + """Test that password obfuscation is enabled by default.""" + client = NanoKVMClient("https://kvm.local/api/") + assert client._use_password_obfuscation is True -class TestPasswordObfuscation: - """Test password obfuscation modes.""" - async def test_password_obfuscation_enabled_by_default(self) -> None: - """Test that password obfuscation is enabled by default (backward compatibility).""" - client = NanoKVMClient("https://kvm.local/api/") - assert client._use_password_obfuscation is True +async def test_password_obfuscation_can_be_disabled() -> None: + """Test that password obfuscation can be disabled.""" + client = NanoKVMClient( + "https://kvm.local/api/", use_password_obfuscation=False + ) + assert client._use_password_obfuscation is False - async def test_password_obfuscation_can_be_disabled(self) -> None: - """Test that password obfuscation can be disabled.""" - client = NanoKVMClient( - "https://kvm.local/api/", use_password_obfuscation=False - ) - assert client._use_password_obfuscation is False - - async def test_authenticate_with_plain_text_password(self) -> None: - """Test authentication with plain text password (newer NanoKVM).""" - from aioresponses import aioresponses - - async with NanoKVMClient( - "https://kvm.local/api/", use_password_obfuscation=False - ) as client: - with aioresponses() as m: - # Mock login endpoint - m.post( - "https://kvm.local/api/auth/login", - payload={"code": 0, "msg": "success", "data": {"token": "abc123"}}, - ) - - await client.authenticate("root", "password123") - - # Verify the request was made with plain text password - calls = m.requests[("POST", yarl.URL("https://kvm.local/api/auth/login"))] - assert len(calls) == 1 - request_json = calls[0].kwargs.get("json") - assert request_json["username"] == "root" - assert request_json["password"] == "password123" # Plain text! - - async def test_authenticate_with_obfuscated_password(self) -> None: - """Test authentication with obfuscated password (older NanoKVM).""" - from aioresponses import aioresponses - - async with NanoKVMClient( - "https://kvm.local/api/", use_password_obfuscation=True - ) as client: - with aioresponses() as m: - # Mock login endpoint - m.post( - "https://kvm.local/api/auth/login", - payload={"code": 0, "msg": "success", "data": {"token": "abc123"}}, - ) - - await client.authenticate("root", "password123") - - # Verify the request was made with obfuscated password - calls = m.requests[("POST", yarl.URL("https://kvm.local/api/auth/login"))] - assert len(calls) == 1 - request_json = calls[0].kwargs.get("json") - assert request_json["username"] == "root" - # Should be obfuscated (starts with "U2FsdGVkX1") - assert request_json["password"].startswith("U2FsdGVkX1") - assert request_json["password"] != "password123" # Not plain text - - -class TestSSLIntegration: - """Integration tests for SSL scenarios.""" - - async def test_full_client_lifecycle_with_ssl(self) -> None: - """Test full client lifecycle with SSL configuration.""" - from aioresponses import aioresponses - - async with NanoKVMClient( - "https://kvm.local/api/", token="test-token", verify_ssl=True - ) as client: - with aioresponses() as m: - m.get( - "https://kvm.local/api/vm/info", - payload={ - "code": 0, - "msg": "success", - "data": { - "ips": [ - { - "name": "eth0", - "addr": "192.168.1.100", - "version": "4", - "type": "ethernet", - } - ], - "mdns": "kvm.local", - "image": "v1.0.0", - "application": "v2.1.0", - "deviceKey": "abc123", - }, + +async def test_authenticate_with_plain_text_password() -> None: + """Test authentication with plain text password (newer NanoKVM).""" + async with NanoKVMClient( + "https://kvm.local/api/", use_password_obfuscation=False + ) as client: + with aioresponses() as m: + m.post( + "https://kvm.local/api/auth/login", + payload={"code": 0, "msg": "success", "data": {"token": "abc123"}}, + ) + + await client.authenticate("root", "password123") + + calls = m.requests[("POST", yarl.URL("https://kvm.local/api/auth/login"))] + assert len(calls) == 1 + request_json = calls[0].kwargs.get("json") + assert request_json["username"] == "root" + assert request_json["password"] == "password123" + + +async def test_authenticate_with_obfuscated_password() -> None: + """Test authentication with obfuscated password (older NanoKVM).""" + async with NanoKVMClient( + "https://kvm.local/api/", use_password_obfuscation=True + ) as client: + with aioresponses() as m: + m.post( + "https://kvm.local/api/auth/login", + payload={"code": 0, "msg": "success", "data": {"token": "abc123"}}, + ) + + await client.authenticate("root", "password123") + + calls = m.requests[("POST", yarl.URL("https://kvm.local/api/auth/login"))] + assert len(calls) == 1 + request_json = calls[0].kwargs.get("json") + assert request_json["username"] == "root" + assert request_json["password"].startswith("U2FsdGVkX1") + assert request_json["password"] != "password123" + + +async def test_full_client_lifecycle_with_ssl() -> None: + """Test full client lifecycle with SSL configuration.""" + async with NanoKVMClient( + "https://kvm.local/api/", token="test-token", verify_ssl=True + ) as client: + with aioresponses() as m: + m.get( + "https://kvm.local/api/vm/info", + payload={ + "code": 0, + "msg": "success", + "data": { + "ips": [ + { + "name": "eth0", + "addr": "192.168.1.100", + "version": "4", + "type": "ethernet", + } + ], + "mdns": "kvm.local", + "image": "v1.0.0", + "application": "v2.1.0", + "deviceKey": "abc123", }, - ) + }, + ) - info = await client.get_info() - assert len(info.ips) == 1 - assert info.ips[0].addr == "192.168.1.100" + info = await client.get_info() + assert len(info.ips) == 1 + assert info.ips[0].addr == "192.168.1.100" From 11a1e4fa79a61bee86bdde1f58068815c42a8f8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Gierwia=C5=82o?= Date: Sun, 15 Feb 2026 17:07:29 +0100 Subject: [PATCH 3/3] Add auto-detection of password obfuscation mode with try/fallback --- README.md | 35 ++++++++-------- nanokvm/client.py | 58 ++++++++++++++++++-------- tests/test_ssl.py | 102 +++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 157 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 733bb58..f8b4efb 100644 --- a/README.md +++ b/README.md @@ -8,11 +8,8 @@ Async Python client for [NanoKVM](https://github.com/sipeed/NanoKVM). from nanokvm.client import NanoKVMClient from nanokvm.models import GpioType, MouseButton -#NanoKVM with HTTPS (recommended) -async with NanoKVMClient( - "https://kvm.local/api/", - use_password_obfuscation=False # Newer versions use plain text -) as client: +# NanoKVM (auto-detects password mode) +async with NanoKVMClient("https://kvm.local/api/") as client: await client.authenticate("username", "password") # Get device information @@ -55,19 +52,30 @@ disk = await ssh.run_command("df -h /") await ssh.disconnect() ``` -### For Legacy NanoKVM (HTTP with password obfuscation) +### Password Obfuscation Modes + +By default, the client **auto-detects** the correct password mode. It tries obfuscated password first, and falls back to plain text if authentication fails. You can also force a specific mode: ```python -# Older NanoKVM versions with HTTP (backward compatibility) +# Auto-detect (default) — recommended +async with NanoKVMClient("https://kvm.local/api/") as client: + await client.authenticate("username", "password") + +# Force plain text (newer NanoKVM with HTTPS) +async with NanoKVMClient( + "https://kvm.local/api/", + use_password_obfuscation=False +) as client: + await client.authenticate("username", "password") + +# Force obfuscation (older NanoKVM with HTTP) async with NanoKVMClient( "http://kvm.local/api/", - use_password_obfuscation=True # Default: True for backward compatibility + use_password_obfuscation=True ) as client: await client.authenticate("username", "password") ``` -**Note:** Password obfuscation is enabled by default for backward compatibility with older NanoKVM versions. For newer HTTPS-enabled devices, set `use_password_obfuscation=False`. - ## HTTPS/SSL Configuration The client supports HTTPS connections with flexible SSL/TLS configuration options. @@ -77,10 +85,7 @@ The client supports HTTPS connections with flexible SSL/TLS configuration option For modern NanoKVM devices with HTTPS and valid certificates: ```python -async with NanoKVMClient( - "https://kvm.local/api/", - use_password_obfuscation=False -) as client: +async with NanoKVMClient("https://kvm.local/api/") as client: await client.authenticate("username", "password") ``` @@ -96,7 +101,6 @@ For self-signed certificates, you have two options: async with NanoKVMClient( "https://kvm.local/api/", verify_ssl=False, - use_password_obfuscation=False ) as client: await client.authenticate("username", "password") ``` @@ -107,7 +111,6 @@ async with NanoKVMClient( async with NanoKVMClient( "https://kvm.local/api/", ssl_ca_cert="/path/to/ca.pem", - use_password_obfuscation=False ) as client: await client.authenticate("username", "password") ``` diff --git a/nanokvm/client.py b/nanokvm/client.py index 17fcac5..2eb7475 100644 --- a/nanokvm/client.py +++ b/nanokvm/client.py @@ -12,7 +12,14 @@ from typing import Any, TypeVar, overload import aiohttp -from aiohttp import BodyPartReader, ClientResponse, ClientSession, MultipartReader, TCPConnector, hdrs +from aiohttp import ( + BodyPartReader, + ClientResponse, + ClientSession, + MultipartReader, + TCPConnector, + hdrs, +) from PIL import Image from pydantic import BaseModel, ValidationError import yarl @@ -118,7 +125,7 @@ def __init__( request_timeout: int = 10, verify_ssl: bool = True, ssl_ca_cert: str | None = None, - use_password_obfuscation: bool = True, + use_password_obfuscation: bool | None = None, ) -> None: """ Initialize the NanoKVM client. @@ -131,10 +138,10 @@ def __init__( Set to False to disable verification for self-signed certificates. ssl_ca_cert: Path to custom CA certificate bundle file for SSL verification. Useful for self-signed certificates or private CAs. - use_password_obfuscation: Use password obfuscation/encryption (default: True). - Older NanoKVM versions require obfuscated passwords. - Newer versions (with HTTPS) accept plain text passwords. - Set to False for newer NanoKVM devices with HTTPS. + use_password_obfuscation: Control password obfuscation mode (default: None). + None = auto-detect (try obfuscated first, fall back to plain text). + True = always use obfuscated passwords (older NanoKVM versions). + False = always use plain text passwords (newer HTTPS-enabled versions). """ self.url = yarl.URL(url) self._session: ClientSession | None = None @@ -296,17 +303,8 @@ async def _api_request_json( return api_response.data - async def authenticate(self, username: str, password: str) -> None: - """Authenticate and store the session token.""" - _LOGGER.debug("Attempting authentication for user: %s", username) - - if self._use_password_obfuscation: - _LOGGER.debug("Using password obfuscation") - password_to_send = obfuscate_password(password) - else: - _LOGGER.debug("Using plain text password") - password_to_send = password - + async def _do_authenticate(self, username: str, password_to_send: str) -> None: + """Perform a single authentication attempt with the given password.""" try: login_response = await self._api_request_json( hdrs.METH_POST, @@ -333,6 +331,32 @@ async def authenticate(self, username: str, password: str) -> None: else: raise + async def authenticate(self, username: str, password: str) -> None: + """Authenticate and store the session token.""" + _LOGGER.debug("Attempting authentication for user: %s", username) + + if self._use_password_obfuscation is True: + _LOGGER.debug("Using password obfuscation (forced)") + await self._do_authenticate(username, obfuscate_password(password)) + elif self._use_password_obfuscation is False: + _LOGGER.debug("Using plain text password (forced)") + await self._do_authenticate(username, password) + else: + # Auto-detect: try obfuscated first, fall back to plain text + _LOGGER.debug("Auto-detecting password mode") + try: + await self._do_authenticate( + username, obfuscate_password(password) + ) + _LOGGER.info("Auto-detected obfuscated password mode") + except NanoKVMAuthenticationFailure: + _LOGGER.debug( + "Obfuscated authentication failed, " + "trying plain text password" + ) + await self._do_authenticate(username, password) + _LOGGER.info("Auto-detected plain text password mode") + async def logout(self) -> None: """Log out and clear the session token.""" if not self._token or self._token == "disabled": diff --git a/tests/test_ssl.py b/tests/test_ssl.py index f7d89e3..ce66808 100644 --- a/tests/test_ssl.py +++ b/tests/test_ssl.py @@ -1,14 +1,14 @@ """Tests for SSL/TLS configuration and password obfuscation.""" from pathlib import Path -from unittest.mock import MagicMock, patch import ssl +from unittest.mock import MagicMock, patch from aioresponses import aioresponses import pytest import yarl -from nanokvm.client import NanoKVMClient +from nanokvm.client import NanoKVMAuthenticationFailure, NanoKVMClient async def test_default_ssl_verification_enabled() -> None: @@ -73,10 +73,10 @@ async def test_session_created_with_tcp_connector() -> None: assert client._session.connector is not None -async def test_password_obfuscation_enabled_by_default() -> None: - """Test that password obfuscation is enabled by default.""" +async def test_password_obfuscation_auto_by_default() -> None: + """Test that password obfuscation defaults to None (auto-detect).""" client = NanoKVMClient("https://kvm.local/api/") - assert client._use_password_obfuscation is True + assert client._use_password_obfuscation is None async def test_password_obfuscation_can_be_disabled() -> None: @@ -128,6 +128,98 @@ async def test_authenticate_with_obfuscated_password() -> None: assert request_json["password"] != "password123" +async def test_auto_detect_obfuscation_succeeds() -> None: + """Test auto-detect succeeds with obfuscated password on first attempt.""" + async with NanoKVMClient("https://kvm.local/api/") as client: + with aioresponses() as m: + m.post( + "https://kvm.local/api/auth/login", + payload={ + "code": 0, + "msg": "success", + "data": {"token": "abc123"}, + }, + ) + + await client.authenticate("root", "password123") + + calls = m.requests[ + ("POST", yarl.URL("https://kvm.local/api/auth/login")) + ] + assert len(calls) == 1 + request_json = calls[0].kwargs.get("json") + assert request_json["password"].startswith("U2FsdGVkX1") + assert client.token == "abc123" + + +async def test_auto_detect_fallback_to_plain_text() -> None: + """Test auto-detect falls back to plain text after obfuscated fails.""" + async with NanoKVMClient("https://kvm.local/api/") as client: + with aioresponses() as m: + # First call: obfuscated fails with code -2 + m.post( + "https://kvm.local/api/auth/login", + payload={ + "code": -2, + "msg": "invalid username or password", + "data": None, + }, + ) + # Second call: plain text succeeds + m.post( + "https://kvm.local/api/auth/login", + payload={ + "code": 0, + "msg": "success", + "data": {"token": "abc123"}, + }, + ) + + await client.authenticate("root", "password123") + + calls = m.requests[ + ("POST", yarl.URL("https://kvm.local/api/auth/login")) + ] + assert len(calls) == 2 + # First attempt: obfuscated + assert calls[0].kwargs.get("json")[ + "password" + ].startswith("U2FsdGVkX1") + # Second attempt: plain text + assert ( + calls[1].kwargs.get("json")["password"] + == "password123" + ) + assert client.token == "abc123" + + +async def test_auto_detect_both_fail() -> None: + """Test auto-detect raises NanoKVMAuthenticationFailure when both fail.""" + async with NanoKVMClient("https://kvm.local/api/") as client: + with aioresponses() as m: + # First call: obfuscated fails + m.post( + "https://kvm.local/api/auth/login", + payload={ + "code": -2, + "msg": "invalid username or password", + "data": None, + }, + ) + # Second call: plain text also fails + m.post( + "https://kvm.local/api/auth/login", + payload={ + "code": -2, + "msg": "invalid username or password", + "data": None, + }, + ) + + with pytest.raises(NanoKVMAuthenticationFailure): + await client.authenticate("root", "wrong_password") + + async def test_full_client_lifecycle_with_ssl() -> None: """Test full client lifecycle with SSL configuration.""" async with NanoKVMClient(