From 60b59ef1514d7d425604bbbe55803faab7ee88c7 Mon Sep 17 00:00:00 2001 From: Birdcar <434063+birdcar@users.noreply.github.com> Date: Fri, 9 Jan 2026 10:20:35 -0600 Subject: [PATCH] Add Python SDK support for WorkOS Pipes --- tests/test_pipes.py | 167 +++++++++++++++++++++++++++++++++ workos/_base_client.py | 5 + workos/async_client.py | 7 ++ workos/client.py | 7 ++ workos/pipes.py | 93 ++++++++++++++++++ workos/types/pipes/__init__.py | 6 ++ workos/types/pipes/pipes.py | 34 +++++++ 7 files changed, 319 insertions(+) create mode 100644 tests/test_pipes.py create mode 100644 workos/pipes.py create mode 100644 workos/types/pipes/__init__.py create mode 100644 workos/types/pipes/pipes.py diff --git a/tests/test_pipes.py b/tests/test_pipes.py new file mode 100644 index 00000000..90c2be32 --- /dev/null +++ b/tests/test_pipes.py @@ -0,0 +1,167 @@ +import pytest + +from tests.utils.syncify import syncify +from workos.pipes import AsyncPipes, Pipes + + +@pytest.mark.sync_and_async(Pipes, AsyncPipes) +class TestPipes: + @pytest.fixture + def mock_access_token(self): + return { + "object": "access_token", + "access_token": "test_access_token_123", + "expires_at": "2026-01-09T12:00:00.000Z", + "scopes": ["read:users", "write:users"], + "missing_scopes": [], + } + + def test_get_access_token_success_with_expiry( + self, + module_instance, + mock_access_token, + capture_and_mock_http_client_request, + ): + response_body = { + "active": True, + "access_token": mock_access_token, + } + request_kwargs = capture_and_mock_http_client_request( + module_instance._http_client, response_body, 200 + ) + + result = syncify( + module_instance.get_access_token( + provider="test-provider", + user_id="user_123", + ) + ) + + assert request_kwargs["url"].endswith("data-integrations/test-provider/token") + assert request_kwargs["method"] == "post" + assert request_kwargs["json"]["user_id"] == "user_123" + assert result.active is True + assert result.access_token.access_token == mock_access_token["access_token"] + assert result.access_token.scopes == mock_access_token["scopes"] + + def test_get_access_token_success_without_expiry( + self, + module_instance, + capture_and_mock_http_client_request, + ): + response_body = { + "active": True, + "access_token": { + "object": "access_token", + "access_token": "test_token", + "expires_at": None, + "scopes": ["read"], + "missing_scopes": [], + }, + } + capture_and_mock_http_client_request( + module_instance._http_client, response_body, 200 + ) + + result = syncify( + module_instance.get_access_token( + provider="test-provider", + user_id="user_123", + ) + ) + + assert result.active is True + assert result.access_token.expires_at is None + + def test_get_access_token_with_organization_id( + self, + module_instance, + mock_access_token, + capture_and_mock_http_client_request, + ): + response_body = { + "active": True, + "access_token": mock_access_token, + } + request_kwargs = capture_and_mock_http_client_request( + module_instance._http_client, response_body, 200 + ) + + syncify( + module_instance.get_access_token( + provider="test-provider", + user_id="user_123", + organization_id="org_456", + ) + ) + + assert request_kwargs["json"]["organization_id"] == "org_456" + + def test_get_access_token_without_organization_id( + self, + module_instance, + mock_access_token, + capture_and_mock_http_client_request, + ): + response_body = { + "active": True, + "access_token": mock_access_token, + } + request_kwargs = capture_and_mock_http_client_request( + module_instance._http_client, response_body, 200 + ) + + syncify( + module_instance.get_access_token( + provider="test-provider", + user_id="user_123", + ) + ) + + assert "organization_id" not in request_kwargs["json"] + + def test_get_access_token_not_installed( + self, + module_instance, + capture_and_mock_http_client_request, + ): + response_body = { + "active": False, + "error": "not_installed", + } + capture_and_mock_http_client_request( + module_instance._http_client, response_body, 200 + ) + + result = syncify( + module_instance.get_access_token( + provider="test-provider", + user_id="user_123", + ) + ) + + assert result.active is False + assert result.error == "not_installed" + + def test_get_access_token_needs_reauthorization( + self, + module_instance, + capture_and_mock_http_client_request, + ): + response_body = { + "active": False, + "error": "needs_reauthorization", + } + capture_and_mock_http_client_request( + module_instance._http_client, response_body, 200 + ) + + result = syncify( + module_instance.get_access_token( + provider="test-provider", + user_id="user_123", + ) + ) + + assert result.active is False + assert result.error == "needs_reauthorization" diff --git a/workos/_base_client.py b/workos/_base_client.py index 326ab20d..0937bf43 100644 --- a/workos/_base_client.py +++ b/workos/_base_client.py @@ -10,6 +10,7 @@ from workos.fga import FGAModule from workos.mfa import MFAModule from workos.organization_domains import OrganizationDomainsModule +from workos.pipes import PipesModule from workos.organizations import OrganizationsModule from workos.passwordless import PasswordlessModule from workos.portal import PortalModule @@ -101,6 +102,10 @@ def organization_domains(self) -> OrganizationDomainsModule: ... @abstractmethod def passwordless(self) -> PasswordlessModule: ... + @property + @abstractmethod + def pipes(self) -> PipesModule: ... + @property @abstractmethod def portal(self) -> PortalModule: ... diff --git a/workos/async_client.py b/workos/async_client.py index b3d25979..db4f4407 100644 --- a/workos/async_client.py +++ b/workos/async_client.py @@ -10,6 +10,7 @@ from workos.organizations import AsyncOrganizations from workos.organization_domains import AsyncOrganizationDomains from workos.passwordless import PasswordlessModule +from workos.pipes import AsyncPipes from workos.portal import PortalModule from workos.sso import AsyncSSO from workos.user_management import AsyncUserManagement @@ -102,6 +103,12 @@ def passwordless(self) -> PasswordlessModule: "Passwordless APIs are not yet supported in the async client." ) + @property + def pipes(self) -> AsyncPipes: + if not getattr(self, "_pipes", None): + self._pipes = AsyncPipes(self._http_client) + return self._pipes + @property def portal(self) -> PortalModule: raise NotImplementedError( diff --git a/workos/client.py b/workos/client.py index 6c124f51..6b36a9e6 100644 --- a/workos/client.py +++ b/workos/client.py @@ -8,6 +8,7 @@ from workos.organizations import Organizations from workos.organization_domains import OrganizationDomains from workos.passwordless import Passwordless +from workos.pipes import Pipes from workos.portal import Portal from workos.sso import SSO from workos.webhooks import Webhooks @@ -102,6 +103,12 @@ def passwordless(self) -> Passwordless: self._passwordless = Passwordless(self._http_client) return self._passwordless + @property + def pipes(self) -> Pipes: + if not getattr(self, "_pipes", None): + self._pipes = Pipes(self._http_client) + return self._pipes + @property def portal(self) -> Portal: if not getattr(self, "_portal", None): diff --git a/workos/pipes.py b/workos/pipes.py new file mode 100644 index 00000000..481417cf --- /dev/null +++ b/workos/pipes.py @@ -0,0 +1,93 @@ +from typing import Dict, Optional, Protocol + +from workos.types.pipes import ( + GetAccessTokenFailureResponse, + GetAccessTokenResponse, + GetAccessTokenSuccessResponse, +) +from workos.typing.sync_or_async import SyncOrAsync +from workos.utils.http_client import AsyncHTTPClient, SyncHTTPClient +from workos.utils.request_helper import REQUEST_METHOD_POST + + +class PipesModule(Protocol): + """Protocol defining the Pipes module interface.""" + + def get_access_token( + self, + *, + provider: str, + user_id: str, + organization_id: Optional[str] = None, + ) -> SyncOrAsync[GetAccessTokenResponse]: + """Retrieve an access token for a third-party provider. + + Kwargs: + provider (str): The third-party provider identifier + user_id (str): The WorkOS user ID + organization_id (str, optional): The WorkOS organization ID + + Returns: + GetAccessTokenResponse: Success response with token or failure response with error + """ + ... + + +class Pipes(PipesModule): + """Sync implementation of the Pipes module.""" + + _http_client: SyncHTTPClient + + def __init__(self, http_client: SyncHTTPClient): + self._http_client = http_client + + def get_access_token( + self, + *, + provider: str, + user_id: str, + organization_id: Optional[str] = None, + ) -> GetAccessTokenResponse: + json_data: Dict[str, str] = {"user_id": user_id} + if organization_id is not None: + json_data["organization_id"] = organization_id + + response = self._http_client.request( + f"data-integrations/{provider}/token", + method=REQUEST_METHOD_POST, + json=json_data, + ) + + if response.get("active") is True: + return GetAccessTokenSuccessResponse.model_validate(response) + return GetAccessTokenFailureResponse.model_validate(response) + + +class AsyncPipes(PipesModule): + """Async implementation of the Pipes module.""" + + _http_client: AsyncHTTPClient + + def __init__(self, http_client: AsyncHTTPClient): + self._http_client = http_client + + async def get_access_token( + self, + *, + provider: str, + user_id: str, + organization_id: Optional[str] = None, + ) -> GetAccessTokenResponse: + json_data: Dict[str, str] = {"user_id": user_id} + if organization_id is not None: + json_data["organization_id"] = organization_id + + response = await self._http_client.request( + f"data-integrations/{provider}/token", + method=REQUEST_METHOD_POST, + json=json_data, + ) + + if response.get("active") is True: + return GetAccessTokenSuccessResponse.model_validate(response) + return GetAccessTokenFailureResponse.model_validate(response) diff --git a/workos/types/pipes/__init__.py b/workos/types/pipes/__init__.py new file mode 100644 index 00000000..27e630dd --- /dev/null +++ b/workos/types/pipes/__init__.py @@ -0,0 +1,6 @@ +from workos.types.pipes.pipes import ( + AccessToken as AccessToken, + GetAccessTokenFailureResponse as GetAccessTokenFailureResponse, + GetAccessTokenResponse as GetAccessTokenResponse, + GetAccessTokenSuccessResponse as GetAccessTokenSuccessResponse, +) diff --git a/workos/types/pipes/pipes.py b/workos/types/pipes/pipes.py new file mode 100644 index 00000000..6d93db85 --- /dev/null +++ b/workos/types/pipes/pipes.py @@ -0,0 +1,34 @@ +from datetime import datetime +from typing import Literal, Optional, Sequence, Union + +from workos.types.workos_model import WorkOSModel + + +class AccessToken(WorkOSModel): + """Represents an OAuth access token for a third-party provider.""" + + object: Literal["access_token"] + access_token: str + expires_at: Optional[datetime] = None + scopes: Sequence[str] + missing_scopes: Sequence[str] + + +class GetAccessTokenSuccessResponse(WorkOSModel): + """Successful response containing the access token.""" + + active: Literal[True] + access_token: AccessToken + + +class GetAccessTokenFailureResponse(WorkOSModel): + """Failed response indicating why the token couldn't be retrieved.""" + + active: Literal[False] + error: Literal["not_installed", "needs_reauthorization"] + + +GetAccessTokenResponse = Union[ + GetAccessTokenSuccessResponse, + GetAccessTokenFailureResponse, +]