-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathworker.rs
161 lines (136 loc) · 5.63 KB
/
worker.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
use crate::error::Error;
use crate::sqlite::statement::StatementHandle;
use crossbeam_channel::{unbounded, Sender};
use either::Either;
use futures_channel::oneshot;
use std::sync::{Arc, Weak};
use std::thread;
use crate::sqlite::connection::ConnectionHandleRef;
use libsqlite3_sys::{sqlite3_reset, sqlite3_step, SQLITE_DONE, SQLITE_ROW};
use std::future::Future;
// Each SQLite connection has a dedicated thread.
// TODO: Tweak this so that we can use a thread pool per pool of SQLite3 connections to reduce
// OS resource usage. Low priority because a high concurrent load for SQLite3 is very
// unlikely.
pub(crate) struct StatementWorker {
tx: Sender<StatementWorkerCommand>,
}
enum StatementWorkerCommand {
Step {
statement: Weak<StatementHandle>,
tx: oneshot::Sender<Result<Either<u64, ()>, Error>>,
},
Reset {
statement: Weak<StatementHandle>,
tx: oneshot::Sender<()>,
},
Shutdown {
tx: oneshot::Sender<()>,
},
}
impl StatementWorker {
pub(crate) fn new(conn: ConnectionHandleRef) -> Self {
let (tx, rx) = unbounded();
thread::spawn(move || {
for cmd in rx {
match cmd {
StatementWorkerCommand::Step { statement, tx } => {
let statement = if let Some(statement) = statement.upgrade() {
statement
} else {
// statement is already finalized, the sender shouldn't be expecting a response
continue;
};
// SAFETY: only the `StatementWorker` calls this function
let status = unsafe { sqlite3_step(statement.as_ptr()) };
let result = match status {
SQLITE_ROW => Ok(Either::Right(())),
SQLITE_DONE => Ok(Either::Left(statement.changes())),
_ => Err(statement.last_error().into()),
};
let _ = tx.send(result);
}
StatementWorkerCommand::Reset { statement, tx } => {
if let Some(statement) = statement.upgrade() {
// SAFETY: this must be the only place we call `sqlite3_reset`
unsafe { sqlite3_reset(statement.as_ptr()) };
// `sqlite3_reset()` always returns either `SQLITE_OK`
// or the last error code for the statement,
// which should have already been handled;
// so it's assumed the return value is safe to ignore.
//
// https://www.sqlite.org/c3ref/reset.html
let _ = tx.send(());
}
}
StatementWorkerCommand::Shutdown { tx } => {
// drop the connection reference before sending confirmation
// and ending the command loop
drop(conn);
let _ = tx.send(());
return;
}
}
}
// SAFETY: we need to make sure a strong ref to `conn` always outlives anything in `rx`
drop(conn);
});
Self { tx }
}
pub(crate) async fn step(
&mut self,
statement: &Arc<StatementHandle>,
) -> Result<Either<u64, ()>, Error> {
let (tx, rx) = oneshot::channel();
self.tx
.send(StatementWorkerCommand::Step {
statement: Arc::downgrade(statement),
tx,
})
.map_err(|_| Error::WorkerCrashed)?;
rx.await.map_err(|_| Error::WorkerCrashed)?
}
/// Send a command to the worker to execute `sqlite3_reset()` next.
///
/// This method is written to execute the sending of the command eagerly so
/// you do not need to await the returned future unless you want to.
///
/// The only error is `WorkerCrashed` as `sqlite3_reset()` returns the last error
/// in the statement execution which should have already been handled from `step()`.
pub(crate) fn reset(
&mut self,
statement: &Arc<StatementHandle>,
) -> impl Future<Output = Result<(), Error>> {
// execute the sending eagerly so we don't need to spawn the future
let (tx, rx) = oneshot::channel();
let send_res = self
.tx
.send(StatementWorkerCommand::Reset {
statement: Arc::downgrade(statement),
tx,
})
.map_err(|_| Error::WorkerCrashed);
async move {
send_res?;
// wait for the response
rx.await.map_err(|_| Error::WorkerCrashed)
}
}
/// Send a command to the worker to shut down the processing thread.
///
/// A `WorkerCrashed` error may be returned if the thread has already stopped.
/// Subsequent calls to `step()`, `reset()`, or this method will fail with
/// `WorkerCrashed`. Ensure that any associated statements are dropped first.
pub(crate) fn shutdown(&mut self) -> impl Future<Output = Result<(), Error>> {
let (tx, rx) = oneshot::channel();
let send_res = self
.tx
.send(StatementWorkerCommand::Shutdown { tx })
.map_err(|_| Error::WorkerCrashed);
async move {
send_res?;
// wait for the response
rx.await.map_err(|_| Error::WorkerCrashed)
}
}
}