forked from BimberLab/DiscvrLabKeyModules
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathParagraphStep.java
More file actions
317 lines (274 loc) · 14.4 KB
/
ParagraphStep.java
File metadata and controls
317 lines (274 loc) · 14.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
package org.labkey.sequenceanalysis.run.alignment;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMReadGroupRecord;
import htsjdk.samtools.SamReader;
import htsjdk.samtools.SamReaderFactory;
import org.apache.commons.io.FileUtils;
import org.json.JSONObject;
import org.labkey.api.module.ModuleLoader;
import org.labkey.api.pipeline.PipelineJob;
import org.labkey.api.pipeline.PipelineJobException;
import org.labkey.api.pipeline.RecordedAction;
import org.labkey.api.sequenceanalysis.SequenceAnalysisService;
import org.labkey.api.sequenceanalysis.SequenceOutputFile;
import org.labkey.api.sequenceanalysis.pipeline.AbstractParameterizedOutputHandler;
import org.labkey.api.sequenceanalysis.pipeline.SequenceAnalysisJobSupport;
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
import org.labkey.api.sequenceanalysis.pipeline.SequencePipelineService;
import org.labkey.api.sequenceanalysis.pipeline.ToolParameterDescriptor;
import org.labkey.api.sequenceanalysis.run.DockerWrapper;
import org.labkey.api.sequenceanalysis.run.SelectVariantsWrapper;
import org.labkey.api.sequenceanalysis.run.SimpleScriptWrapper;
import org.labkey.api.util.FileUtil;
import org.labkey.api.writer.PrintWriters;
import org.labkey.sequenceanalysis.SequenceAnalysisModule;
import org.labkey.sequenceanalysis.util.SequenceUtil;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class ParagraphStep extends AbstractParameterizedOutputHandler<SequenceOutputHandler.SequenceOutputProcessor>
{
public ParagraphStep()
{
super(ModuleLoader.getInstance().getModule(SequenceAnalysisModule.class), "Paragraph SV Genotyping", "This will run paraGRAPH on one or more BAM files to genotype SVs", null, Arrays.asList(
ToolParameterDescriptor.createExpDataParam("svVCF", "Input VCF", "This is the DataId of the VCF containing the SVs to genotype", "ldk-expdatafield", new JSONObject()
{{
put("allowBlank", false);
}}, null),
ToolParameterDescriptor.create("doBndSubset", "Filter Input VCF", "If selected, prior to running SelectVariants will be run to remove BNDs sites with POS<150 and symbolic INS without ALT sequence", "checkbox", new JSONObject(){{
put("checked", false);
}}, false),
ToolParameterDescriptor.create("useOutputFileContainer", "Submit to Source File Workbook", "If checked, each job will be submitted to the same workbook as the input file, as opposed to submitting all jobs to the same workbook. This is primarily useful if submitting a large batch of files to process separately. This only applies if 'Run Separately' is selected.", "checkbox", new JSONObject(){{
put("checked", false);
}}, false),
ToolParameterDescriptor.create("verbose", "Verbose Logging", "If checked, --verbose will be passed to paragraph to increase logging", "checkbox", new JSONObject(){{
put("checked", false);
}}, false),
ToolParameterDescriptor.create("useLocalScratch", "User local scratch", "If checked, the tool will write the intermediate temp files to a folder in the working directory, rather than the job's tempDir. This can make debugging easier.", "checkbox", new JSONObject(){{
put("checked", false);
}}, false),
ToolParameterDescriptor.create("retrieveReferenceSeq", "Retrieve Reference Sequence", "If checked, --debug will be passed to paragraph to increase logging", "checkbox", new JSONObject(){{
put("checked", false);
}}, false)
));
}
@Override
public boolean doSplitJobs()
{
return true;
}
@Override
public boolean canProcess(SequenceOutputFile o)
{
return o.getFile() != null && o.getFile().exists() && SequenceUtil.FILETYPE.bamOrCram.getFileType().isType(o.getFile());
}
@Override
public boolean doRunRemote()
{
return true;
}
@Override
public boolean doRunLocal()
{
return false;
}
@Override
public SequenceOutputProcessor getProcessor()
{
return new Processor();
}
public static class Processor implements SequenceOutputProcessor
{
@Override
public void processFilesOnWebserver(PipelineJob job, SequenceAnalysisJobSupport support, List<SequenceOutputFile> inputFiles, JSONObject params, File outputDir, List<RecordedAction> actions, List<SequenceOutputFile> outputsToCreate) throws UnsupportedOperationException, PipelineJobException
{
}
@Override
public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext ctx) throws UnsupportedOperationException, PipelineJobException
{
int svVcfId = ctx.getParams().optInt("svVCF", 0);
if (svVcfId == 0)
{
throw new PipelineJobException("svVCF param was null");
}
File svVcf = ctx.getSequenceSupport().getCachedData(svVcfId);
if (svVcf == null)
{
throw new PipelineJobException("File not found for ID: " + svVcfId);
}
else if (!svVcf.exists())
{
throw new PipelineJobException("Missing file: " + svVcf.getPath());
}
boolean doBndSubset = ctx.getParams().optBoolean("doBndSubset", false);
if (doBndSubset)
{
File vcfNoBnd = new File(ctx.getOutputDir(), SequenceAnalysisService.get().getUnzippedBaseName(svVcf.getName()) + "pgSubset.vcf.gz");
File vcfNoBndIdx = new File(vcfNoBnd.getPath() + ".tbi");
if (vcfNoBndIdx.exists())
{
ctx.getLogger().debug("Index exists, will no repeat VCF subset");
}
else
{
SelectVariantsWrapper svw = new SelectVariantsWrapper(ctx.getLogger());
List<String> selectArgs = new ArrayList<>();
selectArgs.add("-select");
selectArgs.add("SVTYPE != 'BND' && SVTYPE != 'DUP' && POS > 150 && !(vc.hasAttribute('SVTYPE') && vc.getAttribute('SVTYPE') == 'INS' && vc.hasSymbolicAlleles() && !vc.hasAttribute('SEQ'))");
selectArgs.add("--exclude-filtered");
selectArgs.add("--exclude-filtered");
selectArgs.add("--sites-only-vcf-output");
svw.execute(ctx.getSequenceSupport().getCachedGenome(inputFiles.get(0).getLibrary_id()).getWorkingFastaFile(), svVcf, vcfNoBnd, selectArgs);
ctx.getFileManager().addIntermediateFile(vcfNoBnd);
ctx.getFileManager().addIntermediateFile(vcfNoBndIdx);
svVcf = vcfNoBnd;
}
}
Integer threads = SequencePipelineService.get().getMaxThreads(ctx.getLogger());
for (SequenceOutputFile so : inputFiles)
{
List<String> depthArgs = new ArrayList<>();
depthArgs.add("idxdepth");
depthArgs.add("-b");
depthArgs.add(so.getFile().getPath());
File coverageJson = new File(ctx.getWorkingDirectory(), "coverage.json");
depthArgs.add("-o");
depthArgs.add(coverageJson.getPath());
depthArgs.add("-r");
depthArgs.add(ctx.getSequenceSupport().getCachedGenome(so.getLibrary_id()).getWorkingFastaFile().getPath());
if (threads != null)
{
depthArgs.add("--threads");
depthArgs.add(threads.toString());
}
File doneFile = new File(ctx.getWorkingDirectory(), "idxdepth.done");
ctx.getFileManager().addIntermediateFile(doneFile);
if (doneFile.exists())
{
ctx.getLogger().info("idxdepth already performed, skipping");
}
else
{
new SimpleScriptWrapper(ctx.getLogger()).execute(depthArgs);
try
{
FileUtils.touch(doneFile);
}
catch (IOException e)
{
throw new PipelineJobException(e);
}
}
if (!coverageJson.exists())
{
throw new PipelineJobException("Missing file: " + coverageJson.getPath());
}
ctx.getFileManager().addIntermediateFile(coverageJson);
// Should produce a simple text file:
// id path depth read length
// IB18 ../IB18.cram 29.77 150
File coverageFile = new File(ctx.getWorkingDirectory(), "coverage.txt");
String rgId = null;
try (PrintWriter writer = PrintWriters.getPrintWriter(coverageFile); SamReader reader = SamReaderFactory.makeDefault().open(so.getFile()))
{
SAMFileHeader header = reader.getFileHeader();
if (header.getReadGroups().isEmpty())
{
throw new PipelineJobException("No read groups found in input BAM");
}
Set<String> uniqueSamples = header.getReadGroups().stream().map(SAMReadGroupRecord::getSample).collect(Collectors.toSet());
if (uniqueSamples.size() > 1)
{
throw new PipelineJobException("Readgroups contained more than one unique sample");
}
rgId = uniqueSamples.iterator().next();
JSONObject json = new JSONObject(FileUtils.readFileToString(coverageJson, Charset.defaultCharset()));
writer.println("id\tpath\tdepth\tread length");
double depth = json.getJSONObject("autosome").getDouble("depth");
if (depth <= 0)
{
throw new PipelineJobException("Depth was zero for file: " + so.getFile().getPath());
}
double readLength = json.getInt("read_length");
writer.println(rgId + "\t" + so.getFile().getPath() + "\t" + depth + "\t" + readLength);
}
catch (IOException e)
{
throw new PipelineJobException(e);
}
ctx.getFileManager().addIntermediateFile(coverageFile);
DockerWrapper dockerWrapper = new DockerWrapper("ghcr.io/bimberlabinternal/paragraph:latest", ctx.getLogger(), ctx);
dockerWrapper.setTmpDir(new File(SequencePipelineService.get().getJavaTempDir()));
List<String> paragraphArgs = new ArrayList<>();
paragraphArgs.add("/opt/paragraph/bin/multigrmpy.py");
File paragraphOutDir = new File(ctx.getWorkingDirectory(), FileUtil.getBaseName(so.getFile()));
paragraphArgs.add("-o");
paragraphArgs.add(paragraphOutDir.getPath());
File localScratchDir = new File(ctx.getOutputDir(), "pgScratch");
if (localScratchDir.exists())
{
try
{
FileUtils.deleteDirectory(localScratchDir);
}
catch (IOException e)
{
throw new PipelineJobException(e);
}
}
boolean useLocalScratch = ctx.getParams().optBoolean("useLocalScratch", false);
if (useLocalScratch)
{
paragraphArgs.add("--scratch-dir");
paragraphArgs.add(localScratchDir.getPath());
ctx.getFileManager().addIntermediateFile(localScratchDir);
}
paragraphArgs.add("-i");
paragraphArgs.add(svVcf.getPath());
paragraphArgs.add("-m");
paragraphArgs.add(coverageFile.getPath());
if (ctx.getParams().optBoolean("verbose", false))
{
paragraphArgs.add("--verbose");
}
paragraphArgs.add("-r");
File genomeFasta = ctx.getSequenceSupport().getCachedGenome(so.getLibrary_id()).getWorkingFastaFile();
paragraphArgs.add(genomeFasta.getPath());
if (ctx.getParams().optBoolean("retrieveReferenceSeq", false))
{
paragraphArgs.add("--retrieve-reference-sequence");
}
if (threads != null)
{
paragraphArgs.add("--threads");
paragraphArgs.add(threads.toString());
}
dockerWrapper.executeWithDocker(paragraphArgs, ctx.getWorkingDirectory(), ctx.getFileManager(), Arrays.asList(so.getFile(), genomeFasta, svVcf));
File genotypes = new File(paragraphOutDir, "genotypes.vcf.gz");
if (!genotypes.exists())
{
throw new PipelineJobException("Missing file: " + genotypes.getPath());
}
try
{
SequenceAnalysisService.get().ensureVcfIndex(genotypes, ctx.getLogger());
}
catch (IOException e)
{
throw new PipelineJobException(e);
}
ctx.getFileManager().addSequenceOutput(genotypes, "paraGRAPH Genotypes: " + so.getName(), "paraGRAPH Genoypes", so.getReadset(), null, so.getLibrary_id(), "Input VCF: " + svVcf.getName() + " (" + svVcfId + ")");
ctx.getFileManager().addIntermediateFile(new File(paragraphOutDir, "variants.json.gz"));
ctx.getFileManager().addIntermediateFile(new File(paragraphOutDir, "variants.vcf.gz"));
ctx.getFileManager().addIntermediateFile(new File(paragraphOutDir, "genotypes.json.gz"));
ctx.getFileManager().addIntermediateFile(new File(paragraphOutDir, "grmpy.log"));
}
}
}
}