|
23 | 23 | SimpleAuthentication, |
24 | 24 | UnbindRequest, |
25 | 25 | ) |
| 26 | +from ldap_protocol.ldap_requests.bind_methods.sasl_spnego import ( |
| 27 | + SaslSPNEGOAuthentication, |
| 28 | +) |
26 | 29 | from ldap_protocol.ldap_requests.contexts import ( |
27 | 30 | LDAPBindRequestContext, |
28 | 31 | LDAPUnbindRequestContext, |
@@ -199,6 +202,80 @@ async def mock_init_security_context( |
199 | 202 | ) |
200 | 203 |
|
201 | 204 |
|
| 205 | +@pytest.mark.asyncio |
| 206 | +@pytest.mark.usefixtures("session") |
| 207 | +@pytest.mark.usefixtures("setup_session") |
| 208 | +async def test_spnego_bind_ok( |
| 209 | + creds: TestCreds, |
| 210 | + container: AsyncContainer, |
| 211 | +) -> None: |
| 212 | + """Test spnego bind ok.""" |
| 213 | + mock_security_context = Mock(spec=gssapi.SecurityContext) |
| 214 | + mock_security_context.step.return_value = b"server_ticket" |
| 215 | + mock_security_context.complete = False |
| 216 | + mock_security_context.initiator_name = f"{creds.un}@domain" |
| 217 | + |
| 218 | + async def mock_init_security_context( |
| 219 | + session: AsyncSession, # noqa: ARG001 |
| 220 | + settings: Settings, # noqa: ARG001 |
| 221 | + ) -> None: |
| 222 | + auth_choice._ldap_session.gssapi_security_context = ( |
| 223 | + mock_security_context |
| 224 | + ) |
| 225 | + |
| 226 | + auth_choice = SaslSPNEGOAuthentication(ticket=b"client_ticket") |
| 227 | + auth_choice._init_security_context = mock_init_security_context # type: ignore |
| 228 | + |
| 229 | + first_bind = BindRequest( |
| 230 | + version=0, |
| 231 | + name=creds.un, |
| 232 | + AuthenticationChoice=auth_choice, |
| 233 | + ) |
| 234 | + |
| 235 | + second_bind = MutePolicyBindRequest( |
| 236 | + version=0, |
| 237 | + name=creds.un, |
| 238 | + AuthenticationChoice=SaslSPNEGOAuthentication(ticket=b"client_ticket"), |
| 239 | + ) |
| 240 | + |
| 241 | + async with container(scope=Scope.REQUEST) as container: |
| 242 | + kwargs = await resolve_deps(first_bind.handle, container) |
| 243 | + result = await anext(first_bind.handle(**kwargs)) |
| 244 | + assert result == BindResponse( |
| 245 | + result_code=LDAPCodes.SASL_BIND_IN_PROGRESS, |
| 246 | + serverSaslCreds=b"server_ticket", |
| 247 | + ) |
| 248 | + |
| 249 | + mock_security_context.complete = True |
| 250 | + |
| 251 | + kwargs = await resolve_deps(second_bind.handle, container) |
| 252 | + result = await anext(second_bind.handle(**kwargs)) |
| 253 | + assert result == BindResponse( |
| 254 | + result_code=LDAPCodes.SUCCESS, |
| 255 | + serverSaslCreds=b"server_ticket", |
| 256 | + ) |
| 257 | + |
| 258 | + |
| 259 | +@pytest.mark.asyncio |
| 260 | +@pytest.mark.usefixtures("session") |
| 261 | +@pytest.mark.usefixtures("setup_session") |
| 262 | +async def test_spnego_bind_missing_credentials( |
| 263 | + creds: TestCreds, |
| 264 | + container: AsyncContainer, |
| 265 | +) -> None: |
| 266 | + """Test spnego bind with missing credentials.""" |
| 267 | + bind = BindRequest( |
| 268 | + version=0, |
| 269 | + name=creds.un, |
| 270 | + AuthenticationChoice=SaslSPNEGOAuthentication(), |
| 271 | + ) |
| 272 | + |
| 273 | + async with container(scope=Scope.REQUEST) as container: |
| 274 | + kwargs = await resolve_deps(bind.handle, container) |
| 275 | + with pytest.raises(gssapi.exceptions.MissingCredentialsError): |
| 276 | + await anext(bind.handle(**kwargs)) |
| 277 | + |
| 278 | + |
202 | 279 | @pytest.mark.asyncio |
203 | 280 | @pytest.mark.usefixtures("session") |
204 | 281 | async def test_bind_invalid_password_or_user( |
|
0 commit comments