-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcodegen.py
More file actions
386 lines (321 loc) · 15.8 KB
/
codegen.py
File metadata and controls
386 lines (321 loc) · 15.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
import os
import re
import glob
# Lookup table keyed by the one-character field type code used in packet
# expressions.  Every entry is a list with four columns:
#   [0] C type spelling written directly in front of the field name
#       (value types carry a trailing space, pointer types end in "*"),
#   [1] suffix used to build helper names such as cmc_buff_unpack_<suffix>,
#       cmc_buff_pack_<suffix> and cmc_<suffix>_free (None for arrays),
#   [2] heap flag, consulted when deciding whether a field must be freed,
#   [3] C initializer for the default value.
type_map = {
    "b": ["int8_t ", "char", False, "0"],
    "B": ["uint8_t ", "byte", False, "0"],
    "h": ["int16_t ", "short", False, "0"],
    "H": ["uint16_t ", "ushort", False, "0"],
    "i": ["int32_t ", "int", False, "0"],
    "I": ["uint32_t ", "uint", False, "0"],
    "l": ["int64_t ", "long", False, "0"],
    "L": ["uint64_t ", "ulong", False, "0"],
    "f": ["float ", "float", False, "0"],
    "d": ["double ", "double", False, "0"],
    "?": ["bool ", "bool", False, "false"],
    "v": ["int32_t ", "varint", False, "0"],
    "s": ["char *", "string", True, "NULL"],
    "p": ["cmc_block_pos ", "position", False, "{.x=0,.y=0,.z=0}"],
    "n": ["cmc_nbt *", "nbt", True, "NULL"],
    "a": ["cmc_buff *", "buff", True, "NULL"],
    "S": ["cmc_slot *", "slot", True, "NULL"],
    "m": ["cmc_entity_metadata ", "entity_metadata", True, "{.size=0,.entries=NULL}"],
    "u": ["cmc_uuid ", "uuid", False, "{.lower=0,.upper=0}"],
    "A": ["cmc_array ", None, True, "{.data=NULL,.size=0}"],
}
def type_c(f):
    """Return the C type spelling (column 0 of ``type_map``) for field *f*."""
    entry = type_map[f["type"]]
    return entry[0]
def type_func_name(f):
    """Return the helper-name suffix (column 1 of ``type_map``) for field *f*."""
    entry = type_map[f["type"]]
    return entry[1]
def is_heap(f):
    """Return the heap flag (column 2 of ``type_map``) for field *f*."""
    entry = type_map[f["type"]]
    return entry[2]
# Type codes whose field expression carries extra data (an element field list
# and a size key).  parse_exp_fields tests membership with `in`, so every
# character of this string is treated as such a type code.
types_with_extra_data = "A"
# Glob patterns (relative to the current working directory) of the files that
# are searched for "// CGSS: <tag>" / "// CGSE: <tag>" segment markers.
replacement_paths = ["src/*.c", "include/cmc/*.h"]
def split_array_exp(inp):
    """Split an array field expression into its three parts.

    The expression has the shape ``<type char><name>[<fields>]<size_key>``;
    ``<fields>`` may itself contain nested ``[...]`` groups.

    Args:
        inp: The full field expression, including the leading type character.

    Returns:
        A ``(name, fields, size_key)`` tuple: the field name without the
        leading type character, the text between the outermost brackets, and
        everything after the matching closing bracket.

    Raises:
        ValueError: If the expression has no ``[`` or the outermost ``[`` is
            never closed.
    """
    head, bracket, rest = inp.partition("[")
    if not bracket:
        raise ValueError(f"array expression without '[': {inp!r}")

    depth = 0
    for index, char in enumerate(rest):
        if char == "[":
            depth += 1
        elif char == "]":
            if depth == 0:
                # This "]" closes the outermost "[" that was split off above.
                return head[1:], rest[:index], rest[index + 1:]
            depth -= 1
    raise ValueError(f"array expression without matching ']': {inp!r}")
def careful_split(exp):
    """Split *exp* on ``;`` while leaving ``;`` inside ``[...]`` untouched.

    A trailing empty piece is dropped; empty pieces elsewhere are kept.

    Raises:
        ValueError: If a ``]`` appears without a preceding unmatched ``[``.
    """
    pieces = []
    current = []
    depth = 0
    for char in exp:
        if char == ";" and depth == 0:
            # Top-level separator: finish the current piece.
            pieces.append("".join(current))
            current = []
            continue
        current.append(char)
        if char == "[":
            depth += 1
        elif char == "]":
            depth -= 1
            if depth < 0:
                raise ValueError("closed nonexistent array")
    if current:
        pieces.append("".join(current))
    return pieces
def get_absolute_paths(file_paths):
    """Expand each glob pattern in *file_paths* and return absolute paths.

    Results keep the pattern order; within one pattern they keep the order
    returned by ``glob.glob``.
    """
    return [
        os.path.abspath(hit)
        for pattern in file_paths
        for hit in glob.glob(pattern)
    ]
def replace_code_segments(replacement_code, tag):
    """Replace the text between the segment markers for *tag* in one file.

    The files matched by ``replacement_paths`` are scanned in order.  In the
    first file containing a ``// CGSS: <tag>`` ... ``// CGSE: <tag>`` pair,
    everything between the markers (in every such pair in that file) is
    replaced by *replacement_code* on its own line; the file is rewritten and
    the function returns.

    Args:
        replacement_code: Text to place between the markers, inserted verbatim.
        tag: Name of the segment to replace.

    Raises:
        ValueError: If no scanned file contains the marker pair.
    """
    start_tag = f"// CGSS: {tag}"  # Code Generator Segment Start
    end_tag = f"// CGSE: {tag}"  # Code Generator Segment End
    pattern = re.compile(
        f"({re.escape(start_tag)}).*?({re.escape(end_tag)})", re.DOTALL
    )

    def substitute(match):
        # A replacement *function* is used so that backslashes in the
        # generated code are not interpreted as regex template escapes or
        # group references.
        return f"{match.group(1)}\n{replacement_code}\n{match.group(2)}"

    for file_path in get_absolute_paths(replacement_paths):
        with open(file_path, "r") as file:
            file_content = file.read()
        new_content, replaced = pattern.subn(substitute, file_content)
        if not replaced:
            continue
        with open(file_path, "w") as file:
            file.write(new_content)
        return
    raise ValueError(f"didnt find tag {tag}")
def replace_with_functions(tag, function, packets):
    """Concatenate ``function(packet)`` for all packets into segment *tag*."""
    generated = "".join(function(packet) for packet in packets)
    replace_code_segments(generated, tag)
def replace_c_functions(tag, function, packets):
    """Generate C functions for all packets and write definitions and prototypes.

    ``function(packet)`` must return a ``(body, signature)`` pair.  Packets
    with an empty body are skipped.  Definitions go to the ``<tag>_c``
    segment, prototypes to the ``<tag>_h`` segment.
    """
    prototypes = []
    definitions = []
    for packet in packets:
        generated = function(packet)
        body, signature = generated[0], generated[1]
        if not body:
            continue
        prototypes.append(f"{signature};")
        definitions.append(f"{signature} {{{body}}}")
    replace_code_segments("".join(definitions), tag + "_c")
    replace_code_segments("".join(prototypes), tag + "_h")
def parse_exp_fields(fields):
    """Parse a ``;``-separated field list into a list of field dicts.

    Every dict has ``"type"`` (first character of the field expression) and
    ``"name"``.  Fields whose type is in ``types_with_extra_data`` also get
    ``"size_key"`` and ``"content"`` (the recursively parsed element fields).
    """
    parsed = []
    for expression in careful_split(fields):
        type_code = expression[0]
        field = {"type": type_code}
        if type_code in types_with_extra_data:
            name, element_fields, size_key = split_array_exp(expression)
            field["name"] = name
            field["size_key"] = size_key
            field["content"] = parse_exp_fields(element_fields)
        else:
            field["name"] = expression[1:]
        parsed.append(field)
    return parsed
def gather_packets():
    """Read every file in ``packets/`` and return the parsed packet descriptions.

    The file name up to the first ``.`` is the integer protocol version.
    Each non-empty line (after removing whitespace and ``#`` comments) has
    the form ``<name>;<hex packet id>;<fields>``; the fields part, and the
    ``;`` in front of it, may be omitted for packets without fields.
    ``<name>`` must start with ``<direction>_<state>_``.

    Files are processed in sorted name order so that the generated code is
    the same on every run (``os.listdir`` returns entries in arbitrary order).

    Returns:
        A list of dicts with the keys ``direction``, ``state``, ``name``,
        ``name_and_version``, ``type``, ``version``, ``packet_id``,
        ``is_empty``, ``content`` and ``needs_free``.

    Raises:
        ValueError: If a file name does not start with an integer, a line has
            no packet id, the packet id is not hexadecimal, or the name has
            fewer than two ``_``-separated parts.
    """
    out = []
    for fname in sorted(os.listdir("packets")):
        vid = int(fname.split(".")[0])
        with open("packets/" + fname, "r") as f:
            text = f.read()
        for raw_line in text.splitlines():
            # Drop the comment first, then all whitespace (spaces, tabs, CR).
            exp = "".join(raw_line.partition("#")[0].split())
            if not exp:
                continue
            name, packet_id, *rest = exp.split(";", maxsplit=2)
            fields = rest[0] if rest else ""
            info = {}
            info["direction"], info["state"], *_ = name.split("_")
            info["name"] = name
            info["name_and_version"] = f"{name}_{vid}"
            info["type"] = f"cmc_packet_{name}_{vid}"
            info["version"] = vid
            info["packet_id"] = int(packet_id, base=16)
            # Empty packets get no struct type and no unpack method.
            info["is_empty"] = not bool(fields)
            info["content"] = parse_exp_fields(fields)
            info["needs_free"] = any(is_heap(f) for f in info["content"])
            out.append(info)
    return out
"""
struct S2C_handshake_handshake {
type
};
"""
def type_def(packet):
    """Return the C struct and typedef declarations for *packet*.

    Empty packets yield ``""``.  For every array field a nested element
    struct named ``<parent struct>_<field name>`` is emitted before the
    struct that refers to it.
    """
    if packet["is_empty"]:
        return ""

    def emit_struct(fields, struct_name):
        # Returns the declarations of all nested element structs followed by
        # the declaration of struct_name itself.
        nested = []
        members = []
        for field in fields:
            if field["type"] == "A":
                element_struct = struct_name + "_" + field["name"]
                nested.append(emit_struct(field["content"], element_struct))
                members.append(f"{element_struct} *{field['name']};")
            else:
                members.append(f"{type_c(field)}{field['name']};")
        own = (
            f"struct {struct_name} {{"
            + "".join(members)
            + "};"
            + f"typedef struct {struct_name} {struct_name};"
        )
        return "".join(nested) + own

    return emit_struct(
        packet["content"], f"cmc_packet_{packet['name']}_{packet['version']}"
    )
def packet_id_enums(packets):
    """Return one C enum of packet ids per direction/state/version group.

    Packets are grouped by ``<direction>_<state>_<version>`` in order of
    first appearance; each group becomes
    ``enum cmc_packet_ids_state_<group>`` plus a typedef of the same name.
    """
    by_state = {}
    for packet in packets:
        key = f"{packet['direction']}_{packet['state']}_{packet['version']}"
        by_state.setdefault(key, []).append(packet)

    chunks = []
    for key, members in by_state.items():
        enum_name = "cmc_packet_ids_state_" + key
        entries = "".join(
            f"{member['type']}_id = {member['packet_id']}," for member in members
        )
        chunks.append(
            f"enum {enum_name} {{{entries}}}; typedef enum {enum_name} {enum_name};"
        )
    return "".join(chunks)
def unpack_packet(packet):
    """Build the C body and prototype of ``<type>_unpack`` for *packet*.

    Returns a ``(code, header)`` tuple of strings; both are empty when
    ``packet["is_empty"]`` is true.  The body consists of the iterator
    declarations, the unpacking statements, and finally the error-handling
    statements collected in ``err_handling``.

    NOTE(review): main() passes unpack_packet2 (not this function) for the
    "unpack_methods" segment, so this variant looks unused within this file.
    """
    iterator_decls = ""
    err_handling = [] # populated whenever something is created that needs to be cleaned up;
    # entries are inserted at the front so the cleanup runs in reverse order
    on_err = ""
    def add_failiure_point(lable, code):
        # Prepend a cleanup snippet (optionally preceded by "<lable>:") and,
        # when a label is given, make it the target of subsequent failures.
        # Note: the parameter `code` shadows the enclosing `code` variable.
        nonlocal err_handling, on_err
        err_handling.insert(0, f"{f"{lable}:" if lable else ""} {code}")
        if lable:
            on_err = f"goto {lable};"
    code = ""
    header = ""
    if packet["is_empty"]:
        return (code, header)
    header = f"{packet["type"]} {packet["type"]}_unpack(cmc_buff *buff)"
    code += f"{packet["type"]} packet = {{}};"
    # Outermost failure point: return an empty packet.
    add_failiure_point("err", f"return ({packet["type"]}){{}};")
    iterator_char = 'h' # the character before 'i'; incremented before first use
    def unpack_code(unpack_to, content, lable_prefixs):
        # Append unpack statements for every field in `content`; `unpack_to`
        # is the C expression prefix of the struct being filled.
        nonlocal code, iterator_char, iterator_decls, err_handling, on_err
        for field in content:
            if field["type"] == "A":
                iterator_char = chr(ord(iterator_char) + 1)
                code += f"if({unpack_to}{field["size_key"]} > 0) {{"
                code += f"{unpack_to}{field["name"]} = CMC_ERRB_ABLE(cmc_malloc_packet_array(buff, {unpack_to}{field["size_key"]} * sizeof(*{unpack_to}{field["name"]})), {on_err});"
                # Because entries are prepended, the cleanup text below ends
                # up in the reverse of the order in which it is added here.
                add_failiure_point(None, "}")
                add_failiure_point(lable_prefixs + "_" + field["name"], f"free({unpack_to}{field["name"]});")
                add_failiure_point(None, "}")
                code += f"for({iterator_char} = 0; {iterator_char} < {unpack_to}{field["size_key"]}; ++{iterator_char}) {{"
                # NOTE(review): unlike the on_err values set in
                # add_failiure_point, this one has no trailing ";" — confirm
                # against the CMC_ERRB_ABLE macro.
                on_err = f"goto {lable_prefixs + "_" + field["name"] + "_forloop"}"
                # A declaration is added for every array field, including
                # arrays that share the same iterator character.
                iterator_decls += f"int {iterator_char} = 0;"
                unpack_code(f"{unpack_to}{field["name"]}[{iterator_char}].", field["content"], lable_prefixs + "_" + field["name"])
                add_failiure_point(None, f"for(;{iterator_char}>0;--{iterator_char}) {{")
                iterator_char = chr(ord(iterator_char) - 1)
                code += "}"
                add_failiure_point(None, f"{lable_prefixs}_{field["name"]}_forloop:if({unpack_to}{field["size_key"]} > 0) {{")
                code += "}"
            else:
                code += f"{unpack_to}{field["name"]} = CMC_ERRB_ABLE(cmc_buff_unpack_{type_func_name(field)}(buff), {on_err});"
                if is_heap(field):
                    # Later failures must free this field first.
                    add_failiure_point(lable_prefixs + "_" + field["name"],
                    f"cmc_{type_func_name(field)}_free({unpack_to}{field["name"]});")
    unpack_code("packet.", packet["content"], "err")
    # Leftover bytes in the buffer after unpacking are reported as an error.
    code += f"CMC_ERRB_IF(buff->position != buff->length, CMC_ERR_BUFF_UNDERFLOW, {on_err});return packet;"
    code = iterator_decls + code + "".join(err_handling)
    return (code, header)
def free_packet(packet):
    """Build the C body and prototype of ``<type>_free`` for *packet*.

    Returns a ``(code, header)`` tuple.  ``code`` is empty when no field of
    the packet is marked as heap-allocated.
    """

    def release(fields, prefix, loop_var):
        # Returns the C statements that free every heap field in `fields`;
        # `prefix` is the C expression prefix of the enclosing struct.
        parts = []
        for field in fields:
            if not is_heap(field):
                continue
            if field["type"] != "A":
                parts.append(
                    f"cmc_{type_func_name(field)}_free({prefix}{field['name']});"
                )
                continue
            loop_var = chr(ord(loop_var) + 1)
            inner = release(
                field["content"], f"{prefix}{field['name']}[{loop_var}].", loop_var
            )
            parts.append(f"if({prefix}{field['size_key']} > 0) {{")
            if inner:
                # Only emit a loop when the elements own heap data.
                parts.append(
                    f"for(int32_t {loop_var} = 0;"
                    f"{loop_var}<{prefix}{field['size_key']};++{loop_var}) {{"
                )
                parts.append(inner)
                parts.append("}")
            parts.append(f"free({prefix}{field['name']});")
            parts.append("}")
            loop_var = chr(ord(loop_var) - 1)
        return "".join(parts)

    body = release(packet["content"], "packet->", "h")
    signature = f"void {packet['type']}_free({packet['type']} *packet)"
    return (body, signature)
def pack_packet(packet):
    """Build the C body and prototype of ``<type>_pack`` for *packet*.

    Returns a ``(code, header)`` tuple.  The body always packs the packet id
    as a varint first.  The prototype only takes a ``packet`` parameter when
    there is field-packing code.
    """

    def pack_fields(fields, prefix, loop_var):
        # Returns the C statements packing every field in `fields`; array
        # fields become a for loop over their elements.
        parts = []
        for field in fields:
            if field["type"] == "A":
                parts.append(
                    f"for(int32_t {loop_var} = 0; "
                    f"{loop_var}<{prefix}{field['size_key']}; ++{loop_var}) {{"
                )
                parts.append(
                    pack_fields(
                        field["content"],
                        f"{prefix}{field['name']}[{loop_var}].",
                        chr(ord(loop_var) + 1),
                    )
                )
                parts.append("}")
            else:
                parts.append(
                    f"cmc_buff_pack_{type_func_name(field)}"
                    f"(buff, {prefix}{field['name']});"
                )
        return "".join(parts)

    id_code = f"cmc_buff_pack_varint(buff, {packet['type']}_id);"
    field_code = pack_fields(packet["content"], "packet->", "i")
    packet_param = f", {packet['type']} *packet" if field_code else ""
    signature = f"void {packet['type']}_pack(cmc_buff *buff{packet_param})"
    return (id_code + field_code, signature)
def unpack_packet2(packet):
    """Build the C body and prototype of ``<type>_unpack`` for *packet*.

    The body declares a zero-initialised ``packet``, declares one ``size_t``
    iterator per array nesting depth, unpacks every field (jumping to a
    per-field label on failure), returns the packet, and ends with the
    labelled cleanup code followed by a zeroed return.

    Args:
        packet: Packet description with the keys ``type``, ``is_empty`` and
            ``content``.

    Returns:
        A ``(code, header)`` tuple of strings; ``("", "")`` for empty packets.

    NOTE(review): the scalar branch emits ``cmc_buff_unpack_<x>(buff);,``
    (with a ``;`` inside the macro argument) while the array branch passes
    the malloc call without one — confirm against the CMC_ERRB_ABLE macro.
    """
    if packet["is_empty"]:
        return ("", "")

    used_labels = set()
    declared_iterators = []  # iterator names in order of first use

    def unpack_packet_content(content, lable_prefix, pvnp, iterator_char):
        # pvnp: packet variable name prefix, e.g. "packet." or "packet.a[i]."
        def l(lable):
            # Register the label as a jump target and return its full name.
            used_labels.add(lable_prefix + lable)
            return lable_prefix + lable

        c = ""
        for field in content:
            if field["type"] == "A":
                iterator_char = chr(ord(iterator_char) + 1)
                # Declare each iterator once, even when several arrays at the
                # same depth share it (a second declaration is a C error).
                if iterator_char not in declared_iterators:
                    declared_iterators.append(iterator_char)
                array = pvnp + field["name"]
                size = pvnp + field["size_key"]
                # The loop resets its iterator: it is declared at function
                # scope, so without the reset a nested loop would keep its
                # final value across iterations of the outer loop.
                c += (
                    f"\nif({size} > 0) {{\n"
                    f"{array} = CMC_ERRB_ABLE(cmc_malloc_packet_array(buff, "
                    f"{size} * sizeof(*{array})), goto {l(field['name'])};);\n"
                    f"for({iterator_char} = 0; {iterator_char} < {size};\n"
                    f"++{iterator_char}) {{"
                )
                c += unpack_packet_content(
                    field["content"],
                    lable_prefix + field["name"] + "_",
                    array + f"[{iterator_char}].",
                    iterator_char,
                )
                c += "}}"  # closes the for loop and the size check
                iterator_char = chr(ord(iterator_char) - 1)
            else:
                c += (
                    f"{pvnp + field['name']} = CMC_ERRB_ABLE(\n"
                    f"cmc_buff_unpack_{type_func_name(field)}(buff);,\n"
                    f"goto {l(field['name'])};);"
                )
        return c

    def unpack_error_code(content, pvnp, lable_prefix, iterator_char):
        def label(lable):
            # Emit "<label>:" only for labels some unpack statement jumps to.
            full = lable_prefix + lable
            return full + ":" if full in used_labels else ""

        c = ""
        # Cleanup runs in reverse field order so that a jump to a field's
        # label only releases the fields unpacked before it.
        for field in reversed(content):
            if field["type"] == "A":
                iterator_char = chr(ord(iterator_char) + 1)
                c += f"if({pvnp + field['size_key']} > 0) {{"
                # Argument order is (content, pvnp, lable_prefix, iterator).
                c += unpack_error_code(
                    field["content"],
                    pvnp + field["name"] + "[" + iterator_char + "].",
                    lable_prefix + field["name"] + "_",
                    iterator_char,
                )
                c += f"cmc_free({pvnp}{field['name']});"
                c += label(field["name"])
                c += "}"
                iterator_char = chr(ord(iterator_char) - 1)
            else:
                if is_heap(field):
                    c += f"cmc_{type_func_name(field)}_free({pvnp + field['name']});"
                c += label(field["name"])
        return c

    define_packet = f"{packet['type']} packet = {{0}};"
    unpack_code = unpack_packet_content(packet["content"], "err_", "packet.", "h")
    iterator_def = "".join(f"size_t {name} = 0;" for name in declared_iterators)
    return_packet = "return packet;"
    free_code = unpack_error_code(packet["content"], "packet.", "err_", "h")
    return_packet_err = f"return ({packet['type']}){{0}};"
    code = (
        define_packet
        + iterator_def
        + unpack_code
        + return_packet
        + free_code
        + return_packet_err
    )
    return (code, f"{packet['type']} {packet['type']}_unpack(cmc_buff *buff)")
def main():
    """Parse the packet definitions and regenerate every tagged code segment."""
    packets = gather_packets()
    replace_with_functions("packet_types", type_def, packets)
    replace_code_segments(packet_id_enums(packets), "packet_ids")
    # (segment tag prefix, generator) pairs; each generator returns a
    # (body, signature) tuple for a packet.
    c_function_generators = (
        ("unpack_methods", unpack_packet2),
        ("free_methods", free_packet),
        ("pack_methods", pack_packet),
    )
    for tag, generator in c_function_generators:
        replace_c_functions(tag, generator, packets)


if __name__ == "__main__":
    main()