Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions tests/test_pipes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import pytest

from tests.utils.syncify import syncify
from workos.pipes import AsyncPipes, Pipes


@pytest.mark.sync_and_async(Pipes, AsyncPipes)
class TestPipes:
@pytest.fixture
def mock_access_token(self):
return {
"object": "access_token",
"access_token": "test_access_token_123",
"expires_at": "2026-01-09T12:00:00.000Z",
"scopes": ["read:users", "write:users"],
"missing_scopes": [],
}

def test_get_access_token_success_with_expiry(
self,
module_instance,
mock_access_token,
capture_and_mock_http_client_request,
):
response_body = {
"active": True,
"access_token": mock_access_token,
}
request_kwargs = capture_and_mock_http_client_request(
module_instance._http_client, response_body, 200
)

result = syncify(
module_instance.get_access_token(
provider="test-provider",
user_id="user_123",
)
)

assert request_kwargs["url"].endswith("data-integrations/test-provider/token")
assert request_kwargs["method"] == "post"
assert request_kwargs["json"]["user_id"] == "user_123"
assert result.active is True
assert result.access_token.access_token == mock_access_token["access_token"]
assert result.access_token.scopes == mock_access_token["scopes"]

def test_get_access_token_success_without_expiry(
self,
module_instance,
capture_and_mock_http_client_request,
):
response_body = {
"active": True,
"access_token": {
"object": "access_token",
"access_token": "test_token",
"expires_at": None,
"scopes": ["read"],
"missing_scopes": [],
},
}
capture_and_mock_http_client_request(
module_instance._http_client, response_body, 200
)

result = syncify(
module_instance.get_access_token(
provider="test-provider",
user_id="user_123",
)
)

assert result.active is True
assert result.access_token.expires_at is None

def test_get_access_token_with_organization_id(
self,
module_instance,
mock_access_token,
capture_and_mock_http_client_request,
):
response_body = {
"active": True,
"access_token": mock_access_token,
}
request_kwargs = capture_and_mock_http_client_request(
module_instance._http_client, response_body, 200
)

syncify(
module_instance.get_access_token(
provider="test-provider",
user_id="user_123",
organization_id="org_456",
)
)

assert request_kwargs["json"]["organization_id"] == "org_456"

def test_get_access_token_without_organization_id(
self,
module_instance,
mock_access_token,
capture_and_mock_http_client_request,
):
response_body = {
"active": True,
"access_token": mock_access_token,
}
request_kwargs = capture_and_mock_http_client_request(
module_instance._http_client, response_body, 200
)

syncify(
module_instance.get_access_token(
provider="test-provider",
user_id="user_123",
)
)

assert "organization_id" not in request_kwargs["json"]

def test_get_access_token_not_installed(
self,
module_instance,
capture_and_mock_http_client_request,
):
response_body = {
"active": False,
"error": "not_installed",
}
capture_and_mock_http_client_request(
module_instance._http_client, response_body, 200
)

result = syncify(
module_instance.get_access_token(
provider="test-provider",
user_id="user_123",
)
)

assert result.active is False
assert result.error == "not_installed"

def test_get_access_token_needs_reauthorization(
self,
module_instance,
capture_and_mock_http_client_request,
):
response_body = {
"active": False,
"error": "needs_reauthorization",
}
capture_and_mock_http_client_request(
module_instance._http_client, response_body, 200
)

result = syncify(
module_instance.get_access_token(
provider="test-provider",
user_id="user_123",
)
)

assert result.active is False
assert result.error == "needs_reauthorization"
5 changes: 5 additions & 0 deletions workos/_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from workos.fga import FGAModule
from workos.mfa import MFAModule
from workos.organization_domains import OrganizationDomainsModule
from workos.pipes import PipesModule
from workos.organizations import OrganizationsModule
from workos.passwordless import PasswordlessModule
from workos.portal import PortalModule
Expand Down Expand Up @@ -101,6 +102,10 @@ def organization_domains(self) -> OrganizationDomainsModule: ...
@abstractmethod
def passwordless(self) -> PasswordlessModule: ...

@property
@abstractmethod
def pipes(self) -> PipesModule: ...

@property
@abstractmethod
def portal(self) -> PortalModule: ...
Expand Down
7 changes: 7 additions & 0 deletions workos/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from workos.organizations import AsyncOrganizations
from workos.organization_domains import AsyncOrganizationDomains
from workos.passwordless import PasswordlessModule
from workos.pipes import AsyncPipes
from workos.portal import PortalModule
from workos.sso import AsyncSSO
from workos.user_management import AsyncUserManagement
Expand Down Expand Up @@ -102,6 +103,12 @@ def passwordless(self) -> PasswordlessModule:
"Passwordless APIs are not yet supported in the async client."
)

@property
def pipes(self) -> AsyncPipes:
if not getattr(self, "_pipes", None):
self._pipes = AsyncPipes(self._http_client)
return self._pipes

@property
def portal(self) -> PortalModule:
raise NotImplementedError(
Expand Down
7 changes: 7 additions & 0 deletions workos/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from workos.organizations import Organizations
from workos.organization_domains import OrganizationDomains
from workos.passwordless import Passwordless
from workos.pipes import Pipes
from workos.portal import Portal
from workos.sso import SSO
from workos.webhooks import Webhooks
Expand Down Expand Up @@ -102,6 +103,12 @@ def passwordless(self) -> Passwordless:
self._passwordless = Passwordless(self._http_client)
return self._passwordless

@property
def pipes(self) -> Pipes:
if not getattr(self, "_pipes", None):
self._pipes = Pipes(self._http_client)
return self._pipes

@property
def portal(self) -> Portal:
if not getattr(self, "_portal", None):
Expand Down
93 changes: 93 additions & 0 deletions workos/pipes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from typing import Dict, Optional, Protocol

from workos.types.pipes import (
GetAccessTokenFailureResponse,
GetAccessTokenResponse,
GetAccessTokenSuccessResponse,
)
from workos.typing.sync_or_async import SyncOrAsync
from workos.utils.http_client import AsyncHTTPClient, SyncHTTPClient
from workos.utils.request_helper import REQUEST_METHOD_POST


class PipesModule(Protocol):
"""Protocol defining the Pipes module interface."""

def get_access_token(
self,
*,
provider: str,
user_id: str,
organization_id: Optional[str] = None,
) -> SyncOrAsync[GetAccessTokenResponse]:
"""Retrieve an access token for a third-party provider.

Kwargs:
provider (str): The third-party provider identifier
user_id (str): The WorkOS user ID
organization_id (str, optional): The WorkOS organization ID

Returns:
GetAccessTokenResponse: Success response with token or failure response with error
"""
...


class Pipes(PipesModule):
"""Sync implementation of the Pipes module."""

_http_client: SyncHTTPClient

def __init__(self, http_client: SyncHTTPClient):
self._http_client = http_client

def get_access_token(
self,
*,
provider: str,
user_id: str,
organization_id: Optional[str] = None,
) -> GetAccessTokenResponse:
json_data: Dict[str, str] = {"user_id": user_id}
if organization_id is not None:
json_data["organization_id"] = organization_id

response = self._http_client.request(
f"data-integrations/{provider}/token",
method=REQUEST_METHOD_POST,
json=json_data,
)

if response.get("active") is True:
return GetAccessTokenSuccessResponse.model_validate(response)
return GetAccessTokenFailureResponse.model_validate(response)


class AsyncPipes(PipesModule):
"""Async implementation of the Pipes module."""

_http_client: AsyncHTTPClient

def __init__(self, http_client: AsyncHTTPClient):
self._http_client = http_client

async def get_access_token(
self,
*,
provider: str,
user_id: str,
organization_id: Optional[str] = None,
) -> GetAccessTokenResponse:
json_data: Dict[str, str] = {"user_id": user_id}
if organization_id is not None:
json_data["organization_id"] = organization_id

response = await self._http_client.request(
f"data-integrations/{provider}/token",
method=REQUEST_METHOD_POST,
json=json_data,
)

if response.get("active") is True:
return GetAccessTokenSuccessResponse.model_validate(response)
return GetAccessTokenFailureResponse.model_validate(response)
6 changes: 6 additions & 0 deletions workos/types/pipes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from workos.types.pipes.pipes import (
AccessToken as AccessToken,
GetAccessTokenFailureResponse as GetAccessTokenFailureResponse,
GetAccessTokenResponse as GetAccessTokenResponse,
GetAccessTokenSuccessResponse as GetAccessTokenSuccessResponse,
)
34 changes: 34 additions & 0 deletions workos/types/pipes/pipes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from datetime import datetime
from typing import Literal, Optional, Sequence, Union

from workos.types.workos_model import WorkOSModel


class AccessToken(WorkOSModel):
"""Represents an OAuth access token for a third-party provider."""

object: Literal["access_token"]
access_token: str
expires_at: Optional[datetime] = None
scopes: Sequence[str]
missing_scopes: Sequence[str]


class GetAccessTokenSuccessResponse(WorkOSModel):
"""Successful response containing the access token."""

active: Literal[True]
access_token: AccessToken


class GetAccessTokenFailureResponse(WorkOSModel):
"""Failed response indicating why the token couldn't be retrieved."""

active: Literal[False]
error: Literal["not_installed", "needs_reauthorization"]


GetAccessTokenResponse = Union[
GetAccessTokenSuccessResponse,
GetAccessTokenFailureResponse,
]