From e6c889a2f6fa7eb4346698e0da331397f7154fbc Mon Sep 17 00:00:00 2001 From: ohmayr Date: Fri, 26 Jun 2026 20:00:30 +0000 Subject: [PATCH] feat: add post-quantum cryptography (PQC) integration tests and fixtures --- .../gapic-generator/tests/system/conftest.py | 44 +++++++++++++--- .../gapic-generator/tests/system/test_pqc.py | 52 +++++++++++++++++++ 2 files changed, 89 insertions(+), 7 deletions(-) create mode 100644 packages/gapic-generator/tests/system/test_pqc.py diff --git a/packages/gapic-generator/tests/system/conftest.py b/packages/gapic-generator/tests/system/conftest.py index 180e48b8d59a..5c00c99a5976 100644 --- a/packages/gapic-generator/tests/system/conftest.py +++ b/packages/gapic-generator/tests/system/conftest.py @@ -18,6 +18,8 @@ import os import pytest import pytest_asyncio +from requests.adapters import HTTPAdapter +from urllib3.poolmanager import PoolManager from typing import Sequence, Tuple @@ -328,7 +330,7 @@ def _read_response_metadata_stream(self): def intercept_unary_unary(self, continuation, client_call_details, request): self._add_request_metadata(client_call_details) response = continuation(client_call_details, request) - metadata = [(k, str(v)) for k, v in response.trailing_metadata()] + metadata = [(k, str(v)) for k, v in response.initial_metadata()] + [(k, str(v)) for k, v in response.trailing_metadata()] self.response_metadata = metadata return response @@ -453,24 +455,44 @@ async def intercepted_echo_grpc_async(): return EchoAsyncClient(transport=transport), interceptor +class HostNameIgnoringAdapter(HTTPAdapter): + """Custom HTTPAdapter that disables hostname verification for local self-signed certs.""" + def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs): + self.poolmanager = PoolManager( + num_pools=connections, + maxsize=maxsize, + block=block, + assert_hostname=False, + **pool_kwargs + ) + + @pytest.fixture -def intercepted_echo_rest(): +def intercepted_echo_rest(use_mtls): transport_name = "rest" transport_cls = EchoClient.get_transport_class(transport_name) interceptor = EchoMetadataClientRestInterceptor() - # The custom host explicitly bypasses https. + url_scheme = "https" if use_mtls else "http" transport = transport_cls( credentials=ga_credentials.AnonymousCredentials(), host="localhost:7469", - url_scheme="http", + url_scheme=url_scheme, interceptor=interceptor, ) + if use_mtls: + dir = os.path.dirname(__file__) + cert_path = os.path.join(dir, "../cert/mtls.crt") + key_path = os.path.join(dir, "../cert/mtls.key") + transport._session.verify = cert_path + transport._session.cert = (cert_path, key_path) + transport._session.mount("https://", HostNameIgnoringAdapter()) + return EchoClient(transport=transport), interceptor @pytest.fixture -def intercepted_echo_rest_async(): +def intercepted_echo_rest_async(use_mtls): if not HAS_ASYNC_REST_ECHO_TRANSPORT: pytest.skip("Skipping test with async rest.") @@ -478,11 +500,19 @@ def intercepted_echo_rest_async(): transport_cls = EchoAsyncClient.get_transport_class(transport_name) interceptor = EchoMetadataClientRestAsyncInterceptor() - # The custom host explicitly bypasses https. + url_scheme = "https" if use_mtls else "http" transport = transport_cls( credentials=async_anonymous_credentials(), host="localhost:7469", - url_scheme="http", + url_scheme=url_scheme, interceptor=interceptor, ) + if use_mtls: + dir = os.path.dirname(__file__) + cert_path = os.path.join(dir, "../cert/mtls.crt") + key_path = os.path.join(dir, "../cert/mtls.key") + transport._session.verify = cert_path + transport._session.cert = (cert_path, key_path) + transport._session.mount("https://", HostNameIgnoringAdapter()) + return EchoAsyncClient(transport=transport), interceptor diff --git a/packages/gapic-generator/tests/system/test_pqc.py b/packages/gapic-generator/tests/system/test_pqc.py new file mode 100644 index 000000000000..127c2f9edc74 --- /dev/null +++ b/packages/gapic-generator/tests/system/test_pqc.py @@ -0,0 +1,52 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from google import showcase + +@pytest.fixture +def run_pqc_test(use_mtls): + if not use_mtls: + pytest.skip("PQC integration test requires mTLS (--mtls flag) to be enabled.") + +@pytest.mark.parametrize( + "transport_fixture", + ["intercepted_echo_grpc", "intercepted_echo_rest"] +) +def test_pqc_negotiated_group(run_pqc_test, request, transport_fixture): + """Verifies that the generated client library negotiates PQC with the Showcase server.""" + client, interceptor = request.getfixturevalue(transport_fixture) + + # Make secure call using the standard client library fixture + response = client.echo(request=showcase.EchoRequest(content="Verify PQC connection.")) + assert response.content == "Verify PQC connection." + + # Extract negotiated group and supported groups from response headers + negotiated_group = None + supported_groups = None + for key, value in interceptor.response_metadata: + if key.lower() == "x-showcase-tls-group": + negotiated_group = value + elif key.lower() == "x-showcase-tls-client-supported-groups": + supported_groups = value + + assert negotiated_group is not None, "Failed: Showcase server did not return negotiated TLS group header." + assert supported_groups is not None, "Failed: Showcase server did not return client advertised supported groups." + + print(f"\n[PQC Verification] ({transport_fixture}) Negotiated TLS Group: {negotiated_group}") + print(f"[PQC Verification] ({transport_fixture}) Client Advertised Supported Groups: {supported_groups}") + + # Enforce PQC compliance (this will fail if not using MLKEM/Kyber) + assert "MLKEM" in negotiated_group or "Kyber" in negotiated_group, \ + f"Failed: {transport_fixture} Connection is NOT PQC-compliant! Negotiated: {negotiated_group}"