diff --git a/.gitignore b/.gitignore
index 50a3a94..83cde51 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,4 @@ build/
tags
__pycache__/
*.pyc
+*.profile
diff --git a/examples/poor_session.py b/examples/poor_session.py
new file mode 100644
index 0000000..bb50311
--- /dev/null
+++ b/examples/poor_session.py
@@ -0,0 +1,150 @@
+"""PoorSession example.
+
+Demonstrates ``PoorSession`` — a self-contained encrypted and authenticated
+session cookie. User data is stored as a dictionary directly in the cookie;
+no server-side session store is needed. No external dependencies are
+required.
+
+Run::
+
+ python examples/poor_session.py
+
+Then open http://127.0.0.1:8080 in your browser.
+Login with any non-empty username and password ``secret``.
+"""
+import logging
+from functools import wraps
+from os import path, urandom
+from sys import path as python_path
+
+python_path.insert(
+ 0, path.abspath(path.join(path.dirname(__file__), path.pardir)))
+
+# pylint: disable=wrong-import-position
+from poorwsgi import Application, state # noqa
+from poorwsgi.response import HTTPException, RedirectResponse, redirect # noqa
+from poorwsgi.session import PoorSession, SessionError # noqa
+
+# pylint: disable=unused-argument
+
+logging.getLogger().setLevel("DEBUG")
+
+app = application = Application(__name__) # pylint: disable=invalid-name
+app.debug = True
+app.secret_key = urandom(32)
+
+PASSWORD = "secret" # nosec # noqa: S105
+
+
+# --- helpers -----------------------------------------------------------------
+
+def get_header(title):
+ """Returns HTML page header lines."""
+ return (
+ "", "
",
+ '',
+ f"{title}", "", "",
+ f"{title}
")
+
+
+def get_footer():
+ """Returns HTML page footer lines."""
+ return ("
",
+ "PoorSession example — PoorWSGI",
+ "", "")
+
+
+def check_login(fn):
+ """Decorator that reads and verifies the PoorSession cookie.
+
+ Sets ``req.login`` to the stored username on success, otherwise
+ redirects to ``/login``.
+ """
+ @wraps(fn)
+ def handler(req):
+ session = PoorSession(app.secret_key)
+ try:
+ session.load(req.cookies)
+ except SessionError:
+ redirect("/login", message=b"Invalid session")
+ username = session.data.get("user")
+ if not username:
+ redirect("/login", message=b"Login required")
+ req.login = username
+ return fn(req)
+ return handler
+
+
+# --- routes ------------------------------------------------------------------
+
+@app.route('/')
+def root(_req):
+ """Index page with navigation."""
+ body = ('')
+ for line in get_header("PoorSession demo") + body + get_footer():
+ yield line.encode() + b'\n'
+
+
+@app.route('/login')
+def login_form(_req):
+ """GET: show login form."""
+ form = ('',
+ 'Password is: secret
')
+ for line in get_header("Login") + form + get_footer():
+ yield line.encode() + b'\n'
+
+
+@app.route('/login', method=state.METHOD_POST)
+def login_post(req):
+ """POST: validate credentials and store username in encrypted cookie."""
+ username = req.form.getfirst('username', func=str) or ''
+ password = req.form.getfirst('password', func=str) or ''
+
+ if not username or password != PASSWORD:
+ redirect('/login')
+
+ response = RedirectResponse("/private")
+ session = PoorSession(app.secret_key, secure=False, same_site="Lax")
+ session.data["user"] = username
+ session.write()
+ session.header(response)
+ raise HTTPException(response)
+
+
+@app.route('/private')
+@check_login
+def private(req):
+ """A protected page — only accessible after login."""
+ body = (f'Hello, {req.login}!
',
+ 'Your session is stored in an encrypted cookie '
+ '(no external dependencies).
',
+ 'Log out
')
+ for line in get_header("Private page") + body + get_footer():
+ yield line.encode() + b'\n'
+
+
+@app.route('/logout')
+def logout(_req):
+ """Clears the session cookie."""
+ response = RedirectResponse("/login")
+ session = PoorSession(app.secret_key)
+ session.destroy()
+ session.header(response)
+ raise HTTPException(response)
+
+
+if __name__ == '__main__':
+ from wsgiref.simple_server import make_server # noqa: E402
+ httpd = make_server( # pylint: disable=invalid-name
+ '127.0.0.1', 8080, app)
+ logging.info("Starting to serve on http://127.0.0.1:8080")
+ httpd.serve_forever()
diff --git a/examples/simple.py b/examples/simple.py
index 37ce555..10853d6 100644
--- a/examples/simple.py
+++ b/examples/simple.py
@@ -9,7 +9,6 @@
import os
from base64 import decodebytes, encodebytes, urlsafe_b64encode
from collections import OrderedDict
-from functools import wraps
from hashlib import md5
from io import BytesIO
from io import FileIO as file
@@ -23,16 +22,15 @@
0, os.path.abspath(os.path.join(EXAMPLES_PATH, os.path.pardir)))
# pylint: disable=import-error, wrong-import-position
-from poorwsgi import Application, redirect, state # noqa
+from poorwsgi import Application, state # noqa
from poorwsgi.fieldstorage import FieldStorageParser # noqa
from poorwsgi.headers import http_to_time, parse_range, time_to_http # noqa
from poorwsgi.response import FileResponse # noqa
from poorwsgi.response import HTTPException # noqa
from poorwsgi.response import (FileObjResponse, GeneratorResponse, # noqa
NoContentResponse, NotModifiedResponse,
- PartialResponse, RedirectResponse, Response)
+ PartialResponse, Response)
from poorwsgi.results import html_escape, not_modified # noqa
-from poorwsgi.session import PoorSession, SessionError # noqa
try:
import uwsgi # type: ignore
@@ -46,7 +44,6 @@
app.debug = True
app.document_root = '.'
app.document_index = True
-app.secret_key = os.urandom(32) # random key each run
class MyValueError(ValueError):
@@ -159,27 +156,6 @@ def get_variables(req):
app.set_filter('email', r'[\w\.\-]+@[\w\.\-]+')
-def check_login(fun):
- """Checks the session cookie."""
-
- @wraps(fun)
- def handler(req):
- session = PoorSession(app.secret_key)
- try:
- session.load(req.cookies)
- except SessionError:
- pass
- if 'login' not in session.data:
- log.info('Login cookie not found.')
- redirect(
- "/",
- message="Login required",
- )
- return fun(req)
-
- return handler
-
-
@app.route('/')
def root(req):
"""Returns the root index page."""
@@ -204,12 +180,10 @@ def root(req):
' - Testing variable args',
'/test/headers - Testing Headers'
'',
- '/login - Create login session',
- '/logout - Destroy login session',
'/test/form'
- ' - Testing http form (only if you have login cookie / session)',
+ ' - Testing http form',
'/test/upload - '
- 'Testing file upload (only if you have login cookie / session)',
+ 'Testing file upload',
'/debug-info'
' - Debug Page (only if poor_Debug is set)',
'/no-page - No Exist Page',
@@ -346,30 +320,7 @@ def test_varargs(req, *args):
return response
-@app.route('/login')
-def login(req):
- """Creates a login session cookie."""
- log.debug("Input cookies: %s", repr(req.cookies))
- cookie = PoorSession(app.secret_key)
- cookie.data['login'] = True
- response = RedirectResponse('/')
- cookie.header(response)
- return response
-
-
-@app.route('/logout')
-def logout(req):
- """Destroys the login session cookie."""
- log.debug("Input cookies: %s", repr(req.cookies))
- cookie = PoorSession(app.secret_key)
- cookie.destroy()
- response = RedirectResponse('/')
- cookie.header(response)
- return response
-
-
@app.route('/test/form', method=state.METHOD_GET_POST)
-@check_login
def test_form(req):
"""A form example."""
# pylint: disable=consider-using-f-string
@@ -448,7 +399,6 @@ def test_form(req):
@app.route('/test/upload', method=state.METHOD_GET_POST)
-@check_login
def test_upload(req):
"""A file upload example."""
var_info = OrderedDict((
diff --git a/poorwsgi/fieldstorage.py b/poorwsgi/fieldstorage.py
index e38bb04..0a4f008 100644
--- a/poorwsgi/fieldstorage.py
+++ b/poorwsgi/fieldstorage.py
@@ -13,7 +13,7 @@
from io import BytesIO, StringIO, TextIOWrapper
from typing import Any, Callable, Optional, Union
-from poorwsgi.headers import parse_header
+from poorwsgi.headers import parse_header, Headers
_RE_STR_BOUNDARY = re.compile("^[ -~]{0,200}[!-~]$")
_RE_BIN_BOUNDARY = re.compile(b"^[ -~]{0,200}[!-~]$")
@@ -425,7 +425,7 @@ class FieldStorageParser:
"""
BUFSIZE = 8*1024 # buffering size for copy to file and storing StringIO
- def __init__(self, input_=None, headers=None, outerboundary=b'',
+ def __init__(self, input_=None, headers=None | Headers, outerboundary=b'',
keep_blank_values=0, strict_parsing=0,
limit=None, encoding='utf-8', errors='replace',
max_num_fields=None, separator='&', file_callback=None):
@@ -477,7 +477,7 @@ def __init__(self, input_=None, headers=None, outerboundary=b'',
allows you to write a file from the request directly to its
destination without temporary files.
"""
- self.headers = headers
+ self.headers = headers or Headers()
self.outerboundary = outerboundary
self.keep_blank_values = keep_blank_values
self.strict_parsing = strict_parsing
@@ -551,7 +551,7 @@ def parse(self) -> FieldStorage:
try:
clen = int(self.headers['content-length'])
except ValueError:
- pass
+ pass # clen will be still -1 if value is not integer
field.length = self.length = clen
if self.limit is None and clen >= 0:
self.limit = clen
diff --git a/poorwsgi/session.py b/poorwsgi/session.py
index 1e274b8..df53f4c 100644
--- a/poorwsgi/session.py
+++ b/poorwsgi/session.py
@@ -3,10 +3,10 @@
:Classes: NoCompress, Session, PoorSession
:Functions: hidden, encrypt, decrypt, get_token, check_token
-:class:`Session` is a plain cookie wrapper suitable for storing a server-side
+``Session`` is a plain cookie wrapper suitable for storing a server-side
session ID or a JWT. No encryption is applied; the value is stored verbatim.
-:class:`PoorSession` is a self-contained encrypted session cookie.
+``PoorSession`` is a self-contained encrypted session cookie.
Cookie format for PoorSession: ``base64(ciphertext).base64(hmac-sha256)``
@@ -158,7 +158,7 @@ class Session:
The cookie is always set with ``HttpOnly=True``. Use ``secure=True``
when serving over HTTPS.
- This class is also the base class for :class:`PoorSession`.
+ This class is also the base class for ``PoorSession``.
.. code:: python
@@ -213,7 +213,7 @@ def __init__(self, expires: int = 0, max_age: Optional[int] = None,
def _apply_cookie_attrs(self):
"""Apply security and configuration attributes to the session cookie.
- Called by :meth:`write` and subclass overrides of :meth:`write`.
+ Called by ``write`` and subclass overrides of ``write``.
Sets ``HttpOnly``, ``Domain``, ``Path``, ``Secure``, ``SameSite``,
``Expires``, and ``Max-Age`` as configured.
"""
@@ -234,7 +234,7 @@ def _apply_cookie_attrs(self):
def load(self, cookies: Optional[SimpleCookie]):
"""Load the session value from the request's cookies.
- Sets :attr:`data` to the raw cookie string, or leaves it as ``""``
+ Sets ``data`` to the raw cookie string, or leaves it as ``""``
if the cookie is absent or empty.
"""
if not isinstance(cookies, SimpleCookie) or self._sid not in cookies:
@@ -242,9 +242,9 @@ def load(self, cookies: Optional[SimpleCookie]):
self.data = cookies[self._sid].value
def write(self) -> str:
- """Store :attr:`data` to the cookie value.
+ """Store ``data`` to the cookie value.
- This method is called automatically by :meth:`header`.
+ This method is called automatically by ``header``.
Returns the raw string written to the cookie.
"""
raw = self.data if isinstance(self.data, str) else str(self.data)
@@ -458,7 +458,7 @@ def load(self, cookies: Optional[SimpleCookie]):
def write(self) -> str:
"""Encrypt and sign the session data, write to cookie.
- This method is called automatically by :meth:`header`.
+ This method is called automatically by ``header``.
"""
payload = self.__cps.compress(
encrypt(hidden(dumps(self.data), self.__secret_hash),
diff --git a/tests/test_session.py b/tests/test_session.py
index a9c44df..dbe2c30 100644
--- a/tests/test_session.py
+++ b/tests/test_session.py
@@ -1,10 +1,9 @@
"""Unit tests for Session and PoorSession classes."""
from os import urandom
-from sys import version_info
from http.cookies import SimpleCookie, Morsel
from typing import Any
-from pytest import fixture, raises, mark
+from pytest import fixture, raises
from poorwsgi.session import Session, PoorSession, SessionError
@@ -122,8 +121,6 @@ def test_load_missing_sid(self):
assert session.data == {}
-@mark.skipif(version_info.minor < 8,
- reason="SameSite is supported from Python 3.8")
class TestSameSite:
"""Tests for the PoorSession same_site option."""
diff --git a/tests_integrity/test_poor_session.py b/tests_integrity/test_poor_session.py
new file mode 100644
index 0000000..d52ffa4
--- /dev/null
+++ b/tests_integrity/test_poor_session.py
@@ -0,0 +1,100 @@
+"""PoorSession example integrity tests."""
+from os import environ
+from os.path import dirname, join, pardir
+
+from pytest import fixture
+from requests import Session
+
+from .support import check_url, start_server
+
+# pylint: disable=missing-function-docstring
+# pylint: disable=no-self-use
+# pylint: disable=redefined-outer-name
+# pylint: disable=duplicate-code
+
+
+@fixture(scope="module")
+def url(request):
+ """The URL (server fixture)."""
+ process = None
+ val = environ.get("TEST_POOR_SESSION_URL", "").rstrip('/')
+ if val:
+ yield val
+ else:
+ process = start_server(
+ request,
+ join(dirname(__file__), pardir, 'examples/poor_session.py'))
+ yield "http://localhost:8080" # server is running
+
+ if process is not None:
+ process.kill()
+ process.wait()
+
+
+@fixture
+def logged_in(url):
+ """Fixture that logs in with valid credentials and returns the session."""
+ session = Session()
+ res = check_url(url + "/login", method="POST", session=session,
+ allow_redirects=False, status_code=302,
+ data={"username": "alice", "password": "secret"})
+ assert "SESSID" in session.cookies
+ cookie = res.headers["Set-Cookie"]
+ assert "; HttpOnly" in cookie
+ return session
+
+
+class TestPoorSession:
+ """Tests for the PoorSession example."""
+
+ def test_root(self, url):
+ check_url(url + "/")
+
+ def test_login_get(self, url):
+ check_url(url + "/login")
+
+ def test_private_without_login_redirects(self, url):
+ check_url(url + "/private", allow_redirects=False, status_code=302)
+
+ def test_login_bad_password(self, url):
+ check_url(url + "/login", method="POST", status_code=200,
+ data={"username": "alice", "password": "wrong"})
+
+ def test_login_empty_username(self, url):
+ check_url(url + "/login", method="POST", status_code=200,
+ data={"username": "", "password": "secret"})
+
+ def test_login_sets_cookie(self, url):
+ session = Session()
+ res = check_url(url + "/login", method="POST", session=session,
+ allow_redirects=False, status_code=302,
+ data={"username": "bob", "password": "secret"})
+ assert "SESSID" in session.cookies
+ cookie = res.headers["Set-Cookie"]
+ assert "; HttpOnly" in cookie
+
+ def test_username_not_in_plaintext_cookie(self, url):
+ """Username must not appear in plaintext in the cookie value."""
+ session = Session()
+ check_url(url + "/login", method="POST", session=session,
+ allow_redirects=False, status_code=302,
+ data={"username": "alice", "password": "secret"})
+ cookie_value = session.cookies.get("SESSID", "")
+ assert "alice" not in cookie_value
+
+ def test_private_after_login(self, url, logged_in):
+ res = check_url(url + "/private", session=logged_in)
+ assert b"alice" in res.content
+
+ def test_logout_clears_cookie(self, url, logged_in):
+ res = check_url(url + "/logout", session=logged_in,
+ allow_redirects=False, status_code=302)
+ assert "SESSID" not in logged_in.cookies
+ cookie = res.headers["Set-Cookie"]
+ assert "; HttpOnly" in cookie
+
+ def test_private_after_logout_redirects(self, url, logged_in):
+ check_url(url + "/logout", session=logged_in, allow_redirects=False,
+ status_code=302)
+ check_url(url + "/private", session=logged_in,
+ allow_redirects=False, status_code=302)
diff --git a/tests_integrity/test_simple.py b/tests_integrity/test_simple.py
index 4e899ef..51c9373 100644
--- a/tests_integrity/test_simple.py
+++ b/tests_integrity/test_simple.py
@@ -3,7 +3,6 @@
from os.path import dirname, join, pardir
from pytest import fixture
-from requests import Session
from .support import check_url, start_server
@@ -11,6 +10,7 @@
# pylint: disable=no-self-use
# pylint: disable=redefined-outer-name
# pylint: disable=consider-using-f-string
+# pylint: disable=duplicate-code
@fixture(scope="module")
@@ -31,18 +31,6 @@ def url(request):
process.wait()
-@fixture
-def session(url):
- """Fixture for creating a session and logging in."""
- session = Session()
- res = check_url(url+"/login", status_code=302, session=session,
- allow_redirects=False)
- assert "SESSID" in session.cookies
- cookie = res.headers["Set-Cookie"]
- assert "; HttpOnly; " in cookie
- return session
-
-
class TestSimple():
"""Tests for routes."""
@@ -131,17 +119,17 @@ def test_file_response_304_last_modified(self, url):
Modified."""
res = check_url(url+"/simple.py")
last_modified = res.headers.get('Last-Modified')
- res = check_url(url+"/simple.py",
- headers={'If-Modified-Since': last_modified},
- status_code=304)
+ check_url(url+"/simple.py",
+ headers={'If-Modified-Since': last_modified},
+ status_code=304)
def test_file_response_304_etag(self, url):
"""Tests FileResponse with ETag header for 304 Not Modified."""
res = check_url(url+"/simple.py")
etag = res.headers.get('ETag')
- res = check_url(url+"/simple.py",
- headers={'If-None-Match': etag},
- status_code=304)
+ check_url(url+"/simple.py",
+ headers={'If-None-Match': etag},
+ status_code=304)
def test_none_no_content(self, url):
"""Tests None response resulting in 204 No Content."""
@@ -198,53 +186,32 @@ def test_unicodes(self, url):
assert len(res.text) == 50
assert len(res.text) < len(res.text.encode("utf-8"))
+ def test_form_get(self, url):
+ """Tests GET form access."""
+ check_url(url+"/test/form")
-class TestSession():
- """Session tests."""
-
- def test_login(self, url):
- """Tests the /login endpoint."""
- check_url(url+"/login", status_code=302, allow_redirects=False)
-
- def test_logout(self, url, session):
- """Tests the /logout endpoint."""
- res = check_url(url+"/logout", session=session,
- allow_redirects=False, status_code=302)
- assert "SESSID" not in session.cookies # cookie is expired
- cookie = res.headers["Set-Cookie"] # header must exists
- assert "; HttpOnly; " in cookie
-
- def test_form_get_not_logged(self, url):
- """Tests GET form access when not logged in."""
- check_url(url+"/test/form", status_code=302, allow_redirects=False)
-
- def test_form_get_logged(self, url, session):
- """Tests GET form access when logged in."""
- check_url(url+"/test/form", session=session, allow_redirects=False)
-
- def test_form_post(self, url, session):
+ def test_form_post(self, url):
"""Tests POST form submission."""
- check_url(url+"/test/form", method="POST", session=session,
- allow_redirects=False)
+ check_url(url+"/test/form", method="POST")
- def test_form_upload(self, url, session):
+ def test_form_upload(self, url):
"""Tests file upload via form."""
with open(__file__, 'rb') as _file:
files = {'file_0': ('testfile.py', _file,
'text/x-python', {'Expires': '0'})}
- res = check_url(url+"/test/upload", method="POST", session=session,
+ res = check_url(url+"/test/upload", method="POST",
allow_redirects=False, files=files)
assert 'testfile.py' in res.text
assert __doc__ in res.text
assert 'anything' in res.text
- def test_form_upload_small(self, url, session):
+ def test_form_upload_small(self, url):
"""Tests small file upload via form."""
manifest = join(dirname(__file__), pardir, 'MANIFEST.in')
with open(manifest, 'rb') as _file:
files = {'file_0': ('MANIFEST.in', _file,
'text/plain', {'Expires': '0'})}
- res = check_url(url+"/test/upload", method="POST", session=session,
+ res = check_url(url+"/test/upload", method="POST",
allow_redirects=False, files=files)
assert 'MANIFEST.in' in res.text
assert 'graft' in res.text