-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfennec_sequence_characterization.py
125 lines (107 loc) · 3.7 KB
/
fennec_sequence_characterization.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/env python3
import gc
import os
import subprocess
import sys
import h5py
import fennec
if not fennec._utils.isinteractive():
try:
_, fastafile, min_length, chunk_size, overlap, n_jobs = sys.argv
min_length, chunk_size, n_jobs = (int(min_length), int(chunk_size), int(n_jobs))
try:
overlap = int(overlap)
except: # `overlap` may be "auto"
assert overlap == "auto", "`overlap` must be 0+ or 'auto'"
except:
print(
f"usage: {sys.argv[0]} <file.fasta> <min_length> <chunk_size> <overlap> <n_jobs>"
)
sys.exit(1)
else:
fastafile = "DATA/S.Scaffolds.fasta"
min_length = 1000
chunk_size = 10000
overlap = 0
n_jobs = 4
print(f"== Processing '{fastafile}' ==")
# -- variable definitions
h5file = fastafile.replace(".fasta", f".l{min_length}c{chunk_size}o{overlap}.h5")
covfile = fastafile.replace(".fasta", ".csv")
force_gc = True
# -- load sequences
if os.path.exists(h5file):
seqdb = fennec.DNASequenceBank.read_hdf(h5file)
else:
seqdb = fennec.DNASequenceBank(
min_length=min_length, chunk_size=chunk_size, overlap=overlap, verbose=2
).read_fasta(fastafile)
print(seqdb)
print(f"Saving to {h5file}")
seqdb.to_hdf(h5file)
print(f"DNASequenceBank has {len(seqdb)} sequences.")
# -- feature model definitions
models_definitions = {
# "label": fennec.xxxModel(param)
"kmers110011": fennec.MaskedKmerModel(mask="110011", n_jobs=n_jobs, verbose=3),
"kmers11000100011": fennec.MaskedKmerModel(
mask="11000100011", n_jobs=n_jobs, verbose=3
),
"kmers1001001": fennec.MaskedKmerModel(mask="1001001", n_jobs=n_jobs, verbose=3),
"kmers3": fennec.MaskedKmerModel(mask="111", n_jobs=n_jobs, verbose=3),
"kmers4": fennec.MaskedKmerModel(mask="1111", n_jobs=n_jobs, verbose=3),
# "kmers5": fennec.MaskedKmerModel(mask="11111", n_jobs=n_jobs, verbose=3),
"ind15": fennec.InterNucleotideDistanceModel(K=15, n_jobs=n_jobs, verbose=3),
"contig2vec4": fennec.Contig2VecModel(k=4, verbose=3),
"contig2vec6": fennec.Contig2VecModel(k=6, verbose=3),
}
# -- Get coverage using GATTACA (Popic et al, 2017, doi: 10.1101/130997)
# - Delete one of the model
# try:
# with h5py.File(h5file) as h:
# del h['rawmodel']['cov_gattaca31']
# except:
# pass
gattaca_bin = "./bin/gattaca"
if os.access(gattaca_bin, os.X_OK) and not os.path.exists(covfile):
fastafile = seqdb.to_fasta()
print(f"[INFO] Indexing {fastafile}")
_ = subprocess.check_output([gattaca_bin, "index", "-k", "31", "-i", fastafile])
print(f"[INFO] Running GATTACA on {fastafile}")
_ = subprocess.check_output(
[
gattaca_bin,
"lookup",
"-c",
fastafile,
"-i",
f"{fastafile}.k31.gatc",
"-o",
covfile,
"-m", # median coverage of each contig in each sample (recommended)
]
)
print(f"[INFO] Adding SequenceCoverageModel to the model list")
models_definitions["cov_gattaca31"] = fennec.SequenceCoverageModel(
(covfile,), verbose=3
)
# -- list models to apply
try:
available_features = fennec._utils.list_models(h5file)
except:
available_features = set()
models_to_apply = set(models_definitions.keys()).difference(available_features)
print(
f"Models to apply: {set(models_definitions.keys()).difference(available_features)}"
)
# -- extract features
for model in models_to_apply:
print(f" - {model}")
X = models_definitions[model].fit_transform(seqdb)
print(f" shape: {X.shape}")
print(f" saving to: {h5file}")
X.to_hdf(h5file, f"rawmodel/{model}")
if force_gc:
del X
gc.collect()
print("Bye.")