-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathvcf2phylip.py
executable file
·190 lines (166 loc) · 4.19 KB
/
vcf2phylip.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#!/usr/bin/python
import re
import sys
import os
import vcf
import getopt
def main():
params = parseArgs()
data = dict()
if params.vcf:
#for each record in VCF
for record in read_vcf(params.vcf):
for call in record.samples:
#Get consensus base call
cons = None
if call.gt_bases:
l = (call.gt_bases).split("/")
cons = reverse_iupac(listToSortUniqueString(l))
else:
cons = "N"
if cons:
if call.sample in data:
data[call.sample].append(cons)
else:
data[call.sample] = list()
data[call.sample].append(cons)
else:
print ("Uh oh! No consensus called for %s, something is wrong"%call)
#Print dict to phylip file
with open(params.out, 'w') as fh:
try:
header = getPhylipHeader(data) + "\n"
fh.write(header)
for sample in data:
line = str(sample) + "\t" + "".join(data[sample]) + "\n"
fh.write(line)
except IOError:
print("Could not write to file ",params.out)
sys.exit(1)
finally:
fh.close()
else:
print("Error: No VCF file provided")
sys.exit(1)
#Returns header for Phylip file from a dictionary of samples w/ data
def getPhylipHeader(d):
numSamp = 0
numLoci = None
for sample in d:
numSamp = numSamp + 1
if not numLoci:
numLoci = len(d[sample])
else:
if numLoci != len(d[sample]):
print("getPhylipHeader: Warning: Sequences of unequal length.")
header = str(numSamp) + " " + str(numLoci)
if numLoci == 0 or not numLoci:
print("getPhylipHeader: Warning: No loci in dictionary.")
if numSamp == 0:
print("getPhylipHeader: Warning: No samples in dictionary.")
return(header)
#Read VCF variant calls
#Generator function, yields each locus
def read_vcf(v):
try:
vfh = vcf.Reader(filename=v)
except IOError as err:
print("I/O error({0}): {1}".format(err.errno, err.strerror))
except:
print("Unexpected error:", sys.exec_info()[0])
chrom = ""
recs = []
added = 0
for rec in vfh:
if not rec.FILTER:
yield(rec)
#Function to return sorted unique string from list of chars
def listToSortUniqueString(l):
sl = sorted(set(l))
return(str(''.join(sl)))
#Function to translate a string of bases to an iupac ambiguity code
def reverse_iupac(char):
char = char.upper()
if "-" in char:
return("-")
elif "N" in char:
return("N")
elif "." in char:
return(".")
else:
iupac = {
'A':'A',
'N':'N',
'-':'-',
'C':'C',
'G':'G',
'T':'T',
'AG':'R',
'CT':'Y',
'AC':'M',
'GT':'K',
'AT':'W',
'CG':'S',
'CGT':'B',
'AGT':'D',
'ACT':'H',
'ACG':'V',
'ACGT':'N'
}
return iupac[char]
#Object to parse command-line arguments
class parseArgs():
def __init__(self):
#Define options
try:
options, remainder = getopt.getopt(sys.argv[1:], 'v:o:h', \
["vcf=","help","out="])
except getopt.GetoptError as err:
print(err)
self.display_help("\nExiting because getopt returned non-zero exit status.")
#Default values for params
#Input params
self.vcf=None
self.out=None
#First pass to see if help menu was called
for o, a in options:
if o in ("-h", "-help", "--help"):
self.display_help("Exiting because help menu was called.")
#Second pass to set all args.
for opt, arg_raw in options:
arg = arg_raw.replace(" ","")
arg = arg.strip()
opt = opt.replace("-","")
#print(opt,arg)
if opt in ('v', 'vcf'):
self.vcf = arg
elif opt in ('h', 'help'):
pass
elif opt in ('o','out'):
self.out = arg
else:
assert False, "Unhandled option %r"%opt
#Check manditory options are set
if not self.vcf:
self.display_help("\nError: Missing required input file <-v,--vcf>")
if self.out:
self.out = self.out + ".phy"
else:
self.out = "out.phy"
def display_help(self, message=None):
if message is not None:
print (message)
print ("\nvcf2phylip.py\n")
print ("Contact:Tyler K. Chafin, University of Arkansas,[email protected]")
print ("\nUsage: ", sys.argv[0], "-v </path/to/vcf>\n")
print ("Description: Extract SNPs from a VCF file and outputs as concatenated Phylip")
print("""
Arguments:
-v,--vcf : VCF input file
-o,--out : Prefix for output file <default = ./out>
-h,--help : Displays help menu
""")
sys.exit()
#Call main function
if __name__ == '__main__':
main()