Skip to content

Commit 3e06d5b

Browse files
authored
Merge pull request #225 from GREENRAT-K405/update/concoredocker-with-concore
Update concoredocker with concore.
2 parents 79a476c + 1e76558 commit 3e06d5b

1 file changed

Lines changed: 192 additions & 30 deletions

File tree

concoredocker.py

Lines changed: 192 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,112 @@
22
from ast import literal_eval
33
import re
44
import os
5+
import logging
6+
import zmq
7+
import numpy as np
8+
9+
logging.basicConfig(
10+
level=logging.INFO,
11+
format='%(levelname)s - %(message)s'
12+
)
13+
14+
class ZeroMQPort:
15+
def __init__(self, port_type, address, zmq_socket_type):
16+
self.context = zmq.Context()
17+
self.socket = self.context.socket(zmq_socket_type)
18+
self.port_type = port_type
19+
self.address = address
20+
21+
# Configure timeouts & immediate close on failure
22+
self.socket.setsockopt(zmq.RCVTIMEO, 2000) # 2 sec receive timeout
23+
self.socket.setsockopt(zmq.SNDTIMEO, 2000) # 2 sec send timeout
24+
self.socket.setsockopt(zmq.LINGER, 0) # Drop pending messages on close
25+
26+
# Bind or connect
27+
if self.port_type == "bind":
28+
self.socket.bind(address)
29+
logging.info(f"ZMQ Port bound to {address}")
30+
else:
31+
self.socket.connect(address)
32+
logging.info(f"ZMQ Port connected to {address}")
33+
34+
def send_json_with_retry(self, message):
35+
"""Send JSON message with retries if timeout occurs."""
36+
for attempt in range(5):
37+
try:
38+
self.socket.send_json(message)
39+
return
40+
except zmq.Again:
41+
logging.warning(f"Send timeout (attempt {attempt + 1}/5)")
42+
time.sleep(0.5)
43+
logging.error("Failed to send after retries.")
44+
return
45+
46+
def recv_json_with_retry(self):
47+
"""Receive JSON message with retries if timeout occurs."""
48+
for attempt in range(5):
49+
try:
50+
return self.socket.recv_json()
51+
except zmq.Again:
52+
logging.warning(f"Receive timeout (attempt {attempt + 1}/5)")
53+
time.sleep(0.5)
54+
logging.error("Failed to receive after retries.")
55+
return None
56+
57+
# Global ZeroMQ ports registry
58+
zmq_ports = {}
59+
60+
def init_zmq_port(port_name, port_type, address, socket_type_str):
61+
"""
62+
Initializes and registers a ZeroMQ port.
63+
port_name (str): A unique name for this ZMQ port.
64+
port_type (str): "bind" or "connect".
65+
address (str): The ZMQ address (e.g., "tcp://*:5555", "tcp://localhost:5555").
66+
socket_type_str (str): String representation of ZMQ socket type (e.g., "REQ", "REP", "PUB", "SUB").
67+
"""
68+
if port_name in zmq_ports:
69+
logging.info(f"ZMQ Port {port_name} already initialized.")
70+
return#avoid reinstallation
71+
try:
72+
# Map socket type string to actual ZMQ constant (e.g., zmq.REQ, zmq.REP)
73+
zmq_socket_type = getattr(zmq, socket_type_str.upper())
74+
zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type)
75+
logging.info(f"Initialized ZMQ port: {port_name} ({socket_type_str}) on {address}")
76+
except AttributeError:
77+
logging.error(f"Error: Invalid ZMQ socket type string '{socket_type_str}'.")
78+
except zmq.error.ZMQError as e:
79+
logging.error(f"Error initializing ZMQ port {port_name} on {address}: {e}")
80+
except Exception as e:
81+
logging.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")
82+
83+
def terminate_zmq():
84+
for port in zmq_ports.values():
85+
try:
86+
port.socket.close()
87+
port.context.term()
88+
except Exception as e:
89+
logging.error(f"Error while terminating ZMQ port {port.address}: {e}")
90+
# --- ZeroMQ Integration End ---
91+
92+
# NumPy Type Conversion Helper
93+
def convert_numpy_to_python(obj):
94+
if isinstance(obj, np.generic):
95+
return obj.item()
96+
elif isinstance(obj, list):
97+
return [convert_numpy_to_python(item) for item in obj]
98+
elif isinstance(obj, tuple):
99+
return tuple(convert_numpy_to_python(item) for item in obj)
100+
elif isinstance(obj, dict):
101+
return {key: convert_numpy_to_python(value) for key, value in obj.items()}
102+
else:
103+
return obj
5104

6105
def safe_literal_eval(filename, defaultValue):
7106
try:
8107
with open(filename, "r") as file:
9108
return literal_eval(file.read())
10109
except (FileNotFoundError, SyntaxError, ValueError, Exception) as e:
11-
print(f"Error reading {filename}: {e}")
110+
logging.info(f"Error reading {filename}: {e}")
12111
return defaultValue
13112

14113
iport = safe_literal_eval("concore.iport", {})
@@ -21,8 +120,8 @@ def safe_literal_eval(filename, defaultValue):
21120
inpath = os.path.abspath("/in")
22121
outpath = os.path.abspath("/out")
23122
simtime = 0
24-
concore_params_file = os.path.join(inpath, "1", "concore.params")
25-
concore_maxtime_file = os.path.join(inpath, "1", "concore.maxtime")
123+
concore_params_file = os.path.join(inpath + "1", "concore.params")
124+
concore_maxtime_file = os.path.join(inpath + "1", "concore.maxtime")
26125

27126
#9/21/22
28127
def parse_params(sparams):
@@ -41,8 +140,8 @@ def parse_params(sparams):
41140
except (ValueError, SyntaxError):
42141
pass
43142

44-
# keep backward compatibility: comma-separated params
45-
for item in s.split(","):
143+
# Potentially breaking backward compatibility: moving away from the comma-separated params
144+
for item in s.split(";"):
46145
if "=" in item:
47146
key, value = item.split("=", 1)
48147
key = key.strip()
@@ -66,12 +165,13 @@ def parse_params(sparams):
66165
sparams = sparams[:sparams.find('"')]
67166

68167
if sparams:
69-
print("parsing sparams:", sparams)
168+
logging.debug("parsing sparams: "+sparams)
70169
params = parse_params(sparams)
170+
logging.debug("parsed params: " + str(params))
71171
else:
72172
params = dict()
73173
except Exception as e:
74-
print(f"Error reading concore.params: {e}")
174+
logging.error(f"Error reading concore.params: {e}")
75175
params = dict()
76176

77177
#9/30/22
@@ -93,74 +193,136 @@ def unchanged():
93193
olds = s
94194
return False
95195

96-
def read(port, name, initstr):
196+
def read(port_identifier, name, initstr_val):
97197
global s, simtime, retrycount
98-
max_retries=5
198+
199+
default_return_val = initstr_val
200+
if isinstance(initstr_val, str):
201+
try:
202+
default_return_val = literal_eval(initstr_val)
203+
except (SyntaxError, ValueError):
204+
# Failed to parse initstr_val; fall back to the original string value.
205+
logging.debug(
206+
"Could not parse initstr_val %r with literal_eval; using raw string as default.",
207+
initstr_val
208+
)
209+
210+
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
211+
zmq_p = zmq_ports[port_identifier]
212+
try:
213+
message = zmq_p.recv_json_with_retry()
214+
return message
215+
except zmq.error.ZMQError as e:
216+
logging.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
217+
return default_return_val
218+
except Exception as e:
219+
logging.error(f"Unexpected error during ZMQ read on port {port_identifier} (name: {name}): {e}. Returning default.")
220+
return default_return_val
221+
222+
try:
223+
file_port_num = int(port_identifier)
224+
except ValueError:
225+
logging.error(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
226+
return default_return_val
227+
99228
time.sleep(delay)
100-
file_path = os.path.join(inpath, str(port), name)
229+
# Construct file path consistent with other components (e.g., /in1/<name>)
230+
file_path = os.path.join(inpath + str(file_port_num), name)
101231

102232
try:
103233
with open(file_path, "r") as infile:
104234
ins = infile.read()
105235
except FileNotFoundError:
106-
print(f"File {file_path} not found, using default value.")
107-
ins = initstr
236+
ins = str(initstr_val)
237+
s += ins
108238
except Exception as e:
109-
print(f"Error reading {file_path}: {e}")
110-
return initstr
239+
logging.error(f"Error reading {file_path}: {e}. Using default value.")
240+
return default_return_val
111241

112242
attempts = 0
243+
max_retries = 5
113244
while len(ins) == 0 and attempts < max_retries:
114245
time.sleep(delay)
115246
try:
116247
with open(file_path, "r") as infile:
117248
ins = infile.read()
118249
except Exception as e:
119-
print(f"Retry {attempts + 1}: Error reading {file_path} - {e}")
250+
logging.warning(f"Retry {attempts + 1}: Error reading {file_path} - {e}")
120251
attempts += 1
121252
retrycount += 1
122253

123254
if len(ins) == 0:
124-
print(f"Max retries reached for {file_path}, using default value.")
125-
return initstr
255+
logging.error(f"Max retries reached for {file_path}, using default value.")
256+
return default_return_val
126257

127258
s += ins
128259
try:
129260
inval = literal_eval(ins)
130-
simtime = max(simtime, inval[0])
131-
return inval[1:]
261+
if isinstance(inval, list) and len(inval) > 0:
262+
current_simtime_from_file = inval[0]
263+
if isinstance(current_simtime_from_file, (int, float)):
264+
simtime = max(simtime, current_simtime_from_file)
265+
return inval[1:]
266+
else:
267+
logging.warning(f"Warning: Unexpected data format in {file_path}: {ins}. Returning raw content or default.")
268+
return inval
132269
except Exception as e:
133-
print(f"Error parsing {ins}: {e}")
134-
return initstr
270+
logging.error(f"Error parsing content from {file_path} ('{ins}'): {e}. Returning default.")
271+
return default_return_val
135272

136-
137-
def write(port, name, val, delta=0):
273+
def write(port_identifier, name, val, delta=0):
138274
global simtime
139-
file_path = os.path.join(outpath, str(port), name)
275+
276+
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
277+
zmq_p = zmq_ports[port_identifier]
278+
try:
279+
zmq_p.send_json_with_retry(val)
280+
except zmq.error.ZMQError as e:
281+
logging.error(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
282+
except Exception as e:
283+
logging.error(f"Unexpected error during ZMQ write on port {port_identifier} (name: {name}): {e}")
284+
285+
try:
286+
file_port_num = int(port_identifier)
287+
file_path = os.path.join(outpath + str(file_port_num), name)
288+
except ValueError:
289+
logging.error(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
290+
return
140291

141292
if isinstance(val, str):
142293
time.sleep(2 * delay)
143294
elif not isinstance(val, list):
144-
print("write must have list or str")
295+
logging.error(f"File write to {file_path} must have list or str value, got {type(val)}")
145296
return
146297

147298
try:
148299
with open(file_path, "w") as outfile:
149300
if isinstance(val, list):
150-
outfile.write(str([simtime + delta] + val))
301+
val_converted = convert_numpy_to_python(val)
302+
data_to_write = [simtime + delta] + val_converted
303+
outfile.write(str(data_to_write))
151304
simtime += delta
152305
else:
153306
outfile.write(val)
154307
except Exception as e:
155-
print(f"Error writing to {file_path}: {e}")
308+
logging.error(f"Error writing to {file_path}: {e}")
156309

157310
def initval(simtime_val):
158311
global simtime
159312
try:
160313
val = literal_eval(simtime_val)
161-
simtime = val[0]
162-
return val[1:]
314+
if isinstance(val, list) and len(val) > 0:
315+
first_element = val[0]
316+
if isinstance(first_element, (int, float)):
317+
simtime = first_element
318+
return val[1:]
319+
else:
320+
logging.error(f"Error: First element in initval string '{simtime_val}' is not a number. Using data part as is or empty.")
321+
return val[1:] if len(val) > 1 else []
322+
else:
323+
logging.error(f"Error: initval string '{simtime_val}' is not a list or is empty. Returning empty list.")
324+
return []
163325
except Exception as e:
164-
print(f"Error parsing simtime_val: {e}")
326+
logging.error(f"Error parsing simtime_val_str '{simtime_val}': {e}. Returning empty list.")
165327
return []
166328

0 commit comments

Comments
 (0)