Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 218 additions & 0 deletions src/workos/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from workos.types.authorization.organization_role import OrganizationRole
from workos.types.authorization.permission import Permission
from workos.types.authorization.role import Role, RoleList
from workos.types.authorization.resource_identifier import ResourceIdentifier
from workos.types.authorization.role_assignment import RoleAssignment
from workos.types.list_resource import (
ListArgs,
ListMetadata,
Expand All @@ -28,6 +30,7 @@
)

AUTHORIZATION_PERMISSIONS_PATH = "authorization/permissions"
AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH = "authorization/organization_memberships"

_role_adapter: TypeAdapter[Role] = TypeAdapter(Role)

Expand All @@ -41,6 +44,15 @@ class PermissionListFilters(ListArgs, total=False):
]


class RoleAssignmentListFilters(ListArgs, total=False):
organization_membership_id: str


RoleAssignmentsListResource = WorkOSListResource[
RoleAssignment, RoleAssignmentListFilters, ListMetadata
]


class AuthorizationModule(Protocol):
"""Offers methods through the WorkOS Authorization service."""

Expand Down Expand Up @@ -161,6 +173,38 @@ def add_environment_role_permission(
permission_slug: str,
) -> SyncOrAsync[EnvironmentRole]: ...

def list_role_assignments(
self,
*,
organization_membership_id: str,
limit: int = DEFAULT_LIST_RESPONSE_LIMIT,
before: Optional[str] = None,
after: Optional[str] = None,
order: PaginationOrder = "desc",
) -> SyncOrAsync[RoleAssignmentsListResource]: ...

def assign_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> SyncOrAsync[RoleAssignment]: ...

def remove_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> SyncOrAsync[None]: ...

def remove_role_assignment(
self,
organization_membership_id: str,
role_assignment_id: str,
) -> SyncOrAsync[None]: ...


class Authorization(AuthorizationModule):
_http_client: SyncHTTPClient
Expand Down Expand Up @@ -437,6 +481,93 @@ def add_environment_role_permission(

return EnvironmentRole.model_validate(response)

# Role Assignments

def list_role_assignments(
self,
*,
organization_membership_id: str,
limit: int = DEFAULT_LIST_RESPONSE_LIMIT,
before: Optional[str] = None,
after: Optional[str] = None,
order: PaginationOrder = "desc",
) -> RoleAssignmentsListResource:
# list_params includes organization_membership_id so auto-pagination
# can reconstruct the full request. query_params excludes it because
# it is already embedded in the URL path and must not be sent as a
# query-string parameter.
list_params: RoleAssignmentListFilters = {
"organization_membership_id": organization_membership_id,
"limit": limit,
"before": before,
"after": after,
"order": order,
}

query_params: ListArgs = {
"limit": limit,
"before": before,
"after": after,
"order": order,
}

response = self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_GET,
params=query_params,
)

return WorkOSListResource[
RoleAssignment, RoleAssignmentListFilters, ListMetadata
](
list_method=self.list_role_assignments,
list_args=list_params,
**ListPage[RoleAssignment](**response).model_dump(),
)

def assign_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> RoleAssignment:
json: Dict[str, Any] = {"role_slug": role_slug}
json.update(resource_identifier)

response = self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_POST,
json=json,
)

return RoleAssignment.model_validate(response)

def remove_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> None:
json: Dict[str, Any] = {"role_slug": role_slug}
json.update(resource_identifier)

self._http_client.delete_with_body(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
json=json,
)

def remove_role_assignment(
self,
organization_membership_id: str,
role_assignment_id: str,
) -> None:
self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments/{role_assignment_id}",
method=REQUEST_METHOD_DELETE,
)


class AsyncAuthorization(AuthorizationModule):
_http_client: AsyncHTTPClient
Expand Down Expand Up @@ -712,3 +843,90 @@ async def add_environment_role_permission(
)

return EnvironmentRole.model_validate(response)

# Role Assignments

async def list_role_assignments(
self,
*,
organization_membership_id: str,
limit: int = DEFAULT_LIST_RESPONSE_LIMIT,
before: Optional[str] = None,
after: Optional[str] = None,
order: PaginationOrder = "desc",
) -> RoleAssignmentsListResource:
# list_params includes organization_membership_id so auto-pagination
# can reconstruct the full request. query_params excludes it because
# it is already embedded in the URL path and must not be sent as a
# query-string parameter.
list_params: RoleAssignmentListFilters = {
"organization_membership_id": organization_membership_id,
"limit": limit,
"before": before,
"after": after,
"order": order,
}

query_params: ListArgs = {
"limit": limit,
"before": before,
"after": after,
"order": order,
}

response = await self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_GET,
params=query_params,
)

return WorkOSListResource[
RoleAssignment, RoleAssignmentListFilters, ListMetadata
](
list_method=self.list_role_assignments,
list_args=list_params,
**ListPage[RoleAssignment](**response).model_dump(),
)

async def assign_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> RoleAssignment:
json: Dict[str, Any] = {"role_slug": role_slug}
json.update(resource_identifier)

response = await self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_POST,
json=json,
)

return RoleAssignment.model_validate(response)

async def remove_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> None:
json: Dict[str, Any] = {"role_slug": role_slug}
json.update(resource_identifier)

await self._http_client.delete_with_body(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
json=json,
)

async def remove_role_assignment(
self,
organization_membership_id: str,
role_assignment_id: str,
) -> None:
await self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments/{role_assignment_id}",
method=REQUEST_METHOD_DELETE,
)
5 changes: 5 additions & 0 deletions src/workos/types/authorization/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
)
from workos.types.authorization.permission import Permission
from workos.types.authorization.resource import Resource
from workos.types.authorization.resource_identifier import (
ResourceIdentifier,
ResourceIdentifierByExternalId,
ResourceIdentifierById,
)
from workos.types.authorization.role import (
Role,
RoleList,
Expand Down
15 changes: 15 additions & 0 deletions src/workos/types/authorization/resource_identifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Union

from typing_extensions import TypedDict


class ResourceIdentifierById(TypedDict):
resource_id: str


class ResourceIdentifierByExternalId(TypedDict):
resource_external_id: str
resource_type_slug: str


ResourceIdentifier = Union[ResourceIdentifierById, ResourceIdentifierByExternalId]
Comment on lines +1 to +15

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is duplicate of what is in https://github.com/workos/workos-python/pull/568/changes#r2842020033, can consolidate

Loading