-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat: add post-quantum cryptography (PQC) integration tests and fixtures #17586
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -18,6 +18,8 @@ | |||||||||||||
| import os | ||||||||||||||
| import pytest | ||||||||||||||
| import pytest_asyncio | ||||||||||||||
| from requests.adapters import HTTPAdapter | ||||||||||||||
| from urllib3.poolmanager import PoolManager | ||||||||||||||
|
|
||||||||||||||
| from typing import Sequence, Tuple | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -328,7 +330,7 @@ def _read_response_metadata_stream(self): | |||||||||||||
| def intercept_unary_unary(self, continuation, client_call_details, request): | ||||||||||||||
| self._add_request_metadata(client_call_details) | ||||||||||||||
| response = continuation(client_call_details, request) | ||||||||||||||
| metadata = [(k, str(v)) for k, v in response.trailing_metadata()] | ||||||||||||||
| metadata = [(k, str(v)) for k, v in response.initial_metadata()] + [(k, str(v)) for k, v in response.trailing_metadata()] | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In gRPC Python,
Suggested change
References
|
||||||||||||||
| self.response_metadata = metadata | ||||||||||||||
| return response | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -453,36 +455,64 @@ async def intercepted_echo_grpc_async(): | |||||||||||||
| return EchoAsyncClient(transport=transport), interceptor | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| class HostNameIgnoringAdapter(HTTPAdapter): | ||||||||||||||
| """Custom HTTPAdapter that disables hostname verification for local self-signed certs.""" | ||||||||||||||
| def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs): | ||||||||||||||
| self.poolmanager = PoolManager( | ||||||||||||||
| num_pools=connections, | ||||||||||||||
| maxsize=maxsize, | ||||||||||||||
| block=block, | ||||||||||||||
| assert_hostname=False, | ||||||||||||||
| **pool_kwargs | ||||||||||||||
| ) | ||||||||||||||
|
Comment on lines
+458
to
+467
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In class HostNameIgnoringAdapter(HTTPAdapter):
"""Custom HTTPAdapter that disables hostname verification for local self-signed certs."""
def cert_verify(self, conn, url, verify, cert):
super().cert_verify(conn, url, verify, cert)
conn.assert_hostname = False |
||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| @pytest.fixture | ||||||||||||||
| def intercepted_echo_rest(): | ||||||||||||||
| def intercepted_echo_rest(use_mtls): | ||||||||||||||
| transport_name = "rest" | ||||||||||||||
| transport_cls = EchoClient.get_transport_class(transport_name) | ||||||||||||||
| interceptor = EchoMetadataClientRestInterceptor() | ||||||||||||||
|
|
||||||||||||||
| # The custom host explicitly bypasses https. | ||||||||||||||
| url_scheme = "https" if use_mtls else "http" | ||||||||||||||
| transport = transport_cls( | ||||||||||||||
| credentials=ga_credentials.AnonymousCredentials(), | ||||||||||||||
| host="localhost:7469", | ||||||||||||||
| url_scheme="http", | ||||||||||||||
| url_scheme=url_scheme, | ||||||||||||||
| interceptor=interceptor, | ||||||||||||||
| ) | ||||||||||||||
| if use_mtls: | ||||||||||||||
| dir = os.path.dirname(__file__) | ||||||||||||||
| cert_path = os.path.join(dir, "../cert/mtls.crt") | ||||||||||||||
| key_path = os.path.join(dir, "../cert/mtls.key") | ||||||||||||||
|
Comment on lines
+484
to
+486
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid using
Suggested change
|
||||||||||||||
| transport._session.verify = cert_path | ||||||||||||||
| transport._session.cert = (cert_path, key_path) | ||||||||||||||
| transport._session.mount("https://", HostNameIgnoringAdapter()) | ||||||||||||||
|
|
||||||||||||||
| return EchoClient(transport=transport), interceptor | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| @pytest.fixture | ||||||||||||||
| def intercepted_echo_rest_async(): | ||||||||||||||
| def intercepted_echo_rest_async(use_mtls): | ||||||||||||||
| if not HAS_ASYNC_REST_ECHO_TRANSPORT: | ||||||||||||||
| pytest.skip("Skipping test with async rest.") | ||||||||||||||
|
|
||||||||||||||
| transport_name = "rest_asyncio" | ||||||||||||||
| transport_cls = EchoAsyncClient.get_transport_class(transport_name) | ||||||||||||||
| interceptor = EchoMetadataClientRestAsyncInterceptor() | ||||||||||||||
|
|
||||||||||||||
| # The custom host explicitly bypasses https. | ||||||||||||||
| url_scheme = "https" if use_mtls else "http" | ||||||||||||||
| transport = transport_cls( | ||||||||||||||
| credentials=async_anonymous_credentials(), | ||||||||||||||
| host="localhost:7469", | ||||||||||||||
| url_scheme="http", | ||||||||||||||
| url_scheme=url_scheme, | ||||||||||||||
| interceptor=interceptor, | ||||||||||||||
| ) | ||||||||||||||
| if use_mtls: | ||||||||||||||
| dir = os.path.dirname(__file__) | ||||||||||||||
| cert_path = os.path.join(dir, "../cert/mtls.crt") | ||||||||||||||
| key_path = os.path.join(dir, "../cert/mtls.key") | ||||||||||||||
|
Comment on lines
+511
to
+513
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid using
Suggested change
|
||||||||||||||
| transport._session.verify = cert_path | ||||||||||||||
| transport._session.cert = (cert_path, key_path) | ||||||||||||||
| transport._session.mount("https://", HostNameIgnoringAdapter()) | ||||||||||||||
|
|
||||||||||||||
| return EchoAsyncClient(transport=transport), interceptor | ||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,52 @@ | ||||||||||||||||||||||||
| # Copyright 2026 Google LLC | ||||||||||||||||||||||||
| # | ||||||||||||||||||||||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||||||||||
| # you may not use this file except in compliance with the License. | ||||||||||||||||||||||||
| # You may obtain a copy of the License at | ||||||||||||||||||||||||
| # | ||||||||||||||||||||||||
| # https://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||
| # | ||||||||||||||||||||||||
| # Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||||
| # See the License for the specific language governing permissions and | ||||||||||||||||||||||||
| # limitations under the License. | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| import pytest | ||||||||||||||||||||||||
| from google import showcase | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| @pytest.fixture | ||||||||||||||||||||||||
| def run_pqc_test(use_mtls): | ||||||||||||||||||||||||
| if not use_mtls: | ||||||||||||||||||||||||
| pytest.skip("PQC integration test requires mTLS (--mtls flag) to be enabled.") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| @pytest.mark.parametrize( | ||||||||||||||||||||||||
| "transport_fixture", | ||||||||||||||||||||||||
| ["intercepted_echo_grpc", "intercepted_echo_rest"] | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| def test_pqc_negotiated_group(run_pqc_test, request, transport_fixture): | ||||||||||||||||||||||||
| """Verifies that the generated client library negotiates PQC with the Showcase server.""" | ||||||||||||||||||||||||
| client, interceptor = request.getfixturevalue(transport_fixture) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Make secure call using the standard client library fixture | ||||||||||||||||||||||||
| response = client.echo(request=showcase.EchoRequest(content="Verify PQC connection.")) | ||||||||||||||||||||||||
| assert response.content == "Verify PQC connection." | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Extract negotiated group and supported groups from response headers | ||||||||||||||||||||||||
| negotiated_group = None | ||||||||||||||||||||||||
| supported_groups = None | ||||||||||||||||||||||||
| for key, value in interceptor.response_metadata: | ||||||||||||||||||||||||
| if key.lower() == "x-showcase-tls-group": | ||||||||||||||||||||||||
| negotiated_group = value | ||||||||||||||||||||||||
| elif key.lower() == "x-showcase-tls-client-supported-groups": | ||||||||||||||||||||||||
| supported_groups = value | ||||||||||||||||||||||||
|
Comment on lines
+38
to
+42
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Accessing
Suggested change
References
|
||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| assert negotiated_group is not None, "Failed: Showcase server did not return negotiated TLS group header." | ||||||||||||||||||||||||
| assert supported_groups is not None, "Failed: Showcase server did not return client advertised supported groups." | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| print(f"\n[PQC Verification] ({transport_fixture}) Negotiated TLS Group: {negotiated_group}") | ||||||||||||||||||||||||
| print(f"[PQC Verification] ({transport_fixture}) Client Advertised Supported Groups: {supported_groups}") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Enforce PQC compliance (this will fail if not using MLKEM/Kyber) | ||||||||||||||||||||||||
| assert "MLKEM" in negotiated_group or "Kyber" in negotiated_group, \ | ||||||||||||||||||||||||
| f"Failed: {transport_fixture} Connection is NOT PQC-compliant! Negotiated: {negotiated_group}" | ||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the suggestion to use
cert_verifyinHostNameIgnoringAdapteris applied,PoolManageris no longer needed and its import can be removed.