diff --git a/docs/guides/testing-ldap-krb5.rst b/docs/guides/testing-ldap-krb5.rst index 53c7e2c7..ca65cbc4 100644 --- a/docs/guides/testing-ldap-krb5.rst +++ b/docs/guides/testing-ldap-krb5.rst @@ -26,6 +26,32 @@ That is enough for LDAP GSSAPI (``ldap_sasl_mech = gssapi``) without each test running ``ktadd``/upload itself. Tests still call ``client.sssd.common.krb5_auth(kdc)`` and configure the SSSD domain as usual. +Reusable client utilities +------------------------- + +LDAP/Kerberos system tests can share the following helpers (no per-test ``named`` +or ``getent`` boilerplate): + +* :meth:`~sssd_test_framework.utils.tools.GetentUtils.resolve_ipv4` — + ``client.tools.resolve_ipv4(hostname, host=role.host)`` uses topology ``host.ip`` + when set, otherwise ``getent ahostsv4``. + +* :meth:`~sssd_test_framework.utils.network.NetworkUtils.dig` — + ``client.net.dig(name, server)`` for A/PTR checks (prefer over shell ``dig``). + +* :meth:`~sssd_test_framework.utils.network.NetworkUtils.setup_sasl_canonicalize_bogus_ptr` — + local ``named`` + ``/etc/hosts`` setup for BZ 732935 (bogus PTR for the LDAP + server IP, forward A for the LDAP FQDN, ``resolv.conf`` → ``127.0.0.1``). + Files are backed up via ``client.fs`` and restored after the test. + +* :func:`~sssd_test_framework.misc.ip_to_ptr` — reverse zone name for an IPv4 + address (also used inside the bogus-PTR helper). + +Kerberos templates from :meth:`~sssd_test_framework.roles.kdc.KDC.config` include +``rdns = false`` in ``[libdefaults]`` so tests that call +``client.sssd.common.krb5_auth(kdc)`` do not need to edit ``/etc/krb5.conf`` for +that option. + .. seealso:: * :class:`sssd_test_framework.roles.kdc.KDC` diff --git a/sssd_test_framework/roles/kdc.py b/sssd_test_framework/roles/kdc.py index f611f257..949eed18 100644 --- a/sssd_test_framework/roles/kdc.py +++ b/sssd_test_framework/roles/kdc.py @@ -131,6 +131,7 @@ def config(self) -> str: ticket_lifetime = 24h renew_lifetime = 7d forwardable = yes + rdns = false [realms] {self.host.realm} = {{ diff --git a/sssd_test_framework/roles/ldap.py b/sssd_test_framework/roles/ldap.py index d2c8a62f..810ca76a 100644 --- a/sssd_test_framework/roles/ldap.py +++ b/sssd_test_framework/roles/ldap.py @@ -3,9 +3,11 @@ from __future__ import annotations import base64 +import tempfile +import time from datetime import datetime from enum import Enum -from typing import Any, Generic, TypeVar +from typing import TYPE_CHECKING, Any, Generic, TypeVar import ldap import ldap.ldapobject @@ -17,6 +19,9 @@ from .generic import GenericNetgroupMember, GenericPasswordPolicy, ProtocolName from .nfs import NFSExport +if TYPE_CHECKING: + from .kdc import KDC + __all__ = [ "LDAPRoleType", "LDAPPasswordPolicy", @@ -239,6 +244,207 @@ def setup(self) -> None: except ldap.TYPE_OR_VALUE_EXISTS: pass + def enable_gssapi(self, kdc: KDC) -> None: + """ + Configure Directory Server for GSSAPI/SASL authentication. + + This method sets up the LDAP server to accept GSSAPI (Kerberos) authentication + by creating a service principal, exporting the keytab, and configuring Directory Server. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopology.LDAP_KRB5) + def test_ldap_gssapi(client: Client, ldap: LDAP, kdc: KDC): + # Enable GSSAPI on LDAP server + ldap.enable_gssapi(kdc) + + ldap.user('testuser').add() + kdc.principal('testuser').add() + + # Configure SSSD to use GSSAPI + client.sssd.domain["ldap_sasl_mech"] = "GSSAPI" + client.sssd.start() + + result = client.tools.id('testuser') + assert result is not None + + :param kdc: KDC role object to create service principal + :type kdc: KDC + """ + + # 1. Install required packages + self.host.conn.run( + "dnf install -y cyrus-sasl-gssapi krb5-workstation || " + "yum install -y cyrus-sasl-gssapi krb5-workstation", + ) + self.host.conn.run("rpm -q cyrus-sasl-gssapi") + + # 2. Create LDAP service principal + ldap_principal = f"ldap/{self.host.hostname}" + kdc.principal(ldap_principal).add(password=None) + + # 3. Export keytab to LDAP server (same transfer pattern as LDAPKRB5TopologyController) + keytab_path = "/etc/dirsrv/ds.keytab" + keytab_staging = "/tmp/sssd-test-framework-ds.keytab" + qualified_principal = kdc.qualify(ldap_principal) + kdc.host.conn.run(f"rm -f {keytab_staging}", raise_on_error=False) + kdc.host.conn.run( + f"kadmin.local -q 'ktadd -k {keytab_staging} -norandkey \"{qualified_principal}\"'" + ) + with tempfile.NamedTemporaryFile() as tmp: + kdc.host.fs.download(keytab_staging, tmp.name) + self.host.fs.upload(tmp.name, keytab_path) + kdc.host.conn.run(f"rm -f {keytab_staging}", raise_on_error=False) + self.host.conn.run(f"chown dirsrv:dirsrv {keytab_path}") + self.host.conn.run(f"chmod 600 {keytab_path}") + + # 4. Copy krb5.conf from KDC to LDAP server + krb5_conf = kdc.config() + self.host.conn.run(f"cat > /etc/krb5.conf << 'EOFKRB5'\n{krb5_conf}\nEOFKRB5") + + # Add default_keytab_name to krb5.conf as fallback + self.host.conn.run( + f"sed -i '/\\[libdefaults\\]/a\\ default_keytab_name = {keytab_path}' /etc/krb5.conf" + ) + + # 5. Configure Cyrus SASL to use the keytab + # This is critical - without this, the SASL GSSAPI plugin won't know where to find the keytab + self.host.conn.run( + f"mkdir -p /etc/sasl2 && " + f"cat > /etc/sasl2/slapd.conf << 'EOFSASL'\n" + f"mech_list: GSSAPI EXTERNAL PLAIN LOGIN\n" + f"keytab: {keytab_path}\n" + f"EOFSASL" + ) + + # Also create for other possible SASL application names + self.host.conn.run("cp /etc/sasl2/slapd.conf /etc/sasl2/ns-slapd.conf") + self.host.conn.run("cp /etc/sasl2/slapd.conf /etc/sasl2/ldap.conf") + + # 6. Set KRB5_KTNAME environment variable for Directory Server via sysconfig + # Note: systemd Environment= directives don't work reliably in containers + # Use EnvironmentFile instead + self.host.conn.run( + f"echo 'KRB5_KTNAME={keytab_path}' > /etc/sysconfig/dirsrv-localhost" + ) + + # 7. Configure SASL identity mapping in Directory Server + # Align default Kerberos maps with the data suffix (sssd-qe krb_credential_cache) and + # add a high-priority map for host/ldap service principals used by SSSD GSSAPI binds. + realm = kdc.realm + base_dn = self.naming_context + binddn = self.host.binddn + bindpw = self.host.bindpw + + sasl_ldif = "" + for cn in ( + "Kerberos uid mapping", + "rfc 2829 dn syntax", + "rfc 2829 u syntax", + "uid mapping", + ): + sasl_ldif += ( + f"dn: cn={cn},cn=mapping,cn=sasl,cn=config\n" + "changetype: modify\n" + "replace: nsSaslMapBaseDNTemplate\n" + f"nsSaslMapBaseDNTemplate: {base_dn}\n" + "\n" + ) + self.host.conn.run( + f"ldapmodify -x -D '{binddn}' -w '{bindpw}' -H ldap://localhost", + input=sasl_ldif, + ) + for cn in ( + "SSSD service principals", + "SSSD service principals no realm", + ): + self.host.conn.run( + f"ldapmodify -x -D '{binddn}' -w '{bindpw}' -H ldap://localhost", + input=( + f"dn: cn={cn},cn=mapping,cn=sasl,cn=config\n" + "changetype: delete\n" + ), + raise_on_error=False, + ) + + # cn=Directory Manager is a bind identity, not a searchable LDAP entry (BASE + # search returns No such object). Map GSSAPI clients to a real entry under the + # data suffix, per 389-ds server-to-server SASL examples (full target DN + + # (objectclass=*)). + people_ou = f"ou=People,{base_dn}" + gssapi_proxy_dn = f"uid=sssd-gssapi,{people_ou}" + bootstrap_ldif = ( + f"dn: {people_ou}\n" + "changetype: add\n" + "objectClass: organizationalUnit\n" + "ou: People\n" + "\n" + f"dn: {gssapi_proxy_dn}\n" + "changetype: add\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + "objectClass: inetOrgPerson\n" + "cn: SSSD GSSAPI proxy\n" + "sn: proxy\n" + "uid: sssd-gssapi\n" + ) + self.host.conn.run( + f"ldapmodify -x -D '{binddn}' -w '{bindpw}' -H ldap://localhost", + input=bootstrap_ldif, + raise_on_error=False, + ) + service_map_ldif = ( + "dn: cn=SSSD service principals,cn=mapping,cn=sasl,cn=config\n" + "changetype: add\n" + "objectClass: top\n" + "objectClass: nsSaslMapping\n" + "cn: SSSD service principals\n" + f"nsSaslMapRegexString: ^(host|ldap)/.*@{realm}$\n" + f"nsSaslMapBaseDNTemplate: {gssapi_proxy_dn}\n" + "nsSaslMapFilterTemplate: (objectclass=*)\n" + "nsSaslMapPriority: 10\n" + "\n" + "dn: cn=SSSD service principals no realm,cn=mapping,cn=sasl,cn=config\n" + "changetype: add\n" + "objectClass: top\n" + "objectClass: nsSaslMapping\n" + "cn: SSSD service principals no realm\n" + "nsSaslMapRegexString: ^(host|ldap)/.*$\n" + f"nsSaslMapBaseDNTemplate: {gssapi_proxy_dn}\n" + "nsSaslMapFilterTemplate: (objectclass=*)\n" + "nsSaslMapPriority: 11\n" + ) + self.host.conn.run( + f"ldapmodify -x -D '{binddn}' -w '{bindpw}' -H ldap://localhost", + input=service_map_ldif, + ) + verify = self.host.conn.run( + f"ldapsearch -x -D '{binddn}' -w '{bindpw}' -H ldap://localhost " + f"-b '{gssapi_proxy_dn}' -s base '(objectclass=*)' dn", + raise_on_error=False, + ) + if verify.rc != 0 or "dn:" not in (verify.stdout or ""): + raise RuntimeError( + f"SASL GSSAPI proxy entry {gssapi_proxy_dn} is not searchable: " + f"{verify.stdout or verify.stderr}" + ) + + # 8. Reload systemd and restart Directory Server + self.host.conn.run("systemctl daemon-reload") + self.host.conn.run("systemctl restart dirsrv@localhost") + + # Wait for Directory Server to fully start with GSSAPI support + time.sleep(3) + + klist = self.host.conn.run(f"klist -kt {keytab_path}", raise_on_error=False) + if qualified_principal not in (klist.stdout or ""): + raise RuntimeError( + f"LDAP GSSAPI keytab {keytab_path} does not contain {qualified_principal}: " + f"{klist.stdout or klist.stderr}" + ) + def fqn(self, name: str) -> str: """ Return fully qualified name in form name@domain. diff --git a/sssd_test_framework/utils/network.py b/sssd_test_framework/utils/network.py index 8cc784d7..8db914cd 100644 --- a/sssd_test_framework/utils/network.py +++ b/sssd_test_framework/utils/network.py @@ -9,12 +9,34 @@ from pytest_mh.conn import ProcessResult from pytest_mh.utils.fs import LinuxFileSystem -from ..misc import ip_is_valid +from ..misc import ip_is_valid, ip_to_ptr from ..misc.ssh import SSHKillableProcess __all__ = ["NetworkUtils", "IPUtils"] +def _inject_named_forwarders(fs: LinuxFileSystem, forwarder_ips: list[str]) -> None: + """Add DNS forwarders to local named so 127.0.0.1 can resolve forward names.""" + if not forwarder_ips: + return + + named_conf = fs.read("/etc/named.conf") + if "forwarders" in named_conf: + return + + forwarders_line = " forwarders { " + "; ".join(forwarder_ips) + "; };\n" + new_lines: list[str] = [] + inserted = False + for line in named_conf.splitlines(keepends=True): + new_lines.append(line) + if not inserted and line.strip() == "options {": + new_lines.append(forwarders_line) + inserted = True + + if inserted: + fs.write("/etc/named.conf", "".join(new_lines)) + + class NetworkUtils(MultihostUtility[MultihostHost]): def __init__(self, host: MultihostHost, fs: LinuxFileSystem) -> None: @@ -145,6 +167,170 @@ def dig(self, address: str, server: str | None = None) -> list[dict] | None: return records if records else None + def setup_sasl_canonicalize_bogus_ptr( + self, + *, + ldap_ip: str, + ldap_hostname: str, + kdc_ip: str, + kdc_hostname: str, + provider_domain: str, + client_hostname: str, + bogus_label: str = "invalid", + ) -> None: + """ + Configure local ``named`` on this host for BZ 732935 (bogus LDAP PTR). + + Serves a wrong PTR for ``ldap_ip``, pins bogus reverse in ``/etc/hosts`` for + libc/NSS, and adds a local forward A record for ``ldap_hostname``. Only the KDC + forward name is pinned in ``/etc/hosts``. Raises ``AssertionError`` on failure. + + :param ldap_ip: LDAP server IPv4 address. + :param ldap_hostname: LDAP server FQDN (Kerberos service name). + :param kdc_ip: KDC IPv4 address. + :param kdc_hostname: KDC FQDN. + :param provider_domain: LDAP DNS domain (e.g. ``ldap.test``). + :param client_hostname: This host FQDN (SOA/NS in zone files). + :param bogus_label: Leftmost label for bogus PTR host (default ``invalid``). + """ + if self.host.conn.run("rpm -q bind", raise_on_error=False).rc != 0: + self.host.conn.run( + "dnf install -y bind bind-utils || yum install -y bind bind-utils", + raise_on_error=False, + ) + assert self.host.conn.run("rpm -q bind", raise_on_error=False).rc == 0, ( + "bind package is required (dnf install -y bind bind-utils)" + ) + + bogus_hostname = f"{bogus_label}.{provider_domain}" + ldap_short = ldap_hostname.removesuffix(f".{provider_domain}").rstrip(".") + + self.fs.backup("/etc/hosts") + hosts_lines: list[str] = [] + for line in self.fs.read("/etc/hosts").splitlines(): + parts = line.split() + if len(parts) >= 1 and parts[0] == ldap_ip: + continue + if len(parts) >= 2 and (ldap_hostname in parts[1:] or bogus_hostname in parts[1:]): + continue + if len(parts) >= 2 and kdc_hostname in parts[1:]: + continue + hosts_lines.append(line) + hosts_lines.append(f"{ldap_ip}\t{bogus_hostname}") + hosts_lines.append(f"{kdc_ip}\t{kdc_hostname}") + self.fs.write("/etc/hosts", "\n".join(hosts_lines).rstrip() + "\n") + self.host.conn.run("nscd -i hosts 2>/dev/null || true", raise_on_error=False) + + reverse_zone = ip_to_ptr(ldap_ip) + ptr_label = ldap_ip.rsplit(".", maxsplit=1)[-1] + reverse_zone_path = f"/var/named/{reverse_zone}" + forward_zone_path = f"/var/named/{provider_domain}" + + self.fs.backup("/etc/resolv.conf") + self.fs.backup("/etc/named.conf") + self.fs.backup(reverse_zone_path) + self.fs.backup(forward_zone_path) + + def zone_block(zone_name: str, zone_file: str) -> str: + return ( + f'zone "{zone_name}" {{\n' + " type master;\n" + " check-names ignore;\n" + f' file "{zone_file}";\n' + "};\n" + ) + + named_conf = self.fs.read("/etc/named.conf") + for zone_name, zone_file in ( + (reverse_zone, reverse_zone), + (provider_domain, provider_domain), + ): + if zone_name not in named_conf: + named_conf += zone_block(zone_name, zone_file) + self.fs.write("/etc/named.conf", named_conf) + + upstream_ips: list[str] = [] + for line in self.fs.read("/etc/resolv.conf").splitlines(): + stripped = line.strip() + if stripped.startswith("nameserver ") and "127.0.0.1" not in stripped: + upstream_ips.append(stripped.split()[1]) + _inject_named_forwarders(self.fs, upstream_ips) + + soa_lines = ( + "$TTL 604800\n" + f"$ORIGIN {provider_domain}.\n" + f"@ IN SOA {client_hostname}. root.{client_hostname}. (\n" + " 2010050702 ; serial\n" + " 604800 ; refresh\n" + " 86400 ; retry\n" + " 2419200 ; expire\n" + " 10800 ; negative caching time\n" + " )\n" + f"@ IN NS {client_hostname}.\n" + ) + reverse_soa = soa_lines.replace(f"$ORIGIN {provider_domain}.\n", f"$ORIGIN {reverse_zone}.\n") + self.fs.write( + reverse_zone_path, + reverse_soa + f"{ptr_label} IN PTR {bogus_hostname}.\n", + ) + self.fs.write( + forward_zone_path, + soa_lines + f"{ldap_short} IN A {ldap_ip}\n", + ) + self.host.conn.run( + f"restorecon -v {reverse_zone_path} {forward_zone_path}", + raise_on_error=False, + ) + + self.host.conn.run("systemctl enable named", raise_on_error=False) + named_restart = self.host.conn.run("systemctl restart named", raise_on_error=False) + if named_restart.rc != 0: + journal = self.host.conn.run( + "journalctl -u named -n 30 --no-pager", + raise_on_error=False, + ) + assert False, ( + "systemctl restart named failed; " + f"journal: {(journal.stdout or journal.stderr or '')[-2000:]}" + ) + + self.fs.write("/etc/resolv.conf", "nameserver 127.0.0.1\n") + self.host.conn.run("restorecon -v /etc/resolv.conf", raise_on_error=False) + self.host.conn.run("nscd -i hosts 2>/dev/null || true", raise_on_error=False) + + ptr_result = self.dig(ldap_ip, "127.0.0.1") + assert ptr_result, f"dig -x {ldap_ip} @127.0.0.1 returned no PTR" + assert any(bogus_hostname in str(record.get("data", "")) for record in ptr_result), ( + f"PTR for {ldap_ip} via 127.0.0.1 is not {bogus_hostname}" + ) + + a_result = self.dig(ldap_hostname, "127.0.0.1") + assert a_result and any( + record.get("type") == "A" and str(record.get("data")) == ldap_ip for record in a_result + ), ( + f"no local A record for {ldap_hostname} via 127.0.0.1 " + f"(zone {provider_domain}, record {ldap_short})" + ) + + rev_nss = self.host.conn.run(f"getent hosts {ldap_ip}", raise_on_error=False) + rev_out = rev_nss.stdout or "" + assert ldap_hostname not in rev_out, ( + f"{ldap_ip} must not reverse-resolve to {ldap_hostname}; getent hosts: {rev_out.strip()!r}" + ) + assert bogus_hostname in rev_out, ( + f"{ldap_ip} must reverse-resolve to {bogus_hostname}; getent hosts: {rev_out.strip()!r}" + ) + + ldap_fwd = self.host.conn.run(f"getent ahostsv4 {ldap_hostname}", raise_on_error=False) + assert ldap_ip in (ldap_fwd.stdout or ""), ( + f"forward lookup for {ldap_hostname} must return {ldap_ip}" + ) + + kdc_fwd = self.host.conn.run(f"getent ahostsv4 {kdc_hostname}", raise_on_error=False) + assert kdc_ip in (kdc_fwd.stdout or ""), ( + f"forward lookup for {kdc_hostname} must return {kdc_ip}" + ) + def teardown(self): """ Revert all changes. diff --git a/sssd_test_framework/utils/tools.py b/sssd_test_framework/utils/tools.py index 77303143..46d26b0e 100644 --- a/sssd_test_framework/utils/tools.py +++ b/sssd_test_framework/utils/tools.py @@ -10,6 +10,7 @@ from pytest_mh.utils.fs import LinuxFileSystem __all__ = [ + "AHostSv4Entry", "GetentUtils", "GroupEntry", "LinuxToolsUtils", @@ -601,6 +602,32 @@ def FromOutput(cls, stdout: str) -> HostsEntry: return cls.FromList(result) +class AHostSv4Entry(object): + """ + Result of ``getent ahostsv4`` — first IPv4 from the first data line. + + Same style as :class:`HostsEntry` (use the ``.ip`` field). + """ + + def __init__(self, ip: str | None) -> None: + self.ip: str | None = ip + """IPv4 dotted-quad (first column of ``getent ahostsv4`` output).""" + + def __str__(self) -> str: + return f"({self.ip})" + + def __repr__(self) -> str: + return str(self) + + @classmethod + def FromOutput(cls, stdout: str) -> AHostSv4Entry | None: + for line in stdout.splitlines(): + parts = line.split() + if parts: + return cls(ip=parts[0]) + return None + + class NetworksEntry(object): """ Result of ``getent networks`` @@ -1058,6 +1085,54 @@ def hosts(self, name: str, *, service: str | None = None) -> HostsEntry: """ return self.__exec(HostsEntry, "hosts", name, service) + def ahostsv4(self, name: str, *, service: str | None = None) -> AHostSv4Entry | None: + """ + Call ``getent ahostsv4`` and return the first IPv4 address from output. + + For DNS lookups using ``dig`` (A, SRV, etc.), use + :meth:`NetworkUtils.dig ` + on ``client.net`` instead of ad hoc shell commands in tests. + + :param name: Host name or address to resolve. + :type name: str + :param service: Optional NSS service name (``getent -s``). + :type service: str | None, optional + :return: Parsed entry or ``None`` if lookup failed or produced no address. + :rtype: AHostSv4Entry | None + """ + args: list[str] = [] + if service is not None: + args = ["-s", service] + + command = self.host.conn.exec(["getent", *args, "ahostsv4", str(name)], raise_on_error=False) + if command.rc != 0: + return None + + return AHostSv4Entry.FromOutput(command.stdout) + + def resolve_ipv4(self, name: str, *, host: object | None = None) -> str | None: + """ + Return the first IPv4 address for ``name`` on this host. + + Uses ``host.ip`` when the multihost host object provides it, otherwise + :meth:`ahostsv4`. For DNS record checks use + :meth:`NetworkUtils.dig `. + + :param name: Hostname to resolve. + :param host: Optional role host (``provider.host``, ``kdc.host``, …) with ``ip``. + :return: IPv4 dotted-quad or ``None``. + """ + if host is not None: + role_ip = getattr(host, "ip", None) + if role_ip: + return str(role_ip) + + entry = self.ahostsv4(name) + if entry is not None and entry.ip is not None: + return entry.ip + + return None + def networks(self, name: str, *, service: str | None = None) -> NetworksEntry: """ Call ``getent networks $name`` diff --git a/tests/test_tools.py b/tests/test_tools.py new file mode 100644 index 00000000..8bbc2d8d --- /dev/null +++ b/tests/test_tools.py @@ -0,0 +1,25 @@ +"""Unit tests for :mod:`sssd_test_framework.utils.tools`.""" + +from __future__ import annotations + +import pytest + +from sssd_test_framework.utils.tools import AHostSv4Entry + + +@pytest.mark.parametrize( + "stdout, expected_ip", + [ + ("192.168.1.1 STREAM hostname.example\n", "192.168.1.1"), + ("10.0.0.5 STREAM foo\n10.0.0.6 STREAM foo\n", "10.0.0.5"), + ("", None), + ("\n\n", None), + ], +) +def test_ahostsv4_entry_from_output(stdout: str, expected_ip: str | None) -> None: + entry = AHostSv4Entry.FromOutput(stdout) + if expected_ip is None: + assert entry is None + else: + assert entry is not None + assert entry.ip == expected_ip