Skip to content
Merged
218 changes: 218 additions & 0 deletions src/workos/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from workos.types.authorization.organization_role import OrganizationRole
from workos.types.authorization.permission import Permission
from workos.types.authorization.role import Role, RoleList
from workos.types.authorization.resource_identifier import ResourceIdentifier
from workos.types.authorization.role_assignment import RoleAssignment
from workos.types.list_resource import (
ListArgs,
ListMetadata,
Expand All @@ -28,6 +30,7 @@
)

AUTHORIZATION_PERMISSIONS_PATH = "authorization/permissions"
AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH = "authorization/organization_memberships"

_role_adapter: TypeAdapter[Role] = TypeAdapter(Role)

Expand All @@ -41,6 +44,15 @@ class PermissionListFilters(ListArgs, total=False):
]


class RoleAssignmentListFilters(ListArgs, total=False):
organization_membership_id: str


RoleAssignmentsListResource = WorkOSListResource[
RoleAssignment, RoleAssignmentListFilters, ListMetadata
]


class AuthorizationModule(Protocol):
"""Offers methods through the WorkOS Authorization service."""

Expand Down Expand Up @@ -161,6 +173,38 @@ def add_environment_role_permission(
permission_slug: str,
) -> SyncOrAsync[EnvironmentRole]: ...

def list_role_assignments(
self,
*,
organization_membership_id: str,
limit: int = DEFAULT_LIST_RESPONSE_LIMIT,
before: Optional[str] = None,
after: Optional[str] = None,
order: PaginationOrder = "desc",
) -> SyncOrAsync[RoleAssignmentsListResource]: ...

def assign_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> SyncOrAsync[RoleAssignment]: ...

def remove_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> SyncOrAsync[None]: ...

def remove_role_assignment(
self,
organization_membership_id: str,
role_assignment_id: str,
) -> SyncOrAsync[None]: ...


class Authorization(AuthorizationModule):
_http_client: SyncHTTPClient
Expand Down Expand Up @@ -437,6 +481,93 @@ def add_environment_role_permission(

return EnvironmentRole.model_validate(response)

# Role Assignments

def list_role_assignments(
self,
*,
organization_membership_id: str,
limit: int = DEFAULT_LIST_RESPONSE_LIMIT,
before: Optional[str] = None,
after: Optional[str] = None,
order: PaginationOrder = "desc",
) -> RoleAssignmentsListResource:
# list_params includes organization_membership_id so auto-pagination
# can reconstruct the full request. query_params excludes it because
# it is already embedded in the URL path and must not be sent as a
# query-string parameter.
list_params: RoleAssignmentListFilters = {
"organization_membership_id": organization_membership_id,
"limit": limit,
"before": before,
"after": after,
"order": order,
}

query_params: ListArgs = {
"limit": limit,
"before": before,
"after": after,
"order": order,
}

response = self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_GET,
params=query_params,
)

return WorkOSListResource[
RoleAssignment, RoleAssignmentListFilters, ListMetadata
](
list_method=self.list_role_assignments,
list_args=list_params,
**ListPage[RoleAssignment](**response).model_dump(),
)

def assign_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> RoleAssignment:
json: Dict[str, Any] = {"role_slug": role_slug}
json.update(resource_identifier)

response = self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_POST,
json=json,
)

return RoleAssignment.model_validate(response)

def remove_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> None:
json: Dict[str, Any] = {"role_slug": role_slug}
json.update(resource_identifier)

self._http_client.delete_with_body(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
json=json,
)

def remove_role_assignment(
self,
organization_membership_id: str,
role_assignment_id: str,
) -> None:
self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments/{role_assignment_id}",
method=REQUEST_METHOD_DELETE,
)


class AsyncAuthorization(AuthorizationModule):
_http_client: AsyncHTTPClient
Expand Down Expand Up @@ -712,3 +843,90 @@ async def add_environment_role_permission(
)

return EnvironmentRole.model_validate(response)

# Role Assignments

async def list_role_assignments(
self,
*,
organization_membership_id: str,
limit: int = DEFAULT_LIST_RESPONSE_LIMIT,
before: Optional[str] = None,
after: Optional[str] = None,
order: PaginationOrder = "desc",
) -> RoleAssignmentsListResource:
# list_params includes organization_membership_id so auto-pagination
# can reconstruct the full request. query_params excludes it because
# it is already embedded in the URL path and must not be sent as a
# query-string parameter.
list_params: RoleAssignmentListFilters = {
"organization_membership_id": organization_membership_id,
"limit": limit,
"before": before,
"after": after,
"order": order,
}

query_params: ListArgs = {
"limit": limit,
"before": before,
"after": after,
"order": order,
}

response = await self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_GET,
params=query_params,
)

return WorkOSListResource[
RoleAssignment, RoleAssignmentListFilters, ListMetadata
](
list_method=self.list_role_assignments,
list_args=list_params,
**ListPage[RoleAssignment](**response).model_dump(),
)

async def assign_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> RoleAssignment:
json: Dict[str, Any] = {"role_slug": role_slug}
json.update(resource_identifier)

response = await self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_POST,
json=json,
)

return RoleAssignment.model_validate(response)

async def remove_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> None:
json: Dict[str, Any] = {"role_slug": role_slug}
json.update(resource_identifier)

await self._http_client.delete_with_body(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
json=json,
)

async def remove_role_assignment(
self,
organization_membership_id: str,
role_assignment_id: str,
) -> None:
await self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments/{role_assignment_id}",
method=REQUEST_METHOD_DELETE,
)
5 changes: 5 additions & 0 deletions src/workos/types/authorization/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
)
from workos.types.authorization.permission import Permission
from workos.types.authorization.resource import Resource
from workos.types.authorization.resource_identifier import (
ResourceIdentifier,
ResourceIdentifierByExternalId,
ResourceIdentifierById,
)
from workos.types.authorization.role import (
Role,
RoleList,
Expand Down
15 changes: 15 additions & 0 deletions src/workos/types/authorization/resource_identifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Union

from typing_extensions import TypedDict


class ResourceIdentifierById(TypedDict):
resource_id: str


class ResourceIdentifierByExternalId(TypedDict):
resource_external_id: str
resource_type_slug: str


ResourceIdentifier = Union[ResourceIdentifierById, ResourceIdentifierByExternalId]
Comment on lines +1 to +15
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is duplicate of what is in https://github.com/workos/workos-python/pull/568/changes#r2842020033, can consolidate

Loading