forked from scp-cs/translatordb_web
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb.py
471 lines (415 loc) · 19.4 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
# Builtins
import sqlite3
from datetime import datetime
import typing as t
from collections import namedtuple
from logging import error, warning, critical
import time
from secrets import token_urlsafe
# External
# Internal
from models.user import User
from passwords import pw_check, pw_hash
from models.article import Article
from models.correction import Correction
PAGE_ITEMS = 15
# Scripts
db_create_script = """
CREATE TABLE IF NOT EXISTS UserType (
id INTEGER NOT NULL PRIMARY KEY,
name TEXT NOT NULL UNIQUE
);
INSERT OR IGNORE INTO UserType (id, name) VALUES (1, "translator"), (2, "corrector"), (3, "writer"), (4, "staff");
CREATE TABLE IF NOT EXISTS User (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
nickname TEXT NOT NULL UNIQUE,
wikidot TEXT NOT NULL UNIQUE,
password BLOB DEFAULT NULL,
discord TEXT ,
temp_pw BOOLEAN DEFAULT 1,
display_name TEXT DEFAULT NULL
);
CREATE TABLE IF NOT EXISTS UserHasType (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
iduser INTEGER NOT NULL,
idtype INTEGER NOT NULL,
FOREIGN KEY (iduser) REFERENCES User(id) ON DELETE CASCADE,
FOREIGN KEY (idtype) REFERENCES UserType(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS Article (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
words INTEGER NOT NULL,
bonus INTEGER NOT NULL,
added DATETIME NOT NULL DEFAULT (datetime('now','localtime')),
link TEXT DEFAULT NULL,
idauthor INTEGER NOT NULL,
idcorrector INTEGER DEFAULT NULL,
corrected DATETIME DEFAULT NULL,
is_original BOOLEAN NOT NULL DEFAULT FALSE,
FOREIGN KEY (idauthor) REFERENCES User(id) ON DELETE CASCADE,
FOREIGN KEY (idcorrector) REFERENCES User(id)
);
CREATE TABLE IF NOT EXISTS Note (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT NOT NULL,
idauthor INTEGER NOT NULL,
FOREIGN KEY (idauthor) REFERENCES User(id) ON DELETE CASCADE
);
CREATE VIEW IF NOT EXISTS Frontpage AS
SELECT User.id AS id, User.nickname AS nickname, User.discord AS discord, User.wikidot AS wikidot, User.display_name as display,
SUM(CASE WHEN Article.is_original=FALSE THEN 1 ELSE 0 END) AS translation_count,
(SUM(CASE WHEN Article.is_original=FALSE THEN Article.words ELSE 0 END)/1000.0)+TOTAL(Article.bonus) AS points,
(SELECT COUNT(article_id) FROM Correction WHERE Corrector=User.id) AS correction_count,
SUM(CASE WHEN Article.is_original=TRUE THEN 1 ELSE 0 END) AS original_count
FROM user
LEFT JOIN Article
ON User.id = Article.idauthor
GROUP BY User.id;
CREATE VIEW IF NOT EXISTS Series AS
SELECT (SUBSTR(name, 5)/1000)+1 AS series, COUNT(id) AS articles, SUM(words) AS words
FROM Article
WHERE (name
LIKE 'SCP-___' OR name LIKE 'SCP-____') AND is_original=FALSE
GROUP BY SERIES
UNION
SELECT 999 AS series, COUNT(id) AS articles, SUM(words) AS words
FROM Article
WHERE name
NOT LIKE 'SCP-___' AND name NOT LIKE 'SCP-____' AND is_original=FALSE;
CREATE VIEW IF NOT EXISTS Statistics AS
SELECT SUM(t.words) AS total_words, COUNT(t.id) AS total_articles, (SELECT COUNT(id) FROM user) AS total_users
FROM Article AS t WHERE t.is_original=FALSE;
CREATE VIEW IF NOT EXISTS Correction AS
SELECT id as article_id, idauthor AS author, idcorrector AS corrector, corrected AS timestamp, words, name
FROM Article WHERE idcorrector IS NOT NULL;
"""
StatRow = namedtuple('StatRow', "id nickname discord wikidot display count points corrections originals")
SeriesRow = namedtuple('SeriesRow', "series articles words")
StatisticsRow = namedtuple('StatisticsRow', "total_words total_articles total_users")
Counts = namedtuple('Counts', 'translations corrections originals')
class Database():
def __init__(self, filepath: str = "data/scp.db") -> None:
try:
self.connection = sqlite3.connect(filepath, check_same_thread=False)
self.__tryexec(db_create_script, script=True)
# self.connection.execute('PRAGMA journal_mode=wal') # Enable write-ahead logging
self.connection.execute('PRAGMA foreign_keys=1') # Enable SQLite foreign keys
except Exception as e:
critical(f'Error opening database {filepath} ({str(e)})')
raise RuntimeError(str(e))
self.__mark_updated()
def __tryexec(self, query: str, data: t.Tuple = (), script=False) -> sqlite3.Cursor:
try:
with self.connection as con:
cursor = con.cursor()
return cursor.executescript(query) if script else cursor.execute(query, data)
except sqlite3.Error as e:
error(f'Database query "{query}" aborted with error: {str(e)}')
def __mark_updated(self) -> None:
query = "SELECT MAX(added) FROM Article"
try:
self.__lastupdate = datetime.strptime(self.__tryexec(query).fetchone()[0], "%Y-%m-%d %H:%M:%S")
except TypeError:
warning(f"Unable to get last update timestamp")
self.__lastupdate = datetime(2005, 1, 1)
def __make_article(self, row) -> Article:
return Article(
row[0], # id
row[1], # title
row[2], # word count
row[3], # bonus points
datetime.strptime(row[4], '%Y-%m-%d %H:%M:%S'), # date added
self.get_user(row[6]), # author
self.get_user(row[7]), # corrector
datetime.strptime(row[9], '%Y-%m-%d %H:%M:%S') if row[9] else None, # date corrected
row[5], # link
row[8]) # original or translation
@property
def lastupdated(self) -> datetime:
return self.__lastupdate
def get_stats(self, sort='points', page=0):
match sort:
case 'az':
sorter = 'ORDER BY nickname COLLATE NOCASE ASC'
case 'points':
sorter = 'ORDER BY points DESC'
case 'count':
sorter = 'ORDER BY translation_count DESC'
case 'corrections':
sorter = 'ORDER BY correction_count DESC'
case 'originals':
sorter = 'ORDER BY original_count DESC'
case _:
sorter = 'ORDER BY nickname COLLATE NOCASE ASC'
data = self.__tryexec("SELECT * FROM Frontpage " + sorter + " LIMIT ? OFFSET ?", (PAGE_ITEMS, PAGE_ITEMS*page)).fetchall()
return [StatRow(*row) for row in data]
def get_article_counts(self, uid: int) -> Counts:
translations = self.__tryexec("SELECT COUNT(id) FROM Article WHERE idauthor=? AND is_original=FALSE", (uid,)).fetchone()
corrections = self.__tryexec("SELECT COUNT(article_id) FROM Correction WHERE corrector=?", (uid,)).fetchone()
originals = self.__tryexec("SELECT COUNT(id) FROM Article WHERE idauthor=? AND is_original=TRUE", (uid,)).fetchone()
return Counts(translations, corrections, originals)
def get_user_count(self) -> int:
count = self.__tryexec("SELECT COUNT(id) FROM Frontpage").fetchone()[0]
return count
def update_password(self, uid: int, new_pw: bytes):
query = "UPDATE User SET password=?, temp_pw=0 WHERE id=?"
data = (new_pw, uid)
self.__tryexec(query, data)
def get_user(self, uid: int) -> t.Optional[User]:
query = "SELECT * FROM User WHERE id=?"
data = (uid,)
row = self.__tryexec(query, data).fetchone()
if row is None:
return None
return User(*row)
def get_user_by_discord(self, dscid: int) -> t.Optional[User]:
query = "SELECT * FROM User WHERE discord=?"
data = (dscid,)
row = self.__tryexec(query, data).fetchone()
if row is None:
return None
return User(*row)
def get_user_by_wikidot(self, wdid: str) -> t.Optional[User]:
query = "SELECT * FROM User WHERE wikidot=? COLLATE NOCASE"
data = (wdid,)
row = self.__tryexec(query, data).fetchone()
if row is None:
return None
return User(*row)
def user_exists(self, username: str) -> bool:
query = "SELECT * FROM User WHERE nickname=?"
data = (username,)
if not self.__tryexec(query, data).fetchone():
return False
else:
return True
def get_user_stats(self, uid: int) -> t.Type[StatRow]:
return StatRow(*self.__tryexec("SELECT * FROM Frontpage WHERE id=?", (uid,)).fetchone())
def verify_login(self, username: str, password: str) -> t.Optional[int]:
query = "SELECT id, nickname, password, temp_pw FROM User WHERE nickname=?"
cursor = self.__tryexec(query, (username,))
row = cursor.fetchone()
if not row or not row[2]:
return None
if not pw_check(password, row[2]):
return None
return row[0]
def delete_user(self, uid: int) -> None:
queries = [ # TODO: Maybe we shouldn't delete the articles with the user
"DELETE FROM Article WHERE idauthor=?",
"DELETE FROM Note WHERE idauthor=?",
"DELETE FROM User WHERE id=?"]
for query in queries: # No cascade delete because I'm dumb
self.__tryexec(query, (uid, ))
def delete_article(self, aid: int) -> None:
query = "DELETE FROM Article WHERE id=?"
self.__tryexec(query, (aid, ))
def users(self) -> t.List[User]:
"""
Fetches all users from the database
"""
query = "SELECT * FROM User"
rows = self.__tryexec(query).fetchall()
return [User(*row) for row in rows]
def get_article(self, tid: int) -> t.Optional[Article]:
query = "SELECT * FROM Article WHERE id=?"
data = (tid,)
row = self.__tryexec(query, data).fetchone()
if row is None:
return None
return self.__make_article(row)
def update_article(self, t: Article) -> None:
query = "UPDATE Article SET name=?, words=?, bonus=?, link=? WHERE id=?"
data = (t.name, t.words, t.bonus, t.link, t.id)
self.__tryexec(query, data)
def assign_corrector(self, article: Article, user: User):
query = "UPDATE Article SET idcorrector=?, corrected=? WHERE id=?"
data = (user.uid, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), article.id)
self.__tryexec(query, data)
def unassign_corrector(self, article: Article):
query = "UPDATE Article SET idcorrector=NULL, corrected=NULL WHERE id=?"
data = (article.id,)
self.__tryexec(query, data)
def update_user(self, u: User) -> None:
query = "UPDATE User SET nickname=?, wikidot=?, discord=?, password=?, display_name=? WHERE id=?"
data = (u.nickname, u.wikidot, u.discord, u.password, u.display_name, u.uid)
self.__tryexec(query, data)
def rename_article(self, name: str, new_name: str):
... # TODO: We need to update the link too
def translation_exists(self, name: str) -> bool:
query = "SELECT * FROM Article WHERE name=? COLLATE NOCASE"
cursor = self.__tryexec(query, (name.lower(),))
if len(cursor.fetchall()) != 0:
return True
else:
return False
def get_translations_by_user(self, uid: int, sort='latest', page=0) -> t.Optional[list[Article]]:
match sort:
case 'az':
sorter = 'ORDER BY name COLLATE NOCASE ASC'
case 'latest':
sorter = 'ORDER BY added DESC, id DESC'
case 'words':
sorter = 'ORDER BY words DESC'
case _:
sorter = 'ORDER BY name COLLATE NOCASE ASC'
query = "SELECT * FROM Article WHERE idauthor=? AND is_original=FALSE " + sorter + " LIMIT ? OFFSET ?"
data = (uid, PAGE_ITEMS, page*PAGE_ITEMS)
rows = self.__tryexec(query, data).fetchall()
if rows is None:
return []
return [self.__make_article(row) for row in rows]
def get_originals_by_user(self, uid: int, sort='latest', page=0) -> t.Optional[list[Article]]:
match sort:
case 'az':
sorter = 'ORDER BY name COLLATE NOCASE ASC'
case 'latest':
sorter = 'ORDER BY added DESC, id DESC'
case _:
sorter = 'ORDER BY name COLLATE NOCASE ASC'
query = "SELECT * FROM Article WHERE idauthor=? AND is_original=TRUE " + sorter + " LIMIT ? OFFSET ?"
data = (uid, PAGE_ITEMS, page*PAGE_ITEMS)
rows = self.__tryexec(query, data).fetchall()
if rows is None:
return []
return [self.__make_article(row) for row in rows]
def get_article_by_link(self, link: str):
query = "SELECT * FROM Article WHERE link=?"
data = (link,)
row = self.__tryexec(query, data).fetchone()
return None if not row else self.__make_article(row)
def get_user_point_count(self, uid: int) -> int:
row = self.__tryexec("SELECT points FROM Frontpage WHERE id=?", (uid,)).fetchone()
return row[0] if row else 0
def get_series_info(self, sid: int = 0) -> list[SeriesRow] | SeriesRow:
query = "SELECT * FROM Series" if not sid else "SELECT * FROM SERIES WHERE series=?"
data = self.__tryexec(query, () if not sid else (sid,)).fetchall()
return [SeriesRow(*d) for d in data] if not sid else SeriesRow(*data[0])
def get_global_stats(self) -> StatisticsRow:
query = "SELECT * FROM Statistics"
data = self.__tryexec(query).fetchone()
return StatisticsRow(*data)
def get_last_article(self, uid: int, original: bool = False) -> t.Optional[Article]:
query = "SELECT * FROM Article WHERE idauthor=? AND is_original=? ORDER BY added DESC, id DESC LIMIT 1"
data = (uid, original)
row = self.__tryexec(query, data).fetchone()
if not row:
return None
return self.__make_article(row)
def add_article(self, a: Article) -> int:
query = "INSERT INTO Article (name, words, bonus, added, link, idauthor, is_original) VALUES (?, ?, ?, ?, ?, ?, ?)"
data = (a.name, a.words, a.bonus, a.added.strftime('%Y-%m-%d %H:%M:%S'), a.link, a.author.get_id(), a.is_original)
rowid = self.__tryexec(query, data).lastrowid
self.__mark_updated()
return rowid
# TODO: Generate tpw in controller
def add_user(self, u: User, gen_password=False) -> t.Tuple[int, t.Optional[str]]:
query = "INSERT INTO User (nickname, wikidot, password, discord) VALUES (?, ?, ?, ?)"
if gen_password:
temp_password = token_urlsafe(8)
password = pw_hash(temp_password)
else:
temp_password = None
password = u.password
return (self.__tryexec(query, (u.nickname, u.wikidot, password, u.discord)).lastrowid, temp_password)
def search_user(self, param: str) -> t.List[dict]:
query = "SELECT * FROM Frontpage WHERE nickname LIKE :param OR wikidot LIKE :param OR display LIKE :param OR discord=:param"
results = self.__tryexec(query, {'param': f'%{param}%'}).fetchall()
if not results:
return list()
return [{
'id': result[0],
'nickname': result[1],
'discord': result[2],
'wikidot': result[3],
'displayname': result[4],
'tr_count': result[5],
'points': result[6]
} for result in results]
def search_article(self, param: str) -> t.List[Article]: # TODO: Add parameter for translation/original
query = "SELECT * FROM Article WHERE name LIKE :param OR link LIKE :link"
results = self.__tryexec(query, {'param': f'%{param}%', 'link': f"%.wikidot.com/%{param}%"}).fetchall()
search_result = []
user_cache = {}
for result in results:
if result[6] not in user_cache: # Ugly but saves us a lot of queries
author = user_cache[result[6]] = self.get_user(result[6])
else:
author = user_cache[result[6]]
if not author: # Ideally, this shouldn't happen. In practice I forgot to enable foreign keys initially so it's possible
continue
if result[7]:
if result[7] not in user_cache:
corrector = user_cache[result[7]] = self.get_user(result[7])
else:
corrector = user_cache[result[7]]
else:
corrector = None
search_result.append({
'id': result[0],
'name': result[1],
'link': result[5],
'words': result[2],
'author': {
'id': author.uid,
'name': author.display_name or author.nickname
},
'corrector': {
'id': corrector.uid if corrector else 0,
'name': (corrector.display_name or corrector.nickname) if corrector else 'N/A'
}
})
return search_result
def search_article_by_user(self, param: str, uid: int):
query = "SELECT * FROM Article WHERE (name LIKE :param OR link LIKE :link) AND idauthor=:uid"
results = self.__tryexec(query, {'param': f'%{param}%', 'link': f"%.wikidot.com/%{param}%", 'uid': uid}).fetchall()
user_cache = {}
search_result = []
for result in results:
if result[7]:
if result[7] not in user_cache:
corrector = user_cache[result[7]] = self.get_user(result[7])
else:
corrector = user_cache[result[7]]
else:
corrector = None
search_result.append({
'id': result[0],
'name': result[1],
'link': result[5],
'words': result[2],
'bonus': result[3],
'added': result[4],
'corrector': {
'id': corrector.uid if corrector else 0,
'name': (corrector.display_name or corrector.nickname) if corrector else 'N/A'
}
})
return search_result
def get_corrections_by_user(self, user_id: int, page=0, sort='latest') -> t.Optional[t.List[Correction]]:
match sort:
case 'az':
sorter = 'ORDER BY name COLLATE NOCASE ASC'
case 'latest':
sorter = 'ORDER BY timestamp DESC, article_id DESC'
case 'words':
sorter = 'ORDER BY words DESC'
case _:
sorter = 'ORDER BY name COLLATE NOCASE ASC'
user_cache = {}
result = []
corrector = self.get_user(user_id)
corrections = self.__tryexec("SELECT * FROM Correction WHERE corrector=? " + sorter + " LIMIT ? OFFSET ?", (user_id, PAGE_ITEMS, PAGE_ITEMS*page)).fetchall()
if not corrections:
return []
for correction in corrections:
article = self.get_article(correction[0])
if correction[1] in user_cache:
author = user_cache[correction[1]]
else:
author = self.get_user(correction[1])
user_cache[correction[1]] = author
result.append(Correction(article, author, corrector, datetime.strptime(correction[3], '%Y-%m-%d %H:%M:%S'), correction[4]))
return result