forked from BimberLab/DiscvrLabKeyModules
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathAlignerIndexUtil.java
More file actions
192 lines (167 loc) · 8.26 KB
/
AlignerIndexUtil.java
File metadata and controls
192 lines (167 loc) · 8.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package org.labkey.api.sequenceanalysis.pipeline;
import org.apache.commons.io.FileUtils;
import org.jetbrains.annotations.Nullable;
import org.labkey.api.data.ConvertHelper;
import org.labkey.api.pipeline.PipelineJobException;
import org.labkey.api.pipeline.WorkDirectory;
import java.io.File;
import java.io.IOException;
/**
 * Helpers for locating, reusing, copying and caching aligner index directories that are stored
 * next to a {@link ReferenceGenome}'s FASTA file.
 * <p>
 * For non-temporary genomes, indexes are stored under an {@value #INDEX_DIR} sub-directory beside the FASTA;
 * for temporary genomes the index directory sits directly beside the FASTA.
 * <p>
 * Created by bimber on 9/6/2014.
 */
public class AlignerIndexUtil
{
    /** Sub-directory (beside a non-temporary genome's FASTA) that holds cached aligner indexes. */
    public static final String INDEX_DIR = "alignerIndexes";

    /**
     * Job parameter controlling whether a cached index is copied into the job's work directory.
     * When the parameter is absent, the index is copied.
     */
    public static final String COPY_LOCALLY = "copyGenomeLocally";

    /** Suffix of the marker file that exists while a job is writing an index into the shared cache. */
    private static final String LOCK_SUFFIX = ".copyLock";

    /**
     * Checks whether a cached index named {@code name} exists for {@code genome}, without copying anything.
     *
     * @param ctx    pipeline context, used for logging
     * @param name   name of the index sub-directory
     * @param genome the reference genome; if null, {@code false} is returned
     * @return true if the cached index directory exists
     * @throws PipelineJobException if another job is currently saving this index into the cache
     */
    public static boolean hasCachedIndex(PipelineContext ctx, String name, ReferenceGenome genome) throws PipelineJobException
    {
        ctx.getLogger().debug("checking whether cached index exists: " + name);
        return verifyOrCreateCachedIndex(ctx, null, null, name, name, genome, false);
    }

    /**
     * Equivalent to {@link #copyIndexIfExists(PipelineContext, AlignmentOutputImpl, String, String, ReferenceGenome)}
     * using {@code name} for both the local and the webserver index name.
     */
    public static boolean copyIndexIfExists(PipelineContext ctx, AlignmentOutputImpl output, String name, ReferenceGenome genome) throws PipelineJobException
    {
        return copyIndexIfExists(ctx, output, name, name, genome);
    }

    /**
     * Equivalent to {@link #copyIndexIfExists(PipelineContext, AlignmentOutputImpl, String, String, ReferenceGenome, boolean)}
     * without forcing a local copy.
     */
    public static boolean copyIndexIfExists(PipelineContext ctx, AlignmentOutputImpl output, String localName, String webserverName, ReferenceGenome genome) throws PipelineJobException
    {
        return copyIndexIfExists(ctx, output, localName, webserverName, genome, false);
    }

    /**
     * If a cached index exists for {@code genome}, optionally copies it into the "Shared" folder of the job's
     * work directory.
     *
     * @param ctx            pipeline context; its work directory must not be null
     * @param output         if non-null and a local copy is made, the copy is registered on it as a deferred-delete intermediate file
     * @param localName      name of the index directory inside the work directory's "Shared" folder
     * @param webserverName  name of the cached index directory beside the genome's source FASTA
     * @param genome         the reference genome; if null, {@code false} is returned
     * @param forceCopyLocal copy even if the {@link #COPY_LOCALLY} job parameter disables copying
     * @return true if a usable cached index was found
     * @throws PipelineJobException if the work directory is null, another job is saving this index, or copying fails
     */
    public static boolean copyIndexIfExists(PipelineContext ctx, AlignmentOutputImpl output, String localName, String webserverName, ReferenceGenome genome, boolean forceCopyLocal) throws PipelineJobException
    {
        ctx.getLogger().debug("checking if index exists: " + localName + ". copy local: " + forceCopyLocal);
        if (ctx.getWorkDir() == null)
        {
            throw new PipelineJobException("PipelineContext.getWorkDir() is null");
        }

        return verifyOrCreateCachedIndex(ctx, ctx.getWorkDir(), output, localName, webserverName, genome, forceCopyLocal);
    }

    /**
     * Equivalent to {@link #getIndexDir(ReferenceGenome, String, boolean)} resolved against the working FASTA.
     */
    public static File getIndexDir(ReferenceGenome genome, String name)
    {
        return getIndexDir(genome, name, false);
    }

    /**
     * Resolves the index directory named {@code name} for {@code genome}.
     *
     * @param useWebserverDir if true, resolve relative to the source FASTA; otherwise relative to the working FASTA
     * @return {@code <fastaDir>/alignerIndexes/<name>} for non-temporary genomes, {@code <fastaDir>/<name>} for temporary ones
     */
    public static File getIndexDir(ReferenceGenome genome, String name, boolean useWebserverDir)
    {
        File fasta = useWebserverDir ? genome.getSourceFastaFile() : genome.getWorkingFastaFile();
        String relativePath = genome.isTemporaryGenome() ? name : INDEX_DIR + "/" + name;

        return new File(fasta.getParentFile(), relativePath);
    }

    /**
     * Looks for a cached index beside the genome's source FASTA and, if {@code wd} is non-null, copies it into
     * the work directory (subject to {@code forceCopyLocal} and the {@link #COPY_LOCALLY} job parameter).
     * If {@code wd} is null, no files are copied.
     *
     * @return true if a cached index was found (and, when a copy was attempted, its directory could be listed)
     */
    private static boolean verifyOrCreateCachedIndex(PipelineContext ctx, @Nullable WorkDirectory wd, @Nullable AlignmentOutputImpl output, String localName, String webserverName, @Nullable ReferenceGenome genome, boolean forceCopyLocal) throws PipelineJobException
    {
        if (genome == null)
        {
            ctx.getLogger().debug("there is no cached reference genome, cannot build index");
            return false;
        }

        // When indexes are cached with the source FASTA, all aligners are stored under /alignerIndexes;
        // for temporary genomes they are top-level (see getIndexDir)
        File webserverIndexDir = getIndexDir(genome, webserverName, true);
        if (!webserverIndexDir.exists())
        {
            ctx.getLogger().debug("expected location of cached index does not exist: " + webserverIndexDir.getPath());
            return false;
        }

        ctx.getLogger().info("previously created index found, no need to recreate");
        ctx.getLogger().debug(webserverIndexDir.getPath());

        // saveCachedIndex() holds this marker while populating the cache. A concurrent save should be rare,
        // so fail fast rather than wait; the job can be restarted once the copy finishes.
        File lockFile = new File(webserverIndexDir.getPath() + LOCK_SUFFIX);
        if (lockFile.exists())
        {
            throw new PipelineJobException("Another job is actively saving this cached index. This error is being thrown as a precaution to avoid duplicate rsync jobs, and to prevent this job from progressing while that copy is in-progress. This job can be restarted after the copy is complete, and should resume normally.");
        }

        if (wd == null)
        {
            ctx.getLogger().info("no need to copy files at this time");
            return true;
        }

        if (!shouldCopyLocally(ctx, forceCopyLocal))
        {
            ctx.getLogger().debug("index files will not be copied to the work directory");
            return true;
        }

        try
        {
            return copyIndexToWorkDir(ctx, wd, output, webserverIndexDir, localName);
        }
        catch (IOException e)
        {
            throw new PipelineJobException(e);
        }
    }

    /**
     * Decides whether a cached index should be copied into the work directory: always when
     * {@code forceCopyLocal} is set, otherwise according to the {@link #COPY_LOCALLY} job parameter
     * (copying when it is absent).
     */
    private static boolean shouldCopyLocally(PipelineContext ctx, boolean forceCopyLocal)
    {
        if (forceCopyLocal)
        {
            return true;
        }

        String val = ctx.getJob().getParameters().get(COPY_LOCALLY);
        if (val == null)
        {
            return true;
        }

        // Guard against a null conversion result so auto-unboxing cannot throw; treat it like an absent parameter
        Boolean converted = ConvertHelper.convert(val, Boolean.class);
        return converted == null || converted;
    }

    /**
     * Copies {@code webserverIndexDir} into {@code <workDir>/Shared/<localName>} via {@link WorkDirectory#inputFile}.
     *
     * @return false if the cached directory's contents cannot be listed, otherwise true
     */
    private static boolean copyIndexToWorkDir(PipelineContext ctx, WorkDirectory wd, @Nullable AlignmentOutputImpl output, File webserverIndexDir, String localName) throws IOException
    {
        ctx.getLogger().info("copying index files to work location");
        File localSharedDir = new File(wd.getDir(), "Shared");
        File destination = new File(localSharedDir, localName);
        ctx.getLogger().debug(destination.getPath());

        // File.listFiles() returns null when the path is not a directory or cannot be read
        if (webserverIndexDir.listFiles() == null)
        {
            return false;
        }

        destination = wd.inputFile(webserverIndexDir, destination, true);

        // Only a real local copy (not the cache directory itself) is registered for deferred deletion
        if (output != null && !destination.equals(webserverIndexDir))
        {
            ctx.getLogger().debug("adding deferred delete file: " + destination.getPath());
            output.addDeferredDeleteIntermediateFile(destination);
        }

        ctx.getLogger().info("finished copying files");
        return true;
    }

    /**
     * Copies a freshly built index from {@code indexDir} into the shared cache beside the genome's source FASTA
     * so later jobs can reuse it, then marks the genome as modified.
     * <p>
     * Does nothing when {@code hasCachedIndex} is true, {@code genome} is null, or the genome is temporary.
     * A {@code .copyLock} marker is created beside the cache directory during the copy; if that marker already
     * exists, another job is assumed to be caching the index and this call returns without copying.
     *
     * @param hasCachedIndex true if a cached index was already found, in which case nothing is saved
     * @param ctx            pipeline context, used for logging
     * @param indexDir       directory containing the newly built index files
     * @param name           name of the cached index directory
     * @param genome         the reference genome
     * @throws PipelineJobException if {@code indexDir} cannot be listed or a copy fails
     */
    public static void saveCachedIndex(boolean hasCachedIndex, PipelineContext ctx, File indexDir, String name, ReferenceGenome genome) throws PipelineJobException
    {
        if (hasCachedIndex || genome == null || genome.isTemporaryGenome())
        {
            return;
        }

        // For a non-temporary genome this is <sourceFastaDir>/alignerIndexes/<name>
        File cachingDir = getIndexDir(genome, name, true);
        ctx.getLogger().info("caching index files for future use");
        ctx.getLogger().debug(cachingDir.getPath());

        File lockFile = new File(cachingDir.getPath() + LOCK_SUFFIX);
        if (lockFile.exists())
        {
            ctx.getLogger().info("Another job is already caching this index, skipping");
            return;
        }

        // File.listFiles() returns null if indexDir is not a readable directory. Check before creating the
        // lock file so that this failure cannot leave a stale lock behind.
        File[] files = indexDir.listFiles();
        if (files == null)
        {
            throw new PipelineJobException("Unable to list files in index directory: " + indexDir.getPath());
        }

        try
        {
            // Marker to prevent multiple concurrent jobs from interfering. NOTE: if a copy below throws,
            // the marker is not removed, so verifyOrCreateCachedIndex will refuse this cache entry until it is cleaned up.
            FileUtils.touch(lockFile);
            if (!cachingDir.exists())
            {
                cachingDir.mkdirs();
            }

            for (File f : files)
            {
                File dest = new File(cachingDir, f.getName());
                if (f.equals(dest))
                {
                    ctx.getLogger().debug("source/destination are the same, skipping: " + dest.getName());
                    continue;
                }

                ctx.getLogger().debug("copying file: " + dest.getName());
                if (f.isDirectory())
                {
                    FileUtils.copyDirectory(f, dest);
                }
                else
                {
                    FileUtils.copyFile(f, dest);
                }
            }

            // A lock that cannot be removed blocks reuse of this cache entry, so make the failure visible
            if (!lockFile.delete())
            {
                ctx.getLogger().warn("Unable to delete lock file: " + lockFile.getPath());
            }

            ReferenceGenomeManager.get().markGenomeModified(genome, ctx.getLogger());
        }
        catch (IOException e)
        {
            throw new PipelineJobException(e);
        }
    }
}