-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfungsi.py
205 lines (162 loc) · 5.72 KB
/
fungsi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import datetime
import sqlite3 as sqlite
from datetime import datetime
import logging
import sys
# Root logger setup: every record at INFO or above is written both to the
# file ./log.log and to standard output, using one shared line format.
logging.basicConfig(
    format='[%(asctime)s] %(levelname)-8s: %(message)s',
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
    handlers=[
        logging.FileHandler('./log.log'),
        logging.StreamHandler(sys.stdout),
    ],
)
def f_time(f_jam="%H:%M:%S", f_hari='%Y-%m-%d'):
    """Return the current local time formatted as a ``(clock, date)`` pair.

    Args:
        f_jam: ``strftime`` pattern used for the first element (time of day).
        f_hari: ``strftime`` pattern used for the second element (date).

    Returns:
        A 2-tuple of strings; both are rendered from the same
        ``datetime.now()`` reading.
    """
    now = datetime.now()
    clock, day = (now.strftime(pattern) for pattern in (f_jam, f_hari))
    return clock, day
def f_price(num):
    """Format a whole number with dots as thousands separators.

    Example: ``3200000`` becomes ``"3.200.000"``.

    A leading ``-`` or ``+`` is kept in front of the grouped digits and is
    not counted as a digit, so ``-123`` stays ``"-123"`` rather than turning
    into ``"-.123"``.

    Args:
        num: An integer, or a string of digits. The value is passed through
            ``str()`` and grouped purely by character position, so inputs
            with a decimal point are not supported.

    Returns:
        The grouped string; values of three characters or fewer are returned
        unchanged.
    """
    text = str(num)

    # Split the sign off first so it cannot shift the three-character groups.
    sign = ''
    if text[:1] in ('-', '+'):
        sign, text = text[0], text[1:]

    if len(text) <= 3:
        return sign + text

    # The leftmost group holds whatever does not fill a full block of three.
    head = len(text) % 3
    groups = [text[:head]] if head else []
    groups.extend(text[i:i + 3] for i in range(head, len(text), 3))
    return sign + '.'.join(groups)
def to_dict_index(data):
    """Index a sequence of rows by each row's first element.

    Args:
        data: Iterable of indexable rows (for example tuples).

    Returns:
        A dict mapping ``row[0]`` to the whole row. When several rows share
        the same first element, the last one wins.
    """
    return {row[0]: row for row in data}
class DB:
    """Small wrapper around one SQLite connection used by the sales helpers.

    The class reads the ``store``, ``via``, ``product_type`` and ``product``
    tables and reads/writes the ``history`` table.

    Attributes:
        status: ``True`` when the connection was opened, ``False`` otherwise.
    """

    def __init__(self, dbLok='testing.db'):
        """Open the database file *dbLok* and create the shared cursor.

        A connection failure is logged instead of raised; ``status`` is then
        ``False`` and every query helper reports failure.
        """
        # Define both handles up front so a failed connect leaves the object
        # in a known state instead of with missing attributes.
        self.__connect = None
        self.__cur = None
        try:
            self.__connect = sqlite.connect(dbLok, check_same_thread=False)
            self.__cur = self.__connect.cursor()
            self.__status = True
        except Exception:
            logging.exception('Database Disconnect !!')
            self.__status = False
        # The original set `status` only on failure and `__status` only on
        # success; expose one public flag that is valid in both cases.
        self.status = self.__status

    def __runQuery(self, q='', fetch='all', method=1, params=()):
        """Execute one SQL statement on the shared cursor.

        Args:
            q: SQL text; an empty string is a no-op.
            fetch: ``'all'`` to return every row, anything else for one row.
            method: ``1`` to read rows, ``2`` to commit a write.
            params: Values bound to the ``?`` placeholders in *q*.

        Returns:
            For ``method=1``: a list of rows, or a single row / ``None`` when
            ``fetch`` is not ``'all'``. For ``method=2``: ``True`` after the
            commit. ``False`` when nothing was run or the statement failed.
        """
        if q == '' or self.__cur is None:
            return False
        try:
            self.__cur.execute(q, params)
            if method == 1:
                if fetch == 'all':
                    return self.__cur.fetchall()
                return self.__cur.fetchone()
            if method == 2:
                self.__connect.commit()
                return True
        except Exception:
            # Best effort by design: callers receive False, but the cause is
            # now recorded instead of being silently discarded.
            logging.exception('Query failed: %s', q)
        return False

    def __selectByOwner(self, table, column, value):
        """Return all rows of *table*, optionally filtered by ``column == int(value)``.

        *table* and *column* are fixed names supplied by this class, never
        caller input; only *value* comes from outside and it is bound as a
        parameter. A falsy *value* means "no filter". A value that cannot be
        converted to ``int`` is logged and yields ``[]``.
        """
        try:
            if value:
                return self.__runQuery(
                    f'SELECT * from {table} where {column}=?',
                    params=(int(value),),
                )
            return self.__runQuery(f'SELECT * from {table}')
        except (TypeError, ValueError, OverflowError):
            logging.exception('Terdeteksi melakukan percobaan SQLI')
            return []

    def getStore(self, q=False):
        """Return the rows of ``store``.

        With ``q='singkat'`` only ``(id_store, name)`` is selected; any other
        value selects every column. Returns ``False`` if the query fails.
        """
        if q == 'singkat':
            return self.__runQuery('SELECT id_store,name from store')
        return self.__runQuery('SELECT * from store')

    def getVia(self, id_store=False):
        """Return ``via`` rows, limited to *id_store* when one is given."""
        return self.__selectByOwner('via', 'id_store', id_store)

    def getProduct_type(self, id_store=False):
        """Return ``product_type`` rows, limited to *id_store* when one is given."""
        return self.__selectByOwner('product_type', 'id_store', id_store)

    def getProduct(self, id_product_type=False):
        """Return ``product`` rows, limited to *id_product_type* when one is given."""
        return self.__selectByOwner('product', 'id_product_type', id_product_type)

    def submitHistory(self, id_store, id_product_type, id_product, id_via, price, time, amount, seller):
        """Validate one sale and insert it into ``history``.

        Args:
            id_store, id_product_type, id_product, id_via: Integer ids, or
                strings that ``int()`` accepts.
            price: Total the caller expects to pay; must equal the product's
                stored price multiplied by *amount*.
            time: Date string of exactly 10 characters; the current clock
                time from ``f_time()`` is appended to it.
            amount: Number of units, as an int or numeric string.
            seller: Seller name; only the first 15 characters are stored.

        Returns:
            A dict with ``status`` and ``pesan``. On success it also carries
            ``time`` and ``price``. On failure ``code`` is 1 (price mismatch),
            2 (unknown product), 3 (database error) or 4 (invalid input; the
            legacy ``error`` key is kept as well).
        """
        try:
            # Keep the converted values: the original only checked that int()
            # succeeded, so string inputs later made `price * amount` a string
            # repetition and the price comparison could never match.
            id_store = int(id_store)
            id_product_type = int(id_product_type)
            id_product = int(id_product)
            id_via = int(id_via)
            price = int(price)
            amount = int(amount)
            seller = seller[:15]
            valid = len(time) == 10
        except (TypeError, ValueError, OverflowError):
            logging.exception('Terdeteksi melakukan percobaan SQLI')
            valid = False

        if not valid:
            return {'status': False, 'pesan': 'Data yang di inputkan salah', 'error': 4, 'code': 4}

        row = self.__runQuery(
            'SELECT price from product where id_product=? LIMIT 1',
            fetch='one',
            params=(id_product,),
        )
        if row is False:
            # The lookup itself failed; indexing False used to raise here.
            logging.error('Database Error')
            return {'status': False, 'pesan': 'Database error. Please wait', 'code': 3}
        if row is None:
            logging.warning('Produk tidak ditemukan')
            return {'status': False, 'pesan': 'Produk tidak ada atau telah habis', 'code': 2}

        expected = row[0] * amount
        if price != expected:
            logging.warning(f'Input tidak sesuai dengan harga database | {price} != {expected}')
            return {'status': False, 'pesan': 'Harga tidak sesuai, silahkan pilih ulang produk', 'code': 1}

        # Read the clock once so the stored and the returned timestamp agree.
        stamp = f"{time} {f_time()[0]}"
        saved = self.__runQuery(
            'INSERT INTO history(id_store, id_product_type, id_product, amount, '
            'id_via, price, seller, time) VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
            method=2,
            # seller and time are free text from the caller, so they are
            # bound as parameters rather than pasted into the SQL.
            params=(id_store, id_product_type, id_product, amount, id_via, expected, seller, stamp),
        )
        if saved:
            return {'status': True, 'pesan': 'Succes Ditambahkan', 'time': stamp, 'price': expected}
        logging.error('Database Error')
        return {'status': False, 'pesan': 'Database error. Please wait', 'code': 3}

    def getHistory(self, data):
        """Return joined ``history`` rows matching the filters in *data*.

        Args:
            data: Mapping that may contain ``id_store``, ``id_product_type``,
                ``id_product`` and ``id_via``. Each key present adds an
                equality filter; other keys are ignored.

        Returns:
            A list of tuples ``(id_history, product_type.name, product.name,
            via.service, via.name, amount, price, seller, time)``. ``()`` when
            *data* is empty or holds a non-integer filter value; ``False`` if
            the query itself fails.
        """
        filter_keys = ('id_store', 'id_product_type', 'id_product', 'id_via')
        conditions = []
        values = []
        try:
            if len(data) == 0:
                return ()
            for key in filter_keys:
                if key in data:
                    conditions.append(f'history.{key}=?')
                    values.append(int(data[key]))
        except (TypeError, ValueError, KeyError, OverflowError):
            logging.exception('Terdeteksi melakukan percobaan SQLI')
            return ()

        q = (
            "SELECT history.id_history, product_type.name, product.name, via.service, "
            "via.name, history.amount, history.price, history.seller, history.time "
            "FROM history "
            "INNER JOIN product_type on history.id_product_type = product_type.id_product_type "
            "INNER JOIN product on history.id_product = product.id_product "
            "INNER JOIN via on history.id_via = via.id_via "
        )
        # Build the WHERE clause from whichever filters are present; the
        # original emitted "and ..." with no WHERE when id_store was missing.
        if conditions:
            q += " where " + " and ".join(conditions)
        return self.__runQuery(q, params=tuple(values))
# SELECT history.id_history, product_type.name, product.name, via.service, via.name, history.amount, history.price, history.seller, history.time
# FROM history
# INNER JOIN product_type on history.id_product_type = product_type.id_product_type
# INNER JOIN product on history.id_product = product.id_product
# INNER JOIN via on history.id_via = via.id_via
# where history.id_store = 2
class Main(DB):
    """Subclass of ``DB``; in the code shown it adds nothing beyond the base class."""

    def __init__(self, dbLok='testing.db'):
        """Open the database at *dbLok* by delegating to the parent initializer."""
        super().__init__(dbLok=dbLok)