Skip to content
Merged
200 changes: 200 additions & 0 deletions src/workos/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from workos.types.authorization.organization_role import OrganizationRole
from workos.types.authorization.permission import Permission
from workos.types.authorization.role import Role, RoleList
from workos.types.authorization.role_assignment import RoleAssignment
from workos.types.list_resource import (
ListArgs,
ListMetadata,
Expand Down Expand Up @@ -41,6 +42,15 @@ class PermissionListFilters(ListArgs, total=False):
]


class RoleAssignmentListFilters(ListArgs, total=False):
organization_membership_id: str


RoleAssignmentsListResource = WorkOSListResource[
RoleAssignment, RoleAssignmentListFilters, ListMetadata
]


class AuthorizationModule(Protocol):
"""Offers methods through the WorkOS Authorization service."""

Expand Down Expand Up @@ -161,6 +171,38 @@ def add_environment_role_permission(
permission_slug: str,
) -> SyncOrAsync[EnvironmentRole]: ...

# Role Assignments

def list_role_assignments(
self,
*,
organization_membership_id: str,
limit: int = DEFAULT_LIST_RESPONSE_LIMIT,
before: Optional[str] = None,
after: Optional[str] = None,
order: PaginationOrder = "desc",
) -> SyncOrAsync[RoleAssignmentsListResource]: ...

def assign_role(
self,
organization_membership_id: str,
*,
role_slug: str,
) -> SyncOrAsync[RoleAssignment]: ...

def remove_role(
self,
organization_membership_id: str,
*,
role_slug: str,
) -> SyncOrAsync[None]: ...

def remove_role_assignment(
self,
organization_membership_id: str,
role_assignment_id: str,
) -> SyncOrAsync[None]: ...


class Authorization(AuthorizationModule):
_http_client: SyncHTTPClient
Expand Down Expand Up @@ -437,6 +479,85 @@ def add_environment_role_permission(

return EnvironmentRole.model_validate(response)

# Role Assignments

def list_role_assignments(
self,
*,
organization_membership_id: str,
limit: int = DEFAULT_LIST_RESPONSE_LIMIT,
before: Optional[str] = None,
after: Optional[str] = None,
order: PaginationOrder = "desc",
) -> RoleAssignmentsListResource:
# list_params includes organization_membership_id so auto-pagination
# can reconstruct the full request. query_params excludes it because
# it is already embedded in the URL path and must not be sent as a
# query-string parameter.
list_params: RoleAssignmentListFilters = {
"organization_membership_id": organization_membership_id,
"limit": limit,
"before": before,
"after": after,
"order": order,
}

query_params: ListArgs = {
"limit": limit,
"before": before,
"after": after,
"order": order,
}

response = self._http_client.request(
f"authorization/organization_memberships/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_GET,
params=query_params,
)

return WorkOSListResource[
RoleAssignment, RoleAssignmentListFilters, ListMetadata
](
list_method=self.list_role_assignments,
list_args=list_params,
**ListPage[RoleAssignment](**response).model_dump(),
)

def assign_role(
self,
organization_membership_id: str,
*,
role_slug: str,
) -> RoleAssignment:
response = self._http_client.request(
f"authorization/organization_memberships/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_POST,
json={"role_slug": role_slug},
)

return RoleAssignment.model_validate(response)

def remove_role(
self,
organization_membership_id: str,
*,
role_slug: str,
) -> None:
self._http_client.delete_with_body(
f"authorization/organization_memberships/{organization_membership_id}/role_assignments",
json={"role_slug": role_slug},
)

def remove_role_assignment(
self,
organization_membership_id: str,
role_assignment_id: str,
) -> None:
self._http_client.request(
f"authorization/organization_memberships/{organization_membership_id}/role_assignments/{role_assignment_id}",
method=REQUEST_METHOD_DELETE,
)


class AsyncAuthorization(AuthorizationModule):
_http_client: AsyncHTTPClient
Expand Down Expand Up @@ -712,3 +833,82 @@ async def add_environment_role_permission(
)

return EnvironmentRole.model_validate(response)

# Role Assignments

async def list_role_assignments(
self,
*,
organization_membership_id: str,
limit: int = DEFAULT_LIST_RESPONSE_LIMIT,
before: Optional[str] = None,
after: Optional[str] = None,
order: PaginationOrder = "desc",
) -> RoleAssignmentsListResource:
# list_params includes organization_membership_id so auto-pagination
# can reconstruct the full request. query_params excludes it because
# it is already embedded in the URL path and must not be sent as a
# query-string parameter.
list_params: RoleAssignmentListFilters = {
"organization_membership_id": organization_membership_id,
"limit": limit,
"before": before,
"after": after,
"order": order,
}

query_params: ListArgs = {
"limit": limit,
"before": before,
"after": after,
"order": order,
}

response = await self._http_client.request(
f"authorization/organization_memberships/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_GET,
params=query_params,
)

return WorkOSListResource[
RoleAssignment, RoleAssignmentListFilters, ListMetadata
](
list_method=self.list_role_assignments,
list_args=list_params,
**ListPage[RoleAssignment](**response).model_dump(),
)

async def assign_role(
self,
organization_membership_id: str,
*,
role_slug: str,
) -> RoleAssignment:
response = await self._http_client.request(
f"authorization/organization_memberships/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_POST,
json={"role_slug": role_slug},
)

return RoleAssignment.model_validate(response)

async def remove_role(
self,
organization_membership_id: str,
*,
role_slug: str,
) -> None:
await self._http_client.delete_with_body(
f"authorization/organization_memberships/{organization_membership_id}/role_assignments",
json={"role_slug": role_slug},
)

async def remove_role_assignment(
self,
organization_membership_id: str,
role_assignment_id: str,
) -> None:
await self._http_client.request(
f"authorization/organization_memberships/{organization_membership_id}/role_assignments/{role_assignment_id}",
method=REQUEST_METHOD_DELETE,
)
155 changes: 155 additions & 0 deletions tests/test_authorization_role_assignments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
from typing import Union

import pytest
from tests.utils.fixtures.mock_role_assignment import MockRoleAssignment
from tests.utils.list_resource import list_response_of
from tests.utils.syncify import syncify
from workos.authorization import AsyncAuthorization, Authorization


@pytest.mark.sync_and_async(Authorization, AsyncAuthorization)
class TestAuthorizationRoleAssignments:
@pytest.fixture(autouse=True)
def setup(self, module_instance: Union[Authorization, AsyncAuthorization]):
self.http_client = module_instance._http_client
self.authorization = module_instance

@pytest.fixture
def mock_role_assignment(self):
return MockRoleAssignment(id="ra_01ABC").dict()

@pytest.fixture
def mock_role_assignments(self):
assignment_list = [
MockRoleAssignment(id=f"ra_{i}", role_slug=f"role-{i}").dict()
for i in range(5)
]
return {
"data": assignment_list,
"list_metadata": {"before": None, "after": None},
"object": "list",
}

@pytest.fixture
def mock_role_assignments_multiple_data_pages(self):
assignment_list = [
MockRoleAssignment(id=f"ra_{i}", role_slug=f"role-{i}").dict()
for i in range(40)
]
return list_response_of(data=assignment_list)

# --- list_role_assignments ---

def test_list_role_assignments(
self, mock_role_assignments, capture_and_mock_http_client_request
):
request_kwargs = capture_and_mock_http_client_request(
self.http_client, mock_role_assignments, 200
)

response = syncify(
self.authorization.list_role_assignments(
organization_membership_id="om_01ABC"
)
)

assert request_kwargs["method"] == "get"
assert request_kwargs["url"].endswith(
"/authorization/organization_memberships/om_01ABC/role_assignments"
)
assert "organization_membership_id" not in request_kwargs["params"]
assert len(response.data) == 5

def test_list_role_assignments_with_params(
self, mock_role_assignments, capture_and_mock_http_client_request
):
request_kwargs = capture_and_mock_http_client_request(
self.http_client, mock_role_assignments, 200
)

syncify(
self.authorization.list_role_assignments(
organization_membership_id="om_01ABC",
limit=5,
after="ra_cursor",
order="asc",
)
)

assert request_kwargs["params"]["limit"] == 5
assert request_kwargs["params"]["after"] == "ra_cursor"
assert request_kwargs["params"]["order"] == "asc"

def test_list_role_assignments_auto_pagination(
self,
mock_role_assignments_multiple_data_pages,
test_auto_pagination,
):
test_auto_pagination(
http_client=self.http_client,
list_function=self.authorization.list_role_assignments,
expected_all_page_data=mock_role_assignments_multiple_data_pages["data"],
list_function_params={
"organization_membership_id": "om_01ABC",
},
)

# --- assign_role ---

def test_assign_role(
self, mock_role_assignment, capture_and_mock_http_client_request
):
request_kwargs = capture_and_mock_http_client_request(
self.http_client, mock_role_assignment, 201
)

role_assignment = syncify(
self.authorization.assign_role("om_01ABC", role_slug="admin")
)

assert role_assignment.id == "ra_01ABC"
assert role_assignment.role.slug == "admin"
assert request_kwargs["method"] == "post"
assert request_kwargs["url"].endswith(
"/authorization/organization_memberships/om_01ABC/role_assignments"
)
assert request_kwargs["json"] == {"role_slug": "admin"}

# --- remove_role ---

def test_remove_role(self, capture_and_mock_http_client_request):
request_kwargs = capture_and_mock_http_client_request(
self.http_client,
status_code=202,
headers={"content-type": "text/plain; charset=utf-8"},
)

response = syncify(
self.authorization.remove_role("om_01ABC", role_slug="admin")
)

assert response is None
assert request_kwargs["method"] == "delete"
assert request_kwargs["url"].endswith(
"/authorization/organization_memberships/om_01ABC/role_assignments"
)
assert request_kwargs["json"] == {"role_slug": "admin"}

# --- remove_role_assignment ---

def test_remove_role_assignment(self, capture_and_mock_http_client_request):
request_kwargs = capture_and_mock_http_client_request(
self.http_client,
status_code=202,
headers={"content-type": "text/plain; charset=utf-8"},
)

response = syncify(
self.authorization.remove_role_assignment("om_01ABC", "ra_01ABC")
)

assert response is None
assert request_kwargs["method"] == "delete"
assert request_kwargs["url"].endswith(
"/authorization/organization_memberships/om_01ABC/role_assignments/ra_01ABC"
)