Skip to content

Commit dd3cd0b

Browse files
Parth authored and committed
TEMP: GMPL pipeline changes
Implement 3-layer GMPL → MUIO conversion pipeline - Phase 1: GMPLParser (syntax extraction) - Phase 2: SliceInterpreter (semantic tuple expansion) - Phase 3: MuioTransformer (MUIO JSON generation) - Added validation scripts and fixtures - Fully testable, pure functional modules
1 parent f82f146 commit dd3cd0b

8 files changed

Lines changed: 2260 additions & 0 deletions

File tree

API/Classes/Case/GMPLParser.py

Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
"""
2+
Phase 1 — Pure GMPL syntax extraction.
3+
4+
Parses a GMPL data file (.txt / .dat) into structured objects without
5+
semantic interpretation. Every ``set`` and ``param`` declaration is
6+
captured, including multi-slice blocks, headerless tables, and empty
7+
param bodies.
8+
9+
Public API
10+
----------
11+
GMPLParser.parse_file(path) → GMPLParseResult
12+
GMPLParser.parse_string(text) → GMPLParseResult
13+
"""
14+
15+
from __future__ import annotations
16+
17+
import re
18+
from dataclasses import dataclass, field
19+
from pathlib import Path
20+
from typing import Optional, Union
21+
22+
# ────────────────────────────────────────────────────────────
23+
# Data structures
24+
# ────────────────────────────────────────────────────────────
25+
26+
@dataclass
class RowEntry:
    """A single data row.

    ``key`` is the row's key string and ``values`` holds the numeric
    values that follow it.
    """

    key: str
    values: list[Union[int, float]]
31+
32+
33+
@dataclass
class SliceBlock:
    """
    A single slice belonging to a ``param`` declaration.

    Attributes
    ----------
    header
        Tokens of the square-bracket header, e.g. ``["RE1", "*", "*"]``;
        ``None`` when the table has no header.
    column_labels
        Column names that follow ``:`` on the header line.
    rows
        The data rows of this slice.
    """

    header: Optional[list[str]] = None
    column_labels: list[str] = field(default_factory=list)
    rows: list[RowEntry] = field(default_factory=list)
46+
47+
48+
@dataclass
class ParsedParam:
    """One full ``param`` declaration: its name, optional default, and slices."""

    name: str
    default: Optional[Union[int, float]] = None
    slices: list[SliceBlock] = field(default_factory=list)
54+
55+
56+
@dataclass
class GMPLParseResult:
    """Container for every ``set`` and ``param`` extracted from a GMPL file."""

    sets: dict[str, list[str]] = field(default_factory=dict)
    params: list[ParsedParam] = field(default_factory=list)

    # convenience accessors
    def param_names(self) -> list[str]:
        """Return the ``name`` of each entry in ``params``, in list order."""
        names = []
        for param in self.params:
            names.append(param.name)
        return names

    def summary(self) -> str:
        """Return a multi-line, human-readable overview of the sets and params.

        Each set is listed with its size and at most its first six members;
        each param with its default, slice count, and total row count.
        """
        out = [f"Sets : {len(self.sets)}"]
        for set_name, members in self.sets.items():
            suffix = "..." if len(members) > 6 else ""
            out.append(f" {set_name} ({len(members)}) : {members[:6]}{suffix}")

        out.append(f"Params: {len(self.params)}")
        for param in self.params:
            row_count = 0
            for block in param.slices:
                row_count += len(block.rows)
            out.append(
                f" {param.name} (default={param.default}, slices={len(param.slices)}, rows={row_count})"
            )
        return "\n".join(out)
75+
76+
77+
# ────────────────────────────────────────────────────────────
78+
# Tokeniser helpers
79+
# ────────────────────────────────────────────────────────────
80+
81+
_COMMENT_RE = re.compile(r"#.*")
82+
83+
def _strip_comments(text: str) -> str:
84+
return _COMMENT_RE.sub("", text)
85+
86+
def _tokenise(text: str) -> list[str]:
    """Break GMPL text into statements separated by ``;``.

    Comments are removed first.  Each statement is returned with its
    surrounding whitespace trimmed, and statements that are empty after
    trimming are dropped.
    """
    statements = []
    for chunk in _strip_comments(text).split(";"):
        trimmed = chunk.strip()
        if trimmed:
            statements.append(trimmed)
    return statements
91+
92+
def _try_number(s: str) -> Union[int, float, str]:
93+
"""Try to cast *s* to int or float; fall back to the original string."""
94+
try:
95+
v = float(s)
96+
return int(v) if v == int(v) else v
97+
except (ValueError, OverflowError):
98+
return s
99+
100+
101+
# ────────────────────────────────────────────────────────────
102+
# Parser
103+
# ────────────────────────────────────────────────────────────
104+
105+
class GMPLParser:
    """
    Pure-syntax GMPL parser.

    Only ``set`` and ``param`` statements are extracted; every other
    statement is ignored and parsing stops at the first ``end`` statement.

    Usage::

        result = GMPLParser.parse_file("utopia.txt")
        print(result.summary())
    """

    # ── public ──────────────────────────────────────────────
    @staticmethod
    def parse_file(path: str | Path) -> GMPLParseResult:
        """Parse a ``.txt`` / ``.dat`` GMPL file and return structured result.

        The file is decoded as UTF-8 with undecodable bytes replaced.
        """
        # "utf-8-sig" reads plain UTF-8 unchanged but drops a leading BOM,
        # which would otherwise stick to the first keyword and hide it.
        text = Path(path).read_text(encoding="utf-8-sig", errors="replace")
        return GMPLParser.parse_string(text)

    @staticmethod
    def parse_string(text: str) -> GMPLParseResult:
        """Parse raw GMPL text and return structured result."""
        result = GMPLParseResult()
        for stmt in _tokenise(text):
            words = stmt.split(None, 1)
            keyword = words[0].lower() if words else ""
            if keyword == "end":
                break
            if keyword == "set":
                GMPLParser._parse_set(stmt, result)
            elif keyword == "param":
                GMPLParser._parse_param(stmt, result)
        return result

    # ── set ─────────────────────────────────────────────────
    @staticmethod
    def _parse_set(stmt: str, result: GMPLParseResult) -> None:
        """Record one ``set NAME := member ...`` statement in ``result.sets``.

        Splitting on the first ``:=`` (rather than on whitespace) keeps the
        name clean when it is glued to the operator, e.g. ``set YEAR:= 1``.
        A statement without ``:=`` yields an empty member list; a statement
        without a name is ignored.
        """
        head, _, body = stmt.partition(":=")
        head_tokens = head.split()
        if len(head_tokens) < 2:
            return  # bare "set" or "set := ...": nothing to name the set by
        result.sets[head_tokens[1]] = body.split()

    # ── param ──────────────────────────────────────────────
    @staticmethod
    def _parse_param(stmt: str, result: GMPLParseResult) -> None:
        """Parse one ``param`` statement and append it to ``result.params``.

        The text before the first ``:=`` is the declaration: the name, an
        optional ``default <number>``, and optionally ``: label ...`` column
        labels of a headerless table.  The text after it is the data body,
        which is split into slices.  A statement without a name is ignored.
        """
        head, assign, tail = stmt.partition(":=")
        decl, _, label_text = head.partition(":")
        decl_tokens = decl.split()
        if len(decl_tokens) < 2:
            return  # no param name available

        name = decl_tokens[1]

        # Look for "default <number>" in the declaration only, so data
        # tokens after ":=" can never be mistaken for it.
        default_val: Optional[Union[int, float]] = None
        for i in range(2, len(decl_tokens)):
            if decl_tokens[i].lower() == "default":
                if i + 1 < len(decl_tokens):
                    dv = _try_number(decl_tokens[i + 1])
                    if isinstance(dv, (int, float)):
                        default_val = dv
                break

        parsed = ParsedParam(name=name, default=default_val)

        # Normalise whitespace in the data body.
        body = " ".join(tail.split())
        if assign and body:
            labels = label_text.split()
            if labels:
                # Column labels written before ":=" belong to the table that
                # follows; re-attach them in the ": labels := rows" form
                # that _parse_slice_block understands.
                body = ": " + " ".join(labels) + " := " + body
            for chunk in GMPLParser._split_slices(body):
                parsed.slices.append(GMPLParser._parse_slice_block(chunk))

        result.params.append(parsed)

    @staticmethod
    def _split_slices(body: str) -> list[str]:
        """Split param body into per-slice strings.

        Every ``[`` starts a new slice.  Non-blank text before the first
        ``[`` becomes a leading headerless slice; a body without any ``[``
        is returned as a single slice.
        """
        starts = [pos for pos, ch in enumerate(body) if ch == "["]
        if not starts:
            return [body]

        chunks = []
        prefix = body[: starts[0]].strip()
        if prefix:
            chunks.append(prefix)

        # Each slice runs from its "[" up to the next "[" (or end of body).
        for begin, stop in zip(starts, starts[1:] + [len(body)]):
            chunks.append(body[begin:stop].strip())
        return chunks

    @staticmethod
    def _parse_slice_block(text: str) -> SliceBlock:
        """Parse one slice block into a SliceBlock object.

        Raises
        ------
        ValueError
            If the text starts with ``[`` but has no closing ``]``.
        """
        block = SliceBlock()

        # Extract header if present
        if text.startswith("["):
            close = text.find("]")
            if close == -1:
                raise ValueError(f"Unterminated '[' slice header in: {text[:40]!r}")
            block.header = [h.strip() for h in text[1:close].split(",")]
            text = text[close + 1 :].strip()

        # Default: no column labels, the whole text is row data.
        row_text = text
        before, colon, after = text.partition(":")
        if colon:
            middle, second_colon, remainder = after.partition(":")
            if second_colon:
                # ": col1 col2 ... := rows" — labels sit between the first
                # two colons; the second colon is the ":" of ":=".
                block.column_labels = middle.split()
                row_text = remainder.strip()
                if row_text.startswith("="):
                    row_text = row_text[1:].strip()
            elif after.strip().startswith("="):
                # "col1 col2 ... := rows" — the only colon is part of ":=".
                row_text = after.strip()[1:].strip()
                labels = before.split()
                if labels:
                    block.column_labels = labels
            # Otherwise the lone ":" is not followed by "=": keep the whole
            # text as row data.

        if row_text:
            GMPLParser._parse_rows(row_text, block)

        return block

    @staticmethod
    def _parse_rows(text: str, block: SliceBlock) -> None:
        """Parse row data into RowEntry objects appended to ``block.rows``.

        Each row is a key token followed by numeric tokens.  When the block
        has column labels, a row takes at most that many values; otherwise
        it takes every consecutive numeric token.  Keys with no numeric
        value after them are dropped.

        NOTE(review): a non-numeric token inside a row (e.g. a ``.``
        placeholder) ends that row and is then read as the next key —
        confirm whether callers need such entries preserved.
        """
        tokens = text.split()
        if not tokens:
            return

        n_cols = len(block.column_labels or [])
        limit = n_cols if n_cols > 0 else None  # None → unbounded row width

        i = 0
        while i < len(tokens):
            key = tokens[i]
            i += 1
            vals = []
            while i < len(tokens) and (limit is None or len(vals) < limit):
                v = _try_number(tokens[i])
                if not isinstance(v, (int, float)):
                    break
                vals.append(v)
                i += 1
            if vals:
                block.rows.append(RowEntry(key=key, values=vals))
308+
309+
310+
# ────────────────────────────────────────────────────────────
311+
# CLI entry point
312+
# ────────────────────────────────────────────────────────────
313+
314+
if __name__ == "__main__":
    import sys

    # Exactly one positional argument is needed: the data file to parse.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python GMPLParser.py <data_file.txt>")
        sys.exit(1)

    result = GMPLParser.parse_file(cli_args[0])
    print(result.summary())
    print("\n" + "=" * 60 + "\n")

    # Preview: the first ten params, and the first three rows of each slice.
    for param in result.params[:10]:
        print(f"\nparam {param.name} (default={param.default}):")
        for index, block in enumerate(param.slices):
            print(f" slice[{index}]: header={block.header}")
            print(f" columns: {block.column_labels}")
            for row in block.rows[:3]:
                print(f" {row}")
            row_total = len(block.rows)
            if row_total > 3:
                print(f" ... ({row_total} rows total)")

0 commit comments

Comments
 (0)