-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmyanki.py
executable file
·181 lines (144 loc) · 5.77 KB
/
myanki.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/python3
# List all anki notes in database
# Testing: run as ./myanki.py <.anki2 or .apkg filename>
import os
import sys
import time
import shutil
import sqlite3
import zipfile
import hashlib
import platform
def eprint(*args, **kargs):
    """Print a tab-indented message, to stderr unless told otherwise.

    The first positional argument is converted with str() and prefixed
    with a tab; the remaining arguments are passed to print() untouched.

    Args:
        *args: Values to print.  With no values only the line ending is
            written.
        **kargs: Keyword arguments forwarded to print() (sep, end,
            flush, ...).  ``file`` defaults to sys.stderr but may be
            overridden by the caller.
    """
    if args:
        args = ('\t' + str(args[0]),) + args[1:]
    # Use a default rather than a hard-coded keyword so that an explicit
    # file=... does not collide with ours and raise TypeError.
    kargs.setdefault('file', sys.stderr)
    print(*args, **kargs)
def sanitize(string, bits=128):
    """Turn a string into a valid filename.

    The result is the leading part of the SHA-512 hex digest of the
    encoded string.  Each hex character carries four bits, so ``bits``
    selects ``bits // 4`` characters.
    """
    hex_digest = hashlib.sha512(string.encode()).hexdigest()
    hex_chars = bits // 4
    return hex_digest[:hex_chars]
def read_database(filename, retries=3):
    """Read the notes, cards and decks rows over a read-only connection.

    Each attempt opens ``filename`` as a read-only SQLite URI.  An attempt
    that fails with sqlite3.OperationalError while connecting or while
    listing the tables is reported on stderr and retried after a growing
    delay.

    Args:
        filename: Path of the SQLite database to open.
        retries: Number of additional attempts after the first one.

    Returns:
        A tuple ``(notes, cards, decks)`` of row lists:
        notes rows are ``(id, mod, flds, tags, mid)``, cards rows are
        ``(nid, id, did, queue, ord)`` and decks rows are ``(id, name)``.
        ``decks`` is an empty list when the database has no decks table.
        ``(None, None, None)`` is returned when every attempt failed.
    """
    for tri in range(retries + 1):
        if tri:
            eprint("Retrying", tri, 'of', str(retries) + '...')
            time.sleep(1 + tri * 2**0.5)
        eprint("Connecting to database:", filename)

        # Opening can itself fail (e.g. the file cannot be opened), so it
        # has to take part in the retry logic instead of raising.
        try:
            con = sqlite3.connect('file:' + filename + '?mode=ro', timeout=1, uri=True)
        except sqlite3.OperationalError as err:
            eprint("Error:", err)
            continue

        # The finally clause closes the connection on every way out of
        # this attempt: retry, success, or an unexpected exception.
        try:
            cur = con.cursor()
            try:
                rows = cur.execute("SELECT name FROM sqlite_master WHERE type = 'table'").fetchall()
            except sqlite3.OperationalError as err:
                eprint("Error:", err)
                continue
            tables = {row[0] for row in rows}

            # Database structure: https://github.com/ankidroid/Anki-Android/wiki/Database-Structure
            notes = cur.execute("SELECT id, mod, flds, tags, mid FROM notes").fetchall()

            # Associate notes with decks
            cards = cur.execute("SELECT nid, id, did, queue, ord FROM cards").fetchall()

            # Associate deck names with ids
            if 'decks' in tables:
                decks = cur.execute("SELECT id, name FROM decks").fetchall()
            else:
                eprint("No deck table in database.")
                decks = []
            return notes, cards, decks
        finally:
            con.close()

    # Only reached when no attempt succeeded.
    eprint("\nFailed to connect to database.")
    eprint("Make sure the filename is correct or try closing anki before proceeding.")
    if platform.system() == 'Darwin':
        eprint("If your Mac doesn't let you access the database, try selecting all of the notes in Anki and Right click->Notes->Export Note. Check the box to support older versions. Save as a .apkg file and open that instead with --anki (your location).apkg")
    return None, None, None
def _load_raw_tables(filename):
    """Return the (notes, cards, decks) rows for an existing anki file.

    A ``.apkg`` file is treated as a zip archive: its
    ``collection.anki21`` member is extracted into ``cache``, read, and
    removed again.  Any other file is read directly; when that works a
    copy is kept under ``cache`` and when it does not, an existing cached
    copy is read instead.

    Returns whatever read_database() returned, which is
    ``(None, None, None)`` when the read failed.  Exits with status 1
    when the archive lacks the expected member or when the database is
    unreadable and no cached copy exists.
    """
    ext = os.path.splitext(filename.lower())[-1]

    if ext == '.apkg':
        try:
            # The context manager closes the archive once the member is out.
            with zipfile.ZipFile(filename) as archive:
                tmp = archive.extract('collection.anki21', 'cache')
        except KeyError:
            eprint("Try using the option to 'Support older versions of Anki' when exporting")
            sys.exit(1)
        try:
            return read_database(tmp)
        finally:
            # Remove the extracted copy even if reading it raised.
            os.remove(tmp)

    # Read cards and/or make/read a cached copy of database
    cached = os.path.join('cache', sanitize(os.path.abspath(filename)) + ext)
    notes, cards, decks = read_database(filename, retries=0)

    # None (not merely empty) is what signals a failed read.
    if notes is not None:
        # Make a cached copy if anki database is readable and more than x seconds newer than cached
        if not os.path.exists(cached) or os.path.getmtime(filename) - os.path.getmtime(cached) >= 60:
            eprint("Make cache of database in:", cached)
            # The cache directory may not exist yet on a first run.
            os.makedirs(os.path.dirname(cached), exist_ok=True)
            shutil.copy(filename, cached)
        return notes, cards, decks

    if not os.path.exists(cached):
        sys.exit(1)
    eprint("Using cached anki database:", cached)
    tables = read_database(cached, retries=0)
    time.sleep(1)
    return tables


def _assemble_collection(notes, cards, decks):
    """Combine raw notes, cards and decks rows into a dict keyed by note id."""
    # Group the cards per note: note id -> {card id: (deck id, queue, ord)}
    # nid = note id
    # did = deck id
    # queue = queue (suspended, new...)
    cards_by_note = {}
    for nid, card_id, did, queue, order in cards:
        cards_by_note.setdefault(nid, {})[card_id] = (did, queue, order)

    # Deck id -> deck name split on the \x1f separator
    deck_paths = {deck_id: name.split('\x1f') for deck_id, name in decks}

    col = {}
    for nid, mod, flds, tags, mid in notes:
        # Fields are stored as one string separated by \x1f
        fields = flds.split('\x1f')
        out = {
            'question': fields[0],
            # A note may carry a single field only.
            'ans': fields[1] if len(fields) > 1 else '',
            'fields': fields,
            'mid': mid,
            'decks': set(),
            'queues': [],
            'cards': {},
        }

        # todo change this into a list ordered by ord
        # A note without any card rows simply gets empty card info.
        for card_id, (did, que, order) in cards_by_note.get(nid, {}).items():
            deck = '::'.join(deck_paths.get(did, ['unknown_deck']))
            out['decks'].add(deck)
            out['queues'].append(que)
            out['cards'][card_id] = {
                'order': order,
                'deck_id': did,
                'deck': deck,
                'queue': que,
            }
            # The card with ord 0 supplies the note-level deck and queue.
            if order == 0:
                out['deck'] = deck
                out['queue'] = que

        out['mod'] = mod
        out['tags'] = tags.strip().split()
        col[nid] = out  # nid is id of the note
    return col


def getnotes(filename):
    """Fetch all data from anki filename and return dictionary.

    Args:
        filename: Path to a ``.apkg`` export or to an anki database file.

    Returns:
        A dict mapping note id to a dict with the keys ``question``,
        ``ans``, ``fields``, ``mid``, ``decks``, ``queues``, ``cards``,
        ``mod`` and ``tags``.  ``deck`` and ``queue`` are present only
        when the note has a card whose ord is 0.

    Raises:
        SystemExit: With status 1 when the file is missing or no database
            could be read.
    """
    if not os.path.exists(filename):
        eprint("Can't find filename:", filename)
        sys.exit(1)

    notes, cards, decks = _load_raw_tables(filename)
    if notes is None:
        # read_database() has already reported why the read failed.
        sys.exit(1)
    return _assemble_collection(notes, cards, decks)
def _tester():
    """Print every note of the file named by the first command-line argument.

    Each note id and its note dictionary is written to stderr, followed
    by the total count.  Exits with status 1 when no filename was given.
    """
    if len(sys.argv) < 2:
        # Without this guard a missing argument ends in an IndexError traceback.
        eprint("Usage:", sys.argv[0], "<.anki2 or .apkg filename>")
        sys.exit(1)

    notes = getnotes(sys.argv[1])
    for nid, note in notes.items():
        eprint(nid, note, '\n', sep='\t')
    eprint("Found", len(notes), 'notes')
# Run the self-test only when executed as a script, not when imported.
if __name__ == '__main__':
    _tester()