This repository was archived by the owner on Oct 10, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathweb.py
358 lines (297 loc) · 11.5 KB
/
web.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
from flask import Flask, request, session, flash, redirect, url_for, jsonify, render_template
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import re
import os
import json
import sql_class
from selenium_class import Selenium
import pandas as pd
from datetime import datetime
import Spider
from data_process import data_process
from LSTM import LSTM_predict
# Flask application object; the route decorators further down attach views to it.
app = Flask(__name__)
# For testing only; change the host and port for a real deployment.
# host = '172.17.151.119' # campus network
host = '192.168.43.135' # mobile hotspot
port = '8000'
# # Schedule a weekly cookie refresh (currently disabled)
# weekly_trigger = CronTrigger(day_of_week="mon", hour=0, minute=0) # runs every Monday at 00:00
# scheduler = BackgroundScheduler()
# scheduler.add_job(Selenium.refresh_cookies(), 'cron', trigger=weekly_trigger)
# scheduler.start()
# Initialise the database tables
def initialize_database():
    """Recreate every SQLite database used by the application from scratch.

    Each existing database file is deleted first, so anything stored in it
    is lost.  A file that does not exist yet (for example on the very first
    start) is skipped instead of aborting start-up with
    ``FileNotFoundError``.  Afterwards the ``sql_class`` initialisers are
    called once per database.
    """
    db_files = (
        'accounts.db',
        'users.db',
        'queries.db',
        'train_result.db',
        'predict_result.db',
    )
    for db_file in db_files:
        try:
            os.remove(db_file)
        except FileNotFoundError:
            # Nothing to delete; the matching *_db_init() call below is
            # expected to set the database up.
            pass
    sql_class.accounts_db_init()
    sql_class.users_db_init()
    sql_class.queries_db_init()
    sql_class.train_result_db_init()
    sql_class.predict_result_db_init()
# flask-login configuration
login_manager = LoginManager()
login_manager.init_app(app)
# flask-login redirects unauthenticated visitors to this view with a GET
# request, so it must name the view that renders the login form.  The 'login'
# view only accepts POST and would answer such a redirect with 405.
login_manager.login_view = 'login_page'
login_manager.login_message = '请先登录以访问此页面。'
login_manager.login_message_category = 'Access denied.'
# Key used to sign the session cookie.  It is regenerated on every process
# start, so existing sessions become invalid after a restart.
app.config['SECRET_KEY'] = os.urandom(16).hex()
# User model
class User(UserMixin):
    """A registered user stored in the ``users`` table of ``users.db``.

    Rows are read and written positionally as
    ``(uid, username, password, email)``.

    NOTE(review): the password is stored and compared as plain text.  Moving
    to hashed passwords needs a coordinated change in the login view.
    """

    db_path = 'users.db'

    def __init__(self, uid, username, password, email):
        self.uid = uid
        self.username = username
        self.password = password
        self.email = email

    @classmethod
    def _find_one(cls, where_clause, value):
        """Return the first user matching ``where_clause`` or ``None``.

        ``where_clause`` is a fixed SQL fragment containing one ``?``
        placeholder; ``value`` is bound to it by the database driver, so
        user-supplied text is never spliced into the SQL statement.
        """
        db = sql_class.SQLiteTool(cls.db_path)
        try:
            # ``conn`` is the connection object SQLiteTool exposes (it is
            # handed to pandas elsewhere in this file); it is assumed to be a
            # sqlite3 connection -- TODO confirm against sql_class.
            rows = db.conn.execute(
                "SELECT * FROM users WHERE " + where_clause, (value,)
            ).fetchall()
        finally:
            # Release the connection even when the query fails.
            db.close_connection()
        if rows:
            return cls(*rows[0])
        return None

    @classmethod
    def get_by_id(cls, uid):
        """Return the user with the given ``uid`` or ``None``.

        ``uid`` may be an int or its string form (flask-login passes the
        string taken from the session).  Anything that is not an integer
        cannot identify a user and yields ``None``.
        """
        try:
            uid = int(uid)
        except (TypeError, ValueError):
            return None
        return cls._find_one("uid = ?", uid)

    # Look a user up by username
    @classmethod
    def get_by_username(cls, username):
        """Return the user with the given ``username`` or ``None``."""
        return cls._find_one("username = ?", str(username))

    # Look a user up by e-mail address
    @classmethod
    def get_by_email(cls, email):
        """Return the user with the given ``email`` or ``None``."""
        return cls._find_one("email = ?", str(email))

    @classmethod
    def create_user(cls, username, password, email):
        """Insert a new user row and return the matching ``User`` object.

        The new ``uid`` is one more than the largest existing ``uid``; the
        first user of an empty table gets ``uid`` 1.
        """
        db = sql_class.SQLiteTool(cls.db_path)
        try:
            max_uid = db.query_data("SELECT MAX(uid) FROM users")[0][0]
            # MAX() yields NULL (None) while the table is still empty.
            uid = 1 if max_uid is None else max_uid + 1
            insert_sql = "INSERT INTO users VALUES (?, ?, ?, ?)"
            db.insert_data(insert_sql, (uid, username, password, email))
        finally:
            db.close_connection()
        return cls(uid, username, password, email)

    def get_id(self):
        """Return the user id as a string, as flask-login requires."""
        return str(self.uid)
# User loader callback
@login_manager.user_loader
def load_user(uid):
    """Resolve the id stored in the session to a ``User`` (``None`` if unknown)."""
    loaded_user = User.get_by_id(uid)
    return loaded_user
# @login_manager.request_loader
# def request_loader(request):
#     pass
# Log in
@app.route('/login', methods=['POST'])
def login():
    """Check the posted credentials and sign the user in.

    Reads ``username`` and ``password`` from the form.  On success the user
    is logged in and redirected to the index page; otherwise an error
    message is stored in the session and the error page is shown.
    """
    form = request.form
    username = form['username']
    password = form['password']
    account = User.get_by_username(username=username)
    # Reject unknown users and wrong passwords with the same message.
    if not account or account.password != password:
        session['error_msg'] = '用户名或密码错误'
        return redirect(url_for('error'))
    login_user(account)
    # Successful login: send the user back to the index page.
    return redirect(url_for('index'))
# Register
@app.route('/register', methods=['POST'])
def register():
    """Create an account from the posted form and sign the new user in.

    Reads ``username``, ``email``, ``password`` and ``confirm_password``.
    The request is rejected via the error page when the two passwords differ
    or when the username or e-mail address is already taken.
    """
    def reject(message):
        # Store the message for the error page and redirect there.
        session['error_msg'] = message
        return redirect(url_for('error'))

    form = request.form
    username = form['username']
    email = form['email']
    password = form['password']
    confirm_password = form['confirm_password']

    if password != confirm_password:
        return reject('两次密码不匹配')

    # Both lookups always run before the result is inspected.
    existing = [
        User.get_by_username(username=username),
        User.get_by_email(email=email),
    ]
    if any(existing):
        return reject('用户名或邮箱已被使用')

    # Create the account and log the new user in right away.
    created = User.create_user(username, password, email)
    login_user(created)
    return redirect(url_for('index'))
# Log out
@app.route('/logout')
@login_required  # only signed-in users may reach this view
def logout():
    """End the current session and return to the index page."""
    logout_user()
    home_url = url_for('index')
    return redirect(home_url)
@app.route('/error')
def error():
    """Render the error page with the message left in the session."""
    # Pop (read and remove) the message so it cannot be replayed later;
    # fall back to a generic text when none was stored.
    context = {'msg': session.pop('error_msg', '未知错误')}
    return render_template('error.html', **context)
@app.route('/')
@app.route('/index')
def index():
    """Serve the landing page for both ``/`` and ``/index``."""
    page = render_template('index.html')
    return page
@app.route('/login_page')
def login_page():
    """Serve the page containing the login form."""
    page = render_template('login.html')
    return page
@app.route('/register_page')
def register_page():
    """Serve the page containing the registration form."""
    page = render_template('register.html')
    return page
@app.route('/history')
# @login_required
def history():
    """List the signed-in user's earlier prediction queries.

    Anonymous visitors are sent to the error page.  Otherwise every row of
    ``queries`` belonging to the current user is turned into a dict and
    passed to ``history.html`` as ``info_list``.
    """
    if not current_user.is_authenticated:
        session['error_msg'] = '请先登录'
        return redirect(url_for('error'))

    uid = current_user.get_id()
    db = sql_class.SQLiteTool('queries.db')
    rows = db.query_data(f"SELECT * FROM queries WHERE uid = {uid}")
    db.close_connection()

    # Name of the value found at each position of a result row, e.g.
    # {'qid': 1, 'uid': 1, 'goods_id': '4979408',
    #  'query_date': '2024-03-28', 'predict_days': 30}
    columns = ("qid", "uid", "goods_id", "query_date", "predict_days")
    info_list = [
        {column: row[position] for position, column in enumerate(columns)}
        for row in rows
    ]
    return render_template('history.html', info_list=info_list)
def _read_result_frame(db_file, sql, qid):
    """Run ``sql`` (one ``?`` placeholder for ``qid``) and return a DataFrame."""
    db = sql_class.SQLiteTool(db_file)
    try:
        # Bind qid as a parameter instead of formatting it into the SQL text.
        return pd.read_sql_query(sql, db.conn, params=(qid,))
    finally:
        db.close_connection()


@app.route('/predict_history', methods=['POST'])
# @login_required
def predict_history():
    """Show the stored training and prediction results of an earlier query.

    The query is selected by the ``rowSelection`` form field, which must be
    the integer ``qid`` of a query recorded for the signed-in user.
    Anonymous visitors, malformed ids and ids that belong to somebody else
    are sent to the error page.
    """
    if not current_user.is_authenticated:
        session['error_msg'] = '请先登录'
        return redirect(url_for('error'))

    try:
        qid = int(request.form['rowSelection'])
    except ValueError:
        session['error_msg'] = '无效的查询编号'
        return redirect(url_for('error'))

    # Only allow access to queries recorded for the current user.
    db = sql_class.SQLiteTool('queries.db')
    try:
        owned = db.conn.execute(
            "SELECT 1 FROM queries WHERE qid = ? AND uid = ?",
            (qid, current_user.get_id()),
        ).fetchone()
    finally:
        db.close_connection()
    if owned is None:
        session['error_msg'] = '查询记录不存在'
        return redirect(url_for('error'))

    # Load the training result
    df1 = _read_result_frame(
        'train_result.db',
        "SELECT date, actual_price, predict_price FROM train_result WHERE qid = ?",
        qid,
    )
    # Load the prediction result
    df2 = _read_result_frame(
        'predict_result.db',
        "SELECT date, predict_price FROM predict_result WHERE qid = ?",
        qid,
    )
    return predict_result(df1, df2)
# Search: accept keywords and return the matching goods
@app.route('/search', methods=['POST'])
def search():
    """Look up goods for the posted ``keywords`` and render the result list."""
    # A missing field counts as an empty string; surrounding blanks are
    # dropped and the rest is split on whitespace into single keywords.
    raw_keywords = request.form.get('keywords', '')
    keywords_list = raw_keywords.strip().split()
    goods = Spider.get_goods_info_jd(keywords_list)
    return render_template('search_result.html', info_list=goods)
def _save_query(uid, goods_id, predict_days):
    """Record a prediction request in ``queries.db`` and return its new ``qid``."""
    db = sql_class.SQLiteTool('queries.db')
    try:
        max_qid = db.query_data("SELECT MAX(qid) FROM queries")[0][0]
        # MAX() yields NULL (None) while the table is still empty.
        qid = 1 if max_qid is None else max_qid + 1
        current_date = datetime.now().strftime('%Y-%m-%d')
        db.insert_data(
            "INSERT INTO queries VALUES (?, ?, ?, ?, ?)",
            (qid, uid, goods_id, current_date, predict_days))
    finally:
        # Release the connection even when the query or insert fails.
        db.close_connection()
    return qid


# TEST:
@app.route('/predict', methods=['POST'])
def predict():
    """Predict future prices for the selected goods and render the result.

    Reads the goods id (``rowSelection``) and the number of days to predict
    (``Days``) from the form.  For signed-in users the request is stored in
    ``queries.db`` and its ``qid`` is passed to the predictor; anonymous
    requests are run with ``qid`` set to ``None``.  A ``Days`` value that is
    not a positive integer is rejected via the error page before anything
    is stored.
    """
    selected_goods_id = request.form['rowSelection']  # goods id
    predict_days = request.form['Days']  # number of days to predict
    print(f"goods id: {selected_goods_id}")
    print(f"predict days: {predict_days}")

    # Validate up front so a bad value cannot leave a useless row behind.
    try:
        days = int(predict_days)
    except ValueError:
        days = 0
    if days < 1:
        session['error_msg'] = '预测天数必须是正整数'
        return redirect(url_for('error'))

    if current_user.is_authenticated:
        qid = _save_query(
            current_user.get_id(), selected_goods_id, predict_days)
    else:
        qid = None

    # Fetch the raw price history
    price_list = Spider.get_history_price(selected_goods_id)
    processed_data = data_process(price_list)  # data processing
    # Time-series prediction with the LSTM model
    train_result, future_predictions = LSTM_predict(
        processed_data, days, qid)
    return predict_result(train_result, future_predictions)
def predict_result(df1, df2):
    """Render ``predict_result.html`` from two result DataFrames.

    ``df1`` is passed through unchanged.  The ``date`` column of ``df2`` is
    rewritten in place as ``YYYY-MM-DD`` strings so the template can use it
    directly.  Both frames reach the template as lists of row dicts
    (``data1`` and ``data2``).
    """
    # Only df2's dates are normalised; the same step for df1 is disabled.
    formatted_dates = pd.to_datetime(df2['date']).dt.strftime('%Y-%m-%d')
    df2['date'] = formatted_dates
    train_records = df1.to_dict(orient='records')
    future_records = df2.to_dict(orient='records')
    return render_template(
        'predict_result.html', data1=train_records, data2=future_records)
# Receive the verification code forwarded by SmsForwarder
@app.route('/receive_sms', methods=['POST'])
def receive_sms():
    """Store a verification code taken from a forwarded SMS.

    Expects a JSON object whose ``msg`` field holds the SMS text.  The code
    (6 digits after ``【京东】`` or 4 digits after ``【购物党】``) and the
    receiving ``+86`` mobile number are extracted from that text, and the
    code is written to the matching row of ``accounts``.

    Returns a JSON status with HTTP 200 on success, or HTTP 400 when the
    request carries no usable message, code or mobile number.
    """
    def bad_request(reason):
        return jsonify({"status": "error", "message": reason}), 400

    # silent=True yields None for a missing or malformed JSON body.
    data = request.get_json(silent=True)
    message = data.get('msg') if isinstance(data, dict) else None
    print(message)
    if not message or not isinstance(message, str):
        return bad_request("No message found in the request")

    # Group 1: 6-digit code; group 2: 4-digit code.
    # NOTE(review): the greedy ".*" makes group 2 capture the LAST four
    # digits after the tag -- confirm this matches the real SMS layout.
    code_match = re.search(r"【京东】(\d{6})|【购物党】.*(\d{4})", message)
    if code_match is None:
        return bad_request("No verification code found in the message")
    verification_code = code_match.group(1) or code_match.group(2)

    # Extract the number that received the SMS
    number_match = re.search(r"\+86(\d{11})", message)
    if number_match is None:
        return bad_request("No mobile number found in the message")
    mobile_number = number_match.group(1)

    print(f'接收号码:{mobile_number}')
    print(f'验证码:{verification_code}')

    db = sql_class.SQLiteTool('accounts.db')
    try:
        update_sql = "UPDATE accounts SET verification_code = ? WHERE mobile_number = ?"
        db.update_data(
            update_sql, (verification_code, mobile_number))
    finally:
        # The connection was previously left open after every request.
        db.close_connection()
    return jsonify({"status": "success", "message": "SMS received"}), 200
if __name__ == '__main__':
    # Selenium.refresh_cookies()
    # Reset the databases, then start the development server.
    initialize_database()
    app.run(host=host, port=port, debug=True)