From b8546d2d2b80460973f99e073a1236f3165f3dd6 Mon Sep 17 00:00:00 2001 From: Ondrej Tuma Date: Sat, 29 Jun 2024 11:34:51 +0200 Subject: [PATCH] AES for encryping PoorSession --- .github/workflows/python-package.yml | 2 +- doc/ChangeLog | 5 + doc/_poorwsgi_api.html | 1 + doc/documentation.rst | 33 ++++++ examples/aes_session.py | 155 +++++++++++++++++++++++++++ poorwsgi/__init__.py | 6 +- poorwsgi/aes_session.py | 153 ++++++++++++++++++++++++++ tests/test_aes_session.py | 102 ++++++++++++++++++ tests/test_session.py | 15 +++ tests_integrity/test_aes_session.py | 91 ++++++++++++++++ 10 files changed, 561 insertions(+), 2 deletions(-) create mode 100644 examples/aes_session.py create mode 100644 poorwsgi/aes_session.py create mode 100644 tests/test_aes_session.py create mode 100644 tests_integrity/test_aes_session.py diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index fda7b3d..4d360f5 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -27,7 +27,7 @@ jobs: run: | python -m pip install --upgrade pip pip install -U flake8 setuptools - pip install -U openapi-core uwsgi simplejson WSocket PyJWT + pip install -U openapi-core uwsgi simplejson WSocket PyJWT pyaes pip install -U pytest pytest-doctestplus pytest-pylint pytest-mypy requests websocket-client pip install -U types-simplejson types-requests types-PyYAML - name: Lint with flake8 diff --git a/doc/ChangeLog b/doc/ChangeLog index 85a40d6..fb70312 100644 --- a/doc/ChangeLog +++ b/doc/ChangeLog @@ -15,6 +15,11 @@ cookies are rejected - Domain-separated key derivation for keystream, MAC and permutation - Users will be logged out after upgrading from a previous version + * AESSession - stronger self-contained session cookie (requires pyaes) + - AES-256-CTR encryption with a fresh random 16-byte nonce per write + (prevents CTR nonce-reuse attacks) + - HMAC-SHA256 authentication with domain-separated keys (Encrypt-then-MAC) + - Inherits from Session; same interface as PoorSession ==== 2.7.0 ==== * Reserved Request.db attribute for usage diff --git a/doc/_poorwsgi_api.html b/doc/_poorwsgi_api.html index aa5a0da..3e86fd7 100644 --- a/doc/_poorwsgi_api.html +++ b/doc/_poorwsgi_api.html @@ -7,6 +7,7 @@ {% set api = api + load_module('headers') %} {% set api = api + load_module('fieldstorage') %} {% set api = api + load_module('session') %} +{% set api = api + load_module('aes_session') %} {% set api = api + load_module('digest') %} {% set api = api + load_module('results') %} {% set api = api + load_module('state') %} diff --git a/doc/documentation.rst b/doc/documentation.rst index d455d26..76f9888 100644 --- a/doc/documentation.rst +++ b/doc/documentation.rst @@ -1178,6 +1178,39 @@ invalidates all existing cookies. cookie.header(response) return response +AES Session +``````````` +``AESSession`` is a stronger alternative to ``PoorSession`` that uses +AES-256-CTR for confidentiality and HMAC-SHA256 for integrity. It inherits +from ``Session`` and lives in a separate module because it requires the +``pyaes`` package: + +.. code:: sh + + pip install pyaes + +Usage is identical to ``PoorSession``: + +.. code:: python + + from poorwsgi.aes_session import AESSession + + app.secret_key = urandom(32) + + cookie = AESSession(app.secret_key) + cookie.data['user'] = username + cookie.header(response) + +The cookie format is ``base64(nonce + ciphertext).base64(hmac-sha256)``. +A 16-byte random nonce is generated on every ``write()`` call to prevent +CTR nonce reuse. A missing or tampered signature causes ``SessionError`` +to be raised in ``load()``. + +JSON Web Tokens +``````````````` + + + HTTP Digest Auth ~~~~~~~~~~~~~~~~ diff --git a/examples/aes_session.py b/examples/aes_session.py new file mode 100644 index 0000000..f69f577 --- /dev/null +++ b/examples/aes_session.py @@ -0,0 +1,155 @@ +"""AESSession example. + +Demonstrates ``AESSession`` — a self-contained encrypted session cookie. +User data is stored directly in the cookie; no JWT or server-side session +store is needed because ``AESSession`` provides both confidentiality +(AES-256-CTR) and integrity (HMAC-SHA256). + +Run:: + + pip install pyaes + python examples/aes_session.py + +Then open http://127.0.0.1:8080 in your browser. +Login with any non-empty username and password ``secret``. +""" +import logging +from functools import wraps +from os import path, urandom +from sys import path as python_path + +python_path.insert( + 0, path.abspath(path.join(path.dirname(__file__), path.pardir))) + +# pylint: disable=wrong-import-position +from poorwsgi import Application, state # noqa +from poorwsgi.response import HTTPException, RedirectResponse, redirect # noqa +from poorwsgi.aes_session import AESSession # noqa + +# pylint: disable=unused-argument + +logging.getLogger().setLevel("DEBUG") + +app = application = Application(__name__) # pylint: disable=invalid-name +app.debug = True + +SECRET = urandom(32) +PASSWORD = "secret" # nosec # noqa: S105 + +SESSION_CONFIG = { + "secure": False, + "same_site": "Lax", +} + + +# --- helpers ----------------------------------------------------------------- + +def get_header(title): + """Returns HTML page header lines.""" + return ( + "", "", + '', + f"{title}", "", "", + f"

{title}

") + + +def get_footer(): + """Returns HTML page footer lines.""" + return ("
", + "AESSession example — PoorWSGI", + "", "") + + +def check_login(fn): + """Decorator that reads the encrypted session cookie. + + Sets ``req.login`` to the stored username on success, otherwise + redirects to ``/login``. + """ + @wraps(fn) + def handler(req): + session = AESSession(SECRET) + try: + session.load(req.cookies) + except Exception: # pylint: disable=broad-except + redirect("/login", message=b"Invalid session") + username = session.data.get("user") + if not username: + redirect("/login", message=b"Login required") + req.login = username + return fn(req) + return handler + + +# --- routes ------------------------------------------------------------------ + +@app.route('/') +def root(_req): + """Index page with navigation.""" + body = ('') + for line in get_header("AESSession demo") + body + get_footer(): + yield line.encode() + b'\n' + + +@app.route('/login') +def login_form(_req): + """GET: show login form.""" + form = ('
', + '
', + '
', + '', '
', + '

Password is: secret

') + for line in get_header("Login") + form + get_footer(): + yield line.encode() + b'\n' + + +@app.route('/login', method=state.METHOD_POST) +def login_post(req): + """POST: validate credentials and store username in encrypted cookie.""" + username = req.form.getfirst('username', func=str) or '' + password = req.form.getfirst('password', func=str) or '' + + if not username or password != PASSWORD: + redirect('/login') + + response = RedirectResponse("/private") + session = AESSession(SECRET, **SESSION_CONFIG) + session.data["user"] = username + session.write() + session.header(response) + raise HTTPException(response) + + +@app.route('/private') +@check_login +def private(req): + """A protected page — only accessible after login.""" + body = (f'

Hello, {req.login}!

', + '

Your session is encrypted with AES-256-CTR + HMAC-SHA256.

', + '

Log out

') + for line in get_header("Private page") + body + get_footer(): + yield line.encode() + b'\n' + + +@app.route('/logout') +def logout(_req): + """Clears the session cookie.""" + response = RedirectResponse("/login") + session = AESSession(SECRET) + session.destroy() + session.header(response) + raise HTTPException(response) + + +if __name__ == '__main__': + from wsgiref.simple_server import make_server # noqa: E402 + httpd = make_server( # pylint: disable=invalid-name + '127.0.0.1', 8080, app) + logging.info("Starting to serve on http://127.0.0.1:8080") + httpd.serve_forever() diff --git a/poorwsgi/__init__.py b/poorwsgi/__init__.py index 105b456..45997dc 100644 --- a/poorwsgi/__init__.py +++ b/poorwsgi/__init__.py @@ -9,7 +9,11 @@ * response: Response classes and functions for creating HTTP responses. * results: Default result handlers for the connector, such as directory index, server errors, or debug output handlers. -* session: A self-contained cookie-based session class. +* session: Cookie session classes — ``Session`` (plain cookie wrapper) and + ``PoorSession`` (self-contained encrypted and authenticated dictionary + cookie; no external dependencies). +* aes_session: ``AESSession`` — stronger self-contained encrypted session + cookie using AES-256-CTR + HMAC-SHA256 (requires ``pyaes``). * state: Constants like HTTP status codes and method types. * wsgi: The Application callable class, which is the main entry point for a PoorWSGI web application. diff --git a/poorwsgi/aes_session.py b/poorwsgi/aes_session.py new file mode 100644 index 0000000..411fa74 --- /dev/null +++ b/poorwsgi/aes_session.py @@ -0,0 +1,153 @@ +"""AES-CTR encrypted self-contained session cookie. + +:Classes: AESSession + +Requires the ``pyaes`` package:: + + pip install pyaes +""" +import hmac +from base64 import urlsafe_b64decode, urlsafe_b64encode +from hashlib import sha256, sha3_256 +from http.cookies import SimpleCookie +from json import dumps, loads +from logging import getLogger +from os import urandom +from typing import Any, Dict, Optional, Union + +from pyaes import ( # type: ignore[import-untyped] + AESModeOfOperationCTR, Counter) + +from poorwsgi.session import Session, SessionError + +log = getLogger("poorwsgi") + +# pylint: disable=too-many-arguments +# pylint: disable=duplicate-code + +_NONCE_SIZE = 16 # bytes — full AES block, counter state is 128 bits + + +class AESSession(Session): + """Self-contained session cookie encrypted with AES-CTR. + + A drop-in replacement for ``PoorSession`` that uses AES-256-CTR for + confidentiality and HMAC-SHA256 for integrity. Requires the ``pyaes`` + package. + + The cookie format is ``base64(nonce + ciphertext).base64(hmac-sha256)``. + + .. code:: python + + from poorwsgi.aes_session import AESSession + + sess = AESSession(app.secret_key) + sess.data['user'] = username + sess.write() + sess.header(response) + """ + + def __init__( # pylint: disable=too-many-positional-arguments + self, secret_key: Union[str, bytes], + expires: int = 0, + max_age: Optional[int] = None, + domain: str = '', + path: str = '/', + secure: bool = False, + same_site: Union[str, bool] = False, + sid: str = 'SESSID'): + """Constructor. + + Arguments: + secret_key + Secret used for AES key derivation and HMAC signing. + expires + Cookie ``Expires`` time in seconds. 0 means no expiration. + max_age + Cookie ``Max-Age`` attribute. + domain + Cookie ``Domain`` attribute. + path + Cookie ``Path`` attribute. + secure + If ``True``, set the ``Secure`` cookie attribute. + same_site + The ``SameSite`` attribute value (``'Strict'``, ``'Lax'``, + ``'None'``) or ``False`` to omit it. + sid + Cookie name. + """ + if not secret_key: + raise SessionError("Empty secret_key") + if isinstance(secret_key, str): + secret_key = secret_key.encode('utf-8') + + super().__init__(expires=expires, max_age=max_age, domain=domain, + path=path, secure=secure, same_site=same_site, + sid=sid) + + root = sha3_256(secret_key).digest() + self.__enc_key = sha3_256(root + b'enc').digest() # AES-256 key + self.__mac_key = sha3_256(root + b'mac').digest() # HMAC-SHA256 key + + self.data: Dict[Any, Any] = {} + + def load(self, cookies: Any) -> None: + """Load and decrypt session from request cookies. + + Raises ``SessionError`` if the cookie is missing, corrupted, or the + HMAC signature does not match. + + Cookie format: ``base64(nonce + ciphertext).base64(hmac-sha256)``. + The 16-byte nonce is prepended to the ciphertext so each cookie uses + a unique CTR counter, preventing nonce-reuse attacks. + """ + if not isinstance(cookies, SimpleCookie) or self._sid not in cookies: + return + raw = cookies[self._sid].value + if not raw: + return + + try: + payload, signature = raw.encode('utf-8').split(b'.') + payload = urlsafe_b64decode(payload) + signature = urlsafe_b64decode(signature) + + digest = hmac.digest(self.__mac_key, payload, digest=sha256) + if not hmac.compare_digest(digest, signature): + raise RuntimeError("Invalid Signature") + + if len(payload) < _NONCE_SIZE: + raise RuntimeError("Bad payload") + + nonce = payload[:_NONCE_SIZE] + ciphertext = payload[_NONCE_SIZE:] + counter = Counter(initial_value=int.from_bytes(nonce, 'big')) + aes = AESModeOfOperationCTR(self.__enc_key, counter=counter) + self.data = loads(aes.decrypt(ciphertext).decode('utf-8')) + except Exception as err: + log.info(repr(err)) + raise SessionError("Bad session data.") from err + + if not isinstance(self.data, dict): + raise SessionError("Cookie data is not dictionary!") + + def write(self) -> str: + """Encrypt session data and store it in the cookie. + + A fresh 16-byte random nonce is generated for every write so the CTR + counter is never reused, even when the session data is unchanged. + Format: ``base64(nonce + ciphertext).base64(hmac-sha256(...))``. + """ + nonce = urandom(_NONCE_SIZE) + counter = Counter(initial_value=int.from_bytes(nonce, 'big')) + aes = AESModeOfOperationCTR(self.__enc_key, counter=counter) + ciphertext = aes.encrypt(dumps(self.data)) + payload = nonce + ciphertext + digest = hmac.digest(self.__mac_key, payload, digest=sha256) + raw = urlsafe_b64encode(payload) + b'.' + urlsafe_b64encode(digest) + + value = raw.decode('utf-8') + self.cookie[self._sid] = value + self._apply_cookie_attrs() + return value diff --git a/tests/test_aes_session.py b/tests/test_aes_session.py new file mode 100644 index 0000000..75851df --- /dev/null +++ b/tests/test_aes_session.py @@ -0,0 +1,102 @@ +"""Unit tests for AESSession class.""" +from os import urandom +from http.cookies import SimpleCookie + +from pytest import fixture, raises + +from poorwsgi.aes_session import AESSession +from poorwsgi.session import Session, SessionError + +SECRET_KEY = urandom(32) + +# pylint: disable=redefined-outer-name +# pylint: disable=no-self-use +# pylint: disable=missing-function-docstring +# pylint: disable=too-few-public-methods + + +@fixture +def session_with_data(): + """AESSession instance with data written into cookie.""" + session = AESSession(SECRET_KEY) + session.data['test'] = True + session.write() + return session + + +def test_aes_session_is_subclass_of_session(): + """AESSession must inherit from Session.""" + assert issubclass(AESSession, Session) + assert isinstance(AESSession(SECRET_KEY), Session) + + +class TestErrors: + """Test exceptions.""" + + def test_empty_secret_key(self): + with raises(SessionError): + AESSession(b"") + + def test_empty_string_secret_key(self): + with raises(SessionError): + AESSession("") + + def test_bad_session_data(self): + cookies = SimpleCookie() + cookies["SESSID"] = "notvalidbase64!!!" + session = AESSession(SECRET_KEY) + with raises(SessionError): + session.load(cookies) + + def test_tampered_signature(self): + """Cookie with valid structure but wrong HMAC must be rejected.""" + session = AESSession(SECRET_KEY) + session.data['x'] = 1 + session.write() + raw = session.cookie["SESSID"].value + # flip last char of signature part + parts = raw.rsplit('.', 1) + last = 'A' if parts[1][-1] != 'A' else 'B' + tampered = parts[0] + '.' + parts[1][:-1] + last + cookies = SimpleCookie() + cookies["SESSID"] = tampered + new_session = AESSession(SECRET_KEY) + with raises(SessionError): + new_session.load(cookies) + + +class TestLoadWrite: + """Tests of load and write methods.""" + + def test_write_returns_str(self): + session = AESSession(SECRET_KEY) + result = session.write() + assert isinstance(result, str) + + def test_write_load_roundtrip(self, session_with_data): + new_session = AESSession(SECRET_KEY) + new_session.load(session_with_data.cookie) + assert new_session.data == {'test': True} + + def test_nonce_uniqueness(self): + """Two write() calls for same data must produce different cookies.""" + session = AESSession(SECRET_KEY) + session.data = {'user': 'alice'} + raw1 = session.write() + raw2 = session.write() + assert raw1 != raw2 + + def test_write_load_complex_data(self): + session = AESSession(SECRET_KEY) + session.data = {'user': 'alice', 'role': 'admin', 'count': 42} + session.write() + new_session = AESSession(SECRET_KEY) + new_session.load(session.cookie) + expected = {'user': 'alice', 'role': 'admin', 'count': 42} + assert new_session.data == expected + + def test_wrong_key_raises(self, session_with_data): + """Session encrypted with one key cannot be read with another.""" + new_session = AESSession(urandom(32)) + with raises(SessionError): + new_session.load(session_with_data.cookie) diff --git a/tests/test_session.py b/tests/test_session.py index 59049f4..a9c44df 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -106,6 +106,21 @@ def test_https(self): headers = session.header() assert "; Secure" in headers[0][1] + def test_load_empty_cookie(self): + """Tests that load() with an empty SimpleCookie leaves data as {}.""" + session = PoorSession(SECRET_KEY) + session.load(SimpleCookie()) + assert session.data == {} + + def test_load_missing_sid(self): + """Tests that load() with a cookie that has no matching SID leaves + data as {}.""" + cookies = SimpleCookie() + cookies["OTHER"] = "value" + session = PoorSession(SECRET_KEY) + session.load(cookies) + assert session.data == {} + @mark.skipif(version_info.minor < 8, reason="SameSite is supported from Python 3.8") diff --git a/tests_integrity/test_aes_session.py b/tests_integrity/test_aes_session.py new file mode 100644 index 0000000..a629da7 --- /dev/null +++ b/tests_integrity/test_aes_session.py @@ -0,0 +1,91 @@ +"""AESSession example integrity tests.""" +from os import environ +from os.path import dirname, join, pardir + +from pytest import fixture +from requests import Session + +from .support import check_url, start_server + +# pylint: disable=inconsistent-return-statements +# pylint: disable=missing-function-docstring +# pylint: disable=no-self-use +# pylint: disable=redefined-outer-name + + +@fixture(scope="module") +def url(request): + """The URL (server fixture).""" + val = environ.get("TEST_AES_SESSION_URL", "").strip('/') + if val: + return val + + process = start_server( + request, + join(dirname(__file__), pardir, 'examples/aes_session.py')) + + yield "http://localhost:8080" # server is running + process.kill() + process.wait() + + +@fixture +def logged_in(url): + """Fixture that logs in with valid credentials and returns the session.""" + session = Session() + res = check_url(url + "/login", method="POST", session=session, + allow_redirects=False, status_code=302, + data={"username": "alice", "password": "secret"}) + assert "SESSID" in session.cookies + cookie = res.headers["Set-Cookie"] + assert "; HttpOnly" in cookie + return session + + +class TestAESSession: + """Tests for the AESSession example.""" + + def test_root(self, url): + check_url(url + "/") + + def test_login_get(self, url): + check_url(url + "/login") + + def test_private_without_login_redirects(self, url): + check_url(url + "/private", allow_redirects=False, status_code=302) + + def test_login_bad_password(self, url): + check_url(url + "/login", method="POST", status_code=200, + data={"username": "alice", "password": "wrong"}) + + def test_login_empty_username(self, url): + check_url(url + "/login", method="POST", status_code=200, + data={"username": "", "password": "secret"}) + + def test_login_sets_encrypted_cookie(self, url): + session = Session() + res = check_url(url + "/login", method="POST", session=session, + allow_redirects=False, status_code=302, + data={"username": "bob", "password": "secret"}) + assert "SESSID" in session.cookies + cookie = res.headers["Set-Cookie"] + assert "; HttpOnly" in cookie + # Cookie value must not contain the username in plaintext + assert "bob" not in session.cookies["SESSID"] + + def test_private_after_login(self, url, logged_in): + res = check_url(url + "/private", session=logged_in) + assert b"alice" in res.content + + def test_logout_clears_cookie(self, url, logged_in): + res = check_url(url + "/logout", session=logged_in, + allow_redirects=False, status_code=302) + assert "SESSID" not in logged_in.cookies + cookie = res.headers["Set-Cookie"] + assert "; HttpOnly" in cookie + + def test_private_after_logout_redirects(self, url, logged_in): + check_url(url + "/logout", session=logged_in, + allow_redirects=False, status_code=302) + check_url(url + "/private", session=logged_in, + allow_redirects=False, status_code=302)