Skip to content

Commit b2e644c

Browse files
authored
feat(pipeline-stream): add log stream (#46)
* feat(pipeline-stream): add log stream
1 parent 1ac7c66 commit b2e644c

File tree

17 files changed

+1153
-52
lines changed

17 files changed

+1153
-52
lines changed

examples/create_workspace_with_landscape.py

Lines changed: 75 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -3,59 +3,92 @@
33

44
from codesphere import CodesphereSDK
55
from codesphere.resources.workspace import WorkspaceCreate
6-
from codesphere.resources.workspace.landscape import ProfileBuilder, ProfileConfig
6+
from codesphere.resources.workspace.landscape import (
7+
PipelineStage,
8+
PipelineState,
9+
ProfileBuilder,
10+
)
11+
from codesphere.resources.workspace.logs import LogStage
712

8-
TEAM_ID = 123 # Replace with your actual team ID
13+
TEAM_ID = 123
914

1015

11-
async def get_plan_id(sdk: CodesphereSDK, plan_name: str = "Micro") -> int:
12-
plans = await sdk.metadata.list_plans()
13-
plan = next((p for p in plans if p.title == plan_name and not p.deprecated), None)
14-
if not plan:
15-
raise ValueError(f"Plan '{plan_name}' not found")
16-
return plan.id
16+
async def main():
17+
async with CodesphereSDK() as sdk:
18+
plans = await sdk.metadata.list_plans()
19+
plan = next((p for p in plans if p.title == "Micro" and not p.deprecated), None)
20+
if not plan:
21+
raise ValueError("Micro plan not found")
22+
23+
workspace_name = f"pipeline-demo-{int(time.time())}"
24+
25+
print(f"Creating workspace '{workspace_name}'...")
26+
workspace = await sdk.workspaces.create(
27+
WorkspaceCreate(plan_id=plan.id, team_id=TEAM_ID, name=workspace_name)
28+
)
29+
print(f"✓ Workspace created (ID: {workspace.id})")
30+
31+
print("Waiting for workspace to start...")
32+
await workspace.wait_until_running(timeout=300.0, poll_interval=5.0)
33+
print("✓ Workspace is running\n")
1734

35+
profile = (
36+
ProfileBuilder()
37+
.prepare()
38+
.add_step("echo 'Installing dependencies...' && sleep 2")
39+
.add_step("echo 'Setup complete!' && sleep 1")
40+
.done()
41+
.add_reactive_service("web")
42+
.plan(plan.id)
43+
.add_step(
44+
'for i in $(seq 1 20); do echo "[$i] Processing request..."; sleep 1; done'
45+
)
46+
.add_port(3000, public=True)
47+
.add_path("/", port=3000)
48+
.replicas(1)
49+
.done()
50+
.build()
51+
)
1852

19-
def build_web_profile(plan_id: int) -> ProfileConfig:
20-
"""Build a simple web service landscape profile."""
21-
return (
22-
ProfileBuilder()
23-
.prepare()
24-
.add_step("npm install", name="Install dependencies")
25-
.done()
26-
.add_reactive_service("web")
27-
.plan(plan_id)
28-
.add_step("npm start")
29-
.add_port(3000, public=True)
30-
.add_path("/", port=3000)
31-
.replicas(1)
32-
.env("NODE_ENV", "production")
33-
.build()
34-
)
53+
print("Deploying landscape profile...")
54+
await workspace.landscape.save_profile("production", profile)
55+
await workspace.landscape.deploy(profile="production")
56+
print("✓ Profile deployed\n")
3557

58+
print("--- Prepare Stage ---")
59+
await workspace.landscape.start_stage(
60+
PipelineStage.PREPARE, profile="production"
61+
)
62+
prepare_status = await workspace.landscape.wait_for_stage(
63+
PipelineStage.PREPARE, timeout=60.0
64+
)
3665

37-
async def create_workspace(sdk: CodesphereSDK, plan_id: int, name: str):
38-
workspace = await sdk.workspaces.create(
39-
WorkspaceCreate(plan_id=plan_id, team_id=TEAM_ID, name=name)
40-
)
41-
await workspace.wait_until_running(timeout=300.0, poll_interval=5.0)
42-
return workspace
66+
for status in prepare_status:
67+
icon = "✓" if status.state == PipelineState.SUCCESS else "✗"
68+
print(f"{icon} {status.server}: {status.state.value}")
4369

70+
print("\nPrepare logs:")
71+
for step in range(len(prepare_status[0].steps)):
72+
logs = await workspace.logs.collect(
73+
stage=LogStage.PREPARE, step=step, timeout=5.0
74+
)
75+
for entry in logs:
76+
if entry.get_text():
77+
print(f" {entry.get_text().strip()}")
4478

45-
async def deploy_landscape(workspace, profile: dict, profile_name: str = "production"):
46-
await workspace.landscape.save_profile(profile_name, profile)
47-
await workspace.landscape.deploy(profile=profile_name)
48-
print("Deployment started!")
79+
print("\n--- Run Stage ---")
80+
await workspace.landscape.start_stage(PipelineStage.RUN, profile="production")
81+
print("Started run stage\n")
4982

83+
print("Streaming logs from 'web' service (using context manager):")
84+
count = 0
85+
async with workspace.logs.open_server_stream(step=0, server="web") as stream:
86+
async for entry in stream:
87+
if entry.get_text():
88+
print(f" {entry.get_text().strip()}")
89+
count += 1
5090

51-
async def main():
52-
async with CodesphereSDK() as sdk:
53-
plan_id = await get_plan_id(sdk)
54-
workspace = await create_workspace(
55-
sdk, plan_id, f"landscape-demo-{int(time.time())}"
56-
)
57-
profile = build_web_profile(plan_id)
58-
await deploy_landscape(workspace, profile)
91+
print(f"\n✓ Stream ended ({count} log entries)")
5992

6093

6194
if __name__ == "__main__":

src/codesphere/core/__init__.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
from .base import ResourceBase
2-
from .operations import APIOperation, AsyncCallable
3-
from .handler import _APIOperationExecutor, APIRequestHandler
1+
from .base import CamelModel, ResourceBase
2+
from .handler import APIRequestHandler, _APIOperationExecutor
3+
from .operations import APIOperation, AsyncCallable, StreamOperation
44

55
__all__ = [
6+
"CamelModel",
67
"ResourceBase",
78
"APIOperation",
89
"_APIOperationExecutor",
910
"APIRequestHandler",
1011
"AsyncCallable",
12+
"StreamOperation",
1113
]

src/codesphere/core/operations.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
1-
from typing import Callable, Awaitable, Generic, Optional, Type, TypeAlias, TypeVar
2-
3-
from pydantic import BaseModel
1+
from typing import Awaitable, Callable, Generic, Optional, Type, TypeAlias, TypeVar
42

3+
from pydantic import BaseModel, ConfigDict
54

65
_T = TypeVar("_T")
76
ResponseT = TypeVar("ResponseT")
87
InputT = TypeVar("InputT")
8+
EntryT = TypeVar("EntryT")
99

1010
AsyncCallable: TypeAlias = Callable[[], Awaitable[_T]]
1111

1212

1313
class APIOperation(BaseModel, Generic[ResponseT, InputT]):
14+
model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)
15+
1416
method: str
1517
endpoint_template: str
1618
response_model: Type[ResponseT]
1719
input_model: Optional[Type[InputT]] = None
20+
21+
22+
class StreamOperation(BaseModel, Generic[EntryT]):
23+
model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)
24+
endpoint_template: str
25+
entry_model: Type[EntryT]

src/codesphere/resources/workspace/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from .git import GitHead, WorkspaceGitManager
2+
from .logs import LogEntry, LogProblem, LogStage, LogStream, WorkspaceLogManager
23
from .resources import WorkspacesResource
34
from .schemas import (
45
CommandInput,
@@ -19,4 +20,9 @@
1920
"CommandOutput",
2021
"WorkspaceGitManager",
2122
"GitHead",
23+
"LogStream",
24+
"WorkspaceLogManager",
25+
"LogEntry",
26+
"LogProblem",
27+
"LogStage",
2228
]

src/codesphere/resources/workspace/landscape/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
ManagedServiceConfig,
55
NetworkConfig,
66
PathConfig,
7+
PipelineStage,
8+
PipelineState,
9+
PipelineStatus,
10+
PipelineStatusList,
711
PortConfig,
812
Profile,
913
ProfileBuilder,
@@ -12,6 +16,7 @@
1216
ReactiveServiceConfig,
1317
StageConfig,
1418
Step,
19+
StepStatus,
1520
)
1621

1722
__all__ = [
@@ -28,4 +33,9 @@
2833
"NetworkConfig",
2934
"PortConfig",
3035
"PathConfig",
36+
"PipelineStage",
37+
"PipelineState",
38+
"PipelineStatus",
39+
"PipelineStatusList",
40+
"StepStatus",
3141
]

src/codesphere/resources/workspace/landscape/models.py

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import logging
45
import re
56
from typing import TYPE_CHECKING, Dict, List, Optional, Union
@@ -10,10 +11,20 @@
1011
from .operations import (
1112
_DEPLOY_OP,
1213
_DEPLOY_WITH_PROFILE_OP,
14+
_GET_PIPELINE_STATUS_OP,
1315
_SCALE_OP,
16+
_START_PIPELINE_STAGE_OP,
17+
_START_PIPELINE_STAGE_WITH_PROFILE_OP,
18+
_STOP_PIPELINE_STAGE_OP,
1419
_TEARDOWN_OP,
1520
)
16-
from .schemas import Profile, ProfileConfig
21+
from .schemas import (
22+
PipelineStage,
23+
PipelineState,
24+
PipelineStatusList,
25+
Profile,
26+
ProfileConfig,
27+
)
1728

1829
if TYPE_CHECKING:
1930
from ..schemas import CommandOutput
@@ -95,3 +106,94 @@ async def teardown(self) -> None:
95106

96107
async def scale(self, services: Dict[str, int]) -> None:
97108
await self._execute_operation(_SCALE_OP, data=services)
109+
110+
async def start_stage(
111+
self,
112+
stage: Union[PipelineStage, str],
113+
profile: Optional[str] = None,
114+
) -> None:
115+
if isinstance(stage, PipelineStage):
116+
stage = stage.value
117+
118+
if profile is not None:
119+
_validate_profile_name(profile)
120+
await self._execute_operation(
121+
_START_PIPELINE_STAGE_WITH_PROFILE_OP, stage=stage, profile=profile
122+
)
123+
else:
124+
await self._execute_operation(_START_PIPELINE_STAGE_OP, stage=stage)
125+
126+
async def stop_stage(self, stage: Union[PipelineStage, str]) -> None:
127+
if isinstance(stage, PipelineStage):
128+
stage = stage.value
129+
130+
await self._execute_operation(_STOP_PIPELINE_STAGE_OP, stage=stage)
131+
132+
async def get_stage_status(
133+
self, stage: Union[PipelineStage, str]
134+
) -> PipelineStatusList:
135+
if isinstance(stage, PipelineStage):
136+
stage = stage.value
137+
138+
return await self._execute_operation(_GET_PIPELINE_STATUS_OP, stage=stage)
139+
140+
async def wait_for_stage(
141+
self,
142+
stage: Union[PipelineStage, str],
143+
*,
144+
timeout: float = 300.0,
145+
poll_interval: float = 5.0,
146+
server: Optional[str] = None,
147+
) -> PipelineStatusList:
148+
if poll_interval <= 0:
149+
raise ValueError("poll_interval must be greater than 0")
150+
151+
stage_name = stage.value if isinstance(stage, PipelineStage) else stage
152+
elapsed = 0.0
153+
154+
while elapsed < timeout:
155+
status_list = await self.get_stage_status(stage)
156+
157+
relevant_statuses = []
158+
for s in status_list:
159+
if server is not None:
160+
if s.server == server:
161+
relevant_statuses.append(s)
162+
else:
163+
if s.steps:
164+
relevant_statuses.append(s)
165+
elif s.state != PipelineState.WAITING:
166+
relevant_statuses.append(s)
167+
168+
if not relevant_statuses:
169+
log.debug(
170+
"Pipeline stage '%s': no servers with steps yet, waiting...",
171+
stage_name,
172+
)
173+
await asyncio.sleep(poll_interval)
174+
elapsed += poll_interval
175+
continue
176+
177+
all_completed = all(
178+
s.state
179+
in (PipelineState.SUCCESS, PipelineState.FAILURE, PipelineState.ABORTED)
180+
for s in relevant_statuses
181+
)
182+
183+
if all_completed:
184+
log.debug("Pipeline stage '%s' completed.", stage_name)
185+
return PipelineStatusList(root=relevant_statuses)
186+
187+
states = [f"{s.server}={s.state.value}" for s in relevant_statuses]
188+
log.debug(
189+
"Pipeline stage '%s' status: %s (elapsed: %.1fs)",
190+
stage_name,
191+
", ".join(states),
192+
elapsed,
193+
)
194+
await asyncio.sleep(poll_interval)
195+
elapsed += poll_interval
196+
197+
raise TimeoutError(
198+
f"Pipeline stage '{stage_name}' did not complete within {timeout} seconds."
199+
)

src/codesphere/resources/workspace/landscape/operations.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from ....core.operations import APIOperation
2+
from .schemas import PipelineStatusList
23

34
_DEPLOY_OP = APIOperation(
45
method="POST",
@@ -23,3 +24,27 @@
2324
endpoint_template="/workspaces/{id}/landscape/scale",
2425
response_model=type(None),
2526
)
27+
28+
_START_PIPELINE_STAGE_OP = APIOperation(
29+
method="POST",
30+
endpoint_template="/workspaces/{id}/pipeline/{stage}/start",
31+
response_model=type(None),
32+
)
33+
34+
_START_PIPELINE_STAGE_WITH_PROFILE_OP = APIOperation(
35+
method="POST",
36+
endpoint_template="/workspaces/{id}/pipeline/{stage}/start/{profile}",
37+
response_model=type(None),
38+
)
39+
40+
_STOP_PIPELINE_STAGE_OP = APIOperation(
41+
method="POST",
42+
endpoint_template="/workspaces/{id}/pipeline/{stage}/stop",
43+
response_model=type(None),
44+
)
45+
46+
_GET_PIPELINE_STATUS_OP = APIOperation(
47+
method="GET",
48+
endpoint_template="/workspaces/{id}/pipeline/{stage}",
49+
response_model=PipelineStatusList,
50+
)

0 commit comments

Comments
 (0)