From ac60f0e365ab3730934c4ed9b37e876b9060db73 Mon Sep 17 00:00:00 2001 From: Nicolas Brainez Date: Thu, 12 Mar 2026 23:04:23 +0100 Subject: [PATCH] fix: support multiple X509Certificate elements in X509Data (closes #731) --- src/saml2/mdstore.py | 7 ++-- src/saml2/metadata.py | 6 +-- src/saml2/s2repoze/plugins/sp.py | 2 +- src/saml2/sigver.py | 32 +++++++-------- src/saml2/xmldsig/__init__.py | 4 +- tests/test_00_xmldsig.py | 25 +++++++++--- tests/test_02_saml.py | 2 +- tests/test_30_mdstore.py | 69 ++++++++++++++++++++++++++++++++ tests/test_82_pefim.py | 4 +- tests/test_93_hok.py | 2 +- 10 files changed, 119 insertions(+), 34 deletions(-) diff --git a/src/saml2/mdstore.py b/src/saml2/mdstore.py index 2ea9742f0..9b3c342a5 100644 --- a/src/saml2/mdstore.py +++ b/src/saml2/mdstore.py @@ -492,9 +492,10 @@ def extract_certs(srvs): key_name_txt = key_name.get("text") if "use" not in key or key_use == use: for dat in key_info["x509_data"]: - cert = repack_cert(dat["x509_certificate"]["text"]) - if cert not in res: - res.append((key_name_txt, cert)) + for x509_cert in dat.get("x509_certificate", []): + cert = repack_cert(x509_cert["text"]) + if cert not in res: + res.append((key_name_txt, cert)) return res diff --git a/src/saml2/metadata.py b/src/saml2/metadata.py index 3961c869f..3091be61e 100644 --- a/src/saml2/metadata.py +++ b/src/saml2/metadata.py @@ -184,7 +184,7 @@ def do_key_descriptor(cert=None, enc_cert=None, use="both"): for _cert in cert: kd_list.append( md.KeyDescriptor( - key_info=ds.KeyInfo(x509_data=ds.X509Data(x509_certificate=ds.X509Certificate(text=_cert))), + key_info=ds.KeyInfo(x509_data=ds.X509Data(x509_certificate=[ds.X509Certificate(text=_cert)])), use="signing", ) ) @@ -194,13 +194,13 @@ def do_key_descriptor(cert=None, enc_cert=None, use="both"): for _enc_cert in enc_cert: kd_list.append( md.KeyDescriptor( - key_info=ds.KeyInfo(x509_data=ds.X509Data(x509_certificate=ds.X509Certificate(text=_enc_cert))), + key_info=ds.KeyInfo(x509_data=ds.X509Data(x509_certificate=[ds.X509Certificate(text=_enc_cert)])), use="encryption", ) ) if len(kd_list) == 0 and cert is not None: return md.KeyDescriptor( - key_info=ds.KeyInfo(x509_data=ds.X509Data(x509_certificate=ds.X509Certificate(text=cert))) + key_info=ds.KeyInfo(x509_data=ds.X509Data(x509_certificate=[ds.X509Certificate(text=cert)])) ) return kd_list diff --git a/src/saml2/s2repoze/plugins/sp.py b/src/saml2/s2repoze/plugins/sp.py index 7079d96fe..7ee4f4e96 100644 --- a/src/saml2/s2repoze/plugins/sp.py +++ b/src/saml2/s2repoze/plugins/sp.py @@ -323,7 +323,7 @@ def challenge(self, environ, _status, _app_headers, _forget_headers): if _cli.config.generate_cert_func is not None: cert_str, req_key_str = _cli.config.generate_cert_func() cert = {"cert": cert_str, "key": req_key_str} - spcertenc = SPCertEnc(x509_data=ds.X509Data(x509_certificate=ds.X509Certificate(text=cert_str))) + spcertenc = SPCertEnc(x509_data=ds.X509Data(x509_certificate=[ds.X509Certificate(text=cert_str)])) extensions = Extensions(extension_elements=[element_to_extension_element(spcertenc)]) if _cli.authn_requests_signed: diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index 738ac04b1..698f0251c 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -397,13 +397,13 @@ def cert_from_key_info(key_info, ignore_age=False): """ res = [] for x509_data in key_info.x509_data: - x509_certificate = x509_data.x509_certificate - cert = x509_certificate.text.strip() - cert = "\n".join(split_len("".join([s.strip() for s in cert.split()]), 64)) - if ignore_age or active_cert(cert): - res.append(cert) - else: - logger.info("Inactive cert") + for x509_certificate in x509_data.x509_certificate: + cert = x509_certificate.text.strip() + cert = "\n".join(split_len("".join([s.strip() for s in cert.split()]), 64)) + if ignore_age or active_cert(cert): + res.append(cert) + else: + logger.info("Inactive cert") return res @@ -423,13 +423,13 @@ def cert_from_key_info_dict(key_info, ignore_age=False): return res for x509_data in key_info["x509_data"]: - x509_certificate = x509_data["x509_certificate"] - cert = x509_certificate["text"].strip() - cert = "\n".join(split_len("".join([s.strip() for s in cert.split()]), 64)) - if ignore_age or active_cert(cert): - res.append(cert) - else: - logger.info("Inactive cert") + for x509_certificate in x509_data.get("x509_certificate", []): + cert = x509_certificate["text"].strip() + cert = "\n".join(split_len("".join([s.strip() for s in cert.split()]), 64)) + if ignore_age or active_cert(cert): + res.append(cert) + else: + logger.info("Inactive cert") return res @@ -1024,7 +1024,7 @@ def encrypt_cert_from_item(item): if isinstance(_tmp_elem, SPCertEnc): for _tmp_key_info in _tmp_elem.key_info: if _tmp_key_info.x509_data is not None and len(_tmp_key_info.x509_data) > 0: - _encrypt_cert = _tmp_key_info.x509_data[0].x509_certificate.text + _encrypt_cert = _tmp_key_info.x509_data[0].x509_certificate[0].text break except Exception: pass @@ -1846,7 +1846,7 @@ def pre_encryption_part( msg_encryption_method = EncryptionMethod(algorithm=msg_enc) key_encryption_method = EncryptionMethod(algorithm=key_enc) - x509_data = ds.X509Data(x509_certificate=ds.X509Certificate(text=encrypt_cert)) if encrypt_cert else None + x509_data = ds.X509Data(x509_certificate=[ds.X509Certificate(text=encrypt_cert)]) if encrypt_cert else None key_name = ds.KeyName(text=key_name) if key_name else None key_info = ds.KeyInfo(key_name=key_name, x509_data=x509_data) if key_name or x509_data else None diff --git a/src/saml2/xmldsig/__init__.py b/src/saml2/xmldsig/__init__.py index 85f892632..ace2b1a8c 100644 --- a/src/saml2/xmldsig/__init__.py +++ b/src/saml2/xmldsig/__init__.py @@ -992,8 +992,8 @@ class X509DataType_(SamlBase): c_cardinality["x509_ski"] = {"min": 0, "max": 1} c_children["{http://www.w3.org/2000/09/xmldsig#}X509SubjectName"] = ("x509_subject_name", X509SubjectName) c_cardinality["x509_subject_name"] = {"min": 0, "max": 1} - c_children["{http://www.w3.org/2000/09/xmldsig#}X509Certificate"] = ("x509_certificate", X509Certificate) - c_cardinality["x509_certificate"] = {"min": 0, "max": 1} + c_children["{http://www.w3.org/2000/09/xmldsig#}X509Certificate"] = ("x509_certificate", [X509Certificate]) + c_cardinality["x509_certificate"] = {"min": 0} c_children["{http://www.w3.org/2000/09/xmldsig#}X509CRL"] = ("x509_crl", X509CRL) c_cardinality["x509_crl"] = {"min": 0, "max": 1} c_child_order.extend(["x509_issuer_serial", "x509_ski", "x509_subject_name", "x509_certificate", "x509_crl"]) diff --git a/tests/test_00_xmldsig.py b/tests/test_00_xmldsig.py index 059e8a049..0715594ff 100644 --- a/tests/test_00_xmldsig.py +++ b/tests/test_00_xmldsig.py @@ -137,7 +137,7 @@ def testAccessors(self): self.x509_data.x509_issuer_serial = st self.x509_data.x509_ski = ds.X509SKI(text="x509 ski") self.x509_data.x509_subject_name = ds.X509SubjectName(text="x509 subject name") - self.x509_data.x509_certificate = ds.X509Certificate(text="x509 certificate") + self.x509_data.x509_certificate = [ds.X509Certificate(text="x509 certificate")] self.x509_data.x509_crl = ds.X509CRL(text="x509 crl") new_x509_data = ds.x509_data_from_string(self.x509_data.to_string()) @@ -149,8 +149,9 @@ def testAccessors(self): assert isinstance(new_x509_data.x509_ski, ds.X509SKI) assert new_x509_data.x509_subject_name.text.strip() == "x509 subject name" assert isinstance(new_x509_data.x509_subject_name, ds.X509SubjectName) - assert new_x509_data.x509_certificate.text.strip() == "x509 certificate" - assert isinstance(new_x509_data.x509_certificate, ds.X509Certificate) + assert len(new_x509_data.x509_certificate) == 1 + assert new_x509_data.x509_certificate[0].text.strip() == "x509 certificate" + assert isinstance(new_x509_data.x509_certificate[0], ds.X509Certificate) assert new_x509_data.x509_crl.text.strip() == "x509 crl" assert isinstance(new_x509_data.x509_crl, ds.X509CRL) @@ -162,12 +163,26 @@ def testUsingTestData(self): assert isinstance(new_x509_data.x509_ski, ds.X509SKI) assert new_x509_data.x509_subject_name.text.strip() == "x509 subject name" assert isinstance(new_x509_data.x509_subject_name, ds.X509SubjectName) - assert new_x509_data.x509_certificate.text.strip() == "x509 certificate" - assert isinstance(new_x509_data.x509_certificate, ds.X509Certificate) + assert len(new_x509_data.x509_certificate) == 1 + assert new_x509_data.x509_certificate[0].text.strip() == "x509 certificate" + assert isinstance(new_x509_data.x509_certificate[0], ds.X509Certificate) assert new_x509_data.x509_crl.text.strip() == "x509 crl" assert isinstance(new_x509_data.x509_crl, ds.X509CRL) + def testMultipleCertificatesInX509Data(self): + """Test that multiple X509Certificate elements in a single X509Data are parsed correctly""" + xml_string = """ + cert_one + cert_two + """ + x509_data = ds.x509_data_from_string(xml_string) + assert isinstance(x509_data.x509_certificate, list) + assert len(x509_data.x509_certificate) == 2 + assert x509_data.x509_certificate[0].text.strip() == "cert_one" + assert x509_data.x509_certificate[1].text.strip() == "cert_two" + + class TestTransform: def setup_class(self): self.transform = ds.Transform() diff --git a/tests/test_02_saml.py b/tests/test_02_saml.py index a090d89f2..330877afe 100644 --- a/tests/test_02_saml.py +++ b/tests/test_02_saml.py @@ -870,7 +870,7 @@ def testHolderOfKeyUsingTestData(self): "Vx/jWjX2g5SDbjItH6VGy6C9GCGf1A07VxFRCfJn5tA9HuJjPKiE+g/BmrV5N4Ce" "alzFxPHWYkNOzoRU8qI7OqUai1kL" ) - xcert = key_info[0].x509_data[0].x509_certificate + xcert = key_info[0].x509_data[0].x509_certificate[0] assert xcert.text.strip().replace("\n", "") == expected_cert diff --git a/tests/test_30_mdstore.py b/tests/test_30_mdstore.py index 6f6632014..0504242b2 100644 --- a/tests/test_30_mdstore.py +++ b/tests/test_30_mdstore.py @@ -624,6 +624,75 @@ def test_get_certs_from_metadata_without_keydescriptor(): assert len(certs) == 0 +TEST_SIGNING_CERT = """MIIDQTCCAimgAwIBAgIUX8x3mr8kEGTCYgQit+f8m4EqanYwDQYJKoZIhvcNAQEL +BQAwNzELMAkGA1UEBhMCQkUxETAPBgNVBAgMCEJydXNzZWxzMRUwEwYDVQQKDAxU +ZXN0IFJvb3QgQ0EwHhcNMjYwMzEyMjE1NTI1WhcNMjcwMzEyMjE1NTI1WjA6MQsw +CQYDVQQGEwJCRTERMA8GA1UECAwIQnJ1c3NlbHMxGDAWBgNVBAoMD1Rlc3QgU2ln +bmluZyBTQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALKxUPRwpNyx +H+0jiq+Pv8f0Q70zwdT/e9i03G/MG8JUctm69cFTo+snhbGhWbzpldyr+g8Tw8XA +saWNMG8UvuZIh3YEty3viyCp1urHki0cNPKSdVkTU35TkmMskaIoxG6FgjPvpACt +Z7XIjI8Ya9Y+GTkf8snWXPqL01ROH/0SwJkC1af77yoy8RfHJp9jvU5t+ulEfezn +dR1iwFYCbq43FAL6kuvW+9Y1G1d69nXrcrKno0vYJ0UaLXigYWG6x278aUHEOR1o +NmWb3wbXjSaZkgNdCPU3xt2riUF4LeoAOQFbSGmjtTmGXnpc8gDzras4XfmMhNQ3 +bsKxcWD3L/cCAwEAAaNCMEAwHQYDVR0OBBYEFJp0YG00psBAXr9N/yStQeZKWIHT +MB8GA1UdIwQYMBaAFOURsA+UoO8h3d8ea4CnSLSlEFXuMA0GCSqGSIb3DQEBCwUA +A4IBAQAId3IHM9nE5T+qwIcM6v/gweuIjiFakhAIwPWsTAbAIF9Q2P8CWkCF6lVG +XUvB9l2SCvZCwYpQZNSb1mn43qSBfcnX3qwe1MljFeL8p54Z2AVVK3LCq3FiS+9N +kbMEZkfm5hjH6D4MnSBSZE3NVB++QtnY4MtZ0kit1F2ziki+ptieE3DS6fbDKItN +SZpm6i0267ujZZuzV5Mi1LDtaMujHwdEfDuA7zCqYxLF645h2BFzr6WhJIbTEWfX +JRKjESU8gDk6jP3D/0HgqOjI0Z+n6p/L7PYi+0Hr8h6ezUHB+wywtxgTL5XCJ9T1 +M4skhVpmS5z0l0Mz0VaJ+fvgmv1G""" + +TEST_CA_CERT = """MIIDTzCCAjegAwIBAgIUCiB5e+gHpFzhSOVclgujbA9sKwswDQYJKoZIhvcNAQEL +BQAwNzELMAkGA1UEBhMCQkUxETAPBgNVBAgMCEJydXNzZWxzMRUwEwYDVQQKDAxU +ZXN0IFJvb3QgQ0EwHhcNMjYwMzEyMjE1NTI1WhcNMjcwMzEyMjE1NTI1WjA3MQsw +CQYDVQQGEwJCRTERMA8GA1UECAwIQnJ1c3NlbHMxFTATBgNVBAoMDFRlc3QgUm9v +dCBDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMVD3uB4wPW5WWCe +pfEesnxQOzZ5azJ7iVd4MmCaJKZuesNJ3K56eO7U5wh3EW01gbZtbgpr2G7sjHcG +YIE4ILKew8RMZRFgdjm2SEv3zhiU1RiQTNYMs6w2fE9u5d/JwYO2QQXAsK3mAvRx +Dr6XvxdtkWTXDJP0EXkpcsIMHHiLKfUE1H3stmaJXzzFf1w9laErSXuSw8iExp45 +VxNB0iJRQWmd1Gov3QG+yTTE6kuv8NTG1AAYlW7V3YoMlSSrEXRoWcJ4U+/jKjy0 +4wUGndn3D29to0BaJFJp2+DfnPKo3689qaGCs4bW4Hu6ahBom91EOWvcVR8mnwaE +W/EtdP8CAwEAAaNTMFEwHQYDVR0OBBYEFOURsA+UoO8h3d8ea4CnSLSlEFXuMB8G +A1UdIwQYMBaAFOURsA+UoO8h3d8ea4CnSLSlEFXuMA8GA1UdEwEB/wQFMAMBAf8w +DQYJKoZIhvcNAQELBQADggEBAKUrGs+0rtWGyRbHoNzZGR573r/msJm/wWewvqsO ++yNsr6yX/v98CR06yZfIz68bxEuYvMXq33T5TybXMM64L2zVjNMynIOt5Hhky4Dy +/Cxew1RcQYjqK9LFb+hTyek+sDhVx6Y9OsgsLQSAkJ2ySHMxJsstpGATGPt0+8Y+ +7dKdG1eMTjeGSYK4keHoX8xL4iO3npOXd3cKzr3OZ60ikNhQaPFhizCTq2XN9Bj7 +ufNZ3l6CCfFRIKV9dmF5SJ7NkpAX3hnrGeLZ0WqyTIZTct4q496cOO3WEVoInys6 +ckaoNnueMSDx2MfF//Nc63XDtlpz57PBy7Qw03xooFEfoBU=""" + + +def test_get_certs_from_metadata_with_cert_chain(): + """Test that certs() returns all certificates when X509Data contains a chain""" + metadata_xml = """ + + + + + + {signing_cert} + {ca_cert} + + + + + + + """.format(signing_cert=TEST_SIGNING_CERT, ca_cert=TEST_CA_CERT) + + mds = MetadataStore(ATTRCONV, None) + mds.imp([{"class": "saml2.mdstore.InMemoryMetaData", "metadata": [(metadata_xml,)]}]) + certs = mds.certs("http://idp.example.com/metadata", "any", "signing") + # Both certs from the chain should be returned + assert len(certs) == 2 + + def test_metadata_extension_algsupport(): mds = MetadataStore(ATTRCONV, None) mds.imp(METADATACONF["12"]) diff --git a/tests/test_82_pefim.py b/tests/test_82_pefim.py index ce223f272..1c0d86e9c 100644 --- a/tests/test_82_pefim.py +++ b/tests/test_82_pefim.py @@ -22,7 +22,7 @@ # place a certificate in an authn request cert = read_cert_from_file(full_path("test.pem")) -spcertenc = SPCertEnc(x509_data=ds.X509Data(x509_certificate=ds.X509Certificate(text=cert))) +spcertenc = SPCertEnc(x509_data=ds.X509Data(x509_certificate=[ds.X509Certificate(text=cert)])) extensions = Extensions(extension_elements=[element_to_extension_element(spcertenc)]) @@ -47,5 +47,5 @@ assert len(_elem) == 1 _spcertenc = _elem[0] -_cert = _spcertenc.key_info[0].x509_data[0].x509_certificate.text +_cert = _spcertenc.key_info[0].x509_data[0].x509_certificate[0].text assert cert == _cert diff --git a/tests/test_93_hok.py b/tests/test_93_hok.py index aaf8ce0f3..905de5d63 100644 --- a/tests/test_93_hok.py +++ b/tests/test_93_hok.py @@ -23,7 +23,7 @@ def test_valid_hok_response_is_parsed(self): assert len(resp.assertion.subject.subject_confirmation) == 2 actual_hok_certs = [ - ki.x509_data[0].x509_certificate.text.strip() + ki.x509_data[0].x509_certificate[0].text.strip() for sc in resp.assertion.subject.subject_confirmation for ki in sc.subject_confirmation_data.extensions_as_elements(ds.KeyInfo.c_tag, ds) ]