Skip to content

Commit ee7022a

Browse files
authored
Merge pull request #38 from Distributive-Network/bugfix/serialization-passes-dcp-symbols
Improve handling of job input and arguments
2 parents 83cd44a + db5cd33 commit ee7022a

3 files changed

Lines changed: 77 additions & 49 deletions

File tree

dcp/api/compute_for.py

Lines changed: 54 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,6 @@ def compute_for_maker(Job):
1616
def compute_for(*args, **kwargs):
1717
args = list(args)
1818

19-
for i, arg in enumerate(args):
20-
if isinstance(arg, FunctionType):
21-
args[i] = dill.source.getsource(arg)
22-
23-
2419
# Hide values from PythonMonkey which aren't supported
2520
# TODO: This is bad for a number of reasons:
2621
####################################################
@@ -32,34 +27,63 @@ def compute_for(*args, **kwargs):
3227
job_input_idx = None
3328
job_args_idx = None
3429

35-
# compute.for(start, end, step, work, args)
36-
if len(args) == 5:
37-
job_args_idx = 4
38-
39-
# compute.for(iterableObject, work, args)
40-
elif len(args) <= 3:
41-
job_input_idx = 0
42-
43-
if len(args) == 3:
44-
job_args_idx = 2
45-
46-
# clean up job input for PythonMonkey
30+
for i, arg in enumerate(args):
31+
if isinstance(arg, FunctionType) or isinstance(arg, str):
32+
# work function arg separates input from arguments, find indices to hide based on it
33+
if i == 1: # compute.for(iterableObject, work, args), need to wrap iterable
34+
job_input_idx = 0
35+
if i < len(args) - 1: # work function isn't last argument, so last value is args in compute.for
36+
job_args_idx = len(args) - 1
37+
if isinstance(arg, FunctionType):
38+
args[i] = dill.source.getsource(arg)
39+
40+
# Process for ensuring symbols aren't mutated in the python -> js layer:
41+
# 1. Check if symbol is coming from a dcp module/class. If so, set it as the js_ref. Skip next steps.
42+
# 2. Determine if input array can be mutated, or create new array for input set
43+
# 3. For each input element, dereference js_ref if from dcp-client, add a guard if pythonmonkey will mutate it, else as it as-is.
4744
if job_input_idx != None:
48-
if js.utils.instanceof(getattr(args[job_input_idx], "js_ref", None), pm.eval("globalThis.dcp.compute.RemoteDataSet")):
49-
args[job_input_idx] = args[job_input_idx].js_ref
50-
elif hasattr(args[job_input_idx], '__setitem__'):
51-
for i, val in enumerate(args[job_input_idx]): #TODO don't enumerate each time... perhaps wrap in iterator
52-
if js.utils.throws_or_coerced_in_pm(val):
53-
args[job_input_idx][i] = { '__pythonmonkey_guard': val }
54-
55-
# clean up job args for PythonMonkey
45+
if hasattr(args[job_input_idx], 'js_ref') and dry.class_manager.reg.find_from_js_instance(args[job_input_idx].js_ref):
46+
args[job_input_idx] = args[job_input_idx]
47+
else:
48+
try:
49+
tmp = args[job_input_idx][0]
50+
args[job_input_idx][0] = { 'arbitrary-input-test': True }
51+
args[job_input_idx][0] = tmp
52+
53+
newArr = args[job_input_idx]
54+
except (ValueError, TypeError, IndexError):
55+
newArr = [ 'placeholder' for i in range(len(args[job_input_idx]))]
56+
57+
for i, val in enumerate(args[job_input_idx]):
58+
if hasattr(val, 'js_ref') and dry.class_manager.reg.find_from_js_instance(val.js_ref):
59+
newArr[i] = val
60+
elif js.utils.throws_or_coerced_in_pm(val):
61+
newArr[i] = { '__pythonmonkey_guard': val }
62+
else:
63+
newArr[i] = val
64+
args[job_input_idx] = newArr
65+
5666
if job_args_idx != None:
57-
if js.utils.instanceof(getattr(args[job_args_idx], "js_ref", None), pm.eval("globalThis.dcp.compute.RemoteDataSet")):
58-
args[job_args_idx] = args[job_args_idx].js_ref
59-
elif hasattr(args[job_args_idx], '__setitem__'):
67+
if hasattr(args[job_args_idx], 'js_ref') and dry.class_manager.reg.find_from_js_instance(args[job_args_idx].js_ref):
68+
args[job_args_idx] = args[job_args_idx]
69+
else:
70+
try:
71+
tmp = args[job_args_idx][0]
72+
args[job_args_idx][0] = { 'arbitrary-input-test': True }
73+
args[job_args_idx][0] = tmp
74+
75+
newArr = args[job_args_idx]
76+
except ValueError as e:
77+
newArr = [ 'placeholder' for i in range(len(args[job_args_idx]))]
78+
6079
for i, val in enumerate(args[job_args_idx]):
61-
if js.utils.throws_or_coerced_in_pm(val):
62-
args[job_args_idx][i] = { '__pythonmonkey_guard': val }
80+
if hasattr(val, 'js_ref') and dry.class_manager.reg.find_from_js_instance(val.js_ref):
81+
newArr[i] = val
82+
elif js.utils.throws_or_coerced_in_pm(val):
83+
newArr[i] = { '__pythonmonkey_guard': val }
84+
else:
85+
newArr[i] = val
86+
args[job_args_idx] = newArr
6387

6488
####################################################
6589

@@ -82,10 +106,6 @@ def compute_for(*args, **kwargs):
82106
})
83107
""")
84108

85-
if len(args) <= 3 and not js.utils.instanceof(args[job_input_idx], pm.eval("globalThis.dcp.compute.RemoteDataSet")):
86-
if isinstance(args[0], Iterable):
87-
args[0] = pm.new(JSIterator)(iter(args[0]))#(IterableWrapper(args[0]))
88-
89109
compute_for_js = pm.eval("globalThis.dcp.compute.for")
90110
job_js = dry.aio.blockify(compute_for_js)(*args, **kwargs)
91111
return Job(job_js)

dcp/api/job.py

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -78,39 +78,44 @@ def _before_exec(self, *args, **kwargs):
7878
serialized_input_data = []
7979
if len(self.serializers):
8080
validate_serializers(self.serializers)
81-
82-
super_range_object = pm.eval("globalThis.dcp['range-object'].SuperRangeObject")
83-
if utils.instanceof(self.js_ref.jobInputData, pm.eval("globalThis.dcp.compute.RemoteDataSet")):
84-
serialized_input_data = self.js_ref.jobInputData
81+
if hasattr(self.js_ref.jobInputData, 'js_ref') and dry.class_manager.reg.find_from_js_instance(self.js_ref.jobInputData.js_ref):
82+
serialized_input_data = self.js_ref.jobInputData.js_ref
8583
elif isinstance(self.js_ref.jobInputData, list) or utils.instanceof(self.js_ref.jobInputData, pm.globalThis.Array):
8684
for input_slice in self.js_ref.jobInputData:
8785
# TODO - find better solution
8886
# un-hide values from PythonMonkey which aren't supported
8987
if isinstance(input_slice, dict) and '__pythonmonkey_guard' in input_slice:
9088
input_slice = input_slice['__pythonmonkey_guard']
9189

92-
serialized_slice = serialize(input_slice, self.serializers)
93-
serialized_input_data.append(serialized_slice)
94-
elif isinstance(self.js_ref.jobInputData, Iterator) and not utils.instanceof(self.js_ref.jobInputData, super_range_object):
95-
serialized_input_data = serialize(self.js_ref.jobInputData, self.serializers)
90+
# only serialize non-dcp values
91+
if hasattr(input_slice, 'js_ref') and dry.class_manager.reg.find_from_js_instance(input_slice.js_ref):
92+
serialized_input_data.append(input_slice.js_ref)
93+
else:
94+
serialized_slice = serialize(input_slice, self.serializers)
95+
serialized_input_data.append(serialized_slice)
9696
else:
9797
serialized_input_data = self.js_ref.jobInputData
98-
99-
if utils.instanceof(self.js_ref.jobArguments, pm.eval("globalThis.dcp.compute.RemoteDataSet")):
100-
convertToURL = pm.eval('(urlString) => new URL(urlString)')
101-
self.js_ref.jobArguments.forEach(lambda argument: serialized_arguments.append(convertToURL(argument)))
98+
if hasattr(self.js_ref.jobArguments, 'js_ref') and dry.class_manager.reg.find_from_js_instance(self.js_ref.jobArguments.js_ref):
99+
serialized_arguments = self.js_ref.jobArguments.js_ref
100+
# if utils.instanceof(self.js_ref.jobArguments, pm.eval("globalThis.dcp.compute.RemoteDataSet")):
101+
# convertToURL = pm.eval('(urlString) => new URL(urlString)')
102+
# self.js_ref.jobArguments.forEach(lambda argument: serialized_arguments.append(convertToURL(argument)))
102103
else:
103104
for argument in self.js_ref.jobArguments:
104105
# TODO - find better solution
105106
# un-hide values from PythonMonkey which aren't supported
106107
if isinstance(argument, dict) and '__pythonmonkey_guard' in argument:
107108
argument = argument['__pythonmonkey_guard']
108-
if utils.instanceof(argument, pm.eval("URL")):
109+
if utils.instanceof(argument, pm.eval("URL")): # Still needed?
109110
serialized_arguments.append(argument)
110111
continue
111112

112-
serialized_argument = serialize(argument, self.serializers)
113-
serialized_arguments.append(serialized_argument)
113+
# only serialize non-dcp values
114+
if hasattr(argument, 'js_ref') and dry.class_manager.reg.find_from_js_instance(argument.js_ref):
115+
serialized_arguments.append(argument.js_ref)
116+
else:
117+
serialized_argument = serialize(argument, self.serializers)
118+
serialized_arguments.append(serialized_argument)
114119

115120
serialized_serializers = convert_serializers_to_arguments(self.serializers)
116121
meta_arguments.append(serialized_serializers)

dcp/dry/class_registry.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ def cmp(c):
3737
class_cname = js.utils.class_name(c.get_js_class())
3838

3939
return js_inst_is_instance and instance_cname == class_cname
40+
41+
if js_inst is None:
42+
return None
4043
return self._find(cmp)
4144

4245
def find_from_name(self, name):

0 commit comments

Comments
 (0)