Skip to content

Commit 45e7e24

Browse files
committed
WIP: first draft of support for writing expanded pipelines.
This looks plausible, but it hasn't been tested, and can't really be until support for *reading* expanded pipelines has been added.
1 parent 5269085 commit 45e7e24

2 files changed

Lines changed: 282 additions & 16 deletions

File tree

python/lsst/pipe/base/pipeline.py

Lines changed: 137 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
# -------------------------------
3131
from dataclasses import dataclass
3232
import logging
33+
import tempfile
34+
import shutil
3335
from types import MappingProxyType
3436
from typing import (ClassVar, Dict, Iterable, Iterator, Mapping, Set, Union,
3537
Generator, TYPE_CHECKING, Optional, Tuple)
@@ -59,6 +61,16 @@
5961

6062
_LOG = logging.getLogger(__name__)
6163

64+
# Archive formats allowed for expanded pipelines (in addition to directories).
# We use shutil.make_archive, which automatically picks these extensions for
# the given format, so it's not easy to support alternatives (e.g. tgz), and
# we also need to limit this to special extensions recognized by ButlerURI.
# Maps file extension -> shutil.make_archive format name.
_ARCHIVE_FORMATS = {
    ".tar": "tar",
    ".tar.gz": "gztar",
    ".tar.bz2": "bztar",
}
73+
6274
# ------------------------
6375
# Exported definitions --
6476
# ------------------------
@@ -561,8 +573,40 @@ def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR) -> None:
561573
def toFile(self, filename: str) -> None:
562574
self._pipelineIR.to_file(filename)
563575

564-
def write_to_uri(self, uri: Union[str, ButlerURI]) -> None:
565-
self._pipelineIR.write_to_uri(uri)
576+
def write_to_uri(
    self,
    uri: Union[str, ButlerURI],
    expand: bool = False,
    task_defs: Optional[Iterable[TaskDef]] = None,
) -> None:
    """Write this pipeline to a file, directory, or archive.

    Parameters
    ----------
    uri : `str` or `ButlerURI`
        URI to write to; may have any scheme with `ButlerURI` write
        or no scheme for a local file/directory.  Should have a ``.yaml``
        extension if ``expand=False`` and a trailing slash (indicating
        a directory-like URI) or (compressed) tar extension (``.tar``,
        ``.tar.gz``, ``.tar.bz2``) if ``expand=True``.
    expand : `bool`, optional
        If `False` (default), write a single YAML file in which config
        file references and other overrides are left unexpanded and
        unapplied, and tasks and subsets are only minimally validated
        (not imported).  If `True`, import all tasks, apply every
        configuration override (including instrument-supplied ones),
        resolve parameters, sort all sections deterministically, and
        write a directory or ``tar`` archive holding one config file per
        task plus a single ``pipeline.yaml`` file.
    task_defs : `Iterable` [ `TaskDef` ], optional
        Output of `toExpandedPipeline`; may be passed to avoid a second
        call to that method internally.
    """
    # The simple case is a straight YAML serialization of the IR; only
    # the expanded form needs the heavier writer.
    if not expand:
        self._pipelineIR.write_to_uri(uri)
        return
    self._write_expanded(uri, task_defs=task_defs)
566610

567611
def toExpandedPipeline(self) -> Generator[TaskDef, None, None]:
568612
"""Returns a generator of TaskDefs which can be used to create quantum
@@ -654,6 +698,97 @@ def __eq__(self, other: object):
654698
return False
655699
return self._pipelineIR == other._pipelineIR
656700

701+
def _write_expanded(
    self,
    uri: Union[str, ButlerURI],
    task_defs: Optional[Iterable[TaskDef]] = None,
) -> None:
    """Internal implementation of `write_to_uri` with ``expand=True``.

    Parameters
    ----------
    uri : `str` or `ButlerURI`
        URI to write to; may have any scheme with `ButlerURI` write or no
        scheme for a local file/directory.  Should have a trailing slash
        (indicating a directory-like URI) or (compressed) tar extension
        (``.tar``, ``.tar.gz``, ``.tar.bz2``).
    task_defs : `Iterable` [ `TaskDef` ], optional
        Output of `toExpandedPipeline`; may be passed to avoid a second
        call to that method internally.

    Raises
    ------
    ValueError
        Raised if ``uri`` has an unsupported extension, or has no
        extension but is not directory-like.
    """
    uri = ButlerURI(uri)
    ext = uri.getExtension()
    # Name chosen to avoid shadowing the `format` builtin.
    if (archive_format := _ARCHIVE_FORMATS.get(ext)) is not None:
        # User wants a tar'd archive; write to a temporary local directory,
        # and then use shutil.make_archive to write to the target URI (via
        # ButlerURI.as_local, which may involve another temporary).  By
        # using the lower-level `tarfile` interface, we could probably
        # stream when making the archive instead, but this is much simpler,
        # possibly faster (not sure how well compression will interact with
        # streaming), and right now we expect expansion to mostly happen
        # locally anyway.
        #
        # Create the temporary directory *before* entering the `try`, so
        # the `finally` clause cannot reference an unbound name if
        # mkdtemp itself raises.
        dir_temp_uri = ButlerURI(tempfile.mkdtemp(), forcedDirectory=True, isTemporary=True)
        try:
            self._write_expanded_dir(dir_temp_uri, task_defs=task_defs)
            # NOTE(review): `as_local` is primarily documented for
            # *reading*; it is assumed here that writes to the local path
            # are propagated back to the target URI on exit -- TODO
            # confirm for remote schemes before relying on this.
            with uri.as_local() as local_uri:
                shutil.make_archive(
                    local_uri.updatedExtension(None).ospath,
                    format=archive_format,
                    # Archive the *contents* of the temporary directory.
                    # Passing the absolute temp path as base_dir (as an
                    # earlier draft did) would embed that absolute path in
                    # every archive member name.
                    root_dir=dir_temp_uri.ospath,
                    base_dir=".",
                    logger=_LOG,
                )
        finally:
            shutil.rmtree(dir_temp_uri.ospath, ignore_errors=True)
    elif ext:
        raise ValueError(
            f"Cannot write pipeline to URI '{uri}' because extension '{ext}' is not supported; "
            f"supported extensions are {tuple(_ARCHIVE_FORMATS.keys())}."
        )
    elif not uri.dirLike:
        raise ValueError(
            f"Cannot write pipeline to URI '{uri}' because it is not a directory-like URI; "
            f"please either add a supported extension {tuple(_ARCHIVE_FORMATS.keys())} "
            "for an archive or add a trailing slash for a directory."
        )
    else:
        self._write_expanded_dir(uri, task_defs=task_defs)
756+
757+
def _write_expanded_dir(self, uri: ButlerURI, task_defs: Optional[Iterable[TaskDef]] = None) -> None:
    """Internal implementation of `write_to_uri` with ``expand=True`` and
    a directory-like URI.

    Parameters
    ----------
    uri : `str` or `ButlerURI`
        URI to write to; may have any scheme with `ButlerURI` write or no
        scheme for a local file/directory.  Should have a trailing slash
        (indicating a directory-like URI).
    task_defs : `Iterable` [ `TaskDef` ], optional
        Output of `toExpandedPipeline`; may be passed to avoid a second
        call to that method internally.
    """
    assert uri.dirLike, f"{uri} is not a directory-like URI."
    # Expand the pipeline unless the caller already did.  Expansion
    # applies all config overrides, applies all parameters, checks
    # contracts, and sorts tasks topologically with lexicographical
    # (on label) tiebreaking.
    expanded = list(self.toExpandedPipeline() if task_defs is None else task_defs)
    uri.mkdir()
    config_dir = uri.join("config/")
    config_dir.mkdir()
    task_irs: Dict[str, pipelineIR.TaskIR] = {}
    for td in expanded:
        ir = pipelineIR.TaskIR(label=td.label, klass=td.taskName)
        # Dump the fully-expanded config for this task into its own file,
        # and make the new TaskIR reference it (relative to the pipeline).
        with config_dir.join(f"{td.label}.py").open("w") as stream:
            td.config.saveToStream(stream)
        ir.config.append(pipelineIR.ConfigIR(file=[f"config/{td.label}.py"]))
        task_irs[td.label] = ir
    self._pipelineIR.write_to_uri(uri.join("pipeline.yaml"), expanded_tasks=task_irs)
791+
657792

658793
@dataclass(frozen=True)
659794
class TaskDatasetTypes:

python/lsst/pipe/base/pipelineIR.py

Lines changed: 145 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
from collections.abc import Iterable as abcIterable
3636
from dataclasses import dataclass, field
3737
from deprecated.sphinx import deprecated
38-
from typing import Any, List, Set, Union, Generator, MutableMapping, Optional, Dict, Type
38+
from typing import Any, Mapping, List, Set, Union, Generator, MutableMapping, Optional, Dict, Type
3939

4040
import copy
4141
import re
@@ -192,14 +192,77 @@ def from_primitives(label: str, value: Union[List[str], dict]) -> LabeledSubset:
192192
"associated with a string")
193193
return LabeledSubset(label, set(subset), description)
194194

195-
def to_primitives(self) -> Dict[str, Union[List[str], str]]:
195+
def to_primitives(self, sorter: Optional[Mapping[str, Any]] = None) -> Dict[str, Union[List[str], str]]:
    """Convert to a representation used in yaml serialization

    Parameters
    ----------
    sorter : `Mapping` [ `str`, `object` ]
        Mapping from task or subset label to a comparable object that
        should be used to sort those labels.
    """
    members = list(self.subset)
    if sorter is not None:
        # Order the member labels by their sort keys for deterministic output.
        members.sort(key=sorter.__getitem__)
    result: Dict[str, Union[List[str], str]] = {"subset": members}
    if self.description is not None:
        result["description"] = self.description
    return result
202211

212+
@staticmethod
def expand_nested(original_subsets: Mapping[str, LabeledSubset]) -> Dict[str, LabeledSubset]:
    """Recursively expand subsets that contain the labels of other subsets.

    Parameters
    ----------
    original_subsets : `Mapping` [ `str`, `LabeledSubset` ]
        Mapping from string label to a labeled subset definition; keys must
        match the `label` attribute of the corresponding value.

    Returns
    -------
    new_subsets : `Dict` [ `str`, `LabeledSubset` ]
        Mapping of new labeled subset definitions.  Guaranteed to contain
        new instances (even when an old instance did not contain any subset
        labels) with only task labels.

    Raises
    ------
    RuntimeError
        Raised if a reference cycle is detected.
    """
    done: Dict[str, LabeledSubset] = {}
    in_progress: Set[str] = set()

    def expand_one_subset(subset_label: str) -> Set[str]:
        """Expand the subset with the given label if it has not already
        been done, returning its recursively-expanded (i.e. task label
        only) contents.

        This updates `done` in-place, and uses `in_progress` to detect
        cycles.
        """
        if (already_expanded := done.get(subset_label)) is not None:
            return already_expanded.subset
        if subset_label in in_progress:
            raise RuntimeError(f"Cycle detected in subset definitions involving {subset_label}.")
        in_progress.add(subset_label)
        original = original_subsets[subset_label]
        # Start with only the direct members that are task labels.
        new = LabeledSubset(
            label=subset_label,
            subset=set(original.subset - original_subsets.keys()),
            description=original.description,
        )
        for nested_subset_label in original.subset & original_subsets.keys():
            # BUG FIX: the original passed the LabeledSubset *object*
            # here, but expand_one_subset expects a string label (it is
            # used as a dict key); pass the nested label itself.
            new.subset.update(expand_one_subset(nested_subset_label))
        in_progress.remove(subset_label)
        done[subset_label] = new
        return new.subset

    for k in original_subsets.keys():
        expand_one_subset(k)
    return done
265+
203266

204267
@dataclass
205268
class ParametersIR:
@@ -918,31 +981,99 @@ def to_file(self, filename: str):
918981
"""
919982
self.write_to_uri(filename)
920983

921-
def write_to_uri(self, uri: Union[ButlerURI, str]):
984+
def write_to_uri(
    self,
    uri: Union[ButlerURI, str],
    *,
    expanded_tasks: Optional[Mapping[str, TaskIR]] = None,
):
    """Serialize this `PipelineIR` object into a yaml formatted string and
    write the output to a file at the specified uri.

    Parameters
    ----------
    uri: `str` or `ButlerURI`
        Location of document to write a `PipelineIR` object.
    expanded_tasks : `Mapping` [ `str`, `TaskIR` ], optional
        Mapping containing replacement `TaskIR` objects that capture the
        fully-expanded configuration rather than a set of overrides, in
        deterministic order.  When this is not `None`, the ``instrument``
        and ``parameters`` sections are not written and all other sections
        are sorted (using the order of the given tasks) to maximize the
        extent to which equivalent pipelines will be written identically.
    """
    with ButlerURI(uri).open("w") as stream:
        primitives = self.to_primitives(expanded_tasks=expanded_tasks)
        # Preserve insertion order of the primitives in the YAML output.
        yaml.dump(primitives, stream, sort_keys=False)
9321007

933-
def to_primitives(self) -> Dict[str, Any]:
1008+
def to_primitives(self, expanded_tasks: Optional[Mapping[str, "TaskIR"]] = None) -> Dict[str, Any]:
    """Convert to a representation used in yaml serialization

    Parameters
    ----------
    expanded_tasks : `Mapping` [ `str`, `TaskIR` ], optional
        Mapping containing replacement `TaskIR` objects that capture the
        fully-expanded configuration rather than a set of overrides, in
        deterministic order.  When this is not `None`, the ``instrument``
        and ``parameters`` sections are not written and all other sections
        are sorted (using the order of the given tasks) to maximize the
        extent to which equivalent pipelines will be written identically.

    Returns
    -------
    primitives : `dict`
        Dictionary that maps directly to the serialized YAML form.
    """
    accumulate: Dict[str, Any] = {"description": self.description}
    sorter: Optional[Dict[str, int]] = None
    if expanded_tasks is None:
        tasks = self.tasks
        labeled_subsets = self.labeled_subsets
        contracts = list(self.contracts)
        # Instrument and parameters are only included in non-expanded form,
        # because they'll have already been applied to configs in expanded
        # form.  It might be nice to include the instrument to check that
        # the expanded pipeline is only used on data from that instrument,
        # but we can't risk having the instrument's config overrides
        # applied (again) on top of other configs that should supersede
        # them.
        if self.instrument is not None:
            accumulate['instrument'] = self.instrument
        if self.parameters:
            accumulate['parameters'] = self.parameters.to_primitives()
    else:
        tasks = expanded_tasks
        # Make a dict that maps task labels to their position in the
        # (presumably ordered) mapping we were given.
        sorter = {label: n for n, label in enumerate(expanded_tasks.keys())}
        # Expand out subsets that reference other subsets, so that they
        # all only reference task labels directly.
        expanded_subsets = LabeledSubset.expand_nested(self.labeled_subsets)
        # Sort the labeled subsets themselves by the position of their
        # first task in the overall pipeline, followed by their own label
        # to break ties.  Note that we sort the tasks within them only
        # later when we convert them to primitives, because at this point
        # they hold `set` objects, not lists.
        #
        # BUG FIX: assign the result to ``labeled_subsets`` (the name read
        # below); the original bound ``subset_labels`` and then referenced
        # the never-assigned ``labeled_subsets``, raising
        # UnboundLocalError on this branch.
        labeled_subsets = {
            subset.label: subset
            for subset in sorted(
                expanded_subsets.values(),
                # ``default`` guards against an empty subset, for which
                # ``min`` over no elements would otherwise raise.
                key=lambda s: (min((sorter[t] for t in s.subset), default=-1), s.label)
            )
        }
        # Sort contracts by the string expression itself, just for as much
        # determinism as we can manage; can't help it if someone rewrites
        # an expression to something equivalent that changes the sort
        # order.
        contracts = sorted(self.contracts, key=lambda c: c.contract)

    # Get primitives for sections common to expanded and non-expanded
    # forms.
    accumulate['tasks'] = {m: t.to_primitives() for m, t in tasks.items()}
    if len(contracts) > 0:
        accumulate['contracts'] = [c.to_primitives() for c in contracts]
    if labeled_subsets:
        accumulate['subsets'] = {k: v.to_primitives(sorter) for k, v in labeled_subsets.items()}
    return accumulate
9471078

9481079
def __str__(self) -> str:

0 commit comments

Comments
 (0)