-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathNextFlowPipelineJob.java
More file actions
148 lines (125 loc) · 5.21 KB
/
NextFlowPipelineJob.java
File metadata and controls
148 lines (125 loc) · 5.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package org.labkey.nextflow.pipeline;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.json.JSONObject;
import org.labkey.api.data.Container;
import org.labkey.api.files.FileContentService;
import org.labkey.api.pipeline.ParamParser;
import org.labkey.api.pipeline.PipeRoot;
import org.labkey.api.pipeline.PipelineJobService;
import org.labkey.api.pipeline.PipelineService;
import org.labkey.api.pipeline.PipelineStatusFile;
import org.labkey.api.pipeline.TaskId;
import org.labkey.api.pipeline.TaskPipeline;
import org.labkey.api.pipeline.file.AbstractFileAnalysisJob;
import org.labkey.api.util.FileUtil;
import org.labkey.api.util.PageFlowUtil;
import org.labkey.api.util.StringUtilsLabKey;
import org.labkey.api.util.logging.LogHelper;
import org.labkey.api.view.ViewBackgroundInfo;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
/**
 * Pipeline job that runs a NextFlow analysis over a set of input files, driven by a
 * job-specific config file generated from a template (see {@link #createConfig}).
 * <p>
 * Lombok's {@code @Getter} generates the accessors for the instance fields, including
 * {@code getConfig()}, which is used by {@link #getJsonJobInfo()}.
 */
@Getter
public class NextFlowPipelineJob extends AbstractFileAnalysisJob
{
    protected static final Logger LOG = LogHelper.getLogger(NextFlowPipelineJob.class, "NextFlow jobs");

    /** The substituted, job-specific config file written into the job directory. */
    private Path config;

    /** No-arg constructor; leaves {@link #config} unset. */
    @SuppressWarnings("unused") // For serialization
    protected NextFlowPipelineJob()
    {}

    /**
     * Factory that prepares the on-disk layout for a new job and then constructs it.
     * <p>
     * A timestamped job directory is created next to the first input file, the template
     * config is copied into it with its placeholders substituted, and the job log file is
     * placed in that same directory.
     *
     * @param info background info supplying the container (and, via the superclass, the user)
     * @param root pipeline root the job runs under
     * @param templateConfig config template containing the placeholders handled by {@link #createConfig}
     * @param inputFiles files to analyze; must contain at least one file, and the first file
     *                   must have a parent directory
     * @return the newly constructed job
     * @throws IllegalArgumentException if {@code inputFiles} is null or empty, or if the first
     *                                  input file has no parent directory
     * @throws IOException if the job directory or the config file cannot be created
     */
    public static NextFlowPipelineJob create(ViewBackgroundInfo info, @NotNull PipeRoot root, Path templateConfig, List<Path> inputFiles) throws IOException
    {
        // Fail fast with a descriptive message instead of an opaque IndexOutOfBounds/NullPointer
        // exception, and before anything is written to disk.
        if (inputFiles == null || inputFiles.isEmpty())
        {
            throw new IllegalArgumentException("At least one input file is required to create a NextFlow job");
        }

        Path parentDir = inputFiles.get(0).getParent();
        if (parentDir == null)
        {
            throw new IllegalArgumentException("Unable to determine the parent directory of input file: " + inputFiles.get(0));
        }

        String jobName = FileUtil.makeFileNameWithTimestamp("NextFlow");
        Path jobDir = parentDir.resolve(jobName);
        Path log = jobDir.resolve(jobName + ".log");

        // The directory must exist before the substituted config is written into it
        FileUtil.createDirectory(jobDir);

        Path config = createConfig(templateConfig, parentDir, jobDir, info.getContainer());

        return new NextFlowPipelineJob(info, root, config, inputFiles, log);
    }

    /**
     * Constructs the job, records its config and log file, and logs a JSON summary of the job.
     *
     * @param info background info for the job
     * @param root pipeline root the job runs under
     * @param config job-specific config file; its file name is also passed to the superclass
     * @param inputFiles files to analyze
     * @param log log file for this job
     * @throws IOException declared to match the superclass constructor
     */
    public NextFlowPipelineJob(ViewBackgroundInfo info, @NotNull PipeRoot root, Path config, List<Path> inputFiles, Path log) throws IOException
    {
        // NOTE(review): the two trailing booleans are opaque from here — confirm their meaning
        // against the AbstractFileAnalysisJob constructor.
        super(new NextFlowProtocol(), NextFlowPipelineProvider.NAME, info, root, config.getFileName().toString(), config, inputFiles, false, false);
        this.config = config;
        setLogFile(log);
        // NOTE(review): if no status file exists for this job yet, "runName" is logged as "Unknown".
        LOG.info("NextFlow job queued: {}", getJsonJobInfo());
    }

    /**
     * Builds a JSON summary of this job for logging.
     *
     * @return an object with the keys {@code user} (the user's email), {@code container}
     *         (container path), {@code filePath} (directory holding the log file),
     *         {@code runName} (see {@link #getNextFlowRunName()}) and {@code configFile}
     *         (file name of the config)
     */
    protected JSONObject getJsonJobInfo()
    {
        JSONObject result = new JSONObject();
        result.put("user", getUser().getEmail());
        result.put("container", getContainer().getPath());
        result.put("filePath", getLogFilePath().getParent().toString());
        result.put("runName", getNextFlowRunName());
        result.put("configFile", getConfig().getFileName().toString());
        return result;
    }

    /**
     * Derives the run name from this job's pipeline status file.
     *
     * @return {@code "LabKeyJob"} followed by the status file's row id, or {@code "Unknown"}
     *         when no status file is found for this job's GUID
     */
    protected String getNextFlowRunName()
    {
        PipelineStatusFile file = PipelineService.get().getStatusFile(getJobGUID());
        return file == null ? "Unknown" : ("LabKeyJob" + file.getRowId());
    }

    /**
     * @return a new parameter parser obtained from the {@link PipelineJobService}
     */
    @Override
    public ParamParser getInputParameters()
    {
        return PipelineJobService.get().createParamParser();
    }

    /**
     * Take the template config file and substitute in the values for this job.
     * <p>
     * Two placeholders are replaced, each with a full assignment line whose value is a WebDAV
     * URL with any trailing slashes stripped:
     * <ul>
     *     <li>{@code ${quant_spectra_dir}} - URL of {@code parentDir}</li>
     *     <li>{@code ${panorama.upload_url}} - URL of {@code jobDir}</li>
     * </ul>
     *
     * @param configTemplate template file to read
     * @param parentDir directory whose WebDAV URL fills in {@code quant_spectra_dir}
     * @param jobDir existing job directory; receives the substituted file and its WebDAV URL
     *               fills in {@code panorama.upload_url}
     * @param container container used to resolve the WebDAV URLs
     * @return path of the substituted config, which has the template's file name and lives in {@code jobDir}
     * @throws IOException if the template cannot be read or the substituted file cannot be written
     */
    private static Path createConfig(Path configTemplate, Path parentDir, Path jobDir, Container container) throws IOException
    {
        String template;
        try (InputStream in = Files.newInputStream(configTemplate))
        {
            template = PageFlowUtil.getStreamContentsAsString(in);
        }

        String webdavUrl = FileContentService.get().getWebDavUrl(parentDir, container, FileContentService.PathType.full).toString();
        webdavUrl = StringUtils.stripEnd(webdavUrl, "/");
        String substitutedContent = template.replace("${quant_spectra_dir}", "quant_spectra_dir = '" + webdavUrl + "'");

        String uploadUrl = FileContentService.get().getWebDavUrl(jobDir, container, FileContentService.PathType.full).toString();
        uploadUrl = StringUtils.stripEnd(uploadUrl, "/");
        substitutedContent = substitutedContent.replace("${panorama.upload_url}", "panorama.upload_url = '" + uploadUrl + "'");

        // Keep the template's file name so the job's config is recognizable in the job directory
        Path substitutedFile = jobDir.resolve(configTemplate.getFileName());
        try (BufferedWriter writer = Files.newBufferedWriter(substitutedFile))
        {
            writer.write(substitutedContent);
        }
        return substitutedFile;
    }

    /**
     * @return a description naming the number of input files and the config file name
     */
    @Override
    public String getDescription()
    {
        return "NextFlow analysis of " + StringUtilsLabKey.pluralize(getInputFilePaths().size(), "file") + " using config: " + config.getFileName();
    }

    /**
     * @return the task pipeline registered under {@link #getTaskPipelineId()}
     */
    @Override
    public TaskPipeline<?> getTaskPipeline()
    {
        return PipelineJobService.get().getTaskPipeline(getTaskPipelineId());
    }

    /**
     * @return a task id keyed on this job class
     */
    @Override
    public TaskId getTaskPipelineId()
    {
        return new TaskId(NextFlowPipelineJob.class);
    }

    /**
     * Not supported by this job type.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public AbstractFileAnalysisJob createSingleFileJob(File file)
    {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this job type.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public File findInputFile(String name)
    {
        throw new UnsupportedOperationException();
    }

    /**
     * @return always {@code null}; this job does not resolve output files by name
     */
    @Override
    public File findOutputFile(String name)
    {
        return null;
    }
}