|
50 | 50 | IS_OPENSSL_3_0_0 = ssl.OPENSSL_VERSION_INFO >= (3, 0, 0) |
51 | 51 | PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS') |
52 | 52 |
|
| 53 | +HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename') |
| 54 | +requires_keylog = unittest.skipUnless( |
| 55 | + HAS_KEYLOG, 'test requires OpenSSL 1.1.1 with keylog callback') |
| 56 | +CAN_SET_KEYLOG = HAS_KEYLOG and os.name != "nt" |
| 57 | +requires_keylog_setter = unittest.skipUnless( |
| 58 | + CAN_SET_KEYLOG, |
| 59 | + "cannot set 'keylog_filename' on Windows" |
| 60 | +) |
| 61 | + |
| 62 | + |
53 | 63 | PROTOCOL_TO_TLS_VERSION = {} |
54 | 64 | for proto, ver in ( |
55 | 65 | ("PROTOCOL_SSLv3", "SSLv3"), |
@@ -258,26 +268,67 @@ def utc_offset(): #NOTE: ignore issues like #1647654 |
258 | 268 | ) |
259 | 269 |
|
260 | 270 |
|
261 | | -def test_wrap_socket(sock, *, |
262 | | - cert_reqs=ssl.CERT_NONE, ca_certs=None, |
263 | | - ciphers=None, certfile=None, keyfile=None, |
264 | | - **kwargs): |
265 | | - if not kwargs.get("server_side"): |
266 | | - kwargs["server_hostname"] = SIGNED_CERTFILE_HOSTNAME |
267 | | - context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
268 | | - else: |
| 271 | +def make_test_context( |
| 272 | + *, |
| 273 | + server_side=False, |
| 274 | + check_hostname=None, |
| 275 | + cert_reqs=ssl.CERT_NONE, |
| 276 | + ca_certs=None, certfile=None, keyfile=None, |
| 277 | + ciphers=None, |
| 278 | + min_version=None, max_version=None, |
| 279 | +): |
| 280 | + if server_side: |
269 | 281 | context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) |
270 | | - if cert_reqs is not None: |
| 282 | + else: |
| 283 | + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
| 284 | + |
| 285 | + if check_hostname is None: |
271 | 286 | if cert_reqs == ssl.CERT_NONE: |
272 | 287 | context.check_hostname = False |
| 288 | + else: |
| 289 | + context.check_hostname = check_hostname |
| 290 | + |
| 291 | + if cert_reqs is not None: |
273 | 292 | context.verify_mode = cert_reqs |
| 293 | + |
274 | 294 | if ca_certs is not None: |
275 | 295 | context.load_verify_locations(ca_certs) |
276 | 296 | if certfile is not None or keyfile is not None: |
277 | 297 | context.load_cert_chain(certfile, keyfile) |
| 298 | + |
278 | 299 | if ciphers is not None: |
279 | 300 | context.set_ciphers(ciphers) |
280 | | - return context.wrap_socket(sock, **kwargs) |
| 301 | + |
| 302 | + if min_version is not None: |
| 303 | + context.minimum_version = min_version |
| 304 | + if max_version is not None: |
| 305 | + context.maximum_version = max_version |
| 306 | + |
| 307 | + return context |
| 308 | + |
| 309 | + |
| 310 | +def test_wrap_socket( |
| 311 | + sock, |
| 312 | + *, |
| 313 | + server_side=False, |
| 314 | + check_hostname=None, |
| 315 | + cert_reqs=ssl.CERT_NONE, |
| 316 | + ca_certs=None, certfile=None, keyfile=None, |
| 317 | + ciphers=None, |
| 318 | + min_version=None, max_version=None, |
| 319 | + **kwargs, |
| 320 | +): |
| 321 | + context = make_test_context( |
| 322 | + server_side=server_side, |
| 323 | + check_hostname=check_hostname, |
| 324 | + cert_reqs=cert_reqs, |
| 325 | + ca_certs=ca_certs, certfile=certfile, keyfile=keyfile, |
| 326 | + ciphers=ciphers, |
| 327 | + min_version=min_version, max_version=max_version, |
| 328 | + ) |
| 329 | + if not server_side: |
| 330 | + kwargs.setdefault("server_hostname", SIGNED_CERTFILE_HOSTNAME) |
| 331 | + return context.wrap_socket(sock, server_side=server_side, **kwargs) |
281 | 332 |
|
282 | 333 |
|
283 | 334 | USE_SAME_TEST_CONTEXT = False |
@@ -1665,6 +1716,39 @@ def test_num_tickest(self): |
1665 | 1716 | with self.assertRaises(ValueError): |
1666 | 1717 | ctx.num_tickets = 1 |
1667 | 1718 |
|
| 1719 | + @support.cpython_only |
| 1720 | + def test_refcycle_msg_callback(self): |
| 1721 | + # See https://github.com/python/cpython/issues/142516. |
| 1722 | + ctx = make_test_context() |
| 1723 | + def msg_callback(*args, _=ctx, **kwargs): ... |
| 1724 | + ctx._msg_callback = msg_callback |
| 1725 | + |
| 1726 | + @support.cpython_only |
| 1727 | + @requires_keylog_setter |
| 1728 | + def test_refcycle_keylog_filename(self): |
| 1729 | + # See https://github.com/python/cpython/issues/142516. |
| 1730 | + self.addCleanup(os_helper.unlink, os_helper.TESTFN) |
| 1731 | + ctx = make_test_context() |
| 1732 | + class KeylogFilename(str): ... |
| 1733 | + ctx.keylog_filename = KeylogFilename(os_helper.TESTFN) |
| 1734 | + ctx.keylog_filename._ = ctx |
| 1735 | + |
| 1736 | + @support.cpython_only |
| 1737 | + @unittest.skipUnless(ssl.HAS_PSK, 'requires TLS-PSK') |
| 1738 | + def test_refcycle_psk_client_callback(self): |
| 1739 | + # See https://github.com/python/cpython/issues/142516. |
| 1740 | + ctx = make_test_context() |
| 1741 | + def psk_client_callback(*args, _=ctx, **kwargs): ... |
| 1742 | + ctx.set_psk_client_callback(psk_client_callback) |
| 1743 | + |
| 1744 | + @support.cpython_only |
| 1745 | + @unittest.skipUnless(ssl.HAS_PSK, 'requires TLS-PSK') |
| 1746 | + def test_refcycle_psk_server_callback(self): |
| 1747 | + # See https://github.com/python/cpython/issues/142516. |
| 1748 | + ctx = make_test_context(server_side=True) |
| 1749 | + def psk_server_callback(*args, _=ctx, **kwargs): ... |
| 1750 | + ctx.set_psk_server_callback(psk_server_callback) |
| 1751 | + |
1668 | 1752 |
|
1669 | 1753 | class SSLErrorTests(unittest.TestCase): |
1670 | 1754 |
|
@@ -4914,10 +4998,6 @@ def test_internal_chain_server(self): |
4914 | 4998 | self.assertEqual(res, b'\x02\n') |
4915 | 4999 |
|
4916 | 5000 |
|
4917 | | -HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename') |
4918 | | -requires_keylog = unittest.skipUnless( |
4919 | | - HAS_KEYLOG, 'test requires OpenSSL 1.1.1 with keylog callback') |
4920 | | - |
4921 | 5001 | class TestSSLDebug(unittest.TestCase): |
4922 | 5002 |
|
4923 | 5003 | def keylog_lines(self, fname=os_helper.TESTFN): |
|
0 commit comments