Skip to content

Commit dd8fdec

Browse files
committed
feat: implemented R sessions api
1 parent 41c6b7c commit dd8fdec

File tree

4 files changed

+2265
-25
lines changed

4 files changed

+2265
-25
lines changed

datashield_opal/impl.py

Lines changed: 116 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from argparse import Namespace
66
from contextlib import suppress
77
from obiba_opal.core import OpalClient, UriBuilder, OpalRequest, OpalResponse, HTTPError
8-
from datashield.interface import DSLoginInfo, DSDriver, DSConnection, DSResult, DSError
8+
from datashield.interface import DSLoginInfo, DSDriver, DSConnection, DSResult, DSError, RSession
99

1010

1111
class OpalDSError(DSError):
@@ -23,15 +23,106 @@ def is_server_error(self) -> bool:
2323
return isinstance(self.exception, HTTPError) and self.exception.code >= 500
2424

2525

26+
class OpalRSession(RSession):
27+
def __init__(self, client: OpalClient, profile: str = None, restore: str = None, verbose: bool = False):
28+
self.client = client
29+
self.profile = profile
30+
self.restore = restore
31+
self.verbose = verbose
32+
self.id = None
33+
34+
def get_id(self) -> str:
35+
if self.id is None:
36+
self.start(False)
37+
return self.id
38+
39+
def start(self, asynchronous: bool = True) -> None:
40+
builder = UriBuilder(["datashield", "sessions"]).query("wait", not asynchronous)
41+
if self.profile is not None:
42+
builder.query("profile", self.profile)
43+
if self.restore is not None:
44+
builder.query("restore", self.restore)
45+
response = self._post(builder.build()).send()
46+
if response.code != 201:
47+
raise OpalDSError(ValueError(f"Failed to start R session: {response.code}"))
48+
session = response.from_json()
49+
if "id" not in session:
50+
raise OpalDSError(ValueError("Failed to start R session: no session id returned"))
51+
self.id = session["id"]
52+
53+
def is_ready(self) -> bool:
54+
if self.id is None:
55+
raise OpalDSError(ValueError("R session not started"))
56+
response = self._get(UriBuilder(["datashield", "session", self.id]).build()).send()
57+
if response.code != 200:
58+
raise OpalDSError(ValueError(f"Failed to check R session status: {response.code}"))
59+
session = response.from_json()
60+
return session.get("state", "").lower() == "running"
61+
62+
def is_pending(self) -> bool:
63+
if self.id is None:
64+
raise OpalDSError(ValueError("R session not started"))
65+
response = self._get(UriBuilder(["datashield", "session", self.id]).build()).send()
66+
if response.code != 200:
67+
raise OpalDSError(ValueError(f"Failed to check R session status: {response.code}"))
68+
session = response.from_json()
69+
return session.get("state", "").lower() == "pending"
70+
71+
def is_failed(self) -> bool:
72+
if self.id is None:
73+
raise OpalDSError(ValueError("R session not started"))
74+
response = self._get(UriBuilder(["datashield", "session", self.id]).build()).send()
75+
if response.code != 200:
76+
raise OpalDSError(ValueError(f"Failed to check R session status: {response.code}"))
77+
session = response.from_json()
78+
return session.get("state", "").lower() == "failed"
79+
80+
def get_state_message(self) -> str:
81+
if self.id is None:
82+
raise OpalDSError(ValueError("R session not started"))
83+
response = self._get(UriBuilder(["datashield", "session", self.id]).build()).send()
84+
if response.code != 200:
85+
raise OpalDSError(ValueError(f"Failed to check R session status: {response.code}"))
86+
session = response.from_json()
87+
events = session.get("events", [])
88+
if events:
89+
return events[-1]
90+
return "No recent events"
91+
92+
def close(self) -> None:
93+
if self.id is not None:
94+
builder = UriBuilder(["datashield", "session", self.id])
95+
self._delete(builder.build()).send()
96+
self.id = None
97+
98+
def _post(self, ws: str) -> OpalRequest:
99+
request = self.client.new_request()
100+
if self.verbose:
101+
request.verbose()
102+
return request.accept_json().post().resource(ws)
103+
104+
def _get(self, ws: str) -> OpalRequest:
105+
request = self.client.new_request()
106+
if self.verbose:
107+
request.verbose()
108+
return request.accept_json().get().resource(ws)
109+
110+
def _delete(self, ws: str) -> OpalRequest:
111+
request = self.client.new_request()
112+
if self.verbose:
113+
request.verbose()
114+
return request.accept_json().delete().resource(ws)
115+
116+
26117
class OpalConnection(DSConnection):
27118
def __init__(self, name: str, loginInfo: OpalClient.LoginInfo, profile: str = "default", restore: str = None):
28119
self.name = name
29120
self.client = OpalClient.build(loginInfo)
30121
self.subject = None
31122
self.profile = profile
32123
self.restore = restore
33-
self.session = None
34124
self.verbose = False
125+
self.rsession = None
35126

36127
#
37128
# Content listing
@@ -68,6 +159,25 @@ def has_resource(self, name: str) -> bool:
68159
response = self._get(UriBuilder(["project", parts[0], "resource", parts[1]]).build()).send()
69160
return response.code == 200
70161

162+
#
163+
# R Session (server side)
164+
#
165+
166+
def has_session(self) -> bool:
167+
return self.rsession is not None
168+
169+
def start_session(self, asynchronous: bool = True) -> RSession:
170+
if self.rsession is not None:
171+
return self.rsession
172+
self.rsession = OpalRSession(self.client, profile=self.profile, restore=self.restore, verbose=self.verbose)
173+
self.rsession.start(asynchronous=asynchronous)
174+
return self.rsession
175+
176+
def get_session(self) -> RSession:
177+
if self.rsession is None:
178+
raise OpalDSError(ValueError("No R session established. Please start a session first."))
179+
return self.rsession
180+
71181
#
72182
# Assign
73183
#
@@ -249,10 +359,8 @@ def disconnect(self) -> None:
249359
"""
250360
Close DataSHIELD session, and then Opal session.
251361
"""
252-
if self.session is not None:
253-
builder = UriBuilder(["datashield", "session", self._get_session_id()])
254-
self._delete(builder.build()).send()
255-
self.session = None
362+
if self.rsession is not None:
363+
self.rsession.close()
256364
self.client.close()
257365

258366
#
@@ -267,21 +375,8 @@ def _get_subject(self):
267375
return self.subject
268376

269377
def _get_session_id(self) -> str:
270-
return self._get_session()["id"]
271-
272-
def _get_session(self):
273-
if self.session is None:
274-
builder = UriBuilder(["datashield", "sessions"])
275-
if self.profile is not None:
276-
builder.query("profile", self.profile)
277-
if self.restore is not None:
278-
builder.query("restore", self.restore)
279-
response = self._post(builder.build()).send()
280-
if response.code == 201:
281-
self.session = response.from_json()
282-
else:
283-
raise OpalDSError(ValueError(f"DataSHIELD session creation failed: {response.code}"))
284-
return self.session
378+
self.start_session(asynchronous=False)
379+
return self.rsession.get_id()
285380

286381
def _get(self, ws) -> OpalRequest:
287382
request = self.client.new_request()

0 commit comments

Comments
 (0)