
Commit 195706d

improved metrics display and aggregate triple duplication bug fix
1 parent 1fe33ea · commit 195706d

5 files changed

Lines changed: 283 additions & 107 deletions


src/run_conversion.sh

Lines changed: 33 additions & 7 deletions
@@ -35,7 +35,7 @@ mkdir -p "$TIME_LOG_DIR" "$METRICS_JSON_DIR"
 TIME_LOG="$TIME_LOG_DIR/${RUN_ID}.txt"
 METRICS_JSON="$METRICS_JSON_DIR/${RUN_ID}.json"
 METRICS_CSV="$LOGDIR/metrics.csv"
-METRICS_HEADER="run_id,timestamp,output_name,output_dir,exit_code_java,wall_seconds_java,user_seconds_java,sys_seconds_java,max_rss_kb_java,input_mapping_size_bytes,input_vcf_size_bytes,output_dir_size_bytes,output_triples,jar,mapping_file,output_path,combined_rdf_size_bytes,gzip_size_bytes,brotli_size_bytes,hdt_size_bytes,exit_code_gzip,exit_code_brotli,exit_code_hdt,wall_seconds_gzip,user_seconds_gzip,sys_seconds_gzip,max_rss_kb_gzip,wall_seconds_brotli,user_seconds_brotli,sys_seconds_brotli,max_rss_kb_brotli,wall_seconds_hdt,user_seconds_hdt,sys_seconds_hdt,max_rss_kb_hdt,compression_methods,hdt_source,gzip_on_hdt_size_bytes,brotli_on_hdt_size_bytes,exit_code_gzip_on_hdt,exit_code_brotli_on_hdt,wall_seconds_gzip_on_hdt,user_seconds_gzip_on_hdt,sys_seconds_gzip_on_hdt,max_rss_kb_gzip_on_hdt,wall_seconds_brotli_on_hdt,user_seconds_brotli_on_hdt,sys_seconds_brotli_on_hdt,max_rss_kb_brotli_on_hdt"
+METRICS_HEADER="run_id,timestamp,output_name,output_dir,exit_code_java,wall_seconds_java,user_seconds_java,sys_seconds_java,max_rss_kb_java,input_mapping_size_bytes,input_vcf_size_bytes,output_dir_size_bytes,output_triples,jar,mapping_file,output_path"


 # Return byte size for file or directory (GNU + BSD compatible).
@@ -76,6 +76,21 @@ stat_size() {

 have_gnu_time() { [[ -x /usr/bin/time ]] && /usr/bin/time --version >/dev/null 2>&1; }

+# Return stable content hash for duplicate part detection.
+hash_file_sha256() {
+  local path="$1"
+  if command -v sha256sum >/dev/null 2>&1; then
+    sha256sum "$path" | awk '{print $1}'
+    return
+  fi
+  if command -v shasum >/dev/null 2>&1; then
+    shasum -a 256 "$path" | awk '{print $1}'
+    return
+  fi
+  # Last-resort fallback when SHA utilities are unavailable.
+  cksum "$path" | awk '{print $1":"$2}'
+}
+
 # Count triples via non-comment RDF lines ending in '.'.
 count_triples_json() {
   local path="$1"
@@ -167,13 +182,30 @@ if [[ "$AGGREGATE_RDF" == "1" ]]; then
   PART_FILES=("$OUT_DIR/$OUT_NAME"/*.nt)
   if (( ${#PART_FILES[@]} > 0 )); then
     : > "$MERGED_NT"
+    # Defensive dedupe: some Spark/RMLStreamer runs can emit identical part
+    # files for the same dataset. Skip exact duplicate part payloads to avoid
+    # doubling every triple in the merged output.
+    SEEN_HASH_FILE="$OUT_DIR/$OUT_NAME/.seen_part_hashes.$$"
+    SEEN_MAP_FILE="$OUT_DIR/$OUT_NAME/.seen_part_hash_map.$$"
+    : > "$SEEN_HASH_FILE"
+    : > "$SEEN_MAP_FILE"
     for PART_NT in "${PART_FILES[@]}"; do
       if [[ "$PART_NT" == "$MERGED_NT" ]]; then
         continue
       fi
+      PART_HASH=$(hash_file_sha256 "$PART_NT")
+      if grep -Fqx "$PART_HASH" "$SEEN_HASH_FILE"; then
+        FIRST_SEEN=$(awk -F'\t' -v hash="$PART_HASH" '$1 == hash { print $2; exit }' "$SEEN_MAP_FILE")
+        echo "WARNING: skipping duplicate RDF part '$PART_NT' (same content as '$FIRST_SEEN')." >&2
+        rm -f "$PART_NT"
+        continue
+      fi
+      printf "%s\n" "$PART_HASH" >> "$SEEN_HASH_FILE"
+      printf "%s\t%s\n" "$PART_HASH" "$PART_NT" >> "$SEEN_MAP_FILE"
       cat "$PART_NT" >> "$MERGED_NT"
       rm -f "$PART_NT"
     done
+    rm -f "$SEEN_HASH_FILE" "$SEEN_MAP_FILE"
   else
     : > "$MERGED_NT"
   fi
@@ -265,13 +297,7 @@ csv_fields=(
   "$JAR"
   "$IN"
   "$OUTPUT_PATH"
-  "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" ""
 )
-
-# Compression-related fields are initialized as empty from conversion step output.
-for _ in $(seq 1 13); do
-  csv_fields+=("")
-done
 ( IFS=,; echo "${csv_fields[*]}" ) >> "$METRICS_CSV"

 echo "Done."

test/helpers.py

Lines changed: 19 additions & 0 deletions
@@ -56,6 +56,25 @@
     "max_rss_kb_brotli_on_hdt",
 ]

+CONVERSION_METRICS_HEADER = [
+    "run_id",
+    "timestamp",
+    "output_name",
+    "output_dir",
+    "exit_code_java",
+    "wall_seconds_java",
+    "user_seconds_java",
+    "sys_seconds_java",
+    "max_rss_kb_java",
+    "input_mapping_size_bytes",
+    "input_vcf_size_bytes",
+    "output_dir_size_bytes",
+    "output_triples",
+    "jar",
+    "mapping_file",
+    "output_path",
+]
+

 def make_executable(path: Path, content: str) -> None:
     path.write_text(content)
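
CONVERSION_METRICS_HEADER mirrors the trimmed METRICS_HEADER in run_conversion.sh, so conversion-only tests no longer assert on compression columns. A minimal sketch of how a consumer might validate a conversion metrics.csv against it (illustrative; the helper name and path argument are assumed):

import csv

from test.helpers import CONVERSION_METRICS_HEADER


def load_conversion_metrics(path):
    """Return rows of a conversion-step metrics.csv, validating its header first."""
    with open(path, newline="") as handle:
        reader = csv.DictReader(handle)
        # DictReader reads the header row lazily on first .fieldnames access.
        if reader.fieldnames != CONVERSION_METRICS_HEADER:
            raise ValueError(f"unexpected metrics header: {reader.fieldnames}")
        return list(reader)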

test/test_run_conversion_unit.py

Lines changed: 63 additions & 3 deletions
@@ -4,7 +4,7 @@
 import unittest
 from pathlib import Path

-from test.helpers import METRICS_HEADER, VerboseTestCase, env_with_path, make_executable
+from test.helpers import CONVERSION_METRICS_HEADER, VerboseTestCase, env_with_path, make_executable


 REPO_ROOT = Path(__file__).resolve().parents[1]
@@ -76,11 +76,10 @@ def test_run_conversion_writes_nt_and_metrics_without_real_java(self):
             rows = list(csv.DictReader(f))
         self.assertTrue(rows)
         row = rows[0]
-        self.assertEqual(list(row.keys()), METRICS_HEADER)
+        self.assertEqual(list(row.keys()), CONVERSION_METRICS_HEADER)
         self.assertEqual(row["run_id"], "run123")
         self.assertEqual(row["output_name"], "rdf")
         self.assertEqual(row["exit_code_java"], "0")
-        self.assertEqual(row["compression_methods"], "")

     def test_run_conversion_exits_non_zero_when_java_fails(self):
         """Conversion script returns non-zero and records exit_code_java when Java command fails."""
@@ -355,6 +354,67 @@ def test_run_conversion_batch_mode_keeps_individual_nt_parts(self):
         self.assertTrue((out_dir / "rdf" / "part-00001.nt").exists())
         self.assertFalse((out_dir / "rdf" / "rdf.nt").exists())

+    def test_run_conversion_aggregate_skips_duplicate_part_payloads(self):
+        """Aggregate merge skips exact duplicate part files to avoid duplicated triples."""
+        with tempfile.TemporaryDirectory() as td:
+            tmp_path = Path(td)
+            fake_bin = tmp_path / "bin"
+            fake_bin.mkdir()
+            make_executable(
+                fake_bin / "java",
+                """#!/usr/bin/env bash
+set -euo pipefail
+if [[ "${1:-}" == "-version" ]]; then
+  echo 'openjdk version "11.0.0"' >&2
+  exit 0
+fi
+out=""
+while [[ $# -gt 0 ]]; do
+  if [[ "$1" == "-o" ]]; then
+    out="$2"
+    shift 2
+    continue
+  fi
+  shift
+done
+mkdir -p "$out"
+printf '<dup> <p> <o> .\\n' > "$out/part-00000"
+printf '<dup> <p> <o> .\\n' > "$out/part-00001"
+printf '<uniq> <p> <o> .\\n' > "$out/part-00002"
+""",
+            )
+
+            out_dir = tmp_path / "out"
+            metrics_dir = tmp_path / "metrics"
+            rules = tmp_path / "rules.ttl"
+            rules.write_text("@prefix ex: <http://example.org/> .\n")
+            vcf = tmp_path / "input.vcf"
+            vcf.write_text("##fileformat=VCFv4.2\n#CHROM\tPOS\n1\t5\n")
+
+            env = env_with_path(fake_bin)
+            env.update(
+                {
+                    "JAR": "fake.jar",
+                    "IN": str(rules),
+                    "IN_VCF": str(vcf),
+                    "OUT_DIR": str(out_dir),
+                    "OUT_NAME": "rdf",
+                    "LOGDIR": str(metrics_dir),
+                    "RUN_ID": "run-dedupe",
+                    "TIMESTAMP": "2026-01-01T00:00:00",
+                }
+            )
+
+            result = subprocess.run(["bash", str(SCRIPT)], env=env, capture_output=True, text=True)
+            self.assertEqual(result.returncode, 0, msg=result.stderr)
+            self.assertIn("skipping duplicate RDF part", result.stderr)
+
+            merged_nt = out_dir / "rdf" / "rdf.nt"
+            self.assertTrue(merged_nt.exists())
+            lines = [line.strip() for line in merged_nt.read_text().splitlines() if line.strip()]
+            self.assertEqual(lines.count("<dup> <p> <o> ."), 1)
+            self.assertEqual(lines.count("<uniq> <p> <o> ."), 1)
+

 if __name__ == "__main__":
     unittest.main()

test/test_vcf_rdfizer_unit.py

Lines changed: 18 additions & 13 deletions
@@ -163,11 +163,13 @@ def test_update_metrics_csv_keeps_raw_and_hdt_compound_metrics_separate(self):
         )

         with metrics_csv.open() as handle:
-            row = next(csv.DictReader(handle))
+            reader = csv.DictReader(handle)
+            row = next(reader)
+            fieldnames = reader.fieldnames or []

-        self.assertEqual(row["gzip_size_bytes"], "0")
+        self.assertNotIn("gzip_size_bytes", fieldnames)
+        self.assertIn("gzip_on_hdt_size_bytes", fieldnames)
         self.assertEqual(row["gzip_on_hdt_size_bytes"], "12")
-        self.assertEqual(row["exit_code_gzip"], "0")
         self.assertEqual(row["exit_code_gzip_on_hdt"], "0")
         self.assertEqual(row["hdt_source"], "existing")
         self.assertEqual(row["user_seconds_hdt"], "1.100000")
@@ -630,6 +632,8 @@ def fake_run(cmd, cwd=None, env=None):
         output = out_buf.getvalue()
         self.assertIn("Triples produced: 17", output)
         self.assertIn("Total triples produced (full run): 17", output)
+        self.assertIn("Final RDF size (no compression):", output)
+        self.assertIn("- N-Triples (.nt):", output)
         self.assertIn("Run time (full mode):", output)

         run_metrics_dir = latest_metrics_run_dir(metrics_dir)
@@ -1624,11 +1628,12 @@ def fake_run(cmd, cwd=None, env=None):
         self.assertEqual(rc, 0)

     def test_main_removes_tsv_when_wrapper_created_it(self):
-        """Wrapper removes TSV directory when it created it and --keep-tsv is not set."""
+        """Wrapper removes hidden .intermediate directory when --keep-tsv is not set."""
         with tempfile.TemporaryDirectory() as td:
             tmp_path = Path(td)
             input_dir, rules_path = prepare_inputs(tmp_path)
-            tsv_dir = tmp_path / "tsv-out"
+            intermediate_dir = tmp_path / "out" / ".intermediate"
+            tsv_dir = intermediate_dir / "tsv"

             def fake_run(cmd, cwd=None, env=None):
                 return 0
@@ -1649,23 +1654,23 @@ def fake_run(cmd, cwd=None, env=None):
                         str(input_dir),
                         "--rules",
                         str(rules_path),
-                        "--tsv",
-                        str(tsv_dir),
                     ]
                 )
             finally:
                 os.chdir(old_cwd)

             self.assertEqual(rc, 0)
             self.assertFalse(tsv_dir.exists())
+            self.assertFalse(intermediate_dir.exists())

-    def test_main_keeps_preexisting_tsv_directory(self):
-        """Wrapper preserves preexisting TSV directory to avoid deleting user-managed files."""
+    def test_main_keep_tsv_preserves_hidden_intermediate_directory(self):
+        """Wrapper preserves hidden intermediates when --keep-tsv is set."""
         with tempfile.TemporaryDirectory() as td:
             tmp_path = Path(td)
             input_dir, rules_path = prepare_inputs(tmp_path)
-            tsv_dir = tmp_path / "tsv-out"
-            tsv_dir.mkdir()
+            intermediate_dir = tmp_path / "out" / ".intermediate"
+            tsv_dir = intermediate_dir / "tsv"
+            tsv_dir.mkdir(parents=True, exist_ok=True)
             sentinel = tsv_dir / "keep.me"
             sentinel.write_text("x")

@@ -1688,15 +1693,15 @@ def fake_run(cmd, cwd=None, env=None):
                         str(input_dir),
                         "--rules",
                         str(rules_path),
-                        "--tsv",
-                        str(tsv_dir),
+                        "--keep-tsv",
                     ]
                 )
             finally:
                 os.chdir(old_cwd)

             self.assertEqual(rc, 0)
             self.assertTrue(sentinel.exists())
+            self.assertTrue(intermediate_dir.exists())

     def test_main_fails_when_tsv_step_fails(self):
         """Wrapper stops when TSV conversion command returns non-zero."""
