Skip to content

Commit 41fc1e1

Browse files
committed
feat: load all dependencies in worker
1 parent 67749cc commit 41fc1e1

6 files changed

Lines changed: 114 additions & 61 deletions

File tree

questionpy_server/worker/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,21 @@
22
# The QuestionPy Server is free software released under terms of the MIT license. See LICENSE.md.
33
# (c) Technische Universität Berlin, innoCampus <info@isis.tu-berlin.de>
44
from abc import ABC, abstractmethod
5+
from collections.abc import Mapping
56
from dataclasses import dataclass
67
from enum import Enum
78
from pathlib import Path
89
from typing import TypedDict, TypeVar, Unpack
910

1011
from pydantic import BaseModel
1112

13+
from questionpy_common import PackageNamespaceAndShortName
1214
from questionpy_common.api.attempt import AttemptModel, AttemptScoredModel, AttemptStartedModel
1315
from questionpy_common.api.question import LmsPermissions
1416
from questionpy_common.elements import OptionsFormDefinition
1517
from questionpy_common.environment import PackagePermissions, RequestInfo
1618
from questionpy_common.manifest import PackageFile
19+
from questionpy_server.dependencies import SolutionAndLocation
1720
from questionpy_server.models import LoadedPackage, QuestionCreated
1821
from questionpy_server.utils.manifest import Manifest
1922
from questionpy_server.worker.runtime.messages import MessageToServer, MessageToWorker
@@ -55,15 +58,22 @@ class PackageFileData:
5558
class WorkerArgs(TypedDict):
5659
name: str
5760
"""A unique name given to the worker by its pool."""
61+
5862
package: PackageLocation
5963
"""The main package that the worker should load when [start][questionpy_server.worker.Worker.start] is called."""
64+
6065
worker_home: Path
6166
"""An existing directory owned by the worker, with the same lifetime as the worker."""
67+
6268
permissions: PackagePermissions
6369
"""The package permissions."""
70+
6471
environment_variables: dict[str, str]
6572
"""Environment variables to be set in the worker."""
6673

74+
dependencies: Mapping[PackageNamespaceAndShortName, SolutionAndLocation]
75+
"""All resolved dependencies in the root package's tree. Does not include the root package itself."""
76+
6777

6878
class Worker(ABC):
6979
"""Interface for worker implementations."""
@@ -75,6 +85,7 @@ def __init__(self, **kwargs: Unpack[WorkerArgs]) -> None:
7585
self.worker_home = kwargs["worker_home"]
7686
self.permissions = kwargs["permissions"]
7787
self.environment_variables = kwargs["environment_variables"]
88+
self._dependencies = kwargs["dependencies"]
7889

7990
self.state = WorkerState.NOT_RUNNING
8091
self.loaded_packages: list[LoadedPackage] = []

questionpy_server/worker/impl/_base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ async def _initialize(self) -> None:
112112

113113
async def _load_package(self, package_location: PackageLocation, *, main: bool) -> None:
114114
loaded = await self.send_and_wait_for_response(
115-
LoadQPyPackage(location=package_location, main=main),
115+
LoadQPyPackage(location=package_location, main=main, dependencies=self._dependencies),
116116
LoadQPyPackage.Response,
117117
self.permissions.bootstrap_timeout,
118118
)
@@ -373,7 +373,7 @@ class LimitTimeUsageMixin(Worker, ABC):
373373
the cpu limit in real time.
374374
"""
375375

376-
_real_time_limit_factor = 3
376+
_real_time_limit_factor = 1000
377377

378378
def __init__(self, **kwargs: Any) -> None:
379379
super().__init__(**kwargs)

questionpy_server/worker/pool.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
from questionpy_common.environment import PackagePermissions
1818
from questionpy_common.error import QPyBaseError
1919
from questionpy_server.dependencies import WorkerDependencyResolver
20+
from questionpy_server.package import Package
21+
from questionpy_server.utils.manifest import ComparableManifest, read_manifest_from_location
2022
from questionpy_server.worker.impl.subprocess import SubprocessWorker
2123
from questionpy_server.worker.runtime.package_location import (
2224
DirPackageLocation,
@@ -111,7 +113,7 @@ def _memory_available(self, required_memory: int) -> bool:
111113
@asynccontextmanager
112114
async def get_worker(
113115
self,
114-
package: PackageLocation,
116+
package: Package | PackageLocation,
115117
user: str | None,
116118
context: str,
117119
permissions: PackagePermissions,
@@ -122,17 +124,25 @@ async def get_worker(
122124
A context manager is used to ensure that a worker is always given back to the pool.
123125
124126
Args:
125-
package: path to QuestionPy package
127+
package: The main package to be run, either as a [Package][questionpy_server.package.Package] instance, or a
128+
specific [PackageLocation][questionpy_server.worker.runtime.package_location.PackageLocation].
126129
user: the user requesting the worker
127130
context: context within the lms
128131
permissions: package permissions
129132
environment_variables: environment variables to be set in the worker
130133
131134
Returns:
132-
A worker
135+
A new or previously idle worker running the given package.
133136
"""
134137
self._workers_requested += 1
135138

139+
if isinstance(package, Package):
140+
manifest = package.manifest
141+
package_location: PackageLocation = await package.get_zip_package_location()
142+
else:
143+
manifest = await read_manifest_from_location(package)
144+
package_location = package
145+
136146
# Limit the number of running workers.
137147
async with self._semaphore:
138148
worker = None
@@ -149,15 +159,20 @@ async def get_worker(
149159
async with self._lock, self._condition:
150160
await self._condition.wait_for(lambda: self._memory_available(permissions.memory))
151161
worker = await self._create_or_reuse_worker(
152-
package, user, context, permissions, environment_variables
162+
package_location=package_location,
163+
manifest=manifest,
164+
user=user,
165+
context=context,
166+
permissions=permissions,
167+
environment_variables=environment_variables,
153168
)
154169
self._workers_in_use += 1
155170

156171
yield worker
157172
finally:
158173
if worker:
159174
async with self._condition:
160-
await self._handle_idle_worker(package, user, context, worker)
175+
await self._handle_idle_worker(package_location, user, context, worker)
161176
self._condition.notify()
162177
self._workers_in_use -= 1
163178

@@ -219,16 +234,17 @@ def _generate_worker_name(self, package: PackageLocation) -> str:
219234

220235
async def _create_or_reuse_worker(
221236
self,
222-
package: PackageLocation,
237+
*,
238+
package_location: PackageLocation,
239+
manifest: ComparableManifest,
223240
user: str | None,
224241
context: str,
225242
permissions: PackagePermissions,
226243
environment_variables: dict[str, str],
227244
) -> Worker:
228245
"""If possible, get an idle worker or create a new one."""
229-
# Since the `PackagePermissions` only dependent on the `user` and `context` the worker
230-
# permissions are the same.
231-
identifier = _IdleWorkersIdentifier(package, user, context)
246+
# Since the `PackagePermissions` only depend on the `user` and `context`, the worker permissions are the same.
247+
identifier = _IdleWorkersIdentifier(package_location, user, context)
232248
if identifier in self._idle_workers:
233249
# There is an idle worker with this package loaded - reuse the most recent one.
234250
worker = self._idle_workers[identifier].popleft()
@@ -239,18 +255,21 @@ async def _create_or_reuse_worker(
239255

240256
self._memory_idle -= worker.permissions.memory
241257
else:
258+
dependencies = await self._dependency_resolver.resolve_and_retrieve(manifest)
259+
242260
# We need to create a new worker - free as much memory as needed to start the worker.
243261
await self._free_memory(permissions.memory)
244262

245-
name = self._generate_worker_name(package)
263+
name = self._generate_worker_name(package_location)
246264
worker_home = self._working_dir / f"worker-{name}"
247265
await asyncio.to_thread(worker_home.mkdir)
248266

249267
worker = self._worker_type(
250268
name=name,
251-
package=package,
269+
package=package_location,
252270
permissions=permissions,
253271
worker_home=worker_home,
272+
dependencies=dependencies,
254273
environment_variables=environment_variables,
255274
)
256275
await worker.start()

questionpy_server/worker/runtime/manager.py

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from contextlib import contextmanager
88
from dataclasses import dataclass
99
from graphlib import TopologicalSorter
10+
from itertools import chain
1011
from pathlib import Path
1112
from types import MappingProxyType
1213
from typing import TYPE_CHECKING, NoReturn, TypeVar, cast
@@ -22,6 +23,7 @@
2223
set_qpy_environment,
2324
)
2425
from questionpy_common.manifest import PackageType
26+
from questionpy_server.dependencies import SolutionAndLocation, StaticDependencySolution
2527
from questionpy_server.worker.runtime.connection import WorkerToServerConnection
2628
from questionpy_server.worker.runtime.messages import (
2729
CreateQuestionFromOptions,
@@ -87,12 +89,14 @@ def register_on_request_callback(self, callback: OnRequestCallback) -> None:
8789
type OnMessageCallback[M: MessageToWorker] = Callable[[M], MessageToServer]
8890

8991

90-
def _linearize_packages(
91-
packages: Mapping[PackageNamespaceAndShortName, ImportablePackage],
92+
def _linearize_dependencies(
93+
solutions: Mapping[PackageNamespaceAndShortName, SolutionAndLocation],
9294
) -> Sequence[PackageNamespaceAndShortName]:
9395
sorter = TopologicalSorter[PackageNamespaceAndShortName]()
94-
for nssn, package in packages.items():
95-
sorter.add(nssn, *package.dependencies.keys())
96+
97+
for nssn, (solution, _) in solutions.items():
98+
dep_nssns = [PackageNamespaceAndShortName(dep.namespace, dep.short_name) for dep in solution.dependencies.qpy]
99+
sorter.add(nssn, *dep_nssns)
96100

97101
return tuple(sorter.static_order())
98102

@@ -159,54 +163,48 @@ def _open_package(location: PackageLocation, worker_home: Path) -> ImportablePac
159163
# This is a separate method to allow it to be mocked separately.
160164
return open_qpy_package(location, worker_home)
161165

162-
def _open_packages_recursively(
163-
self,
164-
msg: LoadQPyPackage,
165-
package_location: PackageLocation,
166-
stack: tuple[PackageNamespaceAndShortName, ...] = (),
167-
) -> tuple[PackageNamespaceAndShortName, ImportablePackage]:
166+
def on_msg_load_qpy_package(self, msg: LoadQPyPackage) -> MessageToServer:
168167
if not self._env or not self._worker_home:
169168
self._raise_not_initialized(msg)
170169

171-
package = self._open_package(package_location, self._worker_home)
172-
nssn = PackageNamespaceAndShortName(package.manifest.namespace, package.manifest.short_name)
173-
174-
if nssn in stack and self._packages[nssn].manifest.version == package.manifest.version:
175-
raise CircularDependencyError(nssn, stack)
176-
177-
if nssn in self._packages:
178-
# For now, we don't support two packages using the same static dependency, even if they would use the same
179-
# version. Supporting the latter case would require us to either trust or check that both dependency's
180-
# content is identical.
181-
err_msg = f"Package '{nssn}' is already loaded. Dependency stack: {stack}"
182-
raise DependencyError(err_msg, stack)
170+
root_package = self._open_package(msg.location, self._worker_home)
171+
root_nssn = root_package.manifest.nssn
172+
self._packages[root_nssn] = root_package
183173

184-
self._packages[nssn] = package
174+
linearized = _linearize_dependencies(msg.dependencies)
185175

186-
new_stack = (*stack, nssn)
176+
for nssn in reversed(linearized):
177+
solution, package_location = msg.dependencies[nssn]
178+
if isinstance(solution, StaticDependencySolution):
179+
owner = self._packages.get(solution.owner)
180+
if not owner:
181+
# Since we open packages in reverse topological order, this shouldn't happen.
182+
# (Unless the tree passed to us by the server contains errors.)
183+
err_msg = f"Cannot open static dependency '{nssn}' before owner '{solution.owner}'."
184+
raise RuntimeError(err_msg)
187185

188-
if len(stack) >= MAX_QPY_DEPENDENCY_LEVELS and package.manifest.dependencies.qpy:
189-
raise TooDeeplyNestedDependencyError(new_stack)
186+
package_location = owner.resolve_static_dependency(nssn)
190187

191-
for dep_location in package.resolve_static_dependencies():
192-
dep_nssn, dep_package = self._open_packages_recursively(msg, dep_location, new_stack)
193-
package.dependencies[dep_nssn] = dep_package
194-
195-
return nssn, package
196-
197-
def on_msg_load_qpy_package(self, msg: LoadQPyPackage) -> MessageToServer:
198-
if not self._env or not self._worker_home:
199-
self._raise_not_initialized(msg)
200-
201-
root_nssn, root_package = self._open_packages_recursively(msg, msg.location, ())
188+
# MyPy doesn't narrow the type properly.
189+
self._packages[nssn] = self._open_package(cast("PackageLocation", package_location), self._worker_home)
202190

203191
if msg.main:
204192
self._env = dataclasses.replace(self._env, _main_package=root_package)
205193
set_qpy_environment(self._env)
206194

207-
linearized = _linearize_packages(self._packages)
208-
for nssn in linearized:
195+
for nssn in chain(linearized, (root_nssn,)):
209196
package = self._packages[nssn]
197+
198+
# Make the package's dependencies accessible to the package.
199+
for dep in package.manifest.dependencies.qpy:
200+
dep_nssn = PackageNamespaceAndShortName(dep.namespace, dep.short_name)
201+
dep_package = self._packages.get(dep_nssn)
202+
if not dep_package:
203+
err_msg = f"Unfulfilled dependency of '{nssn}': '{dep_nssn}'"
204+
raise RuntimeError(err_msg)
205+
206+
package.dependencies[dep_nssn] = dep_package
207+
210208
if package.state < PackageState.LOADED:
211209
package.load()
212210

questionpy_server/worker/runtime/messages.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from questionpy_common.environment import PackagePermissions, RequestInfo
1919
from questionpy_common.error import QPyBaseError
2020
from questionpy_common.manifest import Manifest
21+
from questionpy_server.dependencies import SolutionAndLocation
2122
from questionpy_server.worker.runtime.package_location import PackageLocation
2223

2324
messages_header_struct: Struct = Struct("=LL")
@@ -118,6 +119,9 @@ class LoadQPyPackage(MessageToWorker):
118119
main: bool
119120
"""Set this package as the main package and execute its entry point."""
120121

122+
dependencies: dict[PackageNamespaceAndShortName, SolutionAndLocation]
123+
"""All resolved dependencies in the root package's tree. Does not include the root package itself."""
124+
121125
class Response(MessageToServer):
122126
"""Success message in return to LoadQPyPackage."""
123127

questionpy_server/worker/runtime/package.py

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def init(self, env: Environment) -> None:
8181
"""
8282

8383
@abstractmethod
84-
def resolve_static_dependencies(self) -> list[PackageLocation]:
84+
def resolve_static_dependency(self, nssn: PackageNamespaceAndShortName) -> PackageLocation:
8585
pass
8686

8787

@@ -100,12 +100,33 @@ def __repr__(self) -> str:
100100

101101
__str__ = __repr__
102102

103-
def resolve_static_dependencies(self) -> list[PackageLocation]:
104-
return [
105-
DirPackageLocation(self.path / "dependencies" / "qpy" / dep.dir_name / DIST_DIR)
106-
for dep in self.manifest.dependencies.qpy
107-
if isinstance(dep, DistStaticQPyDependency)
108-
]
103+
def resolve_static_dependency(self, nssn: PackageNamespaceAndShortName) -> PackageLocation:
104+
dep = next(
105+
(
106+
dep
107+
for dep in self.manifest.dependencies.qpy
108+
if isinstance(dep, DistStaticQPyDependency)
109+
and dep.namespace == nssn.namespace
110+
and dep.short_name == nssn.short_name
111+
for dep in self.manifest.dependencies.qpy
112+
),
113+
None,
114+
)
115+
if not dep:
116+
msg = f"Package '{self.manifest.nssn}' does not provide static dependency '{nssn}'."
117+
raise RuntimeError(msg)
118+
119+
dep_dist_path = (
120+
self.path / "dependencies" / "qpy" / f"{dep.namespace}-{dep.short_name}-{dep.version}" / DIST_DIR
121+
)
122+
if not dep_dist_path.exists():
123+
msg = (
124+
f"Package '{self.manifest.nssn}' lists static dependency '{nssn}', but '{dep_dist_path}' is not "
125+
f"present."
126+
)
127+
raise RuntimeError(msg)
128+
129+
return DirPackageLocation(dep_dist_path)
109130

110131
def load(self) -> None:
111132
for new_path in (
@@ -166,8 +187,8 @@ def __repr__(self) -> str:
166187

167188
__str__ = __repr__
168189

169-
def resolve_static_dependencies(self) -> list[PackageLocation]:
170-
return []
190+
def resolve_static_dependency(self, nssn: PackageNamespaceAndShortName) -> PackageLocation:
191+
raise NotImplementedError
171192

172193

173194
def _package_dir(worker_home: Path, manifest: Manifest) -> Path:

0 commit comments

Comments
 (0)