forked from Merck/TraceTrack
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathreference_db.py
154 lines (128 loc) · 5.79 KB
/
reference_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from tracetrack.entities.record import TraceSeqRecord, Record
from tracetrack.entities.alignment import Alignment
from tracetrack.alignment_utils import align_global_score
from tracetrack.server_utils import has_extension, ALLOWED_REF_EXTENSIONS
from tracetrack.entities.errors import AlignmentError
from openpyxl import load_workbook
from typing import List, Tuple
from flask import flash
from csv import reader
from Bio import SeqIO
from Bio.Seq import Seq
import pandas as pd
def read_csv(filename):
with open(filename, 'r') as f:
csv_reader = reader(f)
header = next(csv_reader)
records = [Record.read_from_csv(row) for row in csv_reader]
return records, header
def read_xlsx(filename):
table = pd.read_excel(filename, dtype=str, engine='openpyxl')
header = table.columns[:2]
records = [Record.read_from_csv(row) for i, row in table.iterrows() if row[0] and not pd.isna(row[0])]
return records, header
def read_fasta(filename):
iterator = SeqIO.parse(filename, "fasta")
records = [Record.read_from_fasta(seq_record) for seq_record in iterator]
return records
def read_gb(filename):
gb_records = SeqIO.parse(open(filename, "r"), "genbank")
records = [Record.read_from_genbank(gb_record) for gb_record in gb_records]
return records
class ReferenceDb:
def __init__(self, records):
self.sequences = records
if len(records) != len(set(records)):
flash(f"There are multiple references with same ID. The first one will be used.")
# FIXME
@classmethod
def read_file(cls, filename) -> 'ReferenceDb':
"""
Read a DataFrame from an .xlsx, .csv or fasta file. Return ReferenceDb object.
:param filename: str
:return: ReferenceDb
"""
if has_extension(filename, ["csv", "xlsx", "xls"]):
if has_extension(filename, ["csv"]):
records, header = read_csv(filename)
else:
records, header = read_xlsx(filename)
if len(records) > 0 and header[0].lower() != 'id':
flash('Warning: Expected first column name “ID”, got “{}”, make sure to include a valid header.'.format(
header[0]))
if len(records) > 0 and header[1].lower() != 'sequence':
flash(
'Warning: Expected second column name “Sequence”, got “{}”, make sure to include a valid header.'.format(
header[1]))
elif has_extension(filename, ["fa", "fasta"]):
records = read_fasta(filename)
elif has_extension(filename, ["genbank", "gbk", "gb"]):
records = read_gb(filename)
else:
raise ReferenceError(f"The provided file {filename} has none of the accepted extensions: {ALLOWED_REF_EXTENSIONS}")
# TODO
return cls(records)
def get_ref_sequence(self, id) -> Record:
"""
Return the reference sequence record from table based on its ID. Returns None if the ID is not found.
:param id: reference ID
:return: reference sequence Record
"""
rec = next((x for x in self.sequences if x.id == id), None)
return rec
# useless
def create_ref_record(self, id) -> Record:
"""
Create reference sequence record
:param id: reference ID
:return: SeqRecord with sequence and reference ID
"""
return Record(Seq(self.get_ref_sequence(id).seq), id)
def align_sequences(self, records: List[TraceSeqRecord], refid, population: str = None) -> Tuple[
Alignment, List[str]]:
"""
Sort list of TraceSeqRecord by reference id, create Alignment object for each reference, return as a list.
:param records: list of TraceSeqRecords to align
:param population: str, name of population
:param search: bool, align each trace sequence to the best matching reference instead of parsing filename
:return: list of Alignment objects, one object for each reference
"""
ref_record = self.get_ref_sequence(refid)
alignment = None
warnings = []
try:
alignment = Alignment.align_multiple(ref_record, records, population=population)
except AlignmentError as e:
warnings.append(f"Alignment error for reference ID {refid}: {e.message}")
return alignment, warnings
def match_trace_to_ref(self, record: TraceSeqRecord):
"""
Assign given TraceSeqRecord a reference id and direction by best match.
:param record: Trace record to find the best reference for
:return: nothing
"""
# match according to filename
max_match = 0
reference = None
for ref in self.sequences:
if ref.id in record.id:
if len(ref.id) > max_match:
max_match = len(ref.id)
reference = ref
if reference is not None:
fwd = align_global_score(reference.seq, record.seq)
rev = align_global_score(reference.seq, record.seq.reverse_complement())
if rev > fwd:
record.flag_as_reverse(True)
record.assign_reference(reference.id)
# match by best alignment score
else:
scores_fwd = [align_global_score(ref.seq, record.seq) for ref in self.sequences]
scores_rev = [align_global_score(ref.seq, record.seq.reverse_complement()) for ref in self.sequences]
if max(scores_fwd) > max(scores_rev):
idx = scores_fwd.index(max(scores_fwd))
record.assign_reference(self.sequences[idx].id)
else:
idx = scores_rev.index(max(scores_rev))
record.assign_reference(self.sequences[idx].id)
record.flag_as_reverse(True)