diff --git a/src/workos/authorization.py b/src/workos/authorization.py index 6e12f035..24108c37 100644 --- a/src/workos/authorization.py +++ b/src/workos/authorization.py @@ -9,6 +9,8 @@ from workos.types.authorization.organization_role import OrganizationRole from workos.types.authorization.permission import Permission from workos.types.authorization.role import Role, RoleList +from workos.types.authorization.resource_identifier import ResourceIdentifier +from workos.types.authorization.role_assignment import RoleAssignment from workos.types.list_resource import ( ListArgs, ListMetadata, @@ -28,6 +30,7 @@ ) AUTHORIZATION_PERMISSIONS_PATH = "authorization/permissions" +AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH = "authorization/organization_memberships" _role_adapter: TypeAdapter[Role] = TypeAdapter(Role) @@ -41,6 +44,15 @@ class PermissionListFilters(ListArgs, total=False): ] +class RoleAssignmentListFilters(ListArgs, total=False): + organization_membership_id: str + + +RoleAssignmentsListResource = WorkOSListResource[ + RoleAssignment, RoleAssignmentListFilters, ListMetadata +] + + class AuthorizationModule(Protocol): """Offers methods through the WorkOS Authorization service.""" @@ -161,6 +173,38 @@ def add_environment_role_permission( permission_slug: str, ) -> SyncOrAsync[EnvironmentRole]: ... + def list_role_assignments( + self, + *, + organization_membership_id: str, + limit: int = DEFAULT_LIST_RESPONSE_LIMIT, + before: Optional[str] = None, + after: Optional[str] = None, + order: PaginationOrder = "desc", + ) -> SyncOrAsync[RoleAssignmentsListResource]: ... + + def assign_role( + self, + organization_membership_id: str, + *, + role_slug: str, + resource_identifier: ResourceIdentifier, + ) -> SyncOrAsync[RoleAssignment]: ... + + def remove_role( + self, + organization_membership_id: str, + *, + role_slug: str, + resource_identifier: ResourceIdentifier, + ) -> SyncOrAsync[None]: ... + + def remove_role_assignment( + self, + organization_membership_id: str, + role_assignment_id: str, + ) -> SyncOrAsync[None]: ... + class Authorization(AuthorizationModule): _http_client: SyncHTTPClient @@ -437,6 +481,93 @@ def add_environment_role_permission( return EnvironmentRole.model_validate(response) + # Role Assignments + + def list_role_assignments( + self, + *, + organization_membership_id: str, + limit: int = DEFAULT_LIST_RESPONSE_LIMIT, + before: Optional[str] = None, + after: Optional[str] = None, + order: PaginationOrder = "desc", + ) -> RoleAssignmentsListResource: + # list_params includes organization_membership_id so auto-pagination + # can reconstruct the full request. query_params excludes it because + # it is already embedded in the URL path and must not be sent as a + # query-string parameter. + list_params: RoleAssignmentListFilters = { + "organization_membership_id": organization_membership_id, + "limit": limit, + "before": before, + "after": after, + "order": order, + } + + query_params: ListArgs = { + "limit": limit, + "before": before, + "after": after, + "order": order, + } + + response = self._http_client.request( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", + method=REQUEST_METHOD_GET, + params=query_params, + ) + + return WorkOSListResource[ + RoleAssignment, RoleAssignmentListFilters, ListMetadata + ]( + list_method=self.list_role_assignments, + list_args=list_params, + **ListPage[RoleAssignment](**response).model_dump(), + ) + + def assign_role( + self, + organization_membership_id: str, + *, + role_slug: str, + resource_identifier: ResourceIdentifier, + ) -> RoleAssignment: + json: Dict[str, Any] = {"role_slug": role_slug} + json.update(resource_identifier) + + response = self._http_client.request( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", + method=REQUEST_METHOD_POST, + json=json, + ) + + return RoleAssignment.model_validate(response) + + def remove_role( + self, + organization_membership_id: str, + *, + role_slug: str, + resource_identifier: ResourceIdentifier, + ) -> None: + json: Dict[str, Any] = {"role_slug": role_slug} + json.update(resource_identifier) + + self._http_client.delete_with_body( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", + json=json, + ) + + def remove_role_assignment( + self, + organization_membership_id: str, + role_assignment_id: str, + ) -> None: + self._http_client.request( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments/{role_assignment_id}", + method=REQUEST_METHOD_DELETE, + ) + class AsyncAuthorization(AuthorizationModule): _http_client: AsyncHTTPClient @@ -712,3 +843,90 @@ async def add_environment_role_permission( ) return EnvironmentRole.model_validate(response) + + # Role Assignments + + async def list_role_assignments( + self, + *, + organization_membership_id: str, + limit: int = DEFAULT_LIST_RESPONSE_LIMIT, + before: Optional[str] = None, + after: Optional[str] = None, + order: PaginationOrder = "desc", + ) -> RoleAssignmentsListResource: + # list_params includes organization_membership_id so auto-pagination + # can reconstruct the full request. query_params excludes it because + # it is already embedded in the URL path and must not be sent as a + # query-string parameter. + list_params: RoleAssignmentListFilters = { + "organization_membership_id": organization_membership_id, + "limit": limit, + "before": before, + "after": after, + "order": order, + } + + query_params: ListArgs = { + "limit": limit, + "before": before, + "after": after, + "order": order, + } + + response = await self._http_client.request( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", + method=REQUEST_METHOD_GET, + params=query_params, + ) + + return WorkOSListResource[ + RoleAssignment, RoleAssignmentListFilters, ListMetadata + ]( + list_method=self.list_role_assignments, + list_args=list_params, + **ListPage[RoleAssignment](**response).model_dump(), + ) + + async def assign_role( + self, + organization_membership_id: str, + *, + role_slug: str, + resource_identifier: ResourceIdentifier, + ) -> RoleAssignment: + json: Dict[str, Any] = {"role_slug": role_slug} + json.update(resource_identifier) + + response = await self._http_client.request( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", + method=REQUEST_METHOD_POST, + json=json, + ) + + return RoleAssignment.model_validate(response) + + async def remove_role( + self, + organization_membership_id: str, + *, + role_slug: str, + resource_identifier: ResourceIdentifier, + ) -> None: + json: Dict[str, Any] = {"role_slug": role_slug} + json.update(resource_identifier) + + await self._http_client.delete_with_body( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", + json=json, + ) + + async def remove_role_assignment( + self, + organization_membership_id: str, + role_assignment_id: str, + ) -> None: + await self._http_client.request( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments/{role_assignment_id}", + method=REQUEST_METHOD_DELETE, + ) diff --git a/src/workos/types/authorization/__init__.py b/src/workos/types/authorization/__init__.py index 9eb705a0..93946662 100644 --- a/src/workos/types/authorization/__init__.py +++ b/src/workos/types/authorization/__init__.py @@ -13,6 +13,11 @@ ) from workos.types.authorization.permission import Permission from workos.types.authorization.resource import Resource +from workos.types.authorization.resource_identifier import ( + ResourceIdentifier, + ResourceIdentifierByExternalId, + ResourceIdentifierById, +) from workos.types.authorization.role import ( Role, RoleList, diff --git a/src/workos/types/authorization/resource_identifier.py b/src/workos/types/authorization/resource_identifier.py new file mode 100644 index 00000000..081a175d --- /dev/null +++ b/src/workos/types/authorization/resource_identifier.py @@ -0,0 +1,15 @@ +from typing import Union + +from typing_extensions import TypedDict + + +class ResourceIdentifierById(TypedDict): + resource_id: str + + +class ResourceIdentifierByExternalId(TypedDict): + resource_external_id: str + resource_type_slug: str + + +ResourceIdentifier = Union[ResourceIdentifierById, ResourceIdentifierByExternalId] diff --git a/tests/test_authorization_role_assignments.py b/tests/test_authorization_role_assignments.py new file mode 100644 index 00000000..00a8d3d6 --- /dev/null +++ b/tests/test_authorization_role_assignments.py @@ -0,0 +1,216 @@ +from typing import Union + +import pytest +from tests.utils.fixtures.mock_role_assignment import MockRoleAssignment +from tests.utils.list_resource import list_response_of +from tests.utils.syncify import syncify +from workos.authorization import AsyncAuthorization, Authorization + + +@pytest.mark.sync_and_async(Authorization, AsyncAuthorization) +class TestAuthorizationRoleAssignments: + @pytest.fixture(autouse=True) + def setup(self, module_instance: Union[Authorization, AsyncAuthorization]): + self.http_client = module_instance._http_client + self.authorization = module_instance + + @pytest.fixture + def mock_role_assignment(self): + return MockRoleAssignment(id="ra_01ABC").dict() + + @pytest.fixture + def mock_role_assignments(self): + assignment_list = [ + MockRoleAssignment(id=f"ra_{i}", role_slug=f"role-{i}").dict() + for i in range(5) + ] + return { + "data": assignment_list, + "list_metadata": {"before": None, "after": None}, + "object": "list", + } + + @pytest.fixture + def mock_role_assignments_multiple_data_pages(self): + assignment_list = [ + MockRoleAssignment(id=f"ra_{i}", role_slug=f"role-{i}").dict() + for i in range(40) + ] + return list_response_of(data=assignment_list) + + def test_list_role_assignments( + self, mock_role_assignments, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignments, 200 + ) + + response = syncify( + self.authorization.list_role_assignments( + organization_membership_id="om_01ABC" + ) + ) + + assert request_kwargs["method"] == "get" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert "organization_membership_id" not in request_kwargs["params"] + assert len(response.data) == 5 + + def test_list_role_assignments_with_params( + self, mock_role_assignments, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignments, 200 + ) + + syncify( + self.authorization.list_role_assignments( + organization_membership_id="om_01ABC", + limit=5, + after="ra_cursor", + order="asc", + ) + ) + + assert request_kwargs["params"]["limit"] == 5 + assert request_kwargs["params"]["after"] == "ra_cursor" + assert request_kwargs["params"]["order"] == "asc" + + def test_list_role_assignments_auto_pagination( + self, + mock_role_assignments_multiple_data_pages, + test_auto_pagination, + ): + test_auto_pagination( + http_client=self.http_client, + list_function=self.authorization.list_role_assignments, + expected_all_page_data=mock_role_assignments_multiple_data_pages["data"], + list_function_params={ + "organization_membership_id": "om_01ABC", + }, + ) + + def test_assign_role( + self, mock_role_assignment, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignment, 201 + ) + + role_assignment = syncify( + self.authorization.assign_role( + "om_01ABC", + role_slug="admin", + resource_identifier={"resource_id": "res_01ABC"}, + ) + ) + + assert role_assignment.id == "ra_01ABC" + assert role_assignment.role.slug == "admin" + assert request_kwargs["method"] == "post" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert request_kwargs["json"] == { + "role_slug": "admin", + "resource_id": "res_01ABC", + } + + def test_assign_role_with_external_id( + self, mock_role_assignment, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignment, 201 + ) + + role_assignment = syncify( + self.authorization.assign_role( + "om_01ABC", + role_slug="admin", + resource_identifier={ + "resource_external_id": "ext_123", + "resource_type_slug": "document", + }, + ) + ) + + assert role_assignment.id == "ra_01ABC" + assert request_kwargs["method"] == "post" + assert request_kwargs["json"] == { + "role_slug": "admin", + "resource_external_id": "ext_123", + "resource_type_slug": "document", + } + + def test_remove_role(self, capture_and_mock_http_client_request): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, + status_code=202, + headers={"content-type": "text/plain; charset=utf-8"}, + ) + + response = syncify( + self.authorization.remove_role( + "om_01ABC", + role_slug="admin", + resource_identifier={"resource_id": "res_01ABC"}, + ) + ) + + assert response is None + assert request_kwargs["method"] == "delete" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert request_kwargs["json"] == { + "role_slug": "admin", + "resource_id": "res_01ABC", + } + + def test_remove_role_with_external_id(self, capture_and_mock_http_client_request): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, + status_code=202, + headers={"content-type": "text/plain; charset=utf-8"}, + ) + + response = syncify( + self.authorization.remove_role( + "om_01ABC", + role_slug="admin", + resource_identifier={ + "resource_external_id": "ext_123", + "resource_type_slug": "document", + }, + ) + ) + + assert response is None + assert request_kwargs["method"] == "delete" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert request_kwargs["json"] == { + "role_slug": "admin", + "resource_external_id": "ext_123", + "resource_type_slug": "document", + } + + def test_remove_role_assignment(self, capture_and_mock_http_client_request): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, + status_code=202, + headers={"content-type": "text/plain; charset=utf-8"}, + ) + + response = syncify( + self.authorization.remove_role_assignment("om_01ABC", "ra_01ABC") + ) + + assert response is None + assert request_kwargs["method"] == "delete" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments/ra_01ABC" + )