From bf7b6a20a0699ff8cab2232d54bfaeef82a0179a Mon Sep 17 00:00:00 2001 From: swaroopakkineni Date: Fri, 20 Feb 2026 11:38:42 -1000 Subject: [PATCH 1/4] FGA_4: list/assign/remove role, remove roleAssginment --- src/workos/authorization.py | 200 +++++++++++++++++++ tests/test_authorization_role_assignments.py | 155 ++++++++++++++ 2 files changed, 355 insertions(+) create mode 100644 tests/test_authorization_role_assignments.py diff --git a/src/workos/authorization.py b/src/workos/authorization.py index 6e12f035..d0fe3211 100644 --- a/src/workos/authorization.py +++ b/src/workos/authorization.py @@ -9,6 +9,7 @@ from workos.types.authorization.organization_role import OrganizationRole from workos.types.authorization.permission import Permission from workos.types.authorization.role import Role, RoleList +from workos.types.authorization.role_assignment import RoleAssignment from workos.types.list_resource import ( ListArgs, ListMetadata, @@ -41,6 +42,15 @@ class PermissionListFilters(ListArgs, total=False): ] +class RoleAssignmentListFilters(ListArgs, total=False): + organization_membership_id: str + + +RoleAssignmentsListResource = WorkOSListResource[ + RoleAssignment, RoleAssignmentListFilters, ListMetadata +] + + class AuthorizationModule(Protocol): """Offers methods through the WorkOS Authorization service.""" @@ -161,6 +171,38 @@ def add_environment_role_permission( permission_slug: str, ) -> SyncOrAsync[EnvironmentRole]: ... + # Role Assignments + + def list_role_assignments( + self, + *, + organization_membership_id: str, + limit: int = DEFAULT_LIST_RESPONSE_LIMIT, + before: Optional[str] = None, + after: Optional[str] = None, + order: PaginationOrder = "desc", + ) -> SyncOrAsync[RoleAssignmentsListResource]: ... + + def assign_role( + self, + organization_membership_id: str, + *, + role_slug: str, + ) -> SyncOrAsync[RoleAssignment]: ... + + def remove_role( + self, + organization_membership_id: str, + *, + role_slug: str, + ) -> SyncOrAsync[None]: ... + + def remove_role_assignment( + self, + organization_membership_id: str, + role_assignment_id: str, + ) -> SyncOrAsync[None]: ... + class Authorization(AuthorizationModule): _http_client: SyncHTTPClient @@ -437,6 +479,85 @@ def add_environment_role_permission( return EnvironmentRole.model_validate(response) + # Role Assignments + + def list_role_assignments( + self, + *, + organization_membership_id: str, + limit: int = DEFAULT_LIST_RESPONSE_LIMIT, + before: Optional[str] = None, + after: Optional[str] = None, + order: PaginationOrder = "desc", + ) -> RoleAssignmentsListResource: + # list_params includes organization_membership_id so auto-pagination + # can reconstruct the full request. query_params excludes it because + # it is already embedded in the URL path and must not be sent as a + # query-string parameter. + list_params: RoleAssignmentListFilters = { + "organization_membership_id": organization_membership_id, + "limit": limit, + "before": before, + "after": after, + "order": order, + } + + query_params: ListArgs = { + "limit": limit, + "before": before, + "after": after, + "order": order, + } + + response = self._http_client.request( + f"authorization/organization_memberships/{organization_membership_id}/role_assignments", + method=REQUEST_METHOD_GET, + params=query_params, + ) + + return WorkOSListResource[ + RoleAssignment, RoleAssignmentListFilters, ListMetadata + ]( + list_method=self.list_role_assignments, + list_args=list_params, + **ListPage[RoleAssignment](**response).model_dump(), + ) + + def assign_role( + self, + organization_membership_id: str, + *, + role_slug: str, + ) -> RoleAssignment: + response = self._http_client.request( + f"authorization/organization_memberships/{organization_membership_id}/role_assignments", + method=REQUEST_METHOD_POST, + json={"role_slug": role_slug}, + ) + + return RoleAssignment.model_validate(response) + + def remove_role( + self, + organization_membership_id: str, + *, + role_slug: str, + ) -> None: + self._http_client.delete_with_body( + f"authorization/organization_memberships/{organization_membership_id}/role_assignments", + json={"role_slug": role_slug}, + ) + + def remove_role_assignment( + self, + organization_membership_id: str, + role_assignment_id: str, + ) -> None: + self._http_client.request( + f"authorization/organization_memberships/{organization_membership_id}/role_assignments/{role_assignment_id}", + method=REQUEST_METHOD_DELETE, + ) + class AsyncAuthorization(AuthorizationModule): _http_client: AsyncHTTPClient @@ -712,3 +833,82 @@ async def add_environment_role_permission( ) return EnvironmentRole.model_validate(response) + + # Role Assignments + + async def list_role_assignments( + self, + *, + organization_membership_id: str, + limit: int = DEFAULT_LIST_RESPONSE_LIMIT, + before: Optional[str] = None, + after: Optional[str] = None, + order: PaginationOrder = "desc", + ) -> RoleAssignmentsListResource: + # list_params includes organization_membership_id so auto-pagination + # can reconstruct the full request. query_params excludes it because + # it is already embedded in the URL path and must not be sent as a + # query-string parameter. + list_params: RoleAssignmentListFilters = { + "organization_membership_id": organization_membership_id, + "limit": limit, + "before": before, + "after": after, + "order": order, + } + + query_params: ListArgs = { + "limit": limit, + "before": before, + "after": after, + "order": order, + } + + response = await self._http_client.request( + f"authorization/organization_memberships/{organization_membership_id}/role_assignments", + method=REQUEST_METHOD_GET, + params=query_params, + ) + + return WorkOSListResource[ + RoleAssignment, RoleAssignmentListFilters, ListMetadata + ]( + list_method=self.list_role_assignments, + list_args=list_params, + **ListPage[RoleAssignment](**response).model_dump(), + ) + + async def assign_role( + self, + organization_membership_id: str, + *, + role_slug: str, + ) -> RoleAssignment: + response = await self._http_client.request( + f"authorization/organization_memberships/{organization_membership_id}/role_assignments", + method=REQUEST_METHOD_POST, + json={"role_slug": role_slug}, + ) + + return RoleAssignment.model_validate(response) + + async def remove_role( + self, + organization_membership_id: str, + *, + role_slug: str, + ) -> None: + await self._http_client.delete_with_body( + f"authorization/organization_memberships/{organization_membership_id}/role_assignments", + json={"role_slug": role_slug}, + ) + + async def remove_role_assignment( + self, + organization_membership_id: str, + role_assignment_id: str, + ) -> None: + await self._http_client.request( + f"authorization/organization_memberships/{organization_membership_id}/role_assignments/{role_assignment_id}", + method=REQUEST_METHOD_DELETE, + ) diff --git a/tests/test_authorization_role_assignments.py b/tests/test_authorization_role_assignments.py new file mode 100644 index 00000000..9010e487 --- /dev/null +++ b/tests/test_authorization_role_assignments.py @@ -0,0 +1,155 @@ +from typing import Union + +import pytest +from tests.utils.fixtures.mock_role_assignment import MockRoleAssignment +from tests.utils.list_resource import list_response_of +from tests.utils.syncify import syncify +from workos.authorization import AsyncAuthorization, Authorization + + +@pytest.mark.sync_and_async(Authorization, AsyncAuthorization) +class TestAuthorizationRoleAssignments: + @pytest.fixture(autouse=True) + def setup(self, module_instance: Union[Authorization, AsyncAuthorization]): + self.http_client = module_instance._http_client + self.authorization = module_instance + + @pytest.fixture + def mock_role_assignment(self): + return MockRoleAssignment(id="ra_01ABC").dict() + + @pytest.fixture + def mock_role_assignments(self): + assignment_list = [ + MockRoleAssignment(id=f"ra_{i}", role_slug=f"role-{i}").dict() + for i in range(5) + ] + return { + "data": assignment_list, + "list_metadata": {"before": None, "after": None}, + "object": "list", + } + + @pytest.fixture + def mock_role_assignments_multiple_data_pages(self): + assignment_list = [ + MockRoleAssignment(id=f"ra_{i}", role_slug=f"role-{i}").dict() + for i in range(40) + ] + return list_response_of(data=assignment_list) + + # --- list_role_assignments --- + + def test_list_role_assignments( + self, mock_role_assignments, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignments, 200 + ) + + response = syncify( + self.authorization.list_role_assignments( + organization_membership_id="om_01ABC" + ) + ) + + assert request_kwargs["method"] == "get" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert "organization_membership_id" not in request_kwargs["params"] + assert len(response.data) == 5 + + def test_list_role_assignments_with_params( + self, mock_role_assignments, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignments, 200 + ) + + syncify( + self.authorization.list_role_assignments( + organization_membership_id="om_01ABC", + limit=5, + after="ra_cursor", + order="asc", + ) + ) + + assert request_kwargs["params"]["limit"] == 5 + assert request_kwargs["params"]["after"] == "ra_cursor" + assert request_kwargs["params"]["order"] == "asc" + + def test_list_role_assignments_auto_pagination( + self, + mock_role_assignments_multiple_data_pages, + test_auto_pagination, + ): + test_auto_pagination( + http_client=self.http_client, + list_function=self.authorization.list_role_assignments, + expected_all_page_data=mock_role_assignments_multiple_data_pages["data"], + list_function_params={ + "organization_membership_id": "om_01ABC", + }, + ) + + # --- assign_role --- + + def test_assign_role( + self, mock_role_assignment, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignment, 201 + ) + + role_assignment = syncify( + self.authorization.assign_role("om_01ABC", role_slug="admin") + ) + + assert role_assignment.id == "ra_01ABC" + assert role_assignment.role.slug == "admin" + assert request_kwargs["method"] == "post" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert request_kwargs["json"] == {"role_slug": "admin"} + + # --- remove_role --- + + def test_remove_role(self, capture_and_mock_http_client_request): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, + status_code=202, + headers={"content-type": "text/plain; charset=utf-8"}, + ) + + response = syncify( + self.authorization.remove_role("om_01ABC", role_slug="admin") + ) + + assert response is None + assert request_kwargs["method"] == "delete" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert request_kwargs["json"] == {"role_slug": "admin"} + + # --- remove_role_assignment --- + + def test_remove_role_assignment(self, capture_and_mock_http_client_request): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, + status_code=202, + headers={"content-type": "text/plain; charset=utf-8"}, + ) + + response = syncify( + self.authorization.remove_role_assignment("om_01ABC", "ra_01ABC") + ) + + assert response is None + assert request_kwargs["method"] == "delete" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments/ra_01ABC" + ) From fc93478170f213d7193bdf5a9b26c0a017aad601 Mon Sep 17 00:00:00 2001 From: swaroopakkineni Date: Mon, 23 Feb 2026 07:12:30 -1000 Subject: [PATCH 2/4] moar --- src/workos/authorization.py | 16 ++++++-- src/workos/types/authorization/__init__.py | 5 +++ .../authorization/resource_identifier.py | 15 ++++++++ tests/test_authorization_role_assignments.py | 37 ++++++++++++++++++- 4 files changed, 67 insertions(+), 6 deletions(-) create mode 100644 src/workos/types/authorization/resource_identifier.py diff --git a/src/workos/authorization.py b/src/workos/authorization.py index d0fe3211..5ae9ac20 100644 --- a/src/workos/authorization.py +++ b/src/workos/authorization.py @@ -9,6 +9,7 @@ from workos.types.authorization.organization_role import OrganizationRole from workos.types.authorization.permission import Permission from workos.types.authorization.role import Role, RoleList +from workos.types.authorization.resource_identifier import ResourceIdentifier from workos.types.authorization.role_assignment import RoleAssignment from workos.types.list_resource import ( ListArgs, @@ -171,8 +172,6 @@ def add_environment_role_permission( permission_slug: str, ) -> SyncOrAsync[EnvironmentRole]: ... - # Role Assignments - def list_role_assignments( self, *, @@ -188,6 +187,7 @@ def assign_role( organization_membership_id: str, *, role_slug: str, + resource_identifier: ResourceIdentifier, ) -> SyncOrAsync[RoleAssignment]: ... def remove_role( @@ -528,11 +528,15 @@ def assign_role( organization_membership_id: str, *, role_slug: str, + resource_identifier: ResourceIdentifier, ) -> RoleAssignment: + json: Dict[str, Any] = {"role_slug": role_slug} + json.update(resource_identifier) + response = self._http_client.request( f"authorization/organization_memberships/{organization_membership_id}/role_assignments", method=REQUEST_METHOD_POST, - json={"role_slug": role_slug}, + json=json, ) return RoleAssignment.model_validate(response) @@ -883,11 +887,15 @@ async def assign_role( organization_membership_id: str, *, role_slug: str, + resource_identifier: ResourceIdentifier, ) -> RoleAssignment: + json: Dict[str, Any] = {"role_slug": role_slug} + json.update(resource_identifier) + response = await self._http_client.request( f"authorization/organization_memberships/{organization_membership_id}/role_assignments", method=REQUEST_METHOD_POST, - json={"role_slug": role_slug}, + json=json, ) return RoleAssignment.model_validate(response) diff --git a/src/workos/types/authorization/__init__.py b/src/workos/types/authorization/__init__.py index 9eb705a0..93946662 100644 --- a/src/workos/types/authorization/__init__.py +++ b/src/workos/types/authorization/__init__.py @@ -13,6 +13,11 @@ ) from workos.types.authorization.permission import Permission from workos.types.authorization.resource import Resource +from workos.types.authorization.resource_identifier import ( + ResourceIdentifier, + ResourceIdentifierByExternalId, + ResourceIdentifierById, +) from workos.types.authorization.role import ( Role, RoleList, diff --git a/src/workos/types/authorization/resource_identifier.py b/src/workos/types/authorization/resource_identifier.py new file mode 100644 index 00000000..081a175d --- /dev/null +++ b/src/workos/types/authorization/resource_identifier.py @@ -0,0 +1,15 @@ +from typing import Union + +from typing_extensions import TypedDict + + +class ResourceIdentifierById(TypedDict): + resource_id: str + + +class ResourceIdentifierByExternalId(TypedDict): + resource_external_id: str + resource_type_slug: str + + +ResourceIdentifier = Union[ResourceIdentifierById, ResourceIdentifierByExternalId] diff --git a/tests/test_authorization_role_assignments.py b/tests/test_authorization_role_assignments.py index 9010e487..c12dda95 100644 --- a/tests/test_authorization_role_assignments.py +++ b/tests/test_authorization_role_assignments.py @@ -104,7 +104,11 @@ def test_assign_role( ) role_assignment = syncify( - self.authorization.assign_role("om_01ABC", role_slug="admin") + self.authorization.assign_role( + "om_01ABC", + role_slug="admin", + resource_identifier={"resource_id": "res_01ABC"}, + ) ) assert role_assignment.id == "ra_01ABC" @@ -113,7 +117,36 @@ def test_assign_role( assert request_kwargs["url"].endswith( "/authorization/organization_memberships/om_01ABC/role_assignments" ) - assert request_kwargs["json"] == {"role_slug": "admin"} + assert request_kwargs["json"] == { + "role_slug": "admin", + "resource_id": "res_01ABC", + } + + def test_assign_role_with_external_id( + self, mock_role_assignment, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignment, 201 + ) + + role_assignment = syncify( + self.authorization.assign_role( + "om_01ABC", + role_slug="admin", + resource_identifier={ + "resource_external_id": "ext_123", + "resource_type_slug": "document", + }, + ) + ) + + assert role_assignment.id == "ra_01ABC" + assert request_kwargs["method"] == "post" + assert request_kwargs["json"] == { + "role_slug": "admin", + "resource_external_id": "ext_123", + "resource_type_slug": "document", + } # --- remove_role --- From 96f6ad7d6f6c3761c92403fbee75648dff474681 Mon Sep 17 00:00:00 2001 From: swaroopakkineni Date: Mon, 23 Feb 2026 07:17:17 -1000 Subject: [PATCH 3/4] moar --- src/workos/authorization.py | 30 +++++++++++++------- tests/test_authorization_role_assignments.py | 11 +++++-- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/workos/authorization.py b/src/workos/authorization.py index 5ae9ac20..24108c37 100644 --- a/src/workos/authorization.py +++ b/src/workos/authorization.py @@ -30,6 +30,7 @@ ) AUTHORIZATION_PERMISSIONS_PATH = "authorization/permissions" +AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH = "authorization/organization_memberships" _role_adapter: TypeAdapter[Role] = TypeAdapter(Role) @@ -195,6 +196,7 @@ def remove_role( organization_membership_id: str, *, role_slug: str, + resource_identifier: ResourceIdentifier, ) -> SyncOrAsync[None]: ... def remove_role_assignment( @@ -510,7 +512,7 @@ def list_role_assignments( } response = self._http_client.request( - f"authorization/organization_memberships/{organization_membership_id}/role_assignments", + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", method=REQUEST_METHOD_GET, params=query_params, ) @@ -534,7 +536,7 @@ def assign_role( json.update(resource_identifier) response = self._http_client.request( - f"authorization/organization_memberships/{organization_membership_id}/role_assignments", + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", method=REQUEST_METHOD_POST, json=json, ) @@ -546,10 +548,14 @@ def remove_role( organization_membership_id: str, *, role_slug: str, + resource_identifier: ResourceIdentifier, ) -> None: + json: Dict[str, Any] = {"role_slug": role_slug} + json.update(resource_identifier) + self._http_client.delete_with_body( - f"authorization/organization_memberships/{organization_membership_id}/role_assignments", - json={"role_slug": role_slug}, + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", + json=json, ) def remove_role_assignment( @@ -558,7 +564,7 @@ def remove_role_assignment( role_assignment_id: str, ) -> None: self._http_client.request( - f"authorization/organization_memberships/{organization_membership_id}/role_assignments/{role_assignment_id}", + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments/{role_assignment_id}", method=REQUEST_METHOD_DELETE, ) @@ -869,7 +875,7 @@ async def list_role_assignments( } response = await self._http_client.request( - f"authorization/organization_memberships/{organization_membership_id}/role_assignments", + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", method=REQUEST_METHOD_GET, params=query_params, ) @@ -893,7 +899,7 @@ async def assign_role( json.update(resource_identifier) response = await self._http_client.request( - f"authorization/organization_memberships/{organization_membership_id}/role_assignments", + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", method=REQUEST_METHOD_POST, json=json, ) @@ -905,10 +911,14 @@ async def remove_role( organization_membership_id: str, *, role_slug: str, + resource_identifier: ResourceIdentifier, ) -> None: + json: Dict[str, Any] = {"role_slug": role_slug} + json.update(resource_identifier) + await self._http_client.delete_with_body( - f"authorization/organization_memberships/{organization_membership_id}/role_assignments", - json={"role_slug": role_slug}, + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", + json=json, ) async def remove_role_assignment( @@ -917,6 +927,6 @@ async def remove_role_assignment( role_assignment_id: str, ) -> None: await self._http_client.request( - f"authorization/organization_memberships/{organization_membership_id}/role_assignments/{role_assignment_id}", + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments/{role_assignment_id}", method=REQUEST_METHOD_DELETE, ) diff --git a/tests/test_authorization_role_assignments.py b/tests/test_authorization_role_assignments.py index c12dda95..cf1bf164 100644 --- a/tests/test_authorization_role_assignments.py +++ b/tests/test_authorization_role_assignments.py @@ -158,7 +158,11 @@ def test_remove_role(self, capture_and_mock_http_client_request): ) response = syncify( - self.authorization.remove_role("om_01ABC", role_slug="admin") + self.authorization.remove_role( + "om_01ABC", + role_slug="admin", + resource_identifier={"resource_id": "res_01ABC"}, + ) ) assert response is None @@ -166,7 +170,10 @@ def test_remove_role(self, capture_and_mock_http_client_request): assert request_kwargs["url"].endswith( "/authorization/organization_memberships/om_01ABC/role_assignments" ) - assert request_kwargs["json"] == {"role_slug": "admin"} + assert request_kwargs["json"] == { + "role_slug": "admin", + "resource_id": "res_01ABC", + } # --- remove_role_assignment --- From 8796fc2905e70ad4ca4283901d3ae9cf7c76fb9c Mon Sep 17 00:00:00 2001 From: swaroopakkineni Date: Mon, 23 Feb 2026 07:30:49 -1000 Subject: [PATCH 4/4] moar tests --- tests/test_authorization_role_assignments.py | 35 ++++++++++++++++---- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/tests/test_authorization_role_assignments.py b/tests/test_authorization_role_assignments.py index cf1bf164..00a8d3d6 100644 --- a/tests/test_authorization_role_assignments.py +++ b/tests/test_authorization_role_assignments.py @@ -38,8 +38,6 @@ def mock_role_assignments_multiple_data_pages(self): ] return list_response_of(data=assignment_list) - # --- list_role_assignments --- - def test_list_role_assignments( self, mock_role_assignments, capture_and_mock_http_client_request ): @@ -94,8 +92,6 @@ def test_list_role_assignments_auto_pagination( }, ) - # --- assign_role --- - def test_assign_role( self, mock_role_assignment, capture_and_mock_http_client_request ): @@ -148,8 +144,6 @@ def test_assign_role_with_external_id( "resource_type_slug": "document", } - # --- remove_role --- - def test_remove_role(self, capture_and_mock_http_client_request): request_kwargs = capture_and_mock_http_client_request( self.http_client, @@ -175,7 +169,34 @@ def test_remove_role(self, capture_and_mock_http_client_request): "resource_id": "res_01ABC", } - # --- remove_role_assignment --- + def test_remove_role_with_external_id(self, capture_and_mock_http_client_request): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, + status_code=202, + headers={"content-type": "text/plain; charset=utf-8"}, + ) + + response = syncify( + self.authorization.remove_role( + "om_01ABC", + role_slug="admin", + resource_identifier={ + "resource_external_id": "ext_123", + "resource_type_slug": "document", + }, + ) + ) + + assert response is None + assert request_kwargs["method"] == "delete" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert request_kwargs["json"] == { + "role_slug": "admin", + "resource_external_id": "ext_123", + "resource_type_slug": "document", + } def test_remove_role_assignment(self, capture_and_mock_http_client_request): request_kwargs = capture_and_mock_http_client_request(