-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathrun.py
More file actions
351 lines (285 loc) · 15.3 KB
/
run.py
File metadata and controls
351 lines (285 loc) · 15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
"""
Runs a pathogen workflow in a Nextstrain runtime with config and input from an
analysis directory and outputs written to that same directory.
This command focuses on the routine running of existing pathogen workflows
(mainly provided by Nextstrain) using your own configuration, data, and other
supported customizations. Pathogens are initially set up using `nextstrain
setup` and can be updated over time as desired using `nextstrain update`.
Multiple versions of a pathogen may be set up and run independently without
conflict, allowing for comparisons of output across versions. The same
pathogen workflow may also be concurrently run multiple times with separate
analysis directories (i.e. different configs, input data, etc.) without
conflict, allowing for independent outputs and analyses.
Compared to `nextstrain build`, this command is a higher-level interface to
running pathogen workflows that does not require knowledge of Git or management
of pathogen repositories and source code. For now, the `nextstrain build`
command remains more suitable for active authorship and development of
workflows.
All Nextstrain runtimes are supported. For AWS Batch, all runs will detach
after submission and `nextstrain build` must be used to further monitor or
manage the run and download results after completion.
"""
import os.path
from inspect import cleandoc
from shlex import quote as shquote
from textwrap import dedent
from .. import runner
from ..argparse import add_extended_help_flags, MkDirectoryPath, SKIP_AUTO_DEFAULT_IN_HELP
from ..debug import DEBUGGING, debug
from ..errors import UserError
from ..pathogens import PathogenVersion, UnmanagedPathogen
from ..runner import aws_batch, docker, singularity
from ..util import byte_quantity, split_image_name, warn
from ..volume import NamedVolume
from . import build
def register_parser(subparser):
    """
    %(prog)s [options] <pathogen-name>[@<version>]|<pathogen-path> <workflow-name> <analysis-directory> [<target> [<target> [...]]]
    %(prog)s --help
    """
    # NOTE: The docstring above is an argparse-style usage template (note the
    # %(prog)s placeholders), not prose documentation; it is presumably
    # consumed elsewhere as this command's usage text, so keep it usage-only.
    #
    # Builds and returns the parser for the `run` subcommand on *subparser*
    # (presumably an argparse subparsers action).  add_help is disabled here
    # because --help and --help-all are added by add_extended_help_flags()
    # further below.
    parser = subparser.add_parser("run", help = "Run pathogen workflow", add_help = False)

    # Positional parameters

    # Separator(s) that mark the pathogen argument as a local path: just the
    # primary separator, or both when os.path.altsep is set (e.g. on Windows).
    path_sep = " or ".join(sorted({os.path.sep, os.path.altsep or os.path.sep}))

    parser.add_argument(
        "pathogen",
        metavar = "<pathogen-name>[@<version>]|<pathogen-path>",
        help = cleandoc(f"""
            The name (and optionally, version) of a previously set up pathogen.
            See :command-reference:`nextstrain setup`. If no version is
            specified, then the default version (if any) will be used.
            Alternatively, the local path to a directory that is a pathogen
            repository. For this case to be recognized as such, the path must
            contain a separator ({path_sep}) or consist entirely of the current
            directory ({os.path.curdir}) or parent directory ({os.path.pardir}) specifier.
            Required.
            """))

    parser.add_argument(
        "workflow",
        metavar = "<workflow-name>",
        help = cleandoc("""
            The name of a workflow for the given pathogen, e.g. typically
            ``ingest``, ``phylogenetic``, or ``nextclade``.
            Available workflows may vary per pathogen (and possibly between
            pathogen version). Some pathogens may provide multiple variants or
            base configurations of a top-level workflow, e.g. as in
            ``phylogenetic/mpxv`` and ``phylogenetic/hmpxv1``.
            Run ``nextstrain version --pathogens`` to see a list of registered
            workflows per pathogen version. If the pathogen does not have
            registered workflows, then refer to the pathogen's own documentation
            for valid workflow names.
            Workflow names conventionally correspond directly to directory
            paths in the pathogen source, but this may not always be the case.
            Required.
            """))

    # MkDirectoryPath() is the argument type; per the help text below, it
    # creates the directory if it is missing but its parent exists.
    parser.add_argument(
        "analysis_directory",
        metavar = "<analysis-directory>",
        type = MkDirectoryPath(),
        help = cleandoc("""
            The path to your analysis directory. The workflow uses this as its
            working directory for all local inputs and outputs, including
            config files, input data files, resulting output data files, log
            files, etc.
            We recommend keeping your config files and static input files (e.g.
            reference sequences, inclusion/exclusion lists, annotations, etc.)
            in a version control system, such as Git, so you can keep track of
            changes over time and recover previous versions. When using
            version control, dynamic inputs (e.g. downloaded input files) and
            outputs (e.g. resulting data files, log files, etc.) should
            generally be marked as ignored/excluded from tracking, such as via
            :file:`.gitignore` for Git.
            An empty directory will be automatically created if the given path
            does not exist but its parent directory does.
            Required.
            """))

    parser.add_argument(
        "targets",
        metavar = "<target>",
        nargs = "*",
        help = cleandoc("""
            One or more workflow targets. A target is either a file path
            (relative to :option:`<analysis-directory>`) produced by the
            workflow or the name of a workflow rule or step.
            Available targets will vary per pathogen (and between versions of
            pathogens). Refer to the pathogen's own documentation for valid
            targets.
            Optional.
            """))

    parser.add_argument(
        "--force",
        help = "Force a rerun of the whole workflow even if everything seems up-to-date.",
        action = "store_true")

    # XXX TODO: Consider if and how to share argument definitions with `build`?
    # Starting with copying for now, but the expectation is they should be
    # aligned as much as possible (at least where they overlap).
    #   -trs, 1 Nov 2024
    parser.add_argument(
        "--cpus",
        help = "Number of CPUs/cores/threads/jobs to utilize at once. "
               "Limits containerized (Docker, AWS Batch) workflow runs to this amount. "
               "Informs Snakemake's resource scheduler when applicable. "
               "Informs the AWS Batch instance size selection. "
               "By default, no constraints are placed on how many CPUs are used by a workflow run; "
               "workflow runs may use all that are available if they're able to.",
        metavar = "<count>",
        type = int)

    # byte_quantity parses the human-readable units listed in the help text.
    parser.add_argument(
        "--memory",
        help = "Amount of memory to make available to the workflow run. "
               "Units of b, kb, mb, gb, kib, mib, gib are supported. "
               "Limits containerized (Docker, AWS Batch) workflow runs to this amount. "
               "Informs Snakemake's resource scheduler when applicable. "
               "Informs the AWS Batch instance size selection. ",
        metavar = "<quantity>",
        type = byte_quantity)

    # XXX TODO: AWS Batch support for `nextstrain run`.  Include options like
    # --detach, --detach-on-interrupt, --attach, --cancel, etc?  For now, only
    # support detached Batch builds and kick the can to `build` for further
    # monitoring/management.  Maybe we leave it that way?
    #   -trs, 1 Nov 2024 & 28 Feb 2025
    parser.add_argument(
        "--exclude-from-upload",
        metavar = "<pattern>",
        help = dedent(f"""\
            Exclude files matching ``<pattern>`` from being uploaded as part of
            the remote build. Shell-style advanced globbing is supported, but
            be sure to escape wildcards or quote the whole pattern so your
            shell doesn't expand them. May be passed more than once.
            Currently only supported when also using :option:`--aws-batch`.
            Default is to upload the entire pathogen build directory (except
            for some ancillary files which are always excluded).
            Note that files excluded from upload may still be downloaded from
            the remote build, e.g. if they're created by it, and if downloaded
            will overwrite the local files. When attaching to the build, use
            :option:`nextstrain build --no-download` to avoid downloading any
            files or :option:`nextstrain build --exclude-from-download` to
            avoid downloading specific files.
            Besides basic glob features like single-part wildcards (``*``),
            character classes (``[…]``), and brace expansion (``{{…, …}}``),
            several advanced globbing features are also supported: multi-part
            wildcards (``**``), extended globbing (``@(…)``, ``+(…)``, etc.),
            and negation (``!…``).
            Patterns should be relative to the build directory.
            {SKIP_AUTO_DEFAULT_IN_HELP}
            """),
        action = "append")

    # Support --help and --help-all
    add_extended_help_flags(parser)

    # Register runner flags and arguments
    #
    # Note that we intentionally do not pass "..." (Ellipsis) as an element in
    # "exec" because this `nextstrain run` command, unlike `nextstrain build`,
    # is intended to fully encapsulate the details of Snakemake's invocation in
    # order to present a simplified, stable interface.
    #   -trs, 6 Feb 2025
    runner.register_runners(
        parser,
        exec = ["snakemake"])  # Other default exec args are added in run()

    return parser
def run(opts):
    """
    Resolve the requested pathogen workflow and execute it with Snakemake.

    ``opts.pathogen`` (a set-up pathogen name/version, or a local pathogen
    directory path) and ``opts.workflow`` are resolved to a local workflow
    directory that must contain a ``Snakefile``.  The pathogen directory and
    the analysis directory are attached to ``opts.volumes``, Snakemake
    arguments are appended to ``opts.default_exec_args``, and execution is
    delegated to :func:`runner.run` with the analysis directory as the
    working volume.  For AWS Batch the run is always submitted detached.

    Raises :class:`UserError` when AWS Batch is selected with a runtime image
    too old for overlays, or when the workflow directory or its ``Snakefile``
    cannot be found.
    """
    build.assert_overlay_volumes_support(opts)

    selected_runner = opts.__runner__

    # AWS Batch runs of this command require overlay support in the image.
    if selected_runner is aws_batch and not docker.image_supports(docker.IMAGE_FEATURE.aws_batch_overlays, opts.image):
        raise UserError(f"""
            The Nextstrain runtime image version in use
            {opts.image}
            is too old to support `nextstrain run` with AWS Batch.
            Please update the runtime image to at least version
            {split_image_name(opts.image)[0]}:{docker.IMAGE_FEATURE.aws_batch_overlays.value}
            using `nextstrain update docker`. Alternatively, use a runtime
            other than AWS Batch.
            """)

    # Prefer treating the argument as a local (unmanaged) pathogen directory;
    # if UnmanagedPathogen rejects it with ValueError, fall back to a managed
    # pathogen version.
    try:
        pathogen = UnmanagedPathogen(opts.pathogen)
    except ValueError:
        debug(f"Treating {opts.pathogen!r} as managed pathogen version")
        pathogen = PathogenVersion(opts.pathogen)
    else:
        debug(f"Treating {opts.pathogen!r} as unmanaged pathogen directory")

    # Unregistered or not-marked-compatible workflows only produce a warning;
    # the run is still attempted.
    doubt = None
    if opts.workflow not in pathogen.registered_workflows():
        doubt = f"The {opts.workflow!r} workflow is not registered by pathogen {opts.pathogen!r}!"
    elif opts.workflow not in pathogen.compatible_workflows("nextstrain run"):
        doubt = (f"The {opts.workflow!r} workflow is registered by pathogen {opts.pathogen!r}\n"
                 "but not marked as compatible with `nextstrain run`!")

    if doubt is not None:
        warn(doubt + "\nTrying to run it anyways (but it likely won't work)…")
        warn()

    workflow_directory = pathogen.workflow_path(opts.workflow)
    has_snakefile = workflow_directory.is_dir() and (workflow_directory / "Snakefile").is_file()

    if not has_snakefile:
        # Only reveal the exact path searched when debugging.
        where = f"in {str(workflow_directory)!r}" if DEBUGGING else "locally"
        if isinstance(pathogen, UnmanagedPathogen):
            raise UserError(f"""
                No {opts.workflow!r} workflow for pathogen {opts.pathogen!r} found {where}.
                """)
        else:
            raise UserError(f"""
                No {opts.workflow!r} workflow for pathogen {opts.pathogen!r} found {where}.
                Maybe you need to update to a newer version of the pathogen?
                Hint: to update the pathogen, run `nextstrain update {shquote(pathogen.name)}`.
                """)

    # Volumes: the pathogen directory (repo), the workflow directory inside
    # it, and the user's analysis directory, which is the working directory.
    pathogen_volume, workflow_volume = build.pathogen_volumes(workflow_directory, name = "pathogen")
    build_volume = NamedVolume("build", opts.analysis_directory)

    # For containerized runtimes (e.g. Docker, Singularity, and AWS Batch).
    opts.volumes.extend([pathogen_volume, build_volume])

    print(f"Running the {opts.workflow!r} workflow for pathogen {pathogen}")

    # Workdir will be the analysis volume (/nextstrain/build in a
    # containerized runtime), so the Snakefile must be located explicitly:
    # via its mount point in containers, otherwise its resolved host path.
    if selected_runner in {docker, singularity, aws_batch}:
        workflow_location = docker.mount_point(workflow_volume)
    else:
        workflow_location = workflow_volume.src.resolve(strict = True)

    snakemake_args = [
        # Useful to see what's going on; see also 08ffc925.
        "--printshellcmds",

        # In our experience,¹ it's rarely useful to fail on incomplete outputs
        # (Snakemake's default behaviour) instead of automatically
        # regenerating them.
        #
        #   ¹ <https://discussion.nextstrain.org/t/snakemake-throwing-incompletefilesexception-when-using-forceall/1397/4>
        "--rerun-incomplete",

        # Pin down rerun triggers so they don't drift over time as Snakemake
        # changes the defaults.  In the past, changes to this have been
        # confusing/caused errors.
        "--rerun-triggers", "code", "input", "mtime", "params", "software-env",
    ]

    if opts.force:
        snakemake_args.append("--forceall")

    snakemake_args += [
        # Always use Snakemake's cwd as the workflow's workdir, overriding any
        # "workdir:" directive, so a non-compatible workflow doesn't write
        # into the pathogen/workflow source directories.
        "--directory=.",

        f"--snakefile={workflow_location}/Snakefile",

        # Snakemake requires --cores as of 5.11; default to all cores so runs
        # are fast by default (see the similar comment in
        # nextstrain/cli/command/build.py).
        #   -trs, 1 Nov 2024
        f"--cores={opts.cpus or 'all'}",
    ]

    if opts.memory:
        # Named MB but is really MiB, so convert our count of bytes to MiB.
        snakemake_args.append("--resources=mem_mb=%d" % (opts.memory // 1024**2))

    snakemake_args += ["--", *opts.targets]

    opts.default_exec_args += snakemake_args

    # XXX TODO: AWS Batch support for `nextstrain run`.  For now, only support
    # detached Batch builds and kick the can to `build` for further
    # monitoring/management.  In the future, maybe we'll support the full set
    # of AWS Batch options (see related comment in register_parser()).
    #   -trs, 28 Feb 2025
    if selected_runner is aws_batch:
        opts.detach = True
        opts.attach = None
        opts.cancel = None

    return runner.run(opts, working_volume = build_volume, cpus = opts.cpus, memory = opts.memory)