-
-
Notifications
You must be signed in to change notification settings - Fork 646
/
Copy pathlive-query.ts
157 lines (145 loc) · 5.21 KB
/
live-query.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import { isAsyncFunction, keys, objectIsEmpty } from '../functions/utils';
import {
globalEvents,
DEXIE_STORAGE_MUTATED_EVENT_NAME,
} from '../globals/global-events';
import {
decrementExpectedAwaits,
incrementExpectedAwaits,
newScope,
PSD,
usePSD,
} from '../helpers/promise';
import { ObservabilitySet } from '../public/types/db-events';
import {
Observable as IObservable,
Subscription,
} from '../public/types/observable';
import { Observable } from '../classes/observable/observable';
import { extendObservabilitySet } from './extend-observability-set';
import { rangesOverlap } from '../helpers/rangeset';
import { domDeps } from '../classes/dexie/dexie-dom-dependencies';
import { Transaction } from '../classes/transaction';
import { obsSetsOverlap } from './obs-sets-overlap';
export interface LiveQueryContext {
subscr: ObservabilitySet;
txs: IDBTransaction[];
signal: AbortSignal;
requery: () => void;
trans: null | Transaction;
}
export function liveQuery<T>(querier: () => T | Promise<T>): IObservable<T> {
let hasValue = false;
let currentValue: T;
const observable = new Observable<T>((observer) => {
const scopeFuncIsAsync = isAsyncFunction(querier);
function execute(ctx: LiveQueryContext) {
if (scopeFuncIsAsync) {
incrementExpectedAwaits();
}
const rv = newScope(querier, ctx);
if (scopeFuncIsAsync) {
(rv as Promise<any>).finally(decrementExpectedAwaits);
}
return rv;
}
let closed = false;
let abortController: AbortController;
const txs: IDBTransaction[] = [];
let accumMuts: ObservabilitySet = {};
let currentObs: ObservabilitySet = {};
const subscription: Subscription = {
get closed() {
return closed;
},
unsubscribe: () => {
if (closed) return;
closed = true;
if (abortController) abortController.abort();
if (startedListening) globalEvents.storagemutated.unsubscribe(mutationListener);
txs.forEach(idbtrans => {
//@ts-ignore
idbtrans.aborted = true;
try { idbtrans.abort(); } catch {}});
},
};
observer.start && observer.start(subscription); // https://github.com/tc39/proposal-observable
let startedListening = false;
function shouldNotify() {
return obsSetsOverlap(currentObs, accumMuts);
}
const mutationListener = (parts: ObservabilitySet) => {
extendObservabilitySet(accumMuts, parts);
if (shouldNotify()) {
doQuery();
}
};
const doQuery = () => {
if (
closed || // closed - don't run!
!domDeps.indexedDB) // SSR in sveltekit, nextjs etc
{
return;
}
accumMuts = {};
const subscr: ObservabilitySet = {};
// Abort signal fill three purposes:
// 1. Abort the query if the observable is unsubscribed.
// 2. Abort the query if a new query is made before the previous one has completed.
// 3. For cached queries to know if they should remain in memory or could be enqued for being freed up.
// (they will remain in memory for a short time and if noone needs them again, they will eventually be freed up)
if (abortController) abortController.abort(); // Cancel previous query. Last query will be cancelled on unsubscribe().
abortController = new AbortController();
if (txs.length) {
txs.forEach(idbtrans => {
//@ts-ignore
idbtrans.aborted = true;
try {idbtrans.abort();} catch {}});
txs.length = 0;
}
const ctx: LiveQueryContext = {
subscr,
txs,
signal: abortController.signal,
requery: doQuery,
trans: null // Make the scope transactionless (don't reuse transaction from outer scope of the caller of subscribe())
}
const ret = execute(ctx);
Promise.resolve(ret).then(
(result) => {
hasValue = true;
currentValue = result;
if (closed || ctx.signal.aborted) {
// closed - no subscriber anymore.
// signal.aborted - new query was made before this one completed and
// the querier might have catched AbortError and return successful result.
// If so, we should not rely in that result because we know we have aborted
// this run, which means there's another run going on that will handle accumMuts
// and we must not base currentObs on the half-baked subscr.
return;
}
accumMuts = {};
// Update what we are subscribing for based on this last run:
currentObs = subscr;
if (!objectIsEmpty(currentObs) && !startedListening) {
globalEvents(DEXIE_STORAGE_MUTATED_EVENT_NAME, mutationListener);
startedListening = true;
}
observer.next && observer.next(result);
},
(err) => {
hasValue = false;
if (!['DatabaseClosedError', 'AbortError'].includes(err?.name)) {
if (closed) return;
observer.error && observer.error(err);
}
}
);
};
doQuery();
return subscription;
});
observable.hasValue = () => hasValue;
observable.getValue = () => currentValue;
return observable;
}