Skip to content

Commit 9f66ed8

Browse files
authored
Merge pull request #556 from lsst/tickets/DM-54162
DM-54162: write in-process-gathered provenance when exceptions are raised and handled
2 parents 6a36e4f + bdff026 commit 9f66ed8

6 files changed

Lines changed: 160 additions & 56 deletions

File tree

doc/changes/DM-54162.feature.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Write provenance gathered from in-process execution (i.e. `SeparablePipelineExecutor` and `MPGraphExecutor`) when a task raises an exception and the executor successfully catches it and continues with other tasks.
2+
3+
This also includes task construction in the quantum executor's error-handling logic, so exceptions raised at that point do not break provenance writing or require any other special-casing at higher levels.

python/lsst/pipe/base/mp_graph_executor.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,7 @@ def execute(
531531
new_graph = graph
532532
xgraph = self._make_xgraph(new_graph, old_graph)
533533
self._report = Report(qgraphSummary=new_graph._make_summary())
534+
err: MPGraphExecutorError | None = None
534535
with ExitStack() as exit_stack:
535536
provenance_writer: ProvenanceQuantumGraphWriter | None = None
536537
if provenance_graph_file is not None:
@@ -549,13 +550,21 @@ def execute(
549550
self._execute_quanta_mp(xgraph, self._report)
550551
else:
551552
self._execute_quanta_in_process(xgraph, self._report, provenance_writer)
553+
except MPGraphExecutorError as exc:
554+
self._report.set_exception(exc)
555+
err = exc
556+
# Defer re-raising this exception only to let provenance writes
557+
# finish as the ExitStack closes. The original traceback for
558+
# this exception isn't useful anyway.
552559
except Exception as exc:
553560
self._report.set_exception(exc)
554561
raise
555562
if provenance_writer is not None:
556563
provenance_writer.write_overall_inputs()
557564
provenance_writer.write_packages()
558565
provenance_writer.write_init_outputs(assume_existence=True)
566+
if err is not None:
567+
raise err
559568

560569
def _make_xgraph(
561570
self, new_graph: PredictedQuantumGraph, old_graph: QuantumGraph | None
@@ -724,6 +733,8 @@ def tiebreaker_sort_key(quantum_id: uuid.UUID) -> tuple:
724733
taskLabel=downstream_node_state["task_label"],
725734
)
726735
report.quantaReports.append(failed_quantum_report)
736+
if provenance_writer is not None:
737+
provenance_writer.write_blocked_quantum_provenance(downstream_quantum_id)
727738
_LOG.error(
728739
"Upstream job failed for task %s (%s@%s), skipping this quantum.",
729740
downstream_quantum_id,

python/lsst/pipe/base/quantum_graph/_provenance.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2040,6 +2040,17 @@ def write_quantum_provenance(
20402040
scan_data = provenance_models.to_scan_data(predicted_quantum, compressor=self.compressor)
20412041
self.write_scan_data(scan_data)
20422042

2043+
def write_blocked_quantum_provenance(self, quantum_id: uuid.UUID) -> None:
2044+
"""Gather and write provenance for a quantum that was blocked by an
2045+
upstream failure.
2046+
2047+
Parameters
2048+
----------
2049+
quantum_id : `uuid.UUID`
2050+
Unique ID for the quantum.
2051+
"""
2052+
self.write_scan_data(ProvenanceQuantumScanData.make_blocked(quantum_id))
2053+
20432054
def write_scan_data(self, scan_data: ProvenanceQuantumScanData) -> None:
20442055
"""Write the output of a quantum provenance scan to disk.
20452056
@@ -2436,6 +2447,27 @@ class ProvenanceQuantumScanData:
24362447
compressed.
24372448
"""
24382449

2450+
@classmethod
2451+
def make_blocked(cls, quantum_id: uuid.UUID) -> ProvenanceQuantumScanData:
2452+
"""Construct provenance information for a quantum blocked by an
2453+
upstream failure.
2454+
2455+
Parameters
2456+
----------
2457+
quantum_id : `uuid.UUID`
2458+
Unique ID of the quantum
2459+
2460+
Returns
2461+
-------
2462+
scan_data : `ProvenanceQuantumScanData`
2463+
Struct with ready-to-write provenance data.
2464+
"""
2465+
return ProvenanceQuantumScanData(
2466+
quantum_id,
2467+
status=ProvenanceQuantumScanStatus.BLOCKED,
2468+
is_compressed=True, # nothing to compress
2469+
)
2470+
24392471
def compress(self, compressor: Compressor) -> None:
24402472
"""Compress the data in this struct if it has not been compressed
24412473
already.

python/lsst/pipe/base/separable_pipeline_executor.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
from ._quantumContext import ExecutionResources
4747
from .all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder
4848
from .graph import QuantumGraph
49-
from .mp_graph_executor import MPGraphExecutor
49+
from .mp_graph_executor import MPGraphExecutor, MPGraphExecutorError
5050
from .pipeline import Pipeline
5151
from .quantum_graph import PredictedQuantumGraph
5252
from .quantum_graph_builder import QuantumGraphBuilder
@@ -390,11 +390,12 @@ def run_pipeline(
390390
provenance_dataset_ref : `lsst.daf.butler.DatasetRef`, optional
391391
Dataset that should be used to save provenance. Provenance is only
392392
supported when running in a single process (at least for the
393-
default quantum executor), and should not be used with
394-
``skip_existing_in=[output_run]`` when retrying a previous
395-
execution attempt. The caller is responsible for registering the
396-
dataset type and for ensuring that the dimensions of this dataset
397-
do not lead to uniqueness conflicts.
393+
default quantum executor), and should not be enabled in contexts
394+
where a quantum might be executed more than once (i.e. retried)
395+
within the same `~lsst.daf.butler.CollectionType.RUN` collection.
396+
The caller is responsible for registering the dataset type and for
397+
ensuring that the dimensions of this dataset do not lead to
398+
uniqueness conflicts.
398399
"""
399400
if not graph_executor:
400401
quantum_executor = SingleQuantumExecutor(
@@ -417,7 +418,16 @@ def run_pipeline(
417418

418419
if provenance_dataset_ref is not None:
419420
with TemporaryForIngest(self._butler, provenance_dataset_ref) as temporary:
420-
graph_executor.execute(graph, provenance_graph_file=temporary.ospath)
421-
temporary.ingest()
421+
try:
422+
graph_executor.execute(graph, provenance_graph_file=temporary.ospath)
423+
temporary.ingest()
424+
except MPGraphExecutorError:
425+
# If the graph executor itself raised, it will have
426+
# finished the provenance rewrite. In other cases the
427+
# temporary file might be incomplete or corrupted and we
428+
# can't roll the dice on ingesting it.
429+
temporary.ingest()
430+
raise
431+
422432
else:
423433
graph_executor.execute(graph)

python/lsst/pipe/base/single_quantum_executor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,10 +293,10 @@ def _execute_with_limited_butler(
293293
task_node.label,
294294
quantum.dataId,
295295
)
296-
task = self._task_factory.makeTask(task_node, limited_butler, init_input_refs)
297-
logInfo(None, "start", metadata=quantumMetadata) # type: ignore[arg-type]
298-
outputs_put: list[uuid.UUID] = []
299296
try:
297+
task = self._task_factory.makeTask(task_node, limited_butler, init_input_refs)
298+
logInfo(None, "start", metadata=quantumMetadata) # type: ignore[arg-type]
299+
outputs_put: list[uuid.UUID] = []
300300
with limited_butler.record_metrics() as butler_metrics:
301301
caveats = self._run_quantum(
302302
task, quantum, task_node, limited_butler, quantum_id=quantum_id, ids_put=outputs_put

tests/test_separable_pipeline_executor.py

Lines changed: 93 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
PROVENANCE_DATASET_TYPE_NAME,
5151
PROVENANCE_STORAGE_CLASS,
5252
)
53+
from lsst.pipe.base.mp_graph_executor import MPGraphExecutorError
54+
from lsst.pipe.base.quantum_graph import ProvenanceQuantumGraph
5355
from lsst.pipe.base.quantum_graph_builder import OutputExistsError
5456
from lsst.pipe.base.separable_pipeline_executor import SeparablePipelineExecutor
5557
from lsst.pipe.base.tests.mocks import (
@@ -1153,60 +1155,106 @@ class SeparablePipelineExecutorMockTests(lsst.utils.tests.TestCase):
11531155
the lsst.pipe.base.tests.mocks system to define complex pipelines.
11541156
"""
11551157

1158+
def setUp(self):
1159+
# 'base.yaml' adds an instrument, 'Cam1', with four detectors and
1160+
# two physical filters.
1161+
self.helper, _ = self.enterContext(DirectButlerRepo.make_temporary("base.yaml"))
1162+
1163+
def run_base_test(
1164+
self, b_config: DynamicTestPipelineTaskConfig, expected_error: type[Exception] | None
1165+
) -> ProvenanceQuantumGraph:
1166+
"""Build and run a quantum graph with three tasks and four data IDs,
1167+
with customization of the middle task.
1168+
"""
1169+
self.helper.add_task("a", dimensions=["detector"])
1170+
self.helper.add_task("b", dimensions=["detector"], config=b_config)
1171+
self.helper.add_task("c", dimensions=["detector"])
1172+
qg = self.helper.make_quantum_graph()
1173+
self.helper.butler.collections.register(qg.header.output_run)
1174+
qg.init_output_run(self.helper.butler, existing=False)
1175+
executor = SeparablePipelineExecutor(
1176+
self.helper.butler.clone(collections=qg.header.inputs, run=qg.header.output_run)
1177+
)
1178+
provenance_type = lsst.daf.butler.DatasetType(
1179+
PROVENANCE_DATASET_TYPE_NAME,
1180+
self.helper.butler.dimensions.empty,
1181+
PROVENANCE_STORAGE_CLASS,
1182+
)
1183+
self.helper.butler.registry.registerDatasetType(provenance_type)
1184+
provenance_ref = lsst.daf.butler.DatasetRef(
1185+
provenance_type,
1186+
lsst.daf.butler.DataCoordinate.make_empty(self.helper.butler.dimensions),
1187+
run=qg.header.output_run,
1188+
)
1189+
if expected_error is None:
1190+
executor.run_pipeline(qg, provenance_dataset_ref=provenance_ref)
1191+
else:
1192+
with self.assertRaises(expected_error):
1193+
executor.run_pipeline(qg, provenance_dataset_ref=provenance_ref)
1194+
provenance_graph = self.helper.butler.get(provenance_ref)
1195+
self.assertEqual(len(provenance_graph.quanta_by_task), 3)
1196+
self.assertEqual(len(provenance_graph.quanta_by_task["a"]), 4)
1197+
self.assertEqual(len(provenance_graph.quanta_by_task["b"]), 4)
1198+
self.assertEqual(len(provenance_graph.quanta_by_task["c"]), 4)
1199+
return provenance_graph
1200+
11561201
def test_no_work_chain_provenance(self):
11571202
"""Test provenance recording when a NoWorkFound error chains to
11581203
downstream tasks during execution.
11591204
"""
1205+
b_config = DynamicTestPipelineTaskConfig()
1206+
b_config.fail_exception = "lsst.pipe.base.NoWorkFound"
1207+
b_config.fail_condition = "detector=2"
1208+
provenance_graph = self.run_base_test(b_config, expected_error=None)
1209+
xgraph = provenance_graph.quantum_only_xgraph
1210+
for quantum_id in provenance_graph.quanta_by_task["a"].values():
1211+
self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
1212+
self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
1213+
for data_id, quantum_id in provenance_graph.quanta_by_task["b"].items():
1214+
self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
1215+
if data_id["detector"] == 2:
1216+
self.assertTrue(xgraph.nodes[quantum_id]["caveats"] & QuantumSuccessCaveats.NO_WORK)
1217+
else:
1218+
self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
1219+
for data_id, quantum_id in provenance_graph.quanta_by_task["c"].items():
1220+
self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
1221+
if data_id["detector"] == 2:
1222+
self.assertTrue(
1223+
xgraph.nodes[quantum_id]["caveats"] & QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED
1224+
)
1225+
else:
1226+
self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
1227+
1228+
def test_failure_block_provenance(self):
1229+
"""Test provenance recording when an exception blocks one branch of a
1230+
QG but not another.
1231+
"""
11601232
# 'base.yaml' adds an instrument, 'Cam1', with four detectors and
11611233
# two physical filters.
1162-
with DirectButlerRepo.make_temporary("base.yaml") as (helper, _):
1163-
helper.add_task("a", dimensions=["detector"])
1164-
b_config = DynamicTestPipelineTaskConfig()
1165-
b_config.fail_exception = "lsst.pipe.base.NoWorkFound"
1166-
b_config.fail_condition = "detector=2"
1167-
helper.add_task("b", dimensions=["detector"], config=b_config)
1168-
helper.add_task("c", dimensions=["detector"])
1169-
qg = helper.make_quantum_graph()
1170-
helper.butler.collections.register(qg.header.output_run)
1171-
qg.init_output_run(helper.butler, existing=False)
1172-
provenance_type = lsst.daf.butler.DatasetType(
1173-
PROVENANCE_DATASET_TYPE_NAME,
1174-
helper.butler.dimensions.empty,
1175-
PROVENANCE_STORAGE_CLASS,
1176-
)
1177-
helper.butler.registry.registerDatasetType(provenance_type)
1178-
provenance_ref = lsst.daf.butler.DatasetRef(
1179-
provenance_type,
1180-
lsst.daf.butler.DataCoordinate.make_empty(helper.butler.dimensions),
1181-
run=qg.header.output_run,
1182-
)
1183-
executor = SeparablePipelineExecutor(
1184-
helper.butler.clone(collections=qg.header.inputs, run=qg.header.output_run)
1185-
)
1186-
executor.run_pipeline(qg, provenance_dataset_ref=provenance_ref)
1187-
provenance_graph = helper.butler.get(provenance_ref)
1188-
self.assertEqual(len(provenance_graph.quanta_by_task), 3)
1189-
self.assertEqual(len(provenance_graph.quanta_by_task["a"]), 4)
1190-
self.assertEqual(len(provenance_graph.quanta_by_task["b"]), 4)
1191-
self.assertEqual(len(provenance_graph.quanta_by_task["c"]), 4)
1192-
xgraph = provenance_graph.quantum_only_xgraph
1193-
for quantum_id in provenance_graph.quanta_by_task["a"].values():
1234+
b_config = DynamicTestPipelineTaskConfig()
1235+
b_config.fail_exception = "builtins.RuntimeError"
1236+
b_config.fail_condition = "detector=2"
1237+
provenance_graph = self.run_base_test(b_config, MPGraphExecutorError)
1238+
self.assertEqual(len(provenance_graph.quanta_by_task), 3)
1239+
self.assertEqual(len(provenance_graph.quanta_by_task["a"]), 4)
1240+
self.assertEqual(len(provenance_graph.quanta_by_task["b"]), 4)
1241+
self.assertEqual(len(provenance_graph.quanta_by_task["c"]), 4)
1242+
xgraph = provenance_graph.quantum_only_xgraph
1243+
for quantum_id in provenance_graph.quanta_by_task["a"].values():
1244+
self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
1245+
self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
1246+
for data_id, quantum_id in provenance_graph.quanta_by_task["b"].items():
1247+
if data_id["detector"] == 2:
1248+
self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.FAILED)
1249+
else:
11941250
self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
11951251
self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
1196-
for data_id, quantum_id in provenance_graph.quanta_by_task["b"].items():
1252+
for data_id, quantum_id in provenance_graph.quanta_by_task["c"].items():
1253+
if data_id["detector"] == 2:
1254+
self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.BLOCKED)
1255+
else:
11971256
self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
1198-
if data_id["detector"] == 2:
1199-
self.assertTrue(xgraph.nodes[quantum_id]["caveats"] & QuantumSuccessCaveats.NO_WORK)
1200-
else:
1201-
self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
1202-
for data_id, quantum_id in provenance_graph.quanta_by_task["c"].items():
1203-
self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
1204-
if data_id["detector"] == 2:
1205-
self.assertTrue(
1206-
xgraph.nodes[quantum_id]["caveats"] & QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED
1207-
)
1208-
else:
1209-
self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
1257+
self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
12101258

12111259

12121260
class MemoryTester(lsst.utils.tests.MemoryTestCase):

0 commit comments

Comments
 (0)