diff --git a/src/saml2/mdstore.py b/src/saml2/mdstore.py
index 2ea9742f0..9b3c342a5 100644
--- a/src/saml2/mdstore.py
+++ b/src/saml2/mdstore.py
@@ -492,9 +492,10 @@ def extract_certs(srvs):
key_name_txt = key_name.get("text")
if "use" not in key or key_use == use:
for dat in key_info["x509_data"]:
- cert = repack_cert(dat["x509_certificate"]["text"])
- if cert not in res:
- res.append((key_name_txt, cert))
+ for x509_cert in dat.get("x509_certificate", []):
+ cert = repack_cert(x509_cert["text"])
+ if cert not in res:
+ res.append((key_name_txt, cert))
return res
diff --git a/src/saml2/metadata.py b/src/saml2/metadata.py
index 3961c869f..3091be61e 100644
--- a/src/saml2/metadata.py
+++ b/src/saml2/metadata.py
@@ -184,7 +184,7 @@ def do_key_descriptor(cert=None, enc_cert=None, use="both"):
for _cert in cert:
kd_list.append(
md.KeyDescriptor(
- key_info=ds.KeyInfo(x509_data=ds.X509Data(x509_certificate=ds.X509Certificate(text=_cert))),
+ key_info=ds.KeyInfo(x509_data=ds.X509Data(x509_certificate=[ds.X509Certificate(text=_cert)])),
use="signing",
)
)
@@ -194,13 +194,13 @@ def do_key_descriptor(cert=None, enc_cert=None, use="both"):
for _enc_cert in enc_cert:
kd_list.append(
md.KeyDescriptor(
- key_info=ds.KeyInfo(x509_data=ds.X509Data(x509_certificate=ds.X509Certificate(text=_enc_cert))),
+ key_info=ds.KeyInfo(x509_data=ds.X509Data(x509_certificate=[ds.X509Certificate(text=_enc_cert)])),
use="encryption",
)
)
if len(kd_list) == 0 and cert is not None:
return md.KeyDescriptor(
- key_info=ds.KeyInfo(x509_data=ds.X509Data(x509_certificate=ds.X509Certificate(text=cert)))
+ key_info=ds.KeyInfo(x509_data=ds.X509Data(x509_certificate=[ds.X509Certificate(text=cert)]))
)
return kd_list
diff --git a/src/saml2/s2repoze/plugins/sp.py b/src/saml2/s2repoze/plugins/sp.py
index 7079d96fe..7ee4f4e96 100644
--- a/src/saml2/s2repoze/plugins/sp.py
+++ b/src/saml2/s2repoze/plugins/sp.py
@@ -323,7 +323,7 @@ def challenge(self, environ, _status, _app_headers, _forget_headers):
if _cli.config.generate_cert_func is not None:
cert_str, req_key_str = _cli.config.generate_cert_func()
cert = {"cert": cert_str, "key": req_key_str}
- spcertenc = SPCertEnc(x509_data=ds.X509Data(x509_certificate=ds.X509Certificate(text=cert_str)))
+ spcertenc = SPCertEnc(x509_data=ds.X509Data(x509_certificate=[ds.X509Certificate(text=cert_str)]))
extensions = Extensions(extension_elements=[element_to_extension_element(spcertenc)])
if _cli.authn_requests_signed:
diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py
index 738ac04b1..698f0251c 100644
--- a/src/saml2/sigver.py
+++ b/src/saml2/sigver.py
@@ -397,13 +397,13 @@ def cert_from_key_info(key_info, ignore_age=False):
"""
res = []
for x509_data in key_info.x509_data:
- x509_certificate = x509_data.x509_certificate
- cert = x509_certificate.text.strip()
- cert = "\n".join(split_len("".join([s.strip() for s in cert.split()]), 64))
- if ignore_age or active_cert(cert):
- res.append(cert)
- else:
- logger.info("Inactive cert")
+ for x509_certificate in x509_data.x509_certificate:
+ cert = x509_certificate.text.strip()
+ cert = "\n".join(split_len("".join([s.strip() for s in cert.split()]), 64))
+ if ignore_age or active_cert(cert):
+ res.append(cert)
+ else:
+ logger.info("Inactive cert")
return res
@@ -423,13 +423,13 @@ def cert_from_key_info_dict(key_info, ignore_age=False):
return res
for x509_data in key_info["x509_data"]:
- x509_certificate = x509_data["x509_certificate"]
- cert = x509_certificate["text"].strip()
- cert = "\n".join(split_len("".join([s.strip() for s in cert.split()]), 64))
- if ignore_age or active_cert(cert):
- res.append(cert)
- else:
- logger.info("Inactive cert")
+ for x509_certificate in x509_data.get("x509_certificate", []):
+ cert = x509_certificate["text"].strip()
+ cert = "\n".join(split_len("".join([s.strip() for s in cert.split()]), 64))
+ if ignore_age or active_cert(cert):
+ res.append(cert)
+ else:
+ logger.info("Inactive cert")
return res
@@ -1024,7 +1024,7 @@ def encrypt_cert_from_item(item):
if isinstance(_tmp_elem, SPCertEnc):
for _tmp_key_info in _tmp_elem.key_info:
if _tmp_key_info.x509_data is not None and len(_tmp_key_info.x509_data) > 0:
- _encrypt_cert = _tmp_key_info.x509_data[0].x509_certificate.text
+ _encrypt_cert = _tmp_key_info.x509_data[0].x509_certificate[0].text
break
except Exception:
pass
@@ -1846,7 +1846,7 @@ def pre_encryption_part(
msg_encryption_method = EncryptionMethod(algorithm=msg_enc)
key_encryption_method = EncryptionMethod(algorithm=key_enc)
- x509_data = ds.X509Data(x509_certificate=ds.X509Certificate(text=encrypt_cert)) if encrypt_cert else None
+ x509_data = ds.X509Data(x509_certificate=[ds.X509Certificate(text=encrypt_cert)]) if encrypt_cert else None
key_name = ds.KeyName(text=key_name) if key_name else None
key_info = ds.KeyInfo(key_name=key_name, x509_data=x509_data) if key_name or x509_data else None
diff --git a/src/saml2/xmldsig/__init__.py b/src/saml2/xmldsig/__init__.py
index 85f892632..ace2b1a8c 100644
--- a/src/saml2/xmldsig/__init__.py
+++ b/src/saml2/xmldsig/__init__.py
@@ -992,8 +992,8 @@ class X509DataType_(SamlBase):
c_cardinality["x509_ski"] = {"min": 0, "max": 1}
c_children["{http://www.w3.org/2000/09/xmldsig#}X509SubjectName"] = ("x509_subject_name", X509SubjectName)
c_cardinality["x509_subject_name"] = {"min": 0, "max": 1}
- c_children["{http://www.w3.org/2000/09/xmldsig#}X509Certificate"] = ("x509_certificate", X509Certificate)
- c_cardinality["x509_certificate"] = {"min": 0, "max": 1}
+ c_children["{http://www.w3.org/2000/09/xmldsig#}X509Certificate"] = ("x509_certificate", [X509Certificate])
+ c_cardinality["x509_certificate"] = {"min": 0}
c_children["{http://www.w3.org/2000/09/xmldsig#}X509CRL"] = ("x509_crl", X509CRL)
c_cardinality["x509_crl"] = {"min": 0, "max": 1}
c_child_order.extend(["x509_issuer_serial", "x509_ski", "x509_subject_name", "x509_certificate", "x509_crl"])
diff --git a/tests/test_00_xmldsig.py b/tests/test_00_xmldsig.py
index 059e8a049..0715594ff 100644
--- a/tests/test_00_xmldsig.py
+++ b/tests/test_00_xmldsig.py
@@ -137,7 +137,7 @@ def testAccessors(self):
self.x509_data.x509_issuer_serial = st
self.x509_data.x509_ski = ds.X509SKI(text="x509 ski")
self.x509_data.x509_subject_name = ds.X509SubjectName(text="x509 subject name")
- self.x509_data.x509_certificate = ds.X509Certificate(text="x509 certificate")
+ self.x509_data.x509_certificate = [ds.X509Certificate(text="x509 certificate")]
self.x509_data.x509_crl = ds.X509CRL(text="x509 crl")
new_x509_data = ds.x509_data_from_string(self.x509_data.to_string())
@@ -149,8 +149,9 @@ def testAccessors(self):
assert isinstance(new_x509_data.x509_ski, ds.X509SKI)
assert new_x509_data.x509_subject_name.text.strip() == "x509 subject name"
assert isinstance(new_x509_data.x509_subject_name, ds.X509SubjectName)
- assert new_x509_data.x509_certificate.text.strip() == "x509 certificate"
- assert isinstance(new_x509_data.x509_certificate, ds.X509Certificate)
+ assert len(new_x509_data.x509_certificate) == 1
+ assert new_x509_data.x509_certificate[0].text.strip() == "x509 certificate"
+ assert isinstance(new_x509_data.x509_certificate[0], ds.X509Certificate)
assert new_x509_data.x509_crl.text.strip() == "x509 crl"
assert isinstance(new_x509_data.x509_crl, ds.X509CRL)
@@ -162,12 +163,26 @@ def testUsingTestData(self):
assert isinstance(new_x509_data.x509_ski, ds.X509SKI)
assert new_x509_data.x509_subject_name.text.strip() == "x509 subject name"
assert isinstance(new_x509_data.x509_subject_name, ds.X509SubjectName)
- assert new_x509_data.x509_certificate.text.strip() == "x509 certificate"
- assert isinstance(new_x509_data.x509_certificate, ds.X509Certificate)
+ assert len(new_x509_data.x509_certificate) == 1
+ assert new_x509_data.x509_certificate[0].text.strip() == "x509 certificate"
+ assert isinstance(new_x509_data.x509_certificate[0], ds.X509Certificate)
assert new_x509_data.x509_crl.text.strip() == "x509 crl"
assert isinstance(new_x509_data.x509_crl, ds.X509CRL)
+ def testMultipleCertificatesInX509Data(self):
+ """Test that multiple X509Certificate elements in a single X509Data are parsed correctly"""
+ xml_string = """
+ cert_one
+ cert_two
+ """
+ x509_data = ds.x509_data_from_string(xml_string)
+ assert isinstance(x509_data.x509_certificate, list)
+ assert len(x509_data.x509_certificate) == 2
+ assert x509_data.x509_certificate[0].text.strip() == "cert_one"
+ assert x509_data.x509_certificate[1].text.strip() == "cert_two"
+
+
class TestTransform:
def setup_class(self):
self.transform = ds.Transform()
diff --git a/tests/test_02_saml.py b/tests/test_02_saml.py
index a090d89f2..330877afe 100644
--- a/tests/test_02_saml.py
+++ b/tests/test_02_saml.py
@@ -870,7 +870,7 @@ def testHolderOfKeyUsingTestData(self):
"Vx/jWjX2g5SDbjItH6VGy6C9GCGf1A07VxFRCfJn5tA9HuJjPKiE+g/BmrV5N4Ce"
"alzFxPHWYkNOzoRU8qI7OqUai1kL"
)
- xcert = key_info[0].x509_data[0].x509_certificate
+ xcert = key_info[0].x509_data[0].x509_certificate[0]
assert xcert.text.strip().replace("\n", "") == expected_cert
diff --git a/tests/test_30_mdstore.py b/tests/test_30_mdstore.py
index 6f6632014..0504242b2 100644
--- a/tests/test_30_mdstore.py
+++ b/tests/test_30_mdstore.py
@@ -624,6 +624,75 @@ def test_get_certs_from_metadata_without_keydescriptor():
assert len(certs) == 0
+TEST_SIGNING_CERT = """MIIDQTCCAimgAwIBAgIUX8x3mr8kEGTCYgQit+f8m4EqanYwDQYJKoZIhvcNAQEL
+BQAwNzELMAkGA1UEBhMCQkUxETAPBgNVBAgMCEJydXNzZWxzMRUwEwYDVQQKDAxU
+ZXN0IFJvb3QgQ0EwHhcNMjYwMzEyMjE1NTI1WhcNMjcwMzEyMjE1NTI1WjA6MQsw
+CQYDVQQGEwJCRTERMA8GA1UECAwIQnJ1c3NlbHMxGDAWBgNVBAoMD1Rlc3QgU2ln
+bmluZyBTQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALKxUPRwpNyx
+H+0jiq+Pv8f0Q70zwdT/e9i03G/MG8JUctm69cFTo+snhbGhWbzpldyr+g8Tw8XA
+saWNMG8UvuZIh3YEty3viyCp1urHki0cNPKSdVkTU35TkmMskaIoxG6FgjPvpACt
+Z7XIjI8Ya9Y+GTkf8snWXPqL01ROH/0SwJkC1af77yoy8RfHJp9jvU5t+ulEfezn
+dR1iwFYCbq43FAL6kuvW+9Y1G1d69nXrcrKno0vYJ0UaLXigYWG6x278aUHEOR1o
+NmWb3wbXjSaZkgNdCPU3xt2riUF4LeoAOQFbSGmjtTmGXnpc8gDzras4XfmMhNQ3
+bsKxcWD3L/cCAwEAAaNCMEAwHQYDVR0OBBYEFJp0YG00psBAXr9N/yStQeZKWIHT
+MB8GA1UdIwQYMBaAFOURsA+UoO8h3d8ea4CnSLSlEFXuMA0GCSqGSIb3DQEBCwUA
+A4IBAQAId3IHM9nE5T+qwIcM6v/gweuIjiFakhAIwPWsTAbAIF9Q2P8CWkCF6lVG
+XUvB9l2SCvZCwYpQZNSb1mn43qSBfcnX3qwe1MljFeL8p54Z2AVVK3LCq3FiS+9N
+kbMEZkfm5hjH6D4MnSBSZE3NVB++QtnY4MtZ0kit1F2ziki+ptieE3DS6fbDKItN
+SZpm6i0267ujZZuzV5Mi1LDtaMujHwdEfDuA7zCqYxLF645h2BFzr6WhJIbTEWfX
+JRKjESU8gDk6jP3D/0HgqOjI0Z+n6p/L7PYi+0Hr8h6ezUHB+wywtxgTL5XCJ9T1
+M4skhVpmS5z0l0Mz0VaJ+fvgmv1G"""
+
+TEST_CA_CERT = """MIIDTzCCAjegAwIBAgIUCiB5e+gHpFzhSOVclgujbA9sKwswDQYJKoZIhvcNAQEL
+BQAwNzELMAkGA1UEBhMCQkUxETAPBgNVBAgMCEJydXNzZWxzMRUwEwYDVQQKDAxU
+ZXN0IFJvb3QgQ0EwHhcNMjYwMzEyMjE1NTI1WhcNMjcwMzEyMjE1NTI1WjA3MQsw
+CQYDVQQGEwJCRTERMA8GA1UECAwIQnJ1c3NlbHMxFTATBgNVBAoMDFRlc3QgUm9v
+dCBDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMVD3uB4wPW5WWCe
+pfEesnxQOzZ5azJ7iVd4MmCaJKZuesNJ3K56eO7U5wh3EW01gbZtbgpr2G7sjHcG
+YIE4ILKew8RMZRFgdjm2SEv3zhiU1RiQTNYMs6w2fE9u5d/JwYO2QQXAsK3mAvRx
+Dr6XvxdtkWTXDJP0EXkpcsIMHHiLKfUE1H3stmaJXzzFf1w9laErSXuSw8iExp45
+VxNB0iJRQWmd1Gov3QG+yTTE6kuv8NTG1AAYlW7V3YoMlSSrEXRoWcJ4U+/jKjy0
+4wUGndn3D29to0BaJFJp2+DfnPKo3689qaGCs4bW4Hu6ahBom91EOWvcVR8mnwaE
+W/EtdP8CAwEAAaNTMFEwHQYDVR0OBBYEFOURsA+UoO8h3d8ea4CnSLSlEFXuMB8G
+A1UdIwQYMBaAFOURsA+UoO8h3d8ea4CnSLSlEFXuMA8GA1UdEwEB/wQFMAMBAf8w
+DQYJKoZIhvcNAQELBQADggEBAKUrGs+0rtWGyRbHoNzZGR573r/msJm/wWewvqsO
++yNsr6yX/v98CR06yZfIz68bxEuYvMXq33T5TybXMM64L2zVjNMynIOt5Hhky4Dy
+/Cxew1RcQYjqK9LFb+hTyek+sDhVx6Y9OsgsLQSAkJ2ySHMxJsstpGATGPt0+8Y+
+7dKdG1eMTjeGSYK4keHoX8xL4iO3npOXd3cKzr3OZ60ikNhQaPFhizCTq2XN9Bj7
+ufNZ3l6CCfFRIKV9dmF5SJ7NkpAX3hnrGeLZ0WqyTIZTct4q496cOO3WEVoInys6
+ckaoNnueMSDx2MfF//Nc63XDtlpz57PBy7Qw03xooFEfoBU="""
+
+
+def test_get_certs_from_metadata_with_cert_chain():
+ """Test that certs() returns all certificates when X509Data contains a chain"""
+ metadata_xml = """
+
+
+
+
+
+ {signing_cert}
+ {ca_cert}
+
+
+
+
+
+
+ """.format(signing_cert=TEST_SIGNING_CERT, ca_cert=TEST_CA_CERT)
+
+ mds = MetadataStore(ATTRCONV, None)
+ mds.imp([{"class": "saml2.mdstore.InMemoryMetaData", "metadata": [(metadata_xml,)]}])
+ certs = mds.certs("http://idp.example.com/metadata", "any", "signing")
+ # Both certs from the chain should be returned
+ assert len(certs) == 2
+
+
def test_metadata_extension_algsupport():
mds = MetadataStore(ATTRCONV, None)
mds.imp(METADATACONF["12"])
diff --git a/tests/test_82_pefim.py b/tests/test_82_pefim.py
index ce223f272..1c0d86e9c 100644
--- a/tests/test_82_pefim.py
+++ b/tests/test_82_pefim.py
@@ -22,7 +22,7 @@
# place a certificate in an authn request
cert = read_cert_from_file(full_path("test.pem"))
-spcertenc = SPCertEnc(x509_data=ds.X509Data(x509_certificate=ds.X509Certificate(text=cert)))
+spcertenc = SPCertEnc(x509_data=ds.X509Data(x509_certificate=[ds.X509Certificate(text=cert)]))
extensions = Extensions(extension_elements=[element_to_extension_element(spcertenc)])
@@ -47,5 +47,5 @@
assert len(_elem) == 1
_spcertenc = _elem[0]
-_cert = _spcertenc.key_info[0].x509_data[0].x509_certificate.text
+_cert = _spcertenc.key_info[0].x509_data[0].x509_certificate[0].text
assert cert == _cert
diff --git a/tests/test_93_hok.py b/tests/test_93_hok.py
index aaf8ce0f3..905de5d63 100644
--- a/tests/test_93_hok.py
+++ b/tests/test_93_hok.py
@@ -23,7 +23,7 @@ def test_valid_hok_response_is_parsed(self):
assert len(resp.assertion.subject.subject_confirmation) == 2
actual_hok_certs = [
- ki.x509_data[0].x509_certificate.text.strip()
+ ki.x509_data[0].x509_certificate[0].text.strip()
for sc in resp.assertion.subject.subject_confirmation
for ki in sc.subject_confirmation_data.extensions_as_elements(ds.KeyInfo.c_tag, ds)
]