#!/usr/bin/env python
"""
vg_index.py: index a graph so it can be mapped to
"""
import argparse, sys, os, os.path, errno, random, subprocess, shutil, itertools, glob, tarfile
import doctest, re, json, collections, time, timeit, distutils.util
import logging, logging.handlers, struct, socket, threading
import string
import getpass
import pdb
import logging
from distutils import util
from math import ceil
from subprocess import Popen, PIPE
from toil.common import Toil
from toil.job import Job
from toil.realtimeLogger import RealtimeLogger
from toil_vg.vg_common import *
from toil_vg.context import Context, run_write_info_to_outstore
logger = logging.getLogger(__name__)

def index_subparser(parser):
    """
    Create a subparser for indexing. Should pass in results of subparsers.add_parser()
    """
    # Add the Toil options so the job store is the first argument
    Job.Runner.addToilOptions(parser)

    # Options specific to the toil-vg index driver
    parser.add_argument("out_store",
                        help="output store. All output written here. Path specified using same syntax as toil jobStore")
    parser.add_argument("--graphs", nargs='+', default=[], type=make_url,
                        help="input graph(s). one per chromosome (separated by space)")
    parser.add_argument("--chroms", nargs='+',
                        help="name(s) of reference path in graph(s) (separated by space). If --graphs"
                        " has multiple elements, must be same length/order as --graphs (not needed for xg_index)")
    parser.add_argument("--node_mapping", type=make_url,
                        help="node mapping file required for gbwt pruning. Created by toil-vg construct"
                        " (or vg ids -j)")
    parser.add_argument("--bwa_index_fasta", type=make_url,
                        help="index the given FASTA for BWA MEM alignment")

    # Add common options shared with everybody
    add_common_vg_parse_args(parser)

    # Add indexing options
    index_toggle_parse_args(parser)
    index_parse_args(parser)

    # Add common docker options
    add_container_tool_parse_args(parser)

def index_toggle_parse_args(parser):
    """
    Common args we do not want to share with toil-vg run, and which toggle
    different index types on and off.

    Safe to use in toil-vg construct without having to import any files.
    """
    parser.add_argument("--gcsa_index", dest="indexes", default=[], action="append_const", const="gcsa",
                        help="Make a gcsa index for each output graph")
    parser.add_argument("--xg_index", dest="indexes", action="append_const", const="xg",
                        help="Make an xg index for each output graph")
    parser.add_argument("--gbwt_index", dest="indexes", action="append_const", const="gbwt",
                        help="Make a GBWT index alongside the xg index for each output graph")
    parser.add_argument("--snarls_index", dest="indexes", action="append_const", const="snarls",
                        help="Make a snarls file for each output graph")
    parser.add_argument("--trivial_snarls_index", dest="indexes", action="append_const", const="trivial_snarls",
                        help="Make a trivial-inclusive snarls file for each output graph")
    parser.add_argument("--distance_index", dest="indexes", action="append_const", const="distance",
                        help="Make a (minimum) distance index for each output graph")
    parser.add_argument("--minimizer_index", dest="indexes", action="append_const", const="minimizer",
                        help="Make a minimizer index for each output graph")
    parser.add_argument("--id_ranges_index", dest="indexes", action="append_const", const="id_ranges",
                        help="Make chromosome id ranges tables (so toil-vg map can optionally split output by chromosome)")
    parser.add_argument("--alt_path_gam_index", dest="indexes", action="append_const", const="alt-gam",
                        help="Save alt paths from vg into an indexed GAM")
    parser.add_argument("--xg_alts", dest="indexes", action="append_const", const="xg_alts",
                        help="Include alt paths in xg index")
    parser.add_argument("--all_index", dest="indexes", action="store_const",
                        const=["gcsa", "xg", "gbwt", "snarls", "trivial_snarls", "distance", "minimizer", "id_ranges"],
                        help="Equivalent to --gcsa_index --xg_index --gbwt_index --snarls_index --trivial_snarls_index "
                        "--distance_index --minimizer_index --id_ranges_index")

def index_parse_args(parser):
    """
    Indexing parameters which can be part of the main toil-vg run pipeline.
    """
    parser.add_argument("--gcsa_index_cores", type=int,
                        help="number of threads during the gcsa indexing step")
    parser.add_argument("--xg_index_cores", type=int,
                        help="number of threads during the xg indexing step")
    parser.add_argument("--gbwt_index_cores", type=int,
                        help="number of threads during the gbwt indexing step")
    parser.add_argument("--index_name", type=str, default='index',
                        help="name of index files. <name>.xg, <name>.gcsa etc.")
    parser.add_argument("--gcsa_opts", type=str,
                        help="Options to pass to gcsa indexing.")
    parser.add_argument("--minimizer_opts", type=str,
                        help="Options to pass to minimizer indexing.")
    parser.add_argument("--vcf_phasing", nargs='+', type=make_url, default=[],
                        help="Import phasing information from VCF(s) into xg (or GBWT with --gbwt_index)")
    parser.add_argument("--vcf_phasing_regions", nargs='+', default=[],
                        help="Hint the relevant chrom:start-end regions to the GBWT indexer, for subregion graphs")
    parser.add_argument("--gbwt_input", type=make_url,
                        help="Use given GBWT for GCSA2 pruning")
    parser.add_argument("--gbwt_prune", action='store_true',
                        help="Use gbwt for gcsa pruning")
    parser.add_argument("--force_phasing", type=lambda x: bool(util.strtobool(x)), default=None,
                        help="If 'True', randomly phase unphased variants and discard unresolvable overlaps for GBWT")

def validate_index_options(options):
    """
    Validate the index option semantics enforced by index only.
    Throw an error if an invalid combination of options has been selected.
    """
    if len(options.indexes) > 0:
        require(len(options.graphs) == 0 or options.chroms, '--chroms must be specified for --graphs')
        require(len(options.graphs) == 1 or len(options.chroms) == len(options.graphs),
                '--chroms and --graphs must have the same number of arguments'
                ' if more than one graph is specified (unless doing only xg indexing)')
    require(any([len(options.indexes) > 0,
                 options.bwa_index_fasta]),
            'one of --xg_index, --gcsa_index, --snarls_index, --trivial_snarls_index, --id_ranges_index, '
            '--gbwt_index, --minimizer_index, --distance_index, --all_index, --alt_path_gam_index or '
            '--bwa_index_fasta is required')
    require(not options.gbwt_prune or options.node_mapping,
            '--node_mapping required with --gbwt_prune')
    require('gbwt' not in options.indexes or not options.gbwt_input,
            'only one of --gbwt_index and --gbwt_input can be used at a time')
    if options.gbwt_input:
        require(options.gbwt_prune, '--gbwt_prune required with --gbwt_input')
    validate_shared_index_options(options)

def validate_shared_index_options(options):
    """
    Validate the index options semantics enforced by index and construct.
    Throw an error if an invalid combination of options has been selected.
    """
    if options.vcf_phasing:
        require(all([vcf.endswith('.vcf.gz') for vcf in options.vcf_phasing]),
                'input phasing files must end with .vcf.gz')
    if 'gbwt' in options.indexes:
        require(len(options.vcf_phasing) > 0, 'generating a GBWT requires a VCF with phasing information')
    if options.gbwt_prune:
        require(('gbwt' in options.indexes) or options.gbwt_input, '--gbwt_index or --gbwt_input required for --gbwt_prune')
    if options.vcf_phasing_regions:
        require('gbwt' in options.indexes, "cannot hint regions to GBWT indexer without building a GBWT index")

def run_gcsa_prune(job, context, graph_name, input_graph_id, gbwt_id, mapping_id, remove_paths=[]):
    """
    Make a pruned graph using vg prune. If a GBWT and node mapping are provided, prune
    with unfolding (vg prune --unfold-paths); otherwise restore the reference paths
    (vg prune --restore-paths).
    """
    RealtimeLogger.info("Starting GCSA graph-pruning {}...".format("using GBWT" if gbwt_id else ""))
    start_time = timeit.default_timer()

    # Define work directory for docker calls
    work_dir = job.fileStore.getLocalTempDir()

    # Intermediate output
    restored_filename = os.path.join(work_dir, "restored_{}".format(graph_name))
    # Final output
    pruned_filename = os.path.join(work_dir, "unfolded_{}".format(graph_name))
    # Node Mapping output
    mapping_filename = os.path.join(work_dir, 'node_mapping')

    # Download input
    graph_filename = os.path.join(work_dir, graph_name)
    job.fileStore.readGlobalFile(input_graph_id, graph_filename)
    gbwt_filename = graph_filename + '.gbwt'
    if gbwt_id:
        job.fileStore.readGlobalFile(gbwt_id, gbwt_filename)
    if mapping_id:
        job.fileStore.readGlobalFile(mapping_id, mapping_filename, mutable=True)

    if remove_paths:
        # Remove alt paths so they don't make the graph too complex when getting restored
        remove_cmd = ['vg', 'mod', os.path.basename(graph_filename), '-I']
        for remove_path in remove_paths:
            remove_cmd += ['--retain-path', remove_path]
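        # The nested command lists below are run as a pipeline by context.runner.call,
        # so the path-filtered graph from vg mod is streamed straight into vg prune.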
        cmd = [remove_cmd, ['vg', 'prune', '-']]
    else:
        cmd = [['vg', 'prune', os.path.basename(graph_filename)]]
    cmd[-1] += ['--threads', str(job.cores)]
    if context.config.prune_opts:
        cmd[-1] += context.config.prune_opts
    if gbwt_id:
        cmd[-1] += ['--append-mapping', '--mapping', os.path.basename(mapping_filename), '--unfold-paths']
        cmd[-1] += ['--gbwt-name', os.path.basename(gbwt_filename)]
    else:
        cmd[-1] += ['--restore-paths']

    with open(pruned_filename, 'wb') as pruned_file:
        context.runner.call(job, cmd, work_dir=work_dir, outfile=pruned_file)

    end_time = timeit.default_timer()
    run_time = end_time - start_time
    RealtimeLogger.info("Finished GCSA pruning. Process took {} seconds.".format(run_time))

    pruned_graph_id = context.write_intermediate_file(job, pruned_filename)
    if gbwt_id:
        mapping_id = context.write_intermediate_file(job, mapping_filename)
    else:
        mapping_id = None

    return pruned_graph_id, mapping_id

def run_gcsa_prep(job, context, input_graph_ids,
                  graph_names, index_name, chroms,
                  chrom_gbwt_ids, node_mapping_id, skip_pruning=False,
                  remove_paths=[]):
    """
    Do all the preprocessing for gcsa indexing (pruning),
    then launch the indexing as a follow-on.
    """
    RealtimeLogger.info("Starting gcsa preprocessing...")
    start_time = timeit.default_timer()
    if chrom_gbwt_ids:
        assert len(chrom_gbwt_ids) <= len(input_graph_ids)

    # to encapsulate everything under this job
    child_job = Job()
    job.addChild(child_job)
    prune_root_job = Job()
    child_job.addChild(prune_root_job)

    # keep these in lists for now just in case
    prune_jobs = []
    # todo: figure out how best to update file with toil without making copies
    mapping_ids = [node_mapping_id] if node_mapping_id and chrom_gbwt_ids else []

    # prune each input graph.
    prune_ids = []
    for graph_i, input_graph_id in enumerate(input_graph_ids):
        gbwt_id = chrom_gbwt_ids[graph_i] if chrom_gbwt_ids else None
        mapping_id = mapping_ids[-1] if mapping_ids and gbwt_id else None
        # toggle between parallel/sequential based on if we're unfolding or not
        add_fn = prune_jobs[-1].addFollowOnJobFn if prune_jobs and gbwt_id else prune_root_job.addChildJobFn
        if not skip_pruning:
            prune_job = add_fn(run_gcsa_prune, context, graph_names[graph_i],
                               input_graph_id, gbwt_id, mapping_id,
                               remove_paths=remove_paths,
                               cores=context.config.prune_cores,
                               memory=context.config.prune_mem,
                               disk=context.config.prune_disk)
            prune_id = prune_job.rv(0)
            if gbwt_id:
                prune_jobs.append(prune_job)
                mapping_ids.append(prune_job.rv(1))
        else:
            prune_id = input_graph_id
        prune_ids.append(prune_id)

    return child_job.addFollowOnJobFn(run_gcsa_indexing, context, prune_ids,
                                      graph_names, index_name, mapping_ids[-1] if mapping_ids else None,
                                      cores=context.config.gcsa_index_cores,
                                      memory=context.config.gcsa_index_mem,
                                      disk=context.config.gcsa_index_disk,
                                      preemptable=context.config.gcsa_index_preemptable).rv()

def run_gcsa_indexing(job, context, prune_ids, graph_names, index_name, mapping_id):
    """
    Make the gcsa2 index. Return its store id.
    """
RealtimeLogger.info("Starting gcsa indexing...")
start_time = timeit.default_timer()
# Define work directory for docker calls
work_dir = job.fileStore.getLocalTempDir()
# Scratch directory for indexing
index_temp_dir = os.path.join(work_dir, 'index-temp')
os.makedirs(index_temp_dir)
# Download all the pruned graphs.
prune_filenames = []
for graph_i, prune_id in enumerate(prune_ids):
prune_filename = os.path.join(work_dir, remove_ext(os.path.basename(graph_names[graph_i]), '.vg') + '.prune.vg')
job.fileStore.readGlobalFile(prune_id, prune_filename)
prune_filenames.append(prune_filename)
# Download the mapping_id
mapping_filename = None
if mapping_id:
mapping_filename = os.path.join(work_dir, 'node_mapping')
job.fileStore.readGlobalFile(mapping_id, mapping_filename)
# Where do we put the GCSA2 index?
gcsa_filename = "{}.gcsa".format(index_name)
command = ['vg', 'index', '-g', os.path.basename(gcsa_filename)] + context.config.gcsa_opts
command += ['--threads', str(job.cores)]
command += ['--temp-dir', os.path.join('.', os.path.basename(index_temp_dir))]
if mapping_id:
command += ['--mapping', os.path.basename(mapping_filename)]
for prune_filename in prune_filenames:
command += [os.path.basename(prune_filename)]
try:
context.runner.call(job, command, work_dir=work_dir)
except:
# Dump everything we need to replicate the index run
logging.error("GCSA indexing failed. Dumping files.")
for prune_filename in prune_filenames:
context.write_output_file(job, prune_filename)
if mapping_id:
context.write_output_file(job, mapping_filename)
raise
# Checkpoint index to output store
gcsa_file_id = context.write_output_file(job, os.path.join(work_dir, gcsa_filename))
lcp_file_id = context.write_output_file(job, os.path.join(work_dir, gcsa_filename) + ".lcp")
end_time = timeit.default_timer()
run_time = end_time - start_time
RealtimeLogger.info("Finished GCSA index. Process took {} seconds.".format(run_time))
return gcsa_file_id, lcp_file_id

def run_concat_vcfs(job, context, vcf_ids, tbi_ids):
    """
    Concatenate a list of VCFs. We do this because vg index -v only takes one VCF, and
    we may be working with a set of per-chromosome VCFs.
    """
    work_dir = job.fileStore.getLocalTempDir()

    vcf_names = ['chrom_{}.vcf.gz'.format(i) for i in range(len(vcf_ids))]
    out_name = 'genome.vcf.gz'

    for vcf_id, tbi_id, vcf_name in zip(vcf_ids, tbi_ids, vcf_names):
        job.fileStore.readGlobalFile(vcf_id, os.path.join(work_dir, vcf_name))
        job.fileStore.readGlobalFile(tbi_id, os.path.join(work_dir, vcf_name + '.tbi'))

    cmd = ['bcftools', 'concat'] + [vcf_name for vcf_name in vcf_names] + ['-O', 'z']
    with open(os.path.join(work_dir, out_name), 'wb') as out_file:
        context.runner.call(job, cmd, work_dir=work_dir, outfile=out_file)

    cmd = ['tabix', '-f', '-p', 'vcf', out_name]
    context.runner.call(job, cmd, work_dir=work_dir)

    out_vcf_id = context.write_intermediate_file(job, os.path.join(work_dir, out_name))
    out_tbi_id = context.write_intermediate_file(job, os.path.join(work_dir, out_name + '.tbi'))

    return out_vcf_id, out_tbi_id

def run_combine_graphs(job, context, inputGraphFileIDs, graph_names, index_name, intermediate=False):
    """
    Merge a list of graph files. We do this because the haplotype index vg index
    produces can only be built for a single graph.

    Takes the file IDs to concatenate, the human-readable names for those
    files, and the base name of the index we are working on (which is used to
    derive the graph output name).

    Graph files to concatenate must already be in the same ID space.

    If intermediate is set to true, the concatenated graph is not written to
    the output store.
    """

    # Define work directory for local files
    work_dir = job.fileStore.getLocalTempDir()

    RealtimeLogger.info("Starting VG graph merge...")
    start_time = timeit.default_timer()

    # The file names we are given can be very long, so if we download and cat
    # everything we can run into maximum command line length limits.
    # Unfortunately, we need to use vg to do the graph combining because who
    # knows what HandleGraph format each file is in.
    # So download the files to short names.
    filenames = []

    for number, in_id in enumerate(inputGraphFileIDs):
        # Determine where to save the graph
        filename = '{}.vg'.format(number)
        # Put it in the workdir
        full_filename = os.path.join(work_dir, filename)

        # Save to the given file
        got_filename = job.fileStore.readGlobalFile(in_id, full_filename)

        RealtimeLogger.info('Downloaded graph ID {} to {} (which should be {}) for joining'.format(in_id, got_filename, full_filename))

        # Keep the filename
        filenames.append(filename)

    # Work out the file name we want to report
    concatenated_basename = "{}.cat.vg".format(index_name)

    # Run vg to combine into that file
    cmd = ['vg', 'combine'] + filenames

    try:
        with open(os.path.join(work_dir, concatenated_basename), 'wb') as out_file:
            context.runner.call(job, cmd, work_dir=work_dir, outfile=out_file)
    except:
        # Dump everything we need to replicate the index run
        logging.error("Graph merging failed. Dumping files.")
        for graph_filename in filenames:
            context.write_output_file(job, os.path.join(work_dir, graph_filename))
        raise

    # Now we generate the concatenated file ID
    concatenated_file_id = None
    if intermediate:
        # Save straight to the file store
        concatenated_file_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, concatenated_basename))
    else:
        # Checkpoint concatenated graph file to output store
        concatenated_file_id = context.write_output_file(job, os.path.join(work_dir, concatenated_basename))

    end_time = timeit.default_timer()
    run_time = end_time - start_time
    RealtimeLogger.info("Finished VG graph merge. Process took {} seconds.".format(run_time))

    return (concatenated_file_id, concatenated_basename)

def run_xg_indexing(job, context, inputGraphFileIDs, graph_names, index_name,
                    vcf_phasing_file_id=None, tbi_phasing_file_id=None,
                    make_gbwt=False, gbwt_regions=[],
                    intermediate=False, include_alt_paths=False):
    """
    Make the xg index and optional GBWT haplotype index.

    Saves the xg in the outstore as <index_name>.xg and the GBWT, if requested,
    as <index_name>.gbwt.

    If gbwt_regions is specified, it is a list of chrom:start-end region
    specifiers, restricting, on each specified chromosome, the region of the
    VCF that GBWT indexing will examine.

    If make_gbwt is specified *and* a phasing VCF is specified, the GBWT will
    be generated. Otherwise it won't be (for example, for single-contig graphs
    where no VCF is available).

    Return a tuple of file IDs, (xg_id, gbwt_id). The GBWT ID will be None if
    no GBWT is generated.

    If intermediate is set to true, do not save the produced files to the
    output store.
    """
RealtimeLogger.info("Starting xg indexing...")
start_time = timeit.default_timer()
# Define work directory for docker calls
work_dir = job.fileStore.getLocalTempDir()
# Scratch directory for indexing
index_temp_dir = os.path.join(work_dir, 'index-temp')
os.makedirs(index_temp_dir)
RealtimeLogger.info("inputGraphFileIDs: {}".format(str(inputGraphFileIDs)))
RealtimeLogger.info("graph_names: {}".format(str(graph_names)))
# Our local copy of the graphs
graph_filenames = []
for i, graph_id in enumerate(inputGraphFileIDs):
graph_filename = os.path.join(work_dir, graph_names[i])
job.fileStore.readGlobalFile(graph_id, graph_filename)
graph_filenames.append(os.path.basename(graph_filename))
# If we have a separate GBWT it will go here
gbwt_filename = os.path.join(work_dir, "{}.gbwt".format(index_name))
# And if we ahve a separate thread db it will go here
thread_db_filename = os.path.join(work_dir, "{}.threads".format(index_name))
# Get the vcf file for loading phasing info
if vcf_phasing_file_id:
phasing_file = os.path.join(work_dir, 'phasing.{}.vcf.gz'.format(index_name))
job.fileStore.readGlobalFile(vcf_phasing_file_id, phasing_file)
job.fileStore.readGlobalFile(tbi_phasing_file_id, phasing_file + '.tbi')
phasing_opts = ['-v', os.path.basename(phasing_file)]
if make_gbwt:
# Write the haplotype index to its own file
phasing_opts += ['--gbwt-name', os.path.basename(gbwt_filename)]
for region in gbwt_regions:
phasing_opts += ['--region', region]
if context.config.force_phasing:
# We need to discard overlaps also to really get rid of haplotype breaks.
phasing_opts += ['--force-phasing', '--discard-overlaps']
else:
phasing_opts = []
# Where do we put the XG index?
xg_filename = "{}.xg".format(index_name)
# Now run the indexer.
RealtimeLogger.info("XG Indexing {}".format(str(graph_filenames)))
command = ['vg', 'index', '--threads', str(job.cores), '--xg-name', os.path.basename(xg_filename)]
command += phasing_opts + graph_filenames
command += ['--temp-dir', os.path.join('.', os.path.basename(index_temp_dir))]
if include_alt_paths:
command += ['--xg-alts']
try:
context.runner.call(job, command, work_dir=work_dir)
except:
# Dump everything we need to replicate the index run
logging.error("XG indexing failed. Dumping files.")
for graph_filename in graph_filenames:
context.write_output_file(job, os.path.join(work_dir, graph_filename))
if vcf_phasing_file_id:
context.write_output_file(job, phasing_file)
context.write_output_file(job, phasing_file + '.tbi')
raise
# Determine if we want to checkpoint index to output store
write_function = context.write_intermediate_file if intermediate else context.write_output_file
xg_file_id = write_function(job, os.path.join(work_dir, xg_filename))
gbwt_file_id = None
thread_db_file_id = None
if make_gbwt and vcf_phasing_file_id:
# Also save the GBWT if it was generated
gbwt_file_id = write_function(job, gbwt_filename)
end_time = timeit.default_timer()
run_time = end_time - start_time
RealtimeLogger.info("Finished XG index. Process took {} seconds.".format(run_time))
# TODO: convert to a dict
return (xg_file_id, gbwt_file_id)

def run_cat_xg_indexing(job, context, inputGraphFileIDs, graph_names, index_name,
                        vcf_phasing_file_id=None, tbi_phasing_file_id=None,
                        make_gbwt=False, gbwt_regions=[],
                        intermediate=False, intermediate_cat=True, include_alt_paths=False):
    """
    Encapsulates the run_combine_graphs and run_xg_indexing job functions:
    concatenate the input graphs, then xg-index the result, as a single unit
    that callers can schedule wherever they would otherwise run run_xg_indexing.

    Note: the resources assigned to indexing come from those assigned to this parent job
    (as they can get toggled between xg and gbwt modes in the caller).

    If intermediate is set to True, do not save the final .xg to the output store.

    If intermediate_cat is False and intermediate is also False, save the .cat.vg to the output store.
    """

    # to encapsulate everything under this job
    child_job = Job()
    job.addChild(child_job)

    # Concatenate the graph files.
    vg_concat_job = child_job.addChildJobFn(run_combine_graphs, context, inputGraphFileIDs,
                                            graph_names, index_name, intermediate=(intermediate or intermediate_cat),
                                            cores=job.cores,
                                            memory=job.memory,
                                            disk=job.disk)

    return child_job.addFollowOnJobFn(run_xg_indexing,
                                      context, [vg_concat_job.rv(0)],
                                      [vg_concat_job.rv(1)], index_name,
                                      vcf_phasing_file_id, tbi_phasing_file_id,
                                      make_gbwt=make_gbwt, gbwt_regions=gbwt_regions,
                                      intermediate=intermediate,
                                      include_alt_paths=include_alt_paths,
                                      cores=job.cores,
                                      memory=job.memory,
                                      disk=job.disk,
                                      preemptable=job.preemptable).rv()

def run_snarl_indexing(job, context, inputGraphFileIDs, graph_names, index_name=None, include_trivial=False):
    """
    Compute the snarls of the graph.

    Saves the snarls file in the outstore as <index_name>.snarls or
    <index_name>.trivial.snarls, as appropriate, unless index_name is None.

    If include_trivial is set to true, include trivial snarls, which mpmap
    cannot yet filter out itself for snarl cutting, but which are needed for
    distance indexing.

    Return the file ID of the snarls file.
    """
    assert(len(inputGraphFileIDs) == len(graph_names))

    # Decide on an index output extension.
    extension = '.trivial.snarls' if include_trivial else '.snarls'

    if len(inputGraphFileIDs) > 1:
        # We have been given multiple chromosome graphs. Since snarl indexing
        # can take a lot of memory, we are going to process each one separately
        # and then concatenate the results.
        RealtimeLogger.info("Breaking up snarl computation for {}".format(str(graph_names)))

        snarl_jobs = []
        for file_id, file_name in zip(inputGraphFileIDs, graph_names):
            # For each input graph, make a child job to index it.
            snarl_jobs.append(job.addChildJobFn(run_snarl_indexing, context, [file_id], [file_name],
                                                include_trivial=include_trivial,
                                                cores=context.config.snarl_index_cores,
                                                memory=context.config.snarl_index_mem,
                                                disk=context.config.snarl_index_disk))

        # Make a job to concatenate the indexes all together
        concat_job = snarl_jobs[0].addFollowOnJobFn(run_concat_files, context, [job.rv() for job in snarl_jobs],
                                                    index_name + extension if index_name is not None else None,
                                                    cores=context.config.snarl_index_cores,
                                                    memory=context.config.snarl_index_mem,
                                                    disk=context.config.snarl_index_disk)

        for i in range(1, len(snarl_jobs)):
            # And make it wait for all of them
            snarl_jobs[i].addFollowOn(concat_job)

        return concat_job.rv()
    else:
        # Base case: single graph
        RealtimeLogger.info("Starting snarl computation {} trivial snarls...".format('with' if include_trivial else 'without'))
        start_time = timeit.default_timer()

        # Define work directory for docker calls
        work_dir = job.fileStore.getLocalTempDir()

        # Download the one graph
        graph_id = inputGraphFileIDs[0]
        graph_filename = graph_names[0]
        job.fileStore.readGlobalFile(graph_id, os.path.join(work_dir, graph_filename))

        # Where do we put the snarls?
        snarl_filename = os.path.join(work_dir, (index_name if index_name is not None else "part") + extension)

        # Now run the indexer.
        RealtimeLogger.info("Computing snarls for {}".format(graph_filename))

        cmd = ['vg', 'snarls', graph_filename]
        if include_trivial:
            cmd += ['--include-trivial']

        with open(snarl_filename, "wb") as snarl_file:
            try:
                # Compute snarls to the correct file
                context.runner.call(job, cmd, work_dir=work_dir, outfile=snarl_file)
            except:
                # Dump everything we need to replicate the indexing
                logging.error("Snarl indexing failed. Dumping files.")
                context.write_output_file(job, os.path.join(work_dir, graph_filename))
                raise

        if index_name is not None:
            # Checkpoint index to output store
            snarl_file_id = context.write_output_file(job, snarl_filename)
        else:
            # Just save the index as an intermediate
            snarl_file_id = context.write_intermediate_file(job, snarl_filename)

        end_time = timeit.default_timer()
        run_time = end_time - start_time
        RealtimeLogger.info("Finished computing snarls. Process took {} seconds.".format(run_time))

        return snarl_file_id

def run_distance_indexing(job, context, input_xg_id, input_trivial_snarls_id, index_name=None, max_distance_threshold=0):
    """
    Make a distance index from the given XG index and the given snarls file,
    including the trivial snarls.

    TODO: also support a single VG file and snarls.

    If index_name is not None, saves it as <index_name>.dist to the output
    store.

    If max_distance_threshold is strictly positive, also includes a max
    distance index with the given limit. By default includes only a min
    distance index.

    Returns the file ID of the resulting distance index.
    """
RealtimeLogger.info("Starting distance indexing...")
start_time = timeit.default_timer()
# Define work directory for docker calls
work_dir = job.fileStore.getLocalTempDir()
# Download the input files.
xg_filename = os.path.join(work_dir, 'graph.xg')
job.fileStore.readGlobalFile(input_xg_id, xg_filename)
trivial_snarls_filename = os.path.join(work_dir, 'graph.trivial.snarls')
job.fileStore.readGlobalFile(input_trivial_snarls_id, trivial_snarls_filename)
# Where do we put the distance index?
distance_filename = os.path.join(work_dir, (index_name if index_name is not None else 'graph') + '.dist')
cmd = ['vg', 'index', '-t', max(1, int(job.cores)), '-j', os.path.basename(distance_filename),
'-x', os.path.basename(xg_filename), '-s', os.path.basename(trivial_snarls_filename)]
if max_distance_threshold > 0:
# Add a max distance index with this limit.
cmd.append('-w')
cmd.append(str(max_distance_threshold))
try:
# Compute the index to the correct file
context.runner.call(job, cmd, work_dir=work_dir)
except:
# Dump everything we need to replicate the indexing
logging.error("Distance indexing failed. Dumping files.")
context.write_output_file(job, xg_filename)
context.write_output_file(job, trivial_snarls_filename)
if os.path.exists(distance_filename):
context.write_output_file(job, distance_filename)
raise
if index_name is not None:
# Checkpoint index to output store
distance_file_id = context.write_output_file(job, distance_filename)
else:
# Just save the index as an intermediate
distance_file_id = context.write_intermediate_file(job, distance_filename)
end_time = timeit.default_timer()
run_time = end_time - start_time
RealtimeLogger.info("Finished computing distance index. Process took {} seconds.".format(run_time))
return distance_file_id

def run_minimizer_indexing(job, context, input_xg_id, input_gbwt_id, index_name=None):
    """
    Make a minimizer index file for the graph and haplotypes described by the
    given input XG and GBWT indexes.

    If index_name is not None, saves it as <index_name>.min to the output
    store.

    Returns the file ID of the resulting minimizer index.
    """
RealtimeLogger.info("Starting minimizer indexing...")
start_time = timeit.default_timer()
# Define work directory for docker calls
work_dir = job.fileStore.getLocalTempDir()
# Check our setup.
# TODO: Be able to build a minimizer index for all paths somehow if the
# GBWT isn't available, for e.g. a linear graph.
assert input_xg_id is not None
assert input_gbwt_id is not None
# Download the input files.
xg_filename = os.path.join(work_dir, 'graph.xg')
job.fileStore.readGlobalFile(input_xg_id, xg_filename)
gbwt_filename = os.path.join(work_dir, 'graph.gbwt')
job.fileStore.readGlobalFile(input_gbwt_id, gbwt_filename)
# Where do we put the minimizer index?
minimizer_filename = os.path.join(work_dir, (index_name if index_name is not None else 'graph') + '.min')
cmd = ['vg', 'minimizer', '-t', max(1, int(job.cores)), '-i', os.path.basename(minimizer_filename),
'-g', os.path.basename(gbwt_filename)] + context.config.minimizer_opts + [os.path.basename(xg_filename)]
try:
# Compute the index to the correct file
context.runner.call(job, cmd, work_dir=work_dir)
except:
# Dump everything we need to replicate the indexing
logging.error("Minimizer indexing failed. Dumping files.")
context.write_output_file(job, xg_filename)
context.write_output_file(job, gbwt_filename)
context.write_output_file(job, minimizer_filename)
raise
if index_name is not None:
# Checkpoint index to output store
minimizer_file_id = context.write_output_file(job, minimizer_filename)
else:
# Just save the index as an intermediate
minimizer_file_id = context.write_intermediate_file(job, minimizer_filename)
end_time = timeit.default_timer()
run_time = end_time - start_time
RealtimeLogger.info("Finished computing minimizer index. Process took {} seconds.".format(run_time))
return minimizer_file_id

def run_id_ranges(job, context, inputGraphFileIDs, graph_names, index_name, chroms):
    """
    Make a file of chrom_name <tab> first_id <tab> last_id covering the
    id ranges of all chromosomes. This is to speed up gam splitting down the road.
    """
    RealtimeLogger.info("Starting id ranges...")
    start_time = timeit.default_timer()

    # Our id ranges (list of triples)
    id_ranges = []

    # to encapsulate everything under this job
    child_job = Job()
    job.addChild(child_job)

    # Get the range for one graph per job.
    for graph_id, graph_name, chrom in zip(inputGraphFileIDs, graph_names, chroms):
        id_range = child_job.addChildJobFn(run_id_range, context, graph_id, graph_name, chrom,
                                           cores=context.config.prune_cores,
                                           memory=context.config.prune_mem, disk=context.config.prune_disk).rv()
        id_ranges.append(id_range)

    # Merge them into a file and return its id
    return child_job.addFollowOnJobFn(run_merge_id_ranges, context, id_ranges, index_name,
                                      cores=context.config.misc_cores, memory=context.config.misc_mem,
                                      disk=context.config.misc_disk).rv()

def run_id_range(job, context, graph_id, graph_name, chrom):
    """
    Compute a node id range for a graph (which should be an entire contig/chromosome with
    contiguous id space -- see vg ids) using vg stats.
    """
    work_dir = job.fileStore.getLocalTempDir()

    # download graph
    graph_filename = os.path.join(work_dir, graph_name)
    job.fileStore.readGlobalFile(graph_id, graph_filename)

    # run vg stats
    # expect result of form node-id-range <tab> first:last
    command = ['vg', 'stats', '--node-id-range', os.path.basename(graph_filename)]
    stats_out = context.runner.call(job, command, work_dir=work_dir, check_output=True).strip().split()
    assert stats_out[0].decode('ascii') == 'node-id-range'
    # Decode from bytes so the ids format cleanly into the id ranges TSV
    first, last = stats_out[1].decode('ascii').split(':')

    return chrom, first, last

def run_merge_id_ranges(job, context, id_ranges, index_name):
    """
    Create a BED-style file of id ranges.
    """
    work_dir = job.fileStore.getLocalTempDir()

    # Where do we put the id ranges tsv?
    id_range_filename = os.path.join(work_dir, '{}_id_ranges.tsv'.format(index_name))

    with open(id_range_filename, 'wb') as f:
        for id_range in id_ranges:
            f.write('{}\t{}\t{}\n'.format(*id_range).encode())

    # Checkpoint index to output store
    return context.write_output_file(job, id_range_filename)

def run_merge_gbwts(job, context, chrom_gbwt_ids, index_name):
    """
    Merge per-chromosome GBWTs into a single GBWT.
    """
    work_dir = job.fileStore.getLocalTempDir()

    gbwt_chrom_filenames = []

    for i, gbwt_id in enumerate(chrom_gbwt_ids):
        if gbwt_id:
            gbwt_filename = os.path.join(work_dir, '{}.gbwt'.format(i))
            job.fileStore.readGlobalFile(gbwt_id, gbwt_filename)
            gbwt_chrom_filenames.append(gbwt_filename)

    if len(gbwt_chrom_filenames) == 0:
        return None
    elif len(gbwt_chrom_filenames) == 1:
        return context.write_output_file(job, gbwt_chrom_filenames[0],
                                         out_store_path=index_name + '.gbwt')
    else:
        # Merge the GBWT files together
        cmd = ['vg', 'gbwt', '--merge', '--fast', '--output', index_name + '.gbwt']
        cmd += [os.path.basename(f) for f in gbwt_chrom_filenames]

        try:
            context.runner.call(job, cmd, work_dir=work_dir)
        except:
            # Dump everything we need to replicate the merge
            logging.error("GBWT merge failed. Dumping files.")
            for f in gbwt_chrom_filenames:
                context.write_output_file(job, f)
            raise

        return context.write_output_file(job, os.path.join(work_dir, index_name + '.gbwt'))

def run_bwa_index(job, context, fasta_file_id, bwa_index_ids=None, intermediate=False, copy_fasta=False):
    """
    Make a BWA index for a FASTA sequence if not given in input.

    If intermediate is set to True, do not output the index files. Otherwise, output them
    as bwa.fa.<index type>.

    Returns a dict from index extension to index file ID.
    Note that BWA produces 'amb', 'ann', 'bwt', 'pac', and 'sa' index files.

    If such a nonempty dict is passed in already, return that instead (and
    don't output any files).

    If copy_fasta is True (and intermediate is False), also output the input FASTA to the out store.
    """
    if not bwa_index_ids:
        bwa_index_ids = dict()
        work_dir = job.fileStore.getLocalTempDir()

        # Download the FASTA file to be indexed
        # It would be nice to name it the same as the actual input FASTA but we'd have to peek at the options
        fasta_file = os.path.join(work_dir, 'bwa.fa')
        job.fileStore.readGlobalFile(fasta_file_id, fasta_file)

        cmd = ['bwa', 'index', os.path.basename(fasta_file)]
        context.runner.call(job, cmd, work_dir=work_dir)

        # Work out how to output the files
        write_file = context.write_intermediate_file if intermediate else context.write_output_file

        for idx_file in glob.glob('{}.*'.format(fasta_file)):
            # Upload all the index files created, and store their IDs under their extensions
            bwa_index_ids[idx_file[len(fasta_file):]] = write_file(job, idx_file)

        if copy_fasta and not intermediate:
            # We ought to upload the FASTA also.
            context.write_output_file(job, fasta_file)

    return bwa_index_ids

def run_minimap2_index(job, context, fasta_file_id, minimap2_index_id=None, intermediate=False, copy_fasta=False):
    """
    Make a minimap2 index for a FASTA sequence if not given in input.

    If intermediate is set to True, do not output it. Otherwise, output it
    as minimap2.fa.mmi.

    Returns the index file ID.

    If copy_fasta is True (and intermediate is False), also output the input FASTA to the out store.
    """
    if not minimap2_index_id:
        work_dir = job.fileStore.getLocalTempDir()

        # Download the FASTA file to be indexed
        # It would be nice to name it the same as the actual input FASTA but we'd have to peek at the options
        fasta_file = os.path.join(work_dir, 'minimap2.fa')
        job.fileStore.readGlobalFile(fasta_file_id, fasta_file)

        # Say where the index should go
        index_file = os.path.join(work_dir, 'minimap2.fa.mmi')

        # Make the index
        cmd = ['minimap2', '-d', os.path.basename(index_file), os.path.basename(fasta_file)]
        context.runner.call(job, cmd, work_dir=work_dir)

        # Work out how to output the files
        write_file = context.write_intermediate_file if intermediate else context.write_output_file
        minimap2_index_id = write_file(job, index_file)

        if copy_fasta and not intermediate:
            # We ought to upload the FASTA also.