-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathtr_harmonizer.py
1781 lines (1540 loc) · 65.4 KB
/
tr_harmonizer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Utilities for harmonizing tandem repeat VCF records.
Handles VCFs generated by various TR genotyping tools
"""
import enum
import re
import warnings
from typing import Any, Callable, Dict, Generator, Iterator, List, Optional, Set, Tuple, Union
import cyvcf2
import numpy as np
import trtools.utils.utils as utils
import trtools.utils.common as common
# List of supported VCF types
# TODO: add Beagle
# TODO: add support for tool version numbers
_beagle_error = "If this file was imputed by Beagle, did you remember to copy the info fields over?"
class VcfTypes(enum.Enum):
"""The different tr callers that tr_harmonizer supports."""
# enum constants must have values, so given them
# redundant values
gangstr = "gangstr"
advntr = "advntr"
hipstr = "hipstr"
eh = "eh"
popstr = "popstr"
longtr = "longtr"
# Don't include the redundant values
# in how enums are printed out
def __repr__(self):
return '<{}.{}>'.format(self.__class__.__name__, self.name)
class TRDosageTypes(enum.Enum):
"""Ways to compute TR dosages."""
bestguess = "bestguess"
beagleap = "beagleap"
bestguess_norm = "bestguess_norm"
beagleap_norm = "beagleap_norm"
def __repr__(self):
return '<{}.{}>'.format(self.__class__.__name__, self.name)
def _ToVCFType(vcftype: Union[str, VcfTypes]):
# Convert the input to a VcfTypes enum.
#
# If it is a string, look up the VcfTypes enum.
# If it is already a VcfTypes enum, return it.
# Otherwise, error
if isinstance(vcftype, str):
if vcftype not in VcfTypes.__members__:
raise ValueError(("{} is not an excepted TR vcf type. "
"Expected one of {}").format(
vcftype, list(VcfTypes.__members__)))
return VcfTypes[vcftype]
elif isinstance(vcftype, VcfTypes):
return vcftype
else:
raise TypeError(("{} (of type {}) is not a vcftype"
.format(vcftype, type(vcftype))))
def MayHaveImpureRepeats(vcftype: Union[str, VcfTypes]):
"""
Determine if any of the alleles in this VCF may contain impure repeats.
Specifically, impure repeats include:
* impurities in the underlying sequence (e.g. AAATAAAAA)
* partial repeats (e.g. AATAATAATAA)
This is a guarantee that the caller attempted to call impure repeats,
not that it found any. It also does not guarantee that
all impurities present were identified and called.
Returns
-------
bool
Indicates whether repeat sequences may be impure
"""
vcftype = _ToVCFType(vcftype)
if vcftype == VcfTypes.gangstr:
return False
if vcftype == VcfTypes.hipstr:
return True
if vcftype == VcfTypes.longtr:
return True
if vcftype == VcfTypes.advntr:
return True
if vcftype == VcfTypes.popstr:
return True
if vcftype == VcfTypes.eh:
return False
# Can't cover this line because it is future proofing.
# (It explicitly is not reachable now,
# would only be reachable if VcfTypes is expanded in the future)
_UnexpectedTypeError(vcftype) # pragma: no cover
def HasLengthRefGenotype(vcftype: Union[str, VcfTypes]):
"""
Determine if the reference alleles of variants are given by length.
If True, then reference alleles for all variants produced by this
caller are specified by length and not by sequence. Sequences are
fabricated according to :py:func:`trtools.utils.utils.FabricateAllele`.
If True, then :py:meth:`HasLengthAltGenotypes` will also be true
Returns
-------
bool
Indicates whether ref alleles are specified by length
"""
vcftype = _ToVCFType(vcftype)
if vcftype == VcfTypes.gangstr:
return False
if vcftype == VcfTypes.hipstr:
return False
if vcftype == VcfTypes.longtr:
return False
if vcftype == VcfTypes.advntr:
return False
if vcftype == VcfTypes.popstr:
return False
if vcftype == VcfTypes.eh:
return True
# Can't cover this line because it is future proofing.
# (It explicitly is not reachable now,
# would only be reachable if VcfTypes is expanded in the future)
_UnexpectedTypeError(vcftype) # pragma: no cover
def HasLengthAltGenotypes(vcftype: Union[str, VcfTypes]):
"""
Determine if the alt alleles of variants are given by length.
If True, then alt alleles for all variants produced by this
caller are specified by length and not by sequence. Sequences are
fabricated according to :py:func:`trtools.utils.utils.FabricateAllele`.
Returns
-------
bool
Indicates whether alt alleles are specified by length
"""
vcftype = _ToVCFType(vcftype)
if vcftype == VcfTypes.gangstr:
return False
if vcftype == VcfTypes.hipstr:
return False
if vcftype == VcfTypes.longtr:
return False
if vcftype == VcfTypes.advntr:
return False
if vcftype == VcfTypes.popstr:
return True
if vcftype == VcfTypes.eh:
return True
# Can't cover this line because it is future proofing.
# (It explicitly is not reachable now,
# would only be reachable if VcfTypes is expanded in the future)
_UnexpectedTypeError(vcftype) # pragma: no cover
def _UnexpectedTypeError(vcftype: Union[str, VcfTypes]):
raise ValueError("self.vcftype is the unexpected type {}"
.format(vcftype))
def InferVCFType(vcffile: cyvcf2.VCF, vcftype: Union[str, VcfTypes] = "auto") -> VcfTypes:
"""
Infer the genotyping tool used to create the VCF.
When we can, infer from header metadata.
Otherwise, try to infer the type from the ALT field.
Parameters
----------
vcffile :
The input VCF file
vcftype :
If it is unclear which of a few VCF callers produced the underlying
VCF (because the output markings of those VCF callers are similar)
this string can be supplied by the user to choose from among
those callers.
Returns
-------
vcftype : VcfTypes
Type of the VCF file
Raises
------
TypeError
If this VCF does not look like it was produced by any supported TR
caller, or if it looks like it could have been produced by more than one
supported TR caller and vcftype == 'auto', or if vcftype doesn't match
any of the callers that could have produced this VCF.
"""
possible_vcf_types = set()
header = vcffile.raw_header.lower()
if 'command=' in header and 'gangstr' in header:
possible_vcf_types.add(VcfTypes.gangstr)
if 'command=' in header and 'hipstr' in header:
possible_vcf_types.add(VcfTypes.hipstr)
if 'command=' in header and 'longtr' in header:
possible_vcf_types.add(VcfTypes.longtr)
if 'source=advntr' in header:
possible_vcf_types.add(VcfTypes.advntr)
if 'source=popstr' in header:
possible_vcf_types.add(VcfTypes.popstr)
if re.search(r'ALT=<ID=STR\d+'.lower(), header):
possible_vcf_types.add(VcfTypes.eh)
if len(possible_vcf_types) == 0:
raise TypeError('Could not identify the type of this vcf')
if vcftype == 'auto':
if len(possible_vcf_types) == 1:
return next(iter(possible_vcf_types))
else:
raise TypeError(('Confused - this vcf looks like it could have '
'been any of the types: {}. Please specify '
'--vcftype to choose one of '
'them').format(possible_vcf_types))
user_supplied_type = _ToVCFType(vcftype)
if user_supplied_type in possible_vcf_types:
return user_supplied_type
else:
raise TypeError(('Confused - this vcf looks like it could have '
'been any of the types: {}. But you specified: '
'--vcftype {} which is not one of those types.'
.format(possible_vcf_types, vcftype)))
def IsBeagleVCF(vcffile: cyvcf2.VCF) -> bool:
"""
Is this a VCF produced by running the Beagle software to impute STRs from a panel generated by an TR genotyper,
or does it consist of STRs directly called by a TR genotyper?
Parameters
----------
vcffile :
The input VCF file
Returns
-------
bool
Whether this is a VCF produced by Beagle
"""
return bool(re.search('##source=(\'|")beagle', vcffile.raw_header.lower()))
def HarmonizeRecord(vcftype: Union[str, VcfTypes], vcfrecord: cyvcf2.Variant):
"""
Create a standardized TRRecord object out of a cyvcf2.Variant
object of possibly unknown type.
Parameters
----------
vcfrecord :
A cyvcf2.Variant Object
vcftype : VcfTypes
Type of the VCF file
Returns
-------
trrecord : TRRecord
A TRRecord object built out of the input record
"""
vcftype = _ToVCFType(vcftype)
if vcftype == VcfTypes.gangstr:
return _HarmonizeGangSTRRecord(vcfrecord)
if vcftype == VcfTypes.hipstr:
return _HarmonizeHipSTRRecord(vcfrecord)
# Note: LongTR is the same format of HipSTR so
# we re-use that function here
if vcftype == VcfTypes.longtr:
return _HarmonizeHipSTRRecord(vcfrecord)
if vcftype == VcfTypes.advntr:
return _HarmonizeAdVNTRRecord(vcfrecord)
if vcftype == VcfTypes.eh:
return _HarmonizeEHRecord(vcfrecord)
if vcftype == VcfTypes.popstr:
return _HarmonizePopSTRRecord(vcfrecord)
# Can't cover this line because it is future proofing.
# (It explicitly is not reachable now,
# would only be reachable if VcfTypes is expanded in the future)
_UnexpectedTypeError(vcftype) # pragma: no cover
def _HarmonizeGangSTRRecord(vcfrecord: cyvcf2.Variant):
"""
Turn a cyvcf2.Variant with GangSTR content into a TRRecord.
Parameters
----------
vcfrecord :
A cyvcf2.Variant Object
Returns
-------
TRRecord
"""
if vcfrecord.INFO.get('RU') is None:
raise TypeError(
"Record at {}:{} is missing mandatory GangSTR info field RU. ".format(vcfrecord.CHROM, vcfrecord.POS) + _beagle_error
)
if vcfrecord.INFO.get('VID') is not None:
raise TypeError(
"Trying to read an AdVNTR record as a GangSTR record {}:{}".format(vcfrecord.CHROM, vcfrecord.POS))
if vcfrecord.INFO.get('VARID') is not None:
raise TypeError("Trying to read an EH record as a GangSTR record {}:{}".format(vcfrecord.CHROM, vcfrecord.POS))
ref_allele = vcfrecord.REF.upper()
if vcfrecord.ALT:
alt_alleles = _UpperCaseAlleles(vcfrecord.ALT)
else:
alt_alleles = []
motif = vcfrecord.INFO["RU"].upper()
record_id = None
return TRRecord(vcfrecord, ref_allele, alt_alleles, motif, record_id, 'Q' if vcfrecord.INFO.get('IMP') is None else None)
def _HarmonizeHipSTRRecord(vcfrecord: cyvcf2.Variant):
"""
Turn a cyvcf2.Variant with HipSTR content into a TRRecord.
Parameters
----------
vcfrecord :
A cyvcf2.Variant Object
Returns
-------
TRRecord
"""
if (vcfrecord.INFO.get('START') is None
or vcfrecord.INFO.get('END') is None
or vcfrecord.INFO.get('PERIOD') is None):
raise TypeError(
"Record at {}:{} is missing one of the mandatory HipSTR/LongTR info fields START, END, PERIOD. ".format(vcfrecord.CHROM, vcfrecord.POS) + _beagle_error
)
# determine full alleles and trimmed alleles
pos = int(vcfrecord.POS)
start_offset = int(vcfrecord.INFO['START']) - pos
pos_end_offset = int(vcfrecord.INFO['END']) - pos
neg_end_offset = pos_end_offset + 1 - len(vcfrecord.REF)
if start_offset == 0 and neg_end_offset == 0:
full_alleles = None
else:
if vcfrecord.ALT:
full_alts = _UpperCaseAlleles(vcfrecord.ALT)
else:
full_alts = []
full_alleles = (vcfrecord.REF.upper(),
full_alts)
# neg_end_offset is the number of flanking non repeat bp to remove
# from the end of each allele
# e.g. 'AAAT'[0:-1] == 'AAA'
# however, if neg_end_offset == 0, then we would get
# 'AAAA'[1:0] == '' which is not the intent
# so we need an if statement to instead write 'AAAA'[0:]
# which gives us 'AAAA'
if neg_end_offset == 0:
ref_allele = vcfrecord.REF[start_offset:].upper()
if vcfrecord.ALT:
alt_alleles = []
for alt in vcfrecord.ALT:
alt_alleles.append(str(alt)[start_offset:].upper())
else:
alt_alleles = []
else:
ref_allele = vcfrecord.REF[start_offset:neg_end_offset].upper()
if vcfrecord.ALT:
alt_alleles = []
for alt in vcfrecord.ALT:
alt_alleles.append(
str(alt)[start_offset:neg_end_offset].upper()
)
else:
alt_alleles = []
# Get the motif.
# Hipstr doesn't tell us this explicitly, so figure it out
motif = utils.InferRepeatSequence(ref_allele[start_offset:],
vcfrecord.INFO["PERIOD"])
record_id = vcfrecord.ID
return TRRecord(vcfrecord,
ref_allele,
alt_alleles,
motif,
record_id,
'Q' if vcfrecord.INFO.get('IMP') is None else None,
harmonized_pos=int(vcfrecord.INFO['START']),
full_alleles=full_alleles)
def _HarmonizeAdVNTRRecord(vcfrecord: cyvcf2.Variant):
"""
Turn a cyvcf2.Variant with adVNTR content into a TRRecord.
Parameters
----------
vcfrecord :
A cyvcf2.Variant Object
Returns
-------
TRRecord
"""
if vcfrecord.INFO.get('RU') is None or vcfrecord.INFO.get('VID') is None:
raise TypeError(
"Record at {}:{} is missing one of the mandatory ADVNTR info fields RU, VID. ".format(vcfrecord.CHROM, vcfrecord.POS) + _beagle_error
)
ref_allele = vcfrecord.REF.upper()
if vcfrecord.ALT:
alt_alleles = _UpperCaseAlleles(vcfrecord.ALT)
else:
alt_alleles = []
motif = vcfrecord.INFO["RU"].upper()
record_id = vcfrecord.INFO["VID"]
return TRRecord(vcfrecord, ref_allele, alt_alleles, motif, record_id, 'ML' if vcfrecord.INFO.get('IMP') is None else None)
# def _PHREDtoProb(phred: int) -> float:
# """Convert PHRED score to probability
#
# Notes
# -----
# Per https://en.wikipedia.org/wiki/Phred_quality_score
# """
# return 10**(-phred/10)
# def _ConvertPLtoQualityProb(PL: List[int]) -> float:
# """
# Convert a list of PHRED-scaled genotype probabilities to the
# unscaled probability of the single most likely genotype.#
#
# Notes
# -----
# PHRED scaling is not very accurate around numbers close to 1
# unfortunately, so for PHRED score of 0, instead calculate the probability
# by 1 - sum(probabilities of other genotypes)
# """
#
# max_likelihood = min(PL)
# if max_likelihood != 0:
# return _PHREDtoProb(max_likelihood)
#
# sum_other_likelihoods = 0.0
# for phred_likelihood in PL:
# if phred_likelihood == 0:
# continue
# sum_other_likelihoods += _PHREDtoProb(phred_likelihood)
# return max(_PHREDtoProb(1), 1 - sum_other_likelihoods)
def _HarmonizePopSTRRecord(vcfrecord: cyvcf2.Variant):
"""
Turn a cyvcf2.Variant with popSTR content into a TRRecord.
Parameters
----------
vcfrecord :
A cyvcf2.Variant Object
Returns
-------
TRRecord
"""
if vcfrecord.INFO.get('Motif') is None:
raise TypeError(
"Record at {}:{} is missing mandatory PopSTR info field MOTIF".format(vcfrecord.CHROM, vcfrecord.POS)
)
ref_allele = vcfrecord.REF.upper()
motif = vcfrecord.INFO["Motif"].upper()
record_id = vcfrecord.ID
if vcfrecord.ALT:
alt_allele_lengths = []
for alt in vcfrecord.ALT:
alt = str(alt)
if alt[0] != "<" or alt[-1] != ">":
raise TypeError("This record does not look like a PopSTR"
" record. Alt alleles were not formatted"
" as expected")
alt_allele_lengths.append(float(alt[1:-1]))
else:
alt_allele_lengths = []
return TRRecord(vcfrecord,
ref_allele,
None,
motif,
record_id,
None,
alt_allele_lengths=alt_allele_lengths)
def _HarmonizeEHRecord(vcfrecord: cyvcf2.Variant):
"""
Turn a cyvcf2.Variant with EH content into a TRRecord.
Parameters
----------
vcfrecord :
A cyvcf2.Variant Object
Returns
-------
TRRecord
"""
if vcfrecord.INFO.get('VARID') is None or vcfrecord.INFO.get('RU') is None:
raise TypeError(
"Record at {}:{} is missing one of the mandatory ExpansionHunter info fields VARID, RU. ".format(vcfrecord.CHROM, vcfrecord.POS)
+ _beagle_error
)
record_id = vcfrecord.INFO["VARID"]
motif = vcfrecord.INFO["RU"].upper()
ref_allele_length = int(vcfrecord.INFO["RL"]) / len(motif)
if vcfrecord.ALT:
alt_allele_lengths = []
for alt in vcfrecord.ALT:
alt = str(alt)
if alt[:4] != "<STR" or alt[-1] != ">":
raise TypeError("This record does not look like an EH "
" record. Alt alleles were not formatted"
" as expected")
alt_allele_lengths.append(float(alt[4:-1]))
else:
alt_allele_lengths = []
return TRRecord(vcfrecord, None, None, motif, record_id, None,
ref_allele_length=ref_allele_length,
alt_allele_lengths=alt_allele_lengths)
def _UpperCaseAlleles(alleles: List[str]):
# Convert the list of allele strings to upper case
upper_alleles = []
for allele in alleles:
upper_alleles.append(allele.upper())
return upper_alleles
class _Cyvcf2FormatDict():
"""
Provide an immutable dict-like interface for accessing
format fields from a cyvcf2 record.
To iterate over this dict, use :code:`iter(this)`
or :code:`this.keys()`.
"""
def __init__(self, record: cyvcf2.Variant):
self.record = record
def __getitem__(self, key: str):
return self.record.format(key)
def __len__(self):
return len(self.record.FORMAT)
def __iter__(self):
return iter(self.record.FORMAT)
def __contains__(self, key: str):
return key in self.record.FORMAT
def keys(self):
return self.record.FORMAT
def get(self, key: str):
return self.record.format(key)
class TRRecord:
"""
A representation of a VCF record specialized for TR fields.
Allows downstream functions to be agnostic to the
genotyping tool used to create the record.
Parameters
----------
vcfrecord :
Cyvcf2 Variant object with the underlying data
ref_allele :
Reference allele string
alt_alleles :
List of alternate allele strings
motif :
Repeat unit
record_id :
Identifier for the record
quality_field :
the name of the FORMAT field which contains the quality score for each
call for this record
Attributes
----------
vcfrecord : cyvcf2.Variant
The cyvcf2 Variant object used to init this record.
ref_allele : str
Reference allele sequences, fabricated if necessary.
Gets converted to uppercase e.g. ACGACGACG
alt_alleles : List[str]
List of alternate allele sequences, fabricated if necessary
motif : str
Repeat unit
record_id : str
Identifier for the record
chrom : str
The chromosome this locus is in
pos : int
The bp along the chromosome that this locus is at (ignoring flanking base pairs/full alleles)
end_pos:
Position of the last bp of ref allele (ignoring flanking base pairs/full alleles)
full_alleles_pos:
Position of the first bp of the full ref allele (including the flanking base pairs)
full_alleles_end_pos:
Position of the last bp of the full ref allele (including the flanking base pairs)
info : Dict[str, Any]
The dictionary of INFO fields at this locus
format : Dict[str, np.ndarray]
The dictionary of FORMAT fields at this locus.
Numeric format fields are 2D numpy arrays with rows corresponding
to samples (normally 1 column, but if there are multiple numbers
then more than one column)
String format fields are 1D numpy arrays with entries corresponding
to samples
Other Parameters
----------------
harmonized_pos :
If this record has flanking base pairs before the repeat, set this
to note at which bp the repeat begins
full_alleles :
A tuple of string genotypes (ref_allele, [alt_alleles])
where each allele may contain any number of flanking
basepairs in addition to containing the tandem repeat.
If set, these can be accessed through :py:meth:`GetFullStringGenotypes`
If the alt alleles have differently sized flanks than the ref allele
then those alt alleles will be improperly trimmed.
alt_allele_lengths :
The lengths of each of the alt alleles, in order.
Thus is measured in number of copies of repeat unit,
NOT the allele length in base pairs.
Should be passed to the constructor when only the lengths of the alt alleles
were measured and not the sequences. If sequences are passed to the
constructor then this is set automatically.
If this is passed, the alt_alleles parameter to the constructor must
be set to None and the alt_alleles attribute of the record will be set
to fabricated alleles (see
:py:meth:`trtools.utils.utils.FabricateAllele`)
ref_allele_length :
like alt_allele_lengths, but for the reference allele.
If this is passed, alt_allele_lengths must also be passed
min_allele_length :
Minimum allele length from the reference and alternate alleles
max_allele_length :
Maximum allele length from the reference and alternate alleles
quality_score_transform :
A function which turns the quality_field value into a float
score. When None, the quality_field values are assumed
to already be floats
Notes
-----
Alleles are stored as upper case strings with all the repeats written out.
Alleles may contain partial repeat copies or impurities.
This class will attempt to make sure alleles do not contain any extra base
pairs to either side of the repeat. If you wish to have those base pairs,
use the 'Full' methods
"""
def __init__(self,
vcfrecord: cyvcf2.Variant,
ref_allele: Optional[str],
alt_alleles: Optional[List[str]],
motif: str,
record_id: str,
quality_field: Optional[str],
*,
harmonized_pos: Optional[int] = None,
full_alleles: Optional[Tuple[str, List[str]]] = None,
ref_allele_length: Optional[float] = None,
alt_allele_lengths: Optional[List[float]] = None,
quality_score_transform: Optional[Callable[..., float]] = None):
self.vcfrecord = vcfrecord
self.ref_allele = ref_allele
self.alt_alleles = alt_alleles
self.motif = motif
self.record_id = record_id
self.chrom = vcfrecord.CHROM
self.pos = harmonized_pos if harmonized_pos is not None else vcfrecord.POS
self.info = dict(vcfrecord.INFO)
self.format = _Cyvcf2FormatDict(vcfrecord)
self.full_alleles = full_alleles
self.full_alleles_pos = self.vcfrecord.POS
self.ref_allele_length = ref_allele_length
self.alt_allele_lengths = alt_allele_lengths
self.quality_field = quality_field
self.quality_score_transform = quality_score_transform
if full_alleles is not None and (alt_alleles is None or ref_allele is
None):
raise ValueError("Cannot set full alleles without setting "
"regular alleles")
if alt_allele_lengths is not None and alt_alleles is not None:
raise ValueError("Must specify only the sequences or the lengths"
" of the alt alleles, not both.")
if ref_allele_length is not None and alt_allele_lengths is None:
raise ValueError("If the ref allele is specified by length, the "
"alt alleles must be too.")
if ref_allele_length is not None:
self.has_fabricated_ref_allele = True
self.ref_allele = utils.FabricateAllele(motif, ref_allele_length)
else:
self.has_fabricated_ref_allele = False
self.ref_allele_length = len(ref_allele) / len(motif)
# declaration of end_pos variables. Values are rounded because self.ref_allele_length can
# sometimes be a float because of partial repeats. This can cause weird float problems, and simple cast
# is not enought to ensure that the proper position is calculated
self.end_pos = round(self.pos + self.ref_allele_length * len(motif) - 1)
self.full_alleles_end_pos = self.end_pos if full_alleles is None else \
round(self.full_alleles_pos + len(self.full_alleles[0]) - 1)
if alt_allele_lengths is not None:
self.has_fabricated_alt_alleles = True
self.alt_alleles = [
utils.FabricateAllele(motif, length) for length in
alt_allele_lengths
]
else:
self.has_fabricated_alt_alleles = False
self.alt_allele_lengths = [
len(allele) / len(motif) for allele in self.alt_alleles
]
# Update min/max length
if len(self.alt_alleles) > 0:
self.min_allele_length = min(self.ref_allele_length, min(self.alt_allele_lengths))
self.max_allele_length = max(self.ref_allele_length, max(self.alt_allele_lengths))
else:
self.min_allele_length = self.ref_allele_length
self.max_allele_length = self.ref_allele_length
try:
self._CheckRecord()
except ValueError as e:
raise ValueError(("Invalid TRRecord. TRRecord: {} Original record:"
" {}").format(str(self), str(self.vcfrecord)), e)
def _CheckRecord(self):
"""
Check that this record is properly constructed.
Checks that the same number of alt alleles were specified
as in the underlying record and that the full_alleles, if supplied,
contain their corresponding standard alleles
Raises an error if a check fails
"""
if len(self.alt_alleles) != len(self.vcfrecord.ALT):
raise ValueError("Underlying record does not have the same "
"number of alt alleles as given to the TRRecord "
"constructor. Underlying alt alleles: {}, "
" constructor alt alleles: {}".format(
self.vcfrecord.ALT, self.alt_alleles))
if self.full_alleles:
if len(self.full_alleles) != 2:
raise ValueError("full_alleles doesn't have both"
" a ref allele and alt alleles")
full_ref, full_alts = self.full_alleles
if len(full_alts) != len(self.alt_alleles):
raise ValueError("Different number of full alternate alleles "
"than normal alt alleles")
if self.ref_allele not in full_ref:
raise ValueError("could not find ref allele inside "
"full ref allele")
for idx, (full_alt, alt) \
in enumerate(zip(full_alts, self.alt_alleles)):
if alt not in full_alt:
raise ValueError(("Could not find alt allele {} "
"inside its full alt "
"allele").format(idx))
def GetMaxPloidy(self) -> int:
"""
Return the maximum ploidy of any sample at this locus.
All genotypes will be a tuple of that many haplotypes,
where samples with a smaller ploidy than that
will have haplotypes at the end of the tuple set to ','
(for string genotypes) or -2 (for index or length genotypes)
"""
return self.vcfrecord.ploidy
def GetNumSamples(self) -> int:
"""
Return the number of samples at this locus (called or not).
Same as the number of samples in the overall vcf
"""
return self.vcfrecord.genotype.n_samples
def GetGenotypeIndicies(self) -> Optional[np.ndarray]:
"""
Get an array of genotype indicies across all samples.
A genotype index is a number 0, 1, 2 ...
where 0 corresponds to the reference allele,
1 to the first alt allele, 2 to the second, etc.
The array is an array of ints with one row per sample.
The number of columns is the maximum ploidy of any sample
(normally 2) plus 1 for phasing.
All but the final column represent the index of the genotypes
of each call.
The final column has values 0 for unphased sampels or 1 for phased.
So a sample with gt '0|2' would be represented by the row [0, 2, 1]
and a sample with gt '3/0' would be represented by the row [3, 0, 0].
Uncalled haplotypes (represented by '.' in the VCF) are represented
by '-1' genotypes. If the sample has fewer haplotypes than the
maximum ploidy of all samples at this locus, then the row is padded
with -2s, so a haploid sample with gt '1' where other samples
are diploid would be represented by the row [1, -2, 0].
If all the genotype columns for a sample are negative then the
sample is a no call. Note: the value of the phasing
column is unspecified for haploid or no-call samples.
Returns
-------
Optional[np.ndarray]
The numpy array described above, of type int.
If there are no samples in the vcf this record comes from
then return None instead
"""
if self.vcfrecord.genotype is None:
return None
return self.vcfrecord.genotype.array().astype(int)
def GetCalledSamples(self, strict: bool = True) -> Optional[np.ndarray]:
"""
Get an array listing which samples have been called at this locus.
Parameters
----------
strict :
By default genotypes such as '1/.' are considered not called
because at least one of the haplotypes present is not called.
Set strict = False to mark these as being called.
Note: genotypes having lesser ploidy will not be marked
as no calls even when strict = True (e.g. if some samples
have tetraploid genotypes at this locus, a genotype of '1/2/2'
will be marked as called even though it is triploid)
Returns
-------
Optional[np.ndarray]
A bool array of length equal to the number of samples,
where true indicates a sample has been called
and false indicates otherwise.
If there are no samples in the vcf this record comes from
then return None instead
"""
gt_idxs = self.GetGenotypeIndicies()
if gt_idxs is None:
return None
if strict:
return ~np.any(gt_idxs[:, :-1] == -1, axis=1)
else:
return ~np.all(np.logical_or(gt_idxs[:, :-1] == -1,
gt_idxs[:, :-1] == -2),
axis=1)
def GetSamplePloidies(self) -> Optional[np.ndarray]:
"""
Get an array listing the ploidies of each sample
Returns
-------
Optional[np.ndarray]
An array of positive ints with length equal to the
number of samples where each entry denotes the
number of genotypes for each sample at this locus
(including no calls)
If there are no samples in the vcf this record comes from
then return None instead
"""
gt_idxs = self.GetGenotypeIndicies()
if gt_idxs is None:
return None
return (
gt_idxs.shape[1] - 1 - np.sum(gt_idxs[:, :-1] == -2, axis=1)
)
def GetCallRate(self, strict: bool = True) -> float:
"""
Return the call rate at this locus.
Parameters
----------
strict :
By default genotypes such as '1/.' are considered not called
because at least one of the haplotypes present is not called.
Set strict = False to mark these as being called.
Note: genotypes having lesser ploidy will not be marked
as no calls even when strict = True (e.g. if some samples
have tetraploid genotypes at this locus, a genotype of '1/2/2'
will be marked as called even though it is triploid)
Returns
-------
The fraction of the samples at this locus that have been
called. If there are no samples in the vcf this record comes from
then return np.nan instead
"""
called_samples = self.GetCalledSamples(strict=strict)
if called_samples is None:
return None
else:
return np.sum(called_samples) / called_samples.shape[0]
def _GetStringGenotypeArray(
self,
idx_gts: np.ndarray,
seq_alleles: List[str]):
max_len = max(len(allele) for allele in seq_alleles)
seq_array = np.empty(idx_gts.shape, dtype="<U{}".format(max_len))
seq_array[:, -1][idx_gts[:, -1] == 0] = '0'
seq_array[:, -1][idx_gts[:, -1] == 1] = '1'
for allele_idx, seq_allele in enumerate(seq_alleles):
seq_array[:, :-1][idx_gts[:, :-1] == allele_idx] = seq_allele
seq_array[:, :-1][idx_gts[:, :-1] == -1] = '.'
seq_array[:, :-1][idx_gts[:, :-1] == -2] = ','
return seq_array
def GetStringGenotypes(self) -> Optional[np.ndarray]:
"""
Get an array of string genotypes for each sample.
The array is as described in :py:meth:`GetGenotypeIndicies`
except that the indicies are replaced by their corresponding
sequences, -1 indicies (nocalls) are replaced by '.',
-2 indicies (not present due to smaller ploidy) are replaced
by ',', and the phasing bits (0 or 1) are replaced by the strings
'0' or '1'.
Will not include flanking base pairs. To get genotypes that include
flanking base pairs (for callers that call those), use
:py:meth:`GetFullStringGenotypes`. For callers that include flanking base pairs
it is possible that some of the alleles in the regular string genotypes
(with the flanks stripped) will be identical. In this case, you may
use :py:meth:`UniqueStringGenotypeMapping` to get a canonical unique subset
of indicies which represent all possible alleles.
Note that some TR callers will only call allele lengths, not allele
sequences. In such a case, this method will return a fabricated
sequence based on the called length (see
:py:meth:`trtools.utils.utils.FabricateAllele`) and
a warning will be raised. This may not be intended -
use :py:meth:`GetLengthGenotypes` for a fully caller agnostic
way of handling genotypes.
This method is inefficient for many samples, consider either using
length genotypes (:py:meth:`GetLengthGenotypes`), or
using genotype indicies (:py:meth:`GetGenotypeIndicies`) and
accessing string genotypes as needed through the fields ref_allele and
alt_alleles, instead.
Returns
-------
Optional[np.ndarray]
The numpy array described above, of type '<UN' where 'N'
is the max allele length.