diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 9b4145b..fda7b3d 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -27,7 +27,7 @@ jobs: run: | python -m pip install --upgrade pip pip install -U flake8 setuptools - pip install -U openapi-core uwsgi simplejson WSocket + pip install -U openapi-core uwsgi simplejson WSocket PyJWT pip install -U pytest pytest-doctestplus pytest-pylint pytest-mypy requests websocket-client pip install -U types-simplejson types-requests types-PyYAML - name: Lint with flake8 diff --git a/doc/ChangeLog b/doc/ChangeLog index 903d2cb..85a40d6 100644 --- a/doc/ChangeLog +++ b/doc/ChangeLog @@ -6,6 +6,15 @@ * Updated openapi3.py example to openapi-core 0.23+ API (Spec replaced by OpenAPI) * Drop Python 3.9 and 3.10 support, require Python >= 3.11 + * Session base class - plain cookie wrapper for server-side session IDs or + JWTs; PoorSession inherits from Session + * PoorSession cookie encryption - ! existing cookies are invalidated ! + - Self-contained encrypted session cookie using shake_256 XOF keystream + (1024 B) and byte-substitution (no external dependencies) + - HMAC-SHA256 authentication (Encrypt-then-MAC); tampered or forged + cookies are rejected + - Domain-separated key derivation for keystream, MAC and permutation + - Users will be logged out after upgrading from a previous version ==== 2.7.0 ==== * Reserved Request.db attribute for usage diff --git a/doc/documentation.rst b/doc/documentation.rst index 7aae763..d455d26 100644 --- a/doc/documentation.rst +++ b/doc/documentation.rst @@ -1050,11 +1050,77 @@ multiple times. Sessions ~~~~~~~~ -Like mod_python, PoorSession is the session class of PoorWSGI. It's a -self-contained cookie with a data dictionary. Data are sent to the client -in a hidden, bzip2-compressed, base64-encoded format. PoorSession needs a ``secret_key``, -which can be set by the ``poor_SecretKey`` environment variable to the -Application.secret_key property. +PoorWSGI provides a ``Session`` base class and ``PoorSession`` which extends +it. Both share the same interface. + +Session +``````` +``Session`` is a thin wrapper around ``http.cookies.SimpleCookie``. It is +suitable when the cookie value is either a **server-side session ID** (the +server holds the real data) or a **JWT** (which provides its own signature). +No encryption is applied — the value is stored as-is in the cookie. + +.. code:: python + + from poorwsgi.session import Session + + # Store a session-id issued by the server + session = Session(sid="SESSID", secure=True, same_site="Lax") + session.data = generate_session_id() # any string value + session.write() + session.header(response) + + # Read back + session = Session() + session.load(req.cookies) + server_data = server_store[session.data] + +The ``Session`` class accepts the following keyword arguments: +``sid``, ``expires``, ``max_age``, ``domain``, ``path``, ``secure``, +``same_site``. It exposes ``load()``, ``write()``, ``destroy()``, and +``header()`` methods. + +.. note:: + + ``Session`` does **not** encrypt or sign the cookie value. If you store a + predictable token, an attacker can forge it. Use a cryptographically random + session ID or a properly signed JWT as the value. + +PoorSession +``````````` +``PoorSession`` extends ``Session`` and stores data as an **encrypted and +authenticated** dictionary directly in the cookie. PoorSession needs a +``secret_key``, which can be set via the ``poor_SecretKey`` environment +variable or the ``Application.secret_key`` property. + +**Security model** (XOR + substitution variant, no external dependencies): + +* **Integrity** — The cookie is signed with HMAC-SHA256. Any modification + by the client is detected and the cookie is rejected. An attacker cannot + forge a valid cookie without knowing the secret key. + +* **Confidentiality** — The data are protected by a XOR stream cipher with a + 1024-byte keystream (derived via ``shake_256``) combined with a + byte-substitution step. This is a custom construction, **not AES**. + A passive attacker who can collect a large number of cookies from different + users (roughly 512 or more) may be able to reconstruct the keystream via + a known-plaintext attack (JSON data always starts with ``{"``), and + subsequently read the contents of other cookies. **Do not store highly + sensitive data** (passwords, private keys, …) in the cookie. Store only + session identifiers, user IDs, or non-critical flags. + +* **Upgrade notice** — Updating PoorWSGI to a version that changes the + encryption scheme (e.g. changes ``KEYSTREAM_SIZE``, switches from SHA-3 + to shake, or adds HMAC) will **invalidate all existing cookies**. Users + will be logged out after a server restart / upgrade. This is expected + behaviour. + +* **Cookie format**: ``base64(ciphertext).base64(hmac-sha256)`` + +The ``KEYSTREAM_SIZE`` constant in ``poorwsgi.session`` controls the keystream +length (default ``1024``). Increasing it makes known-plaintext attacks harder +at the cost of slightly larger memory usage per session instance. Changing it +invalidates all existing cookies. .. code:: python @@ -1074,7 +1140,7 @@ Application.secret_key property. @wraps(fn) # using wraps make right/better /debug-info page def handler(req): cookie = PoorSession(app.secret_key) - cookie.load() + cookie.load(req.cookies) if "passwd" not in cookie.data: # expires or didn't set log.info("Login cookie not found.") redirect("/login", message=b"Login required") diff --git a/examples/session.py b/examples/session.py new file mode 100644 index 0000000..73069cf --- /dev/null +++ b/examples/session.py @@ -0,0 +1,161 @@ +"""Session + JWT example. + +Demonstrates ``Session`` (plain cookie) combined with PyJWT for stateless +authentication. The server issues a signed JWT on login and stores it in a +``Session`` cookie. On every protected request the JWT is read from the +cookie and verified — no server-side state is required. + +Run:: + + pip install PyJWT + python examples/session.py + +Then open http://127.0.0.1:8080 in your browser. +Login with any non-empty username and password ``secret``. +""" +import logging +from functools import wraps +from os import path, urandom +from sys import path as python_path +from time import time + +import jwt # PyJWT + +python_path.insert( + 0, path.abspath(path.join(path.dirname(__file__), path.pardir))) + +# pylint: disable=wrong-import-position +from poorwsgi import Application, state # noqa +from poorwsgi.response import HTTPException, RedirectResponse, redirect # noqa +from poorwsgi.session import Session # noqa + +# pylint: disable=unused-argument + +logging.getLogger().setLevel("DEBUG") + +app = application = Application(__name__) # pylint: disable=invalid-name +app.debug = True + +# Secret used both for the JWT signature and (optionally) session protection. +SECRET = urandom(32) +PASSWORD = "secret" # nosec # noqa: S105 +JWT_EXPIRY = 3600 # seconds + + +# --- helpers ----------------------------------------------------------------- + +def get_header(title): + """Returns HTML page header lines.""" + return ( + "", "
", + '', + f"Password is: secret
') + for line in get_header("Login") + form + get_footer(): + yield line.encode() + b'\n' + + +@app.route('/login', method=state.METHOD_POST) +def login_post(req): + """POST: validate credentials and issue JWT.""" + username = req.form.getfirst('username', func=str) or '' + password = req.form.getfirst('password', func=str) or '' + + if not username or password != PASSWORD: + redirect('/login') + + token = jwt.encode( + {"sub": username, "exp": int(time()) + JWT_EXPIRY}, + SECRET, algorithm="HS256") + + response = RedirectResponse("/private") + session = Session(secure=False, same_site="Lax") + session.data = token + session.write() + session.header(response) + raise HTTPException(response) + + +@app.route('/private') +@check_login +def private(req): + """A protected page — only accessible after login.""" + user = req.login.get("sub", "?") + exp = req.login.get("exp", 0) + body = (f'Hello, {user}!
', + f'Your token expires at Unix time {exp}.
', + '') + for line in get_header("Private page") + body + get_footer(): + yield line.encode() + b'\n' + + +@app.route('/logout') +def logout(_req): + """Clears the session cookie.""" + response = RedirectResponse("/login") + session = Session() + session.destroy() + session.header(response) + raise HTTPException(response) + + +if __name__ == '__main__': + from wsgiref.simple_server import make_server # noqa: E402 + httpd = make_server( # pylint: disable=invalid-name + '127.0.0.1', 8080, app) + logging.info("Starting to serve on http://127.0.0.1:8080") + httpd.serve_forever() diff --git a/poorwsgi/session.py b/poorwsgi/session.py index 0db26cd..1e274b8 100644 --- a/poorwsgi/session.py +++ b/poorwsgi/session.py @@ -1,59 +1,91 @@ -"""PoorSession self-contained cookie class. +"""Session cookie classes. -:Classes: NoCompress, PoorSession -:Functions: hidden, get_token, check_token +:Classes: NoCompress, Session, PoorSession +:Functions: hidden, encrypt, decrypt, get_token, check_token + +:class:`Session` is a plain cookie wrapper suitable for storing a server-side +session ID or a JWT. No encryption is applied; the value is stored verbatim. + +:class:`PoorSession` is a self-contained encrypted session cookie. + +Cookie format for PoorSession: ``base64(ciphertext).base64(hmac-sha256)`` + +Security note: PoorSession uses a custom XOR + byte-substitution cipher with +HMAC-SHA256 authentication. The keystream is derived deterministically from +the secret key (no per-message nonce), which makes it vulnerable to +known-plaintext attacks given enough collected cookies. It is suitable as a +"no external dependencies" baseline. For stronger confidentiality use the +``cryptography`` package variant. """ -from hashlib import sha512, sha256 -from json import dumps, loads +import bz2 +import hmac from base64 import b64decode, b64encode +from hashlib import sha3_256, shake_256 +from json import dumps, loads from logging import getLogger +from random import Random from time import time -from typing import Union, Dict, Any, Optional - -import bz2 +from typing import Any, Dict, Optional, Union from http.cookies import SimpleCookie from poorwsgi.headers import Headers -from poorwsgi.request import Request from poorwsgi.response import Response +# Length of the XOR keystream derived from the secret key. Longer values +# reduce the risk of known-plaintext reconstruction: an attacker needs roughly +# KEYSTREAM_SIZE / (known bytes per cookie) cookies to reconstruct the stream. +# Must be a positive integer; changing it invalidates all existing cookies. +KEYSTREAM_SIZE = 1024 + log = getLogger("poorwsgi") # pylint: disable=invalid-name # pylint: disable=unsubscriptable-object # pylint: disable=consider-using-f-string -def hidden(text: Union[str, bytes], passwd: Union[str, bytes]) -> bytes: +def hidden(text: Union[str, bytes], secret_hash: bytes) -> bytes: """(En|de)crypts text with a SHA hash of the password via XOR. text Raw data to (en|de)crypt. - passwd - The password. + secret_hash + Binary digest of the secret key. """ - if isinstance(passwd, bytes): - passwd = sha512(passwd).digest() - else: - passwd = sha512(passwd.encode("utf-8")).digest() - passlen = len(passwd) + secret_len = len(secret_hash) # text must be bytes if isinstance(text, str): text = text.encode("utf-8") - if isinstance(text, str): # if text is str - retval = '' - for i, val in enumerate(text): - retval += chr(ord(val) ^ ord(passwd[i % passlen])) - else: # if text is bytes - retval = bytearray() - for i, val in enumerate(text): - retval.append(val ^ passwd[i % passlen]) + retval = bytearray() + for i, val in enumerate(text): + retval.append(val ^ secret_hash[i % secret_len]) return retval +def encrypt(data: bytes, table: bytearray) -> bytes: + """Encrypt data by replacing bytes value. + + >>> encrypt(b'Hello', bytearray(range(255, -1, -1))) + b'\xb7\x9a\x93\x93\x90' + """ + return bytes(table[byte] for byte in data) + + +def decrypt(data: bytes, table: bytearray) -> bytes: + """Decrypt data by replacing bytes value. + + >>> decrypt(b'\\xb7\\x9a\\x93\\x93\\x90', bytearray(range(255, -1, -1))) + b'Hello' + """ + reverse = {} + for key, val in enumerate(table): + reverse[val] = key + return bytes(reverse[byte] for byte in data) + + def get_token(secret: str, client: str, timeout: Optional[int] = None, expired: int = 0): """Creates a token from a secret and client string. @@ -69,7 +101,7 @@ def get_token(secret: str, client: str, timeout: Optional[int] = None, expired = now + 2 * timeout text = "%s%s%s" % (secret, expired, client) - return sha256(text.encode()).hexdigest() + return sha3_256(text.encode()).hexdigest() def check_token(token: str, secret: str, client: str, @@ -114,7 +146,147 @@ def decompress(data): return data -class PoorSession: +class Session: + """Simple cookie session — stores a single raw string value. + + Suitable for holding a server-side session ID or a JWT. No encryption + or integrity protection is applied; the value is stored in the cookie + verbatim. When storing a JWT, integrity and authenticity are provided by + the JWT itself. When storing a server-side session ID, security comes + from the server-side store. + + The cookie is always set with ``HttpOnly=True``. Use ``secure=True`` + when serving over HTTPS. + + This class is also the base class for :class:`PoorSession`. + + .. code:: python + + session = Session(secure=True) + session.load(req.cookies) + + if not session.data: # no active session + session.data = create_server_session(req) + + resp = Response(...) + session.header(resp) + + """ + + def __init__(self, expires: int = 0, max_age: Optional[int] = None, + domain: str = '', path: str = '/', secure: bool = False, + same_site: Union[str, bool] = False, sid: str = 'SESSID'): + """Constructor. + + Arguments: + expires + Cookie ``Expires`` time in seconds. If it is 0, no expiration + is set. + max_age + Cookie ``Max-Age`` attribute. If both expires and max-age are + set, max_age has precedence. + domain + The cookie ``Host`` to which the cookie will be sent. + path + The cookie ``Path`` that must exist in the requested URL. + secure + If the ``Secure`` cookie attribute will be sent. + same_site + The ``SameSite`` attribute. When set, it can be one of + ``Strict|Lax|None``. By default, the attribute is not + set, which browsers default to ``Lax``. + sid + The cookie key name. + """ + self._sid = sid + self.__expires = expires + self.__max_age = max_age + self.__domain = domain + self.__path = path + self.__secure = secure + self.__same_site = same_site + + self.data: Any = "" + self.cookie: SimpleCookie = SimpleCookie() + self.cookie[sid] = '' + + def _apply_cookie_attrs(self): + """Apply security and configuration attributes to the session cookie. + + Called by :meth:`write` and subclass overrides of :meth:`write`. + Sets ``HttpOnly``, ``Domain``, ``Path``, ``Secure``, ``SameSite``, + ``Expires``, and ``Max-Age`` as configured. + """ + self.cookie[self._sid]['HttpOnly'] = True + if self.__domain: + self.cookie[self._sid]['Domain'] = self.__domain + if self.__path: + self.cookie[self._sid]['path'] = self.__path + if self.__secure: + self.cookie[self._sid]['Secure'] = True + if self.__same_site: + self.cookie[self._sid]['SameSite'] = self.__same_site + if self.__expires: + self.cookie[self._sid]['expires'] = self.__expires + if self.__max_age is not None: + self.cookie[self._sid]['Max-Age'] = self.__max_age + + def load(self, cookies: Optional[SimpleCookie]): + """Load the session value from the request's cookies. + + Sets :attr:`data` to the raw cookie string, or leaves it as ``""`` + if the cookie is absent or empty. + """ + if not isinstance(cookies, SimpleCookie) or self._sid not in cookies: + return + self.data = cookies[self._sid].value + + def write(self) -> str: + """Store :attr:`data` to the cookie value. + + This method is called automatically by :meth:`header`. + Returns the raw string written to the cookie. + """ + raw = self.data if isinstance(self.data, str) else str(self.data) + self.cookie[self._sid] = raw + self._apply_cookie_attrs() + return raw + + def destroy(self): + """Destroy the session by setting the cookie's expires to the past + (-1). + + Ensures that data cannot be changed: + https://stackoverflow.com/a/5285982/8379994 + """ + self.cookie[self._sid]['expires'] = -1 + if self.__max_age is not None: + self.cookie[self._sid]['Max-Age'] = -1 + self.cookie[self._sid]['HttpOnly'] = True + if self.__secure: + self.cookie[self._sid]['Secure'] = True + + def header(self, headers: Optional[Union[Headers, Response]] = None): + """Generate cookie headers and optionally append them to headers. + + Returns a list of ``(name, value)`` cookie header pairs. + + headers + The object used to write the header directly. + """ + self.write() + cookies = self.cookie.output().split('\r\n') + retval = [] + for cookie in cookies: + var = cookie[:10] # Set-Cookie + val = cookie[12:] # SID=###; expires=###; Path=/ + retval.append((var, val)) + if headers: + headers.add_header(var, val) + return retval + + +class PoorSession(Session): """A self-contained cookie with session data. You can store or read data from the object via the PoorSession.data @@ -157,13 +329,16 @@ def to_dict(self): obj.from_dict(sess.data['dict']) """ - def __init__(self, secret_key: Union[Request, str, bytes], + def __init__(self, secret_key: Union[str, bytes], expires: int = 0, max_age: Optional[int] = None, domain: str = '', path: str = '/', secure: bool = False, - same_site: bool = False, compress=bz2, sid: str = 'SESSID'): + same_site: Union[str, bool] = False, compress=bz2, + sid: str = 'SESSID'): """Constructor. Arguments: + secret_key + The application secret key used for encryption and signing. expires Cookie ``Expires`` time in seconds. If it is 0, no expiration is set. @@ -195,7 +370,7 @@ def __init__(self, secret_key: Union[Request, str, bytes], 'domain': 'example.net', 'path': '/application', 'secure': True, - 'same_site': True, + 'same_site': 'Strict', 'compress': gzip, 'sid': 'MYSID' } @@ -209,105 +384,88 @@ def __init__(self, secret_key: Union[Request, str, bytes], *Changed in version 2.4.x*: Use app.secret_key in the constructor, and then call the load method. """ + super().__init__(expires=expires, max_age=max_age, domain=domain, + path=path, secure=secure, same_site=same_site, + sid=sid) + + _request = None if not isinstance(secret_key, (str, bytes)): # backwards compatibility log.warning('Do not use request in PoorSession constructor, ' 'see new api and call load method manually.') - if secret_key.secret_key is None: - raise SessionError("poor_SecretKey is not set!") - self.__secret_key = secret_key.secret_key - else: - self.__secret_key = secret_key + _request = secret_key + secret_key = _request.secret_key + + if isinstance(secret_key, str): + secret_key = secret_key.encode('utf-8') + + if not secret_key: + raise SessionError("poor_SecretKey is not set!") + + # XOR keystream — domain-separated from MAC key. + self.__secret_hash = shake_256(b'ks\x00' + secret_key).digest( + KEYSTREAM_SIZE) + # MAC key — independent derivation so changing KEYSTREAM_SIZE does not + # affect it and vice versa. + self.__mac_key = shake_256(b'mac\x00' + secret_key).digest(32) + # Permutation table seeded independently from its own label. + self.__secret_table = bytearray(range(256)) + perm_seed = shake_256(b'perm\x00' + secret_key).digest(32) + Random(perm_seed).shuffle(self.__secret_table) # nosec # noqa: S311 - self.__sid = sid - self.__expires = expires - self.__max_age = max_age - self.__domain = domain - self.__path = path - self.__secure = secure - self.__same_site = same_site self.__cps = compress if compress is not None else NoCompress - - # data is session dictionary to store user data in cookie self.data: Dict[Any, Any] = {} - self.cookie: SimpleCookie = SimpleCookie() - self.cookie[sid] = '' - if not isinstance(secret_key, (str, bytes)): # backwards compatibility - self.load(secret_key.cookies) + if _request is not None: + self.load(_request.cookies) def load(self, cookies: Optional[SimpleCookie]): - """Loads the session from the request's cookie.""" - if not isinstance(cookies, SimpleCookie) or self.__sid not in cookies: + """Load and decrypt the session from the request's cookies.""" + if not isinstance(cookies, SimpleCookie) or self._sid not in cookies: return - raw = cookies[self.__sid].value + raw = cookies[self._sid].value - if raw: - try: - self.data = loads(hidden(self.__cps.decompress - (b64decode(raw.encode())), - self.__secret_key)) - except Exception as err: - log.info(repr(err)) - raise SessionError("Bad session data.") from err + if not raw: + return - if not isinstance(self.data, dict): - raise SessionError("Cookie data is not dictionary!") + try: + if '.' not in raw: + raise ValueError("Missing HMAC separator") + payload_b64, sig_b64 = raw.split('.', 1) - def write(self): - """Stores data to the cookie value. + payload = b64decode(payload_b64.encode(), validate=True) + signature = b64decode(sig_b64.encode(), validate=True) - This method is called automatically in the header method. - """ - raw = b64encode(self.__cps.compress(hidden(dumps(self.data), - self.__secret_key), 9)) - raw = raw if isinstance(raw, str) else raw.decode() - self.cookie[self.__sid] = raw - self.cookie[self.__sid]['HttpOnly'] = True + if len(signature) != 32: + raise ValueError("Invalid signature length") - if self.__domain: - self.cookie[self.__sid]['Domain'] = self.__domain - if self.__path: - self.cookie[self.__sid]['path'] = self.__path - if self.__secure: - self.cookie[self.__sid]['Secure'] = True - if self.__same_site: - self.cookie[self.__sid]['SameSite'] = self.__same_site - if self.__expires: - self.cookie[self.__sid]['expires'] = self.__expires - if self.__max_age is not None: - self.cookie[self.__sid]['Max-Age'] = self.__max_age + expected = hmac.digest(self.__mac_key, payload, 'sha256') + if not hmac.compare_digest(expected, signature): + raise ValueError("Invalid signature") - return raw + self.data = loads( + hidden( + decrypt( + self.__cps.decompress(payload), + self.__secret_table), + self.__secret_hash)) + except Exception as err: + log.info(repr(err)) + raise SessionError("Bad session data.") from err - def destroy(self): - """Destroys the session by setting the cookie's expires value - to the past (-1). + if not isinstance(self.data, dict): + raise SessionError("Cookie data is not dictionary!") - Ensures that data cannot be changed: - https://stackoverflow.com/a/5285982/8379994 - """ - self.cookie[self.__sid]['expires'] = -1 - if self.__max_age is not None: - self.cookie[self.__sid]['Max-Age'] = -1 - self.cookie[self.__sid]['HttpOnly'] = True - if self.__secure: - self.cookie[self.__sid]['Secure'] = True - - def header(self, headers: Optional[Union[Headers, Response]] = None): - """Generates cookie headers and appends them to headers if set. - - Returns a list of cookie header pairs. + def write(self) -> str: + """Encrypt and sign the session data, write to cookie. - headers - The object used to write the header directly. + This method is called automatically by :meth:`header`. """ - self.write() - cookies = self.cookie.output().split('\r\n') - retval = [] - for cookie in cookies: - var = cookie[:10] # Set-Cookie - val = cookie[12:] # SID=###; expires=###; Path=/ - retval.append((var, val)) - if headers: - headers.add_header(var, val) - return retval + payload = self.__cps.compress( + encrypt(hidden(dumps(self.data), self.__secret_hash), + self.__secret_table), + 9) + signature = hmac.digest(self.__mac_key, payload, 'sha256') + raw = b64encode(payload).decode() + '.' + b64encode(signature).decode() + self.cookie[self._sid] = raw + self._apply_cookie_attrs() + return raw diff --git a/tests/test_session.py b/tests/test_session.py index 43df035..59049f4 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -1,4 +1,4 @@ -"""Unit tests for the PoorSession class.""" +"""Unit tests for Session and PoorSession classes.""" from os import urandom from sys import version_info from http.cookies import SimpleCookie, Morsel @@ -6,12 +6,11 @@ from pytest import fixture, raises, mark -from poorwsgi.session import PoorSession, SessionError +from poorwsgi.session import Session, PoorSession, SessionError SECRET_KEY = urandom(32) # pylint: disable=redefined-outer-name -# pylint: disable=no-self-use # pylint: disable=missing-function-docstring # pylint: disable=too-few-public-methods @@ -44,9 +43,11 @@ def req_session(): return request -class TestSession: +class TestPoorSession: """Tests PoorSession configuration options.""" + # pylint: disable=no-self-use + def test_default(self): """Tests the default PoorSession configuration.""" session = PoorSession(SECRET_KEY) @@ -111,6 +112,8 @@ def test_https(self): class TestSameSite: """Tests for the PoorSession same_site option.""" + # pylint: disable=no-self-use + def test_default(self): """Tests the default SameSite behavior of PoorSession.""" session = PoorSession(SECRET_KEY) @@ -139,6 +142,8 @@ def test_strict(self): class TestErrors: """Tests exceptions.""" + # pylint: disable=no-self-use + def test_no_secret_key(self): """Tests PoorSession initialization without a secret key, expecting SessionError.""" @@ -177,6 +182,8 @@ def test_bad_session_compatibility(self, req): class TestLoadWrite: """Tests the load and write methods.""" + # pylint: disable=no-self-use + def test_compatibility_empty(self, req): """Tests compatibility with an empty request in PoorSession constructor.""" @@ -194,3 +201,106 @@ def test_write_load(self, req_session): session = PoorSession(SECRET_KEY) session.load(req_session.cookies) assert session.data == {'test': True} + + def test_tampered_cookie_rejected(self, req_session): + """Tests that a tampered cookie value raises SessionError.""" + # Flip one byte in the payload part (before the '.') + raw = req_session.cookies['SESSID'].value + payload_b64, sig_b64 = raw.split('.') + # Replace last char of payload to corrupt the ciphertext + corrupted = payload_b64[:-1] + ('A' if payload_b64[-1] != 'A' else 'B') + # Rebuild cookie with original signature — HMAC must reject it + # pylint: disable=protected-access + morsel = Morsel() + morsel._key = 'SESSID' + morsel._value = corrupted + '.' + sig_b64 + morsel._coded_value = corrupted + '.' + sig_b64 + req_session.cookies['SESSID'] = morsel + + session = PoorSession(SECRET_KEY) + with raises(SessionError): + session.load(req_session.cookies) + + def test_wrong_key_rejected(self, req_session): + """Tests that a cookie encrypted with a different key raises + SessionError.""" + session = PoorSession(b'different_key_' + SECRET_KEY) + with raises(SessionError): + session.load(req_session.cookies) + + +class TestSession: + """Tests for the plain Session base class.""" + + # pylint: disable=no-self-use + + def test_default(self): + """Tests default Session cookie attributes.""" + session = Session() + headers = session.header() + assert "HttpOnly" in headers[0][1] + assert "Path=/" in headers[0][1] + assert "Expires" not in headers[0][1] + assert "Max-Age" not in headers[0][1] + assert "Domain" not in headers[0][1] + assert "Secure" not in headers[0][1] + + def test_load_write(self): + """Tests that a value written by Session can be read back.""" + session = Session() + session.data = "my-session-id" + session.write() + + session2 = Session() + session2.load(session.cookie) + assert session2.data == "my-session-id" + + def test_empty_cookie(self): + """Tests that Session.load with no matching cookie leaves data as + empty string.""" + session = Session() + session.load(SimpleCookie()) + assert session.data == "" + + def test_destroy(self): + """Tests that destroy sets expires in the past.""" + session = Session() + session.destroy() + headers = session.header() + assert "expires=" in headers[0][1] + + def test_expires(self): + """Tests Session with an expires setting.""" + session = Session(expires=3600) + headers = session.header() + assert "expires=" in headers[0][1] + + def test_max_age(self): + """Tests Session with a max_age setting.""" + session = Session(max_age=3600) + headers = session.header() + assert "Max-Age=3600" in headers[0][1] + + def test_secure(self): + """Tests Session with secure=True.""" + session = Session(secure=True) + headers = session.header() + assert "Secure" in headers[0][1] + + def test_same_site(self): + """Tests Session with same_site='Strict'.""" + session = Session(same_site="Strict") + headers = session.header() + assert "SameSite=Strict" in headers[0][1] + + def test_custom_sid(self): + """Tests Session with a custom cookie name.""" + session = Session(sid="MYSESSID") + session.data = "token-value" + session.write() + assert "MYSESSID" in session.cookie + + def test_is_base_of_poor_session(self): + """Tests that PoorSession is a subclass of Session.""" + assert issubclass(PoorSession, Session) + assert isinstance(PoorSession(SECRET_KEY), Session) diff --git a/tests_integrity/test_session.py b/tests_integrity/test_session.py new file mode 100644 index 0000000..3cc40ff --- /dev/null +++ b/tests_integrity/test_session.py @@ -0,0 +1,89 @@ +"""Session + JWT example integrity tests.""" +from os import environ +from os.path import dirname, join, pardir + +from pytest import fixture +from requests import Session + +from .support import check_url, start_server + +# pylint: disable=inconsistent-return-statements +# pylint: disable=missing-function-docstring +# pylint: disable=no-self-use +# pylint: disable=redefined-outer-name + + +@fixture(scope="module") +def url(request): + """The URL (server fixture).""" + val = environ.get("TEST_SESSION_URL", "").strip('/') + if val: + return val + + process = start_server( + request, + join(dirname(__file__), pardir, 'examples/session.py')) + + yield "http://localhost:8080" # server is running + process.kill() + process.wait() + + +@fixture +def logged_in(url): + """Fixture that logs in with valid credentials and returns the session.""" + session = Session() + res = check_url(url + "/login", method="POST", session=session, + allow_redirects=False, status_code=302, + data={"username": "alice", "password": "secret"}) + assert "SESSID" in session.cookies + cookie = res.headers["Set-Cookie"] + assert "; HttpOnly" in cookie + return session + + +class TestSession: + """Tests for the session + JWT example.""" + + def test_root(self, url): + check_url(url + "/") + + def test_login_get(self, url): + check_url(url + "/login") + + def test_private_without_login_redirects(self, url): + check_url(url + "/private", allow_redirects=False, status_code=302) + + def test_login_bad_password(self, url): + check_url(url + "/login", method="POST", status_code=200, + data={"username": "alice", "password": "wrong"}) + + def test_login_empty_username(self, url): + check_url(url + "/login", method="POST", status_code=200, + data={"username": "", "password": "secret"}) + + def test_login_sets_cookie(self, url): + session = Session() + res = check_url(url + "/login", method="POST", session=session, + allow_redirects=False, status_code=302, + data={"username": "bob", "password": "secret"}) + assert "SESSID" in session.cookies + cookie = res.headers["Set-Cookie"] + assert "; HttpOnly" in cookie + + def test_private_after_login(self, url, logged_in): + res = check_url(url + "/private", session=logged_in) + assert b"alice" in res.content + + def test_logout_clears_cookie(self, url, logged_in): + res = check_url(url + "/logout", session=logged_in, + allow_redirects=False, status_code=302) + assert "SESSID" not in logged_in.cookies + cookie = res.headers["Set-Cookie"] + assert "; HttpOnly" in cookie + + def test_private_after_logout_redirects(self, url, logged_in): + check_url(url + "/logout", session=logged_in, allow_redirects=False, + status_code=302) + check_url(url + "/private", session=logged_in, + allow_redirects=False, status_code=302)