diff --git a/.bumpversion.cfg b/.bumpversion.cfg index c107a2010..e080fe817 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,10 +1,10 @@ [bumpversion] -current_version = 4.1.0 +current_version = 4.3.2 commit = true tag = true tag_name = {new_version} -[bumpversion:file:setup.py] +[bumpversion:file:pyproject.toml] [bumpversion:file:docs/conf.py] diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml new file mode 100644 index 000000000..0d1007cf4 --- /dev/null +++ b/.github/FUNDING.yml @@ -0,0 +1,3 @@ +# These are supported funding model platforms + +github: [mvantellingen] diff --git a/.github/workflows/python-release.yml b/.github/workflows/python-release.yml index 21caa6f46..19457bcec 100644 --- a/.github/workflows/python-release.yml +++ b/.github/workflows/python-release.yml @@ -7,18 +7,22 @@ on: jobs: release: runs-on: ubuntu-latest + environment: release + permissions: + id-token: write steps: - - uses: actions/checkout@v2 - - name: Set up Python 3.7 - uses: actions/setup-python@v1 + - uses: actions/checkout@v4 + + - name: Set up Python 3.12 + uses: actions/setup-python@v5 with: - python-version: 3.7 + python-version: 3.12 + - name: Install build requirements - run: python -m pip install wheel + run: python -m pip install wheel build + - name: Build package - run: python setup.py sdist bdist_wheel + run: python -m build + - name: Publish package - uses: pypa/gh-action-pypi-publish@master - with: - user: __token__ - password: ${{ secrets.pypi_password }} + uses: pypa/gh-action-pypi-publish@release/v1 diff --git a/.github/workflows/python-test.yml b/.github/workflows/python-test.yml index 428e60b5e..beacd799c 100644 --- a/.github/workflows/python-test.yml +++ b/.github/workflows/python-test.yml @@ -1,19 +1,27 @@ name: Python Tests -on: [push] +on: + push: + branches: + - main + pull_request: + types: [opened, synchronize] jobs: format: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - name: Set up Python 3.7 - uses: actions/setup-python@v2 + - uses: actions/checkout@v4 + + - name: Set up Python 3.12 + uses: actions/setup-python@v5 with: - python-version: 3.7 + python-version: 3.12 + - name: Install dependencies run: pip install tox + - name: Validate formatting run: tox -e format @@ -24,7 +32,11 @@ jobs: max-parallel: 4 matrix: platform: [ubuntu-latest, macos-latest, windows-latest] - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"] + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] + # # TODO: Remove Windows exclusion when binary wheel available for lxml + # exclude: + # - { platform: windows-latest, python-version: "3.11" } + steps: - name: Install system dependencies @@ -32,48 +44,66 @@ jobs: run: | sudo apt-get update sudo apt-get install libssl-dev libxmlsec1 libxmlsec1-dev libxmlsec1-openssl libxslt1-dev pkg-config + - name: Install system dependencies if: matrix.platform == 'macos-latest' run: | brew install libxmlsec1 libxslt pkgconfig - - uses: actions/checkout@v2 + + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Install dependencies run: | python -m pip install --upgrade pip pip install tox tox-gh-actions + - name: Test with tox run: tox env: PLATFORM: ${{ matrix.platform }} + - name: Prepare artifacts run: mkdir .coverage-data && mv .coverage.* .coverage-data/ - - uses: actions/upload-artifact@master + + - uses: actions/upload-artifact@v4 with: - name: coverage-data - path: .coverage-data/ + name: coverage-data-${{ matrix.platform }}-${{ matrix.python-version }} + path: .coverage-data/.coverage.* + include-hidden-files: true + retention-days: 1 coverage: runs-on: ubuntu-latest needs: [test] steps: - - uses: actions/checkout@v2 - - uses: actions/download-artifact@master + - uses: actions/checkout@v4 + + - uses: actions/download-artifact@v4 with: - name: coverage-data + pattern: coverage-data-* + merge-multiple: true path: . - - name: Set up Python 3.7 - uses: actions/setup-python@v2 + + - name: Set up Python 3.12 + uses: actions/setup-python@v5 with: - python-version: 3.7 + python-version: 3.12 + - name: Install dependencies run: | python -m pip install --upgrade pip pip install tox + - name: Prepare Coverage report run: tox -e coverage-report + - name: Upload to codecov - uses: codecov/codecov-action@v1.0.6 + if: github.ref == 'refs/heads/main' + uses: codecov/codecov-action@v4 + with: + token: ${{ secrets.CODECOV_TOKEN }} diff --git a/CHANGES b/CHANGES index 85c4065fe..09c85e8c6 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,26 @@ -4.2.0 (2022-111-03) +4.3.2 (2025-09-15) +----------------- + - Support newer versions of httpx (#1447) + +4.3.1 (2024-10-16) +------------------ + - Fix regression in parsing xsd:Date with negative timezone + +4.3.0 (2024-10-13) +------------------ + - Drop support for Python 3.7 and 3.8 and add support for Python 3.12 and 3.13 (#1421, #1408) + - Add workaround to support date values with timezone in combination with + isodate 0.7.2 (#1431) + - Replace deprecated `datetime.datetime.utcnow()` + - Properly close 'file://' resources (#1339) + - Complete migration to pyproject.toml (remove setup.py) + + +4.2.1 (2022-11-10) +------------------- + - Fix error regarding closing session in async transport (#1347) + +4.2.0 (2022-11-03) ------------------- - Drop support for Python 3.6 - Allow embedding CDATA elements in simple types (#1339) @@ -8,6 +30,7 @@ - Fix IndexError when empty body response (#1287) - Add support for context manager on Client (#1166) - Allow Ws Addressing plugin to use a different URL (#1328) + - Accept strings for xsd base64Binary (#1072) 4.1.0 (2021-08-15) diff --git a/README.md b/README.md new file mode 100644 index 000000000..bf516b81d --- /dev/null +++ b/README.md @@ -0,0 +1,59 @@ +# Zeep: Python SOAP client +[![Documentation Status](https://readthedocs.org/projects/python-zeep/badge/?version=latest)](https://readthedocs.org/projects/python-zeep/) +[![Python Tests](https://github.com/mvantellingen/python-zeep/workflows/Python%20Tests/badge.svg)](https://github.com/mvantellingen/python-zeep/actions?query=workflow%3A%22Python+Tests%22) +[![Coverage](https://codecov.io/gh/mvantellingen/python-zeep/graph/badge.svg?token=zwew4hc8ih)](https://codecov.io/gh/mvantellingen/python-zeep) +[![PyPI version](https://img.shields.io/pypi/v/zeep.svg)](https://pypi.python.org/pypi/zeep/) + +A Python SOAP client + +## Highlights: +- Compatible with Python 3.9, 3.10, 3.11, 3.12, 3.13 and PyPy3 +- Built on top of lxml, requests, and httpx +- Support for Soap 1.1, Soap 1.2, and HTTP bindings +- Support for WS-Addressing headers +- Support for WSSE (UserNameToken / x.509 signing) +- Support for asyncio using the httpx module +- Experimental support for XOP messages + +Please see the [documentation](http://docs.python-zeep.org/) for more information. + +## Status + +> [!NOTE] +> I consider this library to be stable. Since no new developments happen around the SOAP specification, it won't be updated that much. Good PRs which fix bugs are always welcome, however. + + +## Installation + +```bash +pip install zeep +``` + +Zeep uses the lxml library for parsing XML. See [lxml installation requirements](https://lxml.de/installation.html). + +## Usage + +```python +from zeep import Client + +client = Client('tests/wsdl_files/example.rst') +client.service.ping() +``` + +To quickly inspect a WSDL file, use: + +```bash +python -m zeep +``` + +Please see the [documentation](http://docs.python-zeep.org) for more information. + + +# Sponsors +[![Kraken Tech](https://camo.githubusercontent.com/ecc2b8426b961f8895e4f42741c006839e4488fbe9ba8e92cfa02d48c7fdb3f1/68747470733a2f2f7374617469632e6f63746f70757363646e2e636f6d2f6b746c2f6b72616b656e2d6c6f676f2d772d67726f75702d7365636f6e646172792d323032322e706e67)](https://github.com/octoenergy) + +# Support + +If you want to report a bug, please first read [the bug reporting guidelines](http://docs.python-zeep.org/en/master/reporting_bugs.html). + +Please only report bugs, not support requests, to the GitHub issue tracker. diff --git a/README.rst b/README.rst deleted file mode 100644 index da27ca3bc..000000000 --- a/README.rst +++ /dev/null @@ -1,76 +0,0 @@ -======================== -Zeep: Python SOAP client -======================== - -A fast and modern Python SOAP client - -Highlights: - * Compatible with Python 3.7, 3.8, 3.9, 3.10, 3.11 and PyPy3 - * Build on top of lxml and requests - * Support for Soap 1.1, Soap 1.2 and HTTP bindings - * Support for WS-Addressing headers - * Support for WSSE (UserNameToken / x.509 signing) - * Support for asyncio using the httpx module - * Experimental support for XOP messages - - -Please see for more information the documentation at -http://docs.python-zeep.org/ - - -.. start-no-pypi - -Status ------- - -.. image:: https://readthedocs.org/projects/python-zeep/badge/?version=latest - :target: https://readthedocs.org/projects/python-zeep/ - -.. image:: https://github.com/mvantellingen/python-zeep/workflows/Python%20Tests/badge.svg - :target: https://github.com/mvantellingen/python-zeep/actions?query=workflow%3A%22Python+Tests%22 - -.. image:: http://codecov.io/github/mvantellingen/python-zeep/coverage.svg?branch=master - :target: http://codecov.io/github/mvantellingen/python-zeep?branch=master - -.. image:: https://img.shields.io/pypi/v/zeep.svg - :target: https://pypi.python.org/pypi/zeep/ - -.. end-no-pypi - -Installation ------------- - -.. code-block:: bash - - pip install zeep - -Note that the latest version to support Python 2.7, 3.3, 3.4 and 3.5 is Zeep 3.4, install via `pip install zeep==3.4.0` - -Zeep uses the lxml library for parsing xml. See https://lxml.de/installation.html for the installation requirements. - -Usage ------ -.. code-block:: python - - from zeep import Client - - client = Client('tests/wsdl_files/example.rst') - client.service.ping() - - -To quickly inspect a WSDL file use:: - - python -m zeep - - -Please see the documentation at http://docs.python-zeep.org for more -information. - - -Support -======= - -If you want to report a bug then please first read -http://docs.python-zeep.org/en/master/reporting_bugs.html - -Please only report bugs and not support requests to the GitHub issue tracker. diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 000000000..c09bc15b4 --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,4 @@ +# Security Policy + +If you discover a security vulnerability, please report it by emailing to michaelvantellingen@gmail.com. +I take all security concerns seriously and will respond promptly to evaluate and address the issue. diff --git a/docs/_templates/sidebar-intro.html b/docs/_templates/sidebar-intro.html index af3fb9eae..db7b6ef88 100644 --- a/docs/_templates/sidebar-intro.html +++ b/docs/_templates/sidebar-intro.html @@ -3,7 +3,7 @@ -

Zeep is a modern SOAP client for Python

+

Zeep is a SOAP client for Python

diff --git a/docs/conf.py b/docs/conf.py index f006c1af0..32933820d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Zeep documentation build configuration file, created by # sphinx-quickstart on Fri Mar 4 16:51:06 2016. @@ -12,9 +11,6 @@ # All configuration values have a default; values that are commented out # serve to show the default. -import sys -import os -import pkg_resources # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the @@ -52,9 +48,9 @@ master_doc = 'index' # General information about the project. -project = u'Zeep' -copyright = u'2016, Michael van Tellingen' -author = u'Michael van Tellingen' +project = 'Zeep' +copyright = '2016, Michael van Tellingen' +author = 'Michael van Tellingen' # The version info for the project you're documenting, acts as replacement for @@ -62,7 +58,7 @@ # built documents. # # The short X.Y version. -version = '4.1.0' +version = '4.3.2' release = version # The language for content autogenerated by Sphinx. Refer to documentation @@ -249,8 +245,8 @@ # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - (master_doc, 'Zeep.tex', u'Zeep Documentation', - u'Michael van Tellingen', 'manual'), + (master_doc, 'Zeep.tex', 'Zeep Documentation', + 'Michael van Tellingen', 'manual'), ] # The name of an image file (relative to this directory) to place at the top of @@ -293,7 +289,7 @@ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (master_doc, 'Zeep', u'Zeep Documentation', + (master_doc, 'Zeep', 'Zeep Documentation', author, 'Zeep', 'One line description of project.', 'Miscellaneous'), ] diff --git a/docs/index.rst b/docs/index.rst index a2a812798..c84e2cc20 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -2,10 +2,10 @@ Zeep: Python SOAP client ======================== -A fast and modern Python SOAP client +A Python SOAP client Highlights: - * Compatible with Python 3.7, 3.8, 3.9, 3.10, 3.11 and PyPy + * Compatible with Python 3.9, 3.10, 3.11, 3.12, 3.13 and PyPy * Build on top of lxml and requests * Support for Soap 1.1, Soap 1.2 and HTTP bindings * Support for WS-Addressing headers @@ -106,7 +106,7 @@ See ``python -mzeep --help`` for more information about this command. .. note:: Zeep follows `semver`_ for versioning, however bugs can always occur. So as always pin the version of zeep you tested with - (e.g. ``zeep==4.1.0``'). + (e.g. ``zeep==4.3.2``'). .. _semver: http://semver.org/ diff --git a/examples/async_client.py b/examples/async_client.py index 3bb962869..7eeebf77e 100644 --- a/examples/async_client.py +++ b/examples/async_client.py @@ -1,10 +1,8 @@ import asyncio -import httpx import time import zeep -from zeep.transports import AsyncTransport def run_async(): diff --git a/examples/code39.py b/examples/code39.py index f1c68b425..e24810dfa 100644 --- a/examples/code39.py +++ b/examples/code39.py @@ -1,4 +1,3 @@ -from __future__ import print_function import zeep client = zeep.Client( diff --git a/examples/eu_vat_service.py b/examples/eu_vat_service.py index e751e3e00..befe5df4b 100644 --- a/examples/eu_vat_service.py +++ b/examples/eu_vat_service.py @@ -1,4 +1,3 @@ -from __future__ import print_function import zeep diff --git a/examples/http_basic_auth.py b/examples/http_basic_auth.py index 0fe84dd48..c4ccd5b56 100644 --- a/examples/http_basic_auth.py +++ b/examples/http_basic_auth.py @@ -1,5 +1,3 @@ -from __future__ import print_function - from requests import Session from requests.auth import HTTPBasicAuth diff --git a/examples/km_to_miles.py b/examples/km_to_miles.py index d8821dfc5..b5c0ddb76 100644 --- a/examples/km_to_miles.py +++ b/examples/km_to_miles.py @@ -1,4 +1,3 @@ -from __future__ import print_function import zeep diff --git a/examples/soap_server.py b/examples/soap_server.py index 7ffdd51dd..4dfb9af06 100644 --- a/examples/soap_server.py +++ b/examples/soap_server.py @@ -21,7 +21,7 @@ class ExampleService(ServiceBase): @rpc(Unicode, _returns=Unicode) def slow_request(ctx, request_id): time.sleep(1) - return u'Request: %s' % request_id + return 'Request: %s' % request_id application = Application( services=[ExampleService], diff --git a/pyproject.toml b/pyproject.toml index 1cc85aba6..aad6ce2ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,64 @@ +[project] +name = "zeep" +version = "4.3.2" +description = "A Python SOAP client" +readme = "README.md" +license = { text = "MIT" } +authors = [ + { name = "Michael van Tellingen", email = "michaelvantellingen@gmail.com" } +] +requires-python = ">=3.8" +classifiers = [ + "Development Status :: 5 - Production/Stable", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", +] +dependencies = [ + "attrs>=17.2.0", + "isodate>=0.5.4", + "lxml>=4.6.0", + "platformdirs>=1.4.0", + "requests>=2.7.0", + "requests-toolbelt>=0.7.1", + "requests-file>=1.5.1", +] + +[project.urls] +Repository = "https://github.com/mvantellingen/python-zeep" +Documentation = "https://docs.python-zeep.org" +Changelog = "https://github.com/mvantellingen/python-zeep/blob/main/CHANGES" + +[project.optional-dependencies] +docs = ["sphinx>=1.4.0"] +test = [ + "coverage[toml]==7.6.2", + "freezegun==1.5.1", + "pretend==1.0.9", + "pytest-cov==5.0.0", + "pytest-httpx", + "pytest-asyncio", + "pytest==8.3.3", + "requests_mock==1.12.1", + # Linting + "isort==5.13.2", + "flake8==7.1.1", + "flake8-blind-except==0.2.1", + "flake8-debugger==4.1.2", + "flake8-imports==0.1.1", +] +async = [ + "httpx>=0.15.0", + "packaging", +] +xmlsec = ["xmlsec>=0.6.1"] +crypto = ["cryptography>=3.0"] + [build-system] requires = ["setuptools>=40.6.0", "wheel"] build-backend = "setuptools.build_meta" @@ -28,5 +89,9 @@ testpaths = [ ] markers = [ # mark a test to allow socket usage - "requests" + "requests", + "network: test case requires network connection", ] + +[tool.flake8] +max-line-length = 99 diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 61d908155..000000000 --- a/setup.cfg +++ /dev/null @@ -1,2 +0,0 @@ -[flake8] -max-line-length = 99 diff --git a/setup.py b/setup.py deleted file mode 100755 index 7984ecc85..000000000 --- a/setup.py +++ /dev/null @@ -1,89 +0,0 @@ -import re - -from setuptools import setup - -install_requires = [ - "attrs>=17.2.0", - "cached-property>=1.3.0; python_version<'3.8'", - "isodate>=0.5.4", - "lxml>=4.6.0", - "platformdirs>=1.4.0", - "requests>=2.7.0", - "requests-toolbelt>=0.7.1", - "requests-file>=1.5.1", - "pytz", -] - -docs_require = [ - "sphinx>=1.4.0", -] - -async_require = ["httpx>=0.15.0"] - -xmlsec_require = [ - "xmlsec>=0.6.1", -] - -tests_require = [ - "coverage[toml]==5.2.1", - "freezegun==0.3.15", - "pretend==1.0.9", - "pytest-cov==2.8.1", - "pytest-httpx", - "pytest-asyncio", - "pytest==6.2.5", - "requests_mock>=0.7.0", - # Linting - "isort==5.3.2", - "flake8==3.8.3", - "flake8-blind-except==0.1.1", - "flake8-debugger==3.2.1", - "flake8-imports==0.1.1", -] - - -with open("README.rst") as fh: - long_description = re.sub( - "^.. start-no-pypi.*^.. end-no-pypi", "", fh.read(), flags=re.M | re.S - ) - -setup( - name="zeep", - version="4.1.0", - description="A modern/fast Python SOAP client based on lxml / requests", - long_description=long_description, - author="Michael van Tellingen", - author_email="michaelvantellingen@gmail.com", - url="https://docs.python-zeep.org", - project_urls={ - "Source": "https://github.com/mvantellingen/python-zeep", - }, - python_requires=">=3.7", - install_requires=install_requires, - tests_require=tests_require, - extras_require={ - "docs": docs_require, - "test": tests_require, - "async": async_require, - "xmlsec": xmlsec_require, - }, - entry_points={}, - package_dir={"": "src"}, - packages=["zeep"], - include_package_data=True, - license="MIT", - classifiers=[ - "Development Status :: 5 - Production/Stable", - "License :: OSI Approved :: MIT License", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Programming Language :: Python :: 3.11", - "Programming Language :: Python :: Implementation :: CPython", - "Programming Language :: Python :: Implementation :: PyPy", - ], - zip_safe=False, -) diff --git a/src/zeep/__init__.py b/src/zeep/__init__.py index c420cb2be..222c5e524 100644 --- a/src/zeep/__init__.py +++ b/src/zeep/__init__.py @@ -4,7 +4,7 @@ from zeep.transports import AsyncTransport, Transport from zeep.xsd.valueobjects import AnyObject -__version__ = "4.1.0" +__version__ = "4.3.2" __all__ = [ "AsyncClient", "AsyncTransport", diff --git a/src/zeep/__main__.py b/src/zeep/__main__.py index 33a8ceb10..d06ee7d65 100644 --- a/src/zeep/__main__.py +++ b/src/zeep/__main__.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, print_function - import argparse import logging import logging.config diff --git a/src/zeep/cache.py b/src/zeep/cache.py index 4b7b90e76..796288e21 100644 --- a/src/zeep/cache.py +++ b/src/zeep/cache.py @@ -4,11 +4,10 @@ import logging import os import threading -import typing from contextlib import contextmanager +from typing import Dict, Tuple, Union import platformdirs -import pytz # The sqlite3 is not available on Google App Engine so we handle the # ImportError here and set the sqlite3 var to None. @@ -68,9 +67,7 @@ class InMemoryCache(Base): """Simple in-memory caching using dict lookup with support for timeouts""" #: global cache, thread-safe by default - _cache = ( - {} - ) # type: typing.Dict[str, typing.Tuple[datetime.datetime, typing.Union[bytes, str]]] + _cache: Dict[str, Tuple[datetime.datetime, Union[bytes, str]]] = {} def __init__(self, timeout=3600): self._timeout = timeout @@ -81,7 +78,7 @@ def add(self, url, content): raise TypeError( "a bytes-like object is required, not {}".format(type(content).__name__) ) - self._cache[url] = (datetime.datetime.utcnow(), content) + self._cache[url] = (datetime.datetime.now(datetime.timezone.utc), content) def get(self, url): try: @@ -130,6 +127,8 @@ def __init__(self, path=None, timeout=3600): @contextmanager def db_connection(self): + assert sqlite3 + with self._lock: connection = sqlite3.connect( self._db_path, detect_types=sqlite3.PARSE_DECLTYPES @@ -146,7 +145,7 @@ def add(self, url, content): cursor.execute("DELETE FROM request WHERE url = ?", (url,)) cursor.execute( "INSERT INTO request (created, url, content) VALUES (?, ?, ?)", - (datetime.datetime.utcnow(), url, data), + (datetime.datetime.now(datetime.timezone.utc), url, data), ) conn.commit() @@ -169,8 +168,8 @@ def _is_expired(value, timeout): if timeout is None: return False - now = datetime.datetime.utcnow().replace(tzinfo=pytz.utc) - max_age = value.replace(tzinfo=pytz.utc) + now = datetime.datetime.now(datetime.timezone.utc) + max_age = value.replace(tzinfo=datetime.timezone.utc) max_age += datetime.timedelta(seconds=timeout) return now > max_age diff --git a/src/zeep/exceptions.py b/src/zeep/exceptions.py index 632b3b088..f59d5d1d3 100644 --- a/src/zeep/exceptions.py +++ b/src/zeep/exceptions.py @@ -95,7 +95,7 @@ class IncompleteOperation(Error): class DTDForbidden(Error): def __init__(self, name, sysid, pubid): - super(DTDForbidden, self).__init__() + super().__init__() self.name = name self.sysid = sysid self.pubid = pubid @@ -107,7 +107,7 @@ def __str__(self): class EntitiesForbidden(Error): def __init__(self, name, content): - super(EntitiesForbidden, self).__init__() + super().__init__() self.name = name self.content = content diff --git a/src/zeep/settings.py b/src/zeep/settings.py index 2a1c329d0..3ee6b294c 100644 --- a/src/zeep/settings.py +++ b/src/zeep/settings.py @@ -76,6 +76,7 @@ def __call__(self, **options): setattr(self._tls, key, value) def __getattribute__(self, key): - if key != "_tls" and hasattr(self._tls, key): - return getattr(self._tls, key) - return super().__getattribute__(key) + _tls = object.__getattribute__(self, "_tls") + if key != "_tls" and hasattr(_tls, key): + return getattr(_tls, key) + return object.__getattribute__(self, key) diff --git a/src/zeep/transports.py b/src/zeep/transports.py index ffde356bd..99e28bb8e 100644 --- a/src/zeep/transports.py +++ b/src/zeep/transports.py @@ -1,6 +1,6 @@ import logging import os -from contextlib import contextmanager +from contextlib import closing, contextmanager from urllib.parse import urlparse import requests @@ -16,6 +16,16 @@ except ImportError: httpx = None +try: + from packaging.version import Version + + if httpx is None or Version(httpx.__version__) < Version("0.26.0"): + HTTPX_PROXY_KWARG_NAME = "proxies" + else: + HTTPX_PROXY_KWARG_NAME = "proxy" +except ImportError: + Version = None + HTTPX_PROXY_KWARG_NAME = None __all__ = ["AsyncTransport", "Transport"] @@ -37,7 +47,7 @@ def __init__(self, cache=None, timeout=300, operation_timeout=None, session=None self.operation_timeout = operation_timeout self.logger = logging.getLogger(__name__) - self.__close_session = not session + self._close_session = not session self.session = session or requests.Session() self.session.mount("file://", FileAdapter()) self.session.headers["User-Agent"] = "Zeep/%s (www.python-zeep.org)" % ( @@ -114,7 +124,6 @@ def load(self, url): scheme = urlparse(url).scheme if scheme in ("http", "https", "file"): - if self.cache: response = self.cache.get(url) if response: @@ -133,8 +142,9 @@ def load(self, url): def _load_remote_data(self, url): self.logger.debug("Loading remote data from: %s", url) response = self.session.get(url, timeout=self.load_timeout) - response.raise_for_status() - return response.content + with closing(response): + response.raise_for_status() + return response.content @contextmanager def settings(self, timeout=None): @@ -156,7 +166,7 @@ def settings(self, timeout=None): self.operation_timeout = old_timeout def __del__(self): - if self.__close_session: + if self._close_session: self.session.close() @@ -178,19 +188,24 @@ def __init__( verify_ssl=True, proxy=None, ): - if httpx is None: - raise RuntimeError("The AsyncTransport is based on the httpx module") + if httpx is None or HTTPX_PROXY_KWARG_NAME is None: + raise RuntimeError( + "To use AsyncTransport, install zeep with the async extras, " + "e.g., `pip install zeep[async]`" + ) + self._close_session = False self.cache = cache + proxy_kwargs = {HTTPX_PROXY_KWARG_NAME: proxy} self.wsdl_client = wsdl_client or httpx.Client( verify=verify_ssl, - proxies=proxy, timeout=timeout, + **proxy_kwargs, ) self.client = client or httpx.AsyncClient( verify=verify_ssl, - proxies=proxy, timeout=operation_timeout, + **proxy_kwargs, ) self.logger = logging.getLogger(__name__) @@ -210,7 +225,7 @@ def _load_remote_data(self, url): try: response.raise_for_status() - except httpx.HTTPStatusError as exc: + except httpx.HTTPStatusError: raise TransportError(status_code=response.status_code) return result diff --git a/src/zeep/utils.py b/src/zeep/utils.py index 2118253eb..c011b127a 100644 --- a/src/zeep/utils.py +++ b/src/zeep/utils.py @@ -1,6 +1,6 @@ -import cgi import inspect import typing +from email.message import Message from lxml import etree @@ -90,5 +90,7 @@ def detect_soap_env(envelope): def get_media_type(value): """Parse a HTTP content-type header and return the media-type""" - main_value, parameters = cgi.parse_header(value) - return main_value.lower() + msg = Message() + msg["content-type"] = value + + return msg.get_content_type() diff --git a/src/zeep/wsdl/__init__.py b/src/zeep/wsdl/__init__.py index 8b7d052c2..a12d735c8 100644 --- a/src/zeep/wsdl/__init__.py +++ b/src/zeep/wsdl/__init__.py @@ -1,18 +1,19 @@ """ - zeep.wsdl - --------- +zeep.wsdl +--------- - The wsdl module is responsible for parsing the WSDL document. This includes - the bindings and messages. +The wsdl module is responsible for parsing the WSDL document. This includes +the bindings and messages. - The structure and naming of the modules and classses closely follows the - WSDL 1.1 specification. +The structure and naming of the modules and classses closely follows the +WSDL 1.1 specification. - The serialization and deserialization of the SOAP/HTTP messages is done - by the zeep.wsdl.messages modules. +The serialization and deserialization of the SOAP/HTTP messages is done +by the zeep.wsdl.messages modules. """ + from zeep.wsdl.wsdl import Document __all__ = ["Document"] diff --git a/src/zeep/wsdl/attachments.py b/src/zeep/wsdl/attachments.py index 3e91df569..44e09900c 100644 --- a/src/zeep/wsdl/attachments.py +++ b/src/zeep/wsdl/attachments.py @@ -5,12 +5,7 @@ """ import base64 -import sys - -if sys.version_info >= (3, 8): - from functools import cached_property -else: - from cached_property import cached_property +from functools import cached_property from requests.structures import CaseInsensitiveDict diff --git a/src/zeep/wsdl/definitions.py b/src/zeep/wsdl/definitions.py index deb33ea99..47b9d2e70 100644 --- a/src/zeep/wsdl/definitions.py +++ b/src/zeep/wsdl/definitions.py @@ -1,20 +1,21 @@ """ - zeep.wsdl.definitions - ~~~~~~~~~~~~~~~~~~~~~ +zeep.wsdl.definitions +~~~~~~~~~~~~~~~~~~~~~ - A WSDL document exists out of a number of definitions. There are 6 major - definitions, these are: +A WSDL document exists out of a number of definitions. There are 6 major +definitions, these are: - - types - - message - - portType - - binding - - port - - service + - types + - message + - portType + - binding + - port + - service - This module defines the definitions which occur within a WSDL document, +This module defines the definitions which occur within a WSDL document, """ + import typing import warnings from collections import OrderedDict, namedtuple diff --git a/src/zeep/wsdl/messages/__init__.py b/src/zeep/wsdl/messages/__init__.py index f77f710cf..0272eb732 100644 --- a/src/zeep/wsdl/messages/__init__.py +++ b/src/zeep/wsdl/messages/__init__.py @@ -1,20 +1,21 @@ """ - zeep.wsdl.messages - ~~~~~~~~~~~~~~~~~~ +zeep.wsdl.messages +~~~~~~~~~~~~~~~~~~ - The messages are responsible for serializing and deserializing +The messages are responsible for serializing and deserializing - .. inheritance-diagram:: - zeep.wsdl.messages.soap.DocumentMessage - zeep.wsdl.messages.soap.RpcMessage - zeep.wsdl.messages.http.UrlEncoded - zeep.wsdl.messages.http.UrlReplacement - zeep.wsdl.messages.mime.MimeContent - zeep.wsdl.messages.mime.MimeXML - zeep.wsdl.messages.mime.MimeMultipart - :parts: 1 +.. inheritance-diagram:: + zeep.wsdl.messages.soap.DocumentMessage + zeep.wsdl.messages.soap.RpcMessage + zeep.wsdl.messages.http.UrlEncoded + zeep.wsdl.messages.http.UrlReplacement + zeep.wsdl.messages.mime.MimeContent + zeep.wsdl.messages.mime.MimeXML + zeep.wsdl.messages.mime.MimeMultipart + :parts: 1 """ + from .http import * # noqa from .mime import * # noqa from .soap import * # noqa diff --git a/src/zeep/wsdl/messages/base.py b/src/zeep/wsdl/messages/base.py index f45c365a5..1fe5a3f96 100644 --- a/src/zeep/wsdl/messages/base.py +++ b/src/zeep/wsdl/messages/base.py @@ -1,8 +1,9 @@ """ - zeep.wsdl.messages.base - ~~~~~~~~~~~~~~~~~~~~~~~ +zeep.wsdl.messages.base +~~~~~~~~~~~~~~~~~~~~~~~ """ + import typing from collections import namedtuple diff --git a/src/zeep/wsdl/messages/http.py b/src/zeep/wsdl/messages/http.py index 38ca00e5c..f2bc7cf14 100644 --- a/src/zeep/wsdl/messages/http.py +++ b/src/zeep/wsdl/messages/http.py @@ -1,8 +1,9 @@ """ - zeep.wsdl.messages.http - ~~~~~~~~~~~~~~~~~~~~~~~ +zeep.wsdl.messages.http +~~~~~~~~~~~~~~~~~~~~~~~ """ + from zeep import xsd from zeep.wsdl.messages.base import ConcreteMessage, SerializedMessage diff --git a/src/zeep/wsdl/messages/mime.py b/src/zeep/wsdl/messages/mime.py index c37faec20..0973dfa15 100644 --- a/src/zeep/wsdl/messages/mime.py +++ b/src/zeep/wsdl/messages/mime.py @@ -1,8 +1,9 @@ """ - zeep.wsdl.messages.mime - ~~~~~~~~~~~~~~~~~~~~~~~ +zeep.wsdl.messages.mime +~~~~~~~~~~~~~~~~~~~~~~~ """ + from urllib.parse import urlencode from lxml import etree diff --git a/src/zeep/wsdl/messages/soap.py b/src/zeep/wsdl/messages/soap.py index 6685a6fa0..cedbe3f0b 100644 --- a/src/zeep/wsdl/messages/soap.py +++ b/src/zeep/wsdl/messages/soap.py @@ -1,8 +1,9 @@ """ - zeep.wsdl.messages.soap - ~~~~~~~~~~~~~~~~~~~~~~~ +zeep.wsdl.messages.soap +~~~~~~~~~~~~~~~~~~~~~~~ """ + import copy import typing from collections import OrderedDict @@ -85,9 +86,11 @@ def serialize(self, *args, **kwargs): # Soap11Binding._set_http_headers(). But let's keep it like this for # now. headers = { - "SOAPAction": '"%s"' % self.operation.soapaction - if self.operation.soapaction - else '""' + "SOAPAction": ( + '"%s"' % self.operation.soapaction + if self.operation.soapaction + else '""' + ) } return SerializedMessage(path=None, headers=headers, content=envelope) @@ -116,6 +119,10 @@ def deserialize(self, envelope): return result result = result.body + if not hasattr( + result, "__len__" + ): # Return body directly if len is allowed (could indicated valid primitive type). + return result if result is None or len(result) == 0: return None elif len(result) > 1: diff --git a/src/zeep/wsdl/parse.py b/src/zeep/wsdl/parse.py index 76e976925..910647968 100644 --- a/src/zeep/wsdl/parse.py +++ b/src/zeep/wsdl/parse.py @@ -1,8 +1,9 @@ """ - zeep.wsdl.parse - ~~~~~~~~~~~~~~~ +zeep.wsdl.parse +~~~~~~~~~~~~~~~ """ + import typing from lxml import etree diff --git a/src/zeep/wsdl/utils.py b/src/zeep/wsdl/utils.py index af923db84..63965b14e 100644 --- a/src/zeep/wsdl/utils.py +++ b/src/zeep/wsdl/utils.py @@ -1,8 +1,9 @@ """ - zeep.wsdl.utils - ~~~~~~~~~~~~~~~ +zeep.wsdl.utils +~~~~~~~~~~~~~~~ """ + from urllib.parse import urlparse, urlunparse from lxml import etree diff --git a/src/zeep/wsdl/wsdl.py b/src/zeep/wsdl/wsdl.py index f842fa439..ff279d3d4 100644 --- a/src/zeep/wsdl/wsdl.py +++ b/src/zeep/wsdl/wsdl.py @@ -1,9 +1,8 @@ """ - zeep.wsdl.wsdl - ~~~~~~~~~~~~~~ +zeep.wsdl.wsdl +~~~~~~~~~~~~~~ """ -from __future__ import print_function import logging import operator @@ -15,12 +14,7 @@ from lxml import etree from zeep.exceptions import IncompleteMessage -from zeep.loader import ( - absolute_location, - is_relative_path, - load_external, - load_external_async, -) +from zeep.loader import absolute_location, is_relative_path, load_external from zeep.settings import Settings from zeep.utils import findall_multiple_ns from zeep.wsdl import parse diff --git a/src/zeep/wsse/__init__.py b/src/zeep/wsse/__init__.py index 5df2c78d2..54bbe41f4 100644 --- a/src/zeep/wsse/__init__.py +++ b/src/zeep/wsse/__init__.py @@ -2,10 +2,31 @@ from .signature import BinarySignature, MemorySignature, Signature from .username import UsernameToken +try: + from .crypto import ( + CryptoBinaryMemorySignature, + CryptoBinarySignature, + CryptoMemorySignature, + CryptoSignature, + PKCS12Signature, + ) +except ImportError: + # cryptography not installed — pure-python signing unavailable + CryptoBinaryMemorySignature = None + CryptoBinarySignature = None + CryptoMemorySignature = None + CryptoSignature = None + PKCS12Signature = None + __all__ = [ "Compose", "BinarySignature", "MemorySignature", "Signature", "UsernameToken", + "CryptoBinaryMemorySignature", + "CryptoBinarySignature", + "CryptoMemorySignature", + "CryptoSignature", + "PKCS12Signature", ] diff --git a/src/zeep/wsse/crypto.py b/src/zeep/wsse/crypto.py new file mode 100644 index 000000000..c5fc97962 --- /dev/null +++ b/src/zeep/wsse/crypto.py @@ -0,0 +1,964 @@ +"""Pure-Python WS-Security (WSSE) signature creation and verification. + +This module provides the same functionality as ``zeep.wsse.signature`` but uses +the ``cryptography`` library instead of the C-based ``xmlsec`` library. This +makes installation straightforward on all platforms—no system-level C libraries +required. + +Key improvements over the xmlsec-based module: + +* **No C dependencies** — only ``cryptography`` and ``lxml`` (both are + pure-Python wheels on all major platforms). +* **PKCS#12 key support** — load keys from ``.p12`` / ``.pfx`` files directly. +* **Configurable signed parts** — sign Body, Timestamp, UsernameToken, + BinarySecurityToken, or any element with a ``wsu:Id``. +* **Inclusive namespace prefixes** — per-reference control over exclusive C14N + ``InclusiveNamespaces/PrefixList``, required by some government SOAP services. +* **Mixed algorithms** — e.g. SHA-256 digests with RSA-SHA1 signature, a + common requirement of older WS-Security profiles. + +Usage:: + + from zeep.wsse.crypto import CryptoSignature, CryptoBinarySignature + + # PEM files — drop-in replacement for wsse.Signature / wsse.BinarySignature + sig = CryptoSignature("key.pem", "cert.pem") + sig = CryptoBinarySignature("key.pem", "cert.pem") + + # PKCS#12 — pass raw bytes or a file path + sig = CryptoBinarySignature.from_pkcs12("cert.p12", b"password") + + # Sign extra elements and control C14N prefixes + sig = CryptoBinarySignature( + "key.pem", "cert.pem", + sign_username_token=True, + sign_binary_security_token=True, + inclusive_ns_prefixes={"Body": ["wsse", "ds"]}, + ) + +""" + +import base64 +import hashlib +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Tuple, Union + +from lxml import etree +from lxml.etree import QName + +from zeep import ns +from zeep.exceptions import SignatureVerificationFailed +from zeep.utils import detect_soap_env +from zeep.wsse.utils import ensure_id, get_security_header + +try: + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import padding, rsa, utils + from cryptography.hazmat.primitives.serialization.pkcs12 import ( + load_key_and_certificates, + ) + from cryptography.x509.oid import ExtensionOID + from cryptography.x509 import load_der_x509_certificate, load_pem_x509_certificate +except ImportError: + hashes = None # type: ignore[assignment] + +# --------------------------------------------------------------------------- +# Algorithm URI constants +# --------------------------------------------------------------------------- + +# Signature algorithms +SIG_RSA_SHA1 = "http://www.w3.org/2000/09/xmldsig#rsa-sha1" +SIG_RSA_SHA256 = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256" +SIG_RSA_SHA384 = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha384" +SIG_RSA_SHA512 = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha512" + +# Digest algorithms +DIGEST_SHA1 = "http://www.w3.org/2000/09/xmldsig#sha1" +DIGEST_SHA256 = "http://www.w3.org/2001/04/xmlenc#sha256" +DIGEST_SHA384 = "http://www.w3.org/2001/04/xmldsig-more#sha384" +DIGEST_SHA512 = "http://www.w3.org/2001/04/xmlenc#sha512" + +# Canonicalization +C14N_EXCL = "http://www.w3.org/2001/10/xml-exc-c14n#" +KEY_IDENTIFIER_SKI = "ski" +KEY_IDENTIFIER_THUMBPRINT = "thumbprint" +KEY_INFO_X509 = "x509" +KEY_INFO_BINARY_REF = "binary-ref" + +# Mapping from URI → cryptography hash class +_SIG_HASH_MAP = { + SIG_RSA_SHA1: hashes.SHA1 if hashes else None, + SIG_RSA_SHA256: hashes.SHA256 if hashes else None, + SIG_RSA_SHA384: hashes.SHA384 if hashes else None, + SIG_RSA_SHA512: hashes.SHA512 if hashes else None, +} + +_DIGEST_HASH_MAP = { + DIGEST_SHA1: hashlib.sha1, + DIGEST_SHA256: hashlib.sha256, + DIGEST_SHA384: hashlib.sha384, + DIGEST_SHA512: hashlib.sha512, +} + + +def _check_crypto_import(): + if hashes is None: + raise ImportError( + "The cryptography module is required for CryptoSignature().\nInstall it with: pip install cryptography\n" + ) + + +def _read_file(f_name): + with open(f_name, "rb") as f: + return f.read() + + +# --------------------------------------------------------------------------- +# Key / certificate loading +# --------------------------------------------------------------------------- + + +def _load_pem_private_key(key_data: bytes, password: Optional[bytes] = None): + """Load an RSA private key from PEM-encoded data.""" + return serialization.load_pem_private_key(key_data, password=password) + + +def _load_pem_certificate(cert_data: bytes): + """Load an X.509 certificate from PEM-encoded data.""" + return load_pem_x509_certificate(cert_data) + + +def _load_pkcs12(p12_data: bytes, password: Optional[bytes] = None) -> Tuple[Any, Any, Any]: + """Load private key, certificate, and additional certs from PKCS#12 data. + + Returns ``(private_key, certificate, additional_certs)``. + """ + return load_key_and_certificates(p12_data, password) + + +def _cert_der_bytes(certificate) -> bytes: + """Return the DER-encoded bytes of an X.509 certificate object.""" + return certificate.public_bytes(serialization.Encoding.DER) + + +def _cert_base64(certificate) -> str: + """Return the base64-encoded DER representation of the certificate.""" + return base64.b64encode(_cert_der_bytes(certificate)).decode("ascii") + + +def _cert_thumbprint_sha1_base64(certificate) -> str: + """Return base64(SHA1(DER(cert))).""" + thumbprint = hashlib.sha1(_cert_der_bytes(certificate)).digest() + return base64.b64encode(thumbprint).decode("ascii") + + +def _cert_subject_key_identifier_base64(certificate) -> str: + """Return base64(subjectKeyIdentifier) from cert extension.""" + try: + ski_ext = certificate.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_KEY_IDENTIFIER) + except Exception: + raise ValueError("Certificate does not contain SubjectKeyIdentifier extension") + return base64.b64encode(ski_ext.value.digest).decode("ascii") + + +# --------------------------------------------------------------------------- +# XML canonicalization and digest helpers +# --------------------------------------------------------------------------- + + +def _c14n(element: etree._Element, inclusive_prefixes: Optional[List[str]] = None) -> bytes: + """Exclusive C14N of *element* with optional inclusive namespace prefixes.""" + return etree.tostring( + element, + method="c14n", + exclusive=True, + inclusive_ns_prefixes=inclusive_prefixes, + with_comments=False, + ) + + +def _compute_digest( + element: etree._Element, + digest_uri: str = DIGEST_SHA256, + inclusive_prefixes: Optional[List[str]] = None, +) -> str: + """Canonicalize *element* and return the base64-encoded digest.""" + c14n_bytes = _c14n(element, inclusive_prefixes) + hash_fn = _DIGEST_HASH_MAP.get(digest_uri) + if hash_fn is None: + raise ValueError(f"Unsupported digest algorithm: {digest_uri}") + digest = hash_fn(c14n_bytes).digest() + return base64.b64encode(digest).decode("ascii") + + +def _sign_bytes(private_key, data: bytes, signature_uri: str = SIG_RSA_SHA1) -> bytes: + """Sign *data* using *private_key* and the algorithm identified by *signature_uri*.""" + hash_cls = _SIG_HASH_MAP.get(signature_uri) + if hash_cls is None: + raise ValueError(f"Unsupported signature algorithm: {signature_uri}") + return private_key.sign(data, padding.PKCS1v15(), hash_cls()) + + +def _verify_bytes(public_key, signature_bytes: bytes, data: bytes, signature_uri: str): + """Verify *signature_bytes* over *data*.""" + hash_cls = _SIG_HASH_MAP.get(signature_uri) + if hash_cls is None: + raise ValueError(f"Unsupported signature algorithm: {signature_uri}") + public_key.verify(signature_bytes, data, padding.PKCS1v15(), hash_cls()) + + +# --------------------------------------------------------------------------- +# DS namespace helpers +# --------------------------------------------------------------------------- + +DS_NS = ns.DS + + +def _ds(tag): + return QName(DS_NS, tag) + + +def _ec(tag): + return QName(C14N_EXCL, tag) + + +def _wsse(tag): + return QName(ns.WSSE, tag) + + +def _wsu(tag): + return QName(ns.WSU, tag) + + +# --------------------------------------------------------------------------- +# Signature XML construction +# --------------------------------------------------------------------------- + + +def _build_reference( + parent: etree._Element, + uri: str, + digest_value: str, + digest_uri: str, + inclusive_prefixes: Optional[List[str]] = None, +): + """Append a ``ds:Reference`` element to *parent*.""" + ref = etree.SubElement(parent, _ds("Reference"), URI=uri) + transforms = etree.SubElement(ref, _ds("Transforms")) + transform = etree.SubElement(transforms, _ds("Transform"), Algorithm=C14N_EXCL) + if inclusive_prefixes is not None: + etree.SubElement( + transform, + _ec("InclusiveNamespaces"), + PrefixList=" ".join(inclusive_prefixes), + ) + etree.SubElement(ref, _ds("DigestMethod"), Algorithm=digest_uri) + dv = etree.SubElement(ref, _ds("DigestValue")) + dv.text = digest_value + return ref + + +def _sign_envelope( + envelope: etree._Element, + private_key, + certificate, + signature_method: str = SIG_RSA_SHA1, + digest_method: str = DIGEST_SHA1, + sign_timestamp: bool = True, + sign_username_token: bool = False, + sign_binary_security_token: bool = False, + extra_references: Optional[List[etree._Element]] = None, + inclusive_ns_prefixes: Optional[Dict[str, List[str]]] = None, + c14n_inclusive_prefixes: Optional[List[str]] = None, +) -> etree._Element: + """Build a ``ds:Signature``, sign the envelope, and insert into the + ``wsse:Security`` header. + + Parameters + ---------- + envelope : lxml Element + The SOAP envelope to sign. + private_key : + An RSA private key from the ``cryptography`` library. + certificate : + An X.509 certificate from the ``cryptography`` library. + signature_method : str + URI of the signature algorithm (default RSA-SHA1). + digest_method : str + URI of the digest algorithm (default SHA1). + sign_timestamp : bool + Whether to sign the ``wsu:Timestamp`` if present. + sign_username_token : bool + Whether to sign the ``wsse:UsernameToken`` if present. + sign_binary_security_token : bool + Whether to sign the ``wsse:BinarySecurityToken`` if present. + extra_references : list, optional + Additional elements to sign (must already have ``wsu:Id``). + inclusive_ns_prefixes : dict, optional + Mapping of element local-name → list of inclusive namespace prefixes + for exclusive C14N. E.g. ``{"Body": ["wsse", "ds"]}``. + c14n_inclusive_prefixes : list, optional + Default inclusive prefixes for the ``CanonicalizationMethod`` of + ``SignedInfo`` itself. + + Returns + ------- + The ``ds:Signature`` element that was inserted. + """ + soap_env = detect_soap_env(envelope) + security = get_security_header(envelope) + body = envelope.find(QName(soap_env, "Body")) + + inc = inclusive_ns_prefixes or {} + + # ---- collect elements to sign ---- + targets: List[Tuple[etree._Element, Optional[List[str]]]] = [] + + if body is not None: + targets.append((body, inc.get("Body"))) + + if sign_timestamp: + ts = security.find(_wsu("Timestamp")) + if ts is not None: + targets.append((ts, inc.get("Timestamp"))) + + if sign_username_token: + ut = security.find(_wsse("UsernameToken")) + if ut is not None: + targets.append((ut, inc.get("UsernameToken"))) + + if sign_binary_security_token: + bst = security.find(_wsse("BinarySecurityToken")) + if bst is not None: + targets.append((bst, inc.get("BinarySecurityToken"))) + + for extra in extra_references or []: + local = QName(extra.tag).localname + targets.append((extra, inc.get(local))) + + # ---- build ds:Signature skeleton ---- + sig_el = etree.SubElement(security, _ds("Signature")) + signed_info = etree.SubElement(sig_el, _ds("SignedInfo")) + + # CanonicalizationMethod + c14n_method = etree.SubElement(signed_info, _ds("CanonicalizationMethod"), Algorithm=C14N_EXCL) + if c14n_inclusive_prefixes: + etree.SubElement( + c14n_method, + _ec("InclusiveNamespaces"), + PrefixList=" ".join(c14n_inclusive_prefixes), + ) + + # SignatureMethod + etree.SubElement(signed_info, _ds("SignatureMethod"), Algorithm=signature_method) + + # ---- add references ---- + for target, prefixes in targets: + node_id = ensure_id(target) + digest_value = _compute_digest(target, digest_method, prefixes) + _build_reference( + signed_info, + "#" + node_id, + digest_value, + digest_method, + prefixes, + ) + + # ---- compute signature ---- + signed_info_c14n = _c14n(signed_info, c14n_inclusive_prefixes) + raw_signature = _sign_bytes(private_key, signed_info_c14n, signature_method) + + sig_value = etree.SubElement(sig_el, _ds("SignatureValue")) + sig_value.text = base64.b64encode(raw_signature).decode("ascii") + + return sig_el + + +def _add_key_info_x509(sig_el, certificate): + """Add ``ds:KeyInfo`` with ``X509Data`` (issuer-serial + certificate).""" + key_info = etree.SubElement(sig_el, _ds("KeyInfo")) + sec_token_ref = etree.SubElement(key_info, _wsse("SecurityTokenReference")) + x509_data = etree.SubElement(sec_token_ref, _ds("X509Data")) + + x509_issuer_serial = etree.SubElement(x509_data, _ds("X509IssuerSerial")) + issuer_name = etree.SubElement(x509_issuer_serial, _ds("X509IssuerName")) + issuer_name.text = certificate.issuer.rfc4514_string() + serial_number = etree.SubElement(x509_issuer_serial, _ds("X509SerialNumber")) + serial_number.text = str(certificate.serial_number) + + x509_cert = etree.SubElement(x509_data, _ds("X509Certificate")) + x509_cert.text = _cert_base64(certificate) + + return key_info + + +def _add_key_info_binary_ref(sig_el, bintok_id): + """Add ``ds:KeyInfo`` referencing a ``BinarySecurityToken`` by ``wsu:Id``.""" + key_info = etree.SubElement(sig_el, _ds("KeyInfo")) + sec_token_ref = etree.SubElement(key_info, _wsse("SecurityTokenReference")) + etree.SubElement( + sec_token_ref, + _wsse("Reference"), + URI="#" + bintok_id, + ValueType=("http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3"), + ) + return key_info + + +def _add_key_info_key_identifier(sig_el, certificate, identifier_type: str): + """Add wsse:KeyIdentifier in SecurityTokenReference.""" + key_info = etree.SubElement(sig_el, _ds("KeyInfo")) + sec_token_ref = etree.SubElement(key_info, _wsse("SecurityTokenReference")) + + if identifier_type == KEY_IDENTIFIER_THUMBPRINT: + value_type = ( + "http://docs.oasis-open.org/wss/oasis-wss-soap-message-security-1.1#ThumbprintSHA1" + ) + value = _cert_thumbprint_sha1_base64(certificate) + elif identifier_type == KEY_IDENTIFIER_SKI: + value_type = ( + "http://docs.oasis-open.org/wss/2004/01/" + "oasis-200401-wss-x509-token-profile-1.0#X509SubjectKeyIdentifier" + ) + value = _cert_subject_key_identifier_base64(certificate) + else: + raise ValueError(f"Unsupported key identifier type: {identifier_type}") + + key_id = etree.SubElement( + sec_token_ref, + _wsse("KeyIdentifier"), + ValueType=value_type, + EncodingType=( + "http://docs.oasis-open.org/wss/2004/01/" + "oasis-200401-wss-soap-message-security-1.0#Base64Binary" + ), + ) + key_id.text = value + return key_info + + +def _add_binary_security_token(security, certificate): + """Insert a ``BinarySecurityToken`` into the security header and return it.""" + bintok = etree.Element( + _wsse("BinarySecurityToken"), + { + "ValueType": ("http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3"), + "EncodingType": ( + "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary" + ), + }, + ) + ensure_id(bintok) + bintok.text = _cert_base64(certificate) + # Insert at position 0 (before Signature, Timestamp, etc.) + security.insert(0, bintok) + return bintok + + +def _reorder_security_children(security: etree._Element, layout: str): + """Reorder wsse:Security direct children for interop-sensitive layouts.""" + if layout == "append": + return + + layout_map = { + "xmlsec_compatible": ["Signature", "BinarySecurityToken", "Timestamp"], + "signature_first": ["Signature", "Timestamp", "BinarySecurityToken"], + "timestamp_first": ["Timestamp", "BinarySecurityToken", "Signature"], + "binary_first": ["BinarySecurityToken", "Timestamp", "Signature"], + } + priority = layout_map.get(layout) + if priority is None: + raise ValueError(f"Unsupported security_header_layout: {layout}") + + indexed = list(enumerate(list(security))) + ranked = sorted( + indexed, + key=lambda item: ( + priority.index(QName(item[1].tag).localname) + if QName(item[1].tag).localname in priority + else len(priority), + item[0], + ), + ) + + for child in list(security): + security.remove(child) + for _, child in ranked: + security.append(child) + + +# --------------------------------------------------------------------------- +# Verification +# --------------------------------------------------------------------------- + + +def _verify_envelope(envelope, certificate): + """Verify a signed SOAP envelope using the given certificate.""" + soap_env = detect_soap_env(envelope) + header = envelope.find(QName(soap_env, "Header")) + if header is None: + raise SignatureVerificationFailed("No SOAP Header found") + + security = header.find(QName(ns.WSSE, "Security")) + if security is None: + raise SignatureVerificationFailed("No wsse:Security header found") + + sig_el = security.find(QName(ns.DS, "Signature")) + if sig_el is None: + raise SignatureVerificationFailed("No ds:Signature found in Security header") + + signed_info = sig_el.find(QName(ns.DS, "SignedInfo")) + sig_value_el = sig_el.find(QName(ns.DS, "SignatureValue")) + if signed_info is None or sig_value_el is None: + raise SignatureVerificationFailed("Malformed Signature element") + + # Determine signature algorithm + sig_method_el = signed_info.find(QName(ns.DS, "SignatureMethod")) + signature_uri = sig_method_el.get("Algorithm") if sig_method_el is not None else SIG_RSA_SHA1 + + # Get inclusive prefixes for SignedInfo canonicalization (if any) + c14n_method = signed_info.find(QName(ns.DS, "CanonicalizationMethod")) + c14n_prefixes = None + if c14n_method is not None: + inc_ns = c14n_method.find(_ec("InclusiveNamespaces")) + if inc_ns is not None: + prefix_list = inc_ns.get("PrefixList", "") + c14n_prefixes = prefix_list.split() if prefix_list.strip() else None + + # Recompute SignedInfo canonical form + signed_info_c14n = _c14n(signed_info, c14n_prefixes) + + # Decode signature value + raw_signature = base64.b64decode(sig_value_el.text) + + # Verify the signature over SignedInfo + public_key = certificate.public_key() + try: + _verify_bytes(public_key, raw_signature, signed_info_c14n, signature_uri) + except Exception: + raise SignatureVerificationFailed("Signature value verification failed") + + # Verify each reference digest + refs = signed_info.findall(QName(ns.DS, "Reference")) + for ref in refs: + uri = ref.get("URI", "") + if not uri.startswith("#"): + continue + + ref_id = uri[1:] + referenced = envelope.xpath("//*[@wsu:Id='%s']" % ref_id, namespaces={"wsu": ns.WSU}) + if not referenced: + raise SignatureVerificationFailed(f"Referenced element not found: {ref_id}") + + element = referenced[0] + + # Get digest algorithm + digest_method_el = ref.find(QName(ns.DS, "DigestMethod")) + digest_uri = digest_method_el.get("Algorithm") if digest_method_el is not None else DIGEST_SHA1 + + # Get inclusive prefixes from the transform + inc_prefixes = None + transforms = ref.find(QName(ns.DS, "Transforms")) + if transforms is not None: + for transform in transforms.findall(QName(ns.DS, "Transform")): + inc_ns = transform.find(_ec("InclusiveNamespaces")) + if inc_ns is not None: + prefix_list = inc_ns.get("PrefixList", "") + inc_prefixes = prefix_list.split() if prefix_list.strip() else None + + # Compute digest and compare + computed_digest = _compute_digest(element, digest_uri, inc_prefixes) + expected_digest_el = ref.find(QName(ns.DS, "DigestValue")) + expected_digest = expected_digest_el.text if expected_digest_el is not None else "" + + if computed_digest != expected_digest: + raise SignatureVerificationFailed(f"Digest mismatch for element {ref_id}") + + +def _parse_xs_datetime(value: str) -> datetime: + value = value.strip() + if value.endswith("Z"): + value = value[:-1] + "+00:00" + parsed = datetime.fromisoformat(value) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed.astimezone(timezone.utc) + + +def _validate_timestamp_policy( + envelope: etree._Element, + now: Optional[datetime] = None, + clock_skew_seconds: int = 0, +): + """Validate Timestamp freshness semantics (Created/Expires).""" + soap_env = detect_soap_env(envelope) + header = envelope.find(QName(soap_env, "Header")) + if header is None: + raise SignatureVerificationFailed("No SOAP Header found") + security = header.find(QName(ns.WSSE, "Security")) + if security is None: + raise SignatureVerificationFailed("No wsse:Security header found") + timestamp = security.find(QName(ns.WSU, "Timestamp")) + if timestamp is None: + raise SignatureVerificationFailed("No wsu:Timestamp found") + + created_el = timestamp.find(QName(ns.WSU, "Created")) + expires_el = timestamp.find(QName(ns.WSU, "Expires")) + if created_el is None or created_el.text is None: + raise SignatureVerificationFailed("Timestamp missing wsu:Created") + if expires_el is None or expires_el.text is None: + raise SignatureVerificationFailed("Timestamp missing wsu:Expires") + + created = _parse_xs_datetime(created_el.text) + expires = _parse_xs_datetime(expires_el.text) + if expires < created: + raise SignatureVerificationFailed("Timestamp has Expires earlier than Created") + + now = now or datetime.now(timezone.utc) + skew = abs(int(clock_skew_seconds)) + if created.timestamp() - skew > now.timestamp(): + raise SignatureVerificationFailed("Timestamp Created is in the future") + if expires.timestamp() + skew < now.timestamp(): + raise SignatureVerificationFailed("Timestamp has expired") + + +def _validate_certificate_time(certificate, now: Optional[datetime] = None): + """Validate certificate validity period against current time.""" + now = now or datetime.now(timezone.utc) + not_before = getattr(certificate, "not_valid_before_utc", None) + not_after = getattr(certificate, "not_valid_after_utc", None) + if not_before is None: + not_before = certificate.not_valid_before.replace(tzinfo=timezone.utc) + if not_after is None: + not_after = certificate.not_valid_after.replace(tzinfo=timezone.utc) + + if now < not_before: + raise SignatureVerificationFailed("Certificate is not yet valid") + if now > not_after: + raise SignatureVerificationFailed("Certificate has expired") + + +# --------------------------------------------------------------------------- +# Public API — classes +# --------------------------------------------------------------------------- + + +class CryptoMemorySignature: + """Sign a SOAP envelope using in-memory PEM key and certificate data. + + This is the pure-Python equivalent of :class:`zeep.wsse.signature.MemorySignature`. + It uses the ``cryptography`` library instead of ``xmlsec``. + + Parameters + ---------- + key_data : bytes + PEM-encoded private key. + cert_data : bytes + PEM-encoded X.509 certificate. + password : bytes or str, optional + Password for the private key. + signature_method : str, optional + Signature algorithm URI (default: RSA-SHA1). + digest_method : str, optional + Digest algorithm URI (default: SHA1). + sign_timestamp : bool + Sign the ``wsu:Timestamp`` element if present (default True). + sign_username_token : bool + Sign the ``wsse:UsernameToken`` element if present (default False). + sign_binary_security_token : bool + Sign the ``wsse:BinarySecurityToken`` if present (default False). + inclusive_ns_prefixes : dict, optional + Element-name → prefix-list mapping for exclusive C14N. + c14n_inclusive_prefixes : list, optional + Inclusive prefixes for the ``CanonicalizationMethod`` of ``SignedInfo``. + """ + + def __init__( + self, + key_data: bytes, + cert_data: bytes, + password: Optional[Union[bytes, str]] = None, + signature_method: str = SIG_RSA_SHA1, + digest_method: str = DIGEST_SHA1, + sign_timestamp: bool = True, + sign_username_token: bool = False, + sign_binary_security_token: bool = False, + inclusive_ns_prefixes: Optional[Dict[str, List[str]]] = None, + c14n_inclusive_prefixes: Optional[List[str]] = None, + key_info_style: str = KEY_INFO_X509, + security_header_layout: str = "append", + ): + _check_crypto_import() + + if isinstance(password, str): + password = password.encode("utf-8") + + private_key = _load_pem_private_key(key_data, password) + certificate = _load_pem_certificate(cert_data) + self._configure( + private_key, + certificate, + signature_method=signature_method, + digest_method=digest_method, + sign_timestamp=sign_timestamp, + sign_username_token=sign_username_token, + sign_binary_security_token=sign_binary_security_token, + inclusive_ns_prefixes=inclusive_ns_prefixes, + c14n_inclusive_prefixes=c14n_inclusive_prefixes, + key_info_style=key_info_style, + security_header_layout=security_header_layout, + ) + + def _configure( + self, + private_key, + certificate, + signature_method: str = SIG_RSA_SHA1, + digest_method: str = DIGEST_SHA1, + sign_timestamp: bool = True, + sign_username_token: bool = False, + sign_binary_security_token: bool = False, + inclusive_ns_prefixes: Optional[Dict[str, List[str]]] = None, + c14n_inclusive_prefixes: Optional[List[str]] = None, + key_info_style: str = KEY_INFO_X509, + security_header_layout: str = "append", + ): + """Assign all signing-related attributes.""" + self.private_key = private_key + self.certificate = certificate + self.signature_method = signature_method + self.digest_method = digest_method + self.sign_timestamp = sign_timestamp + self.sign_username_token = sign_username_token + self.sign_binary_security_token = sign_binary_security_token + self.inclusive_ns_prefixes = inclusive_ns_prefixes + self.c14n_inclusive_prefixes = c14n_inclusive_prefixes + self.key_info_style = key_info_style + self.security_header_layout = security_header_layout + + def _sign(self, envelope): + """Sign the envelope and add KeyInfo with X509Data.""" + sig_el = _sign_envelope( + envelope, + self.private_key, + self.certificate, + signature_method=self.signature_method, + digest_method=self.digest_method, + sign_timestamp=self.sign_timestamp, + sign_username_token=self.sign_username_token, + sign_binary_security_token=self.sign_binary_security_token, + inclusive_ns_prefixes=self.inclusive_ns_prefixes, + c14n_inclusive_prefixes=self.c14n_inclusive_prefixes, + ) + if self.key_info_style == KEY_INFO_X509: + _add_key_info_x509(sig_el, self.certificate) + elif self.key_info_style in (KEY_IDENTIFIER_SKI, KEY_IDENTIFIER_THUMBPRINT): + _add_key_info_key_identifier(sig_el, self.certificate, self.key_info_style) + else: + raise ValueError(f"Unsupported key_info_style: {self.key_info_style}") + + security = get_security_header(envelope) + _reorder_security_children(security, self.security_header_layout) + return sig_el + + def apply(self, envelope, headers): + self._sign(envelope) + return envelope, headers + + def verify( + self, + envelope, + validate_timestamp: bool = False, + clock_skew_seconds: int = 0, + validate_certificate_time: bool = False, + now: Optional[datetime] = None, + ): + _verify_envelope(envelope, self.certificate) + if validate_timestamp: + _validate_timestamp_policy( + envelope, + now=now, + clock_skew_seconds=clock_skew_seconds, + ) + if validate_certificate_time: + _validate_certificate_time(self.certificate, now=now) + return envelope + + +class CryptoSignature(CryptoMemorySignature): + """Sign a SOAP envelope using PEM key and certificate files. + + Drop-in replacement for :class:`zeep.wsse.signature.Signature`. + + Parameters + ---------- + key_file : str + Path to PEM private key file. + certfile : str + Path to PEM certificate file. + password : bytes or str, optional + Private key password. + signature_method, digest_method, sign_timestamp, sign_username_token, + sign_binary_security_token, inclusive_ns_prefixes, c14n_inclusive_prefixes : + See :class:`CryptoMemorySignature`. + """ + + def __init__( + self, + key_file: str, + certfile: str, + password: Optional[Union[bytes, str]] = None, + signature_method: str = SIG_RSA_SHA1, + digest_method: str = DIGEST_SHA1, + **kwargs, + ): + super().__init__( + _read_file(key_file), + _read_file(certfile), + password, + signature_method, + digest_method, + **kwargs, + ) + + +class CryptoBinaryMemorySignature(CryptoMemorySignature): + """Sign a SOAP envelope and embed the certificate as a + ``BinarySecurityToken``. + + This is the pure-Python equivalent of + :class:`zeep.wsse.signature.BinarySignature`. + + The certificate is placed in a ``wsse:BinarySecurityToken`` element in the + security header, and the ``ds:KeyInfo`` in the signature references it. + """ + + def _sign(self, envelope): + security = get_security_header(envelope) + bintok = _add_binary_security_token(security, self.certificate) + bintok_id = bintok.get(QName(ns.WSU, "Id")) + + sig_el = _sign_envelope( + envelope, + self.private_key, + self.certificate, + signature_method=self.signature_method, + digest_method=self.digest_method, + sign_timestamp=self.sign_timestamp, + sign_username_token=self.sign_username_token, + sign_binary_security_token=self.sign_binary_security_token, + inclusive_ns_prefixes=self.inclusive_ns_prefixes, + c14n_inclusive_prefixes=self.c14n_inclusive_prefixes, + ) + _add_key_info_binary_ref(sig_el, bintok_id) + _reorder_security_children(security, self.security_header_layout) + return sig_el + + +class CryptoBinarySignature(CryptoBinaryMemorySignature): + """Sign a SOAP envelope using PEM files and embed a ``BinarySecurityToken``. + + Drop-in replacement for :class:`zeep.wsse.signature.BinarySignature`. + + Parameters + ---------- + key_file : str + Path to PEM private key file. + certfile : str + Path to PEM certificate file. + password : bytes or str, optional + Private key password. + signature_method, digest_method, sign_timestamp, sign_username_token, + sign_binary_security_token, inclusive_ns_prefixes, c14n_inclusive_prefixes : + See :class:`CryptoMemorySignature`. + """ + + def __init__( + self, + key_file: str, + certfile: str, + password: Optional[Union[bytes, str]] = None, + signature_method: str = SIG_RSA_SHA1, + digest_method: str = DIGEST_SHA1, + **kwargs, + ): + super().__init__( + _read_file(key_file), + _read_file(certfile), + password, + signature_method, + digest_method, + **kwargs, + ) + + @classmethod + def from_pkcs12( + cls, + p12_file: str, + password: Optional[Union[bytes, str]] = None, + signature_method: str = SIG_RSA_SHA1, + digest_method: str = DIGEST_SHA1, + **kwargs, + ): + """Create a ``CryptoBinarySignature`` from a PKCS#12 file. + + Parameters + ---------- + p12_file : str + Path to a ``.p12`` or ``.pfx`` file. + password : bytes or str, optional + Password for the PKCS#12 file. + """ + _check_crypto_import() + + if isinstance(password, str): + password = password.encode("utf-8") + + p12_data = _read_file(p12_file) + private_key, certificate, _ = _load_pkcs12(p12_data, password) + + instance = cls.__new__(cls) + instance._configure( + private_key, + certificate, + signature_method=signature_method, + digest_method=digest_method, + **kwargs, + ) + return instance + + +class PKCS12Signature(CryptoMemorySignature): + """Sign a SOAP envelope using a PKCS#12 file with X509Data in KeyInfo. + + Parameters + ---------- + p12_file : str + Path to a ``.p12`` or ``.pfx`` file. + password : bytes or str, optional + Password for the PKCS#12 file. + """ + + def __init__( + self, + p12_file: str, + password: Optional[Union[bytes, str]] = None, + signature_method: str = SIG_RSA_SHA1, + digest_method: str = DIGEST_SHA1, + **kwargs, + ): + _check_crypto_import() + + if isinstance(password, str): + password = password.encode("utf-8") + + p12_data = _read_file(p12_file) + private_key, certificate, _ = _load_pkcs12(p12_data, password) + + self._configure( + private_key, + certificate, + signature_method=signature_method, + digest_method=digest_method, + **kwargs, + ) diff --git a/src/zeep/wsse/signature.py b/src/zeep/wsse/signature.py index c4aec758e..47ab7fd4e 100644 --- a/src/zeep/wsse/signature.py +++ b/src/zeep/wsse/signature.py @@ -8,6 +8,7 @@ module. """ + from lxml import etree from lxml.etree import QName diff --git a/src/zeep/wsse/utils.py b/src/zeep/wsse/utils.py index f9de49b7e..04e50e699 100644 --- a/src/zeep/wsse/utils.py +++ b/src/zeep/wsse/utils.py @@ -1,7 +1,6 @@ import datetime from uuid import uuid4 -import pytz from lxml import etree from lxml.builder import ElementMaker @@ -28,8 +27,8 @@ def get_security_header(doc): def get_timestamp(timestamp=None, zulu_timestamp=None): - timestamp = timestamp or datetime.datetime.utcnow() - timestamp = timestamp.replace(tzinfo=pytz.utc, microsecond=0) + timestamp = timestamp or datetime.datetime.now(datetime.timezone.utc) + timestamp = timestamp.replace(tzinfo=datetime.timezone.utc, microsecond=0) if zulu_timestamp: return timestamp.isoformat().replace("+00:00", "Z") else: diff --git a/src/zeep/xsd/__init__.py b/src/zeep/xsd/__init__.py index 45a293974..bf3ea5bfc 100644 --- a/src/zeep/xsd/__init__.py +++ b/src/zeep/xsd/__init__.py @@ -1,8 +1,9 @@ """ - zeep.xsd - -------- +zeep.xsd +-------- """ + from zeep.xsd.const import Nil, SkipValue from zeep.xsd.elements import * # noqa from zeep.xsd.schema import Schema as Schema diff --git a/src/zeep/xsd/elements/builtins.py b/src/zeep/xsd/elements/builtins.py index fa639be7b..a75c07217 100644 --- a/src/zeep/xsd/elements/builtins.py +++ b/src/zeep/xsd/elements/builtins.py @@ -1,5 +1,3 @@ -from __future__ import division - from zeep.xsd.const import xsd_ns from zeep.xsd.elements.base import Base diff --git a/src/zeep/xsd/elements/indicators.py b/src/zeep/xsd/elements/indicators.py index f56814e61..de07adc82 100644 --- a/src/zeep/xsd/elements/indicators.py +++ b/src/zeep/xsd/elements/indicators.py @@ -11,18 +11,11 @@ -> Group """ + import copy import operator -import sys -import typing from collections import OrderedDict, defaultdict, deque - -if sys.version_info >= (3, 8): - from functools import cached_property as threaded_cached_property -else: - from cached_property import threaded_cached_property - -from lxml import etree +from functools import cached_property as threaded_cached_property from zeep.exceptions import UnexpectedElementError, ValidationError from zeep.xsd.const import NotSet, SkipValue @@ -661,8 +654,7 @@ def __str__(self): return self.signature() def __iter__(self, *args, **kwargs): - for item in self.child: - yield item + yield from self.child @threaded_cached_property def elements(self): diff --git a/src/zeep/xsd/elements/references.py b/src/zeep/xsd/elements/references.py index 14a36a64b..91ea28fa6 100644 --- a/src/zeep/xsd/elements/references.py +++ b/src/zeep/xsd/elements/references.py @@ -6,6 +6,7 @@ all the elements. """ + __all__ = ["RefElement", "RefAttribute", "RefAttributeGroup", "RefGroup"] diff --git a/src/zeep/xsd/schema.py b/src/zeep/xsd/schema.py index 8f877003d..b2bfb2dca 100644 --- a/src/zeep/xsd/schema.py +++ b/src/zeep/xsd/schema.py @@ -303,8 +303,7 @@ def __init__(self): self._instances = OrderedDict() def __iter__(self): - for document in self.values(): - yield document + yield from self.values() def add(self, document: "SchemaDocument") -> None: """Add a schema document""" @@ -348,8 +347,7 @@ def has_schema_document_for_ns(self, namespace: str) -> bool: def values(self): for documents in self._instances.values(): - for document in documents: - yield document + yield from documents class SchemaDocument: diff --git a/src/zeep/xsd/types/any.py b/src/zeep/xsd/types/any.py index 4b792fca9..ea7317380 100644 --- a/src/zeep/xsd/types/any.py +++ b/src/zeep/xsd/types/any.py @@ -1,11 +1,6 @@ import logging -import sys import typing - -if sys.version_info >= (3, 8): - from functools import cached_property as threaded_cached_property -else: - from cached_property import threaded_cached_property +from functools import cached_property as threaded_cached_property from lxml import etree diff --git a/src/zeep/xsd/types/builtins.py b/src/zeep/xsd/types/builtins.py index d57068eea..8c5c6ba1a 100644 --- a/src/zeep/xsd/types/builtins.py +++ b/src/zeep/xsd/types/builtins.py @@ -5,7 +5,6 @@ from decimal import Decimal as _Decimal import isodate -import pytz from zeep.xsd.const import xsd_ns from zeep.xsd.types.any import AnyType @@ -151,22 +150,7 @@ def xmlvalue(self, value): if isinstance(value, str): return value - # Bit of a hack, since datetime is a subclass of date we can't just - # test it with an isinstance(). And actually, we should not really - # care about the type, as long as it has the required attributes - if not all(hasattr(value, attr) for attr in ("hour", "minute", "second")): - value = datetime.datetime.combine( - value, - datetime.time( - getattr(value, "hour", 0), - getattr(value, "minute", 0), - getattr(value, "second", 0), - ), - ) - - if getattr(value, "microsecond", 0): - return isodate.isostrf.strftime(value, "%Y-%m-%dT%H:%M:%S.%f%Z") - return isodate.isostrf.strftime(value, "%Y-%m-%dT%H:%M:%S%Z") + return value.isoformat().replace("+00:00", "Z") @treat_whitespace("collapse") def pythonvalue(self, value): @@ -189,9 +173,7 @@ def xmlvalue(self, value): if isinstance(value, str): return value - if value.microsecond: - return isodate.isostrf.strftime(value, "%H:%M:%S.%f%Z") - return isodate.isostrf.strftime(value, "%H:%M:%S%Z") + return value.isoformat().replace("+00:00", "Z") @treat_whitespace("collapse") def pythonvalue(self, value): @@ -201,16 +183,26 @@ def pythonvalue(self, value): class Date(BuiltinType): _default_qname = xsd_ns("date") accepted_types = [datetime.date, str] + _pattern = re.compile(r"(\d{4})-(\d{2})-(\d{2})") @check_no_collection def xmlvalue(self, value): if isinstance(value, str): return value - return isodate.isostrf.strftime(value, "%Y-%m-%d") + return value.strftime("%Y-%m-%d") @treat_whitespace("collapse") def pythonvalue(self, value): - return isodate.parse_date(value) + try: + return isodate.parse_date(value) + except isodate.ISO8601Error: + # Recent versions of isodate don't support timezone in date's. This + # is not really ISO8601 compliant anway, but we should try to handle + # it, so lets just use a regex to parse the date directly. + m = self._pattern.match(value) + if m: + return datetime.date(*map(int, m.groups())) + raise class gYearMonth(BuiltinType): @@ -372,6 +364,8 @@ class Base64Binary(BuiltinType): @check_no_collection def xmlvalue(self, value): + if isinstance(value, str): + return value return base64.b64encode(value) def pythonvalue(self, value): @@ -536,12 +530,12 @@ class PositiveInteger(NonNegativeInteger): ## # Other def _parse_timezone(val): - """Return a pytz.tzinfo object""" + """Return a timezone object""" if not val: return if val == "Z" or val == "+00:00": - return pytz.utc + return datetime.timezone.utc negative = val.startswith("-") minutes = int(val[-2:]) @@ -549,22 +543,17 @@ def _parse_timezone(val): if negative: minutes = 0 - minutes - return pytz.FixedOffset(minutes) + return datetime.timezone(offset=datetime.timedelta(minutes=minutes)) -def _unparse_timezone(tzinfo): +def _unparse_timezone(tzinfo: datetime.timezone): if not tzinfo: return "" - if tzinfo == pytz.utc: + if tzinfo == datetime.timezone.utc: return "Z" - hours = math.floor(tzinfo._minutes / 60) - minutes = tzinfo._minutes % 60 - - if hours > 0: - return "+%02d:%02d" % (hours, minutes) - return "-%02d:%02d" % (abs(hours), minutes) + return datetime.datetime.now(tz=tzinfo).isoformat()[-6:] _types = [ diff --git a/src/zeep/xsd/types/complex.py b/src/zeep/xsd/types/complex.py index 9208e1a7c..4c69a9cde 100644 --- a/src/zeep/xsd/types/complex.py +++ b/src/zeep/xsd/types/complex.py @@ -2,16 +2,10 @@ import copy import logging -import sys import typing from collections import OrderedDict, deque +from functools import cached_property as threaded_cached_property from itertools import chain -from typing import Optional - -if sys.version_info >= (3, 8): - from functools import cached_property as threaded_cached_property -else: - from cached_property import threaded_cached_property from lxml import etree @@ -47,7 +41,7 @@ class ComplexType(AnyType): - _xsd_name: typing.Optional[str] = None + _xsd_name: str | None = None def __init__( self, @@ -66,7 +60,7 @@ def __init__( self._attributes = attributes or [] self._restriction = restriction self._extension = extension - self._extension_types: typing.List[typing.Type] = [] + self._extension_types: list[type] = [] super().__init__(qname=qname, is_global=is_global) def __call__(self, *args, **kwargs): @@ -75,11 +69,11 @@ def __call__(self, *args, **kwargs): return self._value_class(*args, **kwargs) @property - def accepted_types(self) -> typing.List[typing.Type]: + def accepted_types(self) -> list[type]: return [self._value_class] + self._extension_types @threaded_cached_property - def _array_class(self) -> typing.Type[ArrayValue]: + def _array_class(self) -> type[ArrayValue]: assert self._array_type return type( self.__class__.__name__, @@ -88,7 +82,7 @@ def _array_class(self) -> typing.Type[ArrayValue]: ) @threaded_cached_property - def _value_class(self) -> typing.Type[CompoundValue]: + def _value_class(self) -> type[CompoundValue]: return type( self.__class__.__name__, (CompoundValue,), @@ -174,11 +168,11 @@ def _array_type(self): def parse_xmlelement( self, xmlelement: etree._Element, - schema: Optional[Schema] = None, + schema: Schema | None = None, allow_none: bool = True, context: XmlParserContext = None, - schema_type: Optional[Type] = None, - ) -> typing.Optional[typing.Union[str, CompoundValue, typing.List[etree._Element]]]: + schema_type: Type | None = None, + ) -> str | CompoundValue | list[etree._Element] | None: """Consume matching xmlelements and call parse() on each :param xmlelement: XML element objects @@ -250,8 +244,8 @@ def parse_xmlelement( def render( self, node: etree._Element, - value: typing.Union[list, dict, CompoundValue], - xsd_type: "ComplexType" = None, + value: list | dict | CompoundValue, + xsd_type: ComplexType = None, render_path=None, ) -> None: """Serialize the given value lxml.Element subelements on the node @@ -314,10 +308,10 @@ def render( def parse_kwargs( self, - kwargs: typing.Dict[str, typing.Any], + kwargs: dict[str, typing.Any], name: str, - available_kwargs: typing.Set[str], - ) -> typing.Dict[str, typing.Any]: + available_kwargs: set[str], + ) -> dict[str, typing.Any]: """Parse the kwargs for this type and return the accepted data as a dict. @@ -341,8 +335,8 @@ def parse_kwargs( return {} def _create_object( - self, value: typing.Union[list, dict, CompoundValue, None], name: str - ) -> typing.Union[CompoundValue, None, _ObjectList]: + self, value: list | dict | CompoundValue | None, name: str + ) -> CompoundValue | None | _ObjectList: """Return the value as a CompoundValue object :type value: str diff --git a/src/zeep/xsd/utils.py b/src/zeep/xsd/utils.py index 7fe3ca221..dc0e7c3d2 100644 --- a/src/zeep/xsd/utils.py +++ b/src/zeep/xsd/utils.py @@ -33,8 +33,7 @@ def max_occurs_iter(max_occurs, items=None): for i, sub_kwargs in zip(generator, items): yield sub_kwargs else: - for i in generator: - yield i + yield from generator def create_prefixed_name(qname, schema): diff --git a/src/zeep/xsd/visitor.py b/src/zeep/xsd/visitor.py index c7b1fb936..5aa40f537 100644 --- a/src/zeep/xsd/visitor.py +++ b/src/zeep/xsd/visitor.py @@ -37,7 +37,7 @@ class tags: attributeGroup = xsd_ns("attributeGroup") restriction = xsd_ns("restriction") extension = xsd_ns("extension") - notation = xsd_ns("notations") + notation = xsd_ns("notation") class SchemaVisitor: diff --git a/tests/conftest.py b/tests/conftest.py index 4bd41de06..76e776aa4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,3 @@ -import sys - import pytest pytest.register_assert_rewrite("tests.utils") diff --git a/tests/test_cache.py b/tests/test_cache.py index 68ece947a..bdf3b42b6 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -35,11 +35,14 @@ def test_no_records(self, tmpdir): result = c.get("http://tests.python-zeep.org/example.wsdl") assert result is None + @pytest.mark.network def test_has_expired(self, tmpdir): c = cache.SqliteCache(path=tmpdir.join("sqlite.cache.db").strpath) c.add("http://tests.python-zeep.org/example.wsdl", b"content") - freeze_dt = datetime.datetime.utcnow() + datetime.timedelta(seconds=7200) + freeze_dt = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta( + seconds=7200 + ) with freezegun.freeze_time(freeze_dt): result = c.get("http://tests.python-zeep.org/example.wsdl") assert result is None @@ -51,13 +54,16 @@ def test_has_not_expired(self, tmpdir): assert result == b"content" +@pytest.mark.network def test_memory_cache_timeout(tmpdir): c = cache.InMemoryCache() c.add("http://tests.python-zeep.org/example.wsdl", b"content") result = c.get("http://tests.python-zeep.org/example.wsdl") assert result == b"content" - freeze_dt = datetime.datetime.utcnow() + datetime.timedelta(seconds=7200) + freeze_dt = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta( + seconds=7200 + ) with freezegun.freeze_time(freeze_dt): result = c.get("http://tests.python-zeep.org/example.wsdl") assert result is None @@ -76,16 +82,18 @@ class TestIsExpired: def test_timeout_none(self): assert cache._is_expired(100, None) is False + @pytest.mark.network def test_has_expired(self): timeout = 7200 - utcnow = datetime.datetime.utcnow() + utcnow = datetime.datetime.now(datetime.timezone.utc) value = utcnow + datetime.timedelta(seconds=timeout) with freezegun.freeze_time(utcnow): assert cache._is_expired(value, timeout) is False + @pytest.mark.network def test_has_not_expired(self): timeout = 7200 - utcnow = datetime.datetime.utcnow() + utcnow = datetime.datetime.now(datetime.timezone.utc) value = utcnow - datetime.timedelta(seconds=timeout) with freezegun.freeze_time(utcnow): assert cache._is_expired(value, timeout) is False diff --git a/tests/test_cert.p12 b/tests/test_cert.p12 new file mode 100644 index 000000000..78fcdc538 Binary files /dev/null and b/tests/test_cert.p12 differ diff --git a/tests/test_cert.pem b/tests/test_cert.pem new file mode 100644 index 000000000..d2f5a1401 --- /dev/null +++ b/tests/test_cert.pem @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDLTCCAhWgAwIBAgIUcvOvPL9johmNFM6wh9ssxKWzXvUwDQYJKoZIhvcNAQEL +BQAwJjERMA8GA1UEAwwIVGVzdENlcnQxETAPBgNVBAoMCFplZXBUZXN0MB4XDTI2 +MDMyMDA4Mjc0OVoXDTM2MDMxNzA4Mjc0OVowJjERMA8GA1UEAwwIVGVzdENlcnQx +ETAPBgNVBAoMCFplZXBUZXN0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKC +AQEAlqSUrgWb2oBtxeH7reHw4FPO6muQeKcm3lGHumYBDh7sSSD6yTOUeQWoex3Z +Fv+au6LtLStiSf01tuY2/0DvZcueZi7VI0mFKEwKVF+Zr/DhKq5B5/ixcAv/iWcZ +opGF5IWdBBjPpymlAH3SwMAo2/UQnIiUrpoHmE03u4lSn/aU1iGMR2q66TXrg/WK +M2rLyJFnOipxiTmlZZpCYb9xfI0yjXrFFrurkv03TXeckWxTInKKRoVl1NRysfLy +SsGk7CW6XuI0C2lCg0IWtmBfjJudTCVZtmqsqut7PzipQZfnY59qI6ffaSpPd+BL +zS1iCfHTzZaVzOzm+WGEV42/dQIDAQABo1MwUTAdBgNVHQ4EFgQUkhRpvsLP5Ek7 ++1/kTDAmAYS7ef8wHwYDVR0jBBgwFoAUkhRpvsLP5Ek7+1/kTDAmAYS7ef8wDwYD +VR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEACv1azo/9rTvPeXwQ/ndw +mXX6V6dtXutCS66mjoIPX3e64uu1tcQTUQGTPGa0Ar5FzkvPOy1w59Iyn8yh2lp3 +3KZCLDstk3ECVn22iwrtfe0tbpDU2ui+D8aTFiqjb27CcEtH24IkBFCgkY50Ycke +/fc48eu2ioeGWWeOWMU3EhH/mdbNK70dxpr70dnAxn6IMbNeBzvM67huRQfaHypX +nEuDIe7ejOuJqzPHhQP4vcQvOvPsbRRrO7kkvR9GJDCpIbgY+RI2fCThUQpA3P8O +EF2LCQKE8/X9PJuGufz7qdjNOg9TOkKO9epkZHAbvhBAbXOx29hrlu7YCi1+SF85 +bg== +-----END CERTIFICATE----- diff --git a/tests/test_client.py b/tests/test_client.py index af13d9b98..313a9293f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -60,7 +60,7 @@ def test_context_manager(): def test_service_proxy_dir_operations(): client_obj = client.Client("tests/wsdl_files/soap.wsdl") operations = [op for op in dir(client_obj.service) if not op.startswith("_")] - assert set(operations) == set(["GetLastTradePrice", "GetLastTradePriceNoOutput"]) + assert set(operations) == {"GetLastTradePrice", "GetLastTradePriceNoOutput"} def test_operation_proxy_doc(): diff --git a/tests/test_key.pem b/tests/test_key.pem new file mode 100644 index 000000000..c11300234 --- /dev/null +++ b/tests/test_key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCWpJSuBZvagG3F +4fut4fDgU87qa5B4pybeUYe6ZgEOHuxJIPrJM5R5Bah7HdkW/5q7ou0tK2JJ/TW2 +5jb/QO9ly55mLtUjSYUoTApUX5mv8OEqrkHn+LFwC/+JZxmikYXkhZ0EGM+nKaUA +fdLAwCjb9RCciJSumgeYTTe7iVKf9pTWIYxHarrpNeuD9YozasvIkWc6KnGJOaVl +mkJhv3F8jTKNesUWu6uS/TdNd5yRbFMicopGhWXU1HKx8vJKwaTsJbpe4jQLaUKD +Qha2YF+Mm51MJVm2aqyq63s/OKlBl+djn2ojp99pKk934EvNLWIJ8dPNlpXM7Ob5 +YYRXjb91AgMBAAECggEAAh6815oimwz1qTRIpiYo4q9V7nynDYZPXg+drmTjHOoY +fzWknJJKQ6MYakB9nqeS65Mbwr1P5noBeIUoPv1l9nPK0ztTf+n3+Bzh2arNbz7K +Rx8pXvS5Yo9OAFwKw1FIUMgEnAXRXlPlYOq5vZa2vH8SxcaHo9FtIUVCRIm74kkW +QMHjI+L9zvIrLrNAIF6iCpLTv0n+Me2p13qKs4t5P1iXT+18hIc4HqIme8y7OC8X +viCfxap7gWXODN/HI7fN+yYWxndDy/pJehfHL5jx2CIoUaWW2AxUCzjlokIUSfU5 +zXenITDiKI4Xb2zdNCBHJ5vHXzyFlPk5qMaGIhu0YQKBgQDFDwRWa4eVXXbvDvgD +j9zHx74m46i5uUOhx0kBC07H2Wvyglth1siVfNr7G0UTDGTdCSIbz6Ycn4LUEQji +giF0b3lZCZ/SwHI5aAK+cblRgzU8XpXfXWWvScXY+S48cq8N+PknFw4FulhxaTZ0 +xWSBnA3FgJTyZYwFTky5GTCQbQKBgQDDs3YOmeJ9u9y926ikQgOV8NsecZgohbUh +L1+LsukWZaZLdt7e8ZEFeVTnL1y4c0V4KmvAohm129v/B6h2P+Xwkb+NfuG3o2PN +4aE+uKKVdnqzI531rwOrQT6PugvwRHNqqf22L6l7SMtkznb3C3G9Bqh6ouS684F1 +JLFcggpWKQKBgFba9Zh5sLr/BSr9OAep4zATNBbEBVxG8i5ePik9qK2hgVjUaB7J +ooNxErvXGyz4K4TZP7VuYtL+45VBJN9haAmAEMZT3aPwuufelkcGrR3mbRJ8xiY7 +blRCeffWbLCsA8hu0rKOENopdk4Wh7AUSOZvuAozUHM+mubDykFTfvpJAoGAG0mA +opSOYh25r7dbElwqB9QnmV0ZbD0IfnIOjnoJyheuUkNm3Asuv2alkf6mnQllnFW9 +5IQ1tYfMawj0UliihSKtZNtWObbMib52LzqrYM++WBAVTFZ02oIkaQH1/DBvL2Y4 +fBi1Nv11br7jE5tC5qpw/3iLEA439M+C83UmX9ECgYEAo/OaX1k3YNpcwdmcRzVy +eyFV+iG03fCBpIwngO9rviYEKR7uhSKVg+4qLJbo1Ta0qQqjJ7Q6b8aCVefUyRad ++n5bzYLXimqx1JXPz5A5m+kzo/AIGyesQ1GDZrhQj44pUha+oG2xo6W+CCWAQJ2M +UdoW5g3TRRPY63MLkRHAu2Y= +-----END PRIVATE KEY----- diff --git a/tests/test_loader.py b/tests/test_loader.py index 16548320d..8ef55594e 100644 --- a/tests/test_loader.py +++ b/tests/test_loader.py @@ -1,5 +1,4 @@ import pytest -from pytest import raises as assert_raises from tests.utils import DummyTransport from zeep.exceptions import DTDForbidden, EntitiesForbidden diff --git a/tests/test_soap_xop.py b/tests/test_soap_xop.py index 283672a74..9c010114d 100644 --- a/tests/test_soap_xop.py +++ b/tests/test_soap_xop.py @@ -249,7 +249,7 @@ def test_xop(): headers={"Content-Type": content_type}, ) result = service.TestOperation2("") - assert result["_value_1"] == "BINARYDATA".encode() + assert result["_value_1"] == b"BINARYDATA" m.post( "http://tests.python-zeep.org/test", @@ -257,7 +257,7 @@ def test_xop(): headers={"Content-Type": content_type}, ) result = service.TestOperation1("") - assert result == "BINARYDATA".encode() + assert result == b"BINARYDATA" def test_xop_cid_encoded(): @@ -302,4 +302,4 @@ def test_xop_cid_encoded(): headers={"Content-Type": content_type}, ) result = service.TestOperation2("") - assert result["_value_1"] == "BINARYDATA".encode() + assert result["_value_1"] == b"BINARYDATA" diff --git a/tests/test_transports.py b/tests/test_transports.py index 2fc87013e..ce8ad193e 100644 --- a/tests/test_transports.py +++ b/tests/test_transports.py @@ -49,6 +49,7 @@ def test_load_file_unix(): result = transport.load("file:///usr/local/bin/example.wsdl") assert result == b"x" m_open.assert_called_once_with("/usr/local/bin/example.wsdl", "rb") + m_open.return_value.close.assert_called() @pytest.mark.skipif(os.name != "nt", reason="test valid for windows platform only") diff --git a/tests/test_wsdl.py b/tests/test_wsdl.py index 5013d79f3..ddf617b18 100644 --- a/tests/test_wsdl.py +++ b/tests/test_wsdl.py @@ -1,4 +1,3 @@ -import io from io import StringIO import pytest @@ -159,7 +158,7 @@ def test_parse_types_multiple_schemas(): def test_parse_types_nsmap_issues(): content = StringIO( - """ + r""" + + + + OK + + + + """ + ) + + +def _make_envelope_with_timestamp(): + return load_xml( + """ + + + + + 2025-01-01T00:00:00Z + 2025-01-01T01:00:00Z + + + + + + OK + + + + """ + ) + + +def _make_envelope_with_username_token(): + return load_xml( + """ + + + + + 2025-01-01T00:00:00Z + 2025-01-01T01:00:00Z + + + testuser + testpass + + + + + + OK + + + + """ + ) + + +# ----------------------------------------------------------------------- +# Basic signing & verification +# ----------------------------------------------------------------------- + + +@skip_if_no_crypto +class TestCryptoSignature: + """Tests for CryptoSignature (X509Data in KeyInfo).""" + + def test_sign_and_verify(self): + envelope = _make_envelope() + plugin = crypto.CryptoSignature(KEY_FILE, CERT_FILE) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + def test_sign_with_timestamp(self): + envelope = _make_envelope_with_timestamp() + plugin = crypto.CryptoSignature(KEY_FILE, CERT_FILE) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + # Should have 2 references: Body + Timestamp + refs = envelope.xpath("//ds:Reference", namespaces={"ds": ns.DS}) + assert len(refs) == 2 + + def test_verify_fails_on_tampered_body(self): + envelope = _make_envelope() + plugin = crypto.CryptoSignature(KEY_FILE, CERT_FILE) + envelope, headers = plugin.apply(envelope, {}) + + # Tamper with the body + nsmap = {"tns": "http://tests.python-zeep.org/"} + for elm in envelope.xpath("//tns:Argument", namespaces=nsmap): + elm.text = "TAMPERED" + + with pytest.raises(SignatureVerificationFailed): + plugin.verify(envelope) + + def test_combined_pem(self): + """Test with a PEM file containing both key and cert.""" + envelope = _make_envelope() + plugin = crypto.CryptoSignature(COMBINED_PEM, COMBINED_PEM) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + def test_combined_pem_with_password(self): + envelope = _make_envelope() + plugin = crypto.CryptoSignature(COMBINED_PEM_PW, COMBINED_PEM_PW, "geheim") + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + def test_x509_data_present(self): + """Verify KeyInfo contains X509Data with certificate.""" + envelope = _make_envelope() + plugin = crypto.CryptoSignature(KEY_FILE, CERT_FILE) + envelope, headers = plugin.apply(envelope, {}) + + x509_cert = envelope.xpath( + "//ds:KeyInfo//ds:X509Certificate", + namespaces={"ds": ns.DS}, + ) + assert len(x509_cert) == 1 + assert x509_cert[0].text # Certificate should be non-empty + + +# ----------------------------------------------------------------------- +# BinarySignature +# ----------------------------------------------------------------------- + + +@skip_if_no_crypto +class TestCryptoBinarySignature: + """Tests for CryptoBinarySignature (BinarySecurityToken in header).""" + + def test_sign_and_verify(self): + envelope = _make_envelope() + plugin = crypto.CryptoBinarySignature(KEY_FILE, CERT_FILE) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + def test_binary_token_present(self): + envelope = _make_envelope() + plugin = crypto.CryptoBinarySignature(KEY_FILE, CERT_FILE) + envelope, headers = plugin.apply(envelope, {}) + + bintok = envelope.xpath( + "//wsse:BinarySecurityToken", + namespaces={"wsse": ns.WSSE}, + ) + assert len(bintok) == 1 + assert bintok[0].text # Should contain base64 cert + + def test_key_info_references_binary_token(self): + envelope = _make_envelope() + plugin = crypto.CryptoBinarySignature(KEY_FILE, CERT_FILE) + envelope, headers = plugin.apply(envelope, {}) + + bintok = envelope.xpath( + "//wsse:BinarySecurityToken", + namespaces={"wsse": ns.WSSE}, + )[0] + ref = envelope.xpath( + "//ds:KeyInfo//wsse:Reference", + namespaces={"ds": ns.DS, "wsse": ns.WSSE}, + )[0] + bintok_id = bintok.get(QName(ns.WSU, "Id")) + assert ref.get("URI") == "#" + bintok_id + + def test_sign_with_timestamp(self): + envelope = _make_envelope_with_timestamp() + plugin = crypto.CryptoBinarySignature(KEY_FILE, CERT_FILE) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + refs = envelope.xpath("//ds:Reference", namespaces={"ds": ns.DS}) + assert len(refs) == 2 + + def test_verify_fails_on_tampered_body(self): + envelope = _make_envelope() + plugin = crypto.CryptoBinarySignature(KEY_FILE, CERT_FILE) + envelope, headers = plugin.apply(envelope, {}) + + nsmap = {"tns": "http://tests.python-zeep.org/"} + for elm in envelope.xpath("//tns:Argument", namespaces=nsmap): + elm.text = "TAMPERED" + + with pytest.raises(SignatureVerificationFailed): + plugin.verify(envelope) + + +# ----------------------------------------------------------------------- +# Algorithm combinations +# ----------------------------------------------------------------------- + +SIGNATURE_METHODS = [ + (crypto.SIG_RSA_SHA1, "http://www.w3.org/2000/09/xmldsig#rsa-sha1"), + (crypto.SIG_RSA_SHA256, "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256"), +] + +DIGEST_METHODS = [ + (crypto.DIGEST_SHA1, "http://www.w3.org/2000/09/xmldsig#sha1"), + (crypto.DIGEST_SHA256, "http://www.w3.org/2001/04/xmlenc#sha256"), +] + + +@skip_if_no_crypto +@pytest.mark.parametrize("sig_method,expected_sig_href", SIGNATURE_METHODS) +@pytest.mark.parametrize("digest_method,expected_digest_href", DIGEST_METHODS) +def test_algorithm_combinations(sig_method, expected_sig_href, digest_method, expected_digest_href): + """Test all combinations of signature and digest algorithms.""" + envelope = _make_envelope_with_timestamp() + plugin = crypto.CryptoSignature( + KEY_FILE, + CERT_FILE, + signature_method=sig_method, + digest_method=digest_method, + ) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + # Verify algorithm URIs in the XML + digests = envelope.xpath("//ds:DigestMethod", namespaces={"ds": ns.DS}) + assert len(digests) > 0 + for d in digests: + assert d.get("Algorithm") == expected_digest_href + + signatures = envelope.xpath("//ds:SignatureMethod", namespaces={"ds": ns.DS}) + assert len(signatures) == 1 + assert signatures[0].get("Algorithm") == expected_sig_href + + +@skip_if_no_crypto +def test_mixed_algorithms_sha256_digest_sha1_signature(): + """VetStat-style: SHA256 digests with RSA-SHA1 signature.""" + envelope = _make_envelope_with_timestamp() + plugin = crypto.CryptoSignature( + KEY_FILE, + CERT_FILE, + signature_method=crypto.SIG_RSA_SHA1, + digest_method=crypto.DIGEST_SHA256, + ) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + digests = envelope.xpath("//ds:DigestMethod", namespaces={"ds": ns.DS}) + for d in digests: + assert d.get("Algorithm") == crypto.DIGEST_SHA256 + + sig_method = envelope.xpath("//ds:SignatureMethod", namespaces={"ds": ns.DS})[0] + assert sig_method.get("Algorithm") == crypto.SIG_RSA_SHA1 + + +@skip_if_no_crypto +def test_key_identifier_thumbprint(): + envelope = _make_envelope() + plugin = crypto.CryptoSignature( + KEY_FILE, + CERT_FILE, + key_info_style=crypto.KEY_IDENTIFIER_THUMBPRINT, + ) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + key_identifier = envelope.xpath( + "//ds:KeyInfo//wsse:KeyIdentifier", + namespaces={"ds": ns.DS, "wsse": ns.WSSE}, + ) + assert len(key_identifier) == 1 + assert key_identifier[0].get("ValueType").endswith("ThumbprintSHA1") + assert key_identifier[0].text + + +@skip_if_no_crypto +def test_key_identifier_ski(): + envelope = _make_envelope() + plugin = crypto.CryptoSignature( + KEY_FILE, + CERT_FILE, + key_info_style=crypto.KEY_IDENTIFIER_SKI, + ) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + key_identifier = envelope.xpath( + "//ds:KeyInfo//wsse:KeyIdentifier", + namespaces={"ds": ns.DS, "wsse": ns.WSSE}, + ) + assert len(key_identifier) == 1 + assert key_identifier[0].get("ValueType").endswith("X509SubjectKeyIdentifier") + assert key_identifier[0].text + + +# ----------------------------------------------------------------------- +# Signing extra elements (UsernameToken, BinarySecurityToken) +# ----------------------------------------------------------------------- + + +@skip_if_no_crypto +def test_sign_username_token(): + """Sign Body + Timestamp + UsernameToken.""" + envelope = _make_envelope_with_username_token() + plugin = crypto.CryptoSignature( + KEY_FILE, + CERT_FILE, + sign_username_token=True, + ) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + refs = envelope.xpath("//ds:Reference", namespaces={"ds": ns.DS}) + assert len(refs) == 3 # Body + Timestamp + UsernameToken + + +@skip_if_no_crypto +def test_sign_binary_security_token(): + """Sign Body + Timestamp + BinarySecurityToken.""" + envelope = _make_envelope_with_timestamp() + plugin = crypto.CryptoBinarySignature( + KEY_FILE, + CERT_FILE, + sign_binary_security_token=True, + ) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + refs = envelope.xpath("//ds:Reference", namespaces={"ds": ns.DS}) + assert len(refs) == 3 # Body + Timestamp + BinarySecurityToken + + +@skip_if_no_crypto +def test_sign_all_elements(): + """VetStat-style: sign Body + Timestamp + UsernameToken + BinarySecurityToken.""" + envelope = _make_envelope_with_username_token() + plugin = crypto.CryptoBinarySignature( + KEY_FILE, + CERT_FILE, + sign_username_token=True, + sign_binary_security_token=True, + ) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + refs = envelope.xpath("//ds:Reference", namespaces={"ds": ns.DS}) + assert len(refs) == 4 # Body + Timestamp + UsernameToken + BinarySecurityToken + + +# ----------------------------------------------------------------------- +# Inclusive namespace prefixes +# ----------------------------------------------------------------------- + + +@skip_if_no_crypto +def test_inclusive_ns_prefixes(): + """Verify that inclusive namespace prefixes appear in the transform elements.""" + envelope = _make_envelope_with_timestamp() + plugin = crypto.CryptoSignature( + KEY_FILE, + CERT_FILE, + inclusive_ns_prefixes={ + "Body": ["wsse", "ds"], + "Timestamp": ["soapenv", "wsse"], + }, + ) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + # Find InclusiveNamespaces elements + inc_ns_els = envelope.xpath( + "//ds:Reference/ds:Transforms/ds:Transform/ec:InclusiveNamespaces", + namespaces={ + "ds": ns.DS, + "ec": "http://www.w3.org/2001/10/xml-exc-c14n#", + }, + ) + assert len(inc_ns_els) >= 1 + prefix_lists = [el.get("PrefixList") for el in inc_ns_els] + assert "wsse ds" in prefix_lists or "ds wsse" in prefix_lists + + +@skip_if_no_crypto +def test_security_header_layout_xmlsec_compatible(): + envelope = _make_envelope_with_timestamp() + plugin = crypto.CryptoBinarySignature( + KEY_FILE, + CERT_FILE, + security_header_layout="xmlsec_compatible", + ) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + security_children = envelope.xpath( + "//wsse:Security/*", + namespaces={"wsse": ns.WSSE}, + ) + local_names = [QName(node.tag).localname for node in security_children] + assert local_names[:3] == ["Signature", "BinarySecurityToken", "Timestamp"] + + +@skip_if_no_crypto +def test_c14n_inclusive_prefixes_on_signed_info(): + """Verify CanonicalizationMethod has inclusive prefixes when configured.""" + envelope = _make_envelope() + plugin = crypto.CryptoSignature( + KEY_FILE, + CERT_FILE, + c14n_inclusive_prefixes=["ds", "ec", "soapenv", "wsse", "wsu"], + ) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + c14n_method = envelope.xpath( + "//ds:SignedInfo/ds:CanonicalizationMethod", + namespaces={"ds": ns.DS}, + )[0] + inc_ns = c14n_method.find(QName("http://www.w3.org/2001/10/xml-exc-c14n#", "InclusiveNamespaces")) + assert inc_ns is not None + assert "ds" in inc_ns.get("PrefixList") + + +# ----------------------------------------------------------------------- +# PKCS#12 support +# ----------------------------------------------------------------------- + + +@skip_if_no_crypto +def test_pkcs12_signature(): + """Test PKCS12Signature with .p12 file.""" + envelope = _make_envelope() + plugin = crypto.PKCS12Signature(P12_FILE, P12_PASSWORD) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + +@skip_if_no_crypto +def test_binary_signature_from_pkcs12(): + """Test CryptoBinarySignature.from_pkcs12 classmethod.""" + envelope = _make_envelope() + plugin = crypto.CryptoBinarySignature.from_pkcs12(P12_FILE, P12_PASSWORD) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + +@skip_if_no_crypto +def test_pkcs12_with_timestamp(): + envelope = _make_envelope_with_timestamp() + plugin = crypto.PKCS12Signature(P12_FILE, P12_PASSWORD) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + refs = envelope.xpath("//ds:Reference", namespaces={"ds": ns.DS}) + assert len(refs) == 2 + + +@skip_if_no_crypto +def test_pkcs12_string_password(): + """Test that string passwords are accepted.""" + envelope = _make_envelope() + plugin = crypto.PKCS12Signature(P12_FILE, "testpass") + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + +# ----------------------------------------------------------------------- +# Compose with UsernameToken +# ----------------------------------------------------------------------- + + +@skip_if_no_crypto +def test_compose_with_username_token(): + """Test using Compose to combine UsernameToken + CryptoSignature.""" + from zeep.wsse import Compose + from zeep.wsse.username import UsernameToken + + envelope = _make_envelope() + wsse_plugin = Compose( + [ + UsernameToken("testuser", "testpass"), + crypto.CryptoSignature(KEY_FILE, CERT_FILE), + ] + ) + envelope, headers = wsse_plugin.apply(envelope, {}) + + # Should have the username token + ut = envelope.xpath( + "//wsse:UsernameToken/wsse:Username", + namespaces={"wsse": ns.WSSE}, + ) + assert len(ut) == 1 + assert ut[0].text == "testuser" + + # Should have a signature + sig = envelope.xpath("//ds:Signature", namespaces={"ds": ns.DS}) + assert len(sig) == 1 + + +@skip_if_no_crypto +def test_compose_username_and_signed_username(): + """Compose UsernameToken + CryptoBinarySignature that also signs the token.""" + from zeep.wsse import Compose + from zeep.wsse.username import UsernameToken + + envelope = _make_envelope() + wsse_plugin = Compose( + [ + UsernameToken("testuser", "testpass"), + crypto.CryptoBinarySignature( + KEY_FILE, + CERT_FILE, + sign_username_token=True, + ), + ] + ) + envelope, headers = wsse_plugin.apply(envelope, {}) + + # Should have refs for Body + UsernameToken (no Timestamp in this envelope) + refs = envelope.xpath("//ds:Reference", namespaces={"ds": ns.DS}) + assert len(refs) == 2 + + +# ----------------------------------------------------------------------- +# Memory-based classes +# ----------------------------------------------------------------------- + + +@skip_if_no_crypto +def test_memory_signature(): + """Test CryptoMemorySignature with raw PEM bytes.""" + with open(KEY_FILE, "rb") as f: + key_data = f.read() + with open(CERT_FILE, "rb") as f: + cert_data = f.read() + + envelope = _make_envelope() + plugin = crypto.CryptoMemorySignature(key_data, cert_data) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + +@skip_if_no_crypto +def test_binary_memory_signature(): + """Test CryptoBinaryMemorySignature with raw PEM bytes.""" + with open(KEY_FILE, "rb") as f: + key_data = f.read() + with open(CERT_FILE, "rb") as f: + cert_data = f.read() + + envelope = _make_envelope() + plugin = crypto.CryptoBinaryMemorySignature(key_data, cert_data) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + bintok = envelope.xpath( + "//wsse:BinarySecurityToken", + namespaces={"wsse": ns.WSSE}, + ) + assert len(bintok) == 1 + + +# ----------------------------------------------------------------------- +# Edge cases +# ----------------------------------------------------------------------- + + +@skip_if_no_crypto +def test_no_timestamp_only_body(): + """When there's no Timestamp, should only sign Body.""" + envelope = _make_envelope() + plugin = crypto.CryptoSignature(KEY_FILE, CERT_FILE) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + refs = envelope.xpath("//ds:Reference", namespaces={"ds": ns.DS}) + assert len(refs) == 1 # Body only + + +@skip_if_no_crypto +def test_sign_timestamp_disabled(): + """Explicitly disable timestamp signing.""" + envelope = _make_envelope_with_timestamp() + plugin = crypto.CryptoSignature( + KEY_FILE, + CERT_FILE, + sign_timestamp=False, + ) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope) + + refs = envelope.xpath("//ds:Reference", namespaces={"ds": ns.DS}) + assert len(refs) == 1 # Body only + + +@skip_if_no_crypto +def test_verify_no_header_raises(): + """Verify raises on envelope without Header.""" + envelope = load_xml( + """ + + + + """ + ) + with pytest.raises(SignatureVerificationFailed): + crypto._verify_envelope( + envelope, + crypto._load_pem_certificate(open(CERT_FILE, "rb").read()), + ) + + +@skip_if_no_crypto +def test_verify_no_security_header_raises(): + """Verify raises on envelope with Header but no Security.""" + envelope = load_xml( + """ + + + + + """ + ) + with pytest.raises(SignatureVerificationFailed): + crypto._verify_envelope( + envelope, + crypto._load_pem_certificate(open(CERT_FILE, "rb").read()), + ) + + +@skip_if_no_crypto +def test_verify_with_timestamp_policy_expired_raises(): + envelope = _make_envelope_with_timestamp() + plugin = crypto.CryptoSignature(KEY_FILE, CERT_FILE) + envelope, headers = plugin.apply(envelope, {}) + + with pytest.raises(SignatureVerificationFailed): + plugin.verify( + envelope, + validate_timestamp=True, + now=datetime(2030, 1, 1, tzinfo=timezone.utc), + ) + + +@skip_if_no_crypto +def test_verify_with_certificate_time_policy_passes(): + envelope = _make_envelope() + plugin = crypto.CryptoSignature(KEY_FILE, CERT_FILE) + envelope, headers = plugin.apply(envelope, {}) + plugin.verify(envelope, validate_certificate_time=True) diff --git a/tests/test_wsse_signature.py b/tests/test_wsse_signature.py index 487b83542..bab949570 100644 --- a/tests/test_wsse_signature.py +++ b/tests/test_wsse_signature.py @@ -2,7 +2,6 @@ import sys import pytest -from lxml import etree from lxml.etree import QName from tests.utils import load_xml diff --git a/tests/test_wsse_username.py b/tests/test_wsse_username.py index 2b72145e6..06cb41334 100644 --- a/tests/test_wsse_username.py +++ b/tests/test_wsse_username.py @@ -85,6 +85,7 @@ def test_password_text(): assert_nodes_equal(envelope, expected) +@pytest.mark.network @freeze_time("2016-05-08 12:00:00") def test_password_digest(monkeypatch): monkeypatch.setattr(os, "urandom", lambda x: b"mocked-random") @@ -138,6 +139,7 @@ def test_password_digest(monkeypatch): assert_nodes_equal(envelope, expected) +@pytest.mark.network @freeze_time("2016-05-08 12:00:00") def test_password_digest_custom(monkeypatch): monkeypatch.setattr(os, "urandom", lambda x: b"mocked-random") @@ -323,6 +325,7 @@ def test_timestamp_token(): assert_nodes_equal(envelope, expected) +@pytest.mark.network @freeze_time("2016-05-08 12:00:00") def test_bytes_like_password_digest(monkeypatch): monkeypatch.setattr(os, "urandom", lambda x: b"mocked-random") diff --git a/tests/test_xsd_builtins.py b/tests/test_xsd_builtins.py index 9d76dea50..4f055da98 100644 --- a/tests/test_xsd_builtins.py +++ b/tests/test_xsd_builtins.py @@ -3,7 +3,6 @@ import isodate import pytest -import pytz from zeep.xsd.types import builtins @@ -161,14 +160,16 @@ def test_xmlvalue(self): value = datetime.datetime(2016, 3, 4, 21, 14, 42) assert instance.xmlvalue(value) == "2016-03-04T21:14:42" - value = datetime.datetime(2016, 3, 4, 21, 14, 42, tzinfo=pytz.utc) + value = datetime.datetime(2016, 3, 4, 21, 14, 42, tzinfo=datetime.timezone.utc) assert instance.xmlvalue(value) == "2016-03-04T21:14:42Z" - value = datetime.datetime(2016, 3, 4, 21, 14, 42, 123456, tzinfo=pytz.utc) + value = datetime.datetime( + 2016, 3, 4, 21, 14, 42, 123456, tzinfo=datetime.timezone.utc + ) assert instance.xmlvalue(value) == "2016-03-04T21:14:42.123456Z" - value = datetime.datetime(2016, 3, 4, 21, 14, 42, tzinfo=pytz.utc) - value = value.astimezone(pytz.timezone("Europe/Amsterdam")) + value = datetime.datetime(2016, 3, 4, 21, 14, 42, tzinfo=datetime.timezone.utc) + value = value.astimezone(datetime.timezone(datetime.timedelta(hours=1))) assert instance.xmlvalue(value) == "2016-03-04T22:14:42+01:00" assert ( @@ -242,6 +243,8 @@ def test_pythonvalue(self): instance = builtins.Date() assert instance.pythonvalue("2016-03-04") == datetime.date(2016, 3, 4) assert instance.pythonvalue("2001-10-26+02:00") == datetime.date(2001, 10, 26) + assert instance.pythonvalue("2001-10-26-02:00") == datetime.date(2001, 10, 26) + assert instance.pythonvalue("2024-08-21-10:00") == datetime.date(2024, 8, 21) assert instance.pythonvalue("2001-10-26Z") == datetime.date(2001, 10, 26) assert instance.pythonvalue("2001-10-26+00:00") == datetime.date(2001, 10, 26) assert instance.pythonvalue("\r\n\t 2016-03-04 ") == datetime.date(2016, 3, 4) @@ -260,7 +263,7 @@ class TestgYearMonth: def test_xmlvalue(self): instance = builtins.gYearMonth() assert instance.xmlvalue((2012, 10, None)) == "2012-10" - assert instance.xmlvalue((2012, 10, pytz.utc)) == "2012-10Z" + assert instance.xmlvalue((2012, 10, datetime.timezone.utc)) == "2012-10Z" def test_pythonvalue(self): instance = builtins.gYearMonth() @@ -268,10 +271,14 @@ def test_pythonvalue(self): assert instance.pythonvalue("2001-10+02:00") == ( 2001, 10, - pytz.FixedOffset(120), + datetime.timezone(datetime.timedelta(minutes=120)), + ) + assert instance.pythonvalue("2001-10Z") == (2001, 10, datetime.timezone.utc) + assert instance.pythonvalue("2001-10+00:00") == ( + 2001, + 10, + datetime.timezone.utc, ) - assert instance.pythonvalue("2001-10Z") == (2001, 10, pytz.utc) - assert instance.pythonvalue("2001-10+00:00") == (2001, 10, pytz.utc) assert instance.pythonvalue("-2001-10") == (-2001, 10, None) assert instance.pythonvalue("-20001-10") == (-20001, 10, None) @@ -283,19 +290,22 @@ class TestgYear: def test_xmlvalue(self): instance = builtins.gYear() assert instance.xmlvalue((2001, None)) == "2001" - assert instance.xmlvalue((2001, pytz.utc)) == "2001Z" + assert instance.xmlvalue((2001, datetime.timezone.utc)) == "2001Z" def test_pythonvalue(self): instance = builtins.gYear() assert instance.pythonvalue("2001") == (2001, None) - assert instance.pythonvalue("2001+02:00") == (2001, pytz.FixedOffset(120)) - assert instance.pythonvalue("2001Z") == (2001, pytz.utc) - assert instance.pythonvalue("2001+00:00") == (2001, pytz.utc) + assert instance.pythonvalue("2001+02:00") == ( + 2001, + datetime.timezone(datetime.timedelta(minutes=120)), + ) + assert instance.pythonvalue("2001Z") == (2001, datetime.timezone.utc) + assert instance.pythonvalue("2001+00:00") == (2001, datetime.timezone.utc) assert instance.pythonvalue("-2001") == (-2001, None) assert instance.pythonvalue("-20000") == (-20000, None) assert instance.pythonvalue(" \t2001+02:00\r\n ") == ( 2001, - pytz.FixedOffset(120), + datetime.timezone(datetime.timedelta(minutes=120)), ) with pytest.raises(builtins.ParseError): @@ -310,9 +320,17 @@ def test_xmlvalue(self): def test_pythonvalue(self): instance = builtins.gMonthDay() assert instance.pythonvalue("--05-01") == (5, 1, None) - assert instance.pythonvalue("--11-01Z") == (11, 1, pytz.utc) - assert instance.pythonvalue("--11-01+02:00") == (11, 1, pytz.FixedOffset(120)) - assert instance.pythonvalue("--11-01-04:00") == (11, 1, pytz.FixedOffset(-240)) + assert instance.pythonvalue("--11-01Z") == (11, 1, datetime.timezone.utc) + assert instance.pythonvalue("--11-01+02:00") == ( + 11, + 1, + datetime.timezone(datetime.timedelta(minutes=120)), + ) + assert instance.pythonvalue("--11-01-04:00") == ( + 11, + 1, + datetime.timezone(datetime.timedelta(minutes=-240)), + ) assert instance.pythonvalue("--11-15") == (11, 15, None) assert instance.pythonvalue("--02-29") == (2, 29, None) assert instance.pythonvalue("\t\r\n --05-01 ") == (5, 1, None) @@ -329,12 +347,18 @@ def test_xmlvalue(self): def test_pythonvalue(self): instance = builtins.gMonth() assert instance.pythonvalue("--05") == (5, None) - assert instance.pythonvalue("--11Z") == (11, pytz.utc) - assert instance.pythonvalue("--11+02:00") == (11, pytz.FixedOffset(120)) - assert instance.pythonvalue("--11-04:00") == (11, pytz.FixedOffset(-240)) + assert instance.pythonvalue("--11Z") == (11, datetime.timezone.utc) + assert instance.pythonvalue("--11+02:00") == ( + 11, + datetime.timezone(datetime.timedelta(minutes=120)), + ) + assert instance.pythonvalue("--11-04:00") == ( + 11, + datetime.timezone(datetime.timedelta(minutes=-240)), + ) assert instance.pythonvalue("--11") == (11, None) assert instance.pythonvalue("--02") == (2, None) - assert instance.pythonvalue("\n\t --11Z \r") == (11, pytz.utc) + assert instance.pythonvalue("\n\t --11Z \r") == (11, datetime.timezone.utc) with pytest.raises(builtins.ParseError): assert instance.pythonvalue("99") @@ -347,18 +371,24 @@ def test_xmlvalue(self): value = (1, None) assert instance.xmlvalue(value) == "---01" - value = (1, pytz.FixedOffset(120)) + value = (1, datetime.timezone(datetime.timedelta(minutes=120))) assert instance.xmlvalue(value) == "---01+02:00" - value = (1, pytz.FixedOffset(-240)) + value = (1, datetime.timezone(datetime.timedelta(minutes=-240))) assert instance.xmlvalue(value) == "---01-04:00" def test_pythonvalue(self): instance = builtins.gDay() assert instance.pythonvalue("---01") == (1, None) - assert instance.pythonvalue("---01Z") == (1, pytz.utc) - assert instance.pythonvalue("---01+02:00") == (1, pytz.FixedOffset(120)) - assert instance.pythonvalue("---01-04:00") == (1, pytz.FixedOffset(-240)) + assert instance.pythonvalue("---01Z") == (1, datetime.timezone.utc) + assert instance.pythonvalue("---01+02:00") == ( + 1, + datetime.timezone(datetime.timedelta(minutes=120)), + ) + assert instance.pythonvalue("---01-04:00") == ( + 1, + datetime.timezone(datetime.timedelta(minutes=-240)), + ) assert instance.pythonvalue("---15") == (15, None) assert instance.pythonvalue("---31") == (31, None) assert instance.pythonvalue("\r\n \t---31 ") == (31, None) @@ -369,17 +399,18 @@ def test_pythonvalue(self): class TestHexBinary: def test_xmlvalue(self): instance = builtins.HexBinary() - assert instance.xmlvalue(b"\xFF") == b"\xFF" + assert instance.xmlvalue(b"\xff") == b"\xff" def test_pythonvalue(self): instance = builtins.HexBinary() - assert instance.pythonvalue(b"\xFF") == b"\xFF" + assert instance.pythonvalue(b"\xff") == b"\xff" class TestBase64Binary: def test_xmlvalue(self): instance = builtins.Base64Binary() assert instance.xmlvalue(b"hoi") == b"aG9p" + assert instance.xmlvalue("aG9p") == "aG9p" def test_pythonvalue(self): instance = builtins.Base64Binary() diff --git a/tests/test_xsd_extension.py b/tests/test_xsd_extension.py index c677f5aa4..def5760c4 100644 --- a/tests/test_xsd_extension.py +++ b/tests/test_xsd_extension.py @@ -1,5 +1,4 @@ import datetime -import io from lxml import etree @@ -437,9 +436,7 @@ def test_issue_221(): transport.bind( "https://www.w3.org/TR/xmldsig-core/xmldsig-core-schema.xsd", load_xml( - io.open("tests/wsdl_files/xmldsig-core-schema.xsd", "r") - .read() - .encode("utf-8") + open("tests/wsdl_files/xmldsig-core-schema.xsd").read().encode("utf-8") ), ) diff --git a/tests/test_xsd_schemas.py b/tests/test_xsd_schemas.py index 8a3a59d2a..70f35b972 100644 --- a/tests/test_xsd_schemas.py +++ b/tests/test_xsd_schemas.py @@ -1,5 +1,3 @@ -import io - import pytest from lxml import etree @@ -1021,7 +1019,7 @@ def test_xml_namespace(): def test_auto_import_known_schema(): - content = io.open("tests/wsdl_files/soap-enc.xsd", "rb").read() + content = open("tests/wsdl_files/soap-enc.xsd", "rb").read() transport = DummyTransport() transport.bind("http://schemas.xmlsoap.org/soap/encoding/", content) diff --git a/tests/test_xsd_types.py b/tests/test_xsd_types.py index 2c2a983d8..489a8a3aa 100644 --- a/tests/test_xsd_types.py +++ b/tests/test_xsd_types.py @@ -1,4 +1,4 @@ -from datetime import datetime, time +from datetime import datetime from decimal import Decimal import isodate diff --git a/tests/test_xsd_union.py b/tests/test_xsd_union.py index 389e8bd50..a0dbb6241 100644 --- a/tests/test_xsd_union.py +++ b/tests/test_xsd_union.py @@ -47,7 +47,7 @@ def test_union_same_types(): def test_union_mixed(): schema = xsd.Schema( load_xml( - """ + r"""