-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathextractor.py
214 lines (194 loc) · 9.02 KB
/
extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import column
import uuid
import xlrd
import openpyxl
from collections import OrderedDict
from datetime import datetime, timedelta
from column_types import column_types
from summary_types import summary_types
class extractor:
def __init__(self, configuration: str):
self._configuration = configuration
@property
def delimter(self):
if self._configuration['delimter'] is None:
return ';'
else:
return self._configuration['delimter']
@property
def start_row(self):
if self._configuration['start_row'] is None:
return 0
else:
return self._configuration['start_row']
@property
def filepattern(self):
if self._configuration['filepattern'] is None:
return '*.*'
else:
return self._configuration['filepattern']
@property
def name(self):
if self._configuration['name'] is None:
return uuid.UUID
else:
return self._configuration['name']
@property
def summary(self):
if 'summary' not in self._configuration or self._configuration['summary'] is None or self._configuration['summary'] =='' :
return summary_types.single
else:
temp = self._configuration['summary'].lower()
if temp == 'daily':
return summary_types.daily
if temp == 'weekly':
return summary_types.weekly
if temp == 'monthly':
return summary_types.daily
return summary_types.single
@property
def columns(self):
temp_columns: OrderedDict ={}
if self._configuration['columns'] is not None:
for column_config in self._configuration['columns']:
column_name = column_config['name']
temp_class =column.column(column_config)
temp_columns[column_name] = temp_class
return temp_columns
def extract_lines(self, file: str):
transactions = []
if self.filepattern.lower().endswith('.xls'):
transactions = self._extract_xls_file(file)
elif self.filepattern.lower().endswith('.xlsx'):
transactions = self._extract_xlsx_file(file)
elif self.filepattern.lower().endswith('.csv'):
transactions = self._extract_csv_file(file)
return self._convert_transactions(transactions)
def _extract_xls_file(self, input_file: str):
transaction = []
if input_file.lower().endswith('.xls'):
try:
workbook = xlrd.open_workbook(input_file)
print('Datei erfolgreich geöffnet')
sheet = workbook.sheet_by_index(0)
max_rows = sheet.nrows
columns = self.columns.values()
for r in range(self.start_row-1, max_rows):
print('Verarbeite Zeile ' + str(r))
try:
line = {}
for column in columns:
cell_value = column.get_formated_value(sheet.cell_value(rowx=r, colx=column.column_number-1))
line[column.header.name] = cell_value
transaction.append(line)
except ValueError:
print('Keine Buchungstyp gefunden --> Zeile wird nicht importiert')
except Exception as inner_error:
print('Fehler bei der Verarbeitung der Zeile ' + str(r) + ': ' + repr(inner_error))
except Exception as error:
print('Allgemeiner Fehler beim Verarbeiten der XLS Datei: ' + repr(error))
return transaction
def _extract_xlsx_file(self, input_file: str):
transaction = []
if input_file.lower().endswith('.xlsx'):
try:
workbook = openpyxl.load_workbook(input_file)
print('Datei erfolgreich geöffnet')
sheet = workbook.active
max_rows = sheet.max_row
columns = self.columns.values()
for r in range(self.start_row, max_rows):
print('Verarbeite Zeile ' + str(r))
try:
line ={}
for column in columns:
cell_value = column.get_formated_value(sheet.cell(row=r, column=column.column_number).value)
line[column.header.name] = cell_value
transaction.append(line)
except ValueError:
print('Keine Buchungstyp gefunden --> Zeile wird nicht importiert')
except Exception as inner_error:
print('Fehler bei der Verarbeitung der Zeile ' + str(r) + ': ' + repr(inner_error))
except Exception as error:
print('Allgemeiner Fehler beim Verarbeiten der XLSX Datei: ' + repr(error))
return transaction
def _extract_csv_file(self, input_file: str):
transaction = []
if input_file.lower().endswith('.csv'):
try:
csv_file = open(input_file, 'r')
print('Datei erfolgreich geöffnet')
lines = csv_file.readlines()
columns = self.columns.values()
count = 1
for file_line in lines:
print('Verarbeite Zeile ' + str(count))
try:
line = {}
if(count >= self.start_row):
csv_columns = file_line.split(self.delimter)
for column in columns:
cell_value = column.get_formated_value(csv_columns[column.column_number - 1])
line[column.header.name] = cell_value
transaction.append(line)
count += 1
except ValueError:
print('Keine Buchungstyp gefunden --> Zeile wird nicht importiert')
except Exception as inner_error:
print('Fehler bei der Verarbeitung der Zeile ' + str(count) + ': ' + repr(inner_error))
except Exception as error:
print('Allgemeiner Fehler beim Verarbeiten der XLS Datei: ' + repr(error))
return transaction
def _convert_transactions(self, transactions):
result =[]
temp_helper = {}
for tran in transactions:
line_date = tran[column_types.valuta_date.name]
if self.summary == summary_types.daily:
line_date = datetime.strptime(tran[column_types.valuta_date.name], '%Y-%m-%d %H:%M').strftime('%Y-%m-%d')
elif self.summary == summary_types.weekly:
line_date = self._last_day_of_week(datetime.strptime(tran[column_types.valuta_date.name], '%Y-%m-%d %H:%M')).strftime('%Y-%m-%d')
elif self.summary == summary_types.monthly:
line_date = self._last_day_of_month(datetime.strptime(tran[column_types.valuta_date.name], '%Y-%m-%d %H:%M')).strftime('%Y-%m-%d')
search_key = str.format("{}-{}", line_date, tran[column_types.booking_type.name])
if column_types.wkn.name in tran and tran[column_types.wkn.name] is not None:
search_key= str.format("{}-{}-{}",line_date,tran[column_types.booking_type.name],tran[column_types.wkn.name])
if search_key in temp_helper:
temp_helper[search_key] = self._update_transaction(temp_helper[search_key],tran)
else:
tran[column_types.valuta_date.name] = line_date
temp_helper[search_key] = tran
for summary in temp_helper.values():
line=''
for key in summary.keys():
if summary[key] is not None:
try:
line = line+'{0:.8f}'.format(summary[key])+";"
except Exception:
line = line+str(summary[key])+";"
else:
line = line + ";;"
line +='\r'
result.append(line)
return result
def _update_transaction(self, current, add):
for key in current.keys():
if key != 'valuta_date':
try:
if add[key] is not None:
current[key] = float(current[key])+float(add[key])
except ValueError:
if(current[key] != add[key]):
current[key] = str.format('{} | {}',current[key],add[key])
return current
def _last_day_of_week(self, any_day):
week_start = any_day - timedelta(days=any_day.weekday())
week_end = week_start + timedelta(days=6)
last_day_of_month = self._last_day_of_month(any_day)
if week_end <= last_day_of_month:
return week_end
else:
return last_day_of_month
def _last_day_of_month(self, any_day):
next_month = any_day.replace(day=28) + timedelta(days=4)
return next_month - timedelta(days=next_month.day)