Skip to content

Commit a82067e

Browse files
committed
feat(hooks): adding in parallelize hook support
1 parent 4d52b49 commit a82067e

17 files changed

Lines changed: 461 additions & 60 deletions

.coveragerc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
[run]
22
omit =
3-
tests/*
3+
tests/*
4+
servc/svc/com/worker/__init__.py

.github/workflows/servc.yml

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@ jobs:
1515
- 3.13
1616

1717
test:
18-
- test_config
19-
- test_h
20-
- test_p
21-
- test_simple_metho
18+
- test_conf*
19+
- test_svc_c*
20+
- test_svc_h*
21+
- test_svc_pre*
22+
- test_svc_simple*
2223

2324
services:
2425
rabbitmq:
@@ -64,7 +65,7 @@ jobs:
6465
uses: actions/checkout@v4
6566
with:
6667
repository: serv-c/docs
67-
ref: 0.3.3
68+
ref: 0.4.0
6869
path: servc-docs
6970
sparse-checkout: |
7071
tests
@@ -83,4 +84,4 @@ jobs:
8384
START_SCRIPT: ${{ env.CURRENT_PATH }}/main.py
8485
CACHE_URL: redis://${{ github.server_url != 'https://github.com' && 'redis' || 'localhost' }}
8586
BUS_URL: amqp://guest:guest@${{ github.server_url != 'https://github.com' && 'rabbitmq' || 'localhost' }}
86-
run: python -m unittest tests/**/${{ matrix.test }}*.py
87+
run: python -m unittest tests/${{ matrix.test }}*.py

servc/svc/com/bus/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,15 @@ def emitEvent(self, event: str, instanceId: str, details: Any) -> bool:
4545
},
4646
)
4747

48+
def create_queue(self, queue: str, bindEventExchange: bool = True) -> None:
49+
pass
50+
51+
def delete_queue(self, queue: str) -> None:
52+
pass
53+
54+
def get_queue_length(self, queue: str) -> int:
55+
return 0
56+
4857
def subscribe(
4958
self,
5059
route: str,

servc/svc/com/bus/rabbitmq.py

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def queue_declare(
2929

3030

3131
def on_channel_open(channel: pika.channel.Channel, method: Callable, args: Tuple):
32-
method(*args, channel)
32+
return method(*args, channel)
3333

3434

3535
class BusRabbitMQ(BusComponent):
@@ -56,7 +56,7 @@ def _connect(self, method=None | Callable, args=None | Tuple, blocking=True):
5656
if not self.isOpen:
5757
if blocking:
5858
self._conn = BlockingConnection(pika.URLParameters(self._url))
59-
self.get_channel(method, args)
59+
return self.get_channel(method, args)
6060
else:
6161
self._conn = AsyncioConnection(
6262
parameters=pika.URLParameters(self._url),
@@ -89,12 +89,51 @@ def get_channel(self, method: Callable | None, args: Tuple | None):
8989
elif method and args and self._conn:
9090
if self.isBlockingConnection():
9191
channel = self._conn.channel()
92-
on_channel_open(channel, method, args)
92+
return on_channel_open(channel, method, args)
9393
else:
9494
self._conn.channel(
9595
on_open_callback=lambda c: on_channel_open(c, method, args)
9696
)
9797

98+
def create_queue(self, queue: str, bindEventExchange: bool = False, channel: pika.channel.Channel | None = None) -> None: # type: ignore
99+
if not self.isReady:
100+
return self._connect(self.create_queue, (queue, bindEventExchange))
101+
if not channel:
102+
return self.get_channel(self.create_queue, (queue, bindEventExchange))
103+
104+
queue_declare(channel, self.getRoute(queue), bindEventExchange)
105+
channel.close()
106+
return True
107+
108+
def delete_queue(self, queue: str, channel: pika.channel.Channel | None = None) -> None: # type: ignore
109+
if not self.isReady:
110+
return self._connect(self.delete_queue, (queue,))
111+
if not channel:
112+
return self.get_channel(self.delete_queue, (queue,))
113+
114+
channel.queue_delete(queue=self.getRoute(queue))
115+
channel.close()
116+
return True
117+
118+
def get_queue_length(self, route, channel: pika.channel.Channel | None = None) -> int: # type: ignore
119+
if not self.isReady:
120+
return self._connect(self.get_queue_length, (route,))
121+
if not channel:
122+
return self.get_channel(self.get_queue_length, (route,))
123+
124+
try:
125+
queue = channel.queue_declare(
126+
queue=self.getRoute(route),
127+
passive=True,
128+
durable=True,
129+
exclusive=False,
130+
auto_delete=False,
131+
)
132+
channel.close()
133+
return queue.method.message_count
134+
except pika.exceptions.ChannelClosedByBroker:
135+
return 0
136+
98137
def publishMessage( # type: ignore
99138
self,
100139
route: str,

servc/svc/com/worker/__init__.py

Lines changed: 23 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,15 @@
1-
from typing import Any, Callable, Dict, List, Union
1+
from typing import Any, List
22

33
from servc.svc import ComponentType, Middleware
4-
from servc.svc.client.send import sendMessage
54
from servc.svc.com.bus import BusComponent, OnConsuming
65
from servc.svc.com.cache import CacheComponent
6+
from servc.svc.com.worker.hooks import evaluate_post_hooks, evaluate_pre_hooks
7+
from servc.svc.com.worker.types import EMIT_EVENT, RESOLVER_MAPPING
78
from servc.svc.config import Config
8-
from servc.svc.idgen.simple import simple as idGenerator
9-
from servc.svc.io.input import ArgumentArtifact, InputPayload, InputType
9+
from servc.svc.io.input import InputType
1010
from servc.svc.io.output import StatusCode
1111
from servc.svc.io.response import getAnswerArtifact, getErrorArtifact
1212

13-
EMIT_EVENT = Callable[[str, Any], None]
14-
15-
RESOLVER = Callable[
16-
[str, BusComponent, CacheComponent, Any, List[Middleware], EMIT_EVENT],
17-
Union[StatusCode, Any, None],
18-
]
19-
20-
RESOLVER_MAPPING = Dict[str, RESOLVER]
21-
2213

2314
def HEALTHZ(
2415
_id: str, bus: BusComponent, cache: CacheComponent, _any: Any, c: List[Middleware]
@@ -116,34 +107,6 @@ def connect(self):
116107
def emitEvent(self, bus: BusComponent, eventName: str, details: Any):
117108
bus.emitEvent(eventName, self._instanceId, details)
118109

119-
def processPostHooks(
120-
self, bus: BusComponent, message: InputPayload, artifact: ArgumentArtifact
121-
):
122-
# print(artifact)
123-
if "hooks" in artifact and "on_complete" in artifact["hooks"]:
124-
for hook in artifact["hooks"]["on_complete"]:
125-
if hook["type"] == "sendmessage":
126-
try:
127-
payload: InputPayload = {
128-
"id": "",
129-
"type": InputType.INPUT.value,
130-
"route": hook["route"],
131-
"force": message["force"] if "force" in message else False,
132-
"argumentId": "",
133-
"argument": {
134-
"method": hook["method"],
135-
"inputs": {
136-
"id": message["id"],
137-
"method": artifact["method"],
138-
"inputs": artifact["inputs"],
139-
},
140-
},
141-
}
142-
sendMessage(payload, bus, self._cache, idGenerator)
143-
except Exception as e:
144-
print("Unable to process post hook", flush=True)
145-
print(e, flush=True)
146-
147110
def inputProcessor(self, message: Any) -> StatusCode:
148111
bus = self._busClass(
149112
self._config.get("conf.bus.url"),
@@ -214,6 +177,20 @@ def inputProcessor(self, message: Any) -> StatusCode:
214177
)
215178
return StatusCode.METHOD_NOT_FOUND
216179

180+
continueExecution = evaluate_pre_hooks(
181+
self._route,
182+
self._resolvers,
183+
bus,
184+
cache,
185+
message,
186+
artifact,
187+
self._children,
188+
emitEvent,
189+
)
190+
if not continueExecution:
191+
return StatusCode.OK
192+
193+
statusCode: StatusCode = StatusCode.OK
217194
try:
218195
response = self._resolvers[artifact["method"]](
219196
message["id"],
@@ -224,15 +201,16 @@ def inputProcessor(self, message: Any) -> StatusCode:
224201
emitEvent,
225202
)
226203
cache.setKey(message["id"], getAnswerArtifact(message["id"], response))
227-
self.processPostHooks(bus, message, artifact)
228-
return StatusCode.OK
204+
evaluate_post_hooks(bus, cache, message, artifact)
229205
except Exception as e:
230206
cache.setKey(
231207
message["id"],
232208
getErrorArtifact(message["id"], str(e), StatusCode.SERVER_ERROR),
233209
)
234-
self.processPostHooks(bus, message, artifact)
235-
return StatusCode.SERVER_ERROR
210+
statusCode = StatusCode.SERVER_ERROR
211+
finally:
212+
evaluate_post_hooks(bus, cache, message, artifact)
213+
return statusCode
236214

237215
cache.setKey(
238216
message["id"],
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
from typing import List
2+
3+
from servc.svc import Middleware
4+
from servc.svc.com.bus import BusComponent
5+
from servc.svc.com.cache import CacheComponent
6+
from servc.svc.com.worker.hooks.oncomplete import process_complete_hook
7+
from servc.svc.com.worker.hooks.parallelize import (
8+
evaluate_part_pre_hook,
9+
process_post_part_hook,
10+
)
11+
from servc.svc.com.worker.types import EMIT_EVENT, RESOLVER_MAPPING
12+
from servc.svc.io.hooks import Hooks, OnCompleteHook, PartHook
13+
from servc.svc.io.input import ArgumentArtifact, InputPayload
14+
15+
16+
def evaluate_post_hooks(
17+
bus: BusComponent,
18+
cache: CacheComponent,
19+
message: InputPayload,
20+
artifact: ArgumentArtifact,
21+
) -> bool:
22+
if "hooks" not in artifact or not isinstance(artifact["hooks"], dict):
23+
return False
24+
hooks: Hooks = artifact["hooks"]
25+
26+
if "part" in hooks and isinstance(hooks["part"], dict):
27+
if not all(
28+
x in hooks["part"] for x in ("part_id", "total_parts", "part_queue")
29+
):
30+
return False
31+
partHook: PartHook = hooks["part"]
32+
33+
completed = process_post_part_hook(bus, cache, message, artifact, partHook)
34+
if not completed:
35+
return True
36+
37+
if "on_complete" in hooks and isinstance(hooks["on_complete"], list):
38+
for hook in hooks["on_complete"]:
39+
if not all(x in hook for x in ("type", "route", "method")):
40+
return False
41+
completeHook: OnCompleteHook = hook
42+
process_complete_hook(bus, cache, message, artifact, completeHook)
43+
44+
return True
45+
46+
47+
def evaluate_pre_hooks(
48+
route: str,
49+
resolvers: RESOLVER_MAPPING,
50+
bus: BusComponent,
51+
cache: CacheComponent,
52+
message: InputPayload,
53+
artifact: ArgumentArtifact,
54+
children: List[Middleware],
55+
emit: EMIT_EVENT,
56+
) -> bool:
57+
hooks: Hooks = {}
58+
if "hooks" in artifact and isinstance(artifact["hooks"], dict):
59+
hooks: Hooks = artifact["hooks"]
60+
if "part" in hooks:
61+
return True
62+
63+
for prehook in (evaluate_part_pre_hook,):
64+
continueExecution = prehook(
65+
route, resolvers, bus, cache, message, artifact, children, emit
66+
)
67+
if not continueExecution:
68+
return False
69+
70+
return True
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from servc.svc.client.send import sendMessage
2+
from servc.svc.com.bus import BusComponent
3+
from servc.svc.com.cache import CacheComponent
4+
from servc.svc.idgen.simple import simple as idGenerator
5+
from servc.svc.io.hooks import OnCompleteHook
6+
from servc.svc.io.input import ArgumentArtifact, InputPayload, InputType
7+
8+
9+
def process_complete_hook(
10+
bus: BusComponent,
11+
cache: CacheComponent,
12+
message: InputPayload,
13+
artifact: ArgumentArtifact,
14+
hook: OnCompleteHook,
15+
) -> bool:
16+
input = (
17+
hook["inputs"] if "inputs" in hook and isinstance(hook["inputs"], dict) else {}
18+
)
19+
20+
payload: InputPayload = {
21+
"id": "",
22+
"type": InputType.INPUT.value,
23+
"route": hook["route"],
24+
"force": message["force"] if "force" in message else False,
25+
"argumentId": "",
26+
"argument": {
27+
"method": hook["method"],
28+
"inputs": {
29+
"id": input.get("id", message["id"]),
30+
"method": input.get("method", artifact["method"]),
31+
"inputs": input.get("inputs", artifact["inputs"]),
32+
},
33+
},
34+
}
35+
sendMessage(payload, bus, cache, idGenerator)
36+
37+
return True

0 commit comments

Comments
 (0)