#!/usr/bin/env nextflow
/*
* RNAswarm: differential RNA-RNA interaction probing pipeline based on RNA proximity ligation data
*
* Authors:
* - Gabriel Lencioni Lovate <[email protected]>
* - Kevin Lamkiewicz <[email protected]>
*/
nextflow.enable.dsl=2
/**************************
* MODULES
**************************/
// preprocessing
include { fastpTrimming } from './modules/data_preprocessing.nf'
include { fastqcReport } from './modules/reports_generation.nf'
workflow preprocessing {
take: reads_ch
main:
// Trims reads with fastp
fastpTrimming( reads_ch )
// Generates fastqc reports
qc_ch = fastpTrimming.out.concat( reads_ch )
fastqcReport( qc_ch )
emit:
fastpTrimming.out
fastqcReport.out
}
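// Per the emit order above, preprocessing.out[0] carries the fastp-trimmed reads
// and preprocessing.out[1] the fastqc reports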
// mapping with segemehl
include { segemehlIndex; segemehl; segemehlPublish; convertSAMtoBAM } from './modules/map_reads.nf'
include { getStats } from './modules/reports_generation.nf'
workflow segemehl_mapping {
take:
preprocessed_reads_ch
genomes_ch
main:
// Indexes the reference genomes
segemehlIndex( genomes_ch )
// Maps the reads to the reference genomes
segemehl_input_ch = segemehlIndex.out.combine( // group name, genome file, idx file
preprocessed_reads_ch // sample name, reads file, group name
.map{ it -> [ it[2], it[0], it[1] ] }, by: 0 // group name, sample name, reads file
)
.map{ it -> [ it[3], it[4], it[0], it[1], it[2] ] } // sample name, reads file, group name, genome file, idx file
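// For illustration only (hypothetical file names), each tuple emitted here looks like:
//   [ 'sample_01', sample_01_trimmed.fastq.gz, 'wt', wt.fasta, wt.idx ]
// i.e. the combine/map above joins the index output and the preprocessed reads on the
// group name and reorders the fields for the segemehl process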
segemehl( segemehl_input_ch )
// Publishes segemehl trns files for inspection
segemehlPublish( segemehl.out )
// Converts segemehl's SAM output to BAM file
convertSAMtoBAM(
segemehl.out.map{ it -> [ it[0], it[4], 'segemehl' ] }
)
// Runs samtools flagstats on the BAM file
getStats( segemehl.out.map{ it -> [ it[0], it[4] ] } )
emit:
segemehl.out
convertSAMtoBAM.out
getStats.out
}
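// Per the emit order above, segemehl_mapping.out[0] is segemehl.out (consumed below for
// array filling and count tables), out[1] the BAM files and out[2] the flagstat reports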
/**************************
* WORKFLOW ENTRY POINT
**************************/
// array filling using numpy
include { fillArrays; mergeArrays } from './modules/handle_arrays.nf'
// plot heatmaps
include { plotHeatmaps as plotHeatmapsRaw } from './modules/data_visualization.nf'
include { plotHeatmaps as plotHeatmapsMerged } from './modules/data_visualization.nf'
include { plotHeatmapsAnnotated } from './modules/data_visualization.nf'
include { plotHeatmapsAnnotated as plotHeatmapsAnnotatedDedup } from './modules/data_visualization.nf'
include { annotateArrays; mergeAnnotations } from './modules/annotate_interactions.nf'
// differential analysis
include { generateCountTables; mergeCountTables; runDESeq2 } from './modules/differential_analysis.nf'
// make alias for mergeCountTables to merge all count tables
include { mergeCountTables as mergeAllCountTables } from './modules/differential_analysis.nf'
// deduplicate annotations
include { deduplicateAnnotations } from './modules/annotate_interactions.nf'
// generate circos plots
include { makeCircosTable_deseq2; makeCircosTable_count_table; runCircos_single; runCircos_comb } from './modules/data_visualization.nf'
workflow {
if ( params.help ) {
println("""
RNAswarm: differential RNA-RNA interaction probing pipeline based on RNA proximity ligation data
Usage:
The typical command for running the pipeline is as follows:
nextflow run gabriellovate/RNAswarm -profile local,apptainer --samples <SAMPLES_CSV_FILE> --comparisons <COMPARISONS_CSV_FILE> --output <OUTDIR>
nextflow run gabriellovate/RNAswarm -profile slurm,apptainer --samples <SAMPLES_CSV_FILE> --comparisons <COMPARISONS_CSV_FILE> --output <OUTDIR>
nextflow run gabriellovate/RNAswarm -profile local,apptainer --samples <SAMPLES_CSV_FILE> --comparisons <COMPARISONS_CSV_FILE> --annotation_table <ANNOTATION_TABLE> --output <OUTDIR>
nextflow run gabriellovate/RNAswarm -profile slurm,apptainer --samples <SAMPLES_CSV_FILE> --comparisons <COMPARISONS_CSV_FILE> --annotation_table <ANNOTATION_TABLE> --output <OUTDIR>
Mandatory arguments:
--samples <SAMPLES_CSV_FILE> CSV file containing the samples to be processed. The file must have the following format:
<SAMPLE_NAME>,<READ_FILE>,<GENOME_FILE>,<GROUP_NAME>
<SAMPLE_NAME>,<READ_FILE>,<GENOME_FILE>,<GROUP_NAME>
...
<SAMPLE_NAME>,<READ_FILE>,<GENOME_FILE>,<GROUP_NAME>
where:
- <SAMPLE_NAME> is the name of the sample
- <READ_FILE> is the path to the read file
- <GENOME_FILE> is the path to the genome file
- <GROUP_NAME> is the name of the group to which the sample belongs
--comparisons <COMPARISONS_CSV_FILE> CSV file containing the comparisons to be performed. The file must have the following format:
<GROUP_NAME_1>,<GROUP_NAME_2>
<GROUP_NAME_1>,<GROUP_NAME_3>
...
<GROUP_NAME_2>,<GROUP_NAME_3>
where:
- <GROUP_NAME_X> is the name of the group
--output <OUTDIR> Output directory
Optional arguments:
--annotation_table <ANNOTATION_TABLE> CSV, TSV or XLSX file containing the annotations to be used.
--test Run in test mode. This will run the pipeline with a small subset of the data
--help Print this help message
""")
exit 0
}
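// For illustration only (hypothetical paths, sample and group names), a samples CSV
// matching the format documented above could look like:
//   sample_01,/data/reads/sample_01.fastq.gz,/data/genomes/wt.fasta,wt
//   sample_02,/data/reads/sample_02.fastq.gz,/data/genomes/wt.fasta,wt
//   sample_03,/data/reads/sample_03.fastq.gz,/data/genomes/mut.fasta,mut
// and a matching comparisons CSV could contain a single line:
//   wt,mut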
// parse sample's csv file
if ( params.test ) {
println "Running in test mode"
samples_input_ch = Channel
.fromPath( params.samples, checkIfExists: true )
.splitCsv()
.map{
row -> [
"${row[0]}", // sample name
file("${baseDir}${row[1]}", checkIfExists: true), // read file
file("${baseDir}${row[2]}", checkIfExists: true), // genome file
"${row[3]}" // group name
]
}
} else {
samples_input_ch = Channel
.fromPath( params.samples, checkIfExists: true )
.splitCsv()
.map{
row -> [
"${row[0]}", // sample name
file("${row[1]}", checkIfExists: true), // read file
file("${row[2]}", checkIfExists: true), // genome file
"${row[3]}" // group name
]
}
}
reads_ch = samples_input_ch
.map{ it -> [ it[0], it[1], it[3] ] } // sample name, read file, group name
genomes_ch = samples_input_ch
.map{ it -> [ it[3], it[2] ] } // group name, genome file
.unique()
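// At this point the channels emit tuples such as (hypothetical values):
//   reads_ch:   [ 'sample_01', sample_01.fastq.gz, 'wt' ]
//   genomes_ch: [ 'wt', wt.fasta ]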
// preprocessing workflow
preprocessing( reads_ch )
// segemehl workflow
segemehl_mapping( preprocessing.out[0], genomes_ch )
// fill arrays with the segemehl output
array_ch = fillArrays(
segemehl_mapping.out[0]
.map( it -> [ it[0], it[1], it[5], it[6] ] ) // sample name, trns file, group name, genome
)
// plot heatmaps using the filled arrays
plotHeatmapsRaw(
array_ch
.map( it -> [ it[0], it[3], it[4] ] ) // sample name, genome, array
)
// accumulate arrays with the same group name
groupped_arrays_ch = array_ch
.groupTuple( by: 2 ) // This can be streamlined by knowing the number of samples in each group beforehand,
// but should be fine for now
.map( it -> [ it[2], it[3][0], it[4].flatten()] ) // group name, genome, arrays
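// For illustration only (hypothetical values), each grouped tuple looks like:
//   [ 'wt', wt.fasta, [ sample_01_array, sample_02_array ] ]
// i.e. one genome and the collected per-sample arrays for each group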
// merge arrays with the same group name
merged_arrays_ch = mergeArrays( groupped_arrays_ch )
// plot heatmaps using the merged arrays
plotHeatmapsMerged( merged_arrays_ch )
// Check if annotations are present
if ( params.annotation_table ) {
// Create a channel with the annotations
annotated_arrays_ch = merged_arrays_ch
.combine( Channel.fromPath( params.annotation_table, checkIfExists: true ) )
annotated_trns_ch = segemehl_mapping.out[0]
.map( it -> [ it[0], it[1], it[5] ] ) // sample name, trns file, group name
.combine( Channel.fromPath( params.annotation_table, checkIfExists: true ) )
} else {
// Annotate interactions de novo
annotated_arrays_ch = annotateArrays(
merged_arrays_ch
)
// collect annotations from the annotated_arrays_ch channel and merge them
mergeAnnotations(
annotated_arrays_ch
.collect { it[3] }
)
// combine each sample's trns file with the merged de novo annotations
annotated_trns_ch = segemehl_mapping.out[0]
.map( it -> [ it[0], it[1], it[5] ] ) // sample name, trns file, group name
.combine( mergeAnnotations.out )
}
// Plot the annotations on the heatmaps
plotHeatmapsAnnotated(
annotated_arrays_ch.map( it -> [ it[0], it[1], it[2], it[3] ] ) // sample name, genome, array, annotations
)
// Generate count tables
count_tables_ch = generateCountTables( annotated_trns_ch )
merged_count_tables_ch = mergeCountTables(
count_tables_ch
.groupTuple( by: 2 )
.map( it -> [ it[2], it[1] ] ) // group name, count tables
)
// Merge all count tables across groups by collecting every count table under a single "all" group
merged_count_tables_all_ch = mergeAllCountTables(
count_tables_ch
.map( it -> [ it[1] ] ) // count tables
.collect()
.map( it -> [ "all", it ] ) // group name, count tables
)
if ( params.annotation_table ) {
// Plot annotations on the heatmaps
plotHeatmapsAnnotatedDedup( annotated_arrays_ch )
} else {
// Deduplicate annotations
deduplicate_annotations_input_ch = merged_count_tables_all_ch // group_name, merged_count_table
.combine( mergeAnnotations.out ) // merged_annotations
.map( it -> [ it[0], it[2], it[1] ] ) // group name, count table, annotations
deduplicateAnnotations( deduplicate_annotations_input_ch )
// Plot deduplicated annotations on the heatmaps
dedup_heatmaps_ch = annotated_arrays_ch
.map( it -> [ it[0], it[1], it[2], it[3] ] ) // sample name, genome, array, annotations
.combine( deduplicateAnnotations.out )
plotHeatmapsAnnotatedDedup( dedup_heatmaps_ch )
}
// Run differential analysis with DESeq2
comparisons_input_ch = Channel
.fromPath( params.comparisons, checkIfExists: true )
.splitCsv()
.combine( merged_count_tables_ch, by: 0 )
.map( it -> [ it[1], it[0], it[2] ] )
.combine( merged_count_tables_ch, by: 0 )
.map( it -> [ it[1], it[2], it[0], it[3] ] )
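// The two combine/map steps above join each comparison row with the merged count tables
// of both groups, yielding: group 1, count table of group 1, group 2, count table of group 2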
runDESeq2( comparisons_input_ch )
// Generate circos files
if ( params.annotation_table ) {
annotated_arrays_remapped_ch = annotated_arrays_ch
.map( it -> [ it[0], it[3] ] )
.combine(merged_count_tables_all_ch)
circos_deseq2_ch = runDESeq2.out
.combine( genomes_ch, by: 0 )
.map( it -> [ it[1], it[0], it[3], it[2] ] )
.combine( genomes_ch, by: 0 )
.map( it -> [ it[1], it[2], it[0], it[4], it[3] ] )
.combine( annotated_arrays_remapped_ch, by: 0)
.map( it -> [ it[0], it[1], it[2], it[3], it[4], it[6], it[5], it[7] ] )
circos_count_table_ch = merged_count_tables_ch
.combine( genomes_ch, by: 0 )
.map( it -> [ it[0], it[2], it[1] ] )
.combine( annotated_arrays_remapped_ch, by: 0)
.map( it -> [ it[0], it[1], it[2], it[4], it[3], it[5] ])
} else {
circos_deseq2_ch = runDESeq2.out
.combine( genomes_ch, by: 0 )
.map( it -> [ it[1], it[0], it[3], it[2] ] )
.combine( genomes_ch, by: 0 )
.map( it -> [ it[1], it[2], it[0], it[4], it[3] ] )
.combine( deduplicateAnnotations.out )
circos_count_table_ch = merged_count_tables_ch
.combine( genomes_ch, by: 0 )
.map( it -> [ it[0], it[2], it[1] ] )
.combine( deduplicateAnnotations.out )
}
// Create circos tables
makeCircosTable_deseq2( circos_deseq2_ch )
makeCircosTable_count_table( circos_count_table_ch )
// Render circos plots
runCircos_single( makeCircosTable_count_table.out )
runCircos_comb( makeCircosTable_deseq2.out )
}