This repository was archived by the owner on Sep 18, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 21
Expand file tree
/
Copy pathconverter.py
More file actions
73 lines (57 loc) · 2.34 KB
/
converter.py
File metadata and controls
73 lines (57 loc) · 2.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import inspect
from typing import Union, List, Iterable, Callable
from temporal.api.common.v1 import Payload, Payloads
from temporal.conversions import ENCODINGS, METADATA_ENCODING_KEY, DECODINGS
def get_fn_args_type_hints(fn: Callable) -> List[type]:
    """Return the annotation of each named positional parameter of ``fn``.

    The result follows the order of ``inspect.getfullargspec(fn).args``.
    Parameters without an annotation are reported as ``None``. When ``fn``
    is a bound method, the leading entry (the bound ``self``/``cls`` slot)
    is dropped.

    Args:
        fn: The function or bound method to inspect.

    Returns:
        A list with one entry (a type hint or ``None``) per remaining
        positional parameter.
    """
    spec = inspect.getfullargspec(fn)
    hints = [spec.annotations.get(name) for name in spec.args]
    if inspect.ismethod(fn):
        # getfullargspec() still lists the already-bound first parameter, so
        # strip it. Slicing instead of pop(0) keeps this safe when the method
        # declares no named positional parameters at all (e.g. ``def m(*args)``),
        # in which case spec.args is empty.
        hints = hints[1:]
    return hints
def get_fn_ret_type_hints(fn: Callable) -> List[type]:
    """Return a one-element list holding the return annotation of ``fn``.

    The single element is ``None`` when ``fn`` carries no return annotation.
    """
    annotations = inspect.getfullargspec(fn).annotations
    return_hint = annotations.get("return")
    return [return_hint]
class DataConverter:
    """Base interface for converting Python values to and from payloads.

    Subclasses override ``to_payload`` and ``from_payload``; the plural
    helpers defined here apply those element by element.
    """

    def to_payload(self, arg: object) -> Payload:
        """Encode a single value as a ``Payload``. Must be overridden."""
        raise NotImplementedError()

    def from_payload(self, payload: Payload, type_hint: type = None) -> object:
        """Decode a single ``Payload``. Must be overridden.

        Args:
            payload: The payload to decode.
            type_hint: Optional target type passed through to the decoder.
        """
        raise NotImplementedError()

    def to_payloads(self, args: Union[object, List[object]]) -> Payloads:
        """Encode one value or an iterable of values into a ``Payloads``.

        A ``str``/``bytes`` value or any non-iterable value is treated as a
        single argument; any other iterable is encoded item by item.
        """
        payloads: Payloads = Payloads()
        # str and bytes are iterable, but they represent one value here.
        if isinstance(args, (str, bytes)) or not isinstance(args, Iterable):
            args = [args]
        payloads.payloads = [self.to_payload(arg) for arg in args]
        return payloads

    def from_payloads(
        self, payloads: Payloads, type_hints: Union[List[type], None] = None
    ) -> List[object]:
        """Decode every payload in ``payloads`` into a list of values.

        Args:
            payloads: Container whose ``payloads`` attribute is iterated, or
                ``None``.
            type_hints: Per-position type hints. Positions beyond the end of
                the list (or all positions, when omitted or ``None``) are
                decoded with a ``None`` hint.

        Returns:
            The decoded values in order, or ``[None]`` when ``payloads`` is
            ``None``.
        """
        if payloads is None:
            return [None]
        # A None default avoids sharing one mutable list across calls and
        # lets callers pass None explicitly.
        hints = [] if type_hints is None else type_hints
        args: List[object] = []
        for i, payload in enumerate(payloads.payloads):
            type_hint = hints[i] if i < len(hints) else None
            args.append(self.from_payload(payload, type_hint))
        return args

    @staticmethod
    def get_default():
        """Return a new ``DefaultDataConverter`` instance."""
        return DefaultDataConverter()
class DefaultDataConverter(DataConverter):
    """Converter backed by the ``ENCODINGS`` and ``DECODINGS`` registries."""

    def to_payload(self, arg: object) -> Payload:
        """Encode ``arg`` with the first encoder that accepts it.

        Each callable in ``ENCODINGS`` is tried in order; the first result
        that is not ``None`` is returned.

        Raises:
            Exception: If every encoder returns ``None``.
        """
        for encode in ENCODINGS:
            encoded = encode(arg)
            if encoded is None:
                continue
            return encoded
        raise Exception(f"Object cannot be encoded: {arg}")

    def from_payload(self, payload: Payload, type_hint: type = None) -> object:
        """Decode ``payload`` with the decoder registered for its encoding.

        The encoding is read from ``payload.metadata`` under
        ``METADATA_ENCODING_KEY`` and looked up in ``DECODINGS``.

        Raises:
            Exception: If no decoder is registered for the encoding.
        """
        encoding: bytes = payload.metadata[METADATA_ENCODING_KEY]
        decoder = DECODINGS.get(encoding)
        if decoder:
            return decoder(payload, type_hint)
        raise Exception(f"Unsupported encoding: {str(encoding, 'utf-8')}")
# Module-level DefaultDataConverter instance, constructed once at import time.
# NOTE(review): DataConverter.get_default() builds a new instance on each call
# rather than returning this one.
DEFAULT_DATA_CONVERTER_INSTANCE = DefaultDataConverter()