Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install -U flake8 setuptools
pip install -U openapi-core uwsgi simplejson WSocket PyJWT
pip install -U openapi-core uwsgi simplejson WSocket PyJWT pyaes
pip install -U pytest pytest-doctestplus pytest-pylint pytest-mypy requests websocket-client
pip install -U types-simplejson types-requests types-PyYAML
- name: Lint with flake8
Expand Down
5 changes: 5 additions & 0 deletions doc/ChangeLog
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
cookies are rejected
- Domain-separated key derivation for keystream, MAC and permutation
- Users will be logged out after upgrading from a previous version
* AESSession - stronger self-contained session cookie (requires pyaes)
- AES-256-CTR encryption with a fresh random 16-byte nonce per write
(prevents CTR nonce-reuse attacks)
- HMAC-SHA256 authentication with domain-separated keys (Encrypt-then-MAC)
- Inherits from Session; same interface as PoorSession

==== 2.7.0 ====
* Reserved Request.db attribute for usage
Expand Down
1 change: 1 addition & 0 deletions doc/_poorwsgi_api.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
{% set api = api + load_module('headers') %}
{% set api = api + load_module('fieldstorage') %}
{% set api = api + load_module('session') %}
{% set api = api + load_module('aes_session') %}
{% set api = api + load_module('digest') %}
{% set api = api + load_module('results') %}
{% set api = api + load_module('state') %}
Expand Down
33 changes: 33 additions & 0 deletions doc/documentation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,39 @@ invalidates all existing cookies.
cookie.header(response)
return response

AES Session
```````````
``AESSession`` is a stronger alternative to ``PoorSession`` that uses
AES-256-CTR for confidentiality and HMAC-SHA256 for integrity. It inherits
from ``Session`` and lives in a separate module because it requires the
``pyaes`` package:

.. code:: sh

pip install pyaes

Usage is identical to ``PoorSession``:

.. code:: python

from poorwsgi.aes_session import AESSession

app.secret_key = urandom(32)

cookie = AESSession(app.secret_key)
cookie.data['user'] = username
cookie.header(response)

The cookie format is ``base64(nonce + ciphertext).base64(hmac-sha256)``.
A 16-byte random nonce is generated on every ``write()`` call to prevent
CTR nonce reuse. A missing or tampered signature causes ``SessionError``
to be raised in ``load()``.

JSON Web Tokens
```````````````



HTTP Digest Auth
~~~~~~~~~~~~~~~~

Expand Down
155 changes: 155 additions & 0 deletions examples/aes_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""AESSession example.

Demonstrates ``AESSession`` — a self-contained encrypted session cookie.
User data is stored directly in the cookie; no JWT or server-side session
store is needed because ``AESSession`` provides both confidentiality
(AES-256-CTR) and integrity (HMAC-SHA256).

Run::

pip install pyaes
python examples/aes_session.py

Then open http://127.0.0.1:8080 in your browser.
Login with any non-empty username and password ``secret``.
"""
import logging
from functools import wraps
from os import path, urandom
from sys import path as python_path

python_path.insert(
0, path.abspath(path.join(path.dirname(__file__), path.pardir)))

# pylint: disable=wrong-import-position
from poorwsgi import Application, state # noqa
from poorwsgi.response import HTTPException, RedirectResponse, redirect # noqa
from poorwsgi.aes_session import AESSession # noqa

# pylint: disable=unused-argument

logging.getLogger().setLevel("DEBUG")

app = application = Application(__name__) # pylint: disable=invalid-name
app.debug = True

SECRET = urandom(32)
PASSWORD = "secret" # nosec # noqa: S105

SESSION_CONFIG = {
"secure": False,
"same_site": "Lax",
}


# --- helpers -----------------------------------------------------------------

def get_header(title):
"""Returns HTML page header lines."""
return (
"<html>", "<head>",
'<meta http-equiv="content-type" content="text/html; charset=utf-8"/>',
f"<title>{title}</title>", "</head>", "<body>",
f"<h1>{title}</h1>")


def get_footer():
"""Returns HTML page footer lines."""
return ("<hr>",
"<small>AESSession example — PoorWSGI</small>",
"</body>", "</html>")


def check_login(fn):
"""Decorator that reads the encrypted session cookie.

Sets ``req.login`` to the stored username on success, otherwise
redirects to ``/login``.
"""
@wraps(fn)
def handler(req):
session = AESSession(SECRET)
try:
session.load(req.cookies)
except Exception: # pylint: disable=broad-except
redirect("/login", message=b"Invalid session")
username = session.data.get("user")
if not username:
redirect("/login", message=b"Login required")
req.login = username
return fn(req)
return handler


# --- routes ------------------------------------------------------------------

@app.route('/')
def root(_req):
"""Index page with navigation."""
body = ('<ul>',
'<li><a href="/login">/login</a> — log in</li>',
'<li><a href="/private">/private</a> — protected page</li>',
'<li><a href="/logout">/logout</a> — log out</li>',
'</ul>')
for line in get_header("AESSession demo") + body + get_footer():
yield line.encode() + b'\n'


@app.route('/login')
def login_form(_req):
"""GET: show login form."""
form = ('<form method="post">',
'<label>Username: '
'<input type="text" name="username"/></label><br/>',
'<label>Password: <input type="password" name="password"/>'
'</label><br/>',
'<button type="submit">Login</button>', '</form>',
'<p><em>Password is: secret</em></p>')
for line in get_header("Login") + form + get_footer():
yield line.encode() + b'\n'


@app.route('/login', method=state.METHOD_POST)
def login_post(req):
"""POST: validate credentials and store username in encrypted cookie."""
username = req.form.getfirst('username', func=str) or ''
password = req.form.getfirst('password', func=str) or ''

if not username or password != PASSWORD:
redirect('/login')

response = RedirectResponse("/private")
session = AESSession(SECRET, **SESSION_CONFIG)
session.data["user"] = username
session.write()
session.header(response)
raise HTTPException(response)


@app.route('/private')
@check_login
def private(req):
"""A protected page — only accessible after login."""
body = (f'<p>Hello, <strong>{req.login}</strong>!</p>',
'<p>Your session is encrypted with AES-256-CTR + HMAC-SHA256.</p>',
'<p><a href="/logout">Log out</a></p>')
for line in get_header("Private page") + body + get_footer():
yield line.encode() + b'\n'


@app.route('/logout')
def logout(_req):
"""Clears the session cookie."""
response = RedirectResponse("/login")
session = AESSession(SECRET)
session.destroy()
session.header(response)
raise HTTPException(response)


if __name__ == '__main__':
from wsgiref.simple_server import make_server # noqa: E402
httpd = make_server( # pylint: disable=invalid-name
'127.0.0.1', 8080, app)
logging.info("Starting to serve on http://127.0.0.1:8080")
httpd.serve_forever()
6 changes: 5 additions & 1 deletion poorwsgi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
* response: Response classes and functions for creating HTTP responses.
* results: Default result handlers for the connector, such as directory index,
server errors, or debug output handlers.
* session: A self-contained cookie-based session class.
* session: Cookie session classes — ``Session`` (plain cookie wrapper) and
``PoorSession`` (self-contained encrypted and authenticated dictionary
cookie; no external dependencies).
* aes_session: ``AESSession`` — stronger self-contained encrypted session
cookie using AES-256-CTR + HMAC-SHA256 (requires ``pyaes``).
* state: Constants like HTTP status codes and method types.
* wsgi: The Application callable class, which is the main entry point for a
PoorWSGI web application.
Expand Down
153 changes: 153 additions & 0 deletions poorwsgi/aes_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""AES-CTR encrypted self-contained session cookie.

:Classes: AESSession

Requires the ``pyaes`` package::

pip install pyaes
"""
import hmac
from base64 import urlsafe_b64decode, urlsafe_b64encode
from hashlib import sha256, sha3_256
from http.cookies import SimpleCookie
from json import dumps, loads
from logging import getLogger
from os import urandom
from typing import Any, Dict, Optional, Union

from pyaes import ( # type: ignore[import-untyped]
AESModeOfOperationCTR, Counter)

from poorwsgi.session import Session, SessionError

log = getLogger("poorwsgi")

# pylint: disable=too-many-arguments
# pylint: disable=duplicate-code

_NONCE_SIZE = 16 # bytes — full AES block, counter state is 128 bits


class AESSession(Session):
"""Self-contained session cookie encrypted with AES-CTR.

A drop-in replacement for ``PoorSession`` that uses AES-256-CTR for
confidentiality and HMAC-SHA256 for integrity. Requires the ``pyaes``
package.

The cookie format is ``base64(nonce + ciphertext).base64(hmac-sha256)``.

.. code:: python

from poorwsgi.aes_session import AESSession

sess = AESSession(app.secret_key)
sess.data['user'] = username
sess.write()
sess.header(response)
"""

def __init__( # pylint: disable=too-many-positional-arguments
self, secret_key: Union[str, bytes],
expires: int = 0,
max_age: Optional[int] = None,
domain: str = '',
path: str = '/',
secure: bool = False,
same_site: Union[str, bool] = False,
sid: str = 'SESSID'):
"""Constructor.

Arguments:
secret_key
Secret used for AES key derivation and HMAC signing.
expires
Cookie ``Expires`` time in seconds. 0 means no expiration.
max_age
Cookie ``Max-Age`` attribute.
domain
Cookie ``Domain`` attribute.
path
Cookie ``Path`` attribute.
secure
If ``True``, set the ``Secure`` cookie attribute.
same_site
The ``SameSite`` attribute value (``'Strict'``, ``'Lax'``,
``'None'``) or ``False`` to omit it.
sid
Cookie name.
"""
if not secret_key:
raise SessionError("Empty secret_key")
if isinstance(secret_key, str):
secret_key = secret_key.encode('utf-8')

super().__init__(expires=expires, max_age=max_age, domain=domain,
path=path, secure=secure, same_site=same_site,
sid=sid)

root = sha3_256(secret_key).digest()
self.__enc_key = sha3_256(root + b'enc').digest() # AES-256 key
self.__mac_key = sha3_256(root + b'mac').digest() # HMAC-SHA256 key

self.data: Dict[Any, Any] = {}

def load(self, cookies: Any) -> None:
"""Load and decrypt session from request cookies.

Raises ``SessionError`` if the cookie is missing, corrupted, or the
HMAC signature does not match.

Cookie format: ``base64(nonce + ciphertext).base64(hmac-sha256)``.
The 16-byte nonce is prepended to the ciphertext so each cookie uses
a unique CTR counter, preventing nonce-reuse attacks.
"""
if not isinstance(cookies, SimpleCookie) or self._sid not in cookies:
return
raw = cookies[self._sid].value
if not raw:
return

try:
payload, signature = raw.encode('utf-8').split(b'.')
payload = urlsafe_b64decode(payload)
signature = urlsafe_b64decode(signature)

digest = hmac.digest(self.__mac_key, payload, digest=sha256)
if not hmac.compare_digest(digest, signature):
raise RuntimeError("Invalid Signature")

if len(payload) < _NONCE_SIZE:
raise RuntimeError("Bad payload")

nonce = payload[:_NONCE_SIZE]
ciphertext = payload[_NONCE_SIZE:]
counter = Counter(initial_value=int.from_bytes(nonce, 'big'))
aes = AESModeOfOperationCTR(self.__enc_key, counter=counter)
self.data = loads(aes.decrypt(ciphertext).decode('utf-8'))
except Exception as err:
log.info(repr(err))
raise SessionError("Bad session data.") from err

if not isinstance(self.data, dict):
raise SessionError("Cookie data is not dictionary!")

def write(self) -> str:
"""Encrypt session data and store it in the cookie.

A fresh 16-byte random nonce is generated for every write so the CTR
counter is never reused, even when the session data is unchanged.
Format: ``base64(nonce + ciphertext).base64(hmac-sha256(...))``.
"""
nonce = urandom(_NONCE_SIZE)
counter = Counter(initial_value=int.from_bytes(nonce, 'big'))
aes = AESModeOfOperationCTR(self.__enc_key, counter=counter)
ciphertext = aes.encrypt(dumps(self.data))
payload = nonce + ciphertext
digest = hmac.digest(self.__mac_key, payload, digest=sha256)
raw = urlsafe_b64encode(payload) + b'.' + urlsafe_b64encode(digest)

value = raw.decode('utf-8')
self.cookie[self._sid] = value
self._apply_cookie_attrs()
return value
Loading
Loading