diff --git a/python/lsst/pipe/base/blocking_limited_butler.py b/python/lsst/pipe/base/blocking_limited_butler.py
new file mode 100644
index 000000000..3fe194645
--- /dev/null
+++ b/python/lsst/pipe/base/blocking_limited_butler.py
@@ -0,0 +1,173 @@
+# This file is part of pipe_base.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (http://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This software is dual licensed under the GNU General Public License and also
+# under a 3-clause BSD license. Recipients may choose which of these licenses
+# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
+# respectively. If you choose the GPL option then the following text applies
+# (but note that there is still no warranty even if you opt for BSD instead):
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+from __future__ import annotations
+
+__all__ = ["BlockingLimitedButler"]
+
+import logging
+import time
+from collections.abc import Iterable, Mapping
+from typing import Any
+
+from lsst.daf.butler import (
+ ButlerMetrics,
+ DatasetProvenance,
+ DatasetRef,
+ DeferredDatasetHandle,
+ DimensionUniverse,
+ LimitedButler,
+ StorageClass,
+)
+
+_LOG = logging.getLogger(__name__)
+
+
+class BlockingLimitedButler(LimitedButler):
+ """A `LimitedButler` that blocks until certain dataset types exist.
+
+ Parameters
+ ----------
+ wrapped : `LimitedButler`
+ The butler to wrap.
+ timeouts : `~collections.abc.Mapping` [ `str`, `float` or `None` ]
+ Timeouts in seconds to wait for different dataset types. Dataset types
+        not included are not blocked on (i.e. their timeout is ``0.0``).
+
+ Notes
+ -----
+ When a timeout is exceeded, `get` will raise `FileNotFoundError` (as usual
+ for a dataset that does not exist) and `stored_many` will mark the dataset
+ as non-existent. `getDeferred` does not block.
+ """
+
+ def __init__(
+ self,
+ wrapped: LimitedButler,
+ timeouts: Mapping[str, float | None],
+ ):
+ self._wrapped = wrapped
+ self._timeouts = timeouts
+
+ def close(self) -> None:
+ self._wrapped.close()
+
+ @property
+ def _metrics(self) -> ButlerMetrics:
+ # Need to always forward from the wrapped metrics object.
+ return self._wrapped._metrics
+
+ @_metrics.setter
+ def _metrics(self, metrics: ButlerMetrics) -> None:
+ # Allow record_metrics() context manager to override the wrapped
+ # butler.
+ self._wrapped._metrics = metrics
+
+ def get(
+ self,
+ ref: DatasetRef,
+ /,
+ *,
+ parameters: dict[str, Any] | None = None,
+ storageClass: StorageClass | str | None = None,
+ ) -> Any:
+ parent_dataset_type_name = ref.datasetType.nameAndComponent()[0]
+ timeout = self._timeouts.get(parent_dataset_type_name, 0.0)
+ start = time.time()
+ while True:
+ try:
+ return self._wrapped.get(ref, parameters=parameters, storageClass=storageClass)
+ except FileNotFoundError as err:
+ if timeout is not None:
+ elapsed = time.time() - start
+ if elapsed > timeout:
+                        err.add_note(f"Timed out after {elapsed:.3f}s.")
+ raise
+ _LOG.info(f"Dataset {ref.datasetType} not immediately available for {ref.id}, waiting {timeout}s")
+ time.sleep(0.5)
+
+ def getDeferred(
+ self,
+ ref: DatasetRef,
+ /,
+ *,
+ parameters: dict[str, Any] | None = None,
+ storageClass: str | StorageClass | None = None,
+ ) -> DeferredDatasetHandle:
+        # Note that this does not block at all; it defers to the wrapped butler.
+ return self._wrapped.getDeferred(ref, parameters=parameters, storageClass=storageClass)
+
+ def stored_many(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]:
+ start = time.time()
+ result = self._wrapped.stored_many(refs)
+ timeouts = {ref.id: self._timeouts.get(ref.datasetType.nameAndComponent()[0], 0.0) for ref in result}
+ while True:
+ elapsed = time.time() - start
+ remaining: list[DatasetRef] = []
+ for ref, exists in result.items():
+ timeout = timeouts[ref.id]
+ if not exists and (timeout is None or elapsed <= timeout):
+ _LOG.info(
+ f"Dataset {ref.datasetType} not immediately available for {ref.id}, "
+ f"waiting {timeout}s"
+ )
+ remaining.append(ref)
+ if not remaining:
+ return result
+ result.update(self._wrapped.stored_many(remaining))
+ time.sleep(0.5)
+
+ def isWriteable(self) -> bool:
+ return self._wrapped.isWriteable()
+
+ def put(self, obj: Any, ref: DatasetRef, /, *, provenance: DatasetProvenance | None = None) -> DatasetRef:
+ return self._wrapped.put(obj, ref, provenance=provenance)
+
+ def pruneDatasets(
+ self,
+ refs: Iterable[DatasetRef],
+ *,
+ disassociate: bool = True,
+ unstore: bool = False,
+ tags: Iterable[str] = (),
+ purge: bool = False,
+ ) -> None:
+ return self._wrapped.pruneDatasets(
+ refs, disassociate=disassociate, unstore=unstore, tags=tags, purge=purge
+ )
+
+ @property
+ def dimensions(self) -> DimensionUniverse:
+ return self._wrapped.dimensions
+
+ @property
+ def _datastore(self) -> Any:
+ return self._wrapped._datastore
+
+ @_datastore.setter # demanded by MyPy since we declare it to be an instance attribute in LimitedButler.
+ def _datastore(self, value: Any) -> None:
+ self._wrapped._datastore = value
diff --git a/python/lsst/pipe/base/trivial_quantum_graph_builder.py b/python/lsst/pipe/base/trivial_quantum_graph_builder.py
new file mode 100644
index 000000000..12343abb6
--- /dev/null
+++ b/python/lsst/pipe/base/trivial_quantum_graph_builder.py
@@ -0,0 +1,183 @@
+# This file is part of pipe_base.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (http://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This software is dual licensed under the GNU General Public License and also
+# under a 3-clause BSD license. Recipients may choose which of these licenses
+# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
+# respectively. If you choose the GPL option then the following text applies
+# (but note that there is still no warranty even if you opt for BSD instead):
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+from __future__ import annotations
+
+__all__ = ("TrivialQuantumGraphBuilder",)
+
+from collections.abc import Mapping, Sequence
+from typing import TYPE_CHECKING, Any, final
+
+from lsst.daf.butler import Butler, DataCoordinate, DatasetIdGenEnum, DatasetRef, DimensionGroup
+from lsst.utils.timer import timeMethod
+
+from .quantum_graph_builder import QuantumGraphBuilder
+from .quantum_graph_skeleton import QuantumGraphSkeleton
+
+if TYPE_CHECKING:
+ from .pipeline_graph import PipelineGraph
+
+
+@final
+class TrivialQuantumGraphBuilder(QuantumGraphBuilder):
+ """An optimized quantum-graph builder for pipelines that operate on only
+ a single data ID or a closely related set of data IDs.
+
+ Parameters
+ ----------
+ pipeline_graph
+ Pipeline to build a quantum graph from, as a graph. Will be resolved
+ in-place with the given butler (any existing resolution is ignored).
+ butler
+ Client for the data repository. Should be read-only.
+ data_ids
+ Mapping from dimension group to the data ID to use for that dimension
+ group. This is intended to allow the pipeline to switch between
+        effectively-equivalent dimensions (e.g. ``group``, ``visit``,
+        ``exposure``).
+ input_refs
+ References for input datasets, keyed by task label and then connection
+ name. This should include all regular overall-input datasets whose
+ data IDs are not included in ``data_ids``. It may (but need not)
+ include prerequisite inputs. Existing intermediate datasets should
+ also be provided when they need to be clobbered or used in skip logic.
+ dataset_id_modes
+ Mapping from dataset type name to the ID generation mode for that
+        dataset type. The default is to generate random UUIDs.
+ **kwargs
+ Forwarded to the base `.quantum_graph_builder.QuantumGraphBuilder`.
+
+ Notes
+ -----
+ If ``dataset_id_modes`` is provided, ``clobber=True`` will be passed to
+    the base builder's constructor, as this is necessary to avoid spurious
+ errors about the affected datasets already existing. The only effect of
+    this is to silence *other* errors about datasets in the output run existing
+ unexpectedly.
+ """
+
+ def __init__(
+ self,
+ pipeline_graph: PipelineGraph,
+ butler: Butler,
+ *,
+ data_ids: Mapping[DimensionGroup, DataCoordinate],
+ input_refs: Mapping[str, Mapping[str, Sequence[DatasetRef]]] | None = None,
+ dataset_id_modes: Mapping[str, DatasetIdGenEnum] | None = None,
+ **kwargs: Any,
+ ) -> None:
+ super().__init__(pipeline_graph, butler, **kwargs)
+ if dataset_id_modes:
+ self.clobber = True
+ self.data_ids = dict(data_ids)
+ self.data_ids[self.empty_data_id.dimensions] = self.empty_data_id
+ self.input_refs = input_refs or {}
+ self.dataset_id_modes = dataset_id_modes or {}
+
+ def _get_data_id(self, dimensions: DimensionGroup, context: str) -> DataCoordinate:
+ try:
+ return self.data_ids[dimensions]
+ except KeyError as e:
+ e.add_note(context)
+ raise
+
+ @timeMethod
+ def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton:
+ skeleton = QuantumGraphSkeleton(subgraph.tasks)
+ for task_node in subgraph.tasks.values():
+ quantum_key = skeleton.add_quantum_node(
+ task_node.label, self._get_data_id(task_node.dimensions, context=f"task {task_node.label!r}")
+ )
+ input_refs_for_task = self.input_refs.get(task_node.label, {})
+
+ for read_edge in task_node.iter_all_inputs():
+ if (input_refs := input_refs_for_task.get(read_edge.connection_name)) is not None:
+ for input_ref in input_refs:
+ if read_edge.is_prerequisite:
+ prereq_key = skeleton.add_prerequisite_node(input_ref)
+ skeleton.add_input_edge(quantum_key, prereq_key)
+ self.log.info(
+ f"Added prereq {task_node.label}.{read_edge.connection_name} "
+ f"for {input_ref.dataId} from input_refs"
+ )
+ else:
+ input_key = skeleton.add_dataset_node(
+ read_edge.parent_dataset_type_name,
+ input_ref.dataId,
+ ref=input_ref,
+ )
+ skeleton.add_input_edge(quantum_key, input_key)
+ self.log.info(
+ f"Added regular input {task_node.label}.{read_edge.connection_name} "
+ f"for {input_ref.dataId} from input_refs"
+ )
+
+ if read_edge.is_prerequisite:
+ continue
+ dataset_type_node = subgraph.dataset_types[read_edge.parent_dataset_type_name]
+ data_id = self._get_data_id(
+ dataset_type_node.dimensions,
+ context=f"input {task_node.label}.{read_edge.connection_name}",
+ )
+ input_key = skeleton.add_dataset_node(
+ read_edge.parent_dataset_type_name,
+ data_id,
+ )
+ skeleton.add_input_edge(quantum_key, input_key)
+ if subgraph.producer_of(read_edge.parent_dataset_type_name) is None:
+ if skeleton.get_dataset_ref(input_key) is None:
+ ref = self.butler.find_dataset(dataset_type_node.dataset_type, data_id)
+ if ref is not None:
+ skeleton.set_dataset_ref(ref)
+ self.log.info(
+ f"Added regular input {task_node.label}.{read_edge.connection_name} for {data_id}"
+ )
+
+ for write_edge in task_node.iter_all_outputs():
+ dataset_type_node = subgraph.dataset_types[write_edge.parent_dataset_type_name]
+ data_id = self._get_data_id(
+ dataset_type_node.dimensions,
+ context=f"output {task_node.label}.{write_edge.connection_name}",
+ )
+ output_key = skeleton.add_dataset_node(write_edge.parent_dataset_type_name, data_id)
+ skeleton.add_output_edge(quantum_key, output_key)
+ self.log.info(f"Added output {task_node.label}.{write_edge.connection_name} for {data_id}")
+ if mode := self.dataset_id_modes.get(write_edge.parent_dataset_type_name):
+ ref = DatasetRef(
+ dataset_type_node.dataset_type,
+ data_id,
+ run=self.output_run,
+ id_generation_mode=mode,
+ )
+ skeleton.set_dataset_ref(ref)
+ skeleton.set_output_in_the_way(ref)
+ self.log.info(
+ f"Added ref for output {task_node.label}.{write_edge.connection_name} for "
+ f"{data_id} with {mode=}"
+ )
+
+ return skeleton
diff --git a/tests/test_blocking_limited_butler.py b/tests/test_blocking_limited_butler.py
new file mode 100644
index 000000000..21dd58602
--- /dev/null
+++ b/tests/test_blocking_limited_butler.py
@@ -0,0 +1,101 @@
+# This file is part of pipe_base.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This software is dual licensed under the GNU General Public License and also
+# under a 3-clause BSD license. Recipients may choose which of these licenses
+# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
+# respectively. If you choose the GPL option then the following text applies
+# (but note that there is still no warranty even if you opt for BSD instead):
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""Tests for BlockingLimitedButler."""
+
+import logging
+import os
+import unittest
+
+import lsst.utils.tests
+from lsst.daf.butler import DataCoordinate, DatasetRef
+from lsst.pipe.base.blocking_limited_butler import _LOG, BlockingLimitedButler
+from lsst.pipe.base.tests.mocks import InMemoryRepo
+
+TESTDIR = os.path.abspath(os.path.dirname(__file__))
+
+
+class BlockingLimitedButlerTestCase(unittest.TestCase):
+ """Unit tests for BlockingLimitedButler"""
+
+ def test_no_block_nonexistent(self):
+ """Test checking/getting with no dataset and blocking disabled."""
+ helper = InMemoryRepo("base.yaml")
+ helper.add_task()
+ helper.pipeline_graph.resolve(helper.butler.registry)
+ ref = DatasetRef(
+ helper.pipeline_graph.dataset_types["dataset_auto0"].dataset_type,
+ DataCoordinate.make_empty(helper.butler.dimensions),
+ run="input_run",
+ )
+ helper.pipeline_graph.register_dataset_types(helper.butler)
+ in_memory_butler = helper.make_limited_butler()
+ blocking_butler = BlockingLimitedButler(in_memory_butler, timeouts={})
+ with self.assertNoLogs(_LOG, level=logging.INFO):
+ self.assertFalse(blocking_butler.stored_many([ref])[ref])
+ with self.assertRaises(FileNotFoundError):
+ blocking_butler.get(ref)
+
+ def test_timeout_nonexistent(self):
+ """Test checking/getting with no dataset, leading to a timeout."""
+ helper = InMemoryRepo("base.yaml")
+ helper.add_task()
+ helper.pipeline_graph.resolve(helper.butler.registry)
+ ref = DatasetRef(
+ helper.pipeline_graph.dataset_types["dataset_auto0"].dataset_type,
+ DataCoordinate.make_empty(helper.butler.dimensions),
+ run="input_run",
+ )
+ helper.pipeline_graph.register_dataset_types(helper.butler)
+ in_memory_butler = helper.make_limited_butler()
+ blocking_butler = BlockingLimitedButler(in_memory_butler, timeouts={"dataset_auto0": 0.1})
+ with self.assertLogs(_LOG, level=logging.INFO) as cm:
+ self.assertFalse(blocking_butler.stored_many([ref])[ref])
+ self.assertIn("not immediately available", cm.output[0])
+ with self.assertLogs(_LOG, level=logging.INFO) as cm:
+ with self.assertRaises(FileNotFoundError):
+ blocking_butler.get(ref)
+ self.assertIn("not immediately available", cm.output[0])
+
+ def test_no_waiting_if_exists(self):
+ """Test checking/getting with dataset present immediately, so no
+ waiting should be necessary.
+ """
+ helper = InMemoryRepo("base.yaml")
+ helper.add_task()
+ (ref,) = helper.insert_datasets("dataset_auto0")
+ helper.pipeline_graph.register_dataset_types(helper.butler)
+ in_memory_butler = helper.make_limited_butler()
+ blocking_butler = BlockingLimitedButler(in_memory_butler, timeouts={})
+ with self.assertNoLogs(_LOG, level=logging.INFO):
+ self.assertTrue(blocking_butler.stored_many([ref])[ref])
+ self.assertIsNotNone(blocking_butler.get(ref))
+
+
+if __name__ == "__main__":
+ lsst.utils.tests.init()
+ unittest.main()
diff --git a/tests/test_trivial_qg_builder.py b/tests/test_trivial_qg_builder.py
new file mode 100644
index 000000000..39e229327
--- /dev/null
+++ b/tests/test_trivial_qg_builder.py
@@ -0,0 +1,171 @@
+# This file is part of pipe_base.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This software is dual licensed under the GNU General Public License and also
+# under a 3-clause BSD license. Recipients may choose which of these licenses
+# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
+# respectively. If you choose the GPL option then the following text applies
+# (but note that there is still no warranty even if you opt for BSD instead):
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+from __future__ import annotations
+
+import unittest
+
+from lsst.daf.butler import DatasetIdGenEnum, DatasetRef
+from lsst.pipe.base.tests.mocks import DynamicConnectionConfig, InMemoryRepo
+from lsst.pipe.base.trivial_quantum_graph_builder import TrivialQuantumGraphBuilder
+
+
+class TrivialQuantumGraphBuilderTestCase(unittest.TestCase):
+ """Tests for the TrivialQuantumGraphBuilder class."""
+
+ def test_trivial_qg_builder(self) -> None:
+ # Make a test helper with a mock task appropriate for the QG builder:
+ # - the QG will have no branching
+        # - while the tasks have different dimensions, they can be 1-1 related
+ # (for the purposes of this test, at least).
+ helper = InMemoryRepo("base.yaml")
+ helper.add_task(
+ "a",
+ dimensions=["band", "detector"],
+ prerequisite_inputs={
+ "prereq_connection": DynamicConnectionConfig(
+ dataset_type_name="dataset_prereq0", dimensions=["detector"]
+ )
+ },
+ )
+ helper.add_task(
+ "b",
+ dimensions=["physical_filter", "detector"],
+ inputs={
+ "input_connection": DynamicConnectionConfig(
+ dataset_type_name="dataset_auto1", dimensions=["band", "detector"]
+ ),
+ "extra_input_connection": DynamicConnectionConfig(
+ dataset_type_name="dataset_extra1", dimensions=["physical_filter", "detector"]
+ ),
+ },
+ )
+ # Use the helper to make a quantum graph using the general-purpose
+ # builder. This will cover all data IDs in the test dataset, which
+ # includes 4 detectors, 3 physical_filters, and 2 bands.
+ # This also has useful side-effects: it inserts the input datasets
+ # and registers all dataset types.
+ general_qg = helper.make_quantum_graph()
+ # Make the trivial QG builder we want to test giving it only one
+ # detector and one band (is the one that corresponds to only one
+ # physical_filter).
+ (a_data_id,) = [
+ data_id
+ for data_id in general_qg.quanta_by_task["a"]
+ if data_id["detector"] == 1 and data_id["band"] == "g"
+ ]
+ (b_data_id,) = [
+ data_id
+ for data_id in general_qg.quanta_by_task["b"]
+ if data_id["detector"] == 1 and data_id["band"] == "g"
+ ]
+ prereq_data_id = a_data_id.subset(["detector"])
+ dataset_auto0_ref = helper.butler.get_dataset(general_qg.datasets_by_type["dataset_auto0"][a_data_id])
+ assert dataset_auto0_ref is not None, "Input dataset should have been inserted above."
+ dataset_prereq0_ref = helper.butler.get_dataset(
+ general_qg.datasets_by_type["dataset_prereq0"][prereq_data_id]
+ )
+ assert dataset_prereq0_ref is not None, "Input dataset should have been inserted above."
+ trivial_builder = TrivialQuantumGraphBuilder(
+ helper.pipeline_graph,
+ helper.butler,
+ data_ids={a_data_id.dimensions: a_data_id, b_data_id.dimensions: b_data_id},
+ input_refs={
+ "a": {"input_connection": [dataset_auto0_ref], "prereq_connection": [dataset_prereq0_ref]}
+ },
+ dataset_id_modes={"dataset_auto2": DatasetIdGenEnum.DATAID_TYPE_RUN},
+ output_run="trivial_output_run",
+ input_collections=general_qg.header.inputs,
+ )
+ trivial_qg = trivial_builder.finish(attach_datastore_records=False).assemble()
+ self.assertEqual(len(trivial_qg.quanta_by_task), 2)
+ self.assertEqual(trivial_qg.quanta_by_task["a"].keys(), {a_data_id})
+ self.assertEqual(trivial_qg.quanta_by_task["b"].keys(), {b_data_id})
+ self.assertEqual(trivial_qg.datasets_by_type["dataset_prereq0"].keys(), {prereq_data_id})
+ self.assertEqual(
+ trivial_qg.datasets_by_type["dataset_prereq0"][prereq_data_id],
+ general_qg.datasets_by_type["dataset_prereq0"][prereq_data_id],
+ )
+ self.assertEqual(trivial_qg.datasets_by_type["dataset_auto0"].keys(), {a_data_id})
+ self.assertEqual(
+ trivial_qg.datasets_by_type["dataset_auto0"][a_data_id],
+ general_qg.datasets_by_type["dataset_auto0"][a_data_id],
+ )
+ self.assertEqual(trivial_qg.datasets_by_type["dataset_extra1"].keys(), {b_data_id})
+ self.assertEqual(
+ trivial_qg.datasets_by_type["dataset_extra1"][b_data_id],
+ general_qg.datasets_by_type["dataset_extra1"][b_data_id],
+ )
+ self.assertEqual(trivial_qg.datasets_by_type["dataset_auto1"].keys(), {a_data_id})
+ self.assertNotEqual(
+ trivial_qg.datasets_by_type["dataset_auto1"][a_data_id],
+ general_qg.datasets_by_type["dataset_auto1"][a_data_id],
+ )
+ self.assertEqual(trivial_qg.datasets_by_type["dataset_auto2"].keys(), {b_data_id})
+ self.assertNotEqual(
+ trivial_qg.datasets_by_type["dataset_auto2"][b_data_id],
+ general_qg.datasets_by_type["dataset_auto2"][b_data_id],
+ )
+ self.assertEqual(
+ trivial_qg.datasets_by_type["dataset_auto2"][b_data_id],
+ DatasetRef(
+ helper.pipeline_graph.dataset_types["dataset_auto2"].dataset_type,
+ b_data_id,
+ run="trivial_output_run",
+ id_generation_mode=DatasetIdGenEnum.DATAID_TYPE_RUN,
+ ).id,
+ )
+ qo_xg = trivial_qg.quantum_only_xgraph
+ self.assertEqual(len(qo_xg.nodes), 2)
+ self.assertEqual(len(qo_xg.edges), 1)
+ bp_xg = trivial_qg.bipartite_xgraph
+ self.assertEqual(
+ set(bp_xg.predecessors(trivial_qg.quanta_by_task["a"][a_data_id])),
+ set(trivial_qg.datasets_by_type["dataset_auto0"].values())
+ | set(trivial_qg.datasets_by_type["dataset_prereq0"].values()),
+ )
+ self.assertEqual(
+ set(bp_xg.successors(trivial_qg.quanta_by_task["a"][a_data_id])),
+ set(trivial_qg.datasets_by_type["dataset_auto1"].values())
+ | set(trivial_qg.datasets_by_type["a_metadata"].values())
+ | set(trivial_qg.datasets_by_type["a_log"].values()),
+ )
+ self.assertEqual(
+ set(bp_xg.predecessors(trivial_qg.quanta_by_task["b"][b_data_id])),
+ set(trivial_qg.datasets_by_type["dataset_auto1"].values())
+ | set(trivial_qg.datasets_by_type["dataset_extra1"].values()),
+ )
+ self.assertEqual(
+ set(bp_xg.successors(trivial_qg.quanta_by_task["b"][b_data_id])),
+ set(trivial_qg.datasets_by_type["dataset_auto2"].values())
+ | set(trivial_qg.datasets_by_type["b_metadata"].values())
+ | set(trivial_qg.datasets_by_type["b_log"].values()),
+ )
+
+
+if __name__ == "__main__":
+ unittest.main()