-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdrift_crdt.dart
411 lines (351 loc) · 13.3 KB
/
drift_crdt.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
/// Flutter implementation for the drift database packages.
///
/// The [CrdtQueryExecutor] class can be used as a drift database
/// implementation backed by the `synchroflite` package.
library drift_crdt;
import 'dart:async';
import 'dart:io';
import 'package:drift/backends.dart';
import 'package:drift/drift.dart';
import 'package:path/path.dart';
import 'package:sqflite/sqflite.dart' as sqflite;
import 'package:synchroflite/synchroflite.dart';
export 'package:crdt/crdt.dart';
// Sentinel "statements" recognised by `_CrdtDelegate.runCustom`: they are not
// sent to the database but toggle the delegate's `_queryDeleted` flag on/off.
const _crdtDeletedOn = 'CRDT QUERY DELETED ON';
const _crdtDeletedOff = 'CRDT QUERY DELETED OFF';

/// Signature of a function that runs when a database doesn't exist on file.
/// This can be useful to, for instance, load the database from an asset if it
/// doesn't exist.
typedef DatabaseCreator = FutureOr<void> Function(File file);

/// A SQL statement paired with its positional arguments.
///
/// Used as the value type of the `customQueries` map accepted by
/// `CrdtQueryExecutor.getChangeset`.
typedef Query = (String sql, List<Object?> args);
/// Thin adapter around a [TransactionSynchroflite].
///
/// Each method forwards its arguments unchanged to the method of the same
/// name on [txn].
class SqliteTransactionCrdt {
  /// The wrapped transaction that receives every call.
  final TransactionSynchroflite txn;

  SqliteTransactionCrdt(this.txn);

  /// Runs [sql] with [args] through `txn.execute` and waits for completion.
  Future<void> execute(String sql, [List<Object?>? args]) async {
    await txn.execute(sql, args);
  }

  /// Forwards to `txn.query` and returns the resulting rows.
  Future<List<Map<String, Object?>>> query(
    String sql, [
    List<Object?>? arguments,
  ]) =>
      txn.query(sql, arguments);

  /// Forwards to `txn.rawQuery` and returns the resulting rows.
  Future<List<Map<String, Object?>>> rawQuery(
    String sql, [
    List<Object?>? arguments,
  ]) =>
      txn.rawQuery(sql, arguments);

  /// Forwards to `txn.rawUpdate` and returns its integer result.
  Future<int> rawUpdate(String sql, [List<Object?>? arguments]) =>
      txn.rawUpdate(sql, arguments);

  /// Forwards to `txn.rawInsert` and returns its integer result.
  Future<int> rawInsert(String sql, [List<Object?>? arguments]) =>
      txn.rawInsert(sql, arguments);
}
/// [QueryDelegate] that runs statements inside one CRDT transaction.
///
/// Every statement is forwarded to the wrapped [SqliteTransactionCrdt]. The
/// `_queryDeleted` flag is fixed at construction time and only influences
/// [runSelect].
class _CrdtQueryDelegate extends QueryDelegate {
  // Always assigned by the constructor, so a plain `final` is sufficient
  // (no `late` needed).
  final SqliteTransactionCrdt _transactionCrdt;

  // When true, selects go through `query`; otherwise through `rawQuery`.
  final bool _queryDeleted;

  _CrdtQueryDelegate(this._transactionCrdt, this._queryDeleted);

  @override
  Future<void> runCustom(String statement, List<Object?> args) {
    return _transactionCrdt.execute(statement, args);
  }

  @override
  Future<int> runInsert(String statement, List<Object?> args) {
    return _transactionCrdt.rawInsert(statement, args);
  }

  @override
  Future<QueryResult> runSelect(String statement, List<Object?> args) async {
    // Both branches yield the same row shape; only the entry point differs.
    final rows = _queryDeleted
        ? await _transactionCrdt.query(statement, args)
        : await _transactionCrdt.rawQuery(statement, args);
    return QueryResult.fromRows(rows);
  }

  @override
  Future<int> runUpdate(String statement, List<Object?> args) {
    return _transactionCrdt.rawUpdate(statement, args);
  }
}
/// Transaction delegate that opens transactions on the owning delegate's
/// `synchroflite` instance.
class _CrdtTransactionDelegate extends SupportedTransactionDelegate {
  /// The database delegate whose `synchroflite` instance is used.
  final _CrdtDelegate api;

  /// Passed to each [_CrdtQueryDelegate] created by [startTransaction]; it is
  /// read when the transaction callback runs.
  bool queryDeleted = false;

  _CrdtTransactionDelegate(this.api);

  /// Opens a transaction and invokes [run] with a query delegate bound to it.
  @override
  FutureOr<void> startTransaction(Future Function(QueryDelegate) run) {
    return api.synchroflite.transaction((txn) async {
      final wrapped = SqliteTransactionCrdt(txn);
      final queryDelegate = _CrdtQueryDelegate(wrapped, queryDeleted);
      return run(queryDelegate);
    });
  }

  /// Queues every invocation in [statements] on a batch obtained from
  /// `api.synchroflite` and applies it without collecting results.
  Future<void> runBatched(BatchedStatements statements) async {
    final pending = api.synchroflite.batch();
    final sqlByIndex = statements.statements;
    for (final invocation in statements.arguments) {
      pending.execute(
        sqlByIndex[invocation.statementIndex],
        invocation.arguments,
      );
    }
    await pending.apply(noResult: true);
  }
}
/// [_CrdtDelegate] variant that opens an in-memory database.
///
/// The superclass is constructed with `inDbFolder = false` and an empty path;
/// neither is consulted because [open] is overridden.
class _CrdtDelegateInMemory extends _CrdtDelegate {
  /// Parameters are explicitly typed so that wrong argument types are caught
  /// at compile time rather than through implicit `dynamic` downcasts.
  ///
  /// [creator] is stored on the superclass but is not invoked by [open] here.
  _CrdtDelegateInMemory({
    bool singleInstance = true,
    bool migrate = false,
    DatabaseCreator? creator,
  }) : super(
          false,
          '',
          singleInstance: singleInstance,
          migrate: migrate,
          creator: creator,
        );

  /// Opens the in-memory database and marks this delegate as open.
  @override
  Future<void> open(QueryExecutorUser user) async {
    synchroflite = await Synchroflite.openInMemory(
      singleInstance: singleInstance,
      migrate: migrate,
    );
    _transactionDelegate = _CrdtTransactionDelegate(this);
    _isOpen = true;
  }
}
/// drift [DatabaseDelegate] that sends every statement to a [Synchroflite]
/// database.
///
/// Besides regular SQL, [runCustom] recognises the two sentinel statements
/// `_crdtDeletedOn` / `_crdtDeletedOff`, which toggle whether [runSelect]
/// uses `query` or `rawQuery` on the underlying database.
class _CrdtDelegate extends DatabaseDelegate {
  /// The opened database. Assigned by [open] / [openInMemory]; reading it
  /// earlier throws a `LateInitializationError`.
  late Synchroflite synchroflite;

  bool _isOpen = false;

  // Toggled through the sentinel statements handled in [runCustom].
  bool _queryDeleted = false;

  /// Whether [path] is resolved relative to `sqflite.getDatabasesPath()`.
  final bool inDbFolder;

  /// Database file path (absolute, or relative when [inDbFolder] is true).
  final String path;

  /// Forwarded as `migrate` when opening the database.
  final bool migrate;

  /// Forwarded as `singleInstance` when opening the database.
  bool singleInstance;

  /// Optional callback invoked by [open] when the database file is missing.
  final DatabaseCreator? creator;

  // Deliberately NOT `late`: a `late` field without an initializer throws on
  // read, which would defeat the lazy `??=` fallback in [transactionDelegate].
  _CrdtTransactionDelegate? _transactionDelegate;

  _CrdtDelegate(this.inDbFolder, this.path,
      {this.singleInstance = true, this.creator, this.migrate = false});

  // Evaluated lazily on first access, so [synchroflite] must be assigned
  // (i.e. the database opened) before this is read.
  @override
  late final DbVersionDelegate versionDelegate =
      _CrdtVersionDelegate(synchroflite);

  /// Returns the transaction delegate (created on demand), synchronising its
  /// `queryDeleted` flag with the current state of this delegate.
  @override
  TransactionDelegate get transactionDelegate {
    final delegate = _transactionDelegate ??= _CrdtTransactionDelegate(this);
    delegate.queryDeleted = _queryDeleted;
    return delegate;
  }

  @override
  bool get isOpen => _isOpen;

  /// Resolves the database path, runs [creator] if the file does not exist
  /// yet, and opens the database.
  @override
  Future<void> open(QueryExecutorUser user) async {
    final resolvedPath =
        inDbFolder ? join(await sqflite.getDatabasesPath(), path) : path;

    final file = File(resolvedPath);
    // Copy to a local so the null check promotes and no `!` is required.
    final create = creator;
    if (create != null && !await file.exists()) {
      // With `recursive: true` this does nothing when the directory already
      // exists, so no separate (blocking) existence check is needed.
      await Directory(dirname(resolvedPath)).create(recursive: true);
      await create(file);
    }

    synchroflite = await Synchroflite.open(
      resolvedPath,
      singleInstance: singleInstance,
      migrate: migrate,
    );
    _transactionDelegate = _CrdtTransactionDelegate(this);
    _isOpen = true;
  }

  /// Opens an in-memory database instead of a file-backed one.
  Future<void> openInMemory(QueryExecutorUser user) async {
    synchroflite = await Synchroflite.openInMemory(
      singleInstance: singleInstance,
      migrate: migrate,
    );
    _transactionDelegate = _CrdtTransactionDelegate(this);
    _isOpen = true;
  }

  /// Closes the underlying database.
  @override
  Future<void> close() {
    return synchroflite.close();
  }

  /// Queues every invocation in [statements] on a batch and applies it
  /// without collecting results.
  @override
  Future<void> runBatched(BatchedStatements statements) async {
    final batch = synchroflite.batch();
    for (final arg in statements.arguments) {
      batch.execute(statements.statements[arg.statementIndex], arg.arguments);
    }
    await batch.apply(noResult: true);
  }

  /// Executes [statement], unless it is one of the sentinel statements, in
  /// which case only the `_queryDeleted` flag is updated and nothing is sent
  /// to the database.
  @override
  Future<void> runCustom(String statement, List<Object?> args) {
    switch (statement) {
      case _crdtDeletedOn:
        _queryDeleted = true;
        break;
      case _crdtDeletedOff:
        _queryDeleted = false;
        break;
      default:
        return synchroflite.execute(statement, args);
    }
    return Future.value();
  }

  @override
  Future<int> runInsert(String statement, List<Object?> args) {
    return synchroflite.rawInsert(statement, args);
  }

  /// Runs a select through `query` when the deleted-rows flag is set and
  /// through `rawQuery` otherwise.
  @override
  Future<QueryResult> runSelect(String statement, List<Object?> args) async {
    final rows = _queryDeleted
        ? await synchroflite.query(statement, args)
        : await synchroflite.rawQuery(statement, args);
    return QueryResult.fromRows(rows);
  }

  @override
  Future<int> runUpdate(String statement, List<Object?> args) {
    return synchroflite.rawUpdate(statement, args);
  }
}
/// Version delegate that stores the schema version in SQLite's
/// `user_version` pragma.
class _CrdtVersionDelegate extends DynamicVersionDelegate {
  final Synchroflite _db;

  _CrdtVersionDelegate(this._db);

  /// Reads `PRAGMA user_version` and returns the first column of the single
  /// resulting row as an `int`.
  @override
  Future<int> get schemaVersion async {
    final rows = await _db.rawQuery('PRAGMA user_version;');
    final onlyRow = rows.single;
    return onlyRow.values.first as int;
  }

  /// Writes [version] to `PRAGMA user_version`.
  @override
  Future<void> setSchemaVersion(int version) async {
    final pragma = 'PRAGMA user_version = $version;';
    await _db.execute(pragma);
  }
}
/// A drift query executor that uses a [Synchroflite] database internally.
class CrdtQueryExecutor extends DelegatedDatabase {
  /// A query executor that will store the database in the file declared by
  /// [path]. If [logStatements] is true, statements sent to the database will
  /// be [print]ed, which can be handy for debugging. The [singleInstance]
  /// and [migrate] parameters are forwarded to [Synchroflite.open].
  ///
  /// The [creator] will be called when the database file doesn't exist. It can
  /// be used to, for instance, populate default data from an asset. Note that
  /// migrations might behave differently when populating the database this way.
  /// For instance, a database created by a [creator] will not receive the
  /// [MigrationStrategy.onCreate] callback because it hasn't been created by
  /// drift.
  CrdtQueryExecutor(
      {required String path,
      bool? logStatements,
      bool singleInstance = true,
      DatabaseCreator? creator,
      bool migrate = false})
      : super(
            _CrdtDelegate(false, path,
                singleInstance: singleInstance,
                creator: creator,
                migrate: migrate),
            logStatements: logStatements);

  /// A query executor backed by an in-memory database.
  ///
  /// [singleInstance] and [migrate] are forwarded to
  /// [Synchroflite.openInMemory]. [creator] is accepted for signature parity
  /// with the other constructors; the in-memory delegate stores it but does
  /// not invoke it when opening.
  CrdtQueryExecutor.inMemory(
      {bool? logStatements,
      bool singleInstance = true,
      DatabaseCreator? creator,
      bool migrate = false})
      : super(
            _CrdtDelegateInMemory(
                singleInstance: singleInstance,
                creator: creator,
                migrate: migrate),
            logStatements: logStatements);

  /// A query executor that will store the database in the file declared by
  /// [path], which will be resolved relative to [sqflite.getDatabasesPath].
  /// If [logStatements] is true, statements sent to the database will
  /// be [print]ed, which can be handy for debugging. The [singleInstance]
  /// and [migrate] parameters are forwarded to [Synchroflite.open].
  ///
  /// The [creator] will be called when the database file doesn't exist. It can
  /// be used to, for instance, populate default data from an asset. Note that
  /// migrations might behave differently when populating the database this way.
  /// For instance, a database created by a [creator] will not receive the
  /// [MigrationStrategy.onCreate] callback because it hasn't been created by
  /// drift.
  CrdtQueryExecutor.inDatabaseFolder(
      {required String path,
      bool? logStatements,
      bool singleInstance = true,
      DatabaseCreator? creator,
      bool migrate = false})
      : super(
            _CrdtDelegate(true, path,
                singleInstance: singleInstance,
                creator: creator,
                migrate: migrate),
            logStatements: logStatements);

  // Every constructor installs a _CrdtDelegate (or subclass), so this cast
  // cannot fail for instances created through this class.
  _CrdtDelegate get _crdtDelegate => delegate as _CrdtDelegate;

  /// The underlying [Synchroflite] object used by drift to send queries.
  ///
  /// Using the database directly can cause unexpected behavior in drift. For
  /// instance, stream queries won't update for updates sent to it directly.
  /// Further, drift assumes full control over the database for its internal
  /// connection management.
  /// For this reason, projects shouldn't use this getter unless they absolutely
  /// need to. The database is exposed to make migrating from sqflite to drift
  /// easier.
  ///
  /// Note that this returns null until the drift database has been opened.
  /// A drift database is opened lazily when the first query runs.
  Synchroflite? get sqfliteDb {
    final crdtDelegate = _crdtDelegate;
    return crdtDelegate.isOpen ? crdtDelegate.synchroflite : null;
  }

  @override
  // We're not really required to be sequential since sqflite has an internal
  // lock to bring statements into a sequential order.
  // Setting isSequential here helps with cancellations in stream queries
  // though.
  bool get isSequential => true;

  /// Returns the last modified timestamp of the database.
  ///
  /// [onlyNodeId] only return the last modified timestamp of the given node.
  /// [exceptNodeId] do not return the last modified timestamp of the given
  /// node.
  ///
  /// The database must already be open; the underlying `synchroflite` field
  /// is only assigned when the delegate is opened.
  Future<Hlc?> getLastModified(
      {String? onlyNodeId, String? exceptNodeId}) async {
    return _crdtDelegate.synchroflite
        .getLastModified(onlyNodeId: onlyNodeId, exceptNodeId: exceptNodeId);
  }

  /// Returns the database changeset according to the given parameters.
  ///
  /// [customQueries] can be used to add custom queries to the changeset.
  /// [onlyTables] only return changes for the given tables.
  /// [onlyNodeId] only return changes for the given node.
  /// [exceptNodeId] do not return changes for the given node.
  /// [modifiedOn] only return changes that were modified on the given
  /// timestamp.
  /// [modifiedAfter] only return changes that were modified after the given
  /// timestamp.
  ///
  /// The database must already be open (see [getLastModified]).
  Future<CrdtChangeset> getChangeset({
    Map<String, Query>? customQueries,
    Iterable<String>? onlyTables,
    String? onlyNodeId,
    String? exceptNodeId,
    Hlc? modifiedOn,
    Hlc? modifiedAfter,
  }) async {
    return _crdtDelegate.synchroflite.getChangeset(
        customQueries: customQueries,
        onlyTables: onlyTables,
        // Previously dropped: the filter was accepted but never applied.
        onlyNodeId: onlyNodeId,
        exceptNodeId: exceptNodeId,
        modifiedOn: modifiedOn,
        modifiedAfter: modifiedAfter);
  }

  /// Merges the provided [changeset] with the database.
  Future<void> merge(CrdtChangeset changeset) async {
    return _crdtDelegate.synchroflite.merge(changeset);
  }
}
/// Signature of the asynchronous callback executed by [queryDeleted].
typedef DelegateCallback<R> = Future<R> Function();

/// Allows access to the deleted records using the Drift API.
///
/// [db] is the database executor to query. [callback] is the work to run
/// while deleted-record querying is switched on; its result is returned.
///
/// For a [CrdtQueryExecutor] the "deleted on" sentinel is sent before
/// [callback] runs and the "deleted off" sentinel is always sent afterwards,
/// even when [callback] throws, so a failing callback cannot leave the
/// executor stuck in deleted-record mode. For any other [QueryExecutor] this
/// is a no-op wrapper around [callback].
///
/// Throws a `String` if [db] is not a [QueryExecutor].
Future<R> queryDeleted<T, R>(T db, DelegateCallback<R> callback) async {
  if (db is! QueryExecutor) {
    // NOTE(review): a raw String is thrown (and the message says "null" even
    // for non-null values of the wrong type); kept unchanged so existing
    // callers that catch it keep working.
    throw "Database executor is null";
  }
  if (db is! CrdtQueryExecutor) {
    // queryDeleted is a noop for non CrdtQueryExecutor
    return await callback();
  }

  await db.runCustom(_crdtDeletedOn);
  try {
    return await callback();
  } finally {
    // Restore the default mode regardless of how the callback finished.
    await db.runCustom(_crdtDeletedOff);
  }
}