-
Notifications
You must be signed in to change notification settings - Fork 33
Expand file tree
/
Copy pathcopy_with_port_portname.py
More file actions
183 lines (153 loc) · 7.43 KB
/
copy_with_port_portname.py
File metadata and controls
183 lines (153 loc) · 7.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# copythefile.py
import sys
import os
import subprocess
import logging
import json
def _normalize_output_relpath(template_script_path, output_relpath=None):
if output_relpath:
relpath = output_relpath.replace("\\", "/").lstrip("/")
else:
relpath = os.path.basename(template_script_path)
if not relpath:
raise ValueError("Output relative path cannot be empty.")
return relpath
def _join_output_path(output_dir, output_relpath):
return os.path.join(output_dir, *output_relpath.split("/"))
def run_specialization_script(
template_script_path,
output_dir,
edge_params_list,
python_exe,
copy_script_path,
output_relpath=None
):
"""
Calls the copy script to generate a specialized version of a node's script.
Returns the basename of the generated script on success, None on failure.
"""
base_template_name = os.path.basename(template_script_path)
output_relpath = _normalize_output_relpath(template_script_path, output_relpath)
expected_output_path = _join_output_path(output_dir, output_relpath)
# If the specialized file already exists, we don't need to regenerate it.
if os.path.exists(expected_output_path):
logging.info(f"Specialized script '{expected_output_path}' already exists. Using existing.")
return output_relpath
# Convert the list of parameters to a JSON string for command line argument
edge_params_json_str = json.dumps(edge_params_list)
cmd = [
python_exe,
copy_script_path,
template_script_path,
output_dir,
edge_params_json_str # Pass the JSON string as the last argument
]
if output_relpath:
cmd.append(output_relpath)
logging.info(f"Running specialization for '{base_template_name}': {' '.join(cmd)}")
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding='utf-8')
logging.info(f"Successfully generated specialized script '{output_relpath}'.")
if result.stdout: logging.debug(f"copy_with_port_portname.py stdout:\n{result.stdout.strip()}")
if result.stderr: logging.warning(f"copy_with_port_portname.py stderr:\n{result.stderr.strip()}")
return output_relpath
except subprocess.CalledProcessError as e:
logging.error(f"Error calling specialization script for '{template_script_path}':")
logging.error(f"Command: {' '.join(e.cmd)}")
logging.error(f"Return code: {e.returncode}")
logging.error(f"Stdout: {e.stdout.strip()}")
logging.error(f"Stderr: {e.stderr.strip()}")
return None
except Exception as e:
logging.error(f"An unexpected error occurred while trying to run specialization script: {e}")
return None
def create_modified_script(template_script_path, output_dir, edge_params_json_str, output_relpath=None):
"""
Creates a modified Python script by injecting ZMQ port and port name
definitions from a JSON object.
Args:
template_script_path (str): The path to the source template script.
output_dir (str): The directory to save the new script in.
edge_params_json_str (str): A JSON string representing a list of
edge parameter dictionaries.
"""
try:
with open(template_script_path, 'r') as f:
lines = f.readlines()
except FileNotFoundError:
print(f"Error: Template script '{template_script_path}' not found.")
sys.exit(1)
except Exception as e:
print(f"Error reading template script '{template_script_path}': {e}")
sys.exit(1)
try:
edge_params_list = json.loads(edge_params_json_str)
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON string provided for edge parameters: {e}")
print(f"Received: {edge_params_json_str}")
sys.exit(1)
# --- Generate definitions from the list of edge parameters ---
definitions = ['\n# --- ZMQ Port and Name Definitions (Auto-generated by mkconcore.py) ---\n']
print(f"Generating definitions for {len(edge_params_list)} edge(s):")
for params in edge_params_list:
port = params.get("port")
port_name = params.get("port_name")
source_label = params.get("source_node_label", "UNKNOWN_SOURCE")
target_label = params.get("target_node_label", "UNKNOWN_TARGET")
# Sanitize labels to be valid Python variable parts
safe_source = "".join(c if c.isalnum() else '_' for c in source_label)
safe_target = "".join(c if c.isalnum() else '_' for c in target_label)
# Create unique variable names
port_var_name = f"PORT_{safe_source}_{safe_target}"
port_name_var_name = f"PORT_NAME_{safe_source}_{safe_target}"
definitions.append(f'{port_name_var_name} = "{port_name}"\n')
definitions.append(f'{port_var_name} = "{port}"\n')
print(f" - {port_name_var_name} = \"{port_name}\"")
print(f" - {port_var_name} = \"{port}\"")
definitions.append('# --- End of Auto-generated Definitions ---\n\n')
# --- Insert definitions into the script ---
insert_index = 0
for i, line in enumerate(lines):
stripped_line = line.strip()
# Find the last import statement to insert after
if stripped_line.startswith('import ') or stripped_line.startswith('from '):
insert_index = i + 1
# Stop searching after the first non-import, non-comment line after imports are found
elif insert_index > 0 and stripped_line and not stripped_line.startswith('#'):
break
# Handle case where script starts with shebang
if insert_index == 0 and lines and lines[0].startswith('#!'):
insert_index = 1
modified_lines = lines[:insert_index] + definitions + lines[insert_index:]
# --- Determine and create output file ---
output_relpath = _normalize_output_relpath(template_script_path, output_relpath)
output_script_path = _join_output_path(output_dir, output_relpath)
try:
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print(f"Created output directory: {output_dir}")
output_parent = os.path.dirname(output_script_path)
if output_parent and not os.path.exists(output_parent):
os.makedirs(output_parent, exist_ok=True)
with open(output_script_path, 'w') as f:
f.writelines(modified_lines)
print(f"Successfully created specialized script: '{output_script_path}'")
except Exception as e:
print(f"Error writing output script '{output_script_path}': {e}")
sys.exit(1)
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format='%(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
if len(sys.argv) not in [4, 5]:
print("\nUsage: python3 copy_with_port_portname.py <TEMPLATE_SCRIPT_PATH> <OUTPUT_DIRECTORY> '<JSON_PARAMETERS>' [OUTPUT_RELATIVE_PATH]\n")
print("Example JSON: '[{\"port\": \"2355\", \"port_name\": \"FUNBODY_REP_1\", \"source_node_label\": \"nodeA\", \"target_node_label\": \"nodeB\"}]'")
print("Note: The JSON string must be enclosed in single quotes in shell.\n")
sys.exit(1)
template_script_path_arg = sys.argv[1]
output_directory_arg = sys.argv[2]
json_params_arg = sys.argv[3]
output_relpath_arg = sys.argv[4] if len(sys.argv) == 5 else None
create_modified_script(template_script_path_arg, output_directory_arg, json_params_arg, output_relpath_arg)