-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathschemas.py
More file actions
146 lines (116 loc) · 4.25 KB
/
schemas.py
File metadata and controls
146 lines (116 loc) · 4.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from __future__ import annotations
import asyncio
import logging
from functools import cached_property
from typing import Dict, List, Optional
from ...core import _APIOperationExecutor
from ...core.base import CamelModel
from ...utils import update_model_fields
from .envVars import EnvVar, WorkspaceEnvVarManager
from .git import WorkspaceGitManager
from .landscape import WorkspaceLandscapeManager
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
class WorkspaceCreate(CamelModel):
    """Input model describing a workspace to be created.

    Only ``team_id``, ``name`` and ``plan_id`` are required; every other
    field carries a default. Presumably serialized as the request body of
    the create operation -- confirm against the operations module.
    """

    # Required fields (no default).
    team_id: int
    name: str
    plan_id: int
    # Optional fields; the defaults below are used when the caller omits them.
    base_image: Optional[str] = None
    is_private_repo: bool = True
    replicas: int = 1
    git_url: Optional[str] = None
    initial_branch: Optional[str] = None
    clone_depth: Optional[int] = None
    source_workspace_id: Optional[int] = None
    welcome_message: Optional[str] = None
    vpn_config: Optional[str] = None
    restricted: Optional[bool] = None
    # List of EnvVar models (imported from .envVars); defaults to None, not [].
    env: Optional[List[EnvVar]] = None
class WorkspaceBase(CamelModel):
    """Data fields of an existing workspace; ``Workspace`` subclasses this.

    Unlike ``WorkspaceCreate``, the fields ``is_private_repo``, ``replicas``
    and ``restricted`` have no default here and must always be supplied.
    """

    # Required identity and configuration fields.
    id: int
    team_id: int
    name: str
    plan_id: int
    is_private_repo: bool
    replicas: int
    base_image: Optional[str] = None
    # Required even though they follow a defaulted field.
    data_center_id: int
    user_id: int
    # Optional fields, None when not provided.
    git_url: Optional[str] = None
    initial_branch: Optional[str] = None
    source_workspace_id: Optional[int] = None
    welcome_message: Optional[str] = None
    vpn_config: Optional[str] = None
    # Required boolean (Optional in WorkspaceCreate / WorkspaceUpdate).
    restricted: bool
class WorkspaceUpdate(CamelModel):
    """Set of changes passed to ``Workspace.update``.

    Every field is optional and defaults to ``None``. Presumably ``None``
    means "leave this attribute unchanged" -- confirm against
    ``update_model_fields`` and the update operation.
    """

    plan_id: Optional[int] = None
    base_image: Optional[str] = None
    name: Optional[str] = None
    replicas: Optional[int] = None
    vpn_config: Optional[str] = None
    restricted: Optional[bool] = None
class CommandInput(CamelModel):
    """Input model built by ``Workspace.execute_command``."""

    # The command string, exactly as passed by the caller.
    command: str
    # Optional string-to-string mapping; None when the caller supplies none.
    env: Optional[Dict[str, str]] = None
class CommandOutput(CamelModel):
    """Result model; the annotated return type of ``Workspace.execute_command``.

    All four fields are required strings.
    """

    command: str
    working_dir: str
    # Presumably captured stdout / stderr respectively -- confirm against the API.
    output: str
    error: str
class WorkspaceStatus(CamelModel):
    """Status model; the annotated return type of ``Workspace.get_status``."""

    # Read by Workspace.wait_until_running to decide when to stop polling.
    is_running: bool
class Workspace(WorkspaceBase, _APIOperationExecutor):
    """Workspace data combined with the API operations that act on it.

    The data fields come from ``WorkspaceBase``. Every request goes through
    ``self._execute_operation`` and every manager is built from
    ``self.validate_http_client()``; neither is defined in this class, so
    they are presumably inherited from ``_APIOperationExecutor`` (confirm).

    The ``_*_OP`` operation objects are imported inside each method rather
    than at module level -- presumably to avoid a circular import with
    ``.operations`` (confirm before hoisting them).
    """

    async def update(self, data: WorkspaceUpdate) -> None:
        """Run the update operation with ``data``, then mirror it locally.

        The local fields are only touched after the operation has been
        awaited, so an exception raised by the request leaves this instance
        unchanged.

        Args:
            data: The changes to apply.
        """
        from .operations import _UPDATE_OP

        await self._execute_operation(_UPDATE_OP, data=data)
        update_model_fields(target=self, source=data)

    async def delete(self) -> None:
        """Run the delete operation for this workspace."""
        from .operations import _DELETE_OP

        await self._execute_operation(_DELETE_OP)

    async def get_status(self) -> WorkspaceStatus:
        """Run the status operation and return its result."""
        from .operations import _GET_STATUS_OP

        return await self._execute_operation(_GET_STATUS_OP)

    async def wait_until_running(
        self,
        *,
        timeout: float = 300.0,
        poll_interval: float = 5.0,
    ) -> None:
        """Poll ``get_status`` until the workspace reports it is running.

        The time budget is measured with the event loop's monotonic clock,
        so time spent awaiting each status request counts against
        ``timeout`` as well as time spent sleeping. The status is always
        checked at least once, even when ``timeout`` is zero or negative,
        and the pause between polls is shortened so it never runs past the
        deadline.

        A single status request is not cancelled when the budget runs out;
        the deadline is only evaluated between polls.

        Args:
            timeout: Maximum number of seconds to keep polling.
            poll_interval: Seconds to pause between two status checks.

        Raises:
            ValueError: If ``poll_interval`` is not greater than 0.
            TimeoutError: If the workspace is still not running once
                ``timeout`` seconds have passed.
        """
        if poll_interval <= 0:
            raise ValueError("poll_interval must be greater than 0")

        loop = asyncio.get_running_loop()
        started = loop.time()

        while True:
            status = await self.get_status()
            if status.is_running:
                log.debug("Workspace %s is now running.", self.id)
                return

            # Real elapsed time, including the duration of the request above.
            elapsed = loop.time() - started
            remaining = timeout - elapsed
            if remaining <= 0:
                break

            # Never sleep past the deadline.
            delay = min(poll_interval, remaining)
            log.debug(
                "Workspace %s not running yet, waiting %ss... (elapsed: %.1fs)",
                self.id,
                delay,
                elapsed,
            )
            await asyncio.sleep(delay)

        raise TimeoutError(
            f"Workspace {self.id} did not reach running state within {timeout} seconds."
        )

    async def execute_command(
        self, command: str, env: Optional[Dict[str, str]] = None
    ) -> CommandOutput:
        """Run the execute-command operation and return its result.

        Args:
            command: The command string to send.
            env: Optional string-to-string mapping sent along with the
                command.

        Returns:
            The result of the operation.
        """
        from .operations import _EXECUTE_COMMAND_OP

        command_data = CommandInput(command=command, env=env)
        return await self._execute_operation(_EXECUTE_COMMAND_OP, data=command_data)

    @cached_property
    def env_vars(self) -> WorkspaceEnvVarManager:
        """Environment-variable manager bound to this workspace's id.

        Built on first access and cached on the instance afterwards.
        """
        http_client = self.validate_http_client()
        return WorkspaceEnvVarManager(http_client, workspace_id=self.id)

    @cached_property
    def landscape(self) -> WorkspaceLandscapeManager:
        """Manager for landscape operations (Multi Server Deployments).

        Built on first access and cached on the instance afterwards.
        """
        http_client = self.validate_http_client()
        return WorkspaceLandscapeManager(http_client, workspace_id=self.id)

    @cached_property
    def git(self) -> WorkspaceGitManager:
        """Manager for git operations (head, pull).

        Built on first access and cached on the instance afterwards.
        """
        http_client = self.validate_http_client()
        return WorkspaceGitManager(http_client, workspace_id=self.id)