Skip to content

Commit 6a98e1a

Browse files
committed
Fairly substantial check-in. Among the changes:
1. learnErrors: we are switching back to using FASTQ, primarily because reading in *all* derep RDS files is required when using these; however, only a small portion of the sequence data is actually used for the model. 2. dada: we are using all dereps in the pooled runs and per-sample runs. 3. dada options (used in learnErrors and dada) are more consistent, but we do need to plan for changes here; the current method is a bit of a hack. 4. We are testing a parallel pseudo-pooling approach, though this still needs work.
1 parent b8ccb74 commit 6a98e1a

9 files changed

Lines changed: 127 additions & 66 deletions

File tree

modules/local/dadainfer.nf

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,17 @@ process DADA2_POOLED_INFER {
2121
#!/usr/bin/env Rscript
2222
suppressPackageStartupMessages(library(dada2))
2323
24-
dadaOpt <- ${dadaOpt}
24+
dadaOpt <- "${dadaOpt}"
2525
2626
if (!is.na(dadaOpt)) {
27-
setDadaOpt(dadaOpt)
28-
cat("dada Options:\\n",dadaOpt,"\\n")
27+
setDadaOpt(${dadaOpt})
28+
cat("dada Options:\\n",${dadaOpt},"\\n")
2929
}
3030
set.seed(100)
3131
3232
cat("Processing all samples\\n")
3333
3434
err <- readRDS("${err}")
35-
dereps <- readRDS("${dereps}")
3635
3736
#Variable selection from CLI input flag --pool
3837
pool <- "${params.pool}"
@@ -41,6 +40,16 @@ process DADA2_POOLED_INFER {
4140
if(pool != "pseudo"){
4241
pool <- as.logical(pool)
4342
}
43+
44+
# File parsing (these come from the process input channel)
45+
derep_files <- list.files('.', pattern=paste0("${readmode}",".derep.RDS"), full.names = TRUE)
46+
47+
dereps <- lapply(derep_files, readRDS)
48+
49+
# note this is a bit of a hack, but we want the file name
50+
# included with the name of the derep object. This makes
51+
# sure these are in sync if needed later
52+
names(dereps) <- sapply(dereps, function(x) { x\$file })
4453
4554
cat(paste0("Denoising ${readmode} reads: pool:", pool, "\\n"))
4655
dds <- dada(dereps,

modules/local/learnerrors/main.nf

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ process DADA2_LEARN_ERRORS {
99

1010
output:
1111
tuple val(readmode), path("errors.${readmode}.RDS"), emit: error_models
12-
tuple val(readmode), path("dereps.${readmode}.RDS"), emit: dereps_full
12+
// tuple val(readmode), path("dereps.${readmode}.RDS"), emit: dereps_full
1313
path("${readmode}*.err.pdf"), emit: pdf
1414

1515
when:
@@ -47,20 +47,13 @@ process DADA2_LEARN_ERRORS {
4747
cat("dada Options:\\n","${params.dada_opts}","\\n")
4848
}
4949
50-
# File parsing (these come from the process input channel)
51-
derep_files <- list.files('.', pattern=paste0("${readmode}",".derep.RDS"), full.names = TRUE)
52-
53-
dereps <- lapply(derep_files, readRDS)
54-
55-
# note this is a bit of a hack, but we want the file name
56-
# included with the name of the derep object. This makes
57-
# sure these are in sync if needed later
58-
names(dereps) <- sapply(dereps, function(x) { x\$file })
50+
# File parsing
51+
filts <- list.files('.', pattern=paste0("${readmode}",".filtered.fastq.gz"), full.names = TRUE)
5952
6053
set.seed(${params.random_seed})
6154
6255
# Learn read error rates
63-
err <- learnErrors(dereps,
56+
err <- learnErrors(filts,
6457
multithread=${task.cpus},
6558
errorEstimationFunction=errFunc,
6659
verbose=TRUE,
@@ -82,6 +75,5 @@ process DADA2_LEARN_ERRORS {
8275
dev.off()
8376
8477
saveRDS(err, paste0("errors.","${readmode}",".RDS"))
85-
saveRDS(dereps, paste0("dereps.","${readmode}",".RDS"))
8678
"""
8779
}

modules/local/persampleinferderepmerge.nf

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ process PER_SAMPLE_INFER {
55
container "ghcr.io/h3abionet/tada:docker-DADA-1.36"
66

77
input:
8-
tuple val(meta), path(reads)
8+
tuple val(meta), path(dereps)
99
path(errs)
1010
// optional inputs
1111
path(fp, stageAs: "priors_R1")
@@ -40,17 +40,17 @@ process PER_SAMPLE_INFER {
4040
return(priors)
4141
}
4242
43-
dadaOpt <- ${dadaOpt}
43+
dadaOpt <- "${dadaOpt}"
4444
4545
if (!is.na(dadaOpt)) {
46-
setDadaOpt(dadaOpt)
47-
cat("dada Options:\\n",dadaOpt,"\\n")
46+
setDadaOpt(${dadaOpt})
47+
cat("dada Options:\\n",${dadaOpt},"\\n")
4848
}
4949
5050
cat("Processing:", "${meta.id}", "\\n")
5151
5252
errF <- readRDS("errors.R1.RDS")
53-
derepF <- derepFastq("${reads[0]}", n=100000)
53+
derepF <- readRDS("${dereps[0]}")
5454
5555
# TODO: there is probably a better way of doing this
5656
# when using optparse
@@ -70,7 +70,7 @@ process PER_SAMPLE_INFER {
7070
7171
if (file.exists("errors.R2.RDS")) {
7272
errR <- readRDS("errors.R2.RDS")
73-
derepR <- derepFastq("${reads[1]}", n=100000)
73+
derepR <- readRDS("${dereps[1]}")
7474
paramsR <- list(
7575
derep=derepR,
7676
err=errR,

modules/local/persamplemergedadards.nf

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,29 @@ process PER_SAMPLE_MERGE {
44

55
input:
66
path(dds)
7+
val(stage)
78

89
output:
9-
path("all.dd.*.RDS"), emit: inferred // to readtracking
10-
path("priors.R1.fna"), optional: true, emit: priors_for
11-
path("priors.R2.fna"), optional: true, emit: priors_rev
10+
path("all.dd.${stage}.*.RDS"), emit: inferred // to readtracking
11+
path("priors.${stage}.R1.fna"), optional: true, emit: priors_for
12+
path("priors.${stage}.R2.fna"), optional: true, emit: priors_rev
1213

1314
when:
1415
task.ext.when == null || task.ext.when
1516

1617
script:
17-
def dadaOpt = !params.dada_opts.isEmpty() ? "'${params.dada_opts.collect{k,v->"$k=$v"}.join(", ")}'" : 'NA'
18+
def dadaOpt = params.dada_opts ? "${params.dada_opts}" : "NA"
1819
"""
1920
#!/usr/bin/env Rscript
2021
suppressPackageStartupMessages(library(dada2))
2122
suppressPackageStartupMessages(library(ShortRead))
2223
suppressPackageStartupMessages(library(openssl))
2324
24-
dadaOpt <- ${dadaOpt}
25+
dadaOpt <- "${dadaOpt}"
2526
2627
if (!is.na(dadaOpt)) {
27-
setDadaOpt(dadaOpt)
28-
cat("dada Options:\\n",dadaOpt,"\\n")
28+
setDadaOpt(${dadaOpt})
29+
cat("dada Options:\\n",${dadaOpt},"\\n")
2930
}
3031
3132
dadaopts <- getDadaOpt()
@@ -51,23 +52,23 @@ process PER_SAMPLE_MERGE {
5152
dadaFs <- lapply(list.files(path = '.', pattern = '.dd.R1.RDS'), function (x) readRDS(x))
5253
dadaRs <- lapply(list.files(path = '.', pattern = '.dd.R2.RDS'), function (x) readRDS(x))
5354
names(dadaFs) <- sub('.dd.R1.RDS', '', list.files('.', pattern = '.dd.R1.RDS'))
54-
saveRDS(dadaFs, "all.dd.R1.RDS")
55+
saveRDS(dadaFs, "all.dd.${stage}.R1.RDS")
5556
5657
priorsF <- generate_priors(dadaFs, idtype="${params.id_type}")
5758
if (is.na(priorsF)) {
5859
message("No priors found for R1!")
5960
} else {
60-
writeFasta(priorsF, file = 'priors.R1.fna')
61+
writeFasta(priorsF, file = 'priors.${stage}.R1.fna')
6162
}
6263
if (length(dadaRs) > 0) {
6364
names(dadaRs) <- sub('.dd.R2.RDS', '', list.files('.', pattern = '.dd.R2.RDS'))
6465
saveRDS(dadaRs, "all.dd.R2.RDS")
6566
priorsR <- generate_priors(dadaRs, idtype="${params.id_type}")
6667
if (is.na(priorsR)) {
6768
message("No priors found for R2!")
68-
file.create("priors.R2.fna")
69+
file.create("priors.${stage}.R2.fna")
6970
} else {
70-
writeFasta(priorsR, file = "priors.R2.fna")
71+
writeFasta(priorsR, file = "priors.${stage}.R2.fna")
7172
}
7273
}
7374
"""

modules/local/persampleseqtable.nf

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ process PER_SAMPLE_SEQTABLE {
55
input:
66
path(mr)
77
val(readmode)
8+
val(stage)
89

910
output:
10-
path("seqtab.${readmode}.RDS"), emit: filtered_seqtable
11-
path("all.merged.RDS"), optional: true, emit: merged_seqs
12-
path("seqtab.original.${readmode}.RDS"), emit: seqtabQC
11+
path("seqtab.${stage}.${readmode}.RDS"), emit: filtered_seqtable
12+
path("all.${stage}.merged.RDS"), optional: true, emit: merged_seqs
13+
path("seqtab.original.${stage}.${readmode}.RDS"), emit: seqtabQC
1314

1415
when:
1516
task.ext.when == null || task.ext.when
@@ -26,7 +27,7 @@ process PER_SAMPLE_SEQTABLE {
2627
names(combined) <- pairIds
2728
seqtab <- makeSequenceTable(combined)
2829
29-
saveRDS(seqtab, "seqtab.original.${readmode}.RDS")
30+
saveRDS(seqtab, "seqtab.original.${stage}.${readmode}.RDS")
3031
3132
# this is an optional filtering step to remove *merged* sequences based on
3233
# min/max length criteria
@@ -38,7 +39,7 @@ process PER_SAMPLE_SEQTABLE {
3839
seqtab <- seqtab[,nchar(colnames(seqtab)) <= ${params.min_asv_len}, drop = FALSE]
3940
}
4041
41-
saveRDS(seqtab, "seqtab.${readmode}.RDS")
42-
saveRDS(combined, "all.${readmode}.RDS")
42+
saveRDS(seqtab, "seqtab.${stage}.${readmode}.RDS")
43+
saveRDS(combined, "all.${stage}.${readmode}.RDS")
4344
"""
4445
}

modules/local/pooledseqtable.nf

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,20 @@ process DADA2_POOLED_SEQTABLE {
151151
# read in denoised reads for both
152152
ddFs <- readRDS("all.dd.R1.RDS")
153153
154-
derepsF <- readRDS("dereps.R1.RDS")
154+
# File parsing (these come from the process input channel)
155+
derep_files_r1 <- list.files('.', pattern="R1.derep.RDS", full.names = TRUE)
156+
derepsF <- lapply(derep_files_r1, readRDS)
157+
names(derepsF) <- sapply(derepsF, function(x) { x\$file })
155158
156159
if (file.exists("all.dd.R2.RDS")) {
157160
158161
ddRs <- readRDS("all.dd.R2.RDS")
159-
derepsR <- readRDS("dereps.R2.RDS")
162+
163+
# File parsing (these come from the process input channel)
164+
derep_files_r2 <- list.files('.', pattern="R2.derep.RDS", full.names = TRUE)
165+
derepsR <- lapply(derep_files_r2, readRDS)
166+
names(derepsR) <- sapply(derepsR, function(x) { x\$file })
167+
160168
mergers <- if(rescuePairs) {
161169
mergePairsRescue(ddFs, derepsF, ddRs, derepsR,
162170
returnRejects = TRUE,

subworkflows/local/dada2_denoise.nf

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
include { DADA2_LEARN_ERRORS } from '../../modules/local/learnerrors'
2-
include { DADA2_DEREP_SEQS } from '../../modules/local/dada2derepseqs'
3-
include { DADA2_POOLED_DENOISE } from '../../subworkflows/local/dada2_pooled_denoise'
4-
include { DADA2_PER_SAMPLE_DENOISE } from '../../subworkflows/local/dada2_per_sample_denoise'
1+
include { DADA2_LEARN_ERRORS } from '../../modules/local/learnerrors'
2+
include { DADA2_DEREP_SEQS } from '../../modules/local/dada2derepseqs'
3+
include { DADA2_POOLED_DENOISE } from '../../subworkflows/local/dada2_pooled_denoise'
4+
include { DADA2_PER_SAMPLE_DENOISE as DADA2_PER_SAMPLE_DENOISE_ROUND1 } from '../../subworkflows/local/dada2_per_sample_denoise'
5+
include { DADA2_PER_SAMPLE_DENOISE as DADA2_PER_SAMPLE_DENOISE_ROUND2 } from '../../subworkflows/local/dada2_per_sample_denoise'
6+
include { DADA2_PER_SAMPLE_DENOISE } from '../../subworkflows/local/dada2_per_sample_denoise'
57

68
workflow DADA2_DENOISE {
79

@@ -37,20 +39,20 @@ workflow DADA2_DENOISE {
3739
// For this 'batch' run, we use two channels combining the data and
3840
// include whether they are R1 or R2 (a 'readmode') to distinguish them. ch_trimmed_batch
3941

40-
ch_dereps_per_read = ch_dereps
42+
ch_trimmed_per_read = ch_trimmed
4143
.map {
4244
[ 'R1', it[0].single_end ? it[1] : it[1][0] ]
4345
}
4446
.concat(
45-
ch_dereps
47+
ch_trimmed
4648
.filter { !it[0].single_end }
4749
.map {
4850
[ 'R2', it[1][1] ]
4951
}
5052
)
5153
.groupTuple(sort: true)
5254

53-
DADA2_LEARN_ERRORS(ch_dereps_per_read)
55+
DADA2_LEARN_ERRORS(ch_trimmed_per_read)
5456
ch_errs = DADA2_LEARN_ERRORS.out.error_models
5557

5658
// deal with priors here, which are optional inputs
@@ -64,14 +66,48 @@ workflow DADA2_DENOISE {
6466
if (params.pool == "parallel") {
6567
DADA2_PER_SAMPLE_DENOISE(
6668
per_sample_errs,
67-
ch_trimmed_parallel,
69+
ch_dereps,
6870
for_priors,
69-
rev_priors)
71+
rev_priors,
72+
"single-pass")
7073
ch_merged = DADA2_PER_SAMPLE_DENOISE.out.merged_seqs
7174
ch_inferred = DADA2_PER_SAMPLE_DENOISE.out.inferred
7275
ch_filtered_seqtab = DADA2_PER_SAMPLE_DENOISE.out.filtered_seqtable
76+
ch_versions = ch_versions.mix(DADA2_PER_SAMPLE_DENOISE.out.versions)
7377
} else {
74-
error "parallel-pseudo pooling not supported yet"
78+
// error "parallel-pseudo pooling not supported yet"
79+
80+
// For now we keep these separate since this method is *highly*
81+
// experimental!!!!
82+
83+
// Round 1, generate a new set of priors
84+
// Note this can also take an older set of prior data
85+
DADA2_PER_SAMPLE_DENOISE_ROUND1(
86+
per_sample_errs,
87+
ch_dereps,
88+
for_priors,
89+
rev_priors,
90+
"Round1")
91+
rnd1_ch_merged = DADA2_PER_SAMPLE_DENOISE_ROUND1.out.merged_seqs
92+
rnd1_ch_inferred = DADA2_PER_SAMPLE_DENOISE_ROUND1.out.inferred
93+
rnd1_ch_filtered_seqtab = DADA2_PER_SAMPLE_DENOISE_ROUND1.out.filtered_seqtable
94+
rnd1_for_priors = DADA2_PER_SAMPLE_DENOISE_ROUND1.out.for_priors
95+
rnd1_rev_priors = DADA2_PER_SAMPLE_DENOISE_ROUND1.out.rev_priors
96+
ch_versions = ch_versions.mix(DADA2_PER_SAMPLE_DENOISE_ROUND1.out.versions)
97+
98+
// Round 2, using priors from round 1 but same error models and dereps
99+
DADA2_PER_SAMPLE_DENOISE_ROUND2(
100+
per_sample_errs,
101+
ch_dereps,
102+
rnd1_for_priors,
103+
rnd1_rev_priors,
104+
"Round2")
105+
ch_merged = DADA2_PER_SAMPLE_DENOISE_ROUND2.out.merged_seqs
106+
ch_inferred = DADA2_PER_SAMPLE_DENOISE_ROUND2.out.inferred
107+
ch_filtered_seqtab = DADA2_PER_SAMPLE_DENOISE_ROUND2.out.filtered_seqtable
108+
rnd2_for_priors = DADA2_PER_SAMPLE_DENOISE_ROUND2.out.for_priors
109+
rnd2_rev_priors = DADA2_PER_SAMPLE_DENOISE_ROUND2.out.rev_priors
110+
ch_versions = ch_versions.mix(DADA2_PER_SAMPLE_DENOISE_ROUND2.out.versions)
75111
}
76112
} else {
77113
// TODO: can we even use priors with 'true' or 'pseudo'?
@@ -84,9 +120,21 @@ workflow DADA2_DENOISE {
84120
// and this step, which optionally pools them. For really large runs
85121
// this will use a ton of memory
86122

123+
ch_dereps_per_read = ch_dereps
124+
.map {
125+
[ 'R1', it[0].single_end ? it[1] : it[1][0] ]
126+
}
127+
.concat(
128+
ch_dereps
129+
.filter { !it[0].single_end }
130+
.map {
131+
[ 'R2', it[1][1] ]
132+
}
133+
)
134+
.groupTuple(sort: true)
135+
87136
DADA2_POOLED_DENOISE(
88-
ch_errs,
89-
DADA2_LEARN_ERRORS.out.dereps_full
137+
ch_errs, ch_dereps_per_read
90138
)
91139

92140
ch_merged = DADA2_POOLED_DENOISE.out.merged_seqs

0 commit comments

Comments (0)